RocketMQ源码解析十三(事务消息流程)

港控/mmm° 2022-09-12 07:47 96阅读 0赞

RocketMQ版本4.6.0,记录自己看源码的过程

代码示例

  1. @Configuration
  2. public class UserProducerConfiguration {
  3. @Autowired
  4. private UserService userService;
  5. @Bean
  6. public TransactionMQProducer producer() throws MQClientException {
  7. TransactionMQProducer producer = new TransactionMQProducer("user_producer_group");
  8. producer.setNamesrvAddr("localhost:9876");
  9. producer.setTransactionListener(new TransactionListener() {
  10. @Override
  11. public LocalTransactionState executeLocalTransaction(Message message, Object o) {
  12. // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback
  13. ObjectMapper objectMapper = new ObjectMapper();
  14. LocalTransactionState state;
  15. try {
  16. User user = objectMapper.readValue(message.getBody(), User.class);
  17. userService.insert(user);
  18. state = LocalTransactionState.COMMIT_MESSAGE;
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. state = LocalTransactionState.ROLLBACK_MESSAGE;
  22. }
  23. return state;
  24. }
  25. @Override
  26. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
  27. ObjectMapper objectMapper = new ObjectMapper();
  28. LocalTransactionState state = LocalTransactionState.UNKNOW;
  29. User user;
  30. try {
  31. user = objectMapper.readValue(messageExt.getBody(), User.class);
  32. } catch (IOException e) {
  33. e.printStackTrace();
  34. return state;
  35. }
  36. try {
  37. // 查询数据库是否有这条记录
  38. user = userService.select(user.getId());
  39. if (user != null) {
  40. state = LocalTransactionState.COMMIT_MESSAGE;
  41. } else {
  42. state = LocalTransactionState.ROLLBACK_MESSAGE;
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. return state;
  48. }
  49. });
  50. producer.start();
  51. return producer;
  52. }
  53. }
  54. @Service
  55. public class UserServiceImpl implements UserService {
  56. @Autowired
  57. private UserMapper userMapper;
  58. @Autowired
  59. private TransactionMQProducer producer;
  60. @Override
  61. public void insert() throws Exception {
  62. User user = User.builder().id(UUID.randomUUID().toString()).name("test").build();
  63. Message msg = new Message("userTopic", "tagA", user.getId(),
  64. new ObjectMapper().writeValueAsString(user).getBytes(RemotingHelper.DEFAULT_CHARSET));
  65. // 发送事务消息
  66. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  67. }
  68. @Override
  69. public boolean insert(User user) {
  70. return userMapper.insert(user);
  71. }
  72. @Override
  73. public User select(String id) {
  74. return userMapper.select(id);
  75. }
  76. public boolean update(User user) {
  77. return userMapper.update(user);
  78. }
  79. }

事务消息设计思想

在执行本地数据落库之前,会由生产者先发送一条半消息(prepare,这条消息对消费者是不可见的),发送成功表示MQ是可用的,然后会回调生产者的事件监听程序,执行本地事务使数据落库,之后根据本地事务执行情况向MQ发送commit或rollback,提交后,消费者就可以消费到消息了。
如果commit或rollback请求发送失败(或者本地事务返回了UNKNOW),Broker端的定时回查任务(默认每隔60s执行一次)会回查那些仍处于prepare状态的消息,检查本地事务是否成功,并根据回查结果再次提交或回滚,保证数据的一致性。
在这里插入图片描述

事务消息发送流程

DefaultMQProducerImpl

  1. /**
   * Sends a transactional message: stores the half message, runs the local
   * transaction, then asks the broker to end the transaction accordingly.
   *
   * Steps visible in this method:
   *  - fail fast when neither a LocalTransactionExecuter nor a TransactionListener is available;
   *  - clear any delay level, validate the message, mark it as a prepared transactional
   *    message and attach the producer group;
   *  - send it via this.send(msg);
   *  - on SEND_OK run the local transaction (executer first, otherwise listener); a null
   *    result counts as UNKNOW and a thrown Throwable is captured rather than rethrown;
   *  - on FLUSH_DISK_TIMEOUT / FLUSH_SLAVE_TIMEOUT / SLAVE_NOT_AVAILABLE choose ROLLBACK_MESSAGE
   *    without running the local transaction;
   *  - call endTransaction(...); a failure there is only logged.
   *
   * @param msg the message to send
   * @param localTransactionExecuter executer for the local transaction; may be null when a
   *        TransactionListener is configured
   * @param arg opaque argument handed to the executer/listener
   * @return the send result enriched with the local transaction state
   * @throws MQClientException if no executer/listener exists or sending the half message fails
   */
  2. public TransactionSendResult sendMessageInTransaction(final Message msg,
  3. final LocalTransactionExecuter localTransactionExecuter, final Object arg)
  4. throws MQClientException {
  5. TransactionListener transactionListener = getCheckListener();
  6. if (null == localTransactionExecuter && null == transactionListener) {
  7. throw new MQClientException("tranExecutor is null", null);
  8. }
  9. // Ignore any delay level: the delay property is removed from the message.
  10. if (msg.getDelayTimeLevel() != 0) {
  11. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
  12. }
  13. Validators.checkMessage(msg, this.defaultMQProducer);
  14. SendResult sendResult = null;
  15. // Mark the message as a prepared transactional message and record the producer group.
  16. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
  17. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
  18. try {
  19. // Send through the regular send(msg) call (per the original note: synchronous, same as an ordinary message).
  20. sendResult = this.send(msg);
  21. } catch (Exception e) {
  22. throw new MQClientException("send message Exception", e);
  23. }
  24. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  25. Throwable localException = null;
  26. // Act on the send status.
  27. switch (sendResult.getSendStatus()) {
  28. // Half message sent successfully.
  29. case SEND_OK: {
  30. try {
  31. if (sendResult.getTransactionId() != null) {
  32. msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
  33. }
  34. String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
  35. if (null != transactionId && !"".equals(transactionId)) {
  36. msg.setTransactionId(transactionId);
  37. }
  38. if (null != localTransactionExecuter) {
  39. localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
  40. } else if (transactionListener != null) {
  41. log.debug("Used new transaction API");
  42. // Run the local transaction through the listener.
  43. localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  44. }
  45. if (null == localTransactionState) {
  46. localTransactionState = LocalTransactionState.UNKNOW;
  47. }
  48. if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
  49. log.info("executeLocalTransactionBranch return {}", localTransactionState);
  50. log.info(msg.toString());
  51. }
  52. } catch (Throwable e) {
  53. log.info("executeLocalTransactionBranch exception", e);
  54. log.info(msg.toString());
  55. localException = e;
  56. }
  57. }
  58. break;
  59. // Flush-to-disk timeout, slave-sync timeout or unavailable slave: roll back; the local transaction is not executed.
  60. case FLUSH_DISK_TIMEOUT:
  61. case FLUSH_SLAVE_TIMEOUT:
  62. case SLAVE_NOT_AVAILABLE:
  63. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  64. break;
  65. default:
  66. break;
  67. }
  68. try {
  69. // End the transaction: the local transaction state decides between commit and rollback.
  70. this.endTransaction(sendResult, localTransactionState, localException);
  71. } catch (Exception e) {
  72. log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
  73. }
  74. TransactionSendResult transactionSendResult = new TransactionSendResult();
  75. transactionSendResult.setSendStatus(sendResult.getSendStatus());
  76. transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
  77. transactionSendResult.setMsgId(sendResult.getMsgId());
  78. transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
  79. transactionSendResult.setTransactionId(sendResult.getTransactionId());
  80. transactionSendResult.setLocalTransactionState(localTransactionState);
  81. return transactionSendResult;
  82. }

