SlidingWindow (Sliding Window)
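Besides a fixed size, a sliding window also has a slide: the interval at which the window moves forward. For example, to count user sign-ups over 10 minutes, computed every 2 minutes, Flink calculates the number of sign-ups in the past 10 minutes once every 2 minutes.
The window advances in small steps, which is where the name "sliding window" comes from.
Whenever the slide is smaller than the window size, consecutive windows overlap in time, so one element belongs to several windows: size / slide of them when the size is a multiple of the slide (5 windows in the example above).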
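The overlap can be made concrete with a few lines of code. The sketch below is a plain-Java model, not Flink code: `SlidingWindowSketch` and `windowsFor()` are hypothetical names, and it assumes window starts are aligned to multiples of the slide with no offset. For the sign-up example (10-minute windows, 2-minute slide), a sign-up at minute 7 is counted in five windows.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not part of Flink: lists the sliding windows [start, end)
 * that contain a timestamp, assuming starts aligned to multiples of the slide.
 */
public class SlidingWindowSketch {

    /** Returns {start, end} pairs, latest window first, for every window containing ts. */
    public static List<long[]> windowsFor(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing ts: ts rounded down to a multiple of slide
        long lastStart = ts - Math.floorMod(ts, slide);
        // Step back one slide at a time while the window still covers ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 10-minute window, 2-minute slide: a sign-up at minute 7
        for (long[] w : windowsFor(7 * minute, 10 * minute, 2 * minute)) {
            System.out.println("[" + w[0] / minute + " min, " + w[1] / minute + " min)");
        }
    }
}
```

Running `main()` prints the five windows from `[6 min, 16 min)` back to `[-2 min, 8 min)`. Windows with negative starts are normal; they reappear later in this article as windows starting at 1970-01-01 07:59:50.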

The code pattern for sliding windows is as follows.
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// Sliding ProcessingTime windows
source
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// Sliding EventTime windows
source
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
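// Sliding ProcessingTime windows: 1-day size, 12-hour slide, offset -8 hours (window starts fall on 00:00/12:00 in UTC+8)
// Flink rejects an offset whose absolute value is not smaller than the slide, e.g. Time.hours(-8) with a 1-hour slide
source
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.days(1), Time.hours(12), Time.hours(-8)))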
.<windowed transformation>(<window function>);
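The offset only shifts where windows start. The sketch below is a plain-Java model, not the Flink API: `WindowOffsetSketch`, `lastWindowStart()` and `checkParams()` are hypothetical names, and both the start formula (round the timestamp down to the grid `offset + k * slide`) and the `abs(offset) < slide` check are my reading of what Flink's sliding assigners do. The event time 2024-01-01T03:00:00Z (11:00 in UTC+8) is an assumed example value.

```java
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical model of sliding-window start alignment with an offset (not Flink code). */
public class WindowOffsetSketch {

    /** Rejects parameters as Flink's sliding assigners are understood to: abs(offset) < slide, size > 0. */
    public static void checkParams(long size, long slide, long offset) {
        if (Math.abs(offset) >= slide || size <= 0) {
            throw new IllegalArgumentException("parameters must satisfy abs(offset) < slide and size > 0");
        }
    }

    /** Latest window start <= ts, where window starts lie on offset + k * slide. */
    public static long lastWindowStart(long ts, long offset, long slide) {
        return ts - Math.floorMod(ts - offset, slide);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long ts = 1_704_078_000_000L; // assumed event time: 2024-01-01T03:00:00Z = 11:00 in UTC+8
        ZoneOffset utc8 = ZoneOffset.ofHours(8);

        checkParams(24 * hour, 12 * hour, -8 * hour); // accepted: 8 h < 12 h
        long noOffset = lastWindowStart(ts, 0L, 12 * hour);
        long minus8h = lastWindowStart(ts, -8 * hour, 12 * hour);
        // Without an offset, starts fall on 08:00/20:00 local time; with -8 h, on 00:00/12:00
        System.out.println(Instant.ofEpochMilli(noOffset).atOffset(utc8));
        System.out.println(Instant.ofEpochMilli(minus8h).atOffset(utc8));

        try {
            checkParams(12 * hour, hour, -8 * hour); // 12 h size, 1 h slide, -8 h offset
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With an hourly slide, whole-hour offsets make no difference anyway, since every window already starts on the hour; an offset matters only when the slide is longer than the time-zone shift.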
ProcessingTime window
The SlidingProcessingTimeWindowsJob class implements a word count; the code is as follows.
package itechthink.window.sliding;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
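* Word count with SlidingProcessingTimeWindows
* Input format: word,count
*
* window(): used on a keyed stream; every key gets its own set of windows
* In the author's tests, window() results can show up in several terminals but are printed only in the one started first;
* if that terminal is closed, they are printed in the second one, and so on
* windowAll(): used on a non-keyed stream; all elements share one set of windows and are processed by a single task
* In the author's tests, windowAll() results are printed in only one terminal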
*
*/
public class SlidingProcessingTimeWindowsJob {
private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// 1. Set up the environment - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
/*
* The other method, windowAll(), behaves almost identically to window() here,
* so it is not kept; windowProcess() has been removed as well
*
*/
window(environment);
// windowReduce(environment);
// 5. Launch the job - Execute
try {
environment.execute("SlidingProcessingTimeWindowsJob");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* window(), without offset
*
*/
public static void window(StreamExecutionEnvironment environment) {
// 2. Read data - Source
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// 3. Transform data - Transform
source.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] split = input.split(",");
return Tuple2.of(split[0].trim(), Integer.parseInt(split[1].trim()));
}
})
// Group by key (the word)
.keyBy(x -> x.f0)
// Window size: 5 seconds, slide: 1 second
.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
/*
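* Results may be shown in several terminals, but are printed only in the one started first
*
* 1. First enter:
* a,1
* a,1
* a,1
* <followed by Enter>
* 2. Then keep entering, once per second (as steadily as possible):
* a,1
* 3. Stop after a while
* As the windows close, one of the terminals prints:
* (a,3)
* (a,4)
* (a,5)
* (a,6)
* (a,7)
* (a,5) <--- once the first 5 seconds have passed, the 3 (a,1) entered at the start drop out of the window while 1 more (a,1) has arrived, so the result is 7 - 3 + 1 = 5
* (a,5) <--- at a steady input rate, records entering and leaving the window balance out, so the result stays at 5
* ...
* (a,5)
* (a,4) <--- after input stops, no records arrive while one (a,1) drops out every second, so the result keeps falling
* (a,3)
* (a,2)
* (a,1) <--- the last output is (a,1), the window holding only the last input; (a,0) never appears because windows without elements are never fired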
*/
.sum(1)
// 4. Output the result - Sink
.print().setParallelism(1);
}
/**
* window() + custom ReduceFunction, without offset
*
*/
public static void windowReduce(StreamExecutionEnvironment environment) {
// 2. Read data - Source
DataStreamSource<String> source = environment.socketTextStream("localhost", 9528);
// 3. Transform data - Transform
source.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] split = input.split(",");
return Tuple2.of(split[0].trim(), Integer.parseInt(split[1].trim()));
}
})
// Group by key (the word)
.keyBy(x -> x.f0)
// Window size: 5 seconds, slide: 1 second
.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
/*
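* Results may be shown in several terminals, but are printed only in the one started first
*
* Enter the same input as for window(); each window ends with the same sum as there,
* but this variant prints the ReduceFunction calls and the window's [start, end) range instead of (a,N)
*/
// Incremental aggregation
.reduce(
// Incremental aggregate function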
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
System.out.println("ReduceFunction ==> " + t1.f0 + " ==> " + (t1.f1 + t2.f1));
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}
},
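// Full-window function: does no further computation and only prints the window's range
// (this job generates no watermarks, so the printed currentWatermark is not meaningful here)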
new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
@Override
public void process(String input,
ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>.Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<Object> collector) throws Exception {
System.out.println("currentWatermark ==> " + DATE_FORMAT.format(context.currentWatermark()));
collector.collect("[" + DATE_FORMAT.format(context.window().getStart()) + ", " + DATE_FORMAT.format(context.window().getEnd()) + ") ");
}
}
)
// 4. Output the result - Sink
.print().setParallelism(1);
}
}
The figure below explains the output of window().

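After the program starts, enter three (a,1) records at once: the result is 3, the windows start being evaluated, and they slide once per second.
Keep the input rate as steady as possible. Until the first 5 seconds are over, each result is larger than the previous one, because records only enter the window.
Once the first 5 seconds have passed, the 3 records entered at startup drop out of the window, so the result becomes 5 (= 7 + 1 - 3). If you then stop entering data, the result drops by 1 every second until the last output, (a,1), for the window that holds only the last input; no (a,0) is printed because Flink does not fire windows that contain no elements.
That is how a sliding window behaves.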
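The numbers above can be reproduced without Flink. The sketch below is a plain-Java simulation, not the Flink job: `SlidingSumSimulation` and `windowSums()` are hypothetical names, and it assumes one record arrives exactly at each whole second and that each window [end - 5, end) is evaluated once at its end, ignoring the timing jitter of typing into a socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Plain-Java simulation (not Flink code) of the window() experiment: three records at
 * second 0, then one record per second up to lastSecond. Each window [end - size, end)
 * is emitted at its end; windows without records produce nothing.
 */
public class SlidingSumSimulation {

    public static List<Integer> windowSums(int lastSecond, int size, int slide) {
        TreeMap<Integer, Integer> countsPerSecond = new TreeMap<>();
        countsPerSecond.put(0, 3);              // three (a,1) entered at once
        for (int s = 1; s <= lastSecond; s++) {
            countsPerSecond.put(s, 1);          // then one (a,1) per second
        }
        List<Integer> sums = new ArrayList<>();
        // Window ends advance by the slide until no window can hold data any more
        for (int end = slide; end - size <= lastSecond; end += slide) {
            int sum = 0;
            for (int c : countsPerSecond.subMap(end - size, true, end, false).values()) {
                sum += c;
            }
            if (sum > 0) {                      // empty windows are never fired
                sums.add(sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // 5 s windows sliding every 1 s, input stops after second 10
        System.out.println(windowSums(10, 5, 1));
    }
}
```

`main()` prints `[3, 4, 5, 6, 7, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1]`: the rise to 7, the drop to 5 once the initial three records leave, the plateau, and the countdown to 1 after input stops.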
What happens if watermarks are added?
The code below demonstrates it.
EventTime window
The SlidingEventTimeWindowsJob class implements the word count with event time; the code is as follows.
package itechthink.window.sliding;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
/**
* Word count with SlidingEventTimeWindows + Watermark
* Input format: timestamp,word,count
* Combines an incremental aggregate function with a full-window function, which shows more clearly how Flink works
*
*/
public class SlidingEventTimeWindowsJob {
private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
// 1. Set up the environment - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
reduce(environment);
// reduceOffset(environment);
// reduceWithDelayNoSide(environment);
// reduceWithDelayHaveSide(environment);
// 5. Launch the job - Execute
try {
environment.execute("SlidingEventTimeWindowsJob");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Custom reduce(), no out-of-orderness tolerated
*
*/
private static void reduce(StreamExecutionEnvironment environment) {
// 2. Read data - Source
SingleOutputStreamOperator<String> result = environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.assignTimestampsAndWatermarks(
/*
* Duration.ZERO: no out-of-orderness tolerated; the watermark is the largest timestamp seen minus 1 ms
*/
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
// Extract the timestamp field
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
})
)
// 3. Transform data - Transform
/*
*
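* To observe more clearly how the data changes, every record is printed,
* so the input is mapped to Tuple3<Integer, String, Integer> instead of Tuple2<String, Integer>:
* carrying the timestamp field along shows which timestamp triggered each window computation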
*
*/
.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {
@Override
public Tuple3<Integer, String, Integer> map(String input) throws Exception {
String[] value = input.split(",");
return Tuple3.of(Integer.valueOf(value[0].trim()), value[1].trim(), Integer.valueOf(value[2].trim()));
}
})
// Group by the second field of the Tuple3 (the word)
.keyBy(x -> x.f1)
// Window size: 15 seconds, slide: 5 seconds
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
/*
* 1st input            2nd input (continuous and paused input differ only in currentWatermark)            3rd input
* 1000,a,1 1000,a,1 1000,a,1
* 2000,a,1 2000,a,1 2000,a,1
* 3000,a,1 3000,a,1 3000,a,1
* 4000,a,1 4000,a,1 4000,a,1
* 5000,a,1 5000,a,1 5000,a,1
* <pausing here changes only currentWatermark>, because the ProcessWindowFunction fires at this point
* 6000,a,1 6000,a,1
* 7000,a,1 7000,a,1
* 8000,a,1 8000,a,1
* 9000,a,1 9000,a,1
* 10000,a,1 10000,a,1
* <pausing here changes only currentWatermark>
* 11000,a,1
* 12000,a,1
* 13000,a,1
* 14000,a,1
* 15000,a,1
* Output:
* ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000)a ==> 2
* ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000)a ==> 2
* ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000)a ==> 2
* ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000)a ==> 3
* ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000)a ==> 3
* ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000)a ==> 3
* ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000)a ==> 4
* ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000)a ==> 4
* ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000)a ==> 4
* ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000)a ==> 5
* ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000)a ==> 5
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:04 ReduceFunction ==> (6000) a ==> 2 ReduceFunction ==> (6000)a ==> 2
* ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4 ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (6000)a ==> 6
* ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (6000)a ==> 6
* ReduceFunction ==> (7000) a ==> 3 ReduceFunction ==> (7000)a ==> 3
* ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (7000)a ==> 7
* ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (7000)a ==> 7
* ReduceFunction ==> (8000) a ==> 4 ReduceFunction ==> (8000)a ==> 4
* ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (8000)a ==> 8
* ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (8000)a ==> 8
* ReduceFunction ==> (9000) a ==> 5 ReduceFunction ==> (9000)a ==> 5
* ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (9000)a ==> 9
* ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (9000)a ==> 9
* ReduceFunction ==> (10000) a ==> 6 ReduceFunction ==> (10000)a ==> 6
* ReduceFunction ==> (10000) a ==> 10 ReduceFunction ==> (10000)a ==> 10
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:09 ReduceFunction ==> (11000)a ==> 2
* ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4 ReduceFunction ==> (11000)a ==> 7
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:09 ReduceFunction ==> (11000)a ==> 11
* ProcessWindowFunction ==> [1970-01-01 07:59:55, 1970-01-01 08:00:10) ==> (9000) a ==> 9 ReduceFunction ==> (12000)a ==> 3
* ReduceFunction ==> (12000)a ==> 8
* ReduceFunction ==> (12000)a ==> 12
* ReduceFunction ==> (13000)a ==> 4
* ReduceFunction ==> (13000)a ==> 9
* ReduceFunction ==> (13000)a ==> 13
* ReduceFunction ==> (14000)a ==> 5
* ReduceFunction ==> (14000)a ==> 10
* ReduceFunction ==> (14000)a ==> 14
* ReduceFunction ==> (15000)a ==> 6
* ReduceFunction ==> (15000)a ==> 11
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14
* ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14
* ProcessWindowFunction ==> [1970-01-01 07:59:55, 1970-01-01 08:00:10) ==> (9000) a ==> 9
* ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14
* ProcessWindowFunction ==> [1970-01-01 08:00:00, 1970-01-01 08:00:15) ==> (14000) a ==> 14
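* Key points:
* 1. How the watermark works
*    The window size is 15 seconds and the slide is 5 seconds: a new window starts every 5 seconds and each covers 15 seconds,
*    so every element belongs to 15 / 5 = 3 windows. With Duration.ZERO the watermark is the largest timestamp seen minus 1 ms,
*    and a window [start, end) fires once the watermark reaches end - 1
* 2. When each function is triggered and how they work together
*    2.1 Incremental aggregate function ReduceFunction: for each element it runs once per window the element belongs to,
*        except when the element is the first one in that window (it is then stored as the window's state without calling reduce)
*        ReduceFunction ==> (5000) a ==> 5
*        ReduceFunction ==> (5000) a ==> 5
*        Only two lines appear for 5000 because 5000 is the first element of window [5000, 20000); no "(5000) a ==> 1" line is missing,
*        and ReduceFunction ==> (6000) a ==> 2 is 5000 and 6000 combined in that window
*    2.2 Full-window function ProcessWindowFunction: runs once when a window fires, receiving the ReduceFunction's pre-aggregated result
*        The first time it fires, the window is [1970-01-01 07:59:50, 1970-01-01 08:00:05) and the watermark is 1970-01-01 08:00:04,
*        i.e. 4999 ms = end - 1, which matches the left-closed, right-open ([)) window bounds
*        (times are printed in the JVM's default time zone, UTC+8 here, so 1970-01-01 08:00:00 is timestamp 0)
* 3. How sliding windows fire
*    One watermark advance can fire several overlapping windows: in the 3rd run, watermark 08:00:14 fires the windows ending at 08:00:05, 08:00:10 and 08:00:15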
*
*/
.reduce(
// Incremental aggregate function: runs once per (element, window) pair, except for the first element of each window
new ReduceFunction<Tuple3<Integer, String, Integer>>() {
@Override
public Tuple3<Integer, String, Integer> reduce(Tuple3<Integer, String, Integer> t1, Tuple3<Integer, String, Integer> t2) throws Exception {
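// Use the timestamp of t2, the newly arrived element (t1 is the running aggregate); t1.f0 would keep the timestamp of the window's first element, e.g. `(1000) a`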
System.out.println("ReduceFunction ==> (" + t2.f0 + ") " + t2.f1 + " ==> " + (t1.f2 + t2.f2));
return Tuple3.of(t2.f0, t2.f1, t1.f2 + t2.f2);
}
},
/*
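* Full-window function: runs once per window when it fires; because it follows a ReduceFunction,
* the Iterable holds a single pre-aggregated element
* Tuple3<Integer, String, Integer>  IN   type of the elements it receives (the ReduceFunction's output)
* String                            OUT  type of the elements in the resulting DataStream
* String                            KEY  type of the key specified in keyBy()
* TimeWindow                        W    type of the window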
*/
new ProcessWindowFunction<Tuple3<Integer, String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String input, Context context, Iterable<Tuple3<Integer, String, Integer>> iterable, Collector<String> collector) throws Exception {
for (Tuple3<Integer, String, Integer> t : iterable) {
// Print Flink's current watermark
System.out.println("ProcessWindowFunction ==> currentWatermark == " + DATE_FORMAT.format(context.currentWatermark()));
collector.collect("ProcessWindowFunction ==> [" + DATE_FORMAT.format(context.window().getStart()) + ", " + DATE_FORMAT.format(context.window().getEnd()) + ") " +
" ==> (" + t.f0 + ") " + t.f1 + " ==> " + t.f2);
}
}
}
);
// 4. Output the result - Sink
result.print().setParallelism(1);
}
/**
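* Custom reduce(), no out-of-orderness tolerated
* Note: despite the method name, no offset is passed to SlidingEventTimeWindows.of() as written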
*
*/
private static void reduceOffset(StreamExecutionEnvironment environment) {
// 2. Read data - Source
SingleOutputStreamOperator<String> result = environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.assignTimestampsAndWatermarks(
/*
* Duration.ZERO: no out-of-orderness tolerated
*/
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
// Extract the timestamp field
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
})
)
// 3. Transform data - Transform
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] value = input.split(",");
return Tuple2.of(value[1].trim(), Integer.valueOf(value[2].trim()));
}
})
// Group by key (the word)
.keyBy(x -> x.f0)
// Window size: 15 seconds, slide: 5 seconds
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
/*
* Input:
* ???
* Output:
* ???
*/
.reduce(
// Incremental aggregate function
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
System.out.println("ReduceFunction ==> " + t1.f0 + " ==> " + (t1.f1 + t2.f1));
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}
},
/*
* Full-window function
* Tuple2<String, Integer>  IN   type of the elements it receives (the ReduceFunction's output)
* String                   OUT  type of the elements in the resulting DataStream
* String                   KEY  type of the key specified in keyBy()
* TimeWindow               W    type of the window
*/
new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String input,
Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<String> collector) throws Exception {
for (Tuple2<String, Integer> t : iterable) {
// Print Flink's current watermark; printed once per key and window
System.out.println("currentWatermark ==> " + DATE_FORMAT.format(context.currentWatermark()));
collector.collect("[" + DATE_FORMAT.format(context.window().getStart()) +
", " + DATE_FORMAT.format(context.window().getEnd()) + ") " +
t.f0 + " ==> " + t.f1);
}
}
}
);
// 4. Output the result - Sink
result.print().setParallelism(1);
}
/**
* Custom reduce(), tolerating out-of-order data, without sideOutputLateData(outputTag) side output
*
*/
private static void reduceWithDelayNoSide(StreamExecutionEnvironment environment) {
// 2. Read data - Source
SingleOutputStreamOperator<String> result = environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.assignTimestampsAndWatermarks(
/*
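* Duration.ofSeconds(2): tolerate records up to 2 seconds out of order (the watermark trails the largest timestamp by 2 s plus 1 ms)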
*/
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
// Extract the timestamp field
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
})
)
// 3. Transform data - Transform
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] value = input.split(",");
return Tuple2.of(value[1].trim(), Integer.valueOf(value[2].trim()));
}
})
// Group by key (the word)
.keyBy(x -> x.f0)
// Window size: 15 seconds, slide: 5 seconds
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
/*
* Input:
* ???
* Output:
* ???
*/
.reduce(
// Incremental aggregate function
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
System.out.println("ReduceFunction ==> " + t1.f0 + " ==> " + (t1.f1 + t2.f1));
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}
},
/*
* Full-window function
* Tuple2<String, Integer>  IN   type of the elements it receives (the ReduceFunction's output)
* String                   OUT  type of the elements in the resulting DataStream
* String                   KEY  type of the key specified in keyBy()
* TimeWindow               W    type of the window
*/
new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String input,
Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<String> collector) throws Exception {
for (Tuple2<String, Integer> t : iterable) {
// Print Flink's current watermark
System.out.println("currentWatermark ==> " + DATE_FORMAT.format(context.currentWatermark()));
collector.collect("[" + DATE_FORMAT.format(context.window().getStart()) +
", " + DATE_FORMAT.format(context.window().getEnd()) + ") " +
t.f0 + " ==> " + t.f1);
}
}
}
);
// 4. Output the result - Sink
result.print().setParallelism(1);
}
/**
* Custom reduce(), tolerating out-of-order data, with sideOutputLateData(outputTag) side output
*
*/
private static void reduceWithDelayHaveSide(StreamExecutionEnvironment environment) {
// Tag that identifies the late-data side output
OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
// 2. Read data - Source
SingleOutputStreamOperator<String> result = environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.assignTimestampsAndWatermarks(
/*
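* Duration.ofSeconds(2): tolerate records up to 2 seconds out of order (the watermark trails the largest timestamp by 2 s plus 1 ms)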
*/
WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
// Extract the timestamp field
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
})
)
// 3. Transform data - Transform
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String input) throws Exception {
String[] value = input.split(",");
return Tuple2.of(value[1].trim(), Integer.valueOf(value[2].trim()));
}
})
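// Group by key (the word)
.keyBy(x -> x.f0)
// Window size: 15 seconds, slide: 5 seconds
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
// Send late data (records whose windows have all fired) to the side output
.sideOutputLateData(outputTag)
/*
* Input:
* ???
* Output:
* ???
*/
.reduce(
// Incremental aggregate function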
new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
System.out.println("ReduceFunction ==> " + t1.f0 + " ==> " + (t1.f1 + t2.f1));
return Tuple2.of(t1.f0, t1.f1 + t2.f1);
}
},
/*
* Full-window function
* Tuple2<String, Integer>  IN   type of the elements it receives (the ReduceFunction's output)
* String                   OUT  type of the elements in the resulting DataStream
* String                   KEY  type of the key specified in keyBy()
* TimeWindow               W    type of the window
*/
new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
@Override
public void process(String input,
Context context,
Iterable<Tuple2<String, Integer>> iterable,
Collector<String> collector) throws Exception {
for (Tuple2<String, Integer> t : iterable) {
// Print Flink's current watermark
// printed once per key and window
System.out.println("currentWatermark ==> " + DATE_FORMAT.format(context.currentWatermark()));
collector.collect("[" + DATE_FORMAT.format(context.window().getStart()) +
", " + DATE_FORMAT.format(context.window().getEnd()) + ") " +
t.f0 + " ==> " + t.f1);
}
}
}
);
// 4. Output the result - Sink
result.print().setParallelism(1);
// Retrieve the late data through the OutputTag<Tuple2<String, Integer>>
DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(outputTag);
// Print late data to stderr so it shows up in a different color
sideOutput.printToErr();
}
}
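The firing and late-data rules used by reduce() and the two reduceWithDelay*() variants can be checked without a cluster. The class below is a plain-Java model, not Flink code: `EventTimeFiringSketch` and its methods are hypothetical names, and the rules it encodes are my reading of Flink's bounded-out-of-orderness watermarks and window operator with zero allowed lateness. The watermark is the largest timestamp seen minus the out-of-orderness minus 1 ms, a window [start, end) fires once the watermark reaches end - 1, and a record counts as late (dropped, or sent to the side output when sideOutputLateData() is set) only when every window it belongs to has already fired.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model (not Flink code) of event-time firing for 15 s windows sliding every 5 s. */
public class EventTimeFiringSketch {
    static final long SIZE = 15_000L, SLIDE = 5_000L;

    /** Bounded out-of-orderness: largest timestamp seen - allowed disorder - 1 ms. */
    public static long watermark(long maxTimestamp, long outOfOrderness) {
        return maxTimestamp - outOfOrderness - 1;
    }

    /** Starts of the windows containing ts, latest first. */
    public static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        for (long s = ts - Math.floorMod(ts, SLIDE); s > ts - SIZE; s -= SLIDE) {
            starts.add(s);
        }
        return starts;
    }

    /** A window [start, start + SIZE) fires once the watermark reaches its last millisecond. */
    public static boolean hasFired(long start, long watermark) {
        return watermark >= start + SIZE - 1;
    }

    /** Late = every window of ts has already fired; otherwise it still joins the open ones. */
    public static boolean isLate(long ts, long watermark) {
        for (long s : windowStarts(ts)) {
            if (!hasFired(s, watermark)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // reduce(): Duration.ZERO, last record 5000 -> watermark 4999 fires [-10000, 5000)
        long wm = watermark(5_000L, 0L);
        System.out.println("watermark=" + wm + ", [-10000,5000) fired=" + hasFired(-10_000L, wm));
        // reduceWithDelay*(): with a 2 s bound, 5000 alone does not fire that window yet
        long wmDelayed = watermark(5_000L, 2_000L);
        System.out.println("watermark=" + wmDelayed + ", [-10000,5000) fired=" + hasFired(-10_000L, wmDelayed));
        // After 7000 arrives, a record at 1000 still fits the open window [0, 15000)
        System.out.println("record 1000 late? " + isLate(1_000L, watermark(7_000L, 2_000L)));
        // After 17000 arrives, all three windows containing 1000 have fired
        System.out.println("record 1000 late? " + isLate(1_000L, watermark(17_000L, 2_000L)));
    }
}
```

With the 2-second bound, the first window therefore waits for a record at 7000 or later before firing, which is the practical difference between reduceWithDelayNoSide() and reduce().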
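The execution results of reduce() are recorded in detail in the comments above; the table below summarizes the input and output of the three runs.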
Run | 1st | 2nd | 3rd |
---|---|---|---|
Input | 1000,a,1 2000,a,1 3000,a,1 4000,a,1 5000,a,1 | 1000,a,1 2000,a,1 3000,a,1 4000,a,1 5000,a,1 6000,a,1 7000,a,1 8000,a,1 9000,a,1 10000,a,1 | 1000,a,1 2000,a,1 3000,a,1 4000,a,1 5000,a,1 6000,a,1 7000,a,1 8000,a,1 9000,a,1 10000,a,1 11000,a,1 12000,a,1 13000,a,1 14000,a,1 15000,a,1 |
Output | ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000) a ==> 5 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:04 ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4 | ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (6000) a ==> 2 ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (7000) a ==> 3 ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (8000) a ==> 4 ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (9000) a ==> 5 ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (10000) a ==> 6 ReduceFunction ==> (10000) a ==> 10 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:09 ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:09 ProcessWindowFunction ==> [1970-01-01 07:59:55, 1970-01-01 08:00:10) ==> (9000) a ==> 9 | ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (2000) a ==> 2 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (3000) a ==> 3 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (4000) a ==> 4 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (5000) a ==> 5 ReduceFunction ==> (6000) a ==> 2 ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (6000) a ==> 6 ReduceFunction ==> (7000) a ==> 3 ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (7000) a ==> 7 ReduceFunction ==> (8000) a ==> 4 ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (8000) a ==> 8 ReduceFunction ==> (9000) a ==> 5 ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (9000) a ==> 9 ReduceFunction ==> (10000) a ==> 6 ReduceFunction ==> (10000) a ==> 10 ReduceFunction ==> (11000) a ==> 2 ReduceFunction ==> (11000) a ==> 7 ReduceFunction ==> (11000) a ==> 11 ReduceFunction ==> (12000) a ==> 3 ReduceFunction ==> (12000) a ==> 8 ReduceFunction ==> (12000) a ==> 12 ReduceFunction ==> (13000) a ==> 4 ReduceFunction ==> (13000) a ==> 9 ReduceFunction ==> (13000) a ==> 13 ReduceFunction ==> (14000) a ==> 5 ReduceFunction ==> (14000) a ==> 10 ReduceFunction ==> (14000) a ==> 14 ReduceFunction ==> (15000) a ==> 6 ReduceFunction ==> (15000) a ==> 11 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14 ProcessWindowFunction ==> [1970-01-01 07:59:50, 1970-01-01 08:00:05) ==> (4000) a ==> 4 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14 ProcessWindowFunction ==> [1970-01-01 07:59:55, 1970-01-01 08:00:10) ==> (9000) a ==> 9 ProcessWindowFunction ==> currentWatermark == 1970-01-01 08:00:14 ProcessWindowFunction ==> [1970-01-01 08:00:00, 1970-01-01 08:00:15) ==> (14000) a ==> 14 |
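The ReduceFunction lines in the table can be reproduced with a small model. The sketch below is plain Java, not Flink code: `ReduceCallModel` and `reduceLines()` are hypothetical names, and it assumes that each element visits its windows from the latest start to the earliest, and that reduce() is only called when the window already holds a value (the first element of a window is stored unchanged). Watermarks and window firing are left out because, for this in-order input, they do not change which reduce() calls happen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model (not Flink code) of the ReduceFunction calls for 15 s windows sliding every 5 s. */
public class ReduceCallModel {
    static final long SIZE = 15_000L, SLIDE = 5_000L;

    /** Lines the ReduceFunction would print for records "ts,a,1" arriving in the given order. */
    public static List<String> reduceLines(long... timestamps) {
        Map<Long, Integer> sumPerWindowStart = new HashMap<>();
        List<String> lines = new ArrayList<>();
        for (long ts : timestamps) {
            // Visit the windows containing ts, latest start first
            for (long start = ts - Math.floorMod(ts, SLIDE); start > ts - SIZE; start -= SLIDE) {
                Integer current = sumPerWindowStart.get(start);
                if (current == null) {
                    // First element of this window: stored as-is, reduce() is not called
                    sumPerWindowStart.put(start, 1);
                } else {
                    int sum = current + 1;
                    sumPerWindowStart.put(start, sum);
                    lines.add("ReduceFunction ==> (" + ts + ") a ==> " + sum);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        reduceLines(1000L, 2000L, 3000L, 4000L, 5000L, 6000L).forEach(System.out::println);
    }
}
```

For the inputs 1000 to 6000 the model returns 14 lines, matching the 2nd and 3rd runs up to 6000: three each for 2000, 3000 and 4000, two for 5000 (it is the first element of [5000, 20000)), and 2, 6, 6 for 6000.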
The execution of reduce() can be illustrated with the two figures below.


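Sliding windows could have been verified in a simpler way, but I made it harder on myself by combining an incremental aggregate function with a full-window function and adding watermarks on top. Examined carefully, though, it is not that difficult.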