jdk源码解析八之NIO(selector)

女爷i 2023-02-24 12:28 169阅读 0赞

文章目录

  • 示例代码
  • register
  • open
  • select

示例代码

  1. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  2. ServerSocket serverSocket = serverChannel.socket();
  3. Selector selector = Selector.open();
  4. serverSocket.bind (new InetSocketAddress (port));
  5. serverChannel.configureBlocking (false);
  6. serverChannel.register (selector, SelectionKey.OP_ACCEPT);
  7. while (true) {
  8. int n = selector.select();
  9. if (n == 0) {
  10. continue;
  11. }
  12. Iterator it = selector.selectedKeys().iterator();
  13. while (it.hasNext()) {
  14. SelectionKey key = (SelectionKey) it.next();
  15. if (key.isAcceptable()) {
  16. ServerSocketChannel server =
  17. (ServerSocketChannel) key.channel();
  18. SocketChannel channel = server.accept();
  19. }
  20. if (key.isReadable()) {
  21. //...
  22. }
  23. it.remove();
  24. }
  25. }

重点只需要关注通道的register和选择器的select方法

register

  1. public final SelectionKey register(Selector sel, int ops,
  2. Object att)
  3. throws ClosedChannelException
  4. {
  5. synchronized (regLock) {
  6. if (!isOpen())
  7. throw new ClosedChannelException();
  8. //是否支持特定操作
  9. if ((ops & ~validOps()) != 0)
  10. throw new IllegalArgumentException();
  11. //必须支持非阻塞
  12. if (blocking)
  13. throw new IllegalBlockingModeException();
  14. //从选择器keys中查找与通道匹配key
  15. SelectionKey k = findKey(sel);
  16. //查找到了
  17. if (k != null) {
  18. //修改特定操作
  19. k.interestOps(ops);
  20. //重新设置附件对象
  21. k.attach(att);
  22. }
  23. if (k == null) {
  24. // New registration
  25. synchronized (keyLock) {
  26. if (!isOpen())
  27. throw new ClosedChannelException();
  28. //注册
  29. k = ((AbstractSelector)sel).register(this, ops, att);
  30. //添加到缓存
  31. addKey(k);
  32. }
  33. }
  34. return k;
  35. }
  36. }
  37. private SelectionKey findKey(Selector sel) {
  38. synchronized (keyLock) {
  39. if (keys == null)
  40. return null;
  41. //从缓存的keys中获取匹配
  42. for (int i = 0; i < keys.length; i++)
  43. if ((keys[i] != null) && (keys[i].selector() == sel))
  44. return keys[i];
  45. return null;
  46. }
  47. }
  48. private void addKey(SelectionKey k) {
  49. assert Thread.holdsLock(keyLock);
  50. int i = 0;
  51. if ((keys != null) && (keyCount < keys.length)) {
  52. // Find empty element of key array
  53. for (i = 0; i < keys.length; i++)
  54. if (keys[i] == null)
  55. break;
  56. } else if (keys == null) {
  57. //第一次初始化容量3
  58. keys = new SelectionKey[3];
  59. } else {
  60. // Grow key array
  61. //空间不足,扩容一倍
  62. int n = keys.length * 2;
  63. SelectionKey[] ks = new SelectionKey[n];
  64. for (i = 0; i < keys.length; i++)
  65. ks[i] = keys[i];
  66. keys = ks;
  67. i = keyCount;
  68. }
  69. keys[i] = k;
  70. keyCount++;
  71. }
  72. protected final SelectionKey register(AbstractSelectableChannel ch,
  73. int ops,
  74. Object attachment)
  75. {
  76. if (!(ch instanceof SelChImpl))
  77. throw new IllegalSelectorException();
  78. //新建key,记录选择器和通道
  79. SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
  80. k.attach(attachment);
  81. synchronized (publicKeys) {
  82. //注册,与平台相关
  83. implRegister(k);
  84. }
  85. //设置感兴趣的特定操作
  86. k.interestOps(ops);
  87. return k;
  88. }

open

  1. public static Selector open() throws IOException {
  2. //与平台相关
  3. return SelectorProvider.provider().openSelector();
  4. }
  5. public static SelectorProvider provider() {
  6. synchronized (lock) {
  7. if (provider != null)
  8. return provider;
  9. return AccessController.doPrivileged(
  10. new PrivilegedAction<SelectorProvider>() {
  11. public SelectorProvider run() {
  12. //通过配置的java.nio.channels.spi.SelectorProvider值注入自定义的SelectorProvider
  13. if (loadProviderFromProperty())
  14. return provider;
  15. //通过ServiceLoad注入,然后获取配置的第一个服务
  16. if (loadProviderAsService())
  17. return provider;
  18. //创建的Provider与平台相关
  19. provider = sun.nio.ch.DefaultSelectorProvider.create();
  20. return provider;
  21. }
  22. });
  23. }
  24. }

select

  1. public int select(long timeout)
  2. throws IOException
  3. {
  4. if (timeout < 0)
  5. throw new IllegalArgumentException("Negative timeout");
  6. return lockAndDoSelect((timeout == 0) ? -1 : timeout);
  7. }
  8. //返回从上一个select( )调用之后进入就绪状态的通道的数量
  9. public int select() throws IOException {
  10. return select(0);
  11. }
  12. /*
  13. 1.已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,
  14. 并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的。
  15. 2.已注册的键的集合中的键的interest集合将被检查。在这个步骤中的检查执行过后,
  16. 对interest集合的改动不会影响剩余的检查过程。
  17. 一旦一个键处于已选择的键的集合中,这个键的ready集合将只会被设置,而不会被清理。
  18. 当通道上的至少一个感兴趣的操作就绪时,键的ready集合就会被清空,并且当前已经就绪的操作将会被添加到ready集合中。
  19. 该键之后将被添加到已选择的键的集合中。
  20. */
  21. protected abstract int doSelect(long timeout) throws IOException;
  22. private int lockAndDoSelect(long timeout) throws IOException {
  23. synchronized (this) {
  24. if (!isOpen())
  25. throw new ClosedSelectorException();
  26. synchronized (publicKeys) {
  27. synchronized (publicSelectedKeys) {
  28. return doSelect(timeout);
  29. }
  30. }
  31. }
  32. }

发表评论

表情:
评论列表 (有 0 条评论,169人围观)

还没有评论,来说两句吧...

相关阅读