Spring和Netty整合详解

柔光的暖阳◎ 2022-01-26 06:37 610阅读 0赞

Spring和Netty整合详解

本篇主要介绍netty如何跟Spring配合,其实真的很没必要将netty和Spring牵扯在一起,我们完全可以用netty做出一个spring的;然而在《Spring环境下使用Netty写Socket和Http详解》一篇中,因为没怎么用到Spring,遭到部分网友质疑,因此这一篇着重介绍如何跟Spring做配合。

官方主页

Spring

Netty

一、概述

Netty是目前最流行的由JBOSS提供的一个Java开源框架NIO框架,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

相比JDK原生NIO,Netty提供了相对十分简单易用的API,非常适合网络编程。Netty是完全基于NIO实现的,所以Netty是异步的。

Mina同样也是一款优秀的NIO框架,而且跟Netty是出自同一个人之手,但是Netty要晚一点,优点更多一些,想了解更多可以直接搜索mina和netty比较。

使用Netty,我们可以作为Socket服务器,也可以用来做Http服务器,这里,我们将这两种方式都详细介绍一下。

Git地址:
Gitee

首发地址:
品茗IT-首发

品茗IT 提供在线支持:

一键快速构建Spring项目工具

一键快速构建SpringBoot项目工具

一键快速构建SpringCloud项目工具

一站式Springboot项目生成

如果大家正在寻找一个java的学习环境,或者在开发中遇到困难,可以加入我们的java学习圈,点击即可加入,共同学习,节约学习时间,减少很多在学习中遇到的难题。

二、依赖Jar包

本文假设你已经引入Spring必备的一切了,已经是个Spring mvc项目了,如果不会搭建,可以打开这篇文章看一看《Spring和Spring Mvc 5整合详解》。

我们假定你已经建好了Spring环境。这里只引入Netty的jar包。

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.17.Final</version>
  5. </dependency>

完整依赖:

  1. <?xml version="1.0"?>
  2. <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  3. <modelVersion>4.0.0</modelVersion>
  4. <parent>
  5. <groupId>cn.pomit</groupId>
  6. <artifactId>SpringWork</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. </parent>
  9. <artifactId>Netty</artifactId>
  10. <packaging>jar</packaging>
  11. <name>Netty</name>
  12. <url>http://maven.apache.org</url>
  13. <dependencies>
  14. <dependency>
  15. <groupId>io.netty</groupId>
  16. <artifactId>netty-all</artifactId>
  17. <version>4.1.17.Final</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework</groupId>
  21. <artifactId>spring-core</artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework</groupId>
  25. <artifactId>spring-context</artifactId>
  26. </dependency>
  27. </dependencies>
  28. <build>
  29. <finalName>Netty</finalName>
  30. </build>
  31. </project>

父pom管理了所有依赖jar包的版本,地址:
https://www.pomit.cn/spring/SpringWork/pom.xml

三、Netty服务器配置

我们写一个tcp的server,作为线程运行。

