(四)Rocketmq消息发送

以你之姓@ 2023-07-23 03:41 90阅读 0赞

文章目录

      • 一.消息发送
      • 二.消息结构
      • 三.生产者启动流程
      • 四.消息发送基本流程
        • 4.1消息长度验证
        • 4.2查找主题路由信息
        • 4.3选择消息队列
        • 4.4消息发送
      • 五.批量发送

Rocketmq发送消息有三种实现方式:可靠同步发送,可靠异步发送,单向(oneway)发送

一.消息发送

支持3种消息发送 sync同步 async异步 oneway单向
同步:发送者向MQ执行发送消息api,同步等待,知道消息服务器返回发送结果
异步:发送者向MQ执行发送消息api,指定消息发送成功后回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或者失败的毁掉任务在一个新的线程中执行
单向:发送者向MQ执行发送消息api,直接返回,不等待消息服务器的结果,也不注册回调函数,简单说,就是只管发,不在乎是否成功存储在消息服务器上

二.消息结构

org.apache.rocketmq.common.message.Message

  1. public class Message implements Serializable {
  2. private static final long serialVersionUID = 8445773977080406428L;
  3. private String topic;
  4. /** * @see org.apache.rocketmq.common.sysflag.MessageSysFlag */
  5. private int flag;
  6. /** * 扩展属性 */
  7. private Map<String, String> properties;
  8. private byte[] body;
  9. private String transactionId;
  10. public Message() {
  11. }
  12. public Message(String topic, byte[] body) {
  13. this(topic, "", "", 0, body, true);
  14. }
  15. public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
  16. this.topic = topic;
  17. this.flag = flag;
  18. this.body = body;
  19. //消息的tag用于消息过滤
  20. if (tags != null && tags.length() > 0)
  21. this.setTags(tags);
  22. //Message索引键,多个用空格隔开,rocketmq根据这些key快速检索消息
  23. if (keys != null && keys.length() > 0)
  24. this.setKeys(keys);
  25. //发送时是否等消息存储完成再返回
  26. this.setWaitStoreMsgOK(waitStoreMsgOK);
  27. }

三.生产者启动流程

Producer的代码在client模块中,对于Rocketmq来说,它就是客户端,也是消息的提供者;
org.apache.rocketmq.client.producer.DefaultMQProducer
我们可以从quickstart中producer.start()开始追寻消息发送者的启动流程
org.apache.rocketmq.client.producer.DefaultMQProducer#start

  1. public void start() throws MQClientException {
  2. //设置生产组
  3. this.setProducerGroup(withNamespace(this.producerGroup));
  4. this.defaultMQProducerImpl.start();
  5. if (null != traceDispatcher) {
  6. try {
  7. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  8. } catch (MQClientException e) {
  9. log.warn("trace dispatcher start failed ", e);
  10. }
  11. }
  12. }

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

  1. public void start(final boolean startFactory) throws MQClientException {
  2. //某人CREATE_JUST
  3. switch (this.serviceState) {
  4. case CREATE_JUST:
  5. this.serviceState = ServiceState.START_FAILED;
  6. //检查配置
  7. this.checkConfig();
  8. //转换成进程id ->pid
  9. if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  10. this.defaultMQProducer.changeInstanceNameToPID();
  11. }
  12. //创建MQClientInstance实例 是producer cunsumer NameServer broker的网络通道
  13. this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
  14. //生产组 生产者 放入producerTable
  15. boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
  16. if (!registerOK) {
  17. this.serviceState = ServiceState.CREATE_JUST;
  18. throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
  19. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  20. null);
  21. }
  22. this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
  23. if (startFactory) {
  24. //启动MQClientInstance
  25. mQClientFactory.start();
  26. }
  27. this.serviceState = ServiceState.RUNNING;
  28. break;
  29. .......
  30. default:
  31. break;
  32. }
  33. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

四.消息发送基本流程

消息发送的主要流程是:验证消息,查找路由,消息发送
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)

  1. public SendResult send(
  2. Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3. //验证消息
  4. Validators.checkMessage(msg, this);
  5. msg.setTopic(withNamespace(msg.getTopic()));
  6. return this.defaultMQProducerImpl.send(msg);
  7. }

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message) 默认的同步发送

  1. public SendResult send(
  2. Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3. return send(msg, this.defaultMQProducer.getSendMsgTimeout());
  4. }
  5. public SendResult send(Message msg,
  6. long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  7. return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
  8. }

