RocketMQ源码解析二(Broker启动流程)

本是古典 何须时尚 2022-09-12 07:46 72阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

启动流程

broker的入口是BrokerStartup类的main()方法:

  1. public static void main(String[] args) {
  2. // 1. Parse and build the configuration objects, then create the BrokerController from them
  3. // 2. Initialize the BrokerController: mainly creates the netty servers and a set of thread pools
  4. // 3. Start the components inside the BrokerController and schedule the periodic register/heartbeat task
  5. start(createBrokerController(args));
  6. }

嗯,看起来跟namesrv启动过程差不多。
接下来就看上面的3个步骤
首先是解析配置并创建BrokerController组件。和namesrv类似,broker也有一个核心组件,即BrokerController:

  1. /** * Parses the command line and the optional config file into the four config objects, then creates and initializes the BrokerController. The trailing "return null" is only reached after the catch branch has called System.exit(-1). */
  2. public static BrokerController createBrokerController(String[] args) {
  3. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  4. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
  5. NettySystemConfig.socketSndbufSize = 131072;
  6. }
  7. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
  8. NettySystemConfig.socketRcvbufSize = 131072;
  9. }
  10. try {
  11. // Parse the command-line arguments
  12. //PackageConflictDetect.detectFastjson();
  13. Options options = ServerUtil.buildCommandlineOptions(new Options());
  14. commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
  15. new PosixParser());
  16. if (null == commandLine) {
  17. System.exit(-1);
  18. }
  19. // Core broker configuration
  20. final BrokerConfig brokerConfig = new BrokerConfig();
  21. // Configs for the broker acting as a netty server (talking to producers/consumers) and as a netty client (talking to namesrv)
  22. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  23. final NettyClientConfig nettyClientConfig = new NettyClientConfig();
  24. nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
  25. String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  26. // Set the netty server listen port to 10911 (the -c config file applied below is also mapped onto nettyServerConfig)
  27. nettyServerConfig.setListenPort(10911);
  28. // Message store configuration
  29. final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
  30. // For a SLAVE role, lower accessMessageInMemoryMaxRatio by 10. NOTE(review): this runs before the -c file is loaded, so it sees the role of a freshly constructed MessageStoreConfig
  31. if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
  32. int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
  33. messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
  34. }
  35. // If the broker was started with -c, load the properties file whose path follows -c
  36. if (commandLine.hasOption('c')) {
  37. String file = commandLine.getOptionValue('c');
  38. if (file != null) {
  39. configFile = file;
  40. InputStream in = new BufferedInputStream(new FileInputStream(file)); // NOTE(review): only closed by in.close() below on the success path
  41. properties = new Properties();
  42. properties.load(in);
  43. properties2SystemEnv(properties);
  44. MixAll.properties2Object(properties, brokerConfig);
  45. MixAll.properties2Object(properties, nettyServerConfig);
  46. MixAll.properties2Object(properties, nettyClientConfig);
  47. MixAll.properties2Object(properties, messageStoreConfig);
  48. BrokerPathConfigHelper.setBrokerConfigPath(file);
  49. in.close();
  50. }
  51. }
  52. // Apply the properties derived from the command line onto brokerConfig
  53. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
  54. // Cannot start without a RocketMQ home directory configured
  55. if (null == brokerConfig.getRocketmqHome()) {
  56. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
  57. System.exit(-2);
  58. }
  59. // Read the configured namesrvAddr, split it on ';' and validate each address
  60. String namesrvAddr = brokerConfig.getNamesrvAddr();
  61. if (null != namesrvAddr) {
  62. try {
  63. String[] addrArray = namesrvAddr.split(";");
  64. for (String addr : addrArray) {
  65. RemotingUtil.string2SocketAddress(addr);
  66. }
  67. } catch (Exception e) {
  68. System.out.printf(
  69. "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
  70. namesrvAddr);
  71. System.exit(-3);
  72. }
  73. }
  74. // Derive or validate brokerId according to the broker role
  75. switch (messageStoreConfig.getBrokerRole()) {
  76. case ASYNC_MASTER:
  77. case SYNC_MASTER:
  78. // A master always gets MixAll.MASTER_ID as its id
  79. brokerConfig.setBrokerId(MixAll.MASTER_ID);
  80. break;
  81. case SLAVE:
  82. // A slave's brokerId must be greater than 0
  83. if (brokerConfig.getBrokerId() <= 0) {
  84. System.out.print("Slave's brokerId must be > 0");
  85. System.exit(-3);
  86. }
  87. break;
  88. default:
  89. break;
  90. }
  91. // With the DLedger commit log enabled, brokerId is set to -1
  92. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  93. brokerConfig.setBrokerId(-1);
  94. }
  95. // HA listen port = netty server listen port + 1
  96. messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
  97. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  98. JoranConfigurator configurator = new JoranConfigurator();
  99. configurator.setContext(lc);
  100. lc.reset();
  101. configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
  102. // Handle the -p and -m options: print the config objects to the console logger, then exit
  103. if (commandLine.hasOption('p')) {
  104. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  105. MixAll.printObjectProperties(console, brokerConfig);
  106. MixAll.printObjectProperties(console, nettyServerConfig);
  107. MixAll.printObjectProperties(console, nettyClientConfig);
  108. MixAll.printObjectProperties(console, messageStoreConfig);
  109. System.exit(0);
  110. } else if (commandLine.hasOption('m')) {
  111. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  112. MixAll.printObjectProperties(console, brokerConfig, true);
  113. MixAll.printObjectProperties(console, nettyServerConfig, true);
  114. MixAll.printObjectProperties(console, nettyClientConfig, true);
  115. MixAll.printObjectProperties(console, messageStoreConfig, true);
  116. System.exit(0);
  117. }
  118. // Write the four config objects to the broker log
  119. log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
  120. MixAll.printObjectProperties(log, brokerConfig);
  121. MixAll.printObjectProperties(log, nettyServerConfig);
  122. MixAll.printObjectProperties(log, nettyClientConfig);
  123. MixAll.printObjectProperties(log, messageStoreConfig);
  124. // Create the BrokerController from the four config objects above
  125. final BrokerController controller = new BrokerController(
  126. brokerConfig,
  127. nettyServerConfig,
  128. nettyClientConfig,
  129. messageStoreConfig);
  130. // remember all configs to prevent discard
  131. controller.getConfiguration().registerConfig(properties);
  132. // Initialize the BrokerController: mainly creates the netty servers and a set of thread pools
  133. boolean initResult = controller.initialize();
  134. if (!initResult) {
  135. controller.shutdown();
  136. System.exit(-3);
  137. }
  138. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  139. private volatile boolean hasShutdown = false;
  140. private AtomicInteger shutdownTimes = new AtomicInteger(0);
  141. @Override
  142. public void run() {
  143. synchronized (this) {
  144. log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
  145. if (!this.hasShutdown) {
  146. this.hasShutdown = true;
  147. long beginTime = System.currentTimeMillis();
  148. controller.shutdown();
  149. long consumingTimeTotal = System.currentTimeMillis() - beginTime;
  150. log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
  151. }
  152. }
  153. }
  154. }, "ShutdownHook"));
  155. return controller;
  156. } catch (Throwable e) {
  157. e.printStackTrace();
  158. System.exit(-1);
  159. }
  160. return null;
  161. }

