rocketMQ producer,consumer基本特征

亦凉 2022-05-09 13:10 284阅读 0赞

1 rocketMQ producer,consumer基本特征

  1. 介绍
    rocketMQ 有同步,异步,sendOneway发送消息机制。
    RocketMQ支持消费失败定时重试,每次重试间隔时间顺延。
    RocketMQ支持定时延迟发送机制。
    RocketMQ支持有序消息,及push,poll的不同消费机制。
  2. 延迟特征
    RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持系统默认间隔时间特定的 level,例如定时 5s, 10s, 1m 等。
    其中,level=0 级表示不延时,level=1 表示1级延时 5s延迟 ,level=2 表示 2 级延时 10s延迟,以此类推
    如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
  3. 延迟配置
    在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    描述了各级别与延时时间的对应映射关系。
  1. 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
  2. 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
  3. 默认值就是上面声明的,可手工调整;
  4. 默认值已够用,不建议修改这个值。
    1
    2
    3
    4
    5
    6
  5. producer 发送延迟有序的消费

    /**
    * 有序消息
    */
    @Test
    public void test_sync_producer_order() {

    1. try \{
    2. DefaultMQProducer producer = new DefaultMQProducer("please\_rename\_unique\_group\_name");
    3. producer.setNamesrvAddr(NAMES\_SERVER\_ADDRESS);
    4. //发送失败后,重试几次r
    5. producer.setRetryTimesWhenSendFailed(2);
    6. //发送消息超时
    7. producer.setSendMsgTimeout(3000);
    8. // producer.set
  1. producer.start();
  2. CountDownLatch countDownLatch = new CountDownLatch(2);
  3. String\[\] tags = new String\[\]\{"TagA", "TagB", "TagC", "TagD", "TagE"\};
  4. long count = 0;
  5. for (int i = 0; i < 10; i++) \{
  6. // int orderId = i % 10;
  7. for (int j = 0; j < 5; j++) \{
  8. Message msg =
  9. new Message("TopicTest\_c", tags\[i % tags.length\], "KEY" + j,
  10. ("Hello RocketMQ " + i+j).getBytes(RemotingHelper.DEFAULT\_CHARSET));
  11. // This message will be delivered to consumer 10 seconds later.
  12. msg.setDelayTimeLevel(3);
  13. SendResult sendResult = producer.send(msg, new MessageQueueSelector() \{
  14. @Override
  15. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) \{
  16. //the are为orderId
  17. Integer id = (Integer) arg;

// int index = id % mqs.size();
// // 根据key的hash值来选择

  1. int index = RocketUtils.toPositive(RocketUtils.murmur2(id.toString().getBytes())) % mqs
  2. .size();
  3. MessageQueue queue = mqs.get(index);
  4. logger.info("input value \{\} get queue size:\{\},choose queue: \{\}", id, mqs.size(), queue
  5. .getBrokerName());
  6. return queue;
  7. \}
  8. \}, j);
  9. logger.info("return send result \{\}", sendResult);
  10. count++;
  11. \}
  12. \}
  13. logger.info("total send count\{\}", count);
  14. producer.shutdown();
  15. \} catch (Exception e) \{
  16. e.printStackTrace();
  17. \}
  18. \}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1.发送延迟消息只需要setDelayTimeLevel,设置延迟level就可以。
2.发送有序消息,一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列,上面的策略是通过参数的hash值与queue数取余来选择queue。
3.consumer端也有rebalance机制来分配queue给指定的consumer消费,类似kafka的顺序消费。

  1. consumer采用push模式消费,

    @Test
    public void consumer_push_Test_delay() {

    1. try \{
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please\_rename\_unique\_group\_name\_three\_test");
    3. // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_FIRST\_OFFSET);
    4. consumer.setNamesrvAddr(RocketUtils.NAMES\_SERVER\_ADDRESS);
    5. //"TagA || TagC || TagD"
    6. // RocketUtils.TOPIC\_NAME\_DELAY
    7. consumer.subscribe("TopicTest\_test", "\*");
    8. //指定consumeMessage的时候,获取消息的数量,
    9. consumer.setConsumeMessageBatchMaxSize(1);
    10. consumer.setPullBatchSize(1);
    11. //以前是多线程的消费
    12. consumer.setConsumeThreadMin(1);
    13. consumer.setConsumeThreadMax(1);
    14. // consumer.setConsumeThreadMin();
    15. //consumer.resetClientConfig();
    16. final AtomicLong consumeTimes = new AtomicLong(0);
    17. final CountDownLatch latch = new CountDownLatch(1);
    18. consumer.registerMessageListener(new MessageListenerConcurrently() \{
    19. @Override
    20. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
    21. context) \{
    22. // context.setAutoCommit(true);
    23. logger.info("===========start ============" + consumeTimes.longValue());
    24. for (MessageExt message : msgs) \{

    // logger.info(“ Receive New Messages: “ + ToStringBuilder.reflectionToString(message,
    // ToStringStyle.SHORT_PREFIX_STYLE) + “%n:” + new String(message.getBody()));

    1. logger.info(" Receive New Messages queue:\{\},queue offset:\{\},message:\{\} ",message
    2. .getQueueId(),message.getQueueOffset(),new
    3. String(message
    4. .getBody()));
  1. logger.info("Receive message\[msgId=" + message.getMsgId() + "\] "
  2. + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
  3. \}
  4. logger.info("===========end ============");
  5. consumeTimes.incrementAndGet();
  6. // context.setAckIndex();
  7. if (consumeTimes.longValue() > 5) \{
  8. // latch.countDown();
  9. return ConsumeConcurrentlyStatus.CONSUME\_SUCCESS;
  10. \} else \{
  11. /\*\*
  12. \* 2017-08-14 14:05:38 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m
  13. // 7m 8m 9m 10m 20m 30m 1h 2h
  14. \*/
  15. context.setDelayLevelWhenNextConsume(3);
  16. return ConsumeConcurrentlyStatus.RECONSUME\_LATER;
  17. \}
  18. \}
  19. \});
  20. consumer.start();
  21. latch.await();
  22. Thread.sleep(60 \* 1000);
  23. logger.info("Consumer Started.%n");
  24. consumer.shutdown();
  25. \} catch (Exception e) \{
  26. e.printStackTrace();
  27. \}
  28. \}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
