重学Netty(三)——Netty的概念及其简单的Echo服务器编写

Netty简介

刚开始学习一门技术的时候,不要先去看那些博客和视频,应该先去官网上通读一遍,然后有什么看不懂的再去找其他途径。下图为官网截图
在这里插入图片描述
据官网描述,Netty是一个异步的事件驱动型的网络应用框架,可以用来快速的开发高性能且可维护协议的服务器和客户端。
也可以从官网中看到它的特点:在这里插入图片描述

  1. 从设计方面来看:不仅可以实现非阻塞,当然也可以实现阻塞socket通信,还有灵活可扩展的事件模型,还可以自定义线程模型等等
  2. 从使用方面来看:良好的Javadoc有使用案例,不需要一些额外的依赖
  3. 从性能方面来看:更高的吞吐量,更低的延迟,尽量减少内存拷贝,更小的资源消耗等
  4. 从安全方面来看:完整的SSL / TLS和StartTLS支持
  5. 从社区方面来看:社区比较活跃经常发布新版本,当然使用的地方也很多,因此常见问题会有相对成熟的解决方案

接下来可以再看一下它可以用来干什么,它的核心就是基于缓冲区零拷贝,通用的一些APO还有一些可扩展的事件模型。它不仅支持Socket,Datagram还支持Http Tunnel和Pipe数据传输服务,还有支持HTTP,SSL等常见的协议

在这里插入图片描述
可以看出来Netty是一个十分强大的网络框架!

从定义分析核心

Netty的定义就是说的是一个异步的事件驱动型网络框架,基于非阻塞的基础上开发的,因此我们就可以根据定义来推组件

  1. 非阻塞的:那就需要在Channel和Buffer的基础上
  2. 异步的:说明需要Future和方法的回调
  3. 事件驱动:需要事件和ChannelHandler(在Channel中对事件的处理)
  4. 事件分发机制:EventLoop

Channel和Buffer

Channel和Buffer已经很熟悉了,在nio中是必要的组件,Channel可以看做是一个数据传输的载体,但是需要注意的是数据并不是放在Channel里,而是Buffer里。

方法回调和Future

方法回调

用通俗的话来说,它还是一个方法调用,只不过这个方法调用不同于普通的方法调用,比如A方法调用了B方法,在B执行时某条件触发,A方法又会调用C方法。

在前面的网络程序设计中,对Server端的连接处理中,可以设置一个回调方法,如果isConnec()就打印一句话,当请求连接的时候这个回调方法就会打印出一句话,可以看做是自定义一个监听器,然后并在特定情况下进行处理。这个在ChannelHandler中会经常使用。

Future

Future学Java的都很熟悉了,它位于java.util.concurrent包下,它提供了一种可以在一种操作完成后对应用进行通知,可以从接口中的方法看出,很明显是对异步的一个支持,可以对任务进行取消,可以判断异步任务是否完成,可以去获取异步任务的执行结果。

  1. public interface Future<V> {
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. boolean isCancelled();
  4. boolean isDone();
  5. V get() throws InterruptedException, ExecutionException;
  6. V get(long timeout, TimeUnit unit)
  7. throws InterruptedException, ExecutionException, TimeoutException;
  8. }

Netty中并没有直接使用Future方法,而是对Future进行了扩展,因为Java中的Future只能手动进行检查是否完成,因此Netty提供了自己实现的ChannelFuture。因为如果有了监听器就不需要手动进行检查,可以由监听器判断和执行相应的回调

  1. public interface ChannelFuture extends Future<Void> {
  2. Channel channel();
  3. ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
  4. ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
  5. ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
  6. ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
  7. ChannelFuture sync() throws InterruptedException;
  8. ChannelFuture syncUninterruptibly();
  9. ChannelFuture await() throws InterruptedException;
  10. ChannelFuture awaitUninterruptibly();
  11. boolean isVoid();
  12. }

事件和ChannelHandler

事件

