Netty入门及实现简单通信框架

电玩女神 2022-09-05 12:58 190阅读 0赞

1. Netty是什么

Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
说白了,Netty就是一个java高性能的网络通信框架,它运用了操作系统底层提供的NIO通信模型,实现了高性能的网络通信,目前已经被广泛运用到各种中间件中。

2.快速入门

这里直接上服务端代码,netty的代码层次简单易懂,是一个很典型的建造者模式

  1. public static void main(String[] args) throws Exception {
  2. EventLoopGroup workerGroup = new NioEventLoopGroup();
  3. EventLoopGroup bossGroup = new NioEventLoopGroup(2);
  4. try {
  5. ServerBootstrap b = new ServerBootstrap();
  6. b.group(bossGroup,workerGroup)
  7. .channel(NioServerSocketChannel.class)
  8. //处理channel
  9. .childHandler(new ChannelInitializer<SocketChannel>() {
  10. @Override
  11. public void initChannel(SocketChannel ch) throws Exception {
  12. ChannelPipeline p = ch.pipeline();
  13. p.addLast(new IdleHandler());
  14. p.addLast(new LoggingHandler(LogLevel.INFO));
  15. //消息解码
  16. p.addLast(new MyEncoder());
  17. p.addLast(new MyDecoder(1024, 4, 4, 0, 0));
  18. //业务处理
  19. p.addLast(new ServerSingleHandler());
  20. p.addLast(new ServerGroupHandler());
  21. }
  22. });
  23. //绑定端口
  24. ChannelFuture f = b.bind(8090).sync();
  25. f.channel().closeFuture().sync();
  26. } finally {
  27. //处理完剩余请求再关闭
  28. workerGroup.shutdownGracefully();
  29. }
  30. }

客户端的编码基本大同小异

  1. public static void main(String[] args) throws Exception {
  2. // Configure the client.
  3. EventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap b = new Bootstrap();
  6. b.group(group)
  7. .channel(NioSocketChannel.class)
  8. .option(ChannelOption.TCP_NODELAY, true)
  9. .option(ChannelOption.SO_KEEPALIVE,true)
  10. .handler(new ChannelInitializer<SocketChannel>() {
  11. @Override
  12. public void initChannel(SocketChannel ch) throws Exception {
  13. ChannelPipeline p = ch.pipeline();
  14. p.addLast(new LoggingHandler(LogLevel.INFO));
  15. p.addLast(new MyEncoder());
  16. p.addLast(new MyDecoder(1024, 4, 4, 0, 0));
  17. p.addLast(new ClientHandler());
  18. }
  19. });
  20. // Start the client.
  21. ChannelFuture f = b.connect("127.0.0.1", 8090).sync();
  22. // Wait until the connection is closed.
  23. f.channel().closeFuture().sync();
  24. } finally {
  25. // Shut down the event loop to terminate all threads.
  26. group.shutdownGracefully();
  27. }
  28. }

3.Netty的Handler责任链模式

handler责任链
在上面代码可以看到 p.addLast()这个方法,这里就是按照前后顺序添加逻辑处理的handler,多个handler会构造成一条pipeline,也就是一条逻辑处理的责任链,对传入的信息进行顺序处理,而handler也分为inboundHandler和outboundHandler,因此在服务器初始化的时候,我们可以很轻松直白的完成出入信息的逻辑处理流程编写

4.编码和解码器

实现自定义的解码器只需要继承netty框架中自带的encoder/decoder即可,netty自带的简单编码解码器有如下几种:

固定长度的解码器FixedLengthFrameDecoder
行解码器LineBasedFrameDecoder
基于分隔符的解码器DelimiterBasedFrameDecoder
基于长度域的解码器LengthFieldBasedFrameDecoder
自定义格式编码器MessageToByteEncoder

此外还有多种自带的更复杂协议解码器,如redis、protobuf、json等
我们就以上面例子用到的编码解码器为例子,看看如何使用这些自带的类

这里我们使用 标志头(一个int)+body长度(一个int)+body实体的格式构造

  1. public class MyEncoder extends MessageToByteEncoder<MyProtocol> {
  2. //重写encode方法即可
  3. @Override
  4. protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf byteBuf) {
  5. byteBuf.writeInt(MyProtocol.header);//0x76
  6. byteBuf.writeInt(myProtocol.getContentLength());
  7. byteBuf.writeBytes(myProtocol.getContent());
  8. }
  9. }

当我们已经知道消息体格式后,剩下的事情就是通过自定义的decoder把消息体解析出来,这里由于有body长度因此适用基于长度域的解码器LengthFieldBasedFrameDecoder。
这时候,我们就可以配合起来看上面例子中解码器的参数了。
在这里插入图片描述
根据设置,我们消息体最长为1024字节,body长度字段偏移为4字节(因为一开始的4字节是标识数0
x76),body长度字段为4字节(一个int)。也就是说,消息体header长度为8字节,解析header中的body长度后可以精确截取body,这样我们一个完整的消息体就被解析出来了。

!注意,后续业务逻辑还要注意编码格式。例如:MessageEntity messageEntity =
GsonUtil.GsonToBean(new String(myProtocol.getContent(), “UTF-8”),
MessageEntity.class);

解码器代码如下:

  1. public class MyDecoder extends LengthFieldBasedFrameDecoder {
  2. public MyDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
  3. super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
  4. }
  5. @Override
  6. protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  7. //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
  8. in = (ByteBuf) super.decode(ctx, in);
  9. if (in == null) {
  10. return null;
  11. }
  12. int header = in.readInt();
  13. if (header != MyProtocol.header) {
  14. throw new Exception("无法识别协议");
  15. }
  16. //读取length字段
  17. int length = in.readInt();
  18. if (in.readableBytes() != length) {
  19. throw new Exception("标记的长度不符合实际长度");
  20. }
  21. //读取body
  22. byte[] bytes = new byte[length];
  23. in.readBytes(bytes);
  24. return new MyProtocol(length, bytes);
  25. }
  26. }

附:demo代码

发表评论

表情:
评论列表 (有 0 条评论,190人围观)

还没有评论,来说两句吧...

相关阅读

    相关 基于Netty实现简单的RPC框架

            Dubbo采用Netty作为基础通信组件,模仿Dubbo实现简单版的RPC框架。服务消费者和服务提供者约定接口和协议,用于远程调用和TCP请求验证。服务提供者作