编程范式
和之前的流式大数据处理一样,在Flink中,所有用于批
和流
的Table API & SQL
也都遵循相同的编程范式,也就是代码上的整体结构都基本上同。
package itechthink.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.table.api.*;
/**
* Table API & SQL编程范式
*
*/
public class TableApiAndSQLParadigm {
public static void main(String[] args) throws Exception {
// 1. 创建批或流执行的Table上下文环境
Configuration configuration = new Configuration();
TableEnvironment tableEnv = TableEnvironment.create(configuration);
// 2.1. 创建数据源表
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build());
// 2.2. 或者,从Table API查询中创建一个Table对象
Table table1 = tableEnv.from("SourceTable");
// 2.3. 或者,从SQL查询中创建一个Table对象
Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// 3.1. 创建一张保存结果的Sink数据表
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ");
// 3.2. 或者,将查询结果保存到Sink数据表
TableResult tableResult1 = table1.insertInto("SinkTable").execute();
TableResult tableResult2 = table2.executeInsert("SinkTable");
}
}
原创大约 10 分钟