Task 7: Export the Computed Results
MySQL
Use Sqoop to export the third-degree relationship recommendation results computed in HDFS to MySQL.
The MySQL table structure is as follows.
DROP TABLE IF EXISTS t_recommend_list;
CREATE TABLE t_recommend_list
(
    uid   VARCHAR(32)   NOT NULL,  -- user id
    ruids VARCHAR(1024) NOT NULL,  -- comma-separated list of recommended anchor uids
    PRIMARY KEY (uid)
);
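Each line of the HDFS result files is expected to hold a uid and a comma-separated list of recommended anchor uids, separated by a tab; this is what the ruids column above and the --input-fields-terminated-by '\t' option in the Sqoop script below rely on. A quick way to eyeball the format before exporting (the sample uids shown are hypothetical):
> hdfs dfs -cat /data/recommend/20240101/* | head -2
1001	2001,2002,2003
1002	2001,2004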
The Sqoop script is as follows.
# By default, use the date of one week ago (last Monday when the job runs on Mondays)
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
    dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
sqoop export \
--connect "jdbc:mysql://172.16.185.176:3306/recommend?serverTimezone=UTC" \
--username root \
--password 123456 \
--table t_recommend_list \
--export-dir hdfs://172.16.185.176:9000/data/recommend/${dt} \
--input-fields-terminated-by '\t' \
--update-key uid \
--update-mode allowinsert
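Because the export uses --update-key uid together with --update-mode allowinsert, re-running it for the same week updates existing rows by primary key instead of failing on duplicates. The script above does not check whether the export succeeded; a minimal sketch of such a check, assuming it is appended directly after the sqoop export command:
# $? holds the exit code of the sqoop export command above
if [ $? -ne 0 ]
then
    echo "sqoop export failed"
    # send an SMS or email alert here, as in the Flink job script below
    exit 1
fi
echo "sqoop export succeeded"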
Redis
Use Flink to export the third-degree relationship recommendation results computed in HDFS to Redis.
package com.itechthink.recommend.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import redis.clients.jedis.Jedis

/**
 * Export the third-degree relationship recommendation data to Redis
 *
 */
object ExportData2RedisScala {

  def main(args: Array[String]): Unit = {
    var filePath = "hdfs://172.16.185.176:9000/data/recommend/20240101"
    var redisHost = "172.16.185.176"
    var redisPort = 6379
    // Override the defaults when running on the cluster
    if (args.length > 0) {
      filePath = args(0)
      redisHost = args(1)
      redisPort = args(2).toInt
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the data from HDFS
    val text = env.readTextFile(filePath)
    // Add the implicit conversions required by the Scala DataSet API
    import org.apache.flink.api.scala._
    // Process the data partition by partition
    text.mapPartition(it => {
      // Open one Jedis connection per partition
      val jedis = new Jedis(redisHost, redisPort)
      // Open a pipeline to batch the Redis commands
      val pipeline = jedis.pipelined()
      it.foreach(line => {
        val fields = line.split("\t")
        // The user id
        val uid = fields(0)
        // The comma-separated list of recommended anchor uids
        val ruids = fields(1).split(",")
        val key = "recommend_list_" + uid
        // Delete the old list first so the data is fully refreshed every week
        pipeline.del(key)
        for (ruid <- ruids) {
          pipeline.rpush(key, ruid)
          // Set a 30-day expiration time
          pipeline.expire(key, 30 * 24 * 60 * 60)
        }
      })
      // Flush the pipelined commands
      pipeline.sync()
      // Close the Jedis connection
      jedis.close()
      ""
    }).print()
  }

}
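Each user ends up with a Redis list under the key recommend_list_<uid>, rebuilt in full on every run and kept for 30 days. Once the job has run (the submission script follows), a single key can be spot-checked with redis-cli; uid 1001 is a hypothetical example:
> redis-cli -h 172.16.185.176 -p 6379 lrange recommend_list_1001 0 -1
> redis-cli -h 172.16.185.176 -p 6379 ttl recommend_list_1001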
The script for submitting the job to the Flink cluster is as follows.
> cd /home/work/recommend/jobs
> vi exportData2Redis.sh
#!/bin/bash
# By default, use the date of one week ago
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
    dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
# HDFS input data path
filePath="hdfs://172.16.185.176:9000/data/recommend/${dt}"
masterUrl="yarn-cluster"
appName="ExportData2RedisScala"`date +%s`
redisHost="172.16.185.176"
redisPort=6379
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.ExportData2RedisScala \
/home/work/recommend2/jobs/export_data2_redis-2.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${redisHost} ${redisPort}
# Check the final status of the YARN application
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "The job failed"
    # send an SMS or email alert here
else
    echo "The job succeeded"
fi
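The whole export is meant to run once a week (the default date is one week back and the Redis lists are rebuilt on every run), so the script is a natural candidate for a weekly cron job. A minimal sketch of a crontab entry that runs it at 03:00 every Monday; the schedule and log path are assumptions:
> crontab -e
# minute hour day-of-month month day-of-week command
0 3 * * 1 sh /home/work/recommend/jobs/exportData2Redis.sh >> /home/work/recommend/jobs/exportData2Redis.log 2>&1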