RocketMQ存储设计剖析

2021-12-05

最近在内部准备一个消息中间件的实践分享,顺便整理下RocketMQ的存储设计。存储设计是整个Broker最为重要的部分之一,好比你着手了解一个业务系统的逻辑时,如果先了解其DB表结构设计,再去理解业务代码逻辑就容易的多了。本文通过本地实验及相关文档参考,整理其核心概念的存储设计。

存储流程简介

image-20211205163509696

首先结合官方文档的图片对消息存储的流程有个大致的了解:

  1. 所有生产者都会往Broker指定的Topic发消息,在Broker收到消息后,无论消息属于哪个Topic,都会封装成一个标准格式后追加存储到一个CommitLog文件上。

  2. 存到到CommitLog文件后Broker会将这条消息在CommitLog的物理位置追加存储到一个ConsumeQueue的文件上,每个Topic都有多个ConsumeQueue,默认Broker在追加存储时会轮询这个Topic下的所有ConsumeQueue文件。

通过以上了解可以得知,其实CommitLog是消息的物理存储,而ConsumeQueue是消息的逻辑存储,类似于索引文件。另外在RocketMQ还提供了消息Key查询的功能特性,其实现也是在消息持久化后生成的索引文件IndexFile,在上图没有所有说明。

存储目录结构

RocketMQ的每个Broker启动后,会创建相应的存储目录来存储消息,默认目录在~/store,以下为其存储层级结构:

store
├── abort
├── checkpoint
├── commitlog
│   ├── 00000000000000000000
│   └── 00000000001073741824
├── config
│   ├── consumerFilter.json
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── topics.json
│   └── topics.json.bak
├── consumequeue
│   └── TopicTest
│       ├── 0
│       │   └── 00000000000000000000
│       ├── 1
│       │   └── 00000000000000000000
│       ├── 2
│       │   └── 00000000000000000000
│       └── 3
│           └── 00000000000000000000
├── index
│   └── 20211204001412810
└── lock

CommitLog设计

CommitLog是一种常见的设计思想,相关内容可以通过Martin Fowler的Write-Ahead Log了解,RocketMQ的CommitLog设计在里面都有相应的体现,如:1)为了防止单个CommitLog文件过大,RocketMQ对CommitLog文件做了分段拆分,默认1个文件为1GB。2)为了防止CommitLog文件无限追加导致存储不足,RocketMQ默认值保留3天的数据,超期的CommitLog会被清理删除。

CommitLog每个文件都以字节的offset来命名,从0开始(固定长度,左边做0填充),如第2个文件名可以通过计算获得,依此类推。

1GB = 1 * 1024 * 1024 * 1024 Byte = 1073741824 Byte

➜  ll store/commitlog
total 376
-rw-r--r--  1 nisiyong  staff   1.0G Dec  4 00:14 00000000000000000000
-rw-r--r--  1 nisiyong  staff   1.0G Dec  4 00:14 00000000001073741824

每条消息的编码格式如下表格,并追加到对应的CommitLog文件上,通过表格可得知每条消息的其中的固定部分占90个字节。

顺序字段名称数据类型字节数字段说明
1MsgLengthInt4消息总长度
2MagicCodeInt4魔数,固定值0xdaa320a7
3BodyCrcInt4消息内容CRC校验码
4QueueIdInt4消息的ConsumeQueue的ID
5FlagInt4消息FLAG,RocketMQ不做处理,供应用程序使用
6QueueOffsetLong8消息在ConsumeQueue上的偏移量
7PhysicalOffsetLong8消息在CommitLog上的偏移量
8SysFlagInt4消息系统FLAG,如是否压缩,是否为消息等
9BornTimestampLong8消息在客户端的生成的时间
10BornHostLong8消息在客户端的IP:PORT
11StoreTimestampLong8消息在服务端Broker的存储时间
12StoreHostLong8消息在服务端Broker的IP:PORT
13ReconsumeTimesInt4消息的重试次数
14PrepareTransactionOffsetLong8事物消息的偏移量
15BodyLengthInt4消息体的长度
16Bodybyte[]array size消息内容
17TopicLengthbyte1Topic的长度
18Topicbyte[]array sizeTopic名称
19PropertiesLengthbyte1扩展属性长度
20Propertiesbyte[]array size扩展属性内容

