由浅入深Netty源码分析

╰半夏微凉° 2024-03-17 09:13 161阅读 0赞

目录

  • 1 启动剖析
  • 2 NioEventLoop 剖析
  • 3 accept 剖析
  • 4 read 剖析

在这里插入图片描述

1 启动剖析

我们就来看看 netty 中对下面的代码是怎样进行处理的

  1. //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
  2. Selector selector = Selector.open();
  3. //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
  4. NioServerSocketChannel attachment = new NioServerSocketChannel();
  5. //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
  6. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  7. serverSocketChannel.configureBlocking(false);
  8. //4 启动 nio boss 线程执行接下来的操作
  9. //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
  10. SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
  11. //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
  12. //7 绑定端口
  13. serverSocketChannel.bind(new InetSocketAddress(8080));
  14. //8 触发 channel active 事件,在 head 中关注 op_accept 事件
  15. selectionKey.interestOps(SelectionKey.OP_ACCEPT);

入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. // 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.cause() != null) {
  6. return regFuture;
  7. }
  8. // 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
  9. // 2.1 如果已经完成
  10. if (regFuture.isDone()) {
  11. ChannelPromise promise = channel.newPromise();
  12. // 3.1 立刻调用 doBind0
  13. doBind0(regFuture, channel, localAddress, promise);
  14. return promise;
  15. }
  16. // 2.2 还没有完成
  17. else {
  18. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  19. // 3.2 回调 doBind0
  20. regFuture.addListener(new ChannelFutureListener() {
  21. @Override
  22. public void operationComplete(ChannelFuture future) throws Exception {
  23. Throwable cause = future.cause();
  24. if (cause != null) {
  25. // 处理异常...
  26. promise.setFailure(cause);
  27. } else {
  28. promise.registered();
  29. // 3. 由注册线程去执行 doBind0
  30. doBind0(regFuture, channel, localAddress, promise);
  31. }
  32. }
  33. });
  34. return promise;
  35. }
  36. }

关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. channel = channelFactory.newChannel();
  5. // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
  6. init(channel);
  7. } catch (Throwable t) {
  8. // 处理异常...
  9. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  10. }
  11. // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
  12. ChannelFuture regFuture = config().group().register(channel);
  13. if (regFuture.cause() != null) {
  14. // 处理异常...
  15. }
  16. return regFuture;
  17. }

关键代码 io.netty.bootstrap.ServerBootstrap#init

  1. // 这里 channel 实际上是 NioServerSocketChannel
  2. void init(Channel channel) throws Exception {
  3. final Map<ChannelOption<?>, Object> options = options0();
  4. synchronized (options) {
  5. setChannelOptions(channel, options, logger);
  6. }
  7. final Map<AttributeKey<?>, Object> attrs = attrs0();
  8. synchronized (attrs) {
  9. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  10. @SuppressWarnings("unchecked")
  11. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  12. channel.attr(key).set(e.getValue());
  13. }
  14. }
  15. ChannelPipeline p = channel.pipeline();
  16. final EventLoopGroup currentChildGroup = childGroup;
  17. final ChannelHandler currentChildHandler = childHandler;
  18. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  19. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  20. synchronized (childOptions) {
  21. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
  22. }
  23. synchronized (childAttrs) {
  24. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
  25. }
  26. // 为 NioServerSocketChannel 添加初始化器
  27. p.addLast(new ChannelInitializer<Channel>() {
  28. @Override
  29. public void initChannel(final Channel ch) throws Exception {
  30. final ChannelPipeline pipeline = ch.pipeline();
  31. ChannelHandler handler = config.handler();
  32. if (handler != null) {
  33. pipeline.addLast(handler);
  34. }
  35. // 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
  36. ch.eventLoop().execute(new Runnable() {
  37. @Override
  38. public void run() {
  39. pipeline.addLast(new ServerBootstrapAcceptor(
  40. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  41. }
  42. });
  43. }
  44. });
  45. }

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

  1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  2. // 一些检查,略...
  3. AbstractChannel.this.eventLoop = eventLoop;
  4. if (eventLoop.inEventLoop()) {
  5. register0(promise);
  6. } else {
  7. try {
  8. // 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
  9. // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
  10. // 这行代码完成的事实是 main -> nio boss 线程的切换
  11. eventLoop.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. register0(promise);
  15. }
  16. });
  17. } catch (Throwable t) {
  18. // 日志记录...
  19. closeForcibly();
  20. closeFuture.setClosed();
  21. safeSetFailure(promise, t);
  22. }
  23. }
  24. }

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

  1. private void register0(ChannelPromise promise) {
  2. try {
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. boolean firstRegistration = neverRegistered;
  7. // 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
  8. doRegister();
  9. neverRegistered = false;
  10. registered = true;
  11. // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
  12. pipeline.invokeHandlerAddedIfNeeded();
  13. // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
  14. safeSetSuccess(promise);
  15. pipeline.fireChannelRegistered();
  16. // 对应 server socket channel 还未绑定,isActive 为 false
  17. if (isActive()) {
  18. if (firstRegistration) {
  19. pipeline.fireChannelActive();
  20. } else if (config().isAutoRead()) {
  21. beginRead();
  22. }
  23. }
  24. } catch (Throwable t) {
  25. // Close the channel directly to avoid FD leak.
  26. closeForcibly();
  27. closeFuture.setClosed();
  28. safeSetFailure(promise, t);
  29. }
  30. }

