发送与存储消息
消息发送机制
在RocketMQ中,每个Topic
的数据都是以分布式的方式存储在多个Broker
中的。

而决定数据存放策略的,就是RocketMQ中的MessageQueue
机制,而这个MessageQueue
是需要在创建Topic
时就设置好的。
例如,假设TopicA
中有4000条数据,而现在有4个MessageQueue
,那么可以根据某种策略来决定每个MessageQueue
中存放多少条数据。可以在每个MessageQueue
中存放1000条,也可以把这4000条数据全部存在一个MessageQueue
中,或者以其他的方式。
如果把这4个MessageQueue
放在2个不同的Broker
中,那么可能会是这样的形式。

所以,MessageQueue
在RocketMQ中所起作用其实就是一个数据分片
机制:通过MessageQueue
对Topic
进行拆分,然后又将这些MessageQueue
分别放在不同的Broker
里面,从而实现分布式地调度与管理。
那消息生产者Producer
在发送消息时是如何决定数据从哪个MessageQueue
中发出去的呢?它在总体上遵循着这样一个过程。
- 在消息发送之前,消息生产者
Producer
会从NameServer
中获取Topic
相关的路由数据,从而获得Topic
的MessageQueue
和Broker
信息。

假设是均匀写入数据的策略,那么当要发送20条数据时,RocketMQ就会往4个
MessageQueue
中分别各写5条数据,这种分摊的做法可以有效减轻每个Broker
的压力,从而实现高并发。如果在运行过程中,某个
Broker
突然挂掉,而在Slaver Broker
(假如有的话)切换为Master Broker
的过程中,该Broker
节点会存在短暂的不可用状态。所以,此时需要开启RocketMQ中的一个开关:sendLatencyFaultEnable
。

这是一种容错机制,它会让RocketMQ自动避开有故障的Broker
一段时间,直到它恢复正常之后再继续访问。
消息存储机制
Broker
发送完成之后并不是直接就把数据丢弃了,而是将它保存起来,因为此时消费者还没有消费数据。
而且,万一后续需要进行消息重发,或者执行数据回滚,都需要用到这些之前保存过的消息数据。
其实,Broker
的数据存储是比较关键的一个环节,因为它的性能好坏直接决定了整个RocketMQ的吞吐量,也就是消息写入和读取的速率。
Broker
消息数据存储的机制大概是这样的。
- 当
Broker
收到一条消息数据时,会按照顺序追加的方式把它直接写到磁盘上一个叫做CommitLog
的日志文件中(每个CommitLog
文件大小限定1GB,写满后重新创建新的CommitLog
文件继续写入)。

- 在
Broker
中的每一个MessageQueue
,都会有一个与之对应的ConsumeQueue
,它的格式就像这样:~/store/consumequeue/${topicId}/${messageQueueId}/${fileName}
,其中${fileName}
里面保存的是某条消息在CommitLog
中的偏移量、长度等信息,可以说它们是描述消息内容的元数据——很多个这样的${fileName}
保存的就是MessageQueue
中所有消息的元信息。
例如,在Broker
中存放了TopicA
的两个MessageQueue
,分别叫m0
和m1
,它们有自己对应的ConsumeQueue
,分别叫c0
和c1
,那么m0
和m1
中的消息数据的元数据就存放在下面的文件中。
~/store/consumequeue/TopicA/m0/c0
~/store/consumequeue/TopicA/m1/c1
所以,可这样说:CommitLog
保存的是消息内容,而c0
和c1
保存的是这些消息数据在CommitLog
中的描述信息,相当于是引用地址
。
也就是说:一个CommitLog
可能对应着很多个ConsumeQueue
的${fileName}
。

每次
Broker
将消息数据写入到CommitLog
时,都不会经过内存,而是直接写入磁盘,这一步对RocketMQ的性能影响很大:Broker
的写磁盘功能是借助操作系统的Page Cache和顺序写
这两个机制完成的,这是一种操作系统缓存 + 异步刷盘
来实现高性能磁盘写入的方式。如果在刷盘时操作系统崩溃宕机,那么
Broker
可能会丢失数据。所以为了保证数据能够100%不丢失,就有了另一种同步刷盘
策略,也就是当生产者Producer
发送消息时会立即执行保存(刷盘)操作,只有刷盘成功才认为是消息发送成功,否则Producer
会不停地重试,直到成功为止。但这种确保数据100%不丢失的策略是以损失性能为代价的。异步刷盘
可以满足超高的写入吞吐量,但有丢失数据的风险,所以非常适用于秒杀场景;而同步刷盘
可以100%保证数据不丢失,但写入性能大打折扣,所以适用于支付、退款、对账等涉及交易的场景。除了
CommitLog
,ConsumeQueue
也是通过同样的Page Cache和顺序写
技术实现刷盘的,而且它和CommitLog
不同的是,它几乎都是存放在操作系统的Cache
中的,所以RocketMQ才可能有那么高的读写性能及高吞吐量。