Netty(一)_网络IO、BIO、NIO

青旅半醒 2022-12-17 14:53 270阅读 0赞

Netty(一)_网络IO、BIO、NIO

“本篇是Netty的前置篇。”

文章目录

  • Netty(一)_网络IO、BIO、NIO
    • Socket
    • _BIO
    • _NIO
      • 缓冲区Buffer
      • 通道Channel
      • Selector(选择器)
    • NIO_TCP
    • 零拷贝

Socket

在接触网络IO前,我们必须先说说Socket这个东西。

在网络通信中,数据从一台设备发出到另一台设备接收,需要经过OSI七层模型。在TCP/IP模型将之抽象成了五层:

img

我的计网专栏

https://blog.csdn.net/junsirhl/category_9484744.html

在说Socket的时候,注意我们说的是Linux/Unix的Socket

在 UNIX/Linux 系统中,为了统一对各种硬件的操作,简化接口,不同的硬件设备也都被看成一个文件。对这些文件的操作,等同于对磁盘上普通文件的操作。

UNIX/Linux 中的一切都是文件!

为了表示和区分已经打开的文件,UNIX/Linux 会给每个文件分配一个 ID,这个 ID 就是一个整数,被称为文件描述符(File Descriptor)。例如:

  • 通常用 0 来表示标准输入文件(stdin),它对应的硬件设备就是键盘;
  • 通常用 1 来表示标准输出文件(stdout),它对应的硬件设备就是显示器。

UNIX/Linux 程序在执行任何形式的 I/O 操作时,都是在读取或者写入一个文件描述符。

一个文件描述符只是一个和打开的文件相关联的整数,它的背后可能是一个硬盘上的普通文件、FIFO、管道、终端、键盘、显示器,甚至是一个网络连接。

网络连接也是一个文件,它也有文件描述符!

你必须理解这句话。

我们可以通过 socket() 函数来创建一个网络连接,或者说打开一个网络文件,socket() 的返回值就是文件描述符。有了文件描述符,我们就可以使用普通的文件操作函数来传输数据了,例如:

  • 用 read() 读取从远程计算机传来的数据;
  • 用 write() 向远程计算机写入数据。

对这个网络连接进行文件的读写这其实就是网络IO了。也就是说,只要用 socket() 创建了连接,剩下的就是文件操作了。

所以,我们平常说的Socket,其实就是操作系统为我们创建的一个代表网络连接的文件,我们要进行网络IO,也即对这个文件进行IO。

在进行网络通信的时候,以BS通信为例,不管服务端还是客户端,双方都需要持有Socket,这样才能保证数据的双向传输。

在TCP连接中,双方经过三次握手建立连接,通过四次挥手释放连接。这就不多说了。

_BIO

学习BIO之前,我们需要知道什么是IO模型

IO模型

I/O 模型简单的理解:基本BS结构是Brower端发送请求,服务端接收并处理请求。

IO模型在其中的含义,就是用什么样的通道机制进行数据的发送和接收,IO模型的好坏很大程度上决定了程序通信的性能

在Java中, 共支持 3 种网络编程模型/IO 模式:BIO、NIO、AIO

Blocking IO - 同步并阻塞的IO模型

服务器实现模式为一个连接开启一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,该模型的缺点是如果这个连接不做任何事情会造成不必要的线程开销,并且当客户端的请求得不到响应时会一直阻塞,造成性能影响。

BIO会为每个连接进行系统调用,询问操作系统该连接是否有读写事件。系统调用需要进行上下文切换,即BIO不光线程数多,线程还要在操作系统内核与应用程序间频繁进行上下文切换

当然,BIO为线程数过多的问题提供了线程池方案,我们是可以使用线程池技术解决的,但是系统调用造成的上下文切换并没有解决。

BIO其基本IO模型的简易示意图如下:

65aecf7ab8fd48742dadcc82f8ca375b.png

在Java中,BIO相关的类和接口都在java.io下

BIO 方式适用于连接数目比较小且固定的架构,BIO方式编程也是JDK1.4 以前的唯一选择,程序简单易理解

下面通过一个一段小程序,说明Java中的BIO的如何使用的