这里同样有几个核心配置:BrokerConfig、NettyServerConfig、NettyClientConfig和MessageStoreConfig。其中BrokerConfig是broker自身的配置,NettyServerConfig是broker作为服务端与生产者和消费者通信时的配置,NettyClientConfig是broker作为客户端与namesrv通信时的配置,MessageStoreConfig是消息存储相关的配置。至于里面的具体配置内容就不分析了,等具体用到了再说明。
然后根据上面的4个配置创建BrokerController:

  1. final BrokerController controller = new BrokerController(
  2. brokerConfig,
  3. nettyServerConfig,
  4. nettyClientConfig,
  5. messageStoreConfig);
  6. public BrokerController( // Stores the four config objects, then eagerly builds the broker's components, thread-pool queues and Configuration
  7. final BrokerConfig brokerConfig,
  8. final NettyServerConfig nettyServerConfig,
  9. final NettyClientConfig nettyClientConfig,
  10. final MessageStoreConfig messageStoreConfig
  11. ) {
  12. this.brokerConfig = brokerConfig;
  13. this.nettyServerConfig = nettyServerConfig;
  14. this.nettyClientConfig = nettyClientConfig;
  15. this.messageStoreConfig = messageStoreConfig;
  16. // Create the component that backs each broker function
  17. this.consumerOffsetManager = new ConsumerOffsetManager(this);
  18. this.topicConfigManager = new TopicConfigManager(this);
  19. this.pullMessageProcessor = new PullMessageProcessor(this);
  20. this.pullRequestHoldService = new PullRequestHoldService(this);
  21. this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
  22. this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
  23. this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
  24. this.consumerFilterManager = new ConsumerFilterManager(this);
  25. this.producerManager = new ProducerManager();
  26. this.clientHousekeepingService = new ClientHousekeepingService(this);
  27. this.broker2Client = new Broker2Client(this);
  28. this.subscriptionGroupManager = new SubscriptionGroupManager(this);
  29. this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
  30. this.filterServerManager = new FilterServerManager(this);
  31. this.slaveSynchronize = new SlaveSynchronize(this);
  32. // Create the bounded work queues used by the thread pools; capacities come from brokerConfig
  33. this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
  34. this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
  35. this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
  36. this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
  37. this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
  38. this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
  39. this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
  40. this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
  41. this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
  42. this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
  43. this.brokerFastFailure = new BrokerFastFailure(this);
  44. this.configuration = new Configuration(
  45. log,
  46. BrokerPathConfigHelper.getBrokerConfigPath(),
  47. this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
  48. );
  49. }

