Change Data Capture (CDC)
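Flink CDC is a technology for reading data from a variety of data sources in real time. It supports both full reads (a snapshot of the existing data) and incremental reads (the changes made afterwards).
Its core principle is to monitor and capture changes in a database (inserts, updates, deletes, and so on) and send the captured data to a data warehouse or data lake, or write it to a message queue (such as Kafka) for other services to consume. For MySQL, the changes are read from the binlog.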
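To make "full read, then incremental read" concrete, here is a minimal, Flink-free sketch of what a downstream consumer does with change events. The `ChangeEventReplay` class, its `ChangeEvent` type and the `replay` method are hypothetical simplifications, and a single string value stands in for a full row image. The op codes `r`, `c`, `u` and `d` follow Debezium's convention (snapshot read, create, update, delete), which is the event format Flink CDC produces.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangeEventReplay {

    // Hypothetical, simplified change event (not a Flink CDC or Debezium class)
    static final class ChangeEvent {
        final char op;       // r = snapshot read, c = insert, u = update, d = delete
        final int id;        // primary key of the affected row
        final String after;  // row value after the change (null for deletes)

        ChangeEvent(char op, int id, String after) {
            this.op = op;
            this.id = id;
            this.after = after;
        }
    }

    /** Applies change events in order to a downstream copy keyed by primary key. */
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> replica = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op) {
                case 'r':
                case 'c':
                case 'u':
                    replica.put(e.id, e.after); // upsert the latest row image
                    break;
                case 'd':
                    replica.remove(e.id);       // the row no longer exists upstream
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent('r', 1, "alice"),  // full (snapshot) phase
                new ChangeEvent('r', 2, "bob"),
                new ChangeEvent('c', 3, "carol"),  // incremental (binlog) phase
                new ChangeEvent('u', 1, "alice2"),
                new ChangeEvent('d', 2, null));
        System.out.println(replay(events)); // {1=alice2, 3=carol}
    }
}
```

The snapshot events seed the copy, and the later binlog events keep it in sync; replaying them in order is what lets a warehouse, lake or Kafka consumer end up with the same rows as the source table.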
Add the dependency:
```xml
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>
```
Implementation:
```java
package itechthink.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink MySQL CDC: reads a snapshot of the monitored tables, then streams
 * their binlog changes and prints each one as a Debezium JSON string.
 */
public class FlinkCDCForMySQL {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // 2. Build the MySQL CDC source
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("172.16.185.176")
                .port(3306)
                .databaseList("javabook")
                // Monitor several tables: pass each fully qualified name (db.table) as a separate argument
                .tableList("javabook.test1", "javabook.test2")
                .username("root")
                .password("123456")
                .startupOptions(
                        /*
                         * How to start reading:
                         * initial()                 - take a full snapshot first, then continue from the binlog
                         * earliest()                - start from the earliest available binlog offset
                         * latest()                  - read only changes made after the job starts
                         * specificOffset(file, pos) - start from a given binlog file and position
                         * timestamp(millis)         - start from the binlog at a given timestamp
                         */
                        StartupOptions.initial()
                )
                // Deserialize each change record into a Debezium JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // 3. Checkpoint every 5 s with exactly-once semantics,
        //    and keep the externalized checkpoint when the job is cancelled
        environment.enableCheckpointing(5000);
        CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart strategy: at most 3 restarts, 10 s apart
        environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

        // State backend plus checkpoint storage on HDFS
        // (replaces the deprecated FsStateBackend)
        environment.setStateBackend(new HashMapStateBackend());
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/flink/checkpoint");

        // 4. Add the source and print every change record
        environment.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .print().setParallelism(1);

        // 5. Launch the job
        environment.execute("FlinkCDCForMySQL");
    }
}
```
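`JsonDebeziumDeserializationSchema` emits each change as a Debezium JSON string with `before`, `after`, `source`, `op` and `ts_ms` fields, and downstream code usually branches on `op`. The stdlib-only sketch below is illustrative and rests on assumptions: the class `DebeziumOpRouter` and its methods are hypothetical, the sample records are hand-written rather than captured output, and the op code is pulled out with a regular expression instead of a real JSON parser. It takes the last `"op"` match because the top-level `op` field follows `before`/`after`, which could themselves hold a column named `op`. In a real job, parse the record with a JSON library, for example inside a Flink `map` function.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DebeziumOpRouter {
    // Matches a JSON key "op" with a single-letter string value, e.g. "op":"c"
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code of the last "op" field in the record, or null if none is found. */
    static String opCode(String json) {
        Matcher m = OP.matcher(json);
        String last = null;
        while (m.find()) {
            last = m.group(1); // keep overwriting so the top-level field (last in order) wins
        }
        return last;
    }

    /** Maps a Debezium op code to a readable label. */
    static String describe(String json) {
        String op = opCode(json);
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "other(" + op + ")";
        }
    }

    public static void main(String[] args) {
        // Hand-written sample records in the Debezium envelope shape
        String snapshot = "{\"before\":null,\"after\":{\"id\":1,\"name\":\"a\"},"
                + "\"source\":{\"db\":\"javabook\",\"table\":\"test1\"},\"op\":\"r\",\"ts_ms\":1}";
        String delete = "{\"before\":{\"id\":1,\"name\":\"a\"},\"after\":null,"
                + "\"source\":{\"db\":\"javabook\",\"table\":\"test1\"},\"op\":\"d\",\"ts_ms\":2}";
        System.out.println(describe(snapshot)); // snapshot read
        System.out.println(describe(delete));   // delete
    }
}
```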