Java BIO 编程流程

  1. 服务器端启动一个 ServerSocket,这个Socket主要是用来接收客户端的请求的。
  2. 客户端启动 Socket 对服务器进行通信,客户端与服务端通过这个Socket对服务端发送和接收消息。注意,这个Socket的意义在于建立连接后的BS-IO行为。
  3. 客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝 - 此步得不到响应则阻塞
  4. 如果有响应,客户端线程会等待请求结束后,在继续执行,即客户端线程得到响应前持续阻塞

Java BIO 编程案例

1.使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯。

2.要求使用线程池机制改善,可以连接多个客户端.

  1. 服务器端可以接收客户端发送的数据(telnet 方式:WIN+R -> CMD -> telnet ip 端口)
  1. /*
  2. * 服务端编程
  3. */
  4. //1. 创建一个线程池
  5. //2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
  6. ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
  7. //创建 ServerSocket 并绑定端口
  8. ServerSocket serverSocket = new ServerSocket(6666);
  9. System.out.println("服务器启动了");
  10. while (true) {
  11. //监听,等待客户端连接
  12. System.out.println("等待连接....");
  13. //accept()方法持续保持等待接收请求
  14. //返回与该请求对应的套接字socket,通过此Socket,进行与客户端的连接。
  15. //根据前面所学,这个Socket是由客户端创建的,服务端这里要接收到
  16. final Socket socket = serverSocket.accept();
  17. System.out.println("连接到一个客户端");
  18. //有了Socket,就可以进行互相通信了,我们把Socket扔进线程池。
  19. //注意,下面这段代码并不会立即执行run!
  20. newCachedThreadPool.execute(new Runnable() {
  21. public void run() {
  22. //我们重写
  23. //可以和客户端通讯
  24. handler(socket); }
  25. });
  26. }
  27. }
  28. //编写一个 handler 方法,和客户端通讯
  29. public static void handler(Socket socket) {
  30. try {
  31. //创建容量1024个字节的字节数组
  32. //一个英文字符占1个字节 一个汉字占3个字节 换行占2个字节(\n)
  33. byte[] bytes = new byte[1024];
  34. //通过 socket 获取输入流
  35. InputStream inputStream = socket.getInputStream();
  36. //循环的读取客户端发送的数据
  37. while (true) {
  38. System.out.println("read....");
  39. //将内容读入byte[]数组
  40. int read = inputStream.read(bytes);
  41. //返回值为读取的长度 返回值 = -1 代表全部内容读取完毕
  42. if(read != -1) {
  43. //输出客户端发送的数据
  44. System.out.println(new String(bytes, 0, read));
  45. } else {
  46. break;
  47. }
  48. }
  49. }catch (Exception e) {
  50. e.printStackTrace();
  51. }finally {
  52. //服务端读取完客户端发送的数据后,关闭连接操作
  53. System.out.println("关闭和 client 的连接");
  54. try {
  55. socket.close();
  56. }catch (Exception e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. }
  61. }

以上程序大致示意图

在这里插入图片描述

前面说过了BIO的致命缺陷,即服务器开启的线程过多所带来的问题。

这里强调下,Server对整个连接的IO读取都是同步的!即接收连接到网络IO都是一个线程又进行接收连接又进行处理的。但可惜的是,NIO并没有解决同步带来的问题,同步的问题后面再说,这里忍不住提一嘴…

下面我们看看NIO是如何进行解决的。

_NIO

Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的 输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的

NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写

NIO是面向缓冲区 ,或者面向编程的。而BIO则是面向流编程的,一个离散,一个分散。一个高效、一个低效。

数据读取到一个缓冲区,需要时可在缓冲区中前后移动读写数据,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。

注意这个缓冲区是存在于JVM中的。

Java NIO 的非阻塞模式体现在,一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。

非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。

NIO的最简易版组件关系示意图

d5c41c69ed2e110f690e876d659a26b4.png

注意不管中间新增了什么组件,最终都是一个线程对应n个客户端的连接

该图表示及相关细节:

  1. 每个 channel 都会对应一个 Buffer(一个连接频道一个缓冲区),channel即对Socket的封装,即对应BIO中的Sokcet,后面细说。
  2. 新增一个Selector(选择器) 给一个线程,选择器后面说。
  3. 一个线程(选择器)对应多个 channel
  4. 程序切换到哪个 channel 是由事件Event决定的 (NIO是基于事件驱动的
  5. Selector 会根据不同的事件,使线程在各个通道上切换 ,进行IO操作。这里就必须由某个Selector对应的线程进行死循环,一旦某个Channel有事件发生,进行处理
  6. Buffer 就是一个内存块 , 底层是一个数组,由JDK定义。
  7. 数据的读取写入都是通过 Buffer, 即Buffer被定义为全双工通信,在BIO 中,要么是操作输入流,要么操作输出流。
  8. 这种模型依赖于HTTP2.0的多路复用的技术,做到同一个连接并发处理多个请求

以下对这个模型进行详细阐述,并对各个组件详细介绍。

缓冲区Buffer

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),本质是一个数据结构,由JDK定义,该结构提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。

在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,类的层级关系图:

971475d84d34e400c89f9defabcfa0c5.png

各子类含义:

  • ByteBuffer:存储字节数据到缓冲区
  • …其他比对类名即可

Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:

  1. // Invariants: mark <= position <= limit <= capacity
  2. //标记用,不用关注
  3. private int mark = -1;
  4. //下一个要读写的元素的索引
  5. private int position = 0;
  6. //limit位置及之后位置的元素不可读写
  7. private int limit;
  8. //容量
  9. private int capacity;

Buffer类还定义了一系列针对Buffer容器的方法,由于用的少,这里就不赘述。

需要注意的是,ByteBuffer作为Buffer的子类使用率是最高的。因为大部分传输都是以字节为基本单位进行传输的

以intBuffer为例 - Buffer小实例

  1. //创建一个Buffer, 大小为 5, 即可以存放5个int
  2. IntBuffer intBuffer = IntBuffer.allocate(5);
  3. //向buffer 存放数据
  4. for(int i = 0; i < intBuffer.capacity(); i++) {
  5. intBuffer.put( i * 2);
  6. }
  7. //从buffer读取数据
  8. //读写切换
  9. intBuffer.flip();
  10. //自定义游标 设置从1位置开始读 也就是第二个位置
  11. intBuffer.position(1);
  12. System.out.println(intBuffer.get());
  13. //限制只能读到第三个字节之前
  14. intBuffer.limit(3);
  15. //把残留可操作元素全部拿出来
  16. while (intBuffer.hasRemaining()) {
  17. System.out.println(intBuffer.get());
  18. }
  19. //结果输出2,4

关于flip()

在进行读写的时候,都需要用到positon来指向操作元素,所以,读写切换时,必然需要使position归位

  1. //保证读写一致
  2. public final Buffer flip() {
  3. limit = position;
  4. position = 0; //游标归位
  5. mark = -1;
  6. return this;
  7. }

最常用的ByteBuffer小案例

  1. //创建一个Buffer
  2. ByteBuffer buffer = ByteBuffer.allocate(64);
  3. //类型化方式放入数据 ByteBuffer专有方法
  4. buffer.putInt(100);
  5. buffer.putLong(9);
  6. buffer.putChar('尚');
  7. buffer.putShort((short) 4);
  8. //取出
  9. buffer.flip();
  10. System.out.println();
  11. System.out.println(buffer.getInt());
  12. System.out.println(buffer.getLong());
  13. System.out.println(buffer.getChar());
  14. System.out.println(buffer.getShort());
  15. }
  16. }
  17. //结果:
  18. 100
  19. 9
  20. 4