在BrokerController的构造方法里会同时创建各个功能组件。
然后就需要对BrokerController进行初始化:

  1. boolean initResult = controller.initialize();
  2. /** * Initializes the BrokerController: loads persisted state, builds the message store, then (if everything loaded) creates the netty servers, the thread pools and the scheduled tasks. Returns false when any load step failed. */
  3. public boolean initialize() throws CloneNotSupportedException {
  4. // Load persisted state: topic config, consumer offsets, subscription groups, consumer filters
  5. boolean result = this.topicConfigManager.load();
  6. result = result && this.consumerOffsetManager.load();
  7. result = result && this.subscriptionGroupManager.load();
  8. result = result && this.consumerFilterManager.load();
  9. if (result) {
  10. try {
  11. // Create the message store and its related components
  12. this.messageStore =
  13. new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
  14. this.brokerConfig);
  15. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  16. DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
  17. ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
  18. }
  19. this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
  20. //load plugin
  21. MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
  22. this.messageStore = MessageStoreFactory.build(context, this.messageStore);
  23. this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
  24. } catch (IOException e) {
  25. result = false;
  26. log.error("Failed to initialize", e);
  27. }
  28. }
  29. // Load the message store's data (skipped by short-circuit when an earlier step already failed)
  30. result = result && this.messageStore.load();
  31. if (result) {
  32. // Create the netty remoting server; a second server is built from a cloned config listening on listenPort - 2
  33. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
  34. NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
  35. fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
  36. this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
  37. // Below: thread pools dedicated to the different kinds of requests
  38. // Thread pool that handles incoming send-message requests
  39. this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
  40. this.brokerConfig.getSendMessageThreadPoolNums(),
  41. this.brokerConfig.getSendMessageThreadPoolNums(),
  42. 1000 * 60,
  43. TimeUnit.MILLISECONDS,
  44. this.sendThreadPoolQueue,
  45. new ThreadFactoryImpl("SendMessageThread_"));
  46. // Thread pool that handles pull-message requests from consumers
  47. this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
  48. this.brokerConfig.getPullMessageThreadPoolNums(),
  49. this.brokerConfig.getPullMessageThreadPoolNums(),
  50. 1000 * 60,
  51. TimeUnit.MILLISECONDS,
  52. this.pullThreadPoolQueue,
  53. new ThreadFactoryImpl("PullMessageThread_"));
  54. // Thread pool for reply messages
  55. this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
  56. this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
  57. this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
  58. 1000 * 60,
  59. TimeUnit.MILLISECONDS,
  60. this.replyThreadPoolQueue,
  61. new ThreadFactoryImpl("ProcessReplyMessageThread_"));
  62. // Thread pool for message queries
  63. this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
  64. this.brokerConfig.getQueryMessageThreadPoolNums(),
  65. this.brokerConfig.getQueryMessageThreadPoolNums(),
  66. 1000 * 60,
  67. TimeUnit.MILLISECONDS,
  68. this.queryThreadPoolQueue,
  69. new ThreadFactoryImpl("QueryMessageThread_"));
  70. // Thread pool for admin broker commands
  71. this.adminBrokerExecutor =
  72. Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
  73. "AdminBrokerThread_"));
  74. // Thread pool for client management
  75. this.clientManageExecutor = new ThreadPoolExecutor(
  76. this.brokerConfig.getClientManageThreadPoolNums(),
  77. this.brokerConfig.getClientManageThreadPoolNums(),
  78. 1000 * 60,
  79. TimeUnit.MILLISECONDS,
  80. this.clientManagerThreadPoolQueue,
  81. new ThreadFactoryImpl("ClientManageThread_"));
  82. // Heartbeat thread pool. NOTE(review): presumably serves heartbeat requests received by the broker rather than sending them — confirm in registerProcessor()
  83. this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
  84. this.brokerConfig.getHeartbeatThreadPoolNums(),
  85. this.brokerConfig.getHeartbeatThreadPoolNums(),
  86. 1000 * 60,
  87. TimeUnit.MILLISECONDS,
  88. this.heartbeatThreadPoolQueue,
  89. new ThreadFactoryImpl("HeartbeatThread_", true));
  90. // Thread pool for end-transaction requests
  91. this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
  92. this.brokerConfig.getEndTransactionThreadPoolNums(),
  93. this.brokerConfig.getEndTransactionThreadPoolNums(),
  94. 1000 * 60,
  95. TimeUnit.MILLISECONDS,
  96. this.endTransactionThreadPoolQueue,
  97. new ThreadFactoryImpl("EndTransactionThread_"));
  98. // Thread pool for consumer management
  99. this.consumerManageExecutor =
  100. Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
  101. "ConsumerManageThread_"));
  102. // Register the request processors and their thread pools
  103. this.registerProcessor();
  104. // Below: schedule the periodic background tasks
  105. final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
  106. final long period = 1000 * 60 * 60 * 24;
  107. // Broker stats recording: first run at the time given by computeNextMorningTimeMillis(), then every 24 hours
  108. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  109. @Override
  110. public void run() {
  111. try {
  112. BrokerController.this.getBrokerStats().record();
  113. } catch (Throwable e) {
  114. log.error("schedule record error.", e);
  115. }
  116. }
  117. }, initialDelay, period, TimeUnit.MILLISECONDS);
  118. // Periodically persist consumer offsets (interval taken from flushConsumerOffsetInterval)
  119. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  120. @Override
  121. public void run() {
  122. try {
  123. BrokerController.this.consumerOffsetManager.persist();
  124. } catch (Throwable e) {
  125. log.error("schedule persist consumerOffset error.", e);
  126. }
  127. }
  128. }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  129. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  130. @Override
  131. public void run() {
  132. try {
  133. BrokerController.this.consumerFilterManager.persist();
  134. } catch (Throwable e) {
  135. log.error("schedule persist consumer filter error.", e);
  136. }
  137. }
  138. }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
  139. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  140. @Override
  141. public void run() {
  142. try {
  143. BrokerController.this.protectBroker();
  144. } catch (Throwable e) {
  145. log.error("protectBroker error.", e);
  146. }
  147. }
  148. }, 3, 3, TimeUnit.MINUTES);
  149. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  150. @Override
  151. public void run() {
  152. try {
  153. BrokerController.this.printWaterMark();
  154. } catch (Throwable e) {
  155. log.error("printWaterMark error.", e);
  156. }
  157. }
  158. }, 10, 1, TimeUnit.SECONDS);
  159. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  160. @Override
  161. public void run() {
  162. try {
  163. log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
  164. } catch (Throwable e) {
  165. log.error("schedule dispatchBehindBytes error.", e);
  166. }
  167. }
  168. }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
  169. if (this.brokerConfig.getNamesrvAddr() != null) {
  170. this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
  171. log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
  172. } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
  173. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  174. @Override
  175. public void run() {
  176. try {
  177. BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
  178. } catch (Throwable e) {
  179. log.error("ScheduledTask fetchNameServerAddr exception", e);
  180. }
  181. }
  182. }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  183. }
  184. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  185. if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  186. if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
  187. this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
  188. this.updateMasterHAServerAddrPeriodically = false;
  189. } else {
  190. this.updateMasterHAServerAddrPeriodically = true;
  191. }
  192. } else {
  193. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  194. @Override
  195. public void run() {
  196. try {
  197. BrokerController.this.printMasterAndSlaveDiff();
  198. } catch (Throwable e) {
  199. log.error("schedule printMasterAndSlaveDiff error.", e);
  200. }
  201. }
  202. }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
  203. }
  204. }
  205. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
  206. // Register a listener to reload SslContext
  207. try {
  208. fileWatchService = new FileWatchService(
  209. new String[] {
  210. TlsSystemConfig.tlsServerCertPath,
  211. TlsSystemConfig.tlsServerKeyPath,
  212. TlsSystemConfig.tlsServerTrustCertPath
  213. },
  214. new FileWatchService.Listener() {
  215. boolean certChanged, keyChanged = false;
  216. @Override
  217. public void onChanged(String path) {
  218. if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
  219. log.info("The trust certificate changed, reload the ssl context");
  220. reloadServerSslContext();
  221. }
  222. if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
  223. certChanged = true;
  224. }
  225. if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
  226. keyChanged = true;
  227. }
  228. if (certChanged && keyChanged) {
  229. log.info("The certificate and private key changed, reload the ssl context");
  230. certChanged = keyChanged = false;
  231. reloadServerSslContext();
  232. }
  233. }
  234. private void reloadServerSslContext() {
  235. ((NettyRemotingServer) remotingServer).loadSslContext();
  236. ((NettyRemotingServer) fastRemotingServer).loadSslContext();
  237. }
  238. });
  239. } catch (Exception e) {
  240. log.warn("FileWatchService created error, can't load the certificate dynamically");
  241. }
  242. }
  243. initialTransaction();
  244. initialAcl();
  245. initialRpcHooks();
  246. }
  247. return result;
  248. }

