Java NIO网络编程

我不是女神ヾ 2023-06-22 13:59 88阅读 0赞

BIO

BIO:传统的网络通讯模型,同步阻塞IO。服务端创建一个ServerSocket, 客户端用Socket去连接服务端的ServerSocket, ServerSocket接收到了一个的连接请求就会创建一个Socket和一个线程去跟那个Socket进行通讯。接着客户端和服务端就进行阻塞式的通信
阻塞式具体体现在
①服务端等待连接请求
②创建一个线程跟客户端进行和通信阻塞等待客户端发送的数据。
③客户端阻塞等待服务器端处理后返回响应,在响应返回前,客户端那边就阻塞等待,什么事都做不了。
缺点
①每次一个客户端接入,都需要创建一个线程来服务这个客户端。当客户端过多时,就会造成服务端的线程数量可能达到了几千几万,这样就可能会造成服务端过载过高,最后宕机
②阻塞时什么事都做不了,只能阻塞等待,浪费资源。

服务器代码会在A行代码中阻塞,一直等待客户端连接。一旦有客户端连接就能得到一个Socket连接,接着就会继续往下阻塞等待客户端的输入信息(一下代码在正式使用时会增加线程池,当有连接时,从线程池获得一个线程执行操作)
服务端代码样例

  1. public class TCPServer {
  2. public static void main(String[] args) {
  3. ServerSocket serverSocket = null;
  4. Socket sc = null;
  5. try {
  6. serverSocket = new ServerSocket(9999);
  7. } catch (IOException e) {
  8. e.printStackTrace();
  9. }
  10. while (true){
  11. try {
  12. sc = serverSocket.accept(); //A
  13. String ip = sc.getInetAddress().getHostAddress();
  14. System.out.println(ip+"已连接");
  15. InputStream is = sc.getInputStream();//B
  16. byte[] b = new byte[1024];
  17. is.read(b);
  18. System.out.println(ip+":"+new String(b));
  19. OutputStream outputStream = sc.getOutputStream();
  20. outputStream.write("知道了".getBytes());
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }finally {
  24. try {
  25. sc.close();
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. }
  32. }

客户端代码样例
C行代码会一直阻塞直到服务器端做出响应

  1. public class TCPClient {
  2. public static void main(String[] args) {
  3. while (true){
  4. try {
  5. Socket s = new Socket("127.0.0.1",9999);
  6. OutputStream outputStream = s.getOutputStream();
  7. System.out.println("请输入");
  8. Scanner scanner = new Scanner(System.in);
  9. String msg = scanner.nextLine();
  10. outputStream.write(msg.getBytes());
  11. InputStream inputStream = s.getInputStream();//C
  12. byte[] b = new byte[1024];
  13. inputStream.read(b);
  14. System.out.println("服务器:"+new String(b));
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }
  20. }

NIO

核心组件

  • 通道(Channel)
    首先说一下Channel,“通道/连接”。Stream是单向的,譬如:InputStream, OutputStream是单向的而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。网络NIO中的Channel也是双向的,可以从另一端获得数据也可以向另一端写入数据。
    1)FileChannel(文件IO)
    2)DatagramChannel(UDP)
    3)SocketChannel(ClientTCP)
    4)ServerSocketChannel(ClientTCP)
  • 缓冲区(Buffer)
    NIO中的关键Buffer实现有
    ByteBuffer
    CharBuffer
    DoubleBuffer
    FloatBuffer
    IntBuffer
    LongBuffer,
    ShortBuffer
  • 选择器(Selectors)
    Selectors(选择器)能够检测一到多个NIO通道,并能够知道连接是否做好准备的组件。一个单独的线程就可以管理多个channel,从而管理多个网络连接。要使用Selector, 得向Selector注册Channel。轮询一个或多个NIO Channel的状态是否有事件发生。有相应事件发生则执行相应逻辑。这样既可实现了利用一个线程无管理多数连接。不用再一个客户端一个连接。避免了多个线程的切换与维护。在这里插入图片描述
  • selectionKey:代表socket与Select之间发生的事件关系。
    OP_ACCEPT:有新的网络连接可以建立(accept),值为16
    OP_CONNECT:连接已经建立,值为8
    OP_READ/OP_WRITE 读写操作 值为1,4

服务端代码样例

  1. public class ChatServer {
  2. private ServerSocketChannel serverSocketChannel;
  3. private Selector selector;
  4. private static final int PORT = 9999;
  5. public ChatServer() {
  6. try {
  7. //得到ServerSocketChannel
  8. serverSocketChannel = ServerSocketChannel.open();
  9. //得到selector
  10. selector = Selector.open();
  11. //绑定端口号
  12. serverSocketChannel.bind(new InetSocketAddress(PORT));
  13. //设置非阻塞方式
  14. serverSocketChannel.configureBlocking(false);
  15. //设置selector给ServerSocketChannel
  16. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  17. System.out.println("server reday");
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. public void start() throws IOException {
  23. //准备执行任务
  24. while (true) {
  25. //每四秒监控一下客户端,返回值为就绪的channel连接数量
  26. if (selector.select(4000) == 0) {
  27. //非阻塞式,可以执行其他的事务
  28. System.out.println("没有准备就绪的连接,或已注册的连接");
  29. continue;
  30. }
  31. //有连接就绪了,得到selectionKey判断事件
  32. //得到所有的发生事件
  33. Iterator<SelectionKey> selectionKeyIterable = selector.selectedKeys().iterator();
  34. while (selectionKeyIterable.hasNext()) {
  35. SelectionKey selectionKey = selectionKeyIterable.next();
  36. if (selectionKey.isAcceptable()) {
  37. System.out.println("isAcceptable");
  38. SocketChannel socketChannel = null;
  39. //客户端连接事件
  40. socketChannel = serverSocketChannel.accept();
  41. //设置非阻塞方式
  42. socketChannel.configureBlocking(false);
  43. //将该通道注册到监控器中,注重监听读事件
  44. socketChannel.register(selector, SelectionKey.OP_READ);
  45. System.out.println(socketChannel.getRemoteAddress());
  46. } else if (selectionKey.isReadable()) {
  47. readMsg(selectionKey);
  48. }
  49. selectionKeyIterable.remove();
  50. }
  51. }
  52. }
  53. /** * 收到消息,并且广播 * * @param selectionKey */
  54. private void readMsg(SelectionKey selectionKey) throws IOException {
  55. //获得通道
  56. Channel channel = selectionKey.channel();
  57. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  58. int count = ((SocketChannel) channel).read(byteBuffer);
  59. if (count > 0) {
  60. String msg = new String(byteBuffer.array());
  61. System.out.println(msg);
  62. //发送广播
  63. broadCast(msg, channel);
  64. }
  65. }
  66. /** * 给所有的连接发送消息 * * @param msg * @param expect */
  67. private void broadCast(String msg, Channel expect) throws IOException {
  68. for (SelectionKey key : selector.keys()) {
  69. Channel channel = key.channel();
  70. if (channel instanceof SocketChannel && channel != expect) {
  71. ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
  72. ((SocketChannel) channel).write(byteBuffer);
  73. }
  74. }
  75. }
  76. public static void main(String[] args) {
  77. ChatServer chatServer = new ChatServer();
  78. try {
  79. chatServer.start();
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }

客户端样例代码

  1. public class ChatClient {
  2. private String userName;
  3. private final static String HOST="127.0.0.1";
  4. private int port=9999;
  5. private SocketChannel socketChannel;
  6. public ChatClient() {
  7. try {
  8. //得到一个网络连接
  9. socketChannel = SocketChannel.open();
  10. //设置阻塞方式
  11. socketChannel.configureBlocking(false);
  12. //封装连接地址
  13. InetSocketAddress address = new InetSocketAddress(HOST,port);
  14. //使连接连上客户端
  15. if (!socketChannel.connect(address)){
  16. //连接失败
  17. while (!socketChannel.finishConnect()){
  18. //未连接上,持续连接,在连接过程中非阻塞
  19. System.out.println("正在连接服务器....");
  20. }
  21. }
  22. userName = socketChannel.getLocalAddress().toString();
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. public void sendMsg(String msg) throws IOException {
  28. if (msg.equals("bye")){
  29. socketChannel.close();
  30. return;
  31. }
  32. ByteBuffer byteBuffer = ByteBuffer.wrap((userName+":"+msg).getBytes());
  33. socketChannel.write(byteBuffer);
  34. }
  35. public void receiveMsg() throws IOException {
  36. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  37. int count = socketChannel.read(byteBuffer);
  38. if (count > 0) {
  39. String msg = new String(byteBuffer.array());
  40. System.out.println(msg.trim());
  41. }
  42. }
  43. public static void main(String[] args) {
  44. ChatClient chatClient = new ChatClient();
  45. new Thread(){
  46. @Override
  47. public void run() {
  48. while (true){
  49. try {
  50. chatClient.receiveMsg();
  51. Thread.sleep(2000);
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. }.start();
  60. Scanner scanner = new Scanner(System.in);
  61. while (scanner.hasNext()){
  62. String msg = scanner.nextLine();
  63. try {
  64. chatClient.sendMsg(msg);
  65. } catch (IOException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }
  70. }

发表评论

表情:
评论列表 (有 0 条评论,88人围观)

还没有评论,来说两句吧...

相关阅读