RocketMq4.7源码解析之五(消息存储)
文章目录
- 消息队列与索引文件恢复
- 加载Commitlog文件
- 加载ConsumeQueue
- 加载索引文件
- 根据Broker是否是正常停止执行不同的恢复策略
- 恢复所有ConsumeQueue文件
- 正常停止commitLog文件恢复
- 异常停止commitLog文件恢复
- 存储流程
- 校验msg
- 异步存储消息(默认)
- 同步存储消息
- CommitLog刷盘
- 同步刷盘
- 异步刷盘
- 实时更新消息消费队列与索引文件
- 根据消息更新ConsumeQueue
- 异步创建文件
- 同步创建文件 (默认)
- 根据消息更新Index 索引文件
- 获取或创建IndexFile 文件
- 添加到Hash索引文件中
- 过期文件删除机制
- 总结
消息队列与索引文件恢复
假设 Broker 上次启动后由于某些原因宕机,导致 CommitLog、ConsumeQueue、IndexFile 文件之间的数据不一致。因此 Broker 再次启动时会先加载这些文件,然后尝试故障修复,使数据达到最终一致。
加载存储文件
/**
 * Loads all persisted store state and then runs recovery.
 *
 * Steps: detect whether the previous shutdown was clean (abort file), load the schedule-message
 * service (if present), the CommitLog and the ConsumeQueues; if all of those succeed, load the
 * store checkpoint and the index files, then recover according to the shutdown state.
 *
 * @return true if every checked load step succeeded; false otherwise, in which case
 *         allocateMappedFileService is shut down
 */
