从Kafka到RocketMQ
原创大约 2 分钟
如果之前使用的是Kafka,但现在想将系统迁移到RocketMQ上去的话,是不可能一下子将所有Kafka服务器全部停掉,然后将业务数据直接写入到RocketMQ的,这需要有一个循序渐进的过程,主要是分三步走。
- 先执行
双写
方案,也就是让业务代码将结果同时写入到Kafka和RocketMQ。双写
的时间要持续一段时间,因为在这段时间内,除了要保证各个系统平稳运行不出错,还有一件很重要的事情要做:分别记录Kafka和RocketMQ发送消息的数量,并随机抽样调查写入的数据内容是否一致。
/**
* 生产消息
*
*/
public void payOrder() {
try {
// 在发送Kafka消息的地方,都加上发送RocketmQ消息
Producer<String, String> kafkaProducer = new KafkaProducer<>(properties);
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
// 发送RocketmQ消息
SendResult sendResult = rocketProducer.send(message);
} catch (Exception e) {
....
}
}
- 再执行
双读
方案,也就是挑选几个用于测试的下游系统,然后将它从Kafka消费数据的逻辑,用RocketMQ再实现一遍,只不过此时RocketMQ消费的数据可以不必进入真实的业务系统,而是要么丢弃,要么保临时保存到别的地方。这里也同样要记录和统计读取到的消息数量,通过抽样调查的方式,来将它和Kafka消费的数据进行对比,从消息数量和到消息内容,再到接收时间,都可以仔细对比,看看有没有潜在的问题。
/**
* 消费消息
*
*/
public void grantCoupon() {
try {
// 在消费Kafka消息的地方,都加上消费RocketmQ消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
......
}
}
// 消费RocketmQ消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
......
});
} catch (Exception e) {
....
}
}
- 最后进行综合的统计分析,就是将Kafka和RocketMQ之间生产消息对比的情况,和它们消费消息对比的情况再进行一个综合的交叉对比,分别看看
发出去的
和收回来的
是否一致,延迟高还是低,异常或故障的处理方式有无不妥,数据内容是否一致,顺序是否正确等等,这一步至少要持续一周。
经过以上三步之后,如果没出现什么问题或风险,就可以逐步、按批次地下线之前的Kafka服务,然后逐步、按批次地上线新的RocketMQ服务了。