深入理解RocketMQ事务源码--TransactionMQProducer、TransactionListener
前言
上篇文章介绍了RocketMQ支持的事务消息,它常用于分布式系统中保证数据的一致性。接下来一起走进 TransactionMQProducer、TransactionListener 这两个核心类。
一.基于apache-rocketmq事务消息的实现
(1) pom.xml配置
<!-- Spring Boot core starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot web starter (used by the REST controller in this article) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok plugin -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- RocketMQ client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
(2)事务消息生产者配置
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.stereotype.Component;
/**
 * Spring component that owns a single {@link TransactionMQProducer}.
 *
 * <p>The producer is configured with the producer group, the NameServer address and a
 * {@code RocketMQLisenter} transaction listener, and is started from the constructor.
 */
@Component
public class PayProducer {
    /**
     * Producer group name handed to the {@link TransactionMQProducer} constructor.
     */
    private final String producerGroup = "pay_group";
    /**
     * NameServer address in {@code host:port} form (not just a port).
     */
    private final String nameServer = "127.0.0.1:9876";
    // Transaction listener; RocketMQLisenter implements the TransactionListener interface.
    private final RocketMQLisenter rocketMQLisenter;
    // The transactional message producer wrapped by this component.
    private final TransactionMQProducer producer;

    /**
     * Builds, configures and starts the transactional producer.
     *
     * @throws IllegalStateException if the underlying producer cannot be started
     */
    public PayProducer() {
        producer = new TransactionMQProducer(producerGroup);
        rocketMQLisenter = new RocketMQLisenter();
        // NameServer address; multiple addresses are separated by ';'.
        producer.setNamesrvAddr(nameServer);
        producer.setTransactionListener(rocketMQLisenter);
        start();
    }

    /**
     * @return the started transactional producer
     */
    public TransactionMQProducer getProducer() {
        return producer;
    }

    /**
     * Starts the producer. Must be called once before the producer is used; it must only be
     * initialized once.
     *
     * @throws IllegalStateException wrapping the {@link MQClientException} when start-up fails;
     *         the failure is surfaced instead of being printed and ignored, because a producer
     *         that did not start cannot send any message
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException(
                    "Failed to start TransactionMQProducer, group=" + producerGroup
                            + ", nameServer=" + nameServer, e);
        }
    }

    /**
     * Shuts the producer down. Typically invoked from an application-context listener when the
     * application stops.
     */
    public void shutdown() {
        producer.shutdown();
    }
}
(3)事务监听器接口实现
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Demo {@link TransactionListener}: simulates the local transaction of a transactional message
 * and answers the check-back callback.
 */
@Slf4j
public class RocketMQLisenter implements TransactionListener {
    /**
     * Simulates the local transaction for a half message and reports its outcome.
     *
     * <p>The outcome is selected by {@code arg}, the object the sender passed as the second
     * argument of {@code sendMessageInTransaction}:
     * <ul>
     *   <li>{@code 0} - commit: the message is confirmed and becomes consumable;</li>
     *   <li>{@code 1} - rollback: the half message is discarded;</li>
     *   <li>{@code 2} - unknown: the state is left undecided (e.g. a crash or network failure
     *       prevented a result from being returned).</li>
     * </ul>
     * Any other value, or an {@code arg} that is not an {@link Integer}, commits.
     *
     * @param msg the half message that was sent
     * @param arg caller-supplied argument; an {@link Integer} selects the simulated outcome
     * @return the simulated local transaction state
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=========本地事务开始执行=============");
        // Previously the outcome was hard-coded to 0 and arg was ignored, which made the
        // rollback/unknown branches unreachable. Derive it from the caller's argument instead.
        int type = (arg instanceof Integer) ? (Integer) arg : 0;
        // TODO idempotency: msg.getTransactionId() is unique per transactional message; it could
        // be stored (e.g. in Redis) together with a status so that the consumer can detect
        // messages it has already processed.

        // ---- simulated local transaction: begin ----
        log.info("本地事务执行参数----------------------");
        // ---- simulated local transaction: end ----

        // TODO in real code the state is derived from the local transaction result rather than
        // selected by hand.
        switch (type) {
            case 1:
                // Roll back the half message.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            case 2:
                // Leave the state undecided so that it is resolved through a check-back.
                return LocalTransactionState.UNKNOW;
            default:
                // Second-phase confirmation; the consumer may now consume the message.
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

    /**
     * Check-back callback. According to the original note it is invoked when
     * {@link #executeLocalTransaction} returned {@link LocalTransactionState#UNKNOW}.
     *
     * <p>This demo always answers {@code COMMIT_MESSAGE}.
     *
     * @param messageExt the half message whose state is being checked
     * @return the final local transaction state
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("==========回查接口=========");
        // TODO 1: look up (e.g. by messageExt.getKeys()) whether the local transaction for this
        // message has already completed before doing anything else. The local transaction may
        // have succeeded while the process died before COMMIT_MESSAGE was returned; the message
        // is then still a half message and this callback runs after restart. Re-executing the
        // local transaction without that lookup would run it twice.
        // TODO 2: answer commit or rollback here; there is no need to return UNKNOW again.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
(4)消费者配置
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
/**
 * Spring component that creates and starts a {@link DefaultMQPushConsumer} subscribed to
 * {@code order_topic} and prints every message it receives.
 */
@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_group";