代码比较长,相关的流程都写在注释里了,简单概括就是加载配置和消费进度,将消费进度从磁盘读到内存,创建消息存储相关组件及服务以及加载该服务,创建一个netty服务器,毕竟也要和生产者消费者通信,创建一些处理不同请求的线程池和处理器(这里就会有专门的线程池和处理器,不像namesrv就只有一个默认的,毕竟broker要处理的东西比较多),以及开启一些定时任务。
最后一步就是开启BrokerController:

  1. controller.start();
  2. /** * Starts each non-null component in turn and schedules the periodic namesrv registration task. */
  3. public void start() throws Exception {
  4. if (this.messageStore != null) {
  5. // Start the message store's background services (per the author: scheduled flushing, the index-file service, etc.)
  6. this.messageStore.start();
  7. }
  8. if (this.remotingServer != null) {
  9. this.remotingServer.start();
  10. }
  11. if (this.fastRemotingServer != null) {
  12. this.fastRemotingServer.start();
  13. }
  14. if (this.fileWatchService != null) {
  15. this.fileWatchService.start();
  16. }
  17. // This component lets the broker send outbound requests through its netty client
  18. if (this.brokerOuterAPI != null) {
  19. this.brokerOuterAPI.start();
  20. }
  21. if (this.pullRequestHoldService != null) {
  22. this.pullRequestHoldService.start();
  23. }
  24. if (this.clientHousekeepingService != null) {
  25. this.clientHousekeepingService.start();
  26. }
  27. if (this.filterServerManager != null) {
  28. this.filterServerManager.start();
  29. }
  30. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  31. startProcessorByHa(messageStoreConfig.getBrokerRole());
  32. handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
  33. this.registerBrokerAll(true, false, true);
  34. }
  35. // Schedule a task that registers with namesrv; the first run registers, later runs effectively act as heartbeats
  36. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  37. // The period is clamped to the range 10s-60s (see the Math.max/Math.min below)
  38. @Override
  39. public void run() {
  40. try {
  41. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  42. } catch (Throwable e) {
  43. log.error("registerBrokerAll Exception", e);
  44. }
  45. }
  46. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
  47. if (this.brokerStatsManager != null) {
  48. this.brokerStatsManager.start();
  49. }
  50. if (this.brokerFastFailure != null) {
  51. this.brokerFastFailure.start();
  52. }
  53. }

可以看到是启动一些组件,但这些组件里又会启动一些后台任务,这些等后面解析具体功能时再说明。最后还会显式开启一个定时任务,用于broker向namesrv发送心跳。

好了,最后再来总结下broker启动流程:先解析并创建4个核心配置,根据这4个配置创建BrokerController组件(构造方法里会创建各个功能组件,包括作为netty客户端的BrokerOuterAPI),然后对该组件进行初始化,在初始化的过程中会加载配置和存储数据,创建netty服务端、请求处理器和线程池以及开启一些定时任务,最后再启动各个功能组件并创建一个发送心跳的定时任务。
(原文此处有一张配图,图片未随文本保留)

路由注册和心跳流程

这节来研究broker向namesrv注册以及发送心跳的流程(同一个定时器)

Broker启动时向集群中所有的NameServer发送心跳,之后默认每隔30s发送一次(间隔由registerNameServerPeriod配置,取值被限制在10s~60s之间)。NameServer收到Broker心跳包时会更新brokerLiveTable的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。

broker发送心跳包

入口代码在BrokerController的start()方法里:

  1. // Schedule a task that registers with namesrv; the first run registers, later runs effectively act as heartbeats
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. // The period is clamped to the range 10s-60s (see the Math.max/Math.min below)
  4. @Override
  5. public void run() {
  6. try {
  7. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  8. } catch (Throwable e) {
  9. log.error("registerBrokerAll Exception", e);
  10. }
  11. }
  12. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

定时器中调用BrokerController的registerBrokerAll方法:

  1. /** * Registers this broker with namesrv, either unconditionally (forceRegister) or only when needRegister(...) says so. */
  2. public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
  3. TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
  4. // When the broker is not both writeable and readable, rebuild the topic table so every topic carries the broker's permission
  5. if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
  6. || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
  7. ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
  8. for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
  9. TopicConfig tmp =
  10. new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
  11. this.brokerConfig.getBrokerPermission());
  12. topicConfigTable.put(topicConfig.getTopicName(), tmp);
  13. }
  14. topicConfigWrapper.setTopicConfigTable(topicConfigTable);
  15. }
  16. // Decide whether a registration is needed
  17. if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
  18. this.getBrokerAddr(),
  19. this.brokerConfig.getBrokerName(),
  20. this.brokerConfig.getBrokerId(),
  21. this.brokerConfig.getRegisterBrokerTimeoutMills())) {
  22. // Perform the registration
  23. doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
  24. }
  25. }

其中,forceRegister默认为true,所以会直接进入下面的注册逻辑:

  1. private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
  2. TopicConfigSerializeWrapper topicConfigWrapper) {
  3. // Use the brokerOuterAPI component to send the register request to every NameServer
  4. List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
  5. this.brokerConfig.getBrokerClusterName(),
  6. this.getBrokerAddr(),
  7. this.brokerConfig.getBrokerName(),
  8. this.brokerConfig.getBrokerId(),
  9. this.getHAServerAddr(),
  10. topicConfigWrapper,
  11. this.filterServerManager.buildNewFilterServerList(),
  12. oneway,
  13. this.brokerConfig.getRegisterBrokerTimeoutMills(),
  14. this.brokerConfig.isCompressedRegister());
  15. if (registerBrokerResultList.size() > 0) {
  16. // Only the first registration result is processed
  17. RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
  18. if (registerBrokerResult != null) {
  19. if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
  20. this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
  21. }
  22. this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
  23. if (checkOrderConfig) {
  24. this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
  25. }
  26. }
  27. }
  28. }