public boolean load() {
boolean result = true;
try {
// Determine whether the previous exit was normal.
/*
The broker creates ${ROCKET_HOME}/store/abort at startup and deletes it on exit via a
JVM shutdown hook. If the abort file still exists at the next startup, the broker exited
abnormally: the CommitLog and ConsumeQueue may be inconsistent and need to be repaired.
*/
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// Load the delay-level queues of the scheduled (delayed) message service, if configured.
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// load Commit Log (map the CommitLog files)
result = result && this.commitLog.load();
// load Consume Queue (map every consume queue)
result = result && this.loadConsumeQueue();
if (result) {
// Load the store checkpoint, which records the flush points of the CommitLog,
// ConsumeQueue and index files.
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// Load the index files (after an abnormal exit, files newer than the checkpoint are destroyed).
// NOTE(review): the boolean returned by indexService.load(...) is ignored, so an IOException while
// loading index files does not make load() fail — confirm this is intentional.
this.indexService.load(lastExitOK);
// Choose the recovery strategy according to whether the broker stopped normally.
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
// Loading failed:
if (!result) {
// stop the mapped-file allocation service.
this.allocateMappedFileService.shutdown();
}
return result;
}
判断上次退出是否正常
/**
 * Reports whether the abort marker file exists under the store root directory.
 *
 * @return true if the abort file is present, meaning the previous run did not exit cleanly
 */
private boolean isTempFileExist() {
    final String abortPath = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    return new File(abortPath).exists();
}
也就是 ${ROCKET_HOME}/store/abort 这个文件(路径由 StorePathConfigHelper.getAbortFile 得到):若它存在,说明上次是异常退出。
加载Commitlog文件
/**
 * Loads the CommitLog by mapping all files of its MappedFileQueue.
 *
 * @return the outcome of mappedFileQueue.load()
 */
public boolean load() {
    final boolean loaded = this.mappedFileQueue.load();
    final String outcome = loaded ? "OK" : "Failed";
    log.info("load commit log " + outcome);
    return loaded;
}
/**
 * Maps every file found in storePath (for the CommitLog: ${ROCKET_HOME}/store/commitlog) into
 * mappedFiles, in ascending file-name order.
 *
 * Each loaded file starts with its wrote/flushed/committed positions at mappedFileSize; recovery
 * (truncateDirtyFiles) later resets them on the file that contains the recovered end offset.
 *
 * @return false as soon as a file's length differs from mappedFileSize or a file cannot be mapped
 *         (files appended before that point stay in mappedFiles); true otherwise, including when
 *         the directory cannot be listed
 */
public boolean load() {
    final File[] files = new File(this.storePath).listFiles();
    if (files == null) {
        return true;
    }
    Arrays.sort(files);
    for (final File file : files) {
        if (file.length() != this.mappedFileSize) {
            // A single size mismatch aborts the whole load.
            log.warn(file + "\t" + file.length()
                + " length not matched message store config value, please check it manually");
            return false;
        }
        try {
            final MappedFile loaded = new MappedFile(file.getPath(), mappedFileSize);
            // Treat the file as full for now; recovery trims the positions afterwards.
            loaded.setWrotePosition(this.mappedFileSize);
            loaded.setFlushedPosition(this.mappedFileSize);
            loaded.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(loaded);
            log.info("load " + file.getPath() + " OK");
        } catch (IOException e) {
            log.error("load file " + file + " error", e);
            return false;
        }
    }
    return true;
}
加载ConsumeQueue
/**
 * Creates and loads a ConsumeQueue for every {topic}/{queueId} directory under the consume-queue
 * store path, registering each one via putConsumeQueue before loading it.
 *
 * Sub-directories whose names are not integers are skipped.
 *
 * @return false as soon as any ConsumeQueue fails to load; true otherwise
 */
private boolean loadConsumeQueue() {
    final File[] topicDirs = new File(
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())).listFiles();
    if (topicDirs != null) {
        for (File topicDir : topicDirs) {
            final String topic = topicDir.getName();
            final File[] queueDirs = topicDir.listFiles();
            if (queueDirs == null) {
                continue;
            }
            for (File queueDir : queueDirs) {
                final int queueId;
                try {
                    // Directory name is the queue id.
                    queueId = Integer.parseInt(queueDir.getName());
                } catch (NumberFormatException e) {
                    continue;
                }
                final ConsumeQueue logic = new ConsumeQueue(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                    this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                    this);
                this.putConsumeQueue(topic, queueId, logic);
                if (!logic.load()) {
                    return false;
                }
            }
        }
    }
    log.info("load logics queue all over, OK");
    return true;
}
加载索引文件
IndexService
/**
 * Loads every index file under storePath in ascending file-name order into indexFileList.
 *
 * @param lastExitOK whether the previous shutdown was clean; if it was not, any index file whose
 *                   end timestamp is later than the checkpoint's index message timestamp is
 *                   destroyed instead of being kept
 * @return false if an IOException occurs while opening an index file; true otherwise. A
 *         NumberFormatException while loading a file is logged and that file is skipped.
 */
public boolean load(final boolean lastExitOK) {
    final File[] indexFiles = new File(this.storePath).listFiles();
    if (indexFiles == null) {
        return true;
    }
    Arrays.sort(indexFiles);
    for (final File file : indexFiles) {
        try {
            final IndexFile indexFile = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
            indexFile.load();
            // After a crash, data newer than the last index checkpoint cannot be trusted.
            if (!lastExitOK && indexFile.getEndTimestamp()
                > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {
                indexFile.destroy(0);
                continue;
            }
            log.info("load index file OK, " + indexFile.getFileName());
            this.indexFileList.add(indexFile);
        } catch (IOException e) {
            log.error("load file {} error", file, e);
            return false;
        } catch (NumberFormatException e) {
            log.error("load file {} error", file, e);
        }
    }
    return true;
}
加载索引头部信息
IndexFile
/**
 * Loads this index file's header fields by delegating to IndexHeader.load().
 */
public void load() {
this.indexHeader.load();
}
IndexHeader
/**
 * Reads the header fields (begin/end timestamp, begin/end physical offset, hash slot count and
 * index count) from byteBuffer at their fixed positions into this header's fields.
 */
public void load() {
// Parse each header field at its fixed position.
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
this.indexCount.set(byteBuffer.getInt(indexCountIndex));
// An index count <= 0 is raised to 1.
// NOTE(review): presumably index entry 0 is reserved so counts start at 1 — confirm against IndexFile.
if (this.indexCount.get() <= 0) {
this.indexCount.set(1);
}
}
根据Broker是否是正常停止执行不同的恢复策略
DefaultMessageStore
/**
 * Recovers the consume queues, then the CommitLog (using the strategy that matches how the broker
 * last stopped), and finally rebuilds the topic queue table.
 *
 * @param lastExitOK true if the previous shutdown was clean
 */
private void recover(final boolean lastExitOK) {
    final long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (!lastExitOK) {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}
恢复所有ConsumeQueue文件
返回所有 ConsumeQueue 中记录的最大 CommitLog 物理偏移量
/**
 * Recovers every loaded ConsumeQueue.
 *
 * @return the largest maxPhysicOffset (commitlog offset + size of a queue's last valid entry)
 *         reported by any consume queue after recovery, or -1 if none reports a larger value
 */
private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
        for (ConsumeQueue consumeQueue : queuesOfTopic.values()) {
            consumeQueue.recover();
            maxPhysicOffset = Math.max(maxPhysicOffset, consumeQueue.getMaxPhysicOffset());
        }
    }
    return maxPhysicOffset;
}
恢复ConsumeQueue文件
ConsumeQueue
/**
 * Recovers this consume queue after a restart.
 *
 * Starting at the third-to-last mapped file (or the first if there are fewer than three), reads
 * entries (a long commitlog offset, an int size and a long tags code each) until an entry has
 * offset < 0 or size <= 0. The absolute position just past the last valid entry becomes the
 * flushed/committed position and everything after it is truncated. maxPhysicOffset is left at
 * offset + size of the last valid entry; if the extension file is readable it is recovered and
 * truncated past the last extension address seen.
 */
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Start recovery from the third-to-last file.
int index = mappedFiles.size() - 3;
// Fewer than 3 files: start from the first file.
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// Starting (absolute) offset of the current file.
long processOffset = mappedFile.getFileFromOffset();
// Bytes of the current file verified so far.
long mappedFileOffset = 0;
long maxExtAddr = 1;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
// Read one entry.
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
// Highest commitlog position referenced so far: message offset + message size.
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
// No more valid entries in this file.
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
// The whole file consisted of valid entries.
if (mappedFileOffset == mappedFileSizeLogics) {
// Move on to the next file.
index++;
if (index >= mappedFiles.size()) {
// That was the last file; stop.
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
// Continue with the next file.
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
// Valid data ends inside this file; recovery is done.
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
// Absolute offset just past the last valid entry.
processOffset += mappedFileOffset;
// Set the flush pointer.
this.mappedFileQueue.setFlushedWhere(processOffset);
// Set the commit pointer.
this.mappedFileQueue.setCommittedWhere(processOffset);
// Reset positions in, or delete, files beyond processOffset.
this.mappedFileQueue.truncateDirtyFiles(processOffset);
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
删除offset 之后的所有文件
MappedFileQueue
/**
 * Truncates this queue's files at the given absolute offset.
 *
 * The file that contains {@code offset} (start offset <= offset < start offset + mappedFileSize)
 * gets its wrote/committed/flushed positions set to {@code offset % mappedFileSize}. Files that
 * start after {@code offset} are destroyed and removed from mappedFiles. Files that end at or
 * before {@code offset} are left untouched.
 *
 * @param offset absolute offset marking the end of valid data
 */
public void truncateDirtyFiles(long offset) {
    final List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
    for (MappedFile file : this.mappedFiles) {
        final long fileFromOffset = file.getFileFromOffset();
        final boolean endsAfterOffset = fileFromOffset + this.mappedFileSize > offset;
        if (!endsAfterOffset) {
            continue;
        }
        if (fileFromOffset <= offset) {
            // offset falls inside this file: valid data stops here.
            final int position = (int) (offset % this.mappedFileSize);
            file.setWrotePosition(position);
            file.setCommittedPosition(position);
            file.setFlushedPosition(position);
        } else {
            // Entirely past offset: release the mapping and schedule the file for removal.
            file.destroy(1000);
            willRemoveFiles.add(file);
        }
    }
    this.deleteExpiredFile(willRemoveFiles);
}
MappedFile文件销毁
MappedFile
/**
 * Shuts this mapped file down and, once its resources have been cleaned up, closes the file
 * channel and deletes the physical file.
 *
 * @param intervalForcibly milliseconds after the first shutdown call after which a still-referenced
 *                         file is forcibly released (see ReferenceResource.shutdown)
 * @return true if cleanup was over, so channel close and file deletion were attempted (failures
 *         there are only logged); false if cleanup has not completed yet
 */
public boolean destroy(final long intervalForcibly) {
// Mark the file unavailable and drop a reference.
this.shutdown(intervalForcibly);
// Proceed only once the mapping has been cleaned up.
if (this.isCleanupOver()) {
try {
// Close the file channel.
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
// Delete the physical file.
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
关闭MappedFile
ReferenceResource
/**
 * Marks this resource unavailable and drops its initial reference.
 *
 * The first call sets available = false, records firstShutdownTimestamp and calls release().
 * A later call, while refCount is still > 0 and at least intervalForcibly milliseconds have passed
 * since the first call, sets refCount to -1000 - refCount (a negative value) and calls release(),
 * forcing cleanup regardless of outstanding references.
 *
 * @param intervalForcibly grace period in milliseconds before outstanding references are discarded
 */
public void shutdown(final long intervalForcibly) {
// First call: the resource is still available.
if (this.available) {
// Flip available to false so later calls take the forced path.
this.available = false;
// Remember when shutdown started.
this.firstShutdownTimestamp = System.currentTimeMillis();
// Drop a reference; release() only cleans up once the count reaches <= 0.
this.release();
} else if (this.getRefCount() > 0) {
// Still referenced after an earlier shutdown call. Once the grace period has elapsed,
// set refCount to -1000 - refCount (always negative) and release, which forces cleanup.
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
释放引用和资源
ReferenceResource
/**
 * Drops one reference; when the count falls to zero or below, cleans the resource up (under the
 * object's monitor) and records whether cleanup finished.
 */
public void release() {
    final long remaining = this.refCount.decrementAndGet();
    if (remaining <= 0) {
        synchronized (this) {
            this.cleanupOver = this.cleanup(remaining);
        }
    }
}
释放堆外内存
MappedFile
/**
 * Unmaps this file's buffer and updates the global mapping counters. It is called from release()
 * once the reference count drops to <= 0.
 *
 * @param currentRef reference count at the time of the call (used only in log messages)
 * @return false if the file is still available (not shut down yet); true if it was already
 *         cleaned up or has now been unmapped
 */
public boolean cleanup(final long currentRef) {
// Still available means shutdown has not happened; refuse to unmap.
if (this.isAvailable()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have not shutdown, stop unmapping.");
return false;
}
// Already cleaned up: nothing more to do.
if (this.isCleanupOver()) {
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have cleanup, do not do it again.");
return true;
}
// Release the mapping behind mappedByteBuffer.
// NOTE(review): clean(...) is not shown here; presumably it invokes the buffer's cleaner to unmap it.
clean(this.mappedByteBuffer);
// Subtract this file's size from the total mapped virtual memory.
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
// One fewer mapped file.
TOTAL_MAPPED_FILES.decrementAndGet();
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
删除过期文件
MappedFileQueue
/**
 * Removes the given files from mappedFiles.
 *
 * Candidates that mappedFiles no longer contains are dropped from {@code files} (and logged)
 * before the bulk removal. Failures of the removal are logged, not thrown.
 *
 * @param files candidate files; mutated in place to hold only files that were tracked
 */
void deleteExpiredFile(List<MappedFile> files) {
    if (files.isEmpty()) {
        return;
    }
    for (Iterator<MappedFile> it = files.iterator(); it.hasNext(); ) {
        final MappedFile candidate = it.next();
        if (!this.mappedFiles.contains(candidate)) {
            it.remove();
            log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", candidate.getFileName());
        }
    }
    try {
        final boolean removed = this.mappedFiles.removeAll(files);
        if (!removed) {
            log.error("deleteExpiredFile remove failed.");
        }
    } catch (Exception e) {
        log.error("deleteExpiredFile has exception.", e);
    }
}
正常停止commitLog文件恢复
CommitLog
/**
 * Recovers the CommitLog after a normal (clean) shutdown.
 *
 * Parses messages starting at the third-to-last mapped file (or the first if there are fewer than
 * three). The position just past the last successfully parsed message becomes the flushed and
 * committed position, and data beyond it is truncated. If the consume queues reference commitlog
 * data at or beyond that position, the excess consume-queue entries are truncated too. If there
 * are no CommitLog files, the pointers are reset to 0 and all consume queues are destroyed.
 *
 * @param maxPhyOffsetOfConsumeQueue largest commitlog offset recorded by any consume queue
 */
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
// Whether to verify message CRCs while recovering.
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
// Fewer than 3 files: start from the first one.
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// Confirmed physical offset = mappedFile.getFileFromOffset() + mappedFileOffset.
long processOffset = mappedFile.getFileFromOffset();
// Bytes of the current file verified so far.
long mappedFileOffset = 0;
while (true) {
// Parse the next message from the buffer.
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
// Success with a positive size: a valid message.
if (dispatchRequest.isSuccess() && size > 0) {
// Advance past this message.
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {
// Success with size 0: end of this file reached.
index++;
// No more files: stop.
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
// Reset state and continue with the next file.
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
// Parse failure: recovery stops here.
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
// NOTE(review): isSuccess() with size < 0 matches no branch above; presumably
// checkMessageAndReturnSize never produces that combination — confirm.
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// Reset positions in, or delete, files beyond processOffset.
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
// The consume queues reference commitlog data at or beyond the recovered end:
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
// drop consume-queue entries that point at or past processOffset.
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
// No CommitLog files: reset the pointers to 0 ...
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
// ... and destroy every ConsumeQueue file.
this.defaultMessageStore.destroyLogics();
}
}
删除processOffset之后存储的ConsumeQueue脏数据文件
/**
 * Asks every consume queue of every topic to drop entries that reference commitlog data at or
 * beyond {@code phyOffset}.
 *
 * @param phyOffset commitlog offset from which consume-queue entries are considered dirty
 */
public void truncateDirtyLogicFiles(long phyOffset) {
    for (ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
        for (ConsumeQueue consumeQueue : queuesOfTopic.values()) {
            consumeQueue.truncateDirtyLogicFiles(phyOffset);
        }
    }
}
截断phyOffset之后的文件
ConsumeQueue
/**
 * Removes consume-queue entries that reference commitlog data at or beyond phyOffet.
 *
 * Works backwards from the last mapped file. A file whose first entry already points at or past
 * phyOffet is deleted entirely, and the new last file is examined. Otherwise the file's
 * wrote/committed/flushed positions are advanced entry by entry, and the method returns at the
 * first entry that is invalid or points at or past phyOffet (or at the end of the file). Bytes
 * after the returned position are not zeroed; they simply lie beyond the write position.
 * maxPhysicOffset ends up as offset + size of the last retained entry, or phyOffet if none.
 *
 * @param phyOffet commitlog offset from which consume-queue entries are considered dirty
 */
public void truncateDirtyLogicFiles(long phyOffet) {
// Size of one consume-queue file.
int logicFileSize = this.mappedFileSize;
// Initial value; overwritten by every retained entry below.
this.maxPhysicOffset = phyOffet;
long maxExtAddr = 1;
while (true) {
// Always examine the current last file.
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile != null) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// Reset the write/commit/flush positions; they are re-advanced for each retained entry.
mappedFile.setWrotePosition(0);
mappedFile.setCommittedPosition(0);
mappedFile.setFlushedPosition(0);
for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
// Read one entry.
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
// First entry of the file.
if (0 == i) {
// It already points at/after phyOffet, so the whole file is dirty.
if (offset >= phyOffet) {
// Delete the file and examine the new last file.
this.mappedFileQueue.deleteLastMappedFile();
break;
} else {
// Keep this entry: advance the positions past it.
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
// Highest commitlog position referenced by retained entries.
this.maxPhysicOffset = offset + size;
// This maybe not take effect, when not every consume queue has extend file.
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
}
} else {
// Subsequent entries: only valid entries are considered.
if (offset >= 0 && size > 0) {
if (offset >= phyOffet) {
// Earlier entries of this file are valid, so the file is kept. The positions set
// for the previous entry stand; the stale bytes after them are left in place
// (per the original notes, later writes overwrite them).
// NOTE(review): returning here skips the consumeQueueExt truncation at the end.
return;
}
// Keep this entry: advance the positions past it.
int pos = i + CQ_STORE_UNIT_SIZE;
mappedFile.setWrotePosition(pos);
mappedFile.setCommittedPosition(pos);
mappedFile.setFlushedPosition(pos);
// Highest commitlog position referenced by retained entries.
this.maxPhysicOffset = offset + size;
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
// Reached the end of the file with every entry retained.
if (pos == logicFileSize) {
return;
}
} else {
// Invalid (empty) entry: end of data in this file.
return;
}
}
}
} else {
// No files left.
break;
}
}
if (isExtReadEnable()) {
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
销毁所有ConsumeQueue文件
DefaultMessageStore
/**
 * Destroys every consume queue of every topic, deleting its files.
 */
public void destroyLogics() {
    this.consumeQueueTable.values().forEach(queuesOfTopic -> queuesOfTopic.values().forEach(ConsumeQueue::destroy));
}
ConsumeQueue
/**
 * Resets this consume queue's offsets and deletes all of its files (and the extension files when
 * the extension is readable).
 */
public void destroy() {
    // Forget the logical and physical bounds before the files go away.
    this.minLogicOffset = 0;
    this.maxPhysicOffset = -1;
    this.mappedFileQueue.destroy();
    if (isExtReadEnable()) {
        this.consumeQueueExt.destroy();
    }
}
/**
 * Destroys every mapped file of this queue, clears the list, resets flushedWhere to 0 and
 * removes the store directory if it is a directory.
 */
public void destroy() {
    // Each file is destroyed with intervalForcibly = 3000 ms.
    this.mappedFiles.forEach(mf -> mf.destroy(1000 * 3));
    this.mappedFiles.clear();
    this.flushedWhere = 0;
    final File storeDir = new File(storePath);
    if (storeDir.isDirectory()) {
        storeDir.delete();
    }
}
异常停止commitLog文件恢复
CommitLog
/**
 * Recovers the CommitLog after an abnormal shutdown (abort file present).
 *
 * Searches backwards from the last mapped file for the newest file accepted by
 * isMappedFileMatchedRecover (falling back to the first file if none is accepted). It then parses
 * every message from there on and passes it to doDispatch; when duplication is enabled, only
 * messages whose commitlog offset is below the store's confirm offset are dispatched. Afterwards
 * the flushed/committed positions are set to the end of the last valid message, data beyond it is
 * truncated, and consume-queue entries at or past that position are removed. If there are no
 * CommitLog files, the pointers are reset to 0 and all consume queues are destroyed.
 *
 * @param maxPhyOffsetOfConsumeQueue largest commitlog offset recorded by any consume queue
 */
@Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
// Walk backwards from the last file.
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
// Stop at the newest file whose first message is covered by the checkpoint.
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
// No file matched: start from the first file.
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
// Scan the messages.
while (true) {
// Parse one message from the buffer into a DispatchRequest.
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
// A positive size means a message was read.
if (size > 0) {
mappedFileOffset += size;
// With duplication enabled ...
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
// ... dispatch only messages whose offset is below the confirm offset.
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
// Dispatch the message (per the original notes: updates the index and consume-queue files).
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
// End of this file: move to the next one.
index++;
// No more files: stop.
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
// Parse failure: recovery stops here.
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// Reset positions in, or delete, files beyond processOffset.
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
// No CommitLog files: reset the commitlog flushedWhere/committedWhere pointers to 0
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
// and destroy all consume-queue files.
this.defaultMessageStore.destroyLogics();
}
}
判断一个消息文件是一个正确的文件
CommitLog
/**
 * Decides whether abnormal recovery may start from this CommitLog file.
 *
 * The file qualifies when its first message carries the expected magic code, has a non-zero store
 * timestamp, and that timestamp is not later than the store checkpoint. The checkpoint used is
 * getMinTimestampIndex() when message indexing is enabled and marked safe, otherwise
 * getMinTimestamp().
 *
 * @param mappedFile candidate CommitLog file
 * @return true if recovery can begin at this file
 */
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
    final ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    final int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
    if (magicCode != MESSAGE_MAGIC_CODE) {
        return false;
    }
    final int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
    final boolean bornHostIsV6 = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) != 0;
    final int bornhostLength = bornHostIsV6 ? 20 : 8;
    // Fixed-width fields preceding the store timestamp, followed by the born host.
    final int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
    final long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
    if (storeTimestamp == 0) {
        // No message has been stored in this file.
        return false;
    }
    final boolean indexTakesPart = this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
        && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe();
    final long checkpointTimestamp = indexTakesPart
        ? this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()
        : this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp();
    if (storeTimestamp > checkpointTimestamp) {
        return false;
    }
    log.info("find check timestamp, {} {}",
        storeTimestamp,
        UtilAll.timeMillisToHumanString(storeTimestamp));
    return true;
}
索引文件的刷盘时间点也参与计算
StoreCheckpoint
/**
 * Returns the smaller of getMinTimestamp() and indexMsgTimestamp, so the index files' flush point
 * is also taken into account.
 */
public long getMinTimestampIndex() {
// NOTE(review): getMinTimestamp() is not shown here; presumably it already combines the commitlog
// and consume-queue flush timestamps — confirm.
return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
}
1 ) CommitLog :消息存储文件,所有消息主题的消息都存储在CommitLog 文件中。
2 ) ConsumeQueue :消息消费队列,消息到达CommitLog 文件后,将异步转发到消息
消费队列,供消息消费者消费。
3 ) IndexFile :消息索引文件,主要存储消息Key 与Offset 的对应关系。
4 )事务状态服务: 存储每条消息的事务状态。
5 )定时消息服务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取
进度。
RocketMQ 将所有主题的消息都存储在同一组 CommitLog 文件中,确保消息写入时是顺序写文件。
RocketMQ 引入了 ConsumeQueue 消息队列文件:每个消息主题包含多个消息消费队列,每个消息消费队列对应一个以 queueId 命名的目录,其中存放该队列的 ConsumeQueue 文件。
IndexFile 索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog 文件中检索消息。
存储流程
SendMessageProcessor
/**
 * Handles a send-message request asynchronously.
 *
 * Validates the request (preSend), builds a MessageExtBrokerInner from the request header and body,
 * and stores it either as a transactional prepare message or as a normal message.
 *
 * @return a future completed with the response; it is completed immediately when validation,
 *         retry/DLQ handling or the transaction-permission check fails
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
// Validate the topic (creating its config if needed) and the queue id.
final RemotingCommand response = preSend(ctx, request, requestHeader);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
// preSend leaves the code at -1 when validation passed.
if (response.getCode() != -1) {
return CompletableFuture.completedFuture(response);
}
final byte[] body = request.getBody();
// Resolve the queue id and the topic config.
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// A negative queue id: choose one via randomQueueId over the write queue count.
// NOTE(review): topicConfig is assumed non-null because msgCheck (via preSend) fails the request
// when the topic config cannot be found or created — confirm.
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
// Build the broker-side message that carries the extra store attributes.
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return CompletableFuture.completedFuture(response);
}
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// Record the cluster name as a property and re-serialize the properties string.
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Transactional prepare message.
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
// The broker is configured to reject transactional messages.
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
// Store a normal message asynchronously.
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
}
校验msg
SendMessageProcessor
/**
 * Builds the response for a send request and runs the pre-store validation.
 *
 * The response code is set to SYSTEM_ERROR if the broker does not accept send requests yet
 * (before startAcceptSendRequestTimeStamp). Otherwise it is set to -1 ("no error yet") and
 * msgCheck runs, which overwrites the code and remark when validation fails.
 *
 * @return the response; callers treat {@code getCode() != -1} as a failure
 */
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                SendMessageRequestHeader requestHeader) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    log.debug("Receive SendMessage request command {}", request);
    final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimestamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
        return response;
    }
    // -1 marks "validation passed so far"; msgCheck replaces it on failure.
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    // The previous "if (code != -1) return response;" branch was redundant: the response is
    // returned either way and the caller inspects the code.
    return response;
}
AbstractSendMessageProcessor
/**
 * Validates a send request and records any failure in {@code response} (code and remark).
 *
 * The checks are:
 * - a non-writable broker rejects order topics;
 * - the topic name must be valid;
 * - the topic must be allowed to receive messages;
 * - the topic config must exist, or be auto-created from the default topic (with a fallback for
 *   retry topics);
 * - the queue id must be below max(writeQueueNums, readQueueNums).
 *
 * @return the same {@code response} instance
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
// Rejects only when the broker is NOT writable AND the topic is an order topic.
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
// Validate the topic name (length and naming format).
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
// Reject topics that may not receive messages (e.g. the default topic, used only for route lookup).
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response;
}
// Look up the topic config.
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
// In unit mode, derive the topic sys flag from whether this is a retry topic.
if (requestHeader.isUnitMode()) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
} else {
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
}
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
// Try to create the topic config from the default topic.
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
// Retry topics fall back to a single read/write queue.
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
}
}
// Still no config: fail with TOPIC_NOT_EXIST.
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
// The queue id must be below max(writeQueueNums, readQueueNums).
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
return response;
}
TopicConfigManager
/**
 * Creates a config for a topic the broker does not know yet, deriving it from {@code defaultTopic}.
 *
 * Runs under lockTopicConfigTable. If the topic already exists it is returned as is. Otherwise the
 * default topic must exist and carry PERM_INHERIT. The new topic gets
 * min(clientDefaultTopicQueueNums, default write queue nums) read/write queues (never negative)
 * and the default topic's permissions minus PERM_INHERIT. A newly created topic is persisted, and
 * the broker then re-registers all topics.
 *
 * @param topic                       topic to create
 * @param defaultTopic                template topic whose config is inherited
 * @param remoteAddress               producer address, used only for logging
 * @param clientDefaultTopicQueueNums queue count requested by the client
 * @param topicSysFlag                sys flag for the new topic
 * @return the existing or newly created config, or null if none could be created (template
 *         missing or not inheritable, lock not acquired within LOCK_TIMEOUT_MILLIS, or interrupted)
 */
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
    TopicConfig topicConfig = null;
    boolean createNew = false;
    try {
        if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // Re-check under the lock: another thread may have created the topic meanwhile.
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;
                // Template topic (e.g. TBW102) whose config the new topic inherits.
                TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                if (defaultTopicConfig != null) {
                    // With auto-creation disabled, the auto-create template is reset to READ|WRITE,
                    // which drops PERM_INHERIT so the isInherited check below fails.
                    if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                            defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                        }
                    }
                    if (PermName.isInherited(defaultTopicConfig.getPerm())) {
                        topicConfig = new TopicConfig(topic);
                        // Never exceed the template's write queue count.
                        int queueNums = Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums());
                        if (queueNums < 0) {
                            queueNums = 0;
                        }
                        topicConfig.setReadQueueNums(queueNums);
                        topicConfig.setWriteQueueNums(queueNums);
                        // Inherit the template's permissions, minus the inherit bit itself.
                        int perm = defaultTopicConfig.getPerm();
                        perm &= ~PermName.PERM_INHERIT;
                        topicConfig.setPerm(perm);
                        topicConfig.setTopicSysFlag(topicSysFlag);
                        topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                    } else {
                        log.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]",
                            defaultTopic, defaultTopicConfig.getPerm(), remoteAddress);
                    }
                } else {
                    log.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]",
                        defaultTopic, remoteAddress);
                }
                if (topicConfig != null) {
                    log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                        defaultTopic, topicConfig, remoteAddress);
                    this.topicConfigTable.put(topic, topicConfig);
                    this.dataVersion.nextVersion();
                    createNew = true;
                    // Persist the topic configuration.
                    this.persist();
                }
            } finally {
                this.lockTopicConfigTable.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageMethod exception", e);
        // Restore the interrupt status instead of swallowing it, so callers can still observe it.
        Thread.currentThread().interrupt();
    }
    if (createNew) {
        // Re-register all topics so the new one becomes routable.
        this.brokerController.registerBrokerAll(false, true, true);
    }
    return topicConfig;
}
异步存储消息(默认)
DefaultMessageStore
/**
 * Asynchronously stores {@code msg}.
 *
 * <p>The store state and the message limits are validated first; a failed check completes
 * immediately with that status. Otherwise the message is handed to the commit log, and timing and
 * failure statistics are recorded when the returned future completes.
 *
 * @param msg message to store
 * @return future of the put result
 */
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    final PutMessageStatus storeStatus = this.checkStoreStatus();
    if (storeStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(storeStatus, null));
    }

    final PutMessageStatus msgStatus = this.checkMessage(msg);
    if (msgStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgStatus, null));
    }

    final long startedAt = this.getSystemClock().now();
    final CompletableFuture<PutMessageResult> future = this.commitLog.asyncPutMessage(msg);
    future.thenAccept(putResult -> {
        final long costMs = this.getSystemClock().now() - startedAt;
        if (costMs > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", costMs, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(costMs);
        final boolean failed = putResult == null || !putResult.isOk();
        if (failed) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });
    return future;
}
是否可写入
/**
 * Decides whether this store may accept writes right now.
 *
 * <p>Writes are refused when the store is shut down, when the broker is a SLAVE, or when the
 * running flags mark the store as not writable. The SLAVE and not-writable warnings are throttled
 * to one in 50000 calls via {@code printTimes}, which is reset once the store is writable again.
 *
 * @return {@code SERVICE_NOT_AVAILABLE}, {@code OS_PAGECACHE_BUSY}, or {@code PUT_OK}
 */
