Netty基础入门——文件编程、网络编程【2】

秒速五厘米 2024-03-30 15:07 125阅读 0赞

Netty基础入门——文件编程、网络编程【2】

基础入门【1】

1 文件编程

1.1 channel

  1. 两个channel传输数据

transferTo方法一次性最多传输2G大小的文件,如果超出会丢弃

  1. public static void main(String[] args) {
  2. try (
  3. FileChannel from = new FileInputStream("from.txt").getChannel();
  4. FileChannel to = new FileOutputStream("to.txt").getChannel();
  5. ) {
  6. //传输文件【从0位置开始,传输from.size()大小的数据,传输到to文件】
  7. from.transferTo(0, from.size(), to);
  8. } catch (IOException e) {
  9. e.printStackTrace();
  10. }
  11. }

超过2G大小的文件传输

  1. public static void main(String[] args) {
  2. try (
  3. FileChannel from = new FileInputStream("from.txt").getChannel();
  4. FileChannel to = new FileOutputStream("to.txt").getChannel();
  5. ) {
  6. //此种方式传输效率高,系统底层会利用操作系统的零拷贝进行优化
  7. long size = from.size();
  8. //left 代表还剩多少字节
  9. for(long left = size; left > 0;){
  10. System.out.println("position:" + (size - left) + ",left:" + left);
  11. //起始位置:size - left
  12. left -= from.transferTo((size - left), left, to);
  13. }
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }

1.2 Path、Paths、Files

jdk7 引入了 Path 和 Paths 类

  • Path 用来表示文件路径
  • Paths 是工具类,用来获取 Path 实例
①检查文件是否存在
  1. // Path path = Paths.get("test/from.txt");
  2. Path path = Paths.get("test\\from.txt");
  3. System.out.println(Files.exists(path));
②创建多级目录
  1. Path path = Paths.get("data/test");
  2. System.out.println(Files.createDirectories(path));
③拷贝文件
  1. Path from = Paths.get("test/from.txt");
  2. Path to = Paths.get("data/to.txt");
  3. Files.copy(from, to);
  4. //如果文件已存在,会抛异常 FileAlreadyExistsException
  5. //如果希望用 source 覆盖掉 target,需要用 StandardCopyOption 来控制
  6. //Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
④移动文件
  1. Path from = Paths.get("test/from.txt");
  2. Path to = Paths.get("data/to.txt");
  3. Files.move(from, to, StandardCopyOption.ATOMIC_MOVE);
⑤删除文件、目录
  1. //删除文件
  2. Path from = Paths.get("test/from.txt");
  3. Files.delete(from);
  4. //删除目录
  5. Path dir = Paths.get("test");
  6. Files.delete(dir);
⑥遍历文件夹、统计特定文件个数

观察者模式

  • 遍历文件夹

    private static void walkDir() throws IOException {

    1. Path path = Paths.get("D:\\系统默认\\桌面\\Yi-music\\music-server\\src\\main\\java\\com\\zi\\music");
    2. AtomicInteger dirCount = new AtomicInteger();
    3. AtomicInteger fileCount = new AtomicInteger();
    4. //遍历文件夹
    5. Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
    6. //遍历目录
    7. @Override
    8. public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
    9. System.out.println(dir);
    10. dirCount.incrementAndGet();
    11. return super.preVisitDirectory(dir, attrs);
    12. }
    13. //遍历文件
    14. @Override
    15. public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    16. System.out.println(file);
    17. fileCount.incrementAndGet();
    18. return super.visitFile(file, attrs);
    19. }
    20. });
    21. System.out.println(dirCount);
    22. System.out.println(fileCount);

    }

  • 统计.java文件个数

    private static void countAbsFile() throws IOException {

    1. Path path = Paths.get("D:\\系统默认\\桌面\\Yi-music\\music-server\\src\\main\\java\\com\\zi\\music");
    2. AtomicInteger javaCount = new AtomicInteger();
    3. //遍历文件夹
    4. Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
    5. @Override
    6. public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
    7. if(file.toFile().getName().endsWith(".java")){
    8. javaCount.incrementAndGet();
    9. }
    10. return super.visitFile(file, attrs);
    11. }
    12. });
    13. System.out.println("javaCount: " + javaCount);

    }