可以看到发送的逻辑也是用发送普通消息的逻辑,只是在发送之前会先清除延时以及设置事务消息属性。
然后根据发送的结果进行分别处理:
1、如果发送成功,则执行监听器中的本地事务方法executeLocalTransaction,返回本地事务状态。

按我的理解,如果executeLocalTransaction方法(或者说执行发送事务消息的方法)与本地事务(service中的方法)处于同一个事务内,则不管本地事务成功与否,都应该返回UNKNOW,交由broker回查来决定提交或回滚。因为在同一个事务内,如果本地事务执行成功但还未提交,而向broker发送完提交确认后该机器宕机了,此时本地事务会回滚,但发出去的确认消息无法收回,消费者仍会收到消息,导致数据不一致。所以应返回UNKNOW,由broker来回查:本地事务已成功提交,则确认消息;否则回滚消息。
如果没有处于同一个事务内,则直接根据本地事务的执行情况返回COMMIT_MESSAGE或ROLLBACK_MESSAGE

2、如果发送失败(发送状态为刷盘超时、同步slave超时或slave不可用),则不执行本地事务,直接将本地事务状态设置为ROLLBACK_MESSAGE。
最后结束事务,根据上面的本地事务决定消息事务是提交还是回滚,单向请求。

接着看下broker端如何处理prepare状态的半消息。