关键代码 io.netty.channel.ChannelInitializer#initChannel

  1. private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
  2. if (initMap.add(ctx)) {
  3. // Guard against re-entrance.
  4. try {
  5. // 1.2.2.1 执行初始化
  6. initChannel((C) ctx.channel());
  7. } catch (Throwable cause) {
  8. exceptionCaught(ctx, cause);
  9. } finally {
  10. // 1.2.2.2 移除初始化器
  11. ChannelPipeline pipeline = ctx.pipeline();
  12. if (pipeline.context(this) != null) {
  13. pipeline.remove(this);
  14. }
  15. }
  16. return true;
  17. }
  18. return false;
  19. }

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

  1. // 3.1 或 3.2 执行 doBind0
  2. private static void doBind0(
  3. final ChannelFuture regFuture, final Channel channel,
  4. final SocketAddress localAddress, final ChannelPromise promise) {
  5. channel.eventLoop().execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. if (regFuture.isSuccess()) {
  9. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  10. } else {
  11. promise.setFailure(regFuture.cause());
  12. }
  13. }
  14. });
  15. }

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

  1. public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
  2. assertEventLoop();
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
  7. localAddress instanceof InetSocketAddress &&
  8. !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
  9. !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
  10. // 记录日志...
  11. }
  12. boolean wasActive = isActive();
  13. try {
  14. // 3.3 执行端口绑定
  15. doBind(localAddress);
  16. } catch (Throwable t) {
  17. safeSetFailure(promise, t);
  18. closeIfClosed();
  19. return;
  20. }
  21. if (!wasActive && isActive()) {
  22. invokeLater(new Runnable() {
  23. @Override
  24. public void run() {
  25. // 3.4 触发 active 事件
  26. pipeline.fireChannelActive();
  27. }
  28. });
  29. }
  30. safeSetSuccess(promise);
  31. }

3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

  1. protected void doBind(SocketAddress localAddress) throws Exception {
  2. if (PlatformDependent.javaVersion() >= 7) {
  3. javaChannel().bind(localAddress, config.getBacklog());
  4. } else {
  5. javaChannel().socket().bind(localAddress, config.getBacklog());
  6. }
  7. }

3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

  1. public void channelActive(ChannelHandlerContext ctx) {
  2. ctx.fireChannelActive();
  3. // 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
  4. readIfIsAutoRead();
  5. }

关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

  1. protected void doBeginRead() throws Exception {
  2. // Channel.read() or ChannelHandlerContext.read() was called
  3. final SelectionKey selectionKey = this.selectionKey;
  4. if (!selectionKey.isValid()) {
  5. return;
  6. }
  7. readPending = true;
  8. final int interestOps = selectionKey.interestOps();
  9. // readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
  10. if ((interestOps & readInterestOp) == 0) {
  11. selectionKey.interestOps(interestOps | readInterestOp);
  12. }
  13. }

2 NioEventLoop 剖析

NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),

提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

  1. public void execute(Runnable task) {
  2. if (task == null) {
  3. throw new NullPointerException("task");
  4. }
  5. boolean inEventLoop = inEventLoop();
  6. // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
  7. addTask(task);
  8. if (!inEventLoop) {
  9. // inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
  10. startThread();
  11. if (isShutdown()) {
  12. // 如果已经 shutdown,做拒绝逻辑,代码略...
  13. }
  14. }
  15. if (!addTaskWakesUp && wakesUpForTask(task)) {
  16. // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
  17. wakeup(inEventLoop);
  18. }
  19. }

唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup

  1. @Override
  2. protected void wakeup(boolean inEventLoop) {
  3. if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
  4. selector.wakeup();
  5. }
  6. }

