Netty(四)_核心源码分析

曾经终败给现在 2022-11-21 11:59 286阅读 0赞

Netty(四)_核心源码分析

文章目录

  • Netty(四)_核心源码分析
    • EventLoopGroup实例化源码分析
    • .channel()源码剖析
    • channel实例化源码分析
    • Handler添加源码
    • 异步原理_Future&Promise
    • .rigister()_1
    • .rigister()_2
    • .connect()源码分析
    • netty请求接收说明

本篇内容门槛还是有的,需要你对我前三篇netty所说的内容不算陌生

为了提高阅读舒适性,我们并不会一行一行代码解析,并删除部分与目的无关的源码。读源码要带着目的性读,最好你已经知道它背后基本原理,然后带着审视的感觉去进行阅读,反客为主。

在整个流程中,以下这张图要常驻脑海中!

img

我们选择netty提供的example程序,该程序可以在源码包netty.example.echo

  1. public final class EchoServer {
  2. //...
  3. static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
  4. public static void main(String[] args) throws Exception {
  5. // Configure SSL.
  6. //SSL配置 用于https
  7. // Configure the server.
  8. // 服务端配置
  9. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  10. EventLoopGroup workerGroup = new NioEventLoopGroup();
  11. // 服务端自定义的处理器Handler 这里不进行代码展示
  12. final EchoServerHandler serverHandler = new EchoServerHandler();
  13. try {
  14. //引导器
  15. ServerBootstrap b = new ServerBootstrap();
  16. b.group(bossGroup, workerGroup)
  17. .channel(NioServerSocketChannel.class) //设置服务端通道
  18. .option(ChannelOption.SO_BACKLOG, 100) //通道配置选项,连接队列容量100
  19. .handler(new LoggingHandler(LogLevel.INFO))//设置服务端通道处理器Handler
  20. .childHandler(new ChannelInitializer<SocketChannel>() {
  21. //客户端通道的处理器
  22. @Override
  23. //客户端通道完成初始化时
  24. public void initChannel(SocketChannel ch) throws Exception {
  25. ChannelPipeline p = ch.pipeline();
  26. if (sslCtx != null) {
  27. //向管道链中添加处理器
  28. p.addLast(sslCtx.newHandler(ch.alloc()));
  29. }
  30. //向管道链添加自定义处理器
  31. p.addLast(serverHandler);
  32. }
  33. });
  34. // Start the server.
  35. //异步绑定端口
  36. ChannelFuture f = b.bind(PORT).sync();
  37. // Wait until the server socket is closed.
  38. //阻塞直至服务端通道关闭 --> 维持服务端一直提供服务
  39. f.channel().closeFuture().sync();
  40. } finally {
  41. // Shut down all event loops to terminate all threads.
  42. //优雅地关闭所有资源
  43. bossGroup.shutdownGracefully();
  44. workerGroup.shutdownGracefully();
  45. }
  46. }
  47. }

客户端:

  1. public final class EchoClient {
  2. //属性
  3. static final boolean SSL = System.getProperty("ssl") != null;
  4. static final String HOST = System.getProperty("host", "127.0.0.1");
  5. static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
  6. static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
  7. public static void main(String[] args) throws Exception {
  8. // Configure SSL.git
  9. //...
  10. // Configure the client.
  11. //客户端只需要一个线程组 以下引导与服务端类似 不赘述
  12. EventLoopGroup group = new NioEventLoopGroup();
  13. try {
  14. Bootstrap b = new Bootstrap();
  15. b.group(group)
  16. .channel(NioSocketChannel.class)
  17. .option(ChannelOption.TCP_NODELAY, true)
  18. .handler(new ChannelInitializer<SocketChannel>() {
  19. @Override
  20. public void initChannel(SocketChannel ch) throws Exception {
  21. ChannelPipeline p = ch.pipeline();
  22. if (sslCtx != null) {
  23. p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
  24. }
  25. //p.addLast(new LoggingHandler(LogLevel.INFO));
  26. p.addLast(new EchoClientHandler());
  27. }
  28. });
  29. // Start the client.
  30. ChannelFuture f = b.connect(HOST, PORT).sync();
  31. // Wait until the connection is closed.
  32. f.channel().closeFuture().sync();
  33. } finally {
  34. // Shut down the event loop to terminate all threads.
  35. group.shutdownGracefully();
  36. }
  37. }
  38. }

EventLoopGroup实例化源码分析

我们这里从最具有一般性的服务端的EventLoopGroup workerGroup = new NioEventLoopGroup();

入手,看看线程池创建中的细节。

这一步是在服务端绑定端口号之间完成的,主要是为了把所有线程池进行实例化

在追溯源码前,我们根据对EventLoopGroup的认知,即

EventLoopGroup 是一组 EventLoop 的抽象,EventLoopGroup代表着线程池, EventLoop代表线程池中的线程。

在netty中BossGroup和WorkGroup都是NioEventLoopGroup类型。

每个 EventLoop 维护着一个 Selector 实例,Selector的作用是在Group中选择Loop来执行任务。

这里先看看NioEventLoopGroup中参数最全的构造器,看看NioEventLoopGroup的实例化会配置什么参数,或者说,一个Group有什么特性/属性。

  1. public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
  2. final SelectorProvider selectorProvider,
  3. final SelectStrategyFactory selectStrategyFactory,
  4. final RejectedExecutionHandler rejectedExecutionHandler,
  5. final EventLoopTaskQueueFactory taskQueueFactory) {
  6. super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
  7. rejectedExecutionHandler, taskQueueFactory);
  8. }

参数说明:

  • nThreads:线程池中的线程数,也就是 NioEventLoop 的实例数量。
  • executor:Java线程池(Executor)
  • chooserFactory:线程池选择线程(next)来执行任务的选择策略。追源码的过程中我们也可以看看具体有什么策略。
  • selectorProvider:通过它能实例化 Selector,每个线程池都持有一个 selectorProvider 用以实例化selector。
  • selectStrategyFactory:这个涉及到的是线程池中线程的工作流程。
  • rejectedExecutionHandler:用于处理线程池中没有可用的线程来执行任务的情况的处理器。在 Netty 中稍微有一点点不一样,这个是给 NioEventLoop 实例用的,后面说。
  • EventLoopTaskQueueFactory,顾名思义,EventLoop的工作队列。

从无参构造器开始跟踪,看看这些参数都是怎么被赋值的,并且被赋了什么值。

  1. public class NioEventLoopGroup extends MultithreadEventLoopGroup {
  2. //默认构造器
  3. public NioEventLoopGroup() {
  4. this(0);
  5. }
  6. //上面构造器所调用
  7. public NioEventLoopGroup(int nThreads) {
  8. this(nThreads, (Executor) null);
  9. }
  10. //上面构造器所调用
  11. public NioEventLoopGroup(int nThreads, Executor executor) {
  12. //SelectorProvider.provider()返回selectorProvider
  13. this(nThreads, executor, SelectorProvider.provider());
  14. }
  15. //上面构造器所调用
  16. public NioEventLoopGroup(
  17. int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  18. //涉及到线程在做 select 操作和执行任务过程中的策略选择问题
  19. this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
  20. }
  21. //上面构造器所调用
  22. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
  23. final SelectStrategyFactory selectStrategyFactory) {
  24. //rejectedExecutionHandler(线程池无线程可用的拒绝策略)默认值为RejectedExecutionHandlers.reject()
  25. //点进.reject()发现默认拒绝策略最终抛出异常
  26. //注意,这里将交由父类构造器
  27. super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  28. }

