深入理解RocketMQ事务源码--TransactionMQProducer、TransactionListener

朴灿烈づ我的快乐病毒、 2022-12-28 11:09 164阅读 0赞

前言

上篇文章,介绍了RocketMQ消息类型支持事务消息,常见用在分布式系统中,保证数据的一致性。接下来一起去走进 TransactionMQProducer、TransactionListener俩个核心类

一.基于apache-rocketmq事务消息的实现

(1) pom.xml配置

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-web</artifactId>
  8. </dependency>
  9. <!--lombok插件 -->
  10. <dependency>
  11. <groupId>org.projectlombok</groupId>
  12. <artifactId>lombok</artifactId>
  13. <scope>provided</scope>
  14. </dependency>
  15. <!--RocketMQ-->
  16. <dependency>
  17. <groupId>org.apache.rocketmq</groupId>
  18. <artifactId>rocketmq-client</artifactId>
  19. <version>4.3.0</version>
  20. </dependency>

(2)事务消息生产者配置

  1. import org.apache.rocketmq.client.exception.MQClientException;
  2. import org.apache.rocketmq.client.producer.TransactionMQProducer;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class PayProducer {
  6. /**
  7. * 生产组,消费者通过订阅发布,在同一个组内
  8. */
  9. private String producerGroup = "pay_group";
  10. /**
  11. * 端口
  12. */
  13. private String nameServer = "127.0.0.1:9876";
  14. //事务监听,其中rocketMQLisenter实现TransactionListener接口
  15. private RocketMQLisenter rocketMQLisenter;
  16. //事务消息生产者配置
  17. private TransactionMQProducer producer;
  18. public PayProducer() {
  19. producer = new TransactionMQProducer(producerGroup);
  20. rocketMQLisenter = new RocketMQLisenter();
  21. // 指定nameServer地址,多个地址之间以 ; 隔开
  22. producer.setNamesrvAddr(nameServer);
  23. producer.setTransactionListener(rocketMQLisenter);
  24. start();
  25. }
  26. public TransactionMQProducer getProducer() {
  27. return producer;
  28. }
  29. /**
  30. * 对象在使用之前必须调用一次,并且只能初始化一次
  31. */
  32. public void start() {
  33. try {
  34. this.producer.start();
  35. } catch (MQClientException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. /**
  40. * 一般在应用上下文,使用上下文监听器,进行关闭
  41. */
  42. public void shutdown() {
  43. producer.shutdown();
  44. }
  45. }

(3)事务监听器接口实现

  1. import com.alibaba.fastjson.JSONObject;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.producer.LocalTransactionState;
  4. import org.apache.rocketmq.client.producer.TransactionListener;
  5. import org.apache.rocketmq.common.message.Message;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. @Slf4j
  8. public class RocketMQLisenter implements TransactionListener {
  9. @Override
  10. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  11. log.info("=========本地事务开始执行=============");
  12. String message = new String(msg.getBody());
  13. int type = 0;
  14. // 事务消息id,唯一
  15. String tid = msg.getTransactionId();
  16. //todo 为了解决重复消息《==》幂等性,可以将tid放入redis,并设置状态,消费端,每次从redis中获取事务id,根据状态并判断是否被消费过
  17. //模拟执行本地事务begin=======
  18. /**
  19. * 本地事务执行会有三种可能
  20. * 1、commit 成功
  21. * 2、Rollback 失败
  22. * 3、网络等原因服务宕机收不到返回结果
  23. */
  24. log.info("本地事务执行参数----------------------");
  25. //模拟执行本地事务end========
  26. //TODO 实际开发下面不需要我们手动返回,而是根据本地事务执行结果自动返回
  27. //1、二次确认消息,然后消费者可以消费
  28. if (type == 0) {
  29. return LocalTransactionState.COMMIT_MESSAGE;
  30. }
  31. //2、回滚消息,Broker端会删除半消息
  32. if (type == 1) {
  33. return LocalTransactionState.ROLLBACK_MESSAGE;
  34. }
  35. //3、Broker端会进行回查消息
  36. if (type == 2) {
  37. return LocalTransactionState.UNKNOW;
  38. }
  39. return LocalTransactionState.COMMIT_MESSAGE;
  40. }
  41. /**
  42. * 当executeLocalTransaction ,返回LocalTransactionState.UNKNOW 调用此方法
  43. * @param messageExt
  44. * @return
  45. */
  46. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
  47. log.info("==========回查接口=========");
  48. String key = messageExt.getKeys();
  49. //TODO 1、必须根据key先去检查本地事务消息是否完成。
  50. /**
  51. * 因为有种情况就是:上面本地事务执行成功了,但是return LocalTransactionState.COMMIT_MESSAG的时候
  52. * 服务挂了,那么最终 Brock还未收到消息的二次确定,还是个半消息 ,所以当重新启动的时候还是回调这个回调接口。
  53. * 如果不先查询上面本地事务的执行情况 直接在执行本地事务,那么就相当于成功执行了两次本地事务了。
  54. */
  55. // TODO 2、这里返回要么commit 要么rollback。没有必要在返回 UNKNOW
  56. return LocalTransactionState.COMMIT_MESSAGE;
  57. }
  58. }