通过brokerOuterAPI组件去发送心跳

  1. /** * Builds the register request once and sends it to every known name server in parallel; returns the non-null results collected before the wait ended. */
  2. public List<RegisterBrokerResult> registerBrokerAll(
  3. final String clusterName,
  4. final String brokerAddr,
  5. final String brokerName,
  6. final long brokerId,
  7. final String haServerAddr,
  8. final TopicConfigSerializeWrapper topicConfigWrapper,
  9. final List<String> filterServerList,
  10. final boolean oneway,
  11. final int timeoutMills,
  12. final boolean compressed) {
  13. // Initialize the list of registration results
  14. final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // NOTE(review): appended to from executor threads below — confirm this list type is safe for concurrent add
  15. // Name server address list
  16. List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
  17. if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
  18. // Build the request header
  19. final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
  20. requestHeader.setBrokerAddr(brokerAddr);
  21. requestHeader.setBrokerId(brokerId);
  22. requestHeader.setBrokerName(brokerName);
  23. requestHeader.setClusterName(clusterName);
  24. requestHeader.setHaServerAddr(haServerAddr);
  25. requestHeader.setCompressed(compressed);
  26. // Request body: topic config plus filter server list; its CRC32 is stored in the header
  27. RegisterBrokerBody requestBody = new RegisterBrokerBody();
  28. requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
  29. requestBody.setFilterServerList(filterServerList);
  30. final byte[] body = requestBody.encode(compressed);
  31. final int bodyCrc32 = UtilAll.crc32(body);
  32. requestHeader.setBodyCrc32(bodyCrc32);
  33. // Latch counted down once per name server so the caller can wait for all of them
  34. final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
  35. // Loop over every name server
  36. for (final String namesrvAddr : nameServerAddressList) {
  37. // Submit one task per name server so the registrations run concurrently
  38. brokerOuterExecutor.execute(new Runnable() {
  39. @Override
  40. public void run() {
  41. try {
  42. RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
  43. if (result != null) {
  44. // Add the registration result to the list
  45. registerBrokerResultList.add(result);
  46. }
  47. log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
  48. } catch (Exception e) {
  49. log.warn("registerBroker Exception, {}", namesrvAddr, e);
  50. } finally {
  51. countDownLatch.countDown();
  52. }
  53. }
  54. });
  55. }
  56. try {
  57. // Wait for all registrations, but at most timeoutMills; after a timeout execution continues with whatever results have arrived
  58. countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
  59. } catch (InterruptedException e) { // NOTE(review): the interruption is swallowed here
  60. }
  61. }
  62. return registerBrokerResultList;
  63. }

通过将数据封装在RegisterBrokerRequestHeader中发送请求。
通过CountDownLatch等待向所有nameserver的注册请求完成;注意await带有超时时间timeoutMills,超时后即使还有nameserver没有返回,也会继续往下执行并返回已经收到的结果。

  1. /** * Registers the broker with one name server using the prepared header and body. Returns null for oneway calls, the decoded result on SUCCESS, and throws MQBrokerException for any other response code. */
  2. private RegisterBrokerResult registerBroker(
  3. final String namesrvAddr,
  4. final boolean oneway,
  5. final int timeoutMills,
  6. final RegisterBrokerRequestHeader requestHeader,
  7. final byte[] body
  8. ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
  9. InterruptedException {
  10. // Build the register request; requests and responses are both RemotingCommand objects
  11. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
  12. request.setBody(body);
  13. // Oneway request: do not wait for a registration result, return null
  14. if (oneway) {
  15. try {
  16. this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
  17. } catch (RemotingTooMuchRequestException e) {
  18. // Ignore
  19. }
  20. return null;
  21. }
  22. // Synchronous call
  23. RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
  24. assert response != null;
  25. // Use the response code to decide whether the registration succeeded
  26. switch (response.getCode()) {
  27. case ResponseCode.SUCCESS: {
  28. RegisterBrokerResponseHeader responseHeader =
  29. (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
  30. RegisterBrokerResult result = new RegisterBrokerResult();
  31. result.setMasterAddr(responseHeader.getMasterAddr());
  32. result.setHaServerAddr(responseHeader.getHaServerAddr());
  33. if (response.getBody() != null) {
  34. result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
  35. }
  36. return result;
  37. }
  38. default:
  39. break;
  40. }
  41. // Any other response code is reported as an exception
  42. throw new MQBrokerException(response.getCode(), response.getRemark());
  43. }

这里oneway传入的是false,所以发送心跳走的是同步请求。请求通过NettyRemotingClient发送,NettyRemotingClient可以看作是netty客户端,专门用来发送请求,与具体业务无关。
NettyRemotingClient:

  1. /** * Synchronous call from the netty client: obtains a channel for addr, runs the RPC hooks around invokeSyncImpl, and closes the channel on send failure (and on timeout when clientCloseSocketIfTimeout is set). */
  2. @Override
  3. public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
  4. throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
  5. long beginStartTime = System.currentTimeMillis();
  6. // Get or create the channel for addr
  7. final Channel channel = this.getAndCreateChannel(addr);
  8. if (channel != null && channel.isActive()) {
  9. try {
  10. doBeforeRpcHooks(addr, request);
  11. long costTime = System.currentTimeMillis() - beginStartTime;
  12. // The time budget was already used up before sending
  13. if (timeoutMillis < costTime) {
  14. throw new RemotingTimeoutException("invokeSync call timeout");
  15. }
  16. // Synchronous call with the remaining time budget
  17. RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
  18. doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
  19. return response;
  20. } catch (RemotingSendRequestException e) {
  21. log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
  22. this.closeChannel(addr, channel);
  23. throw e;
  24. } catch (RemotingTimeoutException e) {
  25. if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
  26. this.closeChannel(addr, channel);
  27. log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
  28. }
  29. log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
  30. throw e;
  31. }
  32. } else {
  33. this.closeChannel(addr, channel);
  34. throw new RemotingConnectException(addr);
  35. }
  36. }
  37. public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
  38. final long timeoutMillis)
  39. throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
  40. // Sends the request and blocks until a response arrives or timeoutMillis elapses; opaque can be regarded as the request id
  41. final int opaque = request.getOpaque();
  42. try {
  43. // Create a ResponseFuture for this request and park it in the response table under its opaque
  44. final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  45. this.responseTable.put(opaque, responseFuture);
  46. final SocketAddress addr = channel.remoteAddress();
  47. // Send the request
  48. channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
  49. // Callback invoked when the write completes, whether it succeeded or failed
  50. @Override
  51. public void operationComplete(ChannelFuture f) throws Exception {
  52. if (f.isSuccess()) {
  53. responseFuture.setSendRequestOK(true);
  54. return;
  55. } else {
  56. responseFuture.setSendRequestOK(false);
  57. }
  58. // Sending failed: drop the future from the table and complete it with a null response
  59. responseTable.remove(opaque);
  60. responseFuture.setCause(f.cause());
  61. responseFuture.putResponse(null);
  62. log.warn("send a request command to channel <" + addr + "> failed.");
  63. }
  64. });
  65. // Block waiting for the response; per the article the response-handling thread wakes this thread up
  66. RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  67. if (null == responseCommand) {
  68. // No response although the request was sent successfully: the response timed out
  69. if (responseFuture.isSendRequestOK()) {
  70. throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
  71. responseFuture.getCause());
  72. } else {
  73. // The request could not be sent
  74. throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
  75. }
  76. }
  77. return responseCommand;
  78. } finally {
  79. this.responseTable.remove(opaque);
  80. }
  81. }

