每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件
该结构对应于消费者逻辑队列,为什么要将一个topic抽象出很多的queue呢?这样的话,对集群模式更有好处,可以使多个消费者共同消费,而不用上锁;
1 offset(8) commitlog的偏移量
2 size(4) commitlog消息大小
3 tagHashCode tag的哈希值
index索引文件
文件地址:${user.home}\store\index${fileName}
index文件特点:
以时间作为文件名
一个存储单元是20个字节定长的
索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用
存储单元的结构 顺序 字段名 说明1 keyHash(4) key的结构是
2 phyOffset(8) commitLog真实的物理位移
3 timeOffset(4) 时间偏移量
4 slotValue(4) 下一个记录的slot值
消息存储流程 RocketMQ文件存储模型层次结构
层次从上到下依次为:
业务层
QueueMessageProcessor类
PullMessageProcessor类
SendMessageProcessor类
DefaultMessageStore类
存储逻辑层
IndexService类
ConsumeQueue类
CommitLog类
IndexFile类
MappedFileQueue类
磁盘交互IO层
MappedFile类
MappedByteBuffer类
业务层 QueueMessageProcessor PullMessageProcessorSendMessageProcessor
DefaultMessageStore
存储逻辑层 IndexService ConsumeQueue CommitLog
IndexFile MappedFileQueue
磁盘交互IO层 MappedFile
MappedByteBuffer
Disk
写commoitlog流程 1. DefaultMessageStore,入口方法是putMessage方法
RocketMQ 的存储核心类为 DefaultMessageStore,入口方法是putMessage方法
1 // DefaultMessageStore#putMessage 2 public PutMessageResult putMessage(MessageExtBrokerInner msg) { 3 // 判断该服务是否shutdown,不可用直接返回【代码省略】 4 // 判断broke的角色,如果是从节点直接返回【代码省略】 5 // 判断runningFlags是否是可写状态,不可写直接返回,可写把printTimes设为0【代码省略】 6 // 判断topic名字是否大于byte字节127, 大于则直接返回【代码省略】 7 // 判断msg中properties属性长度是否大于short最大长度32767,大于则直接返回【代码省略】 8 9 if (this.isOSPageCacheBusy()) { // 判断操作系统页写入是否繁忙 10 return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); 11 } 12 13 long beginTime = this.getSystemClock().now(); 14 PutMessageResult result = this.commitLog.putMessage(msg); // $2 查看下方代码,写msg核心 15 16 long elapsedTime = this.getSystemClock().now() - beginTime; 17 if (elapsedTime > 500) { 18 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); 19 } 20 // 记录写commitlog时间,大于最大时间则设置为这个最新的时间 21 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 22 23 if (null == result || !result.isOk()) { 24 // 记录写commitlog 失败次数 25 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 26 } 27 28 return result; 29 }