Netty 权威指南之Google protobuf 编解码

一时失言乱红尘 2023-10-16 21:50 53阅读 0赞

本章相关知识点:

Google 的protobuf 在业界非常流行,很多的商业项目选址使用protobuf作为编解码框架,这里一起回顾一下protobuf 框架的优点:

1、结构化数据存储(类似XML和JSON)

2、高效编码性能

3、语言无法、平台无法、拓展性好

4、官方支持Java、C++和Python

本章学习目标:

1、Protobuf 开发入门

2、开发支持Protobuf 的Netty 服务端

3、开发支持Protobuf 的Netty 客户端

4、运行基于Netty 开发的Protobuf 应用

第一节:Protobuf 开发入门

Protobuf 入门

protobuf 是一个灵活、高效、结构化的数据序列化框架。相比XML等传统序列化工具,它更小、更快、更简单.Protobuf支持数据结构化一次编译可以到处使用,甚至是跨语言使用,通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在不同的版本数据结构进程中进行数据传递,实现数据结构的前向兼容。

Protobuf 开发环境搭建

1、首先先下载protobuf 的windows 版本

Center

2、protoc.exe 主要根据.proto文件生成代码,我们以图书订购流程为例,定义SubscribeReq.proto和SubscribeResp.proto,数据定义格式如下:

SubscribeResp.proto

  1. option java_package="com.nio.serlizable";
  2. option java_outer_classname="SubscribeRespProto";
  3. message SubscribeResp{
  4. required int32 subReqId = 1;
  5. required int32 respCode = 2;
  6. required string desc = 3;
  7. }

SubscribeReq.proto

  1. option java_package="com.nio.serlizable";
  2. option java_outer_classname="SubscribeReqProto";
  3. message SubscribeReq{
  4. required int32 subReqId = 1;
  5. required string userName = 2;
  6. required string productName = 3;
  7. repeated string address = 4;
  8. }

3、通过protoc.exe命令生成java代码,命令行如图所示:

Center 1

4、将生成的的POJO代码SubscribeReqProto.java和SubscribeRespProto.java复制到对应的IDEA14 项目当中。

Center 2

5、到此,我们已经完成对google protobuf 开发环境搭建工作,我们接下来通过一个简单的Demo 来了解Protobuf 类库使用。

  1. package com.nio.protobuf;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. /**
  5. * Created by vixuan-008 on 2015/6/24.
  6. */
  7. public class TestSubscribeReq {
  8. public static void main(String[] args)throws Exception{
  9. SubscribeReqProto.SubscribeReq req=createSubscribeReq();
  10. System.out.println("Before encode:"+req.toString());
  11. SubscribeReqProto.SubscribeReq result=decode(encode(req));
  12. System.out.println("decode cotent is:"+result.toString());
  13. }
  14. private static byte[] encode(SubscribeReqProto.SubscribeReq req){
  15. return req.toByteArray();
  16. }
  17. private static SubscribeReqProto.SubscribeReq decode(byte[] body) throws Exception{
  18. return SubscribeReqProto.SubscribeReq.parseFrom(body);
  19. }
  20. private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
  21. SubscribeReqProto.SubscribeReq.Builder builder= SubscribeReqProto.SubscribeReq.newBuilder();
  22. builder.setSubReqId(1);
  23. builder.setUserName("zhouzhigang");
  24. builder.setProductName("Netty Book");
  25. List<String> address=new ArrayList<String>();
  26. address.add("湖南长沙");
  27. address.add("湖南株洲");
  28. address.add("湖南湘潭");
  29. builder.addAllAddress(address);
  30. return builder.build();
  31. }
  32. }

首先我们看如何创建 SubscribeReqProto.SubscribeReq的实例,通过SubscribeReqProto.SubscribeReq的静态方法newBuilder创建SubscribeReqProto.SubscribeReq的Builder实例,通过Builder构造器对SubscribeReq的属性进行相关设置,对于集合类型,通过addAllXXX()方法可以将集合对象添加到对象属性当中。

编码通过调用SubscribeReqProto.SubscribeReq实例的toByteArray方法,即可将SubscribeReq对象编码为byte数组,使用非常方便。

解码通过调用SubscribeReqProto.SubscribeReq的静态方法parseFrom将二进制数组解码为原始数据对象。