到父类 MultithreadEventLoopGroup 的构造方法中

  1. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  2. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  3. }

这里将nThreads重新设置为DEFAULT_EVENT_LOOP_THREADS

该类中的静态代码块能获取本机CPU核数,并将之*2作为nThreads

  1. static {
  2. DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
  3. "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
  4. if (logger.isDebugEnabled()) {
  5. logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
  6. }
  7. }

继续往下走,来到父类MultithreadEventExecutorGroup这个构造器

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  2. //这里重新设置了线程池选择策略
  3. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  4. }

这里我们来看看它的选择策略是什么样的

点进DefaultEventExecutorChooserFactory类

  1. //这个方法根据executors的不同,返回不同的策略
  2. @Override
  3. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  4. //如果2^n个线程数
  5. if (isPowerOfTwo(executors.length)) {
  6. return new PowerOfTwoEventExecutorChooser(executors);
  7. } else {
  8. //如果不是
  9. return new GenericEventExecutorChooser(executors);
  10. }
  11. }
  12. //我们再看看两个策略的next()方法,之前说过next()是线程池用来选择线程的方法
  13. @Override
  14. //如果2^n个线程数 与
  15. public EventExecutor next() {
  16. return executors[idx.getAndIncrement() & executors.length - 1];
  17. }
  18. @Override
  19. //如果2^n个线程数 模
  20. public EventExecutor next() {
  21. return executors[Math.abs(idx.getAndIncrement() % executors.length)];
  22. }

继续构造器:下面这个明显不一样了。我们关注其核心语句

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  2. EventExecutorChooserFactory chooserFactory, Object... args) {
  3. //这里实例化executor,即把Java线程池对象进行实例化
  4. if (executor == null) {
  5. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  6. }
  7. //EventExecutor类型数组,即线程池,其元素是每个线程,即每个Loop
  8. //可用看出EventLoop最终类型是EventExecutor而不说线程对象Thread
  9. children = new EventExecutor[nThreads];
  10. //实例化 children 数组中的每一个元素,即实例化Group中每个loop
  11. for (int i = 0; i < nThreads; i ++) {
  12. boolean success = false;
  13. try {
  14. //newChild(executor, args)为每个Loop的实例化入口,重点关注。
  15. children[i] = newChild(executor, args);
  16. success = true;
  17. } catch (Exception e) {
  18. //...
  19. } finally {
  20. //如果Loop实例化失败
  21. if (!success) {
  22. //之前已实例化的Loop全部关闭
  23. for (int j = 0; j < i; j ++) {
  24. children[j].shutdownGracefully();
  25. }
  26. // 等待这些线程成功 shutdown
  27. for (int j = 0; j < i; j ++) {
  28. EventExecutor e = children[j];
  29. try {
  30. while (!e.isTerminated()) {
  31. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  32. }
  33. } catch (InterruptedException interrupted) {
  34. // 如果关闭过程发生异常,改变当前线程状态未打断状态,让调用者线程知悉并处理
  35. // Let the caller handle the interruption.
  36. Thread.currentThread().interrupt();
  37. break;
  38. }
  39. }
  40. }
  41. }
  42. }
  43. //通过之前设置的 chooserFactory 来实例化 Chooser即Group的选择策略,传入children用于根据线程数返回不同的choser
  44. chooser = chooserFactory.newChooser(children);
  45. // 设置一个 Listener 用来监听该线程池的 termination 事件即结束事件
  46. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  47. @Override
  48. public void operationComplete(Future<Object> future) throws Exception {
  49. //只有每个线程都发生termination事件,这个池的terminationFuture才是成功
  50. if (terminatedChildren.incrementAndGet() == children.length) {
  51. terminationFuture.setSuccess(null);
  52. }
  53. }
  54. };
  55. //给每个loop的结束任务注册监听器
  56. for (EventExecutor e: children) {
  57. e.terminationFuture().addListener(terminationListener);
  58. }
  59. //...这个不是重点
  60. }

我们重点关注上面newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程即Group中的Loop。

newChild() 是在NioEventLoopGroup中进行的覆写

  1. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  2. EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
  3. //返回Loop对象
  4. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  5. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
  6. }

它调用了 NioEventLoop 的构造方法,这里将会生产一个Loop,即“线程”对象:

  1. NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
  2. SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
  3. EventLoopTaskQueueFactory queueFactory) {
  4. //实例化两个任务队列后传进父类构造器
  5. super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
  6. rejectedExecutionHandler);
  7. if (selectorProvider == null) {
  8. throw new NullPointerException("selectorProvider");
  9. }
  10. if (strategy == null) {
  11. throw new NullPointerException("selectStrategy");
  12. }
  13. provider = selectorProvider;
  14. //开启了NIO 中最重要的组件:Selector
  15. //即Selector是在Loop实例化的时候进行同步创建的
  16. final SelectorTuple selectorTuple = openSelector();
  17. selector = selectorTuple.selector;
  18. unwrappedSelector = selectorTuple.unwrappedSelector;
  19. selectStrategy = strategy;
  20. }

我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。

  1. protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
  2. boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
  3. RejectedExecutionHandler rejectedExecutionHandler) {
  4. super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
  5. //看到这里设置了也给尾队列,即从子类传上来的两个任务队列拿一个来做尾队列,事实上目前还不指定尾队列是干啥用的。
  6. //事实上可以直接忽略这个东西。
  7. tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
  8. }

再点进其父类构造器,其父类是SingleThreadEventExecutor

  1. protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
  2. boolean addTaskWakesUp, Queue<Runnable> taskQueue,
  3. RejectedExecutionHandler rejectedHandler) {
  4. super(parent);
  5. this.addTaskWakesUp = addTaskWakesUp;
  6. this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
  7. //这一步是对executor即这个loop做一个封装,确保之在运行的时候能返回
  8. this.executor = ThreadExecutorMap.apply(executor, this);
  9. //这里又设置了一个重要的组件taskQueue 默认容量16
  10. this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
  11. //如果 submit 的任务堆积了到了 16,再往里面提交任务会触发 rejectedExecutionHandler 的执行策略
  12. rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
  13. }

我们发现它爹是SingleThreadEventExecutor,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的,也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过池中只有一个线程。

好了。

至此为止,小结以下上面代码都做了什么:

  • EventLoopGroup实例化的过程中:
  • 通过SelectorProvider.provider()持有一个selectorProvider对象。
  • 通过DefaultSelectStrategyFactory.INSTANCE持有一个selectStrategyFactory,这指的是一个选择线程池选线程的策略
  • 通过RejectedExecutionHandlers.reject()持有无线程可用的拒绝策略
  • 在MultithreadEventLoopGroup中设置该线程池的线程数为机器CPU核数*2
  • 在MultithreadEventExecutorGroup中通过DefaultEventExecutorChooserFactory.INSTANCE重新持有线程池选线程的策略
  • 实例化executor,类型是 ThreadPerTaskExecutor
  • 实例化线程池children,类型是EventExecutor
  • 实例化线程池中的线程

    • 顺带着实例化Selector
    • 在SingleThreadEventLoop中实例化任务队列
  • 通过chooserFactory实例化选择策略

线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。

同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)

创建线程的时机在第一个任务提交过来的时候,即服务端bind()或connect()时涉及到的 register 操作

.channel()源码剖析