通道Channel

NIO 的Channel你可以想成上面所说的Socket。其实也是JDK对Socket的封装:

常 用 的 Channel 类 有 : FileChannel 、 DatagramChannel 、 ServerSocketChannel 和 SocketChannel

ServerSocketChannel 对应 BIO中的ServerSocket , SocketChannel 对应 BIO中的Socket

FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写

对应的类继承结构如下:

2ea30241d9b18a19154aa400bdae87fb.png

关于FileChannel,主要用来对本地文件进行 IO 操作,常见的方法有

  1. //从通道读取数据并放到缓冲区中
  2. public int read(ByteBuffer dst)
  3. //把缓冲区的数据写到通道中
  4. public int write(ByteBuffer src)
  5. //从目标通道中复制数据到当前通道,count表示长度
  6. public long transferFrom(ReadableByteChannel src, long position, long count)
  7. //把数据从当前通道复制给目标通道
  8. public long transferTo(long position, long count, WritableByteChannel target)

下面通过四个实例演示如何使用FileChannel

实例1:使用前面的 ByteBuffer(缓冲) 和 FileChannel(通道), 将 “hello,world” 写入到 file01.txt 中

若文件不存在则会自动创建

  1. String str = "hello,world";
  2. //创建一个文件输出流,我们通过它得到FileChannel
  3. FileOutputStream fileOutputStream = new FileOutputStream("d:\\file01.txt");
  4. //通过 fileOutputStream 获取 对应的 FileChannel
  5. //这个 fileChannel 真实类型是 FileChannelImpl
  6. FileChannel fileChannel = fileOutputStream.getChannel();
  7. //创建一个缓冲区 ByteBuffer
  8. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  9. //将 str 放入 缓冲区byteBuffer
  10. byteBuffer.put(str.getBytes());
  11. //对byteBuffer 进行flip
  12. byteBuffer.flip();
  13. //将byteBuffer 数据写入到 fileChannel
  14. fileChannel.write(byteBuffer);
  15. fileOutputStream.close();