⑦删除目录、拷贝目录
  • 删除目录

注意:删除的目录一定要是没有重要数据的文件夹,通过以下代码删除的方式,不走回收站,直接系统删除

  1. private static void deleteDir() throws IOException {
  2. Path path = Paths.get("d:\\a");
  3. Files.walkFileTree(path, new SimpleFileVisitor<Path>(){
  4. @Override
  5. public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
  6. throws IOException {
  7. Files.delete(file);
  8. return super.visitFile(file, attrs);
  9. }
  10. @Override
  11. public FileVisitResult postVisitDirectory(Path dir, IOException exc)
  12. throws IOException {
  13. Files.delete(dir);
  14. return super.postVisitDirectory(dir, exc);
  15. }
  16. });
  17. }
  • 拷贝目录

    private static void copyDir() throws IOException {

    1. long start = System.currentTimeMillis();
    2. String source = "D:\\Snipaste-1.16.2-x64";
    3. String target = "D:\\Snipaste-1.16.2-x64aaa";
    4. Files.walk(Paths.get(source)).forEach(path -> {
    5. try {
    6. String targetName = path.toString().replace(source, target);
    7. // 是目录
    8. if (Files.isDirectory(path)) {
    9. Files.createDirectory(Paths.get(targetName));
    10. }
    11. // 是普通文件
    12. else if (Files.isRegularFile(path)) {
    13. Files.copy(path, Paths.get(targetName));
    14. }
    15. } catch (IOException e) {
    16. e.printStackTrace();
    17. }
    18. });
    19. long end = System.currentTimeMillis();
    20. System.out.println(end - start);

    }

2 网络编程

2.1 阻塞模式

阻塞模式下,下面方法都会造成线程暂停

  • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
  • SocketChannel.read 会在没有数据可读时让线程暂停
    阻塞:线程暂停,线程不占用cpu,但是相当于线程闲置
  • 单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
  • 但多线程下,有新的问题,体现在以下方面

    • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
    • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

demo测试:
server:

  1. @Slf4j
  2. public class Server {
  3. public static void main(String[] args) throws IOException {
  4. //0.分配缓冲区u
  5. ByteBuffer buffer = ByteBuffer.allocate(16);
  6. //1. 创建服务器
  7. ServerSocketChannel ssc = ServerSocketChannel.open();
  8. //2. 绑定监听端口
  9. ssc.bind(new InetSocketAddress(8888));
  10. //3. 连接集合
  11. List<SocketChannel> channels = new ArrayList<>();
  12. while(true){
  13. //4. accept建立与客户端连接, SocketChannel用来与客户端通信
  14. log.debug("connecting...");
  15. SocketChannel sc = ssc.accept();//阻塞方法,线程停止运行
  16. log.debug("connected...{}", sc);
  17. channels.add(sc);
  18. for(SocketChannel channel : channels){
  19. //5. 接收客户端发送的数据
  20. log.debug("before read...{}", channel);
  21. channel.read(buffer);//阻塞方法,线程停止运行
  22. buffer.flip();//切换模式
  23. buffer.clear();
  24. log.debug("after read...{}", channel);
  25. }
  26. }
  27. }
  28. }

Client:

  1. @Slf4j
  2. public class Client {
  3. public static void main(String[] args) throws IOException {
  4. SocketChannel sc = SocketChannel.open();
  5. sc.connect(new InetSocketAddress("localhost", 8888));
  6. System.out.println("waiting....");
  7. }
  8. }
  1. 启动两个客户端和服务器端
    在这里插入图片描述

虽然启动了两个客户端,但是服务器只接收到一个,因为是阻塞的,需要等到第一个客户端连接处理完了才会轮到下一个

  1. client写入数据,让服务器处理第一个连接

选中sc变量,alt+F8,evaluate

写入数据:

  1. sc.write(StandardCharsets.UTF_8.encode("hello"))

在这里插入图片描述

  1. 结果:
    服务器已经由54250切换到处理54254
    在这里插入图片描述

如果你的idea无法同时打开两个run运行台

  • 新版:
    在这里插入图片描述
  • 旧版:
    在这里插入图片描述