根据之前所学的知识,我们对channel应该有了一个基本认识

  • 服务端需要NioServerSocketChannel作为其接收请求的通道,客户端需要NioSocketChannel作为其发送与接收服务端消息的通道。
  • NioServerSocketChannel 是netty对JDK的ServerSocketChannel 进行的封装,NioSocketChannel是SocketChannel的封装,我们也要从源码中知道其封装过程

从服务端的引导代码开始:

  1. //传入NioServerSocketChannel的类,那它日后必然被反射调用。
  2. .channel(NioServerSocketChannel.class)

跟踪进入到AbstractBootstrap类

  1. // channelFactory负责通过工厂方法反射创建Channel实例
  2. public B channel(Class<? extends C> channelClass) {
  3. //即将进入ReflectiveChannelFactory
  4. return channelFactory(new ReflectiveChannelFactory<C>(
  5. ObjectUtil.checkNotNull(channelClass, "channelClass")
  6. ));
  7. }

看下ReflectiveChannelFactory工厂的内容

  1. /**
  2. ...[原注释]
  3. 根据注释可知ReflectiveChannelFactory负责通过反射调用Channel的默认构造器实例化Channel
  4. */
  5. //
  6. public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
  7. //NioServerSocketChannel的构造器,通过它创建NioServerSocketChannel
  8. private final Constructor<? extends T> constructor;
  9. public ReflectiveChannelFactory(Class<? extends T> clazz) {
  10. //...
  11. //此处获得NioServerSocketChanne的默认构造器
  12. this.constructor = clazz.getConstructor();
  13. //...
  14. }
  15. //创建Channel方法
  16. @Override
  17. public T newChannel() {
  18. try {
  19. //利用NioServerSocketChannel的构造器创建NioServerSocketChannel实例
  20. return constructor.newInstance();
  21. } catch (Throwable t) {
  22. //...
  23. }
  24. }
  25. }

这里我们找到了Channel实例化的入口,那么Channel何时被实例化?这里要说明一下:

  • 对于 NioSocketChannel,由于它充当客户端的功能,它的实例化时机在 connect(…) 的时候;
  • 对于 NioServerSocketChannel 来说,它充当服务端功能,它的实例化时机是在绑定端口 bind(…) 的时候。

channel实例化源码分析

根据上面的说明,我们从服务端的connect方法入手,看看其channel的实例化过程

  1. // 注意返回值为ChannelFuture。这个后面细说,现在只需要知道通过它能得知connect的状态与结果
  2. //.sync()为异步方法,关于异步,后面说
  3. ChannelFuture f = b.connect(HOST, PORT).sync();

点进connect()

  1. public ChannelFuture connect(String inetHost, int inetPort) {
  2. return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
  3. }

继续跟踪:

  1. public ChannelFuture connect(SocketAddress remoteAddress) {
  2. ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
  3. //参数检验
  4. validate();
  5. return doResolveAndConnect(remoteAddress, config.localAddress());
  6. }

跟踪doResolveAndConnect()

  1. private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  2. //关注点
  3. //这里将会进行Channel实例化
  4. final ChannelFuture regFuture = initAndRegister();
  5. final Channel channel = regFuture.channel();
  6. // 略...
  7. }

跟踪initAndRegister():

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. //这里进行channel的实例化
  5. channel = channelFactory.newChannel();
  6. init(channel);
  7. } catch (Throwable t) {
  8. //...
  9. }
  10. //一个特别重要的方法,观察之,发现与register有关
  11. //即它可能将该客户端channel注册到服务端的WorkGroup上,这个留到后面讲
  12. ChannelFuture regFuture = config().group().register(channel);
  13. if (regFuture.cause() != null) {
  14. if (channel.isRegistered()) {
  15. channel.close();
  16. } else {
  17. channel.unsafe().closeForcibly();
  18. }
  19. }
  20. return regFuture;
  21. }

跟踪channelFactory.newChannel() ,根据前面说的,这里是调用 Channel 的无参构造方法进行Channel的实例化。

  1. //该方法出现于上一节[.channel()]所说的ReflectiveChannelFactory中
  2. @Override
  3. public T newChannel() {
  4. try {
  5. //实例化NioSocketChannel
  6. return constructor.newInstance();
  7. } catch (Throwable t) {
  8. //...
  9. }
  10. }

由于是通过反射调用,所以我们直接观察NioSocketChannel 的无参构造方法

//来到NioSocketChannel类

  1. public NioSocketChannel() {
  2. this(DEFAULT_SELECTOR_PROVIDER);
  3. //DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
  4. //SelectorProvider用于创建JDK的SocketChannel 实例
  5. }
  6. //为上面构造器调用
  7. public NioSocketChannel(SelectorProvider provider) {
  8. this(newSocket(provider));
  9. }
  10. //为上所调用,传入参数是Java的SocketChannel。
  11. //也就是说,上面的newSocket(provider)会实例化一个SocketChannel
  12. public NioSocketChannel(SocketChannel socket) {
  13. this(null, socket);
  14. }

跟踪newSocket(provider)看看SocketChannel的实例化情况

  1. private static SocketChannel newSocket(SelectorProvider provider) {
  2. try {
  3. //返回实例化的SocketChannel
  4. return provider.openSocketChannel();
  5. } catch (IOException e) {
  6. //...
  7. }
  8. }

NioServerSocketChannel 同理,也非常简单,从 ServerBootstrap#bind(...) 方法一路点进去即可。

回到NioSocketChannel,接着看它的构造器

  1. //上面说的构造器
  2. public NioSocketChannel(SelectorProvider provider) {
  3. //实例化SocketChannel
  4. this(newSocket(provider));
  5. }
  6. //被上面的构造器调用
  7. //此时的Channel还是SocketChannel
  8. public NioSocketChannel(SocketChannel socket) {
  9. this(null, socket);
  10. }
  11. //被上面的构造器调用
  12. public NioSocketChannel(Channel parent, SocketChannel socket) {
  13. //这里调用了父类构造器,传入null和SocketChannel
  14. super(parent, socket);
  15. //实例化配置对象 用于保存channel的配置信息
  16. config = new NioSocketChannelConfig(this, socket.socket());
  17. }

我们跟踪其父类构造器,来到AbstractNioByteChannel

  1. protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
  2. //这里又传入了SelectionKey.OP_READ,代表该channel关心的事件是读取
  3. //这正是客户端channel应该关心的事情
  4. super(parent, ch, SelectionKey.OP_READ);
  5. }
  6. //被上面的构造器调用
  7. protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  8. super(parent);
  9. this.ch = ch;
  10. //这里保存了SelectionKey.OP_READ
  11. this.readInterestOp = readInterestOp;
  12. try {
  13. //设置该channel为非阻塞
  14. ch.configureBlocking(false);
  15. } catch (IOException e) {
  16. //...
  17. }
  18. }

跟踪其父类构造器,来到AbstractChannel,发现这里实例化了Pipeline!抱歉,关于Pipeline的内容,我们后面再讲…

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. id = newId();
  4. unsafe = newUnsafe();
  5. pipeline = newChannelPipeline();
  6. }

在服务端的NioServerSocketChannel 的构造方法类似,也设置了非阻塞,然后设置服务端关心的 SelectionKey.OP_ACCEPT 事件,非常合理,对于服务端来说,关心的是 SelectionKey.OP_ACCEPT 事件,等待客户端连接。

对于客户端来说,关心的Read,只要客户端Channel有来自服务器的数据,马上读。

