Custom Sources and Sinks for Reading and Writing Data
Reading and Writing MySQL
1. Prepare the environment
Use the existing environment: MySQL deployed with Docker.
2. Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
3. Reading code
package itechthink.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.*;
/**
* Custom Source that reads data from MySQL
* (Flink 1.17.1 + Flink Connector JDBC 3.1.0-1.17 + MySQL Connector 8.0.33)
*
*/
public class MySQLSourceJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
mysql(env);
env.execute();
}
public static void mysql(StreamExecutionEnvironment env) {
// Set the parallelism at the environment level
env.setParallelism(1);
DataStreamSource<Person> source = env.addSource(new PersonSource());
source.print().setParallelism(1);
}
public static class PersonSource extends RichParallelSourceFunction<Person> {
Connection connection;
PreparedStatement statement;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
statement = connection.prepareStatement("SELECT * FROM person");
}
@Override
public void run(SourceFunction.SourceContext<Person> context) throws Exception {
ResultSet rs = statement.executeQuery();
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
context.collect(new Person(id, name));
}
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
super.close();
close(connection, statement);
}
public static Connection getConnection() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return DriverManager.getConnection("jdbc:mysql://172.16.185.176:3306/javabook", "root", "123456");
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void close(Connection connection, PreparedStatement statement) {
if(null != statement) {
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(null != connection) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
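// Standalone entry point used only to verify the JDBC connection settings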
public static void main(String[] args) {
Connection connection = getConnection();
close(connection, null);
}
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
For writing, the main change is the SQL statement passed to connection.prepareStatement(): swap the SELECT for an INSERT, bind its parameters, and execute it from a sink function, as the next section shows.
4. Writing code
package itechthink.sink;
import itechthink.transformation.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* Custom Sink that writes data to MySQL
* (Flink 1.17.1 + Flink Connector JDBC 3.1.0-1.17 + MySQL Connector 8.0.33)
*
*/
public class MySQLSinkJob extends RichSinkFunction<Person> {
private Connection connection;
private PreparedStatement statement;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
mysql(env);
env.execute();
}
public static void mysql(StreamExecutionEnvironment env) {
env.setParallelism(1);
DataStreamSource<String> source = env.socketTextStream("localhost", 9528);
// A MapFunction<String, Person> would work just as well
SingleOutputStreamOperator<Person> sourceStream = source.flatMap(new FlatMapFunction<String, Person>() {
@Override
public void flatMap(String input, Collector<Person> output) throws Exception {
output.collect(new Person(Integer.parseInt(input.split(",")[0]), input.split(",")[1]));
}
});
sourceStream.addSink(new MySQLSinkJob());
sourceStream.print().setParallelism(1);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
statement = connection.prepareStatement("INSERT INTO person VALUES (?, ?);");
}
@Override
public void invoke(Person value, Context context) throws Exception {
super.invoke(value, context);
statement.setInt(1, value.getId());
statement.setNString(2, value.getName());
// executeUpdate() reports the number of affected rows, a more reliable success check than execute()
int rows = statement.executeUpdate();
if (rows > 0) {
System.out.println("Insert succeeded");
}
}
@Override
public void close() throws Exception {
super.close();
close(connection, statement);
}
public static Connection getConnection() {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
return DriverManager.getConnection("jdbc:mysql://172.16.185.176:3306/javabook", "root", "123456");
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void close(Connection connection, PreparedStatement statement) {
if(null != statement) {
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(null != connection) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
Reading and Writing Redis
1. Prepare the environment
Use the existing environment: Redis deployed with Docker.
2. Add the dependencies
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>5.1.0</version>
</dependency>
3. Writing code
package itechthink.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* Custom Sink that writes data to Redis
* (Flink 1.17.1 + Flink Connector Redis 1.1.0 + Jedis 5.1.0)
*
*/
public class RedisSinkJob implements RedisMapper<Tuple2<String, String>> {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
redis(env);
env.execute();
}
public static void redis(StreamExecutionEnvironment env) {
env.setParallelism(1);
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost("172.16.185.176")
.setPassword("123456")
.setPort(6379)
.build();
DataStreamSource<String> source = env.socketTextStream("localhost", 9528);
SingleOutputStreamOperator<Tuple2<String, String>> sourceSink = source.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String input) throws Exception {
String[] split = input.split(",");
return new Tuple2<>(split[0], split[1]);
}
});
sourceSink.addSink(new RedisSink<>(conf, new RedisSinkJob()));
source.print().setParallelism(1);
}
// Write data with the SET command
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
// Use the first tuple field as the key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
// Use the second tuple field as the value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
In practice, the community-provided RedisSink is not especially convenient. Once you understand how Flink works, you can implement the same functionality yourself by extending the RichSinkFunction abstract class or implementing the SinkFunction interface.
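As a rough illustration of that point, here is a minimal sketch of a hand-rolled Redis sink built directly on RichSinkFunction and Jedis. The host, port, password and the Tuple2<String, String> input type mirror the example above, but the class itself is an illustration added here, not part of the original code.

package itechthink.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

/**
 * A hand-rolled Redis sink: open() creates the Jedis connection,
 * invoke() issues one SET per record, close() releases the connection.
 */
public class SimpleRedisSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = new Jedis("172.16.185.176", 6379);
        jedis.auth("123456");
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) throws Exception {
        // Same effect as RedisCommand.SET in the RedisMapper above
        jedis.set(value.f0, value.f1);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
    }
}

It would be attached exactly like the Bahir sink above, for example sourceSink.addSink(new SimpleRedisSink());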
Although there is no official example of reading data from Redis either, once you know what the RichParallelSourceFunction and ParallelSourceFunction classes and the SourceFunction interface are for, you can write one yourself.
4. Reading code
package itechthink.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import redis.clients.jedis.Jedis;
/**
* Custom Source that reads data from Redis
* (Flink 1.17.1 + Flink Connector Redis 1.1.0 + Jedis 5.1.0)
*
*/
public class RedisSourceJob extends RichParallelSourceFunction<String> {
private Jedis jedis;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new RedisSourceJob()).print().setParallelism(1);
env.execute();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jedis = new Jedis("172.16.185.176", 6379);
jedis.auth("123456");
}
@Override
public void run(SourceContext<String> context) throws Exception {
String key = "test";
String value = jedis.get(key);
context.collect("key is:" + key + ",value is:" + value);
}
@Override
public void cancel() {
}
@Override
public void close() throws Exception {
super.close();
jedis.close();
}
}
In the MySQL and Redis examples above, the configuration parameters are hard-coded. In a distributed environment they could all be moved into a configuration center such as Nacos.
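As a hedged sketch of that idea (not from the original article), the settings could be fetched from Nacos once and shared by the sources and sinks. This assumes an extra com.alibaba.nacos:nacos-client dependency, a Nacos server at 172.16.185.176:8848, and a properties-style data ID named flink-datasource.properties; all of these names are placeholders.

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;

import java.io.StringReader;
import java.util.Properties;

/**
 * Loads data source settings from Nacos so that hosts, ports and passwords
 * are no longer hard-coded in the job.
 */
public class NacosConfigLoader {

    // Assumed Nacos server address, data ID and group; adjust to your environment
    private static final String SERVER_ADDR = "172.16.185.176:8848";
    private static final String DATA_ID = "flink-datasource.properties";
    private static final String GROUP = "DEFAULT_GROUP";

    public static Properties load() throws Exception {
        ConfigService configService = NacosFactory.createConfigService(SERVER_ADDR);
        // Fetch the raw configuration text (properties format) with a 5-second timeout
        String content = configService.getConfig(DATA_ID, GROUP, 5000);
        Properties properties = new Properties();
        properties.load(new StringReader(content));
        return properties;
    }
}

A source or sink would then call NacosConfigLoader.load() inside open() and read keys such as mysql.url or redis.host from the returned Properties instead of hard-coding them.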
Reading and Writing Kafka
1. Prepare the environment
Use the existing environment: Kafka deployed with Docker.
2. Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>
3. Reading and writing code
package itechthink.kafka;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* Reading and writing Kafka data
* (Flink 1.17.1 + Flink Connector Kafka 1.17.1)
*
*/
public class KafkaSourceAndSinkJob {
private static final String KAFKA_BROKER = "172.16.185.176:9092";
private static final String KAFKA_GROUP_ID = "testGroup";
private static final String KAFKA_TOPIC = "test";
private static final String KAFKA_TRANSACTION_PREFIX = "flink-";
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
readKafka(environment, CustomerKafkaDeserializationSchema.class);
// writeKafka(environment);
environment.execute("KafkaSourceAndSinkJob");
}
/**
* Read data from Kafka using a custom deserialization schema
*
*/
public static void readKafka(StreamExecutionEnvironment environment, Class<? extends KafkaDeserializationSchema<?>> deserializer) throws InstantiationException, IllegalAccessException {
/*
* In a distributed environment, all of these settings belong either in a local yml file or in a config center such as Nacos
*
*/
Properties properties = getProperties();
// Environment-related settings
environment.setParallelism(1);
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
// Checkpoint-related settings
environment.enableCheckpointing(5000);
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage("file:///Users/bear/home/work/checkpoint"));
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
final FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC, deserializer.newInstance(), properties);
// // Start reading from the earliest offset
// consumer.setStartFromEarliest();
// // Start reading from the latest offset
// consumer.setStartFromLatest();
// // Start reading from a specific timestamp
// consumer.setStartFromTimestamp(0L);
// // Start reading from the consumer group's committed offsets
// consumer.setStartFromGroupOffsets();
// // Start reading from explicitly specified offsets
// consumer.setStartFromSpecificOffsets(null);
// Print the Kafka data
environment.addSource(consumer).print("kafka");
environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, String>() {
@Override
public String map(String input) throws Exception {
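// Typing "exit" throws an exception on purpose, which fails the job and exercises the fixed-delay restart strategy configured above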
if ("exit".equalsIgnoreCase(input)) {
throw new RuntimeException("exit");
}
return input;
}
}).print("socket");
// // The newer KafkaSource API
// // Using it here failed with: java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames()Lorg/apache/kafka/common/KafkaFuture; (this usually indicates an older kafka-clients jar on the classpath)
// KafkaSource<String> source = KafkaSource.<String>builder()
// .setBootstrapServers(KAFKA_BROKER)
// .setTopics(KAFKA_TOPIC)
// //.setGroupId(KAFKA_GROUP_ID)
// .setStartingOffsets(OffsetsInitializer.earliest())
// .setValueOnlyDeserializer(new SimpleStringSchema())
// .build();
// DataStreamSource<String> sourceStream = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka DataSource");
// sourceStream.print();
}
/**
* Write data to Kafka
*
*/
public static void writeKafka(StreamExecutionEnvironment environment) {
// Set the parallelism
environment.setParallelism(1);
// // The legacy Kafka write path
// DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// source.addSink(new FlinkKafkaProducer<String>(KAFKA_BROKER, KAFKA_TOPIC, new SimpleStringSchema()));
// The newer Kafka write path (KafkaSink)
// Exactly-once delivery requires checkpointing to be enabled
environment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(KAFKA_BROKER)
.setRecordSerializer(
KafkaRecordSerializationSchema
.builder()
.setTopic(KAFKA_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// Set the delivery guarantee to exactly-once
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// Exactly-once requires a transactional ID prefix
.setTransactionalIdPrefix(KAFKA_TRANSACTION_PREFIX)
// Exactly-once also requires a transaction timeout: larger than the checkpoint interval and no larger than the broker's 15-minute maximum
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
.build();
source.sinkTo(sink);
}
/**
* Build the Kafka consumer configuration
*
*/
private static Properties getProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", KAFKA_BROKER);
properties.setProperty("group.id", KAFKA_GROUP_ID);
// Whether offsets are committed automatically
properties.setProperty("enable.auto.commit", "false");
// Auto-commit interval
properties.setProperty("auto.commit.interval.ms", "5000");
// Start from the earliest offset when no committed offset exists
properties.setProperty("auto.offset.reset", "earliest");
return properties;
}
}
The custom CustomerKafkaDeserializationSchema class is shown below.
package itechthink.kafka;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;
/**
* Custom Kafka deserialization schema
*
*/
public class CustomerKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
@Override
public boolean isEndOfStream(Tuple2<String, String> value) {
return false;
}
/**
* Extract the data from each Kafka record
*
*/
@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
// Custom output format: topic_partition_offset as the key plus the record value
return Tuple2.of(topic + "_" + partition + "_" + offset, new String(record.value(), StandardCharsets.UTF_8));
}
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
}
}
Reading and Writing ClickHouse
1. Prepare the environment
Use the existing environment: ClickHouse deployed with Docker.
2. Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.0-1.17</version>
</dependency>
<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.6</version>
</dependency>
3. Reading and writing code
package itechthink.sink;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.sql.*;
/**
* Reading from and writing to ClickHouse with Flink
* (Flink 1.17.1 + Flink Connector JDBC 3.1.0-1.17 + ClickHouse JDBC 0.4.6)
*
*/
public class ClickhouseSinkJob {
private static final String SELECT_SQL = "SELECT * FROM t_department;";
private static final String INSERT_SQL = "INSERT INTO t_department VALUES (?, ?, ?);";
private static final String DATABASE_URL = "jdbc:clickhouse://172.16.185.176:8123/default";
private static final String DATABASE_DRIVER_NAME = "com.clickhouse.jdbc.ClickHouseDriver";
public static void main(String[] args) throws Exception {
// 1. Prepare the environment - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
readData(environment);
// writeData(environment);
// 5. Launch the job - Execute
// Only needed together with writeData(): readData() uses plain JDBC and registers no operators,
// so calling execute() after readData() alone fails because no operators are defined in the streaming topology
// environment.execute("ClickhouseJob");
}
/**
* Read data from ClickHouse (plain JDBC; see the Flink-native sketch after this code)
*
*/
public static void readData(StreamExecutionEnvironment environment) throws ClassNotFoundException, SQLException {
Class.forName(DATABASE_DRIVER_NAME);
Connection connection = DriverManager.getConnection(DATABASE_URL);
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_SQL);
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
String location = rs.getString("location");
System.out.println(id + " ==> " + name + " ==> " + location);
}
rs.close();
stmt.close();
connection.close();
}
/**
* Write data to ClickHouse
*
*/
public static void writeData(StreamExecutionEnvironment environment) {
// 2. Read the data - Source
environment.socketTextStream("localhost", 9528)
// 3. Transform the data - Transform
.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> map(String input) throws Exception {
String[] split = input.split(",");
return new Tuple3<>(split[0], split[1], split[2]);
}
})
// 4. Write the result - Sink
.addSink(JdbcSink.sink(
INSERT_SQL,
(statement, tuple3) -> {
statement.setString(1, tuple3.f0);
statement.setString(2, tuple3.f1);
statement.setString(3, tuple3.f2);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(2000)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(DATABASE_URL)
.withDriverName(DATABASE_DRIVER_NAME)
.build()));
}
}
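Note that readData() above runs plain JDBC on the client and is not a Flink source. As a minimal sketch (an illustration, not part of the original code), the same query could be wrapped in a RichParallelSourceFunction, mirroring the MySQL example at the start of this article:

package itechthink.source;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Sketch of a ClickHouse source: the same SELECT as readData(), but executed
 * inside a Flink source function so the rows become a DataStream.
 */
public class ClickhouseSource extends RichParallelSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection;
    private PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.clickhouse.jdbc.ClickHouseDriver");
        connection = DriverManager.getConnection("jdbc:clickhouse://172.16.185.176:8123/default");
        statement = connection.prepareStatement("SELECT * FROM t_department");
    }

    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> context) throws Exception {
        ResultSet rs = statement.executeQuery();
        while (rs.next()) {
            context.collect(Tuple3.of(rs.getInt("id"), rs.getString("name"), rs.getString("location")));
        }
        rs.close();
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

It would be attached with environment.addSource(new ClickhouseSource()).print(); just like the MySQL source.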
Reading and Writing Elasticsearch
1. Prepare the environment
Use the existing environment: Elasticsearch deployed with Docker.
2. Add the dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch-base</artifactId>
    <version>3.0.1-1.17</version>
</dependency>
3. Reading code
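The original leaves this section empty. As a hedged sketch only, a custom source in the same style as the MySQL and Redis examples could query Elasticsearch through the high-level REST client. It assumes an additional org.elasticsearch.client:elasticsearch-rest-high-level-client dependency and an index named person; adjust both to your environment.

package itechthink.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

/**
 * Sketch of an Elasticsearch source: runs a match_all query against an index
 * and emits each hit's JSON source as a String.
 */
public class ElasticsearchSourceJob extends RichParallelSourceFunction<String> {

    private RestHighLevelClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("172.16.185.176", 9200, "http")));
    }

    @Override
    public void run(SourceContext<String> context) throws Exception {
        SearchRequest request = new SearchRequest("person");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(100));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            context.collect(hit.getSourceAsString());
        }
    }

    @Override
    public void cancel() {
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (client != null) {
            client.close();
        }
    }
}

It could be attached with environment.addSource(new ElasticsearchSourceJob()).print(); just like the Redis source above.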
4. Writing code
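This section is empty in the original as well. As a hedged sketch, Flink 1.17 ships an Elasticsearch sink built with Elasticsearch7SinkBuilder; the example below assumes the flink-connector-elasticsearch7 artifact (which pulls in the base module listed above), an index named person, and the same comma-separated socket input used elsewhere in this article.

package itechthink.sink;

import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of an Elasticsearch sink: each "id,name" line from the socket
 * becomes one document in the person index.
 */
public class ElasticsearchSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);

        ElasticsearchSink<String> sink = new Elasticsearch7SinkBuilder<String>()
                .setHosts(new HttpHost("172.16.185.176", 9200, "http"))
                // Flush after every record; fine for a demo, too chatty for production
                .setBulkFlushMaxActions(1)
                .setEmitter((element, context, indexer) -> {
                    String[] split = element.split(",");
                    Map<String, Object> document = new HashMap<>();
                    document.put("id", split[0]);
                    document.put("name", split[1]);
                    indexer.add(Requests.indexRequest().index("person").id(split[0]).source(document));
                })
                .build();

        source.sinkTo(sink);
        environment.execute("ElasticsearchSinkJob");
    }
}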