java网络编程——NIO架构

以你之姓@ 2024-03-22 09:30 149阅读 0赞

目录

1.什么是NIO

2.NIO结构

3.基于NIO下的聊天系统实现

4.Netty


1.什么是NIO

NIO:java non-blocking IO,同步非阻塞IO。

BIO是阻塞IO,即每一个事件都需要分配一个进程给他,如果客户端没有连接上,则一直阻塞等待。

而NIO,异步 I/O 是一种没有阻塞地读写数据的方法:该架构下我们可以注册对特定 I/O 事件诸如数据可读、新连接到来等等,而在发生这样感兴趣的事件时,系统将会告诉您,而不用一直等待。

打个比方:

五个人(请求)写作业,BIO架构下是五个老师(五个进程)看着写,学生直接在书上写(内存读写)。
NIO架构下则是一个老师(进程)看着写,要求学生先写在本子上(buffer,缓冲区),并且一直问助教(selector),写好的(事件就绪)交上来!

2.NIO结构

NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。

Channel**:**和IO中的Stream(流)类似。只不过Stream是单向的,channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

Buffer:NIO中的缓冲区,本质上是一个可读取的内存块。当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()方法将Buffer从写模式切换到读模式,读取之前写入到buffer的所有数据。

NIO和java中普通IO最大的区别是数据打包和传输方式。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

cb9ee6bb42694cdbbdb03952ad9a284b.jpeg

如上图所示:客户端/服务端将需要传输的数据存入到buffer缓冲区中,再通过java流获取对应的channel,然后使用channel写入/读取缓冲区中的数据,输出到对应的目标(文件/字节流)中。

selector:要使用Selector, 得向Selector注册Channel,然后调用它的select()方法进行监听:这个方法会一直阻塞到至少一个注册的通道有事件就绪;当事件就绪,会把对应的selectionKey(用于关联channel,每一个类型的事件对应一个selectionKey,可以帮助获得对应的channel进行后续操作)加入到内部集合,并返回;一旦这个方法返回,线程就可以处理这些事件。

注:select()方法是阻塞的,select(1000)方法阻塞1000毫秒;selectNow()方法是非阻塞的

当客户端发起一次请求事件过后,NIO架构的响应过程如下图所示:

9025978d58b940d4bd07c017e83e66e8.jpeg

首先,服务端会获得一个ServerSocketChannel并向selector注册,即可等待客户端连接。

客户端向服务端发起请求,selector通过select()方法监听到事件发生(如客户端读事件),分配线程处理该事件;服务端将事件类型对应的selectionKey返回,然后通过key获得处理的channel,通过channel.read()方法将数据写入buffer,并返回至客户端;客户端从buffer中读到需要的数据,请求结束。

Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达),只有当 通道 真正有读写事件发生时,才会进行读写,大大减少系统开销,避免了多线程之间的上下文切换。这使得一个I/O线程可以并发处理多个客户端连接和读写操作,极大提升了性能。

3.基于NIO下的聊天系统实现

基于以上架构,我们可以根据NIO结构,编写一个聊天系统:

(1)服务端:

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.*;
  5. import java.nio.charset.StandardCharsets;
  6. import java.util.Iterator;
  7. public class ChatServer {
  8. private ServerSocketChannel serverSocketChannel;
  9. private Selector selector;
  10. public static final int port = 8080;
  11. public ChatServer(){
  12. try {
  13. //获取ServerSocketChannel供客户端连接,并开放端口
  14. serverSocketChannel = ServerSocketChannel.open();
  15. serverSocketChannel.socket().bind(new InetSocketAddress(port));
  16. serverSocketChannel.configureBlocking(false);
  17. //得到Selector,并注册channel
  18. selector = Selector.open();
  19. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  20. System.out.println("服务器就绪");
  21. }catch (IOException e){
  22. e.printStackTrace();
  23. }
  24. }
  25. public void listen(){
  26. try {
  27. while (true){
  28. //阻塞2秒
  29. int count = selector.select(2000);
  30. if (count>0){
  31. //有事件需要处理
  32. Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
  33. while (keyIterator.hasNext()){
  34. SelectionKey key = keyIterator.next();
  35. //客户端连接事件,为客户端生成SocketChannel
  36. if (key.isAcceptable()){
  37. SocketChannel socketChannel = serverSocketChannel.accept();
  38. //非阻塞channel才能进行注册
  39. socketChannel.configureBlocking(false);
  40. //注册,绑定事件读,并为该channel关联buffer
  41. socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
  42. System.out.println("客户端:"+socketChannel.getRemoteAddress()+" 成功连接");
  43. }
  44. //读取事件
  45. if (key.isReadable()){
  46. readMessage(key);
  47. }
  48. //删除已处理key,避免重复操作
  49. keyIterator.remove();
  50. }
  51. }else {
  52. continue;
  53. }
  54. }
  55. }catch (IOException e){
  56. e.printStackTrace();
  57. }
  58. }
  59. //从客户端读取消息
  60. private void readMessage(SelectionKey key){
  61. SocketChannel channel = null;
  62. try {
  63. //通过key反向获取channel
  64. channel = (SocketChannel) key.channel();
  65. //获取该channel关联buffer
  66. ByteBuffer buffer = ByteBuffer.allocate(1024);
  67. //从buffer中读取数据
  68. int read = channel.read(buffer);
  69. if (read>0){
  70. String msg = new String(buffer.array()).trim();
  71. //转发消息(排除自己)
  72. transformMessage(msg,channel);
  73. }
  74. }catch (IOException e){
  75. try {
  76. System.out.println(channel.getRemoteAddress()+"下线");
  77. //取消注册
  78. key.cancel();
  79. //关闭通道
  80. channel.close();
  81. }catch (Exception exception){
  82. exception.printStackTrace();
  83. }
  84. }
  85. }
  86. //转发消息给其他客户端
  87. public void transformMessage(String msg, SocketChannel self) throws IOException {
  88. System.out.println("服务器转发消息");
  89. //遍历所有注册到selector的channel,进行转发
  90. for (SelectionKey key:selector.keys()){
  91. Channel targetChannel = key.channel();
  92. //排除自己
  93. if (targetChannel instanceof SocketChannel && targetChannel!=self){
  94. //转发消息
  95. SocketChannel dest = (SocketChannel) targetChannel;
  96. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
  97. dest.write(buffer);
  98. }
  99. }
  100. }
  101. public static void main(String[] args) {
  102. ChatServer chatServer = new ChatServer();
  103. chatServer.listen();
  104. }
  105. }
  1. 建立ServerSocketChannel,开放端口,并设置为非阻塞。
  2. 获取一个selector,并将channel注册进去,绑定触发事件及对应的SelectionKey
  3. 等待客户端连接。
  4. 当select()方法监听到事件,获取事件的selectionKey集合;遍历集合,处理事件。(连接事件则创建socketChannel连接;读事件则读取buffer中消息,并转发至其他客户端)

注:只有阻塞模式下,channel才可以向selector注册serverSocketChannel.accept()**获取的SocketChannel也需要设置;server端先生成一个ServerSocketChannel并注册后,客户端才可以用SocketChannel对其进行连接;

