任务调度整合
原创大约 5 分钟
通过crontab调度任务
前面已经开发完成了用户行为数仓
和商品订单数仓
的所有需求及其执行脚本。
其中有不少是需要通过Linux的定时任务调度crontab
来完成的。
crontab -e
# 用户行为数仓ods层和dwd层
1 0 * * * /home/work/warehouse_user_action/ods_shopmall_add_partition.sh
1 0 * * * /home/work/warehouse_user_action/dwd_shopmall_add_partition.sh
# 用户行为数仓需求一:新增用户相关指标
1 0 * * * /home/work/warehouse_user_action/dws_shopmall_requirement01_add_partition.sh
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement01_add_partition.sh
# 用户行为数仓需求二:日活用户相关指标
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement02_add_partition.sh
# 用户行为数仓需求三:七日流失用户相关指标
1 0 * * * /home/work/warehouse_user_action/dws_shopmall_requirement03_add_partition.sh
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement03_add_partition.sh
# 用户行为数仓需求四:每日应用启动相关指标
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement04_add_partition.sh
# 用户行为数仓需求五:设备平台相关指标
1 0 * * * /home/work/warehouse_user_action/dws_shopmall_requirement05_add_partition.sh
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement05_add_partition.sh
# 用户行为数仓需求六:应用闪退相关指标
1 0 * * * /home/work/warehouse_user_action/dws_shopmall_requirement06_add_partition.sh
1 0 * * * /home/work/warehouse_user_action/app_shopmall_requirement06_add_partition.sh
# 商品订单数仓数据导入
0 1 * * * /home/work/warehouse_goods_order/data_collect_completed.sh
5 1 * * * /home/work/warehouse_goods_order/data_collect_increment.sh
# 商品订单数仓ods层和dwd层
1 0 * * * /home/work/warehouse_goods_order/ods_shopmall_add_partition.sh
1 0 * * * /home/work/warehouse_goods_order/dwd_shopmall_add_partition.sh
# 商品订单数仓需求一:用户信息宽表
1 0 * * * /home/work/warehouse_goods_order/dws_shopmall_requirement01_add_partition.sh
# 商品订单数仓需求二:电商GMV
1 0 * * * /home/work/warehouse_goods_order/app_shopmall_requirement02_add_partition.sh
# 商品订单数仓需求三:商品相关指标
1 0 * * * /home/work/warehouse_goods_order/dws_shopmall_requirement03_add_partition.sh
1 0 * * * /home/work/warehouse_goods_order/app_shopmall_requirement03_add_partition.sh
# 商品订单数仓需求四:漏斗转化分析
1 0 * * * /home/work/warehouse_goods_order/app_shopmall_requirement04_add_partition.sh
# 商品订单数仓拉链表
1 0 * * * /home/work/warehouse_goods_order/dwd_load_data_to_order_zipper.sh
但是这些任务之间是无法自动根据相互之间的依赖关系来确定执行顺序的,所以仍然还得通过脚本来执行。
例如,假设有两个任务:先创建DWD层
的dwd_order
表,然后再填充它。
它们之间显然有依赖关系:只有先创建了表才能给它填充数据。
> cd /home/work/
> vi crontab_scheduler.sh
#!/bin/bash
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
# 先删除jobA的输出,这样就可以反复执行
hdfs dfs -rm -r hdfs://server01:9000/data/dwd/order/dt=${dt}
# jobA:创建dwd_order表
hive -e "
create external table if not exists dwd_shopmall.dwd_order (
id bigint,
userid bigint,
money double,
type int,
status int,
payid bigint,
createtime string,
updatetime string
) partitioned by(dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/order/';
"
# 因为每个hdfs文件创建成功后都会创建一个类似于 000000_${n} 这样的文件
# 所以如果jobA执行成功,就能查询到 000000_0,否则执行时就会报错
hdfs dfs -ls hdfs://server01:9000/data/dwd/order/dt=${dt}/000000_0
# 这里通过 $? 来判断上一条命令是否成功执行
# 如果 $? 返回0,则表示任务执行成功,否则执行失败
if [ $? = 0 ]
then
echo "执行jobB"
else
# 这里可以给相关人员发短息或者邮件
echo "jobA执行失败,请处理..."
fi
如果项目中这种基于crontab
的定时任务调度有很多,一个个写,会非常麻烦,这时候就需要用到Azkaban了。
开发任务脚本
以商品订单数仓需求二:电商GMV
为例,它的任务执行路径是 MySQL -> HDFS -> ODS层
-> DWD层
-> APP层
。
先准备Azkaban的任务调度文件。
> mkdir -p /home/work/azkaban
> cd /home/work/azkaban
# 将MySQL中的数据导入HDFS
> vi collect_from_mysql.job
type=command
command=sh /home/work/azkaban/collect_from_mysql.sh
# 关联ods层的数据
> vi ods.job
type=command
dependencies=collect_from_mysql
command=sh /home/work/azkaban/ods_shopmall_add_partition.sh
# 生成dwd层的数据
> vi dwd.job
type=command
dependencies=ods
command=sh /home/work/azkaban/dwd_shopmall_add_partition.sh
# 生成app层的数据
> vi app.job
type=command
dependencies=dwd
command=sh /home/work/azkaban/app_shopmall_add_partition.sh
因为job
中用到的几个sh
脚本文件事先已经写过,这里只做一些调整就行了。
collect_from_mysql.sh
> cd /home/work/azkaban
> vi collect_from_mysql.sh
#!/bin/bash
# 增量数据采集
# 默认获取前一天的日期,也支持通过参数指定日期
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
# 将 yyyyMMdd 转换为 yyyy-MM-dd
dt_new=`date +%Y-%m-%d --date="${dt}"`
# SQL语句
t_order_sql="select order_id,userid,money,type,status,payid,createtime,updatetime from t_order where createtime >= '${dt_new} 00:00:00' and createtime <= '${dt_new} 23:59:59'"
# 路径前缀
path_prefix="hdfs://server01:9000/data/ods"
# 输出路径
t_order_path="${path_prefix}/t_order/${dt}"
# 采集数据
echo "开始采集..."
echo "采集表:t_order"
sh ../common_data_collect_script.sh "${t_order_sql}" "${t_order_path}"
echo "结束采集..."
脚本中用到的common_data_collect_script.sh
文件之前已经写过,就不再重复。
ods_shopmall_add_partition.sh
> cd /home/work/azkaban
> vi ods_shopmall_add_partition.sh
#!/bin/bash
# 给ods层的表添加分区,每天凌晨执行一次
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
sh ../common_add_partition.sh ods_shopmall.ods_order ${dt} ${dt}
脚本中用到的common_add_partition.sh
文件之前已经写过,就不再重复。
dwd_shopmall_add_partition.sh
> cd /home/work/azkaban
> vi dwd_shopmall_add_partition.sh
#!/bin/bash
# 基于ods层的表进行清洗,清洗之后的数据会被添加到dwd层对应表的对应分区,每天凌晨执行一次
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
hive -e "
insert overwrite table dwd_shopmall.dwd_order partition(dt = '${dt}') select
id,
userid,
money,
type,
status,
payid,
createtime,
updatetime
from ods_shopmall.ods_order
where dt = '${dt}' and id is not null;
"
app_shopmall_add_partition.sh
> cd /home/work/azkaban
> vi app_shopmall_add_partition.sh
#!/bin/bash
# 需求二:电商GMV
# 每天凌晨执行一次
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
hive -e "
insert overwrite table app_shopmall.app_gmv partition(dt = '${dt}') select
sum(money) as gmv
from dwd_shopmall.dwd_order
where dt = '${dt}';
"
通过Azkaban调度任务
将collect_from_mysql.job
、ods.job
、dwd.job
和app.job
这几个任务文件打包到一个zip
压缩文件中。
> cd /home/work/azkaban
> zip app_gmv.zip collect_from_mysql.job ods.job dwd.job app.job
然后将这个zip
压缩文件上传到Azkaban。

到这一步,就可以像执行普通任务那样执行商品订单数仓的统计任务了。

一些优化措施
Sqoop采集参数调优:默认只生成一个采集任务,可以通过
splitBy
+num-mappers
配合的方式提高采集性能。分离在线计算与离线计算任务。
避免启动时间过于集中。
使用Impala实现快速交互查询。
感谢支持
更多内容,请移步《超级个体》。