Netty简单源码分析

我就是我 2023-07-11 06:16 83阅读 0赞
  1. Netty是基于NIO的一个异步网络框架,它将NIOselectorchannelbuffer封装在底层,提供了一层易于使用的api

Netty模型结构

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dqbHp4_size_16_color_FFFFFF_t_70

如上图所示,netty的入口是AbstractBootstrap:

  • 服务端使用的是ServerBootstrap,接收2个NioEventLoopGroup实例,按照职责划分成boss和work,boss负责处理accept请求,work负责处理read、write请求
  • 客户端使用的是Bootstrap,接收一个NioEventLoopGroup实例,负责处理read、write请求

Netty服务的创建和初始化

  1. NioEventLoopGroup里面管理着多个eventLoop,创建NioEventLoopGroup实例时,默认会创建处理器数量的两倍的eventLoop实例,每个eventLoop会维护一个selectortaskQueueselector即是NIO里面的多路复用器,taskQueue是存放请求任务的队列。

源码如下:

  1. /*MultithreadEventLoopGroup为EventLoopGroup的父类,创建实例时会调用以下方法,其中DEFAULT_EVENT_LOOP_THREADS为处理器数量的两倍*/
  2. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  3. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  4. }
  5. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  6. EventExecutorChooserFactory chooserFactory, Object... args) {
  7. if (nThreads <= 0) {
  8. throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
  9. }
  10. if (executor == null) {
  11. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  12. }
  13. children = new EventExecutor[nThreads];
  14. //这里遍历了nThreads次,调用了nThreads次newChild方法,创建了nThreads个NioEventLoop实例
  15. for (int i = 0; i < nThreads; i ++) {
  16. boolean success = false;
  17. try {
  18. children[i] = newChild(executor, args);
  19. success = true;
  20. } catch (Exception e) {
  21. // TODO: Think about if this is a good exception type
  22. throw new IllegalStateException("failed to create a child event loop", e);
  23. } finally {
  24. if (!success) {
  25. for (int j = 0; j < i; j ++) {
  26. children[j].shutdownGracefully();
  27. }
  28. for (int j = 0; j < i; j ++) {
  29. EventExecutor e = children[j];
  30. try {
  31. while (!e.isTerminated()) {
  32. e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  33. }
  34. } catch (InterruptedException interrupted) {
  35. // Let the caller handle the interruption.
  36. Thread.currentThread().interrupt();
  37. break;
  38. }
  39. }
  40. }
  41. }
  42. }
  43. chooser = chooserFactory.newChooser(children);
  44. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  45. @Override
  46. public void operationComplete(Future<Object> future) throws Exception {
  47. if (terminatedChildren.incrementAndGet() == children.length) {
  48. terminationFuture.setSuccess(null);
  49. }
  50. }
  51. };
  52. for (EventExecutor e: children) {
  53. e.terminationFuture().addListener(terminationListener);
  54. }
  55. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  56. Collections.addAll(childrenSet, children);
  57. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  58. }
  59. //创建NioEventLoop实例
  60. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  61. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  62. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
  63. }

到这里,就已经创建了NIO里的多路复用器selector,接下来就是把channel注册到selector里去,netty的处理有点复杂,我也没有完全摸透,这里简单讲下我的理解(建议去看看狼哥的博客,讲得很详细,以下图片也来源于他的博客)

format_png

以ServerBootstrap为例,上图是ServerBootstrap创建的流程,我们重点看下initAndRegister方法:

  1. 先创建了一个netty对NIO的ServerSocketChannel封装的channel对象
  2. 通过chooser策略找到EventLoopGroup里的某个EventLoop
  3. 将channel注册到EventLoop的selector中

