netty源码阅读之客户端新连接之检测新连接

雨点打透心脏的1/2处 2022-05-17 12:18 221阅读 0赞

boss线程的NioEventLoop在用户代码调用bond的时候启动。

检测新连接我们从《netty源码阅读之NioEventLoop之NioEventLoop执行——-processSelectedKey()执行》这一篇文章的这个方法入口:processSelectedKeysOptimized(SelectionKey[] selectedKeys)

看到这一步:

  1. final Object a = k.attachment();
  2. if (a instanceof AbstractNioChannel) {
  3. processSelectedKey(k, (AbstractNioChannel) a);
  4. } else {
  5. @SuppressWarnings("unchecked")
  6. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  7. processSelectedKey(k, task);
  8. }

之前我们的NioSocket以attachment的方式传入select key set里面,所以我们进入

  1. processSelectedKey(k, (AbstractNioChannel) a);

这个方法里面。

  1. private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
  2. for (int i = 0;; i ++) {
  3. final SelectionKey k = selectedKeys[i];
  4. if (k == null) {
  5. break;
  6. }
  7. // null out entry in the array to allow to have it GC'ed once the Channel close
  8. // See https://github.com/netty/netty/issues/2363
  9. selectedKeys[i] = null;
  10. final Object a = k.attachment();
  11. if (a instanceof AbstractNioChannel) {
  12. processSelectedKey(k, (AbstractNioChannel) a);
  13. } else {
  14. @SuppressWarnings("unchecked")
  15. NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
  16. processSelectedKey(k, task);
  17. }
  18. if (needsToSelectAgain) {
  19. // null out entries in the array to allow to have it GC'ed once the Channel close
  20. // See https://github.com/netty/netty/issues/2363
  21. for (;;) {
  22. i++;
  23. if (selectedKeys[i] == null) {
  24. break;
  25. }
  26. selectedKeys[i] = null;
  27. }
  28. selectAgain();
  29. // Need to flip the optimized selectedKeys to get the right reference to the array
  30. // and reset the index to -1 which will then set to 0 on the for loop
  31. // to start over again.
  32. //
  33. // See https://github.com/netty/netty/issues/1523
  34. selectedKeys = this.selectedKeys.flip();
  35. i = -1;
  36. }
  37. }
  38. }
  39. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  40. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  41. ..
  42. try {
  43. int readyOps = k.readyOps();
  44. // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
  45. // the NIO JDK channel implementation may throw a NotYetConnectedException.
  46. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  47. ...
  48. }
  49. // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
  50. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  51. ...
  52. }
  53. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
  54. // to a spin loop
  55. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  56. unsafe.read();
  57. if (!ch.isOpen()) {
  58. // Connection already closed - no need to handle write.
  59. return;
  60. }
  61. }
  62. } catch (CancelledKeyException ignored) {
  63. unsafe.close(unsafe.voidPromise());
  64. }
  65. }

由于服务端的socketChannel注册的是accept事件,所以我们会进入这个:

  1. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
  2. // to a spin loop
  3. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  4. unsafe.read();
  5. if (!ch.isOpen()) {
  6. // Connection already closed - no need to handle write.
  7. return;
  8. }
  9. }

NioMessageUnsafe.read()

服务端NioServerSocketChannel的unsafe的实现是NioMessageUnsafe(后续会有文章介绍unsafe),所以我们到达了这个方法:

  1. private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2. private final List<Object> readBuf = new ArrayList<Object>();
  3. @Override
  4. public void read() {
  5. assert eventLoop().inEventLoop();
  6. final ChannelConfig config = config();
  7. final ChannelPipeline pipeline = pipeline();
  8. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  9. allocHandle.reset(config);
  10. boolean closed = false;
  11. Throwable exception = null;
  12. try {
  13. try {
  14. do {
  15. int localRead = doReadMessages(readBuf);
  16. if (localRead == 0) {
  17. break;
  18. }
  19. if (localRead < 0) {
  20. closed = true;
  21. break;
  22. }
  23. allocHandle.incMessagesRead(localRead);
  24. } while (allocHandle.continueReading());
  25. } catch (Throwable t) {
  26. exception = t;
  27. }
  28. int size = readBuf.size();
  29. for (int i = 0; i < size; i ++) {
  30. readPending = false;
  31. pipeline.fireChannelRead(readBuf.get(i));
  32. }
  33. ...
  34. if (closed) {
  35. inputShutdown = true;
  36. if (isOpen()) {
  37. close(voidPromise());
  38. }
  39. }
  40. } finally {
  41. ...
  42. }
  43. }
  44. }

这个方法里面有个doMessageRead(),是主要的读方法,通过while循环不停地读数据。

这个final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();代码里面有个handle,是读的策略(控制连接接入的速度,每次有一个新连接就加1,默认最大只读16个连接)。在这个

allocHandle.continueReading()里面,可以知道它什么时候停下来:

  1. @Override
  2. public boolean continueReading() {
  3. return config.isAutoRead() &&
  4. attemptedBytesRead == lastBytesRead &&
  5. totalMessages < maxMessagePerRead &&
  6. totalBytesRead < Integer.MAX_VALUE;
  7. }

totalMessages<maxMessagePerRead是判断读到的连接是否大于最大连接,maxMessagePerRead默认是16。

我们继续doMessageRead()吧。

doMessageRead()

  1. @Override
  2. protected int doReadMessages(List<Object> buf) throws Exception {
  3. SocketChannel ch = javaChannel().accept();
  4. try {
  5. if (ch != null) {
  6. buf.add(new NioSocketChannel(this, ch));
  7. return 1;
  8. }
  9. } catch (Throwable t) {
  10. logger.warn("Failed to create a new channel from an accepted socket.", t);
  11. try {
  12. ch.close();
  13. } catch (Throwable t2) {
  14. logger.warn("Failed to close a socket.", t2);
  15. }
  16. }
  17. return 0;
  18. }

在这里面,SocketChannel是通过jdk实现的。然后这个buf.add(new NioSocketChannel(this, ch));新建一个客户端的NioSocketChannel,把jdk实现得SocketChannel绑定进去(把jdk底层的channel封装进去),并且把this,也就是服务端NioServerSocketChannel也作为参数传递进去。

buf是之前传入进来的,其实他就是之前的readBuf,一个list,保存的是客户端的NioSocketChannel:

  1. private final List<Object> readBuf = new ArrayList<Object>();

下一篇文章,我们分析new NioSocketChannel(this, ch)这一步做了什么事情。

发表评论

表情:
评论列表 (有 0 条评论,221人围观)

还没有评论,来说两句吧...

相关阅读

    相关 netty阅读ByteBuf

    今天我们开启新的篇章,netty很重要的内存概念将在这一章介绍。ByteBuf主要介绍以下几点: 1、内存与内存管理器的抽象 2、不同规格大小和不同类别的内存的分配策略