实例2:将 file01.txt 中的数据读入到程序,并显示在控制台屏幕
2) 假定文件已经存在

  1. //创建文件的输入流
  2. File file = new File("d:\\file01.txt");
  3. FileInputStream fileInputStream = new FileInputStream(file);
  4. //通过fileInputStream 获取对应的FileChannel -> 实际类型 FileChannelImpl
  5. FileChannel fileChannel = fileInputStream.getChannel();
  6. //创建缓冲区
  7. ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
  8. //将通道的数据读入到Buffer
  9. fileChannel.read(byteBuffer);
  10. //将byteBuffer的字节数据 转成String
  11. System.out.println(new String(byteBuffer.array()));
  12. fileInputStream.close();

实例3:使用一个Buffer完成上面两个功能,即把一个文件的内容复制到另一个文件。

  1. //文件输入流,从1.txt读取数据 一开始需要在项目根目录下创建1.txt文件并加入内容
  2. FileInputStream fileInputStream = new FileInputStream("1.txt");
  3. //获取输入通道
  4. FileChannel fileChannel01 = fileInputStream.getChannel();
  5. //文件输入流,把1.txt中的内容读取到2.txt
  6. FileOutputStream fileOutputStream = new FileOutputStream("2.txt");
  7. //获取输出通道
  8. FileChannel fileChannel02 = fileOutputStream.getChannel();
  9. //共享缓存区 512字节
  10. ByteBuffer byteBuffer = ByteBuffer.allocate(512);
  11. //如果文件字节长度大于Buffer容量,需要进行多次读取
  12. while (true) {
  13. //多次读取一定要清空buffer
  14. byteBuffer.clear();
  15. int read = fileChannel01.read(byteBuffer);
  16. if(read == -1) {
  17. //表示读完
  18. break;
  19. }
  20. //将buffer 中的数据写入到 fileChannel02 -- 2.txt
  21. byteBuffer.flip();
  22. fileChannel02.write(byteBuffer);
  23. }
  24. //关闭相关的流
  25. fileInputStream.close();
  26. fileOutputStream.close();

\[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jEXqV0Af-1604136623162)(C:\\Users\\20335\\AppData\\Roaming\\Typora\\typora-user-images\\image-20201031163843458.png)\]

实例4:实现文件拷贝,利用transferFrom()方法

  1. //创建相关流
  2. FileInputStream fileInputStream = new FileInputStream("d:\\a.jpg");
  3. FileOutputStream fileOutputStream = new FileOutputStream("d:\\a2.jpg");
  4. //获取各个流对应的filechannel
  5. FileChannel sourceCh = fileInputStream.getChannel();
  6. FileChannel destCh = fileOutputStream.getChannel();
  7. //使用transferForm完成拷贝
  8. destCh.transferFrom(sourceCh,0,sourceCh.size());
  9. //关闭相关通道和流
  10. sourceCh.close();
  11. destCh.close();
  12. fileInputStream.close();
  13. fileOutputStream.close();

Buffer与Channel的一些其他扩展功能

将buffer改成只读。

ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

