Netty4源码再次分析

野性酷女 2022-04-18 05:24 287阅读 0赞

先上个demo,好顺着往里跟代码

  1. public class Netty4Hello {
  2. /** * 服务端监听的端口地址 */
  3. private static final int portNumber = 7878;
  4. public static void main(String[] args) throws InterruptedException {
  5. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  6. EventLoopGroup workerGroup = new NioEventLoopGroup();
  7. try {
  8. ServerBootstrap b = new ServerBootstrap();
  9. b.group(bossGroup, workerGroup);
  10. b.channel(NioServerSocketChannel.class);
  11. b.childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. protected void initChannel(SocketChannel ch) throws Exception {
  14. ChannelPipeline pipeline = ch.pipeline();
  15. // 以("\n")为结尾分割的 解码器
  16. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
  17. // 字符串解码 和 编码
  18. pipeline.addLast("decoder", new StringDecoder());
  19. pipeline.addLast("encoder", new StringEncoder());
  20. // 自己的逻辑Handler
  21. pipeline.addLast("handler", new HelloServerHandler());
  22. }
  23. });
  24. // 服务器绑定端口监听
  25. ChannelFuture f = b.bind(portNumber).sync();
  26. // 监听服务器关闭监听
  27. f.channel().closeFuture().sync();
  28. // 可以简写为
  29. /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
  30. } finally {
  31. bossGroup.shutdownGracefully();
  32. workerGroup.shutdownGracefully();
  33. }
  34. }
  35. }

NioSocketChannel 的初始化

在 Netty 中, Channel 是一个 Socket 的抽象, 它为用户提供了关于 Socket 状态(是否是连接还是断开) 以及对 Socket 的读写等操作. 每当 Netty 建立了一个连接后, 都会有一个对应的 Channel 实例.

  1. io.netty.bootstrap.AbstractBootstrap#channel
  2. >io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
  3. >构造io.netty.channel.ReflectiveChannelFactory

Channel 实例化

Channel 是通过工厂方法 ChannelFactory.newChannel() 来实例化的, 那么 ChannelFactory.newChannel() 方法在哪里调用呢?
跟踪调用

  1. io.netty.bootstrap.Bootstrap#connect(java.net.SocketAddress, java.net.SocketAddress)
  2. >io.netty.bootstrap.Bootstrap#doResolveAndConnect
  3. >io.netty.bootstrap.AbstractBootstrap#initAndRegister
  4. >io.netty.bootstrap.ChannelFactory#newChannel

在 newChannel 中, 通过类对象的 newInstance 来获取一个新 Channel 实例, 因而会调用NioSocketChannel 的默认构造器.

NioServerSocketChannel

  1. io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel()
  2. >io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
  3. >java.nio.channels.spi.SelectorProvider#openServerSocketChannel

NioEventLoopGroup构造

类结构图
在这里插入图片描述

NioEventLoop 有几个重载的构造器, 不过内容都没有什么大的区别, 最终都是调用的父类MultithreadEventLoopGroup构造器:

  1. io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
  2. >io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
  3. >io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
  4. >executor netty实现的线程池管理 io.netty.util.concurrent.ThreadPerTaskExecutor
  5. >选择策略io.netty.util.concurrent.DefaultEventExecutorChooserFactory
  6. >拒绝策略io.netty.util.concurrent.RejectedExecutionHandlers
  7. // 大小为 nThreads NioEventLoop
  8. >children = new EventExecutor[nThreads];
  9. >children[i] = newChild(executor, args)
  10. >io.netty.channel.nio.NioEventLoopGroup#newChild

NioEventLoop解析

类图
在这里插入图片描述

NioEventLoop 继承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又继承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中对本地线程的抽象, 它内部有一个 Thread thread 属性, 存储了一个本地 Java 线程. 因此我们可以认为, 一个 NioEventLoop 其实和一个特定的线程绑定, 并且在其生命周期内, 绑定的线程都不会再改变.

  • 在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule 方法来运行一些定时任务.
  • 在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.
    在此方法的一开始调用的 fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 作为可执行的 task 等待被调度执行.