6、Protobuf 测试程序效果截图:

Center 3

第二节:开发支持Protobuf 的Netty 服务端

SubRespProServer.java源代码(Handler 存在问题可能需要修改)

  1. package com.nio.server;
  2. import com.nio.handler.SubRespProHandler;
  3. import com.nio.protobuf.SubscribeReqProto;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.ChannelOption;
  8. import io.netty.channel.EventLoopGroup;
  9. import io.netty.channel.nio.NioEventLoopGroup;
  10. import io.netty.channel.socket.SocketChannel;
  11. import io.netty.channel.socket.nio.NioServerSocketChannel;
  12. import io.netty.handler.codec.protobuf.ProtobufDecoder;
  13. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  14. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  15. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  16. import io.netty.handler.logging.LogLevel;
  17. import io.netty.handler.logging.LoggingHandler;
  18. /**
  19. * Created by vixuan-008 on 2015/6/24.
  20. */
  21. public class SubRespProServer {
  22. public static void main(String[] args)throws Exception{
  23. int port=15444;
  24. new SubRespProServer().bind(port);
  25. }
  26. public void bind(int port)throws Exception{
  27. //配置服务端的NIO线程池
  28. EventLoopGroup bossGroup=new NioEventLoopGroup();
  29. EventLoopGroup workGroup=new NioEventLoopGroup();
  30. try{
  31. ServerBootstrap b=new ServerBootstrap();
  32. b.group(bossGroup, workGroup);
  33. b.channel(NioServerSocketChannel.class);
  34. b.option(ChannelOption.SO_BACKLOG, 100);
  35. b.handler(new LoggingHandler(LogLevel.INFO));
  36. b.childHandler(new ChannelInitializer<SocketChannel>() {
  37. @Override
  38. protected void initChannel(SocketChannel socketChannel) throws Exception {
  39. socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  40. socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
  41. socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  42. socketChannel.pipeline().addLast(new ProtobufEncoder());
  43. socketChannel.pipeline().addLast(new SubRespProHandler());
  44. }
  45. });
  46. //绑定端口,等待同步成功
  47. ChannelFuture f=b.bind(port).sync();
  48. //等待服务端关闭监听端口
  49. f.channel().closeFuture().sync();
  50. }finally {
  51. //释放线程池资源
  52. bossGroup.shutdownGracefully();
  53. workGroup.shutdownGracefully();
  54. }
  55. }
  56. }
  57. package com.nio.handler;
  58. import com.nio.protobuf.SubscribeReqProto;
  59. import com.nio.protobuf.SubscribeRespProto;
  60. import io.netty.channel.ChannelHandlerAdapter;
  61. import io.netty.channel.ChannelHandlerContext;
  62. /**
  63. * Created by vixuan-008 on 2015/6/24.
  64. */
  65. public class SubRespProHandler extends ChannelHandlerAdapter {
  66. @Override
  67. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  68. ctx.close();
  69. }
  70. @Override
  71. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  72. super.channelActive(ctx);
  73. }
  74. @Override
  75. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  76. SubscribeReqProto.SubscribeReq req=(SubscribeReqProto.SubscribeReq)msg;
  77. System.out.println("server receiver client message is:"+req.toString());
  78. ctx.writeAndFlush(resp(req.getSubReqId()));
  79. }
  80. private SubscribeRespProto.SubscribeResp resp(int subReqId)throws Exception{
  81. SubscribeRespProto.SubscribeResp.Builder resp= SubscribeRespProto.SubscribeResp.newBuilder();
  82. resp.setSubReqId(subReqId);
  83. resp.setRespCode(0);
  84. resp.setDesc("Netty Book order succeed 3 day later,sent to the designated adderss");
  85. return resp.build();
  86. }
  87. @Override
  88. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  89. super.channelReadComplete(ctx);
  90. }
  91. }

