NIO之旅Selector
之前的文章介绍了Buffer和Channel,今天这篇文章简单聊一下Selector
。
Selector
是NIO中最为重要的组件之一,我们常常说的多路复用器就是指的Selector组件。Selector组件用于轮询一个或多个NIO Channel的状态是否处于可读、可写。通过轮询的机制就可以管理多个Channel,也就是说可以管理多个网络连接。
轮询机制
- 首先,需要将Channel注册到Selector上,这样Selector才知道需要管理哪些Channel
- 接着Selector会不断轮询其上注册的Channel,如果某个Channel发生了读或写的事件,这个Channel就会被Selector轮询出来,然后通过SelectionKey可以获取就绪的Channel集合,进行后续的IO操作
创建一个Selector对象
// 创建一个Selector
Selector selector = Selector.open();
注册Channel到Selector中
// 把一个Channel注册到Selector
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
需要注意的是,一定要使用configureBlocking(false)
把Channel设置成非阻塞模式,否则会抛出IllegalBlockingModeException
异常。
Channel的register有两个重载方法:
SelectionKey register(Selector sel, int ops) {
return register(sel, ops, null);
}
SelectionKey register(Selector sel, int ops, Object att);
对于ops参数,即selector要关心这个Channel的事件类型,在SelectionKey类里面有这样几个常量:
- OP_READ 可以从Channel读数据
- OP_WRITE 可以写数据到Channel
- OP_CONNECT 连接上了服务器
- OP_ACCEPT 有新的连接进来了
如果你对不止一种事件感兴趣,使用或运算符即可,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
需要注意的是,FileChannel只有阻塞模式,不支持非阻塞模式,所以它是没有register方法的!
第三个参数att是attachment的缩写,代表可以传一个“附件”进去。在返回的SelectionKey对象里面,可以获取以下对象:
- channel():获取Channel
- selector():获取Selector
- attachment():获取附件
- attach(obj):更新附件
获取可操作的Channel
并不是所有注册过的键都仍然有效,有些可能已经被cancel()方法被调用过的键。所以一般来说,我们轮询selectedKeys()方法,Selector可以返回两种SelectionKey集合:
- keys():已注册的键的集合
selectedKeys():已选择的键的集合
Set selectedKeys = selector.selectedKeys();
当有新增就绪的Channel,调用select()方法,就会将key添加到Set集合中。
完整的实例代码
Server
public class Server {
public static void main(String[] args) {
try (
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
) {
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
serverSocketChannel.configureBlocking(false);
System.out.println("server 启动...");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int readBytes = socketChannel.read(buffer);
if (readBytes > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("server 收到:" + body);
}
} else if(key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client
public class Client {
public static void main(String[] args) {
try (
SocketChannel socketChannel = SocketChannel.open();
) {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
System.out.println("client 启动...");
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
buffer.put("hi, 这是client".getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
还没有评论,来说两句吧...