详尽Netty(三):Channel

Dear 丶 2023-02-22 13:44 41阅读 0赞

如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达, 超神之路从此展开, BTAJ不再是梦想!

架构殿堂

概念

Channel 是java nio的一个基本构造。

​ 它代表一个到实体(如一个硬件设备,一个文件、一个网络套接字或者一个能够之行一个或者多个不同的I/O操作的程序组件)的开放链接,如读操作和写操作。

可以把Channel 看做是传送(入站)或者传出(出站)数据的载体。可以被打开或者被关闭,链接或者断开连接。

其UML图:
在这里插入图片描述

分类

Channel:是对网络Socket的封装,抽象了网络I/O的读、写、连接与绑定。

AbstractChannel:实现了Channel接口的大部分功能,一次连接用到的Channel、ChannelId、eventLoop、pipelIne、unsafe都会保存在这里。

AbstractNioChannel:通过select的方式对读写事件进行监听。

客户端Channel:主要注册read与write事件,关注于具体数据的读写。

服务端Channel:主要注册accept事件,关注于具体连接的接入,这也是与客户端Channel的read事件最主要的区别。

所有方法,如图:

在这里插入图片描述

Channel重要方法和参数

eventLoop: 返回分配给Channel 的EventLoop
pipeline: 返回分配给Channel 的ChannelPipeline
isActive: 如果Channel 是活动的,则返回true。活动的意义可能依赖于底层的传输。例如,一个Socket 传输一旦连接到了远程节点便是活动的,而一个Datagram 传输一旦被打开便是活动的。
localAddress: 返回本地的SokcetAddress
remoteAddress: 返回远程的SocketAddress
write: 将数据写到远程节点。这个数据将被传递给ChannelPipeline,并且排队直到它被冲刷
flush: 将之前已写的数据冲刷到底层传输,如一个Socket
writeAndFlush: 一个简便的方法,等同于调用write()并接着调用flush()

生命周期状态

ChannelUnregistered :Channel 已经被创建,但还未注册到EventLoop
ChannelRegistered :Channel 已经被注册到了EventLoop
ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive :Channel 没有连接到远程节点
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline 中的ChannelHandler,其可以随后对它们做出响应。

状态的切换:

在这里插入图片描述

相关源码

1. AbstractChannel

  1. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
  2. // 父 Channel(NioServerSocketChannel 是没有父channel的)
  3. private final Channel parent;
  4. // Channel 唯一ID
  5. private final ChannelId id;
  6. // Unsafe 对象,封装 ByteBuf 的读写操作
  7. private final Unsafe unsafe;
  8. // 关联的 Pipeline 对象
  9. private final DefaultChannelPipeline pipeline;
  10. private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
  11. private final CloseFuture closeFuture = new CloseFuture(this);
  12. // 本地地址
  13. private volatile SocketAddress localAddress;
  14. //远端地址
  15. private volatile SocketAddress remoteAddress;
  16. // EventLoop 封装的 Selector,channel所注册的eventLoop
  17. private volatile EventLoop eventLoop;
  18. // 是否注册
  19. private volatile boolean registered;
  20. private boolean closeInitiated;
  21. /** Cache for the string representation of this channel */
  22. private boolean strValActive;
  23. private String strVal;

构造函数

  1. protected AbstractChannel(Channel parent, ChannelId id) {
  2. this.parent = parent;
  3. this.id = id;
  4. unsafe = newUnsafe();
  5. pipeline = newChannelPipeline();
  6. }
  7. // Unsafe 实现交给子类实现
  8. protected abstract AbstractUnsafe newUnsafe();
  9. // 创建 DefaultChannelPipeline 对象
  10. protected DefaultChannelPipeline newChannelPipeline() {
  11. return new DefaultChannelPipeline(this);
  12. }

说明

1.Unsafe类里实现了具体的连接与写数据。比如:网络的读,写,链路关闭,发起连接等。之所以命名为unsafe是不希望外部使用,并非是不安全的。

2.DefaultChannelPipeline 只是一个 Handler 的容器,也可以理解为一个Handler链,具体的逻辑由Handler处理,而每个Handler都会分配一个EventLoop,最终的请求还是要EventLoop来执行,而EventLoop中又调用Channel中的内部类Unsafe对应的方法。
新建一个channel会自动创建一个ChannelPipeline。

3.这里创建 DefaultChannelPipeline,构造中传入当前的 Channel,而读写数据都是在 ChannelPipeline 中进行的,ChannelPipeline 进行读写数据又委托给 Channel 中的 Unsafe 进行操作。