(2)客户端:

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.SelectionKey;
  5. import java.nio.channels.Selector;
  6. import java.nio.channels.SocketChannel;
  7. import java.nio.charset.StandardCharsets;
  8. import java.util.Iterator;
  9. import java.util.Scanner;
  10. public class ChatClient implements Runnable{
  11. private SocketChannel socketChannel;
  12. private final String host = "127.0.0.1";
  13. private final int port = 8080;
  14. private Selector selector;
  15. private String username;
  16. public ChatClient(){
  17. try {
  18. socketChannel=SocketChannel.open(new InetSocketAddress(host, port));
  19. socketChannel.configureBlocking(false);
  20. selector=Selector.open();
  21. socketChannel.register(selector, SelectionKey.OP_READ);
  22. username = socketChannel.getLocalAddress().toString().substring(1);
  23. System.out.println("客户端就绪");
  24. }catch (IOException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. //发送消息
  29. public void sendMsg(String msg){
  30. msg = username + ":" + msg;
  31. try {
  32. socketChannel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
  33. } catch (IOException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. //读取回复消息
  38. public void readMsg(){
  39. try {
  40. int select = selector.select();
  41. Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
  42. while (iterator.hasNext()){
  43. SelectionKey key = iterator.next();
  44. if (key.isReadable()){
  45. SocketChannel channel = (SocketChannel) key.channel();
  46. ByteBuffer buffer = ByteBuffer.allocate(1024);
  47. channel.read(buffer);
  48. String str = new String(buffer.array());
  49. System.out.println(str.trim());
  50. }
  51. iterator.remove();
  52. }
  53. } catch (IOException e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. @Override
  58. public void run() {
  59. while (true){
  60. this.readMsg();
  61. //每隔1秒监听一次
  62. try {
  63. Thread.sleep(1000);
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }
  69. public static void main(String[] args) {
  70. ChatClient chatClient = new ChatClient();
  71. //启动线程读取
  72. new Thread(chatClient).start();
  73. Scanner scanner = new Scanner(System.in);
  74. while (scanner.hasNextLine()){
  75. String str = scanner.nextLine();
  76. chatClient.sendMsg(str);
  77. }
  78. }
  79. }
  1. 获取scocketServer连接服务端,设置非阻塞模式。
  2. 连接服务端。
  3. 发起事件:发送消息,将消息写入channel中。
  4. 读取服务器转发的其他客户单消息。

(3)运行效果:

服务端:

d1d475444b604a2e87109198972db03e.png

客户端1:

d41196629359452d9379929fffadeb80.png

客户端2:

abe77ba7af1442a485fc036113cccf2a.png

客户端3:

69bd3c4ff04f439d8174f16ba8bc3d6c.png

4.Netty

Netty是 一个异步事件驱动的网络应用程序框架(即基于NIO架构),用于快速开发可维护的高性能协议服务器和客户端。

为什么会有Netty?

  • 1)NIO的类库和API繁杂,使用起来比较麻烦;
  • 2)开发工作量和难度大,面临例如断连重连、半包读写、失败缓存等问题;
  • 3)使用原生NIO编程需要掌握Reactor模型、多线程编程等额外技能,要求较高;
  • 4)java原生NIO存在一定的bug,可能会影响NIO编程。

而Netty就是基于Reactor多线程模型 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了上述问题。且Netty拥有高性能、 吞吐量更高,延迟更低,减少资源消耗,最小化不必要的内存复制等优点。

66321ef25c304d748d59132dbf1e579a.png

其核心在于 可拓展事件驱动模型、全局交互API、零拷贝。

Netty快速使用,其使用形式类似于NIO架构,也是分为服务端、客户端:

(1)服务端:

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. public class NettyServer {
  11. public static void main(String[] args) {
  12. //定义线程组:BossEventLoop(负责连接) , WorkerEventLoop(负责业务读写)
  13. EventLoopGroup bossEventLoop = new NioEventLoopGroup(1);
  14. EventLoopGroup workerEventLoop = new NioEventLoopGroup();
  15. try {
  16. // 1. ServerBootstrap:启动器,负责组装netty组件,启动服务器
  17. new ServerBootstrap()
  18. // 2.存入线程组
  19. .group(bossEventLoop,workerEventLoop)
  20. // 3.选择服务器的ServerSocketChannel实现
  21. .channel(NioServerSocketChannel.class)
  22. // 4. worker(child) , 决定了worker能执行什么操作(handler)
  23. .childHandler(
  24. // 5. channel 代表和客户端进行读写的通道 Initializer:初始化器,负责添加别的handler
  25. new ChannelInitializer<NioSocketChannel>() {
  26. @Override
  27. protected void initChannel(NioSocketChannel ch) throws Exception {
  28. // 6. 添加具体handler
  29. ch.pipeline().addLast(new StringDecoder()); //将 ByteBuf 转为字符串
  30. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //自定义handler
  31. @Override //读事件
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  33. System.out.println(msg);
  34. }
  35. });
  36. }
  37. })
  38. .bind(8080);
  39. }catch (Exception e){
  40. System.out.println("客户端断开连接");
  41. }
  42. }
  43. }

(2)客户端:

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.ChannelInitializer;
  3. import io.netty.channel.nio.NioEventLoopGroup;
  4. import io.netty.channel.socket.nio.NioSocketChannel;
  5. import io.netty.handler.codec.string.StringEncoder;
  6. import java.net.InetSocketAddress;
  7. public class NettyClient {
  8. public static void main(String[] args) throws InterruptedException {
  9. try {
  10. // 1.启动类
  11. new Bootstrap()
  12. // 2.添加EventLoop
  13. .group(new NioEventLoopGroup())
  14. .channel(NioSocketChannel.class)
  15. .handler(new ChannelInitializer<NioSocketChannel>() {
  16. @Override
  17. protected void initChannel(NioSocketChannel ch) throws Exception {
  18. ch.pipeline().addLast(new StringEncoder());
  19. }
  20. })
  21. .connect(new InetSocketAddress("localhost",8080))
  22. .sync()
  23. .channel()
  24. //发送数据
  25. .writeAndFlush("hello world");
  26. }catch (Exception e){
  27. System.out.println("服务器异常");
  28. }
  29. }
  30. }

上述代码实现了服务端创建ServerSocketChannel,客户端连接服务端并写入消息,服务端成功读取的功能。

发表评论

表情:
评论列表 (有 0 条评论,149人围观)

还没有评论,来说两句吧...

相关阅读