Netty组件详解

太过爱你忘了你带给我的痛 2022-02-09 09:57 339阅读 0赞

之前在做项目的时候使用到了Netty这个网络框架,对于Java中的IO模型有了进一步的了解,熟悉的NIO非阻塞的模式。而Netty就是对于Java NIO 的高级封装。这篇文章就是个人根据Netty4.1.6的源码,进行了总结。

Netty组件

NioEventLoop

  对于Netty中的NioEventLoop这个组件来说,它就是类似于写的普通网络编程中的通过创建一个新的线程Thread来实现对于客户端的监听,这个监听如果放到主线程中会导致主线程阻塞,所以在实现的时候通过创建要给新线程的方式来实现。从这个角度上理解,这个NioEventLoop组件,主要做的两件事情,第一,就是建立客户端和服务器端段的连接,保证连接正常。第二,实现客户端和服务器端的数据的交互。
  但是在使用Thread实现上面两个功能的时候要保证监听过程处于一个持续监听的状态,也就是说在其中要实现一个死循环。通过这个死循环来持续监听连接在Netty中也提供了这样的一个方法。那就是下面这个方法

  1. @Override
  2. protected void run() {
  3. //实现一个死循环不断去绑定对应的监听状态
  4. for (;;) {
  5. try {
  6. //根据不同的装响应实现不同的操作
  7. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  8. case SelectStrategy.CONTINUE:
  9. continue;
  10. case SelectStrategy.SELECT:
  11. select(wakenUp.getAndSet(false));
  12. // 'wakenUp.compareAndSet(false, true)' is always evaluated
  13. // before calling 'selector.wakeup()' to reduce the wake-up
  14. // overhead. (Selector.wakeup() is an expensive operation.)
  15. //
  16. // However, there is a race condition in this approach.
  17. // The race condition is triggered when 'wakenUp' is set to
  18. // true too early.
  19. //
  20. // 'wakenUp' is set to true too early if:
  21. // 1) Selector is waken up between 'wakenUp.set(false)' and
  22. // 'selector.select(...)'. (BAD)
  23. // 2) Selector is waken up between 'selector.select(...)' and
  24. // 'if (wakenUp.get()) { ... }'. (OK)
  25. //
  26. // In the first case, 'wakenUp' is set to true and the
  27. // following 'selector.select(...)' will wake up immediately.
  28. // Until 'wakenUp' is set to false again in the next round,
  29. // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
  30. // any attempt to wake up the Selector will fail, too, causing
  31. // the following 'selector.select(...)' call to block
  32. // unnecessarily.
  33. //
  34. // To fix this problem, we wake up the selector again if wakenUp
  35. // is true immediately after selector.select(...).
  36. // It is inefficient in that it wakes up the selector for both
  37. // the first case (BAD - wake-up required) and the second case
  38. // (OK - no wake-up required).
  39. if (wakenUp.get()) {
  40. selector.wakeup();
  41. }
  42. default:
  43. // fallthrough
  44. }
  45. cancelledKeys = 0;
  46. needsToSelectAgain = false;
  47. final int ioRatio = this.ioRatio;
  48. //开始处理每个链接
  49. if (ioRatio == 100) {
  50. try {
  51. processSelectedKeys();
  52. } finally {
  53. // Ensure we always run tasks.
  54. runAllTasks();
  55. }
  56. } else {
  57. final long ioStartTime = System.nanoTime();
  58. try {
  59. processSelectedKeys();
  60. } finally {
  61. // Ensure we always run tasks.
  62. final long ioTime = System.nanoTime() - ioStartTime;
  63. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  64. }
  65. }
  66. } catch (Throwable t) {
  67. handleLoopException(t);
  68. }
  69. // Always handle shutdown even if the loop processing threw an exception.
  70. try {
  71. if (isShuttingDown()) {
  72. closeAll();
  73. if (confirmShutdown()) {
  74. return;
  75. }
  76. }
  77. } catch (Throwable t) {
  78. handleLoopException(t);
  79. }
  80. }
  81. }

  从代码中可以看出来真正实现对于数据读写操作就是从processSelectedKeys()方法开始的,而这个方法processSelectedKeys()。主要的作用是什么呢?

  1. //调用上面的处理方法
  2. private void processSelectedKeys() {
  3. //如果SelectionKey为空
  4. if (selectedKeys != null) {
  5. processSelectedKeysOptimized(selectedKeys.flip());
  6. } else {
  7. processSelectedKeysPlain(selector.selectedKeys());
  8. }
  9. }
  10. private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  11. for (int i = 0;; i ++) {
  12. final SelectionKey k = selectedKeys[i];
  13. if (k == null) {
  14. break;
  15. }
  16. // null out entry in the array to allow to have it GC'ed once the Channel close
  17. // See https://github.com/netty/netty/issues/2363
  18. selectedKeys[i] = null;
  19. final Object a = k.attachment();
  20. if (a instanceof AbstractNioChannel) {
  21. processSelectedKey(k, (AbstractNioChannel) a);
  22. } else {
  23. @SuppressWarnings("unchecked")
  24. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  25. processSelectedKey(k, task);
  26. }
  27. if (needsToSelectAgain) {
  28. // null out entries in the array to allow to have it GC'ed once the Channel close
  29. // See https://github.com/netty/netty/issues/2363
  30. for (;;) {
  31. i++;
  32. if (selectedKeys[i] == null) {
  33. break;
  34. }
  35. selectedKeys[i] = null;
  36. }
  37. selectAgain();
  38. // Need to flip the optimized selectedKeys to get the right reference to the array
  39. // and reset the index to -1 which will then set to 0 on the for loop
  40. // to start over again.
  41. //
  42. // See https://github.com/netty/netty/issues/1523
  43. selectedKeys = this.selectedKeys.flip();
  44. i = -1;
  45. }
  46. }
  47. }
  48. private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
  49. // check if the set is empty and if so just return to not create garbage by
  50. // creating a new Iterator every time even if there is nothing to process.
  51. // See https://github.com/netty/netty/issues/597
  52. if (selectedKeys.isEmpty()) {
  53. return;
  54. }
  55. Iterator<SelectionKey> i = selectedKeys.iterator();
  56. for (;;) {
  57. final SelectionKey k = i.next();
  58. final Object a = k.attachment();
  59. i.remove();
  60. if (a instanceof AbstractNioChannel) {
  61. processSelectedKey(k, (AbstractNioChannel) a);
  62. } else {
  63. @SuppressWarnings("unchecked")
  64. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  65. processSelectedKey(k, task);
  66. }
  67. if (!i.hasNext()) {
  68. break;
  69. }
  70. if (needsToSelectAgain) {
  71. selectAgain();
  72. selectedKeys = selector.selectedKeys();
  73. // Create the iterator again to avoid ConcurrentModificationException
  74. if (selectedKeys.isEmpty()) {
  75. break;
  76. } else {
  77. i = selectedKeys.iterator();
  78. }
  79. }
  80. }
  81. }

  可以看到上面的processSelectedKeys()方法通过对于SelectionKey的判断实现了两种不同的逻辑。但是这两个逻辑最为核心的操作就是AbstractNioChannel类的存在,可以继续跟进源码会发现这个类其实就是一个Channel,在Java NIO中我们知道一个Channel就类似于普通网络编程中的一个Socket。所以说这里的AbstractNioChannel是对于Channel在Netty NIO实现的基础上做了进一步的封装。