NioEventLoop 肩负着两种任务,

  • 第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO
    事件、读写数据与数据的处理等;
  • 第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule
    提交的定时任务
    也是这个线程执行的.

构造方法

  1. io.netty.channel.nio.NioEventLoop#NioEventLoop
  2. >io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop(io.netty.channel.EventLoopGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)
  3. >io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor(io.netty.util.concurrent.EventExecutorGroup, java.util.concurrent.Executor, boolean, int, io.netty.util.concurrent.RejectedExecutionHandler)

EventLoop 与 Channel 的关联

Netty 中, 每个 Channel 都有且仅有一个 EventLoop 与之关联,

在这里插入图片描述

从上图中我们可以看到, 当调用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的关联. register 实现如下:

  1. @Override
  2. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  3. // 删除条件检查.
  4. ...
  5. AbstractChannel.this.eventLoop = eventLoop;
  6. if (eventLoop.inEventLoop()) {
  7. register0(promise);
  8. } else {
  9. try {
  10. eventLoop.execute(new OneTimeTask() {
  11. @Override
  12. public void run() {
  13. register0(promise);
  14. }
  15. });
  16. } catch (Throwable t) {
  17. ...
  18. }
  19. }
  20. }

在 AbstractChannel#AbstractUnsafe.register 中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程.

什么时候启动

EventLoop 与 Channel 的关联完成后,就会调用eventLoop.execute

  1. // 向任务队列中添加一个 task
  2. io.netty.util.concurrent.SingleThreadEventExecutor#execute
  3. >io.netty.util.concurrent.SingleThreadEventExecutor#startThread
  4. >io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
  5. >java.util.concurrent.Executor#execute 实现类io.netty.util.concurrent.ThreadPerTaskExecutor

NioEventLoop run方法

上面了解到,当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动.
run 方法可以说是十分简单, 主要就是调用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中.

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. try {
  5. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  6. case SelectStrategy.CONTINUE:
  7. continue;
  8. case SelectStrategy.BUSY_WAIT:
  9. // fall-through to SELECT since the busy-wait is not supported with NIO
  10. case SelectStrategy.SELECT:
  11. // 关键1:select事件
  12. select(wakenUp.getAndSet(false));
  13. if (wakenUp.get()) {
  14. selector.wakeup();
  15. }
  16. // fall through
  17. default:
  18. }
  19. cancelledKeys = 0;
  20. needsToSelectAgain = false;
  21. // 有意思点:ioRatio 默认是 50, 则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1.
  22. final int ioRatio = this.ioRatio;
  23. if (ioRatio == 100) {
  24. try {
  25. processSelectedKeys();
  26. } finally {
  27. // Ensure we always run tasks.
  28. runAllTasks();
  29. }
  30. } else {
  31. final long ioStartTime = System.nanoTime();
  32. try {
  33. // 迭代 selectedKeys 获取就绪的 IO 事件
  34. processSelectedKeys();
  35. } finally {
  36. // 处理IO的时间,按比例处理任务
  37. final long ioTime = System.nanoTime() - ioStartTime;
  38. // 有限时间内执行taskQueue任务
  39. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  40. }
  41. }
  42. } catch (Throwable t) {
  43. handleLoopException(t);
  44. }
  45. // Always handle shutdown even if the loop processing threw an exception.
  46. try {
  47. if (isShuttingDown()) {
  48. closeAll();
  49. if (confirmShutdown()) {
  50. return;
  51. }
  52. }
  53. } catch (Throwable t) {
  54. handleLoopException(t);
  55. }
  56. }
  57. }
  58. io.netty.channel.nio.NioEventLoop#run
  59. >io.netty.channel.nio.NioEventLoop#select
  60. >

