netty的引导程序ServerBootStrap

墨蓝 2022-07-13 01:39 222阅读 0赞

BootStrap在netty的应用程序中负责引导服务器和客户端。netty包含了两种不同类型的引导:

  1. 使用服务器的ServerBootStrap,用于接受客户端的连接以及为已接受的连接创建子通道。
  2. 用于客户端的BootStrap,不接受新的连接,并且是在父通道类完成一些操作。

ServerBootStrap的运行原理

下面先看一下这两个引导类的类继承图:
服务端的ServerBootstrap类继承图:
服务端的ServerBootstrap类继承图

客户端的Bootstrap类继承图:
客户端的Bootstrap类继承图

下面我们分析一下服务端的ServerBootstrap的源码来分析引导流程:
首先给出一个很简单的基于netty的聊天室的服务端的实例:

  1. package netty.cookbook.simplechat;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import utils.LogUtil;
  9. /** * Created by louyuting on 16/12/8. * 启动服务端 */
  10. public class SimpleChatServer {
  11. private int port;
  12. public SimpleChatServer(int port){
  13. this.port = port;
  14. }
  15. public void run() throws Exception{
  16. //NioEventLoopGroup是用来处理IO操作的多线程事件循环器
  17. //boss用来接收进来的连接
  18. EventLoopGroup bossGroup = new NioEventLoopGroup();
  19. //用来处理已经被接收的连接;
  20. EventLoopGroup workerGroup = new NioEventLoopGroup();
  21. try{
  22. //是一个启动NIO服务的辅助启动类
  23. ServerBootstrap sBootstrap = new ServerBootstrap();
  24. //These EventLoopGroup's are used to handle all the events and IO for ServerChannel and Channel's.
  25. //为bootstrap设置acceptor的EventLoopGroup和client的EventLoopGroup
  26. //这些EventLoopGroups用于处理所有的IO事件
  27. //?这里为什么设置两个group呢?
  28. sBootstrap.group(bossGroup, workerGroup)
  29. .channel(NioServerSocketChannel.class)
  30. .childHandler(new SimpleChatServerInitializer())
  31. .option(ChannelOption.SO_BACKLOG, 128)
  32. .childOption(ChannelOption.SO_KEEPALIVE, true);
  33. LogUtil.log_debug("SimpleChatServer 启动了");
  34. //绑定端口,开始接收进来的连接
  35. ChannelFuture future = sBootstrap.bind(port).sync();
  36. //等待服务器socket关闭
  37. //在本例子中不会发生,这时可以关闭服务器了
  38. future.channel().closeFuture().sync();
  39. } finally {
  40. //
  41. workerGroup.shutdownGracefully();
  42. bossGroup.shutdownGracefully();
  43. LogUtil.log_debug("SimpleChatServer 关闭了");
  44. }
  45. }
  46. public static void main(String[] args) throws Exception {
  47. new SimpleChatServer(8080).run();
  48. }
  49. }

下面以上面的实例来分析ServerBootStrap的运行流程:

1.实例化ServerBootstrap

ServerBootstrap sBootstrap = new ServerBootstrap();
首先就是通过new关键字实例化一个ServerBootStrap对象。

2.配置ServerBootstrap的group()

  1. //boss用来接收进来的连接
  2. EventLoopGroup bossGroup = new NioEventLoopGroup();
  3. //用来处理已经被接收的连接;
  4. EventLoopGroup workerGroup = new NioEventLoopGroup();
  5. sBootstrap.group(bossGroup, workerGroup)
  6. .channel(NioServerSocketChannel.class)
  7. .childHandler(new SimpleChatServerInitializer())
  8. .option(ChannelOption.SO_BACKLOG, 128)
  9. .childOption(ChannelOption.SO_KEEPALIVE, true);

从上面可知,上面创建了两个EventLoopGroup,分别是boss和worker,然后配置到ServerBootstrap的group中。我们先来看看ServerBootstrap.group(),这个函数有两个重载的实现:

  1. public ServerBootstrap group(EventLoopGroup group) {
  2. return group(group, group);
  3. }
  4. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
  5. super.group(parentGroup);
  6. if (childGroup == null) {
  7. throw new NullPointerException("childGroup");
  8. }
  9. if (this.childGroup != null) {
  10. throw new IllegalStateException("childGroup set already");
  11. }
  12. this.childGroup = childGroup;
  13. return this;
  14. }

