生产和消费消息
原创大约 2 分钟
先引入依赖。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.13</version>
</dependency>
</dependencies>
生产者发送数据
package com.itechthink.kafka;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 生产者发送数据
*
*/
public class KafkaProducer {
public static void main(String[] args) {
Properties prop = new Properties();
// 指定kafka的broker地址
prop.put("bootstrap.servers", "172.16.185.176:9092");
// 指定key-value数据的序列化格式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
// 创建Kafka生产者
org.apache.kafka.clients.producer.KafkaProducer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(prop);
// 向topic发送数据
producer.send(new ProducerRecord<String, String>("test", "hello world"));
// 关闭链接
producer.close();
}
}
消费者消费数据
package com.itechthink.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
/**
* 消费者消费数据
*
*/
public class KafkaConsumer {
public static void main(String[] args) {
Properties prop = new Properties();
// 指定kafak的broker地址
prop.put("bootstrap.servers", "172.16.185.176:9092");
// 指定key-value的反序列化类型
prop.put("key.deserializer", StringDeserializer.class.getName());
prop.put("value.deserializer", StringDeserializer.class.getName());
// 指定消费者组
prop.put("group.id", "user2");
// 开启自动提交offset功能,默认就是开启的
prop.put("enable.auto.commit", "true");
// 自动提交offset的时间间隔,单位是毫秒
// 如果低于这个时间,将无法提交Consumer的offset信息,也会查询不到
prop.put("auto.commit.interval.ms", "5000");
/*
* 正常情况下kafka消费数据的流程如下
* 1. 先根据group.id指定的消费者组到kafka中查找之前保存的offset信息
* 2. 如果查找到了,说明之前使用这个消费者组消费过数据,则根据之前保存的offset继续进行消费
* 3. 如果没查找到(说明第一次消费),或者查找到了,但是查找到的那个offset对应的数据已经不存在了
* 4. 因为kafka默认只会保存7天的数据,超过时间数据会被删除,所以此时会根据auto.offset.reset的值执行不同的消费逻辑,这个参数的值有三种:[earliest,latest,none]
* earliest:表示从最早的数据开始消费(从头消费)
* latest: 表示从最新的数据开始消费,这是默认设置
* none:如果根据指定的group.id没有找到之前消费的offset信息,就会抛异常
* 5. 一般在实时计算的场景下,会消费最新的数据
* 6. 这个参数只有在消费者第一次消费数据,或者之前保存的offset信息已过期的情况下才会生效
*/
// 这里为了看测试效果,使用earliest
prop.put("auto.offset.reset", "earliest");
// 创建消费者
org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(prop);
Collection<String> topics = new ArrayList<String>();
topics.add("test");
// 订阅指定的topic
consumer.subscribe(topics);
// 循环消费数据
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : poll) {
System.out.println(consumerRecord);
}
consumer.commitSync();
}
}
}
感谢支持
更多内容,请移步《超级个体》。