分析select方法
2个关键步骤,见下代码

  1. private void select(boolean oldWakenUp) throws IOException {
  2. Selector selector = this.selector;
  3. try {
  4. int selectCnt = 0;
  5. long currentTimeNanos = System.nanoTime();
  6. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
  7. for (;;) {
  8. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  9. if (timeoutMillis <= 0) {
  10. if (selectCnt == 0) {
  11. selector.selectNow();
  12. selectCnt = 1;
  13. }
  14. break;
  15. }
  16. // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
  17. // Selector#wakeup. So we need to check task queue again before executing select operation.
  18. // If we don't, the task might be pended until select operation was timed out.
  19. // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
  20. // 关键步骤1:有任务则执行selectNow(不阻塞),将wakenUp CAS设置为true
  21. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
  22. selector.selectNow();
  23. selectCnt = 1;
  24. break;
  25. }
  26. // 关键步骤2:没有任务,则执行select (阻塞超时时间)
  27. int selectedKeys = selector.select(timeoutMillis);
  28. selectCnt ++;
  29. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
  30. // - Selected something,
  31. // - waken up by user, or
  32. // - the task queue has a pending task.
  33. // - a scheduled task is ready for processing
  34. break;
  35. }
  36. if (Thread.interrupted()) {
  37. // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
  38. // As this is most likely a bug in the handler of the user or it's client library we will
  39. // also log it.
  40. //
  41. // See https://github.com/netty/netty/issues/2426
  42. if (logger.isDebugEnabled()) {
  43. logger.debug("Selector.select() returned prematurely because " +
  44. "Thread.currentThread().interrupt() was called. Use " +
  45. "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
  46. }
  47. selectCnt = 1;
  48. break;
  49. }
  50. long time = System.nanoTime();
  51. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
  52. // timeoutMillis elapsed without anything selected.
  53. selectCnt = 1;
  54. } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
  55. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
  56. // The selector returned prematurely many times in a row.
  57. // Rebuild the selector to work around the problem.
  58. logger.warn(
  59. "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
  60. selectCnt, selector);
  61. rebuildSelector();
  62. selector = this.selector;
  63. // Select again to populate selectedKeys.
  64. selector.selectNow();
  65. selectCnt = 1;
  66. break;
  67. }
  68. currentTimeNanos = time;
  69. }
  70. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
  71. if (logger.isDebugEnabled()) {
  72. logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
  73. selectCnt - 1, selector);
  74. }
  75. }
  76. } catch (CancelledKeyException e) {
  77. if (logger.isDebugEnabled()) {
  78. logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
  79. selector, e);
  80. }
  81. // Harmless exception - log anyway
  82. }
  83. }

最后总结下调用流程

  1. io.netty.bootstrap.AbstractBootstrap#bind(int)
  2. >io.netty.bootstrap.AbstractBootstrap#bind(int)
  3. >io.netty.bootstrap.AbstractBootstrap#doBind
  4. >io.netty.bootstrap.AbstractBootstrap#initAndRegister
  5. >io.netty.bootstrap.ChannelFactory#newChannel 调用NioServerSocketChannel构造器实例化
  6. >NioServerSocketChannel构造,并注册SelectionKey.OP_ACCEPT事件
  7. io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)
  8. > io.netty.bootstrap.AbstractBootstrap#init
  9. >io.netty.bootstrap.ServerBootstrap#init
  10. >io.netty.channel.ChannelPipeline初始化
  11. >添加Acceptor Handler io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
  12. >接收OP_ACCEPT到事件后分发给WorkerGroup处理
  13. >(WorkerGroupchildGroup io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
  14. >io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
  15. >io.netty.channel.AbstractChannel.AbstractUnsafe#register
  16. >NioEventLoop java.util.concurrent.Executor#execute
  17. >io.netty.util.concurrent.SingleThreadEventExecutor#execute
  18. >io.netty.util.concurrent.SingleThreadEventExecutor#addTask
  19. >io.netty.util.concurrent.SingleThreadEventExecutor#startThread

发表评论

表情:
评论列表 (有 0 条评论,287人围观)

还没有评论,来说两句吧...

相关阅读