AbstractBootStrap的initAndRegister方法源码如下:

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. //这里的channel是netty对NIO的channel自己封装的对象,用于接收Accept请求
  5. channel = channelFactory.newChannel();
  6. init(channel);
  7. } catch (Throwable t) {
  8. if (channel != null) {
  9. channel.unsafe().closeForcibly();
  10. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  11. }
  12. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  13. }
  14. //这里的group就是上面讲到的EventLoopGroup
  15. ChannelFuture regFuture = config().group().register(channel);
  16. if (regFuture.cause() != null) {
  17. if (channel.isRegistered()) {
  18. channel.close();
  19. } else {
  20. channel.unsafe().closeForcibly();
  21. }
  22. }
  23. return regFuture;
  24. }

MultithreadEventLoopGroup注册源码如下:

  1. //因为EventLoopGroup中维护了多个eventLoop,next方法会调用chooser策略找到下一个eventLoop,并执行eventLoop的register方法注册到eventLoop里的selector
  2. public ChannelFuture register(Channel channel) {
  3. return next().register(channel);
  4. }
  5. public EventLoop next() {
  6. return (EventLoop) super.next();
  7. }
  8. public EventExecutor next() {
  9. return chooser.next();
  10. }

Netty请求响应

  1. 如上面所说,netty注册到EventLoop的是自己封装的Channel对象,每个channel内部都会持有一个ChannelPipeline对象,ChannelPipeline的默认实现DefaultChannelPipeline类内部维护了一个ChannelHandlerContext链表,包括一个链表头head和链表尾tail
  2. channel注册到EventLoopselector多路复用器之后,当有请求时,EventLoop的处理方式也跟NIO类似:
  1. EventLoop会根据不同的key类型,调用channel的NioUnsafe对象中不同的方法来处理
  2. 在NioUnsafe的处理方法中,会遍历Channel里ChannelPipeline的ChannelHandlerContext链表,找到第一个符合要求的hanler类,执行其中的方法。

这里简单说下netty的事件在handler处理链中的传播:

  • read事件是靠调用ChannelHandlerContext.fireChannelRead()方法,会往后寻找下个Inboundhandler的channelRead方法,若调用channel或ChannelPipeline的fireChannelRead()方法,则从头开始找下个Inboundhandler;
  • writer事件则是靠ChannelHandlerContext.writer()方法,会往前找上一个outboundhandler,若调用channel或ChannelPipeline的writer()方法,则从尾开始找上个outboundhandler。

处理selectKey的源码如下:

  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  3. if (!k.isValid()) {
  4. final EventLoop eventLoop;
  5. try {
  6. eventLoop = ch.eventLoop();
  7. } catch (Throwable ignored) {
  8. return;
  9. }
  10. if (eventLoop != this || eventLoop == null) {
  11. return;
  12. }
  13. unsafe.close(unsafe.voidPromise());
  14. return;
  15. }
  16. //根据不同的key类型执行不同处理方法
  17. try {
  18. int readyOps = k.readyOps();
  19. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  20. int ops = k.interestOps();
  21. ops &= ~SelectionKey.OP_CONNECT;
  22. k.interestOps(ops);
  23. unsafe.finishConnect();
  24. }
  25. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  26. ch.unsafe().forceFlush();
  27. }
  28. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  29. unsafe.read();
  30. }
  31. } catch (CancelledKeyException ignored) {
  32. unsafe.close(unsafe.voidPromise());
  33. }
  34. }

实际的处理链源码如下(这里以read请求为例):

  1. //NioUnsafe的read方法
  2. public void read() {
  3. //其他操作
  4. ···
  5. //调用pipeline的fireChannelRead和fireChannelReadComplete方法
  6. int size = readBuf.size();
  7. for (int i = 0; i < size; i ++) {
  8. readPending = false;
  9. pipeline.fireChannelRead(readBuf.get(i));
  10. }
  11. readBuf.clear();
  12. allocHandle.readComplete();
  13. pipeline.fireChannelReadComplete();
  14. }
  15. //pipeline的fireChannelRead方法
  16. public final ChannelPipeline fireChannelRead(Object msg) {
  17. //调用ChannelHandlerContext的invokeChannelRead方法
  18. AbstractChannelHandlerContext.invokeChannelRead(head, msg);
  19. return this;
  20. }
  21. //ChannelHandlerContext的invokeChannelRead方法
  22. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  23. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  24. EventExecutor executor = next.executor();
  25. if (executor.inEventLoop()) {
  26. //这里最终会找到第一个符合条件的handler类,执行里面的channelRead0方法
  27. next.invokeChannelRead(m);
  28. } else {
  29. executor.execute(new Runnable() {
  30. @Override
  31. public void run() {
  32. next.invokeChannelRead(m);
  33. }
  34. });
  35. }
  36. }

