RocketMq 消息发送和消息消费机制
消费发送
生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。
RocketMQ提供了吴中发送策略比如同步发送、异步发送、Oneway发送、延迟发送、发送事务消息等。
默认使用的是DefaultMQProducer类,发送消息要经过五个步骤:
- 1 设置Producer的GroupName。
GroupName 表示实例化生产者,指定生产组名称, - 2 设置实例化名称
当一个Jvm需要启动多个Producer的时候,通过设置不同的InstanceName来区分不同的Produce,不设置的话系统使用默认名称“DEFAULT”。 - 3 设置重试次数,如果发送出现异常mq会进行重试发送。
- 4 设置NameServer地址,mq broker地址
- 5 发送消息
发送消息成功之后,如何知道消息是否发送成功呢?在RabbitMq中会有消息发送确认机制,mq会返回给消息一个ack标识。
同样RocketMq也提供了、RocketMq提供了四种发送返回信息状态
- FLUSH_DISK_TIMEOUT
- FLUSH_SLAVE_TIMEOUT
- SLAVE_NOT_AVAILABLE
- SEND_OK
分别对应着在不同的场景下消息发送的状态
- FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成
SYNC_FLUSH才会报这个错误)。 - FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,
没有在设定时间内完成主从同步。 - SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主
备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。 - SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
和RabbitMq一样,也提供了同步发送和异步发送
同步发送机制
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1 实例化生产者,并指定生产组名称
DefaultMQProducer producer = new DefaultMQProducer("myproducer_group_01");
//2 设置实例名称,一个jvm中有多个生产者可以根据实例名区分
//默认default
producer.setInstanceName("name");
//3 指定nameserver的地址
producer.setNamesrvAddr("localhost:9876");
//4 设置同步重试次数
producer.setRetryTimesWhenSendFailed(2);
//设置异步发送次数
//producer.setRetryTimesWhenSendAsyncFailed(2);
// 初始化生产者
producer.start();
Message message = new Message("topic_name", "发送消息".getBytes("utf-8"));
// 1 同步发送 如果发送失败会根据重试次数重试
SendResult send = producer.send(message);
// todo 提高发送速度
// 发送一次 将消息放到缓冲区就返回,速度快,会丢消息 ,不会等待broker返回
//对于一些对数据可靠性要求不高可以使用此方式发送 ,如日志收集
/** * Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。 * 用这种方式发送消息的耗时可以缩短到微秒级。 */
// 方法1 producer.sendOneway(message);
// 方法2 增加produce, 使用多个Producer同时并发发送, RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
/* 消息发生返回状态(SendResult#SendStatus)有如下四种: FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE SEND_OK 表示发送成功 */
SendStatus sendStatus = send.getSendStatus();
System.out.println(sendStatus.toString());
// 关闭生产者
producer.shutdown();
}
发送消息可以看到发送了成功1条数据,然后数据发送成功SEND_OK
消息量比较大时候,同步发送如何提高发送速度,有这么几种处理方法
- 1 对于一些不重要的消息,比如日志消息 我们可以使用oneway发送模式进行发送,Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果,发送消息的耗时可以缩短到微秒级。
设置发送
producer.sendOneway(message);
- 2 增加produce,多个同时进行发送。
这里不用担心多Producer同时写会降低消息写磁盘的效率,
RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
异步发送
和同步发送一样,都需要前四步步骤,直接展示核心发送的代码
// 2 异步发送
// 消息的异步发送
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
//发送失败逻辑 重试次数耗尽 会抛异常
System.out.println("发送失败:" + throwable.getMessage());
}
});
可以看到消息发送成功,这里贴出消息发送成功的消息内容
发送成功:SendResult [sendStatus=SEND_OK, msgId=C0A800678D717C53A9EB42A082D00000, offsetMsgId=C0A8006700002A9F000000000000B008, messageQueue=MessageQueue [topic=topic_name, brokerName=PilgrimdeMacBook-Pro.local, queueId=2], queueOffset=1]
消息发送状态,id,消息id以及名字,偏移量等都包含。
消息消费
消息消费也有这几个步骤
- 消息消费方式(Pull和Push)
- 消息消费的模式(广播模式和集群模式)
- 流量控制 暂时略过
- 并发线程数设置
- 消息的过滤(Tag、Key) TagA||TagB||TagC * null
拉取
拉消息
//1 声明拉取消息topic
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pull.consumer");
// 指定nameserver的地址
defaultMQPullConsumer.setNamesrvAddr("localhost:9876");
//2 消费模式
//广播模式
defaultMQPullConsumer.setMessageModel(MessageModel.BROADCASTING);
//集群模式
//defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
//获取所有的队列
Set<MessageQueue> messageQueues = defaultMQPullConsumer.fetchMessageQueuesInBalance("topic.pull");
for (MessageQueue me : messageQueues) {
//指定消费者拉取消费消息
// 3 TagA||TagB|| TagC *表示不对消息进行标签过滤
PullResult result = defaultMQPullConsumer.pull(me, "*", 0L, 100);
//... 消费代码省略
// 获取从指定消息队列中拉取到的消息
final List<MessageExt> msgFoundList = result.getMsgFoundList();
if (msgFoundList == null) continue;
for (MessageExt messageExt : msgFoundList) {
System.out.println(messageExt);
System.out.println(new String(messageExt.getBody(), "utf-8"));
}
}
defaultMQPullConsumer.start();
推送消息
推送消费消息和拉取消息最大不同点是推送消息属于被迫消费,推送时消费失败,服务端会重新推送,消费不及时会严重影响性能。
而拉消息不会,拉消息拉取一批消费一批。
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("pull.consumer");
// 指定nameserver的地址
defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
//退送消息消费
// TagA||TagB|| TagC *表示不对消息进行标签过滤
//defaultMQPushConsumer.subscribe("topic.push","TagA||TagB|| TagC");
defaultMQPushConsumer.subscribe("topic.push", "*");
/** * 推送消息 提高消费处理能力 * 1 提高消费并行度 * 2 以批量方式进行 消费 * 3 检测延时情况,跳过非重要消息 */
//消费限流 只针对推送来设置,拉取消息自己控制
// 1 提高消费并行度
defaultMQPushConsumer.setConsumeThreadMax(10);
defaultMQPushConsumer.setConsumeThreadMin(1);
// 2 以批量方式进行 消费
// 设置消息批处理的一个批次中消息的最大个数
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
//3 检测延时情况,跳过非重要消息 略...
// 添加消息监听器,一旦有消息推送过来,就进行消费
defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//final MessageQueue messageQueue = context.getMessageQueue();
for (MessageExt msg : msgs) {
try {
System.out.println(new String(msg.getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// 消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消息消费失败
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
消费端消息消费速度低于生产者生产速度就回出现消息挤压,这种情况下,我们下除了优化我们消费者的程序,除了生产者端限流之外,我们还可以使用以下三种方式进行处理:
- 1 提高消费并行度
在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度。
通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。
注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。
此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。
defaultMQPushConsumer.setConsumeThreadMax(10);
defaultMQPushConsumer.setConsumeThreadMin(1);
-2 以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中
涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize
这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);
- 3 检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆
积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。
这里的解决办法是,我们可以在生产消息时,通过tag
设置来区分消息的重要性,暂时跳过次消息,先不进行消费。
消息过滤
支持两种过滤方式 Tag过滤方式
和 SQL92的过滤方式
(仅对push的消费者起作用)
Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样
Tag方式
拉消息
PullResult result = defaultMQPullConsumer.pull("消息队topic", "*", 0L, 100);
推消息
defaultMQPushConsumer.subscribe("topic.push","TagA||TagB|| TagC");
||
表示或的意思,*
表示不过滤
SQL92的过滤方式
步骤
配置文件里开启
enablePropertyFilter=true
./mqbroker -n localhost:9876 -c …/conf/broker.conf -p | grep -i propertyfilter
首先需要开启支持SQL92的特性,然后重启broker:
mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf
可以查看是否开启
基本的语法
- 数字比较: >, >=, <, <=, BETWEEN, =
- 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
- 逻辑比较: AND, OR, NOT;
- Constant types are: 数字如:123, 3.1415; 字符串如:‘abc’,必须是单引号引起来 NULL,特殊常量 布尔型如:TRUE or FALSE;
发送消息
设置不同的属性key 消费端进行过滤
for (int i = 0; i <10 ; i++) {
Message message = new Message("topic_name", ("key="+i).getBytes("utf-8"));
String falg = "";
if (i%2==0){
falg = "v1";
}else {
falg = "v2";
}
// 给消息添加用户属性
message.putUserProperty("key", falg);
// 1 同步发送 如果发送失败会根据重试次数重试
SendResult send = producer.send(message);
SendStatus sendStatus = send.getSendStatus();
System.out.println(sendStatus.toString());
}
推送消费
defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key = 'v1'"));
//defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key = 'v1' and key = 'v2'"));
// defaultMQPushConsumer.subscribe("topic_name", MessageSelector.bySql("key IS NOT NULL"));
//监听消息
defaultMQPushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//final MessageQueue messageQueue = context.getMessageQueue();
for (MessageExt msg : msgs) {
System.out.println(msg);
try {
System.out.println(new String(msg.getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
// 消息消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消息消费失败
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
可以看到 过滤后的消息都是key=v1类型的消息
MessageExt [queueId=0, storeSize=188, queueOffset=20, sysFlag=0, bornTimestamp=1628868816509, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816532, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001338C, commitLogOffset=78732, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=21, CONSUME_START_TIME=1628868816545, UNIQ_KEY=C0A80067A5857C53A9EB42DA827C0000, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=2, storeSize=188, queueOffset=18, sysFlag=0, bornTimestamp=1628868816540, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816542, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000013504, commitLogOffset=79108, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=19, CONSUME_START_TIME=1628868816547, UNIQ_KEY=C0A80067A5857C53A9EB42DA829C0002, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=0, storeSize=188, queueOffset=21, sysFlag=0, bornTimestamp=1628868816547, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816548, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001367C, commitLogOffset=79484, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=22, CONSUME_START_TIME=1628868816551, UNIQ_KEY=C0A80067A5857C53A9EB42DA82A30004, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=2, storeSize=188, queueOffset=19, sysFlag=0, bornTimestamp=1628868816552, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F00000000000137F4, commitLogOffset=79860, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628868816558, UNIQ_KEY=C0A80067A5857C53A9EB42DA82A80006, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=0, storeSize=188, queueOffset=22, sysFlag=0, bornTimestamp=1628868816560, bornHost=/192.168.0.103:63441, storeTimestamp=1628868816561, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000001396C, commitLogOffset=80236, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=23, CONSUME_START_TIME=1628868816565, UNIQ_KEY=C0A80067A5857C53A9EB42DA82B00008, CLUSTER=DefaultCluster, WAIT=true, key=v1}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8
还没有评论,来说两句吧...