jdk源码解析八之NIO(socketChannel)

亦凉 2023-02-24 03:59 81阅读 0赞

ServerSocketChannel

  1. ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
  2. super(sp);
  3. // Create the FileDescriptor for a new server socket via Net.serverSocket.
  4. this.fd = Net.serverSocket(true);
  5. // Cache the value that IOUtil.fdVal returns for this descriptor.
  6. this.fdVal = IOUtil.fdVal(fd);
  7. // Mark the channel state as in use.
  8. this.state = ST_INUSE;
  9. }
  10. ServerSocketChannelImpl(SelectorProvider sp,
  11. FileDescriptor fd,
  12. boolean bound)
  13. throws IOException
  14. {
  15. super(sp);
  16. this.fd = fd;
  17. this.fdVal = IOUtil.fdVal(fd);
  18. this.state = ST_INUSE;
  19. // When the caller says the descriptor is already bound, look up its address right away.
  20. if (bound)
  21. // Local socket address of the descriptor that was passed in.
  22. localAddress = Net.localAddress(fd);
  23. }
  24. @Override
  25. public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
  26. synchronized (lock) {
  27. // Reject the call if the channel is closed or already bound.
  28. if (!isOpen())
  29. throw new ClosedChannelException();
  30. if (isBound())
  31. throw new AlreadyBoundException();
  32. // A null address falls back to new InetSocketAddress(0); anything else is validated.
  33. InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
  34. Net.checkAddress(local);
  35. SecurityManager sm = System.getSecurityManager();
  36. // With a security manager installed, check permission to listen on this port.
  37. if (sm != null)
  38. sm.checkListen(isa.getPort());
  39. // Hook invoked before the TCP bind; per the article author it can convert fd to an SDP (Sockets Direct Protocol) socket.
  40. // Author's note (not visible in this listing): SDP needs InfiniBand-capable hardware and is not supported on Windows.
  41. NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
  42. // Bind the descriptor to the address and port.
  43. Net.bind(fd, isa.getAddress(), isa.getPort());
  44. // A backlog below 1 is replaced by the default of 50.
  45. Net.listen(fd, backlog < 1 ? 50 : backlog);
  46. synchronized (stateLock) {
  47. // Record the local socket address of the descriptor.
  48. localAddress = Net.localAddress(fd);
  49. }
  50. }
  51. return this;
  52. }
  53. public SocketChannel accept() throws IOException {
  54. synchronized (lock) {
  55. if (!isOpen())
  56. throw new ClosedChannelException();
  57. if (!isBound())
  58. throw new NotYetBoundException();
  59. SocketChannel sc = null;
  60. int n = 0;
  61. FileDescriptor newfd = new FileDescriptor();
  62. InetSocketAddress[] isaa = new InetSocketAddress[1];
  63. try {
  64. begin();
  65. if (!isOpen())
  66. return null;
  67. thread = NativeThread.current();
  68. for (;;) {
  69. // Accept a new connection.
  70. // isaa[0] is read below as the remote address of the accepted socket,
  71. // and newfd is used below as the descriptor of the accepted socket.
  72. n = accept(this.fd, newfd, isaa);
  73. // A value of at least 1 means success (see the n < 1 check below).
  74. // If the call was interrupted and the channel is still open, retry.
  75. if ((n == IOStatus.INTERRUPTED) && isOpen())
  76. continue;
  77. break;
  78. }
  79. } finally {
  80. thread = 0;
  81. end(n > 0);
  82. assert IOStatus.check(n);
  83. }
  84. // Anything below 1 means no connection was accepted.
  85. if (n < 1)
  86. return null;
  87. // The new socket is put into blocking mode by default.
  88. IOUtil.configureBlocking(newfd, true);
  89. InetSocketAddress isa = isaa[0];
  90. // Wrap the accepted descriptor in a client SocketChannel.
  91. sc = new SocketChannelImpl(provider(), newfd, isa);
  92. // Permission check; the new channel is closed if the check fails.
  93. SecurityManager sm = System.getSecurityManager();
  94. if (sm != null) {
  95. try {
  96. sm.checkAccept(isa.getAddress().getHostAddress(),
  97. isa.getPort());
  98. } catch (SecurityException x) {
  99. sc.close();
  100. throw x;
  101. }
  102. }
  103. return sc;
  104. }
  105. }
  106. // Switches the underlying descriptor between blocking and non-blocking mode.
  107. protected void implConfigureBlocking(boolean block) throws IOException {
  108. IOUtil.configureBlocking(fd, block);
  109. }
  110. // Closes the server socket channel.
  111. protected void implCloseSelectableChannel() throws IOException {
  112. synchronized (stateLock) {
  113. if (state != ST_KILLED)
  114. nd.preClose(fd);
  115. long th = thread;
  116. if (th != 0)
  117. // Signal the recorded thread (set in accept()) so it is released from blocking I/O instead of staying blocked.
  118. NativeThread.signal(th);
  119. // While the channel is still registered, kill() is not called here (per the author, it is deferred until all keys are deregistered).
  120. // If it is not registered, the channel can be killed right away.
  121. if (!isRegistered())
  122. kill();
  123. }
  124. }