Broker处理事务消息也是用SendMessageProcessor处理器处理。

  1. /**
   * Handles a send-message request on the broker (excerpt: part of the method is omitted).
   *
   * In the visible part, a message flagged as transactional is rejected with NO_PERMISSION
   * when the broker is configured to reject transactional messages; otherwise it goes to the
   * transactional message service via prepareMessage. Any other message goes to the message
   * store via putMessage. The put result is then passed to handlePutMessageResult.
   *
   * @param ctx channel context of the request
   * @param request the remoting request
   * @param sendMessageContext send context passed on to the result handler
   * @param requestHeader decoded request header
   * @return the response command
   * @throws RemotingCommandException declared by the method; not thrown by the visible code itself
   */
  2. private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
  3. final RemotingCommand request,
  4. final SendMessageContext sendMessageContext,
  5. final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
  6. // ... omitted in this excerpt; the variables used below (response, responseHeader, msgInner, traFlag, queueIdInt, putMessageResult) are declared there.
  7. // Is this a transactional message, and is the broker allowed to accept it?
  8. if (traFlag != null && Boolean.parseBoolean(traFlag)) {
  9. // The broker is configured to reject transactional messages.
  10. if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  11. response.setCode(ResponseCode.NO_PERMISSION);
  12. response.setRemark(
  13. "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  14. + "] sending transaction message is forbidden");
  15. return response;
  16. }
  17. // Hand the transactional message to the transactional message service.
  18. putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
  19. } else {
  20. // Ordinary message: hand it to the MessageStore. Per the original author's note (not visible here),
  21. // only the CommitLog is written at this point; ConsumeQueue and IndexFile are built asynchronously by background threads.
  22. putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  23. }
  24. // Turn the store result into the response.
  25. return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
  26. }

broker会判断是否是事务消息,事务消息则由事务消息服务处理。
TransactionalMessageServiceImpl

  1. @Override
  2. public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
  3. return transactionalMessageBridge.putHalfMessage(messageInner);
  4. }

TransactionalMessageBridge

  1. public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
  2. // 将消息存入事务消息专属的topic中,存储操作跟普通消息一样
  3. return store.putMessage(parseHalfMessageInner(messageInner));
  4. }
  5. /** * 转换事务消息,将原topic和queueId放到属性中,替换为topic为RMQ_SYS_TRANS_HALF_TOPIC, * queueId为0 */
  6. private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
  7. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
  8. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  9. String.valueOf(msgInner.getQueueId()));
  10. msgInner.setSysFlag(
  11. MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
  12. msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
  13. msgInner.setQueueId(0);
  14. msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  15. return msgInner;
  16. }

这里是事务消息与非事务消息发送流程的主要区别:如果是事务消息,则备份消息的原主题与原消息消费队列,然后将主题变更为RMQ_SYS_TRANS_HALF_TOPIC,消费队列变更为0,然后消息按照普通消息存储在commitlog文件,进而转发到RMQ_SYS_TRANS_HALF_TOPIC主题对应的消息消费队列。也就是说,事务消息在未提交之前并不会存入消息原有主题,自然也不会被消费者消费。——摘自《RocketMQ技术内幕》

提交或回滚消息