2.2 非阻塞模式

非阻塞模式下,accept和read都不会让线程暂停

  • 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
  • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
  • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去

但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu

数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

阻塞 -> 非阻塞:

  1. //1. 创建服务器
  2. ServerSocketChannel ssc = ServerSocketChannel.open();
  3. //调整为非阻塞模式
  4. ssc.configureBlocking(false);
  5. ....
  6. SocketChannel sc = ssc.accept();//阻塞方法,线程停止运行
  7. //改为非阻塞
  8. sc.configureBlocking(false);

Server:

  1. @Slf4j
  2. public class Server {
  3. public static void main(String[] args) throws IOException {
  4. //0.分配缓冲区u
  5. ByteBuffer buffer = ByteBuffer.allocate(16);
  6. //1. 创建服务器
  7. ServerSocketChannel ssc = ServerSocketChannel.open();
  8. ssc.configureBlocking(false); //非阻塞
  9. //2. 绑定监听端口
  10. ssc.bind(new InetSocketAddress(8888));
  11. //3. 连接集合
  12. List<SocketChannel> channels = new ArrayList<>();
  13. while(true){
  14. //4. accept建立与客户端连接, SocketChannel用来与客户端通信
  15. SocketChannel sc = ssc.accept();//阻塞方法,线程停止运行
  16. if(sc != null){
  17. //如果没有客户端连接,sc为null
  18. log.debug("connected...{}", sc);
  19. sc.configureBlocking(false); //非阻塞
  20. channels.add(sc);
  21. }
  22. for(SocketChannel channel : channels){
  23. //5. 接收客户端发送的数据
  24. int read = channel.read(buffer);//非阻塞,线程仍然会继续运行,如果没有读到数据,read返回0
  25. if(read > 0){
  26. //切换为读模式
  27. buffer.flip();
  28. buffer.clear();
  29. log.debug("after read...{}", channel);
  30. }
  31. }
  32. }
  33. }
  34. }

Client端代码不变:

  1. @Slf4j
  2. public class Client {
  3. public static void main(String[] args) throws IOException {
  4. SocketChannel sc = SocketChannel.open();
  5. sc.connect(new InetSocketAddress("localhost", 8888));
  6. System.out.println("waiting....");
  7. }
  8. }

测试:

与上面操作一样,启动一个Server和两个Client,可以发现,虽然第一个Client没有发送数据,但是Server依然可以处理两个Client

在这里插入图片描述

2.3 多路复用与Selector

2.3.1 多路复用

单线程可以配合Selector完成对多个Channel可读写事件的监控,这称之为多路复用

  • 多路复用仅针对网络IO、普通文件IO没法利用多路复用
  • Selector可以保证:
  1. 有可连接事件时才去连接
  2. 有可读事件才去读取
  3. 有可写事件才去写入【受限于网络传输能力,只有当Channel可写时才会处方Selector的可写事件】
  1. selector四种可绑定的事件类型:
  2. 1. accept - 会在有连接请求时触发
  3. 2. connect - 是客户端,连接建立后触发
  4. 3. read - 可读事件
  5. 4. write - 可写事件
2.3.2 Selector

selector 版

selector

thread

channel

channel

channel

①好处:

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

②使用步骤

  1. 1. 创建Selector
  2. Selector selector = Selector.open();
  3. 2. 绑定Channel事件(注册事件)
  4. channel.configureBlocking(false);
  5. SelectionKey key = channel.register(selector, 绑定事件);
  6. 3. 监听Channel事件(方法的返回值代表有多少 channel 发生了事件)
  7. int count = selector.select();
  8. //int count = selector.select(long timeout); //阻塞直到绑定事件发生,或是超时单位:ms
  9. //int count = selector.selectNow();//不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
  10. 4. 处理事件

③selector什么时候不会阻塞

  • 事件发生时

    • 客户端发起连接请求,会触发 accept 事件
    • 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
    • channel 可写,会触发 write 事件
    • 在 linux 下 nio bug 发生时
  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

④处理事件

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

iter.remove();处理完事件后需要移除,否则会报NPE

[1] 处理accept事件