NettyServer :

  1. package cn.pomit.springwork.netty.server;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import cn.pomit.springwork.netty.config.NettyServerInitializer;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.Channel;
  6. import io.netty.channel.ChannelFuture;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. public class NettyServer implements Runnable {
  11. private int port;
  12. private Channel channel;
  13. private EventLoopGroup bossGroup;
  14. private EventLoopGroup workerGroup;
  15. private Thread nserver;
  16. @Autowired
  17. NettyServerInitializer nettyServerInitializer;
  18. public void init() {
  19. nserver = new Thread(this);
  20. nserver.start();
  21. }
  22. public void destory() {
  23. System.out.println("destroy server resources");
  24. if (null == channel) {
  25. System.out.println("server channel is null");
  26. }
  27. bossGroup.shutdownGracefully();
  28. workerGroup.shutdownGracefully();
  29. channel.closeFuture().syncUninterruptibly();
  30. bossGroup = null;
  31. workerGroup = null;
  32. channel = null;
  33. }
  34. public int getPort() {
  35. return port;
  36. }
  37. public void setPort(int port) {
  38. this.port = port;
  39. }
  40. @Override
  41. public void run() {
  42. bossGroup = new NioEventLoopGroup();
  43. workerGroup = new NioEventLoopGroup();
  44. System.out.println(Thread.currentThread().getName() + "----位置4");
  45. try {
  46. ServerBootstrap b = new ServerBootstrap();
  47. b.group(bossGroup, workerGroup);
  48. b.channel(NioServerSocketChannel.class);
  49. b.childHandler(nettyServerInitializer);
  50. // 服务器绑定端口监听
  51. ChannelFuture f = b.bind(port).sync();
  52. // 监听服务器关闭监听
  53. f.channel().closeFuture().sync();
  54. channel = f.channel();
  55. // 可以简写为
  56. /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
  57. } catch (InterruptedException e) {
  58. e.printStackTrace();
  59. } finally {
  60. bossGroup.shutdownGracefully();
  61. workerGroup.shutdownGracefully();
  62. }
  63. }
  64. }

代码贴出来之后,我们还是要讲下里面的一些重点:

bossGroup和workerGroup就是为了建立线程池,这个自行百度。

以上使用了

  1. f.channel().closeFuture().sync();

这部分代码是阻塞了当前主线程一直等待结束,后面的代码就不能执行了。

两次sync的不同:两次sync虽然都是针对ChannelFuture的,但是两次的ChannelFuture不一样,
原理就不细说了,我也不知道,毕竟不是专精这方面的。sync换成await是一样的。

这里面NettyServerInitializer使用Spring的bean注入功能。

四、启动Server

启动NettyServer。

4.1 Spring的xml配置

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:jaxws="http://cxf.apache.org/jaxws" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd ">
  3. <context:annotation-config />
  4. <context:component-scan base-package="cn.pomit.springwork">
  5. </context:component-scan>
  6. <bean id="annotationPropertyConfigurerNetty" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  7. <property name="order" value="1" />
  8. <property name="ignoreUnresolvablePlaceholders" value="true" />
  9. <property name="locations">
  10. <list>
  11. <value>classpath:netty.properties</value>
  12. </list>
  13. </property>
  14. </bean>
  15. <bean id="nettyDemoServer" class="cn.pomit.springwork.netty.server.NettyServer" init-method="init" destroy-method="destory">
  16. <property name="port" value="${netty_port}" />
  17. </bean>
  18. <bean id="closeFutureHandler" class="cn.pomit.springwork.netty.handler.CloseFutureHandler">
  19. <property name="nextHandler"><null/></property>
  20. </bean>
  21. <bean id="exceptionFutureHandler" class="cn.pomit.springwork.netty.handler.ExceptionFutureHandler">
  22. <property name="nextHandler"><null/></property>
  23. </bean>
  24. <bean id="bussinessFutureHandler" class="cn.pomit.springwork.netty.handler.BussinessFutureHandler">
  25. <property name="nextHandler"><null/></property>
  26. </bean>
  27. </beans>

配置文件netty.properties:

  1. netty_port=4444

4.2 注解启动