2.AbstractNioChannel

  1. public abstract class AbstractNioChannel extends AbstractChannel {
  2. // 抽象了 SocketChannel 和 ServerSocketChannel 的公共的父类
  3. //Socketchannle和ServerSocketChannel的公共操作类,用来设置SelectableChannel相关参数和IO操作
  4. private final SelectableChannel ch;
  5. // SelectionKey.OP_READ 读事件
  6. protected final int readInterestOp;
  7. // 注册到 selector 上返回的 selectorKey
  8. volatile SelectionKey selectionKey;
  9. // 是否还有未读的数据
  10. boolean readPending;
  11. private final Runnable clearReadPendingRunnable = new Runnable() {
  12. @Override
  13. public void run() {
  14. clearReadPending0();
  15. }
  16. };
  17. /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */
  18. // 连接操作的结果
  19. private ChannelPromise connectPromise;
  20. // 连接超时定时任务
  21. private ScheduledFuture<?> connectTimeoutFuture;
  22. // 客户端地址
  23. private SocketAddress requestedRemoteAddress;
  24. ....

核心方法:

  1. //核心操作,注册操作
  2. //1) 如果当前注册返回的selectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。
  3. //2) 如果是第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true, 说明之前失效的selectionKey已经被删除掉。继续发起下一次注册操作,如果成功则退出,
  4. //3) 如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionKey,按照JDK的API说明,这种意外不应该发生。如果发生这种问题,则说明可能NIO的相关类库存在不可恢复的BUG,直接抛出CancelledKeyException异常到上层进行统一处理。
  5. protected void doRegister() throws Exception {
  6. boolean selected = false;
  7. for (;;) {
  8. try {
  9. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  10. return;
  11. } catch (CancelledKeyException e) {
  12. if (!selected) {
  13. // Force the Selector to select now as the "canceled" SelectionKey may still be
  14. // cached and not removed because no Select.select(..) operation was called yet.
  15. eventLoop().selectNow();
  16. selected = true;
  17. } else {
  18. // We forced a select operation on the selector before but the SelectionKey is still cached
  19. // for whatever reason. JDK bug ?
  20. throw e;
  21. }
  22. }
  23. }
  24. }
  25. }

