RocketMq 消息发送和消息消费机制

桃扇骨 2022-09-04 12:43 472阅读 0赞

消费发送

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。

RocketMQ提供了吴中发送策略比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。

默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:

  • 1 设置Producer的GroupName。
    GroupName 表示实例化生产者,指定生产组名称,
  • 2 设置实例化名称
    当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分不同的Produce,不设置的话系统使用默认名称“DEFAULT”。
  • 3 设置重试次数,如果发送出现异常mq会进行重试发送。
  • 4 设置NameServer地址,mq broker地址
  • 5 发送消息

发送消息成功之后,如何知道消息是否发送成功呢?在RabbitMq中会有消息发送确认机制,mq会返回给消息一个ack标识。

同样RocketMq也提供了、RocketMq提供了四种发送返回信息状态

  • FLUSH_DISK_TIMEOUT
  • FLUSH_SLAVE_TIMEOUT
  • SLAVE_NOT_AVAILABLE
  • SEND_OK

分别对应着在不同的场景下消息发送的状态

  1. FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成
    SYNC_FLUSH才会报这个错误)。
  2. FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,
    没有在设定时间内完成主从同步。
  3. SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主
    备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
  4. SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。

和RabbitMq一样,也提供了同步发送和异步发送

同步发送机制

  1. public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
  2. // 1 实例化生产者,并指定生产组名称
  3. DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_01");
  4. //2 设置实例名称,一个jvm中有多个生产者可以根据实例名区分
  5. //默认default
  6. producer.setInstanceName("name");
  7. //3 指定nameserver的地址
  8. producer.setNamesrvAddr("localhost:9876");
  9. //4 设置同步重试次数
  10. producer.setRetryTimesWhenSendFailed(2);
  11. //设置异步发送次数
  12. //producer.setRetryTimesWhenSendAsyncFailed(2);
  13. // 初始化生产者
  14. producer.start();
  15. Message message = new Message("topic_name", "发送消息".getBytes("utf-8"));
  16. // 1 同步发送 如果发送失败会根据重试次数重试
  17. SendResult send = producer.send(message);
  18. // todo 提高发送速度
  19. // 发送一次 将消息放到缓冲区就返回,速度快,会丢消息 ,不会等待broker返回
  20. //对于一些对数据可靠性要求不高可以使用此方式发送 ,如日志收集
  21. /** * Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。 * 用这种方式发送消息的耗时可以缩短到微秒级。 */
  22. // 方法1 producer.sendOneway(message);
  23. // 方法2 增加produce, 使用多个Producer同时并发发送, RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
  24. /* 消息发生返回状态(SendResult#SendStatus)有如下四种: FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE SEND_OK 表示发送成功 */
  25. SendStatus sendStatus = send.getSendStatus();
  26. System.out.println(sendStatus.toString());
  27. // 关闭生产者
  28. producer.shutdown();
  29. }

发送消息可以看到发送了成功1条数据,然后数据发送成功SEND_OK在这里插入图片描述

消息量比较大时候,同步发送如何提高发送速度,有这么几种处理方法

  • 1 对于一些不重要的消息,比如日志消息 我们可以使用oneway发送模式进行发送,Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果,发送消息的耗时可以缩短到微秒级。

设置发送

  1. producer.sendOneway(message);
  • 2 增加produce,多个同时进行发送。
    这里不用担心多Producer同时写会降低消息写磁盘的效率,
    RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。

异步发送

和同步发送一样,都需要前四步步骤,直接展示核心发送的代码

  1. // 2 异步发送
  2. // 消息的异步发送
  3. producer.send(message, new SendCallback() {
  4. @Override
  5. public void onSuccess(SendResult sendResult) {
  6. System.out.println("发送成功:" + sendResult);
  7. }
  8. @Override
  9. public void onException(Throwable throwable) {
  10. //发送失败逻辑 重试次数耗尽 会抛异常
  11. System.out.println("发送失败:" + throwable.getMessage());
  12. }
  13. });

可以看到消息发送成功,这里贴出消息发送成功的消息内容

发送成功:SendResult [sendStatus=SEND_OK, msgId=C0A800678D717C53A9EB42A082D00000, offsetMsgId=C0A8006700002A9F000000000000B008, messageQueue=MessageQueue [topic=topic_name, brokerName=PilgrimdeMacBook-Pro.local, queueId=2], queueOffset=1]

