Netty(四)_核心源码分析
Netty(四)_核心源码分析
文章目录
- Netty(四)_核心源码分析
- EventLoopGroup实例化源码分析
- .channel()源码剖析
- channel实例化源码分析
- Handler添加源码
- 异步原理_Future&Promise
- .rigister()_1
- .rigister()_2
- .connect()源码分析
- netty请求接收说明
本篇内容门槛还是有的,需要你对我前三篇netty所说的内容不算陌生
为了提高阅读舒适性,我们并不会一行一行代码解析,并删除部分与目的无关的源码。读源码要带着目的性读,最好你已经知道它背后基本原理,然后带着审视的感觉去进行阅读,反客为主。
在整个流程中,以下这张图要常驻脑海中!
我们选择netty提供的example程序,该程序可以在源码包netty.example.echo
包
public final class EchoServer {
//...
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
//SSL配置 用于https
// Configure the server.
// 服务端配置
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 服务端自定义的处理器Handler 这里不进行代码展示
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
//引导器
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //设置服务端通道
.option(ChannelOption.SO_BACKLOG, 100) //通道配置选项,连接队列容量100
.handler(new LoggingHandler(LogLevel.INFO))//设置服务端通道处理器Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
//客户端通道的处理器
@Override
//客户端通道完成初始化时
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
//向管道链中添加处理器
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//向管道链添加自定义处理器
p.addLast(serverHandler);
}
});
// Start the server.
//异步绑定端口
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
//阻塞直至服务端通道关闭 --> 维持服务端一直提供服务
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
//优雅地关闭所有资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端:
public final class EchoClient {
//属性
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
//...
// Configure the client.
//客户端只需要一个线程组 以下引导与服务端类似 不赘述
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
EventLoopGroup实例化源码分析
我们这里从最具有一般性的服务端的EventLoopGroup workerGroup = new NioEventLoopGroup();
入手,看看线程池创建中的细节。
这一步是在服务端绑定端口号之间完成的,主要是为了把所有线程池进行实例化
在追溯源码前,我们根据对EventLoopGroup的认知,即
EventLoopGroup 是一组 EventLoop 的抽象,EventLoopGroup代表着线程池, EventLoop代表线程池中的线程。
在netty中BossGroup和WorkGroup都是NioEventLoopGroup类型。
每个 EventLoop 维护着一个 Selector 实例,Selector的作用是在Group中选择Loop来执行任务。
这里先看看NioEventLoopGroup中参数最全的构造器,看看NioEventLoopGroup的实例化会配置什么参数,或者说,一个Group有什么特性/属性。
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}
参数说明:
- nThreads:线程池中的线程数,也就是 NioEventLoop 的实例数量。
- executor:Java线程池(Executor)
- chooserFactory:线程池选择线程(next)来执行任务的选择策略。追源码的过程中我们也可以看看具体有什么策略。
- selectorProvider:通过它能实例化 Selector,每个线程池都持有一个 selectorProvider 用以实例化selector。
- selectStrategyFactory:这个涉及到的是线程池中线程的工作流程。
- rejectedExecutionHandler:用于处理线程池中没有可用的线程来执行任务的情况的处理器。在 Netty 中稍微有一点点不一样,这个是给 NioEventLoop 实例用的,后面说。
- EventLoopTaskQueueFactory,顾名思义,EventLoop的工作队列。
从无参构造器开始跟踪,看看这些参数都是怎么被赋值的,并且被赋了什么值。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
//默认构造器
public NioEventLoopGroup() {
this(0);
}
//上面构造器所调用
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//上面构造器所调用
public NioEventLoopGroup(int nThreads, Executor executor) {
//SelectorProvider.provider()返回selectorProvider
this(nThreads, executor, SelectorProvider.provider());
}
//上面构造器所调用
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
//涉及到线程在做 select 操作和执行任务过程中的策略选择问题
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
//上面构造器所调用
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
//rejectedExecutionHandler(线程池无线程可用的拒绝策略)默认值为RejectedExecutionHandlers.reject()
//点进.reject()发现默认拒绝策略最终抛出异常
//注意,这里将交由父类构造器
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
到父类 MultithreadEventLoopGroup 的构造方法中
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里将nThreads重新设置为DEFAULT_EVENT_LOOP_THREADS
该类中的静态代码块能获取本机CPU核数,并将之*2作为nThreads
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
继续往下走,来到父类MultithreadEventExecutorGroup这个构造器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
//这里重新设置了线程池选择策略
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
这里我们来看看它的选择策略是什么样的
点进DefaultEventExecutorChooserFactory类
//这个方法根据executors的不同,返回不同的策略
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果2^n个线程数
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
//如果不是
return new GenericEventExecutorChooser(executors);
}
}
//我们再看看两个策略的next()方法,之前说过next()是线程池用来选择线程的方法
@Override
//如果2^n个线程数 与
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
@Override
//如果2^n个线程数 模
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
继续构造器:下面这个明显不一样了。我们关注其核心语句
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//这里实例化executor,即把Java线程池对象进行实例化
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutor类型数组,即线程池,其元素是每个线程,即每个Loop
//可用看出EventLoop最终类型是EventExecutor而不说线程对象Thread
children = new EventExecutor[nThreads];
//实例化 children 数组中的每一个元素,即实例化Group中每个loop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//newChild(executor, args)为每个Loop的实例化入口,重点关注。
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
//...
} finally {
//如果Loop实例化失败
if (!success) {
//之前已实例化的Loop全部关闭
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 等待这些线程成功 shutdown
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// 如果关闭过程发生异常,改变当前线程状态未打断状态,让调用者线程知悉并处理
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//通过之前设置的 chooserFactory 来实例化 Chooser即Group的选择策略,传入children用于根据线程数返回不同的choser
chooser = chooserFactory.newChooser(children);
// 设置一个 Listener 用来监听该线程池的 termination 事件即结束事件
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
//只有每个线程都发生termination事件,这个池的terminationFuture才是成功
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//给每个loop的结束任务注册监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
//...这个不是重点
}
我们重点关注上面newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程即Group中的Loop。
newChild() 是在NioEventLoopGroup中进行的覆写
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
//返回Loop对象
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
它调用了 NioEventLoop 的构造方法,这里将会生产一个Loop,即“线程”对象:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
//实例化两个任务队列后传进父类构造器
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//开启了NIO 中最重要的组件:Selector
//即Selector是在Loop实例化的时候进行同步创建的
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
//看到这里设置了也给尾队列,即从子类传上来的两个任务队列拿一个来做尾队列,事实上目前还不指定尾队列是干啥用的。
//事实上可以直接忽略这个东西。
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
再点进其父类构造器,其父类是SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//这一步是对executor即这个loop做一个封装,确保之在运行的时候能返回
this.executor = ThreadExecutorMap.apply(executor, this);
//这里又设置了一个重要的组件taskQueue 默认容量16
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
//如果 submit 的任务堆积了到了 16,再往里面提交任务会触发 rejectedExecutionHandler 的执行策略
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
我们发现它爹是SingleThreadEventExecutor,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的,也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过池中只有一个线程。
好了。
至此为止,小结以下上面代码都做了什么:
- EventLoopGroup实例化的过程中:
- 通过SelectorProvider.provider()持有一个selectorProvider对象。
- 通过DefaultSelectStrategyFactory.INSTANCE持有一个selectStrategyFactory,这指的是一个选择线程池选线程的策略
- 通过RejectedExecutionHandlers.reject()持有无线程可用的拒绝策略
- 在MultithreadEventLoopGroup中设置该线程池的线程数为机器CPU核数*2
- 在MultithreadEventExecutorGroup中通过DefaultEventExecutorChooserFactory.INSTANCE重新持有线程池选线程的策略
- 实例化executor,类型是 ThreadPerTaskExecutor
- 实例化线程池children,类型是EventExecutor
实例化线程池中的线程
- 顺带着实例化Selector
- 在SingleThreadEventLoop中实例化任务队列
- 通过chooserFactory实例化选择策略
线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。
同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)
创建线程的时机在第一个任务提交过来的时候,即服务端bind()或connect()时涉及到的 register 操作
.channel()源码剖析
根据之前所学的知识,我们对channel应该有了一个基本认识
- 服务端需要NioServerSocketChannel作为其接收请求的通道,客户端需要NioSocketChannel作为其发送与接收服务端消息的通道。
- NioServerSocketChannel 是netty对JDK的ServerSocketChannel 进行的封装,NioSocketChannel是SocketChannel的封装,我们也要从源码中知道其封装过程
从服务端的引导代码开始:
//传入NioServerSocketChannel的类,那它日后必然被反射调用。
.channel(NioServerSocketChannel.class)
跟踪进入到AbstractBootstrap类
// channelFactory负责通过工厂方法反射创建Channel实例
public B channel(Class<? extends C> channelClass) {
//即将进入ReflectiveChannelFactory
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
看下ReflectiveChannelFactory工厂的内容
/**
...[原注释]
根据注释可知ReflectiveChannelFactory负责通过反射调用Channel的默认构造器实例化Channel
*/
//
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
//NioServerSocketChannel的构造器,通过它创建NioServerSocketChannel
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
//...
//此处获得NioServerSocketChanne的默认构造器
this.constructor = clazz.getConstructor();
//...
}
//创建Channel方法
@Override
public T newChannel() {
try {
//利用NioServerSocketChannel的构造器创建NioServerSocketChannel实例
return constructor.newInstance();
} catch (Throwable t) {
//...
}
}
}
这里我们找到了Channel实例化的入口,那么Channel何时被实例化?这里要说明一下:
- 对于 NioSocketChannel,由于它充当客户端的功能,它的实例化时机在
connect(…)
的时候; - 对于 NioServerSocketChannel 来说,它充当服务端功能,它的实例化时机是在绑定端口
bind(…)
的时候。
channel实例化源码分析
根据上面的说明,我们从服务端的connect方法入手,看看其channel的实例化过程
// 注意返回值为ChannelFuture。这个后面细说,现在只需要知道通过它能得知connect的状态与结果
//.sync()为异步方法,关于异步,后面说
ChannelFuture f = b.connect(HOST, PORT).sync();
点进connect()
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
继续跟踪:
public ChannelFuture connect(SocketAddress remoteAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
//参数检验
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
跟踪doResolveAndConnect()
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//关注点
//这里将会进行Channel实例化
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 略...
}
跟踪initAndRegister():
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//这里进行channel的实例化
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//...
}
//一个特别重要的方法,观察之,发现与register有关
//即它可能将该客户端channel注册到服务端的WorkGroup上,这个留到后面讲
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
跟踪channelFactory.newChannel() ,根据前面说的,这里是调用 Channel 的无参构造方法进行Channel的实例化。
//该方法出现于上一节[.channel()]所说的ReflectiveChannelFactory中
@Override
public T newChannel() {
try {
//实例化NioSocketChannel
return constructor.newInstance();
} catch (Throwable t) {
//...
}
}
由于是通过反射调用,所以我们直接观察NioSocketChannel 的无参构造方法
//来到NioSocketChannel类
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
//DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
//SelectorProvider用于创建JDK的SocketChannel 实例
}
//为上面构造器调用
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
//为上所调用,传入参数是Java的SocketChannel。
//也就是说,上面的newSocket(provider)会实例化一个SocketChannel
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
跟踪newSocket(provider)看看SocketChannel的实例化情况
private static SocketChannel newSocket(SelectorProvider provider) {
try {
//返回实例化的SocketChannel
return provider.openSocketChannel();
} catch (IOException e) {
//...
}
}
NioServerSocketChannel 同理,也非常简单,从 ServerBootstrap#bind(...)
方法一路点进去即可。
回到NioSocketChannel,接着看它的构造器
//上面说的构造器
public NioSocketChannel(SelectorProvider provider) {
//实例化SocketChannel
this(newSocket(provider));
}
//被上面的构造器调用
//此时的Channel还是SocketChannel
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
//被上面的构造器调用
public NioSocketChannel(Channel parent, SocketChannel socket) {
//这里调用了父类构造器,传入null和SocketChannel
super(parent, socket);
//实例化配置对象 用于保存channel的配置信息
config = new NioSocketChannelConfig(this, socket.socket());
}
我们跟踪其父类构造器,来到AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
//这里又传入了SelectionKey.OP_READ,代表该channel关心的事件是读取
//这正是客户端channel应该关心的事情
super(parent, ch, SelectionKey.OP_READ);
}
//被上面的构造器调用
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//这里保存了SelectionKey.OP_READ
this.readInterestOp = readInterestOp;
try {
//设置该channel为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
//...
}
}
跟踪其父类构造器,来到AbstractChannel,发现这里实例化了Pipeline!抱歉,关于Pipeline的内容,我们后面再讲…
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在服务端的NioServerSocketChannel 的构造方法类似,也设置了非阻塞,然后设置服务端关心的 SelectionKey.OP_ACCEPT 事件,非常合理,对于服务端来说,关心的是 SelectionKey.OP_ACCEPT 事件,等待客户端连接。
对于客户端来说,关心的Read,只要客户端Channel有来自服务器的数据,马上读。
关于 Channel 实例化的问题先到这,目前为止,主要就是实例化了 JDK 层的 SocketChannel 或 ServerSocketChannel,设置了非阻塞模式,实例化了channel的配置对象。
Channel 实例化中的其他问题如其Pipeline、注册到Loop上、channel的处理链handler的实例化等问题,都是围绕着rigister()方法来的,我们后面专门讲解!
还是小解一下:
- 通过ReflectiveChannelFactory持有Channel的默认构造器,同时ReflectiveChannelFactory中留着实例化Channel的方法
channelFactory.newChannel()
直接实例化Channel- newSocket(SelectorProvider)创建JDK的SocketChannel
- 为该通道传入了SelectionKey.OP_READ,代表该channel关心的事件是读取
- 设置该channel为非阻塞
newChannelPipeline()
实例化Pipeline
Handler添加源码
回到echo的程序
b.handler(new ChannelInitializer<SocketChannel> //客户端
b.handler(new LoggingHandler(LogLevel.INFO)) //服务端
之前学习过:一个 Channel 关联一个 pipeline,NioSocketChannel 和 NioServerSocketChannel 实例化的时候,都会走到它们的父类 AbstractChannel 的构造方法中:
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 给每个 channel 分配一个唯一 id
id = newId();
// 每个 channel 内部需要一个 Unsafe 的实例
unsafe = newUnsafe();
// 每个 channel 内部都会创建一个 pipeline
pipeline = newChannelPipeline();
}
关于UnSafe:
在 JDK 的源码中,sun.misc.Unsafe 类提供了一些底层操作的能力,它设计出来是给 JDK 中的源码使用的,比如 AQS、ConcurrentHashMap 等
这个 Unsafe 类不是给我们的代码使用的
Netty 中的 Unsafe 也是同样的意思,它封装了 Netty 中会使用到的 JDK 提供的 NIO 接口,比如将 channel 注册到 selector 上,比如 bind 操作,比如 connect 操作等,这些操作都是稍微偏底层一些。
Netty 同样也是不希望我们的业务代码使用 Unsafe 的实例,它是提供给 Netty 中的源码使用的。
点进newChannelPipeline():
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
跟踪DefaultChannelPipeline():
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//实例化tail和head 这两个 handler并且串起来
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
前面我们说过管道本质是个双向链表,入站事件从头走,出站事件从尾走。
从上面的 head 和 tail 我们也可以看到,其实 pipeline 中的每个元素是 ChannelHandlerContext 的实例,而不是 ChannelHandler 的实例,context只是对handler进行了一下包装。
看到这里,我们发现现在的pipeline还不涉及到自定义的 handler 代码执行。那我们添加的handler何时被加入到这个pipeline中?
我们从bind()开始看。
bind->doBind->initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//刚讲了,这里是channel的实例化,上面的pipeline也会随之实例化,只不过没有我们添加我们自定义的handler
channel = channelFactory.newChannel();
//这里才涉及我们handler的添加
init(channel);
} catch (Throwable t) {
//...
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
跟踪 init(channel),该方法位于ServerBootstrap类
void init(Channel channel) {
//...
//通过实例化的channel拿到pipeline实例
ChannelPipeline p = channel.pipeline();
//...
//开始往 pipeline 中添加一个 handler,这个handler是ChannelInitializer的实例
p.addLast(new ChannelInitializer<Channel>() {
@Override
//关于ChannelInitializer的initChannel()方法,也挺重要的,我们需要质疑它何时被调用,即我们自定义的handler何时被真正加入后面说
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
//从config中拿到我们.handler()指定的LoggingHandler
ChannelHandler handler = config.handler();
//添加到尾部
if (handler != null) {
pipeline.addLast(handler);
}
//先不管.eventLoop()
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//添加ServerBootstrapAcceptor到pipeline中
//ServerBootstrapAcceptor用于接收客户端的请求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
关于ChannelInitializer,它本身是一个 handler(Inbound 类型),但是它的作用和普通 handler 有点不一样,它纯碎是用来将其他的 handler 加入到 pipeline 中的。
ChannelInitializer的initChannel 被调用时,会往 pipeline 中添加我们最开始指定的 LoggingHandler 和添加一个 ServerBootstrapAcceptor,这样,我们的NioServerSocketChannel在遇到READ事件时,就能由ServerBootstrapAcceptor进行处理
而initChannel方法何时会被调用?
服务端的NioSocketChannel与之类似,区别在于它只需要将 EchoClient 类中的 ChannelInitializer 实例加进来就可以了,它的 ChannelInitializer 中添加了两个 handler,LoggingHandler 和 EchoClientHandler
抱歉,又要留悬念了…本节没有介绍 handler 的向后传播,就是一个 handler 处理完了以后,怎么传递给下一个 handler 来处理?initChannel方法何时会被调用?这里还是跟rigister()有关…你会发现,我们在一直向rigiser()靠…它的重要性不用多说了吧
小结:
- DefaultChannelPipeline()中实例化了两个初始Handler
在
initAndRegister()
中的init()中- 通过实例化的channel拿到pipeline
- 向管道中添加ChannelInitializer,其中的方法定义了但未执行添加自定义Handler的逻辑。此时,自定义的Handler未实例化。
- 向该Channel对应Loop的Selector的工作队列中添加任务,执行管道添加Acceptor处理器
异步原理_Future&Promise
Future、Promise是netty实现异步核心类
在echo中
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
关于 Future 接口,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()
和 get()
方法。
netty中,Future的一切操作都是由某个线程执行时调用的。
先看Java中的Future接口:
public interface Future<V> {
// 取消该任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 阻塞获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 带超时参数的获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty 中的 Future 接口继承了 JDK 中的 Future 接口,同时添加了一些方法:
public interface Future<V> extends java.util.concurrent.Future<V> {
//是否成功 只有IO操作完成时才返回true
boolean isSuccess();
// IO操作发生异常时,返回导致IO操作失败的原因,如果没有异常,返回null
Throwable cause();
//注册监听者,future完成时,进行回调(通知所有监听者)
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//阻塞等待任务结束,如果任务失败,抛出导致任务失败的异常
Future<V> sync() throws InterruptedException;
// 不可中断的 sync() 即原子操作
Future<V> syncUninterruptibly();
// 阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
//sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来。await()本身不抛出异常
Future<V> await() throws InterruptedException;
// 不可中断的 await 即原子操作
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 不阻塞获取执行结果,如未完成,返回null
V getNow();
// 取消任务执行,如果取消成功,任务会因为 CancellationException 异常而导致失败
boolean cancel(boolean mayInterruptIfRunning);
}
说明:
Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 sync()
和await()
用于阻塞等待,还加了 Listeners让我们可以不用阻塞等待结果,只要任务结束去回调 Listener 就可以了,那么我们就不一定要主动调用 isDone()
来获取状态,或通过 get()
阻塞方法来获取值。
Future 其子接口ChannelFuture:
public interface ChannelFuture extends Future<Void> {
Channel channel();
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
// 用来标记该 future 是 void 的,
// 这样就不允许使用 addListener(...), sync(), await() 以及它们的几个重载方法
boolean isVoid();
}
发现其不过是 Future接口的方法重写,返回 Future改为ChannelFuture。
再来看看Promise接口:
public interface Promise<V> extends Future<V> {
//以下是Promise对Future新增的方法~
// 标记该future 成功及设置其执行结果,并且会通知所有的 listeners
Promise<V> setSuccess(V result);
//与setSuccess(V result)一样,区别是操作失败不抛异常,返回false[失败是指此方法本身的失败]
boolean trySuccess(V result);
// 标记该 future 失败,及其失败原因。
Promise<V> setFailure(Throwable cause);
//不抛异常,返回false
boolean tryFailure(Throwable cause);
//标记该 future 不可以被取消
boolean setUncancellable();
//以下为重写Future的方法,只是将返回值改成Promise<V>
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
Promise 接口继承自 Future 接口,重点添加了上述几个方法,可以人工设置 future 的执行成功与失败,并通知所有监听的 listener。是Neety实现异步返回结果与回调的主要实现组件。
是否注意到Future是没有提供set方法的而Promise才有。
Promise 提供的 setSuccess(V result)
或 setFailure(Throwable t)
将来会被某个执行任务的线程在执行完成以后调用,同时那个线程在调用 setSuccess(result) 或 setFailure(t) 后会回调 listeners 的回调函数。
一旦 setSuccess(…) 或 setFailure(…) 后,那些 await() 或 sync() 的线程也会从等待中返回。
Promise 实现类是DefaultPromise类,该类十分重要,Future 的 listener 机制也是由它实现的。
DefaultPromise其重要属性:
//...
// 保存执行结果
private volatile Object result;
// 执行任务的线程池
private final EventExecutor executor;
// 监听者 由addListener(..)定义
private Object listeners;
// 等待这个 promise 的线程数(即调用sync()/await()进行等待的线程数量)
private short waiters;
// 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行listeners的回调方法
private boolean notifyingListeners;
//......
DefaultPromise其核心方法:
@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
return this;
}
throw new IllegalStateException("complete already: " + this);
}
@Override
public boolean trySuccess(V result) {
return setSuccess0(result);
}
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
@Override
public boolean tryFailure(Throwable cause) {
return setFailure0(cause);
}
@Override
//get方法会将setSuccess(V result)的result返回
public V get() throws InterruptedException, ExecutionException {
...
}
这里明显验证了上面set和try一个抛异常一个不抛异常的说法。
我们随便点进一个最终设置值的方法看看:
private boolean setValue0(Object objResult) {
//先不管这里的CAS操作
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
if (checkNotifyWaiters()) {
//通知监听者任务结束以及结果
//这也是Future回调的实现,这里就不展开了
notifyListeners();
}
return true;
}
return false;
}
可以发现,在Promise设置完结果后,会立即回调函数,通知监听者结果
实例
package com.junsir.netty.source;
import io.netty.util.concurrent.*;
public class TestFutureAndPromise {
public static void main(String[] args) {
//创建线程池
EventExecutor executor = new DefaultEventExecutor();
//创建DefaultPromise实例
Promise promise = new DefaultPromise(executor);
//注册监听者
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
//future完成后被回调
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()){
//.get()为juc下的Future接口定义的方法,用于拿到任务的结果,具体是由DefaultPromise实现。
System.out.println("任务结束,结果为:" + future.get());
}else {
//任务不成功
System.out.println("任务失败 原因:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任务结束...");
}
});
//提交任务给线程池,五秒后执行结束,设置promise任务
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//设置promise的结果,该语句会回调监听者
promise.setSuccess(12345);
}
});
//阻塞等待执行结果
try {
promise.sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
任务结束,结果为:12345
任务结束…
进一步说明netty的Future:
promise 代表的 future 是不需要和线程池或线程搅在一起的,它不负责具体任务的执行,future /promise只关心任务是否结束以及任务的执行结果,至于是哪个线程或哪个线程池执行的任务,future 其实是不关心的。
.rigister()_1
这里就是大头了,很多核心操作都在这里
回到EchoClient 中的 connect() —>connect–>doResolveAndConnect–>initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//讲过,完成了channel实例化的一系列事宜
channel = channelFactory.newChannel();
//讲过,配置了channel的pipeline的handler等
init(channel);
} catch (Throwable t) {
//...
}
//.rigistr()..
ChannelFuture regFuture = config().group().register(channel);
//...
initAndRegister() 这个方法我们已经接触过两次了,前面介绍了 Channel 的实例化,实例化过程中,会执行 Channel 内部 Unsafe 和 Pipeline 的实例化,以及在上面 init(channel) 方法中,会往 pipeline 中添加 handler。pipeline 此时是 head+channelnitializer+tail但并未实例化
ChannelFuture regFuture = config().group().register(channel);
我们说了,register 这一步是非常关键的,它发生在 channel 实例化以后,大家回忆一下当前 channel 中的一些情况:
实例化了 JDK 底层的 Channel,设置了非阻塞,实例化了 Unsafe,实例化了 Pipeline,同时往 pipeline 中添加了 head、tail 以及一个 ChannelInitializer 实例
group() 方法会返回前面实例化的 NioEventLoopGroup 的实例,然后调用其 register(channel) 方法:
跟踪发现NioEventLoopGroup的父类MultithreadEventLoopGroup中的register(channel)
public ChannelFuture register(Channel channel) {
//next用于选择线程池中的线程即loop实例
return next().register(channel);
}
在SingleThreadEventLoop中找到register():
@Override
public ChannelFuture register(Channel channel) {
//实例化一个Promise,传入channel,整体传入register
return register(new DefaultChannelPromise(channel, this));
}
//于同类,为上面所调用
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 这里可以看出promise关联了channel,channel 持有 Unsafe 实例,register操作就封装在Unsafe中
promise.channel().unsafe().register(this, promise);
return promise;
}
跟踪发现在AbstractChannel中实现register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//...
// 将eventLoop 实例设置给这个 channel,从此这个 channel 就是有 eventLoop 的了
//别晕,这里eventLoop是SingleThreadEventLoop,这个方法的入口.register(this, promise),this就是eventLoop
//再往上说eventLoop是实例好的Group通过.next()选出来的...忘了往上翻翻
//注意,后续所有异步操作都将交给这个eventLoop执行!!即每个channel实例都持有一个Group中的Loop来完成channel数据的读取!
AbstractChannel.this.eventLoop = eventLoop;
//如果发起 register 动作的线程就是 eventLoop 实例中的线程,那么直接调用 register0(promise)
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//否则,提交任务给 eventLoop,eventLoop 中的线程会负责调用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//...
}
}
}
到这里,我们要明白,NioEventLoop 中是还没有实例化 Thread 实例的!也就是我们rigister到现在,NioEventLoop还没有线程执行操作!
这几步涉及到了好几个类:NioEventLoop、Promise、Channel、Unsafe 等,大家要仔细理清楚它们的关系。
提交到 eventLoop 以后,就直接返回 promise 实例了,剩下的register0 是异步操作,其操作结果会在 promise 中设置。
我们这边先不继续往里分析 register0(promise) 方法,先把 NioEventLoop 中的线程Thread到底出现在哪里说清楚,即.execute()提交了任务后何时执行,然后再回来继续介绍这个 register0 方法。
小结一下,到此为止
- Channel实例持有对应Loop(Channel与Loop关系建立)
- 提交任务给Loop中的任务队列,当前线程不负责真正Rigister0操作。在
eventLoop.execute()
中会另开线程进行真正Rigister0
execute()
这个代码在父类 SingleThreadEventExecutor 中
@Override
public void execute(Runnable task) {
//...
//见过,判断调用execute的线程是否在EventLoop中
boolean inEventLoop = inEventLoop();
//将task(Runable)添加到任务队列
addTask(task);
//如果调用execute的线程否在EventLoop中,启动线程
if (!inEventLoop) {
//启动Loop中线程
startThread();
//如果上面语句执行完毕后线程是关闭状态
if (isShutdown()) {
boolean reject = false;
try {
//将任务已从队列中移除,并将是否执行拒绝策略设为true
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
//前面说过,默认拒绝策略抛出异常
reject();
}
}
}
//....
}
这里我们终于发现EventLoop启动的线程的代码了,进入 startThread()
private void startThread() {
//如果线程未启动
if (state == ST_NOT_STARTED) {
//CAS更改状态
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
//线程执行成功标志
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
//CAS改回线程状态
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
点进doStartThread()
private void doStartThread() {
//Thread来了!
assert thread == null;
// 这里的 executor 大家是不是有点熟悉的感觉,它就是一开始我们实例化 NioEventLoop 的时候传进来的 ThreadPerTaskExecutor 的实例。它是每次来一个任务,创建一个线程的那种 executor。
// 一旦我们调用它的 execute 方法,它就会创建一个新的线程,所以这里终于会创建 Thread 实例
executor.execute(new Runnable() {
@Override
public void run() {
//将 “executor” 中创建的这个线程设置为 NioEventLoop 的线程
thread = Thread.currentThread();
//判断是否打断状态
if (interrupted) {
thread.interrupt();
}
//线程状态标志
boolean success = false;
//...
try {
//...执行 SingleThreadEventExecutor 的 run() 方法
//它在 NioEventLoop 中实现了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//死循环直至该线程已关闭。
//...
for (;;) {
if (confirmShutdown()) {
break;
}
}
//...
} finally {
try {
//执行清理工作
cleanup();
} finally {
//....
});
}
上面线程启动以后,会执行 NioEventLoop 中的 run() 方法,这是一个非常重要的方法,根据我们之前所学,它必然是个死循环,需要不断地做 select 操作和轮询 taskQueue 队列,也即,select 与轮询是个死循环,在channel register()过程中发送,也即,在客户端连接到服务端或服务端绑定端口号后开始!
我们具体看看该run()方法
@Override
protected void run() {
//注意这是个无限循环。
for (;;) {
try {
// selectStrategy 选择策略相关事宜
//它有三个值,一个是 CONTINUE 一个是 SELECT BUSY_WAIT(无用)且都是int类型
try {
//hasTasks判断任务队列中是否有任务
//如果有任务,selectNow(),代表立马轮询,因为该方法不会阻塞,必能拿到任务
//如果无任务,那应该返回CONTINUE,继续这个循环,即不断查看队列是否有任务!
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
// fall-through to SELECT since the busy-wait is not supported with NIO
//这个忽略即可
case SelectStrategy.BUSY_WAIT:
//如果是SELECT,当任务队列为空,即无任务执行时,执行select
case SelectStrategy.SELECT:
//进行选择,注意这个SELECT是阻塞的。
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
}
}
//...
// 默认ioRatio 的值是 50,表示IO操作所占的比重。
final int ioRatio = this.ioRatio;
//如果ioRatio为100,先执行IO再执行select
if (ioRatio == 100) {
try {
//先执行读写操作
processSelectedKeys();
} finally {
// 最后执行非 IO 任务,也就是 taskQueue 中的任务
runAllTasks();
}
} else {
// 如果 ioRatio 不是 100,那么根据 IO 操作耗时,限制非 IO 操作耗时
final long ioStartTime = System.nanoTime();
try {
//执行IO操作
processSelectedKeys();
} finally {
//根据 IO 操作消耗的时间
//计算执行非 IO 操作(runAllTasks)可以用多少时间
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
上面这段代码是 NioEventLoop 的核心
我们这里先不要去关心 select(oldWakenUp)、processSelectedKeys() 方法和 runAllTasks(…) 方法的细节,只要先理解它们分别做什么事情就可以了。
回过神来,我们前面在 register 的时候提交了 register 任务给 NioEventLoop,这是 NioEventLoop 接收到的第一个任务,所以这里会实例化 Thread 并且启动,然后进入到 NioEventLoop 中的 run 方法。NioEventLoop Run起来随之无限循环轮询操作,执行提交过来的 register 任务或其他任务
小结一下:
在eventLoop.execute(runnable)中
- 添加任务。
startThread()
启动线程。doStartThread()
,重线程池中搞一个线程出来- 该线程作为Loop的thread属性
- 执行NioEventLoop的run方法
- 该run方法是无限循环,剩下主要就是示意图中的三步曲了
.rigister()_2
再说回到前面的 register0(promise) 方法,我们知道,这个 register 任务进入到了 NioEventLoop 的 taskQueue 中,然后会启动 NioEventLoop 中的线程,该线程会轮询这个 taskQueue,然后从任务队列中取出并执行这个 register0 任务。
private void register0(ChannelPromise promise) {
try {
// ...
boolean firstRegistration = neverRegistered;
//进行 JDK 底层的操作,在JDK层面上将Channel注册到Selector
doRegister();
neverRegistered = false;
registered = true;
//至此 状态registered 代表register完毕
//涉及到ChannelInitializer 的 init(channel)
pipeline.invokeHandlerAddedIfNeeded();
//设置promise状态为sucess代表异步绑定/连接完毕。
//即通知提交 register 操作的线程
safeSetSuccess(promise);
//这一步是通知所有关心注册事件的线程,这上面结合形成回调
pipeline.fireChannelRegistered();
//如果channel是活跃状态
if (isActive()) {
//如果该channel是第一次执行register,通知所有监听者channel已经处于活跃状态。
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
//如果该channel之前已经注册过,立马让cheannel去监听OP_Read事件
beginRead();
}
}
} catch (Throwable t) {
//...
}
}
跟踪doRegister():
// JDK 中 Channel 的 register 方法:
// public final SelectionKey register(Selector sel, int ops, Object att) {...}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//这里执行了JDK的register
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
可以看到,上面代码做了 JDK 底层的 register 操作,将 SocketChannel(或 ServerSocketChannel) 注册到 Selector 中,并且可以看到,这里的监听集合设置为了 0,也就是什么都不监听。当然,也就意味着,后续一定有某个地方会需要修改这个 selectionKey 的监听集合。
跟踪pipeline.invokeHandlerAddedIfNeeded()方法,这里应该就是调用initChannel()设置管道handler的过程了
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
继续跟踪,会执行到 pipeline 中 ChannelInitializer 实例的 handlerAdded 方法,在这里会执行它的 init(context) 方法:
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
然后我们看下 initChannel(ctx),这里终于来了我们之前介绍过的 initChannel方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
// 1. 将把我们自定义的 handlers 添加到 pipeline 中
initChannel((C) ctx.channel());
} catch (Throwable cause) {
...
} finally {
// 2. 将 ChannelInitializer 实例从 pipeline 中删除
remove(ctx);
}
return true;
}
return false;
}
我们前面也说过,ChannelInitializer 的 initChannel()被执行以后,那么其内部添加的 handlers 会进入到 pipeline 中,然后上面的 finally 块中将 ChannelInitializer 的实例从 pipeline 中删除,那么此时 pipeline 就算建立起来了。
即,Handler的建立在Register0当中。
回到register0()方法
//进入这一句
pipeline.fireChannelRegistered();
来到DefaultChannelPipeline:
@Override
public final ChannelPipeline fireChannelRegistered() {
//invokeChannelRegistered代表往管道添加ChannelRegistered事件
//此处传入的next的head
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
继续跟踪:来到AbstractChannelHandlerContext类
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
//点进去是return channel().eventLoop();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
//channel出现ChannelRegistered事件后
//由head-handler去处理ChannelRegistered
//处理逻辑在head-handler中已经预先定义
//放在任务队列中去执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
跟踪next.invokeChannelRegistered(),看看head-handler是怎么处理ChannelRegistered事件的。
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
//ChannelRegistered是入站事件
// handler() 方法此时会返回 head,然后执行
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
我们去看 head 的 channelRegistered 方法:
来到HeadContex类,可以发现其定义了一系列针对channel事件的处理函数,这里不展开了
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
//对事件处理
invokeHandlerAddedIfNeeded();
//向后传播入站事件事件
ctx.fireChannelRegistered();
}
跟踪fireChannelRegistered()
在AbstractChannelHandlerContext中实现
@Override
public ChannelHandlerContext fireChannelRegistered() {
//findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
//被上面调用
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
注意:pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。
它们两个的方法名字是一样的,但是来自于不同的类。
至此。register 操作算是真正完成了
由于后续的 connect 或 bind 也会进入到同一个 eventLoop 的 queue 中,所以一定是会先 register 成功,才会执行 connect 或 bind
下面就比较轻松了,看看connect 或 bind是怎么玩的
.connect()源码分析
connect() —>connect–>doResolveAndConnect–>initA//ndRegister
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//这里完成了一系列rigister操作,将连接所需要的channel实例化
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//那么在这里就要完成连接操作了
//将经过rigister后的channel实例、ip、端口、Promise实例进行传入
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
return promise;
}
}
跟踪doResolveAndConnect0…一路到AbstactChannel的connect方法
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
//由channel的pipeline完成连接操作
return pipeline.connect(remoteAddress);
}
跟踪到DefaultChannelPipeline类中
public final ChannelFuture connect(SocketAddress remoteAddress) {
//connect 这种 Outbound 类型的操作,是从 pipeline 的 tail 开始的向前推进的
//反之亦然,服务端的bind属于InBound是从head开始的
return tail.connect(remoteAddress);
}
继续跟踪到AbstractChannelHandlerContext(前面说过,它是Handler的真实类型)
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//...
//找到处理连接的handler,其就在head中,从tail->head
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
//返回channel对应的loop
EventExecutor executor = next.executor();
//如果调用当前的线程的loop线程
if (executor.inEventLoop()) {
//跟踪..
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
//...
return promise;
}
跟踪invokeConnect()
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
//Handler是否存在
//如果已有,执行Handler的connect方法
if (invokeHandler()) {
try {
//执行handler的connect方法
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
//重复执行
connect(remoteAddress, localAddress, promise);
}
}
那么。我们需要的操作在HeadContext中进行了定义
//我们同时还发现了和服务端有关的bind方法
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
不管是bind还是connect,最终都是用unsafe实例来执行JDK中的bind还是connect,那么,channel实例持有unsafe的意义我们也看到了。其就是用来封装JDK的NIO code的
我们再看看UnSafe中的connect操作:
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// ...
try {
//...
boolean wasActive = isActive();
//这一步就做JDK底层的,这一步会做 JDK 底层的 SocketChannel connect
//然后设置 interestOps 为 SelectionKey.OP_CONNECT
if (doConnect(remoteAddress, localAddress)) {
//netty对JDK中连接成功的进一步处理
fulfillConnectPromise(promise, wasActive);
} else {
//连接失败处理
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// 连接超时处理...
//...
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
在上一节介绍的 register 操作中,channel 已经 register 到了 selector 上,只不过将 interestOps 设置为了 0,也就是什么都不监听。
而在上面的 doConnect 方法中,我们看到它在调用底层的 connect 方法后,会设置 interestOps 为 SelectionKey.OP_CONNECT
。即让channel去监听connect事件
netty请求接收说明
服务器启动后肯定是要接受客户端请求,并返回客户端想要的信息的
从之前有关.handler()中,我们得知,服务器最终实例化了NioServerSocketChannel,向其管道pipeline中添加了一个 Acceptor用于处理客户端的连接。
我们也知道, NioServerSocketChannel 将自己注册到了 boss 单例线程池上,即 EventLoop 。
EventLoop 的作用是一个死循环,而这个循环中做 3 件事情:
- 有条件的等待 Nio 事件。
- 处理 Nio 事件。
- 处理消息队列中的任务。
服务端在bind之后该loop开始生效,客户端在connect之后开始生效。
注册之后的Channel后续的IO的操作都由Loop的Selector轮询到、加入TaskQueue中,由Loop线程进行处理。
那么,一个请求进来之后,服务端Channel会产生Read事件,Channel中的Handler中的Acceptor接收到这个请求,会为该请求创建一个新的 NioSocketChannel,注册到WorkGroup上一个EventLoop 上,并注册 selecot Read 事件。
服务器BossGroup轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法读取请求信息。
还没有评论,来说两句吧...