SocketChannel

生命周期

在这里插入图片描述

connect

  1. public static SocketChannel open(SocketAddress remote)
  2. throws IOException
  3. {
  4. SocketChannel sc = open();
  5. try {
  6. // Establish the connection to the remote address.
  7. sc.connect(remote);
  8. } catch (Throwable x) { // on any failure, close the new channel before rethrowing
  9. try {
  10. sc.close();
  11. } catch (Throwable suppressed) {
  12. x.addSuppressed(suppressed); // keep the close failure attached to the original error
  13. }
  14. throw x;
  15. }
  16. assert sc.isConnected();
  17. return sc;
  18. }
  19. public boolean connect(SocketAddress sa) throws IOException {
  20. int localPort = 0;
  21. synchronized (readLock) {
  22. synchronized (writeLock) {
  23. ensureOpenAndUnconnected();
  24. InetSocketAddress isa = Net.checkAddress(sa);
  25. SecurityManager sm = System.getSecurityManager();
  26. if (sm != null)
  27. sm.checkConnect(isa.getAddress().getHostAddress(),
  28. isa.getPort());
  29. synchronized (blockingLock()) {
  30. int n = 0;
  31. try {
  32. try {
  33. begin();
  34. synchronized (stateLock) {
  35. if (!isOpen()) {
  36. return false;
  37. }
  38. // notify hook only if unbound
  39. if (localAddress == null) {
  40. // Per the article author, this hook can convert fd to an SDP (Sockets Direct Protocol) socket.
  41. NetHooks.beforeTcpConnect(fd,
  42. isa.getAddress(),
  43. isa.getPort());
  44. }
  45. readerThread = NativeThread.current();
  46. }
  47. for (;;) {
  48. InetAddress ia = isa.getAddress();
  49. if (ia.isAnyLocalAddress())
  50. // A wildcard address is replaced by the local host address.
  51. ia = InetAddress.getLocalHost();
  52. // Attempt the connection.
  53. n = Net.connect(fd,
  54. ia,
  55. isa.getPort());
  56. if ( (n == IOStatus.INTERRUPTED)
  57. && isOpen())
  58. continue;
  59. break;
  60. }
  61. } finally {
  62. readerCleanup();
  63. end((n > 0) || (n == IOStatus.UNAVAILABLE));
  64. assert IOStatus.check(n);
  65. }
  66. } catch (IOException x) {
  67. // If an exception was thrown, close the channel after
  68. // invoking end() so as to avoid bogus
  69. // AsynchronousCloseExceptions
  70. close();
  71. throw x;
  72. }
  73. synchronized (stateLock) {
  74. remoteAddress = isa;
  75. if (n > 0) {
  76. // Connection succeeded; disallow further
  77. // invocation
  78. // Record the connected state.
  79. state = ST_CONNECTED;
  80. if (isOpen())
  81. localAddress = Net.localAddress(fd);
  82. return true;
  83. }
  84. // If nonblocking and no exception then connection
  85. // pending; disallow another invocation
  86. // Non-blocking mode: mark the connection as pending.
  87. if (!isBlocking())
  88. state = ST_PENDING;
  89. else
  90. assert false;
  91. }
  92. }
  93. return false;
  94. }
  95. }
  96. }

关于 Linux 的聚集写(gather write)和分散读(scatter read)的具体内容,可参考 writev() 和 readv() 函数