private PutMessageStatus checkStoreStatus() {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }

    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            // Distinct message so operators can tell a slave rejection from a shutdown.
            log.warn("broker role is slave, so putMessage is forbidden");
        }
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }

    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("the message store is not writable. It may be caused by one of the following reasons: "
                + "the broker's disk is full, write to logic queue error, write to index file error, etc");
        }
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    } else {
        this.printTimes.set(0);
    }

    if (this.isOSPageCacheBusy()) {
        return PutMessageStatus.OS_PAGECACHE_BUSY;
    }
    return PutMessageStatus.PUT_OK;
}
校验消息
/**
 * Rejects messages whose topic or properties string is too long for the storage format.
 *
 * <p>The topic may have at most {@code Byte.MAX_VALUE} (127) characters. The properties string,
 * when present, may have at most {@code Short.MAX_VALUE} (32767) characters. Both limits are
 * checked on {@code String.length()}, i.e. in chars, not encoded bytes.
 *
 * @param msg message to validate
 * @return {@code MESSAGE_ILLEGAL} if a limit is exceeded, otherwise {@code PUT_OK}
 */
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
    final int topicLen = msg.getTopic().length();
    if (topicLen > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + topicLen);
        return PutMessageStatus.MESSAGE_ILLEGAL;
    }

    final String props = msg.getPropertiesString();
    if (props != null && props.length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + props.length());
        return PutMessageStatus.MESSAGE_ILLEGAL;
    }
    return PutMessageStatus.PUT_OK;
}
CommitLog
写入消息到commitLog
/**
 * Appends {@code msg} to the commit log and returns a future that completes once the flush and
 * replication requests submitted for it have been answered.
 *
 * <p>Delayed messages (delay level &gt; 0) are redirected to the schedule topic, and the real
 * topic and queue are saved as properties. IPv6 born/store hosts are flagged on the message before
 * it is serialized, exactly as the synchronous {@code putMessage} does. This makes
 * {@code doAppend} reserve 16-byte address fields instead of 4-byte ones.
 *
 * @param msg message to store; its store timestamp, body CRC, sys flag and (for delayed messages)
 *            topic and queue id are modified in place
 * @return future of the put result; append failures complete the future immediately
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery: clamp the level, then move the message onto the schedule topic.
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    // Without these flags doAppend sizes the host fields for IPv4 (4 + 4 bytes), which cannot hold
    // an IPv6 address (16 + 4 bytes). Kept in sync with the synchronous putMessage.
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Last (currently written) commit-log file; may be null or full, handled under the lock.
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // Flush and replication run concurrently; the put status is downgraded if either fails.
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
MappedFileQueue
获取上个文件
/**
 * Returns the last file in {@code mappedFiles}, or {@code null} if there is none.
 *
 * <p>If the list shrinks between {@code size()} and {@code get()} (an IndexOutOfBoundsException),
 * the lookup is retried while the list is non-empty. Any other exception is logged and
 * {@code null} is returned.
 */