channel.writeAndFlush(request)本质上也是异步请求,发送请求成功后就返回了,所以如果需要实现同步请求,就需要阻塞等待请求结果。
实现过程是创建一个该请求的ResponseFuture暂存在响应表中,用于客户端发送数据后,服务端返回响应结果的时候,和客户端的请求正确匹配起来。具体过程是客户端发送请求时将<请求id, responseFuture>缓存起来,这时用CountDownLatch阻塞请求线程,等待响应结果。当netty客户端接收到服务端的响应后,响应线程根据响应数据中的请求id从缓存中取出responseFuture, 将响应结果设置到responseFuture中,然后通过CountDownLatch操作通知请求线程,将请求线程唤醒,请求线程通过responseFuture拿到响应结果进行处理。所以说到底还是异步请求,只是通过CountDownLatch将线程阻塞住等待结果。

这里分析了netty请求过程,后面涉及netty请求就不再分析了。

下面看看nameserver接收到心跳请求后如何处理。

nameserver处理心跳包

netty收到请求的处理入口在NettyRemotingServer中,刚好与发送请求的NettyRemotingClient对应。
NettyRemotingServer

  1. public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
  2. @ChannelHandler.Sharable
  3. class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  6. // Entry point on the netty server side: every RemotingCommand read from the channel is handed to processMessageReceived
  7. processMessageReceived(ctx, msg);
  8. }
  9. }
  10. }

调用父类方法
NettyRemotingAbstract

  1. public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  2. final RemotingCommand cmd = msg;
  3. if (cmd != null) {
  4. switch (cmd.getType()) {
  5. case REQUEST_COMMAND:
  6. // The command is a request sent by the peer: process it
  7. processRequestCommand(ctx, cmd);
  8. break;
  9. case RESPONSE_COMMAND:
  10. // The command is a response to a request sent earlier: process it
  11. processResponseCommand(ctx, cmd);
  12. break;
  13. default:
  14. break;
  15. }
  16. }
  17. }

可以看到这有两种类型,处理请求可以理解,但为什么会有处理响应的?上面说到了,netty发送同步请求本质上还是异步的,所以服务端处理完请求后,需要把响应结果作为一条新的消息发回给客户端,客户端同样要通过processMessageReceived来接收它。看源码发现NettyRemotingClient也继承了NettyRemotingAbstract:

  1. public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
  2. class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
  3. @Override
  4. protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  5. processMessageReceived(ctx, msg); // client-side entry point: commands read from the channel go through the same processMessageReceived
  6. }
  7. }
  8. }

所以NettyRemotingClient这边的该方法是用来处理响应的。
好了,接着回到server端处理接收到的请求

  1. /** * Process an incoming request command: pick the processor/executor pair for the request code, wrap the handling in a task and submit it to that executor. * * @param ctx channel handler context. * @param cmd request command. */
  2. public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
  3. // Look up the processor and executor registered for this request code; fall back to the default pair if there is none
  4. // (per the article, the broker-register request has no dedicated processor or thread pool, so the default pair is used)
  5. final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
  6. final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
  7. // Acts as the request id
  8. final int opaque = cmd.getOpaque();
  9. if (pair != null) {
  10. Runnable run = new Runnable() {
  11. @Override
  12. public void run() {
  13. try {
  14. doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
  15. // Let the selected processor handle the request
  16. final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
  17. doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
  18. // Unless this is a oneway request, the result has to be written back to the caller
  19. if (!cmd.isOnewayRPC()) {
  20. if (response != null) {
  21. // Copy the request id into the response so the client can match it to the pending request
  22. response.setOpaque(opaque);
  23. response.markResponseType();
  24. try {
  25. ctx.writeAndFlush(response);
  26. } catch (Throwable e) {
  27. log.error("process request over, but response failed", e);
  28. log.error(cmd.toString());
  29. log.error(response.toString());
  30. }
  31. } else {
  32. }
  33. }
  34. } catch (Throwable e) {
  35. log.error("process request exception", e);
  36. log.error(cmd.toString());
  37. if (!cmd.isOnewayRPC()) {
  38. final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
  39. RemotingHelper.exceptionSimpleDesc(e));
  40. response.setOpaque(opaque);
  41. ctx.writeAndFlush(response);
  42. }
  43. }
  44. }
  45. };
  46. if (pair.getObject1().rejectRequest()) {
  47. final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
  48. "[REJECTREQUEST]system busy, start flow control for a while");
  49. response.setOpaque(opaque);
  50. ctx.writeAndFlush(response);
  51. return;
  52. }
  53. try {
  54. // Wrap the Runnable in a RequestTask together with the channel and the command
  55. final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
  56. // Submit the task to the executor paired with the processor
  57. // (per the article, the register request also runs on the default thread pool)
  58. pair.getObject2().submit(requestTask);
  59. } catch (RejectedExecutionException e) {
  60. if ((System.currentTimeMillis() % 10000) == 0) {
  61. log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
  62. + ", too many requests and system thread pool busy, RejectedExecutionException "
  63. + pair.getObject2().toString()
  64. + " request code: " + cmd.getCode());
  65. }
  66. if (!cmd.isOnewayRPC()) {
  67. final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
  68. "[OVERLOAD]system busy, start flow control for a while");
  69. response.setOpaque(opaque);
  70. ctx.writeAndFlush(response);
  71. }
  72. }
  73. } else {
  74. String error = " request type " + cmd.getCode() + " not supported";
  75. final RemotingCommand response =
  76. RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
  77. response.setOpaque(opaque);
  78. ctx.writeAndFlush(response);
  79. log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
  80. }
  81. }

