从零学Netty(二)NIO通信(多人聊天室)

港控/mmm° 2022-12-13 04:44 274阅读 0赞

NIO概念

非阻塞I/O,全称Non-BlockingIO,是在Jdk1.4之后增加的一套新的操作I/O工具包

NIO是为提供I/O吞吐量而专门设计,其卓越的性能甚至可以与C媲美。

NIO是通过Reactor模式的事件驱动机制来达到Non blocking的,就是我们将事件注册到Reactor中,当有相应的事件发生时,Reactor便会告知我们有哪些事件发生了,我们再根据具体的事件去做相应的处理。

NIO三大核心

NIO有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer

通道(Channel):NIO的通道类似于流区别如下:

  1. 通道可以同时进行读写,而流只能读或者只能写
  2. 通道可以实现异步读写数据
  3. 通道可以从缓冲读数据,也可以写数据到缓冲

选择器(Selector):

  1. Selector 能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
  2. 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
  3. 避免了多线程之间的上下文切换导致的开销

原理图

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 1

图片来源:https://mp.weixin.qq.com/s/CRd3-vRD7xwoexqv7xyHRw

实例demo

服务端代码

  1. /**
  2. * nio服务端
  3. *
  4. * @author LionLi
  5. */
  6. public class NioServer {
  7. // 选择器
  8. private Selector selector;
  9. // 连接通道
  10. private ServerSocketChannel listenChannel;
  11. public NioServer(int port) {
  12. try {
  13. selector = Selector.open();
  14. listenChannel = ServerSocketChannel.open();
  15. // 绑定端口
  16. listenChannel.socket().bind(new InetSocketAddress(port));
  17. // 设置非阻塞模式
  18. listenChannel.configureBlocking(false);
  19. // 连接注册到selector
  20. listenChannel.register(selector, SelectionKey.OP_ACCEPT);
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void listen() {
  26. System.out.println("服务端正在监听.........");
  27. try {
  28. while (true) {
  29. // 获取事件
  30. if (selector.select() == 0) {
  31. continue;
  32. }
  33. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  34. while (iterator.hasNext()) {
  35. SelectionKey key = iterator.next();
  36. // 监听到不同的事件进行不同的处理
  37. listenHandler(key);
  38. // 删除当前key,防止重复处理
  39. iterator.remove();
  40. }
  41. }
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. /**
  47. * 针对监听到的不同事件做不同的处理
  48. */
  49. public void listenHandler(SelectionKey key) {
  50. SocketChannel sc = null;
  51. try {
  52. // 连接事件处理
  53. if (key.isAcceptable()) {
  54. // 连接注册到selector
  55. sc = listenChannel.accept();
  56. sc.configureBlocking(false);
  57. sc.register(selector, SelectionKey.OP_READ);
  58. }
  59. // 读取事件处理
  60. if (key.isReadable()) {
  61. // 拿到socketChannel
  62. sc = (SocketChannel) key.channel();
  63. sc.configureBlocking(false);
  64. ByteBuffer buffer = ByteBuffer.allocate(1024);
  65. // 读取消息数据
  66. if (sc.read(buffer) > 0) {
  67. String msg = new String(buffer.array());
  68. System.out.println("转发消息: " + msg);
  69. // 将消息转发
  70. sendMsgToClient(msg);
  71. }
  72. }
  73. } catch (IOException e) {
  74. try {
  75. // 取消注册
  76. key.cancel();
  77. // 关闭通道
  78. sc.close();
  79. } catch (IOException e1) {
  80. e1.printStackTrace();
  81. }
  82. }
  83. }
  84. /**
  85. * 发送消息到客户端
  86. */
  87. public void sendMsgToClient(String msg) throws IOException {
  88. for (SelectionKey key : selector.keys()) {
  89. Channel channel = key.channel();
  90. if (channel instanceof SocketChannel) {
  91. SocketChannel targetChannel = (SocketChannel) channel;
  92. // 将msg写到buffer中
  93. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
  94. // 将buffer数据写入通道
  95. targetChannel.write(buffer);
  96. }
  97. }
  98. }
  99. public static void main(String[] args) {
  100. NioServer server = new NioServer(8088);
  101. server.listen();
  102. }
  103. }

客户端代码

  1. /**
  2. * nio客户端
  3. *
  4. * @author LionLi
  5. */
  6. public class NioClient {
  7. // 选择器
  8. private Selector selector;
  9. // 连接通道
  10. private SocketChannel socketChannel;
  11. // 用户名
  12. private String username;
  13. // 启动标志位
  14. private boolean flag;
  15. public NioClient(String ip, int port,String username) {
  16. try {
  17. this.username = username;
  18. flag = true;
  19. selector = Selector.open();
  20. // 连接服务器
  21. socketChannel = SocketChannel.open(new InetSocketAddress(ip, port));
  22. // 设置非阻塞
  23. socketChannel.configureBlocking(false);
  24. // 连接注册到selector
  25. socketChannel.register(selector, SelectionKey.OP_READ);
  26. sendMsgToServer("上线了");
  27. // 监听线程
  28. ExecutorService executor = Executors.newSingleThreadExecutor();
  29. // 循环读取服务端的消息
  30. executor.execute(() -> {
  31. while (flag){
  32. acceptMsgFromServer();
  33. }
  34. });
  35. executor.shutdown();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. /**
  41. * 向服务端发送消息
  42. */
  43. public void sendMsgToServer(String msg) {
  44. msg = username + ":" + msg;
  45. try {
  46. // 发送消息
  47. socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. /**
  53. * 接收服务端发来的消息
  54. */
  55. public void acceptMsgFromServer() {
  56. try {
  57. if (selector.select() > 0) {
  58. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  59. while (iterator.hasNext() && flag) {
  60. SelectionKey key = iterator.next();
  61. if (key.isReadable()) {
  62. SocketChannel sc = (SocketChannel) key.channel();
  63. ByteBuffer buffer = ByteBuffer.allocate(1024);
  64. sc.read(buffer);
  65. String msg = new String(buffer.array());
  66. System.out.println(msg.trim());
  67. // 移除当前的key,防止重复操作
  68. iterator.remove();
  69. }
  70. }
  71. }
  72. } catch (Exception e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. public static void main(String[] args) {
  77. System.out.print("请输入用户名: ");
  78. Scanner scanner = new Scanner(System.in);
  79. String username = scanner.nextLine();
  80. NioClient client = new NioClient("localhost", 8088,username);
  81. while (true) {
  82. // 发送数据到服务端
  83. String msg = scanner.nextLine();
  84. if (msg.equals("exit")) {
  85. try {
  86. client.flag = false;
  87. client.sendMsgToServer("下线了");
  88. client.socketChannel.finishConnect();
  89. client.selector.close();
  90. break;
  91. } catch (IOException e) {
  92. e.printStackTrace();
  93. }
  94. } else {
  95. client.sendMsgToServer(msg);
  96. }
  97. }
  98. }
  99. }

测试

  • 场景如下:
  • 启动服务端
  • 顺序启动客户端ABC 并输入用户名
  • 分别打招呼
  • 倒序退出客户端ABC

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 2

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 3

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 4

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ2MTI4MQ_size_16_color_FFFFFF_t_70 5

项目已上传到gitee

地址: netty-demo

如果帮到您了,请帮忙点个star

发表评论

表情:
评论列表 (有 0 条评论,274人围观)

还没有评论,来说两句吧...

相关阅读