启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

  1. private void doStartThread() {
  2. assert thread == null;
  3. executor.execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. // 将线程池的当前线程保存在成员变量中,以便后续使用
  7. thread = Thread.currentThread();
  8. if (interrupted) {
  9. thread.interrupt();
  10. }
  11. boolean success = false;
  12. updateLastExecutionTime();
  13. try {
  14. // 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
  15. SingleThreadEventExecutor.this.run();
  16. success = true;
  17. } catch (Throwable t) {
  18. logger.warn("Unexpected exception from an event executor: ", t);
  19. } finally {
  20. // 清理工作,代码略...
  21. }
  22. }
  23. });
  24. }

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

  1. protected void run() {
  2. for (;;) {
  3. try {
  4. try {
  5. // calculateStrategy 的逻辑如下:
  6. // 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
  7. // 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
  8. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  9. case SelectStrategy.CONTINUE:
  10. continue;
  11. case SelectStrategy.BUSY_WAIT:
  12. case SelectStrategy.SELECT:
  13. // 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
  14. // 进行 select 阻塞,并设置唤醒状态为 false
  15. boolean oldWakenUp = wakenUp.getAndSet(false);
  16. // 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
  17. // 下面的 select 方法不会阻塞
  18. // 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
  19. // 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
  20. // 才能执行,让 select 方法无谓阻塞
  21. select(oldWakenUp);
  22. if (wakenUp.get()) {
  23. selector.wakeup();
  24. }
  25. default:
  26. }
  27. } catch (IOException e) {
  28. rebuildSelector0();
  29. handleLoopException(e);
  30. continue;
  31. }
  32. cancelledKeys = 0;
  33. needsToSelectAgain = false;
  34. // ioRatio 默认是 50
  35. final int ioRatio = this.ioRatio;
  36. if (ioRatio == 100) {
  37. try {
  38. processSelectedKeys();
  39. } finally {
  40. // ioRatio 为 100 时,总是运行完所有非 IO 任务
  41. runAllTasks();
  42. }
  43. } else {
  44. final long ioStartTime = System.nanoTime();
  45. try {
  46. processSelectedKeys();
  47. } finally {
  48. // 记录 io 事件处理耗时
  49. final long ioTime = System.nanoTime() - ioStartTime;
  50. // 运行非 IO 任务,一旦超时会退出 runAllTasks
  51. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  52. }
  53. }
  54. } catch (Throwable t) {
  55. handleLoopException(t);
  56. }
  57. try {
  58. if (isShuttingDown()) {
  59. closeAll();
  60. if (confirmShutdown()) {
  61. return;
  62. }
  63. }
  64. } catch (Throwable t) {
  65. handleLoopException(t);
  66. }
  67. }
  68. }

注意

这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:

  • 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
  • 由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作

参考下图

![Image 1][]

io.netty.channel.nio.NioEventLoop#select

  1. private void select(boolean oldWakenUp) throws IOException {
  2. Selector selector = this.selector;
  3. try {
  4. int selectCnt = 0;
  5. long currentTimeNanos = System.nanoTime();
  6. // 计算等待时间
  7. // * 没有 scheduledTask,超时时间为 1s
  8. // * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
  9. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
  10. for (;;) {
  11. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  12. // 如果超时,退出循环
  13. if (timeoutMillis <= 0) {
  14. if (selectCnt == 0) {
  15. selector.selectNow();
  16. selectCnt = 1;
  17. }
  18. break;
  19. }
  20. // 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
  21. // wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
  22. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
  23. selector.selectNow();
  24. selectCnt = 1;
  25. break;
  26. }
  27. // select 有限时阻塞
  28. // 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
  29. int selectedKeys = selector.select(timeoutMillis);
  30. // 计数加 1
  31. selectCnt ++;
  32. // 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
  33. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
  34. break;
  35. }
  36. if (Thread.interrupted()) {
  37. // 线程被打断,退出循环
  38. // 记录日志
  39. selectCnt = 1;
  40. break;
  41. }
  42. long time = System.nanoTime();
  43. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
  44. // 如果超时,计数重置为 1,下次循环就会 break
  45. selectCnt = 1;
  46. }
  47. // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
  48. // 这是为了解决 nio 空轮询 bug
  49. else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
  50. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
  51. // 重建 selector
  52. selector = selectRebuildSelector(selectCnt);
  53. selectCnt = 1;
  54. break;
  55. }
  56. currentTimeNanos = time;
  57. }
  58. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
  59. // 记录日志
  60. }
  61. } catch (CancelledKeyException e) {
  62. // 记录日志
  63. }
  64. }