Client:

  1. @Slf4j
  2. public class Client {
  3. public static void main(String[] args) throws IOException {
  4. try (Socket socket = new Socket("localhost", 8888)) {
  5. System.out.println(socket);
  6. socket.getOutputStream().write("hello".getBytes(StandardCharsets.UTF_8));
  7. System.in.read();
  8. }catch (Exception e){
  9. e.printStackTrace();
  10. }
  11. }
  12. }

Server:

  1. @Slf4j
  2. public class Server {
  3. public static void main(String[] args) throws IOException {
  4. try (ServerSocketChannel channel = ServerSocketChannel.open()) {
  5. channel.bind(new InetSocketAddress(8888));
  6. System.out.println(channel);
  7. Selector selector = Selector.open();
  8. //非阻塞
  9. channel.configureBlocking(false);
  10. //处理连接事件[注册事件]
  11. // accept
  12. // selector ---------> channel
  13. channel.register(selector, SelectionKey.OP_ACCEPT);
  14. while(true){
  15. int count = selector.select();
  16. log.debug("select count:{}", count);
  17. //获取所有事件
  18. Set<SelectionKey> keys = selector.selectedKeys();
  19. //遍历所有事件,逐一处理
  20. Iterator<SelectionKey> iter = keys.iterator();
  21. while(iter.hasNext()){
  22. SelectionKey key = iter.next();
  23. //判断事件类型
  24. if(key.isAcceptable()){
  25. ServerSocketChannel c = (ServerSocketChannel) key.channel();
  26. //必须处理:事件只能被处理或者撤销
  27. SocketChannel sc = c.accept();
  28. log.debug("{}", sc);
  29. }
  30. //处理完毕之后,必须将事件移除,否则会NPE
  31. iter.remove();
  32. }
  33. }
  34. }catch (IOException e){
  35. e.printStackTrace();
  36. }
  37. }
  38. }
[2]处理read事件
  • Client端代码同accept事件的客户端
  • Server端:

    @Slf4j
    public class Server {

  1. public static void main(String[] args) throws IOException {
  2. try (ServerSocketChannel channel = ServerSocketChannel.open()) {
  3. channel.bind(new InetSocketAddress(8888));
  4. System.out.println(channel);
  5. Selector selector = Selector.open();
  6. //非阻塞
  7. channel.configureBlocking(false);
  8. //处理连接事件[注册事件]
  9. // accept
  10. // selector ---------> channel
  11. channel.register(selector, SelectionKey.OP_ACCEPT);
  12. while(true){
  13. int count = selector.select();
  14. log.debug("select count:{}", count);
  15. //获取所有事件
  16. Set<SelectionKey> keys = selector.selectedKeys();
  17. //遍历所有事件,逐一处理
  18. Iterator<SelectionKey> iter = keys.iterator();
  19. while(iter.hasNext()){
  20. SelectionKey key = iter.next();
  21. //判断事件类型
  22. if(key.isAcceptable()){
  23. ServerSocketChannel c = (ServerSocketChannel) key.channel();
  24. //必须处理:事件只能被处理或者撤销
  25. SocketChannel sc = c.accept();
  26. sc.configureBlocking(false);
  27. // read
  28. // selector ---------> channel
  29. sc.register(selector, SelectionKey.OP_READ);
  30. log.debug("连接已建立:{}", sc);
  31. }else if(key.isReadable()){
  32. SocketChannel sc = (SocketChannel) key.channel();
  33. ByteBuffer buffer = ByteBuffer.allocate(128);
  34. int read = sc.read(buffer);
  35. if(read == -1){
  36. key.cancel();
  37. sc.close();
  38. }else {
  39. buffer.flip();
  40. }
  41. }
  42. //处理完毕之后,必须将事件移除
  43. iter.remove();
  44. }
  45. }
  46. }catch (IOException e){
  47. e.printStackTrace();
  48. }
  49. }
  50. }

在这里插入图片描述

[3]消息边界处理

法一:固定消息长度

固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽

法二:按指定分隔符拆分(如:换行符)

按分隔符拆分,缺点是效率低

法三:TLV格式(type、length、value)

TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

  • Http 1.1 是 TLV 格式
  • Http 2.0 是 LTV 格式

