从零学Netty(三)AIO通信(多人聊天室)
AIO概念
异步非阻塞IO,全称AsynchronousI/O,JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,也就是我们要介绍的AIO
NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供两种方式获取操作结果
它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型
原理图
图片来源:https://mp.weixin.qq.com/s/CRd3-vRD7xwoexqv7xyHRw
实例demo
服务端代码
/**
* aio服务端
*
* @author LionLi
*/
public class AioServer {
public void startListen(int port) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress(port));
// 接收并处理客户端请求
serverSocketChannel.accept(null, new ServerHandler(serverSocketChannel));
}
public static void main(String[] args) throws Exception {
AioServer aioServer = new AioServer();
aioServer.startListen(8088);
System.out.println("服务端正在监听.........");
// 阻塞代码
System.in.read();
}
}
服务端消息处理器代码
/**
* aio服务端消息处理器
*
* @author LionLi
*/
public class ServerHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
// 客户端通道列表
private static final List<AsynchronousSocketChannel> CHANNEL_LIST = new ArrayList<>();
private final AsynchronousServerSocketChannel serverSocketChannel;
public ServerHandler(AsynchronousServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
}
/**
* 当实际IO操作完成时触发
*/
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
CHANNEL_LIST.add(result);
serverSocketChannel.accept(null, this);
result.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result1, Object attachment) {
byteBuffer.flip();
// 获取消息输出
String msg = StandardCharsets.UTF_8.decode(byteBuffer).toString();
System.out.println("转发消息: " + msg);
// 分发消息
try {
for (AsynchronousSocketChannel asc : CHANNEL_LIST) {
if (asc.isOpen()) {
asc.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))).get();
}
}
} catch (Exception e) {
e.printStackTrace();
}
byteBuffer.clear();
result.read(byteBuffer, null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
// 从该Channel中读取数据失败,将该Channel删除
CHANNEL_LIST.remove(result);
}
}
);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
客户端代码
/**
* aio客户端
*
* @author LionLi
*/
public class AioClient {
private final ExecutorService executorService;
// 连接通道
private AsynchronousSocketChannel socketChannel;
private AsynchronousChannelGroup channelGroup;
private final String username;
public AioClient(String ip, int port, String username) {
this.username = username;
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
executorService = Executors.newCachedThreadPool();
try {
channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
socketChannel = AsynchronousSocketChannel.open(channelGroup);
// 连接到远程服务器
socketChannel.connect(new InetSocketAddress(ip, port)).get();
send("上线了");
byteBuffer.clear();
socketChannel.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
byteBuffer.flip();
// 将buff中的内容转换成字符串
String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
// 显示从服务器端读取的数据
System.out.println(content);
byteBuffer.clear();
socketChannel.read(byteBuffer, null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public void send(String msg) {
msg = username + ":" + msg;
try {
socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8))).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.print("请输入用户名: ");
Scanner scanner = new Scanner(System.in);
String username = scanner.nextLine();
AioClient client = new AioClient("localhost", 8088, username);
while (true) {
// 发送数据到服务端
String msg = scanner.nextLine();
if (msg.equals("exit")) {
client.send("下线了");
client.channelGroup.shutdown();
client.executorService.shutdown();
break;
} else {
client.send(msg);
}
}
}
}
测试
- 场景如下:
- 启动服务端
- 顺序启动客户端 用户ABC 并输入用户名
- 分别打招呼
倒序退出客户端 用户ABC
项目已上传到gitee
地址: netty-demo
如果帮到您了,请帮忙点个star
还没有评论,来说两句吧...