Netty系列(一):Springboot整合Netty,自定义协议实现

拼搏现实的明天。 2023-09-23 18:03 108阅读 0赞

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程。

Springboot整合Netty

新建springboot项目,并在项目以来中导入netty包,用fastjson包处理jsonStr。

  1. <!-- netty -->
  2. <dependency>
  3. <groupId>io.netty</groupId>
  4. <artifactId>netty-all</artifactId>
  5. <version>4.1.42.Final</version>
  6. </dependency>
  7. <!-- Json处理 -->
  8. <dependency>
  9. <groupId>com.alibaba.fastjson2</groupId>
  10. <artifactId>fastjson2</artifactId>
  11. <version>2.0.16</version>
  12. </dependency>
  13. 复制代码

创建netty相关配置信息文件

  1. yml配置文件——application.yml

    netty 配置

    netty:

    boss线程数量

    boss: 4

    worker线程数量

    worker: 2

    连接超时时间

    timeout: 6000

    服务器主端口

    port: 18023

    服务器备用端口

    portSalve: 18026

    服务器地址

    host: 127.0.0.1
    复制代码

  2. netty配置实体类——NettyProperties与yml配置文件绑定 通过@ConfigurationProperties(prefix = "netty")注解读取配置文件中的netty配置,通过反射注入值,需要在实体类中提供对应的setter和getter方法。

@ConfigurationProperties(prefix = "netty")对应的实体类属性名称不要求一定相同,只需保证“set”字符串拼接配置文件的属性和setter方法名相同即可。

  1. @Configuration
  2. @ConfigurationProperties(prefix = "netty")
  3. public class NettyProperties {
  4. /**
  5. * boss线程数量
  6. */
  7. private Integer boss;
  8. /**
  9. * worker线程数量
  10. */
  11. private Integer worker;
  12. /**
  13. * 连接超时时间
  14. */
  15. private Integer timeout = 30000;
  16. /**
  17. * 服务器主端口
  18. */
  19. private Integer port = 18023;
  20. /**
  21. * 服务器备用端口
  22. */
  23. private Integer portSalve = 18026;
  24. /**
  25. * 服务器地址 默认为本地
  26. */
  27. private String host = "127.0.0.1";
  28. // setter、getter 。。。。
  29. }
  30. 复制代码
  1. 对netty进行配置,绑定netty相关配置设置 Netty通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

    @Configuration
    @EnableConfigurationProperties
    public class NettyConfig {

    1. final NettyProperties nettyProperties;
    2. public NettyConfig(NettyProperties nettyProperties) {
    3. this.nettyProperties = nettyProperties;
    4. }
    5. /**
    6. * boss线程池-进行客户端连接
    7. *
    8. * @return
    9. */
    10. @Bean
    11. public NioEventLoopGroup boosGroup() {
    12. return new NioEventLoopGroup(nettyProperties.getBoss());
    13. }
    14. /**
    15. * worker线程池-进行业务处理
    16. *
    17. * @return
    18. */
    19. @Bean
    20. public NioEventLoopGroup workerGroup() {
    21. return new NioEventLoopGroup(nettyProperties.getWorker());
    22. }
    23. /**
    24. * 服务端启动器,监听客户端连接
    25. *
    26. * @return
    27. */
    28. @Bean
    29. public ServerBootstrap serverBootstrap() {
    30. ServerBootstrap serverBootstrap = new ServerBootstrap()
    31. // 指定使用的线程组
    32. .group(boosGroup(), workerGroup())
    33. // 指定使用的通道
    34. .channel(NioServerSocketChannel.class)
    35. // 指定连接超时时间
    36. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
    37. // 指定worker处理器
    38. .childHandler(new NettyServerHandler());
    39. return serverBootstrap;
    40. }

    }
    复制代码

  2. worker处理器,初始化通道以及配置对应管道的处理器 自定义了##@##分割符,通过DelimiterBasedFrameDecoder来处理拆包沾包问题; 通过MessageDecodeHandler将接收消息解码处理成对象实例; 通过MessageEncodeHandler将发送消息增加分割符后并编码; 最后通过ServerListenerHandler根据消息类型对应处理不同消息。

    public class NettyServerHandler extends ChannelInitializer {

    1. @Override
    2. protected void initChannel(SocketChannel socketChannel) throws Exception {
    3. // 数据分割符
    4. String delimiterStr = "##@##";
    5. ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
    6. ChannelPipeline pipeline = socketChannel.pipeline();
    7. // 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
    8. pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
    9. // 将上一步解码后的数据转码为Message实例
    10. pipeline.addLast(new MessageDecodeHandler());
    11. // 对发送客户端的数据进行编码,并添加数据分隔符
    12. pipeline.addLast(new MessageEncodeHandler(delimiterStr));
    13. // 对数据进行最终处理
    14. pipeline.addLast(new ServerListenerHandler());
    15. }

    }
    复制代码

  3. 数据解码 数据解码和编码都采用UTF8格式

    public class MessageDecodeHandler extends ByteToMessageDecoder {

    1. @Override
    2. protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
    3. ByteBuf frame = in.retainedDuplicate();
    4. final String content = frame.toString(CharsetUtil.UTF_8);
    5. Message message = new Message(content);
    6. list.add(message);
    7. in.skipBytes(in.readableBytes());
    8. }

    }
    复制代码

  4. 数据解码转换的实例 Message类用于承载消息、转JsonString

    public class Message {

    1. /**
    2. * 数据长度
    3. */
    4. private Integer len;
    5. /**
    6. * 接收的通讯数据body
    7. */
    8. private String content;
    9. /**
    10. * 消息类型
    11. */
    12. private Integer msgType;
    13. public Message(Object object) {
    14. String str = object.toString();
    15. JSONObject jsonObject = JSONObject.parseObject(str);
    16. msgType = Integer.valueOf(jsonObject.getString("msg_type"));
    17. content = jsonObject.getString("body");
    18. len = str.length();
    19. }
    20. public String toJsonString() {
    21. return "{" +
    22. "\"msg_type\": " + msgType + ",\n" +
    23. "\"body\": " + content +
    24. "}";
    25. }
    26. // setter、getter 。。。。

    }
    复制代码

  5. 数据编码 netty服务端回复消息时,对消息转JsonString增加分割符,并进行编码。

    public class MessageEncodeHandler extends MessageToByteEncoder {

    1. // 数据分割符
    2. String delimiter;
    3. public MessageEncodeHandler(String delimiter) {
    4. this.delimiter = delimiter;
    5. }
    6. @Override
    7. protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
    8. out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
    9. }

    }
    复制代码

  6. 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型,在使用switch时可以做下处理,具体参考代码,这里只演示如何操作,并没实现数据处理业务类。

    public class ServerListenerHandler extends SimpleChannelInboundHandler {

    1. private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
    2. /**
    3. * 设备接入连接时处理
    4. *
    5. * @param ctx
    6. */
    7. @Override
    8. public void handlerAdded(ChannelHandlerContext ctx) {
    9. log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
    10. }
    11. /**
    12. * 数据处理
    13. *
    14. * @param ctx
    15. * @param msg
    16. */
    17. @Override
    18. protected void channelRead0(ChannelHandlerContext ctx, Message msg) {
    19. // 获取消息实例中的消息体
    20. String content = msg.getContent();
    21. // 对不同消息类型进行处理
    22. MessageEnum type = MessageEnum.getStructureEnum(msg);
    23. switch (type) {
    24. case CONNECT:
    25. // TODO 心跳消息处理
    26. case STATE:
    27. // TODO 设备状态
    28. default:
    29. System.out.println(type.content + "消息内容" + content);
    30. }
    31. }
    32. /**
    33. * 设备下线处理
    34. *
    35. * @param ctx
    36. */
    37. @Override
    38. public void handlerRemoved(ChannelHandlerContext ctx) {
    39. log.info("设备下线了:{}", ctx.channel().id().asLongText());
    40. }
    41. /**
    42. * 设备连接异常处理
    43. *
    44. * @param ctx
    45. * @param cause
    46. */
    47. @Override
    48. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    49. // 打印异常
    50. log.info("异常:{}", cause.getMessage());
    51. // 关闭连接
    52. ctx.close();
    53. }

    }
    复制代码

  7. 数据类型枚举类

    public enum MessageEnum {

    1. CONNECT(1, "心跳消息"),
    2. STATE(2, "设备状态");
    3. public final Integer type;
    4. public final String content;
    5. MessageEnum(Integer type, String content) {
    6. this.type = type;
    7. this.content = content;
    8. }
    9. // case中判断使用
    10. public static MessageEnum getStructureEnum(Message msg) {
    11. Integer type = Optional.ofNullable(msg)
    12. .map(Message::getMsgType)
    13. .orElse(0);
    14. if (type == 0) {
    15. return null;
    16. } else {
    17. List<MessageEnum> objectEnums = Arrays.stream(MessageEnum.values())
    18. .filter((item) -> item.getType() == type)
    19. .distinct()
    20. .collect(Collectors.toList());
    21. if (objectEnums.size() > 0) {
    22. return objectEnums.get(0);
    23. }
    24. return null;
    25. }
    26. }
    27. // setter、getter。。。。

    }

    复制代码