关于 Channel 实例化的问题先到这,目前为止,主要就是实例化了 JDK 层的 SocketChannel 或 ServerSocketChannel,设置了非阻塞模式,实例化了channel的配置对象。

Channel 实例化中的其他问题如其Pipeline、注册到Loop上、channel的处理链handler的实例化等问题,都是围绕着rigister()方法来的,我们后面专门讲解!

还是小解一下:

  • 通过ReflectiveChannelFactory持有Channel的默认构造器,同时ReflectiveChannelFactory中留着实例化Channel的方法
  • channelFactory.newChannel()直接实例化Channel

    • newSocket(SelectorProvider)创建JDK的SocketChannel
    • 为该通道传入了SelectionKey.OP_READ,代表该channel关心的事件是读取
    • 设置该channel为非阻塞
    • newChannelPipeline()实例化Pipeline

Handler添加源码

回到echo的程序

  1. b.handler(new ChannelInitializer<SocketChannel> //客户端
  2. b.handler(new LoggingHandler(LogLevel.INFO)) //服务端

之前学习过:一个 Channel 关联一个 pipeline,NioSocketChannel 和 NioServerSocketChannel 实例化的时候,都会走到它们的父类 AbstractChannel 的构造方法中:

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. // 给每个 channel 分配一个唯一 id
  4. id = newId();
  5. // 每个 channel 内部需要一个 Unsafe 的实例
  6. unsafe = newUnsafe();
  7. // 每个 channel 内部都会创建一个 pipeline
  8. pipeline = newChannelPipeline();
  9. }

关于UnSafe:

在 JDK 的源码中,sun.misc.Unsafe 类提供了一些底层操作的能力,它设计出来是给 JDK 中的源码使用的,比如 AQS、ConcurrentHashMap 等

这个 Unsafe 类不是给我们的代码使用的

Netty 中的 Unsafe 也是同样的意思,它封装了 Netty 中会使用到的 JDK 提供的 NIO 接口,比如将 channel 注册到 selector 上,比如 bind 操作,比如 connect 操作等,这些操作都是稍微偏底层一些。

Netty 同样也是不希望我们的业务代码使用 Unsafe 的实例,它是提供给 Netty 中的源码使用的。

点进newChannelPipeline():

  1. protected DefaultChannelPipeline newChannelPipeline() {
  2. return new DefaultChannelPipeline(this);
  3. }

跟踪DefaultChannelPipeline():

  1. protected DefaultChannelPipeline(Channel channel) {
  2. this.channel = ObjectUtil.checkNotNull(channel, "channel");
  3. succeededFuture = new SucceededChannelFuture(channel, null);
  4. voidPromise = new VoidChannelPromise(channel, true);
  5. //实例化tail和head 这两个 handler并且串起来
  6. tail = new TailContext(this);
  7. head = new HeadContext(this);
  8. head.next = tail;
  9. tail.prev = head;
  10. }

前面我们说过管道本质是个双向链表,入站事件从头走,出站事件从尾走。

从上面的 head 和 tail 我们也可以看到,其实 pipeline 中的每个元素是 ChannelHandlerContext 的实例,而不是 ChannelHandler 的实例,context只是对handler进行了一下包装。

看到这里,我们发现现在的pipeline还不涉及到自定义的 handler 代码执行。那我们添加的handler何时被加入到这个pipeline中?

我们从bind()开始看。

bind->doBind->initAndRegister

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. //刚讲了,这里是channel的实例化,上面的pipeline也会随之实例化,只不过没有我们添加我们自定义的handler
  5. channel = channelFactory.newChannel();
  6. //这里才涉及我们handler的添加
  7. init(channel);
  8. } catch (Throwable t) {
  9. //...
  10. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  11. }