消息发送状态,id,消息id以及名字,偏移量等都包含。

消息消费

消息消费也有这几个步骤

  1. 消息消费方式(Pull和Push)
  2. 消息消费的模式(广播模式和集群模式)
  3. 流量控制 暂时略过
  4. 并发线程数设置
  5. 消息的过滤(Tag、Key) TagA||TagB||TagC * null
    拉取

拉消息

  1. //1 声明拉取消息topic
  2. DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pull.consumer");
  3. // 指定nameserver的地址
  4. defaultMQPullConsumer.setNamesrvAddr("localhost:9876");
  5. //2 消费模式
  6. //广播模式
  7. defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
  8. //集群模式
  9. //defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
  10. //获取所有的队列
  11. Set<MessageQueue> messageQueues = defaultMQPullConsumer.fetchMessageQueuesInBalance("topic.pull");
  12. for (MessageQueue me : messageQueues) {
  13. //指定消费者拉取消费消息
  14. // 3 TagA||TagB|| TagC *表示不对消息进行标签过滤
  15. PullResult result = defaultMQPullConsumer.pull(me, "*", 0L, 100);
  16. //... 消费代码省略
  17. // 获取从指定消息队列中拉取到的消息
  18. final List<MessageExt> msgFoundList = result.getMsgFoundList();
  19. if (msgFoundList == null) continue;
  20. for (MessageExt messageExt : msgFoundList) {
  21. System.out.println(messageExt);
  22. System.out.println(new String(messageExt.getBody(), "utf-8"));
  23. }
  24. }
  25. defaultMQPullConsumer.start();

推送消息

推送消费消息和拉取消息最大不同点是推送消息属于被迫消费,推送时消费失败,服务端会重新推送,消费不及时会严重影响性能。
而拉消息不会,拉消息拉取一批消费一批。

  1. DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("pull.consumer");
  2. // 指定nameserver的地址
  3. defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
  4. //退送消息消费
  5. // TagA||TagB|| TagC *表示不对消息进行标签过滤
  6. //defaultMQPushConsumer.subscribe("topic.push","TagA||TagB|| TagC");
  7. defaultMQPushConsumer.subscribe("topic.push", "*");
  8. /** * 推送消息 提高消费处理能力 * 1 提高消费并行度 * 2 以批量方式进行 消费 * 3 检测延时情况,跳过非重要消息 */
  9. //消费限流 只针对推送来设置,拉取消息自己控制
  10. // 1 提高消费并行度
  11. defaultMQPushConsumer.setConsumeThreadMax(10);
  12. defaultMQPushConsumer.setConsumeThreadMin(1);
  13. // 2 以批量方式进行 消费
  14. // 设置消息批处理的一个批次中消息的最大个数
  15. defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
  16. //3 检测延时情况,跳过非重要消息 略...
  17. // 添加消息监听器,一旦有消息推送过来,就进行消费
  18. defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
  19. @Override
  20. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  21. //final MessageQueue messageQueue = context.getMessageQueue();
  22. for (MessageExt msg : msgs) {
  23. try {
  24. System.out.println(new String(msg.getBody(), "utf-8"));
  25. } catch (UnsupportedEncodingException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. // 消息消费成功
  30. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  31. // 消息消费失败
  32. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  33. }
  34. });

消费端消息消费速度低于生产者生产速度就回出现消息挤压,这种情况下,我们下除了优化我们消费者的程序,除了生产者端限流之外,我们还可以使用以下三种方式进行处理:

  • 1 提高消费并行度

在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。

通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。

注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。

此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。

  1. defaultMQPushConsumer.setConsumeThreadMax(10);
  2. defaultMQPushConsumer.setConsumeThreadMin(1);

-2 以批量方式进行消费

某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中
涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。

  1. defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
  • 3 检测延时情况,跳过非重要消息

Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆
积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。

这里的解决办法是,我们可以在生产消息时,通过tag设置来区分消息的重要性,暂时跳过次消息,先不进行消费。

消息过滤

支持两种过滤方式 Tag过滤方式SQL92的过滤方式(仅对push的消费者起作用)

Tag方式虽然效率高,但是支持的过滤逻辑比较简单。

SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样

