RocketMQ源码分析【三】Broker启动流程源码分析

曾经终败给现在 2022-09-12 12:54 432阅读 0赞

文章目录

    • 入口
    • 核心流程
    • BrokerController是如何被创建的
    • Broker是如何注册到NameServer的
      • registerBrokerAll
      • doRegisterBrokerAll
      • registerBrokerAll()
    • 总结

入口

Broker启动的源码入口是BrokerStartup.java
在这里插入图片描述

核心流程

主要流程和NameServer启动有些类似

  1. 创建BrokerController
  2. 启动BrokerController

    public static void main(String[] args) {

    1. start(createBrokerController(args));
    2. }
    3. public static BrokerController start(BrokerController controller) {
    4. try {
    5. controller.start();
    6. String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
    7. + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
    8. if (null != controller.getBrokerConfig().getNamesrvAddr()) {
    9. tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
    10. }
    11. log.info(tip);
    12. System.out.printf("%s%n", tip);
    13. return controller;
    14. } catch (Throwable e) {
    15. e.printStackTrace();
    16. System.exit(-1);
    17. }
    18. return null;
    19. }

BrokerController是如何被创建的

  1. public static BrokerController createBrokerController(String[] args) {
  2. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  3. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
  4. NettySystemConfig.socketSndbufSize = 131072;
  5. }
  6. // 设置socket 发送缓冲大小
  7. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
  8. NettySystemConfig.socketRcvbufSize = 131072;
  9. }
  10. try {
  11. //PackageConflictDetect.detectFastjson();
  12. // 解析启动参数 args
  13. Options options = ServerUtil.buildCommandlineOptions(new Options());
  14. commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
  15. new PosixParser());
  16. if (null == commandLine) {
  17. System.exit(-1);
  18. }
  19. // Broker核心配置类
  20. final BrokerConfig brokerConfig = new BrokerConfig();
  21. // Netty服务器配置
  22. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  23. // Netty客户端配置TLS
  24. final NettyClientConfig nettyClientConfig = new NettyClientConfig();
  25. // 设置Netty客户端是否使用
  26. nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
  27. String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  28. // 设置Netty服务器监听端口为 10911
  29. nettyServerConfig.setListenPort(10911);
  30. // Broker 消息配置类
  31. final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
  32. // broker是否为 slave broker 如果是 则将 消息占用内存百度分 减10%, 默认40%,超过内存的消息将置换出内存
  33. if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
  34. int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
  35. messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
  36. }
  37. // 读取解析启动配置信息
  38. if (commandLine.hasOption('c')) {
  39. String file = commandLine.getOptionValue('c');
  40. if (file != null) {
  41. configFile = file;
  42. InputStream in = new BufferedInputStream(new FileInputStream(file));
  43. properties = new Properties();
  44. properties.load(in);
  45. properties2SystemEnv(properties);
  46. MixAll.properties2Object(properties, brokerConfig);
  47. MixAll.properties2Object(properties, nettyServerConfig);
  48. MixAll.properties2Object(properties, nettyClientConfig);
  49. MixAll.properties2Object(properties, messageStoreConfig);
  50. BrokerPathConfigHelper.setBrokerConfigPath(file);
  51. in.close();
  52. }
  53. }
  54. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
  55. // 检查ROCKETMQ_HOME环境变量, 如果没有直接退出
  56. if (null == brokerConfig.getRocketmqHome()) {
  57. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
  58. System.exit(-2);
  59. }
  60. // 获取Nameserver地址,;分割解析为数组 因为Nameserver可能为集群,所以有多个
  61. String namesrvAddr = brokerConfig.getNamesrvAddr();
  62. if (null != namesrvAddr) {
  63. try {
  64. String[] addrArray = namesrvAddr.split(";");
  65. for (String addr : addrArray) {
  66. RemotingUtil.string2SocketAddress(addr);
  67. }
  68. } catch (Exception e) {
  69. System.out.printf(
  70. "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
  71. namesrvAddr);
  72. System.exit(-3);
  73. }
  74. }
  75. // 判断Broker角色,作相应的处理 master有两种同步消息方式
  76. switch (messageStoreConfig.getBrokerRole()) {
  77. // 异步复制:生产者写入消息到Master后无需等待消息复制到slave即可返回,消息的复制由旁路线程进行异步复制
  78. case ASYNC_MASTER:
  79. /* 同步复制的方式,表现出来的是类似同步双写的策略。即Master写入完消息之后,需要等待Slave的复制成功。 注,这里只需要有一个Slave复制成功并成功应答即算成功,所以在这种模式下,如果有3个Slave,当生产者获得SEND_OK的应答时, 代表消息已经达到Maser和一个Slave(注:这里并不代表已经持久化到磁盘,而只能证明肯定到了PageCache, 是否能刷到磁盘取决于刷盘策略是同步刷盘还是异步刷盘),而还有两个Slave实际上是无法保证的, 并且这里也不支持配置,即不支持如“”同步半数以上”之类的设置 */
  80. case SYNC_MASTER:
  81. brokerConfig.setBrokerId(MixAll.MASTER_ID);
  82. break;
  83. /** * 消息发送的状态除了SEND_OK外,还会多出以下的状态: * FLUSH_SLAVE_TIMEOUT : 同步到slave等待超时,即一直等Slave上报同步的进度,但过了超时时间都没有成功没有同步完。 * SLAVE_NOT_AVAILABLE:当前没有可用的Slave。注:如果slave落后master实在太多, * 那个slave也会认为是暂时不可用的slave,直到它同步到接近的范围为止, * 这个不可用的阈值由broker配置haSlaveFallbehindMax(默认是1024 * 1024 * 256)决定 */
  84. case SLAVE:
  85. if (brokerConfig.getBrokerId() <= 0) {
  86. System.out.printf("Slave's brokerId must be > 0");
  87. System.exit(-3);
  88. }
  89. break;
  90. default:
  91. break;
  92. }
  93. // 是否基于dleger技术管理主从同步和CommitLog
  94. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  95. brokerConfig.setBrokerId(-1);
  96. }
  97. // 设置HA监听端口
  98. messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
  99. // 日志相关
  100. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  101. JoranConfigurator configurator = new JoranConfigurator();
  102. configurator.setContext(lc);
  103. lc.reset();
  104. configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
  105. // 打印启动参数信息
  106. if (commandLine.hasOption('p')) {
  107. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  108. MixAll.printObjectProperties(console, brokerConfig);
  109. MixAll.printObjectProperties(console, nettyServerConfig);
  110. MixAll.printObjectProperties(console, nettyClientConfig);
  111. MixAll.printObjectProperties(console, messageStoreConfig);
  112. System.exit(0);
  113. } else if (commandLine.hasOption('m')) {
  114. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  115. MixAll.printObjectProperties(console, brokerConfig, true);
  116. MixAll.printObjectProperties(console, nettyServerConfig, true);
  117. MixAll.printObjectProperties(console, nettyClientConfig, true);
  118. MixAll.printObjectProperties(console, messageStoreConfig, true);
  119. System.exit(0);
  120. }
  121. log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
  122. MixAll.printObjectProperties(log, brokerConfig);
  123. MixAll.printObjectProperties(log, nettyServerConfig);
  124. MixAll.printObjectProperties(log, nettyClientConfig);
  125. MixAll.printObjectProperties(log, messageStoreConfig);
  126. // 基于上面的配置参数 创建 BrokerController
  127. final BrokerController controller = new BrokerController(
  128. brokerConfig,
  129. nettyServerConfig,
  130. nettyClientConfig,
  131. messageStoreConfig);
  132. // remember all configs to prevent discard
  133. controller.getConfiguration().registerConfig(properties);
  134. // 初始化 BrokerController
  135. boolean initResult = controller.initialize();
  136. if (!initResult) {
  137. controller.shutdown();
  138. System.exit(-3);
  139. }
  140. // 钩子线程 JVM退出时打印消费时间
  141. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  142. private volatile boolean hasShutdown = false;
  143. private AtomicInteger shutdownTimes = new AtomicInteger(0);
  144. @Override
  145. public void run() {
  146. synchronized (this) {
  147. log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
  148. if (!this.hasShutdown) {
  149. this.hasShutdown = true;
  150. long beginTime = System.currentTimeMillis();
  151. controller.shutdown();
  152. long consumingTimeTotal = System.currentTimeMillis() - beginTime;
  153. log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
  154. }
  155. }
  156. }
  157. }, "ShutdownHook"));
  158. return controller;
  159. } catch (Throwable e) {
  160. e.printStackTrace();
  161. System.exit(-1);
  162. }
  163. return null;
  164. }

