用户行为数仓DWD层
原创大约 3 分钟
构建DWD层
参照ODS层的经验,以脚本的方式执行DWD层数据库和表的初始化。
> mkdir -p /home/work/warehouse_user_action
> cd /home/work/warehouse_user_action
> vi dwd_shopmall_init.sh
#!/bin/bash
# One-time initialization of the DWD-layer database and its external tables.
# Every DDL statement uses "if not exists", so running the script again does
# not fail on objects that already exist.
#
# All five tables share the same ten device/context columns plus the event
# timestamp; each table then adds its own event-specific columns. They are
# partitioned by dt and stored as tab-delimited text under
# hdfs://server01:9000/data/dwd/<table>/.
#
# Quoting notes:
#   * "timestamp" is a reserved keyword in Hive, so it has to be wrapped in
#     backticks to be usable as a column name. "location" is also a Hive
#     keyword and is quoted the same way defensively.
#   * The backticks are backslash-escaped because the SQL lives inside a
#     double-quoted shell string, where a bare backtick would start a command
#     substitution.
hive -e "
create database if not exists dwd_shopmall;

create external table if not exists dwd_shopmall.dwd_app_open (
    uid          bigint,
    did          string,
    platform     tinyint,
    ver          string,
    code         string,
    net          tinyint,
    brand        string,
    model        string,
    display      string,
    os           string,
    \`timestamp\`  bigint,
    adstatus     tinyint,
    loadingtime  bigint
) partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/app_open/';

create external table if not exists dwd_shopmall.dwd_goods_click (
    uid          bigint,
    did          string,
    platform     tinyint,
    ver          string,
    code         string,
    net          tinyint,
    brand        string,
    model        string,
    display      string,
    os           string,
    \`timestamp\`  bigint,
    goodsid      bigint,
    \`location\`   tinyint
) partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/goods_click/';

create external table if not exists dwd_shopmall.dwd_goods_item (
    uid          bigint,
    did          string,
    platform     tinyint,
    ver          string,
    code         string,
    net          tinyint,
    brand        string,
    model        string,
    display      string,
    os           string,
    \`timestamp\`  bigint,
    goodsid      bigint,
    staytime     bigint,
    loadingtime  bigint
) partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/goods_item/';

create external table if not exists dwd_shopmall.dwd_goods_list (
    uid          bigint,
    did          string,
    platform     tinyint,
    ver          string,
    code         string,
    net          tinyint,
    brand        string,
    model        string,
    display      string,
    os           string,
    \`timestamp\`  bigint,
    loadingtime  bigint,
    loadingtype  tinyint,
    goodsnum     tinyint
) partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/goods_list/';

create external table if not exists dwd_shopmall.dwd_app_collapse (
    uid          bigint,
    did          string,
    platform     tinyint,
    ver          string,
    code         string,
    net          tinyint,
    brand        string,
    model        string,
    display      string,
    os           string,
    \`timestamp\`  bigint
) partitioned by (dt string)
row format delimited
fields terminated by '\t'
location 'hdfs://server01:9000/data/dwd/app_collapse/';
"
DWD层抽取
有了基础数据表之后,就需要让它抽取ODS层的数据来填充自己的表了,依然通过脚本进行。
> mkdir -p /home/work/warehouse_user_action
> cd /home/work/warehouse_user_action
> vi dwd_shopmall_add_partition.sh
#!/bin/bash
# Daily ODS -> DWD load. Cleans the raw JSON logs of each ODS table and
# overwrites the matching dt partition of the corresponding DWD table.
#
# Usage: dwd_shopmall_add_partition.sh [yyyymmdd]
#   yyyymmdd  partition date to (re)build; defaults to yesterday.
#
# For every table the load does the following:
#   * "group by log" in the sub-query collapses identical raw log lines, so
#     duplicated records are written only once.
#   * get_json_object pulls each field out of the JSON log line.
#   * The "did != ''" filter keeps only app-side logs.
#   * uid is aliased to id to line up with the id column of the business
#     table t_user. The alias is cosmetic: insert ... select fills the target
#     columns by position.
#
# Quoting notes (the SQL is passed inside a double-quoted shell string):
#   * Explanatory comments are kept out here as shell comments. Hive has no
#     "#" comment syntax, so a "#" inside the string would be sent to Hive as
#     part of the statement.
#   * "\$" keeps the JSON path prefix "$." literal.
#   * "timestamp" is a reserved keyword in Hive and must be backtick-quoted;
#     the backticks are escaped so the shell does not treat them as a command
#     substitution.

# Resolve the partition date: explicit argument, otherwise yesterday.
if [ -z "$1" ]; then
    dt=$(date +%Y%m%d --date="1 days ago")
else
    dt=$1
fi

# The date is spliced into the SQL text below, so accept nothing but eight
# digits (the same yyyymmdd shape the default above produces).
case "$dt" in
    [0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]) ;;
    *)
        echo "invalid partition date '$dt', expected yyyymmdd" >&2
        exit 1
        ;;
esac

hive -e "
insert overwrite table dwd_shopmall.dwd_app_open partition (dt = '${dt}')
select
    get_json_object(log, '\$.uid')         as id,
    get_json_object(log, '\$.did')         as did,
    get_json_object(log, '\$.platform')    as platform,
    get_json_object(log, '\$.ver')         as ver,
    get_json_object(log, '\$.code')        as code,
    get_json_object(log, '\$.net')         as net,
    get_json_object(log, '\$.brand')       as brand,
    get_json_object(log, '\$.model')       as model,
    get_json_object(log, '\$.display')     as display,
    get_json_object(log, '\$.os')          as os,
    get_json_object(log, '\$.timestamp')   as \`timestamp\`,
    get_json_object(log, '\$.adstatus')    as adstatus,
    get_json_object(log, '\$.loadingtime') as loadingtime
from (
    select log
    from ods_shopmall.ods_app_open
    where dt = '${dt}'
    group by log
) as tmp
where get_json_object(log, '\$.did') != '';

insert overwrite table dwd_shopmall.dwd_goods_click partition (dt = '${dt}')
select
    get_json_object(log, '\$.uid')       as id,
    get_json_object(log, '\$.did')       as did,
    get_json_object(log, '\$.platform')  as platform,
    get_json_object(log, '\$.ver')       as ver,
    get_json_object(log, '\$.code')      as code,
    get_json_object(log, '\$.net')       as net,
    get_json_object(log, '\$.brand')     as brand,
    get_json_object(log, '\$.model')     as model,
    get_json_object(log, '\$.display')   as display,
    get_json_object(log, '\$.os')        as os,
    get_json_object(log, '\$.timestamp') as \`timestamp\`,
    get_json_object(log, '\$.goodsid')   as goodsid,
    get_json_object(log, '\$.location')  as \`location\`
from (
    select log
    from ods_shopmall.ods_goods_click
    where dt = '${dt}'
    group by log
) as tmp
where get_json_object(log, '\$.did') != '';

insert overwrite table dwd_shopmall.dwd_goods_item partition (dt = '${dt}')
select
    get_json_object(log, '\$.uid')         as id,
    get_json_object(log, '\$.did')         as did,
    get_json_object(log, '\$.platform')    as platform,
    get_json_object(log, '\$.ver')         as ver,
    get_json_object(log, '\$.code')        as code,
    get_json_object(log, '\$.net')         as net,
    get_json_object(log, '\$.brand')       as brand,
    get_json_object(log, '\$.model')       as model,
    get_json_object(log, '\$.display')     as display,
    get_json_object(log, '\$.os')          as os,
    get_json_object(log, '\$.timestamp')   as \`timestamp\`,
    get_json_object(log, '\$.goodsid')     as goodsid,
    get_json_object(log, '\$.staytime')    as staytime,
    get_json_object(log, '\$.loadingtime') as loadingtime
from (
    select log
    from ods_shopmall.ods_goods_item
    where dt = '${dt}'
    group by log
) as tmp
where get_json_object(log, '\$.did') != '';

insert overwrite table dwd_shopmall.dwd_goods_list partition (dt = '${dt}')
select
    get_json_object(log, '\$.uid')         as id,
    get_json_object(log, '\$.did')         as did,
    get_json_object(log, '\$.platform')    as platform,
    get_json_object(log, '\$.ver')         as ver,
    get_json_object(log, '\$.code')        as code,
    get_json_object(log, '\$.net')         as net,
    get_json_object(log, '\$.brand')       as brand,
    get_json_object(log, '\$.model')       as model,
    get_json_object(log, '\$.display')     as display,
    get_json_object(log, '\$.os')          as os,
    get_json_object(log, '\$.timestamp')   as \`timestamp\`,
    get_json_object(log, '\$.loadingtime') as loadingtime,
    get_json_object(log, '\$.loadingtype') as loadingtype,
    get_json_object(log, '\$.goodsnum')    as goodsnum
from (
    select log
    from ods_shopmall.ods_goods_list
    where dt = '${dt}'
    group by log
) as tmp
where get_json_object(log, '\$.did') != '';

insert overwrite table dwd_shopmall.dwd_app_collapse partition (dt = '${dt}')
select
    get_json_object(log, '\$.uid')       as id,
    get_json_object(log, '\$.did')       as did,
    get_json_object(log, '\$.platform')  as platform,
    get_json_object(log, '\$.ver')       as ver,
    get_json_object(log, '\$.code')      as code,
    get_json_object(log, '\$.net')       as net,
    get_json_object(log, '\$.brand')     as brand,
    get_json_object(log, '\$.model')     as model,
    get_json_object(log, '\$.display')   as display,
    get_json_object(log, '\$.os')        as os,
    get_json_object(log, '\$.timestamp') as \`timestamp\`
from (
    select log
    from ods_shopmall.ods_app_collapse
    where dt = '${dt}'
    group by log
) as tmp
where get_json_object(log, '\$.did') != '';
"
可以通过定时任务来执行它。
> crontab -e
# 每天凌晨0点1分执行
1 0 * * * /bin/bash /home/work/warehouse_user_action/dwd_shopmall_add_partition.sh
可以先指定日期执行,测试一下效果。
> cd /home/work/warehouse_user_action
> sh dwd_shopmall_init.sh
> sh dwd_shopmall_add_partition.sh 20240520
# 进入hive确认结果
> cd /home/work/hive-3.1.3
> ./bin/hive
hive> use dwd_shopmall;
hive> show tables;
+-------------------+
| tab_name |
+-------------------+
| dwd_app_open |
| dwd_goods_click |
| dwd_goods_item |
| dwd_goods_list |
| dwd_app_collapse |
+-------------------+
5 rows selected (1.702 seconds)
......
感谢支持
更多内容,请移步《超级个体》。