jdk源码解析八之NIO(socketChannel)
ServerSocketChannel
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//创建FileDescriptor
this.fd = Net.serverSocket(true);
//获取到文件描述符的值。
this.fdVal = IOUtil.fdVal(fd);
//标记状态为使用
this.state = ST_INUSE;
}
ServerSocketChannelImpl(SelectorProvider sp,
FileDescriptor fd,
boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
//已绑定则直接获取地址
if (bound)
//获取传入的文件描述符的socket地址
localAddress = Net.localAddress(fd);
}
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
//判断是否通道打开以及是否绑定
if (!isOpen())
throw new ClosedChannelException();
if (isBound())
throw new AlreadyBoundException();
//绑定地址为null处理
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
//检测端口是否被监听
if (sm != null)
sm.checkListen(isa.getPort());
//将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。
//SDP需要网卡支持InfiniBand高速网络通信技术,windows不支持该协议。
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
//连接
Net.bind(fd, isa.getAddress(), isa.getPort());
//默认tcp待连接队列长度为50
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
//获取文件描述符的socket地址
localAddress = Net.localAddress(fd);
}
}
return this;
}
public SocketChannel accept() throws IOException {
synchronized (lock) {
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
//接收一个新的连接
//将isaa设置为socket的远程地址
//将给定的文件描述符与socket客户端绑定
n = accept(this.fd, newfd, isaa);
//返回1成功
//系统执行中断,继续等待接收
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
//返回1成功
if (n < 1)
return null;
//默认阻塞模式
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//初始化客户端SocketChannel
sc = new SocketChannelImpl(provider(), newfd, isa);
//检查权限
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
//配置是否阻塞
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
//关闭socket
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
if (state != ST_KILLED)
nd.preClose(fd);
long th = thread;
if (th != 0)
//发送信号给线程,将其从阻塞I/O中释放,避免一直被阻塞。
NativeThread.signal(th);
//若还有注册的channel,则不处理,等待key全部注销后再kill
//若没有的话可以直接kill当前channel
if (!isRegistered())
kill();
}
}
SocketChannel
生命周期
connect
public static SocketChannel open(SocketAddress remote)
throws IOException
{
SocketChannel sc = open();
try {
//建立连接
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
public boolean connect(SocketAddress sa) throws IOException {
int localPort = 0;
synchronized (readLock) {
synchronized (writeLock) {
ensureOpenAndUnconnected();
InetSocketAddress isa = Net.checkAddress(sa);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkConnect(isa.getAddress().getHostAddress(),
isa.getPort());
synchronized (blockingLock()) {
int n = 0;
try {
try {
begin();
synchronized (stateLock) {
if (!isOpen()) {
return false;
}
// notify hook only if unbound
if (localAddress == null) {
//将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。
NetHooks.beforeTcpConnect(fd,
isa.getAddress(),
isa.getPort());
}
readerThread = NativeThread.current();
}
for (;;) {
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
//返回本地主机地址
ia = InetAddress.getLocalHost();
//建立连接
n = Net.connect(fd,
ia,
isa.getPort());
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
break;
}
} finally {
readerCleanup();
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
} catch (IOException x) {
// If an exception was thrown, close the channel after
// invoking end() so as to avoid bogus
// AsynchronousCloseExceptions
close();
throw x;
}
synchronized (stateLock) {
remoteAddress = isa;
if (n > 0) {
// Connection succeeded; disallow further
// invocation
//更新状态
state = ST_CONNECTED;
if (isOpen())
localAddress = Net.localAddress(fd);
return true;
}
// If nonblocking and no exception then connection
// pending; disallow another invocation
//非阻塞状态下更新为待连接
if (!isBlocking())
state = ST_PENDING;
else
assert false;
}
}
return false;
}
}
}
关于linux的聚集写和散步读具体内容,参考readv()和writev()函数
write
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
synchronized (writeLock) {
ensureWriteOpen();
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
writerThread = NativeThread.current();
}
for (;;) {
//写入buf
n = IOUtil.write(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
}
} finally {
writerCleanup();
end(n > 0 || (n == IOStatus.UNAVAILABLE));
synchronized (stateLock) {
if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
assert IOStatus.check(n);
}
}
}
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd)
throws IOException
{
//如果是直接缓冲区则写入本地缓冲区中
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
//如果缓冲区是基于堆的,会多一次内存复制
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
//创建临时缓冲区
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
//写入临时缓冲区中
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
//直接缓冲区数据写入
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
// now update src
//更新实际写入量
src.position(pos + n);
}
return n;
} finally {
//释放临时缓冲区
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
当写入的buffer使用堆,会多一次内存复制.《Java NIO为什么需要DirectByteBuffer作为中间缓冲区》
private static ThreadLocal<BufferCache> bufferCache =
new ThreadLocal<BufferCache>()
{
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
//如果所需缓冲区太大,直接新建
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
//从线程缓冲区中,获取一个缓冲区
BufferCache cache = bufferCache.get();
//获取容量>=size的缓冲区
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
//当没有可用缓冲区
if (!cache.isEmpty()) {
//第一个缓冲区移除缓存
buf = cache.removeFirst();
//释放
free(buf);
}
//重新分配一个合适大小的缓冲区
return ByteBuffer.allocateDirect(size);
}
}
ByteBuffer get(int size) {
// Don't call this if the buffer would be too large.
//断言缓冲区是否过大
assert !isBufferTooLarge(size);
//缓存没直接返回
if (count == 0)
return null; // cache is empty
ByteBuffer[] buffers = this.buffers;
// search for suitable buffer (often the first buffer will do)
ByteBuffer buf = buffers[start];
//如果取出第一个buf容量<size,则遍历查找
if (buf.capacity() < size) {
buf = null;
int i = start;
//查找>=size缓冲区
while ((i = next(i)) != start) {
ByteBuffer bb = buffers[i];
if (bb == null)
break;
if (bb.capacity() >= size) {
buf = bb;
break;
}
}
if (buf == null)
return null;
// move first element to here to avoid re-packing
//覆盖取出的元素
buffers[i] = buffers[start];
}
// remove first element
//start下标的buf不管满足size与否,都会置空,如果满足size,置空后start移动到下一个位置
//如果不满足size,则从后遍历查找到符合size要求的,然后start覆盖,start移动到下一个位置
buffers[start] = null;
start = next(start);
count--;
// prepare the buffer and return it
//重置pos,设置limit上限
buf.rewind();
buf.limit(size);
return buf;
}
static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
// If the buffer is too large for the cache we don't have to
// check the cache. We'll just free it.
//太大直接释放
if (isBufferTooLarge(buf)) {
free(buf);
return;
}
assert buf != null;
BufferCache cache = bufferCache.get();
//
if (!cache.offerFirst(buf)) {
// cache is full
//满了直接释放
free(buf);
}
}
boolean offerFirst(ByteBuffer buf) {
// Don't call this if the buffer is too large.
//断言
assert !isBufferTooLarge(buf);
//超过缓冲容量,返回false
if (count >= TEMP_BUF_POOL_SIZE) {
return false;
} else {
//计算前一个位置,前一位绝对是为null的
start = (start + TEMP_BUF_POOL_SIZE - 1) % TEMP_BUF_POOL_SIZE;
buffers[start] = buf;
count++;
return true;
}
}
聚集写
按照顺序写入多个buf
//一次性写入多个buf
//聚集写
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
synchronized (writeLock) {
ensureWriteOpen();
long n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
writerThread = NativeThread.current();
}
for (;;) {
//写入多个buf
n = IOUtil.write(fd, srcs, offset, length, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
}
} finally {
writerCleanup();
end((n > 0) || (n == IOStatus.UNAVAILABLE));
synchronized (stateLock) {
if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
assert IOStatus.check(n);
}
}
}
public class IOUtil {
static long write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
NativeDispatcher nd)
throws IOException
{
//封装多个buf
IOVecWrapper vec = IOVecWrapper.get(length);
boolean completed = false;
int iov_len = 0;
try {
// Iterate over buffers to populate native iovec array.
int count = offset + length;
int i = offset;
while (i < count && iov_len < IOV_MAX) {
//遍历每一块缓冲区
ByteBuffer buf = bufs[i];
//计算可读大小
int pos = buf.position();
int lim = buf.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
//将buf放入IOVecWrapper
if (rem > 0) {
vec.setBuffer(iov_len, buf, pos, rem);
// allocate shadow buffer to ensure I/O is done with direct buffer
//针对buf不是直接缓冲区的处理
if (!(buf instanceof DirectBuffer)) {
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
shadow.put(buf);
shadow.flip();
vec.setShadow(iov_len, shadow);
buf.position(pos); // temporarily restore position in user buffer
buf = shadow;
pos = shadow.position();
}
//设置缓冲区的起始地址
vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
//设置缓冲区长度
vec.putLen(iov_len, rem);
iov_len++;
}
i++;
}
if (iov_len == 0)
return 0L;
//写入多个缓冲区数据
long bytesWritten = nd.writev(fd, vec.address, iov_len);
// Notify the buffers how many bytes were taken
long left = bytesWritten;
for (int j=0; j<iov_len; j++) {
if (left > 0) {
ByteBuffer buf = vec.getBuffer(j);
int pos = vec.getPosition(j);
int rem = vec.getRemaining(j);
int n = (left > rem) ? rem : (int)left;
buf.position(pos + n);
left -= n;
}
// return shadow buffers to buffer pool
ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
//将写入的缓冲放入临时直接缓冲区
Util.offerLastTemporaryDirectBuffer(shadow);
//清除缓存
vec.clearRefs(j);
}
completed = true;
return bytesWritten;
} finally {
// if an error occurred then clear refs to buffers and return any shadow
// buffers to cache
if (!completed) {
for (int j=0; j<iov_len; j++) {
ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
}
}
}
}
}
class IOVecWrapper {
// per thread IOVecWrapper
//每个线程保存一份IOVecWrapper缓存
private static final ThreadLocal<IOVecWrapper> cached =
new ThreadLocal<IOVecWrapper>();
private IOVecWrapper(int size) {
this.size = size;
this.buf = new ByteBuffer[size];
this.position = new int[size];
this.remaining = new int[size];
this.shadow = new ByteBuffer[size];
//false:无需页面对齐
this.vecArray = new AllocatedNativeObject(size * SIZE_IOVEC, false);
this.address = vecArray.address();
}
//通过get获取一块适合大小的空间
static IOVecWrapper get(int size) {
IOVecWrapper wrapper = cached.get();
if (wrapper != null && wrapper.size < size) {
// not big enough; eagerly release memory
//若获取到空间不够大,则重新初始化一个空间。
wrapper.vecArray.free();
wrapper = null;
}
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
//native资源,当对象释放时使得操作系统可以释放内存
Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
cached.set(wrapper);
}
return wrapper;
}
//保存buf
void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
this.buf[i] = buf;
this.position[i] = pos;
this.remaining[i] = rem;
}
static {
//获取本机指针大小
addressSize = Util.unsafe().addressSize();
//保存每个指针偏移量
LEN_OFFSET = addressSize;
//用于保存每个AllocatedNativeObject对象的元素的大小
//每个NativeObject有两个long属性,因此需要×2
SIZE_IOVEC = (short) (addressSize * 2);
}
}
read
读的时候,如果采用的时堆缓冲区,会多一遍读取到直接缓冲区,然后在读取到堆缓冲区
static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd)
throws IOException
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
//使用直接缓冲区
return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute a native buffer
//获取临时直接缓冲区
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
//临时缓冲区数据写入堆内存中
if (n > 0)
dst.put(bb);
return n;
} finally {
//释放临时缓冲区
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
分散读
static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
NativeDispatcher nd)
throws IOException
{
IOVecWrapper vec = IOVecWrapper.get(length);
boolean completed = false;
int iov_len = 0;
try {
// Iterate over buffers to populate native iovec array.
int count = offset + length;
int i = offset;
while (i < count && iov_len < IOV_MAX) {
ByteBuffer buf = bufs[i];
if (buf.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
int pos = buf.position();
int lim = buf.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (rem > 0) {
vec.setBuffer(iov_len, buf, pos, rem);
// allocate shadow buffer to ensure I/O is done with direct buffer
if (!(buf instanceof DirectBuffer)) {
ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
vec.setShadow(iov_len, shadow);
buf = shadow;
pos = shadow.position();
}
vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
vec.putLen(iov_len, rem);
iov_len++;
}
i++;
}
if (iov_len == 0)
return 0L;
long bytesRead = nd.readv(fd, vec.address, iov_len);
// Notify the buffers how many bytes were read
long left = bytesRead;
for (int j=0; j<iov_len; j++) {
ByteBuffer shadow = vec.getShadow(j);
if (left > 0) {
ByteBuffer buf = vec.getBuffer(j);
int rem = vec.getRemaining(j);
int n = (left > rem) ? rem : (int)left;
//说明buf为直接内存,则修改pos
if (shadow == null) {
int pos = vec.getPosition(j);
buf.position(pos + n);
} else {
//修改上限
shadow.limit(shadow.position() + n);
//填充到堆缓冲区
buf.put(shadow);
}
left -= n;
}
if (shadow != null)
//将读取的缓冲放入临时直接缓冲区
Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
}
completed = true;
return bytesRead;
} finally {
// if an error occurred then clear refs to buffers and return any shadow
// buffers to cache
if (!completed) {
for (int j=0; j<iov_len; j++) {
ByteBuffer shadow = vec.getShadow(j);
if (shadow != null)
Util.offerLastTemporaryDirectBuffer(shadow);
vec.clearRefs(j);
}
}
}
}
close
public final void close() throws IOException {
synchronized (closeLock) {
if (!open)
return;
//标记关闭
open = false;
//调用子类实现
implCloseChannel();
}
}
protected final void implCloseChannel() throws IOException {
//关闭当前channel
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
//注册的channel全部取消
k.cancel();
}
}
}
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
//标记读写关闭,因为基于TCP所以支持双向读写操作
isInputOpen = false;
isOutputOpen = false;
if (state != ST_KILLED)
//windows不做处理
//linux和Solaris需要,关闭前将fd复制到另一个待关闭fd中,以防止被fd回收
nd.preClose(fd);
if (readerThread != 0)
//发送信号给读线程,将其从阻塞I/O中释放,避免一直被阻塞。
NativeThread.signal(readerThread);
if (writerThread != 0)
//发送信号给写线程,将其从阻塞I/O中释放,避免一直被阻塞。
NativeThread.signal(writerThread);
//若还有注册的channel,则不处理,等待key全部注销后再kill
//若没有的话可以直接kill当前channel
if (!isRegistered())
kill();
}
}
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLED)
return;
if (state == ST_UNINITIALIZED) {
state = ST_KILLED;
return;
}
assert !isOpen() && !isRegistered();
// Postpone the kill if there is a waiting reader
// or writer thread. See the comments in read() for
// more detailed explanation.
//若仍有线程还没释放,则等线程I/O执行完后再kill
if (readerThread == 0 && writerThread == 0) {
nd.close(fd);
state = ST_KILLED;
} else {
state = ST_KILLPENDING;
}
}
}
关闭读写流
//关闭输入流
@Override
public SocketChannel shutdownInput() throws IOException {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (!isConnected())
throw new NotYetConnectedException();
if (isInputOpen) {
Net.shutdown(fd, Net.SHUT_RD);
if (readerThread != 0)
NativeThread.signal(readerThread);
isInputOpen = false;
}
return this;
}
}
//关闭输出流
@Override
public SocketChannel shutdownOutput() throws IOException {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (!isConnected())
throw new NotYetConnectedException();
if (isOutputOpen) {
Net.shutdown(fd, Net.SHUT_WR);
if (writerThread != 0)
NativeThread.signal(writerThread);
isOutputOpen = false;
}
return this;
}
}
还没有评论,来说两句吧...