接下来就需要使用对应的处理器创建一个任务提交到线程池中,由于注册请求没有专门的处理器和线程池,所以都是用默认的。
DefaultRequestProcessor

  1. /** * Handle a request received by netty by dispatching on its request code. */
  2. @Override
  3. public RemotingCommand processRequest(ChannelHandlerContext ctx,
  4. RemotingCommand request) throws RemotingCommandException {
  5. if (ctx != null) {
  6. log.debug("receive request, {} {} {}",
  7. request.getCode(),
  8. RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
  9. request);
  10. }
  11. // Dispatch to a different handler depending on the request code
  12. switch (request.getCode()) {
  13. case RequestCode.PUT_KV_CONFIG:
  14. return this.putKVConfig(ctx, request);
  15. case RequestCode.GET_KV_CONFIG:
  16. return this.getKVConfig(ctx, request);
  17. case RequestCode.DELETE_KV_CONFIG:
  18. return this.deleteKVConfig(ctx, request);
  19. case RequestCode.QUERY_DATA_VERSION:
  20. return queryBrokerTopicConfig(ctx, request);
  21. // Broker registration; per the article the broker heartbeat uses this same request
  22. case RequestCode.REGISTER_BROKER:
  23. Version brokerVersion = MQVersion.value2Version(request.getVersion());
  24. if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
  25. return this.registerBrokerWithFilterServer(ctx, request);
  26. } else {
  27. // Register the broker
  28. return this.registerBroker(ctx, request);
  29. }
  30. // Unregister a broker (sent when a broker goes offline normally, per the article)
  31. case RequestCode.UNREGISTER_BROKER:
  32. return this.unregisterBroker(ctx, request);
  33. // Get route info by topic
  34. case RequestCode.GET_ROUTEINTO_BY_TOPIC:
  35. return this.getRouteInfoByTopic(ctx, request);
  36. case RequestCode.GET_BROKER_CLUSTER_INFO:
  37. return this.getBrokerClusterInfo(ctx, request);
  38. case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
  39. return this.wipeWritePermOfBroker(ctx, request);
  40. case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
  41. return getAllTopicListFromNameserver(ctx, request);
  42. case RequestCode.DELETE_TOPIC_IN_NAMESRV:
  43. return deleteTopicInNamesrv(ctx, request);
  44. case RequestCode.GET_KVLIST_BY_NAMESPACE:
  45. return this.getKVListByNamespace(ctx, request);
  46. case RequestCode.GET_TOPICS_BY_CLUSTER:
  47. return this.getTopicsByCluster(ctx, request);
  48. case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
  49. return this.getSystemTopicListFromNs(ctx, request);
  50. case RequestCode.GET_UNIT_TOPIC_LIST:
  51. return this.getUnitTopicList(ctx, request);
  52. case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
  53. return this.getHasUnitSubTopicList(ctx, request);
  54. case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
  55. return this.getHasUnitSubUnUnitTopicList(ctx, request);
  56. case RequestCode.UPDATE_NAMESRV_CONFIG:
  57. return this.updateConfig(ctx, request);
  58. case RequestCode.GET_NAMESRV_CONFIG:
  59. return this.getConfig(ctx, request);
  60. default:
  61. break;
  62. }
  63. return null; // no case matched the request code: no response command is produced
  64. }

可以看到该默认的请求处理器也可以处理多种类型的请求,我们就看处理注册broker和broker发送心跳的请求:

  1. /** * Handle a broker register request: decode the request header and topic config, register the broker with the RouteInfoManager and build the response. */
  2. public RemotingCommand registerBroker(ChannelHandlerContext ctx,
  3. RemotingCommand request) throws RemotingCommandException {
  4. // Build the response RemotingCommand
  5. final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
  6. final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
  7. final RegisterBrokerRequestHeader requestHeader =
  8. (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
  9. if (!checksum(ctx, request, requestHeader)) {
  10. response.setCode(ResponseCode.SYSTEM_ERROR);
  11. response.setRemark("crc32 not match");
  12. return response;
  13. }
  14. // Topic config carried in the request body; an empty wrapper with a zeroed data version is used when there is no body
  15. TopicConfigSerializeWrapper topicConfigWrapper;
  16. if (request.getBody() != null) {
  17. topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
  18. } else {
  19. topicConfigWrapper = new TopicConfigSerializeWrapper();
  20. topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
  21. topicConfigWrapper.getDataVersion().setTimestamp(0);
  22. }
  23. // Perform the actual registration in the route info manager
  24. RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
  25. requestHeader.getClusterName(),
  26. requestHeader.getBrokerAddr(),
  27. requestHeader.getBrokerName(),
  28. requestHeader.getBrokerId(),
  29. requestHeader.getHaServerAddr(),
  30. topicConfigWrapper,
  31. null,
  32. ctx.channel()
  33. );
  34. responseHeader.setHaServerAddr(result.getHaServerAddr());
  35. responseHeader.setMasterAddr(result.getMasterAddr());
  36. byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
  37. response.setBody(jsonValue);
  38. response.setCode(ResponseCode.SUCCESS);
  39. response.setRemark(null);
  40. return response;
  41. }

从NamesrvController中获取RouteInfoManager路由信息管理器来真正注册broker
在真正注册之前,先来看看RouteInfoManager中即将用到的属性吧

  1. private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
  2. private final ReadWriteLock lock = new ReentrantReadWriteLock();
  3. /** * Queue data of each topic */
  4. private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
  5. /** * Broker data per broker name (one entry per master/slave group, per the article) */
  6. private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  7. /** * Broker names belonging to each cluster (per the article there is usually only one cluster) */
  8. private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
  9. /** * Heartbeat (liveness) info per broker address */
  10. private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  11. private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