处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

  1. private void processSelectedKeys() {
  2. if (selectedKeys != null) {
  3. // 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
  4. // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
  5. processSelectedKeysOptimized();
  6. } else {
  7. processSelectedKeysPlain(selector.selectedKeys());
  8. }
  9. }

io.netty.channel.nio.NioEventLoop#processSelectedKey

  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  3. // 当 key 取消或关闭时会导致这个 key 无效
  4. if (!k.isValid()) {
  5. // 无效时处理...
  6. return;
  7. }
  8. try {
  9. int readyOps = k.readyOps();
  10. // 连接事件
  11. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  12. int ops = k.interestOps();
  13. ops &= ~SelectionKey.OP_CONNECT;
  14. k.interestOps(ops);
  15. unsafe.finishConnect();
  16. }
  17. // 可写事件
  18. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  19. ch.unsafe().forceFlush();
  20. }
  21. // 可读或可接入事件
  22. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  23. // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
  24. // 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
  25. unsafe.read();
  26. }
  27. } catch (CancelledKeyException ignored) {
  28. unsafe.close(unsafe.voidPromise());
  29. }
  30. }

3 accept 剖析

nio 中如下代码,在 netty 中的流程

  1. //1 阻塞直到事件发生
  2. selector.select();
  3. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  4. while (iter.hasNext()) {
  5. //2 拿到一个事件
  6. SelectionKey key = iter.next();
  7. //3 如果是 accept 事件
  8. if (key.isAcceptable()) {
  9. //4 执行 accept
  10. SocketChannel channel = serverSocketChannel.accept();
  11. channel.configureBlocking(false);
  12. //5 关注 read 事件
  13. channel.register(selector, SelectionKey.OP_READ);
  14. }
  15. // ...
  16. }

先来看可接入事件处理(accept)

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

  1. public void read() {
  2. assert eventLoop().inEventLoop();
  3. final ChannelConfig config = config();
  4. final ChannelPipeline pipeline = pipeline();
  5. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  6. allocHandle.reset(config);
  7. boolean closed = false;
  8. Throwable exception = null;
  9. try {
  10. try {
  11. do {
  12. // doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
  13. // readBuf 是一个 ArrayList 用来缓存消息
  14. int localRead = doReadMessages(readBuf);
  15. if (localRead == 0) {
  16. break;
  17. }
  18. if (localRead < 0) {
  19. closed = true;
  20. break;
  21. }
  22. // localRead 为 1,就一条消息,即接收一个客户端连接
  23. allocHandle.incMessagesRead(localRead);
  24. } while (allocHandle.continueReading());
  25. } catch (Throwable t) {
  26. exception = t;
  27. }
  28. int size = readBuf.size();
  29. for (int i = 0; i < size; i ++) {
  30. readPending = false;
  31. // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
  32. // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
  33. pipeline.fireChannelRead(readBuf.get(i));
  34. }
  35. readBuf.clear();
  36. allocHandle.readComplete();
  37. pipeline.fireChannelReadComplete();
  38. if (exception != null) {
  39. closed = closeOnReadError(exception);
  40. pipeline.fireExceptionCaught(exception);
  41. }
  42. if (closed) {
  43. inputShutdown = true;
  44. if (isOpen()) {
  45. close(voidPromise());
  46. }
  47. }
  48. } finally {
  49. if (!readPending && !config.isAutoRead()) {
  50. removeReadOp();
  51. }
  52. }
  53. }

关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  2. // 这时的 msg 是 NioSocketChannel
  3. final Channel child = (Channel) msg;
  4. // NioSocketChannel 添加 childHandler 即初始化器
  5. child.pipeline().addLast(childHandler);
  6. // 设置选项
  7. setChannelOptions(child, childOptions, logger);
  8. for (Entry<AttributeKey<?>, Object> e: childAttrs) {
  9. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  10. }
  11. try {
  12. // 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
  13. childGroup.register(child).addListener(new ChannelFutureListener() {
  14. @Override
  15. public void operationComplete(ChannelFuture future) throws Exception {
  16. if (!future.isSuccess()) {
  17. forceClose(child, future.cause());
  18. }
  19. }
  20. });
  21. } catch (Throwable t) {
  22. forceClose(child, t);
  23. }
  24. }