只是@Service注解和@PostConstruct注解的运用而已。

  1. package cn.pomit.springwork.netty.server;
  2. import javax.annotation.PostConstruct;
  3. import javax.annotation.PreDestroy;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import cn.pomit.springwork.netty.config.NettyServerInitializer;
  7. import io.netty.bootstrap.ServerBootstrap;
  8. import io.netty.channel.Channel;
  9. import io.netty.channel.ChannelFuture;
  10. import io.netty.channel.EventLoopGroup;
  11. import io.netty.channel.nio.NioEventLoopGroup;
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;
  13. @Service
  14. public class NettyServer implements Runnable {
  15. private int port;
  16. private Channel channel;
  17. private EventLoopGroup bossGroup;
  18. private EventLoopGroup workerGroup;
  19. private Thread nserver;
  20. @Autowired
  21. NettyServerInitializer nettyServerInitializer;
  22. @PostConstruct
  23. public void init() {
  24. nserver = new Thread(this);
  25. nserver.start();
  26. }
  27. @PreDestroy
  28. public void destory() {
  29. System.out.println("destroy server resources");
  30. if (null == channel) {
  31. System.out.println("server channel is null");
  32. }
  33. bossGroup.shutdownGracefully();
  34. workerGroup.shutdownGracefully();
  35. channel.closeFuture().syncUninterruptibly();
  36. bossGroup = null;
  37. workerGroup = null;
  38. channel = null;
  39. }
  40. public int getPort() {
  41. return port;
  42. }
  43. public void setPort(int port) {
  44. this.port = port;
  45. }
  46. @Override
  47. public void run() {
  48. bossGroup = new NioEventLoopGroup();
  49. workerGroup = new NioEventLoopGroup();
  50. System.out.println(Thread.currentThread().getName() + "----位置4");
  51. try {
  52. ServerBootstrap b = new ServerBootstrap();
  53. b.group(bossGroup, workerGroup);
  54. b.channel(NioServerSocketChannel.class);
  55. b.childHandler(nettyServerInitializer);
  56. // 服务器绑定端口监听
  57. ChannelFuture f = b.bind(port).sync();
  58. // 监听服务器关闭监听
  59. f.channel().closeFuture().sync();
  60. channel = f.channel();
  61. // 可以简写为
  62. /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. } finally {
  66. bossGroup.shutdownGracefully();
  67. workerGroup.shutdownGracefully();
  68. }
  69. }
  70. }

五、NettyServerInitializer

NettyServerInitializer是对NettyServer的handler初始化配置。可以作为Spring的bean处理,所以这里用了@Component将NettyServerInitializer声明为Spring的bean.

NettyServerInitializer:

  1. package cn.pomit.springwork.netty.config;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelPipeline;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  8. import io.netty.handler.codec.Delimiters;
  9. import io.netty.handler.codec.string.StringDecoder;
  10. import io.netty.handler.codec.string.StringEncoder;
  11. @Component
  12. public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
  13. @Autowired
  14. NettyServerHandler nettyServerHandler;
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ChannelPipeline pipeline = ch.pipeline();
  18. // 以("\n")为结尾分割的 解码器
  19. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
  20. // 字符串解码 和 编码
  21. pipeline.addLast("decoder", new StringDecoder());
  22. pipeline.addLast("encoder", new StringEncoder());
  23. System.out.println(Thread.currentThread().getName() + "----位置5");
  24. // 自己的逻辑Handler
  25. pipeline.addLast("handler", nettyServerHandler);
  26. }
  27. }

六、Netty的共享ChannelHandler

netty的ChannelHandler是不能共享的。是一个线程一个的,如果之间将ChannelHandler作为Spring的bean管理,是会报错的。

因此,需要用@Sharable注解对ChannelHandler做声明,然后再由Spring进行管理。