write

  1. public int write(ByteBuffer buf) throws IOException {
  2. if (buf == null)
  3. throw new NullPointerException();
  4. synchronized (writeLock) {
  5. ensureWriteOpen();
  6. int n = 0;
  7. try {
  8. begin();
  9. synchronized (stateLock) {
  10. if (!isOpen())
  11. return 0;
  12. writerThread = NativeThread.current(); // recorded so a close/shutdown can signal this thread
  13. }
  14. for (;;) {
  15. // Write buf to the descriptor; retried while the call is interrupted and the channel is open.
  16. n = IOUtil.write(fd, buf, -1, nd);
  17. if ((n == IOStatus.INTERRUPTED) && isOpen())
  18. continue;
  19. return IOStatus.normalize(n);
  20. }
  21. } finally {
  22. writerCleanup();
  23. end(n > 0 || (n == IOStatus.UNAVAILABLE));
  24. synchronized (stateLock) {
  25. if ((n <= 0) && (!isOutputOpen)) // nothing written and output was shut down meanwhile
  26. throw new AsynchronousCloseException();
  27. }
  28. assert IOStatus.check(n);
  29. }
  30. }
  31. }
  32. static int write(FileDescriptor fd, ByteBuffer src, long position,
  33. NativeDispatcher nd)
  34. throws IOException
  35. {
  36. // A direct buffer is written as-is, with no intermediate copy.
  37. if (src instanceof DirectBuffer)
  38. return writeFromNativeBuffer(fd, src, position, nd);
  39. // Any other buffer costs one extra memory copy into a temporary direct buffer.
  40. // Substitute a native buffer
  41. int pos = src.position();
  42. int lim = src.limit();
  43. assert (pos <= lim);
  44. int rem = (pos <= lim ? lim - pos : 0);
  45. // Obtain a temporary direct buffer of the remaining size.
  46. ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
  47. try {
  48. // Copy the remaining bytes of src into the temporary buffer.
  49. bb.put(src);
  50. bb.flip();
  51. // Do not update src until we see how many bytes were written
  52. src.position(pos);
  53. // Write from the temporary direct buffer.
  54. int n = writeFromNativeBuffer(fd, bb, position, nd);
  55. if (n > 0) {
  56. // now update src
  57. // Advance src by the number of bytes actually written.
  58. src.position(pos + n);
  59. }
  60. return n;
  61. } finally {
  62. // Hand the temporary buffer back (cached or freed by the callee).
  63. Util.offerFirstTemporaryDirectBuffer(bb);
  64. }
  65. }

