RocketMq4.7源码解析之二(name,broker启动流程)

Love The Way You Lie 2022-11-30 04:06 277阅读 0赞

文章目录

  • broker启动流程
    • NameServer启动流程
      • mian
        • createNamesrvController
          • NamesrvController构造
      • start
        • initialize
        • start

在这里插入图片描述

broker启动流程

broker配置了配置文件地址,以及环境变量配置文件所在目录
在这里插入图片描述
vf

NameServer启动流程

mian

  1. public static NamesrvController main0(String[] args) {
  2. try {
  3. NamesrvController controller = createNamesrvController(args);
  4. start(controller);
  5. String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
  6. log.info(tip);
  7. System.out.printf("%s%n", tip);
  8. return controller;
  9. } catch (Throwable e) {
  10. e.printStackTrace();
  11. System.exit(-1);
  12. }
  13. return null;
  14. }

createNamesrvController

  1. public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
  2. //设置版本号
  3. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  4. //PackageConflictDetect.detectFastjson();
  5. Options options = ServerUtil.buildCommandlineOptions(new Options());
  6. //解析命令行
  7. commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
  8. if (null == commandLine) {
  9. //命令行为null,则返回
  10. System.exit(-1);
  11. return null;
  12. }
  13. //names业务参数
  14. //初次加载时,从环境变量中获取ROCKETMQ_HOME并赋值
  15. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  16. //网络参数
  17. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  18. //设置监听端口
  19. nettyServerConfig.setListenPort(9876);
  20. //解析-c configFile 通过,c 命令指定配置文件的路径。
  21. //使用“ 一属性名属性值”,例如一listenPort 9876 。
  22. if (commandLine.hasOption('c')) {
  23. String file = commandLine.getOptionValue('c');
  24. if (file != null) {
  25. //读取配置文件
  26. InputStream in = new BufferedInputStream(new FileInputStream(file));
  27. properties = new Properties();
  28. properties.load(in);
  29. //解析配置文件属性
  30. MixAll.properties2Object(properties, namesrvConfig);
  31. MixAll.properties2Object(properties, nettyServerConfig);
  32. //设置配置文件路径
  33. namesrvConfig.setConfigStorePath(file);
  34. System.out.printf("load config properties file OK, %s%n", file);
  35. in.close();
  36. }
  37. }
  38. //-p 打印当前加载配置属性
  39. if (commandLine.hasOption('p')) {
  40. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
  41. MixAll.printObjectProperties(console, namesrvConfig);
  42. MixAll.printObjectProperties(console, nettyServerConfig);
  43. System.exit(0);
  44. }
  45. //解析命令行属性填充到对象
  46. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  47. //ROCKETMQ_HOME环境变量非空判断
  48. if (null == namesrvConfig.getRocketmqHome()) {
  49. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
  50. System.exit(-2);
  51. }
  52. //日志相关
  53. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  54. JoranConfigurator configurator = new JoranConfigurator();
  55. configurator.setContext(lc);
  56. lc.reset();
  57. configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
  58. log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  59. //log打印name和netty配置属性
  60. MixAll.printObjectProperties(log, namesrvConfig);
  61. MixAll.printObjectProperties(log, nettyServerConfig);
  62. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  63. // remember all configs to prevent discard
  64. //使用配置文件属性,注册并覆盖默认属性
  65. controller.getConfiguration().registerConfig(properties);
  66. return controller;
  67. }
NamesrvController构造
  1. public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
  2. this.namesrvConfig = namesrvConfig;
  3. this.nettyServerConfig = nettyServerConfig;
  4. this.kvConfigManager = new KVConfigManager(this);
  5. //路由信息管理,内部有topic,broker,集群,以及心跳之类的信息
  6. this.routeInfoManager = new RouteInfoManager();
  7. this.brokerHousekeepingService = new BrokerHousekeepingService(this);
  8. //合并对象成员变量属性,并保存对象
  9. this.configuration = new Configuration(
  10. log,
  11. this.namesrvConfig, this.nettyServerConfig
  12. );
  13. this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
  14. }
  15. public Configuration(InternalLogger log, Object... configObjects) {
  16. this.log = log;
  17. if (configObjects == null || configObjects.length == 0) {
  18. return;
  19. }
  20. //合并对象成员变量属性,并保存对象
  21. for (Object configObject : configObjects) {
  22. registerConfig(configObject);
  23. }
  24. }
  25. public Configuration registerConfig(Object configObject) {
  26. try {
  27. readWriteLock.writeLock().lockInterruptibly();
  28. try {
  29. //对象成员变量值填充到prop
  30. Properties registerProps = MixAll.object2Properties(configObject);
  31. //属性覆盖allConfigs
  32. merge(registerProps, this.allConfigs);
  33. //记录对象
  34. configObjectList.add(configObject);
  35. } finally {
  36. readWriteLock.writeLock().unlock();
  37. }
  38. } catch (InterruptedException e) {
  39. log.error("registerConfig lock error");
  40. }
  41. return this;
  42. }