上面说到,半消息发送完成后,会根据发送状态执行本地事务,再根据本地事务状态结束事务(提交或回滚消息)。
DefaultMQProducerImpl

  1. public void endTransaction(
  2. final SendResult sendResult,
  3. final LocalTransactionState localTransactionState,
  4. final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
  5. final MessageId id;
  6. if (sendResult.getOffsetMsgId() != null) {
  7. id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
  8. } else {
  9. id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
  10. }
  11. String transactionId = sendResult.getTransactionId();
  12. final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
  13. EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
  14. requestHeader.setTransactionId(transactionId);
  15. requestHeader.setCommitLogOffset(id.getOffset());
  16. // 根据本地状态设置提交或回滚
  17. switch (localTransactionState) {
  18. case COMMIT_MESSAGE:
  19. requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
  20. break;
  21. case ROLLBACK_MESSAGE:
  22. requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
  23. break;
  24. case UNKNOW:
  25. requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
  26. break;
  27. default:
  28. break;
  29. }
  30. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  31. requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
  32. requestHeader.setMsgId(sendResult.getMsgId());
  33. String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
  34. // 提交或回滚或不处理事务消息
  35. this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
  36. this.defaultMQProducer.getSendMsgTimeout());
  37. }

Broker端处理结束事务的处理器是EndTransactionProcessor

  /**
   * Broker-side handling of an end-transaction request (excerpt: part of the method is omitted).
   *
   * A broker in SLAVE role answers SLAVE_NOT_AVAILABLE. For a commit request the prepared
   * message is fetched and checked, a final message is built from it and sent, and the
   * prepared message is marked as processed only if that send succeeded. For a rollback
   * request the prepared message is fetched, checked and marked as processed; nothing is
   * sent. In every other case the response carries the code and remark of the
   * OperationResult, which is a fresh default instance when the request is neither commit
   * nor rollback.
   *
   * @param ctx channel context of the request
   * @param request the remoting request carrying an EndTransactionRequestHeader
   * @return the response command
   * @throws RemotingCommandException if the request header cannot be decoded
   */
  1. @Override
  2. public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
  3. RemotingCommandException {
  4. final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  5. final EndTransactionRequestHeader requestHeader =
  6. (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
  7. LOGGER.debug("Transaction request:{}", requestHeader);
  8. if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
  9. response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
  10. LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
  11. return response;
  12. }
  13. // (less important code omitted in this excerpt)
  14. OperationResult result = new OperationResult();
  15. // Commit request.
  16. if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
  17. // Fetch the prepared message for this request (per the original note: read from the commit log by the physical offset in requestHeader).
  18. result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
  19. if (result.getResponseCode() == ResponseCode.SUCCESS) {
  20. RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
  21. if (res.getCode() == ResponseCode.SUCCESS) {
  22. // Build the final message from the prepared one (per the original note: a copy whose topic is the original topic).
  23. MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
  24. msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
  25. msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
  26. msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
  27. msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
  28. // Deliver the final message (per the original note: into the queue of the original topic).
  29. RemotingCommand sendResult = sendFinalMessage(msgInner);
  30. if (sendResult.getCode() == ResponseCode.SUCCESS) {
  31. // Mark the prepared message as processed.
  32. this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
  33. }
  34. return sendResult;
  35. }
  36. return res;
  37. }
  38. } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
  39. // Rollback request: unlike commit, no final message is sent; the prepared message is only marked as processed.
  40. result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
  41. if (result.getResponseCode() == ResponseCode.SUCCESS) {
  42. RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
  43. if (res.getCode() == ResponseCode.SUCCESS) {
  44. // Mark the prepared message as processed.
  45. this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
  46. }
  47. return res;
  48. }
  49. }
  50. response.setCode(result.getResponseCode());
  51. response.setRemark(result.getResponseRemark());
  52. return response;
  53. }

删除PrepareMessage,表示消息处理过了

  1. /** * 删除PrepareMessage,其实是将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC, * 代表该消息已被处理 */
  2. @Override
  3. public boolean deletePrepareMessage(MessageExt msgExt) {
  4. if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
  5. log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
  6. return true;
  7. } else {
  8. log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
  9. return false;
  10. }
  11. }

回查事务状态