当写入的buffer使用堆,会多一次内存复制.《Java NIO为什么需要DirectByteBuffer作为中间缓冲区》

  1. private static ThreadLocal<BufferCache> bufferCache = // one BufferCache per thread
  2. new ThreadLocal<BufferCache>()
  3. {
  4. @Override
  5. protected BufferCache initialValue() { // supplies a new, empty cache for the thread
  6. return new BufferCache();
  7. }
  8. };
  9. public static ByteBuffer getTemporaryDirectBuffer(int size) {
  10. // Requests that are too large bypass the cache and get a freshly allocated buffer.
  11. if (isBufferTooLarge(size)) {
  12. return ByteBuffer.allocateDirect(size);
  13. }
  14. // Look in the calling thread's buffer cache.
  15. BufferCache cache = bufferCache.get();
  16. // Ask the cache for a buffer whose capacity is >= size.
  17. ByteBuffer buf = cache.get(size);
  18. if (buf != null) {
  19. return buf;
  20. } else {
  21. // No cached buffer is suitable.
  22. if (!cache.isEmpty()) {
  23. // Remove the first cached buffer from the cache
  24. buf = cache.removeFirst();
  25. // and free it.
  26. free(buf);
  27. }
  28. // Allocate a new direct buffer of the requested size.
  29. return ByteBuffer.allocateDirect(size);
  30. }
  31. }
  32. ByteBuffer get(int size) {
  33. // Don't call this if the buffer would be too large.
  34. // Callers must already have ruled out oversized requests.
  35. assert !isBufferTooLarge(size);
  36. // Nothing cached: return null.
  37. if (count == 0)
  38. return null; // cache is empty
  39. ByteBuffer[] buffers = this.buffers;
  40. // search for suitable buffer (often the first buffer will do)
  41. ByteBuffer buf = buffers[start];
  42. // If the first buffer's capacity is below size, scan the remaining slots.
  43. if (buf.capacity() < size) {
  44. buf = null;
  45. int i = start;
  46. // Look for a buffer with capacity >= size.
  47. while ((i = next(i)) != start) {
  48. ByteBuffer bb = buffers[i];
  49. if (bb == null)
  50. break;
  51. if (bb.capacity() >= size) {
  52. buf = bb;
  53. break;
  54. }
  55. }
  56. if (buf == null)
  57. return null;
  58. // move first element to here to avoid re-packing
  59. // Overwrite the slot of the chosen buffer with the first element.
  60. buffers[i] = buffers[start];
  61. }
  62. // remove first element
  63. // Either way the slot at start is cleared and start advances: if the first buffer fits, it is the one returned;
  64. // otherwise a later fitting buffer is returned and the first buffer has been moved into its slot.
  65. buffers[start] = null;
  66. start = next(start);
  67. count--;
  68. // prepare the buffer and return it
  69. // Reset the position and cap the limit at size.
  70. buf.rewind();
  71. buf.limit(size);
  72. return buf;
  73. }
  74. static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
  75. // If the buffer is too large for the cache we don't have to
  76. // check the cache. We'll just free it.
  77. // Oversized buffers are freed immediately.
  78. if (isBufferTooLarge(buf)) {
  79. free(buf);
  80. return;
  81. }
  82. assert buf != null;
  83. BufferCache cache = bufferCache.get();
  84. // Try to put the buffer at the front of this thread's cache.
  85. if (!cache.offerFirst(buf)) {
  86. // cache is full
  87. // The cache rejected it, so free the buffer.
  88. free(buf);
  89. }
  90. }
  91. boolean offerFirst(ByteBuffer buf) {
  92. // Don't call this if the buffer is too large.
  93. // Callers must already have ruled out oversized buffers.
  94. assert !isBufferTooLarge(buf);
  95. // The cache already holds TEMP_BUF_POOL_SIZE buffers: reject.
  96. if (count >= TEMP_BUF_POOL_SIZE) {
  97. return false;
  98. } else {
  99. // Step start back one slot, wrapping around; per the article author that slot is always empty here.
  100. start = (start + TEMP_BUF_POOL_SIZE - 1) % TEMP_BUF_POOL_SIZE;
  101. buffers[start] = buf;
  102. count++;
  103. return true;
  104. }
  105. }

聚集写

