为什么用Groovy
Flink-CEP的底层是基于Non-deterministic Finite Automaton(NFA,非确定有限自动机)构建的,它是一种用来定义和描述不同状态转换过程的正则(表达式)语言。
Flink-CEP的底层是基于Non-deterministic Finite Automaton(NFA,非确定有限自动机)构建的,它是一种用来定义和描述不同状态转换过程的正则(表达式)语言。
在大多数系统中,都是通过Aviator自定义函数来实现关系表达式运算的。
Aviator有两种自定义函数。
AbstractFunction
:它实现的自定义函数,其call()
方法接受1 ~ 20
个参数。对于那些短时长(统计区间一般在5~15分钟)和浅层操作(注册、登录、修改IP、修改用户名等)的行为,如果使用常规的实时计算方式,例如,从Kafka拉取数据 -> Flink实时计算 -> 指标保存到Redis -> 从Redis读取数据,就显得有些冗长了。
Redis中不仅保存着Clickhouse计算出来的预聚合
数据,也保存Flink计算结果,例如,每5分钟统计一次近1小时的登录数据
。
类似近1小时内用户的登录次数多于3次
这样的风控指标,本质上是一种风控关系表达式
,它由左变量
、关系运算符
和右变量
(或阈值
)组成,如果把它用另外一种方式展现出来就是这样。
对于这种风控指标的计算来说,真正的难点不在于计算本身,而在于 如何快速且准确地取得指定时间片的数据
。
除了实现对风控指标的预计算,Clickhouse还需要存储并分析用户行为数据。
首先在Clickhouse中创建如下三张表。
-- 创建数据库
:) CREATE DATABASE IF NOT EXISTS dwd;
-- 创建数据同步表
:) DROP TABLE IF EXISTS dwd.event_middle;
:) CREATE TABLE IF NOT EXISTS dwd.event_middle (
eid UInt64,
etype String,
ename String,
esource String,
etime DateTime,
userid UInt64,
aid UInt64,
aname String,
tid String,
tname String,
context String
) ENGINE = Kafka() SETTINGS
kafka_broker_list = '172.16.185.176:9092',
kafka_topic_list = 'eventbus',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow';
-- 创建物化视图
:) DROP VIEW IF EXISTS dwd.event_materialized_view;
:) CREATE MATERIALIZED VIEW IF NOT EXISTS dwd.event_materialized_view
TO dwd.event_table
SELECT
userid, ename, etime
FROM dwd.event_middle;
-- 创建用户行为数据表
:) DROP TABLE IF EXISTS dwd.event_table;
:) CREATE TABLE IF NOT EXISTS dwd.event_table (
userid UInt64,
ename String,
etime DateTime
) ENGINE = MergeTree()
ORDER BY userId;