环境和脚本
> go get -u github.com/apache/rocketmq-client-go/v2
环境和脚本
> go get -u github.com/apache/rocketmq-client-go/v2
下载并解压。
> cd /home/work
> wget https://mirror.bit.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
> tar -zxvf apache-maven-3.6.3-bin.tar.gz
> mv apache-maven-3.6.3 maven-3.6.3
# 编辑配置文件
> vi /etc/profile
MAVEN_HOME=/home/work/maven-3.6.3
export PATH=${MAVEN_HOME}/bin:${PATH}
# 让配置生效
> source /etc/profile
> mvn –v
安装好Docker Compose
,然后执行下面的docker-compose.yml
文件。
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
restart: always
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- /home/work/volumes/rocketmq/logs:/opt/logs
- /home/work/volumes/rocketmq/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
restart: always
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- /home/work/volumes/rocketmq/logs:/opt/logs
- /home/work/volumes/rocketmq/store:/opt/store
- /home/work/volumes/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
restart: always
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
如果之前使用的是Kafka,但现在想将系统迁移到RocketMQ上去的话,是不可能一下子将所有Kafka服务器全部停掉,然后将业务数据直接写入到RocketMQ的,这需要有一个循序渐进的过程,主要是分三步走。
当RocketMQ的消费者终端崩溃而生产者依旧在不停地写入并发送消息时,就会产生消息积压问题。
如果仅仅只是几百上千,哪怕上万的消息积压,倒也不是什么问题,但如果积压的消息量一旦达到某个数量级,就会对RocketMQ和业务系统造成冲击。
当某个消费消息的业务系统崩溃且在短时间内都无法恢复时,RocketMQ最多会重复进行16次
的消息投递动作。
如果16次之后还是无法得到确认,那么RocketMQ就会将这些迟迟不能被消费的消息放进一个它内部特有的队列中,这个队列的名称是DeadlineQueue
,也叫死信队列。
RocketMQ有一个非常强大的功能,就是它支持事务消息
。所谓事务消息
,它可以保证消息的正确投递,而且绝不丢失消息,要么全部投递成功,要么全部投递失败,绝对不存在成功一部分或失败一部分的可能。
所谓Half消息
,其实是一种试探性的机制,也就是在正式发送消息之前,先向RocketMQ发送一个Half消息
。这个Half消息
对于其他系统来说是看不见
的,起到类似于回声定位
的作用,发送它的目的是看RocketMQ是否能够正常响应,然后再决定后续将执行什么样的操作。
在知道了发送者
的发送和存储机制之后,就可以来看看消费者
的接收和消费机制了。
组(Group)
是对Topic
概念的进一步划分,类似于在大主题中再区分出不同的子主题。
当消息从Producer生产者
经过Broker
发出后,在正常情况下,关注同一个TopicA
的两个消费者组stock_consume_group
和markting_consume_group
都可以接收到这条消息,且在正常情况下,只有其中一台机器能够接收到消息。