Netty Source Code Reading: Detecting New Client Connections
The boss thread's NioEventLoop is started when user code calls bind().
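For reference, here is a minimal server bootstrap sketch (not taken from this article; the class name EchoServerSketch and port 8080 are made up for illustration). The bind() call at the end is what registers the NioServerSocketChannel and gets the boss NioEventLoop running:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServerSketch {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // boss: detects and accepts new connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // workers: handle the accepted channels
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // business handlers would be added here
                 }
             });
            // bind() registers the NioServerSocketChannel with the boss NioEventLoop
            // and starts that event loop if it is not already running.
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}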
To see how a new connection is detected, we start from the entry point covered in the earlier article "Netty Source Code Reading: NioEventLoop Execution, processSelectedKey()", namely processSelectedKeysOptimized(SelectionKey[] selectedKeys).
We arrive at this step:
final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}
Earlier, our channel (the NioServerSocketChannel) was registered with the selector with itself as the attachment of the SelectionKey, so a is an AbstractNioChannel and we enter processSelectedKey(k, (AbstractNioChannel) a).
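The attachment mechanism itself is plain JDK NIO. The small runnable demo below (the class AttachmentDemo and the stand-in object are made up for illustration) mirrors what Netty does during registration: the JDK channel is registered with interestOps 0 and the Netty channel object itself is passed as the attachment, which is exactly what k.attachment() hands back above:
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class AttachmentDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Stand-in for the AbstractNioChannel that Netty attaches to the key.
        Object nettyChannelStandIn = new Object();

        // Netty does the equivalent during registration:
        // interestOps = 0, attachment = the Netty channel itself.
        SelectionKey key = serverChannel.register(selector, 0, nettyChannelStandIn);

        // k.attachment() in processSelectedKeysOptimized() returns this same object.
        System.out.println(key.attachment() == nettyChannelStandIn); // prints: true

        selector.close();
        serverChannel.close();
    }
}
For context, here is the complete processSelectedKeysOptimized() method: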
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                i++;
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ...
    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            ...
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ...
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
Since the server-side channel (the NioServerSocketChannel) is registered for the ACCEPT event, we take this branch:
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}
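The OP_ACCEPT interest comes from the NioServerSocketChannel itself, which declares SelectionKey.OP_ACCEPT as its read interest when it is constructed. A tiny runnable illustration of the bit-mask test above (ReadyOpsCheckDemo is a made-up class name):
import java.nio.channels.SelectionKey;

public class ReadyOpsCheckDemo {
    public static void main(String[] args) {
        // When a client connects, the boss selector reports OP_ACCEPT for the
        // NioServerSocketChannel's SelectionKey.
        int readyOps = SelectionKey.OP_ACCEPT;

        // Same bit-mask test as in processSelectedKey() above.
        boolean readOrAccept =
                (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;

        System.out.println(readOrAccept); // true, so unsafe.read() is called
    }
}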
NioMessageUnsafe.read()
The unsafe implementation of the server-side NioServerSocketChannel is NioMessageUnsafe (a later article will cover unsafe in detail), so we end up in this method:
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            ...
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            ...
        }
    }
}
Inside this method, doReadMessages() is the main read call; the do/while loop keeps invoking it to read data (here, to accept new connections).
The line final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); obtains a handle that implements the read strategy: it limits how fast new connections are accepted. The message counter is incremented by one for each new connection, and by default at most 16 connections are read in one pass. Looking inside allocHandle.continueReading(), we can see when the loop stops:
@Override
public boolean continueReading() {
    return config.isAutoRead() &&
           attemptedBytesRead == lastBytesRead &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead < Integer.MAX_VALUE;
}
The condition totalMessages < maxMessagePerRead checks whether the number of connections read so far has reached the per-read maximum; maxMessagePerRead defaults to 16.
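If 16 does not suit your workload, the cap can be tuned. A sketch, assuming Netty 4.x where ChannelOption.MAX_MESSAGES_PER_READ is still available (it is deprecated in newer 4.1 releases in favor of configuring a MaxMessagesRecvByteBufAllocator):
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public class MaxMessagesPerReadSketch {
    // Sketch only: raise the per-read accept cap on the server (parent) channel,
    // i.e. how many connections one unsafe.read() pass may accept before
    // continueReading() returns false.
    static void tune(ServerBootstrap bootstrap) {
        bootstrap.option(ChannelOption.MAX_MESSAGES_PER_READ, 32);
    }
}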
Let's move on to doReadMessages().
doReadMessages()
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
Here the SocketChannel is the plain JDK implementation, returned by javaChannel().accept(). The call buf.add(new NioSocketChannel(this, ch)) then creates a new client-side NioSocketChannel, wrapping the JDK SocketChannel inside it (the underlying JDK channel is encapsulated), and passes this, that is, the server-side NioServerSocketChannel, in as the parent.
buf was passed in by the caller; it is in fact the readBuf we saw earlier, a list that collects the client-side NioSocketChannel instances:
private final List<Object> readBuf = new ArrayList<Object>();
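To make the whole read pass concrete, here is a plain-JDK analogy of what NioMessageUnsafe.read() and doReadMessages() do together (AcceptLoopAnalogy is a made-up demo class; the cap of 16 mirrors the default maxMessagePerRead, and Netty of course wraps each accepted channel in a NioSocketChannel instead of storing it raw):
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class AcceptLoopAnalogy {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(0));

        // Analogue of the do/while loop in NioMessageUnsafe.read():
        // keep accepting until accept() returns null or the per-read cap is hit.
        List<SocketChannel> readBuf = new ArrayList<SocketChannel>();
        int maxMessagePerRead = 16;
        while (readBuf.size() < maxMessagePerRead) {
            SocketChannel accepted = server.accept(); // non-blocking: null if nothing is pending
            if (accepted == null) {
                break;
            }
            // Netty would do: readBuf.add(new NioSocketChannel(this, accepted));
            readBuf.add(accepted);
        }
        System.out.println("accepted " + readBuf.size() + " connections in this read pass");

        server.close();
    }
}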
In the next article, we will analyze what new NioSocketChannel(this, ch) actually does.