RocketMQ——广播消费模式与集群消费模式

你的名字 2021-09-27 00:04 655阅读 0赞

RocketMQ有两种消费模式:BROADCASTING广播模式,CLUSTERING集群模式,默认的是 集群消费模式。

本博客主要以广播模式为例!!!

1.简介

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

在CORBA Notification规范中,消费方式都属于广播消费。

在JMS规范中,相当于JMS publish/subscribe model

集群消费模式:一个ConsumerGroup中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个ConsumerGroup有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。

在CORBA Notification规范中,无此消费方式。

在JMS规范中,JMS point-to-point model与之类似,但是RocketMQ的集群消费功能大等于PTP模型。因为RocketMQ单个ConsumerGroup内的消费者类似于PTP,但是一个Topic/Queue可以被多个ConsumerGroup消费。

2.实例

(1)Producer

  1. package com.gwd.rocketmq;
  2. import com.alibaba.rocketmq.client.exception.MQClientException;
  3. import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
  4. import com.alibaba.rocketmq.client.producer.SendResult;
  5. import com.alibaba.rocketmq.common.message.Message;
  6. /**
  7. * @FileName Producer.java
  8. * @Description:
  9. * @author gu.weidong
  10. * @version V1.0
  11. * @createtime 2018年6月25日 上午9:48:37
  12. * 修改历史:
  13. * 时间 作者 版本 描述
  14. *====================================================
  15. *
  16. */
  17. public class Producer {
  18. public static void main(String[] args) throws MQClientException, InterruptedException {
  19. //声明并初始化一个producer
  20. //需要一个producer group名字作为构造方法的参数,这里为producer1
  21. DefaultMQProducer producer = new DefaultMQProducer("producer1");
  22. //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
  23. //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
  24. producer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");
  25. //调用start()方法启动一个producer实例
  26. producer.start();
  27. //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
  28. for (int i = 0; i < 100; i++) {
  29. try {
  30. Message msg = new Message("TopicTest",// topic
  31. "TagA",// tag
  32. ("Hello RocketMQ " + i).getBytes("utf-8")// body
  33. );
  34. //调用producer的send()方法发送消息
  35. //这里调用的是同步的方式,所以会有返回结果
  36. SendResult sendResult = producer.send(msg);
  37. System.out.println(sendResult.getSendStatus()); //发送结果状态
  38. //打印返回结果,可以看到消息发送的状态以及一些相关信息
  39. System.out.println(sendResult);
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. Thread.sleep(1000);
  43. }
  44. }
  45. //发送完消息之后,调用shutdown()方法关闭producer
  46. producer.shutdown();
  47. }
  48. }

(2)Consumer1

  1. package com.gwd.rocketmq;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.List;
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  9. import com.alibaba.rocketmq.client.exception.MQClientException;
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  11. import com.alibaba.rocketmq.common.message.MessageExt;
  12. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
  13. /**
  14. * @FileName Consumer.java
  15. * @Description:
  16. * @author gu.weidong
  17. * @version V1.0
  18. * @createtime 2018年6月25日 上午9:49:39
  19. * 修改历史:
  20. * 时间 作者 版本 描述
  21. *====================================================
  22. *
  23. */
  24. public class Consumer {
  25. public static void main(String[] args) throws MQClientException {
  26. //声明并初始化一个consumer
  27. //需要一个consumer group名字作为构造方法的参数,这里为consumer1
  28. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
  29. //同样也要设置NameServer地址
  30. consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");
  31. consumer.setMessageModel(MessageModel.BROADCASTING);
  32. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  33. //设置consumer所订阅的Topic和Tag,*代表全部的Tag
  34. consumer.subscribe("TopicTest", "*");
  35. //设置一个Listener,主要进行消息的逻辑处理
  36. consumer.registerMessageListener(new MessageListenerConcurrently() {
  37. @Override
  38. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  39. ConsumeConcurrentlyContext context) {
  40. for (MessageExt messageExt : msgs) {
  41. String messageBody = new String(messageExt.getBody());
  42. System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
  43. new Date())+"消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
  44. }
  45. //返回消费状态
  46. //CONSUME_SUCCESS 消费成功
  47. //RECONSUME_LATER 消费失败,需要稍后重新消费
  48. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  49. }
  50. });
  51. //调用start()方法启动consumer
  52. consumer.start();
  53. System.out.println("Consumer Started.");
  54. }
  55. }

(3)consumer2

  1. package com.gwd.rocketmq;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. import java.util.List;
  5. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  6. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  7. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  8. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  9. import com.alibaba.rocketmq.client.exception.MQClientException;
  10. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  11. import com.alibaba.rocketmq.common.message.MessageExt;
  12. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
  13. /**
  14. * @FileName Consumer2.java
  15. * @Description:
  16. * @author gu.weidong
  17. * @version V1.0
  18. * @createtime 2018年6月25日 上午11:26:23
  19. * 修改历史:
  20. * 时间 作者 版本 描述
  21. *====================================================
  22. *
  23. */
  24. public class Consumer2 {
  25. public static void main(String[] args) throws MQClientException {
  26. //声明并初始化一个consumer
  27. //需要一个consumer group名字作为构造方法的参数,这里为consumer1
  28. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
  29. //同样也要设置NameServer地址
  30. consumer.setNamesrvAddr("192.168.140.128:9876;192.168.140.129:9876");
  31. consumer.setMessageModel(MessageModel.BROADCASTING);
  32. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  33. //设置consumer所订阅的Topic和Tag,*代表全部的Tag
  34. consumer.subscribe("TopicTest", "*");
  35. //设置一个Listener,主要进行消息的逻辑处理
  36. consumer.registerMessageListener(new MessageListenerConcurrently() {
  37. @Override
  38. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  39. ConsumeConcurrentlyContext context) {
  40. for (MessageExt messageExt : msgs) {
  41. String messageBody = new String(messageExt.getBody());
  42. System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
  43. new Date())+"2--------消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
  44. }
  45. //返回消费状态
  46. //CONSUME_SUCCESS 消费成功
  47. //RECONSUME_LATER 消费失败,需要稍后重新消费
  48. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  49. }
  50. });
  51. //调用start()方法启动consumer
  52. consumer.start();
  53. System.out.println("Consumer Started.");
  54. }
  55. }

(4)运行顺序:consumer1——consumer2——producer

  1. consumer.setMessageModel(MessageModel.BROADCASTING);//设置广播消费模式

注意:为了使结果更直观,这边的ConsumerGroup设置为同一个

(5)测试结果

consumer1消费情况:

70

consumer2消费情况:

70 1

(6)与集群消费模式对比

默认的是使用集群消费模式,这两者最大的区别在于同组中的消费,集群消费模式是同组公同消费一组消息,广播模式是同组各自都消费一组消息。

下面看下集群消费模式的情况:

consumer1:

70 2

consumer2:

70 3

发表评论

表情:
评论列表 (有 0 条评论,655人围观)

还没有评论,来说两句吧...

相关阅读