Netty4之处理链

Dear 丶 2023-10-11 02:47 101阅读 0赞

本文是基于Netty4.1.x,Handler在Netty占据着很重要的位置,跟Servlet中的filter很像,通过Handler可以完成消息的编解码、拦截指定的消息、统一对日志错误进行处理、统一对请求进行计数。所有的Handler都实现ChannelHandler接口,分为两大类,ChannelInboundHandler与ChannelOutboundHandler,ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2FuZ2p1bnFpYW5n_size_16_color_FFFFFF_t_70

从上图可以看出,在入站时,request会依次经过相关的Inbound处理器,然后出站时response也经过相应的Outbound处理器。从上图中可以看出Handler组成了一个链表放在ChannelPipeplie中,下面看看ChannelPipe中的Handler如何组成链表的。下面是一段NettyServer初始化的代码:

  1. public class NettyServer {
  2. public static void main(String[] args) throws Exception {
  3. new NettyServer().start("127.0.0.1", 8081);
  4. }
  5. public void start(String host, int port) throws Exception {
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. EventLoopGroup bossGroup = new NioEventLoopGroup(0, executorService);//Boss I/O线程池,用于处理客户端连接,连接建立之后交给work I/O处理
  8. EventLoopGroup workerGroup = new NioEventLoopGroup(0, executorService);//Work I/O线程池
  9. EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(2);//业务线程池
  10. ServerBootstrap server = new ServerBootstrap();//启动类
  11. server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
  12. .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ChannelPipeline pipeline = ch.pipeline();
  16. pipeline.addLast("decoder", new StringDecoder());
  17. pipeline.addLast("encoder", new StringEncoder());
  18. pipeline.addLast(businessGroup, new ServerHandler());
  19. }
  20. });
  21. server.childOption(ChannelOption.TCP_NODELAY, true);
  22. server.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
  23. server.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
  24. InetSocketAddress addr = new InetSocketAddress(host, port);
  25. server.bind(addr).sync().channel();//启动服务
  26. }
  27. }

从上面看出在initChannel时会调用 pipeline.addLast(“decoder”, new StringDecoder());逐个将Handler加入到Pipeline,下在看看 pipeline.addLast()这个方法:

  1. @Override
  2. public final ChannelPipeline addLast(String name, ChannelHandler handler) {
  3. return addLast(null, name, handler);
  4. }
  5. @Override
  6. public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  7. final AbstractChannelHandlerContext newCtx;
  8. synchronized (this) {
  9. checkMultiplicity(handler);
  10. newCtx = newContext(group, filterName(name, handler), handler);
  11. addLast0(newCtx);
  12. // If the registered is false it means that the channel was not registered on an eventLoop yet.
  13. // In this case we add the context to the pipeline and add a task that will call
  14. // ChannelHandler.handlerAdded(...) once the channel is registered.
  15. if (!registered) {
  16. newCtx.setAddPending();
  17. callHandlerCallbackLater(newCtx, true);
  18. return this;
  19. }
  20. EventExecutor executor = newCtx.executor();
  21. if (!executor.inEventLoop()) {
  22. callHandlerAddedInEventLoop(newCtx, executor);
  23. return this;
  24. }
  25. }
  26. callHandlerAdded0(newCtx);
  27. return this;
  28. }
  29. private void addLast0(AbstractChannelHandlerContext newCtx) {
  30. AbstractChannelHandlerContext prev = tail.prev;
  31. newCtx.prev = prev;
  32. newCtx.next = tail;
  33. prev.next = newCtx;
  34. tail.prev = newCtx;
  35. }

上面代码片中 addLast()方法中先调用 checkMultiplicity(handler)检查一下是否有重复,然后将handler包装成一个AbstractChannelHandlerContext然后再调用 addLast0(newCtx);将newCtx放入到链表中,从addLast0方法中可以看出,处理链是一个双向链表。到这里处理链的执行逻辑及组成方式就说完了总结一下:

  • 处理链是一个有顺的双向链表放在ChannelPipeline(DefaultChannelPipeline)中,消息上行或下行时依次会被相关的处理器处理;
  • 可以通过 channel.pipeline().addLast()、 channel.pipeline().addFirst()、channel.pipeline().addBefore()等方法向链表的指定位置添加处理器,也可以在处理过程中动态添加或删除处理器;
  • 可以为Handler指定处理的业务线程池;
  • 可以使用相关的api将消息在处理链路中流转,如ctx.fireChannelRead()表示将消息传递到下一个处理器,ctx.channel().pipeline()表示将消息放到pipeline让处理器从第一个处理器开始处理;

发表评论

表情:
评论列表 (有 0 条评论,101人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty4启动流程

    etty是基于JAVA NIO的网络应用框架,使用Netty可以迅速的开发网络应用。主要是用在服务端。这篇文章主要是分析Netty4.1.x的启动流程。通过启动流程可以更...

    相关 Netty4编解码

    本文是基于Netty4.1.x,一般在使用Netty作为网络框架进行开发时,编解码框架是我们应该注意的一个重要部分。应用从网络层接收数据需要经过解码(Decode),将二进制的

    相关 Netty4处理

    本文是基于Netty4.1.x,Handler在Netty占据着很重要的位置,跟Servlet中的filter很像,通过Handler可以完成消息的编解码、拦截指定的消息、统一

    相关 Netty4详解三:Netty架构设计

            通过这一篇文章,我们基本上可以了解到Netty所有重要的组件,对Netty有一个全面的认识,这对下一步深入学习Netty是十分重要的,而学完这一章,我们其实已经