Generating Pattern Objects with Groovy
Why Groovy
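Under the hood, Flink-CEP is built on a Non-deterministic Finite Automaton (NFA). An NFA is a state machine that describes how a computation moves between states as input arrives, and it is equivalent in expressive power to regular expressions.
Flink-CEP implements its pattern matching on top of it.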
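To make the NFA idea concrete, here is a toy simulation in plain Java. It is only an illustration, not Flink's implementation, and the names ToyNfa and matches are hypothetical. It recognizes "the condition holds on `times` events, with unrelated events allowed in between". State k means "k matching events consumed so far"; because a run may either consume a matching event or skip it, several states can be active at once, which is the non-deterministic part.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class ToyNfa {
    /** Returns true as soon as some run has consumed `times` events that satisfy `condition`. */
    public static boolean matches(List<String> events, Predicate<String> condition, int times) {
        Set<Integer> active = new HashSet<>();
        active.add(0); // start state: nothing matched yet
        for (String event : events) {
            // Every run may ignore the event (relaxed contiguity), so all current states survive...
            Set<Integer> next = new HashSet<>(active);
            if (condition.test(event)) {
                for (int state : active) {
                    next.add(state + 1); // ...or consume it and advance one state
                }
            }
            if (next.contains(times)) {
                return true; // accepting state reached
            }
            active = next;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches(List.of("fail", "register", "fail", "fail"), "fail"::equals, 3));
        System.out.println(matches(List.of("fail", "success", "fail"), "fail"::equals, 3));
    }
}
```

The first call reaches the accepting state despite the "register" event in between; the second never collects three failures.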

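What if you want to adjust risk-control rules dynamically, in real time, while the program is running?
The obvious approach is to start a background thread that periodically reads the rule configuration from MySQL and pushes the rules into the event stream through a broadcast stream.
But this introduces latency: a rule cannot take effect the moment it is updated, and by the time the updated rule has been loaded, the events it was meant to match may already have passed through.
This is where Groovy comes in. It was chosen for the following properties:
It is a dynamic language that integrates seamlessly with Java.
As a scripting language, a script can be reloaded and executed as soon as its content changes, without restarting the application.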
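Running Groovy Programs
There are three ways to run Groovy code from Java.
Groovy Shell: evaluates Groovy expressions from Java and returns the result. It compiles the code into a new class on every execution, so its performance is poor.
Groovy Script Engine: loads Groovy scripts from a given location, such as the file system, a URL, or a database, and reloads a script when its content changes, without stopping the service.
Groovy Class Loader: a custom class loader that compiles and loads the Groovy classes an application needs. It defines classes compiled from Groovy source itself instead of delegating them to a parent loader, which is why it is often described as breaking Java's parent-delegation model.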
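The reload-on-change idea behind the Script Engine approach can be sketched in plain Java. This is a minimal sketch under stated assumptions: ReloadingScript is a hypothetical helper, and the Function<String, T> passed to it stands in for the real Groovy compilation step. It recompiles only when the file's modification time differs from the one it last compiled, which is similar in spirit to how GroovyScriptEngine decides to recompile a changed script, but it is not that class's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.function.Function;

/** Hypothetical helper: recompiles a script file only when its modification time changes. */
public class ReloadingScript<T> {
    private final Path path;
    private final Function<String, T> compiler; // stand-in for the real Groovy compilation step
    private FileTime compiledAt;                // modification time of the cached version
    private T compiled;

    public ReloadingScript(Path path, Function<String, T> compiler) {
        this.path = path;
        this.compiler = compiler;
    }

    /** Returns the cached result, recompiling first if the file changed since the last compile. */
    public synchronized T get() throws IOException {
        FileTime current = Files.getLastModifiedTime(path);
        if (compiled == null || !current.equals(compiledAt)) {
            String source = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            compiled = compiler.apply(source);
            compiledAt = current;
        }
        return compiled;
    }
}
```

Modification times can be coarse on some file systems, so an edit within the same timestamp tick could go unnoticed; keying the cache on a hash of the script content avoids that.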
Groovy Shell
package groovy;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
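/**
* Running a Groovy script with GroovyShell
*
* 1. Groovy integrates seamlessly with Java
* 2. Three ways for a Java application to integrate Groovy at runtime
* 1). GroovyShell: compiles the code into a Java class on every execution
* Parameters are passed to the expression through a Binding object
* Drawback: poor performance
* 2). GroovyScriptEngine: loads Groovy scripts from a given location and reloads them as they change
* 3). GroovyClassLoader: a custom class loader that compiles and loads the Groovy classes used by Java code
*
* 3. Two styles of writing Groovy code:
* 1). Script (no class with the same name as the .groovy file)
* 2). Class (a class is defined and all code lives inside it)
*
* 4. The Groovy compiler compiles scripts into classes
* If a Groovy file contains only executable code and defines no class, the compiler generates a subclass of Script
* named after the script file, and the script code is placed in a method called run
*
* 5. If a Groovy file contains executable code and also defines classes, a class is generated for each defined class, and the script itself is also compiled into a subclass of Script
*
* 6. If a .groovy file defines a class with the same name as the file, the file is treated as "Java code written in Groovy"
* and cannot be used as a script; it is just an ordinary class
*
* 7. JVM parent-delegation model: every class is loaded by the topmost class loader able to load it, which guarantees that each loaded class is unique
* Classes loaded by the JVM's built-in class loaders are never unloaded during the JVM's lifetime
* A custom class loader that does not delegate to its parent breaks the parent-delegation model, which makes dynamic class loading possible
*
* 8. GroovyClassLoader departs from the parent-delegation model in this way, which is how Groovy loads classes dynamically
*
* 9. Both GroovyShell.evaluate() and GroovyScriptEngine.run()
* ultimately call GroovyClassLoader.parseClass()
*
*/
public class GroovyShellDemo {
public static void main(String[] args) {
// Groovy script source
String groovyScript = "println 'groovy'; println 'name = ' + name;";
// Initialize the Binding
Binding binding = new Binding();
// Pass a parameter to the script via setVariable
binding.setVariable("name", "李星云");
// Create the shell with the binding
GroovyShell shell = new GroovyShell(binding);
// Run the script; the result is the value of its last statement (null here, since println returns nothing)
Object result = shell.evaluate(groovyScript);
}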
}
Groovy Script Engine
package groovy;
import groovy.lang.Binding;
import groovy.util.GroovyScriptEngine;
import groovy.util.ResourceException;
import groovy.util.ScriptException;
import java.io.IOException;
/**
* Loading a Groovy script from a given location with GroovyScriptEngine
*
*/
public class GroovyScriptEngineDemo {
public static void main(String[] args) throws IOException, ScriptException, ResourceException {
// Initialize the GroovyScriptEngine with the script root directory
GroovyScriptEngine engine = new GroovyScriptEngine("src/main/resources");
// Initialize the Binding
Binding binding = new Binding();
// Run the script, resolved relative to the root directory
engine.run("engine.groovy", binding);
}
}
The content of the engine.groovy script, placed in src/main/resources, is as follows.
package scripts
class User {
}
println('groovy');
Groovy Class Loader
package groovy;
import groovy.lang.GroovyClassLoader;
import groovy.lang.GroovyObject;
import org.codehaus.groovy.control.CompilerConfiguration;
import java.io.File;
/**
* Calling a custom Groovy class through a class loader
*
*/
public class GroovyClassLoaderDemo {
public static void main(String[] args) throws Exception {
/*
* 1. Steps for calling a custom Groovy class through a class loader
*
* 1). Create a GroovyClassLoader instance
* 2). Let the GroovyClassLoader parse the Groovy script and produce a Class object
* 3). Instantiate it as a GroovyObject
* 4). Call the custom class's methods through the GroovyObject
*
*/
// The parent of this GroovyClassLoader is the current thread's context class loader
GroovyClassLoader loader = new GroovyClassLoader(Thread.currentThread().getContextClassLoader(), new CompilerConfiguration());
// Path of the Groovy script
File file = new File("src/main/groovy/scripts/ScriptEngine.groovy");
// Load the class through the GroovyClassLoader
Class<?> groovyClass = loader.parseClass(file);
// Create a ScriptEngine instance (Class.newInstance() is deprecated since Java 9)
GroovyObject groovyObject = (GroovyObject) groovyClass.getDeclaredConstructor().newInstance();
// Call the instance method through invokeMethod
groovyObject.invokeMethod("print", null);
}
}
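invokeMethod dispatches by method name at run time. The same idea can be sketched in plain Java with reflection. In the sketch below, the class names are hypothetical and ScriptEngineStandIn replaces a class that would normally come from parseClass. The second call in main shows the type-safe alternative: cast the loaded instance to an interface the host code already knows, which is the kind of contract the GroovyRule interface later in this article provides.

```java
import java.lang.reflect.Method;
import java.util.function.Supplier;

public class ReflectiveCallDemo {
    /** Hypothetical stand-in for a class that would normally be compiled from a .groovy file. */
    public static class ScriptEngineStandIn implements Supplier<String> {
        public String print() {
            return "this is a class loaded by name";
        }

        @Override
        public String get() {
            return print();
        }
    }

    /** Instantiates a class by name and calls a no-argument method by name, like invokeMethod(name, null). */
    public static Object invokeByName(String className, String methodName) throws Exception {
        Class<?> clazz = Class.forName(className);
        Object instance = clazz.getDeclaredConstructor().newInstance();
        Method method = clazz.getMethod(methodName);
        return method.invoke(instance);
    }

    public static void main(String[] args) throws Exception {
        String name = ScriptEngineStandIn.class.getName();
        // Dynamic dispatch: a typo in the method name only fails at run time
        System.out.println(invokeByName(name, "print"));
        // Type-safe alternative: cast to an interface shared with the host code
        Supplier<?> typed = (Supplier<?>) Class.forName(name).getDeclaredConstructor().newInstance();
        System.out.println(typed.get());
    }
}
```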
The content of the ScriptEngine.groovy script is as follows.
package scripts
class ScriptEngine {
void print() {
println('this is Groovy Class By GroovyClassLoader')
}
}
Loading Patterns Dynamically
First, define an interface that returns a Flink-CEP Pattern.
package com.itechthink.risk.flink.job.groovy;
import org.apache.flink.cep.pattern.Pattern;
/**
* Groovy interface: returns a Flink-CEP Pattern
*
*/
public interface GroovyRule<T> {
/**
* Returns the Flink-CEP Pattern structure
*
*/
Pattern<T, ?> getPattern();
}
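Next, create the Groovy source directory src/main/groovy under src/main and add the following script. The "__FIELD__" and "__EXP__" arguments appear to be placeholders, presumably replaced with a real field name and Aviator expression from the rule configuration before the script is loaded; passed to Aviator unchanged, they would not form a working rule.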
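package scripts.cep
import com.itechthink.risk.flink.job.groovy.LoginFailBySingletonCondition
import com.itechthink.risk.flink.job.groovy.GroovyRule
import com.itechthink.risk.model.EventPO
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Groovy script: uses an individual pattern to detect users with more than 3 failed logins within the last minute
*
*/
// No type parameter of its own: declaring LoginFailBySingleton<EventPO> would only create a type variable that shadows the EventPO model class
class LoginFailBySingleton implements GroovyRule<EventPO> {
@Override
Pattern getPattern() {
return Pattern
.begin("login_fail_first")
// The failure check is delegated to a condition that evaluates an Aviator expression
.where(new LoginFailBySingletonCondition("__FIELD__", "__EXP__"))
// "More than 3" failures means 4 matching events
.times(4)
.within(Time.seconds(60))
}
}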
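times(4).within(Time.seconds(60)) asks, per user, for four matching failures whose first and last events lie less than 60 seconds apart. The plain-Java check below restates that condition over a list of failure timestamps so the numbers are easy to verify by hand. It is an illustration, not how Flink evaluates the pattern; the class name is hypothetical, and treating a span of exactly 60 s as outside the window is an assumption about the boundary.

```java
import java.util.Arrays;

public class LoginFailWindowCheck {
    /**
     * Returns true if some `times` consecutive failures (in time order) span less than windowMillis,
     * i.e. last - first < windowMillis (assumed boundary semantics).
     */
    public static boolean exceeds(long[] failureTimestamps, int times, long windowMillis) {
        long[] ts = failureTimestamps.clone();
        Arrays.sort(ts);
        for (int i = 0; i + times - 1 < ts.length; i++) {
            if (ts[i + times - 1] - ts[i] < windowMillis) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // 4 failures between 0 s and 50 s: fits in one minute
        System.out.println(exceeds(new long[]{0, 10_000, 30_000, 50_000}, 4, 60_000));
        // 4 failures spread over 70 s: does not
        System.out.println(exceeds(new long[]{0, 10_000, 30_000, 70_000}, 4, 60_000));
    }
}
```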
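package com.itechthink.risk.flink.job.cep;
import com.itechthink.risk.flink.utils.GroovyUtil;
import com.itechthink.risk.flink.utils.KafkaUtil;
import com.itechthink.risk.model.EventPO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;
/**
* Uses a Groovy script with an individual pattern to detect users with more than 3 failed logins within the last minute
* CEP pattern: other behavior events may occur between the failed-login events (relaxed contiguity)
*
*/
public class LoginFailByGroovy {
public static void main(String[] args) throws Exception {
// Consume data from Kafka
DataStream<EventPO> eventStream = KafkaUtil.read(args);
// Key the stream by user id
KeyedStream<EventPO, Integer> keyedStream = eventStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUserid();
}
});
/*
* Parse the Groovy script and obtain the Flink-CEP Pattern.
* clazz and method were undefined in the original listing; these values are assumptions:
* clazz is the script file name without ".groovy", resolved against the GROOVY_SCRIPTS root,
* and method is the GroovyRule method that builds the Pattern.
*/
String clazz = "LoginFailBySingleton";
String method = "getPattern";
@SuppressWarnings("unchecked")
Pattern<EventPO, ?> pattern = (Pattern<EventPO, ?>) GroovyUtil.groovyEval(clazz, method, null);
// Apply the Pattern to the keyed stream and report each match
PatternStream<EventPO> patternStream = CEP.pattern(keyedStream, pattern);
patternStream.process(new PatternProcessFunction<EventPO, String>() {
@Override
public void processMatch(Map<String, List<EventPO>> match, Context ctx, Collector<String> out) {
List<EventPO> fails = match.get("login_fail_first");
out.collect("uid:" + fails.get(0).getUserid() + ", failed logins: " + fails.size());
}
}).print("warning");
// Submit the job
eventStream.getExecutionEnvironment().execute("LoginFailByGroovy");
}
}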
The code of LoginFailBySingletonCondition.java is as follows.
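package com.itechthink.risk.flink.job.groovy;
import com.googlecode.aviator.AviatorEvaluator;
import com.itechthink.risk.flink.job.aviator.SumFunction;
import com.itechthink.risk.model.EventPO;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* Failed-login detection condition
*
*/
public class LoginFailBySingletonCondition extends SimpleCondition<EventPO> implements Serializable {
/**
* Field passed to the Aviator custom function
*
*/
private final String field;
/**
* Rule expression
*
*/
private final String expression;
/**
* Whether the Aviator custom function has been registered by this instance.
* transient: the condition is serialized and shipped to the tasks, where the constructor
* does not run again, so registration happens lazily in filter()
*/
private transient boolean registered;
public LoginFailBySingletonCondition(String field, String expression) {
this.field = field;
this.expression = expression;
}
@Override
public boolean filter(EventPO eventPO) throws Exception {
if (!registered) {
// Register the Aviator custom function
AviatorEvaluator.addFunction(new SumFunction(this.field));
registered = true;
}
Map<String, Object> params = new HashMap<>();
params.put("data", eventPO.getEname());
// Evaluate the Aviator expression; cached = true reuses the compiled expression
return Boolean.TRUE.equals(AviatorEvaluator.execute(expression, params, true));
}
}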
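A caveat about where the Aviator function gets registered: a CEP condition is built on the client, serialized, and deserialized on each task, and Java deserialization does not run the constructor of a Serializable class. Setup done only in the constructor, such as registering a function in a JVM-wide registry, therefore happens only in the JVM that built the condition. A common remedy is lazy initialization guarded by a transient flag, sketched below in plain Java; the static counter stands in for a global registry and, like the class names, is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyRegistrationDemo {
    /** Stand-in for a JVM-wide registry such as a static function table. */
    public static final AtomicInteger REGISTRATIONS = new AtomicInteger();

    public static class Condition implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String expected;
        // transient: false again after deserialization, so each shipped copy initializes itself
        private transient boolean registered;

        public Condition(String expected) {
            this.expected = expected; // no registration here: this runs only where the object is created
        }

        public boolean filter(String value) {
            if (!registered) {
                REGISTRATIONS.incrementAndGet();
                registered = true;
            }
            return value.equals(expected);
        }
    }

    /** Serializes and deserializes an object, roughly what happens when a job is shipped to its tasks. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Condition shipped = roundTrip(new Condition("fail"));
        System.out.println(shipped.filter("fail")); // true
        System.out.println(REGISTRATIONS.get());    // 1: registered on first use, not in the constructor
    }
}
```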
The code of the Groovy script utility class GroovyUtil.java is as follows.
package com.itechthink.risk.flink.utils;
import groovy.lang.GroovyClassLoader;
import groovy.lang.GroovyObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.codehaus.groovy.control.CompilerConfiguration;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.util.concurrent.ConcurrentHashMap;
/**
* Groovy script utility class
*
*/
public class GroovyUtil {
// Singleton GroovyClassLoader
private static volatile GroovyClassLoader LOADER;
// Parsed classes keyed by the MD5 fingerprint of the script source.
// They are plain Groovy classes (e.g. GroovyRule implementations), not necessarily Script subclasses, hence Class<?>
private static final ConcurrentHashMap<String, Class<?>> clazzMaps = new ConcurrentHashMap<>();
// Root directory of the Groovy scripts
private static String GROOVY_ROOT = null;
static {
ParameterTool parameterTool = ParameterUtil.getParameters();
GROOVY_ROOT = parameterTool.get(Constants.GROOVY_SCRIPTS);
}
/**
* Returns the shared Groovy class loader, creating it lazily
*
*/
private static GroovyClassLoader getEngineByClassLoader() {
if (LOADER == null) {
synchronized (GroovyUtil.class) {
// Double-checked locking
if (LOADER == null) {
// The parent ClassLoader is the current thread's context class loader; a compiler configuration is also supplied
LOADER = new GroovyClassLoader(Thread.currentThread().getContextClassLoader(), new CompilerConfiguration());
}
}
}
return LOADER;
}
/**
* Generates the MD5 fingerprint of the Groovy script source, used as the key of clazzMaps
*
*/
private static String fingerKey(String scriptText) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] bytes = md.digest(scriptText.getBytes(StandardCharsets.UTF_8));
final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
StringBuilder ret = new StringBuilder(bytes.length * 2);
for (byte aByte : bytes) {
ret.append(HEX_DIGITS[(aByte >> 4) & 0x0f]);
ret.append(HEX_DIGITS[aByte & 0x0f]);
}
return ret.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Parses a Groovy script and invokes a method on a new instance of its class
*
* @param groovyClass script file name without the ".groovy" suffix, relative to GROOVY_ROOT
* @param method name of the method to invoke
* @param args method arguments (may be null)
*/
public static Object groovyEval(String groovyClass, String method, Object args) {
Class<?> clazz;
try {
File file = new File(GROOVY_ROOT, groovyClass + ".groovy");
// Fingerprint the script content, not its path, so that an edited script is recompiled
String scriptText = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
String md5 = fingerKey(scriptText);
// Reuse the class if this exact source was parsed before; otherwise parse and cache it
clazz = clazzMaps.get(md5);
if (clazz == null) {
clazz = getEngineByClassLoader().parseClass(scriptText, file.getName());
clazzMaps.put(md5, clazz);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
// Create an instance and call the method reflectively
GroovyObject groovyObject = (GroovyObject) clazz.getDeclaredConstructor().newInstance();
return groovyObject.invokeMethod(method, args);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
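The caching idea in GroovyUtil, compiling once per distinct script and reusing the result, can be sketched without Groovy. In this hypothetical FingerprintCache, a Function<String, T> stands in for GroovyClassLoader.parseClass, and the key is the MD5 of the script text, so an edited script gets a new key and is recompiled while an unchanged one is served from the map.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: caches compiled results keyed by an MD5 fingerprint of the source text. */
public class FingerprintCache<T> {
    private final ConcurrentHashMap<String, T> cache = new ConcurrentHashMap<>();
    private final Function<String, T> compiler; // stand-in for GroovyClassLoader.parseClass

    public FingerprintCache(Function<String, T> compiler) {
        this.compiler = compiler;
    }

    /** Lower-case hex MD5 of the UTF-8 bytes of text. */
    public static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // negative bytes are formatted as unsigned
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    /** Compiles only when this exact source text has not been seen before. */
    public T get(String source) {
        return cache.computeIfAbsent(md5Hex(source), key -> compiler.apply(source));
    }

    public int size() {
        return cache.size();
    }
}
```

One design consequence: entries are never evicted, so a long-running job that sees many script revisions keeps every compiled version in memory unless an eviction policy is added.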
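Unit Tests
First, create the Groovy source directory src/test/groovy under src/test and add the following script. GroovyUtil resolves it as GROOVY_ROOT/CepPatternTest.groovy, so the GROOVY_SCRIPTS parameter has to point at that directory when the test runs. Note also that LoginFailBySingletonCondition is declared as SimpleCondition<EventPO>, so it cannot filter SimpleEventTestPO events as is; the test needs a condition written against SimpleEventTestPO. Keeping such a condition as a Java class on the classpath, as LoginFailBySingletonCondition is, is probably the safer choice, because Flink deserializes conditions on the task side, where classes that exist only inside the GroovyClassLoader may not be visible.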
package scripts.cep
import com.itechthink.risk.flink.job.groovy.LoginFailBySingletonCondition
import com.itechthink.risk.flink.job.groovy.GroovyRule
import com.itechthink.risk.flink.model.SimpleEventTestPO
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.streaming.api.windowing.time.Time
/**
* Unit test: loading a Pattern from a Groovy script
*
*/
class CepPatternTest implements GroovyRule<SimpleEventTestPO> {
@Override
Pattern getPattern() {
return Pattern
.<SimpleEventTestPO> begin("fail")
// LoginFailBySingletonCondition is not generic, so it takes no type argument
.where(new LoginFailBySingletonCondition("__FIELD__", "__EXP__"))
.times(3)
.within(Time.seconds(60))
}
}
Next, create a POJO for testing.
package com.itechthink.risk.flink.model;
/**
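* A simple behavior-event POJO
*
*/
public class SimpleEventTestPO {
/**
* Event name
*
*/
private String ename;
/**
* Event time in milliseconds, used as the event timestamp
*
*/
private Long etime;
/**
* User id
*/
private Integer userid;
/**
* Public no-argument constructor: Flink's POJO serializer requires one; without it the type falls back to generic (Kryo) serialization
*/
public SimpleEventTestPO() {
}
public SimpleEventTestPO(String ename, Long etime, Integer userid) {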
this.ename = ename;
this.etime = etime;
this.userid = userid;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public Long getEtime() {
return etime;
}
public void setEtime(Long etime) {
this.etime = etime;
}
public Integer getUserid() {
return userid;
}
public void setUserid(Integer userid) {
this.userid = userid;
}
}
Then, create a POJO for the alert output.
package com.itechthink.risk.flink.model;
import java.util.List;
/**
* Flink-CEP alert POJO
*
*/
public class CepWarnTestPO {
private String name;
private List<SimpleEventTestPO> events;
public CepWarnTestPO(List<SimpleEventTestPO> events, String name) {
this.events = events;
this.name = name;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("Rule: ").append(name).append(" ");
int i = 0;
for (SimpleEventTestPO simpleEventTestPO : events) {
++i;
builder.append("Event ").append(i).append(" ")
.append("uid:")
.append(simpleEventTestPO.getUserid())
.append(", time: ")
.append(simpleEventTestPO.getEtime())
.append(", action: ")
.append(simpleEventTestPO.getEname())
.append("\n");
}
return builder.toString();
}
}
Finally, create the test class and test methods.
package com.itechthink.risk.flink.job.cep;
import com.itechthink.risk.flink.model.CepWarnTestPO;
import com.itechthink.risk.flink.model.SimpleEventTestPO;
import com.itechthink.risk.flink.utils.GroovyUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.List;
import java.util.Map;
/**
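* Flink-CEP unit tests
*
*/
public class FlinkCepTest {
/**
* Shared helper: assign timestamps and watermarks, then key the stream by user id
*
*/
private KeyedStream<SimpleEventTestPO, Integer> getSteam(StreamExecutionEnvironment env, DataStream<SimpleEventTestPO> dataStream) {
return dataStream
// Watermarks are required because CEP processes these events in event time
.assignTimestampsAndWatermarks(
WatermarkStrategy
// Maximum out-of-orderness tolerated (none here)
.<SimpleEventTestPO>forBoundedOutOfOrderness(Duration.ZERO)
// Take each event's timestamp from its etime field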
.withTimestampAssigner(new SerializableTimestampAssigner<SimpleEventTestPO>() {
@Override
public long extractTimestamp(SimpleEventTestPO event, long l) {
return event.getEtime();
}
}))
.keyBy(SimpleEventTestPO::getUserid);
}
@DisplayName("Detect users with 3 failed logins using a Flink-CEP looping pattern")
@Test
void testLoginFailByCep() throws Exception {
// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Build the event stream
DataStream<SimpleEventTestPO> dataStream = env.fromElements(
new SimpleEventTestPO("fail", 2000L, 1),
new SimpleEventTestPO("fail", 3000L, 1),
new SimpleEventTestPO("fail", 4000L, 2),
new SimpleEventTestPO("fail", 5000L, 1),
new SimpleEventTestPO("fail", 6000L, 2),
new SimpleEventTestPO("fail", 7000L, 2),
new SimpleEventTestPO("fail", 8000L, 3)
);
// Key by uid
KeyedStream<SimpleEventTestPO, Integer> stream = getSteam(env, dataStream);
// Define the Pattern
Pattern<SimpleEventTestPO, SimpleEventTestPO> pattern = Pattern.<SimpleEventTestPO>begin("cep-login-fail")
.where(new SimpleCondition<SimpleEventTestPO>() {
@Override
public boolean filter(SimpleEventTestPO value) throws Exception {
return value.getEname().equals("fail");
}
}).times(3);
// Apply the Pattern to the event stream
PatternStream<SimpleEventTestPO> patternStream = CEP.pattern(stream, pattern);
// Extract matched events
patternStream.process(new PatternProcessFunction<SimpleEventTestPO, String>() {
@Override
public void processMatch(Map<String, List<SimpleEventTestPO>> map, Context context, Collector<String> collector) throws Exception {
// Extract the three failed-login events
List<SimpleEventTestPO> list = map.get("cep-login-fail");
SimpleEventTestPO first = list.get(0);
SimpleEventTestPO second = list.get(1);
SimpleEventTestPO third = list.get(2);
StringBuilder builder = new StringBuilder();
builder
.append("uid:")
.append(first.getUserid())
.append(", failed login times:")
.append(first.getEtime())
.append(",")
.append(second.getEtime())
.append(",")
.append(third.getEtime());
collector.collect(builder.toString());
}
})
.print("warning");
env.execute();
}
@DisplayName("Detect users with consecutive events using a Flink-CEP looping pattern")
@Test
void testLoginFailConsecutiveByCep() throws Exception {
// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Build the event stream
DataStream<SimpleEventTestPO> dataStream = env.fromElements(
new SimpleEventTestPO("fail", 2000L, 1),
new SimpleEventTestPO("register", 3000L, 1),
new SimpleEventTestPO("fail", 4000L, 1),
new SimpleEventTestPO("success", 5000L, 1),
new SimpleEventTestPO("fail", 6000L, 2),
new SimpleEventTestPO("fail", 7000L, 2),
new SimpleEventTestPO("success", 8000L, 3)
);
// Key by uid
KeyedStream<SimpleEventTestPO, Integer> stream = getSteam(env, dataStream);
// Define the Pattern
Pattern<SimpleEventTestPO, SimpleEventTestPO> pattern =
Pattern
.<SimpleEventTestPO>begin("cep-login-fail-consecutive")
.where(new SimpleCondition<SimpleEventTestPO>() {
@Override
public boolean filter(SimpleEventTestPO value) throws Exception {
return value.getEname().equals("fail");
}
})
// times(n) matches exactly n occurrences
.times(2)
// Strict contiguity (consecutive events)
.consecutive();
// Apply the Pattern to the event stream
PatternStream<SimpleEventTestPO> patternStream = CEP.pattern(stream, pattern);
// Extract matched events
patternStream.process(new PatternProcessFunction<SimpleEventTestPO, String>() {
@Override
public void processMatch(Map<String, List<SimpleEventTestPO>> map, Context context, Collector<String> collector) throws Exception {
// Extract the events
List<SimpleEventTestPO> event = map.get("cep-login-fail-consecutive");
CepWarnTestPO warn = new CepWarnTestPO(event, "Consecutive failed logins");
collector.collect(warn.toString());
}
})
.print("warning");
env.execute();
}
@DisplayName("Detect a failed login followed by a successful login using a Flink-CEP pattern sequence")
@Test
void testLoginFailConsecutiveWithCompositeByCep() throws Exception {
// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Build the event stream
DataStream<SimpleEventTestPO> dataStream = env.fromElements(
new SimpleEventTestPO("fail", 2000L, 1),
new SimpleEventTestPO("register", 3000L, 1),
new SimpleEventTestPO("fail", 4000L, 2),
new SimpleEventTestPO("success", 5000L, 1),
new SimpleEventTestPO("fail", 6000L, 2),
new SimpleEventTestPO("success", 7000L, 2),
new SimpleEventTestPO("success", 8000L, 3)
);
// Key by uid
KeyedStream<SimpleEventTestPO, Integer> stream = getSteam(env, dataStream);
// Define the Pattern
Pattern<SimpleEventTestPO, SimpleEventTestPO> pattern =
Pattern
.<SimpleEventTestPO>begin("cep-login-first")
.where(new SimpleCondition<SimpleEventTestPO>() {
@Override
public boolean filter(SimpleEventTestPO value) throws Exception {
return value.getEname().equals("fail");
}
})
// Strict contiguity (consecutive events)
//.next("cep-login-second")
// Relaxed contiguity (non-consecutive events)
.followedBy("cep-login-second")
.where(new SimpleCondition<SimpleEventTestPO>() {
@Override
public boolean filter(SimpleEventTestPO value) throws Exception {
return value.getEname().equals("success");
}
})
.within(Time.seconds(10));
// Apply the Pattern to the event stream
PatternStream<SimpleEventTestPO> patternStream = CEP.pattern(stream, pattern);
// Extract matched events
patternStream.process(new PatternProcessFunction<SimpleEventTestPO, String>() {
@Override
public void processMatch(Map<String, List<SimpleEventTestPO>> map, Context context, Collector<String> collector) throws Exception {
// Extract the events
List<SimpleEventTestPO> first = map.get("cep-login-first");
List<SimpleEventTestPO> second = map.get("cep-login-second");
CepWarnTestPO warn1 = new CepWarnTestPO(first, "Failed login");
CepWarnTestPO warn2 = new CepWarnTestPO(second, "Successful login");
collector.collect(warn1.toString());
collector.collect(warn2.toString());
}
})
.print("warning");
env.execute();
}
@DisplayName("Load the Pattern from a Groovy script")
@Test
void testLoginFailByGroovy() throws Exception {
// Stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Build the event stream
DataStream<SimpleEventTestPO> dataStream = env.fromElements(
new SimpleEventTestPO("fail", 2000L, 1),
new SimpleEventTestPO("fail", 3000L, 1),
new SimpleEventTestPO("fail", 4000L, 2),
new SimpleEventTestPO("fail", 5000L, 1),
new SimpleEventTestPO("fail", 7000L, 2),
new SimpleEventTestPO("fail", 8000L, 2),
new SimpleEventTestPO("success", 6000L, 2)
);
// Key by uid
KeyedStream<SimpleEventTestPO, Integer> stream = getSteam(env, dataStream);
// Load the Pattern from the CepPatternTest Groovy script
Pattern<SimpleEventTestPO, SimpleEventTestPO> pattern = (Pattern<SimpleEventTestPO, SimpleEventTestPO>) GroovyUtil.groovyEval("CepPatternTest", "getPattern", null);
PatternStream<SimpleEventTestPO> patternStream = CEP.pattern(stream, pattern);
// Extract matched events
patternStream.process(
new PatternProcessFunction<SimpleEventTestPO, String>() {
@Override
public void processMatch(Map<String, List<SimpleEventTestPO>> map, Context context, Collector<String> collector) throws Exception {
List<SimpleEventTestPO> event = map.get("fail");
CepWarnTestPO warn = new CepWarnTestPO(event, "Pattern loaded from Groovy");
collector.collect(warn.toString());
}
})
.print("warning");
env.execute();
}
}
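The difference between the consecutive() test and the default looping behavior comes down to contiguity. The simplified plain-Java check below is hypothetical and ignores time windows and Flink's after-match skip strategies. It counts target events either strictly, where any other event resets the run (like consecutive() or next()), or relaxed, where other events are skipped (the default for times()), using user 1's events from testLoginFailConsecutiveByCep.

```java
import java.util.List;

public class ContiguityDemo {
    /**
     * Checks whether `times` events equal to `target` occur.
     * strict = true: they must be adjacent; any other event resets the count.
     * strict = false: other events may occur in between and are skipped.
     */
    public static boolean matches(List<String> events, String target, int times, boolean strict) {
        int count = 0;
        for (String event : events) {
            if (event.equals(target)) {
                count++;
                if (count == times) {
                    return true;
                }
            } else if (strict) {
                count = 0; // an unrelated event breaks a strictly contiguous run
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // User 1's events from testLoginFailConsecutiveByCep
        List<String> user1 = List.of("fail", "register", "fail", "success");
        System.out.println(matches(user1, "fail", 2, true));  // strict: the "register" event breaks the run
        System.out.println(matches(user1, "fail", 2, false)); // relaxed: "register" is skipped
    }
}
```

Under this simplification, user 1 produces no match with consecutive(), while user 2's back-to-back failures match either way.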