    /**
     * Configures the consumer, registers the concurrent listener and starts consuming.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        // Topic.NAME_SERVER is defined elsewhere in the project (not shown in this article).
        consumer.setNamesrvAddr(Topic.NAME_SERVER);
        // Consumption strategy: start from the last offset.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // Subscribe to the topic; "*" accepts every tag.
        consumer.subscribe("order_topic", "*");
        // Register the listener.
        consumer.registerMessageListener((MessageListenerConcurrently)
                (msgs, context) -> {
                    try {
                        // Handle every message of the batch: returning CONSUME_SUCCESS
                        // acknowledges all of them, so none may be skipped. An empty batch
                        // is acknowledged without work.
                        for (Message msg : msgs) {
                            // For idempotency, msg.getTransactionId() could be looked up
                            // (e.g. in Redis) to skip already-consumed messages; this demo
                            // does not implement that.
                            // Decode once with an explicit charset so both outputs agree.
                            String body = new String(msg.getBody(), "utf-8");
                            System.out.printf("%s Receive New Messages: %s %n",
                                    Thread.currentThread().getName(), body);
                            String topic = msg.getTopic();
                            // Tag of the message.
                            String tags = msg.getTags();
                            String keys = msg.getKeys();
                            System.out.println("topic=" + topic + ", tags=" + tags + ",keys=" + keys + ", msg=" + body);
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        // Ask for redelivery of the batch.
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                });
        consumer.start();
        System.out.println("Consumer Listener");
    }
}
(5)通过生产者发送事务消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that sends a transactional message through {@code PayProducer}.
 */
@RestController
public class PayController {
    /** Topic the demo message is published to. */
    private static final String topic = "order_topic";

    @Autowired
    private PayProducer payProducer;