跟踪 init(channel),该方法位于ServerBootstrap类

  1. void init(Channel channel) {
  2. //...
  3. //通过实例化的channel拿到pipeline实例
  4. ChannelPipeline p = channel.pipeline();
  5. //...
  6. //开始往 pipeline 中添加一个 handler,这个handler是ChannelInitializer的实例
  7. p.addLast(new ChannelInitializer<Channel>() {
  8. @Override
  9. //关于ChannelInitializer的initChannel()方法,也挺重要的,我们需要质疑它何时被调用,即我们自定义的handler何时被真正加入后面说
  10. public void initChannel(final Channel ch) {
  11. final ChannelPipeline pipeline = ch.pipeline();
  12. //从config中拿到我们.handler()指定的LoggingHandler
  13. ChannelHandler handler = config.handler();
  14. //添加到尾部
  15. if (handler != null) {
  16. pipeline.addLast(handler);
  17. }
  18. //先不管.eventLoop()
  19. ch.eventLoop().execute(new Runnable() {
  20. @Override
  21. public void run() {
  22. //添加ServerBootstrapAcceptor到pipeline中
  23. //ServerBootstrapAcceptor用于接收客户端的请求
  24. pipeline.addLast(new ServerBootstrapAcceptor(
  25. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  26. }
  27. });
  28. }
  29. });
  30. }

关于ChannelInitializer,它本身是一个 handler(Inbound 类型),但是它的作用和普通 handler 有点不一样,它纯碎是用来将其他的 handler 加入到 pipeline 中的。

ChannelInitializer的initChannel 被调用时,会往 pipeline 中添加我们最开始指定的 LoggingHandler 和添加一个 ServerBootstrapAcceptor,这样,我们的NioServerSocketChannel在遇到READ事件时,就能由ServerBootstrapAcceptor进行处理

而initChannel方法何时会被调用?

服务端的NioSocketChannel与之类似,区别在于它只需要将 EchoClient 类中的 ChannelInitializer 实例加进来就可以了,它的 ChannelInitializer 中添加了两个 handler,LoggingHandler 和 EchoClientHandler

抱歉,又要留悬念了…本节没有介绍 handler 的向后传播,就是一个 handler 处理完了以后,怎么传递给下一个 handler 来处理?initChannel方法何时会被调用?这里还是跟rigister()有关…你会发现,我们在一直向rigiser()靠…它的重要性不用多说了吧

小结:

  • DefaultChannelPipeline()中实例化了两个初始Handler
  • initAndRegister()中的init()中

    • 通过实例化的channel拿到pipeline
    • 向管道中添加ChannelInitializer,其中的方法定义了但未执行添加自定义Handler的逻辑。此时,自定义的Handler未实例化。
    • 向该Channel对应Loop的Selector的工作队列中添加任务,执行管道添加Acceptor处理器

异步原理_Future&Promise

Future、Promise是netty实现异步核心类

在echo中

  1. ChannelFuture f = b.connect(HOST, PORT).sync();
  2. f.channel().closeFuture().sync();
  3. ChannelFuture f = b.bind(PORT).sync();
  4. f.channel().closeFuture().sync();

关于 Future 接口,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法

netty中,Future的一切操作都是由某个线程执行时调用的。

先看Java中的Future接口:

  1. public interface Future<V> {
  2. // 取消该任务
  3. boolean cancel(boolean mayInterruptIfRunning);
  4. // 任务是否已取消
  5. boolean isCancelled();
  6. // 任务是否已完成
  7. boolean isDone();
  8. // 阻塞获取任务执行结果
  9. V get() throws InterruptedException, ExecutionException;
  10. // 带超时参数的获取任务执行结果
  11. V get(long timeout, TimeUnit unit)
  12. throws InterruptedException, ExecutionException, TimeoutException;
  13. }

Netty 中的 Future 接口继承了 JDK 中的 Future 接口,同时添加了一些方法:

  1. public interface Future<V> extends java.util.concurrent.Future<V> {
  2. //是否成功 只有IO操作完成时才返回true
  3. boolean isSuccess();
  4. // IO操作发生异常时,返回导致IO操作失败的原因,如果没有异常,返回null
  5. Throwable cause();
  6. //注册监听者,future完成时,进行回调(通知所有监听者)
  7. Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  8. Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  9. Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  10. Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  11. //阻塞等待任务结束,如果任务失败,抛出导致任务失败的异常
  12. Future<V> sync() throws InterruptedException;
  13. // 不可中断的 sync() 即原子操作
  14. Future<V> syncUninterruptibly();
  15. // 阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
  16. //sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来。await()本身不抛出异常
  17. Future<V> await() throws InterruptedException;
  18. // 不可中断的 await 即原子操作
  19. Future<V> awaitUninterruptibly();
  20. boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  21. boolean await(long timeoutMillis) throws InterruptedException;
  22. boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  23. boolean awaitUninterruptibly(long timeoutMillis);
  24. // 不阻塞获取执行结果,如未完成,返回null
  25. V getNow();
  26. // 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败
  27. boolean cancel(boolean mayInterruptIfRunning);
  28. }

说明:

Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 sync()await() 用于阻塞等待,还加了 Listeners让我们可以不用阻塞等待结果,只要任务结束去回调 Listener 就可以了,那么我们就不一定要主动调用 isDone()来获取状态,或通过 get()阻塞方法来获取值。

Future 其子接口ChannelFuture:

  1. public interface ChannelFuture extends Future<Void> {
  2. Channel channel();
  3. @Override
  4. ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
  5. @Override
  6. ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  7. @Override
  8. ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
  9. @Override
  10. ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
  11. @Override
  12. ChannelFuture sync() throws InterruptedException;
  13. @Override
  14. ChannelFuture syncUninterruptibly();
  15. @Override
  16. ChannelFuture await() throws InterruptedException;
  17. @Override
  18. ChannelFuture awaitUninterruptibly();
  19. // 用来标记该 future 是 void 的,
  20. // 这样就不允许使用 addListener(...), sync(), await() 以及它们的几个重载方法
  21. boolean isVoid();
  22. }

发现其不过是 Future接口的方法重写,返回 Future改为ChannelFuture。

再来看看Promise接口:

  1. public interface Promise<V> extends Future<V> {
  2. //以下是Promise对Future新增的方法~
  3. // 标记该future 成功及设置其执行结果,并且会通知所有的 listeners
  4. Promise<V> setSuccess(V result);
  5. //与setSuccess(V result)一样,区别是操作失败不抛异常,返回false[失败是指此方法本身的失败]
  6. boolean trySuccess(V result);
  7. // 标记该 future 失败,及其失败原因。
  8. Promise<V> setFailure(Throwable cause);
  9. //不抛异常,返回false
  10. boolean tryFailure(Throwable cause);
  11. //标记该 future 不可以被取消
  12. boolean setUncancellable();
  13. //以下为重写Future的方法,只是将返回值改成Promise<V>
  14. @Override
  15. Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  16. @Override
  17. Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  18. @Override
  19. Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  20. @Override
  21. Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  22. @Override
  23. Promise<V> await() throws InterruptedException;
  24. @Override
  25. Promise<V> awaitUninterruptibly();
  26. @Override
  27. Promise<V> sync() throws InterruptedException;
  28. @Override
  29. Promise<V> syncUninterruptibly();
  30. }

Promise 接口继承自 Future 接口,重点添加了上述几个方法,可以人工设置 future 的执行成功与失败,并通知所有监听的 listener。是Neety实现异步返回结果与回调的主要实现组件。

是否注意到Future是没有提供set方法的而Promise才有。

Promise 提供的 setSuccess(V result)setFailure(Throwable t) 将来会被某个执行任务的线程在执行完成以后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数。

一旦 setSuccess(…) 或 setFailure(…) 后,那些 await() 或 sync() 的线程也会从等待中返回。

Promise 实现类是DefaultPromise类,该类十分重要,Future 的 listener 机制也是由它实现的。

DefaultPromise其重要属性:

  1. //...
  2. // 保存执行结果
  3. private volatile Object result;
  4. // 执行任务的线程池
  5. private final EventExecutor executor;
  6. // 监听者 由addListener(..)定义
  7. private Object listeners;
  8. // 等待这个 promise 的线程数(即调用sync()/await()进行等待的线程数量)
  9. private short waiters;
  10. // 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行listeners的回调方法
  11. private boolean notifyingListeners;
  12. //......

DefaultPromise其核心方法:

  1. @Override
  2. public Promise<V> setSuccess(V result) {
  3. if (setSuccess0(result)) {
  4. return this;
  5. }
  6. throw new IllegalStateException("complete already: " + this);
  7. }
  8. @Override
  9. public boolean trySuccess(V result) {
  10. return setSuccess0(result);
  11. }
  12. @Override
  13. public Promise<V> setFailure(Throwable cause) {
  14. if (setFailure0(cause)) {
  15. return this;
  16. }
  17. throw new IllegalStateException("complete already: " + this, cause);
  18. }
  19. @Override
  20. public boolean tryFailure(Throwable cause) {
  21. return setFailure0(cause);
  22. }
  23. @Override
  24. //get方法会将setSuccess(V result)的result返回
  25. public V get() throws InterruptedException, ExecutionException {
  26. ...
  27. }

这里明显验证了上面set和try一个抛异常一个不抛异常的说法。

我们随便点进一个最终设置值的方法看看:

  1. private boolean setValue0(Object objResult) {
  2. //先不管这里的CAS操作
  3. if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
  4. RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
  5. if (checkNotifyWaiters()) {
  6. //通知监听者任务结束以及结果
  7. //这也是Future回调的实现,这里就不展开了
  8. notifyListeners();
  9. }
  10. return true;
  11. }
  12. return false;
  13. }

可以发现,在Promise设置完结果后,会立即回调函数,通知监听者结果

实例

  1. package com.junsir.netty.source;
  2. import io.netty.util.concurrent.*;
  3. public class TestFutureAndPromise {
  4. public static void main(String[] args) {
  5. //创建线程池
  6. EventExecutor executor = new DefaultEventExecutor();
  7. //创建DefaultPromise实例
  8. Promise promise = new DefaultPromise(executor);
  9. //注册监听者
  10. promise.addListener(new GenericFutureListener<Future<Integer>>() {
  11. @Override
  12. //future完成后被回调
  13. public void operationComplete(Future future) throws Exception {
  14. if (future.isSuccess()){
  15. //.get()为juc下的Future接口定义的方法,用于拿到任务的结果,具体是由DefaultPromise实现。
  16. System.out.println("任务结束,结果为:" + future.get());
  17. }else {
  18. //任务不成功
  19. System.out.println("任务失败 原因:" + future.cause());
  20. }
  21. }
  22. }).addListener(new GenericFutureListener<Future<Integer>>() {
  23. @Override
  24. public void operationComplete(Future future) throws Exception {
  25. System.out.println("任务结束...");
  26. }
  27. });
  28. //提交任务给线程池,五秒后执行结束,设置promise任务
  29. executor.submit(new Runnable() {
  30. @Override
  31. public void run() {
  32. try {
  33. Thread.sleep(5000);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. //设置promise的结果,该语句会回调监听者
  38. promise.setSuccess(12345);
  39. }
  40. });
  41. //阻塞等待执行结果
  42. try {
  43. promise.sync();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }

任务结束,结果为:12345
任务结束…

进一步说明netty的Future:

promise 代表的 future 是不需要和线程池或线程搅在一起的,它不负责具体任务的执行,future /promise只关心任务是否结束以及任务的执行结果,至于是哪个线程或哪个线程池执行的任务,future 其实是不关心的。

.rigister()_1

这里就是大头了,很多核心操作都在这里

回到EchoClient 中的 connect() —>connect–>doResolveAndConnect–>initAndRegister

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. //讲过,完成了channel实例化的一系列事宜
  5. channel = channelFactory.newChannel();
  6. //讲过,配置了channel的pipeline的handler等
  7. init(channel);
  8. } catch (Throwable t) {
  9. //...
  10. }
  11. //.rigistr()..
  12. ChannelFuture regFuture = config().group().register(channel);
  13. //...

initAndRegister() 这个方法我们已经接触过两次了,前面介绍了 Channel 的实例化,实例化过程中,会执行 Channel 内部 Unsafe 和 Pipeline 的实例化,以及在上面 init(channel) 方法中,会往 pipeline 中添加 handler。pipeline 此时是 head+channelnitializer+tail但并未实例化

  1. ChannelFuture regFuture = config().group().register(channel);

我们说了,register 这一步是非常关键的,它发生在 channel 实例化以后,大家回忆一下当前 channel 中的一些情况:

实例化了 JDK 底层的 Channel,设置了非阻塞,实例化了 Unsafe,实例化了 Pipeline,同时往 pipeline 中添加了 head、tail 以及一个 ChannelInitializer 实例

group() 方法会返回前面实例化的 NioEventLoopGroup 的实例,然后调用其 register(channel) 方法:

跟踪发现NioEventLoopGroup的父类MultithreadEventLoopGroup中的register(channel)

  1. public ChannelFuture register(Channel channel) {
  2. //next用于选择线程池中的线程即loop实例
  3. return next().register(channel);
  4. }

在SingleThreadEventLoop中找到register():

  1. @Override
  2. public ChannelFuture register(Channel channel) {
  3. //实例化一个Promise,传入channel,整体传入register
  4. return register(new DefaultChannelPromise(channel, this));
  5. }
  6. //于同类,为上面所调用
  7. @Override
  8. public ChannelFuture register(final ChannelPromise promise) {
  9. ObjectUtil.checkNotNull(promise, "promise");
  10. // 这里可以看出promise关联了channel,channel 持有 Unsafe 实例,register操作就封装在Unsafe中
  11. promise.channel().unsafe().register(this, promise);
  12. return promise;
  13. }

跟踪发现在AbstractChannel中实现register

  1. @Override
  2. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  3. //...
  4. // 将eventLoop 实例设置给这个 channel,从此这个 channel 就是有 eventLoop 的了
  5. //别晕,这里eventLoop是SingleThreadEventLoop,这个方法的入口.register(this, promise),this就是eventLoop
  6. //再往上说eventLoop是实例好的Group通过.next()选出来的...忘了往上翻翻
  7. //注意,后续所有异步操作都将交给这个eventLoop执行!!即每个channel实例都持有一个Group中的Loop来完成channel数据的读取!
  8. AbstractChannel.this.eventLoop = eventLoop;
  9. //如果发起 register 动作的线程就是 eventLoop 实例中的线程,那么直接调用 register0(promise)
  10. if (eventLoop.inEventLoop()) {
  11. register0(promise);
  12. } else {
  13. try {
  14. //否则,提交任务给 eventLoop,eventLoop 中的线程会负责调用 register0(promise)
  15. eventLoop.execute(new Runnable() {
  16. @Override
  17. public void run() {
  18. register0(promise);
  19. }
  20. });
  21. } catch (Throwable t) {
  22. //...
  23. }
  24. }
  25. }

到这里,我们要明白,NioEventLoop 中是还没有实例化 Thread 实例的!也就是我们rigister到现在,NioEventLoop还没有线程执行操作!

这几步涉及到了好几个类:NioEventLoop、Promise、Channel、Unsafe 等,大家要仔细理清楚它们的关系。

提交到 eventLoop 以后,就直接返回 promise 实例了,剩下的register0 是异步操作,其操作结果会在 promise 中设置。

我们这边先不继续往里分析 register0(promise) 方法,先把 NioEventLoop 中的线程Thread到底出现在哪里说清楚,即.execute()提交了任务后何时执行,然后再回来继续介绍这个 register0 方法。

小结一下,到此为止

  • Channel实例持有对应Loop(Channel与Loop关系建立)
  • 提交任务给Loop中的任务队列,当前线程不负责真正Rigister0操作。在eventLoop.execute()中会另开线程进行真正Rigister0

execute()这个代码在父类 SingleThreadEventExecutor

  1. @Override
  2. public void execute(Runnable task) {
  3. //...
  4. //见过,判断调用execute的线程是否在EventLoop中
  5. boolean inEventLoop = inEventLoop();
  6. //将task(Runable)添加到任务队列
  7. addTask(task);
  8. //如果调用execute的线程否在EventLoop中,启动线程
  9. if (!inEventLoop) {
  10. //启动Loop中线程
  11. startThread();
  12. //如果上面语句执行完毕后线程是关闭状态
  13. if (isShutdown()) {
  14. boolean reject = false;
  15. try {
  16. //将任务已从队列中移除,并将是否执行拒绝策略设为true
  17. if (removeTask(task)) {
  18. reject = true;
  19. }
  20. } catch (UnsupportedOperationException e) {
  21. }
  22. if (reject) {
  23. //前面说过,默认拒绝策略抛出异常
  24. reject();
  25. }
  26. }
  27. }
  28. //....
  29. }

这里我们终于发现EventLoop启动的线程的代码了,进入 startThread()

  1. private void startThread() {
  2. //如果线程未启动
  3. if (state == ST_NOT_STARTED) {
  4. //CAS更改状态
  5. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
  6. //线程执行成功标志
  7. boolean success = false;
  8. try {
  9. doStartThread();
  10. success = true;
  11. } finally {
  12. if (!success) {
  13. //CAS改回线程状态
  14. STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
  15. }
  16. }
  17. }
  18. }
  19. }

点进doStartThread()

  1. private void doStartThread() {
  2. //Thread来了!
  3. assert thread == null;
  4. // 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例。它是每次来一个任务,创建一个线程的那种 executor。
  5. // 一旦我们调用它的 execute 方法,它就会创建一个新的线程,所以这里终于会创建 Thread 实例
  6. executor.execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. //将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程
  10. thread = Thread.currentThread();
  11. //判断是否打断状态
  12. if (interrupted) {
  13. thread.interrupt();
  14. }
  15. //线程状态标志
  16. boolean success = false;
  17. //...
  18. try {
  19. //...执行 SingleThreadEventExecutor 的 run() 方法
  20. //它在 NioEventLoop 中实现了
  21. SingleThreadEventExecutor.this.run();
  22. success = true;
  23. } catch (Throwable t) {
  24. logger.warn("Unexpected exception from an event executor: ", t);
  25. } finally {
  26. //死循环直至该线程已关闭。
  27. //...
  28. for (;;) {
  29. if (confirmShutdown()) {
  30. break;
  31. }
  32. }
  33. //...
  34. } finally {
  35. try {
  36. //执行清理工作
  37. cleanup();
  38. } finally {
  39. //....
  40. });
  41. }

上面线程启动以后,会执行 NioEventLoop 中的 run() 方法,这是一个非常重要的方法,根据我们之前所学,它必然是个死循环,需要不断地做 select 操作和轮询 taskQueue 队列,也即,select 与轮询是个死循环,在channel register()过程中发送,也即,在客户端连接到服务端或服务端绑定端口号后开始!

我们具体看看该run()方法

  1. @Override
  2. protected void run() {
  3. //注意这是个无限循环。
  4. for (;;) {
  5. try {
  6. // selectStrategy 选择策略相关事宜
  7. //它有三个值,一个是 CONTINUE 一个是 SELECT BUSY_WAIT(无用)且都是int类型
  8. try {
  9. //hasTasks判断任务队列中是否有任务
  10. //如果有任务,selectNow(),代表立马轮询,因为该方法不会阻塞,必能拿到任务
  11. //如果无任务,那应该返回CONTINUE,继续这个循环,即不断查看队列是否有任务!
  12. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  13. case SelectStrategy.CONTINUE:
  14. continue;
  15. // fall-through to SELECT since the busy-wait is not supported with NIO
  16. //这个忽略即可
  17. case SelectStrategy.BUSY_WAIT:
  18. //如果是SELECT,当任务队列为空,即无任务执行时,执行select
  19. case SelectStrategy.SELECT:
  20. //进行选择,注意这个SELECT是阻塞的。
  21. select(wakenUp.getAndSet(false));
  22. if (wakenUp.get()) {
  23. selector.wakeup();
  24. }
  25. }
  26. }
  27. //...
  28. // 默认ioRatio 的值是 50,表示IO操作所占的比重。
  29. final int ioRatio = this.ioRatio;
  30. //如果ioRatio为100,先执行IO再执行select
  31. if (ioRatio == 100) {
  32. try {
  33. //先执行读写操作
  34. processSelectedKeys();
  35. } finally {
  36. // 最后执行非 IO 任务,也就是 taskQueue 中的任务
  37. runAllTasks();
  38. }
  39. } else {
  40. // 如果 ioRatio 不是 100,那么根据 IO 操作耗时,限制非 IO 操作耗时
  41. final long ioStartTime = System.nanoTime();
  42. try {
  43. //执行IO操作
  44. processSelectedKeys();
  45. } finally {
  46. //根据 IO 操作消耗的时间
  47. //计算执行非 IO 操作(runAllTasks)可以用多少时间
  48. final long ioTime = System.nanoTime() - ioStartTime;
  49. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  50. }
  51. }
  52. } catch (Throwable t) {
  53. handleLoopException(t);
  54. }
  55. // Always handle shutdown even if the loop processing threw an exception.
  56. try {
  57. if (isShuttingDown()) {
  58. closeAll();
  59. if (confirmShutdown()) {
  60. return;
  61. }
  62. }
  63. } catch (Throwable t) {
  64. handleLoopException(t);
  65. }
  66. }
  67. }

