CEP(复杂事件处理)
什么是“CEP”
设想有下面这样的电商应用场景。
找出那些超时未支付的订单,例如,下单
10分钟内
没有支付的订单有多少?找出那些
1小时
内至少有过3次
有效交易的用户账户。找出那些在
5秒钟内
连续登录失败超过至少3次
的账号。
如果说上面的还算简单的话,那下面的这几个可就难办了。
找出行为事件小于8且浏览商品量少于3条的支付订单。
找出识别指定规则的时间并按指定方式输出。
如果用传统的MySQL等关系型数据库,这些都相当麻烦,即使是用Flink,它的代码量也不会少——这就是Flink-CEP大显身手的好机会。
Flink-CEP是一个基于Flink的事件处理库,它既可以在流中识别出符合特定规则的事件及其序列,也能识别出特定的行为模式,进而触发相应的处理逻辑。

组成部分
Flink-CEP的主要组件有下面这些。
EventStream
,事件流,通常是连续的行为事件或事件序列。Pattern
模式定义指的是对事件进行处理的规则。它可以是由多个简单模式组成的复杂模式,也可以是由多个复杂模式组成的模式序列。Pattern
模式检测。生成
Alert
。
Pattern
模式分为这几类。
个体模式,是指单独的一个模式,例如
单例模式
或循环模式
,这里的单例模式
可不是设计模式
中的那个单例模式
。组合模式,它是多个个体模式组成的模式序列。
模式组,是将某个模式作为条件嵌入到另一个模式(通常是嵌入到个体模式)中。
CEP简单模式
通用实现
CEP
开发并不复杂,只需要按照“套路”来就行。下面是CEP
模式检测的通用代码模板。
package itechthink.cep;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
/**
* CEP模式检测的一般做法
*
*/
public class CepPatternDetection {
public static void main(String[] args) {
// 创建流式计算上下文环境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 生成流
DataStream<String> dataStream = null;
KeyedStream<String, Tuple> keyedStream = dataStream.keyBy("");
/* **********************
* 生成模式或规则(Pattern 对象)
*
* 个体模式
*
* 1. 单例模式:只接收1个事件
* 2. 循环模式:能接收多个事件或1个事件,单例模式 + 量词
*
* *********************/
// 生成名叫“login”的单个Pattern
Pattern<String, String> pattern =
Pattern.<String>begin("login").where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
// Patter规则内容
return false;
// times就是量词
}
// times为量词,例如登录3次才符合where条件中的事件
}).times(3);
/* **********************
*
* 组合模式
*
* 组合方式:
* 1. next: 严格紧邻,指事件A和事件B之间不能有任何的第三个事件出现
* 2. fallowedBy: 宽松近邻,事件A和事件B之间可以存在第三个事件
* 3. fallowedByAny: 非严格匹配,比fallowedBy更宽松
*
* *********************/
// 生成了三个Patten所组成的Pattern序列,分别名叫"login"、"buy"和"sale"
Pattern<String, String> patterns =
Pattern.<String>begin("login")
// 自定义事件的过滤条件
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return false;
}
}).times(3)
// 严格紧邻
.next("buy")
// 自定义事件的过滤条件
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return false;
}
}).times(3)
// 宽松近邻
.followedBy("sale");//.where().times(n);
// 2. 将Pattern应用于KeyedStream,生成PatternStream对象
PatternStream<String> patternStream = CEP.pattern(keyedStream, patterns);
// 3. 通过PatternStream对象的select()方法, 将符合规则的数据提取
DataStream<Object> patternResult = patternStream.select(new PatternSelectFunction<String, Object>() {
/**
* map: key指的是Pattern的名称,而value指的是符合这个Pattern的数据
*
*/
@Override
public Object select(Map<String, List<String>> map) throws Exception {
return null;
}
});
}
}
模式检测
例如,对于登录事件的检测,几乎是所有电商网站都必备的一项底层风控功能。
下面的这段CEP
代码可以检测出3秒内
连续出现2次
登录失败的事件。
package itechthink.cep;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
* 一个简单的CEP模式检测:检测出连续出现2次登录失败的事件
* 1) 定义模式
* 2) 匹配结果
*
*/
public class CEPSimpleJob {
public static void main(String[] args) throws Exception {
// 1. 准备环境 - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// 2. 读取数据 - Source
KeyedStream<Event, String> keyedStream = environment.fromElements(
"1,success,1622689918",
"2,failure,1622689952",
"2,failure,1622689953",
"2,failure,1622689954",
"2,success,1622689958",
// 用水印保证事件顺序
"2,failure,1622689956"
)
// 3. 处理数据 - Transform
.map(new MapFunction<String, Event>() {
@Override
public Event map(String s) throws Exception {
String[] split = s.split(",");
return new Event(split[0].trim(), split[1].trim(), Long.parseLong(split[2].trim()));
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
// 延迟5秒
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, timestamp) -> event.time * 1000))
.keyBy(event -> event.userId);
// 4. 输出数据 - Sink
// 定义CEP规则模式
Pattern<Event, Event> pattern = Pattern.<Event>begin("login")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.result.equalsIgnoreCase("failure");
}
})
.next("next")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) throws Exception {
return event.result.equalsIgnoreCase("failure");
}
})
// 3秒内失败
.within(Time.seconds(3));
// 将规则应用于流上
PatternStream<Event> patternStream = CEP.pattern(keyedStream, pattern);
// 提取符合规则的数据
patternStream.select(new PatternSelectFunction<Event, Message>() {
@Override
public Message select(Map<String, List<Event>> result) throws Exception {
Event first = result.get("login").get(0);
Event last = result.get("next").get(0);
Message message = new Message();
message.userId = first.userId;
message.first = first.time;
message.second = last.time;
message.msg = "连续登录失败...";
return message;
}
}).print();
// 5. 启动任务 - Execute
environment.execute("CEPSimpleJob");
}
}
class Event {
public String userId;
public String result;
public Long time;
public Event() {
}
public Event(String userId, String result, Long time) {
this.userId = userId;
this.result = result;
this.time = time;
}
}
class Message {
public String userId;
public Long first;
public Long second;
public String msg;
@Override
public String toString() {
return "Message{" +
"userId='" + userId + '\'' +
", first=" + first +
", second=" + second +
", msg='" + msg + '\'' +
'}';
}
}
更复杂的场景
package itechthink.cep;
import itechthink.cep.condition.ClipCouponsDeep.BrowseCondition;
import itechthink.cep.condition.ClipCouponsDeep.EventIntervalCondition;
import itechthink.cep.condition.ClipCouponsDeep.GradeCondition;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
/**
* 账号等级为L2以上的用户各行为之间时间间隔平均少于3分钟
* 或者浏览行为停留平均时间少于3分钟
*
*/
public class ClipCouponsDeep {
public static void main(String[] args) {
// Kafka
DataStream<EventPO> eventStream = null;
// 生成KeyedStream
KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUser_id_int();
}
});
// 生成组合模式或规则(Pattern)
Pattern<EventPO, ?> pattern = Pattern
.<EventPO>begin("before")
// 账号等级为L2以上,才能进入模式
.where(new GradeCondition())
// 前后行为事件
.next("after")
// 时间间隔平均少于3分钟
.where(new EventIntervalCondition())
// 或者浏览行为停留平均时间少于3分钟
.or(new BrowseCondition());
}
}
自定义CEP
在开发实际业务时,通常会用到多种不同的模式类别。
而且,真实的业务逻辑也不会这么简单,而是会附加诸多额外的条件。
基于个体模式
package itechthink.cep;
import itechthink.utils.EventConstantUtil;
import itechthink.utils.KafkaUtil;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 基于个体模式检测最近1分钟内连续登录失败超过3次的用户
* CEP模式:这3次登录失败事件之间不允许出现其他行为事件
*
*/
public class LoginFailByConsecutive {
public static void main(String[] args) {
// Kafka
DataStream<EventPO> eventStream = KafkaUtil.read(args);
// 生成KeyedStream
KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUser_id_int();
}
});
// 生成模式或规则(Pattern)
Pattern.
<EventPO>begin("login_fail_first")
.where(new SimpleCondition<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_fail".equals(eventPO.getEvent_name());
}
})
/* **********************
*
* 事件流1(连续登录失败的事件流):event_A(login_fail), event_B(login_fail), event_C(login_fail)
* 事件流2(不连续登录失败的事件流):event_A(login_fail), event_D(login_success), event_B(login_fail), event_C(login_fail)
*
* 宽松近邻:不连续事件
* 严格紧邻:连续事件
*
* 1. 个体模式的循环模式匹配的是宽松近邻
*
* 2. consecutive()就指定匹配模式是严格紧邻
*
* *********************/
// 量词
.times(3)
// 严格紧邻,即连续的事件流
.consecutive()
// 时间限定
.within(Time.seconds(60));
}
}
基于组合模式
package itechthink.cep;
import itechthink.utils.KafkaUtil;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 基于组合模式检测最近1分钟内连续登录失败超过3次的用户
* CEP模式:这3次登录失败事件之间不允许出现其他行为事件
*
*/
public class LoginFailByComposite {
public static void main(String[] args) {
// Kafka
DataStream<EventPO> eventStream = KafkaUtil.read(args);
// 生成KeyedStream
KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUser_id_int();
}
});
// 生成模式或规则(Pattern)
/* **********************
*
* 事件流:A(login_fail), B(login_fail), C(login_fail)
*
* 1. 无论是个体模式还是组合模式,模式定义的开头都是begin()
*
* *********************/
Pattern.
<EventPO>begin("login_fail_first")
.where(new SimpleCondition<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_fail".equals(eventPO.getEvent_name());
}
})
.next("login_fail_second")
.where(new SimpleCondition<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_fail".equals(eventPO.getEvent_name());
}
})
// 这种组合模式的方式比较灵活
.followedBy("login_success")
.where(new SimpleCondition<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_fail".equals(eventPO.getEvent_name());
}
})
.next("login_fail_third")
.where(new SimpleCondition<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_fail".equals(eventPO.getEvent_name());
}
})
.within(Time.seconds(60));
}
}
基于自定义条件
package itechthink.cep;
import itechthink.cep.condition.ClipCouponsRoute.LoginCondition;
import itechthink.cep.condition.ClipCouponsRoute.UseCondition;
import itechthink.cep.condition.ClipCouponsRoute.ReceiveCondition;
import itechthink.utils.KafkaUtil;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 用户在15分钟内的行为路径是"登录-领券-下单"
* (明显薅羊毛行为特征)
*
*/
public class ClipCouponsRoute {
public static void main(String[] args) {
// Kafka
DataStream<EventPO> eventStream = KafkaUtil.read(args);
// 生成KeyedStream
KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUser_id_int();
}
});
// 生成组合模式或规则(Pattern)
Pattern<EventPO, ?> pattern = Pattern
// 过滤登录行为事件
.<EventPO>begin("login").where(new LoginCondition())
// 宽松近邻:过滤领取优惠券行为事件
.followedBy("receive").where(new ReceiveCondition())
// 宽松近邻:过滤使用优惠券行为事件
.followedBy("use").where(new UseCondition())
// 模式有效时间:15分钟内
.within(Time.minutes(15));
}
}
过滤登录行为事件。
package itechthink.cep.condition.ClipCouponsRoute;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
/**
* 过滤登录行为事件
*
*/
public class LoginCondition extends SimpleCondition<EventPO> {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "login_success".equalsIgnoreCase(eventPO.getEvent_name());
}
}
过滤领取优惠券行为事件。
package itechthink.cep.condition.ClipCouponsRoute;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
/**
* 过滤领取优惠券行为事件
*
*/
public class ReceiveCondition extends SimpleCondition<EventPO> {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "coupons_use".equalsIgnoreCase(eventPO.getEvent_type());
}
}
过滤使用优惠券行为事件。
package itechthink.cep.condition.ClipCouponsRoute;
import com.imooc.RiskCtrlSys.model.EventPO;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
/**
* 过滤使用优惠券行为事件
*
*/
public class UseCondition extends SimpleCondition<EventPO> {
@Override
public boolean filter(EventPO eventPO) throws Exception {
return "order".equalsIgnoreCase(eventPO.getEvent_type());
}
}
感谢支持
更多内容,请移步《超级个体》。