netty中的ChannelHandler

冷不防 2022-06-06 14:39 359阅读 0赞

原创 2017年01月16日 16:38:21

  • 2866

本文主要介绍以下几个相关类:
• ChannelPipeline
• ChannelHandlerContext
• ChannelHandler
• Inbound vs outbound(入站和出站)

接受客户端的连接和创建连接只是应用程序中的一步,更加重要的还是处理传入传出的数据。netty提供了强大的事件处理机制,允许用户自定义ChannelHandler的实现来处理数据。

1.ChannelPipeline

ChannelPipeline是ChannelHandler实例的列表(或则说是容器),用于处理或截获通道的接收和发送数据。ChannelPipeline提供了一种高级的截取过滤器模式,让用户可以在ChannelPipeline中完全控制一个事件及如何处理ChannelHandler与ChannelPipeline的交互。

可以这样说,一个新的通道就对应一个新的ChannelPipeline并附加至通道。一旦连接,通道Channel和ChannelPipeline之间的耦合是永久性的。通道Channel不能附加其他的ChannelPipeline或从ChannelPipeline分离。

下图显示了ChannelHandler在ChannelPipeline中的IO处理示意图:
这里写图片描述

很明显,ChannelPipeline里面就是一个ChannelHandler的列表。如果一个入站IO事件被触发,这个事件会从第一个开始依次通过ChannelPipeline中的ChannelHandler。若是一个入站I/O事件,则会从最后一个开始依次通过ChannelPipeline中的ChannelHandler。ChannelHandler可以处理事件并检查类型,如果某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler。ChannelPipeline可以动态添加、删除、替换其中的ChannelHandler,这样的机制可以提高灵活性。

修改ChannelPipeline的方法:
• addFirst(…),添加ChannelHandler在ChannelPipeline的第一个位置
• addBefore(…),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler
• addAfter(…),在ChannelPipeline中指定的ChannelHandler名称之后添加ChannelHandler
• addLast(ChannelHandler…),在ChannelPipeline的末尾添加ChannelHandler
• remove(…),删除ChannelPipeline中指定的ChannelHandler
• replace(…),替换ChannelPipeline中指定的ChannelHandler

比如:

  1. ChannelPipeline pipeline = ch.pipeline();
  2. FirstHandler firstHandler = new FirstHandler();
  3. pipeline.addLast("handler1", firstHandler);
  4. pipeline.addFirst("handler2", new SecondHandler());
  5. pipeline.addLast("handler3", new ThirdHandler());
  6. pipeline.remove("“handler3“");
  7. pipeline.remove(firstHandler);
  8. pipeline.replace("handler2", "handler4", new FourthHandler());
  9. 1
  10. 2
  11. 3
  12. 4
  13. 5
  14. 6
  15. 7
  16. 8

被添加到ChannelPipeline的ChannelHandler将通过IO-Thread处理事件,这意味了必须不能有其他的IO-Thread阻塞来影响IO的整体处理;有时候可能需要阻塞,例如JDBC。因此netty允许通过一个EventExecutorGroup到每一个ChannelPipeline.add*方法,自定义的事件会被包含在EventExecutorGroup中的EventExecutor来处理,默认的实现是DefaultEventExecutorGroup。

2.ChannelHandlerContext

每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。

下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系:
这里写图片描述

ChannelHandlerContext可以通知下一个ChannelHandler

如果我们想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:
• 调用Channel的方法
• 调用ChannelPipeline的方法

这两个方法都可以让事件流全部通过ChannelPipeline。无论从头部还是尾部开始,因为它主要依赖于事件的性质。如果是一个“入站”事件,它开始于头部;若是一个“出站”事件,则开始于尾部。

下面的代码显示了一个写事件如何通过ChannelPipeline从尾部开始:

  1. protected void initChannel(SocketChannel ch) throws Exception {
  2. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
  3. @Override
  4. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  5. //Event via Channel
  6. Channel channel = ctx.channel();
  7. channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
  8. //Event via ChannelPipeline
  9. ChannelPipeline pipeline = ctx.pipeline();
  10. pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
  11. }
  12. });
  13. }
  14. 1
  15. 2
  16. 3
  17. 4
  18. 5
  19. 6
  20. 7
  21. 8
  22. 9
  23. 10
  24. 11
  25. 12
  26. 13
  27. 14

下图表示通过Channel或ChannelPipeline的通知:
这里写图片描述

ChannelHandlerContext可以修改ChannelPipeline