发送一条消息,发送反馈消息ID:C0A804A15F0014DAD5DC12B5CC860000

2017-09-04 15:11:43,706 NettyClientSelector_1 DEBUG [io.netty.util.internal.Cleaner0] - [java.nio.ByteBuffer.cleaner(): available]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=C0A804A15F0014DAD5DC12B5CC860000, offsetMsgId=AC101E0D00002A9F00000000002159A1, messageQueue=MessageQueue [topic=TopicTest_test, brokerName=broker-a, queueId=0], queueOffset=424]

1
2
3
4
5
2.根据上面的消费的代码,设置消息消费失败后延迟10s再试,根据下面的日志可以发现消息在10s后被重复消费。

2017-09-04 15:12:14,378 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============0]
2017-09-04 15:12:14,378 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:1,message:Hello RocketMQ andex ]
2017-09-04 15:12:14,859 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 839ms later]
2017-09-04 15:12:14,859 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============1]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:18,message:Hello RocketMQ andex ]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 351ms later]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============2]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:19,message:Hello RocketMQ andex ]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 349ms later]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============3]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:20,message:Hello RocketMQ andex ]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 346ms later]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============4]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:21,message:Hello RocketMQ andex ]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 348ms later]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============5]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:22,message:Hello RocketMQ andex ]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 346ms later]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

  1. 消费者push模式两种监听模式
    1.同一批的消息只要根据MessageQueueSelector投递消息到同一个queue,同一个queue只能被某一个消费者实例所拉取,同一个消费者实例肯定是按照顺序拉取消息提交给业务处理线程池。这样就保证消费端顺序消费。
    2.(这里假设触发了重排导致queue分配给了别人也没关系,由于queue的消息永远是FIFO,最多只是已经消费的消息重复而已,queue内顺序还是能保证)
  2. 但的确会有一些异常场景会导致乱序。如master宕机,导致写入队列的数量上出现变化。还有调整queue数,如果还是沿用取模的seletor,就会一批订单号的消息前面散列到q0,后面的可能散到q1,这样就不能保证顺序了。

  3. MessageListenerOrderly
    MessageListenerOrderly是自带的保证顺序消费的Listener,我们可以使用MessageListenerConcurrently设置为单线程来替代它,因为它有如下问题:
    遇到消息失败的消息,无法跳过,当前队列消费暂停。
    目前版本的RocketMQ的MessageListenerOrderly是不能从slave消费消息的。

  4. MessageListenerConcurrently
    主要流程是PullMessageService单线程轮询pull数据到ConsumeMessageConcurrentlyService去消费。
    消息集合按consumeBatchSize切分后放到consumeExecutor里去消费,所有消息被并行的执行。
  5. 参考资料
    RocketMQ_example 例子代码
    更多RocketMQ使用请下载github源码。
    深入浅出学习product,consumer源码:
    http://blog.csdn.net/quhongwei\_zhanqiu/article/details/39142693
    http://www.cnblogs.com/wxd0108/p/6055108.html
    -——————————
    作者:编程之路-java
    来源:CSDN
    原文:https://blog.csdn.net/xhpscdx/article/details/77853031?utm\_source=copy
    版权声明:本文为博主原创文章,转载请附上博文链接!

发表评论

表情:
评论列表 (有 0 条评论,284人围观)

还没有评论,来说两句吧...

相关阅读