对于事件,经过前面的NIO学习应该也不陌生了,因此可以将前面的一起串联起来,事件是什么?事件就是触发监听器的条件,触发后从而执行相应的回调逻辑,怎么控制回调逻辑?那就需要ChannelHandler来进行处理了。
因为Channel是一个双向通信的一个通道,因此它的事件也大致是基于请求/数据的进站/出站顺序而来的,比如以下这些事件

  • 连接请求状态
  • 连接的打开和断开
  • 数据从Socket读取和写入Socket
  • 用户自定义事件
  • 错误处理事件

这些可以算得上是事件,当然事件还都是人定义的。

ChannelHandler

上面说到了事件,那么对事件的处理就得交给ChannelHandler了,每一个事件都要有相对于的一个ChannelHandler进行事件处理,一个数据的发送或者接收往往是触发了很多的事件,因此ChannelHandler会连成一条责任链,每个ChannelHandler只负责自己的事件,如下图所示
在这里插入图片描述
在Netty中也有很多封装好的ChannelHandler供开发人员使用,避免重复造轮子,提高开发效率。

EventLoop和EventLoopGroup

在NIO编程中需要将事件注册到Selector上去,然后手动的根据情况分派任务,而Netty将Selector这写操作抽象了出来,不需要手动的进行分配。它内部会为每一个Channel都绑定一个EventLoop,用来事件处理,并在检测到感兴趣的事件后将事件自动分派给ChannelHandler进行处理。

EventLoop它是EventLoopGroup的子接口,因此它继承了很多父接口的方法,如父类的注册方法等(由它的注册方法也可以看出一个EventLoop对应一个Channel,因为入参只能有一个Channel)

  1. public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
  2. @Override
  3. EventLoopGroup parent();
  4. }

在这里插入图片描述

Echo服务器编写

服务端

  1. package Netty.SimpleExample;
  2. /*
  3. * @Author Wrial
  4. * @Date Created in 14:54 2020/4/13
  5. * @Description
  6. */
  7. import io.netty.bootstrap.ServerBootstrap;
  8. import io.netty.channel.Channel;
  9. import io.netty.channel.ChannelFuture;
  10. import io.netty.channel.ChannelInitializer;
  11. import io.netty.channel.EventLoopGroup;
  12. import io.netty.channel.nio.NioEventLoopGroup;
  13. import io.netty.channel.socket.nio.NioServerSocketChannel;
  14. public class EchoServer {
  15. public static void main(String[] args) {
  16. new EchoServer(8090).service();
  17. }
  18. private final ServerHandler serverHandler;
  19. private final EventLoopGroup eventLoopGroup;
  20. private final ServerBootstrap bootstrap;
  21. private final int port;
  22. public EchoServer(int port) {
  23. this.port = port;
  24. serverHandler = new ServerHandler();
  25. eventLoopGroup = new NioEventLoopGroup();
  26. bootstrap = new ServerBootstrap();
  27. }
  28. public void service() {
  29. try {
  30. bootstrap.group(eventLoopGroup)
  31. .channel(NioServerSocketChannel.class)
  32. .localAddress(port).childHandler(new ChannelInitializer<Channel>() {
  33. @Override
  34. protected void initChannel(Channel ch) throws Exception {
  35. ch.pipeline().addLast(serverHandler);
  36. }
  37. });
  38. ChannelFuture channelFuture = bootstrap.bind().sync();
  39. channelFuture.channel().closeFuture().sync();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. } finally {
  43. try {
  44. eventLoopGroup.shutdownGracefully().sync();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }
  50. }
  51. package Netty.SimpleExample;
  52. /*
  53. * @Author Wrial
  54. * @Date Created in 14:56 2020/4/13
  55. * @Description 简单的处理器 因为Echo只需要对入站的消息进行处理
  56. * 因此就可以使用一个简单的ChannelInboundHandlerAdapter进行处理
  57. */
  58. import io.netty.buffer.ByteBuf;
  59. import io.netty.buffer.Unpooled;
  60. import io.netty.channel.ChannelFutureListener;
  61. import io.netty.channel.ChannelHandler;
  62. import io.netty.channel.ChannelHandlerContext;
  63. import io.netty.channel.ChannelInboundHandlerAdapter;
  64. import io.netty.util.CharsetUtil;
  65. @ChannelHandler.Sharable // 可以对此实例进行复用
  66. public class ServerHandler extends ChannelInboundHandlerAdapter {
  67. @Override
  68. public boolean isSharable() {
  69. return super.isSharable();
  70. }
  71. @Override
  72. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  73. ByteBuf byteBuf = (ByteBuf) msg;
  74. System.out.println("接收到" +byteBuf.toString(CharsetUtil.UTF_8));
  75. ctx.write(byteBuf);
  76. }
  77. @Override
  78. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  79. System.out.println("读完数据了,write and flush 并关闭Channel");
  80. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
  81. .addListener(ChannelFutureListener.CLOSE);
  82. }
  83. @Override
  84. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  85. cause.printStackTrace();
  86. ctx.channel();
  87. }
  88. }