按照顺序写入多个buf

  1. // Writes several buffers in one call.
  2. // Gathering write.
  3. public long write(ByteBuffer[] srcs, int offset, int length)
  4. throws IOException
  5. {
  6. if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
  7. throw new IndexOutOfBoundsException();
  8. synchronized (writeLock) {
  9. ensureWriteOpen();
  10. long n = 0;
  11. try {
  12. begin();
  13. synchronized (stateLock) {
  14. if (!isOpen())
  15. return 0;
  16. writerThread = NativeThread.current();
  17. }
  18. for (;;) {
  19. // Write the selected range of buffers; retried while interrupted and the channel is open.
  20. n = IOUtil.write(fd, srcs, offset, length, nd);
  21. if ((n == IOStatus.INTERRUPTED) && isOpen())
  22. continue;
  23. return IOStatus.normalize(n);
  24. }
  25. } finally {
  26. writerCleanup();
  27. end((n > 0) || (n == IOStatus.UNAVAILABLE));
  28. synchronized (stateLock) {
  29. if ((n <= 0) && (!isOutputOpen))
  30. throw new AsynchronousCloseException();
  31. }
  32. assert IOStatus.check(n);
  33. }
  34. }
  35. }
  36. public class IOUtil {
  37. static long write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
  38. NativeDispatcher nd)
  39. throws IOException
  40. {
  41. // Wrapper around the native iovec array that will describe the buffers.
  42. IOVecWrapper vec = IOVecWrapper.get(length);
  43. boolean completed = false;
  44. int iov_len = 0;
  45. try {
  46. // Iterate over buffers to populate native iovec array.
  47. int count = offset + length;
  48. int i = offset;
  49. while (i < count && iov_len < IOV_MAX) {
  50. // Visit each buffer in the requested range.
  51. ByteBuffer buf = bufs[i];
  52. // Number of bytes remaining in this buffer.
  53. int pos = buf.position();
  54. int lim = buf.limit();
  55. assert (pos <= lim);
  56. int rem = (pos <= lim ? lim - pos : 0);
  57. // Only non-empty buffers are recorded in the wrapper.
  58. if (rem > 0) {
  59. vec.setBuffer(iov_len, buf, pos, rem);
  60. // allocate shadow buffer to ensure I/O is done with direct buffer
  61. // Non-direct buffers are copied into a temporary direct (shadow) buffer.
  62. if (!(buf instanceof DirectBuffer)) {
  63. ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
  64. shadow.put(buf);
  65. shadow.flip();
  66. vec.setShadow(iov_len, shadow);
  67. buf.position(pos); // temporarily restore position in user buffer
  68. buf = shadow;
  69. pos = shadow.position();
  70. }
  71. // Base address of this entry.
  72. vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
  73. // Length of this entry.
  74. vec.putLen(iov_len, rem);
  75. iov_len++;
  76. }
  77. i++;
  78. }
  79. if (iov_len == 0)
  80. return 0L;
  81. // Write all recorded entries with a single writev call.
  82. long bytesWritten = nd.writev(fd, vec.address, iov_len);
  83. // Notify the buffers how many bytes were taken
  84. long left = bytesWritten;
  85. for (int j=0; j<iov_len; j++) {
  86. if (left > 0) {
  87. ByteBuffer buf = vec.getBuffer(j);
  88. int pos = vec.getPosition(j);
  89. int rem = vec.getRemaining(j);
  90. int n = (left > rem) ? rem : (int)left;
  91. buf.position(pos + n);
  92. left -= n;
  93. }
  94. // return shadow buffers to buffer pool
  95. ByteBuffer shadow = vec.getShadow(j);
  96. if (shadow != null)
  97. // Return the shadow buffer to the temporary direct-buffer cache.
  98. Util.offerLastTemporaryDirectBuffer(shadow);
  99. // Drop the wrapper's references for this entry.
  100. vec.clearRefs(j);
  101. }
  102. completed = true;
  103. return bytesWritten;
  104. } finally {
  105. // if an error occurred then clear refs to buffers and return any shadow
  106. // buffers to cache
  107. if (!completed) {
  108. for (int j=0; j<iov_len; j++) {
  109. ByteBuffer shadow = vec.getShadow(j);
  110. if (shadow != null)
  111. Util.offerLastTemporaryDirectBuffer(shadow);
  112. vec.clearRefs(j);
  113. }
  114. }
  115. }
  116. }
  117. }
  118. class IOVecWrapper {
  119. // per thread IOVecWrapper
  120. // Each thread keeps its own cached IOVecWrapper.
  121. private static final ThreadLocal<IOVecWrapper> cached =
  122. new ThreadLocal<IOVecWrapper>();
  123. private IOVecWrapper(int size) {
  124. this.size = size;
  125. this.buf = new ByteBuffer[size];
  126. this.position = new int[size];
  127. this.remaining = new int[size];
  128. this.shadow = new ByteBuffer[size];
  129. // false: per the article author, no page alignment is required.
  130. this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
  131. this.address = vecArray.address();
  132. }
  133. // Returns this thread's cached wrapper if it is large enough, otherwise a newly created one.
  134. static IOVecWrapper get(int size) {
  135. IOVecWrapper wrapper = cached.get();
  136. if (wrapper != null && wrapper.size < size) {
  137. // not big enough; eagerly release memory
  138. // The cached wrapper is too small: free it so that a new one is created below.
  139. wrapper.vecArray.free();
  140. wrapper = null;
  141. }
  142. if (wrapper == null) {
  143. wrapper = new IOVecWrapper(size);
  144. // Register a Cleaner with a Deallocator for vecArray; presumably this frees the native memory once the wrapper is unreachable.
  145. Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
  146. cached.set(wrapper);
  147. }
  148. return wrapper;
  149. }
  150. // Remembers the buffer for entry i together with its position and remaining count.
  151. void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
  152. this.buf[i] = buf;
  153. this.position[i] = pos;
  154. this.remaining[i] = rem;
  155. }
  156. static {
  157. // Native address size as reported by Unsafe.addressSize().
  158. addressSize = Util.unsafe().addressSize();
  159. // LEN_OFFSET is one address size; presumably the offset of the length field within an entry (confirm).
  160. LEN_OFFSET = addressSize;
  161. // SIZE_IOVEC is the per-entry size used when sizing vecArray (size * SIZE_IOVEC).
  162. // It is two address sizes; presumably one for the base address and one for the length (confirm).
  163. SIZE_IOVEC = (short) (addressSize * 2);
  164. }
  165. }