public MappedFile getLastMappedFile() {
    while (!this.mappedFiles.isEmpty()) {
        try {
            return this.mappedFiles.get(this.mappedFiles.size() - 1);
        } catch (IndexOutOfBoundsException ignored) {
            // Concurrent removal raced with size(); loop and look again.
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }
    return null;
}
创建文件
/**
 * Returns the last file, creating a new one when none exists or the last one is full.
 *
 * @param startOffset offset used to name the first file when the queue is empty
 */
public MappedFile getLastMappedFile(final long startOffset) {
    final boolean createIfMissing = true;
    return getLastMappedFile(startOffset, createIfMissing);
}
/**
 * Returns the last mapped file, optionally creating the next one.
 *
 * <p>A new file is needed when the queue is empty or its last file is full. With an empty queue
 * the new file starts at {@code startOffset} rounded down to a multiple of {@code mappedFileSize}.
 * Otherwise it starts right after the last file. Files are named after their start offset.
 *
 * @param startOffset offset used only when the queue is empty
 * @param needCreate  whether to actually create the file when one is needed
 * @return the newly created file (possibly {@code null} if creation failed), or the current last
 *         file when no creation was needed or requested
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    final MappedFile tail = getLastMappedFile();

    long createOffset = -1;
    if (tail == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    } else if (tail.isFull()) {
        createOffset = tail.getFileFromOffset() + this.mappedFileSize;
    }

    if (createOffset == -1 || !needCreate) {
        return tail;
    }

    final String firstPath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    final String secondPath = this.storePath + File.separator
        + UtilAll.offset2FileName(createOffset + this.mappedFileSize);

    MappedFile created = null;
    if (this.allocateMappedFileService == null) {
        try {
            created = new MappedFile(firstPath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    } else {
        // The allocation service is given the following file's path as well, presumably so it can
        // pre-allocate it ahead of time — confirm in AllocateMappedFileService.
        created = this.allocateMappedFileService.putRequestAndReturnMappedFile(firstPath,
            secondPath, this.mappedFileSize);
    }

    if (created != null) {
        if (this.mappedFiles.isEmpty()) {
            created.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(created);
    }
    return created;
}
追加消息
MappedFile
/**
 * Appends a single message to this file via the shared append path.
 *
 * @param msg message to append
 * @param cb  callback that serializes the message into the file's buffer
 * @return the callback's append result
 */
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    final AppendMessageResult appended = appendMessagesInner(msg, cb);
    return appended;
}
大家注意这句代码:ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
这句代码的意思是
writeBuffer:堆外内存(Direct)ByteBuffer。如果不为空,消息数据首先写入该Buffer,再通过commit 写入MappedFile 对应的FileChannel(即该文件的页缓存),最后由flush 刷盘。
注意:transientStorePoolEnable为true 时不为空。
mappedByteBuffer:物理文件对应的内存映射Buffer
/**
 * Appends a single message or a batch to this file at the current write position.
 *
 * <p>Data goes into {@code writeBuffer} when it is set, otherwise directly into the mapped buffer.
 * On success the write position advances by the bytes written and the store timestamp is
 * recorded. If the file is already full, or the message type is unsupported, an
 * {@code UNKNOWN_ERROR} result is returned; no exception is thrown.
 *
 * @param messageExt a MessageExtBrokerInner or MessageExtBatch
 * @param cb         serializer that writes the message(s) into the buffer
 * @return the append result
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    final int writePos = this.wrotePosition.get();
    if (writePos >= this.fileSize) {
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", writePos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // slice() shares content with the backing buffer but has its own position.
    final ByteBuffer target = (writeBuffer == null) ? this.mappedByteBuffer.slice() : writeBuffer.slice();
    target.position(writePos);
    final int remaining = this.fileSize - writePos;

    final AppendMessageResult appended;
    if (messageExt instanceof MessageExtBrokerInner) {
        appended = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBrokerInner) messageExt);
    } else if (messageExt instanceof MessageExtBatch) {
        appended = cb.doAppend(this.getFileFromOffset(), target, remaining, (MessageExtBatch) messageExt);
    } else {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    this.wrotePosition.addAndGet(appended.getWroteBytes());
    this.storeTimestamp = appended.getStoreTimestamp();
    return appended;
}
CommitLog.DefaultAppendMessageCallback
/**
 * Serializes one message into {@code byteBuffer} at its current position.
 *
 * @param fileFromOffset commit-log offset of the first byte of the current file
 * @param byteBuffer     buffer slice positioned at the file's current write position
 * @param maxBlank       bytes still free in the current file
 * @param msgInner       message to serialize
 * @return PUT_OK with the written length; END_OF_FILE after padding the rest of the file with a
 *         blank record when the message does not fit; MESSAGE_SIZE_EXCEEDED or
 *         PROPERTIES_SIZE_EXCEEDED when the message is rejected
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET: absolute position of this message in the commit log
// (start offset of this file + position inside this file).
long wroteOffset = fileFromOffset + byteBuffer.position();
int sysflag = msgInner.getSysFlag();
// Host field = address + 4-byte port: 4-byte IPv4 or 16-byte IPv6 address, chosen by the V6 flags.
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
// Prepare the holder (resetByteBuffer, not shown) before it is filled with the store host bytes.
this.resetByteBuffer(storeHostHolder, storeHostLength);
String msgId;
// Message ID is built from the store host bytes and the physical offset; IPv6 hosts use a
// separate scratch buffer (msgIdV6Memory).
if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
} else {
msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
}
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
// Key has the form "topic-queueId".
String key = keyBuilder.toString();
// Next logical (consume-queue) offset for this topic/queue.
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
// First message for this topic/queue starts at logical offset 0.
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
// Serialize message: encode the variable-length parts first to compute the total size.
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
// Properties length is stored as a short below, so larger encodings are rejected.
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
// Total record length from body, topic and properties lengths plus the fixed fields.
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message size: reject it.
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
// If the record plus END_FILE_MIN_BLANK_LENGTH does not fit in the remaining space, fill the rest
// of the file with a blank record and report END_OF_FILE so the caller retries in a new file.
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// Blank record: first 4 bytes = remaining space (maxBlank), next 4 bytes = BLANK_MAGIC_CODE.
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
// The caller sees END_OF_FILE and moves on to the next file.
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
// Fields below are written in the on-disk order; calMsgLength must account for each of them.
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(bornHostHolder, bornHostLength);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(storeHostHolder, storeHostLength);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC (length stored in a single byte)
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES (length stored as a short)
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
// Copy the assembled record into the file buffer in one put.
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
// Advance the topic/queue's next logical offset only for consumable messages.
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
计算消息长度
/**
 * Computes the on-disk size of one commit-log record.
 *
 * <p>The fields are summed in the order {@code doAppend} writes them. Host fields take 8 bytes
 * (IPv4 + port) or 20 bytes (IPv6 + port), depending on the V6 flags in {@code sysFlag}.
 * Non-positive body or properties lengths contribute only their length prefixes.
 *
 * @param sysFlag          message system flag (IPv6 flags are inspected)
 * @param bodyLength       body length in bytes
 * @param topicLength      encoded topic length in bytes
 * @param propertiesLength encoded properties length in bytes
 * @return total record length in bytes
 */
protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
    final boolean bornIsV6 = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) != 0;
    final boolean storeIsV6 = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) != 0;
    final int bornHostBytes = bornIsV6 ? 20 : 8;
    final int storeHostBytes = storeIsV6 ? 20 : 8;

    int total = 0;
    total += 4;                 // TOTALSIZE: length of the whole record
    total += 4;                 // MAGICCODE
    total += 4;                 // BODYCRC: CRC of the message body
    total += 4;                 // QUEUEID: consume queue id
    total += 4;                 // FLAG: application flag, not interpreted by the store
    total += 8;                 // QUEUEOFFSET: logical offset in the consume queue
    total += 8;                 // PHYSICALOFFSET: offset in the commit log
    total += 4;                 // SYSFLAG: system flags (compression, transaction, IPv6, ...)
    total += 8;                 // BORNTIMESTAMP: when the producer sent the message
    total += bornHostBytes;     // BORNHOST: producer address + port
    total += 8;                 // STORETIMESTAMP: when the broker stored the message
    total += storeHostBytes;    // STOREHOSTADDRESS: broker address + port
    total += 4;                 // RECONSUMETIMES: retry count
    total += 8;                 // Prepared Transaction Offset
    total += 4 + Math.max(bodyLength, 0);        // BODY: 4-byte length + body
    total += 1 + topicLength;                    // TOPIC: 1-byte length + topic
    total += 2 + Math.max(propertiesLength, 0);  // PROPERTIES: 2-byte length + properties
    return total;
}
同步存储消息
/**
 * Synchronously stores {@code msg}.
 *
 * <p>The store state and the message limits are validated first; a failed check returns
 * immediately with that status. Otherwise the message is appended through the commit log, and
 * timing and failure statistics are recorded before returning.
 *
 * @param msg message to store
 * @return the put result
 */
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    final PutMessageStatus storeStatus = this.checkStoreStatus();
    if (storeStatus != PutMessageStatus.PUT_OK) {
        return new PutMessageResult(storeStatus, null);
    }

    final PutMessageStatus msgStatus = this.checkMessage(msg);
    if (msgStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return new PutMessageResult(msgStatus, null);
    }

    final long startedAt = this.getSystemClock().now();
    final PutMessageResult putResult = this.commitLog.putMessage(msg);
    final long costMs = this.getSystemClock().now() - startedAt;
    if (costMs > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", costMs, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(costMs);

    final boolean failed = putResult == null || !putResult.isOk();
    if (failed) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return putResult;
}
存储消息
/**
 * Synchronously appends {@code msg} to the commit log, then runs disk-flush and HA handling.
 *
 * <p>The message's store timestamp and body CRC are set here. Delayed messages (delay level &gt; 0)
 * are redirected to the schedule topic, with the real topic and queue saved as properties. IPv6
 * hosts are flagged on the message. The append itself happens under {@code putMessageLock}, so
 * commit-log writes are serialized.
 *
 * @param msg message to store (mutated in place as described above)
 * @return the put result; handleDiskFlush / handleHA may change its status
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// Only messages with a positive delay level are redirected.
if (msg.getDelayTimeLevel() > 0) {
// Clamp the level to the highest level the schedule service supports.
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// Replace the topic/queue with the schedule topic and the queue id derived from the delay level.
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// Flag IPv6 born/store hosts so doAppend reserves 16-byte address fields for them.
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// Current (last) commit-log file. Files are named after the commit-log offset of their first byte.
// The statement below is split across three lines: this.mappedFileQueue.getLastMappedFile().
MappedFile mappedFile = this.
mappedFileQueue.
getLastMappedFile();
// Serialize all commit-log appends; the lock implementation depends on the store config.
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
// Re-stamp under the lock so store timestamps follow append order.
msg.setStoreTimestamp(beginLockTimestamp);
// No file yet, or the last one is full: create the next one (startOffset 0 only matters when
// the queue is empty).
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// File creation failed (presumably a disk/space problem).
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// The old file was padded; remember it for unlockMappedFile below, then retry in a new file.
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// Flush according to the configured flush type (sync or async).
handleDiskFlush(result, putMessageResult, msg);
// HA master/slave replication handling (handleHA is not shown here).
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
CommitLog刷盘
可通过在broker 配置文件中配置flushDiskType 来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。
索引文件的刷盘并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘。
CommitLog
/**
 * Triggers flushing of the data just appended, according to the configured flush type.
 *
 * <p>SYNC_FLUSH with {@code isWaitStoreMsgOK()}: a GroupCommitRequest for the message's end offset
 * is submitted, and the caller waits up to {@code syncFlushTimeout} ms. A non-OK result, a
 * timeout, a failure or an interruption sets the status to FLUSH_DISK_TIMEOUT. SYNC_FLUSH without
 * waiting only wakes the group-commit service. ASYNC_FLUSH wakes the flush service, or the commit
 * service when the transient store pool is enabled.
 *
 * @param result           append result of the message
 * @param putMessageResult result whose status is downgraded on flush failure
 * @param messageExt       the stored message
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // The request is satisfied once the flushed position reaches the end of this message.
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                    TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Reported as a flush failure below; keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                // Reported as a flush failure below.
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // The producer does not wait: just nudge the group-commit thread.
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        // Without the transient store pool, data sits in the mapped buffer and only needs flushing.
        // With it, data must first be committed from the write buffer.
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
同步刷盘
放入刷盘任务请求
CommitLog.GroupCommitService
/**
 * Enqueues a synchronous-flush request and wakes the group-commit thread.
 * Producers append to requestsWrite; the service works on requestsRead. The two lists are
 * exchanged by swapRequests, so producers and the flusher operate on different lists.
 */
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
// Hand the request over to the GroupCommitService thread.
this.requestsWrite.add(request);
}
// Wake the service thread in case it is waiting.
this.wakeup();
}
发送线程默认最多等待5秒(syncFlushTimeout),实际的刷盘由GroupCommitService线程执行,完成后回调通知发送线程。
GroupCommitService
/**
 * Group-commit loop used for synchronous flushing: repeatedly waits briefly, then serves the
 * current batch of flush requests via doCommit(). On stop, it serves one final batch.
 */
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// Wait up to 10 ms or until woken (e.g. by putRequest). waitForRunning is inherited and not
// shown here; presumably it also swaps the request lists before returning — confirm.
this.waitForRunning(10);
// Serve the current batch of synchronous-flush requests.
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// NOTE(review): the interrupt flag is not restored here. Restoring it before doCommit() could
// make FileChannel.force fail with ClosedByInterruptException — confirm before changing.
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
// Move requests submitted since the last swap into requestsRead.
this.swapRequests();
}
// Serve the final batch.
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}
执行刷盘提交任务
GroupCommitService
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
//遍历请求
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush 刷新2次
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
//当前刷盘指针是否大于下一个刷盘指针
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
//刷盘
CommitLog.this.mappedFileQueue.flush(0);
}
}
//唤醒消息发送线程并通知刷盘结果。
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
//更新刷盘检测点StoreCheckpoint 中的PhysicMsgTimestamp
//但并没有执行检测点的刷盘操作,刷盘检测点的刷盘操作将在刷写消息队列文件时触发
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
//清除刷盘请求
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
//可能出现单个消息为不同步刷新
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
MappedFileQueue
/**
 * Flushes the file that contains {@code flushedWhere} and advances {@code flushedWhere}.
 *
 * <p>When {@code flushLeastPages} is 0, {@code storeTimestamp} is set to that file's last write
 * timestamp. Note this is when data was last written into the file's buffer or channel, not when
 * it reached disk.
 *
 * @param flushLeastPages minimum dirty-page threshold passed through to MappedFile.flush
 * @return {@code true} if no file was found or this call did not move {@code flushedWhere},
 *         i.e. nothing new was flushed; {@code false} if progress was made
 */
public boolean flush(final int flushLeastPages) {
    // Second argument: presumably "return the first file if none matches", used for the very first
    // flush when flushedWhere is still 0 — confirm in findMappedFileByOffset.
    final MappedFile target = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (target == null) {
        return true;
    }

    final long lastWriteTime = target.getStoreTimestamp();
    final int flushedInFile = target.flush(flushLeastPages);
    final long newFlushedWhere = target.getFileFromOffset() + flushedInFile;
    final boolean unchanged = newFlushedWhere == this.flushedWhere;
    this.flushedWhere = newFlushedWhere;

    if (flushLeastPages == 0) {
        this.storeTimestamp = lastWriteTime;
    }
    return unchanged;
}
将内存中的数据刷写到磁盘
MappedFile
/**
 * Forces this file's written data to disk when enough is pending.
 *
 * <p>Data lives either in the file channel (when {@code writeBuffer} is used and committed through
 * the channel, or the channel position is non-zero) or in the memory-mapped buffer, never both.
 * The matching {@code force} is called. Force errors are logged, not thrown. If the file cannot be
 * held (hold() fails), the flushed position jumps to the read position without forcing.
 *
 * @param flushLeastPages threshold passed to isAbleToFlush
 * @return the flushed position within this file after the call
 */
public int flush(final int flushLeastPages) {
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }
    if (!this.hold()) {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    // Capture the position before forcing: everything up to here is covered by this flush.
    final int readPos = getReadPosition();
    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        final boolean dataInChannel = writeBuffer != null || this.fileChannel.position() != 0;
        if (dataInChannel) {
            // false: file content only, metadata need not be forced.
            this.fileChannel.force(false);
        } else {
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }

    this.flushedPosition.set(readPos);
    this.release();
    return this.getFlushedPosition();
}
唤醒消息发送线程并通知刷盘结果。
/**
 * Completes this request's future with the flush outcome, releasing the producer waiting on it
 * (presumably the future returned by {@code future()}).
 *
 * @param putMessageStatus PUT_OK or FLUSH_DISK_TIMEOUT
 */
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
    // complete() has no effect if the future is already done; its return value is not needed.
    flushOKFuture.complete(putMessageStatus);
}
交换请求
避免同步刷盘线程消费任务与消息生产者提交任务之间的锁竞争。
/**
 * Exchanges the write and read request lists: requests queued by producers become the batch the
 * flusher serves next, and the drained list is reused for new submissions.
 */
private void swapRequests() {
    final List<GroupCommitRequest> pendingFromProducers = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = pendingFromProducers;
}
异步刷盘
如果 transientStorePoolEnable 为false ,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。
flushCommitLogService机制
CommitLog.FlushRealTimeService
/**
 * Background loop for asynchronous commit-log flushing. Each round it waits, then flushes if
 * enough pages are dirty. Once the thorough interval has elapsed, it flushes whatever is pending.
 * On stop, it flushes repeatedly until a pass makes no further progress (bounded by
 * RETRY_TIMES_OVER).
 */
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// true: wait with Thread.sleep; false: wait with waitForRunning (can be woken early).
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// Wait between flush rounds, in ms.
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// Minimum number of pending pages for a flush to happen (threshold applied in isAbleToFlush,
// not shown).
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// Maximum time (ms) between two flushes that ignore the page threshold.
int flushPhysicQueueThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
boolean printFlushProgress = false;
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
// Once the thorough interval has elapsed since the last such round, drop the page threshold so
// that even a small amount of pending data is flushed; progress is printed every 10th time.
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
try {
// Wait first, then flush.
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
// Flush in-memory data to disk.
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// Update the checkpoint's commit-log timestamp (in memory).
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
}
// Normal shutdown, to ensure that all the flush before exit
// flush(0) returns true once a pass no longer advances the flushed position.
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.flush(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end");
}
将内存中的数据刷写到磁盘
MappedFile
/**
 * Flushes this mapped file's pending data to disk when isAbleToFlush(flushLeastPages) allows it.
 *
 * @param flushLeastPages page threshold passed to isAbleToFlush()
 * @return the flushed position after this call
 */
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
// Position up to which data can be flushed. Per the original notes, when a transient
// off-heap writeBuffer is in use this is the committed position rather than the write
// position — TODO confirm in getReadPosition().
int value = getReadPosition();
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
// writeBuffer in use (or data was written via the FileChannel): the data already sits in
// the FileChannel, so force the channel.
if (writeBuffer != null || this.fileChannel.position() != 0) {
// force(false): sync file content to the device without also forcing metadata updates.
this.fileChannel.force(false);
} else {
// No writeBuffer: data was written through the memory mapping, so force the mapped buffer.
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
// Record the new flushed position.
// NOTE(review): this also runs when force() threw above, so flushedPosition can advance
// past data that was not durably synced — confirm this is intended.
this.flushedPosition.set(value);
this.release();
} else {
// hold() failed; flushedPosition is still advanced to the current read position.
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
// Return the latest flushed position.
return this.getFlushedPosition();
}
commitLogService提交
如果transientStorePoolEnable 为true , RocketMQ 会单独申请一个与目标物理文件( commitlog)同样大小的堆外内存, 该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射内存中,再flush到磁盘。
CommitLog.CommitRealTimeService
// Background loop of CommitRealTimeService (used when transientStorePoolEnable is true):
// periodically commits data pending in the write buffer into the mapped file and wakes the
// flush service whenever something was committed. On shutdown it retries a full commit up
// to RETRY_TIMES_OVER times.
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// Interval of this loop (default 200ms, per the original notes); each round commits
// newly appended buffer content to the mapped file.
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
// Minimum number of pages per commit; with fewer pending pages this round's commit is
// skipped (default 4 pages, per the original notes).
int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
// Maximum interval between two real commits (default 200ms, per the original notes).
int commitDataThoroughInterval =
CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis();
// If more than commitDataThoroughInterval has passed since the last commit,
if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
// ignore commitCommitLogLeastPages this round, i.e. commit even if fewer pages than the
// threshold are pending.
this.lastCommitTimestamp = begin;
commitDataLeastPages = 0;
}
try {
// Commit pending data into the memory mapping of the physical file.
boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
long end = System.currentTimeMillis();
// false does not mean failure: committedWhere moved, i.e. some data was committed.
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
//now wake up flush thread.
// Wake up the flush service so the newly committed data gets flushed.
flushCommitLogService.wakeup();
}
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
// Wait `interval` before the next round.
this.waitForRunning(interval);
} catch (Throwable e) {
CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
}
}
// Shutdown: retry a full commit (no page threshold) until it reports nothing left to commit.
boolean result = false;
for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
result = CommitLog.this.mappedFileQueue.commit(0);
CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
}
CommitLog.log.info(this.getServiceName() + " service end");
}
}
MappedFileQueue
/**
 * Commits pending data of the mapped file that contains {@code committedWhere} and advances
 * {@code committedWhere} to the new committed position.
 *
 * @param commitLeastPages page threshold forwarded to {@link MappedFile#commit(int)}
 * @return {@code true} when {@code committedWhere} did not move (nothing committed) or no file
 *         was found; {@code false} when some data was committed
 */