Channel

  在上面提到的概念就是Channel可以理解为一个Socket那么既然是用来做客户端和服务器端的数据传递工作,首先需要搞清楚的是Channel是什么时候创建?第二点就是Channel底层实现也是通过Socket来实现的又对Socket做了哪些优化呢?
在NioEventLoop类中有如下的一个方法

  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. //获取到unsafe
  3. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  4. if (!k.isValid()) {
  5. final EventLoop eventLoop;
  6. try {
  7. eventLoop = ch.eventLoop();
  8. } catch (Throwable ignored) {
  9. // If the channel implementation throws an exception because there is no event loop, we ignore this
  10. // because we are only trying to determine if ch is registered to this event loop and thus has authority
  11. // to close ch.
  12. return;
  13. }
  14. // Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
  15. // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
  16. // still healthy and should not be closed.
  17. // See https://github.com/netty/netty/issues/5125
  18. if (eventLoop != this || eventLoop == null) {
  19. return;
  20. }
  21. // close the channel if the key is not valid anymore
  22. unsafe.close(unsafe.voidPromise());
  23. return;
  24. }
  25. try {
  26. int readyOps = k.readyOps();
  27. // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
  28. // the NIO JDK channel implementation may throw a NotYetConnectedException.
  29. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  30. // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
  31. // See https://github.com/netty/netty/issues/924
  32. int ops = k.interestOps();
  33. ops &= ~SelectionKey.OP_CONNECT;
  34. k.interestOps(ops);
  35. unsafe.finishConnect();
  36. }
  37. // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
  38. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  39. // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
  40. ch.unsafe().forceFlush();
  41. }
  42. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
  43. // to a spin loop
  44. //这里有一个对于OP_ACCEPT的判断。
  45. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  46. unsafe.read();
  47. if (!ch.isOpen()) {
  48. // Connection already closed - no need to handle write.
  49. return;
  50. }
  51. }
  52. } catch (CancelledKeyException ignored) {
  53. unsafe.close(unsafe.voidPromise());
  54. }
  55. }