默认发送超时时间为3000ms

4.1消息长度验证

消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体规范为主体名称,消息体不能为空,消息长度不能小于0且不能超过4m(maxMessageSize=102410244)

4.2查找主题路由信息

在发送之前,首先获取主题的路由信息,只有获取了这些信息我们才能知道要发送到具体的Broker节点,第一次发送消息时,本地没有缓存topic路由信息,查询NameServer尝试获取,如果路由信息未获取到,再次尝试用默认主题DefaultMQProducerImpl#createTopicKey去查询,如果BrokerConfig#autoCreateTopicEnable为true,NameServer将返回路由信息,如果为false,则抛出无法找到topic异常
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo

  1. private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  2. //从topic信息列表获取
  3. TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  4. //未获取到 或者消息队列为null
  5. if (null == topicPublishInfo || !topicPublishInfo.ok()) {
  6. this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
  7. //更新topic相关列表
  8. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  9. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  10. }
  11. if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
  12. return topicPublishInfo;
  13. } else {
  14. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
  15. topicPublishInfo = this.topicPublishInfoTable.get(topic);
  16. return topicPublishInfo;
  17. }
  18. }

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)

  1. public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
  2. DefaultMQProducer defaultMQProducer) {
  3. try {
  4. if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
  5. try {
  6. TopicRouteData topicRouteData;
  7. //isDefault 为true时查询
  8. if (isDefault && defaultMQProducer != null) {
  9. //根据默认主题"TBW102"
  10. topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
  11. 1000 * 3);
  12. if (topicRouteData != null) {
  13. for (QueueData data : topicRouteData.getQueueDatas()) {
  14. int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
  15. data.setReadQueueNums(queueNums);
  16. data.setWriteQueueNums(queueNums);
  17. }
  18. }
  19. } else {
  20. //根据topic查询
  21. topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
  22. }
  23. //如果有topic路由信息
  24. if (topicRouteData != null) {
  25. TopicRouteData old = this.topicRouteTable.get(topic);
  26. //新老路由信息对比
  27. boolean changed = topicRouteDataIsChange(old, topicRouteData);
  28. //没有变化
  29. if (!changed) {
  30. changed = this.isNeedUpdateTopicRouteInfo(topic);
  31. } else {
  32. log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
  33. }
  34. //变化了
  35. if (changed) {
  36. TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
  37. for (BrokerData bd : topicRouteData.getBrokerDatas()) {
  38. this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
  39. }
  40. // Update Pub info
  41. {
  42. TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
  43. publishInfo.setHaveTopicRouterInfo(true);
  44. Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
  45. while (it.hasNext()) {
  46. Entry<String, MQProducerInner> entry = it.next();
  47. MQProducerInner impl = entry.getValue();
  48. if (impl != null) {
  49. impl.updateTopicPublishInfo(topic, publishInfo);
  50. }
  51. }
  52. }
  53. // Update sub info
  54. {
  55. Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
  56. Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  57. while (it.hasNext()) {
  58. Entry<String, MQConsumerInner> entry = it.next();
  59. MQConsumerInner impl = entry.getValue();
  60. if (impl != null) {
  61. impl.updateTopicSubscribeInfo(topic, subscribeInfo);
  62. }
  63. }
  64. }
  65. log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
  66. this.topicRouteTable.put(topic, cloneTopicRouteData);
  67. return true;

4.3选择消息队列

根据路由选择消息队列,返回消息队列按着broker,序号排序,假设topicA在broker-a broker-b分别创建2个队列,返回的消息队列是
[{“broker-name”:“broker-a”,“queueId”:0},{“broker-name”:“broker-a”,“queueId”:1},{“broker-name”:“broker-b”,“queueId”:0},{“broker-name”:“broker-b”,“queueId”:1}]
消息发送端采用重试机制,由retryTimesWhenSendFailed==2指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试,接下来就是循环执行,选择消息队列,发送消息,发送成功则返回,收到异常则重试,选择消息队列方式有两种
1)sendLatencyFaultEnable=false,默认不启动Broker故障延迟机制
2)sendLatencyFaultEnable=true,启动Broker故障延迟机制
step1:选择队列org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue 默认的策略就是轮询
org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
默认模式

  1. public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  2. //第一次 lastBrokerName为null
  3. if (lastBrokerName == null) {
  4. return selectOneMessageQueue();
  5. } else {
  6. //序号+1
  7. int index = this.sendWhichQueue.getAndIncrement();
  8. for (int i = 0; i < this.messageQueueList.size(); i++) {
  9. int pos = Math.abs(index++) % this.messageQueueList.size();
  10. if (pos < 0)
  11. pos = 0;
  12. //选择一个不一样的broker
  13. MessageQueue mq = this.messageQueueList.get(pos);
  14. if (!mq.getBrokerName().equals(lastBrokerName)) {
  15. return mq;
  16. }
  17. }
  18. return selectOneMessageQueue();
  19. }
  20. }
  21. public MessageQueue selectOneMessageQueue() {
  22. int index = this.sendWhichQueue.getAndIncrement();
  23. int pos = Math.abs(index) % this.messageQueueList.size();
  24. if (pos < 0)
  25. pos = 0;
  26. //轮序
  27. return this.messageQueueList.get(pos);
  28. }

如果Broker宕机,生产端不会立刻知晓,需要消息生产者每隔30s进行一次路由更新
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

4.4消息发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

  1. private SendResult sendKernelImpl(final Message msg,
  2. final MessageQueue mq,
  3. final CommunicationMode communicationMode,
  4. final SendCallback sendCallback,
  5. final TopicPublishInfo topicPublishInfo,
  6. final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  7. long beginStartTime = System.currentTimeMillis();
  8. //broker的ip:port
  9. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  10. if (null == brokerAddr) {
  11. tryToFindTopicPublishInfo(mq.getTopic());
  12. brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  13. }
  14. SendMessageContext context = null;
  15. if (brokerAddr != null) {
  16. brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
  17. byte[] prevBody = msg.getBody();
  18. try {
  19. //for MessageBatch,ID has been set in the generating process 为消息分配全局id
  20. if (!(msg instanceof MessageBatch)) {
  21. MessageClientIDSetter.setUniqID(msg);
  22. }
  23. boolean topicWithNamespace = false;
  24. if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
  25. msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
  26. topicWithNamespace = true;
  27. }
  28. int sysFlag = 0;
  29. boolean msgBodyCompressed = false;
  30. //对消息体进行压缩处理
  31. if (this.tryToCompressMessage(msg)) {
  32. sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
  33. msgBodyCompressed = true;
  34. }
  35. //如果是事务消息 则标记消息
  36. final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  37. if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
  38. sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
  39. }
  40. //如果注册了消息发送的钩子函数,则执行消息发送之前的增强逻辑
  41. if (hasCheckForbiddenHook()) {
  42. CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
  43. checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
  44. checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
  45. checkForbiddenContext.setCommunicationMode(communicationMode);
  46. checkForbiddenContext.setBrokerAddr(brokerAddr);
  47. checkForbiddenContext.setMessage(msg);
  48. checkForbiddenContext.setMq(mq);
  49. checkForbiddenContext.setUnitMode(this.isUnitMode());
  50. this.executeCheckForbiddenHook(checkForbiddenContext);
  51. }
  52. if (this.hasSendMessageHook()) {
  53. context = new SendMessageContext();
  54. context.setProducer(this);
  55. context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  56. context.setCommunicationMode(communicationMode);
  57. context.setBornHost(this.defaultMQProducer.getClientIP());
  58. context.setBrokerAddr(brokerAddr);
  59. context.setMessage(msg);
  60. context.setMq(mq);
  61. context.setNamespace(this.defaultMQProducer.getNamespace());
  62. String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  63. if (isTrans != null && isTrans.equals("true")) {
  64. context.setMsgType(MessageType.Trans_Msg_Half);
  65. }
  66. if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
  67. context.setMsgType(MessageType.Delay_Msg);
  68. }
  69. this.executeSendMessageHookBefore(context);
  70. }
  71. //构建发送请求包
  72. SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
  73. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  74. requestHeader.setTopic(msg.getTopic());
  75. requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
  76. requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
  77. requestHeader.setQueueId(mq.getQueueId());
  78. requestHeader.setSysFlag(sysFlag);
  79. requestHeader.setBornTimestamp(System.currentTimeMillis());
  80. requestHeader.setFlag(msg.getFlag());
  81. requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
  82. requestHeader.setReconsumeTimes(0);
  83. requestHeader.setUnitMode(this.isUnitMode());
  84. requestHeader.setBatch(msg instanceof MessageBatch);
  85. if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  86. String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
  87. if (reconsumeTimes != null) {
  88. requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
  89. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
  90. }
  91. String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
  92. if (maxReconsumeTimes != null) {
  93. requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
  94. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
  95. }
  96. }
  97. SendResult sendResult = null;
  98. //发送方式
  99. switch (communicationMode) {
  100. case ASYNC:
  101. Message tmpMessage = msg;
  102. boolean messageCloned = false;
  103. if (msgBodyCompressed) {
  104. //If msg body was compressed, msgbody should be reset using prevBody.
  105. //Clone new message using commpressed message body and recover origin massage.
  106. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
  107. tmpMessage = MessageAccessor.cloneMessage(msg);
  108. messageCloned = true;
  109. msg.setBody(prevBody);
  110. }
  111. if (topicWithNamespace) {
  112. if (!messageCloned) {
  113. tmpMessage = MessageAccessor.cloneMessage(msg);
  114. messageCloned = true;
  115. }
  116. msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
  117. }
  118. long costTimeAsync = System.currentTimeMillis() - beginStartTime;
  119. if (timeout < costTimeAsync) {
  120. throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
  121. }
  122. sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  123. brokerAddr,
  124. mq.getBrokerName(),
  125. tmpMessage,
  126. requestHeader,
  127. timeout - costTimeAsync,
  128. communicationMode,
  129. sendCallback,
  130. topicPublishInfo,
  131. this.mQClientFactory,
  132. this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
  133. context,
  134. this);
  135. break;
  136. case ONEWAY:
  137. case SYNC:
  138. long costTimeSync = System.currentTimeMillis() - beginStartTime;
  139. if (timeout < costTimeSync) {
  140. throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
  141. }
  142. sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  143. brokerAddr,
  144. mq.getBrokerName(),
  145. msg,
  146. requestHeader,
  147. timeout - costTimeSync,
  148. communicationMode,
  149. context,
  150. this);
  151. break;
  152. default:
  153. assert false;
  154. break;
  155. }
  156. //注册了消息发送钩子函数 执行after逻辑
  157. if (this.hasSendMessageHook()) {
  158. context.setSendResult(sendResult);
  159. this.executeSendMessageHookAfter(context);
  160. }
  161. return sendResult;
  162. } catch (RemotingException e) {
  163. if (this.hasSendMessageHook()) {
  164. context.setException(e);
  165. this.executeSendMessageHookAfter(context);
  166. }
  167. throw e;
  168. } catch (MQBrokerException e) {
  169. if (this.hasSendMessageHook()) {
  170. context.setException(e);
  171. this.executeSendMessageHookAfter(context);
  172. }
  173. ......

那么Broker是如何处理的呢?
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage

  1. private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
  2. final RemotingCommand request,
  3. final SendMessageContext sendMessageContext,
  4. final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
  5. final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
  6. final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
  7. response.setOpaque(request.getOpaque());
  8. response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
  9. response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
  10. log.debug("receive SendMessage request command, {}", request);
  11. final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
  12. if (this.brokerController.getMessageStore().now() < startTimstamp) {
  13. response.setCode(ResponseCode.SYSTEM_ERROR);
  14. response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
  15. return response;
  16. }
  17. response.setCode(-1);
  18. //消息检查
  19. super.msgCheck(ctx, requestHeader, response);
  20. if (response.getCode() != -1) {
  21. return response;
  22. }
  23. final byte[] body = request.getBody();
  24. int queueIdInt = requestHeader.getQueueId();
  25. TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
  26. if (queueIdInt < 0) {
  27. queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
  28. }
  29. MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  30. msgInner.setTopic(requestHeader.getTopic());
  31. msgInner.setQueueId(queueIdInt);
  32. //如果消息重试次数达到最大的重试次数,消息将进入DLD延迟队列 主题是%DLQ%+消费组
  33. if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
  34. return response;
  35. }
  36. msgInner.setBody(body);
  37. msgInner.setFlag(requestHeader.getFlag());
  38. MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
  39. msgInner.setPropertiesString(requestHeader.getProperties());
  40. msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
  41. msgInner.setBornHost(ctx.channel().remoteAddress());
  42. msgInner.setStoreHost(this.getStoreHost());
  43. msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
  44. PutMessageResult putMessageResult = null;
  45. Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
  46. String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  47. if (traFlag != null && Boolean.parseBoolean(traFlag)) {
  48. if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  49. response.setCode(ResponseCode.NO_PERMISSION);
  50. response.setRemark(
  51. "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  52. + "] sending transaction message is forbidden");
  53. return response;
  54. }
  55. putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
  56. } else {
  57. //进行消息存储
  58. putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  59. }
  60. return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
  61. }

