记一次netty的使用
前言:笔者在项目中使用的netty的场景是这样的,(物联网公司)所有机柜使用长连接连接到服务器,而服务器使用的正好是netty,所以不得已笔者需要学习netty的一些知识,这里只是浅浅的使用笔记,更深入的知识需要后期学习。这里是一个简单使用的笔记
准备:集成netty项目,需要准备好一个能访问使用的项目(略)
1:pom.xml文件(笔者认为所有项目第一步)导入netty的jar包
<!--netty start-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
<scope>compile</scope>
</dependency>
<!--netty end-->
因为业务需要有一些json的使用,所以还需要json的jar包,如果没有json需求可以不使用用其它代替也可
<!--fastjson start-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.3</version>
</dependency>
<!--fastjson end-->
2:开发netty的主要几个文件,这里分别是ServerHandler,ServerMain,NettyProperties,NettyConfig,业务类的主要几个文件ReturnInfo,ExecuteService和其它业务类等
netty部分代码
ServerHandler类
package com.xiaomage.crm.netty;
import com.xiaomage.crm.taskQueue.SpringContextUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version V1.0
* @Title: netty服务器处理消息的类
* @date: 2019/3/18 15:53
*/
public class ServerHandler extends ChannelInboundHandlerAdapter{
//日志
protected static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
/**
* 客户端与服务端创建连接的时候调用
* 只发生在第一次连接,后续发送消息不再连接
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
logger.info("client connection start ..." + ctx);
//自己的业务处理保存通道信息
NettyConfig.addTemporary_Channel(ctx.channel());
}
/**
* 客户端与服务端断开连接时调用
*/
@Override
public void channelInactive(ChannelHandlerContext ctx){
logger.info("client close ..." + ctx);
//自己的业务处理清除通道信息
NettyConfig.removeClient(ctx.channel());
}
/**
* 工程出现异常的时候调用
* 异常处理逻辑
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
String line="server exceptionCaught method run:"+cause.getMessage();
if (NettyProperties.loggerDebugger) {
logger.info("exceptionCaught:"+line);
}
NettyConfig.removeClient(ctx.channel());
//连接已经断开
ctx.close();
}
/**
* 服务端处理客户端socket请求的核心方法,这里接收了客户端发来完整信息
* 业务处理逻辑放在这里 == 格式自己和客户端进行协商,笔者这里是
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object info){
String message = info.toString();
if (NettyProperties.loggerDebugger) {
logger.info("read:"+message);
}
//截取制定字符串,判断类型
int T_index = -1;
if((T_index=message.indexOf("\"T\""))!=-1){
int VCode_index = T_index + 4;
String vCode = message.substring(VCode_index, VCode_index+4);
vCode = vCode.replaceAll("\"", "");
vCode = vCode.replaceAll("'", "");
vCode = vCode.trim();
if(!vCode.equalsIgnoreCase("vv") && !vCode.equalsIgnoreCase("vc")){
vCode = vCode.replace("V","v");
}
if(!NettyConfig.VcStart || vCode.equalsIgnoreCase("vc")){
Object res= null;
try {
//动态调用方法 ==BeansUtils spring调用bean对象方法
ExecuteService execute = SpringContextUtil.getBean(vCode,ExecuteService.class);
// 用反射获取
// Object [] params = new Object[2];
// params[0]=ctx.channel();
// params[1]=message;
// res = execute.getClass().getDeclaredMethod("handler", NioSocketChannel.class,String.class).invoke(execute,params);
//直接获取
res = execute.handler(ctx.channel(),message);
}catch (Exception e) {
System.out.println("解析异常跑错"+e.getMessage());
LogErrorMessage(ctx.channel(),e.getMessage(),message,ReturnInfo.ERROR_UNKNOWN);
}
if(res!=null){
String line = res.toString();
ReturnInfo.sendMsg(ctx.channel(),line);
}
}else{
ReturnInfo.sendMsg(ctx.channel(),NettyConfig.VcContent);
ctx.close();
}
}else{
String errorInfo=ReturnInfo.DownStatePackage(ReturnInfo.ERROR_T_IS_NOT_FOUND);
ReturnInfo.sendMsg(ctx.channel(),errorInfo);
}
}
/**
* @param ctx 通道
* @param errMessage 异常信息
* @param channelContent 通道数据
* @param ErrorCode 异常代码
* @Title: 处理异常数据
* @date: 2020/1/9 11:14
* @version V1.0
*/
private void LogErrorMessage(Channel ctx, String errMessage, String channelContent, int ErrorCode) {
logger.error("unknown error:"+errMessage+",当前数据:"+channelContent);
String res= ReturnInfo.DownStatePackage(ErrorCode);
ReturnInfo.sendMsg(ctx,res);
}
}
ServerMain类
package com.xiaomage.crm.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
/**
* 1. 双线程组
* 2. Bootstrap配置启动信息
* 3. 注册业务处理Handler
* 4. 绑定服务监听端口并启动服务
*/
@Component
public class ServerMain {
//日志
protected static Logger logger = LoggerFactory.getLogger(ServerMain.class);
// 监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
private EventLoopGroup clientGroup = null;
// 服务启动相关配置信息
private ServerBootstrap bootstrap = null;
public ServerMain(){
init();
}
//初始化设置 netty相关模式
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 绑定线程组
bootstrap.group(acceptorGroup, clientGroup);
// 设定通讯模式为NIO
bootstrap.channel(NioServerSocketChannel.class);
// 设定缓冲区大小
bootstrap.option(ChannelOption.SO_BACKLOG, NettyProperties.so_backlog);
// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, NettyProperties.so_sndbuf)
.option(ChannelOption.SO_RCVBUF, NettyProperties.so_rcvbuf)
.option(ChannelOption.SO_KEEPALIVE, true);
}
//netty服务启动的主要方法
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
//tcp连接入口 默认重新initChannel方法
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
ByteBuf [] delimiters = new ByteBuf [2];
delimiters[0] = Unpooled.copiedBuffer("\\r\\n".getBytes());
delimiters[1] = Unpooled.copiedBuffer(NettyProperties.CARRIAGE_RETURN.getBytes());
// 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
// 必须每次初始化通道时创建一个新对象
// 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(NettyProperties.maxFrameLength, true,true,delimiters));
//字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
socketChannel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
// 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
// 构造参数,就是间隔时长。 默认的单位是秒。
// 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
socketChannel.pipeline().addLast(new ReadTimeoutHandler(NettyProperties.readTimeout, TimeUnit.SECONDS));
//ServerHandler 自定义的处理 netty服务器处理消息类 -- 注册,发送消息,接收消息都在这个类
socketChannel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
//线程组关闭方法
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
@PostConstruct
public void start(){
//加载参数
NettyProperties.init();
//使用1个线程启动netty服务
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
ChannelFuture future = null;
ServerMain server = null;
try{
server = new ServerMain();
future = server.doAccept(NettyProperties.port);
logger.info("server started...");
logger.info("启动成功");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//关闭启动类
if(null != server){
server.release();
}
}
}
});
thread.start();
}
}
NettyProperties类
package com.xiaomage.crm.netty;
import java.util.function.Function;
/**
* @version V1.0
* @Title: netty 配置信息
* @Description: //netty 配置信息
* @date: 2019/9/24 14:31
*/
public class NettyProperties {
public static int port=9999; // 监听端口
public static int so_backlog=1024; // 设定缓冲区大小/可链接数
public static int so_sndbuf=16*1024; // SO_SNDBUF发送缓冲区
public static int so_rcvbuf=16*1024; // SO_RCVBUF接收缓冲区
public static int maxFrameLength=1024; // 每个数据包最大长度
public static int readTimeout=60; // 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接(单位秒)
public static String CARRIAGE_RETURN="\r\n"; // 结束符
public static boolean loggerDebugger=false;
/**
* 初始化配置文件的参数
*/
public static void init(){
Function<String, Integer> function = Integer::parseInt;
// 读配置文件赋值
// port= function.apply(Global.getConfig("port"));
// so_backlog= function.apply(Global.getConfig("so_backlog"));
// so_sndbuf= function.apply(Global.getConfig("so_sndbuf"));
// so_rcvbuf= function.apply(Global.getConfig("so_rcvbuf"));
// maxFrameLength= function.apply(Global.getConfig("maxFrameLength"));
// readTimeout= function.apply(Global.getConfig("readTimeout"));
// CARRIAGE_RETURN= Global.getConfig("CARRIAGE_RETURN");
}
}
NettyConfig类
package com.xiaomage.crm.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @version V1.0
* @Title: 存储整个工程的全局配置
*/
public class NettyConfig {
/**
* 存储每一个客户端接入进来时的channel对象
*/
private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static boolean VcStart = false;
public static String VcContent = "sorry,is close!";
/**
* 认证通道,存储channel和相应客户端编号
* key:设备编号
* val:通道
*/
private static Map<String, ChannelId> clientMap = new ConcurrentHashMap<>();
/**
* 未认证的通道
* key:Channel:通道
* val:链接时间
*/
private static Map<Integer,Long> temporary_Channel = new ConcurrentHashMap<>();
/**
* 添加临时通道
* 自己的业务处理临时通道目的(将通道与自己的业务信息绑定)
*/
public static void addTemporary_Channel(Channel channel){
temporary_Channel.put(channel.id().hashCode(),System.currentTimeMillis());
group.add(channel);
}
/**
* 判断当前通道是否未注册
*/
public static boolean ChannelRegister(Channel channel){
return temporary_Channel.containsKey(channel.id().hashCode());
}
/**
* 添加删除临时通道
*/
public static void removeTemporary_Channel(Channel channel){
temporary_Channel.remove(channel.id().hashCode());
}
/**
* 保存设备的编号和连接通道的关系
* @param channel
* @param ap
*/
public static void addClient(Channel channel,String ap){
//已认证设备清除临时通道
removeTemporary_Channel(channel);
//存储已认证设备
clientMap.put(ap,channel.id());
}
/**
* 获取通道Channel
* @param ap 设备编号
* @return
*/
public static Channel getClient(String ap){
ChannelId channelId=clientMap.get(ap);
if(channelId==null)
return null;
Channel Channel=group.find(channelId);
return Channel;
}
/**
* 获取所有通道ChannelAll
* @return
*/
public static Map<String, ChannelId> getClientAll(){
return clientMap;
}
/**
* 清除通道
* @param channel 通道Channel
* @return
*/
public static void removeClient(Channel channel){
ChannelId cur_channelId=channel.id();
String key=getChannelAp(cur_channelId);
if(!StringUtils.isEmpty(key)){
clientMap.remove(key);
}
group.remove(channel);
}
/**
* 根据通道信息获取设备编号
* @param channelid 通道
* @return
*/
public static String getChannelAp(ChannelId channelid){
for (String key : clientMap.keySet()) {
ChannelId channelId=clientMap.get(key);
if(channelid.asLongText().equalsIgnoreCase(channelId.asLongText())){
return key;
}
}
return null;
}
}
3业务类
ExecuteService类
package com.xiaomage.crm.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.springframework.stereotype.Service;
/**
* @version V1.0
* @Title: 业务处理接口
* @Description: //业务处理接口
* @date: 2019/9/24 16:32
*/
@Service
public class ExecuteService {
/**
* 当前接受的数据转换为json之后的数据
*/
protected JSONObject jsonObject;
/**
* 设备编号
*/
protected String deviceID;
/**
* 功能码编号
*/
protected String T;
/**
* 本次连接是否合法或者数据是否合法,默认为不合法
*/
protected boolean success;
/**
* 相应的业务处理
* @param mes 接收到的数据
* @return
*/
public String handler(Channel channel, String mes){
this.success = false;
this.jsonObject = veriData(mes);
if(jsonObject==null){
//解析失败
return ReturnInfo.DownStatePackage(ReturnInfo.ERROR_ANALYSIS);
}
this.T = this.jsonObject.getString("T");
if(!"VV".equals(T)){
if(NettyConfig.ChannelRegister(channel)){
//未注册
return ReturnInfo.DownStatePackage(ReturnInfo.ERROR_REGISTER);
}
}
this.success = true;
return ReturnInfo.DownStatePackage(ReturnInfo.ERROR_UNKNOWN);
}
/**
* 验证接收到的数据是否正确
* @param data 接收到的JSON数据
* @return 返回JSONObject
*/
public static JSONObject veriData(String data) {
JSONObject jsonObject=null;
//剔除空的在占位
data=data.trim();
try {
String tempData = data;
jsonObject = ((JSONObject) JSONObject.parse(tempData));
} catch (Exception e) {
System.out.println("convert JSONObject error:"+e.getMessage());
}
return jsonObject;
}
}
ReturnInfo类
package com.xiaomage.crm.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.LinkedHashMap;
/**
* @version V1.0
* @Title: 返回信息控制类
* @Description: //返回信息,以及描述返回状态码的信息
* @date: 2019/9/24 17:57
*/
public class ReturnInfo {
/**
* 解析失败
*/
public static final int ERROR_ANALYSIS=0;
/**
* 接收成功
*/
public static final int sussecs=1;
/**
* 接收成功,有反馈数据(只有一条)
*/
public static final int sussecs2=2;
/**
* 接收成功,有反馈数据(多条)
*/
public static final int sussecs3=3;
/**
* 未知异常
*/
public static final int ERROR_UNKNOWN=-1;
/**
* 超时未通信
*/
public static final int ERROR_OVERTIME=-2;
/**
* T不存在(T表示功能编号,例如V1、V2)
*/
public static final int ERROR_T_IS_NOT_FOUND=-3;
/**
* 未注册
*/
public static final int ERROR_REGISTER=-4;
/**
* L数据出错
*/
public static final int ERROR_L=-5;
/**
* 查询固件不存在
*/
public static final int ERROR_PACKAGE=-6;
/**
* 命令数据转换json对象失败
*/
public static final int ERROR_CONVERTJSON=-7;
// 日志
protected static Logger logger = LoggerFactory.getLogger(ReturnInfo.class);
/**
* 下行-结果包
* @param t 0:json解析失败 1:接收成功(且无数据下发,表心跳) -4:通道未注册 -1:未知异常
* @return String 封装的结果内容
*/
public static String DownStatePackage(int t){
LinkedHashMap linkHashMap = new LinkedHashMap();
linkHashMap.put("S", t);
JSONObject jsonObject = new JSONObject(linkHashMap);
String resultInfo = jsonObject.toJSONString();
return resultInfo;
}
/**
* 指定通道下发数据
* @param channel 通道
* @param msg 发送消息内容
*/
public static ChannelFuture sendMsg(Channel channel, String msg){
//判断是否需要日志
if (NettyProperties.loggerDebugger) {
logger.info("sendMsg:"+ msg);
}
//统一添加结束符
msg = msg + NettyProperties.CARRIAGE_RETURN;
//发送并返回
return channel.writeAndFlush(Unpooled.copiedBuffer(getBytesStr(msg)));
}
/**
* 转换字符串为byte[]
* @param str 要发送的字符串内容
* @return byte[] 子节数组,异常返回null
*/
private static byte[] getBytesStr(String str){
try {
return str.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
logger.error("getBytesStr error:"+e.getMessage());
}
return null;
}
}
其它业务类
package com.xiaomage.crm.netty;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.springframework.stereotype.Service;
/**
* @version V1.0
* @Title: 通道注册 业务类
* @Description: //上行--通道注册
* @date: 2019/9/24 16:34
*/
@Service
public class VV extends ExecuteService{
@Override
public String handler(Channel channel, String mes) {
String superres=super.handler(channel,mes);
if(!this.success)
return superres;
JSONObject jsonObject=this.jsonObject;
this.deviceID=jsonObject.get("U").toString();
// //添加通道并注册
NettyConfig.addClient(channel,deviceID);
return ReturnInfo.DownStatePackage(ReturnInfo.sussecs);
}
}
与spring集成获取spring中bean的工具类
package com.xiaomage.crm.taskQueue;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
/**
* 获取spring的bean
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextUtil.applicationContext == null) {
SpringContextUtil.applicationContext = applicationContext;
//System.out.println( "========ApplicationContext配置成功,在普通类可以通过调用ToolSpring.getAppContext()获取applicationContext对象,applicationContext="+ applicationContext + "========");
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T>T getBean(Class<T> clazz) {
// TODO Auto-generated method stub
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String var1, @Nullable Class<T> var2){
return applicationContext.getBean(var1, var2);
}
}
还没有评论,来说两句吧...