RocketMQ生产者消息发送源码解析总结
RocketMQ在发送消息时,内部使用DefaultMQProducerImpl#sendDefaultImpl(…)来做消息的发送工作。看其代码:
/** * 发送消息。 * 1. 获取消息路由信息 * 2. 选择要发送到的消息队列 * 3. 执行消息发送核心方法 * 4. 对发送结果进行封装返回 * * @param msg 消息 * @param communicationMode 通信模式 * @param sendCallback 发送回调 * @param timeout 发送消息请求超时时间 * @return 发送结果 * @throws MQClientException 当Producer发生异常 * @throws RemotingException 当远程请求发生异常 * @throws MQBrokerException 当Broker发生异常 * @throws InterruptedException 当线程被打断 */
private SendResult sendDefaultImpl(Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验 Producer 处于运行状态
this.makeSureStateOK();
// 校验消息格式
Validators.checkMessage(msg, this.defaultMQProducer);
//
final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
@SuppressWarnings("UnusedAssignment")
long endTimestamp = beginTimestampFirst;
// 获取 Topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); //#35
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null; // 最后选择消息要发送到的队列
Exception exception = null;
SendResult sendResult = null; // 最后一次发送结果
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步3次调用
int times = 0; // 第几次发送
String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
// 循环调用发送消息,直到成功
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
@SuppressWarnings("SpellCheckingInspection") //#47
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列,默认策略下,下次发送选择其他的Broker
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
// 调用发送消息核心方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
// 更新Broker可用性信息,发送时间超过550ms后会有不可用时长,至少30S
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) { // 打印异常,更新Broker可用性信息,停用10M,更新继续循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String
.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) { // 打印异常,更新Broker可用性信息,停用10M,继续循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String
.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String
.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev,
mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// 如下异常continue,进行发送消息重试
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
// 如果有发送结果,进行返回,否则,抛出异常;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID,
endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// 返回发送结果
if (sendResult != null) {
return sendResult;
}
......
}
#35 : 在发送消息之前,需要获取到当前Topic的路由信息,也就是BrokerData,QueueData。
tryToFindTopicPublishInfo(final String topic):
/** * 获取 Topic发布信息 * 如果获取不到,或者状态不正确,则从 Namesrv获取一次 * * @param topic Topic * @return topic 信息 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 缓存中获取 Topic发布信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 当无可用的 Topic发布信息时,从Namesrv获取一次
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 若获取的 Topic发布信息时候可用,则返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
Producer每隔30S从Namesrv处获取当前最新的TopicRouteData,其内包含了topic的所有BrokerData,QueueData。Producer从TopicRouteData中提取可写入的(Master)的Queue新封装成MessageQueue,然后放入TopicPublishInfo对象中。一个Topic可能含有多个Broker上的多个可写的MessageQueue。
Producer在发送消息前,提取相应Topic的TopicPublishInfo ,如果发送的是当前Topic的第一条消息,那么Broker上还没有当前Topic的数据,因此从Broker上获取默认的CreateTopic的TopicPublishInfo ,这个TopicPublishInfo相当于Topic的模板,当发送给Broker后,其发现消息内含有此模板信息时,会按模板创建一个TopicRouteData,然后赋值给消息内的实际Topic。
#47:从多个MessageQueue选择一个来发送消息:
selectOneMessageQueue(topicPublishInfo, lastBrokerName)
/** * 根据 Topic发布信息 选择一个消息队列 * 会尽量选择上次尝试的Broker * 默认情形下向所有Broker的MessageQueue按顺序轮流发送 * * @param tpInfo Topic发布信息 * @param lastBrokerName * @return 消息队列 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) { //若开启了延迟容错机制,默认未开启
try {
//循环所有MessageQueue
// 当 lastBrokerName == null 时,获取第一个可用的MessageQueue
// 当 lastBrokerName != null 时, 获取 brokerName=lastBrokerName && 可用的MessageQueue
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0) {
pos = 0;
}
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
}
// 选择一个相对好的broker,并获得其对应的一个消息队列,按 可用性 > 延迟 > 开始可用时间 选择
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// 选择一个消息队列,不考虑队列的可用性
return tpInfo.selectOneMessageQueue();
}
// 默认情形下的MessageQueue获取,按顺序轮流获取所有的Broker的MessageQueue
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
/** * 默认策略下的MessageQueue选择 * * @param lastBrokerName * @return */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
/** * 直接选择上次发送队列的下一位 * * @return */
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0) { pos = 0; }
return this.messageQueueList.get(pos);
}
Producer在选择MessageQueue时,会尽量选择同一个Broker上的。同步发送下,最多发送3次,发送一次后,第二次发送会尽量选择上次发送的Broker,前提是上次发送的Broker未被隔离。
Producer在发送失败,出现异常时,大部分情况下会隔离Broker,也就是在内部标识此Broker在一段时间内不可用,也就是尽量不选择他们最为发送目标,隔离时间10分钟。当向一个Broker发送失败,隔离此Broker。
若开启了MessageQueue延迟容错机制时,选择其他未被隔离的Broker作为发送目标,如果所有的Broker都被隔离了,则按 可用性 > 延迟 > 开始可用时间 的对比性选择。
若未开启延迟容错机制(默认):第一次发送,会按顺序获取所有MessageQueue里上次发送队列的的下一位;若第一次发送失败了,按顺序隔位筛选,直到找到一个不是上个Broker的MessageQueue,也就是不要再向发送失败的Broker发送消息了。
当Producer发送成功,但耗时较长,超过550ms时,会触发隔离,隔离时间按发送耗时阶梯增长,至少30S,隔离仅在延迟容错机制开启下才能发生作用。
还没有评论,来说两句吧...