MappedByteBuffer类支持让文件直接在堆外(jvm外)内存进行修改。

  1. //创建随机存取文件流 并设置该文件流为读写流
  2. RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
  3. //获取对应的通道
  4. FileChannel channel = randomAccessFile.getChannel();
  5. /**
  6. * 参数1: FileChannel.MapMode.READ_WRITE 使用的读写模式
  7. * 参数2: 0 : 可以直接修改的起始位置
  8. * 参数3: 5: 是映射到内存的大小(不是索引位置) ,即将 1.txt 的多少个字节映射到内存
  9. * 可以直接修改的范围就是 0-5
  10. * 实际类型 MappedByteBuffer的实际类型是DirectByteBuffer
  11. */
  12. MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
  13. mappedByteBuffer.put(0, (byte) 'H');
  14. mappedByteBuffer.put(3, (byte) '9');
  15. mappedByteBuffer.put(5, (byte) 'Y');//IndexOutOfBoundsException越界异常
  16. //关闭流
  17. randomAccessFile.close();
  18. System.out.println("修改成功~~");
  1. //文件通道支持以下类型
  2. //只读
  3. public static final MapMode READ_ONLY
  4. = new MapMode("READ_ONLY");
  5. //读写类型
  6. public static final MapMode READ_WRITE
  7. = new MapMode("READ_WRITE");
  8. //写时复制 private (copy-on-write) mapping
  9. public static final MapMode PRIVATE
  10. = new MapMode("PRIVATE");

写时复制:这是我们学习多线程的重要概念。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。

这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,减少系统开销。因此多个调用者只是读取操作时可以共享同一份资源。

ServerSocketChannel与SocketChannel

ServerSocketChannel 负责在服务器端监听新的客户端 Socket 连接

相关方法有:

  1. //得到一个ServerSocketChannel
  2. public static ServerSocketChannel open();
  3. //绑定端口号
  4. public final ServerSocketChannel bind(SocketAddress local);
  5. //设置阻塞和非阻塞,注意设置的系统中的Socket
  6. public final SelectableChannel configureBlocking(boolean block);
  7. //接收连接,返回连接后客户端的通道
  8. public final SocketChannel accept();
  9. //注册到并设置监听事件
  10. public final SelectionKey register(Selector sel, int ops);

SocketChannel,网络 IO 通道,具体负责进行读写操作。把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区

NIO 支持通过多个 Buffer (即 Buffer 数组) 完成读写操作,即 Scattering 和 Gathering

Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入

Gathering: 从buffer读取数据时,可以采用buffer数组,依次读取

实例:使用ServerSocketChannel和SocketChannel共享Buffer数组网络通信

  1. //使用 ServerSocketChannel 和 SocketChannel 网络
  2. //ServerSocketChannel是应用于服务端的网络通道
  3. // SocketChannel是用于客户端的网络通道
  4. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  5. InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
  6. //绑定端口 ,并启动
  7. serverSocketChannel.socket().bind(inetSocketAddress);
  8. //创建buffer数组,多个数组的目的是演示聚合
  9. ByteBuffer[] byteBuffers = new ByteBuffer[2];
  10. byteBuffers[0] = ByteBuffer.allocate(5);
  11. byteBuffers[1] = ByteBuffer.allocate(5);
  12. //accept得到IO的Channel
  13. SocketChannel socketChannel = serverSocketChannel.accept();
  14. //假定从客户端接收8个字节
  15. int messageLength = 8;
  16. //循环的读取
  17. while (true) {
  18. int byteRead = 0;
  19. //八个字节以内 一直读取内容到缓存区
  20. while (byteRead < messageLength ) {
  21. //从SocketChannel读取数据,输入缓冲区数组
  22. long l = socketChannel.read(byteBuffers);
  23. byteRead += l; //累计读取的字节数
  24. System.out.println("byteRead=" + byteRead);
  25. }
  26. //接收超过八个字节 打回客户端
  27. //将所有的buffer进行flip
  28. Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
  29. //
  30. long byteWirte = 0;
  31. //将缓冲区数组中的数据写回Channel
  32. while (byteWirte < messageLength) {
  33. long l = socketChannel.write(byteBuffers);
  34. byteWirte += l;
  35. }
  36. //将所有的buffer 进行clear
  37. Arrays.asList(byteBuffers).forEach(buffer-> {
  38. buffer.clear();
  39. });
  40. }

Selector(选择器)

用一个线程,处理多个的客户端连接,Selector选择器(也称多路复用器)是必要组件。