现在看真正执行注册的逻辑

  1. /** * Register a broker with this route info manager, updating the cluster, broker-address, topic-queue, live-info and filter-server tables under the write lock. */
  2. public RegisterBrokerResult registerBroker(
  3. final String clusterName,
  4. final String brokerAddr,
  5. final String brokerName,
  6. final long brokerId,
  7. final String haServerAddr,
  8. final TopicConfigSerializeWrapper topicConfigWrapper,
  9. final List<String> filterServerList,
  10. final Channel channel) {
  11. RegisterBrokerResult result = new RegisterBrokerResult();
  12. try {
  13. try {
  14. // Take the write lock so the route tables are not modified concurrently
  15. this.lock.writeLock().lockInterruptibly();
  16. // Look up the broker-name set of this cluster; create it first if the cluster is not known yet
  17. Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
  18. if (null == brokerNames) {
  19. brokerNames = new HashSet<String>();
  20. this.clusterAddrTable.put(clusterName, brokerNames);
  21. }
  22. // Add the broker name under clusterName
  23. brokerNames.add(brokerName);
  24. // Whether this is the broker's first registration
  25. boolean registerFirst = false;
  26. BrokerData brokerData = this.brokerAddrTable.get(brokerName);
  27. if (null == brokerData) {
  28. // No BrokerData for this broker name yet, so this is a first registration
  29. registerFirst = true;
  30. // Build the broker data
  31. brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
  32. this.brokerAddrTable.put(brokerName, brokerData);
  33. }
  34. Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
  35. //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
  36. //The same IP:PORT must only have one record in brokerAddrTable
  37. Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
  38. while (it.hasNext()) {
  39. Entry<Long, String> item = it.next();
  40. if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
  41. it.remove();
  42. }
  43. }
  44. // Store the address under brokerId, overwriting any previous one; if there was no previous address, registerFirst becomes true
  45. String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
  46. registerFirst = registerFirst || (null == oldAddr);
  47. // The broker is a master
  48. if (null != topicConfigWrapper
  49. && MixAll.MASTER_ID == brokerId) {
  50. // If the broker's topic config changed or this is a first registration, create or update the topic route
  51. // metadata in topicQueueTable. Per the article this is also how the route of MixAll.DEFAULT_TOPIC gets
  52. // registered: when a producer sends to a topic that has not been created yet and BrokerConfig's
  53. // autoCreateTopicEnable is true, the route info of MixAll.DEFAULT_TOPIC is returned
  54. if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
  55. || registerFirst) {
  56. ConcurrentMap<String, TopicConfig> tcTable =
  57. topicConfigWrapper.getTopicConfigTable();
  58. if (tcTable != null) {
  59. for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
  60. // Create or update the queue data of this topic for the broker
  61. this.createAndUpdateQueueData(brokerName, entry.getValue());
  62. }
  63. }
  64. }
  65. }
  66. // Heartbeat bookkeeping
  67. // Each register/heartbeat request (every 30s by default, per the article) stores a fresh BrokerLiveInfo that replaces the previous one;
  68. // the current time passed to it (lastUpdateTimestamp, per the article) records when the latest heartbeat was received
  69. BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
  70. new BrokerLiveInfo(
  71. System.currentTimeMillis(),
  72. topicConfigWrapper.getDataVersion(), // NOTE(review): dereferenced unconditionally although line 48 null-checks topicConfigWrapper; a null wrapper would throw here and be logged by the outer catch
  73. channel,
  74. haServerAddr));
  75. if (null == prevBrokerLiveInfo) {
  76. log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
  77. }
  78. if (filterServerList != null) {
  79. if (filterServerList.isEmpty()) {
  80. this.filterServerTable.remove(brokerAddr);
  81. } else {
  82. this.filterServerTable.put(brokerAddr, filterServerList);
  83. }
  84. }
  85. // For a slave: if the master of this broker group is already registered, put the master address and its HA address into the RegisterBrokerResult
  86. if (MixAll.MASTER_ID != brokerId) {
  87. String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
  88. if (masterAddr != null) {
  89. BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
  90. if (brokerLiveInfo != null) {
  91. result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
  92. result.setMasterAddr(masterAddr);
  93. }
  94. }
  95. }
  96. } finally {
  97. this.lock.writeLock().unlock();
  98. }
  99. } catch (Exception e) {
  100. log.error("registerBroker Exception", e);
  101. }
  102. return result;
  103. }

具体的流程都写在注释里了

这里使用了读写锁来控制对路由表的并发读写,多个生产者可以同时来读取路由表,但同时只能一个broker来更改路由表。

broker处理响应

nameserver处理完注册请求后,会将结果响应回去,broker这边由NettyRemotingClient来处理响应,唤醒阻塞的请求线程,这时就会用到上面讲到的client来处理响应。

  1. public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  2. final RemotingCommand cmd = msg;
  3. if (cmd != null) {
  4. switch (cmd.getType()) {
  5. case REQUEST_COMMAND:
  6. // The command is a request sent by the peer: process it
  7. processRequestCommand(ctx, cmd);
  8. break;
  9. case RESPONSE_COMMAND:
  10. // The command is a response to a request sent earlier: process it
  11. processResponseCommand(ctx, cmd);
  12. break;
  13. default:
  14. break;
  15. }
  16. }
  17. }

此时参数中的RemotingCommand就不是请求数据了,而是响应数据。

  1. /** * Handle a response command: match it to the pending ResponseFuture by opaque and complete that future. */
  2. public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
  3. final int opaque = cmd.getOpaque();
  4. // Look up the pending request by id in the table of futures still waiting for a response
  5. final ResponseFuture responseFuture = responseTable.get(opaque);
  6. if (responseFuture != null) {
  7. // Attach the response data
  8. responseFuture.setResponseCommand(cmd);
  9. responseTable.remove(opaque);
  10. // Asynchronous invocations carry an InvokeCallback
  11. if (responseFuture.getInvokeCallback() != null) {
  12. executeInvokeCallback(responseFuture);
  13. } else {
  14. // Synchronous invocations end up here
  15. // putResponse wakes the waiting request thread (via countDownLatch.countDown(), per the article)
  16. responseFuture.putResponse(cmd);
  17. responseFuture.release();
  18. }
  19. } else {
  20. log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
  21. log.warn(cmd.toString());
  22. }
  23. }

重新贴一下发送同步请求的代码:

  1. public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
  2. final long timeoutMillis)
  3. throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
  4. // Sends the request and blocks until a response arrives or timeoutMillis elapses; opaque can be regarded as the request id
  5. final int opaque = request.getOpaque();
  6. try {
  7. final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  8. this.responseTable.put(opaque, responseFuture);
  9. final SocketAddress addr = channel.remoteAddress();
  10. // Send the request
  11. channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
  12. // Callback invoked when the write completes, whether it succeeded or failed
  13. @Override
  14. public void operationComplete(ChannelFuture f) throws Exception {
  15. if (f.isSuccess()) {
  16. responseFuture.setSendRequestOK(true);
  17. return;
  18. } else {
  19. responseFuture.setSendRequestOK(false);
  20. }
  21. // Sending failed: drop the future from the table and complete it with a null response
  22. responseTable.remove(opaque);
  23. responseFuture.setCause(f.cause());
  24. responseFuture.putResponse(null);
  25. log.warn("send a request command to channel <" + addr + "> failed.");
  26. }
  27. });
  28. // Block waiting for the response (countDownLatch.await inside waitResponse, per the article); the response thread wakes this thread up
  29. RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  30. if (null == responseCommand) {
  31. // No response although the request was sent successfully: the response timed out
  32. if (responseFuture.isSendRequestOK()) {
  33. throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
  34. responseFuture.getCause());
  35. } else {
  36. // The request could not be sent
  37. throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
  38. }
  39. }
  40. return responseCommand;
  41. } finally {
  42. this.responseTable.remove(opaque);
  43. }
  44. }

此时请求线程就会在29行代码处(responseFuture.waitResponse)被唤醒,去处理响应结果。
在这里插入图片描述

参考资料

《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,72人围观)

还没有评论,来说两句吧...

相关阅读