netty源码阅读之pipeline之添加channelHandler 男娘i 2022-05-16 03:53 223阅读 0赞 添加channelHandler分为以下几个步骤: 1、判断是否重复添加 2、创建节点并添加至链表 3、回调添加完成事件 添加handler由用户代码addLast进入,最终会来到DefaultChannelPipeline的这里: @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; } ### 判断是否重复添加 ### 从这个方法进入checkMultiplicity(handler); private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } } 首先判断是否ChannelHandlerAdapter的实例,是,强转, 判断如果不是分享的,并且添加过,就报错。如果添加成功之后,h.added=true,表示已经添加过。 这里我们回顾一下反射,从isHarasable()进入: /** * Return {@code true} if the implementation is {@link Sharable} and so can be added * to different {@link ChannelPipeline}s. */ public boolean isSharable() { /** * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of * {@link Thread}s are quite limited anyway. * * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>. */ Class<?> clazz = getClass(); Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; } 查看是否有shareable的cache,如果为空,那就通过放射的方式获取并放到cache里面,方便下次读取,并返回刚刚获取到的。否则,直接返回结果。 ### 创建节点并添加至链表 ### 回到一开始的代码,创建节点和添加链表: newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); 我们可以大概的想象到,这个newContext把handler包装成context,然后添加到这个pipeline的是这个context也就是newCtx。 首先我们看看filterName做了什么,表面上就是过滤名字的意思: private String filterName(String name, ChannelHandler handler) { if (name == null) { return generateName(handler); } checkDuplicateName(name); return name; } 如果名字为空,那就创建名字;否则,就判断名字是否重复: private void checkDuplicateName(String name) { if (context0(name) != null) { throw new IllegalArgumentException("Duplicate handler name: " + name); } } 最后看context0(name)到底做了什么: private AbstractChannelHandlerContext context0(String name) { AbstractChannelHandlerContext context = head.next; while (context != tail) { if (context.name().equals(name)) { return context; } context = context.next; } return null; } 很粗暴,一步步循环判断整个链表的元素context的名字是否和当前相同。 回来newContext看看怎么包装吧: private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } 然后是添加addLast0(newCtx); private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; } 这就是双向链表在尾部添加节点的操作,但是这个节点必须在tail节点之前,也就是保证tail必须在最后面。 ### 回调添加完成事件 ### 回到这段代码: EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } 从callHandlerAdded0(newCtx)进入: private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { boolean removed = false; try { remove0(ctx); try { ctx.handler().handlerRemoved(ctx); } finally { ctx.setRemoved(); } removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t)); } else { fireExceptionCaught(new ChannelPipelineException( ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t)); } } } 首先是通过handlerAdded(ctx)传播回调事件,对于ChannelInitializer这个特殊的handler,会回调到这个方法: @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // suprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. initChannel(ctx); } } 然后回调到initChannel(ctx),在这里判断这段代码是否被执行过: private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; } 然后这个方法initChannel((C) ctx.channel());里面就调用到用户代码实现的匿名方法initChannel: childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new AuthHandler()); //.. } }); 然后继续添加handler。如果是ChannelInitializer,那么最后还会remove(ctx)删除自己,删除下一篇文章会讲解到。 如果是普通的用户代码的handler例如AuthHandler(),就会回调到我们自己的写的方法的handlerAdded() 最后通过setAddComplete设置添加完成状态,借助cas的操作: final void setAddComplete() { for (;;) { int oldState = handlerState; // Ensure we never update when the handlerState is REMOVE_COMPLETE already. // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not // exposing ordering guarantees. if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) { return; } } }
还没有评论,来说两句吧...