SelectionKey 常量值

  1. public abstract class SelectionKey {
  2. public static final int OP_READ = 1 << 0; //读操作位
  3. public static final int OP_WRITE = 1 << 2; //写操作位
  4. public static final int OP_CONNECT = 1 << 3; //客户端连接操作位
  5. public static final int OP_ACCEPT = 1 << 4; //服务端接受连接操作位
  6. //如果注册的操作位为0表示只是完成注册功能,说明对任何事件都不感兴趣

doBeginRead() 读之前的准备

  1. @Override
  2. protected void doBeginRead() throws Exception {
  3. // Channel.read() or ChannelHandlerContext.read() was called
  4. final SelectionKey selectionKey = this.selectionKey;
  5. if (!selectionKey.isValid()) {
  6. return; //key无效的话直接返回
  7. }
  8. readPending = true; //表示读pending中
  9. final int interestOps = selectionKey.interestOps();
  10. if ((interestOps & readInterestOp) == 0) { //表示当前没有读操作位
  11. selectionKey.interestOps(interestOps | readInterestOp); //设置读操作位
  12. }
  13. }
  14. //SelectionKey中定义的是否可读操作
  15. public final boolean isReadable() {
  16. return (readyOps() & OP_READ) != 0;
  17. }

3.AbstractNioByteChannel

doWrite操作

配置中设置循环次数是避免半包中数据量过大,IO线程一直尝试写操作,此时IO线程无法处理其他IO操作或者定时任务,比如新的消息或者定时任务,如果网络IO慢或者对方读取慢等造成IO线程假死的状态.

  1. @Override
  2. protected void doWrite(ChannelOutboundBuffer in) throws Exception {
  3. int writeSpinCount = -1; // 写自选次数
  4. boolean setOpWrite = false; //写操作位为0
  5. for (;;) {
  6. Object msg = in.current();
  7. if (msg == null) { //从环形数组ChannelOutboundBuffer弹出一条消息,如果为null,表示消息已经发送完成,
  8. // Wrote all messages.
  9. clearOpWrite(); //清除写标志位,退出循环
  10. // Directly return here so incompleteWrite(...) is not called.
  11. return;
  12. }
  13. if (msg instanceof ByteBuf) {
  14. ByteBuf buf = (ByteBuf) msg;
  15. int readableBytes = buf.readableBytes();
  16. if (readableBytes == 0) { //如果可读字节为0,则丢弃该消息,循环处理其他消息
  17. in.remove();
  18. continue;
  19. }
  20. boolean done = false; //消息是否全部发送完毕表示
  21. long flushedAmount = 0; //发送的字节数量
  22. if (writeSpinCount == -1) {
  23. //如果为-1的时候从配置中获取写循环次数
  24. writeSpinCount = config().getWriteSpinCount();
  25. }
  26. for (int i = writeSpinCount - 1; i >= 0; i --) {
  27. int localFlushedAmount = doWriteBytes(buf); //由子类实现写
  28. if (localFlushedAmount == 0) { //这里表示本次发送字节为0,发送TCP缓冲区满了,所以此时为了避免空循环一直发送,这里就将半包写表示设置为true并退出循环
  29. setOpWrite = true;
  30. break;
  31. }
  32. //发送成功就对发送的字节计数
  33. flushedAmount += localFlushedAmount;
  34. if (!buf.isReadable()) { //如果没有可读字节,表示已经发送完毕
  35. done = true; //表示发送完成,并退出循环
  36. break;
  37. }
  38. }
  39. //通知promise当前写的进度
  40. in.progress(flushedAmount);
  41. if (done) { //如果发送完成,移除缓冲的数据
  42. in.remove();
  43. } else {
  44. 如果没有完成会调用incompleteWrite方法
  45. // Break the loop and so incompleteWrite(...) is called.
  46. break;
  47. }
  48. } else if (msg instanceof FileRegion) { //这个是文件传输和上面类似
  49. FileRegion region = (FileRegion) msg;
  50. boolean done = region.transferred() >= region.count();
  51. if (!done) {
  52. long flushedAmount = 0;
  53. if (writeSpinCount == -1) {
  54. writeSpinCount = config().getWriteSpinCount();
  55. }
  56. for (int i = writeSpinCount - 1; i >= 0; i--) {
  57. long localFlushedAmount = doWriteFileRegion(region);
  58. if (localFlushedAmount == 0) {
  59. setOpWrite = true;
  60. break;
  61. }
  62. flushedAmount += localFlushedAmount;
  63. if (region.transferred() >= region.count()) {
  64. done = true;
  65. break;
  66. }
  67. }
  68. in.progress(flushedAmount);
  69. }
  70. if (done) {
  71. in.remove();
  72. } else {
  73. // Break the loop and so incompleteWrite(...) is called.
  74. break;
  75. }
  76. } else {
  77. // Should not reach here.
  78. throw new Error();
  79. }
  80. }
  81. //如果没有完成写看看需要做的事情
  82. incompleteWrite(setOpWrite);
  83. }
  84. //未完成写操作,看看操作
  85. protected final void incompleteWrite(boolean setOpWrite) {
  86. // Did not write completely.
  87. if (setOpWrite) { //如果当前的写操作位true,那么当前多路复用器继续轮询处理
  88. setOpWrite();
  89. } else { //否则重新新建一个task任务,让eventLoop后面点执行flush操作,这样其他任务才能够执行
  90. // Schedule flush again later so other tasks can be picked up in the meantime
  91. Runnable flushTask = this.flushTask;
  92. if (flushTask == null) {
  93. flushTask = this.flushTask = new Runnable() {
  94. @Override
  95. public void run() {
  96. flush();
  97. }
  98. };
  99. }
  100. eventLoop().execute(flushTask);
  101. }
  102. }

4.AbstractNioMessageChannel

doWrite操作

  1. @Override
  2. protected void doWrite(ChannelOutboundBuffer in) throws Exception {
  3. final SelectionKey key = selectionKey();
  4. final int interestOps = key.interestOps();
  5. for (;;) {
  6. Object msg = in.current();
  7. if (msg == null) {
  8. // Wrote all messages.
  9. if ((interestOps & SelectionKey.OP_WRITE) != 0) {
  10. key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
  11. }
  12. break;
  13. }
  14. try {
  15. boolean done = false;
  16. for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
  17. if (doWriteMessage(msg, in)) {
  18. done = true;
  19. break;
  20. }
  21. }
  22. if (done) {
  23. in.remove();
  24. } else {
  25. // Did not write all messages.
  26. if ((interestOps & SelectionKey.OP_WRITE) == 0) {
  27. key.interestOps(interestOps | SelectionKey.OP_WRITE);
  28. }
  29. break;
  30. }
  31. } catch (Exception e) {
  32. if (continueOnWriteError()) {
  33. in.remove(e);
  34. } else {
  35. throw e;
  36. }
  37. }
  38. }
  39. }

AbstractNioMessageChannel 和AbstractNioByteChannel的消息发送实现比较相似,

不同之处在于:一个发送的是ByteBuf或者FileRegion,它们可以直接被发送;另一个发送的则是POJO对象。

如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达, 超神之路从此展开, BTAJ不再是梦想!

架构殿堂

发表评论

表情:
评论列表 (有 0 条评论,41人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty剖析之NIO-Channel

    什么是Channel? Channel即通道的意思,NIO的通道类似于流,但有如下区别: 通道可以同时进行读写操作,而流同一时刻只能读或者写 通道可以实现异

    相关 详尽Netty():Channel

    如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送

    相关 详尽Netty(一):初探netty

    如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送