Flink模式事件匹配
原创大约 2 分钟
对于那些短时长(统计区间一般在5~15分钟)和浅层操作(注册、登录、修改IP、修改用户名等)的行为,如果使用常规的实时计算方式,例如,从Kafka拉取数据 -> Flink实时计算 -> 指标保存到Redis -> 从Redis读取数据,就显得有些冗长了。
针对这类风控指标,如果使用Flink-CEP,是能即时进行计算并很快给出结果的。
通过Flink-CEP进行模式匹配的一般过程如下。
生成
DataStream
或KeyedStream
。生成模式
Pattern
(个体模式
或组合模式
)。将
Pattern
应用于KeyedStream
,生成PatternStream
。在
PatternStream
中过滤出符合Pattern
的数据。
如果用伪代码来表示的话就是这样的。
/**
* CEP模式匹配的常规做法
*
*/
public class CepPatternMatch {
public static void main(String[] args) {
// 创建流式计算上下文环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 生成流
DataStream<String> dataStream = environment.socketTextStream("localhost", 8080);
KeyedStream<String, Tuple> keyedStream = dataStream.keyBy("");
/*
* 生成模式或规则(Pattern对象)
* 个体模式
* 1. 单例模式:只接收1个事件
* 2. 循环模式:能接收多个事件或1个事件,单例模式 + 量词
*
*/
// 生成名叫“login”的单个Pattern
Pattern<String, String> pattern =
Pattern.<String>begin("login")
// 定义模式规则
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
// Patter规则内容
return false;
}
})
// times为量词,例如登录3次才符合where条件中的事件
.times(3);
/*
* 生成模式或规则(Pattern对象)
* 组合模式
* 1. next:严格紧邻,指事件A和事件B之间不能有任何的第三个事件出现
* 2. fallowedBy:宽松近邻,事件A和事件B之间可以存在第三个事件
* 3. fallowedByAny:非严格匹配,比fallowedBy更宽松
*
*/
// 生成了三个Patten所组成的Pattern序列,分别名叫"login"、"buy"和"sale"
Pattern<String, String> patterns =
Pattern.<String>begin("login")
// 自定义事件的过滤条件
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return false;
}
}).times(3)
// 严格紧邻
.next("buy")
// 自定义事件的过滤条件
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return false;
}
}).times(3)
// 宽松近邻
.followedBy("sale");//.where().times(n);
// 2. 将Pattern应用于KeyedStream,生成PatternStream对象
PatternStream<String> patternStream = CEP.pattern(keyedStream, patterns);
// 3. 通过PatternStream对象的select()方法,过滤出符合规则的数据
DataStream<Object> patternResult = patternStream.select(new PatternSelectFunction<String, Object>() {
/**
* map:key指的是Pattern的名称,而value指的是符合这个Pattern的数据
*
*/
@Override
public Object select(Map<String, List<String>> map) throws Exception {
return null;
}
});
}
}
感谢支持
更多内容,请移步《超级个体》。