Springboot+Netty搭建TCP服务端
Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点
Netty的优点:
1.API使用简单,开发入门门槛低。
2.功能十分强大,预置多种编码解码功能,支持多种主流协议。
3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。
4.性能优异,特别在综合性能上的优异性。
5.成熟,稳定,适用范围广。
6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。
Netty结合Springboot快速开发框架搭建服务端程序:
SpringBoot+Netty实现TCP服务端客户端的源码Demo
新建Springboot的maven项目,pom.xml文件导入依赖包
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath />
</parent>
<groupId>boot.base.tcp.server</groupId>
<artifactId>boot-example-base-tcp-server-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-example-base-tcp-server-2.0.5</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包成一个可执行jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Springboot启动类,Netty启动
package boot.example.tcp.server;
import boot.example.tcp.server.netty.BootNettyServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 蚂蚁舞
*/
@SpringBootApplication
@EnableAsync
public class BootNettyServerApplication implements CommandLineRunner{
public static void main( String[] args ) {
SpringApplication app = new SpringApplication(BootNettyServerApplication.class);
app.run(args);
System.out.println( "Hello World!" );
}
@Async
@Override
public void run(String... args) throws Exception {
/**
* 使用异步注解方式启动netty服务端服务
*/
new BootNettyServer().bind(6655);
}
}
Netty的server类
package boot.example.tcp.server.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 蚂蚁舞
*/
public class BootNettyServer {
public void bind(int port) throws Exception {
/**
* 配置服务端的NIO线程组
* NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
* bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
* bossGroup接收到连接后就会把连接信息注册到workerGroup
* workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* ServerBootstrap 是一个启动NIO服务的辅助启动类
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
*/
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
/**
* ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
*/
serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
// option是设置 bossGroup,childOption是设置workerGroup
/**
* 服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝(队列被接收后,拒绝的客户端下次连接上来只要队列有空余就能连上)
*/
serverBootstrap = serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
/**
* 立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。
* 该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。
* Netty默认禁用该算法,从而最小化报文传输延时。
*/
serverBootstrap = serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
/**
* 连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
* 可以将此功能视为TCP的心跳机制,默认的心跳间隔是7200s即2小时, Netty默认关闭该功能。
*/
serverBootstrap = serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
*/
serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer<SocketChannel>());
/**
* 绑定端口,同步等待成功
*/
ChannelFuture f = serverBootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("netty server start success!");
/**
* 等待服务器监听端口关闭
*/
f.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
System.out.println(e.toString());
} finally {
/**
* 退出,释放线程池资源
*/
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
}
通道初始化
package boot.example.tcp.server.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 通道初始化
* 蚂蚁舞
*/
@ChannelHandler.Sharable
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
public static long READ_TIME_OUT = 60;
public static long WRITE_TIME_OUT = 60;
public static long ALL_TIME_OUT = 60;
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(READ_TIME_OUT, WRITE_TIME_OUT, ALL_TIME_OUT, TimeUnit.SECONDS));
// 带编码
ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// // ChannelOutboundHandler,依照逆序执行
// ch.pipeline().addLast("encoder", new StringEncoder());
//
// // 属于ChannelInboundHandler,依照顺序执行
// ch.pipeline().addLast("decoder", new StringDecoder());
//自定义ChannelInboundHandlerAdapter
ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
}
}
I/O数据读写处理类
package boot.example.tcp.server.netty;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* I/O数据读写处理类
* 蚂蚁舞
*/
@ChannelHandler.Sharable
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
/**
* 注册时执行
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
System.out.println("--channelRegistered--"+ctx.channel().id().toString());
}
/**
* 离线时执行
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
System.out.println("--channelUnregistered--"+ctx.channel().id().toString());
}
/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if(msg == null){return;}
String data = (String) msg;
data = data.replaceAll("\r|\n", "");
String channelId = ctx.channel().id().toString();
System.out.println("channelId="+channelId + "data="+data);
// 这里我将通道id作为code来使用,实际是需要msg里来摘取的客户端数据里的唯一值的
// 如果没有则创建 如果有,更新data值
BootNettyChannel b = BootNettyChannelCache.get("server:"+channelId);
if(b == null){
BootNettyChannel bootNettyChannel = new BootNettyChannel();
bootNettyChannel.setChannel(ctx.channel());
bootNettyChannel.setCode("server:"+channelId);
bootNettyChannel.setReport_last_data(data);
BootNettyChannelCache.save("server:"+channelId, bootNettyChannel);
} else {
b.setReport_last_data(data);
}
ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:"+channelId).getBytes()));
// netty的编码已经指定,因此可以不需要再次确认编码
// ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8)));
} catch (Exception e) {
System.out.println("channelRead--"+e.toString());
}
}
/**
* 从客户端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
System.out.println("channelReadComplete");
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
System.out.println("exceptionCaught");
cause.printStackTrace();
BootNettyChannel bootNettyChannel = BootNettyChannelCache.get("server:"+ctx.channel().id().toString());
if(bootNettyChannel != null){
BootNettyChannelCache.remove("server:"+ctx.channel().id().toString());
}
ctx.close();//抛出异常,断开与客户端的连接
}
/**
* 客户端与服务端第一次建立连接时 执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelActive(ctx);
ctx.channel().read();
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
//此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
System.out.println("channelActive:"+clientIp+ctx.name());
}
/**
* 客户端与服务端 断连时 执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
System.out.println("channelInactive:"+clientIp);
BootNettyChannel bootNettyChannel = BootNettyChannelCache.get("server:"+ctx.channel().id().toString());
if(bootNettyChannel != null){
BootNettyChannelCache.remove("server:"+ctx.channel().id().toString());
}
ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
}
/**
* 服务端当read超时, 会调用这个方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
super.userEventTriggered(ctx, evt);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
ctx.close();//超时时断开连接
System.out.println("userEventTriggered:"+clientIp);
}
}
BootNettyChannel
package boot.example.tcp.server.netty;
import io.netty.channel.Channel;
/**
* 蚂蚁舞
*/
public class BootNettyChannel {
// 连接客户端唯一的code
private String code;
// 客户端最新发送的消息内容
private String report_last_data;
private transient volatile Channel channel;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getReport_last_data() {
return report_last_data;
}
public void setReport_last_data(String report_last_data) {
this.report_last_data = report_last_data;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
}
BootNettyChannelCache
package boot.example.tcp.server.netty;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 蚂蚁舞
*/
public class BootNettyChannelCache {
public static volatile Map<String, BootNettyChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyChannel>();
public static void add(String code, BootNettyChannel channel){
channelMapCache.put(code,channel);
}
public static BootNettyChannel get(String code){
return channelMapCache.get(code);
}
public static void remove(String code){
channelMapCache.remove(code);
}
public static void save(String code, BootNettyChannel channel) {
if(channelMapCache.get(code) == null) {
add(code,channel);
}
}
}
BootNettyController
package boot.example.tcp.server.controller;
import boot.example.tcp.server.netty.BootNettyChannel;
import boot.example.tcp.server.netty.BootNettyChannelCache;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 蚂蚁舞
*/
@RestController
public class BootNettyController {
@GetMapping(value = {"", "/"})
public String index() {
return "netty springBoot tcp demo";
}
@GetMapping("/clientList")
public List<Map<String,String>> clientList() {
List<Map<String,String>> list = new ArrayList<>();
for (Map.Entry<String, BootNettyChannel> entry : BootNettyChannelCache.channelMapCache.entrySet()) {
Map<String, String> map = new HashMap<String, String>();
map.put("code", entry.getKey());
//map.put("code", entry.getValue().getCode());
map.put("report_last_data", entry.getValue().getReport_last_data());
list.add(map);
}
return list;
}
@PostMapping("/downDataToAllClient")
public String downDataToAllClient(@RequestParam(name="content", required = true) String content) {
for (Map.Entry<String, BootNettyChannel> entry : BootNettyChannelCache.channelMapCache.entrySet()) {
BootNettyChannel bootNettyChannel = entry.getValue();
if(bootNettyChannel != null && bootNettyChannel.getChannel().isOpen()){
bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
// netty的编码已经指定,因此可以不需要再次确认编码
// bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes(CharsetUtil.UTF_8)));
}
}
return "ok";
}
@PostMapping("/downDataToClient")
public String downDataToClient(@RequestParam(name="code", required = true) String code, @RequestParam(name="content", required = true) String content) {
BootNettyChannel bootNettyChannel = BootNettyChannelCache.get(code);
if(bootNettyChannel != null && bootNettyChannel.getChannel().isOpen()){
bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
// netty的编码已经指定,因此可以不需要再次确认编码
// bootNettyChannel.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes(CharsetUtil.UTF_8)));
return "success";
}
return "fail";
}
}
SwaggerConfig
package boot.example.tcp.server;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* 蚂蚁舞
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
.apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
.paths(Predicates.not(PathSelectors.regex("/error.*")))
.paths(PathSelectors.regex("/.*"))
.build().apiInfo(apiInfo());
}
private ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("netty tcp 服务端demo")
.description("netty tcp 服务端接口测试demo")
.version("0.01")
.build();
}
/**
* http://localhost:6654/doc.html 地址和端口根据实际项目查看
*/
}
目录结构
├─boot-example-base-tcp-server-2.0.5
│ │ pom.xml
│ │
│ ├─src
│ │ ├─main
│ │ │ ├─java
│ │ │ │ └─boot
│ │ │ │ └─example
│ │ │ │ └─tcp
│ │ │ │ └─server
│ │ │ │ │ BootNettyServerApplication.java
│ │ │ │ │ SwaggerConfig.java
│ │ │ │ │
│ │ │ │ ├─controller
│ │ │ │ │ BootNettyController.java
│ │ │ │ │
│ │ │ │ └─netty
│ │ │ │ BootNettyChannel.java
│ │ │ │ BootNettyChannelCache.java
│ │ │ │ BootNettyChannelInboundHandlerAdapter.java
│ │ │ │ BootNettyChannelInitializer.java
│ │ │ │ BootNettyServer.java
│ │ │ │
│ │ │ └─resources
│ │ │ application.properties
│ │ │
│ │ └─test
│ │ └─java
│ │ └─boot
│ │ └─example
│ │ └─tcp
│ │ └─server
│ │ BootNettyServerApplicationTest.java
│ │
很简单的几个类加swagger,启动Springboot应用的同时也就启动了Netty
Netty Server端口:6655
SpringBoot Web端口: 6654
访问
http://localhost:6654/doc.html
使用常见的tcp客户端工具发送字母或数字(客户端工具发送中文可能出现乱码的,虽然程序已经处理了中文乱码,但依旧容易出现,处理办法是用netty写一个客户端来测试)
可以看到客户端发送消息,服务端能收到消息,并且在服务端做了保活,服务端也可以根据客户端的信息向客户端发送消息
Springboot整合Netty的服务端demo开发测试完成。
注意:如果乱码的话需要统一编码,最简单的方式
// 带编码
ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
还没有评论,来说两句吧...