Netty4之处理链
本文是基于Netty4.1.x,Handler在Netty占据着很重要的位置,跟Servlet中的filter很像,通过Handler可以完成消息的编解码、拦截指定的消息、统一对日志错误进行处理、统一对请求进行计数。所有的Handler都实现ChannelHandler接口,分为两大类,ChannelInboundHandler与ChannelOutboundHandler,ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:
从上图可以看出,在入站时,request会依次经过相关的Inbound处理器,然后出站时response也经过相应的Outbound处理器。从上图中可以看出Handler组成了一个链表放在ChannelPipeplie中,下面看看ChannelPipe中的Handler如何组成链表的。下面是一段NettyServer初始化的代码:
public class NettyServer {
public static void main(String[] args) throws Exception {
new NettyServer().start("127.0.0.1", 8081);
}
public void start(String host, int port) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
EventLoopGroup bossGroup = new NioEventLoopGroup(0, executorService);//Boss I/O线程池,用于处理客户端连接,连接建立之后交给work I/O处理
EventLoopGroup workerGroup = new NioEventLoopGroup(0, executorService);//Work I/O线程池
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(2);//业务线程池
ServerBootstrap server = new ServerBootstrap();//启动类
server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(businessGroup, new ServerHandler());
}
});
server.childOption(ChannelOption.TCP_NODELAY, true);
server.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
server.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
InetSocketAddress addr = new InetSocketAddress(host, port);
server.bind(addr).sync().channel();//启动服务
}
}
从上面看出在initChannel时会调用 pipeline.addLast(“decoder”, new StringDecoder());逐个将Handler加入到Pipeline,下在看看 pipeline.addLast()这个方法:
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
上面代码片中 addLast()方法中先调用 checkMultiplicity(handler)检查一下是否有重复,然后将handler包装成一个AbstractChannelHandlerContext然后再调用 addLast0(newCtx);将newCtx放入到链表中,从addLast0方法中可以看出,处理链是一个双向链表。到这里处理链的执行逻辑及组成方式就说完了总结一下:
- 处理链是一个有顺的双向链表放在ChannelPipeline(DefaultChannelPipeline)中,消息上行或下行时依次会被相关的处理器处理;
- 可以通过 channel.pipeline().addLast()、 channel.pipeline().addFirst()、channel.pipeline().addBefore()等方法向链表的指定位置添加处理器,也可以在处理过程中动态添加或删除处理器;
- 可以为Handler指定处理的业务线程池;
- 可以使用相关的api将消息在处理链路中流转,如ctx.fireChannelRead()表示将消息传递到下一个处理器,ctx.channel().pipeline()表示将消息放到pipeline让处理器从第一个处理器开始处理;
还没有评论,来说两句吧...