可知,这里如果调用时,如果只传入了一个EventLoopGroup,最后也会调用group(EventLoopGroup parentGroup, EventLoopGroup childGroup)。
这里传入的两个EventLoopGroup分别叫做parentGroup和childGroup。其实我觉得更加好理解的方式应该叫boss和worker。boss这个EventLoopGroup作为一个acceptor负责接收来自客户端的请求,然后分发给worker这个EventLoopGroup来处理所有的事件event和channel的IO。
查看上面的源码,我们可知,首先调用的是
super.group(parentGroup);
这个方法调用了ServerBootstrap的父类AbstractBootstrap的group(EventLoopGroup group) 方法,下面看看AbstractBootstrap中方法的定义:

  1. public B group(EventLoopGroup group) {
  2. //传参不能为空
  3. if (group == null) {
  4. throw new NullPointerException("group");
  5. }
  6. //AbstractBootstrap的group属性如果非空,就抛出异常,说明this.group是单例的
  7. if (this.group != null) {
  8. throw new IllegalStateException("group set already");
  9. }
  10. this.group = group;
  11. return (B) this;
  12. }

从上面的源码AbstractBootstrap里面的这个this.group属性是单例的。我们先看看该属性的定义:
volatile EventLoopGroup group;
这里定义的是volatile的变量,提供可见性,而且变量是包级别的权限。最后将传入的EventLoopGroup赋值给AbstractBootstrap的group属性。

后面的ServerBootstrap的childGroup也是同样的实现,不过childGroup是单例的赋值给了ServerBootstrap的childGroup属性。ServerBootstrap中的定义如下:
private volatile EventLoopGroup childGroup;

3.配置ServerBootstrap的channel()

这里以聊天室的实例为例,
ServerBootstrap.channel(NioServerSocketChannel.class)
实际上是调用的是AbstractBootstrap里面的channel()函数,下面先粘贴出该方法的源码:

  1. public B channel(Class<? extends C> channelClass) {
  2. if (channelClass == null) {
  3. throw new NullPointerException("channelClass");
  4. }
  5. return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
  6. }

这里传入的是一个Class对象,根据传入的不同的Class对象,实例化不同的Channel,主要是有两种代表NIO和OIO的对象:NioServerSocketChannel和OioServerSocketChannel。在函数体的最后调用了:
channelFactory(new ReflectiveChannelFactory<C>(channelClass));
参数里面实例化了一个 ReflectiveChannelFactory对象,这个对象实现了ChannelFactory这个接口的。
下面进入这个函数,最后调用的是:

  1. public B channelFactory(ChannelFactory<? extends C> channelFactory) {
  2. //判断传参非空
  3. if (channelFactory == null) {
  4. throw new NullPointerException("channelFactory");
  5. }
  6. //AbstractBootstrap的channelFactory属性非空
  7. if (this.channelFactory != null) {
  8. throw new IllegalStateException("channelFactory set already");
  9. }
  10. //传递给channelFactory属性
  11. this.channelFactory = channelFactory;
  12. return (B) this;
  13. }

可知,最后是把new的ReflectiveChannelFactory传递给了AbstractBootstrap的channelFactory属性,该属性定义如下:
private volatile ChannelFactory<? extends C> channelFactory;

4.配置ServerBootstrap的childHandler(ChannelHandler childHandler)

该函数的主要作用是设置channelHandler来处理客户端的请求的channel的IO。
这里我们一般都用ChannelInitializer这个类的实例或则继承自这个类的实例,在我的聊天室程序中实例如下:
ServerBootstrap.childHandler(new SimpleChatServerInitializer())
这里我是通过新建类SimpleChatServerInitializer继承自ChannelInitializer。具体的代码如下:

  1. /** * Created by louyuting on 16/12/8. * 用来增加多个的处理类到ChannelPipeline上:包括编码,解码,SimpleChatServerHandler */
  2. public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel>{
  3. @Override
  4. protected void initChannel(SocketChannel ch) throws Exception {
  5. ChannelPipeline pipeline = ch.pipeline();
  6. pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
  7. pipeline.addLast("decoder", new StringDecoder());
  8. pipeline.addLast("encoder", new StringEncoder());
  9. pipeline.addLast("handler", new SimpleChatServerHandler());
  10. System.out.println("SimpleChatClient:" + ch.remoteAddress()+"连接上");
  11. }
  12. }

我们再看看ChannelInitializer这个类的继承图可知ChannelInitializer其实就是继承自ChannelHandler的
ChannelInitializer类图
可知,这个类其实就是往pipeline中添加了很多的channelHandler。在ServerBootstrap.childHandler(ChannelHandler childHandler)中的实现是:

  1. public ServerBootstrap childHandler(ChannelHandler childHandler) {
  2. if (childHandler == null) {
  3. throw new NullPointerException("childHandler");
  4. }
  5. this.childHandler = childHandler;
  6. return this;
  7. }

由最后一句可知,其实就是讲传入的childHandler赋值给ServerBootstrap的childHandler属性。