又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

  1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  2. // 一些检查,略...
  3. AbstractChannel.this.eventLoop = eventLoop;
  4. if (eventLoop.inEventLoop()) {
  5. register0(promise);
  6. } else {
  7. try {
  8. // 这行代码完成的事实是 nio boss -> nio worker 线程的切换
  9. eventLoop.execute(new Runnable() {
  10. @Override
  11. public void run() {
  12. register0(promise);
  13. }
  14. });
  15. } catch (Throwable t) {
  16. // 日志记录...
  17. closeForcibly();
  18. closeFuture.setClosed();
  19. safeSetFailure(promise, t);
  20. }
  21. }
  22. }

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

  1. private void register0(ChannelPromise promise) {
  2. try {
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. boolean firstRegistration = neverRegistered;
  7. doRegister();
  8. neverRegistered = false;
  9. registered = true;
  10. // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
  11. pipeline.invokeHandlerAddedIfNeeded();
  12. // 执行后就是 head -> logging handler -> my handler -> tail
  13. safeSetSuccess(promise);
  14. pipeline.fireChannelRegistered();
  15. if (isActive()) {
  16. if (firstRegistration) {
  17. // 触发 pipeline 上 active 事件
  18. pipeline.fireChannelActive();
  19. } else if (config().isAutoRead()) {
  20. beginRead();
  21. }
  22. }
  23. } catch (Throwable t) {
  24. closeForcibly();
  25. closeFuture.setClosed();
  26. safeSetFailure(promise, t);
  27. }
  28. }

回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

  1. public void channelActive(ChannelHandlerContext ctx) {
  2. ctx.fireChannelActive();
  3. // 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
  4. readIfIsAutoRead();
  5. }

io.netty.channel.nio.AbstractNioChannel#doBeginRead

  1. protected void doBeginRead() throws Exception {
  2. // Channel.read() or ChannelHandlerContext.read() was called
  3. final SelectionKey selectionKey = this.selectionKey;
  4. if (!selectionKey.isValid()) {
  5. return;
  6. }
  7. readPending = true;
  8. // 这时候 interestOps 是 0
  9. final int interestOps = selectionKey.interestOps();
  10. if ((interestOps & readInterestOp) == 0) {
  11. // 关注 read 事件
  12. selectionKey.interestOps(interestOps | readInterestOp);
  13. }
  14. }

4 read 剖析

再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

  1. public final void read() {
  2. final ChannelConfig config = config();
  3. if (shouldBreakReadReady(config)) {
  4. clearReadPending();
  5. return;
  6. }
  7. final ChannelPipeline pipeline = pipeline();
  8. // io.netty.allocator.type 决定 allocator 的实现
  9. final ByteBufAllocator allocator = config.getAllocator();
  10. // 用来分配 byteBuf,确定单次读取大小
  11. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  12. allocHandle.reset(config);
  13. ByteBuf byteBuf = null;
  14. boolean close = false;
  15. try {
  16. do {
  17. byteBuf = allocHandle.allocate(allocator);
  18. // 读取
  19. allocHandle.lastBytesRead(doReadBytes(byteBuf));
  20. if (allocHandle.lastBytesRead() <= 0) {
  21. byteBuf.release();
  22. byteBuf = null;
  23. close = allocHandle.lastBytesRead() < 0;
  24. if (close) {
  25. readPending = false;
  26. }
  27. break;
  28. }
  29. allocHandle.incMessagesRead(1);
  30. readPending = false;
  31. // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
  32. pipeline.fireChannelRead(byteBuf);
  33. byteBuf = null;
  34. }
  35. // 是否要继续循环
  36. while (allocHandle.continueReading());
  37. allocHandle.readComplete();
  38. // 触发 read complete 事件
  39. pipeline.fireChannelReadComplete();
  40. if (close) {
  41. closeOnRead(pipeline);
  42. }
  43. } catch (Throwable t) {
  44. handleReadException(pipeline, byteBuf, t, close, allocHandle);
  45. } finally {
  46. if (!readPending && !config.isAutoRead()) {
  47. removeReadOp();
  48. }
  49. }
  50. }

io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

  1. public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
  2. return
  3. // 一般为 true
  4. config.isAutoRead() &&
  5. // respectMaybeMoreData 默认为 true
  6. // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
  7. (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
  8. // 小于最大次数,maxMessagePerRead 默认 16
  9. totalMessages < maxMessagePerRead &&
  10. // 实际读到了数据
  11. totalBytesRead > 0;
  12. }

[Image 1]:

发表评论

表情:
评论列表 (有 0 条评论,161人围观)

还没有评论,来说两句吧...

相关阅读