如果执行完本地事务后返回的本地事务状态为UNKNOW(此时结束事务时Broker不做任何处理),或者发送提交或回滚请求失败,就需要通过事务回查机制定时地向应用程序检查本地事务状态,来决定是提交还是回滚。
MQ通过TransactionalMessageCheckService线程服务定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。

  1. @Override
  2. public void run() {
  3. log.info("Start transaction check service thread!");
  4. // 检测间隔,默认60s
  5. long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
  6. while (!this.isStopped()) {
  7. // 最终调用onWaitEnd方法
  8. this.waitForRunning(checkInterval);
  9. }
  10. log.info("End transaction check service thread!");
  11. }
  12. @Override
  13. protected void onWaitEnd() {
  14. // 表示事务的过期时间,一个消息的存储时间 + 该值 大于系统当前时间,才对该消息执行事务状态回查
  15. long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
  16. // 最多检测几次,超过检测次数,则会回滚
  17. int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
  18. long begin = System.currentTimeMillis();
  19. log.info("Begin to check prepare message, begin time:{}", begin);
  20. this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
  21. log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
  22. }

TransactionalMessageServiceImpl

  /**
   * Scans the half-message topic and triggers a transaction check-back for half messages
   * that are still undecided.
   *
   * For every queue of RMQ_SYS_TRANS_HALF_TOPIC the method reads the consume offsets of the
   * half queue and of its op queue, collects already processed half offsets into removeMap
   * (via fillOpRemoveMap), and then walks the half queue from its consume offset:
   *  - offsets found in removeMap were already committed or rolled back and are skipped;
   *  - messages for which needDiscard/needSkip is true are passed to listener.resolveDiscardMsg;
   *  - a message stored after this round started, or one still inside its immunity time, ends
   *    the walk of that queue (in one immunity case the message is skipped instead);
   *  - otherwise, if isNeedCheck is true, the message is put back via putBackHalfMsgQueue and
   *    listener.resolveHalfMsg is called; if not, more op messages are pulled and the same
   *    offset is examined again.
   * The walk of a queue also stops once MAX_PROCESS_TIME_LIMIT is exceeded. Afterwards the
   * consume offsets of the half queue and the op queue are updated if they moved.
   *
   * @param transactionTimeout default immunity time: a message younger than this is not checked yet
   * @param transactionCheckMax passed to needDiscard to decide whether a message is discarded
   * @param listener receives the messages to check back and the messages to discard
   */
  1. @Override
  2. public void check(long transactionTimeout, int transactionCheckMax,
  3. AbstractTransactionalMessageCheckListener listener) {
  4. try {
  5. String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
  6. // Fetch all message queues of the half-message topic.
  7. Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
  8. if (msgQueues == null || msgQueues.size() == 0) {
  9. log.warn("The queue of topic is empty :" + topic);
  10. return;
  11. }
  12. log.debug("Check topic={}, queues={}", topic, msgQueues);
  13. // Process every half-message queue.
  14. for (MessageQueue messageQueue : msgQueues) {
  15. long startTime = System.currentTimeMillis();
  16. // Op queue belonging to this half queue. Per the original note: taken from a cache or created on demand,
  17. // with topic RMQ_SYS_TRANS_OP_HALF_TOPIC and the same queueId as messageQueue.
  18. MessageQueue opQueue = getOpQueue(messageQueue);
  19. // Current consume offsets of both queues.
  20. long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
  21. long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
  22. log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
  23. if (halfOffset < 0 || opOffset < 0) {
  24. log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
  25. halfOffset, opOffset);
  26. continue;
  27. }
  28. List<Long> doneOpOffset = new ArrayList<>();
  29. HashMap<Long, Long> removeMap = new HashMap<>();
  30. PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
  31. if (null == pullResult) {
  32. log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
  33. messageQueue, halfOffset, opOffset);
  34. continue;
  35. }
  36. // single thread
  37. int getMessageNullCount = 1;
  38. long newOffset = halfOffset;
  39. long i = halfOffset;
  40. while (true) {
  41. if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
  42. log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
  43. break;
  44. }
  45. if (removeMap.containsKey(i)) {
  46. log.info("Half offset {} has been committed/rolled back", i);
  47. Long removedOpOffset = removeMap.remove(i);
  48. doneOpOffset.add(removedOpOffset);
  49. } else {
  50. GetResult getResult = getHalfMsg(messageQueue, i);
  51. MessageExt msgExt = getResult.getMsg();
  52. if (msgExt == null) {
  53. if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
  54. break;
  55. }
  56. if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
  57. log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
  58. messageQueue, getMessageNullCount, getResult.getPullResult());
  59. break;
  60. } else {
  61. log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
  62. i, messageQueue, getMessageNullCount, getResult.getPullResult());
  63. i = getResult.getPullResult().getNextBeginOffset();
  64. newOffset = i;
  65. continue;
  66. }
  67. }
  68. if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
  69. listener.resolveDiscardMsg(msgExt);
  70. newOffset = i + 1;
  71. i++;
  72. continue;
  73. }
  74. if (msgExt.getStoreTimestamp() >= startTime) {
  75. log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
  76. new Date(msgExt.getStoreTimestamp()));
  77. break;
  78. }
  79. long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
  80. long checkImmunityTime = transactionTimeout;
  81. String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
  82. if (null != checkImmunityTimeStr) {
  83. checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
  84. if (valueOfCurrentMinusBorn < checkImmunityTime) {
  85. if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
  86. newOffset = i + 1;
  87. i++;
  88. continue;
  89. }
  90. }
  91. } else {
  92. if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
  93. log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
  94. checkImmunityTime, new Date(msgExt.getBornTimestamp()));
  95. break;
  96. }
  97. }
  98. List<MessageExt> opMsg = pullResult.getMsgFoundList();
  // A check-back is needed when (a) no op messages were pulled and the message is older than
  // its immunity time, or (b) the last pulled op message was born more than transactionTimeout
  // after this round started, or (c) the message's born timestamp lies in the future.
  99. boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
  100. || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
  101. || (valueOfCurrentMinusBorn <= -1);
  102. if (isNeedCheck) {
  103. if (!putBackHalfMsgQueue(msgExt, i)) {
  104. continue;
  105. }
  106. // Trigger the transaction check-back for this half message.
  107. listener.resolveHalfMsg(msgExt);
  108. } else {
  109. pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
  110. log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
  111. messageQueue, pullResult);
  112. continue;
  113. }
  114. }
  115. newOffset = i + 1;
  116. i++;
  117. }
  118. if (newOffset != halfOffset) {
  119. transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
  120. }
  121. long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
  122. if (newOpOffset != opOffset) {
  123. transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
  124. }
  125. }
  126. } catch (Exception e) {
  // NOTE(review): printStackTrace() duplicates the log.error call below; consider keeping only the logger.
  127. e.printStackTrace();
  128. log.error("Check error", e);
  129. }
  130. }