上面代码可以看到主要是读取一些配置文件,根据配置文件设置不同的一些枚举状态,然后在上面代码中又有一个比较核心的方法

  1. boolean initResult = controller.initialize();

我们进去看看

下面代码很长,可以简单过一下,这里先作一个总结吧
下面的代码主要是配置启动一些线程池,然后作内存持久化到磁盘的操作,然后就是一些Netty的配置,大致就是干了这些事。这里不用太注意细节,源码每个点都看懂太难了,我们主要是挑核心的看,下面代码我也作了一些注释

  1. public boolean initialize() throws CloneNotSupportedException {
  2. // 从磁盘中初始化 topicConfigManager
  3. boolean result = this.topicConfigManager.load();
  4. // 加载消费偏移
  5. result = result && this.consumerOffsetManager.load();
  6. // 加载消费组信息
  7. result = result && this.subscriptionGroupManager.load();
  8. // 加载过滤器配置
  9. result = result && this.consumerFilterManager.load();
  10. if (result) {
  11. try {
  12. // 创建消息存储管理组件,默认使用commitLog 管理磁盘消息
  13. this.messageStore =
  14. new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
  15. this.brokerConfig);
  16. // 如果使用Dleger 管理 commitlog 初始化相关信息
  17. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  18. DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
  19. ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
  20. }
  21. this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
  22. //load plugin
  23. MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
  24. this.messageStore = MessageStoreFactory.build(context, this.messageStore);
  25. this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
  26. } catch (IOException e) {
  27. result = false;
  28. log.error("Failed to initialize", e);
  29. }
  30. }
  31. result = result && this.messageStore.load();
  32. if (result) {
  33. // 启动Netty服务器
  34. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
  35. NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
  36. fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
  37. this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
  38. this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
  39. this.brokerConfig.getSendMessageThreadPoolNums(),
  40. this.brokerConfig.getSendMessageThreadPoolNums(),
  41. 1000 * 60,
  42. TimeUnit.MILLISECONDS,
  43. this.sendThreadPoolQueue,
  44. new ThreadFactoryImpl("SendMessageThread_"));
  45. this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
  46. this.brokerConfig.getPullMessageThreadPoolNums(),
  47. this.brokerConfig.getPullMessageThreadPoolNums(),
  48. 1000 * 60,
  49. TimeUnit.MILLISECONDS,
  50. this.pullThreadPoolQueue,
  51. new ThreadFactoryImpl("PullMessageThread_"));
  52. this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
  53. this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
  54. this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
  55. 1000 * 60,
  56. TimeUnit.MILLISECONDS,
  57. this.replyThreadPoolQueue,
  58. new ThreadFactoryImpl("ProcessReplyMessageThread_"));
  59. this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
  60. this.brokerConfig.getQueryMessageThreadPoolNums(),
  61. this.brokerConfig.getQueryMessageThreadPoolNums(),
  62. 1000 * 60,
  63. TimeUnit.MILLISECONDS,
  64. this.queryThreadPoolQueue,
  65. new ThreadFactoryImpl("QueryMessageThread_"));
  66. this.adminBrokerExecutor =
  67. Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
  68. "AdminBrokerThread_"));
  69. this.clientManageExecutor = new ThreadPoolExecutor(
  70. this.brokerConfig.getClientManageThreadPoolNums(),
  71. this.brokerConfig.getClientManageThreadPoolNums(),
  72. 1000 * 60,
  73. TimeUnit.MILLISECONDS,
  74. this.clientManagerThreadPoolQueue,
  75. new ThreadFactoryImpl("ClientManageThread_"));
  76. this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
  77. this.brokerConfig.getHeartbeatThreadPoolNums(),
  78. this.brokerConfig.getHeartbeatThreadPoolNums(),
  79. 1000 * 60,
  80. TimeUnit.MILLISECONDS,
  81. this.heartbeatThreadPoolQueue,
  82. new ThreadFactoryImpl("HeartbeatThread_", true));
  83. this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
  84. this.brokerConfig.getEndTransactionThreadPoolNums(),
  85. this.brokerConfig.getEndTransactionThreadPoolNums(),
  86. 1000 * 60,
  87. TimeUnit.MILLISECONDS,
  88. this.endTransactionThreadPoolQueue,
  89. new ThreadFactoryImpl("EndTransactionThread_"));
  90. this.consumerManageExecutor =
  91. Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
  92. "ConsumerManageThread_"));
  93. this.registerProcessor();
  94. final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
  95. final long period = 1000 * 60 * 60 * 24;
  96. // 定时记录broker状态 默认 86400s 一次
  97. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  98. @Override
  99. public void run() {
  100. try {
  101. BrokerController.this.getBrokerStats().record();
  102. } catch (Throwable e) {
  103. log.error("schedule record error.", e);
  104. }
  105. }
  106. }, initialDelay, period, TimeUnit.MILLISECONDS);
  107. // 定时持久化 offset, 5s 持久化一次
  108. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  109. @Override
  110. public void run() {
  111. try {
  112. BrokerController.this.consumerOffsetManager.persist();
  113. } catch (Throwable e) {
  114. log.error("schedule persist consumerOffset error.", e);
  115. }
  116. }
  117. }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  118. // 定时持久化filter, 10s同步一次
  119. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  120. @Override
  121. public void run() {
  122. try {
  123. BrokerController.this.consumerFilterManager.persist();
  124. } catch (Throwable e) {
  125. log.error("schedule persist consumer filter error.", e);
  126. }
  127. }
  128. }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
  129. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  130. @Override
  131. public void run() {
  132. try {
  133. BrokerController.this.protectBroker();
  134. } catch (Throwable e) {
  135. log.error("protectBroker error.", e);
  136. }
  137. }
  138. }, 3, 3, TimeUnit.MINUTES);
  139. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  140. @Override
  141. public void run() {
  142. try {
  143. BrokerController.this.printWaterMark();
  144. } catch (Throwable e) {
  145. log.error("printWaterMark error.", e);
  146. }
  147. }
  148. }, 10, 1, TimeUnit.SECONDS);
  149. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  150. @Override
  151. public void run() {
  152. try {
  153. log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
  154. } catch (Throwable e) {
  155. log.error("schedule dispatchBehindBytes error.", e);
  156. }
  157. }
  158. }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
  159. if (this.brokerConfig.getNamesrvAddr() != null) {
  160. this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
  161. log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
  162. } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
  163. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  164. @Override
  165. public void run() {
  166. try {
  167. BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
  168. } catch (Throwable e) {
  169. log.error("ScheduledTask fetchNameServerAddr exception", e);
  170. }
  171. }
  172. }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  173. }
  174. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  175. if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  176. if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
  177. this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
  178. this.updateMasterHAServerAddrPeriodically = false;
  179. } else {
  180. this.updateMasterHAServerAddrPeriodically = true;
  181. }
  182. } else {
  183. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  184. @Override
  185. public void run() {
  186. try {
  187. BrokerController.this.printMasterAndSlaveDiff();
  188. } catch (Throwable e) {
  189. log.error("schedule printMasterAndSlaveDiff error.", e);
  190. }
  191. }
  192. }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
  193. }
  194. }
  195. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
  196. // Register a listener to reload SslContext
  197. try {
  198. fileWatchService = new FileWatchService(
  199. new String[] {
  200. TlsSystemConfig.tlsServerCertPath,
  201. TlsSystemConfig.tlsServerKeyPath,
  202. TlsSystemConfig.tlsServerTrustCertPath
  203. },
  204. new FileWatchService.Listener() {
  205. boolean certChanged, keyChanged = false;
  206. @Override
  207. public void onChanged(String path) {
  208. if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
  209. log.info("The trust certificate changed, reload the ssl context");
  210. reloadServerSslContext();
  211. }
  212. if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
  213. certChanged = true;
  214. }
  215. if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
  216. keyChanged = true;
  217. }
  218. if (certChanged && keyChanged) {
  219. log.info("The certificate and private key changed, reload the ssl context");
  220. certChanged = keyChanged = false;
  221. reloadServerSslContext();
  222. }
  223. }
  224. private void reloadServerSslContext() {
  225. ((NettyRemotingServer) remotingServer).loadSslContext();
  226. ((NettyRemotingServer) fastRemotingServer).loadSslContext();
  227. }
  228. });
  229. } catch (Exception e) {
  230. log.warn("FileWatchService created error, can't load the certificate dynamically");
  231. }
  232. }
  233. initialTransaction();
  234. initialAcl();
  235. initialRpcHooks();
  236. }
  237. return result;
  238. }