read

读的时候,如果采用的是堆缓冲区,数据会先被读取到临时的直接缓冲区,然后再复制到堆缓冲区,因此会多一次内存复制

  1. static int read(FileDescriptor fd, ByteBuffer dst, long position,
  2. NativeDispatcher nd)
  3. throws IOException
  4. {
  5. if (dst.isReadOnly())
  6. throw new IllegalArgumentException("Read-only buffer");
  7. if (dst instanceof DirectBuffer)
  8. // A direct buffer is read into as-is.
  9. return readIntoNativeBuffer(fd, dst, position, nd);
  10. // Substitute a native buffer
  11. // Obtain a temporary direct buffer sized to dst's remaining space.
  12. ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
  13. try {
  14. int n = readIntoNativeBuffer(fd, bb, position, nd);
  15. bb.flip();
  16. // Copy what was read from the temporary buffer into dst.
  17. if (n > 0)
  18. dst.put(bb);
  19. return n;
  20. } finally {
  21. // Hand the temporary buffer back (cached or freed by the callee).
  22. Util.offerFirstTemporaryDirectBuffer(bb);
  23. }
  24. }

分散读

  1. static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
  2. NativeDispatcher nd)
  3. throws IOException
  4. {
  5. IOVecWrapper vec = IOVecWrapper.get(length);
  6. boolean completed = false;
  7. int iov_len = 0;
  8. try {
  9. // Iterate over buffers to populate native iovec array.
  10. int count = offset + length;
  11. int i = offset;
  12. while (i < count && iov_len < IOV_MAX) {
  13. ByteBuffer buf = bufs[i];
  14. if (buf.isReadOnly())
  15. throw new IllegalArgumentException("Read-only buffer");
  16. int pos = buf.position();
  17. int lim = buf.limit();
  18. assert (pos <= lim);
  19. int rem = (pos <= lim ? lim - pos : 0);
  20. if (rem > 0) {
  21. vec.setBuffer(iov_len, buf, pos, rem);
  22. // allocate shadow buffer to ensure I/O is done with direct buffer
  23. if (!(buf instanceof DirectBuffer)) {
  24. ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
  25. vec.setShadow(iov_len, shadow);
  26. buf = shadow;
  27. pos = shadow.position();
  28. }
  29. vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
  30. vec.putLen(iov_len, rem);
  31. iov_len++;
  32. }
  33. i++;
  34. }
  35. if (iov_len == 0)
  36. return 0L;
  37. long bytesRead = nd.readv(fd, vec.address, iov_len);
  38. // Notify the buffers how many bytes were read
  39. long left = bytesRead;
  40. for (int j=0; j<iov_len; j++) {
  41. ByteBuffer shadow = vec.getShadow(j);
  42. if (left > 0) {
  43. ByteBuffer buf = vec.getBuffer(j);
  44. int rem = vec.getRemaining(j);
  45. int n = (left > rem) ? rem : (int)left;
  46. // No shadow means buf itself was used for the I/O: just advance its position.
  47. if (shadow == null) {
  48. int pos = vec.getPosition(j);
  49. buf.position(pos + n);
  50. } else {
  51. // Limit the shadow to the n bytes attributed to it,
  52. shadow.limit(shadow.position() + n);
  53. // then copy those bytes into the caller's non-direct buffer.
  54. buf.put(shadow);
  55. }
  56. left -= n;
  57. }
  58. if (shadow != null)
  59. // Return the shadow buffer to the temporary direct-buffer cache.
  60. Util.offerLastTemporaryDirectBuffer(shadow);
  61. vec.clearRefs(j);
  62. }
  63. completed = true;
  64. return bytesRead;
  65. } finally {
  66. // if an error occurred then clear refs to buffers and return any shadow
  67. // buffers to cache
  68. if (!completed) {
  69. for (int j=0; j<iov_len; j++) {
  70. ByteBuffer shadow = vec.getShadow(j);
  71. if (shadow != null)
  72. Util.offerLastTemporaryDirectBuffer(shadow);
  73. vec.clearRefs(j);
  74. }
  75. }
  76. }
  77. }

