RocketMQ:顺序消息
消息有序指的是可以按照消息的发送顺序来消费。rockermq可以严格的保证消息有序,可以分为分区有序或者全部有序顺序消费的原理解析,在默认的情况下消息发送会采取round robin轮询方式把消息发送到不同的queue(分区队列); 而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只一次发送 到同一个queue中,消费的时候只从这个queue上一次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序; 如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的
下面用订单进行分区有序的示例,一个订单的顺序流程:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中, 消费时,同一个orderid获取到的肯定是同一个队列
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
构建消息:
public class OrderStep {
private long orderId;
private String desc;
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
public static List<OrderStep> buildOrders() {
// 1039L 创建 付款 推送 完成
// 1065L 创建 付款
// 7235L 创建 付款
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep demo = new OrderStep();
demo = new OrderStep();
demo.setOrderId(6L);
demo.setDesc("创建");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(6L);
demo.setDesc("付款");
orderList.add(demo);
demo.setOrderId(6L);
demo.setDesc("推送");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(6L);
demo.setDesc("完成");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(7L);
demo.setDesc("推送");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(7L);
demo.setDesc("完成");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(9L);
demo.setDesc("创建");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(9L);
demo.setDesc("付款");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(9L);
demo.setDesc("推送");
orderList.add(demo);
demo = new OrderStep();
demo.setOrderId(9L);
demo.setDesc("完成");
orderList.add(demo);
return orderList;
}
}
生产者
public class Producer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.指定NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 3.启动producer
producer.start();
// 构建消息集合
List<OrderStep> orderStepList = OrderStep.buildOrders();
// 发送消息
for (int i = 0; i < orderStepList.size(); i++) {
String body = orderStepList.get(i)+"";
Message message = new Message("OrderTopic","Order","i"+i,body.getBytes());
/**
* 参数1:消息对象
* 参数2:消息队列的选择器
* 参数3:选择队列的业务标识(订单id)
*/
SendResult send = producer.send(message, new MessageQueueSelector() {
/**
*
* @param list 队列集合
* @param message 消息对象
* @param o 业务标识的参数
* @return
*/
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
long orderId = (Long) o;
long index = orderId % list.size();
return list.get((int) index);
}
}, orderStepList.get(i).getOrderId());
System.out.println("发送结果:"+send);
}
producer.shutdown();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2.指定Nameserver地址
consumer.setNamesrvAddr("localhost:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic","*");
// 4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println("线程名称:["+Thread.currentThread().getName()+"]消费消息:"+new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.启动消费者
consumer.start();
System.out.println("消费者启动");
}
}
还没有评论,来说两句吧...