在上面这个方法中有一块逻辑是指定注意的

  1. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  2. unsafe.read();
  3. if (!ch.isOpen()) {
  4. // Connection already closed - no need to handle write.
  5. return;
  6. }
  7. }

  在上面这段逻辑中,可以看到在SelectionKey判断的时候会有一个OP_ACCEPT事件,这里就会有一个unsafe.read的方法。可进入NioMessageUnsafe中查看read方法如下。而这个NioMessageUnsafe就是对于连接处理的一个类。

  1. private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2. private final List<Object> readBuf = new ArrayList<Object>();
  3. @Override
  4. public void read() {
  5. assert eventLoop().inEventLoop();
  6. final ChannelConfig config = config();
  7. final ChannelPipeline pipeline = pipeline();
  8. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  9. allocHandle.reset(config);
  10. boolean closed = false;
  11. Throwable exception = null;
  12. try {
  13. try {
  14. do {
  15. int localRead = doReadMessages(readBuf);
  16. if (localRead == 0) {
  17. break;
  18. }
  19. if (localRead < 0) {
  20. closed = true;
  21. break;
  22. }
  23. allocHandle.incMessagesRead(localRead);
  24. } while (allocHandle.continueReading());
  25. } catch (Throwable t) {
  26. exception = t;
  27. }
  28. int size = readBuf.size();
  29. for (int i = 0; i < size; i ++) {
  30. readPending = false;
  31. pipeline.fireChannelRead(readBuf.get(i));
  32. }
  33. readBuf.clear();
  34. allocHandle.readComplete();
  35. pipeline.fireChannelReadComplete();
  36. if (exception != null) {
  37. closed = closeOnReadError(exception);
  38. pipeline.fireExceptionCaught(exception);
  39. }
  40. if (closed) {
  41. inputShutdown = true;
  42. if (isOpen()) {
  43. close(voidPromise());
  44. }
  45. }
  46. } finally {
  47. // Check if there is a readPending which was not processed yet.
  48. // This could be for two reasons:
  49. // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
  50. // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
  51. //
  52. // See https://github.com/netty/netty/issues/2254
  53. if (!readPending && !config.isAutoRead()) {
  54. removeReadOp();
  55. }
  56. }
  57. }
  58. }

  在上面逻辑中值得关注一个方法便是doReadMessages(readBuf)方法,查看这个方法的实现会发现,是由NioServerSocketChannel实现。到这里终于是接近Java NIO的底层了。

  1. @Override
  2. protected int doReadMessages(List<Object> buf) throws Exception {
  3. SocketChannel ch = javaChannel().accept();
  4. try {
  5. if (ch != null) {
  6. buf.add(new NioSocketChannel(this, ch));
  7. return 1;
  8. }
  9. } catch (Throwable t) {
  10. logger.warn("Failed to create a new channel from an accepted socket.", t);
  11. try {
  12. ch.close();
  13. } catch (Throwable t2) {
  14. logger.warn("Failed to close a socket.", t2);
  15. }
  16. }
  17. return 0;
  18. }

  关于Java底层NIO的实现,而这里所对应的SocketChannel就是对应的NIO模型中的SocketChannel。通过获取到对应的ServerSocketChannel,进行的accept操作。在获取到对象之后将其添加到一个Object的列表中。而这个列表理解为获取到了一个Channel的列表。Netty直接把一个SocketChannel封装成了一个NioSocketChannel。然后通过一个List进行返回,最后只需要通过这个NioSocketChannel进行数据的读写操作就可以了。

  1. @Override
  2. protected ServerSocketChannel javaChannel() {
  3. return (ServerSocketChannel) super.javaChannel();
  4. }