public boolean commit(final int commitLeastPages) {
    final MappedFile target = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (target == null) {
        return true;
    }
    final int committedInFile = target.commit(commitLeastPages);
    final long newCommittedWhere = target.getFileFromOffset() + committedInFile;
    final boolean unchanged = newCommittedWhere == this.committedWhere;
    this.committedWhere = newCommittedWhere;
    return unchanged;
}
根据消息偏移量offset 查找MappedFile
MappedFileQueue
/**
 * Locates the mapped file whose range {@code [fileFromOffset, fileFromOffset + mappedFileSize)}
 * contains the given physical offset.
 *
 * <p>Expired files are deleted over time, so the first file does not necessarily start at 0.
 * The list index is therefore computed relative to the first file; if that direct lookup
 * misses, every file is scanned.
 *
 * @param offset                physical offset to look up
 * @param returnFirstOnNotFound whether to fall back to the first file when no file matches
 * @return the matching file, the first file (if requested) when none matches, or {@code null}
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        final MappedFile head = this.getFirstMappedFile();
        final MappedFile tail = this.getLastMappedFile();
        if (head == null || tail == null) {
            return null;
        }
        final long lowerBound = head.getFileFromOffset();
        final long upperBound = tail.getFileFromOffset() + this.mappedFileSize;
        final boolean inRange = offset >= lowerBound && offset < upperBound;
        if (!inRange) {
            LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                offset,
                lowerBound,
                upperBound,
                this.mappedFileSize,
                this.mappedFiles.size());
        } else {
            // Index relative to the first file still on disk.
            final int slot = (int) ((offset / this.mappedFileSize) - (lowerBound / this.mappedFileSize));
            MappedFile candidate = null;
            try {
                candidate = this.mappedFiles.get(slot);
            } catch (Exception ignored) {
                // Direct lookup failed; the linear scan below is the fallback.
            }
            if (candidate != null) {
                final long start = candidate.getFileFromOffset();
                if (offset >= start && offset < start + this.mappedFileSize) {
                    return candidate;
                }
            }
            for (MappedFile each : this.mappedFiles) {
                final long start = each.getFileFromOffset();
                if (offset >= start && offset < start + this.mappedFileSize) {
                    return each;
                }
            }
        }
        return returnFirstOnNotFound ? head : null;
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}
执行提交操作
MappedFile
/**
 * Commits data pending in the transient {@code writeBuffer} to the file channel when
 * isAbleToCommit(commitLeastPages) allows it.
 *
 * <p>Without a writeBuffer there is nothing to commit, and the write position is reported as
 * the committed position. Once the whole file has been committed, the writeBuffer is returned
 * to the transient store pool and dropped.
 *
 * @param commitLeastPages page threshold passed to isAbleToCommit()
 * @return the committed position (or the write position when no writeBuffer is used)
 */
public int commit(final int commitLeastPages) {
    if (this.writeBuffer == null) {
        // no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (!this.hold()) {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        } else {
            this.commit0(commitLeastPages);
            this.release();
        }
    }
    // All dirty data has been committed to FileChannel.
    final boolean wholeFileCommitted = this.fileSize == this.committedPosition.get();
    if (wholeFileCommitted && this.writeBuffer != null && this.transientStorePool != null) {
        // Hand the buffer back to the pool; this file no longer needs it.
        this.transientStorePool.returnBuffer(this.writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}
实时更新消息消费队列与索引文件
消息消费队列文件、消息属性索引文件都是基于CommitLog 文件构建的, 当消息生产者提交的消息存储在Commitlog 文件中,ConsumeQueue 、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。
Broker 服务器在启动时会启动 ReputMessageService 线程,准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumeQueue、IndexFile 文件。
DefaultMessageStore
// Starts the message store. It acquires the store lock file, then computes the offset from
// which ReputMessageService dispatches commitlog messages. Next it waits until dispatch has
// caught up. Finally it starts the remaining services.
public void start() throws Exception {
// Try to acquire an exclusive (non-shared) lock on the first byte of ${ROCKETMQ_HOME}\store\lock.
lock = lockFile.getChannel().tryLock(0, 1, false);
// Fail if the lock could not be obtained, is shared, or is invalid — presumably another
// store instance already holds it.
if (lock == null || lock.isShared() || !lock.isValid()) {
throw new RuntimeException("Lock failed,MQ already started");
}
// Write "lock" into the lock file and force content and metadata to disk.
lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
lockFile.getChannel().force(true);
{
/**
* 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
* 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
* 3. Calculate the reput offset according to the consume queue;
* 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
*/
// Start from the commitlog's minimum offset.
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
// Take the largest physical offset recorded by any ConsumeQueue.
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
// Per the original notes: ConsumeQueue is flushed after the commitlog, so its data may lag behind the commitlog.
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
// Negative when getMinOffset() returned -1 (no commitlog file) and no ConsumeQueue recorded a larger offset: start from 0.
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
// Offset below the commitlog minimum (see the conditions below): reset it to the minimum.
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
/**
* This happens in following conditions:
* 1. If someone removes all the consumequeue files or the disk get damaged.
* 2. Launch a new broker, and copy the commitlog from other brokers.
*
* All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
* If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
*/
log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
// maxPhysicalPosInLogicQueue is the physical offset from which ReputMessageService starts
// dispatching messages to ConsumeQueue and IndexFile.
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// Start the thread that dispatches messages to ConsumeQueue and IndexFile.
this.reputMessageService.start();
/**
* 1. Finish dispatching the messages fall behind, then to start other services.
* 2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
*/
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
}
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
this.createTempFile();
this.addScheduleTask();
this.shutdown = false;
}
这里的 lock 即 ${ROCKETMQ_HOME}/store 目录下的 lock 文件。
DefaultMessageStore.ReputMessageService
/**
 * Dispatch loop: roughly every millisecond, pushes newly appended commitlog messages to the
 * ConsumeQueue and IndexFile builders until the service is stopped. Exceptions from a round
 * are logged and the loop continues.
 */
@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    boolean running = !this.isStopped();
    while (running) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
        running = !this.isStopped();
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
// Dispatches commitlog messages starting at reputFromOffset to the registered dispatchers
// (ConsumeQueue / IndexFile building) and advances reputFromOffset as messages are processed.
// Stops when no more data is available, or, with duplication enabled, once the confirm
// offset has been reached.
private void doReput() {
// reputFromOffset is below the commitlog minimum: dispatch fell so far behind that the
// commitlog data there has already expired, so skip ahead to the minimum offset.
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// With duplication enabled, do not dispatch beyond the confirm offset:
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
// stop once reputFromOffset has reached it.
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// All readable data of the commitlog file, starting at reputFromOffset.
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
// Align reputFromOffset with the absolute start offset of the returned buffer.
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// Read one message from the buffer and build a DispatchRequest describing it.
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// Use bufferSize when it is set (not -1), otherwise msgSize.
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// Hand the message to every dispatcher (per the original notes:
// CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex).
DefaultMessageStore.this.doDispatch(dispatchRequest);
// On a non-slave with long polling enabled, notify the arriving listener.
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
// Advance to the next message.
this.reputFromOffset += size;
readSize += size;
// On a slave, update the per-topic put statistics.
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
// End of the current file: jump to the start offset of the next file.
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
// Leave the inner loop.
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
// (Always true here.) The message could not be parsed completely.
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
// Skip the broken message; it is logged as a bug.
this.reputFromOffset += size;
} else {
// No valid data: stop dispatching.
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
获取当前Commitlog 目录最小偏移量
CommitLog
/**
 * Returns the minimum physical offset of the commitlog.
 *
 * @return the first mapped file's start offset if that file is available, otherwise the start
 *         offset of the file after it; {@code -1} if there is no mapped file
 */
public long getMinOffset() {
    final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
    if (first == null) {
        return -1;
    }
    return first.isAvailable()
        ? first.getFileFromOffset()
        : this.rollNextFile(first.getFileFromOffset());
}
返回reputFromOffset 偏移量开始的全部有效数据(commitlog 文件)
/**
 * Returns the readable commitlog data starting at {@code offset} within the file that contains
 * it. Falls back to the first file only when {@code offset} is 0.
 *
 * @param offset physical commitlog offset
 * @return the selected buffer, or {@code null} if nothing is readable there
 */
public SelectMappedBufferResult getData(final long offset) {
    final boolean fallbackToFirstFile = offset == 0;
    return this.getData(offset, fallbackToFirstFile);
}
/**
 * Returns the readable commitlog data from {@code offset} up to the read position of the
 * mapped file that contains it.
 *
 * @param offset                physical commitlog offset
 * @param returnFirstOnNotFound whether to use the first file when no file contains the offset
 * @return the selected buffer, or {@code null} if no file was found or nothing is readable
 */
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // Size of a single commitlog file, from the store configuration.
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    final MappedFile file = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (file == null) {
        return null;
    }
    final int positionInFile = (int) (offset % fileSize);
    return file.selectMappedBuffer(positionInFile);
}
根据消息偏移量offset 查找MappedFile
MappedFileQueue
/**
 * Locates the mapped file whose range {@code [fileFromOffset, fileFromOffset + mappedFileSize)}
 * contains the given physical offset.
 *
 * <p>Expired files are deleted over time, so the first file does not necessarily start at 0.
 * The list index is therefore computed relative to the first file; if that direct lookup
 * misses, every file is scanned.
 *
 * @param offset                physical offset to look up
 * @param returnFirstOnNotFound whether to fall back to the first file when no file matches
 * @return the matching file, the first file (if requested) when none matches, or {@code null}
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        final MappedFile head = this.getFirstMappedFile();
        final MappedFile tail = this.getLastMappedFile();
        if (head == null || tail == null) {
            return null;
        }
        final long lowerBound = head.getFileFromOffset();
        final long upperBound = tail.getFileFromOffset() + this.mappedFileSize;
        final boolean inRange = offset >= lowerBound && offset < upperBound;
        if (!inRange) {
            LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                offset,
                lowerBound,
                upperBound,
                this.mappedFileSize,
                this.mappedFiles.size());
        } else {
            // Index relative to the first file still on disk.
            final int slot = (int) ((offset / this.mappedFileSize) - (lowerBound / this.mappedFileSize));
            MappedFile candidate = null;
            try {
                candidate = this.mappedFiles.get(slot);
            } catch (Exception ignored) {
                // Direct lookup failed; the linear scan below is the fallback.
            }
            if (candidate != null) {
                final long start = candidate.getFileFromOffset();
                if (offset >= start && offset < start + this.mappedFileSize) {
                    return candidate;
                }
            }
            for (MappedFile each : this.mappedFiles) {
                final long start = each.getFileFromOffset();
                if (offset >= start && offset < start + this.mappedFileSize) {
                    return each;
                }
            }
        }
        return returnFirstOnNotFound ? head : null;
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}
查找pos 到当前最大可读之间的数据
MappedFile
/**
 * Returns a view of this file's data from {@code pos} up to the current read position.
 *
 * <p>The mapped buffer's own position is never moved while writing, so {@code slice()} of it
 * covers the whole file; the view is positioned and limited explicitly instead of via flip().
 *
 * @param pos position inside this file
 * @return the selected region, or {@code null} if {@code pos} is out of the readable range or
 *         the file could not be held
 */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    final int readable = getReadPosition();
    final boolean hasData = pos >= 0 && pos < readable;
    if (!hasData || !this.hold()) {
        return null;
    }
    final ByteBuffer wholeFile = this.mappedByteBuffer.slice();
    wholeFile.position(pos);
    final int length = readable - pos;
    final ByteBuffer window = wholeFile.slice();
    window.limit(length);
    return new SelectMappedBufferResult(this.fileFromOffset + pos, window, length, this);
}
调度
/**
 * Hands a dispatch request to every registered dispatcher. Per the original notes, these keep
 * the ConsumeQueue and IndexFile in sync with the commitlog.
 *
 * @param req the message to dispatch
 */
