Session会话和Global全局窗口
原创大约 7 分钟
SessionWindow(会话窗口)
会话窗口按活动会话对对窗口进行区分。这种窗口和前面两种都不同,它既没有固定大小,也没有固定的开始时间和结束时间,它也不可能重叠,它完全是按照计算任务的活动状态来创建窗口的。
例如,如果某个计算任务量较大,那么它会持续通过会话窗口计算元素,直到任务结束;而如果暂时没有计算任务,那么一段时间之后窗口就会关闭。
这和浏览器或其他客户端上的用户会话机制是一样的。

会话窗口的代码模式如下。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// 使用静态间隔的 ProcessingTime 会话窗口
source
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 使用动态间隔的 ProcessingTime 会话窗口
source
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// 使用静态间隔的 EventTime 会话窗口
source
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 使用动态间隔的 EventTime 会话窗口
source
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
ProcessingTime窗口
使用SessionProcessingTimeWindowsJob()
类实现单词统计,代码如下。
package itechthink.window.session;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* SessionProcessingTimeWindows 实现单词统计
* 输入数据格式:单词,次数
*
*/
public class SessionProcessingTimeWindowsJob {
public static void main(String[] args) {
// 1. 准备环境 - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// windowGap(environment);
windowDynamicGap(environment);
// 5. 启动任务 - Execute
try {
environment.execute("SessionProcessingTimeWindowsJob");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 使用静态间隔
*
*/
public static void windowGap(StreamExecutionEnvironment environment) {
// 2. 读取数据 - Source
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// 3. 处理数据 - Transform
source.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] split = input.split(",");
return Tuple2.of(split[0].trim(), Integer.parseInt(split[1].trim()));
}
})
// 按key分组
.keyBy(x -> x.f0)
// 静态会话窗口:每10秒计算一次
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
/*
* 输入:
* a,1
* a,1
* a,1
* 10秒后输出:
* (a,3)
*/
// 增量聚合方法
.sum(1)
// 4. 输出结果 - Sink
.print().setParallelism(1);
}
/**
* 使用动态间隔
*
*/
public static void windowDynamicGap(StreamExecutionEnvironment environment) {
// 2. 读取数据 - Source
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// 3. 处理数据 - Transform
source.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] split = input.split(",");
return Tuple2.of(split[0].trim(), Integer.parseInt(split[1].trim()));
}
})
// 按key分组
.keyBy(x -> x.f0)
// 动态会话窗口:动态计算空闲
.window(ProcessingTimeSessionWindows.withDynamicGap(
new SessionWindowTimeGapExtractor<Tuple2<String, Integer>>() {
@Override
public long extract(Tuple2<String, Integer> input) {
// 根据次数计算间隔(以最后一条输入的数据为准)
return input.f1 * 1000 * 2;
}
}
))
/*
* 输入:
* a,1
* a,2
* a,3
* a,2
* a,1
* 6秒后输出:因为Flink会先按照时间字段排序,找出最大的事件时间,然后×2
* (a,9)
*/
.sum(1)
// 4. 输出结果 - Sink
.print().setParallelism(1);
}
}
EventTime窗口
SessionEventTimeWindowsJob()
类代码如下。
package itechthink.window.session;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* SessionEventTimeWindows + Watermark
* 输入数据格式:("key", 时间戳)
*
*/
public class SessionEventTimeWindowsJob {
private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// 1. 准备环境 - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
// sessionGap(environment);
sessionDynamicGap(environment);
// 5. 启动任务 - Execute
try {
environment.execute("SessionEventTimeWindowsJob");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 静态间隔,不允许数据延迟
*
*/
private static void sessionGap(StreamExecutionEnvironment environment) {
// 2. 读取数据 - Source
DataStreamSource<Tuple2<String, Integer>> source = environment.fromElements(
// 事件1(Session1)
Tuple2.of("key", 2000),
// 事件2(Session2)
Tuple2.of("key", 9000),
// 事件3(Session1)
Tuple2.of("key", 3000),
// 事件4(Session1)
Tuple2.of("key", 4000)
);
// 3. 处理数据 - Transform
source.assignTimestampsAndWatermarks(
/*
* Duration.ZERO:不允许数据延迟
*/
WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
// 抽取时间戳字段
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1.longValue();
}
})
)
.keyBy(x -> x.f0)
/*
* 当EventTimeSessionWindows.withGap(Time.seconds(3))时,结果是:
* time window ==> 1970-01-01 08:00:02 ~ 1970-01-01 08:00:07
* 9000
* time window ==> 1970-01-01 08:00:09 ~ 1970-01-01 08:00:12
* 9000
* 之所以被分为两个Session,是因为:
* 1. Flink会先按照时间戳对输入数据进行排序:2000, 3000, 4000, 9000
* 2. 然后会计算数据两两之间的时间间隔是否大于设定的Gap(3秒)
* 3. 如果满足条件,则认为这两个数据属于同一个Session,否则属于不同的Session
* 4. 例如:4000 + 3000(gap间隔时间) < 9000,所以 <2000, 3000, 4000> 会被分配到同一个Session中,而 <9000> 则被单独分到另一个Session
* 5. Session1窗口范围是[08:00:02 ~ 08:00:07),因为:02的时间戳=2000,07的时间戳=4000+3000(gap间隔时间)
* 6. Session2窗口范围是[08:00:09 ~ 08:00:12),因为:09的时间戳=9000,12的时间戳=9000+3000(gap间隔时间)
*
* 当EventTimeSessionWindows.withGap(Time.seconds(5))时,结果是:
* time window ==> 1970-01-01 08:00:02 ~ 1970-01-01 08:00:14
* 18000
* 数据生成的机制和上面一样
*/
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
@Override
public void process(String input,
ProcessWindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>.Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<Integer> collector) throws Exception {
int count = 0;
for (Tuple2<String, Integer> t : iterable) {
count += t.f1;
}
System.out.println("time window ==> " + DATE_FORMAT.format(context.window().getStart()) + " ~ " + DATE_FORMAT.format(context.window().getEnd()));
collector.collect(count);
System.out.println();
}
})
// 4. 输出数据 - Sink
.print().setParallelism(1);
}
/**
* 动态间隔,不允许数据延迟
*
*/
private static void sessionDynamicGap(StreamExecutionEnvironment environment) {
DataStreamSource<Tuple2<String, Integer>> source = environment.fromElements(
// 事件1(Session1)
Tuple2.of("key", 2000),
// 事件2(Session2)
Tuple2.of("key", 9000),
// 事件3(Session1)
Tuple2.of("key", 3000),
// 事件4(Session1)
Tuple2.of("key", 4000)
);
// 3. 处理数据 - Transform
source.assignTimestampsAndWatermarks(
/*
* Duration.ZERO:不允许数据延迟
*/
WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
// 抽取时间戳字段
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1.longValue();
}
})
)
.keyBy(x -> x.f0)
/*
* 当输入数据为<key, 2000>、<key, 3000>、<key, 4000>、<key, 9000>时,结果是:
* time window ==> 1970-01-01 08:00:02 ~ 1970-01-01 08:00:08
* 9000
* time window ==> 1970-01-01 08:00:09 ~ 1970-01-01 08:00:18
* 9000
* 原因是:
* 1. Flink会先按照时间戳对输入数据进行排序:2000, 3000, 4000, 9000
* 2. 由于是动态的Gap,所以Gap = 输入数据的时间戳 + 数据的时间戳间隔
* 2000:时间戳为2000,所以和下一个Session之间的Gap = 2000 + 2000 = 4000
* 3000:时间戳为3000,所以和下一个Session之间的Gap = 3000 + 3000 = 6000
* 4000:时间戳为4000,所以和下一个Session之间的Gap = 4000 + 4000 = 8000
* 9000:时间戳为9000,所以和下一个Session之间的Gap = 9000 + 9000 = 18000
* 3. (2000 + 2000)4000 > 3000,所以 3000 被分配到 <2000> 所在的Session中
* (3000 + 3000)6000 > 4000,所以 4000 被分配到 <2000, 3000> 所在的Session中
* (4000 + 4000)8000 < 9000,所以 9000 被单独分配另一个Session中
* 4. Session1窗口范围是[08:00:02 ~ 08:00:08),因为:02的时间戳=2000,08的时间戳=4000 + 4000(也就是时间戳最大的那条记录的时间戳的两倍)
* 5. Session2窗口范围是[08:00:09 ~ 08:00:18),因为:09的时间戳=9000,18的时间戳=9000 + 9000(也就是时间戳最大的那条记录的时间戳的两倍)
*/
.window(EventTimeSessionWindows.withDynamicGap(
new SessionWindowTimeGapExtractor<Tuple2<String, Integer>>() {
@Override
public long extract(Tuple2<String, Integer> input) {
// 根据次数计算间隔(以最后一条输入的数据为准)
return input.f1;
}
}
)
)
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>() {
@Override
public void process(String input,
ProcessWindowFunction<Tuple2<String, Integer>, Integer, String, TimeWindow>.Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<Integer> collector) throws Exception {
int count = 0;
for (Tuple2<String, Integer> t : iterable) {
count += t.f1;
}
System.out.println("time window ==> " + DATE_FORMAT.format(context.window().getStart()) + " ~ " + DATE_FORMAT.format(context.window().getEnd()));
collector.collect(count);
System.out.println();
}
})
// 4. 输出数据 - Sink
.print().setParallelism(1);
}
}
sessionGap()
和sessionDynamicGap()
的执行过程可以用两张图来表示。
- sessionGap()执行过程

- sessionDynamicGap()执行过程

GlobalWindow(全局窗口)

当去掉了window()
方法中所有关于窗口时间以及会话机制的修饰以后,它就变成全局窗口了。
全局窗口的代码模式如下。
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
source
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
感谢支持
更多内容,请移步《超级个体》。