ByteBuf

  在Java实现的NIO编程中有一个ByteBuffer,这个ByteBuffer是对IO流的封装,在Netty中还有就是对于ByteBuffer的封装ByteBuf类,在这个类中定义了很多的read和write方法, 而这些方法就是对一普通IO中的输入输出流。如下图
在这里插入图片描述

Pipline

  Pipeline,对应的普通IO编程中的对于数据的处理操作。那么Netty是什么时候将Pipeline加入到对应的连接处理过程中的呢?深究一下会发现在AbstractChannel的构造方法中调用了newChannelPipeline方法。而这个方法创建了一个默认的ChannelPipeline。也就是说我们也可以自己实现这个Pipeline。

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. id = newId();
  4. unsafe = newUnsafe();
  5. pipeline = newChannelPipeline();
  6. }
  7. protected DefaultChannelPipeline newChannelPipeline() {
  8. return new DefaultChannelPipeline(this);
  9. }

通过下面的操作,最终将逻辑处理操作加载到Channel中。

  1. protected DefaultChannelPipeline(Channel channel) {
  2. this.channel = ObjectUtil.checkNotNull(channel, "channel");
  3. succeededFuture = new SucceededChannelFuture(channel, null);
  4. voidPromise = new VoidChannelPromise(channel, true);
  5. tail = new TailContext(this);
  6. head = new HeadContext(this);
  7. head.next = tail;
  8. tail.prev = head;
  9. }

ChannelHandler

  对应业务逻辑处理块,在ChannelPipeline中有很多的add方法、remove方法其实这些方法就是实现了对于处理逻辑的动态的处理。这里使用的一个策略模式 使用者可以根据不同的处理逻辑实现不同的Handler,但是整个的处理逻辑是不变的就是实现对于数据的读写操作。这里看一个比较常用的方法addLast()。无论传入什么样的Handler都不会改变这个方法的实现逻辑。而这方法就是将对应的处理逻辑加入到整个处理的最后。

  1. @Override
  2. public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
  3. if (handlers == null) {
  4. throw new NullPointerException("handlers");
  5. }
  6. for (ChannelHandler h: handlers) {
  7. if (h == null) {
  8. break;
  9. }
  10. addLast(executor, null, h);
  11. }
  12. return this;
  13. }

总结

  1. NioEventLoop组件
  2. Channel组件
  3. ByteBuf组件
  4. Pipeline组件
  5. ChannelHandler组件

发表评论

表情:
评论列表 (有 0 条评论,339人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Android详解—TextView

    这篇博文献给正在android学习路上的弟弟,希望有一天以下内容对你有所帮助。 了解一个类,首先要了解他的结构,尤其是API中的这种类。 先了解下`TextView`的结构