Broker是如何注册到NameServer的

比较关键的一个点就是Broker是如何注册到NameServer的,我们来看看这段代码吧
在创建出BrokerController后调用了start方法
在这里插入图片描述
我们进入这个方法会看到一个定时线程池

  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  6. } catch (Throwable e) {
  7. log.error("registerBrokerAll Exception", e);
  8. }
  9. }
  10. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

registerBrokerAll

然后进入方法registerBrokerAll

  1. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  2. public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
  3. // Topic 相关配置信息
  4. TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
  5. // 处理 TopicConfig信息
  6. if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
  7. || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
  8. ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
  9. for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
  10. TopicConfig tmp =
  11. new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
  12. this.brokerConfig.getBrokerPermission());
  13. topicConfigTable.put(topicConfig.getTopicName(), tmp);
  14. }
  15. topicConfigWrapper.setTopicConfigTable(topicConfigTable);
  16. }
  17. // 判断是否进行Broker注册
  18. if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
  19. this.getBrokerAddr(),
  20. this.brokerConfig.getBrokerName(),
  21. this.brokerConfig.getBrokerId(),
  22. this.brokerConfig.getRegisterBrokerTimeoutMills())) {
  23. // 真正注册Broker关键入口
  24. doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
  25. }
  26. }

doRegisterBrokerAll