上面这段代码是 NioEventLoop 的核心

我们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。

回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。NioEventLoop Run起来随之无限循环轮询操作,执行提交过来的 register 任务或其他任务

小结一下:

在eventLoop.execute(runnable)中

  • 添加任务。
  • startThread()启动线程。
  • doStartThread(),重线程池中搞一个线程出来

    • 该线程作为Loop的thread属性
    • 执行NioEventLoop的run方法
    • 该run方法是无限循环,剩下主要就是示意图中的三步曲了

.rigister()_2

再说回到前面的 register0(promise) 方法,我们知道,这个 register 任务进入到了 NioEventLoop 的 taskQueue 中,然后会启动 NioEventLoop 中的线程,该线程会轮询这个 taskQueue,然后从任务队列中取出并执行这个 register0 任务。

  1. private void register0(ChannelPromise promise) {
  2. try {
  3. // ...
  4. boolean firstRegistration = neverRegistered;
  5. //进行 JDK 底层的操作,在JDK层面上将Channel注册到Selector
  6. doRegister();
  7. neverRegistered = false;
  8. registered = true;
  9. //至此 状态registered 代表register完毕
  10. //涉及到ChannelInitializer 的 init(channel)
  11. pipeline.invokeHandlerAddedIfNeeded();
  12. //设置promise状态为sucess代表异步绑定/连接完毕。
  13. //即通知提交 register 操作的线程
  14. safeSetSuccess(promise);
  15. //这一步是通知所有关心注册事件的线程,这上面结合形成回调
  16. pipeline.fireChannelRegistered();
  17. //如果channel是活跃状态
  18. if (isActive()) {
  19. //如果该channel是第一次执行register,通知所有监听者channel已经处于活跃状态。
  20. if (firstRegistration) {
  21. pipeline.fireChannelActive();
  22. } else if (config().isAutoRead()) {
  23. //如果该channel之前已经注册过,立马让cheannel去监听OP_Read事件
  24. beginRead();
  25. }
  26. }
  27. } catch (Throwable t) {
  28. //...
  29. }
  30. }