public void doDispatch(DispatchRequest req) {
    this.dispatcherList.forEach(dispatcher -> dispatcher.dispatch(req));
}
根据消息更新ConsumeQueue
DefaultMessageStore
/**
 * Builds the ConsumeQueue entry for a dispatched message. Only non-transactional messages and
 * committed transactional messages are written; every other transaction type (including
 * prepared and rolled-back) is ignored here.
 *
 * @param request the dispatched message
 */
public void dispatch(DispatchRequest request) {
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
    final boolean buildsConsumeQueue = tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
    if (buildsConsumeQueue) {
        DefaultMessageStore.this.putMessagePositionInfo(request);
    }
}
}
DefaultMessageStore
/**
 * Updates the ConsumeQueue for a dispatched message. It looks up (or creates) the queue for the
 * message's topic and queue id, then appends the entry to its memory-mapped file. No flush
 * happens here; per the original notes, ConsumeQueue is always flushed asynchronously.
 *
 * @param dispatchRequest the dispatched message
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId())
        .putMessagePositionInfoWrapper(dispatchRequest);
}
根据topic和queueId查询ConsumeQueue,没有则新建
/**
 * Returns the ConsumeQueue for a topic/queue id, creating and registering it when absent.
 * Concurrent creators converge on a single instance via {@code putIfAbsent}.
 *
 * @param topic   message topic
 * @param queueId queue id within the topic
 * @return the existing or newly registered ConsumeQueue
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    if (queuesOfTopic == null) {
        final ConcurrentMap<Integer, ConsumeQueue> created = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        final ConcurrentMap<Integer, ConsumeQueue> raced = consumeQueueTable.putIfAbsent(topic, created);
        queuesOfTopic = raced != null ? raced : created;
    }

    final ConsumeQueue existing = queuesOfTopic.get(queueId);
    if (existing != null) {
        return existing;
    }
    final ConsumeQueue fresh = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
        this);
    final ConsumeQueue winner = queuesOfTopic.putIfAbsent(queueId, fresh);
    return winner != null ? winner : fresh;
}
将内容更新到内存映射文件中,此处不刷盘,因为 ConsumeQueue 固定采用异步刷盘
// Appends one ConsumeQueue entry for the dispatched message. Attempts are made only if the
// ConsumeQueue was writable when this method was entered, up to maxRetries times with a 1s
// sleep after each failed attempt. When ext writes are enabled, the tags code is replaced by
// the ConsumeQueueExt address if that write succeeds. On success the store checkpoint
// timestamps are updated. If the queue was not writable, or every attempt failed, the
// logics queue is marked as errored.
// Per the original notes: this only updates the memory-mapped file and does not flush;
// ConsumeQueue flushing is always asynchronous.
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
// Write the entry into the memory-mapped file (no flush here).
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
// On a slave, or with the DLedger commitlog, also record the physical message timestamp.
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
// Writes one fixed-size entry (commitlog offset, message size, tags code) at logical position
// cqOffset * CQ_STORE_UNIT_SIZE of this ConsumeQueue. Returns true when the entry was appended
// or judged to be a duplicate. Returns false when no mapped file could be obtained; otherwise
// it returns the result of appendMessage().
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// The message may already have been built into this queue.
// maxPhysicOffset is the commitlog offset plus size of the last entry written here.
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
// Rewind the reusable entry buffer to position 0 with limit CQ_STORE_UNIT_SIZE.
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
// Write commitlog offset (long), message size (int) and tags code (long) into the buffer.
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
/* Compute the entry's logical offset in the ConsumeQueue from cqOffset, then append it to
the memory-mapped file (append only, no flush). Per the original notes, ConsumeQueue
is always flushed asynchronously. */
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// Get the last mapped file, creating one if necessary.
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// This is the first file created in this queue and it is still empty, but the entry does
// not start at 0. Align the minimum logical offset and the flush/commit pointers to
// expectLogicOffset, then fill the space before it (see fillPreBlank).
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
// Minimum logical offset of this queue.
this.minLogicOffset = expectLogicOffset;
// Flush pointer.
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
// Commit pointer (per the original notes, always >= flushedWhere).
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// The entry position was already written: treat it as a duplicate and report success.
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
// Gap between expected and current position: only logged; the entry is still appended below.
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;// Remember the end of this message, then append the entry to the file.
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
MappedFileQueue
获取最后一个文件,找不到则创建
/**
 * Returns the last mapped file. If the queue is empty or the last file is full, a new file is
 * created first; {@code startOffset} determines its start when the queue is empty.
 *
 * @param startOffset offset used to place a new file when the queue is empty
 * @return the last (possibly newly created) mapped file, or {@code null} if creation failed
 */
public MappedFile getLastMappedFile(final long startOffset) {
    final boolean createIfMissing = true;
    return this.getLastMappedFile(startOffset, createIfMissing);
}
// Returns the last mapped file of this queue. A new file is needed when the queue is empty
// (its start is startOffset aligned down to a multiple of mappedFileSize) or when the last
// file is full (it starts right after that file). If needCreate is true, the new file is then
// created, appended to mappedFiles and returned; the returned value is null if creation
// failed. If needCreate is false, the current last file is returned as-is: possibly full, or
// null when the queue is empty.
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
// No file yet: align the new file's start offset down to a multiple of mappedFileSize.
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}
// Last file is full: the new file starts right after it.
if (mappedFileLast != null && mappedFileLast.isFull()) {
// Start offset of the new file.
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}
if (createOffset != -1 && needCreate) {
// File names are derived from the start offset.
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
// Request the next file (and pre-allocation of the one after it) from the allocation service.
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
// No allocation service: create the file synchronously.
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
// If mappedFiles is still empty,
if (this.mappedFiles.isEmpty()) {
// mark this as the first file created in the queue.
mappedFile.setFirstCreateInQueue(true);
}
// Add it to mappedFiles.
this.mappedFiles.add(mappedFile);
}
return mappedFile;
}
return mappedFileLast;
}
异步创建文件
/**
 * Submits allocation requests for the next file and for the file after it (pre-allocation),
 * then waits up to {@code waitTimeOut} ms for the next file to be created.
 *
 * <p>When the transient store pool is enabled with fast-fail on a non-slave broker, the number
 * of requests that may be submitted is the number of available pool buffers minus the queued
 * requests. If the next file cannot be submitted, {@code null} is returned immediately. If only
 * the next-next file cannot be submitted, its pre-allocation is simply skipped.
 *
 * @param nextFilePath     path of the file that is needed now
 * @param nextNextFilePath path of the file to pre-allocate
 * @param fileSize         size of each file in bytes
 * @return the created mapped file, or {@code null} on pool exhaustion, a recorded service
 *         exception, timeout, missing request, or interruption
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
            //if broker is slave, don't fast fail even no buffer in pool
            canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
        }
    }

    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        // Hand the request to the allocation thread.
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    // The allocation service has recorded an exception: give up.
    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        // Restore the interrupt status so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
    }
    return null;
}
同步创建文件 (默认)
文件初始化
/**
 * Creates a mapped file backed by {@code fileName}, memory-mapping {@code fileSize} bytes.
 *
 * @param fileName path of the file; its name is parsed as the file's start offset
 * @param fileSize size of the file in bytes
 * @throws IOException if the file cannot be opened or mapped
 */
public MappedFile(final String fileName, final int fileSize) throws IOException {
    this.init(fileName, fileSize);
}
// Initializes this mapped file. It parses fileFromOffset from the file name, calls
// ensureDirOK on the parent directory, opens a read-write channel and maps fileSize bytes of
// it. The global mapped-memory/file counters are bumped only after the mapping succeeds. On
// failure the exception is logged and rethrown, and the channel (if it was opened) is closed.
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
// The file name is the start offset (fileFromOffset) of this file.
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
// Open a read-write channel on the file.
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// Map fileSize bytes of the file into memory.
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// Account for the newly mapped virtual memory.
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
// Count the newly mapped file.
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
// Initialization failed but the channel was opened: close it.
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
根据消息更新Index 索引文件
也是从doDispatch过来的
构建索引
/**
 * Adds hash-index entries for a dispatched message to the current IndexFile.
 *
 * <p>If the message has a unique key, an entry is added for it. An entry is also added for
 * every non-empty key in {@code keys}, split on {@code MessageConst.KEY_SEPARATOR}. Each key is
 * combined with the topic via {@code buildKey}. A message is skipped when its commitlog offset
 * is below the index file's end physical offset (already indexed), and rolled-back
 * transactional messages are skipped too. If an entry cannot be stored because no index file
 * is available, the error is logged and indexing of this message stops.
 *
 * @param req the dispatched message
 */
public void buildIndex(DispatchRequest req) {
    // Get the last writable IndexFile, creating one if necessary.
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile == null) {
        log.error("build index error, stop building index");
        return;
    }
    final long endPhyOffset = indexFile.getEndPhyOffset();
    final DispatchRequest msg = req;
    final String topic = msg.getTopic();
    final String keys = msg.getKeys();
    // Offset below what this index file already covers: duplicate, skip.
    if (msg.getCommitLogOffset() < endPhyOffset) {
        return;
    }

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            return;
    }

    // Index the unique key so the message can be looked up by it quickly.
    if (req.getUniqKey() != null) {
        indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
        if (indexFile == null) {
            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
            return;
        }
    }

    // A message may carry several keys; each non-empty one gets its own index entry.
    if (keys != null && keys.length() > 0) {
        for (String key : keys.split(MessageConst.KEY_SEPARATOR)) {
            if (key.length() > 0) {
                indexFile = putKey(indexFile, msg, buildKey(topic, key));
                if (indexFile == null) {
                    // Report the key that actually failed, not the unique key.
                    log.error("putKey error commitlog {} key {}", req.getCommitLogOffset(), key);
                    return;
                }
            }
        }
    }
}
获取或创建IndexFile 文件
IndexService
/**
 * Gets the last writable IndexFile, creating one if needed. It makes up to
 * {@code MAX_TRY_IDX_CREATE} attempts and sleeps 1s after each failed one. If every attempt
 * fails, the store is flagged as unable to build index files.
 *
 * @return the index file, or {@code null} if none could be obtained
 */
