RocketMQ源码分析【二】NameServer启动流程源码分析

落日映苍穹つ 2022-09-12 04:54 403阅读 0赞

文章目录

    • 入口
    • NamesrvController是如何被创建出来的
    • 启动NameServer
      • 定时任务线程池
      • 构造Netty相关线程池
      • 启动Netty服务器
    • 总结

入口

NameServer 源码的启动入口在类NamesrvStartup中的main方法中
在这里插入图片描述
可以看到方法封装的非常好,每个方法干一件事。核心方法就是这两个

  1. NamesrvController controller = createNamesrvController(args);
  2. start(controller);

NamesrvController是如何被创建出来的

我们看看createNamesrvController()方法具体干了什么

  1. public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
  2. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  3. //PackageConflictDetect.detectFastjson();
  4. Options options = ServerUtil.buildCommandlineOptions(new Options());
  5. commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
  6. if (null == commandLine) {
  7. System.exit(-1);
  8. return null;
  9. }
  10. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  11. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  12. nettyServerConfig.setListenPort(9876);
  13. // 启动namesrv 参数带c代表读取配置文件
  14. if (commandLine.hasOption('c')) {
  15. String file = commandLine.getOptionValue('c');
  16. if (file != null) {
  17. // 读取文件
  18. InputStream in = new BufferedInputStream(new FileInputStream(file));
  19. properties = new Properties();
  20. properties.load(in);
  21. MixAll.properties2Object(properties, namesrvConfig);
  22. MixAll.properties2Object(properties, nettyServerConfig);
  23. namesrvConfig.setConfigStorePath(file);
  24. System.out.printf("load config properties file OK, %s%n", file);
  25. in.close();
  26. }
  27. }
  28. // 如果启动参数带p 打印NameSrv启动参数
  29. if (commandLine.hasOption('p')) {
  30. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
  31. MixAll.printObjectProperties(console, namesrvConfig);
  32. MixAll.printObjectProperties(console, nettyServerConfig);
  33. System.exit(0);
  34. }
  35. // 获取配置类容填充到namesrvConfig
  36. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  37. // 如果rocketmqHome 打印异常退出
  38. if (null == namesrvConfig.getRocketmqHome()) {
  39. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
  40. System.exit(-2);
  41. }
  42. // 日志相关配置
  43. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  44. JoranConfigurator configurator = new JoranConfigurator();
  45. configurator.setContext(lc);
  46. lc.reset();
  47. configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
  48. // 打印NameServer所有配置信息
  49. log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  50. MixAll.printObjectProperties(log, namesrvConfig);
  51. MixAll.printObjectProperties(log, nettyServerConfig);
  52. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  53. // remember all configs to prevent discard
  54. controller.getConfiguration().registerConfig(properties);
  55. return controller;
  56. }

上面源码比较长,都是解析参数构造一些配置类,这里作一个总结
在这里插入图片描述
这里构造的配置类我们没看下面的代码也不知道是干嘛的,所以我们先暂时不作过多分析,只用知道是解析了启动脚本的配置参数或设置的配置文件然后生成了对应的配置类

启动NameServer

启动NameServer的核心入口方法是

  1. start(controller);

我们看看这段代码的实现

  1. public static NamesrvController start(final NamesrvController controller) throws Exception {
  2. if (null == controller) {
  3. throw new IllegalArgumentException("NamesrvController is null");
  4. }
  5. // 初始化核心逻辑
  6. boolean initResult = controller.initialize();
  7. if (!initResult) {
  8. controller.shutdown();
  9. System.exit(-3);
  10. }
  11. // 注册钩子函数 在JVM进程关闭时,优雅地释放netty服务、线程池等资源
  12. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  13. @Override
  14. public Void call() throws Exception {
  15. controller.shutdown();
  16. return null;
  17. }
  18. }));
  19. controller.start();
  20. return controller;
  21. }

可以看到这段方法中也没有什么核心实现。只是注册了一个钩子函数,在JVM关闭的时候去释放一些线程池、Netty服务器资源等。核心实现还是封装在方法

  1. boolean initResult = controller.initialize();

