netty实战笔记 第二章 第一个Netty程序

叁歲伎倆 2022-04-10 21:23 365阅读 0赞

2.1 编写Echo服务器

所有的Netty服务器都需要一下两个部分:

  • 至少一个ChannelHandler
    该组件实现了服务器对客户端接收的数据的处理,即他的业务逻辑。
  • 引导
    这是配置服务器的启动代码. 至少,他会将服务器绑定到它要监听连接请求的端口上.

2.1.1 ChannelHandler 业务处理逻辑

接口ChannelInboundHandler
用来响应入站事件的方法。使用的时候只需要继承ChannelInboundHandlerAdapter类就行了。它提供了ChannelInboundHandler的默认实现。
主要的方法:

  • channelRead() 对于每个传入的消息都会调用
  • channelReadComplete() 通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的最后一个条消息。
  • exceptionCaught() 在读取操作期间,有异常抛出时会调用。

    package com.moyang.echo;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;

    /**

    • nettydemo1 — 这个组件实现了服务器的业务逻辑,决定了连接创建后和接收到信息后该如何处理
    • 这种使用 ChannelHandler 的方式体现了关注点分离的设计原则,并简化业务逻辑的迭代开发的要求
      *
    • @author 墨阳
    • @date 2018-11-09
      */
      // 标识ChannelHandler的实例之间可以在Channel里面共享
      @ChannelHandler.Sharable
      public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. // super.channelRead(ctx, msg);
  4. ByteBuf buf = (ByteBuf) msg;
  5. System.out.println("server has received: " + buf.toString(CharsetUtil.UTF_8));
  6. // 将接受到的消息发送给发送者,注意,这里并没有flush。
  7. ctx.write(buf);
  8. }
  9. @Override
  10. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  11. // 冲刷所有待审消息到远程节点。并关闭通道
  12. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
  13. }
  14. @Override
  15. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  16. // 打印异常栈跟踪
  17. cause.printStackTrace();
  18. // 关闭Channel
  19. ctx.close();
  20. }
  21. }

2.2.2 引导服务器

引导服务器主要涉及一下内容:

  • 绑定到服务器上,进行监听,并接受传入连接请求的端口。
  • 配置channel以将有关的入站消息通知给EchoServerHandler实例。

    package com.moyang.echo;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    import java.net.InetSocketAddress;

    /**

    • nettydemo1
    • 1.监听和接收进来的连接请求
    • 2.配置 Channel 来通知一个关于入站消息的 EchoServerHandler 实例
      *
    • @author 墨阳
    • @date 2018-11-09
      */
      public class EchoServer {

      private final int port;

      public EchoServer(int port) {

      1. this.port = port;

      }

      public static void main(String[] args) throws InterruptedException {

      1. int port = 8000;
      2. new EchoServer(port).start();

      }

      public void start() throws InterruptedException {

      1. // 创建EventLoopGroup
      2. EventLoopGroup group = new NioEventLoopGroup();
      3. try {
      4. // 创建ServerBootStrap
      5. ServerBootstrap bootstrap = new ServerBootstrap();
      6. bootstrap.group(group)
      7. // 为NIO指定使用 NioServerSocketChannel 这种信道类型
      8. .channel(NioServerSocketChannel.class)
      9. // 使用指定的端口设置套接字地址
      10. .localAddress(new InetSocketAddress(port))
      11. // 添加EchoServerHandler到子Channel的ChannelPipeline
      12. .childHandler(new ChannelInitializer<SocketChannel>() { //添加 EchoServerHandler 到 Channel 的 ChannelPipeline
      13. //当一个新的连接被接受,一个新的子 Channel 将被创建, ChannelInitializer 会添加我们EchoServerHandler 的实例到 Channel 的 ChannelPipeline。
      14. protected void initChannel(SocketChannel ch) throws Exception {
      15. // EchoServerHandler被标注为@Shareable,所以我们可以总是使用同样的实例。
      16. ch.pipeline().addLast(new EchoServerHandler());
      17. }
      18. });
      19. // 异步的绑定服务;调用sync()方法阻塞等待直到绑定完成
      20. ChannelFuture f = bootstrap.bind().sync(); // 8
      21. System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
      22. //获取channel的CloseFuture,调用sync()会导致阻塞当前线程直到它完成。
      23. f.channel().closeFuture().sync();
      24. } catch (Exception e) {
      25. e.printStackTrace();
      26. } finally {
      27. //关闭 EventLoopGroup,释放所有资源。包括所有创建的线程
      28. group.shutdownGracefully().sync();
      29. }

      }
      }

ChannelInitializer,当一个新的连接被接受的时候,一个新的子Channel将会被建立.ChannelInitializer将会把一个你的EchoServerHandler的实例添加到该ChannelChannelPipeline中。这个ChannelHandler就会收到有关入站消息的通知。

2.2 编写Echo客户端