以方式二为例:

netty也是类似的处理方式,不过netty底层的bytebuffer是自适应的(可以扩容、缩容)

客户端1服务器ByteBuffer1ByteBuffer2发送 01234567890abcdef3333\r第一次 read 存入 01234567890abcdef扩容拷贝 01234567890abcdef第二次 read 存入 3333\r01234567890abcdef3333\r客户端1服务器ByteBuffer1ByteBuffer2

Server:

  1. @Slf4j
  2. public class Server {
  3. public static void main(String[] args) throws IOException {
  4. //1. 创建selector, 管理多个channel
  5. Selector selector = Selector.open();
  6. ServerSocketChannel ssc = ServerSocketChannel.open();
  7. ssc.configureBlocking(false);//非阻塞
  8. //2. 建立selector和channel的联系(注册)
  9. //SelectionKey就是将来事件发生后,通过它可以知道事件来自哪个channel
  10. SelectionKey sscKey = ssc.register(selector, 0, null);
  11. //设置key只关注accept事件
  12. sscKey.interestOps(SelectionKey.OP_ACCEPT);
  13. log.debug("sscKey:{}", sscKey);
  14. ssc.bind(new InetSocketAddress(8888));//channel监听端口
  15. while(true){
  16. //3. select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
  17. //select 再事件未处理时,它不会阻塞事件发生后要么处理,要么取消,不能置之不理
  18. selector.select();
  19. //4. 处理事件,selectedKeys内部包含了所有发生的事件
  20. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//accept、read
  21. while (iter.hasNext()) {
  22. SelectionKey key = iter.next();
  23. //处理key时,要从selectedKeys集合中删除,否则下次处理会有问题【客户端正常、异常断开】
  24. iter.remove();
  25. log.debug("key:{}", key);
  26. //5. 区分事件类型
  27. if(key.isAcceptable()){
  28. ServerSocketChannel channel = (ServerSocketChannel) key.channel();
  29. SocketChannel sc = channel.accept();
  30. sc.configureBlocking(false);
  31. ByteBuffer buffer = ByteBuffer.allocate(16);//attachement
  32. //将一个byteBuffer作为附件关联到selectionKey上
  33. SelectionKey scKey = sc.register(selector, 0, buffer);
  34. scKey.interestOps(SelectionKey.OP_READ);//关注read事件
  35. log.debug("{}", sc);
  36. log.debug("scKey:{}", scKey);
  37. }else if(key.isReadable()){
  38. try {
  39. SocketChannel channel = (SocketChannel) key.channel();//拿到触发事件的channel
  40. //获取selectionKey的关联附件
  41. ByteBuffer buffer = (ByteBuffer) key.attachment();
  42. int read = channel.read(buffer);//如果是客户端正常断开, read方法的返回值是-1
  43. if(read == -1){
  44. key.cancel();
  45. }else {
  46. //对数据进行处理,防止粘包半包
  47. split(buffer);
  48. //需要扩容[一个buffer存放不下一条完整的数据]
  49. if(buffer.position() == buffer.limit()){
  50. ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
  51. buffer.flip();
  52. newBuffer.put(buffer);// 0123456789abcdef3333\n
  53. key.attach(newBuffer);
  54. }
  55. }
  56. } catch (IOException e){
  57. e.printStackTrace();
  58. key.cancel();//因为客户端断开了,因此需要将key取消(从selector的keys集合中真正删除key)
  59. }
  60. }
  61. }
  62. }
  63. }
  64. /**
  65. * 处理粘包半包问题
  66. * @param source
  67. */
  68. private static void split(ByteBuffer source){
  69. //limit = position position = 0【切换为读模式】
  70. source.flip();
  71. for(int i = 0; i < source.limit(); i++){
  72. //根据规定字符找到一条完整消息【'\n'】
  73. if(source.get(i) == '\n'){
  74. //fad998877fa\n221 [source.position()当前读取到得位置]
  75. int length = i + 1 - source.position();
  76. //将这条完整消息存入新的ByteBuffer
  77. ByteBuffer target = ByteBuffer.allocate(length);
  78. //从source读,向target写
  79. for(int j = 0; j < length; j++){
  80. target.put(source.get());
  81. }
  82. debugAll(target);
  83. }
  84. }
  85. source.compact(); // 0123456789abcdef position 16 limit 16
  86. }
  87. }

