rocketMQ producer,consumer基本特征
1 rocketMQ producer,consumer基本特征
- 介绍
rocketMQ 有同步,异步,sendOneway发送消息机制。
RocketMQ支持消费失败定时重试,每次重试间隔时间顺延。
RocketMQ支持定时延迟发送机制。
RocketMQ支持有序消息,及push,poll的不同消费机制。 - 延迟特征
RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持系统默认间隔时间特定的 level,例如定时 5s, 10s, 1m 等。
其中,level=0 级表示不延时,level=1 表示1级延时 5s延迟 ,level=2 表示 2 级延时 10s延迟,以此类推
如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。 - 延迟配置
在服务器端(rocketmq-broker端)的属性配置文件中加入以下行:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
描述了各级别与延时时间的对应映射关系。
- 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;
- 时间单位支持:s、m、h、d,分别表示秒、分、时、天;
- 默认值就是上面声明的,可手工调整;
- 默认值已够用,不建议修改这个值。
1
2
3
4
5
6 producer 发送延迟有序的消费
/**
* 有序消息
*/
@Test
public void test_sync_producer_order() {try \{
DefaultMQProducer producer = new DefaultMQProducer("please\_rename\_unique\_group\_name");
producer.setNamesrvAddr(NAMES\_SERVER\_ADDRESS);
//发送失败后,重试几次r
producer.setRetryTimesWhenSendFailed(2);
//发送消息超时
producer.setSendMsgTimeout(3000);
// producer.set
producer.start();
CountDownLatch countDownLatch = new CountDownLatch(2);
String\[\] tags = new String\[\]\{"TagA", "TagB", "TagC", "TagD", "TagE"\};
long count = 0;
for (int i = 0; i < 10; i++) \{
// int orderId = i % 10;
for (int j = 0; j < 5; j++) \{
Message msg =
new Message("TopicTest\_c", tags\[i % tags.length\], "KEY" + j,
("Hello RocketMQ " + i+j).getBytes(RemotingHelper.DEFAULT\_CHARSET));
// This message will be delivered to consumer 10 seconds later.
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() \{
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) \{
//the are为orderId
Integer id = (Integer) arg;
// int index = id % mqs.size();
// // 根据key的hash值来选择
int index = RocketUtils.toPositive(RocketUtils.murmur2(id.toString().getBytes())) % mqs
.size();
MessageQueue queue = mqs.get(index);
logger.info("input value \{\} get queue size:\{\},choose queue: \{\}", id, mqs.size(), queue
.getBrokerName());
return queue;
\}
\}, j);
logger.info("return send result \{\}", sendResult);
count++;
\}
\}
logger.info("total send count\{\}", count);
producer.shutdown();
\} catch (Exception e) \{
e.printStackTrace();
\}
\}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
1.发送延迟消息只需要setDelayTimeLevel,设置延迟level就可以。
2.发送有序消息,一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列,上面的策略是通过参数的hash值与queue数取余来选择queue。
3.consumer端也有rebalance机制来分配queue给指定的consumer消费,类似kafka的顺序消费。
consumer采用push模式消费,
@Test
public void consumer_push_Test_delay() {try \{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please\_rename\_unique\_group\_name\_three\_test");
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME\_FROM\_FIRST\_OFFSET);
consumer.setNamesrvAddr(RocketUtils.NAMES\_SERVER\_ADDRESS);
//"TagA || TagC || TagD"
// RocketUtils.TOPIC\_NAME\_DELAY
consumer.subscribe("TopicTest\_test", "\*");
//指定consumeMessage的时候,获取消息的数量,
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setPullBatchSize(1);
//以前是多线程的消费
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
// consumer.setConsumeThreadMin();
//consumer.resetClientConfig();
final AtomicLong consumeTimes = new AtomicLong(0);
final CountDownLatch latch = new CountDownLatch(1);
consumer.registerMessageListener(new MessageListenerConcurrently() \{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
context) \{
// context.setAutoCommit(true);
logger.info("===========start ============" + consumeTimes.longValue());
for (MessageExt message : msgs) \{
// logger.info(“ Receive New Messages: “ + ToStringBuilder.reflectionToString(message,
// ToStringStyle.SHORT_PREFIX_STYLE) + “%n:” + new String(message.getBody()));logger.info(" Receive New Messages queue:\{\},queue offset:\{\},message:\{\} ",message
.getQueueId(),message.getQueueOffset(),new
String(message
.getBody()));
logger.info("Receive message\[msgId=" + message.getMsgId() + "\] "
+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
\}
logger.info("===========end ============");
consumeTimes.incrementAndGet();
// context.setAckIndex();
if (consumeTimes.longValue() > 5) \{
// latch.countDown();
return ConsumeConcurrentlyStatus.CONSUME\_SUCCESS;
\} else \{
/\*\*
\* 2017-08-14 14:05:38 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m
// 7m 8m 9m 10m 20m 30m 1h 2h
\*/
context.setDelayLevelWhenNextConsume(3);
return ConsumeConcurrentlyStatus.RECONSUME\_LATER;
\}
\}
\});
consumer.start();
latch.await();
Thread.sleep(60 \* 1000);
logger.info("Consumer Started.%n");
consumer.shutdown();
\} catch (Exception e) \{
e.printStackTrace();
\}
\}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
发送一条消息,发送反馈消息ID:C0A804A15F0014DAD5DC12B5CC860000
2017-09-04 15:11:43,706 NettyClientSelector_1 DEBUG [io.netty.util.internal.Cleaner0] - [java.nio.ByteBuffer.cleaner(): available]
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=C0A804A15F0014DAD5DC12B5CC860000, offsetMsgId=AC101E0D00002A9F00000000002159A1, messageQueue=MessageQueue [topic=TopicTest_test, brokerName=broker-a, queueId=0], queueOffset=424]
1
2
3
4
5
2.根据上面的消费的代码,设置消息消费失败后延迟10s再试,根据下面的日志可以发现消息在10s后被重复消费。
2017-09-04 15:12:14,378 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============0]
2017-09-04 15:12:14,378 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:1,message:Hello RocketMQ andex ]
2017-09-04 15:12:14,859 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 839ms later]
2017-09-04 15:12:14,859 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============1]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:18,message:Hello RocketMQ andex ]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 351ms later]
2017-09-04 15:12:24,918 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============2]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:19,message:Hello RocketMQ andex ]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 349ms later]
2017-09-04 15:12:34,970 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============3]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:20,message:Hello RocketMQ andex ]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 346ms later]
2017-09-04 15:12:45,019 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============4]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:21,message:Hello RocketMQ andex ]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 348ms later]
2017-09-04 15:12:55,070 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========start ============5]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [ Receive New Messages queue:0,queue offset:22,message:Hello RocketMQ andex ]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [Receive message[msgId=C0A804A15F0014DAD5DC12B5CC860000] 346ms later]
2017-09-04 15:13:05,120 ConsumeMessageThread_1 INFO [com.keruyun.rocket.client.ApacheRocketProducerTest] - [===========end ============]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- 消费者push模式两种监听模式
1.同一批的消息只要根据MessageQueueSelector投递消息到同一个queue,同一个queue只能被某一个消费者实例所拉取,同一个消费者实例肯定是按照顺序拉取消息提交给业务处理线程池。这样就保证消费端顺序消费。
2.(这里假设触发了重排导致queue分配给了别人也没关系,由于queue的消息永远是FIFO,最多只是已经消费的消息重复而已,queue内顺序还是能保证) 但的确会有一些异常场景会导致乱序。如master宕机,导致写入队列的数量上出现变化。还有调整queue数,如果还是沿用取模的seletor,就会一批订单号的消息前面散列到q0,后面的可能散到q1,这样就不能保证顺序了。
MessageListenerOrderly
MessageListenerOrderly是自带的保证顺序消费的Listener,我们可以使用MessageListenerConcurrently设置为单线程来替代它,因为它有如下问题:
遇到消息失败的消息,无法跳过,当前队列消费暂停。
目前版本的RocketMQ的MessageListenerOrderly是不能从slave消费消息的。- MessageListenerConcurrently
主要流程是PullMessageService单线程轮询pull数据到ConsumeMessageConcurrentlyService去消费。
消息集合按consumeBatchSize切分后放到consumeExecutor里去消费,所有消息被并行的执行。 - 参考资料
RocketMQ_example 例子代码
更多RocketMQ使用请下载github源码。
深入浅出学习product,consumer源码:
http://blog.csdn.net/quhongwei\_zhanqiu/article/details/39142693
http://www.cnblogs.com/wxd0108/p/6055108.html
-——————————
作者:编程之路-java
来源:CSDN
原文:https://blog.csdn.net/xhpscdx/article/details/77853031?utm\_source=copy
版权声明:本文为博主原创文章,转载请附上博文链接!
还没有评论,来说两句吧...