我们进入到doRegisterBrokerAll看看,这下面就是注册Broker到NameServer的核心代码,涉及到Netty网络请求相关的代码被封装在registerBrokerAll()方法中,这段代码看完了我们可以进去看看

  1. private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
  2. TopicConfigSerializeWrapper topicConfigWrapper) {
  3. // 将Broker 注册到所有NameServer上,NameServer可能是集群有多个,所以这里是List
  4. List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
  5. this.brokerConfig.getBrokerClusterName(),
  6. this.getBrokerAddr(),
  7. this.brokerConfig.getBrokerName(),
  8. this.brokerConfig.getBrokerId(),
  9. this.getHAServerAddr(),
  10. topicConfigWrapper,
  11. this.filterServerManager.buildNewFilterServerList(),
  12. oneway,
  13. this.brokerConfig.getRegisterBrokerTimeoutMills(),
  14. this.brokerConfig.isCompressedRegister());
  15. // 如果注册结果大于 0 则继续处理相关逻辑
  16. if (registerBrokerResultList.size() > 0) {
  17. RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
  18. if (registerBrokerResult != null) {
  19. if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
  20. this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
  21. }
  22. this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
  23. if (checkOrderConfig) {
  24. this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
  25. }
  26. }
  27. }
  28. }

registerBrokerAll()