跟踪doRegister():

  1. // JDK 中 Channel 的 register 方法:
  2. // public final SelectionKey register(Selector sel, int ops, Object att) {...}
  3. @Override
  4. protected void doRegister() throws Exception {
  5. boolean selected = false;
  6. for (;;) {
  7. try {
  8. //这里执行了JDK的register
  9. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  10. return;
  11. } catch (CancelledKeyException e) {
  12. ...
  13. }
  14. }
  15. }

可以看到,上面代码做了 JDK 底层的 register 操作,将 SocketChannel(或 ServerSocketChannel) 注册到 Selector 中,并且可以看到,这里的监听集合设置为了 0,也就是什么都不监听。当然,也就意味着,后续一定有某个地方会需要修改这个 selectionKey 的监听集合。

跟踪pipeline.invokeHandlerAddedIfNeeded()方法,这里应该就是调用initChannel()设置管道handler的过程了

  1. final void invokeHandlerAddedIfNeeded() {
  2. assert channel.eventLoop().inEventLoop();
  3. if (firstRegistration) {
  4. firstRegistration = false;
  5. callHandlerAddedForAllHandlers();
  6. }
  7. }

继续跟踪,会执行到 pipeline 中 ChannelInitializer 实例的 handlerAdded 方法,在这里会执行它的 init(context) 方法:

  1. @Override
  2. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  3. if (ctx.channel().isRegistered()) {
  4. initChannel(ctx);
  5. }
  6. }

然后我们看下 initChannel(ctx),这里终于来了我们之前介绍过的 initChannel方法:

  1. private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
  2. if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
  3. try {
  4. // 1. 将把我们自定义的 handlers 添加到 pipeline 中
  5. initChannel((C) ctx.channel());
  6. } catch (Throwable cause) {
  7. ...
  8. } finally {
  9. // 2. 将 ChannelInitializer 实例从 pipeline 中删除
  10. remove(ctx);
  11. }
  12. return true;
  13. }
  14. return false;
  15. }

我们前面也说过,ChannelInitializer 的 initChannel()被执行以后,那么其内部添加的 handlers 会进入到 pipeline 中,然后上面的 finally 块中将 ChannelInitializer 的实例从 pipeline 中删除,那么此时 pipeline 就算建立起来了。

即,Handler的建立在Register0当中。

回到register0()方法

  1. //进入这一句
  2. pipeline.fireChannelRegistered();

来到DefaultChannelPipeline:

  1. @Override
  2. public final ChannelPipeline fireChannelRegistered() {
  3. //invokeChannelRegistered代表往管道添加ChannelRegistered事件
  4. //此处传入的next的head
  5. AbstractChannelHandlerContext.invokeChannelRegistered(head);
  6. return this;
  7. }

