从零学Netty(二)NIO通信(多人聊天室)
NIO概念
非阻塞I/O,全称Non-BlockingIO,是在Jdk1.4之后增加的一套新的操作I/O工具包
NIO是为提供I/O吞吐量而专门设计,其卓越的性能甚至可以与C媲美。
NIO是通过Reactor模式的事件驱动机制来达到Non blocking的,就是我们将事件注册到Reactor中,当有相应的事件发生时,Reactor便会告知我们有哪些事件发生了,我们再根据具体的事件去做相应的处理。
NIO三大核心
NIO有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer
通道(Channel):NIO的通道类似于流区别如下:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲
选择器(Selector):
- Selector 能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
- 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
原理图
图片来源:https://mp.weixin.qq.com/s/CRd3-vRD7xwoexqv7xyHRw
实例demo
服务端代码
/**
* nio服务端
*
* @author LionLi
*/
public class NioServer {
// 选择器
private Selector selector;
// 连接通道
private ServerSocketChannel listenChannel;
public NioServer(int port) {
try {
selector = Selector.open();
listenChannel = ServerSocketChannel.open();
// 绑定端口
listenChannel.socket().bind(new InetSocketAddress(port));
// 设置非阻塞模式
listenChannel.configureBlocking(false);
// 连接注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
public void listen() {
System.out.println("服务端正在监听.........");
try {
while (true) {
// 获取事件
if (selector.select() == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 监听到不同的事件进行不同的处理
listenHandler(key);
// 删除当前key,防止重复处理
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 针对监听到的不同事件做不同的处理
*/
public void listenHandler(SelectionKey key) {
SocketChannel sc = null;
try {
// 连接事件处理
if (key.isAcceptable()) {
// 连接注册到selector
sc = listenChannel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
// 读取事件处理
if (key.isReadable()) {
// 拿到socketChannel
sc = (SocketChannel) key.channel();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取消息数据
if (sc.read(buffer) > 0) {
String msg = new String(buffer.array());
System.out.println("转发消息: " + msg);
// 将消息转发
sendMsgToClient(msg);
}
}
} catch (IOException e) {
try {
// 取消注册
key.cancel();
// 关闭通道
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
/**
* 发送消息到客户端
*/
public void sendMsgToClient(String msg) throws IOException {
for (SelectionKey key : selector.keys()) {
Channel channel = key.channel();
if (channel instanceof SocketChannel) {
SocketChannel targetChannel = (SocketChannel) channel;
// 将msg写到buffer中
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
// 将buffer数据写入通道
targetChannel.write(buffer);
}
}
}
public static void main(String[] args) {
NioServer server = new NioServer(8088);
server.listen();
}
}
客户端代码
/**
* nio客户端
*
* @author LionLi
*/
public class NioClient {
// 选择器
private Selector selector;
// 连接通道
private SocketChannel socketChannel;
// 用户名
private String username;
// 启动标志位
private boolean flag;
public NioClient(String ip, int port,String username) {
try {
this.username = username;
flag = true;
selector = Selector.open();
// 连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(ip, port));
// 设置非阻塞
socketChannel.configureBlocking(false);
// 连接注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
sendMsgToServer("上线了");
// 监听线程
ExecutorService executor = Executors.newSingleThreadExecutor();
// 循环读取服务端的消息
executor.execute(() -> {
while (flag){
acceptMsgFromServer();
}
});
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 向服务端发送消息
*/
public void sendMsgToServer(String msg) {
msg = username + ":" + msg;
try {
// 发送消息
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 接收服务端发来的消息
*/
public void acceptMsgFromServer() {
try {
if (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext() && flag) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim());
// 移除当前的key,防止重复操作
iterator.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.print("请输入用户名: ");
Scanner scanner = new Scanner(System.in);
String username = scanner.nextLine();
NioClient client = new NioClient("localhost", 8088,username);
while (true) {
// 发送数据到服务端
String msg = scanner.nextLine();
if (msg.equals("exit")) {
try {
client.flag = false;
client.sendMsgToServer("下线了");
client.socketChannel.finishConnect();
client.selector.close();
break;
} catch (IOException e) {
e.printStackTrace();
}
} else {
client.sendMsgToServer(msg);
}
}
}
}
测试
- 场景如下:
- 启动服务端
- 顺序启动客户端ABC 并输入用户名
- 分别打招呼
- 倒序退出客户端ABC
项目已上传到gitee
地址: netty-demo
如果帮到您了,请帮忙点个star
还没有评论,来说两句吧...