close

  1. public final void close() throws IOException {
  2. synchronized (closeLock) {
  3. if (!open) // already closed: nothing to do
  4. return;
  5. // Mark the channel as closed.
  6. open = false;
  7. // Delegate the actual close to implCloseChannel().
  8. implCloseChannel();
  9. }
  10. }
  11. protected final void implCloseChannel() throws IOException {
  12. // Close this channel itself first.
  13. implCloseSelectableChannel();
  14. synchronized (keyLock) {
  15. int count = (keys == null) ? 0 : keys.length;
  16. for (int i = 0; i < count; i++) {
  17. SelectionKey k = keys[i];
  18. if (k != null)
  19. // Cancel every non-null selection key held for this channel.
  20. k.cancel();
  21. }
  22. }
  23. }
  24. protected void implCloseSelectableChannel() throws IOException {
  25. synchronized (stateLock) {
  26. // Mark both directions closed (the author notes that a TCP channel reads and writes in both directions).
  27. isInputOpen = false;
  28. isOutputOpen = false;
  29. if (state != ST_KILLED)
  30. // Per the article author: a no-op on Windows;
  31. // needed on Linux and Solaris so the fd is not recycled before the real close (not verifiable from this listing).
  32. nd.preClose(fd);
  33. if (readerThread != 0)
  34. // Signal the reader thread so it is released from blocking I/O instead of staying blocked.
  35. NativeThread.signal(readerThread);
  36. if (writerThread != 0)
  37. // Signal the writer thread so it is released from blocking I/O instead of staying blocked.
  38. NativeThread.signal(writerThread);
  39. // While the channel is still registered, kill() is not called here (per the author, it is deferred until all keys are deregistered).
  40. // If it is not registered, the channel can be killed right away.
  41. if (!isRegistered())
  42. kill();
  43. }
  44. }
  45. public void kill() throws IOException {
  46. synchronized (stateLock) {
  47. if (state == ST_KILLED) // already killed
  48. return;
  49. if (state == ST_UNINITIALIZED) { // no descriptor is closed in this case; only the state changes
  50. state = ST_KILLED;
  51. return;
  52. }
  53. assert !isOpen() && !isRegistered();
  54. // Postpone the kill if there is a waiting reader
  55. // or writer thread. See the comments in read() for
  56. // more detailed explanation.
  57. // With no reader or writer thread recorded, close fd now; otherwise only mark the kill as pending.
  58. if (readerThread == 0 && writerThread == 0) {
  59. nd.close(fd);
  60. state = ST_KILLED;
  61. } else {
  62. state = ST_KILLPENDING;
  63. }
  64. }
  65. }

关闭读写流

  1. // Shuts down the input (read) side of the connection.
  2. @Override
  3. public SocketChannel shutdownInput() throws IOException {
  4. synchronized (stateLock) {
  5. if (!isOpen())
  6. throw new ClosedChannelException();
  7. if (!isConnected())
  8. throw new NotYetConnectedException();
  9. if (isInputOpen) { // skipped when input is already shut down
  10. Net.shutdown(fd, Net.SHUT_RD);
  11. if (readerThread != 0)
  12. NativeThread.signal(readerThread); // signal the recorded reader thread
  13. isInputOpen = false;
  14. }
  15. return this;
  16. }
  17. }
  18. // Shuts down the output (write) side of the connection.
  19. @Override
  20. public SocketChannel shutdownOutput() throws IOException {
  21. synchronized (stateLock) {
  22. if (!isOpen())
  23. throw new ClosedChannelException();
  24. if (!isConnected())
  25. throw new NotYetConnectedException();
  26. if (isOutputOpen) { // skipped when output is already shut down
  27. Net.shutdown(fd, Net.SHUT_WR);
  28. if (writerThread != 0)
  29. NativeThread.signal(writerThread); // signal the recorded writer thread
  30. isOutputOpen = false;
  31. }
  32. return this;
  33. }
  34. }

DatagramChannel

open

socket

bind

receive

send

发表评论

表情:
评论列表 (有 0 条评论,81人围观)

还没有评论,来说两句吧...

相关阅读