消息处理中的几个问题
死信队列
当某个消费消息的业务系统崩溃且在短时间内都无法恢复时,RocketMQ最多会重复进行16次
的消息投递动作。
如果16次之后还是无法得到确认,那么RocketMQ就会将这些迟迟不能被消费的消息放进一个它内部特有的队列中,这个队列的名称是DeadlineQueue
,也叫死信队列。
对于死信队列的处理,可以专门开启一个后台线程进行订阅拉取,然后做一些业务补偿的操作。
消息乱序
消息之所以会出现乱序问题,是因为每个Topic
都会有多个MessageQueue
,当向其中写入消息数据时,会把内容均匀地分发给不同的MessageQueue
。
然而读取的时候,却并不知道,也不关心当初写入数据时的顺序,所以就导致了消费消息时的乱序问题。
如果要想让消息有序,那么在数据从MySQL ==> Canal ==> RocketMQ的每一步,都必须实现数据的有序。
尤其是从Canal到RocketMQ这一步,必须将同一个订单的binlog
都发送到同一个MessageQueue
,而且还要有序。
而且这种有序数据的处理,是不能允许消息失败重试的。
所以,当遇到消息读取或处理失败时,就要立即暂停一批消息的处理,而且不能将它们放进重试队列,而是要自己开发出失败消息的处理方案。
同时还要开启事务消息,确保消息零丢失。
消息过滤
除了Topic
和Group
,消费者还可以指定消息的tags
属性,也就是标签,比如只消费具有某一类标签的消息。RocketMQ还支持一些基本的数据过滤语法,比如数值比较、逻辑比较和字符比较。
消息延迟
所谓消息延迟
,是借鉴了Redis
中的过期失效机制。
例如,有些用户在下单后并没有及时付款,所以这些待支付订单会短期存在于RocketMQ中。用后台线程来定期扫描这些短期存活消息的方法并不是很好的方法,因为消息队列系统一般都是分布式的,而且数据量比较大。如果多台机器都部署订单系统,那么每台机器上都要做这种扫描工作,时机和方式都不好把握,比较麻烦。
如果使用RocketMQ的延迟消息,那么可以指定在创建订单的30分钟后,该条消息才会被消费到。这样的话,如果30分钟后该订单还是待支付状态,那么就可以关闭订单了,否则什么都不用做。
不管是哪种情况,都无需再执行消息队列扫描工作。
消息权限
RocketMQ支持ACL的权限控制配置,通过设定用户和相应Topic
的操作权限,就可以实现对消息操作权限的控制。
可以做到某一类用户只能消费某一类Topic
,是无法读取到其他的Topic
消息的。
轨迹追踪
如果想知道某条消息是什么时候从哪个生产者发出来的,又是进入到哪个Topic
中去的,是被哪个消费者所消费的等这类消息的行动轨迹
问题,可以将Broker
配置文件中的traceTopicEnable
选项设置为true
。
RocketMQ会在内部自动创建一个专有的Topic
,用来存储所有消息的所有轨迹数据。