(在Redis、Nginx、Tomcat中也有Selector的相关概念。)

多个Channel以事件的方式注册到同一个Selector

Selector能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。

Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接

Select抽象类中定义了几个抽象方法

  1. //打开一个Selector
  2. public static Selector open();
  3. //返回所有注册到该Selector上的通道的key
  4. public abstract Set<SelectionKey> selectedKeys();
  5. //监控所有注册的通道 阻塞方法
  6. selector.select()
  7. //立即返回IO事件发生的通道数目 非阻塞方法 如无IO事件,返回0
  8. public abstract int selectNow()
  9. //监控所有注册通道,定时阻塞
  10. public abstract int select(long timeout)

SelectionKey

表示Selector和网络通道Channel的关系,Java中定义了四种不同类型的key,代表不同的事件

  1. //读操作 值为1
  2. public static final int OP_READ = 1 << 0;
  3. //写操作 值为4
  4. public static final int OP_WRITE = 1 << 2;
  5. //代表连接已建立 值为8
  6. public static final int OP_CONNECT = 1 << 3;
  7. //代表有新的网络可以连接 值为16
  8. public static final int OP_ACCEPT = 1 << 4;

SelectionKey实例提供的部分方法

  1. //得到与之关联的通道
  2. public abstract SelectableChannel channel();
  3. //得到与该key关联的Selector对象
  4. public abstract Selector selector();
  5. //是否可accept
  6. public final boolean isAccpetable();
  7. //是否可读
  8. public final boolean isReadable();
  9. //是否可写
  10. public final boolean isWritable();

NIO_TCP

NIO的TCP通信大致示意图

cfaf331a573dcb2832fc886efa30cb2f.png

说明:

  • ServerSocketChannel本身也要接收请求事件注册到Selector上。
  • 每个代表客户端的SocketChannel都会以不同事件注册到Selector上,返回唯一SelectionKey。

实例:实现服务器端和客户端之间的数据简单读写通讯(非阻塞)

服务端编写:

  1. //ServerSocketChannel开启
  2. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  3. //得到一个Selector对象
  4. Selector selector = Selector.open();
  5. //绑定一个端口6666, 该频道套接字在服务器端监听连接
  6. serverSocketChannel.socket().bind(new InetSocketAddress(6666));
  7. //设置为非阻塞
  8. serverSocketChannel.configureBlocking(false);
  9. //把 serverSocketChannel注册到selector关心事件为连接事件
  10. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  11. //以上,运行到这的时候,serverSocketChannel还未真正开始等待连接,只是完成了一些初始化的工作
  12. //记得吗,这还是个同步模型。不能又轮询又处理IO
  13. //开始等待客户端连接或处理IO事件
  14. while (true) {
  15. //这里我们等待1秒,如果没有事件发生, 可以处理其他事件
  16. if(selector.select(1000) == 0) {
  17. System.out.println("服务器等待了1秒,无连接");
  18. continue;
  19. }
  20. //如果select()返回>0, 代表有通道发生事件。
  21. //以下处理事件,上面就无法进行轮询
  22. //通过selectionKeys 反向获取发生IO的通道
  23. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  24. //遍历 Set<SelectionKey>, 使用迭代器遍历
  25. Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
  26. //对事件做处理
  27. while (keyIterator.hasNext()) {
  28. //获取到SelectionKey
  29. SelectionKey key = keyIterator.next();
  30. //如果是请求事件
  31. if(key.isAcceptable()) {
  32. //获得请求的socketChannel
  33. SocketChannel socketChannel = serverSocketChannel.accept();
  34. //将SocketChannel 设置为非阻塞
  35. socketChannel.configureBlocking(false);
  36. //将socketChannel 注册到selector, 关注事件为 OP_READ
  37. socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
  38. }
  39. //如果是读取事件
  40. if(key.isReadable()) {
  41. //发生 OP_READ
  42. //通过key 反向获取到对应channel
  43. SocketChannel channel = (SocketChannel)key.channel();
  44. //获取到该channel关联的buffer
  45. ByteBuffer buffer = (ByteBuffer)key.attachment();
  46. //读取该buffer的数据
  47. channel.read(buffer);
  48. System.out.println("form 客户端 " + new String(buffer.array()));
  49. }
  50. //手动从集合中移动当前的selectionKey, 防止重复操作
  51. keyIterator.remove();
  52. }
  53. }