异步发送:调用发送API之后,无需阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数即可,异步方式相比较同步方式来说,性能有所提高,单位了保护消息服务器的压力,rocketmq对消息异步发送进行了并发限制,通过clientAsyncSemaphoreTimes-WhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常,网络超时等将不会进行重试
单向发送:调用消息发送api之后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,

五.批量发送

批量发送消息是同一主题的多条消息一起打包发送到消息服务器,减少网络调用次数,提高网络传输效率
单条消息发送是,消息体的内容将保存在body中,批量消息发送,需要将多条消息消息体的内容存储在body中,Rocketmq采取的方式是,对单条消息内容使用固定格式进行存储 ———->单条和批量
总长度4字节->魔数4字节->bode CRC4字节->Flag4字节->body长度4字节->消息体N字节->属性长度2字节->扩展属性N字节
org.apache.rocketmq.client.producer.DefaultMQProducer#send(java.util.Collection)

  1. public SendResult send(
  2. Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
  3. return this.defaultMQProducerImpl.send(batch(msgs));
  4. }
  5. private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
  6. MessageBatch msgBatch;
  7. try {
  8. msgBatch = MessageBatch.generateFromList(msgs);
  9. for (Message message : msgBatch) {
  10. Validators.checkMessage(message, this);
  11. MessageClientIDSetter.setUniqID(message);
  12. message.setTopic(withNamespace(message.getTopic()));
  13. }
  14. msgBatch.setBody(msgBatch.encode());
  15. } catch (Exception e) {
  16. throw new MQClientException("Failed to initiate the MessageBatch", e);
  17. }
  18. msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
  19. return msgBatch;
  20. }