NettyServerHandler:

  1. package cn.pomit.springwork.netty.config;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.stereotype.Component;
  5. import cn.pomit.springwork.netty.handler.Handler;
  6. import io.netty.channel.ChannelHandler.Sharable;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.SimpleChannelInboundHandler;
  9. @Component
  10. @Sharable
  11. public class NettyServerHandler extends SimpleChannelInboundHandler<String>{
  12. @Autowired(required = false)
  13. @Qualifier("closeFutureHandler")
  14. public Handler closeFutureHandler;
  15. @Autowired(required = false)
  16. @Qualifier("exceptionFutureHandler")
  17. public Handler exceptionFutureHandler;
  18. @Autowired
  19. @Qualifier("bussinessFutureHandler")
  20. public Handler bussinessFutureHandler;
  21. @Override
  22. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  23. // System.out.println(((HandlerServiceImp) exportServiceMap.get("helloWorldService")).test());
  24. // // 返回客户端消息 - 我已经接收到了你的消息
  25. System.out.println(Thread.currentThread().getName()+"----位置6");
  26. // handlerService.handle(msg);
  27. String retMsg = bussinessFutureHandler.hander(msg);
  28. ctx.writeAndFlush(retMsg);
  29. }
  30. @Override
  31. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  32. System.out.println(ctx.channel().remoteAddress() + " channelRegistered " );
  33. super.channelRegistered(ctx);
  34. }
  35. @Override
  36. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  37. System.out.println(ctx.channel().remoteAddress() + " channelUnregistered " );
  38. super.channelUnregistered(ctx);
  39. if(closeFutureHandler !=null){
  40. closeFutureHandler.hander(ctx.name());
  41. }
  42. }
  43. @Override
  44. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  45. System.out.println(ctx.channel().remoteAddress() + " channelActive " );
  46. super.channelActive(ctx);
  47. }
  48. @Override
  49. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  50. System.out.println(ctx.channel().remoteAddress() + " channelInactive " );
  51. super.channelInactive(ctx);
  52. }
  53. @Override
  54. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  55. System.out.println(ctx.channel().remoteAddress() + " exceptionCaught :" + cause.getMessage() );
  56. super.exceptionCaught(ctx, cause);
  57. if(exceptionFutureHandler !=null){
  58. exceptionFutureHandler.hander(cause.getMessage());
  59. }
  60. }
  61. }

这里面使用了上面xml中定义的三个业务处理Handler。

七、业务Handler

7.1 Handler接口

Handler :

  1. package cn.pomit.springwork.netty.handler;
  2. public interface Handler {
  3. public static String COMMONRET="200";
  4. public String hander(String msg);
  5. }

7.2 BussinessFutureHandler

BussinessFutureHandler:

  1. package cn.pomit.springwork.netty.handler;
  2. public class BussinessFutureHandler implements Handler {
  3. private Handler nextHandler;
  4. public Handler getNextHandler() {
  5. return nextHandler;
  6. }
  7. public void setNextHandler(Handler nextHandler) {
  8. this.nextHandler = nextHandler;
  9. }
  10. @Override
  11. public String hander(String msg) {
  12. System.out.println("接收到信息:" + msg);
  13. if (nextHandler != null) {
  14. nextHandler.hander(msg);
  15. }
  16. return msg;
  17. }
  18. }

7.3 CloseFutureHandler

CloseFutureHandler:

  1. package cn.pomit.springwork.netty.handler;
  2. public class CloseFutureHandler implements Handler {
  3. private Handler nextHandler;
  4. public Handler getNextHandler() {
  5. return nextHandler;
  6. }
  7. public void setNextHandler(Handler nextHandler) {
  8. this.nextHandler = nextHandler;
  9. }
  10. @Override
  11. public String hander(String msg) {
  12. System.out.println(msg + "正在关闭。");
  13. if (nextHandler != null) {
  14. nextHandler.hander(msg);
  15. }
  16. return COMMONRET;
  17. }
  18. }

7.4 ExceptionFutureHandler

ExceptionFutureHandler:

  1. package cn.pomit.springwork.netty.handler;
  2. public class ExceptionFutureHandler implements Handler {
  3. private Handler nextHandler;
  4. public Handler getNextHandler() {
  5. return nextHandler;
  6. }
  7. public void setNextHandler(Handler nextHandler) {
  8. this.nextHandler = nextHandler;
  9. }
  10. @Override
  11. public String hander(String msg) {
  12. System.out.println("出现异常,异常信息:" + msg);
  13. if (nextHandler != null) {
  14. nextHandler.hander(msg);
  15. }
  16. return COMMONRET;
  17. }
  18. }

至此,Spring整合netty简单的处理方式就完成了。

快速构建项目

Spring组件化构建

喜欢这篇文章么,喜欢就加入我们一起讨论Spring技术吧!
品茗IT交流群

发表评论

表情:
评论列表 (有 0 条评论,610人围观)

还没有评论,来说两句吧...

相关阅读