调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline,能在运行时动态的增加、删除、替换ChannelPipeline中的ChannelHandler。我们可以保持ChannelHandlerContext以供以后使用,如外部Handler方法触发一个事件,甚至从一个不同的线程触发。
下面代码显示了保存ChannelHandlerContext供之后使用或其他线程使用:

  1. public class WriteHandler extends ChannelHandlerAdapter {
  2. private ChannelHandlerContext ctx;
  3. @Override
  4. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  5. this.ctx = ctx;
  6. }
  7. public void send(String msg){
  8. ctx.write(msg);
  9. }
  10. }
  11. 1
  12. 2
  13. 3
  14. 4
  15. 5
  16. 6
  17. 7
  18. 8
  19. 9
  20. 10
  21. 11
  22. 12

请注意,ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。怎么是不安全的使用?看下面代码:

  1. @Sharable
  2. public class NotSharableHandler extends ChannelInboundHandlerAdapter {
  3. private int count;
  4. @Override
  5. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  6. count++;
  7. System.out.println("channelRead(...) called the " + count + " time“");
  8. ctx.fireChannelRead(msg);
  9. }
  10. }
  11. 1
  12. 2
  13. 3
  14. 4
  15. 5
  16. 6
  17. 7
  18. 8
  19. 9
  20. 10
  21. 11
  22. 12
  23. 13

上面是一个带@Sharable注解的Handler,它被多个线程使用时,里面count是不安全的,会导致count值错误。
为什么要共享ChannelHandler?使用@Sharable注解共享一个ChannelHandler在一些需求中还是有很好的作用的,如使用一个ChannelHandler来统计连接数或来处理一些全局数据等等。

3. channel的状态模型

Netty有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler的各个方法。下面是Channel生命周期四个不同的状态:

  1. channelUnregistered
  2. channelRegistered
  3. channelActive
  4. channelInactive

Channel的状态在其生命周期中变化,因为状态变化需要触发,下图显示了Channel状态变化:
这里写图片描述

4、ChannelHandler和其子类

先看一些类的继承图:
这里写图片描述

1、ChannelHandler中的方法

  1. Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有的类型的父类是ChannelHandlerChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法。
  2. 1
  3. 2
  1. handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
  2. handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
  3. exceptionCaught,处理抛出的异常

netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。ChannelHandlerAdapter实现了父类的所有方法,基本上就是传递事件到ChannelPipeline中的下一个ChannelHandler直到结束。我们也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

2、ChannelInboundHandler

ChannelInboundHandler提供了一些方法再接收数据或Channel状态改变时被调用。下面是ChannelInboundHandler的一些方法:

  1. channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
  2. channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
  3. channelActive,ChannelHandlerContext的Channel已激活
  4. channelInactive,ChannelHanderContxt的Channel结束生命周期
  5. channelRead,从当前Channel的对端读取消息
  6. channelReadComplete,消息读取完成后执行
  7. userEventTriggered,一个用户事件被处罚
  8. channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
  9. exceptionCaught,重写父类ChannelHandler的方法,处理异常

netty提供了一个实现了ChannelInboundHandler接口并继承ChannelHandlerAdapter的类:ChannelInboundHandlerAdapter。ChannelInboundHandlerAdapter实现了ChannelInboundHandler的所有方法,作用就是处理消息并将消息转发到ChannelPipeline中的下一个ChannelHandler。ChannelInboundHandlerAdapter的channelRead方法处理完消息后不会自动释放消息,若想自动释放收到的消息,可以使用SimpleChannelInboundHandler。

看下面的代码:

  1. /** * 实现ChannelInboundHandlerAdapter的Handler,不会自动释放接收的消息对象 * @author c.k * */
  2. public class DiscardHandler extends ChannelInboundHandlerAdapter {
  3. @Override
  4. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //手动释放消息
  6. ReferenceCountUtil.release(msg);
  7. }
  8. }
  9. 1
  10. 2
  11. 3
  12. 4
  13. 5
  14. 6
  15. 7
  16. 8
  17. 9
  18. 10
  19. 11
  20. 12

SimpleChannelInboundHandler会自动释放消息

  1. /** * 继承SimpleChannelInboundHandler,会自动释放消息对象 * @author c.k * */
  2. public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
  5. //不需要手动释放
  6. }
  7. }
  8. 1
  9. 2
  10. 3
  11. 4
  12. 5
  13. 6
  14. 7
  15. 8
  16. 9
  17. 10
  18. 11

ChannelInitializer用来初始化ChannelHandler,将自定义的各种ChannelHandler添加到ChannelPipeline中。

发表评论

表情:
评论列表 (有 0 条评论,359人围观)

还没有评论,来说两句吧...

相关阅读