中,我们来重点看看这个方法的实现

  1. public boolean initialize() {
  2. this.kvConfigManager.load();
  3. // 构建Netty服务器
  4. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  5. // netty 工作线程
  6. this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  7. // 将工作线程池给netty
  8. this.registerProcessor();
  9. // 定时检测不活跃的Broker
  10. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  11. @Override
  12. public void run() {
  13. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  14. }
  15. }, 5, 10, TimeUnit.SECONDS);
  16. // 创建定时任务--每个10分钟打印一遍KV配置
  17. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  18. @Override
  19. public void run() {
  20. NamesrvController.this.kvConfigManager.printAllPeriodically();
  21. }
  22. }, 1, 10, TimeUnit.MINUTES);
  23. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
  24. // Register a listener to reload SslContext
  25. try {
  26. fileWatchService = new FileWatchService(
  27. new String[] {
  28. TlsSystemConfig.tlsServerCertPath,
  29. TlsSystemConfig.tlsServerKeyPath,
  30. TlsSystemConfig.tlsServerTrustCertPath
  31. },
  32. new FileWatchService.Listener() {
  33. boolean certChanged, keyChanged = false;
  34. @Override
  35. public void onChanged(String path) {
  36. if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
  37. log.info("The trust certificate changed, reload the ssl context");
  38. reloadServerSslContext();
  39. }
  40. if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
  41. certChanged = true;
  42. }
  43. if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
  44. keyChanged = true;
  45. }
  46. if (certChanged && keyChanged) {
  47. log.info("The certificate and private key changed, reload the ssl context");
  48. certChanged = keyChanged = false;
  49. reloadServerSslContext();
  50. }
  51. }
  52. private void reloadServerSslContext() {
  53. ((NettyRemotingServer) remotingServer).loadSslContext();
  54. }
  55. });
  56. } catch (Exception e) {
  57. log.warn("FileWatchService created error, can't load the certificate dynamically");
  58. }
  59. }
  60. return true;
  61. }

定时任务线程池

上面代码容易看懂的就是也是开了很多定时器任务线程,比如去打印Namespace的 k v配置
在这里插入图片描述
开启一个定时任务去扫描不活跃的broker,并剔除下线

  1. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  2. public void scanNotActiveBroker() {
  3. Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
  4. while (it.hasNext()) {
  5. Entry<String, BrokerLiveInfo> next = it.next();
  6. long last = next.getValue().getLastUpdateTimestamp();
  7. // 当前时间超过上一次心跳时间 + 默认120s
  8. if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
  9. RemotingUtil.closeChannel(next.getValue().getChannel());
  10. it.remove();
  11. log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
  12. // 任务Broker下线 移除掉Broker
  13. this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
  14. }
  15. }
  16. }

构造Netty相关线程池

然后我们重点看看与Netty相关的代码

  1. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  2. public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
  3. final ChannelEventListener channelEventListener) {
  4. super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
  5. //用于启动NIO服务端的辅助启动类
  6. this.serverBootstrap = new ServerBootstrap();
  7. this.nettyServerConfig = nettyServerConfig;
  8. this.channelEventListener = channelEventListener;
  9. int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
  10. if (publicThreadNums <= 0) {
  11. publicThreadNums = 4;
  12. }
  13. this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
  14. private AtomicInteger threadIndex = new AtomicInteger(0);
  15. @Override
  16. public Thread newThread(Runnable r) {
  17. return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
  18. }
  19. });
  20. if (useEpoll()) {
  21. this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
  22. private AtomicInteger threadIndex = new AtomicInteger(0);
  23. @Override
  24. public Thread newThread(Runnable r) {
  25. return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
  26. }
  27. });
  28. this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
  29. private AtomicInteger threadIndex = new AtomicInteger(0);
  30. private int threadTotal = nettyServerConfig.getServerSelectorThreads();
  31. @Override
  32. public Thread newThread(Runnable r) {
  33. return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
  34. }
  35. });
  36. } else {
  37. // 工作线程
  38. this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
  39. private AtomicInteger threadIndex = new AtomicInteger(0);
  40. @Override
  41. public Thread newThread(Runnable r) {
  42. return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
  43. }
  44. });
  45. this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
  46. private AtomicInteger threadIndex = new AtomicInteger(0);
  47. private int threadTotal = nettyServerConfig.getServerSelectorThreads();
  48. @Override
  49. public Thread newThread(Runnable r) {
  50. return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
  51. }
  52. });
  53. }
  54. loadSslContext();
  55. }

