RocketMQ原理学习---生产者事物消息发送

Myth丶恋晨 2022-04-11 02:45 386阅读 0赞
  1. 上一篇博客[《RocketMQ原理学习---生产者普通消息发送》][RocketMQ_---]我们已经对生产者发送普通消息有了简单的了解,这篇博客我们来学习一下RocketMQ在发送事物消息时做了什么处理操作。

一、生产者发送消息

  1. RocketMQ通过实现2PC协议来实现分布式事物,RocketMQ事物消息发送与消费流程图:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxOTI0ODYyMDc3_size_16_color_FFFFFF_t_70

接下来我们通过源码看看RocketMQ生产者在发送事物消息的时候做了什么操作。

调用事物消息接口发送事物消息。

  1. public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
  2. if (null == this.transactionCheckListener) {
  3. throw new MQClientException("localTransactionBranchCheckListener is null", (Throwable)null);
  4. } else {
  5. //发送事物消息
  6. return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
  7. }
  8. }

在sendMessageInTransaction可以看到会进行如下操作:

(1)调用 sendResult = this.send(msg); 发送消息获取结果

(2)执行本地事物localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

(3)将执行本地事物的结果再调用this.endTransaction(sendResult, localTransactionState, localException);

  1. public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
  2. if (null == tranExecuter) {
  3. throw new MQClientException("tranExecutor is null", (Throwable)null);
  4. } else {
  5. Validators.checkMessage(msg, this.defaultMQProducer);
  6. SendResult sendResult = null;
  7. MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
  8. MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
  9. try {
  10. //发送消息
  11. sendResult = this.send(msg);
  12. } catch (Exception var10) {
  13. throw new MQClientException("send message Exception", var10);
  14. }
  15. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  16. Throwable localException = null;
  17. switch(sendResult.getSendStatus()) {
  18. case SEND_OK:
  19. try {
  20. if (sendResult.getTransactionId() != null) {
  21. msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
  22. }
  23. //执行本地事物
  24. localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
  25. if (null == localTransactionState) {
  26. localTransactionState = LocalTransactionState.UNKNOW;
  27. }
  28. if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
  29. this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
  30. this.log.info(msg.toString());
  31. }
  32. } catch (Throwable var9) {
  33. this.log.info("executeLocalTransactionBranch exception", var9);
  34. this.log.info(msg.toString());
  35. localException = var9;
  36. }
  37. break;
  38. case FLUSH_DISK_TIMEOUT:
  39. case FLUSH_SLAVE_TIMEOUT:
  40. case SLAVE_NOT_AVAILABLE:
  41. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  42. }
  43. try {
  44. //提交执行结果
  45. this.endTransaction(sendResult, localTransactionState, localException);
  46. } catch (Exception var8) {
  47. this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
  48. }
  49. TransactionSendResult transactionSendResult = new TransactionSendResult();
  50. transactionSendResult.setSendStatus(sendResult.getSendStatus());
  51. transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
  52. transactionSendResult.setMsgId(sendResult.getMsgId());
  53. transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
  54. transactionSendResult.setTransactionId(sendResult.getTransactionId());
  55. transactionSendResult.setLocalTransactionState(localTransactionState);
  56. return transactionSendResult;
  57. }
  58. }

在endTransaction方法中会根据本地事物执行结果的状态,将状态信息发送到broker

  1. public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
  2. MessageId id;
  3. if (sendResult.getOffsetMsgId() != null) {
  4. id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
  5. } else {
  6. id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
  7. }
  8. String transactionId = sendResult.getTransactionId();
  9. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
  10. EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
  11. requestHeader.setTransactionId(transactionId);
  12. requestHeader.setCommitLogOffset(id.getOffset());
  13. switch(localTransactionState) {
  14. case COMMIT_MESSAGE:
  15. requestHeader.setCommitOrRollback(8);
  16. break;
  17. case ROLLBACK_MESSAGE:
  18. requestHeader.setCommitOrRollback(12);
  19. break;
  20. case UNKNOW:
  21. requestHeader.setCommitOrRollback(0);
  22. }
  23. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  24. requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
  25. requestHeader.setMsgId(sendResult.getMsgId());
  26. String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
  27. this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
  28. }

二、Broker处理事物消息

一条完整消息的要经过如下处理:

(1)将消息持久化到commitlog文件中

(2)创建consumequeue文件

(3)创建index文件

1、普通消息在消息发送成功后会进行上面三步创建操作

2、事物消息prepare阶段只会进行第一步commitlog文件创建(此时消费者是无法消费消息的),当commit阶段时会进行第二部操作创建consumequeue文件,这样消费者才可以消费消息。

  1. class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
  2. @Override
  3. public void dispatch(DispatchRequest request) {
  4. final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
  5. switch (tranType) {
  6. //普通消息
  7. case MessageSysFlag.TRANSACTION_NOT_TYPE:
  8. //确认事物消息
  9. case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
  10. DefaultMessageStore.this.putMessagePositionInfo(request);
  11. break;
  12. //准备事物消息
  13. case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
  14. //回滚事物消息
  15. case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
  16. break;
  17. }
  18. }
  19. }

发表评论

表情:
评论列表 (有 0 条评论,386人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ原理学习--消息类型

    一、消费模式 集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。 广播消费:当使用广播消费模式时,MQ 会将每条消息推送给

    相关 kafka生产者发送消息

    本文简单介绍kafka发送消息一些基础,先上代码,复制粘贴然后根据自己情况改一下ip地址,可直接发消息!!!贼强!!! package producer;