继续跟踪:来到AbstractChannelHandlerContext类

  1. static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
  2. //点进去是return channel().eventLoop();
  3. EventExecutor executor = next.executor();
  4. if (executor.inEventLoop()) {
  5. next.invokeChannelRegistered();
  6. } else {
  7. //channel出现ChannelRegistered事件后
  8. //由head-handler去处理ChannelRegistered
  9. //处理逻辑在head-handler中已经预先定义
  10. //放在任务队列中去执行
  11. executor.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. next.invokeChannelRegistered();
  15. }
  16. });
  17. }
  18. }

跟踪next.invokeChannelRegistered(),看看head-handler是怎么处理ChannelRegistered事件的。

  1. private void invokeChannelRegistered() {
  2. if (invokeHandler()) {
  3. try {
  4. //ChannelRegistered是入站事件
  5. // handler() 方法此时会返回 head,然后执行
  6. ((ChannelInboundHandler) handler()).channelRegistered(this);
  7. } catch (Throwable t) {
  8. notifyHandlerException(t);
  9. }
  10. } else {
  11. fireChannelRegistered();
  12. }
  13. }

我们去看 head 的 channelRegistered 方法:

来到HeadContex类,可以发现其定义了一系列针对channel事件的处理函数,这里不展开了

  1. @Override
  2. public void channelRegistered(ChannelHandlerContext ctx) {
  3. //对事件处理
  4. invokeHandlerAddedIfNeeded();
  5. //向后传播入站事件事件
  6. ctx.fireChannelRegistered();
  7. }

跟踪fireChannelRegistered()

在AbstractChannelHandlerContext中实现

  1. @Override
  2. public ChannelHandlerContext fireChannelRegistered() {
  3. //findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
  4. invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
  5. return this;
  6. }
  7. //被上面调用
  8. private AbstractChannelHandlerContext findContextInbound(int mask) {
  9. AbstractChannelHandlerContext ctx = this;
  10. do {
  11. ctx = ctx.next;
  12. } while ((ctx.executionMask & mask) == 0);
  13. return ctx;
  14. }

注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。

它们两个的方法名字是一样的,但是来自于不同的类。

至此。register 操作算是真正完成了

由于后续的 connect 或 bind 也会进入到同一个 eventLoop 的 queue 中,所以一定是会先 register 成功,才会执行 connect 或 bind

下面就比较轻松了,看看connect 或 bind是怎么玩的

.connect()源码分析

connect() —>connect–>doResolveAndConnect–>initA//ndRegister

  1. private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
  2. //这里完成了一系列rigister操作,将连接所需要的channel实例化
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.isDone()) {
  6. if (!regFuture.isSuccess()) {
  7. return regFuture;
  8. }
  9. //那么在这里就要完成连接操作了
  10. //将经过rigister后的channel实例、ip、端口、Promise实例进行传入
  11. return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
  12. } else {
  13. return promise;
  14. }
  15. }

跟踪doResolveAndConnect0…一路到AbstactChannel的connect方法

  1. @Override
  2. public ChannelFuture connect(SocketAddress remoteAddress) {
  3. //由channel的pipeline完成连接操作
  4. return pipeline.connect(remoteAddress);
  5. }

跟踪到DefaultChannelPipeline类中

  1. public final ChannelFuture connect(SocketAddress remoteAddress) {
  2. //connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的向前推进的
  3. //反之亦然,服务端的bind属于InBound是从head开始的
  4. return tail.connect(remoteAddress);
  5. }

继续跟踪到AbstractChannelHandlerContext(前面说过,它是Handler的真实类型)

  1. @Override
  2. public ChannelFuture connect(
  3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  4. //...
  5. //找到处理连接的handler,其就在head中,从tail->head
  6. final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
  7. //返回channel对应的loop
  8. EventExecutor executor = next.executor();
  9. //如果调用当前的线程的loop线程
  10. if (executor.inEventLoop()) {
  11. //跟踪..
  12. next.invokeConnect(remoteAddress, localAddress, promise);
  13. } else {
  14. //...
  15. return promise;
  16. }

跟踪invokeConnect()

  1. private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  2. //Handler是否存在
  3. //如果已有,执行Handler的connect方法
  4. if (invokeHandler()) {
  5. try {
  6. //执行handler的connect方法
  7. ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
  8. } catch (Throwable t) {
  9. notifyOutboundHandlerException(t, promise);
  10. }
  11. } else {
  12. //重复执行
  13. connect(remoteAddress, localAddress, promise);
  14. }
  15. }

那么。我们需要的操作在HeadContext中进行了定义

  1. //我们同时还发现了和服务端有关的bind方法
  2. @Override
  3. public void bind(
  4. ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
  5. unsafe.bind(localAddress, promise);
  6. }
  7. @Override
  8. public void connect(
  9. ChannelHandlerContext ctx,
  10. SocketAddress remoteAddress, SocketAddress localAddress,
  11. ChannelPromise promise) {
  12. unsafe.connect(remoteAddress, localAddress, promise);
  13. }

不管是bind还是connect,最终都是用unsafe实例来执行JDK中的bind还是connect,那么,channel实例持有unsafe的意义我们也看到了。其就是用来封装JDK的NIO code的

我们再看看UnSafe中的connect操作:

  1. @Override
  2. public final void connect(
  3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  4. // ...
  5. try {
  6. //...
  7. boolean wasActive = isActive();
  8. //这一步就做JDK底层的,这一步会做 JDK 底层的 SocketChannel connect
  9. //然后设置 interestOps 为 SelectionKey.OP_CONNECT
  10. if (doConnect(remoteAddress, localAddress)) {
  11. //netty对JDK中连接成功的进一步处理
  12. fulfillConnectPromise(promise, wasActive);
  13. } else {
  14. //连接失败处理
  15. connectPromise = promise;
  16. requestedRemoteAddress = remoteAddress;
  17. // 连接超时处理...
  18. //...
  19. }
  20. } catch (Throwable t) {
  21. promise.tryFailure(annotateConnectException(t, remoteAddress));
  22. closeIfClosed();
  23. }
  24. }

在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听。

而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT。即让channel去监听connect事件

netty请求接收说明

服务器启动后肯定是要接受客户端请求,并返回客户端想要的信息的

从之前有关.handler()中,我们得知,服务器最终实例化了NioServerSocketChannel,向其管道pipeline中添加了一个 Acceptor用于处理客户端的连接。

我们也知道, NioServerSocketChannel 将自己注册到了 boss 单例线程池上,即 EventLoop 。

EventLoop 的作用是一个死循环,而这个循环中做 3 件事情:

  1. 有条件的等待 Nio 事件。
  2. 处理 Nio 事件。
  3. 处理消息队列中的任务。

服务端在bind之后该loop开始生效,客户端在connect之后开始生效。

注册之后的Channel后续的IO的操作都由Loop的Selector轮询到、加入TaskQueue中,由Loop线程进行处理。

那么,一个请求进来之后,服务端Channel会产生Read事件,Channel中的Handler中的Acceptor接收到这个请求,会为该请求创建一个新的 NioSocketChannel,注册到WorkGroup上一个EventLoop 上,并注册 selecot Read 事件。

服务器BossGroup轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法读取请求信息。

发表评论

表情:
评论列表 (有 0 条评论,286人围观)

还没有评论,来说两句吧...

相关阅读