(4)消费者配置

  1. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  2. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  3. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  4. import org.apache.rocketmq.client.exception.MQClientException;
  5. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.springframework.stereotype.Component;
  8. import java.io.UnsupportedEncodingException;
  9. @Component
  10. public class PayConsumer {
  11. private DefaultMQPushConsumer consumer;
  12. private String consumerGroup = "pay_group";
  13. public PayConsumer() throws MQClientException {
  14. consumer = new DefaultMQPushConsumer(consumerGroup);
  15. consumer.setNamesrvAddr(Topic.NAME_SERVER);
  16. // 设置消费地点,从最后一个进行消费(其实就是消费策略)
  17. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  18. // 订阅主题的哪些Topic,及标签
  19. consumer.subscribe("order_topic", "*");
  20. // 注册监听器
  21. consumer.registerMessageListener((MessageListenerConcurrently)
  22. (msgs, context) -> {
  23. try {
  24. // 获取Message
  25. Message msg = msgs.get(0);
  26. //获取事务id,并从redis获取事务的状态,判断是否消费过。
  27. String tid = msg.getTransactionId();
  28. System.out.printf("%s Receive New Messages: %s %n",
  29. Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
  30. String topic = msg.getTopic();
  31. String body = new String(msg.getBody(), "utf-8");
  32. // 标签
  33. String tags = msg.getTags();
  34. String keys = msg.getKeys();
  35. System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
  36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  37. } catch (UnsupportedEncodingException e) {
  38. e.printStackTrace();
  39. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  40. }
  41. });
  42. consumer.start();
  43. System.out.println("Consumer Listener");
  44. }
  45. }

(5)通过生产者发送事务消息

  1. import org.apache.rocketmq.client.exception.MQBrokerException;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.exception.RemotingException;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.ResponseBody;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @RestController
  11. public class PayController {
  12. @Autowired
  13. private PayProducer payProducer;
  14. private static final String topic = "order_topic";
  15. @GetMapping("/pay")
  16. @ResponseBody
  17. public void callback(int type) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  18. // 创建消息 主题 二级分类 消息内容好的字节数组
  19. Message message = new Message(topic, "taga", ("rocketMQ").getBytes());
  20. //调用TransactionMQProducer的sendMessageInTransaction事务方法,并获取发送结果
  21. SendResult send = payProducer.getProducer().sendMessageInTransaction(message,type);
  22. }
  23. }

二.TransactionMQProducer、TransactionListener源码阅读

1.TransactionMQProducer,可以看出继承DefaultMQProducer类,核心方法是sendMessageInTransaction(Message msg,Object org)

  1. public class TransactionMQProducer extends DefaultMQProducer {
  2. private TransactionListener transactionListener;
  3. private ExecutorService executorService;
  4. public TransactionMQProducer() {
  5. }
  6. public TransactionMQProducer(String producerGroup) {
  7. super(producerGroup);
  8. }
  9. public TransactionMQProducer(String producerGroup, RPCHook rpcHook) {
  10. super(producerGroup, rpcHook);
  11. }
  12. public void start() throws MQClientException {
  13. this.defaultMQProducerImpl.initTransactionEnv();
  14. super.start();
  15. }
  16. public void shutdown() {
  17. super.shutdown();
  18. this.defaultMQProducerImpl.destroyTransactionEnv();
  19. }
  20. public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
  21. if (null == this.transactionListener) {
  22. throw new MQClientException("TransactionListener is null", (Throwable)null);
  23. } else {
  24. return this.defaultMQProducerImpl.sendMessageInTransaction(msg, this.transactionListener, arg);
  25. }
  26. }
  27. public TransactionListener getTransactionListener() {
  28. return this.transactionListener;
  29. }
  30. public void setTransactionListener(TransactionListener transactionListener) {
  31. this.transactionListener = transactionListener;
  32. }
  33. public ExecutorService getExecutorService() {
  34. return this.executorService;
  35. }
  36. public void setExecutorService(ExecutorService executorService) {
  37. this.executorService = executorService;
  38. }
  39. }

