RocketMQ 相关知识

Bertha 。 2022-06-09 10:21 242阅读 0赞

RocketMQ 相关知识

Rocket MQ消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。

Rocket MQ相关名词

  • Producer 消息生产者,负责生产消息
  • Consumer 消息消费者,负责消费消息
  • NameServer 无状态节点,用来保存活跃的broker列表和topic列表
  • Broker 消息中转角色,负责存储消息,转发消息
  • Topic 消息的逻辑管理单位
  • Message 消息
  1. * body 消息体,用于携带消息具体内容
  2. * key 消息的key,用于区别不同的消息
  3. * tags 消息的Tag,用于不同的订阅者过滤消息

消息发送方式

  • 同步方式
  1. > 发送消息,接收到结果之后再发送下一条消息,速度最慢,耗时最长
  • 异步方式
    发送消息,不论是否收到结果,直接发送下一条消息,发送速度介于同步和单向方式之间
  • 单向方式
    发送消息,直接发送消息,不返回发送结果,发送速度最快

消息类型

  • 定时消息
  1. > 在指定的发送时间发送消息
  • 延时消息
    从当前时间开始,经过延时时间后再发送消息
  • 顺序消息
    立即发送消息
  • 事务消息
    MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致

示例代码

Producer

  1. public class ProducerDelayTest {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. //您在 MQ 控制台创建的Producer ID
  5. properties.put(PropertyKeyConst.ProducerId, "XXX");
  6. // 阿里云身份验证,在阿里云服务器管理控制台创建
  7. properties.put(PropertyKeyConst.AccessKey, "XXX");
  8. // 阿里云身份验证,在阿里云服务器管理控制台创建
  9. properties.put(PropertyKeyConst.SecretKey, "XXX");
  10. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  11. properties.put(PropertyKeyConst.ONSAddr,
  12. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  13. Producer producer = ONSFactory.createProducer(properties);
  14. // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
  15. producer.start();
  16. /** * 消息类型代码,参考下面消息类型代码 */
  17. /** * 消息发送方式代码,参考下面发送方式代码 */
  18. System.out.println("Message Id:" + sendResult.getMessageId());
  19. // 在应用退出前,销毁 Producer 对象
  20. // 注意:如果不销毁也没有问题,如果发送消息较多不应该销毁
  21. producer.shutdown();
  22. }
  23. }

消息类型代码

  • 定时消息

    Message msg = new Message();

    1. msg.setTag("TAG");
    2. msg.setKey("KEY");
    3. msg.setTopic("TOPIC");
    4. msg.setBody("BODY".getBytes());
    5. long timeStamp =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2017-09-03 16:21:00").getTime();
    6. msg.setStartDeliverTime(timeStamp);
  • 延时消息

    Message msg = new Message();

    1. msg.setTag("TAG");
    2. msg.setKey("KEY");
    3. msg.setTopic("TOPIC");
    4. msg.setBody("BODY".getBytes());
    5. long delayTime = 3000;//30秒后再发送
    6. msg.setStartDeliverTime(System.currentTimes() + delayTime);
  • 顺序消息

    Message msg = new Message();

    1. msg.setTag("TAG");
    2. msg.setKey("KEY");
    3. msg.setTopic("TOPIC".getBytes());
    4. msg.setBody("BODY");

消息发送方式代码

  • 同步方式发送

    SendResult sendResult = producer.send(msg);

  • 异步方式发送

    producer.sendAsync(message, new SendCallback() {

    1. @Override
    2. public void onSuccess(final SendResult sendResult) {
    3. logger.info("MQ send ASYNCHRONOUS message successed,response is " + JSON.toJSONString(sendResult));
    4. }
    5. @Override
    6. public void onException(OnExceptionContext onExceptionContext) {
    7. logger.info("MQ send ASYNCHRONOUS message failed, error is " + onExceptionContext.getException().getMessage());
    8. }
    9. });
  • 单向方式发送

    producer.sendOneway(message);

Consumer

  1. public class ConsumerTest {
  2. public static void main(String[] args) {
  3. Properties properties = new Properties();
  4. // 您在控制台创建的 Consumer ID
  5. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  6. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  7. properties.put(PropertyKeyConst.AccessKey, "XXX");
  8. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  9. properties.put(PropertyKeyConst.SecretKey, "XXX");
  10. // 设置 TCP 接入域名(此处以公共云生产环境为例)
  11. properties.put(PropertyKeyConst.ONSAddr,
  12. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
  13. // 集群订阅方式 (默认)
  14. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  15. // 广播订阅方式
  16. // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
  17. Consumer consumer = ONSFactory.createConsumer(properties);
  18. consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag
  19. public Action consume(Message message, ConsumeContext context) {
  20. System.out.println("Receive: " + message);
  21. return Action.CommitMessage;
  22. }
  23. });
  24. //订阅另外一个Topic
  25. consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag
  26. public Action consume(Message message, ConsumeContext context) {
  27. System.out.println("Receive: " + message);
  28. return Action.CommitMessage;
  29. }
  30. });
  31. consumer.start();
  32. System.out.println("Consumer Started");
  33. }
  34. }

  • Spring Boot Demo 参照https://github.com/helloworlde/SpringBoot-RocketMQ

发表评论

表情:
评论列表 (有 0 条评论,242人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMQ内部知识体系

    RocketMQ,阿里开源,社区活跃度高,现在很多企业都在用,我们今天来聊一下它的内部机制,以帮助我们更灵活的使用MQ。 RocketMQ内部结构: ![在这里插入图片

    相关 RocketMQ 相关知识

    RocketMQ 相关知识 > Rocket MQ消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布

    相关 DHCP相关知识

    动态主机设置协议(英语:Dynamic Host Configuration Protocol,DHCP)是一个局域网的网络协议,使用UDP协议工作,主要有两个用途:用于内部网