双流JOIN问题
原创大约 5 分钟
大数据中的关联
在对传统的关系型数据库进行数据查询时,经常会遇到需要将两张或多张表进行关联的操作。
> SELECT a.id, a.name, b.deptid, ... FROM table1 AS a, table2 AS b
WHERE a.id = b.id;
# 或者
> SELECT a.id, a.name, b.deptid, ... FROM table1 join table2
ON table1.id = table2.id;
这是关系数据库中的典型应用场景。
但是在大数据中,尤其是在流式大数据系统中,一方面,数据都是以流
的状态存在的,也就是它可能时刻都在发生变化。
另一方面,还有一些大数据中特有的数据处理机制在起作用,例如Watermark水印和窗口,它的技术机制在之前的窗口连接器中已经讲过了,但在做业务数据表的连接时,还会有些不同。
处理思路
假设有这样两张表。
- 订单表。
orderid | time | monry |
---|---|---|
1 | 1000 | 300 |
2 | 2000 | 500 |
3 | 3000 | 800 |
4 | 5000 | 200 |
5 | 7000 | 1000 |
- 订单详情表。
itemid | orderid | time | sku | amount | money |
---|---|---|---|---|---|
1 | 1 | 1000 | A | 1 | 200 |
2 | 1 | 2000 | B | 1 | 100 |
3 | 3 | 6000 | B | 1 | 800 |
它们都在Flink的滚动窗口中被处理,每隔5秒处理一次。
因为两张表中的数据生成时间可能不一致,因此也就会产生时间窗口的错位问题,也就是明明有记录,但或者因为还没有到事件时间,或者数据还没有进入到Flink进行处理,而出现匹配不到的现象。
但这种数据遗漏是一定会发生的,无法避免,所以只能想办法降低它所带来的影响。
按照小表放在右边
的方法,应该把数据多的表放在左边,然后执行LEFT OUTER JOIN
操作,这样既能减少扫描次数,也能尽可能地少遗漏数据。
代码实现
package itechthink.joining;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
/**
* 连接器的一点问题
*
*/
public class DataStreamJoiningJob {
public static void main(String[] args) {
// 1. 准备环境 - Environment
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
/*
* OrderInfo订单数据流
*
*/
SingleOutputStreamOperator<OrderInfo> orderInfoStream = environment.socketTextStream("localhost", 9528)
.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, OrderInfo>() {
@Override
public OrderInfo map(String input) throws Exception {
String[] split = input.split(",");
return new OrderInfo(Long.parseLong(split[0].trim()),
split[1].trim(),
Long.parseLong(split[2].trim()),
Double.parseDouble(split[3].trim())
);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo orderInfo, long record) {
return orderInfo.getTime();
}
}
)
);
/*
* OrderItem订单详情数据流
*
*/
SingleOutputStreamOperator<OrderItem> orderItemStream = environment.socketTextStream("localhost", 9529)
.filter(StringUtils::isNoneBlank)
.map(new MapFunction<String, OrderItem>() {
@Override
public OrderItem map(String input) throws Exception {
String[] split = input.split(",");
return new OrderItem(Long.parseLong(split[0].trim()),
Long.parseLong(split[1].trim()),
Long.parseLong(split[2].trim()),
split[3].trim(),
Integer.parseInt(split[4].trim()),
Double.parseDouble(split[5].trim())
);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderItem>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(
new SerializableTimestampAssigner<OrderItem>() {
@Override
public long extractTimestamp(OrderItem orderItem, long record) {
return orderItem.getTime();
}
}
)
);
// 把数据打印出来
// orderInfoStream.print("OrderInfo").setParallelism(1);
// orderItemStream.print("OrderItem").setParallelism(1);
/**
* 两条流数据的JOIN,总的思路是以Item作为左表,也就是数据量大的放在左边
*
* 第一次:
* OrderInfo订单输入数据:
* 1,1000,300
* 2,2000,500
* 3,3000,800
* 4,7000,600
* 5,12000,700
* 6,19000,1000
* OrderItem订单详情输入数据:
* 1,1,1000,A,1,200
* 2,1,2000,B,1,100
* 3,2,5000,A,1,200
* 4,2,6000,A,1,200
* 5,2,11000,B,1,100
* 6,3,4000,C,1,800
* 7,4,7000,D,1,300
* 8,4,8000,D,1,300
* 9,5,15000,E,1,700
* 输出数据:
* ({itemId=1, orderId=1, time=1000, sku='A', amount=1, money=200.0},{orderId=1, time=1000, money=300.0})
* ({itemId=2, orderId=1, time=2000, sku='B', amount=1, money=100.0},{orderId=1, time=1000, money=300.0})
* ({itemId=6, orderId=3, time=4000, sku='C', amount=1, money=800.0},{orderId=3, time=3000, money=800.0})
* ({itemId=7, orderId=4, time=7000, sku='D', amount=1, money=300.0},{orderId=4, time=7000, money=600.0})
* ({itemId=8, orderId=4, time=8000, sku='D', amount=1, money=300.0},{orderId=4, time=7000, money=600.0})
* ({itemId=3, orderId=2, time=5000, sku='A', amount=1, money=200.0},null)
* ({itemId=4, orderId=2, time=6000, sku='A', amount=1, money=200.0},null)
* ({itemId=5, orderId=2, time=11000, sku='B', amount=1, money=100.0},null)
*
*
* 第二次:
* OrderInfo订单输入数据:
* 1,1000,300
* 2,2000,500
* 3,3000,800
* 4,7000,600
* 5,12000,700
* 6,19000,1000
* OrderItem订单详情输入数据:
* 1,1,1000,A,1,200
* 2,1,2000,B,1,100
* 3,2,4000,A,1,200
* 4,2,4500,A,1,200
* 5,2,11000,B,1,100
* 6,3,4000,C,1,800
* 7,4,7000,D,1,300
* 8,4,8000,D,1,300
* 9,5,15000,E,1,700
* 输出数据:
* ({itemId=1, orderId=1, time=1000, sku='A', amount=1, money=200.0},{orderId=1, time=1000, money=300.0})
* ({itemId=2, orderId=1, time=2000, sku='B', amount=1, money=100.0},{orderId=1, time=1000, money=300.0})
* ({itemId=3, orderId=2, time=4000, sku='A', amount=1, money=200.0},{orderId=2, time=2000, money=500.0})
* ({itemId=4, orderId=2, time=4500, sku='A', amount=1, money=200.0},{orderId=2, time=2000, money=500.0})
* ({itemId=6, orderId=3, time=4000, sku='C', amount=1, money=800.0},{orderId=3, time=3000, money=800.0})
* ({itemId=7, orderId=4, time=7000, sku='D', amount=1, money=300.0},{orderId=4, time=7000, money=600.0})
* ({itemId=8, orderId=4, time=8000, sku='D', amount=1, money=300.0},{orderId=4, time=7000, money=600.0})
* ({itemId=5, orderId=2, time=11000, sku='B', amount=1, money=100.0},null)
*
*
* 对比输出结果可以看到:
* 第一次的时候,OrderItem中orderid = 2的数据由于时间窗口都在5秒之外,所以没有关联结果
* 第二次的时候,OrderItem中orderid = 2的数据由于时间窗口都在5秒之内,所以关联上OrderInfo中的数据,而同样orderid = 2的数据由于time=11000,也没有关联上
* 可以逐条输入数据进行对比,验证JOIN和窗口机制共同发挥作用
*
* 1. 对于因为时间本身不在一个窗口中而没有关联上的查询结果(orderItem, null):再把它输入到新的接口中,在业务库中专门找orderItem对应的orderInfo,消除null。因为这样的数据不会太多,能够行得通
* 2. 对于因为延迟而晚到的数据,也会产生(orderItem, null)这种结果:通过侧流输出的形式补齐:outputtag ==> orderItem ==> union
*/
orderItemStream.coGroup(orderInfoStream)
.where(OrderItem::getOrderId)
.equalTo(OrderInfo::getOrderId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 自定义
.apply(new CoGroupFunction<OrderItem, OrderInfo, Tuple2<OrderItem, OrderInfo>>() {
@Override
public void coGroup(Iterable<OrderItem> orderItems,
Iterable<OrderInfo> orderInfos,
Collector<Tuple2<OrderItem, OrderInfo>> output) throws Exception {
for (OrderItem orderItem : orderItems) {
boolean flag = false;
// 左边和右边能够关联上
for (OrderInfo orderInfo : orderInfos) {
output.collect(Tuple2.of(orderItem, orderInfo));
flag = true;
}
// 左边和右边不能关联上
if (!flag) {
output.collect(Tuple2.of(orderItem, null));
}
}
}
})
.print().setParallelism(1);
// 5. 启动任务 - Execute
try {
environment.execute("DataStreamJoiningJob");
} catch (Exception e) {
e.printStackTrace();
}
}
}
感谢支持
更多内容,请移步《超级个体》。