RocketMQ事务消费和顺序消费详解

刺骨的言语ヽ痛彻心扉 2023-02-25 10:22 69阅读 0赞

一、RocketMq有3中消息类型

1.普通消费

  1. 顺序消费

3.事务消费

  • 顺序消费场景

在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。

  • rocketMq实现顺序消费的原理

    produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue

单个节点(Producer端1个、Consumer端1个)

1、Producer.java

复制代码

  1. package order;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.exception.MQBrokerException;
  4. import com.alibaba.rocketmq.client.exception.MQClientException;
  5. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  6. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
  7. import com.alibaba.rocketmq.client.producer.SendResult;
  8. import com.alibaba.rocketmq.common.message.Message;
  9. import com.alibaba.rocketmq.common.message.MessageQueue;
  10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
  11. /**
  12. * Producer,发送顺序消息
  13. */
  14. public class Producer {
  15. public static void main(String[] args) {
  16. try {
  17. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
  18. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  19. producer.start();
  20. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
  21. // "TagE" };
  22. for (int i = 1; i <= 5; i++) {
  23. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
  24. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  25. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  26. Integer id = (Integer) arg;
  27. int index = id % mqs.size();
  28. return mqs.get(index);
  29. }
  30. }, 0);
  31. System.out.println(sendResult);
  32. }
  33. producer.shutdown();
  34. } catch (MQClientException e) {
  35. e.printStackTrace();
  36. } catch (RemotingException e) {
  37. e.printStackTrace();
  38. } catch (MQBrokerException e) {
  39. e.printStackTrace();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }

复制代码

2、Consumer.java

复制代码

  1. package order;
  2. import java.util.List;
  3. import java.util.concurrent.TimeUnit;
  4. import java.util.concurrent.atomic.AtomicLong;
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
  9. import com.alibaba.rocketmq.client.exception.MQClientException;
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  11. import com.alibaba.rocketmq.common.message.MessageExt;
  12. /**
  13. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
  14. */
  15. public class Consumer1 {
  16. public static void main(String[] args) throws MQClientException {
  17. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
  18. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  19. /**
  20. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  21. * 如果非第一次启动,那么按照上次消费的位置继续消费
  22. */
  23. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  24. consumer.subscribe("TopicOrderTest", "*");
  25. consumer.registerMessageListener(new MessageListenerOrderly() {
  26. AtomicLong consumeTimes = new AtomicLong(0);
  27. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  28. // 设置自动提交
  29. context.setAutoCommit(true);
  30. for (MessageExt msg : msgs) {
  31. System.out.println(msg + ",内容:" + new String(msg.getBody()));
  32. }
  33. try {
  34. TimeUnit.SECONDS.sleep(5L);
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. ;
  39. return ConsumeOrderlyStatus.SUCCESS;
  40. }
  41. });
  42. consumer.start();
  43. System.out.println("Consumer1 Started.");
  44. }
  45. }

复制代码

结果如下图所示:

format_png

这个五条数据被顺序消费了

  • 多个节点(Producer端1个、Consumer端2个)

Producer.java

复制代码

  1. package order;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.exception.MQBrokerException;
  4. import com.alibaba.rocketmq.client.exception.MQClientException;
  5. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  6. import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
  7. import com.alibaba.rocketmq.client.producer.SendResult;
  8. import com.alibaba.rocketmq.common.message.Message;
  9. import com.alibaba.rocketmq.common.message.MessageQueue;
  10. import com.alibaba.rocketmq.remoting.exception.RemotingException;
  11. /**
  12. * Producer,发送顺序消息
  13. */
  14. public class Producer {
  15. public static void main(String[] args) {
  16. try {
  17. DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
  18. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  19. producer.start();
  20. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
  21. // "TagE" };
  22. for (int i = 1; i <= 5; i++) {
  23. Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
  24. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  25. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  26. Integer id = (Integer) arg;
  27. int index = id % mqs.size();
  28. return mqs.get(index);
  29. }
  30. }, 0);
  31. System.out.println(sendResult);
  32. }
  33. for (int i = 1; i <= 5; i++) {
  34. Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());
  35. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  36. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  37. Integer id = (Integer) arg;
  38. int index = id % mqs.size();
  39. return mqs.get(index);
  40. }
  41. }, 1);
  42. System.out.println(sendResult);
  43. }
  44. for (int i = 1; i <= 5; i++) {
  45. Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());
  46. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  47. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  48. Integer id = (Integer) arg;
  49. int index = id % mqs.size();
  50. return mqs.get(index);
  51. }
  52. }, 2);
  53. System.out.println(sendResult);
  54. }
  55. producer.shutdown();
  56. } catch (MQClientException e) {
  57. e.printStackTrace();
  58. } catch (RemotingException e) {
  59. e.printStackTrace();
  60. } catch (MQBrokerException e) {
  61. e.printStackTrace();
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }

复制代码

Consumer1.java

复制代码

  1. /**
  2. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
  3. */
  4. public class Consumer1 {
  5. public static void main(String[] args) throws MQClientException {
  6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
  7. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  8. /**
  9. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  10. * 如果非第一次启动,那么按照上次消费的位置继续消费
  11. */
  12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  13. consumer.subscribe("TopicOrderTest", "*");
  14. /**
  15. * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到
  16. *,第二个线程无法访问这个队列
  17. */
  18. consumer.registerMessageListener(new MessageListenerOrderly() {
  19. AtomicLong consumeTimes = new AtomicLong(0);
  20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  21. // 设置自动提交
  22. context.setAutoCommit(true);
  23. for (MessageExt msg : msgs) {
  24. System.out.println(msg + ",内容:" + new String(msg.getBody()));
  25. }
  26. try {
  27. TimeUnit.SECONDS.sleep(5L);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. ;
  32. return ConsumeOrderlyStatus.SUCCESS;
  33. }
  34. });
  35. consumer.start();
  36. System.out.println("Consumer1 Started.");
  37. }
  38. }

复制代码

Consumer2.java

复制代码

  1. /**
  2. * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
  3. */
  4. public class Consumer2 {
  5. public static void main(String[] args) throws MQClientException {
  6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
  7. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  8. /**
  9. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  10. * 如果非第一次启动,那么按照上次消费的位置继续消费
  11. */
  12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  13. consumer.subscribe("TopicOrderTest", "*");
  14. /**
  15. * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到
  16. *,第二个线程无法访问这个队列
  17. */
  18. consumer.registerMessageListener(new MessageListenerOrderly() {
  19. AtomicLong consumeTimes = new AtomicLong(0);
  20. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  21. // 设置自动提交
  22. context.setAutoCommit(true);
  23. for (MessageExt msg : msgs) {
  24. System.out.println(msg + ",内容:" + new String(msg.getBody()));
  25. }
  26. try {
  27. TimeUnit.SECONDS.sleep(5L);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. ;
  32. return ConsumeOrderlyStatus.SUCCESS;
  33. }
  34. });
  35. consumer.start();
  36. System.out.println("Consumer2 Started.");
  37. }
  38. }

复制代码

先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息
Consumer1消费情况如图,都按照顺序执行了

format_png 1

Consumer2消费情况如图,都按照顺序执行了

format_png 2

二、事务消费

这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。

事物消费需要先说说什么是事务。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。

再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。

使用消息队列来解决分布式事物:

现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。

如何保证凭证(消息)有2种方法:

1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。

2、通过消息中间件

原文地址:http://www.jianshu.com/p/453c6e7ff81c

format_png 3

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

例子:

Consumer.java

复制代码

  1. package transaction;
  2. import java.util.List;
  3. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  4. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  5. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  6. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  7. import com.alibaba.rocketmq.client.exception.MQClientException;
  8. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  9. import com.alibaba.rocketmq.common.message.MessageExt;
  10. /**
  11. * Consumer,订阅消息
  12. */
  13. public class Consumer {
  14. public static void main(String[] args) throws InterruptedException, MQClientException {
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");
  16. consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  17. consumer.setConsumeMessageBatchMaxSize(10);
  18. /**
  19. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
  20. * 如果非第一次启动,那么按照上次消费的位置继续消费
  21. */
  22. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  23. consumer.subscribe("TopicTransactionTest", "*");
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  26. try {
  27. for (MessageExt msg : msgs) {
  28. System.out.println(msg + ",内容:" + new String(msg.getBody()));
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
  33. }
  34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
  35. }
  36. });
  37. consumer.start();
  38. System.out.println("transaction_Consumer Started.");
  39. }
  40. }

