为什么用Groovy
Flink-CEP的底层是基于Non-deterministic Finite Automaton(NFA,非确定有限自动机)构建的,它是一种用来定义和描述不同状态转换过程的正则(表达式)语言。
Flink-CEP的底层是基于Non-deterministic Finite Automaton(NFA,非确定有限自动机)构建的,它是一种用来定义和描述不同状态转换过程的正则(表达式)语言。
在大多数系统中,都是通过Aviator自定义函数来实现关系表达式运算的。
Aviator有两种自定义函数。
AbstractFunction
:它实现的自定义函数,其call()
方法接受1 ~ 20
个参数。对于那些短时长(统计区间一般在5~15分钟)和浅层操作(注册、登录、修改IP、修改用户名等)的行为,如果使用常规的实时计算方式,例如,从Kafka拉取数据 -> Flink实时计算 -> 指标保存到Redis -> 从Redis读取数据,就显得有些冗长了。
Redis中不仅保存着Clickhouse计算出来的预聚合
数据,也保存Flink计算结果,例如,每5分钟统计一次近1小时的登录数据
。
类似近1小时内用户的登录次数多于3次
这样的风控指标,本质上是一种风控关系表达式
,它由左变量
、关系运算符
和右变量
(或阈值
)组成,如果把它用另外一种方式展现出来就是这样。
对于这种风控指标的计算来说,真正的难点不在于计算本身,而在于 如何快速且准确地取得指定时间片的数据
。
Flink CDC就是一个可以从不同数据源中实时读取数据的技术,它既可以全量读取,也可以增量读取。
它的核心原理是监测并捕获数据库的变动(增删改等),将捕获到的数据发送到数据仓库或者数据湖,也可以写入到消息队列(例如Kafka
)供其他服务消费。
引入依赖。
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
和之前的流式大数据处理一样,在Flink中,所有用于批
和流
的Table API & SQL
也都遵循相同的编程范式,也就是代码上的整体结构都基本上同。
package itechthink.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.table.api.*;
/**
* Table API & SQL编程范式
*
*/
public class TableApiAndSQLParadigm {
public static void main(String[] args) throws Exception {
// 1. 创建批或流执行的Table上下文环境
Configuration configuration = new Configuration();
TableEnvironment tableEnv = TableEnvironment.create(configuration);
// 2.1. 创建数据源表
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L)
.build());
// 2.2. 或者,从Table API查询中创建一个Table对象
Table table1 = tableEnv.from("SourceTable");
// 2.3. 或者,从SQL查询中创建一个Table对象
Table table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// 3.1. 创建一张保存结果的Sink数据表
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable (EXCLUDING OPTIONS) ");
// 3.2. 或者,将查询结果保存到Sink数据表
TableResult tableResult1 = table1.insertInto("SinkTable").execute();
TableResult tableResult2 = table2.executeInsert("SinkTable");
}
}
设想有下面这样的电商应用场景。
找出那些超时未支付的订单,例如,下单10分钟内
没有支付的订单有多少?
找出那些1小时
内至少有过3次
有效交易的用户账户。
找出那些在5秒钟内
连续登录失败超过至少3次
的账号。
如果说上面的还算简单的话,那下面的这几个可就难办了。
找出行为事件小于8且浏览商品量少于3条的支付订单。
找出识别指定规则的时间并按指定方式输出。