这里我们来看看registerBrokerAll()方法

  1. public List<RegisterBrokerResult> registerBrokerAll(
  2. final String clusterName,
  3. final String brokerAddr,
  4. final String brokerName,
  5. final long brokerId,
  6. final String haServerAddr,
  7. final TopicConfigSerializeWrapper topicConfigWrapper,
  8. final List<String> filterServerList,
  9. final boolean oneway,
  10. final int timeoutMills,
  11. final boolean compressed) {
  12. // 用于存放每个远程注册的返回结果
  13. final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
  14. // NameServer的注册地址
  15. List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
  16. if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
  17. // 构建网络请求头
  18. final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
  19. requestHeader.setBrokerAddr(brokerAddr);
  20. requestHeader.setBrokerId(brokerId);
  21. requestHeader.setBrokerName(brokerName);
  22. requestHeader.setClusterName(clusterName);
  23. requestHeader.setHaServerAddr(haServerAddr);
  24. requestHeader.setCompressed(compressed);
  25. // 构建网络请求 body
  26. RegisterBrokerBody requestBody = new RegisterBrokerBody();
  27. requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
  28. requestBody.setFilterServerList(filterServerList);
  29. final byte[] body = requestBody.encode(compressed);
  30. final int bodyCrc32 = UtilAll.crc32(body);
  31. requestHeader.setBodyCrc32(bodyCrc32);
  32. // 多线程注册提高效率
  33. final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
  34. for (final String namesrvAddr : nameServerAddressList) {
  35. //自定义线程池
  36. brokerOuterExecutor.execute(() -> {
  37. try {
  38. RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
  39. if (result != null) {
  40. registerBrokerResultList.add(result);
  41. }
  42. log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
  43. } catch (Exception e) {
  44. log.warn("registerBroker Exception, {}", namesrvAddr, e);
  45. } finally {
  46. countDownLatch.countDown();
  47. }
  48. });
  49. }
  50. try {
  51. countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
  52. } catch (InterruptedException e) {
  53. }
  54. }
  55. return registerBrokerResultList;
  56. }

总结

我们这里就把Broker的启动流程分析完了,我们这里并没有一行代码一行代码去看,因为有些代码不知道实际的业务场景,我们也不知道是干嘛的,总的来说看了整个源码的一个核心流程,相关细节并没有很深入,主要是对Broker的启动有了一个大致的了解,后续如果需要修改或者遇到了相关问题,可以很好的找到入库,问题排查也会更轻松

发表评论

表情:
评论列表 (有 0 条评论,432人围观)

还没有评论,来说两句吧...

相关阅读