Flink指标采样计算
原创大约 17 分钟
Redis中不仅保存着Clickhouse计算出来的预聚合
数据,也保存Flink计算结果,例如,每5分钟统计一次近1小时的登录数据
。
由于整体实现较为复杂,此处无法完全展示,大致分为三个部分。
指标统计相关
:聚合计算框架、滑动窗口生成、执行聚合计算等类。
实体对象相关
KafkaMessagePO
package com.itechthink.risk.model;
/**
* Kafka消息数据PO对象
*
*/
public class KafkaMessagePO {
/**
* 事件id
*/
private String eid;
/**
* 事件类型
*/
private String etype;
/**
* 事件名称
*/
private String ename;
/**
* 事件接入来源
*/
private String esource;
/**
* 事件发生时间
*/
private String etime;
/**
* 用户id
*/
private Long userid;
/**
* 事件动作id
*/
private Integer aid;
/**
* 事件动作名称
*/
private String aname;
/**
* 事件目标id
*/
private Integer tid;
/**
* 事件目标名称
*/
private String tname;
/**
* 事件上下文
*/
private EventContext context;
/**
* kafka offset
*/
private long offset;
/**
* kafka partition
*/
private int partition;
}
ContextPO
package com.itechthink.risk.model;
/**
* 行为事件上下文PO对象
*
*/
public class ContextPO {
/**
* 设备PO
*/
private DevicePO device;
/**
* 用户信息PO
*/
private ProfilePO profile;
/**
* 商品信息PO
*/
private ProductPO product;
}
工具类相关
常量接口
package com.itechthink.risk.flink.utils;
/**
* 常量工具类
*
*/
public interface Constants {
/**
* Flink相关配置
*
*/
// 触发checkpoint时间间隔
public static final String FLINK_CHECKPOINT_INTERVAL = "flink.checkpoint.interval";
// checkpoint超时
public static final String FLINK_CHECKPOINT_TIMEOUT = "flink.checkpoint.timeout";
// checkpoint最小时间间隔
public static final String FLINK_CHECKPOINT_MINPAUSE = "flink.checkpoint.minPause";
// checkpoint允许失败次数
public static final String FLINK_CHECKPOINT_FAILURENUMBER = "flink.checkpoint.failureNumber";
// 同一时间checkpoint数量
public static final String FLINK_CHECKPOINT_MAXCONCURRENT = "flink.checkpoint.maxConcurrent";
// 并行度
public static final String FLINK_PARALLELISM = "flink.parallelism";
// 数据延迟的最大时间
public static final String FLINK_MAXOUTOFORDERNESS = "flink.maxOutOfOrderness";
// 根配置文件
public static final String FLINK_ROOT_FILE = "flink.properties";
// 不同环境配置文件
public static final String FLINK_ENV_FILE = "flink-%s.properties";
// groovy脚本路径
public static final String GROOVY_SCRIPTS = "groovy.scripts";
// 当前环境
public static final String FLINK_ENV_ACTIVE = "flink.env.active";
/**
* Kafka配置
*
*/
// brokers
public static final String KAFKA_BROKERS = "kafka.brokers";
// topic
public static final String KAFKA_TOPIC = "kafka.topic";
// group
public static final String KAFKA_GROUP = "kafka.group";
/**
* MySQL配置
*
*/
// url
public static final String MYSQL_URL = "mysql.url";
// username
public static final String MYSQL_USERNAME = "mysql.username";
// password
public static final String MYSQL_PASSWD = "mysql.password";
// port
public static final String MYSQL_PORT = "mysql.port";
// host
public static final String MYSQL_HOST = "mysql.host";
// driver
public static final String MYSQL_DRIVER = "mysql.driver";
// Flink CDC监听的表
public static final String FLINK_CDC_MYSQL_TABLE_LIST_1 = "flink.cdc.mysql.table.list.1";
// Flink CDC监听的表所在的库
public static final String FLINK_CDC_MYSQL_DATABASE = "flink.cdc.mysql.database";
/**
* 包完整路径
*
*/
// 累加器计算方法模型包路径
public static final String PACKAGE_AGGREGATE_ACC = "com.itechthink.risk.flink.job.aggregate.acc";
}
读取配置信息
package com.itechthink.risk.flink.utils;
import com.itechthink.risk.commons.exception.custom.FlinkPropertiesException;
import com.itechthink.risk.commons.exception.enums.FlinkPropertiesExceptionInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
/**
* 配置信息读取类
*
* 1. Flink读取参数的对象:
* a. Commons-cli: Apache提供的,需要引入依赖
* b. ParameterTool:Flink内置
* ParameterTool比Commons-cli 使用上简便
* ParameterTool能避免Jar包的依赖冲突
*
* 2. Flink读取对应环境的配置文件步骤:
* a. 获取环境配置变量
* b. 根据环境配置变量,读取对应的环境配置文件
*
* 3. ParameterTool获取参数的3种方式:
* a. fromPropertiesFile配置文件
* b. fromArgs程序启动参数
* - 或者 -- 开头 空格分隔,如:-name test --age 21
* c. fromSystemProperties系统环境变量,包括程序 -D启动的变量
* 内部调用的是Java提供的System.getProperties()
*
* 4. ParameterTool获取参数优先级
* 可通过mergeWith()设置优先级,但会覆盖前面的同名变量
*
* 5. ParameterTool是可序列化的,所以可以将它作为参数传递给函数,然后在函数内部使用ParameterTool获取参数变量
* 这样在Flink Job的任何地方都可以通过ParameterTool获取到配置值
*
* 6. 可以将ParameterTool注册为global变量:env.getConfig().setGlobalJobParameter()
* 这样,在上下文中就能获取ParameterTool
* (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
*
*/
public class ParameterUtil implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 默认配置文件
*
*/
private static final String DEFAULT_CONFIG = Constants.FLINK_ROOT_FILE;
/**
* 带环境配置文件
*
*/
private static final String FLINK_ENV_FILE = Constants.FLINK_ENV_FILE;
/**
* 环境变量
*
*/
private static final String ENV_ACTIVE = Constants.FLINK_ENV_ACTIVE;
/**
* 配置文件 + 启动参数 + 系统环境变量生成ParameterTool
*
*/
public static ParameterTool getParameters(final String[] args) {
/*
* 7. Java读取资源的方式:
* a. Class.getResourceAsStream(Path): Path必须以 “/”,表示从ClassPath的根路径读取资源
* b. Class.getClassLoader().getResourceAsStream(Path):Path无须以 “/”,默认从ClassPath的根路径读取资源
* 推荐使用第2种,也就是类加载器的方式获取静态资源文件,
*
*/
InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
try {
// 读取根配置文件
ParameterTool defaultPropertiesFile = ParameterTool.fromPropertiesFile(inputStream);
// 获取环境参数
String envActive = getEnvActiveValue(defaultPropertiesFile);
// 读取真正的配置环境 (推荐使用 Thread.currentThread() 读取配置文件)
return ParameterTool
// ParameterTool读取变量优先级 系统环境变量>启动参数变量>配置文件变量
// 从配置文件获取配置
.fromPropertiesFile(
// 当前线程
Thread.currentThread()
// 返回该线程的上下文信息, 获取类加载器
.getContextClassLoader()
.getResourceAsStream(envActive))
// 从启动参数中获取配置
.mergeWith(ParameterTool.fromArgs(args))
// 从系统环境变量获取配置
.mergeWith(ParameterTool.fromSystemProperties());
} catch (IOException e) {
throw new RuntimeException("");
}
}
/**
* 配置文件 + 系统环境变量生成ParameterTool
*
*/
public static ParameterTool getParameters() {
InputStream inputStream = ParameterUtil.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG);
try {
ParameterTool defaultPropertiesFile = ParameterTool.fromPropertiesFile(inputStream);
// 获取环境参数
String envActive = getEnvActiveValue(defaultPropertiesFile);
// 读取真正的配置环境(推荐使用Thread.currentThread()读取配置文件)
return ParameterTool
// ParameterTool读取变量优先级:系统环境变量 > 启动参数变量 > 配置文件变量
// 从配置文件获取配置
.fromPropertiesFile(
// 当前线程
Thread.currentThread()
// 返回该线程的上下文信息, 获取类加载器
.getContextClassLoader()
.getResourceAsStream(envActive))
// 从系统环境变量获取配置
.mergeWith(ParameterTool.fromSystemProperties());
} catch (Exception e) {
throw new FlinkPropertiesException(FlinkPropertiesExceptionInfo.PROPERTIES_NULL);
}
}
/**
* 获取环境配置变量
*
*/
private static String getEnvActiveValue(ParameterTool defaultPropertiesFile) {
// 选择参数环境
String envActive = null;
if (defaultPropertiesFile.has(ENV_ACTIVE)) {
envActive = String.format(FLINK_ENV_FILE, defaultPropertiesFile.get(ENV_ACTIVE));
}
return envActive;
}
/**
* 从配置文件参数配置流式计算的上下文环境
*
*/
public static void envWithConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) {
/*
* 1. checkpoint时间不要设置太短,这里的时间包括了超时时间
* 2. 如果设置了周期性checkpoint,那么上一个周期的checkpoint没完成时,下一个周期的checkpoint是不会开始的
* 3. 若checkpoint的持续时间超过了超时时间会出现排队,过多的checkpoint排队会耗费资源
* 4. 为了解决checkpoint排队堆积,需要优化checkpoint的完成效率
*
*/
// 每60秒触发checkpoint
env.enableCheckpointing(parameterTool.getInt(Constants.FLINK_CHECKPOINT_INTERVAL));
CheckpointConfig ck = env.getCheckpointConfig();
// checkpoint必须在60秒内结束,否则被丢弃
ck.setCheckpointTimeout(parameterTool.getInt(Constants.FLINK_CHECKPOINT_TIMEOUT));
//checkpoint间最小间隔 30秒 (指定了这个值, setMaxConcurrentCheckpoints自动默认为1)
ck.setMinPauseBetweenCheckpoints(parameterTool.getInt(Constants.FLINK_CHECKPOINT_MINPAUSE));
// checkpoint语义设置为精确一次(EXACTLY_ONCE)
ck.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最多允许checkpoint失败3次
ck.setTolerableCheckpointFailureNumber(parameterTool.getInt(Constants.FLINK_CHECKPOINT_FAILURENUMBER));
// 同一时间只允许一个 checkpoint 进行
ck.setMaxConcurrentCheckpoints(parameterTool.getInt(Constants.FLINK_CHECKPOINT_MAXCONCURRENT));
// 设置State存储
env.setStateBackend(new HashMapStateBackend());
// 并行度设置
env.setParallelism(parameterTool.getInt(Constants.FLINK_PARALLELISM));
}
}
Kafka反序列化
/**
* 自定义Kafka反序列化类
* 1. 若使用setValueOnlyDeserializer(),那么方法内的参数必须是实现DeserializationSchema接口的对象
* 2. 若使用setDeserializer(),那么方法内的参数必须是实现KafkaDeserializationSchema接口的对象
*
*/
public class RiskDeserializationSchema implements KafkaDeserializationSchema<KafkaMessagePO> {
private static final String ENCODEING = "UTF8";
/**
* 判断当前位置是否到达数据流的末尾
*
*/
@Override
public boolean isEndOfStream(KafkaMessagePO o) {
return false;
}
/**
* 自定义反序列化的主要逻辑
*/
@Override
public KafkaMessagePO deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
KafkaMessagePO kafkaMessagePO = null;
if (consumerRecord != null) {
String value = new String(consumerRecord.value(), ENCODEING);
long offset = consumerRecord.offset();
int partition = consumerRecord.partition();
kafkaMessagePO = JsonUtil.jsonStr2Obj(value, KafkaMessagePO.class);
// 携带上offset和partition
kafkaMessagePO.setOffset(offset);
kafkaMessagePO.setPartition(partition);
}
return kafkaMessagePO;
}
/**
* 指定反序列之后的数据类型
*
*/
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(KafkaMessagePO.class);
}
}
Kafka消息转换
package com.itechthink.risk.flink.job.map;
import com.itechthink.risk.model.EventPO;
import com.itechthink.risk.model.KafkaMessagePO;
import org.apache.flink.api.common.functions.MapFunction;
/**
* 从Kafka JSON消息中提取项目需要的字段信息,即转换为EventPO
*
*/
public class KafkaETL implements MapFunction<KafkaMessagePO, EventPO> {
@Override
public EventPO map(KafkaMessagePO kafkaMessagePO) throws Exception {
EventPO eventPo = new EventPO(
kafkaMessagePO.getUserid(),
kafkaMessagePO.getEtype(),
kafkaMessagePO.getEname(),
kafkaMessagePO.getEtime(),
kafkaMessagePO.getTname(),
kafkaMessagePO.getContext()
);
// 携带上Kafka消息的offset和partition信息
eventPo.setOffset(kafkaMessagePO.getOffset());
eventPo.setPartition(kafkaMessagePO.getPartition());
return eventPo;
}
}
Kafka读写工具
package com.itechthink.risk.flink.utils;
import com.itechthink.risk.flink.job.map.KafkaETL;
import com.itechthink.risk.flink.kafka.RiskDeserializationSchema;
import com.itechthink.risk.model.EventPO;
import com.itechthink.risk.model.KafkaMessagePO;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Flink读写Kafka工具类
*
*/
public class KafkaUtil {
// 流式计算上下文环境
public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private static KafkaSource<KafkaMessagePO> KAFKA_SOURCE = null;
private static ParameterTool parameterTool = null;
/**
* 重载initEnv方法
*
*/
private static void initEnv(String[] args) {
// ParameterTool注册为global
parameterTool = ParameterUtil.getParameters(args);
env.getConfig().setGlobalJobParameters(parameterTool);
// 配置上下文环境
ParameterUtil.envWithConfig(env, parameterTool);
}
/**
* 重载initEnv方法
*
*/
private static void initEnv() {
// ParameterTool注册为global
env.getConfig().setGlobalJobParameters(parameterTool);
// 配置上下文环境
ParameterUtil.envWithConfig(env, parameterTool);
}
/**
* KafkaSource
*
*/
private static void kafkaSourceBuilder() {
String brokers = parameterTool.get(Constants.KAFKA_BROKERS);
String topic = parameterTool.get(Constants.KAFKA_TOPIC);
String group = parameterTool.get(Constants.KAFKA_GROUP);
/*
* 1.
* a. Flink-Kafka-Connector基于Flink CheckPoint的容错机制
* b. Flink-Kafka-Connector提供了Flink到Kafka的端到端的精确一次的语义(Exactly-Once)
*
* Flink的精确一次的语义(Exactly-Once):
* 即使在运行的过程了发生了故障,数据都不会丢失,数据也不会被重复处理
* Flink 的精确一次的语义只适用于Flink内部的数据流转
*
* 2.
* Flink 1.14 之前Flink-Kafka-Connector使用的主要是FlinkKafkaConsumer类
* Flink 1.15 之后就去掉了FlinkKafkaConsumer类
*
*/
KAFKA_SOURCE = KafkaSource.<KafkaMessagePO>builder()
// bootstrap
.setBootstrapServers(brokers)
// 主题
.setTopics(topic)
// groupid
.setGroupId(group)
// 偏移量
.setStartingOffsets(OffsetsInitializer.earliest())
/*
* 3. Kafka存放的数据是二进制,读取Kafka的数据需要反序列化
*
* 4. kafkaSourceBuilder对象提供了2个反序列化的方法:
* a. setDeserializer(): 完整的反序列化ConsumerRecord对象
* b. 反序列化ConsumerRecord对象的value值
*
* 5. SimpleStringScheme类实现了DeserializationSchema接口,它只是将反序列后的数据转化为DataStream<String>
*
*/
.setDeserializer(KafkaRecordDeserializationSchema.of(new RiskDeserializationSchema()))
//.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
}
/**
* 添加Source
*
*/
private static DataStream<EventPO> makeEventStream() {
/*
* 6. addSource和fromSource的区别是:
* addSource需要传入的参数是SourceFunction对象,对组件的连接逻辑需要自己实现
* fromSource需要传入的参数是Source对象,Flink已经封装好了简易的连接逻辑
*
*/
return env
.fromSource(
KAFKA_SOURCE,
WatermarkStrategy.noWatermarks(),
"Kafka Source")
// 将Kafka消息数据转换为用户事件行为POJO对象
.map(new KafkaETL());
}
/**
* 生成行为事件流(重载read方法)
*
*/
public static DataStream<EventPO> read(String[] args) {
// 初始化流式计算上下文环境
initEnv(args);
// 初始化KafkaSource
kafkaSourceBuilder();
// 返回类型为EventPO的事件流
return makeEventStream();
}
/**
* 生成行为事件流(重载read方法)
*
*/
public static DataStream<EventPO> read(ParameterTool parameter) {
// 初始化流式计算上下文环境
parameterTool = parameter;
initEnv();
// 初始化KafkaSource
kafkaSourceBuilder();
// 返回类型为EventPO的事件流
return makeEventStream();
}
}
组装Redis Key
package com.itechthink.risk.flink.job.map;
import com.itechthink.risk.flink.utils.RedisKeyUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* 组装Redis Key:将输出类型转换为Tuple2<String,String>
*
*/
public class MetricMapForRedisKey<T> implements MapFunction<T, Tuple2<String, String>> {
@Override
public Tuple2<String, String> map(T in) throws Exception {
String key = RedisKeyUtil.redisKeyFormatForMetric(String.valueOf(in));
Tuple2<String, String> tuple2 = new Tuple2<>();
// tuple2.f0 = key;
// tuple2.f1 = String.valueOf(in.f1);
return tuple2;
}
}
package com.itechthink.risk.flink.utils;
/**
* Redis Key组装
*
*/
public class RedisKeyUtil {
public static String redisKeyFormatForMetric(String keyby) {
// Redis Key = 指标id:分组:指标维度:计算方式:时间戳
String key = "1:" + keyby + ":1ml:sum:1704074160000";
// Redis Key键名大写
return key.toUpperCase();
}
}
指标统计相关
聚合计算框架
package com.itechthink.risk.flink.job.task;
import com.itechthink.risk.flink.job.map.MetricMapForRedisKey;
import com.itechthink.risk.flink.utils.KafkaUtil;
import com.itechthink.risk.flink.utils.ParameterUtil;
import com.itechthink.risk.flink.utils.RedisWriteUtil;
import com.itechthink.risk.model.EventPO;
import com.imooc.RiskCtrlSys.utils.date.DateUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.time.LocalDateTime;
/**
* 聚合计算框架类
* 1. Flink消费Kafka,并将Kafka中的JSON数据转换为POJO对象
* 3. 用POJO对象的时间戳生成水印watermark
* 5. 通过keyBy()对事件数据进行分流
* 6. 通过窗口函数进行聚合计算
* 7. 通过State保存中间结果
*
*/
public class MetricLoginAction {
public static void main(String[] args) throws Exception {
// 流计算上下文环境
StreamExecutionEnvironment env = KafkaUtil.env;
// 多久执行一次Checkpoint
env.enableCheckpointing(60000);
// 每次Checkpoint要在1分钟内完成,否则就丢弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Checkpoint的语义设置为精确一次(EXACTLY_ONCE)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint允许失败的次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 同一时间只有1个Checkpoint在运行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
/*
* Flink 1.12之前,设置流计算的时间类型是通过以下方法
* setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
* Flink 1.12之后
* setStreamTimeCharacteristic()被抛弃
* 流计算的时间类型默认为事件时间类型(EventTime)
*
* 1. Checkpoint机制开启之后,State会随着Checkpoint作持久化的存储
* State的存储位置通过StateBackend决定
*
* 2. StateBackend定义状态是如何存储的
* MemoryStateBackend 将工作状态(Task State)存储在TaskManager,将检查点(Job State)存储在JobManager
* FsStateBackend 将工作状态(Task State)存储在TaskManager,将检查点(Job State)存储在文件系统中
* RocksDBStateBackend 将工作状态(Task State存储在RocksDB中,将检查点存储在文件系统中(类似FsStateBackend)
*
* 3. Flink内置了以下这些开箱即用的StateBackend:
* HashMapStateBackend(默认,MemoryStateBackend, FsStateBackend的实现)
* EmbeddedRocksDBStateBackend(RocksDBStateBackend的实现)
*
*/
env.setStateBackend(new HashMapStateBackend());
/*
* 4. Flink支持不同级别的并行度设置
* 1). 配置文件flink-conf.yaml
* 2). env
* 3). 算子级别:每个算子都可以单独设置并行度, 1个job可以包含不同的并行度
* 4). Client级别:任务提交的命令行进行设置
* 不同级别设置的并行度的优先级
* 算子级别 > env级别 > Client级别 > 配置文件级别
*
* 5. 并行度的数值是不是越大越好?
* 并行度的数值不是越大越好,要合理
* 1). Job会划分为多个Task
* 这些Task会被分配到TaskManager运行
* TaskManager运行Task是每个Task,单独一个线程
* 会造成线程切换的开销, 引起效率的低下
*
* 2). 并行度和TaskSlot有关,TaskSlot是逻辑概念,并不是真正的物理存在
* 假设一台TaskManager设置了3个TaskSlot
* 每个TaskSlot在同一时间可以处理一个Task
* 一台TaskManager在同一时间可以处理3个Task
* TaskSlot对不同task的处理提供了资源隔离(内存的隔离)
*
* 3). TaskSlot数量在哪里设置?
* a. flink-conf.yaml
* b. 提交任务的命令行进行设置
*
* 4). TaskManager的TaskSlot的数量决定了TaskManager的并行度
*
* 假如有3台TaskManager
* 每台的TaskManager设置3个TaskSlot
* 3台TaskManager一共有3*3=9个TaskSlot
* 3台TaskManager在同一时间可以处理9个Task
* 如果并行度设置为1
* 只用了一个TaskSlot,有8个TaskSlot空闲
*
*/
// 3台TaskManager,每台的TaskManager设置3个TaskSlot
// 合理的并行度应该是6~9
env.setParallelism(6);
// 将ParameterTool注册为global
ParameterTool parameterTool = ParameterUtil.getParameters(args);
env.getConfig().setGlobalJobParameters(parameterTool);
// 加载Kafka数据源
DataStream<EventPO> eventStream = KafkaUtil.read(parameterTool);
/*
* 6. 使用的流计算的时间类型是事件时间类型(EventTime)
* Flink必须要知道事件的时间戳字段
* 这就要求事件流必须带有时间戳字段
* Flink通过TimestampAssigner API来提取事件流携带的时间字段
*
*/
// 提取EventPO对象的时间字段
SerializableTimestampAssigner<EventPO> serializableTimestampAssigner = new SerializableTimestampAssigner<EventPO>() {
@Override
public long extractTimestamp(EventPO eventPO, long l) {
// 提取时间字段, 并转换时间戳,时间戳是毫秒
LocalDateTime localDateTime = DateUtil.convertStr2LocalDateTime(eventPO.getEtime());
return DateUtil.convertLocalDateTime2Timestamp(localDateTime);
}
};
// 生成水印
// 数据延迟的最大时间10秒(要转换为毫秒)
long maxOutOfOrderness = 10 * 1000L;
/*
* Flink 1.12之后建议使用assignTimestampsAndWatermarks(WatermarkStrategy)的方式生成watermark
* WatermarkStrategy需要含有TimestampAssigner 和 WatermarkGenerator
*
*/
DataStream<EventPO> watermarks = eventStream.assignTimestampsAndWatermarks(WatermarkStrategy
/*
* 7. watermark策略
* 固定乱序长度策略(forBoundedOutOfOrderness)
* 单调递增策略(forMonotonousTimestamps)
* 不生成策略(noWatermarks)
*
* forBoundedOutOfOrderness周期性生成水印,可更好处理延迟数据
*
*/
// 设置WaterMark的生成策略
.<EventPO>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
// 将事件数据的事件时间提取出来
.withTimestampAssigner(serializableTimestampAssigner)
);
// Flink每5分钟计算用户最近1小时的登录次数
DataStream<Tuple2<Integer, Integer>> ac = LoginFreqHourTask.process(watermarks);
/*
* 写入Redis
* 因为这里只计算了近1小时的登录次数,如果需要近2小时的登录次数呢?近3小时的呢?
* 所以需要保存计算结果,将2个最近1小时的登录次数相加,就可以得到近2小时的登录次数,依此类推
* 要做到这一步关键的就是Redis key的设计,这个key必须能够快速的定位到“最近1小时”的登录次数
* 所以Redis key的格式为:指标id + 分组 + 指标维度 + 计算方式 + 时间戳
*
*/
// 组装Redis Key
DataStream<Tuple2<String, String>> redisKeyStream = ac.map(new MetricMapForRedisKey<Tuple2<Integer, Integer>>());
// 写入Redis
RedisWriteUtil.writeByBahirWithString(redisKeyStream);
env.execute();
}
}
滑动窗口生成
生成用于聚合计算的滑动窗口,可以通过滑动窗口灵活定义窗口大小和步长。
这种将窗口生成
与执行计算
的职责相分离的做法,也符合单一职责原则。
package com.itechthink.risk.flink.job.task;
import com.itechthink.risk.flink.utils.EventConstantUtil;
import com.itechthink.risk.model.EventPO;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* 生成聚合计算的滑动窗口
* 1. 过滤出登录的行为数据(filter)
* 2. 根据用户id进行分组统计(keyBy)
* 3. 设置窗口的类型,大小,步长值
* 4. 对窗口的数据进行聚合计算
*
*/
public class LoginFreqHourTask {
public static DataStream<Tuple2<Integer, Integer>> process(DataStream<EventPO> input) {
/*
* 过滤出登录行为数据
* 窗口类型:滑动窗口
* 窗口大小:1小时
* 滑动步长:5分钟,[8:00 - 9:00), [8:05 - 9:05) .... 登录次数
* 窗口类型: 滑动窗口
*
*/
DataStream<EventPO> filterDataStream = input.filter(new FilterFunction<EventPO>() {
@Override
public boolean filter(EventPO eventPO) throws Exception {
if (eventPO.getEname().equals(EventConstantUtil.LOGIN_SUCCESS)) {
return true;
}
return false;
}
});
// 按用户id分组
KeyedStream<EventPO, Integer> keyedStream = filterDataStream.keyBy(new KeySelector<EventPO, Integer>() {
@Override
public Integer getKey(EventPO eventPO) throws Exception {
return eventPO.getUserid();
}
});
/*
* Flink窗口聚合函数分为2类:
* a. 增量聚合函数:ReduceFunction, AggregateFunction
* b. 全量聚合函数: ProcessWindowFunction, sum, max, min
* 增量聚合函数性能高,不需要缓存数据,基于中间状态进行计算
* 全量聚合函数性能低,需要缓存数据,基于进入窗口的全部数据进行计算
* 例如对窗口数据进行排序取TopN就需要用到ProcessWindowFunction
* ProcessWindowFunction再结合ReduceFunction或者AggregateFunction
* 作增量计算
*
*/
// 窗口聚合计算
// 生成滑动窗口, 大小(1小时), 步长(5分钟)
SingleOutputStreamOperator<Tuple2<Integer, Integer>> ac = keyedStream
.window(SlidingEventTimeWindows.of(
// Time导入的包是 org.apache.flink.streaming.api.windowing.time
Time.seconds(3600),
Time.seconds(300))
)
.aggregate(new LoginFreqHourAggFunction());
// 返回Tuple2<用户id, 最近1小时内的登录次数>
return ac;
}
}
聚合计算执行
真正实现计算用户登录频率
的聚合计算类,它与滑动窗口无关,只管按照需求做计算。
package com.itechthink.risk.flink.job.task;
import com.itechthink.risk.model.EventPO;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
/**
* 真正实现计算用户登录频率的聚合计算类
* AggregateFunction需要指定3个泛型:<In, ACC, OUT>
* In: 输入数据类型
* ACC: 累加器类型
* OUT: 输出结果类型
*
*/
public class LoginFreqHourAggFunction implements AggregateFunction<EventPO, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
/**
* 创建累加器,并赋予初始值
* 累加器数据类型Tuple2:f0是uid, f1登录次数
*
* 同时,在这里读取Redis中存储的指标采样数据,并作为初始值
* 这里其实可以用Tuple5<Integer, Integer, Integer, Integer, Integer>
* 1). 第一个参数是指标id
* 2). 第二个参数是分组
* 3). 第三个参数是指标维度
* 4). 第四个参数是计算方式
* 5). 第五个参数是时间戳
* 其实这里还需要将窗口大小和步长传进来,方便后续计算
*/
@Override
public Tuple2<Integer, Integer> createAccumulator() {
/*
* 拼装key值:指标id:分组:指标维度:计算方式:时间戳,例如:"1:" + f0 + ":1ml:sum:1704074160000";
* 通过当前时间计算出需要读取Redis中的哪几个key,例如,当前时间是2024-01-01 10:00:00,而统计近1小时的用户登录数据,就可以这样做:
* 1. 先判断之前有没有保存过近1小时的登录数据
* 2. 再读取指标采样数据
* 1). 2024-01-01 10:00:00 - 1 * 3600 = 2024-01-01 09:00:00 其时间戳==> 1704070800000
* 2). 将此时间戳按分钟循环累加60次,将每次的累加结果保存在数组中,例如
* timestamp[0] = 1704070800000
* timestamp[1] = 1704070860000
* timestamp[2] = 1704070920000
* ...
* timestamp[59] = 1704074340000
*
* 3). 循环拼装key,例如:"1:" + f0 + ":1ml:sum:" + timestamp[i]
* 4). 通过这个key循环从Redis中读取数据,并进行累加
* 5). 将累加值n赋给初始值,return new Tuple2<>(0, n)
*/
return new Tuple2<>(0, 0);
}
/**
* 将输入数据添加到累加器,并返回更新后的累加器
*/
@Override
public Tuple2<Integer, Integer> add(EventPO input, Tuple2<Integer, Integer> acc) {
acc.f0 = input.getUserid();
acc.f1 += 1;
return acc;
}
/**
* 从累加器里提取聚合结果
*
*/
@Override
public Tuple2<Integer, Integer> getResult(Tuple2<Integer, Integer> acc) {
return acc;
}
/**
* 将两个累加器合并为一个新的累加器
*
*/
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) {
// 不实现
return null;
}
}
只要熟悉Flink并明白了指标采样的算法机制,上面的代码都很好实现。
感谢支持
更多内容,请移步《超级个体》。