Tag方式

拉消息

  1. PullResult result = defaultMQPullConsumer.pull("消息队topic", "*", 0L, 100);

推消息

  1. defaultMQPushConsumer.subscribe("topic.push","TagA||TagB|| TagC");

||表示或的意思,*表示不过滤

SQL92的过滤方式

步骤
配置文件里开启

  1. enablePropertyFilter=true

./mqbroker -n localhost:9876 -c …/conf/broker.conf -p | grep -i propertyfilter
在这里插入图片描述

首先需要开启支持SQL92的特性,然后重启broker:

mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf

可以查看是否开启

在这里插入图片描述

基本的语法

  1. 数字比较: >, >=, <, <=, BETWEEN, =
  2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
  3. 逻辑比较: AND, OR, NOT;
  4. Constant types are: 数字如:123, 3.1415; 字符串如:‘abc’,必须是单引号引起来 NULL,特殊常量 布尔型如:TRUE or FALSE;

发送消息
设置不同的属性key 消费端进行过滤

  1. for (int i = 0; i <10 ; i++) {
  2. Message message = new Message("topic_name", ("key="+i).getBytes("utf-8"));
  3. String falg = "";
  4. if (i%2==0){
  5. falg = "v1";
  6. }else {
  7. falg = "v2";
  8. }
  9. // 给消息添加用户属性
  10. message.putUserProperty("key", falg);
  11. // 1 同步发送 如果发送失败会根据重试次数重试
  12. SendResult send = producer.send(message);
  13. SendStatus sendStatus = send.getSendStatus();
  14. System.out.println(sendStatus.toString());
  15. }

推送消费

  1. defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key = 'v1'"));
  2. //defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key = 'v1' and key = 'v2'"));
  3. // defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key IS NOT NULL"));
  4. //监听消息
  5. defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
  6. @Override
  7. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  8. //final MessageQueue messageQueue = context.getMessageQueue();
  9. for (MessageExt msg : msgs) {
  10. System.out.println(msg);
  11. try {
  12. System.out.println(new String(msg.getBody(), "utf-8"));
  13. } catch (UnsupportedEncodingException e) {
  14. e.printStackTrace();
  15. }
  16. }
  17. // 消息消费成功
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. // 消息消费失败
  20. // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  21. }
  22. });

可以看到 过滤后的消息都是key=v1类型的消息

MessageExt [queueId=0, storeSize=188, queueOffset=20, sysFlag=0, bornTimestamp=1628868816509, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816532, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001338C, commitLogOffset=78732, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1628868816545, UNIQ_KEY=C0A80067A5857C53A9EB42DA827C0000, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=2, storeSize=188, queueOffset=18, sysFlag=0, bornTimestamp=1628868816540, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816542, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000013504, commitLogOffset=79108, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1628868816547, UNIQ_KEY=C0A80067A5857C53A9EB42DA829C0002, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=0, storeSize=188, queueOffset=21, sysFlag=0, bornTimestamp=1628868816547, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816548, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001367C, commitLogOffset=79484, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=22, CONSUME_START_TIME=1628868816551, UNIQ_KEY=C0A80067A5857C53A9EB42DA82A30004, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=2, storeSize=188, queueOffset=19, sysFlag=0, bornTimestamp=1628868816552, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F00000000000137F4, commitLogOffset=79860, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628868816558, UNIQ_KEY=C0A80067A5857C53A9EB42DA82A80006, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=0, storeSize=188, queueOffset=22, sysFlag=0, bornTimestamp=1628868816560, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816561, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001396C, commitLogOffset=80236, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=23, CONSUME_START_TIME=1628868816565, UNIQ_KEY=C0A80067A5857C53A9EB42DA82B00008, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8

发表评论

表情:
评论列表 (有 0 条评论,472人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMq发送延迟消息

    什么是延迟消息? 对于消息中间件来说,producer将消息发送到mq的服务器,但并不期望这条消息马上被消费,而是推迟到当前时间点之后的某个时间点后再投递到queue中让

    相关 RocketMQ消息消费(三)

    顺序消息 RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费,如果需要做到全局顺序消费则可以将主题配置成一个队列,例如数据库BinLog等

    相关 RocketMQ消息消费(一)

    消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费

    相关 RocketMQ消息发送

    RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)。 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回