【RocketMQ源码学习】- 5. 消息存储机制 (2)

每个Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件
该结构对应于消费者逻辑队列,为什么要将一个topic抽象出很多的queue呢?这样的话,对集群模式更有好处,可以使多个消费者共同消费,而不用上锁;

消息单元的存储结构 顺序 字段名 说明
1   offset(8)   commitlog的偏移量  
2   size(4)   commitlog消息大小  
3   tagHashCode   tag的哈希值  

 

index索引文件

文件地址:${user.home}\store\index${fileName}

index文件特点:

以时间作为文件名

一个存储单元是20个字节定长的

索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用

存储单元的结构 顺序 字段名 说明
1   keyHash(4)   key的结构是  
2   phyOffset(8)   commitLog真实的物理位移  
3   timeOffset(4)   时间偏移量  
4   slotValue(4)   下一个记录的slot值  

 

消息存储流程 RocketMQ文件存储模型层次结构

层次从上到下依次为:

业务层

QueueMessageProcessor类

PullMessageProcessor类

SendMessageProcessor类

DefaultMessageStore类

存储逻辑层

IndexService类

ConsumeQueue类

CommitLog类

IndexFile类

MappedFileQueue类

磁盘交互IO层

MappedFile类

MappedByteBuffer类

业务层   QueueMessageProcessor   PullMessageProcessor
SendMessageProcessor
 
DefaultMessageStore  
存储逻辑层   IndexService   ConsumeQueue   CommitLog  
IndexFile   MappedFileQueue  
磁盘交互IO层   MappedFile  
MappedByteBuffer  
Disk  

 

写commoitlog流程 1. DefaultMessageStore,入口方法是putMessage方法

RocketMQ 的存储核心类为 DefaultMessageStore,入口方法是putMessage方法

1 // DefaultMessageStore#putMessage 2 public PutMessageResult putMessage(MessageExtBrokerInner msg) { 3 // 判断该服务是否shutdown,不可用直接返回【代码省略】 4 // 判断broke的角色,如果是从节点直接返回【代码省略】 5 // 判断runningFlags是否是可写状态,不可写直接返回,可写把printTimes设为0【代码省略】 6 // 判断topic名字是否大于byte字节127, 大于则直接返回【代码省略】 7 // 判断msg中properties属性长度是否大于short最大长度32767,大于则直接返回【代码省略】 8 9 if (this.isOSPageCacheBusy()) { // 判断操作系统页写入是否繁忙 10 return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null); 11 } 12 13 long beginTime = this.getSystemClock().now(); 14 PutMessageResult result = this.commitLog.putMessage(msg); // $2 查看下方代码,写msg核心 15 16 long elapsedTime = this.getSystemClock().now() - beginTime; 17 if (elapsedTime > 500) { 18 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); 19 } 20 // 记录写commitlog时间,大于最大时间则设置为这个最新的时间 21 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); 22 23 if (null == result || !result.isOk()) { 24 // 记录写commitlog 失败次数 25 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); 26 } 27 28 return result; 29 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgjgys.html