客户端编写:

  1. //得到一个网络通道
  2. SocketChannel socketChannel = SocketChannel.open();
  3. //设置非阻塞
  4. socketChannel.configureBlocking(false);
  5. //提供服务器端的ip 和 端口
  6. InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
  7. //尝试连接服务器 connect()返回true代表连接成功
  8. if (!socketChannel.connect(inetSocketAddress)) {
  9. //完成连接时返回true
  10. while (!socketChannel.finishConnect()) {
  11. System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
  12. }
  13. }
  14. //...如果连接成功,就发送数据到socketChannel的buffer缓冲区
  15. String str = "hello, world~";
  16. //Wraps a byte array into a buffer
  17. ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
  18. //发送数据,将 buffer 数据写入 channel
  19. socketChannel.write(buffer);
  20. //等待输入 保持会话
  21. System.in.read();

零拷贝

这里再说一个比较重要的概念,虽然它对我们是透明的,但是理解它可以提高我们的网络IO的认知水平。

所谓零拷贝,是从操作系统角度,无或少CPU拷贝

  1. 零拷贝是网络编程的关键,很多性能优化都离不开
  2. 在 Java 程序中,常用的零拷贝有 mmap(内存映射) 和 sendFile
  3. 零拷贝技术主要是在系统层面上定义并实现的

传统IO数据读写

2fa11909a163a40bac1133a8f5beaa78.png

从上图中可以看出,数据从磁盘到网络共产生了四次数据拷贝,即使使用了DMA来处理了与硬件的通讯,CPU仍然需要处理两次数据拷贝,与此同时,相关线程在用户态与内核态也发生了多次上下文切换,无疑也加重了CPU负担。

mmap内存映射型零拷贝

mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数

9fc28c42fa87c561f446eb85e09fc52f.png

如图,这样就减少了一次CPU级的拷贝

sendFile型零拷贝

56b807f82a4d57b4093ac06fa32ccf57.png

Linux 2.1 版本 提供了 sendFile 函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer,同时,由于和用户态完全无关,就减少了一次上下文切换

mmap与sendfile区别

  • mmap适合小数据量读写,sendfile适合大文件传输
  • mmap需要四次上下文切换,sendfile需要3次上下文切换

用Java-NIO的TCP通信,利用零拷贝,实现大文件传输

编写服务端接收客户端的数据:

  1. //设置端口
  2. InetSocketAddress address = new InetSocketAddress(7001);
  3. //打开ServerSocket通道
  4. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  5. //获得Socket对象
  6. ServerSocket serverSocket = serverSocketChannel.socket();
  7. //绑定端口
  8. serverSocket.bind(address);
  9. //创建buffer 大小4KB
  10. ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
  11. while (true) {
  12. //等待连接 获得该连接的socketChannel
  13. SocketChannel socketChannel = serverSocketChannel.accept();
  14. //读取字节数
  15. int readcount = 0;
  16. while (-1 != readcount) {
  17. try {
  18. //将socketChannel中的数据读入byteBuffer
  19. readcount = socketChannel.read(byteBuffer);
  20. }catch (Exception ex) {
  21. // ex.printStackTrace();
  22. break;
  23. }
  24. //倒带 position = 0 mark 作废
  25. byteBuffer.rewind();
  26. }
  27. }

客户端编写:

  1. //创建服务端SocketChannel
  2. SocketChannel socketChannel = SocketChannel.open();
  3. //连接目标地址
  4. socketChannel.connect(new InetSocketAddress("localhost", 7001));
  5. //根目录下的文件
  6. String filename = "nettycode.zip";
  7. //得到一个文件channel
  8. FileChannel fileChannel = new FileInputStream(filename).getChannel();
  9. //在linux下一个transferTo 系统调用就可以完成传输
  10. //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件
  11. //transferTo 底层使用到零拷贝
  12. //将文件以零拷贝整到SocketChannel中
  13. long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
  14. System.out.println("向目标通道传输完毕");
  15. //关闭
  16. fileChannel.close();

发表评论

表情:
评论列表 (有 0 条评论,270人围观)

还没有评论,来说两句吧...

相关阅读

    相关 详尽Netty():初探netty

    如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送

    相关 Netty自学-Netty学习()

    什么Netty? Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客