商品订单数据导入
原创大约 5 分钟
数据格式
服务端数据包括用户、商品、支付和订单等数据,这些数据都存储在MySQL中。
CREATE DATABASE IF NOT EXISTS t_shopmall;
-- 数据库
USE t_shopmall;
-- 1. 用户信息表
DROP TABLE IF EXISTS t_user;
CREATE TABLE t_user (
id BIGINT NOT NULL,
username VARCHAR(64) NOT NULL,
gender TINYINT NOT NULL,
birthday DATETIME NOT NULL,
email VARCHAR(64) NOT NULL,
mobile VARCHAR(16) NOT NULL,
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
disabled TINYINT NOT NULL,
PRIMARY KEY (id)
);
-- 2. 用户收货地址表
DROP TABLE IF EXISTS t_user_addr;
CREATE TABLE t_user_addr (
id BIGINT NOT NULL,
userid BIGINT NOT NULL,
addrname VARCHAR(512) NOT NULL,
isdefault TINYINT NOT NULL,
username VARCHAR(64) NOT NULL,
mobile VARCHAR(16) NOT NULL,
PRIMARY KEY (id, userid)
);
-- 3. 类目编码表
DROP TABLE IF EXISTS t_category;
CREATE TABLE t_category (
id INT(11) NOT NULL,
parentids VARCHAR(128) NOT NULL COMMENT '所有父类目编码',
name VARCHAR(128) NOT NULL,
level TINYINT NOT NULL COMMENT '类目等级',
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- 4. 商品信息表
DROP TABLE IF EXISTS t_goods_info;
CREATE TABLE t_goods_info (
id BIGINT NOT NULL,
name VARCHAR(256) NOT NULL,
desc VARCHAR(256) NOT NULL,
price DECIMAL(18, 2) NOT NULL,
categoryid INT(11) NOT NULL,
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- 5. 支付流水表
DROP TABLE IF EXISTS t_payment;
CREATE TABLE t_payment (
id BIGINT NOT NULL,
orderid BIGINT NOT NULL,
tradeno VARCHAR(64) NOT NULL,
money DECIMAL(18, 2) NOT NULL,
type INT(11) NOT NULL COMMENT '支付类型',
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- 6. 订单表
DROP TABLE IF EXISTS t_order;
CREATE TABLE t_order (
id BIGINT NOT NULL,
userid BIGINT NOT NULL,
money DECIMAL(18, 2) NOT NULL,
type INT(11) NOT NULL,
status INT(11) NOT NULL,
payid BIGINT NOT NULL,
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updatetime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- 7. 订单详情表
DROP TABLE IF EXISTS t_orderitem;
CREATE TABLE t_orderitem (
orderid BIGINT NOT NULL,
goodsid BIGINT NOT NULL,
amount INT(11) NOT NULL,
price DECIMAL(18, 2) NOT NULL,
createtime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (orderid, goodsid)
);
数据采集
通过Sqoop来采集保存在MySQL中的业务数据,将它们导入到HDFS中去。
由于用户、商品等
事实表
的变动通常不会很大(即使成千上万对MySQL或Hadoop来说也并不算多),所以可以定期(例如,每天采集一次)给这些数据表做一次全量采集,将它们导入到HDFS。由于某些
维度表
,例如,地区维度,基本上很长时间内都是不会改变的,所以只需要做一次全量采集就行了。对于像订单、支付流水这种每天都会变而且数据量可能比较大的数据表(例如,大促期间日交易量可能达到几百万笔),一般采取增量采集的方式,就是每天只采集新增的数据。
某些数据需要以拉链表的方式进行采集。
采集时,需要对数据做信息脱敏处理,例如,用户的手机号脱敏后会变成
158****1234
。
最终的导入策略如下。
数据表 | 表名 | 导入策略 |
---|---|---|
t_user | 用户信息表 | 全量导入 |
t_user_addr | 用户收货地址表 | 全量导入 |
t_category | 类目编码表 | 全量导入 |
t_goods_info | 商品信息表 | 全量导入 |
t_payment | 支付流水表 | 增量导入 |
t_order | 订单表 | 增量导入 |
t_orderitem | 订单详情表 | 增量导入 |
# 通用的数据导入文件
> cd /home/work
# 编写一个通用的数据采集脚本
> vi common_data_collect_script.sh
#!/bin/bash
# 将MySQL中的数据导入HDFS
# 判断是否传参,增加脚本健壮性
if [ $# != 2 ]
then
echo "参数异常:common_data_collect_script.sh <sql> <hdfs_path>"
exit 100
fi
# SQL语句
sql=$1
# HDFS存储路径
hdfs_path=$2
sqoop import \
--connect jdbc:mysql://172.16.185.176:3306/t_shopmall?serverTimezone=UTC \
--username root \
--password 123456 \
--target-dir "${hdfs_path}" \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by '\t' \
--query "${sql}"' and $CONDITIONS' \
--null-string '\\N' \
--null-non-string '\\N'
有了一个通用的脚本,还需要根据导入策略(全量导入
和增量导入
)以及不同的数据表定制不同的导入脚本。
定制脚本
开发全量导入
脚本。
# 创建业务相关的商品订单数据导入文件夹,后续所有数据导入脚本都放在这里
> mkdir -p /home/work/warehouse_goods_order
> cd /home/work/warehouse_goods_order
# 编写全量导入脚本
> vi data_collect_completed.sh
#!/bin/bash
# 全量数据采集
# 默认获取前一天的日期,也支持通过参数指定日期
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
# SQL语句
t_user_sql="select id,username,gender,birthday,email,concat(left(mobile,3), '****' ,right(mobile,4)) as mobile,createtime,blacklist from t_user where 1=1"
t_user_addr_sql="select id,userid,addrname,orderflag,username,concat(left(mobile,3), '****' ,right(mobile,4)) as mobile from t_user_addr where 1=1"
t_goods_info_sql="select id,desc,name,price,categoryid,createtime from t_goods_info where 1=1"
t_category_sql="select id,parentids,name,level,createtime from t_category where 1=1"
# 路径前缀
path_prefix="hdfs://server01:9000/data/ods"
# 输出路径
t_user_path="${path_prefix}/t_user/${dt}"
t_user_addr_path="${path_prefix}/t_user_addr/${dt}"
t_category_path="${path_prefix}/t_category/${dt}"
t_goods_info_path="${path_prefix}/t_goods_info/${dt}"
# 采集数据
echo "开始采集..."
echo "开始采集表:t_user"
sh common_data_collect_script.sh "${t_user_sql}" "${t_user_path}"
echo "采集表:t_user_addr"
sh common_data_collect_script.sh "${t_user_addr_sql}" "${t_user_addr_path}"
echo "采集表:t_category"
sh common_data_collect_script.sh "${t_category_sql}" "${t_category_path}"
echo "采集表:t_goods_info"
sh common_data_collect_script.sh "${t_goods_info_sql}" "${t_goods_info_path}"
echo "结束采集..."
开发增量导入
脚本。
> cd /home/work/warehouse_goods_order
# 编写增量导入脚本
> vi data_collect_increment.sh
#!/bin/bash
# 增量数据采集
# 默认获取前一天的日期,也支持通过参数指定日期
if [ "d$1" = "d" ]
then
dt=`date +%Y%m%d --date="1 days ago"`
else
dt=$1
fi
# 将 yyyyMMdd 转换为 yyyy-MM-dd
dt_new=`date +%Y-%m-%d --date="${dt}"`
# SQL语句
t_payment_sql="select id,orderid,tradeno,money,type,createtime from t_payment where pay_time >= '${dt_new} 00:00:00' and createtime <= '${dt_new} 23:59:59'"
t_order_sql="select order_id,userid,money,type,status,payid,createtime,updatetime from t_order where createtime >= '${dt_new} 00:00:00' and createtime <= '${dt_new} 23:59:59'"
t_orderitem_sql="select orderid,goodsid,amount,price,createtime from t_orderitem where createtime >= '${dt_new} 00:00:00' and createtime <= '${dt_new} 23:59:59'"
# 路径前缀
path_prefix="hdfs://server01:9000/data/ods"
# 输出路径
t_payment_path="${path_prefix}/t_payment/${dt}"
t_order_path="${path_prefix}/t_order/${dt}"
t_orderitem_path="${path_prefix}/t_orderitem/${dt}"
# 采集数据
echo "开始采集..."
echo "采集表:t_payment"
sh common_data_collect_script.sh "${t_payment_sql}" "${t_payment_path}"
echo "采集表:t_order"
sh common_data_collect_script.sh "${t_order_sql}" "${t_order_path}"
echo "采集表:t_orderitem"
sh common_data_collect_script.sh "${t_orderitem_sql}" "${t_orderitem_path}"
echo "结束采集..."
最后需要通过定时器来执行这两个脚本。
> crontab -e
# 每天凌晨1点执行
0 1 * * * /home/work/warehouse_goods_order/data_collect_completed.sh
# 每天凌晨1点5分执行
5 1 * * * /home/work/warehouse_goods_order/data_collect_increment.sh
可以尝试着执行一次看看效果。
> cd /home/work/warehouse_goods_order
> sh data_collect_completed.sh 20240531
> sh data_collect_increment.sh 20240531
> hdfs dfs -ls /data/ods/t_user/20240531/
感谢支持
更多内容,请移步《超级个体》。