5.配置ServerBootstrap的option(ChannelOption option, T value)

这里调用的是父类的AbstractBootstrap的option()方法,源码如下:

  1. public <T> B option(ChannelOption<T> option, T value) {
  2. if (option == null) {
  3. throw new NullPointerException("option");
  4. }
  5. if (value == null) {
  6. synchronized (options) {
  7. options.remove(option);
  8. }
  9. } else {
  10. synchronized (options) {
  11. options.put(option, value);
  12. }
  13. }
  14. return (B) this;
  15. }

其中最重要的一行代码就是:
options.put(option, value);
这里用到了options这个参数,在AbstractBootstrap的定义如下:
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
可知是私有变量,而且是一个Map集合。这个变量主要是设置TCP连接中的一些可选项,而且这些属性是作用于每一个连接到服务器被创建的channel。

6.配置ServerBootstrap的childOption(ChannelOption childOption, T value)

这里调用的是父类的ServerBootstrap的childOption()方法,源码如下:

  1. public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
  2. if (childOption == null) {
  3. throw new NullPointerException("childOption");
  4. }
  5. if (value == null) {
  6. synchronized (childOptions) {
  7. childOptions.remove(childOption);
  8. }
  9. } else {
  10. synchronized (childOptions) {
  11. childOptions.put(childOption, value);
  12. }
  13. }
  14. return this;
  15. }

这个函数功能与option()函数几乎一样,唯一的区别是该属性设定只作用于被acceptor(也就是boss EventLoopGroup)接收之后的channel。

7.配置ServerBootstrap的绑定端口和启动:

上面介绍的都是关于ServerBootstrap的一些配置的绑定,下面介绍在启动服务器时候做的工作:

通过聊天室的服务端程序我们可以知道,绑定端口并启动服务程序如下:

  1. //绑定端口,开始接收进来的连接
  2. ChannelFuture future = sBootstrap.bind(port).sync();

bind()函数是AbstractBootstrap里面的方法,首先分析bind(port)函数的功能:直接看源码:

  1. /** * 1. *创建一个通道并绑定到当前BootStrap */
  2. public ChannelFuture bind(int inetPort) {
  3. return bind(new InetSocketAddress(inetPort));
  4. }
  5. /** * 2. *创建一个通道并绑定到当前BootStrap */
  6. public ChannelFuture bind(SocketAddress localAddress) {
  7. validate();//
  8. if (localAddress == null) {
  9. throw new NullPointerException("localAddress");
  10. }
  11. return doBind(localAddress);
  12. }

从上面的源码可知bind(port)最后会调用bind(SocketAddress localAddress)函数,里面的 validate()函数会先校验AbstractBootstrap的成员属性group(也就是boss)和channelFactory非空。

最后就是调用doBind(localAddress);方法了,这里才是bind()函数的核心:看源码分析这个函数做了什么工作:

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. //注册
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.cause() != null) {
  6. return regFuture;
  7. }
  8. if (regFuture.isDone()) {
  9. // At this point we know that the registration was complete and successful.
  10. ChannelPromise promise = channel.newPromise();
  11. doBind0(regFuture, channel, localAddress, promise);
  12. return promise;
  13. } else {
  14. // Registration future is almost always fulfilled already, but just in case it's not.
  15. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  16. regFuture.addListener(new ChannelFutureListener() {
  17. @Override
  18. public void operationComplete(ChannelFuture future) throws Exception {
  19. Throwable cause = future.cause();
  20. if (cause != null) {
  21. // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
  22. // IllegalStateException once we try to access the EventLoop of the Channel.
  23. promise.setFailure(cause);
  24. } else {
  25. // Registration was successful, so set the correct executor to use.
  26. // See https://github.com/netty/netty/issues/2586
  27. promise.registered();
  28. doBind0(regFuture, channel, localAddress, promise);
  29. }
  30. }
  31. });
  32. return promise;
  33. }
  34. }

从定义可知,这个函数是一个private函数,也就是类内部调用的。首先看
final ChannelFuture regFuture = initAndRegister();