2.DefaultMQProducer.sendMessageInTransaction方法具体实现,

(1)可以看出先获取消息发送到broker的状态(可以理解成半状态,等待本地事务执行结果,执行commit或在rollback),没有成功则返回事务状态设置为LocalTransactionState.ROLLBACK_MESSAGE;发送成功,则获取消息事务的ID(每个消息都会由一个transactionId)

(2)接着调用TransactionListener.executeLocalTransaction方法(上面已经沾贴了事务监听器的实现代码),执行本地事务。

  1. /**
  2. * 本地事务执行会有三种可能
  3. * 1、commit 成功 ,对应 return LocalTransactionState.COMMIT_MESSAGE;
  4. * 2、Rollback 失败,对应 return LocalTransactionState.ROLLBACK_MESSAGE;
  5. * 3、网络等原因服务宕机收不到返回结果,对应return LocalTransactionState.UNKNOW;
  6. */

(3)调用this.endTransaction(sendResult, localTransactionState, localException)方法,将本地事务执行的状态、消息发送的状态、异常信息,一起发送给broker,broker会根据提交的信息,确定是否确定投递消息、删除消息、或在回查本地事务。

更多详细信息,请查看事务消息

  1. public TransactionSendResult sendMessageInTransaction(Message msg, TransactionListener tranExecuter, Object arg) throws MQClientException {
  2. if (null == tranExecuter) {
  3. throw new MQClientException("tranExecutor is null", (Throwable)null);
  4. } else {
  5. Validators.checkMessage(msg, this.defaultMQProducer);
  6. SendResult sendResult = null;
  7. MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
  8. MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());
  9. try {
  10. sendResult = this.send(msg);
  11. } catch (Exception var10) {
  12. throw new MQClientException("send message Exception", var10);
  13. }
  14. LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
  15. Throwable localException = null;
  16. switch(sendResult.getSendStatus()) {
  17. case SEND_OK:
  18. try {
  19. if (sendResult.getTransactionId() != null) {
  20. msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
  21. }
  22. String transactionId = msg.getProperty("UNIQ_KEY");
  23. if (null != transactionId && !"".equals(transactionId)) {
  24. msg.setTransactionId(transactionId);
  25. }
  26. localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
  27. if (null == localTransactionState) {
  28. localTransactionState = LocalTransactionState.UNKNOW;
  29. }
  30. if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
  31. this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
  32. this.log.info(msg.toString());
  33. }
  34. } catch (Throwable var9) {
  35. this.log.info("executeLocalTransactionBranch exception", var9);
  36. this.log.info(msg.toString());
  37. localException = var9;
  38. }
  39. break;
  40. case FLUSH_DISK_TIMEOUT:
  41. case FLUSH_SLAVE_TIMEOUT:
  42. case SLAVE_NOT_AVAILABLE:
  43. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  44. }
  45. try {
  46. this.endTransaction(sendResult, localTransactionState, localException);
  47. } catch (Exception var8) {
  48. this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
  49. }
  50. TransactionSendResult transactionSendResult = new TransactionSendResult();
  51. transactionSendResult.setSendStatus(sendResult.getSendStatus());
  52. transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
  53. transactionSendResult.setMsgId(sendResult.getMsgId());
  54. transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
  55. transactionSendResult.setTransactionId(sendResult.getTransactionId());
  56. transactionSendResult.setLocalTransactionState(localTransactionState);
  57. return transactionSendResult;
  58. }
  59. }
  60. //将本地事务执行情况,及消息发送状态,异常信息,一起发送给broker,broker根据提交的信息,确定是否确定投递消息、删除消息、或在回查本地事务
  61. public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
  62. MessageId id;
  63. if (sendResult.getOffsetMsgId() != null) {
  64. id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
  65. } else {
  66. id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
  67. }
  68. String transactionId = sendResult.getTransactionId();
  69. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
  70. EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
  71. requestHeader.setTransactionId(transactionId);
  72. requestHeader.setCommitLogOffset(id.getOffset());
  73. switch(localTransactionState) {
  74. case COMMIT_MESSAGE:
  75. requestHeader.setCommitOrRollback(8);
  76. break;
  77. case ROLLBACK_MESSAGE:
  78. requestHeader.setCommitOrRollback(12);
  79. break;
  80. case UNKNOW:
  81. requestHeader.setCommitOrRollback(0);
  82. }
  83. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  84. requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
  85. requestHeader.setMsgId(sendResult.getMsgId());
  86. String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
  87. this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
  88. }

发表评论

表情:
评论列表 (有 0 条评论,164人围观)

还没有评论,来说两句吧...

相关阅读