org.apache.rocketmq.common.message.MessageDecoder#encodeMessage

  1. public static byte[] encodeMessage(Message message) {
  2. //only need flag, body, properties
  3. byte[] body = message.getBody();
  4. int bodyLen = body.length;
  5. String properties = messageProperties2String(message.getProperties());
  6. byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
  7. //note properties length must not more than Short.MAX
  8. short propertiesLength = (short) propertiesBytes.length;
  9. int sysFlag = message.getFlag();
  10. int storeSize = 4 // 1 TOTALSIZE
  11. + 4 // 2 MAGICCOD
  12. + 4 // 3 BODYCRC
  13. + 4 // 4 FLAG
  14. + 4 + bodyLen // 4 BODY
  15. + 2 + propertiesLength;
  16. ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
  17. // 1 TOTALSIZE
  18. byteBuffer.putInt(storeSize);
  19. // 2 MAGICCODE
  20. byteBuffer.putInt(0);
  21. // 3 BODYCRC
  22. byteBuffer.putInt(0);
  23. // 4 FLAG
  24. int flag = message.getFlag();
  25. byteBuffer.putInt(flag);
  26. // 5 BODY
  27. byteBuffer.putInt(bodyLen);
  28. byteBuffer.put(body);
  29. // 6 properties
  30. byteBuffer.putShort(propertiesLength);
  31. byteBuffer.put(propertiesBytes);
  32. return byteBuffer.array();
  33. }

参考:rocketmq技术内幕

发表评论

表情:
评论列表 (有 0 条评论,90人围观)

还没有评论,来说两句吧...

相关阅读

    相关 RocketMq发送延迟消息

    什么是延迟消息? 对于消息中间件来说,producer将消息发送到mq的服务器,但并不期望这条消息马上被消费,而是推迟到当前时间点之后的某个时间点后再投递到queue中让

    相关 RocketMQ(04)——发送顺序消息

    发送顺序消息 如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代

    相关 RocketMQ消息发送

    RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)。 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回