可以看到这里主要用于初始化Netty的一些相关线程,其中也会判断NIO是使用通用的(Common)NIO实现(NioEventLoopGroup),还是使用Linux封装的NIO实现(EpollEventLoopGroup)

可以看到这里只是创建了一些Netty相关的线程池,并没有启动Netty服务器。

启动Netty服务器

做了这么多准备工作,最后启动Netty服务器的实现还是这里启动的
在这里插入图片描述

  1. public void start() throws Exception {
  2. this.remotingServer.start();
  3. if (this.fileWatchService != null) {
  4. this.fileWatchService.start();
  5. }
  6. }
  7. @Override
  8. public void start() {
  9. this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
  10. nettyServerConfig.getServerWorkerThreads(),
  11. new ThreadFactory() {
  12. private AtomicInteger threadIndex = new AtomicInteger(0);
  13. @Override
  14. public Thread newThread(Runnable r) {
  15. return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
  16. }
  17. });
  18. prepareSharableHandlers();
  19. ServerBootstrap childHandler =
  20. this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  21. .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  22. .option(ChannelOption.SO_BACKLOG, 1024)//用于临时存放已完成三次握手的请求的队列的最大长度
  23. .option(ChannelOption.SO_REUSEADDR, true)
  24. .option(ChannelOption.SO_KEEPALIVE, false)// 是否启用心跳保活机制
  25. .childOption(ChannelOption.TCP_NODELAY, true)//启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。默认为false
  26. .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())// 定义传输的系统缓冲区buf的大小
  27. .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 定义接收的系统缓冲区buf的大小
  28. .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  29. .childHandler(new ChannelInitializer<SocketChannel>() {
  30. @Override
  31. public void initChannel(SocketChannel ch) throws Exception {
  32. ch.pipeline()
  33. .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
  34. .addLast(defaultEventExecutorGroup,
  35. encoder,
  36. new NettyDecoder(),
  37. new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
  38. connectionManageHandler,
  39. serverHandler
  40. );
  41. }
  42. });
  43. if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
  44. childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  45. }
  46. try {
  47. ChannelFuture sync = this.serverBootstrap.bind().sync();
  48. InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
  49. this.port = addr.getPort();
  50. } catch (InterruptedException e1) {
  51. throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
  52. }
  53. if (this.channelEventListener != null) {
  54. this.nettyEventExecutor.start();
  55. }
  56. // 定时扫描请求超过1秒的请求
  57. this.timer.scheduleAtFixedRate(new TimerTask() {
  58. @Override
  59. public void run() {
  60. try {
  61. NettyRemotingServer.this.scanResponseTable();
  62. } catch (Throwable e) {
  63. log.error("scanResponseTable exception", e);
  64. }
  65. }
  66. }, 1000 * 3, 1000);
  67. }

这里都是Netty相关的代码,无非就是绑定一些编解码Handler,一些心跳、认证Handler等,再就是Socket相关的一些配置。

总结

至此NameServer计算启动完成了,然后监听端口9876,等待Broker注册,上面的一些Netty相关的代码代码我们并没有去重点分析,因为这算是Netty相关的部分。总之我们对NameServer的启动有了大致的了解,后续我们就看看Broker是如何注册到NameServer上去的,Broker是如何与NameServer进行通信的

发表评论

表情:
评论列表 (有 0 条评论,403人围观)

还没有评论,来说两句吧...

相关阅读