到此Netty整个配置已经完成,但如果要跟随springboot一起启动,仍需要做一些配置。

  1. netty启动类配置

    @Component
    public class NettyServerBoot {

    1. private static final Logger log = LoggerFactory.getLogger(NettyServerBoot.class);
    2. @Resource
    3. NioEventLoopGroup boosGroup;
    4. @Resource
    5. NioEventLoopGroup workerGroup;
    6. final ServerBootstrap serverBootstrap;
    7. final NettyProperties nettyProperties;
    8. public NettyServerBoot(ServerBootstrap serverBootstrap, NettyProperties nettyProperties) {
    9. this.serverBootstrap = serverBootstrap;
    10. this.nettyProperties = nettyProperties;
    11. }
  1. /**
  2. * 启动netty
  3. *
  4. * @throws InterruptedException
  5. */
  6. @PostConstruct
  7. public void start() throws InterruptedException {
  8. // 绑定端口启动
  9. serverBootstrap.bind(nettyProperties.getPort()).sync();
  10. // 备用端口
  11. serverBootstrap.bind(nettyProperties.getPortSalve()).sync();
  12. log.info("启动Netty: {},{}", nettyProperties.getPort(), nettyProperties.getPortSalve());
  13. }
  14. /**
  15. * 关闭netty
  16. */
  17. @PreDestroy
  18. public void close() {
  19. log.info("关闭Netty");
  20. boosGroup.shutdownGracefully();
  21. workerGroup.shutdownGracefully();
  22. }
  23. }
  24. 复制代码

增加NettyServerBoot配置后,启动application时,netty服务端会跟随一起启动。 同时,在springboot关闭前,会先销毁netty服务。

发表评论

表情:
评论列表 (有 0 条评论,108人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Netty实现定义协议

    关于协议,使用最为广泛的是HTTP协议,但是在一些服务交互领域,其使用则相对较少,主要原因有三方面: HTTP协议会携带诸如header和cookie等信息,其本身对字

    相关 netty定义协议

    《netty权威指南》一书中关于自定义协议开发的源码中有一部分错误导致代码无法运行,加了一点改变可以完美运行了, package nettyAgreement.dec