客户端

  1. package Netty.SimpleExample;
  2. /*
  3. * @Author Wrial
  4. * @Date Created in 14:54 2020/4/13
  5. * @Description
  6. */
  7. import io.netty.bootstrap.Bootstrap;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.EventLoopGroup;
  11. import io.netty.channel.nio.NioEventLoopGroup;
  12. import io.netty.channel.socket.SocketChannel;
  13. import io.netty.channel.socket.nio.NioSocketChannel;
  14. public class EchoClient {
  15. private final String host;
  16. private final int port;
  17. private final SocketChannel socketChannel;
  18. private final EventLoopGroup group;
  19. private final Bootstrap bootstrap;
  20. private final ClientHandler clientHandler;
  21. public EchoClient(String host, int port) {
  22. this.host = host;
  23. this.port = port;
  24. socketChannel = new NioSocketChannel();
  25. group = new NioEventLoopGroup();
  26. clientHandler = new ClientHandler();
  27. bootstrap = new Bootstrap();
  28. }
  29. private void start() {
  30. try {
  31. bootstrap.group(group)
  32. .channel(NioSocketChannel.class)
  33. .remoteAddress(host,port).handler(new ChannelInitializer<SocketChannel>() {
  34. @Override
  35. protected void initChannel(SocketChannel ch) throws Exception {
  36. ch.pipeline().addLast(clientHandler);
  37. }
  38. });
  39. ChannelFuture future = bootstrap.connect().sync();
  40. future.channel().closeFuture().sync();
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. } finally {
  44. try {
  45. group.shutdownGracefully().sync();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }
  51. public static void main(String[] args) throws Exception {
  52. new EchoClient("127.0.0.1", 8090).start();
  53. }
  54. }
  55. package Netty.SimpleExample;
  56. /*
  57. * @Author Wrial
  58. * @Date Created in 15:50 2020/4/13
  59. * @Description 此处使用SimpleChannelInboundHandler是根据资源分配角度来设定的
  60. */
  61. import io.netty.buffer.ByteBuf;
  62. import io.netty.buffer.Unpooled;
  63. import io.netty.channel.*;
  64. import io.netty.util.CharsetUtil;
  65. @ChannelHandler.Sharable
  66. public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  67. // 活跃的时候发一条消息
  68. @Override
  69. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  70. System.out.println("发给服务端 Netty Client");
  71. ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Client ", CharsetUtil.UTF_8));
  72. super.channelActive(ctx);
  73. }
  74. @Override
  75. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  76. cause.printStackTrace();
  77. ctx.close();
  78. }
  79. @Override
  80. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
  81. System.out.println("接收到服务端的响应:"+byteBuf.toString(CharsetUtil.UTF_8));
  82. }
  83. }

运行结果如下:
在这里插入图片描述
在这里插入图片描述

可以看得出Netty编写一个非阻塞的Echo服务器和客户端是十分简单的,这几行代码如果用NIO实现的话是需要上百行代码的!

发表评论

表情:
评论列表 (有 0 条评论,54人围观)

还没有评论,来说两句吧...

相关阅读