Echo客户端会完成的工作:

  • 连接到服务器
  • 发送一个或者多个消息
  • 对于每个消息,等待并接受从服务器发回来的消息
  • 关闭连接

2.2.1 通过ChannelHandler实现业务逻辑

客户端将拥有一个用来处理数据库的ChannelInboundHandler,客户端使用SimpleChannelInboundHandler类来完成任务。主要的方法有:

  • channelActive() 在到服务器的连接已经建立之后被调用
  • channelRead0() 当从服务器接到一条消息的时候被调用
  • exceptionCaught() 在处理的过程中已发异常的时候被调用

    package com.moyang.echo;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;

    /**

    • nettydemo1
      *
    • @author 墨阳
    • @date 2018-11-09
      */
      // 标记该实例可以被多个Channel共享
      @ChannelHandler.Sharable
      public class EchoClientHandler extends SimpleChannelInboundHandler {

      /**

      • 建立连接后该 channelActive() 方法被调用一次.
      • 逻辑很简单:一旦建立了连接,字节序列被发送到服务器。
      • 该消息的内容并不重要;在这里,我们使用了 Netty 编码字符串 “Netty rocks!”
      • 通过覆盖这种方法,我们确保东西被尽快写入到服务器。
        *
      • @param ctx
      • @throws Exception
        */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(“netty rocks!!! “, CharsetUtil.UTF_8));
        }

        /**

      • 这种方法会在接收到数据时被调用。
      • 注意,由服务器所发送的消息可以以块的形式被接收。
      • 即,当服务器发送 5 个字节是不是保证所有的 5 个字节会立刻收到 - 即使是只有 5 个字节,
      • channelRead0() 方法可被调用两次,第一次用一个ByteBuf(Netty的字节容器)装载3个字节和第二次一个 ByteBuf 装载 2 个字节。
      • 唯一要保证的是,该字节将按照它们发送的顺序分别被接收。 (注意,这是真实的,只有面向流的协议如TCP)。
        *
      • @param ctx
      • @param msg
      • @throws Exception
        */
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(“Netty rocks!”, CharsetUtil.UTF_8));
        System.out.println(“Client Received: “ + msg.toString(CharsetUtil.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        }
        }

思考

为什么客户端使用的SimpleChannelInboundHandler而服务端使用的ChannelInboundHandler?

这个关系到:业务逻辑如何处理消息以及Netty如何管理资源.
在客户端,当channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler 负责释放指向保存该消息的ByteBuf 的内存引用。在EchoServerHandler 中,你仍然需要将传入消息回送给发送者,而write()操作是异步的,直到channelRead()方法返回后可能仍然没有完成(如代码清单2-1 所示)。为此,EchoServerHandler扩展了ChannelInboundHandlerAdapter,其在这个时间点上不会释放消息。消息在EchoServerHandler 的channelReadComplete()方法中,当writeAndFlush()方法被调用时被释放

2.4.2 引导客户端

  1. package com.moyang.echo;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import java.net.InetSocketAddress;
  10. /**
  11. * nettydemo1
  12. *
  13. * @author 墨阳
  14. * @date 2018-11-09
  15. */
  16. public class EchoClient {
  17. private final String host;
  18. private final int port;
  19. public EchoClient(String host, int port) {
  20. this.host = host;
  21. this.port = port;
  22. }
  23. public void start() throws InterruptedException {
  24. EventLoopGroup group = new NioEventLoopGroup();
  25. try {
  26. // 创建BootStrap
  27. Bootstrap bootstrap = new Bootstrap();
  28. // 指定EventLoopGroup以处理客户端事件;
  29. bootstrap.group(group)
  30. .channel(NioSocketChannel.class)
  31. // 设置服务器的InetSocketAddress
  32. .remoteAddress(new InetSocketAddress(host, port))
  33. .handler(new ChannelInitializer<SocketChannel>() {
  34. protected void initChannel(SocketChannel ch) throws Exception {
  35. ch.pipeline().addLast(new EchoClientHandler());
  36. }
  37. });
  38. // 连接到远程节点,调用sync()阻塞直到连接完成
  39. ChannelFuture f = bootstrap.connect().sync();
  40. // 阻塞,直到Channel关闭
  41. f.channel().closeFuture().sync();
  42. } finally {
  43. // 关闭线程池,释放所有资源
  44. group.shutdownGracefully().sync();
  45. }
  46. }
  47. public static void main(String[] args) throws InterruptedException {
  48. String host = "127.0.0.1";
  49. int port = 8000;
  50. new EchoClient(host, port).start();
  51. }
  52. }

注意:
可以在客户端和服务端使用不同的传输。

最后

如果你觉得写的还不错,就关注下公众号呗,关注后,有点小礼物回赠给你。
你可以获得5000+电子书,java,springCloud,adroid,python等各种视频教程,IT类经典书籍,各种软件的安装及破解教程。
希望一块学习,一块进步!
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,365人围观)

还没有评论,来说两句吧...

相关阅读