public IndexFile retryGetAndCreateIndexFile() {
    IndexFile indexFile = null;
    int attempt = 0;
    while (indexFile == null && attempt < MAX_TRY_IDX_CREATE) {
        indexFile = this.getAndCreateLastIndexFile();
        if (indexFile != null) {
            break;
        }
        try {
            log.info("Tried to create index file " + attempt + " times");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        }
        attempt++;
    }

    if (indexFile == null) {
        this.defaultMessageStore.getAccessRights().makeIndexFileError();
        log.error("Mark index file cannot build flag");
    }
    return indexFile;
}
/**
 * Returns the last IndexFile if it is not full. Otherwise it creates a new one whose starting
 * end-offset/timestamp continue from the previous (full) file, appends it to
 * {@code indexFileList}, and flushes the previous file on a background daemon thread.
 *
 * <p>Locking: the list is inspected under the read lock and modified under the write lock.
 * Each lock is released in a {@code finally}, and the write lock is only released if it was
 * actually acquired.
 *
 * @return the writable index file, or {@code null} if creating a new file failed
 */
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    this.readWriteLock.readLock().lock();
    try {
        if (!this.indexFileList.isEmpty()) {
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                indexFile = tmp;
            } else {
                // Full: the new file continues from where this one ended.
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    // No file yet, or the last one is full: create a new file.
    if (indexFile == null) {
        try {
            String fileName =
                this.storePath + File.separator
                    + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                    lastUpdateIndexTimestamp);
            // Acquire the write lock only once the file exists. The previous code unlocked in an
            // outer finally even when the constructor threw before lock(), which raised
            // IllegalMonitorStateException.
            this.readWriteLock.writeLock().lock();
            try {
                this.indexFileList.add(indexFile);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("getLastIndexFile exception ", e);
        }

        // Flush the previous (full) file in the background.
        // NOTE(review): prevIndexFile is null when the list was empty; this relies on
        // flush() tolerating null — confirm.
        if (indexFile != null) {
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");
            flushThread.setDaemon(true);
            flushThread.start();
        }
    }
    return indexFile;
}
添加到Hash索引文件中
IndexService
/**
 * Writes {@code idxKey} for {@code msg} into {@code indexFile}. Whenever the target file
 * reports it is full, it switches to a new index file and retries.
 *
 * @return the index file that finally accepted the key, or {@code null} if no new index
 *         file could be obtained
 */
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    IndexFile target = indexFile;
    boolean written = target.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    while (!written) {
        log.warn("Index file [" + target.getFileName() + "] is full, trying to create another one");
        // The current file is full: move on to the next one.
        target = retryGetAndCreateIndexFile();
        if (target == null) {
            return null;
        }
        written = target.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }
    return target;
}
将消息索引键与消息偏移量映射关系写入到IndexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// Writes one "key hash -> commit log physical offset" entry into this index file.
// Returns false when the file already holds indexNum entries (it is full) or when writing throws;
// returns true once the entry, its hash slot and the header have been updated.
if (this.indexHeader.getIndexCount() < this.indexNum) {
// Hash of the key; only this hash (not the key itself) is stored in the entry.
int keyHash = indexKeyHashMethod(key);
// Hash slot index for this key.
// NOTE(review): assumes indexKeyHashMethod never returns a negative value, otherwise slotPos is negative - confirm.
int slotPos = keyHash % this.hashSlotNum;
// Absolute byte position of the slot: header bytes + slot index * slot size.
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// Never assigned a non-null value below (the lock call is commented out), so the release in finally is a no-op.
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// Read the slot as a 4-byte int: the index of the most recent entry that hashed to this slot, if any.
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// A value <= invalidIndex, or greater than the current entry count, is treated as "slot unused"
// and normalized to invalidIndex.
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// Seconds between this message's store time and the file's begin timestamp, clamped to
// [0, Integer.MAX_VALUE]; forced to 0 while no begin timestamp has been recorded (<= 0).
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// Byte position of the new entry, appended after all existing entries.
int absIndexPos =
// header size
IndexHeader.INDEX_HEADER_SIZE +
// number of hash slots * slot size
this.hashSlotNum * hashSlotSize
// entries already written * entry size
+ this.indexHeader.getIndexCount() * indexSize;
// Entry layout written below (byte offsets relative to absIndexPos):
//   +0  int   key hash (storing the hash rather than the key keeps every entry fixed-size)
//   +4  long  commit log physical offset
//   +12 int   seconds since the file's begin timestamp
//   +16 int   index of the previous entry in the same slot (collision chain), or invalidIndex
// NOTE(review): presumably indexSize == 20 to match this layout - confirm.
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// Make the slot point at the new entry, replacing the previous head of its chain.
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// No entry counted yet: record this message as the file's begin offset/timestamp.
// NOTE(review): relies on the count starting at 1 (so that invalidIndex can mark an empty slot) - confirm in IndexHeader.
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// A previously unused slot is now occupied: bump the used-slot counter.
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
// Advance the entry count and record this message as the file's end offset/timestamp.
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
// NOTE(review): logs key.hashCode(), not the keyHash that was stored; the two may differ.
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
过期文件删除机制
默认情况下,文件超过72小时未更新即视为过期并会被删除。可通过在Broker配置文件中设置fileReservedTime来修改过期时间,单位为小时。
org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
DefaultMessageStore
/**
 * Periodic cleanup entry point. It runs the CommitLog clean service first and then the
 * ConsumeQueue clean service.
 */
private void cleanFilesPeriodically() {
    // Order matters: CommitLog files are cleaned before ConsumeQueue files.
    cleanCommitLogService.run();
    cleanConsumeQueueService.run();
}
删除过期文件
DefaultMessageStore.CleanCommitLogService
/**
 * Deletes expired CommitLog files. It acts only when at least one trigger fires: the
 * configured delete time has been reached ({@code isTimeToDelete}), disk usage is too high
 * ({@code isSpaceToDelete}), or a manual deletion request is pending.
 */
private void deleteExpiredFiles() {
    // Retention period in hours: a file not modified for longer than this is considered expired.
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // Pause between two consecutive physical file deletions within a single pass.
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // Passed down to MappedFile.destroy: how long a file that is still referenced may keep
    // refusing deletion before it is destroyed forcibly.
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // Evaluation order matters: isSpaceToDelete() also updates cleanImmediately, read below.
    boolean timeup = this.isTimeToDelete();
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {
        if (manualDelete) {
            this.manualDeleteFileSeveralTimes--;
        }
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);

        // Hours -> milliseconds.
        fileReservedTime *= 60 * 60 * 1000;
        int deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        // Nothing could be deleted although the disk is almost full.
        if (deleteCount <= 0 && spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
磁盘空间是否充足
CleanCommitLogService
/**
 * Checks disk usage of the CommitLog partition and then the ConsumeQueue partition, updating
 * the disk-full/disk-OK running flags and {@code cleanImmediately} along the way.
 *
 * @return {@code true} as soon as one partition's usage is above diskMaxUsedSpaceRatio (or
 *         could not be determined, i.e. negative), meaning expired files should be deleted now
 */
private boolean isSpaceToDelete() {
    // Maximum allowed used ratio of the partitions (configured as a percentage).
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    cleanImmediately = false;

    String commitLogPath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    if (isPartitionOverUsed(commitLogPath, "physic", ratio)) {
        // The ConsumeQueue partition is not examined once the CommitLog one is over the limit.
        return true;
    }

    String consumeQueuePath = StorePathConfigHelper
        .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
    return isPartitionOverUsed(consumeQueuePath, "logics", ratio);
}

/**
 * Evaluates one disk partition.
 * <ul>
 *   <li>Above diskSpaceWarningLevelRatio: the disk is marked full (new writes get rejected)
 *       and cleanImmediately is set.</li>
 *   <li>Above diskSpaceCleanForciblyRatio: only cleanImmediately is set.</li>
 *   <li>Otherwise: the disk is marked OK.</li>
 * </ul>
 *
 * <p>NOTE(review): the physic check does not return early when usage is above the warning
 * level but not above {@code ratio}. If the two paths live on different partitions, the
 * logics check can then call getAndMakeDiskOK() and clear the disk-full mark just set.
 * Confirm whether that is intended.
 *
 * @param storePath directory whose partition is measured
 * @param label     "physic" or "logics"; used as the log-message prefix
 * @param ratio     maximum allowed used ratio (0..1)
 * @return {@code true} when the used ratio is negative or above {@code ratio}
 */
private boolean isPartitionOverUsed(final String storePath, final String label, final double ratio) {
    double usedRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePath);
    if (usedRatio > diskSpaceWarningLevelRatio) {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
        if (diskok) {
            DefaultMessageStore.log.error(label + " disk maybe full soon " + usedRatio + ", so mark disk full");
        }
        cleanImmediately = true;
    } else if (usedRatio > diskSpaceCleanForciblyRatio) {
        cleanImmediately = true;
    } else {
        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
        if (!diskok) {
            DefaultMessageStore.log.info(label + " disk space OK " + usedRatio + ", so mark disk ok");
        }
    }

    if (usedRatio < 0 || usedRatio > ratio) {
        DefaultMessageStore.log.info(label + " disk maybe full soon, so reclaim space, " + usedRatio);
        return true;
    }
    return false;
}
执行文件销毁与删除
CommitLog
/**
 * Deletes expired CommitLog files by delegating to the underlying mapped file queue.
 *
 * @param expiredTime         retention period in milliseconds, added to each file's
 *                            last-modified timestamp
 * @param deleteFilesInterval pause in milliseconds between two physical deletions
 * @param intervalForcibly    passed to MappedFile.destroy for files that are still referenced
 * @param cleanImmediately    delete regardless of expiry (disk pressure)
 * @return number of files destroyed
 */
public int deleteExpiredFile(final long expiredTime, final int deleteFilesInterval,
    final long intervalForcibly, final boolean cleanImmediately) {
    return this.mappedFileQueue.deleteExpiredFileByTime(
        expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
/**
 * Destroys expired mapped files, walking the file list from index 0 and stopping at the
 * first file that must be kept, so that files in the middle are never removed. The last
 * file in the list is never considered.
 *
 * @param expiredTime         retention period in milliseconds, added to a file's
 *                            last-modified timestamp
 * @param deleteFilesInterval pause in milliseconds between two deletions (0 = no pause)
 * @param intervalForcibly    passed to MappedFile.destroy for files that are still referenced
 * @param cleanImmediately    when true, files are deleted regardless of expiry
 * @return number of files destroyed in this pass (at most DELETE_FILES_BATCH_MAX)
 */
public int deleteExpiredFileByTime(final long expiredTime,
    final int deleteFilesInterval,
    final long intervalForcibly,
    final boolean cleanImmediately) {
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs) {
        return 0;
    }

    // Every file except the last one is a candidate. Presumably the last one is excluded
    // because it is the file currently being written.
    final int candidateCount = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    for (int i = 0; i < candidateCount; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        // Latest moment the file may live: last modification + retention period.
        long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
        boolean deletable = System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately;
        if (!deletable) {
            // avoid deleting files in the middle
            break;
        }
        // Release the file's resources; failure (e.g. still referenced) ends this pass.
        if (!mappedFile.destroy(intervalForcibly)) {
            break;
        }
        files.add(mappedFile);
        deleteCount++;
        if (files.size() >= DELETE_FILES_BATCH_MAX) {
            break;
        }
        if (deleteFilesInterval > 0 && (i + 1) < candidateCount) {
            try {
                Thread.sleep(deleteFilesInterval);
            } catch (InterruptedException ignored) {
                // The pause is best-effort only; deletion continues. The interrupt flag is not
                // restored. NOTE(review): restoring it could make later FileChannel I/O on this
                // thread fail with ClosedByInterruptException. Confirm before changing.
            }
        }
    }

    // Remove the destroyed files from the mapped file list.
    deleteExpiredFile(files);
    return deleteCount;
}
MappedFile文件销毁
MappedFile
/**
 * Shuts this mapped file down. Once cleanup is over, it closes the file channel and deletes
 * the underlying file.
 *
 * @param intervalForcibly forwarded to {@code shutdown}
 * @return {@code true} when cleanup is over. This holds even if closing or deleting then fails;
 *         such failures are only logged. Returns {@code false} when cleanup is not over yet.
 */
public boolean destroy(final long intervalForcibly) {
    this.shutdown(intervalForcibly);

    if (!this.isCleanupOver()) {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
        return false;
    }

    try {
        this.fileChannel.close();
        log.info("close file channel " + this.fileName + " OK");

        final long startMillis = System.currentTimeMillis();
        final boolean deleted = this.file.delete();
        log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
            + (deleted ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
            + this.getFlushedPosition() + ", "
            + UtilAll.computeElapsedTimeMilliseconds(startMillis));
    } catch (Exception e) {
        log.warn("close file channel " + this.fileName + " Failed. ", e);
    }
    return true;
}
总结
为了保证消息不丢失,MQ做了持久化处理
为了保证消息有序,MQ做了单一文件存储
为了方便定位消息文件,消息存储文件以其起始物理偏移量作为文件名
为了解决单一文件存储带来的读取慢问题,MQ添加了消费队列文件和hash索引文件
为了提高存储消息效率,MQ提供异步和同步刷盘模式,并且新增堆外内存池(TransientStorePool),用来提高消息直接写入FileChannel的效率
为了保证消息的容错性,MQ添加了abort文件,记录broker正常还是异常关闭,并且新增checkPoint文件用于文件异常关闭恢复.
为了节约磁盘空间,新增删除过期文件机制。
还没有评论,来说两句吧...