第三节: 开发支持Protobuf 的Netty 客户端

  1. package com.nio.client;
  2. import com.nio.handler.SubReqProHandler;
  3. import com.nio.handler.SubReqServerHandler;
  4. import com.nio.protobuf.SubscribeRespProto;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.ChannelOption;
  8. import io.netty.channel.EventLoopGroup;
  9. import io.netty.channel.nio.NioEventLoopGroup;
  10. import io.netty.channel.socket.SocketChannel;
  11. import io.netty.channel.socket.nio.NioSocketChannel;
  12. import io.netty.handler.codec.protobuf.ProtobufDecoder;
  13. import io.netty.handler.codec.protobuf.ProtobufEncoder;
  14. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
  15. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
  16. import io.netty.handler.codec.serialization.ClassResolvers;
  17. import io.netty.handler.codec.serialization.ObjectDecoder;
  18. import io.netty.handler.codec.serialization.ObjectEncoder;
  19. /**
  20. * Created by vixuan-008 on 2015/6/24.
  21. */
  22. public class SubReqProClient {
  23. public static void main(String[] args)throws Exception{
  24. int port=15444;
  25. new SubReqProClient().bind(port, "127.0.0.1");
  26. }
  27. public void bind(int port,String host)throws Exception{
  28. //配置客户端NIO线程池
  29. EventLoopGroup workGroup=new NioEventLoopGroup();
  30. try{
  31. io.netty.bootstrap.Bootstrap b=new io.netty.bootstrap.Bootstrap();
  32. b.group(workGroup);
  33. b.channel(NioSocketChannel.class);
  34. b.option(ChannelOption.TCP_NODELAY,true);
  35. b.handler(new ChannelInitializer<SocketChannel>() {
  36. @Override
  37. protected void initChannel(SocketChannel socketChannel) throws Exception {
  38. socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
  39. socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
  40. socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
  41. socketChannel.pipeline().addLast(new ProtobufEncoder());
  42. socketChannel.pipeline().addLast(new SubReqProHandler());
  43. }
  44. });
  45. //发起异步连接操作
  46. ChannelFuture f=b.connect(host,port).sync();
  47. //等待客户端链路关闭
  48. f.channel().closeFuture().sync();
  49. }finally {
  50. //释放NIO 线程组
  51. workGroup.shutdownGracefully();
  52. }
  53. }
  54. }
  55. package com.nio.handler;
  56. import com.nio.protobuf.SubscribeReqProto;
  57. import com.nio.protobuf.SubscribeRespProto;
  58. import io.netty.channel.ChannelHandlerAdapter;
  59. import io.netty.channel.ChannelHandlerContext;
  60. import java.util.ArrayList;
  61. import java.util.List;
  62. /**
  63. * Created by vixuan-008 on 2015/6/24.
  64. */
  65. public class SubReqProHandler extends ChannelHandlerAdapter {
  66. @Override
  67. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  68. ctx.close();
  69. }
  70. @Override
  71. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  72. for(int i=0;i<10;i++){
  73. ctx.write(subReq(i));
  74. }
  75. ctx.flush();
  76. }
  77. private SubscribeReqProto.SubscribeReq subReq(int i){
  78. SubscribeReqProto.SubscribeReq.Builder req=SubscribeReqProto.SubscribeReq.newBuilder();
  79. req.setProductName("Netty Book");
  80. req.setUserName("zhouzhigang");
  81. req.setSubReqId(i);
  82. List<String> address=new ArrayList<String>();
  83. address.add("china");
  84. address.add("usa");
  85. address.add("france");
  86. req.addAllAddress(address);
  87. return req.build();
  88. }
  89. @Override
  90. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  91. System.out.println("Receiver server message is:"+msg);
  92. }
  93. @Override
  94. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  95. ctx.flush();
  96. }
  97. }

第四节: 运行基于Netty 开发的Protobuf 应用

服务端截图:

Center 4

Protobuf 使用注意事项:

1、ProtobufDecoder仅仅负责解码,它不支持读半包。因此,在ProtobufDecode前面,一定要有能够处理读半包的解码器,有三种方式可以选择.

使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息:

集成Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;

继承ByteMessageDecoder类,自己处理半包消息。

如果你只使用ProtobufDecoder解码器而忽视对半包消息处理,程序是不能正常运行的。今天的学习就到这里

发表评论

表情:
评论列表 (有 0 条评论,53人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ProtoBuf 解码

    在网络程序开发过程中,数据在网络中是以二进制字节码的格式传输的,在发送业务数据前,先对业务数据进行编码,经过网络传输,收到数据后对数据进行解码,Netty提供了一些编解码器,它