Netty服务端处理Accept请求

  1. 上面已经简单介绍了netty请求响应的过程,但似乎都是EvenLoop处理自己接收到的selectKey的流程,那netty是怎么把不同的请求分配到不同的EvenLoop里的呢,这里要从ServerBootstrap的初始化重新讲起:
  1. 之前说过,ServerBootstrap初始化时,会创建一个Channel,这个Channel是用于绑定我们指定的端口和接收Accept请求,然后在ServerBootstrap的init()里,会为这个Channel里ChannelPipeline的ChannelHandlerContext链表插入一个ServerBootstrapAcceptor处理类,然后按上面Netty请求响应的逻辑,在接收到Accept请求后,调用ServerBootstrapAcceptor里的channelRead方法
  2. 在ServerBootstrapAcceptor的channelRead方法里,会找到ServerBootstrap的workGroup(用于处理read、writer请求的group),然后跟之前注册channel的流程一样,通过chooser策略找到workGroup里的某个EventLoop,将这次Accept请求的channel注册到EventLoop的selector里

ServerBootStrap的init方法源码如下:

  1. void init(Channel channel) throws Exception {
  2. final Map<ChannelOption<?>, Object> options = options0();
  3. synchronized (options) {
  4. setChannelOptions(channel, options, logger);
  5. }
  6. final Map<AttributeKey<?>, Object> attrs = attrs0();
  7. synchronized (attrs) {
  8. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  9. @SuppressWarnings("unchecked")
  10. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  11. channel.attr(key).set(e.getValue());
  12. }
  13. }
  14. ChannelPipeline p = channel.pipeline();
  15. final EventLoopGroup currentChildGroup = childGroup;
  16. final ChannelHandler currentChildHandler = childHandler;
  17. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  18. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  19. synchronized (childOptions) {
  20. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
  21. }
  22. synchronized (childAttrs) {
  23. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
  24. }
  25. //为ChannelPipeline的handler链表插入ServerBootstrapAcceptor处理类
  26. p.addLast(new ChannelInitializer<Channel>() {
  27. @Override
  28. public void initChannel(final Channel ch) throws Exception {
  29. final ChannelPipeline pipeline = ch.pipeline();
  30. ChannelHandler handler = config.handler();
  31. if (handler != null) {
  32. pipeline.addLast(handler);
  33. }
  34. ch.eventLoop().execute(new Runnable() {
  35. @Override
  36. public void run() {
  37. pipeline.addLast(new ServerBootstrapAcceptor(
  38. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  39. }
  40. });
  41. }
  42. });
  43. }

ServerBootstrapAcceptor的channelRead方法源码如下:

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  2. final Channel child = (Channel) msg;
  3. child.pipeline().addLast(childHandler);
  4. setChannelOptions(child, childOptions, logger);
  5. for (Entry<AttributeKey<?>, Object> e: childAttrs) {
  6. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  7. }
  8. try {
  9. //这里的childGroup就是ServerBootStrap的workGroup
  10. childGroup.register(child).addListener(new ChannelFutureListener() {
  11. @Override
  12. public void operationComplete(ChannelFuture future) throws Exception {
  13. if (!future.isSuccess()) {
  14. forceClose(child, future.cause());
  15. }
  16. }
  17. });
  18. } catch (Throwable t) {
  19. forceClose(child, t);
  20. }
  21. }

发表评论

表情:
评论列表 (有 0 条评论,83人围观)

还没有评论,来说两句吧...

相关阅读