Netty(一)——BIO、NIO、AIO

r囧r小猫 2022-05-11 15:42 372阅读 0赞
  1. 在工作中,无论什么项目我们都会用到网络传输:Tomcat Server服务器、Dubbo RPC协议框架、Redis NoSQL数据库、各种MQ中间件等等。这篇文章是学习《Netty权威指南》后的总结,从Java基础的BIO、NIO、AIO开始学习。
  2. 在说Java IO以前,先看看Unix提供的5种IO模型:
  3. 1,阻塞IO模型:

70

  1. 2,非阻塞IO模型:

70 1

  1. 3,IO复用模型:

70 2

  1. 4,信号驱动IO模型:

70 3

  1. 5,异步IO模型:

70 4

  1. 小结:以最基本的1:1阻塞IO模型为基础,通过轮询调用、Selector(管家)进行协调、Buffer缓冲空间进行解耦、信号主动通知等方式进行优化,就得到了上面的各种模型。
  2. 一,Java最基础的BIO:网络编程基于Client/Server模型,BIO就是其中最基础的阻塞模型,通过ServerSocket和Socket建立通道,进行通讯。下边我们以“查询服务器时间”的例子来进行实现。
  3. public class TimeServer {
  4. public static void main(String[] args) throws IOException {
  5. int port = 8090;
  6. if (args != null && args.length > 0) {
  7. try {
  8. port = Integer.valueOf(args[0]);
  9. }catch (NumberFormatException e){
  10. }
  11. }
  12. ServerSocket serverSocket = null;
  13. try {
  14. serverSocket = new ServerSocket(port);
  15. System.out.println("The time server is start in port : " + port);
  16. Socket socket =null;
  17. while (true){
  18. socket =serverSocket.accept();
  19. new Thread(new TimeServerHandler(socket)).start();
  20. }
  21. }finally {
  22. if(serverSocket != null){
  23. System.out.println("The time server close");
  24. serverSocket.close();
  25. serverSocket =null;
  26. }
  27. }
  28. }
  29. }
  /**
   * Serves a single client connection of the time server.
   *
   * <p>Reads the connection line by line. A line equal to "query time order"
   * (case-insensitive) is answered with the current date; any other line is answered
   * with "bad order". The conversation ends when the peer closes its side.
   */
  public class TimeServerHandler implements Runnable {

      private Socket socket;

      /**
       * @param socket the accepted client connection; it is closed by {@link #run()}
       */
      public TimeServerHandler(Socket socket) {
          this.socket = socket;
      }

      /**
       * Runs the request/response loop and always releases the connection afterwards.
       *
       * <p>The reader, writer and socket are managed by try-with-resources so that they
       * are closed on a normal end of stream as well as on failure (previously they were
       * only closed inside the {@code catch} block, leaking the socket whenever the
       * client disconnected normally).
       */
      @Override
      public void run() {
          try (Socket client = this.socket;
               BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
               PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
              String body;
              // readLine() returns null once the peer has closed the connection.
              while ((body = in.readLine()) != null) {
                  System.out.println("the time server receive order : " + body);
                  String currentTime = "query time order".equalsIgnoreCase(body)
                          ? new Date(System.currentTimeMillis()).toString()
                          : "bad order";
                  out.println(currentTime);
              }
          } catch (Exception e) {
              // Thread boundary: report the failure instead of silently dropping it.
              e.printStackTrace();
          } finally {
              this.socket = null;
          }
      }
  }
  /**
   * Blocking-I/O client for the time server: sends one "QUERY TIME ORDER" line to
   * 127.0.0.1 and prints the single line it gets back.
   */
  public class TimeClient {

      /**
       * @param args optional; {@code args[0]} is the server port. When it is absent or
       *             not a valid integer, port 8090 is used.
       */
      public static void main(String[] args) {
          int port = 8090;
          boolean hasPortArgument = args != null && args.length > 0;
          if (hasPortArgument) {
              try {
                  port = Integer.parseInt(args[0]);
              } catch (NumberFormatException ignored) {
                  // Deliberate: keep the default port.
              }
          }

          Socket connection = null;
          BufferedReader reader = null;
          PrintWriter writer = null;
          try {
              connection = new Socket("127.0.0.1", port);
              reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
              writer = new PrintWriter(connection.getOutputStream(), true);
              writer.println("QUERY TIME ORDER");
              System.out.println("Send order 2 server succeed.");
              String reply = reader.readLine();
              System.out.println("Now is :" + reply);
          } catch (IOException e) {
              // Also covers UnknownHostException, which is an IOException.
              e.printStackTrace();
          } finally {
              if (writer != null) {
                  writer.close();
              }
              closeReporting(reader);
              closeReporting(connection);
          }
      }

      /** Closes the resource if it is non-null, printing any I/O failure. */
      private static void closeReporting(java.io.Closeable resource) {
          if (resource == null) {
              return;
          }
          try {
              resource.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }
  125. 二,伪异步IO:为了解决同步阻塞IO中一个链路需要一个线程来处理的问题,可以引入线程池,用线程池中的线程处理多个client客户端,提高线程的利用率。但是如果通信非常慢,耗尽了线程池中的线程,其它client的处理请求就会在队列中排队阻塞,所以仅仅是在理想情况下看起来异步了。来看伪异步“查询服务器时间”的例子。
  126. public class TimeServer {
  127. public static void main(String[] args) throws IOException {
  128. int port = 8090;
  129. if (args != null && args.length > 0) {
  130. try {
  131. port = Integer.valueOf(args[0]);
  132. }catch (NumberFormatException e){
  133. }
  134. }
  135. ServerSocket serverSocket = null;
  136. try {
  137. serverSocket = new ServerSocket(port);
  138. System.out.println("The time server is start in port : " + port);
  139. Socket socket =null;
  140. //thread pool
  141. TimeServerHandlerExecutePool singleExecutePool = new TimeServerHandlerExecutePool(50,10000);
  142. while (true){
  143. socket =serverSocket.accept();
  144. singleExecutePool.execute(new TimeServerHandler(socket));
  145. }
  146. }finally {
  147. if(serverSocket != null){
  148. System.out.println("The time server close");
  149. serverSocket.close();
  150. serverSocket =null;
  151. }
  152. }
  153. }
  154. }
  /**
   * Bounded thread pool used by the pseudo-asynchronous time server to run
   * connection handlers.
   */
  public class TimeServerHandlerExecutePool {

      private ExecutorService executorService;

      /**
       * Creates the pool.
       *
       * <p>The core size is the number of available processors, capped at
       * {@code maxPoolSize}: {@link ThreadPoolExecutor} rejects a core size larger than
       * the maximum with an {@link IllegalArgumentException}, which previously made
       * construction fail on hosts with more processors than {@code maxPoolSize}.
       *
       * @param maxPoolSize maximum number of worker threads; must be positive
       * @param queueSize   capacity of the queue holding tasks waiting for a thread;
       *                    must be positive
       * @throws IllegalArgumentException if {@code maxPoolSize} or {@code queueSize}
       *                                  is not positive
       */
      public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
          int corePoolSize = Math.min(Runtime.getRuntime().availableProcessors(), maxPoolSize);
          executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 120L, TimeUnit.SECONDS,
                  new ArrayBlockingQueue<Runnable>(queueSize));
      }

      /**
       * Submits a task to the pool.
       *
       * @param task the task to run
       * @throws java.util.concurrent.RejectedExecutionException if all threads are busy
       *         and the queue is full, or the pool has been shut down
       */
      public void execute(Runnable task) {
          executorService.execute(task);
      }

      /**
       * Stops accepting new tasks and lets the worker threads exit once the tasks
       * already submitted have finished.
       */
      public void shutdown() {
          executorService.shutdown();
      }
  }
  /**
   * Serves a single client connection of the time server.
   *
   * <p>Reads the connection line by line. A line equal to "query time order"
   * (case-insensitive) is answered with the current date; any other line is answered
   * with "bad order". The conversation ends when the peer closes its side.
   */
  public class TimeServerHandler implements Runnable {

      private Socket socket;

      /**
       * @param socket the accepted client connection; it is closed by {@link #run()}
       */
      public TimeServerHandler(Socket socket) {
          this.socket = socket;
      }

      /**
       * Runs the request/response loop and always releases the connection afterwards.
       *
       * <p>The reader, writer and socket are managed by try-with-resources so that they
       * are closed on a normal end of stream as well as on failure (previously they were
       * only closed inside the {@code catch} block, leaking the socket whenever the
       * client disconnected normally).
       */
      @Override
      public void run() {
          try (Socket client = this.socket;
               BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
               PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
              String body;
              // readLine() returns null once the peer has closed the connection.
              while ((body = in.readLine()) != null) {
                  System.out.println("the time server receive order : " + body);
                  String currentTime = "query time order".equalsIgnoreCase(body)
                          ? new Date(System.currentTimeMillis()).toString()
                          : "bad order";
                  out.println(currentTime);
              }
          } catch (Exception e) {
              // Thread boundary: report the failure instead of silently dropping it.
              e.printStackTrace();
          } finally {
              this.socket = null;
          }
      }
  }
  /**
   * Blocking-I/O client for the time server: sends one "QUERY TIME ORDER" line to
   * 127.0.0.1 and prints the single line it gets back.
   */
  public class TimeClient {

      /**
       * @param args optional; {@code args[0]} is the server port. When it is absent or
       *             not a valid integer, port 8090 is used.
       */
      public static void main(String[] args) {
          int port = 8090;
          boolean hasPortArgument = args != null && args.length > 0;
          if (hasPortArgument) {
              try {
                  port = Integer.parseInt(args[0]);
              } catch (NumberFormatException ignored) {
                  // Deliberate: keep the default port.
              }
          }

          Socket connection = null;
          BufferedReader reader = null;
          PrintWriter writer = null;
          try {
              connection = new Socket("127.0.0.1", port);
              reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
              writer = new PrintWriter(connection.getOutputStream(), true);
              writer.println("QUERY TIME ORDER");
              System.out.println("Send order 2 server succeed.");
              String reply = reader.readLine();
              System.out.println("Now is :" + reply);
          } catch (IOException e) {
              // Also covers UnknownHostException, which is an IOException.
              e.printStackTrace();
          } finally {
              if (writer != null) {
                  writer.close();
              }
              closeReporting(reader);
              closeReporting(connection);
          }
      }

      /** Closes the resource if it is non-null, printing any I/O failure. */
      private static void closeReporting(java.io.Closeable resource) {
          if (resource == null) {
              return;
          }
          try {
              resource.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }
  259. 三,NIO(非阻塞IO):在JDK1.4以后引入了NIO来解决BIO的一些问题,我们来看它是通过引入什么解决的(根据文章前边的几种IO模型联想一下)。
  260. 1,Buffer(缓冲区):IO的读写都直接在Buffer中进行操作,这样极大地解耦了两端;
  261. 2,Channel(通道):通道是双向的,可以同时进行读和写两种操作;而流是单向的,必须是InputStream或者OutputStream的子类。
  262. 3,Selector(多路复用器):Selector会不断轮询注册在其上的Channel,如果某个Channel发生了读或者写事件,它就会处于就绪状态,被Selector轮询出来,然后进行处理。
  263. 好,通过“查询服务器时间”例子来看下。
  264. public class TimeServer {
  265. public static void main(String[] args) throws IOException {
  266. int port = 8090;
  267. if (args != null && args.length > 0) {
  268. try {
  269. port = Integer.valueOf(args[0]);
  270. } catch (NumberFormatException e) {
  271. }
  272. }
  273. MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
  274. new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
  275. }
  276. }
  /**
   * NIO time server: a single thread multiplexes the listening channel and all client
   * channels through one {@link Selector}.
   *
   * <p>A request whose whole content is "QUERY TIME ORDER" (case-insensitive) is
   * answered with the current date, anything else with "bad order".
   */
  public class MultiplexerTimeServer implements Runnable {

      private Selector selector;
      private ServerSocketChannel serverSocketChannel;
      private volatile boolean stop;

      /**
       * Opens the selector and the non-blocking listening channel and registers the
       * channel for accept events. Terminates the JVM with status 1 if that fails.
       *
       * @param port the TCP port to listen on
       */
      public MultiplexerTimeServer(int port) {
          try {
              selector = Selector.open();
              serverSocketChannel = ServerSocketChannel.open();
              serverSocketChannel.configureBlocking(false);
              serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
              serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
              System.out.println("The time server is start in port : " + port);
          } catch (IOException e) {
              e.printStackTrace();
              System.exit(1);
          }
      }

      /** Asks the event loop to end; it notices the request within its select timeout. */
      public void stop() {
          this.stop = true;
      }

      /**
       * Event loop: waits up to one second for ready keys, handles each of them, and
       * releases all channels and the selector once {@link #stop()} has been called.
       */
      @Override
      public void run() {
          while (!stop) {
              try {
                  selector.select(1000);
                  Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                  while (ready.hasNext()) {
                      SelectionKey key = ready.next();
                      ready.remove();
                      try {
                          handlerInput(key);
                      } catch (Exception e) {
                          // Report the failure, then drop the channel it happened on.
                          e.printStackTrace();
                          key.cancel();
                          if (key.channel() != null) {
                              key.channel().close();
                          }
                      }
                  }
              } catch (Throwable t) {
                  t.printStackTrace();
              }
          }
          closeResources();
      }

      /**
       * Closes every channel still registered with the selector, then the selector.
       *
       * <p>Closing a selector only invalidates its keys and deregisters the channels;
       * it does not close them, so they are closed explicitly here.
       */
      private void closeResources() {
          if (selector == null) {
              return;
          }
          try {
              for (SelectionKey key : selector.keys()) {
                  try {
                      key.channel().close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
              selector.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Handles one ready key: accepts a new connection and/or reads a request and
       * writes the reply.
       *
       * @param key the selected key
       * @throws IOException if accepting, reading or writing fails
       */
      private void handlerInput(SelectionKey key) throws IOException {
          if (!key.isValid()) {
              return;
          }
          // A new connection is waiting on the listening channel.
          if (key.isAcceptable()) {
              ServerSocketChannel listening = (ServerSocketChannel) key.channel();
              SocketChannel accepted = listening.accept();
              // In non-blocking mode accept() returns null when no connection is pending
              // any more (for example the client gave up between select and accept).
              if (accepted != null) {
                  accepted.configureBlocking(false);
                  // Watch the new connection for incoming data.
                  accepted.register(selector, SelectionKey.OP_READ);
              }
          }
          if (key.isReadable()) {
              SocketChannel socketChannel = (SocketChannel) key.channel();
              ByteBuffer readBuffer = ByteBuffer.allocate(1024);
              int readBytes = socketChannel.read(readBuffer);
              if (readBytes > 0) {
                  readBuffer.flip();
                  byte[] bytes = new byte[readBuffer.remaining()];
                  readBuffer.get(bytes);
                  String body = new String(bytes, "UTF-8");
                  System.out.println("the time server receive order: " + body);
                  String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                          ? new Date(System.currentTimeMillis()).toString()
                          : "bad order";
                  doWrite(socketChannel, currentTime);
              } else if (readBytes < 0) {
                  // End of stream: the peer closed the connection.
                  key.cancel();
                  socketChannel.close();
              }
              // readBytes == 0: nothing was available, wait for the next event.
          }
      }

      /**
       * Writes the response to the client, encoded as UTF-8 to match how requests are
       * decoded.
       *
       * <p>NOTE(review): a single non-blocking write may send only part of the buffer;
       * that case is not handled here, as in the original code.
       *
       * @param socketChannel the client channel
       * @param response      the text to send; nothing is sent when it is null or blank
       * @throws IOException if writing fails
       */
      private void doWrite(SocketChannel socketChannel, String response) throws IOException {
          if (response != null && response.trim().length() > 0) {
              byte[] bytes = response.getBytes("UTF-8");
              ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
              writeBuffer.put(bytes);
              writeBuffer.flip();
              socketChannel.write(writeBuffer);
          }
      }
  }
  392. public class TimeClient {
  393. public static void main(String[] args) {
  394. int port = 8090;
  395. if(args !=null && args.length >0){
  396. try {
  397. port = Integer.valueOf(args[0]);
  398. }catch (NumberFormatException e){
  399. }
  400. }
  401. new Thread(new TimeClientHandler("127.0.0.1",port),"Timer-Client-001").start();
  402. }
  403. }
  /**
   * NIO time client: connects with a non-blocking {@link SocketChannel}, sends
   * "QUERY TIME ORDER", prints the reply and then stops.
   */
  public class TimeClientHandler implements Runnable {

      private String host;
      private int port;
      private Selector selector;
      private SocketChannel socketChannel;
      private volatile boolean stop;

      /**
       * Opens the selector and a non-blocking socket channel. Terminates the JVM with
       * status 1 if that fails.
       *
       * @param host server host; {@code null} means "127.0.0.1"
       * @param port server port
       */
      public TimeClientHandler(String host, int port) {
          this.host = host == null ? "127.0.0.1" : host;
          this.port = port;
          try {
              selector = Selector.open();
              socketChannel = SocketChannel.open();
              socketChannel.configureBlocking(false);
          } catch (IOException e) {
              e.printStackTrace();
              System.exit(1);
          }
      }

      /**
       * Connects, then runs the event loop until a reply has been printed, the server
       * has closed the connection, or handling a key has failed. The channel and the
       * selector are closed before returning.
       */
      @Override
      public void run() {
          try {
              doConnect();
          } catch (IOException e) {
              e.printStackTrace();
              System.exit(1);
          }
          while (!stop) {
              try {
                  selector.select(1000);
                  Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                  while (ready.hasNext()) {
                      SelectionKey key = ready.next();
                      ready.remove();
                      try {
                          handlerInput(key);
                      } catch (Exception e) {
                          // For example a refused connection surfacing from
                          // finishConnect(). The only channel is gone afterwards, so
                          // report it and end the loop instead of selecting forever.
                          e.printStackTrace();
                          key.cancel();
                          if (key.channel() != null) {
                              key.channel().close();
                          }
                          this.stop = true;
                      }
                  }
              } catch (IOException e) {
                  e.printStackTrace();
                  System.exit(1);
              }
          }
          closeResources();
      }

      /**
       * Closes the socket channel and the selector. Closing a selector deregisters its
       * channels but does not close them, so the channel is closed explicitly.
       */
      private void closeResources() {
          if (socketChannel != null) {
              try {
                  socketChannel.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
          if (selector != null) {
              try {
                  selector.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }

      /**
       * Handles one ready key: completes a pending connect and/or reads the reply.
       *
       * @param key the selected key
       * @throws IOException if connecting, reading or writing fails
       */
      private void handlerInput(SelectionKey key) throws IOException {
          if (!key.isValid()) {
              return;
          }
          SocketChannel channel = (SocketChannel) key.channel();
          if (key.isConnectable()) {
              if (channel.finishConnect()) {
                  channel.register(selector, SelectionKey.OP_READ);
                  doWrite(channel);
              } else {
                  // connect failed
                  System.exit(1);
              }
          }
          if (key.isReadable()) {
              ByteBuffer readBuffer = ByteBuffer.allocate(1024);
              int readBytes = channel.read(readBuffer);
              if (readBytes > 0) {
                  readBuffer.flip();
                  byte[] bytes = new byte[readBuffer.remaining()];
                  readBuffer.get(bytes);
                  String body = new String(bytes, "Utf-8");
                  System.out.println("Now is : " + body);
                  this.stop = true;
              } else if (readBytes < 0) {
                  // The server closed the connection without (further) data. Nothing
                  // more can arrive, so end the loop as well.
                  key.cancel();
                  channel.close();
                  this.stop = true;
              }
              // readBytes == 0: nothing was available, wait for the next event.
          }
      }

      /**
       * Starts the connection. If it completes immediately the request is sent right
       * away; otherwise the channel is registered for the connect event.
       *
       * @throws IOException if connecting, registering or writing fails
       */
      private void doConnect() throws IOException {
          if (socketChannel.connect(new InetSocketAddress(host, port))) {
              socketChannel.register(selector, SelectionKey.OP_READ);
              doWrite(socketChannel);
          } else {
              socketChannel.register(selector, SelectionKey.OP_CONNECT);
          }
      }

      /**
       * Sends the "QUERY TIME ORDER" request, encoded as UTF-8.
       *
       * @param socketChannel the connected channel
       * @throws IOException if writing fails
       */
      private void doWrite(SocketChannel socketChannel) throws IOException {
          byte[] req = "QUERY TIME ORDER".getBytes("UTF-8");
          ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
          writeBuffer.put(req);
          writeBuffer.flip();
          socketChannel.write(writeBuffer);
          if (!writeBuffer.hasRemaining()) {
              System.out.println("send order 2 server success");
          }
      }
  }
  535. 四,AIO(异步IO):AIO在处理数据传输时,直接调用API的read或write方法即可,这两个方法都是异步的:对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。在JDK1.7中,这部分内容被称作NIO.2,主要是在java.nio.channels包下增加了几个异步通道(例如下面例子用到的AsynchronousServerSocketChannel和AsynchronousSocketChannel):调用其中的read/write方法时,可以传入一个带回调函数的对象(CompletionHandler),当读取/写入操作执行完后,直接调用该回调函数。下边来看看“查询服务器时间”的例子。
  536. public class TimeServer {
  537. public static void main(String[] args) throws IOException {
  538. int port = 8090;
  539. if (args != null && args.length > 0) {
  540. try {
  541. port = Integer.valueOf(args[0]);
  542. } catch (NumberFormatException e) {
  543. }
  544. }
  545. AsyncTimeServerHandler asyncTimeServerHandler =new AsyncTimeServerHandler(port);
  546. new Thread(asyncTimeServerHandler,"AIO-asyncTimeServerHandler-001").start();
  547. }
  548. }
  549. public class AsyncTimeServerHandler implements Runnable {
  550. CountDownLatch countDownLatch;
  551. AsynchronousServerSocketChannel asynchronousServerSocketChannel;
  552. private int port;
  553. /**
  554. * init server handler class
  555. * @param port
  556. */
  557. public AsyncTimeServerHandler(int port) {
  558. this.port = port;
  559. try {
  560. asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
  561. asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
  562. System.out.println("Time server is start in port :" + port);
  563. } catch (IOException e) {
  564. e.printStackTrace();
  565. }
  566. }
  567. @Override
  568. public void run() {
  569. countDownLatch = new CountDownLatch(1);
  570. doAccept();
  571. try {
  572. countDownLatch.await();
  573. } catch (InterruptedException e) {
  574. e.printStackTrace();
  575. }
  576. }
  577. /**
  578. * Accepts a connection.
  579. */
  580. public void doAccept() {
  581. asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
  582. }
  583. }
  584. public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
  585. /**
  586. * success handler
  587. * @param result
  588. * @param attachment
  589. */
  590. @Override
  591. public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
  592. attachment.asynchronousServerSocketChannel.accept(attachment,this);
  593. ByteBuffer buffer =ByteBuffer.allocate(1024);
  594. //注意回调
  595. result.read(buffer,buffer,new ReadComplateHandler(result));
  596. }
  597. @Override
  598. public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
  599. exc.printStackTrace();
  600. attachment.countDownLatch.countDown();
  601. }
  602. }
  /**
   * Callback for asynchronous reads on one client connection of the AIO time server.
   *
   * <p>A request whose whole content is "QUERY TIME ORDER" (case-insensitive) is
   * answered with the current date, anything else with "bad order". After the reply
   * has been written the next read is started, so that further requests are served
   * and the connection is closed once the client closes its side.
   */
  public class ReadComplateHandler implements CompletionHandler<Integer, ByteBuffer> {

      private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

      private AsynchronousSocketChannel socketChannel;

      /**
       * @param socketChannel the client channel this handler reads from and replies on
       */
      public ReadComplateHandler(AsynchronousSocketChannel socketChannel) {
          this.socketChannel = socketChannel;
      }

      /**
       * Called when a read has finished.
       *
       * @param result     number of bytes read, or -1 when the client closed the connection
       * @param attachment the buffer the bytes were read into
       */
      @Override
      public void completed(Integer result, ByteBuffer attachment) {
          if (result < 0) {
              // End of stream: there is no request to answer, release the connection.
              closeChannel();
              return;
          }
          attachment.flip();
          byte[] body = new byte[attachment.remaining()];
          attachment.get(body);
          String req = new String(body, UTF_8);
          System.out.println("the time server recieve order : " + req);
          String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req)
                  ? new Date(System.currentTimeMillis()).toString()
                  : "bad order";
          System.out.println("currentTime:" + currentTime);
          doWrite(currentTime, attachment);
      }

      /**
       * Writes the reply (UTF-8) and, once it is fully sent, starts the next read.
       *
       * @param currentTime the text to send; nothing is sent when it is null or blank
       * @param readBuffer  the request buffer, reused for the next read
       */
      private void doWrite(String currentTime, final ByteBuffer readBuffer) {
          if (currentTime != null && currentTime.trim().length() > 0) {
              ByteBuffer writeBuffer = ByteBuffer.wrap(currentTime.getBytes(UTF_8));
              socketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                  @Override
                  public void completed(Integer result, ByteBuffer attachment) {
                      if (attachment.hasRemaining()) {
                          // Partial write: send the rest.
                          socketChannel.write(attachment, attachment, this);
                      } else {
                          // Reply sent. Without a pending read the client's close would
                          // never be noticed and this channel would stay open forever.
                          readBuffer.clear();
                          socketChannel.read(readBuffer, readBuffer, ReadComplateHandler.this);
                      }
                  }

                  @Override
                  public void failed(Throwable exc, ByteBuffer attachment) {
                      closeChannel();
                  }
              });
          }
      }

      /**
       * Called when a read failed: closes the connection.
       *
       * @param exc        the failure
       * @param attachment the buffer of the failed read
       */
      @Override
      public void failed(Throwable exc, ByteBuffer attachment) {
          closeChannel();
      }

      /** Closes the client channel, printing any I/O failure. */
      private void closeChannel() {
          try {
              this.socketChannel.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
  }
  664. public class TimeClient {
  665. public static void main(String[] args) {
  666. int port = 8090;
  667. if(args !=null && args.length >0){
  668. try {
  669. port = Integer.valueOf(args[0]);
  670. }catch (NumberFormatException e){
  671. }
  672. }
  673. new Thread(new AsyncTimeClientHandler("127.0.0.1",port),"AsyncTimeClientHandler-001").start();
  674. }
  675. }
  /**
   * AIO time client: connects asynchronously, sends "QUERY TIME ORDER", prints the
   * reply and closes the connection.
   *
   * <p>The class is its own connect callback; {@link #run()} blocks on a latch that
   * every terminal callback releases.
   */
  public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

      private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

      private String host;
      private int port;
      private AsynchronousSocketChannel client;
      private CountDownLatch latch;

      /**
       * Opens the asynchronous channel. On failure the error is printed and the
       * handler is left without a channel.
       *
       * @param host server host; {@code null} means "127.0.0.1"
       * @param port server port
       */
      public AsyncTimeClientHandler(String host, int port) {
          this.host = host == null ? "127.0.0.1" : host;
          this.port = port;
          try {
              client = AsynchronousSocketChannel.open();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Starts the connect and waits until the exchange has finished or failed, then
       * closes the channel. Returns immediately when the channel could not be opened.
       */
      @Override
      public void run() {
          if (client == null) {
              System.err.println("Time client channel was not opened, nothing to run");
              return;
          }
          latch = new CountDownLatch(1);
          // This object is both the attachment and the callback, see completed/failed.
          client.connect(new InetSocketAddress(host, port), this, this);
          try {
              latch.await();
          } catch (InterruptedException e) {
              // Preserve the interrupt for whoever owns this thread.
              Thread.currentThread().interrupt();
          }
          try {
              client.close();
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Connect succeeded: sends the request and, once it is fully written, reads the
       * reply.
       *
       * @param result     always {@code null} for a connect
       * @param attachment this handler
       */
      @Override
      public void completed(Void result, AsyncTimeClientHandler attachment) {
          ByteBuffer writeBuffer = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
          client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
              @Override
              public void completed(Integer result, ByteBuffer attachment) {
                  if (attachment.hasRemaining()) {
                      // Partial write: send the rest.
                      client.write(attachment, attachment, this);
                  } else {
                      readReply();
                  }
              }

              @Override
              public void failed(Throwable exc, ByteBuffer attachment) {
                  closeAndRelease(exc);
              }
          });
      }

      /** Reads the reply asynchronously, prints it and releases the latch. */
      private void readReply() {
          ByteBuffer readBuffer = ByteBuffer.allocate(1024);
          client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
              @Override
              public void completed(Integer result, ByteBuffer attachment) {
                  try {
                      if (result < 0) {
                          System.err.println("the time server closed the connection without a reply");
                          return;
                      }
                      attachment.flip();
                      byte[] bytes = new byte[attachment.remaining()];
                      attachment.get(bytes);
                      System.out.println("now is : " + new String(bytes, UTF_8));
                  } finally {
                      // Always let run() continue, whatever happened above.
                      latch.countDown();
                  }
              }

              @Override
              public void failed(Throwable exc, ByteBuffer attachment) {
                  closeAndRelease(exc);
              }
          });
      }

      /**
       * Connect failed: reports the cause, closes the channel and lets {@link #run()}
       * continue.
       *
       * @param exc        the failure
       * @param attachment this handler
       */
      @Override
      public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
          closeAndRelease(exc);
      }

      /**
       * Reports the failure, closes the channel and releases the latch. The latch is
       * released in {@code finally} so that a failing close cannot leave
       * {@link #run()} blocked forever.
       */
      private void closeAndRelease(Throwable cause) {
          cause.printStackTrace();
          try {
              client.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              latch.countDown();
          }
      }
  }
  775. 好,下边来比较一下这几种方式异同:




















































 

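| 对比项 | BIO | 伪异步IO | NIO | AIO |
| --- | --- | --- | --- | --- |
| 客户端数:IO线程数 | 1:1 | M:N(其中M可以大于N) | M:1(1个线程处理多个客户端连接) | M:0(不需要启动额外的线程,被动回调) |
| IO类型(阻塞) | 阻塞IO | 阻塞IO | 非阻塞IO | 非阻塞IO |
| IO类型(同步) | 同步IO | 同步IO | 同步IO(IO多路复用) | 异步IO |
| 可靠性 | 非常差 | 差 | 高 | 高 |
| 吞吐量 | 低 | 中 | 高 | 高 |
| 难度 | 简单 | 简单 | 复杂 | 复杂 |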

  1. 好,网络编程的东西很多,而且比较复杂,需要不断的学习,不断的理解,当然也要善于站在巨人的肩膀上,例如各种中间件都已经实现好了,Netty也是一个非常棒的NIO框架,可以学习并为己用之。

发表评论

表情:
评论列表 (有 0 条评论,372人围观)

还没有评论,来说两句吧...

相关阅读

    相关 详尽Netty():初探netty

    如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送

    相关 Netty自学-Netty学习()

    什么Netty? Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客