里面initAndRegister()函数的功能:

  1. final ChannelFuture initAndRegister() {
  2. //new 了一个新的channel
  3. Channel channel = null;
  4. try {
  5. //调用了AbstractBootstrap的channelFactory属性新建了一个在ServerBootStrap中配置的channel类型,在我的聊天室程序中是NioServerSocketChannel。
  6. channel = channelFactory.newChannel();
  7. //初始化通道
  8. init(channel);
  9. } catch (Throwable t) {
  10. if (channel != null) {
  11. // channel can be null if newChannel crashed (eg SocketException("too many open files"))
  12. channel.unsafe().closeForcibly();
  13. }
  14. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
  15. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  16. }
  17. ChannelFuture regFuture = config().group().register(channel);
  18. if (regFuture.cause() != null) {
  19. if (channel.isRegistered()) {
  20. channel.close();
  21. } else {
  22. channel.unsafe().closeForcibly();
  23. }
  24. }
  25. // If we are here and the promise is not failed, it's one of the following cases:
  26. // 1) If we attempted registration from the event loop, the registration has been completed at this point.
  27. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
  28. // 2) If we attempted registration from the other thread, the registration request has been successfully
  29. // added to the event loop's task queue for later execution.
  30. // i.e. It's safe to attempt bind() or connect() now:
  31. // because bind() or connect() will be executed *after* the scheduled registration task is executed
  32. // because register(), bind(), and connect() are all bound to the same thread.
  33. return regFuture;
  34. }

在initAndRegister()函数中,首先调用了AbstractBootstrap的channelFactory.newChannel()方法新建了一个在ServerBootStrap中配置的channel类型,在我的聊天室程序中NioServerSocketChannel。

然后调用init()函数初始化这个通道(该函数在ServerBootstrap中实现):完整的源码就不给出的,主要说说里面做了什么任务:

(1).获取ServerBootStrap中options和attrs的配置,然后设置在新创建的channel对象中,代码如下:

  1. final Map<ChannelOption<?>, Object> options = options0();
  2. synchronized (options) {
  3. channel.config().setOptions(options);
  4. }
  5. final Map<AttributeKey<?>, Object> attrs = attrs0();
  6. synchronized (attrs) {
  7. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  8. @SuppressWarnings("unchecked")
  9. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  10. channel.attr(key).set(e.getValue());
  11. }
  12. }

(2).获取新new的channel的pipeline,然后添加handler到channel中。具体看源码注释

  1. //获取channel的pipeline
  2. ChannelPipeline p = channel.pipeline();
  3. /** currentChildGroup也就是worker的EventLoopGroup*/
  4. final EventLoopGroup currentChildGroup = childGroup;
  5. /**currentChildHandler也就是在ServerBootStrap中添加的childHandler*/
  6. final ChannelHandler currentChildHandler = childHandler;
  7. /** 获取worker角色的EventLoopGroup的option设置和attr设置并配置到currentChildOptions和currentChildAttrs中 */
  8. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  9. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  10. synchronized (childOptions) {
  11. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
  12. }
  13. synchronized (childAttrs) {
  14. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
  15. }
  16. /** 向new的channel中添加handler **/
  17. p.addLast(new ChannelInitializer<Channel>() {
  18. @Override
  19. public void initChannel(Channel ch) throws Exception {
  20. final ChannelPipeline pipeline = ch.pipeline();
  21. //获取ServerBootStrap中添加的childhandler,然后添加到channel中
  22. ChannelHandler handler = config.handler();
  23. if (handler != null) {
  24. pipeline.addLast(handler);
  25. }
  26. // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
  27. // In this case the initChannel(...) method will only be called after this method returns. Because
  28. // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
  29. // placed in front of the ServerBootstrapAcceptor.
  30. //新建一个线程,将channelHandler的子类之new的ServerBootstrapAcceptor添加到pipeline中。
  31. ch.eventLoop().execute(new Runnable() {
  32. @Override
  33. public void run() {
  34. pipeline.addLast(new ServerBootstrapAcceptor(
  35. currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  36. }
  37. });
  38. }
  39. });

下面再看看ServerBootstrapAcceptor这个类里面做了什么?ServerBootstrapAcceptor是ServerBootstrap里面的静态类,这个类本身也是一个channelHandler,我们分析它的channelRead(ChannelHandlerContext ctx, Object msg)方法,主要还是做了以下几个工作:

  1. 向channel中添加handler;
  2. 向worker这个EventLoopGroup中注册当前channel

分析完init()函数,我们继续看initAndRegister()函数中的其余部分:
接下来往boss 这个EventLoopGroup中注册当前channel。

最后在doBind()函数中,都会调用doBind0()这个函数,这个函数源码如下:

  1. private static void doBind0(
  2. final ChannelFuture regFuture, final Channel channel,
  3. final SocketAddress localAddress, final ChannelPromise promise) {
  4. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
  5. // the pipeline in its channelRegistered() implementation.
  6. channel.eventLoop().execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. if (regFuture.isSuccess()) {
  10. /********/
  11. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  12. } else {
  13. promise.setFailure(regFuture.cause());
  14. }
  15. }
  16. });
  17. }

从上面的源码可知,最后都会调用channel.bind()函数。

发表评论

表情:
评论列表 (有 0 条评论,222人围观)

还没有评论,来说两句吧...

相关阅读