AbstractTransactionalMessageCheckListener

  1. public void resolveHalfMsg(final MessageExt msgExt) {
  2. executorService.execute(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. sendCheckMessage(msgExt);
  7. } catch (Exception e) {
  8. LOGGER.error("Send check message error!", e);
  9. }
  10. }
  11. });
  12. }
  13. public void sendCheckMessage(MessageExt msgExt) throws Exception {
  14. CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
  15. checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
  16. checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
  17. checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
  18. checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
  19. checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
  20. msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
  21. msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
  22. msgExt.setStoreSize(0);
  23. String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
  24. Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
  25. if (channel != null) {
  26. // 发送回查请求
  27. brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
  28. } else {
  29. LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
  30. }
  31. }

生产端使用ClientRemotingProcessor处理请求

  1. public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
  2. RemotingCommand request) throws RemotingCommandException {
  3. final CheckTransactionStateRequestHeader requestHeader =
  4. (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
  5. final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
  6. final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
  7. if (messageExt != null) {
  8. if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
  9. messageExt.setTopic(NamespaceUtil
  10. .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
  11. }
  12. String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
  13. if (null != transactionId && !"".equals(transactionId)) {
  14. messageExt.setTransactionId(transactionId);
  15. }
  16. final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
  17. if (group != null) {
  18. MQProducerInner producer = this.mqClientFactory.selectProducer(group);
  19. if (producer != null) {
  20. final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
  21. // 检查本地事务状态
  22. producer.checkTransactionState(addr, messageExt, requestHeader);
  23. } else {
  24. log.debug("checkTransactionState, pick producer by group[{}] failed", group);
  25. }
  26. } else {
  27. log.warn("checkTransactionState, pick producer group failed");
  28. }
  29. } else {
  30. log.warn("checkTransactionState, decode message failed");
  31. }
  32. return null;
  33. }

DefaultMQProducerImpl

  /**
   * Handles a broker check-back on the producer side. The work is wrapped in a Runnable
   * and submitted to checkExecutor, so this method itself returns right after submission.
   *
   * The task asks the legacy TransactionCheckListener if one is present, otherwise the
   * TransactionListener, for the local transaction state. An exception thrown by the
   * listener is captured and the state stays UNKNOW. The task then reports the state to the
   * broker at addr with endTransactionOneway. If neither listener exists, only a warning is
   * logged and nothing is sent.
   *
   * @param addr address of the broker that issued the check
   * @param msg the half message being checked
   * @param header the broker's check request header (offsets and transaction id are copied from it)
   */
  1. @Override
  2. public void checkTransactionState(final String addr, final MessageExt msg,
  3. final CheckTransactionStateRequestHeader header) {
  4. Runnable request = new Runnable() {
  5. private final String brokerAddr = addr;
  6. private final MessageExt message = msg;
  7. private final CheckTransactionStateRequestHeader checkRequestHeader = header;
  8. private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
  9. @Override
  10. public void run() {
  11. TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
  12. TransactionListener transactionListener = getCheckListener();
  13. if (transactionCheckListener != null || transactionListener != null) {
  14. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  15. Throwable exception = null;
  16. try {
  17. if (transactionCheckListener != null) {
  18. localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
  19. } else if (transactionListener != null) {
  20. log.debug("Used new check API in transaction message");
  21. // Ask the listener for the local transaction state.
  22. localTransactionState = transactionListener.checkLocalTransaction(message);
  23. } else {
  24. log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
  25. }
  26. } catch (Throwable e) {
  27. log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
  28. exception = e;
  29. }
  30. this.processTransactionState(
  31. localTransactionState,
  32. group,
  33. exception);
  34. } else {
  35. log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
  36. }
  37. }
  // Builds the end-transaction header from the check request and the checked state, then sends it to the broker.
  38. private void processTransactionState(
  39. final LocalTransactionState localTransactionState,
  40. final String producerGroup,
  41. final Throwable exception) {
  42. final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
  43. thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
  44. thisHeader.setProducerGroup(producerGroup);
  45. thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
  46. thisHeader.setFromTransactionCheck(true);
  47. String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
  48. if (uniqueKey == null) {
  49. uniqueKey = message.getMsgId();
  50. }
  51. thisHeader.setMsgId(uniqueKey);
  52. thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
  53. switch (localTransactionState) {
  54. case COMMIT_MESSAGE:
  55. thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
  56. break;
  57. case ROLLBACK_MESSAGE:
  58. thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
  59. log.warn("when broker check, client rollback this transaction, {}", thisHeader);
  60. break;
  61. case UNKNOW:
  62. thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
  63. log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
  64. break;
  65. default:
  66. break;
  67. }
  68. String remark = null;
  69. if (exception != null) {
  70. remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
  71. }
  72. try {
  73. // Report the checked state to the broker so it can commit or roll back the message; 3000 is passed as the timeout argument.
  74. DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
  75. 3000);
  76. } catch (Exception e) {
  77. log.error("endTransactionOneway exception", e);
  78. }
  79. }
  80. };
  81. // Submit the check-back task to the check executor.
  82. this.checkExecutor.submit(request);
  83. }

通过调用自定义的监听器回查方法,检测本地事务状态,再根据回查本地事务的状态提交或回滚消息,单向请求。
在这里插入图片描述

参考资料

《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

发表评论

表情:
评论列表 (有 0 条评论,96人围观)

还没有评论,来说两句吧...

相关阅读