start

  1. public static NamesrvController start(final NamesrvController controller) throws Exception {
  2. if (null == controller) {
  3. throw new IllegalArgumentException("NamesrvController is null");
  4. }
  5. //初始化controller
  6. boolean initResult = controller.initialize();
  7. //初始化失败,退出
  8. if (!initResult) {
  9. controller.shutdown();
  10. System.exit(-3);
  11. }
  12. //注册JVM 钩子函数并启动服务器,
  13. //在JVM 进程关闭之前,先将线程池关闭,及时释放资源。 tomcat也有使用这种方法释放资源
  14. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  15. @Override
  16. public Void call() throws Exception {
  17. controller.shutdown();
  18. return null;
  19. }
  20. }));
  21. //启动
  22. controller.start();
  23. return controller;
  24. }

initialize

  1. public boolean initialize() {
  2. //加载KV 配置
  3. this.kvConfigManager.load();
  4. //创建NettyServer 网络处理对象,创建线程池和信号量
  5. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  6. //创建Netty业务线程池
  7. this.remotingExecutor =
  8. Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  9. this.registerProcessor();
  10. //NameServer 每隔10s扫描一次Broker , 移除处于不激活状态的Broker
  11. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  12. @Override
  13. public void run() {
  14. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  15. }
  16. }, 5, 10, TimeUnit.SECONDS);
  17. //NameServer每隔10 分钟打印一次KV 配置。
  18. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  19. @Override
  20. public void run() {
  21. NamesrvController.this.kvConfigManager.printAllPeriodically();
  22. }
  23. }, 1, 10, TimeUnit.MINUTES);
  24. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
  25. // Register a listener to reload SslContext
  26. try {
  27. fileWatchService = new FileWatchService(
  28. new String[] {
  29. TlsSystemConfig.tlsServerCertPath,
  30. TlsSystemConfig.tlsServerKeyPath,
  31. TlsSystemConfig.tlsServerTrustCertPath
  32. },
  33. new FileWatchService.Listener() {
  34. boolean certChanged, keyChanged = false;
  35. @Override
  36. public void onChanged(String path) {
  37. if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
  38. log.info("The trust certificate changed, reload the ssl context");
  39. reloadServerSslContext();
  40. }
  41. if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
  42. certChanged = true;
  43. }
  44. if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
  45. keyChanged = true;
  46. }
  47. if (certChanged && keyChanged) {
  48. log.info("The certificate and private key changed, reload the ssl context");
  49. certChanged = keyChanged = false;
  50. reloadServerSslContext();
  51. }
  52. }
  53. private void reloadServerSslContext() {
  54. ((NettyRemotingServer) remotingServer).loadSslContext();
  55. }
  56. });
  57. } catch (Exception e) {
  58. log.warn("FileWatchService created error, can't load the certificate dynamically");
  59. }
  60. }
  61. return true;
  62. }

start

  1. public void start() {
  2. //Worker线程池
  3. this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
  4. nettyServerConfig.getServerWorkerThreads(),
  5. new ThreadFactory() {
  6. private AtomicInteger threadIndex = new AtomicInteger(0);
  7. @Override
  8. public Thread newThread(Runnable r) {
  9. return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
  10. }
  11. });
  12. //初始化handler和编码器
  13. prepareSharableHandlers();
  14. ServerBootstrap childHandler =
  15. //设置EventLoopGroup,来接受和处理新的连接,以进行事件的处理,如接受新连接以及读/写数据
  16. //server socket接收了新建连接后,会把connection socket放到workerGroup中进行处理
  17. this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  18. //epoll还是nio channel
  19. .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  20. .option(ChannelOption.SO_BACKLOG, 1024)
  21. .option(ChannelOption.SO_REUSEADDR, true)
  22. .option(ChannelOption.SO_KEEPALIVE, false)
  23. .childOption(ChannelOption.TCP_NODELAY, true)
  24. .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
  25. .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
  26. //使用指定的端口设置套接字地址
  27. .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  28. .childHandler(new ChannelInitializer<SocketChannel>() {
  29. @Override
  30. public void initChannel(SocketChannel ch) throws Exception {
  31. //分别添加2个handler,处理请求
  32. ch.pipeline()
  33. .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
  34. .addLast(defaultEventExecutorGroup,
  35. //设置编码器
  36. encoder,
  37. //设置解码器
  38. new NettyDecoder(),
  39. /*
  40. 当连接空闲时间太长时,将会触发一个IdleStateEvent 事件.
  41. ChannelInboundHandler中重写userEvent-Triggered()方法来处理该IdleStateEvent 事件
  42. */
  43. new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
  44. connectionManageHandler,
  45. serverHandler
  46. );
  47. }
  48. });
  49. if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
  50. childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  51. }
  52. try {
  53. //异步地绑定服务器;调用 sync()方法阻塞等待直到绑定完成
  54. ChannelFuture sync = this.serverBootstrap.bind().sync();
  55. //记录port
  56. InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
  57. this.port = addr.getPort();
  58. } catch (InterruptedException e1) {
  59. throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
  60. }
  61. if (this.channelEventListener != null) {
  62. this.nettyEventExecutor.start();
  63. }
  64. this.timer.scheduleAtFixedRate(new TimerTask() {
  65. @Override
  66. public void run() {
  67. try {
  68. NettyRemotingServer.this.scanResponseTable();
  69. } catch (Throwable e) {
  70. log.error("scanResponseTable exception", e);
  71. }
  72. }
  73. }, 1000 * 3, 1000);
  74. }

发表评论

表情:
评论列表 (有 0 条评论,277人围观)

还没有评论,来说两句吧...

相关阅读