如下是本地磁盘的通过xxd命令获取的commitlog二进制文本信息,通过魔数daa320a7可以区分出每一条消息的大致位置。image-20211205171926534

ConsumeQueue设计

RocketMQ创建Topic时都会指定需要几个Queue,这些Queue会均衡的分配到各个Broker服务器上。Queue在Topic目录下,名称也类似commitlog按offset来命名。

ConsumeQueue文件主要存储消息的摘要信息,在commitlog之上多了一层逻辑层抽象,便于Topic隔离维护等

默认1个ConsumeQueue文件包含30w个条目,每个条目大小固定共20个字节,结构如下:

顺序字段名称数据类型字节数
1CommitLogOffsetLong8
2MsgLengthInt4
3TagHashCodeLong8

通过计算可得知1个ConsumeQueue文件写满后大小约5.7MB

  ll store/consumequeue/TopicTest/0
total 11720
-rw-r--r--  1 nisiyong  staff   5.7M Dec  4 00:14 00000000000000000000

IndexFile设计

RocketMQ为了方便消息检索,支持了用户在发送消息时设置自定的Key,消息在服务端根据Key进行索引构建,在后续的控制台可以通过该Key来查询该消息。

Broker端的IndexFile就是这些Key的索引文件,与上述的文件命名不同,IndexFile是更加时间戳来命名的,方便后续结合时间维度来查询。每个IndexFile由以下3部分组成:

  • IndexHeader,共40个字节
  • SlotTable,每个Slot占4个字节,存放消息Key的hashCode,默认1个IndexFile有500w个Slot
  • IndexItems,每个IndexItem占20个字节,默认1个IndexFile有2000w个IndexItem

通过计算可得知1个Index文件写满后大小约401MB

  ll store/index
total 39104
-rw-r--r--  1 nisiyong  staff   401M Dec  5 00:57 20211204001412810

固定40个字节的IndexHeader结构如下:

顺序字段名称数据类型字节数字段说明
1BeginTimestampLong8该索引文件内消息的最小存储时间
2EndTimestampLong8该索引文件内消息的最大存储时间
3BeginPhyoffsetLong8该索引文件内消息的最小CommitLog Offset
4EndPhyoffsetLong8该索引文件内消息的最大CommitLog Offset
5HashSlotCountInt4该索引文件的Slot数量,默认500w个
6IndexInt4该索引文件的IndexItem数量,默认2000w个

每个IndexItem的结构如下:

顺序字段名称数据类型字节数字段说明
1HashCodeInt4消息Key字符串的HashCode
2PhyoffsetLong8消息的ComimitLog Offset
3TimediffInt4与第一条消息的时间错差值,小于0该消息无效
4PreIndexNoInt4上一条消息的IndexItem的索引,出现Hash碰撞时,形成链表结构

IndexFile存储过程简单说明下,结合下图重点关注以下几点:

  • 每条消息的Key都能计算出一个4个字节Int类型的HashCode,通过该HashCod%500w得到slot的位置
  • 每条消息的信息可以生成一个IndexItem,只要IndexItems有空间,就直接追加存放,并把该IndexItem的位置索引存放到对应的slot上
  • 当出现hash碰撞时,新的IndexItem需要记录上一个slot的IndexItem位置,然后用新的IndexItem的位置覆盖到slot的位置

image-20211205223634676

总结

至此RocketMQ消息在服务端的存储设计及相关数据结构已经介绍完毕,由于篇幅关系只介绍了消息生产写入的逻辑,而消费端如何消费读取消息,以及消费的进度位点没有详细介绍,这块大家可以结合store/config/consumerOffset.json文件内容进行了解,逻辑比较简单。总体来看,对存储的数据结构有较为清晰的了解对实际使用时帮助比较大,后续在学习一些复杂的中间件时先从总体关注其核心功能设计,再逐步去了解局部的功能逻辑。

References