什么是Kafka?
什么是Kafka
Kafka和RocketMQ一样,也是一个分布式的消息队列系统,但它和RocketMQ的一个显著区别在于其超高吞吐量
,这也是它最大的特色。
之所以如此,主要是因为它利用了磁盘顺序读写速度 > 内存随机读写速度
这一计算机系统的特性,以致于Kafka可以达到每秒百万级别的消息生产和消费速度。
Kafka主要应用在一些需要超高并发但无需事务性保证
的业务场景中,例如秒杀
、大数据ETL,这一点也和RocketMQ显著不同。
RocketMQ的很多设计灵感都来自于Kafka,早期也借鉴了Kafka的许多优秀设计。因此,Kafka中自然也有Producer
、Consumer
、Broker
、Group
和Topic
这些组件,它们先于RocketMQ中的同类组件而存在。

Broker
:消息的代理,它就是Kafka集群中的一个节点,每个Broker
中存在一个或多个Topic
。Topic
:消息主题,Kafka处理不同消息的分类,是一个逻辑上的概念。Partition
:它是Topic
物理上的分组,每个Topic
都由至少1个或多个Partition
组成。Message
:消息通信的基本单位,每个Message
都属于某个Partition
。Producer
:消息生产者,负责向Topic
中发送数据。Consumer
:消息消费者,消费Topic
中的数据。
Docker部署
采用docker-compose
统一管理ZooKeeper和Kafka的启停。
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
environment:
TZ: Asia/Shanghai
ulimits:
nofile:
soft: 65535
hard: 65535
nproc: 65535
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
TZ: Asia/Shanghai
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://kafka:9092
打开两个命令行终端,均进入Kafka容器。
> docker exec -it <容器ID> bash
/> cd /opt/kafka_2.13-2.8.1/bin
# 可以使用如下命令控制Kafka
# 创建主题
/> sh kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic test
# 查看主题
/> sh kafka-topics.sh --list --zookeeper zookeeper:2181
/> sh kafka-topics.sh --describe --zookeeper zookeeper:2181 --topic test
# 修改主题(注意:Partition的数量只能增加不能减少,也不支持topic重命名)
/> ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test
# 删除主题
/> sh kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic test
# 生产消息
/> sh kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property "group.id=testGroup"
# 消费消息
/> sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup
# 或者
/> sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
本机部署
> cd /home/work
# 下载并解压zookeeper
> wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.2/apache-zookeeper-3.7.2-bin.tar.gz
> tar zxvf apache-zookeeper-3.7.2-bin.tar.gz
> mv apache-zookeeper-3.7.2-bin zookeeper-3.7.2
# 下载并解压kafka
> wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz
> tar zxvf kafka_2.12-3.7.1.tgz
然后再分别修改ZooKeeper和Kafka的配置文件(它们默认就在单机模式
下运行)。
# 修改zookeeper的配置文件
> cd /home/work/zookeeper-3.7.2/conf
> cp zoo_sample.cfg zoo.cfg
> vi zoo.cfg
dataDir=/home/work/volumes/zookeeper/data
dataLogDir=/home/work/volumes/zookeeper/logs
# 修改kafka的配置文件
> cd /home/work/kafka_2.12-3.7.1/config
> vi server.properties
listeners=PLAINTEXT://<本机IP>:9092
# kafka存储核心数据的目录,并非日志
log.dirs=/home/work/volumes/kafka/logs
# 如果zookeeper和kafka在同一台机器上就不用改其中的"localhost"(可选)
zookeeper.connect=localhost:2181
# 启动zookeeper
> cd /home/work/zookeeper-3.7.2/
> ./bin/zkServer.sh start
# 启动kafka
> cd /home/work/kafka_2.12-3.7.1
> ./bin/kafka-server-start.sh config/server.properties
# 或者以守护进程方式启动
> ./bin/kafka-server-start.sh -daemon config/server.properties
最后通过jps
或者相关命令查看进程是否启动成功。
> jps
# zookeeper进程
400478 QuorumPeerMain
# kafka进程
401114 Kafka
> cd /home/work/zookeeper-3.7.2/
> ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/work/zookeeper-3.7.2/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
然后可以通过和之前Docker部署同样的方式控制Kafka来创建Topic
、发送消息和消费消息了,只不过有两个地方需要注意一下。
如果是Docker部署的话就不存在这些问题。
感谢支持
更多内容,请移步《超级个体》。