任务4:计算主播等级
原创大约 3 分钟
这一部分的数据来源于服务端MySQL数据库导出到HDFS中的user_level
,计算结果保存到Neo4j。
它的数据格式如下。
CREATE TABLE t_user_level
(
id BIGINT NOT NULL,
uid BIGINT NOT NULL,
vexpress INT(11) NOT NULL,
vlevel INT(11) NOT NULL,
CREATEtime TIMESTAMP NOT NULL default CURRENT_TIMESTAMP,
updatetime TIMESTAMP NOT NULL default CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
express INT(11) NOT NULL,
level INT(11) NOT NULL
);
Spark
通过Spark每天定时计算并更新主播的等级,然后更新到Neo4j中。
它的依赖和编译、打包插件同前一个任务一致。
Scala代码如下。
package com.itechthink.recommend.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
/**
* 每天定时更新主播等级
*
*/
object UpdateUserLevelScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "UpdateUserLevelScala"
var filePath = "hdfs://172.16.185.176:9000/data/user_level/20240101"
var neo4jUrl = "neo4j://172.16.185.176:7687"
var username = "neo4j"
var password = "12345678"
// 在集群中执行
if (args.length > 0) {
masterUrl = args(0)
appName = args(1)
filePath = args(2)
neo4jUrl = args(3)
username = args(4)
password = args(5)
}
// 获取SparkContext
val conf = new SparkConf()
.setMaster(masterUrl)
.setAppName(appName)
val sc = new SparkContext(conf)
// 读取用户等级数据
val linesRDD = sc.textFile(filePath)
// 校验数据准确性
val filterRDD = linesRDD.filter(line => {
val fields = line.split("\t")
// 判断每一行的列数是否正确,以及这一行是不是表头
if (fields.length == 8 && !fields(0).equals("id")) {
true
} else {
false
}
})
// 处理数据
filterRDD.foreachPartition(it => {
// 获取neo4j的连接
val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
// 开启一个会话
val session = driver.session()
it.foreach(line => {
val fields = line.split("\t")
// 添加等级
session.run("merge(u:User {uid:'" + fields(1).trim + "'}) set u.level = " + fields(3).trim)
})
// 关闭会话
session.close()
// 关闭连接
driver.close()
})
}
}
提交到Spark集群的作业脚本如下。
> cd /home/work/recommend/jobs
> vi updateUserLevel.sh
#!/bin/bash
# 默认获取前一天的时间
dt=`date -d "1 days ago" + "%Y%m%d"`
if [ "x$1" != "x" ]
then
dt=$1
fi
# HDFS输入数据路径
filePath="hdfs://172.16.185.176:9000/data/user_level/${dt}"
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="UpdateUserLevelScala"`date + %s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
dependencies="hdfs://172.16.185.176:9000/dependencies"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.itechthink.recommend.spark.UpdateUserLevelScala \
--jars ${dependencies}/neo4j-java-driver-4.1.1.jar,${dependencies}/reactive-streams-1.0.3.jar \
/home/work/recommend/jobs/update_user_level-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${neo4jUrl} ${username} ${password}
# 验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
Flink
通过Flink每天定时计算并更新主播的等级,然后更新到Neo4j中。
它的依赖和编译、打包插件同前一个任务一致。
Scala代码如下。
package com.itechthink.recommend.flink
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}
/**
* 每天定时更新主播等级
*
*/
object UpdateUserLevelScala {
def main(args: Array[String]): Unit = {
var filePath = "hdfs://172.16.185.176:9000/data/user_level/20240101"
var neo4jUrl = "neo4j://172.16.185.176:7687"
var username = "neo4j"
var password = "12345678"
// 在集群中执行
if (args.length > 0) {
filePath = args(0)
neo4jUrl = args(1)
username = args(2)
password = args(3)
}
val env = ExecutionEnvironment.getExecutionEnvironment
// 读取hdfs中的数据
val text = env.readTextFile(filePath)
// 校验数据准确性
val filterSet = text.filter(line => {
val fields = line.split("\t")
if (fields.length == 8 && !fields(0).equals("id")) {
true
} else {
false
}
})
// 添加隐式转换代码
import org.apache.flink.api.scala._
// 处理数据
filterSet.mapPartition(it => {
// 连接到neo4j
val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
// 开启一个会话
val session = driver.session()
it.foreach(line => {
val fields = line.split("\t")
// 添加等级
session.run("merge (u:User {uid:'" + fields(1).trim + "'}) set u.level = " + fields(3).trim)
})
// 关闭会话
session.close()
// 关闭连接
driver.close()
""
}).print()
}
}
提交到Flink集群的作业脚本如下。
> cd /home/work/recommend2/jobs
> vi updateUserLevel.sh
#!/bin/bash
# 默认获取前一天时间
dt=`date -d "1 days ago" + "%Y%m%d"`
if [ "d$1" != "d" ]
then
dt=$1
fi
# HDFS输入数据路径
filePath="hdfs://172.16.185.176:9000/data/user_level/${dt}"
masterUrl="yarn-cluster"
appName="UpdateUserLevelScala"`date + %s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.UpdateUserLevelScala \
/home/work/recommend2/jobs/update_user_level-2.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${neo4jUrl} ${username} ${password}
# 验证任务执行状态
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "任务执行失败"
# 发送短信或者邮件
else
echo "任务执行成功"
fi
感谢支持
更多内容,请移步《超级个体》。