nio与netty

﹏ヽ暗。殇╰゛Y 2022-04-15 05:28 342阅读 0赞

IO分为三种

  1. BIO(传统IO) 同步阻塞IO
  2. NIO同步非阻塞IO
  3. AIO异步非阻塞IO(涉及linux网络底层,太难,这里不讨论)

阻塞和非阻塞区别:
传统IO是阻塞的,因为read()、write()方法本身是阻塞的:线程必须等数据读完或者写完之后才能继续执行后面的操作。NIO是非阻塞的,采用多路复用技术:由一个Selector同时监听多个通道,当某个通道可读或可写时,操作系统会通知Selector,代码这时再去读写即可;没有就绪事件时,线程不必阻塞在某一个连接上,可以去处理其他连接。

原生NIO代码

  1. package cn.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectableChannel;
  6. import java.nio.channels.SelectionKey;
  7. import java.nio.channels.Selector;
  8. import java.nio.channels.ServerSocketChannel;
  9. import java.nio.channels.SocketChannel;
  10. import java.util.Iterator;
  /**
   * Minimal single-threaded NIO server built on a {@link Selector}.
   *
   * <p>Each accepted client is sent a greeting and registered for read events; every
   * message read from a client is printed and answered with a fixed reply.
   * All text on the wire is encoded and decoded as UTF-8.
   */
  public class NIOService {
      /** Port used by the no-argument {@link #initService()}. */
      private static final int DEFAULT_PORT = 12345;
      /** Size of the per-read buffer, in bytes. */
      private static final int READ_BUFFER_SIZE = 4096;
      /** Charset name used for both encoding replies and decoding requests. */
      private static final String CHARSET = "UTF-8";

      private Selector selector;

      /**
       * Opens the server on the default port (12345).
       *
       * @throws IOException if the channel or selector cannot be opened or bound
       */
      public void initService() throws IOException {
          initService(DEFAULT_PORT);
      }

      /**
       * Opens a non-blocking server channel on {@code port} and registers it for accept events.
       *
       * @param port the local TCP port to bind
       * @throws IOException if the channel or selector cannot be opened or bound
       */
      public void initService(int port) throws IOException {
          ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
          Selector opened = null;
          boolean started = false;
          try {
              serverSocketChannel.configureBlocking(false);
              serverSocketChannel.bind(new InetSocketAddress(port));
              opened = Selector.open();
              serverSocketChannel.register(opened, SelectionKey.OP_ACCEPT);
              this.selector = opened;
              started = true;
          } finally {
              // Do not leak the channel/selector when any setup step fails.
              if (!started) {
                  closeQuietly(opened);
                  closeQuietly(serverSocketChannel);
              }
          }
          System.out.println("【星际联邦】:开启服务成功");
      }

      /**
       * Runs the event loop forever, dispatching accept and read events.
       *
       * <p>A failure on a single client connection closes only that connection;
       * failures of the selector or of {@code accept()} itself propagate to the caller.
       *
       * @throws IOException if selecting or accepting fails
       * @throws IllegalStateException if {@code initService} has not been called
       */
      public void listenService() throws IOException {
          if (selector == null) {
              throw new IllegalStateException("initService() must be called before listenService()");
          }
          while (true) {
              selector.select();
              Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
              while (iterator.hasNext()) {
                  SelectionKey selectionKey = iterator.next();
                  // Selected keys are not removed automatically; remove to avoid re-processing.
                  iterator.remove();
                  if (!selectionKey.isValid()) {
                      continue;
                  }
                  if (selectionKey.isAcceptable()) {
                      handleAccept(selectionKey);
                  } else if (selectionKey.isReadable()) {
                      handleRead(selectionKey);
                  }
              }
          }
      }

      /** Accepts a pending connection, greets the client and registers it for reads. */
      private void handleAccept(SelectionKey selectionKey) throws IOException {
          ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
          SocketChannel channel = server.accept();
          if (channel == null) {
              // In non-blocking mode accept() returns null when no connection is pending.
              return;
          }
          try {
              channel.configureBlocking(false);
              // The greeting is small, so a single non-blocking write is assumed to send it all.
              channel.write(ByteBuffer.wrap("【星际联邦】:您已成功连接星际联邦。".getBytes(CHARSET)));
              channel.register(this.selector, SelectionKey.OP_READ);
          } catch (IOException e) {
              System.err.println("Dropping client after setup failure: " + e);
              closeQuietly(channel);
          }
      }

      /** Reads one chunk from a client, prints it and sends the fixed reply. */
      private void handleRead(SelectionKey selectionKey) {
          SocketChannel channel = (SocketChannel) selectionKey.channel();
          ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
          try {
              int bytesRead = channel.read(byteBuffer);
              if (bytesRead < 0) {
                  // End of stream: the peer closed the connection. Closing the channel
                  // also cancels its key, so it stops being reported as readable.
                  closeQuietly(channel);
                  return;
              }
              if (bytesRead == 0) {
                  return;
              }
              // Decode only the bytes actually read, using the same charset as the replies.
              String message = new String(byteBuffer.array(), 0, bytesRead, CHARSET).trim();
              System.out.println("【星际联邦】:接受到消息" + message);
              ByteBuffer responseBuffer = ByteBuffer.wrap("【星际联邦】:星际联邦已派企业号前往".getBytes(CHARSET));
              channel.write(responseBuffer);
          } catch (IOException e) {
              // e.g. connection reset by peer: drop this client, keep serving the others.
              System.err.println("Dropping client after I/O failure: " + e);
              closeQuietly(channel);
          }
      }

      /** Closes {@code resource} if non-null, ignoring any failure on close. */
      private static void closeQuietly(java.io.Closeable resource) {
          if (resource == null) {
              return;
          }
          try {
              resource.close();
          } catch (IOException ignored) {
              // Best-effort cleanup; nothing useful can be done if close fails.
          }
      }
  }
  48. public class StartService {
  49. public static void main(String[] args) {
  50. NIOService service = new NIOService();
  51. try {
  52. service.initService();
  53. service.listenService();
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. package cn.nio;
  60. import java.io.IOException;
  61. import java.net.InetSocketAddress;
  62. import java.nio.ByteBuffer;
  63. import java.nio.channels.SelectionKey;
  64. import java.nio.channels.Selector;
  65. import java.nio.channels.SocketChannel;
  66. import java.util.Iterator;
  /**
   * Minimal single-threaded NIO client built on a {@link Selector}.
   *
   * <p>Once connected it sends the text {@code "hello word"} and then prints every
   * message received from the server. All text on the wire is treated as UTF-8.
   */
  public class NIOClient {
      /** Host used by the no-argument {@link #initClient()}. */
      private static final String DEFAULT_HOST = "127.0.0.1";
      /** Port used by the no-argument {@link #initClient()}. */
      private static final int DEFAULT_PORT = 12345;
      /** Size of the per-read buffer, in bytes. */
      private static final int READ_BUFFER_SIZE = 4096;
      /** Charset name used for both encoding requests and decoding responses. */
      private static final String CHARSET = "UTF-8";

      private Selector selector;

      /**
       * Starts connecting to the default server address (127.0.0.1:12345).
       *
       * @throws IOException if the channel or selector cannot be opened or the connect fails
       */
      public void initClient() throws IOException {
          initClient(DEFAULT_HOST, DEFAULT_PORT);
      }

      /**
       * Starts a non-blocking connect to {@code host:port}.
       *
       * <p>A non-blocking {@code connect} may complete immediately (for example on a
       * loopback connection). In that case no connect event will be delivered later, so the
       * greeting is sent right away and the channel is registered for reads instead.
       *
       * @param host server host name or address
       * @param port server TCP port
       * @throws IOException if the channel or selector cannot be opened or the connect fails
       */
      public void initClient(String host, int port) throws IOException {
          SocketChannel channel = SocketChannel.open();
          Selector opened = null;
          boolean started = false;
          try {
              channel.configureBlocking(false);
              opened = Selector.open();
              if (channel.connect(new InetSocketAddress(host, port))) {
                  sendGreeting(channel);
                  channel.register(opened, SelectionKey.OP_READ);
              } else {
                  channel.register(opened, SelectionKey.OP_CONNECT);
              }
              this.selector = opened;
              started = true;
          } finally {
              // Do not leak the channel/selector when any setup step fails.
              if (!started) {
                  closeQuietly(opened);
                  closeQuietly(channel);
              }
          }
          System.out.println("客户端启动成功");
      }

      /**
       * Runs the event loop until the server closes the connection.
       *
       * <p>On end of stream, or on any I/O failure, the channel and selector are closed.
       *
       * @throws IOException if connecting, selecting, reading or writing fails
       * @throws IllegalStateException if {@code initClient} has not been called
       */
      public void listenClient() throws IOException {
          if (selector == null) {
              throw new IllegalStateException("initClient() must be called before listenClient()");
          }
          try {
              while (selector.isOpen()) {
                  selector.select();
                  Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      // Selected keys are not removed automatically; remove to avoid re-processing.
                      iterator.remove();
                      if (!key.isValid()) {
                          continue;
                      }
                      SocketChannel socketChannel = (SocketChannel) key.channel();
                      if (key.isConnectable()) {
                          if (socketChannel.finishConnect()) {
                              sendGreeting(socketChannel);
                              // Switch the existing registration from connect to read events.
                              key.interestOps(SelectionKey.OP_READ);
                          }
                      } else if (key.isReadable() && !readMessage(socketChannel)) {
                          // Server closed the connection: stop instead of spinning on EOF.
                          shutdown();
                          return;
                      }
                  }
              }
          } catch (IOException e) {
              shutdown();
              throw e;
          }
      }

      /** Sends the fixed greeting text to the server. */
      private static void sendGreeting(SocketChannel channel) throws IOException {
          // The greeting is small, so a single non-blocking write is assumed to send it all.
          channel.write(ByteBuffer.wrap("hello word".getBytes(CHARSET)));
      }

      /**
       * Reads one chunk from the server and prints it.
       *
       * @return {@code false} when the server has closed the connection, {@code true} otherwise
       */
      private static boolean readMessage(SocketChannel channel) throws IOException {
          ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
          int bytesRead = channel.read(byteBuffer);
          if (bytesRead < 0) {
              return false;
          }
          if (bytesRead > 0) {
              // Decode only the bytes actually read, using the same charset as the request.
              String message = new String(byteBuffer.array(), 0, bytesRead, CHARSET).trim();
              System.out.println("【客户端收到的消息为】:" + message);
          }
          return true;
      }

      /** Closes every registered channel and then the selector itself. */
      private void shutdown() {
          if (selector.isOpen()) {
              for (SelectionKey key : selector.keys()) {
                  closeQuietly(key.channel());
              }
              closeQuietly(selector);
          }
      }

      /** Closes {@code resource} if non-null, ignoring any failure on close. */
      private static void closeQuietly(java.io.Closeable resource) {
          if (resource == null) {
              return;
          }
          try {
              resource.close();
          } catch (IOException ignored) {
              // Best-effort cleanup; nothing useful can be done if close fails.
          }
      }
  }
  106. package cn.nio;
  107. import java.io.IOException;
  108. public class StartClinet {
  109. public static void main(String[] args) {
  110. NIOClient client = new NIOClient();
  111. try {
  112. client.initClient();
  113. client.listenClient();
  114. } catch (IOException e) {
  115. // TODO Auto-generated catch block
  116. e.printStackTrace();
  117. }
  118. }
  119. }

由于每次需要写这么多代码,于是一般使用第三方框架netty
使用netty时,关注点不在连接的建立和管理,而在我们自己的处理逻辑,这些逻辑一般写在handler中

  1. package day01.netty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. public class EchoService {
  9. public void initService() {
  10. NioEventLoopGroup nioEventLoopGroup = null;
  11. try {
  12. //server端引导类
  13. ServerBootstrap serverBootstrap = new ServerBootstrap();
  14. //连接池数据,netty自动弄的连接池
  15. nioEventLoopGroup = new NioEventLoopGroup();
  16. //装配bootstap
  17. serverBootstrap.group(nioEventLoopGroup)
  18. //通道类型为NioServerSocketChannel
  19. .channel(NioServerSocketChannel.class)
  20. //监听端口号
  21. .localAddress(12345)
  22. .childHandler(new ChannelInitializer<Channel>() {
  23. @Override
  24. protected void initChannel(Channel channel) throws Exception {
  25. //全是模板代码,主要是这里绑定handler
  26. channel.pipeline().addLast(new EchoServiceHandler());
  27. }
  28. });
  29. // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
  30. ChannelFuture channelFuture = serverBootstrap.bind().sync();
  31. System.out.println("netty服务端启动成功");
  32. channelFuture.channel().closeFuture().sync();
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }finally {
  36. try {
  37. nioEventLoopGroup.shutdownGracefully().sync();
  38. } catch (InterruptedException e) {
  39. // TODO Auto-generated catch block
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. public static void main(String[] args) {
  45. EchoService echoService = new EchoService();
  46. echoService.initService();
  47. }
  48. }
  49. package day01.netty;
  50. import io.netty.buffer.ByteBuf;
  51. import io.netty.buffer.Unpooled;
  52. import io.netty.channel.ChannelHandlerContext;
  53. import io.netty.channel.ChannelInboundHandlerAdapter;
  54. public class EchoServiceHandler extends ChannelInboundHandlerAdapter {
  55. /**
  56. * 服务端获取请求的时候被调用
  57. */
  58. @Override
  59. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  60. ByteBuf buf = (ByteBuf)msg;
  61. byte[] req = new byte[buf.readableBytes()];
  62. buf.readBytes(req);
  63. String content = new String(req,"UTF-8");
  64. System.out.println("netty服务端接受到的消息是:"+content);
  65. String rsp = "我是netty服务端,我已收到"+content;//响应内容
  66. ByteBuf rspBuf = Unpooled.copiedBuffer(rsp.getBytes());
  67. ctx.write(rspBuf);//响应给客户端
  68. }
  69. /**
  70. * channelRead方法完成之后调用
  71. */
  72. @Override
  73. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  74. ctx.flush();//flush之后才是真的发送了
  75. System.out.println("netty服务端响应成功");
  76. }
  77. /**
  78. * 发生异常时候被调用
  79. */
  80. @Override
  81. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  82. cause.printStackTrace();
  83. ctx.close();
  84. }
  85. }
  86. package day01.netty;
  87. import java.net.InetSocketAddress;
  88. import io.netty.bootstrap.Bootstrap;
  89. import io.netty.channel.ChannelFuture;
  90. import io.netty.channel.ChannelInitializer;
  91. import io.netty.channel.nio.NioEventLoopGroup;
  92. import io.netty.channel.socket.SocketChannel;
  93. import io.netty.channel.socket.nio.NioSocketChannel;
  94. public class EchoClient {
  95. private String ip;
  96. private Integer port;
  97. public EchoClient() {
  98. super();
  99. }
  100. public EchoClient(String ip, Integer port) {
  101. super();
  102. this.ip = ip;
  103. this.port = port;
  104. }
  105. public String getIp() {
  106. return ip;
  107. }
  108. public void setIp(String ip) {
  109. this.ip = ip;
  110. }
  111. public Integer getPort() {
  112. return port;
  113. }
  114. public void setPort(Integer port) {
  115. this.port = port;
  116. }
  117. public void initClient() throws Exception {
  118. Bootstrap bootstrap = new Bootstrap();
  119. NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
  120. bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(ip, port)).handler(new ChannelInitializer<SocketChannel>() {//业务处理类
  121. @Override
  122. protected void initChannel(SocketChannel ch)
  123. throws Exception {
  124. ch.pipeline().addLast(new EchoClientHandler());//注册handler
  125. }
  126. });
  127. ChannelFuture channelFuture = bootstrap.connect().sync();
  128. channelFuture.channel().closeFuture().sync();
  129. nioEventLoopGroup.shutdownGracefully().sync();
  130. }
  131. public static void main(String[] args) throws Exception {
  132. EchoClient client = new EchoClient("127.0.0.1", 12345);
  133. client.initClient();
  134. }
  135. }
  136. package day01.netty;
  137. import io.netty.buffer.ByteBuf;
  138. import io.netty.buffer.Unpooled;
  139. import io.netty.channel.ChannelHandlerContext;
  140. import io.netty.channel.SimpleChannelInboundHandler;
  141. public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
  142. /**
  143. * 客户端连接服务器后被调用
  144. * 发送请求
  145. */
  146. @Override
  147. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  148. byte[] req = "hello word".getBytes();//消息
  149. ByteBuf buf = Unpooled.buffer(req.length);//把消息放入缓冲里面
  150. buf.writeBytes(req);//发送消息
  151. ctx.writeAndFlush(buf);//flush之后才是发送过去了
  152. System.out.println("netty客户端发送请求成功hello word");
  153. }
  154. /**
  155. * 服务端给客户端发消息时被调用
  156. * 获得响应
  157. */
  158. @Override
  159. protected void channelRead0(ChannelHandlerContext chx, ByteBuf msg) throws Exception {
  160. ByteBuf buf = (ByteBuf) msg;
  161. byte[] req = new byte[buf.readableBytes()];
  162. buf.readBytes(req);
  163. String rsp = new String(req, "UTF-8");
  164. System.out.println("netty客户端获得响应,内容是:"+rsp);
  165. }
  166. /**
  167. * 发生异常后被调用
  168. */
  169. @Override
  170. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  171. cause.printStackTrace();
  172. ctx.close();
  173. }
  174. }

发表评论

表情:
评论列表 (有 0 条评论,342人围观)

还没有评论,来说两句吧...

相关阅读