复制代码

Producer.java

复制代码

  1. package transaction;
  2. import com.alibaba.rocketmq.client.exception.MQClientException;
  3. import com.alibaba.rocketmq.client.producer.SendResult;
  4. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
  5. import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
  6. import com.alibaba.rocketmq.common.message.Message;
  7. /**
  8. * 发送事务消息例子
  9. *
  10. */
  11. public class Producer {
  12. public static void main(String[] args) throws MQClientException, InterruptedException {
  13. TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
  14. TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");
  15. producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");
  16. // 事务回查最小并发数
  17. producer.setCheckThreadPoolMinSize(2);
  18. // 事务回查最大并发数
  19. producer.setCheckThreadPoolMaxSize(2);
  20. // 队列数
  21. producer.setCheckRequestHoldMax(2000);
  22. producer.setTransactionCheckListener(transactionCheckListener);
  23. producer.start();
  24. // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE"
  25. // };
  26. TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
  27. for (int i = 1; i <= 2; i++) {
  28. try {
  29. Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,
  30. ("Hello RocketMQ " + i).getBytes());
  31. SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
  32. System.out.println(sendResult);
  33. Thread.sleep(10);
  34. } catch (MQClientException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. for (int i = 0; i < 100000; i++) {
  39. Thread.sleep(1000);
  40. }
  41. producer.shutdown();
  42. }
  43. }

复制代码

TransactionExecuterImpl .java —执行本地事务

复制代码

  1. package transaction;
  2. import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
  3. import com.alibaba.rocketmq.client.producer.LocalTransactionState;
  4. import com.alibaba.rocketmq.common.message.Message;
  5. /**
  6. * 执行本地事务
  7. */
  8. public class TransactionExecuterImpl implements LocalTransactionExecuter {
  9. // private AtomicInteger transactionIndex = new AtomicInteger(1);
  10. public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
  11. System.out.println("执行本地事务msg = " + new String(msg.getBody()));
  12. System.out.println("执行本地事务arg = " + arg);
  13. String tags = msg.getTags();
  14. if (tags.equals("transaction2")) {
  15. System.out.println("======我的操作============,失败了 -进行ROLLBACK");
  16. return LocalTransactionState.ROLLBACK_MESSAGE;
  17. }
  18. return LocalTransactionState.COMMIT_MESSAGE;
  19. // return LocalTransactionState.UNKNOW;
  20. }
  21. }

复制代码

TransactionCheckListenerImpl—未决事务,服务器回查客户端(目前已经被阉割啦)

复制代码

  1. package transaction;
  2. import com.alibaba.rocketmq.client.producer.LocalTransactionState;
  3. import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
  4. import com.alibaba.rocketmq.common.message.MessageExt;
  5. /**
  6. * 未决事务,服务器回查客户端
  7. */
  8. public class TransactionCheckListenerImpl implements TransactionCheckListener {
  9. // private AtomicInteger transactionIndex = new AtomicInteger(0);
  10. //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。
  11. public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
  12. System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));
  13. // return LocalTransactionState.ROLLBACK_MESSAGE;
  14. return LocalTransactionState.COMMIT_MESSAGE;
  15. // return LocalTransactionState.UNKNOW;
  16. }
  17. }

复制代码

producer端:发送数据到MQ,并且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了,不会被消费。

format_png 4

Consumer只会接收到一个,第二个数据不会被接收到

format_png 5

转载自https://www.cnblogs.com/520playboy/p/6750023.html

发表评论

表情:
评论列表 (有 0 条评论,69人围观)

还没有评论,来说两句吧...

相关阅读

    相关 使用RocketMQ实现消息顺序消费

    消息的顺序消费在很多交易型的业务场景中都会被要求实现,而且,消息队列的顺序消费解决方案在很多互联网公司的面试中经常会被问到。 索尔老师在使用了多个消息队列后发现,虽然每个消息

    相关 rocketMq 顺序消费

    什么是顺序消费? 消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才

    相关 RocketMQ 顺序消费机制

    顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 顺序消息分为分区顺序消息和全局