    /**
     * Handles {@code GET /pay}: builds a message for {@code order_topic} with tag {@code taga}
     * and sends it as a transactional message.
     *
     * @param type forwarded unchanged as the {@code arg} of {@code sendMessageInTransaction}
     */
    @GetMapping("/pay")
    @ResponseBody
    public void callback(int type) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // Message = topic + tag (secondary category) + body bytes.
        byte[] payload = ("rocketMQ").getBytes();
        Message halfMessage = new Message(topic, "taga", payload);
        // Send through the transactional API; the send result is obtained but not used further.
        SendResult outcome = payProducer.getProducer().sendMessageInTransaction(halfMessage, type);
    }
}
二.TransactionMQProducer、TransactionListener源码阅读
1.TransactionMQProducer:可以看出它继承自DefaultMQProducer类,核心方法是sendMessageInTransaction(Message msg, Object arg)
public class TransactionMQProducer extends DefaultMQProducer {
private TransactionListener transactionListener;
private ExecutorService executorService;
public TransactionMQProducer() {
}
public TransactionMQProducer(String producerGroup) {
super(producerGroup);
}
public TransactionMQProducer(String producerGroup, RPCHook rpcHook) {
super(producerGroup, rpcHook);
}
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
public void shutdown() {
super.shutdown();
this.defaultMQProducerImpl.destroyTransactionEnv();
}
public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", (Throwable)null);
} else {
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, this.transactionListener, arg);
}
}
public TransactionListener getTransactionListener() {
return this.transactionListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
public ExecutorService getExecutorService() {
return this.executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}
2.DefaultMQProducerImpl.sendMessageInTransaction方法的具体实现(TransactionMQProducer通过defaultMQProducerImpl调用该方法):
(1)可以看出,先把消息发送到broker并获取发送状态(此时的消息可以理解成半消息,等待本地事务的执行结果,再执行commit或者rollback)。发送状态为FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT或SLAVE_NOT_AVAILABLE时,本地事务状态被设置为LocalTransactionState.ROLLBACK_MESSAGE;发送成功(SEND_OK)时,则获取消息的事务ID(每个消息都会有一个transactionId)。
(2)接着调用TransactionListener.executeLocalTransaction方法(上面已经粘贴了事务监听器的实现代码),执行本地事务。
/**
* 本地事务执行会有三种可能
* 1、commit 成功 ,对应 return LocalTransactionState.COMMIT_MESSAGE;
* 2、Rollback 失败,对应 return LocalTransactionState.ROLLBACK_MESSAGE;
* 3、网络等原因服务宕机收不到返回结果,对应return LocalTransactionState.UNKNOW;
*/
(3)调用this.endTransaction(sendResult, localTransactionState, localException)方法,将消息发送结果、本地事务执行状态和异常信息一起发送给broker,broker会根据提交的信息,决定是投递消息、删除消息,还是回查本地事务。
更多详细信息,请查看事务消息
/**
 * Sends {@code msg} as a transactional (half) message, runs the local transaction through
 * {@code tranExecuter} and reports the outcome via {@code endTransaction}.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>validate the message and mark it with the {@code TRAN_MSG} and {@code PGROUP}
 *       properties, then send it;</li>
 *   <li>on {@code SEND_OK}, copy the transaction id onto the message and call
 *       {@code executeLocalTransaction}; a {@code null} result or a thrown exception leaves
 *       the state at {@code UNKNOW};</li>
 *   <li>on {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT} or
 *       {@code SLAVE_NOT_AVAILABLE}, set the state to {@code ROLLBACK_MESSAGE};</li>
 *   <li>call {@code endTransaction}; a failure there is only logged.</li>
 * </ol>
 *
 * @param msg          the message to send
 * @param tranExecuter listener that executes the local transaction
 * @param arg          opaque argument handed to {@code executeLocalTransaction}
 * @return the send result enriched with the local transaction state
 * @throws MQClientException if {@code tranExecuter} is {@code null}, validation fails, or the
 *         send throws
 */
public TransactionSendResult sendMessageInTransaction(Message msg, TransactionListener tranExecuter, Object arg) throws MQClientException {
    // Guard clause: without a listener there is nothing to execute.
    if (tranExecuter == null) {
        throw new MQClientException("tranExecutor is null", (Throwable) null);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    // Mark the message as transactional and record the producer group.
    MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
    MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

    SendResult sendResult = null;
    try {
        sendResult = this.send(msg);
    } catch (Exception sendFailure) {
        throw new MQClientException("send message Exception", sendFailure);
    }

    // Stays UNKNOW unless the listener returns a state or the send status forces a rollback.
    LocalTransactionState localState = LocalTransactionState.UNKNOW;
    Throwable localFailure = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK:
            try {
                String sentTransactionId = sendResult.getTransactionId();
                if (sentTransactionId != null) {
                    msg.putUserProperty("__transactionId__", sentTransactionId);
                }

                // The UNIQ_KEY property, when present and non-empty, becomes the message's
                // transaction id.
                String uniqueKey = msg.getProperty("UNIQ_KEY");
                if (uniqueKey != null && !uniqueKey.isEmpty()) {
                    msg.setTransactionId(uniqueKey);
                }

                localState = tranExecuter.executeLocalTransaction(msg, arg);
                if (localState == null) {
                    localState = LocalTransactionState.UNKNOW;
                }

                if (localState != LocalTransactionState.COMMIT_MESSAGE) {
                    this.log.info("executeLocalTransactionBranch return {}", localState);
                    this.log.info(msg.toString());
                }
            } catch (Throwable executionFailure) {
                // Remember the failure; it is passed on to endTransaction.
                this.log.info("executeLocalTransactionBranch exception", executionFailure);
                this.log.info(msg.toString());
                localFailure = executionFailure;
            }
            break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        this.endTransaction(sendResult, localState, localFailure);
    } catch (Exception endFailure) {
        this.log.warn("local transaction execute " + localState + ", but end broker transaction failed", endFailure);
    }

    // Copy the send result and attach the local transaction state.
    TransactionSendResult result = new TransactionSendResult();
    result.setSendStatus(sendResult.getSendStatus());
    result.setMessageQueue(sendResult.getMessageQueue());
    result.setMsgId(sendResult.getMsgId());
    result.setQueueOffset(sendResult.getQueueOffset());
    result.setTransactionId(sendResult.getTransactionId());
    result.setLocalTransactionState(localState);
    return result;
}
/**
 * Sends the local transaction state, together with data from the send result and any local
 * exception, to the broker as a one-way end-transaction request. According to the article, the
 * broker uses this information to decide whether to deliver the message, delete it, or check
 * back on the local transaction.
 *
 * @param sendResult            result of sending the half message
 * @param localTransactionState outcome of the local transaction
 * @param localException        exception thrown by the local transaction, or {@code null}
 */
public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    // Prefer the offset message id; fall back to the message id when it is absent.
    final MessageId id = sendResult.getOffsetMsgId() != null
            ? MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId())
            : MessageDecoder.decodeMessageId(sendResult.getMsgId());

    String transactionId = sendResult.getTransactionId();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

    EndTransactionRequestHeader header = new EndTransactionRequestHeader();
    header.setTransactionId(transactionId);
    header.setCommitLogOffset(id.getOffset());

    // NOTE(review): 8 / 12 / 0 are presumably the MessageSysFlag commit / rollback / not-type
    // flag values - confirm against the RocketMQ source.
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            header.setCommitOrRollback(8);
            break;
        case ROLLBACK_MESSAGE:
            header.setCommitOrRollback(12);
            break;
        case UNKNOW:
            header.setCommitOrRollback(0);
            break;
        default:
            break;
    }

    header.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    header.setTranStateTableOffset(sendResult.getQueueOffset());
    header.setMsgId(sendResult.getMsgId());

    // The remark carries the local exception, if there was one.
    String remark = null;
    if (localException != null) {
        remark = "executeLocalTransactionBranch exception: " + localException.toString();
    }

    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, header, remark, (long) this.defaultMQProducer.getSendMsgTimeout());
}
还没有评论,来说两句吧...