Client

  1. @Slf4j
  2. public class Client {
  3. public static void main(String[] args) throws IOException {
  4. SocketChannel sc = SocketChannel.open();
  5. sc.connect(new InetSocketAddress("localhost", 8888));
  6. SocketAddress address = sc.getLocalAddress();
  7. // sc.write(Charset.defaultCharset().encode("hello\nworld\n"));
  8. sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
  9. sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
  10. System.in.read();
  11. }
  12. }
[4]ByteBuffer大小分配
  • 每个channel都需要维护一个独立的ByteBuffer
  • ByteBuffer不能太大

    • 方法1:首先分配一个较小的buffer,例如4K,如果数据不够再分配8k的buffer,将4k的内容拷贝到8k的buffer上【消息连续易处理,但是数据拷贝耗费性能】
    • 方法2:用多个数组组成buffer,一个数组不够,就把多出来的写入新数组,【避免了拷贝引起的性能损耗,但是消息存储不连续解析复杂】
2.3.3 处理write事件(一次写不完)
  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件

服务端:

  1. public class WriteServer {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocketChannel ssc = ServerSocketChannel.open();
  4. ssc.configureBlocking(false);//设置非阻塞
  5. ssc.bind(new InetSocketAddress(8888));
  6. Selector selector = Selector.open();
  7. //注册selector到ssc,同时监听accept事件
  8. ssc.register(selector, SelectionKey.OP_ACCEPT);
  9. while(true){
  10. selector.select();
  11. //拿到绑定到selector上的所有key
  12. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  13. while(iter.hasNext()){
  14. SelectionKey key = iter.next();
  15. iter.remove();
  16. if(key.isAcceptable()){
  17. //ssc只有一个,因此可以直接获取
  18. SocketChannel sc = ssc.accept();
  19. sc.configureBlocking(false);
  20. SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
  21. //1. 向客户端发送内容
  22. StringBuilder sb = new StringBuilder();
  23. for (int i = 0; i < 30000000; i++) {
  24. sb.append("a");
  25. }
  26. ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
  27. int write = sc.write(buffer);
  28. //3. write:实际写了多少字节
  29. System.out.println("实际写入字节:" + write);
  30. //4. 如果有剩余未读字节,才需要关注写事件
  31. if(buffer.hasRemaining()){
  32. //read:1 write:4
  33. //在原有关注事件的基础上 + 额外关注写事件
  34. scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
  35. //把buffer作为附件加入sckey
  36. scKey.attach(buffer);
  37. }
  38. }else if(key.isWritable()){
  39. //拿到附件
  40. ByteBuffer buffer = (ByteBuffer) key.attachment();
  41. SocketChannel sc = (SocketChannel) key.channel();
  42. int write = sc.write(buffer);
  43. System.out.println("实际写入字节数:" + write);
  44. if(!buffer.hasRemaining()){
  45. //没有剩余字节,写完了
  46. key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
  47. key.attach(null);
  48. }
  49. }
  50. }
  51. }
  52. }
  53. }

客户端:

  1. public class WriteClient {
  2. public static void main(String[] args) throws IOException {
  3. Selector selector = Selector.open();
  4. SocketChannel sc = SocketChannel.open();
  5. sc.configureBlocking(false);
  6. sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
  7. sc.connect(new InetSocketAddress("localhost", 8888));
  8. int count = 0; //统计接收到得数据量
  9. while(true){
  10. selector.select();
  11. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  12. while(iter.hasNext()){
  13. SelectionKey key = iter.next();
  14. iter.remove();//拿到key之后要remove,否则会重复消费
  15. if(key.isConnectable()){
  16. System.out.println(sc.finishConnect());
  17. }else if(key.isReadable()){
  18. ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
  19. count += sc.read(buffer);
  20. buffer.clear();
  21. System.out.println("总共接收到得数据量:" + count);
  22. }
  23. }
  24. }
  25. }
  26. }

实现多次读写:
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,125人围观)

还没有评论,来说两句吧...

相关阅读