从零学Netty(三)AIO通信(多人聊天室)

心已赠人 2022-12-13 04:56 462阅读 0赞

AIO概念

异步非阻塞IO,全称 Asynchronous I/O。JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,也就是我们要介绍的AIO。

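NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供两种方式获取操作结果:一是通过 java.util.concurrent.Future 对象获取异步操作的结果;二是在发起异步操作时传入 CompletionHandler 接口的实现类,作为操作完成后的回调(下面的示例两种方式都会用到)。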

它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型

原理图

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70

图片来源:https://mp.weixin.qq.com/s/CRd3-vRD7xwoexqv7xyHRw

实例demo

服务端代码

  1. /**
  2. * aio服务端
  3. *
  4. * @author LionLi
  5. */
  6. public class AioServer {
  7. public void startListen(int port) throws Exception {
  8. ExecutorService executorService = Executors.newCachedThreadPool();
  9. AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
  10. AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
  11. serverSocketChannel.bind(new InetSocketAddress(port));
  12. // 接收并处理客户端请求
  13. serverSocketChannel.accept(null, new ServerHandler(serverSocketChannel));
  14. }
  15. public static void main(String[] args) throws Exception {
  16. AioServer aioServer = new AioServer();
  17. aioServer.startListen(8088);
  18. System.out.println("服务端正在监听.........");
  19. // 阻塞代码
  20. System.in.read();
  21. }
  22. }

服务端消息处理器代码

  /**
   * Accept-completion handler of the AIO chat server.
   *
   * <p>Each accepted client is added to a shared channel list and gets its own read loop; every
   * chunk of bytes read from one client is forwarded unchanged to all connected clients
   * (including the sender).
   *
   * @author LionLi
   */
  public class ServerHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {

      /** Size, in bytes, of each connection's receive buffer. */
      private static final int BUFFER_SIZE = 1024;

      /**
       * Connected client channels. Handlers are registered from this class for every client and
       * may complete on different threads, so the list is added to, removed from and iterated
       * concurrently; a copy-on-write list keeps the broadcast iteration safe.
       */
      private static final List<AsynchronousSocketChannel> CHANNEL_LIST =
              new java.util.concurrent.CopyOnWriteArrayList<>();

      /**
       * Serializes broadcasts. An asynchronous socket channel allows only one outstanding write,
       * so two read handlers must not write to the same client at the same time.
       */
      private static final Object BROADCAST_LOCK = new Object();

      private final AsynchronousServerSocketChannel serverSocketChannel;

      /**
       * @param serverSocketChannel the listening channel on which the next accept is re-armed
       */
      public ServerHandler(AsynchronousServerSocketChannel serverSocketChannel) {
          this.serverSocketChannel = serverSocketChannel;
      }

      /**
       * Invoked when a client connection has been accepted: registers the client, re-arms the
       * accept for the next client and starts the client's read loop.
       *
       * @param result     the newly accepted client channel
       * @param attachment unused
       */
      @Override
      public void completed(AsynchronousSocketChannel result, Object attachment) {
          CHANNEL_LIST.add(result);
          // An accept is one-shot; register again so further clients can connect.
          serverSocketChannel.accept(null, this);

          ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
          result.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
              @Override
              public void completed(Integer bytesRead, Object readAttachment) {
                  if (bytesRead < 0) {
                      // End of stream: the peer closed the connection. Re-arming the read here
                      // would complete with -1 again immediately and spin forever.
                      disconnect(result);
                      return;
                  }
                  byteBuffer.flip();
                  byte[] payload = new byte[byteBuffer.remaining()];
                  byteBuffer.get(payload);
                  byteBuffer.clear();

                  // Decode only for the log line; the raw bytes are what gets forwarded, so a
                  // multi-byte character split across two reads is not corrupted in transit.
                  System.out.println("转发消息: " + new String(payload, StandardCharsets.UTF_8));
                  broadcast(payload);

                  if (result.isOpen()) {
                      result.read(byteBuffer, null, this);
                  }
              }

              @Override
              public void failed(Throwable exc, Object readAttachment) {
                  // Reading from this client failed: drop it and release the socket.
                  disconnect(result);
              }
          });
      }

      /**
       * Invoked when accepting a connection fails.
       *
       * @param exc        the cause of the failure
       * @param attachment unused
       */
      @Override
      public void failed(Throwable exc, Object attachment) {
          exc.printStackTrace();
      }

      /**
       * Sends {@code payload} to every connected client. A failure on one client only
       * disconnects that client; the remaining clients still receive the message.
       */
      private static void broadcast(byte[] payload) {
          synchronized (BROADCAST_LOCK) {
              for (AsynchronousSocketChannel client : CHANNEL_LIST) {
                  if (!client.isOpen()) {
                      CHANNEL_LIST.remove(client);
                      continue;
                  }
                  try {
                      ByteBuffer out = ByteBuffer.wrap(payload);
                      // A single write may transfer only part of the buffer.
                      while (out.hasRemaining()) {
                          client.write(out).get();
                      }
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      return;
                  } catch (Exception e) {
                      e.printStackTrace();
                      disconnect(client);
                  }
              }
          }
      }

      /** Removes {@code channel} from the client list and closes it (best effort). */
      private static void disconnect(AsynchronousSocketChannel channel) {
          CHANNEL_LIST.remove(channel);
          try {
              channel.close();
          } catch (java.io.IOException ignored) {
              // Nothing useful can be done if closing an already-broken connection fails.
          }
      }
  }

客户端代码

  /**
   * AIO chat client.
   *
   * <p>Connects to the chat server, announces the user, prints everything the server sends and
   * forwards each console line to the server until the user types {@code exit}.
   *
   * <p>Incoming bytes are decoded chunk by chunk, so a multi-byte character that happens to be
   * split across two reads is shown as replacement characters.
   *
   * @author LionLi
   */
  public class AioClient {

      /** Size, in bytes, of the receive buffer. */
      private static final int BUFFER_SIZE = 1024;

      private final ExecutorService executorService;
      // Connection to the server; stays null if opening the channel failed.
      private AsynchronousSocketChannel socketChannel;
      // Channel group owning the socket; stays null if it could not be created.
      private AsynchronousChannelGroup channelGroup;
      private final String username;

      /**
       * Connects to the server, sends the "online" announcement and starts the receive loop.
       *
       * <p>Failures are reported on standard error and leave the client closed; no exception is
       * thrown to the caller.
       *
       * @param ip       server host name or address
       * @param port     server TCP port
       * @param username name prefixed to every outgoing message
       */
      public AioClient(String ip, int port, String username) {
          this.username = username;
          executorService = Executors.newCachedThreadPool();
          try {
              channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
              socketChannel = AsynchronousSocketChannel.open(channelGroup);
              // Connect to the remote server and wait for the connection to be established.
              socketChannel.connect(new InetSocketAddress(ip, port)).get();
              send("上线了");
              startReading();
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              e.printStackTrace();
              close();
          } catch (Exception e) {
              e.printStackTrace();
              close();
          }
      }

      /** Registers the self-re-arming read handler that prints whatever the server sends. */
      private void startReading() {
          ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
          socketChannel.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
              @Override
              public void completed(Integer result, Object attachment) {
                  if (result < 0) {
                      // End of stream: the server closed the connection. Reading again would
                      // complete with -1 immediately and spin forever.
                      System.err.println("与服务器的连接已断开");
                      close();
                      return;
                  }
                  byteBuffer.flip();
                  // Decode the received bytes and show them to the user.
                  String content = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                  System.out.println(content);
                  byteBuffer.clear();
                  socketChannel.read(byteBuffer, null, this);
              }

              @Override
              public void failed(Throwable exc, Object attachment) {
                  // A failure after close() is just the pending read being cancelled.
                  if (socketChannel.isOpen()) {
                      exc.printStackTrace();
                      close();
                  }
              }
          });
      }

      /** Returns whether the channel to the server exists and has not been closed. */
      private boolean connectionOpen() {
          return socketChannel != null && socketChannel.isOpen();
      }

      /**
       * Sends {@code msg}, prefixed with the user name, to the server and waits until it has been
       * written completely. Does nothing but report an error when the client is not connected.
       *
       * @param msg message text without the user-name prefix
       */
      public void send(String msg) {
          if (!connectionOpen()) {
              System.err.println("未连接到服务器, 消息未发送");
              return;
          }
          msg = username + ":" + msg;
          ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
          try {
              // A single write may transfer only part of the buffer.
              while (out.hasRemaining()) {
                  socketChannel.write(out).get();
              }
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              e.printStackTrace();
          } catch (ExecutionException e) {
              e.printStackTrace();
          }
      }

      /**
       * Closes the connection and releases the client's threads. Safe to call more than once.
       *
       * <p>The executor is handed to the channel group on creation, so it is shut down through
       * the group; it is shut down directly only when no group was ever created.
       */
      public void close() {
          if (channelGroup == null) {
              executorService.shutdown();
              return;
          }
          try {
              // Closes every channel in the group (including the pending read) and terminates it.
              channelGroup.shutdownNow();
          } catch (java.io.IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Console front end: asks for a user name, connects to {@code localhost:8088} and sends
       * every entered line until {@code exit} is typed or standard input ends.
       *
       * @param args ignored
       */
      public static void main(String[] args) {
          System.out.print("请输入用户名: ");
          Scanner scanner = new Scanner(System.in);
          if (!scanner.hasNextLine()) {
              return;
          }
          String username = scanner.nextLine();
          AioClient client = new AioClient("localhost", 8088, username);
          try {
              while (client.connectionOpen() && scanner.hasNextLine()) {
                  // Forward the next console line to the server.
                  String msg = scanner.nextLine();
                  if ("exit".equals(msg)) {
                      client.send("下线了");
                      break;
                  }
                  client.send(msg);
              }
          } finally {
              // Always close the socket so the group can terminate and the JVM can exit.
              client.close();
          }
      }
  }

测试

  • 场景如下:
  • 启动服务端
  • 顺序启动客户端 用户ABC 并输入用户名
  • 分别打招呼
  • 倒序退出客户端 用户ABC

    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 1

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 2

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 3

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 4

项目已上传到gitee

地址: netty-demo

如果帮到您了,请帮忙点个star

发表评论

表情:
评论列表 (有 0 条评论,462人围观)

还没有评论,来说两句吧...

相关阅读