Task 5: Computing Content Ratings
The data for this part comes from the live_info records that the server side reports into HDFS; the computed results are saved to Neo4j.
The data format is as follows.
{
"area":"asia_china",
"pv":183,
"uv":183,
"hosts":183,
"gifter":28,
"nofollow":257,
"length":5741,
"rating":"A",
"smlook":183,
"type":"live",
"gold":183,
"uid":2,
"nickname":"lixingyun",
"looktime":183,
"id":"208815658792108453",
"express":183,
"timestamp":1718102763768
}
Spark
A Spark job runs on a weekly schedule to compute each host's content rating over the last 30 days and then updates the result in Neo4j.
Its dependencies and build/packaging plugins are the same as in the previous task; a dependency sketch is shown below for reference.
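A minimal sketch of what those Maven dependencies might look like. The fastjson and Neo4j driver versions are taken from the --jars list in the submit script further below; the Spark artifact, its version, and the provided scope are assumptions.
<dependencies>
    <!-- Spark core; the Scala suffix and version are assumptions -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <!-- fastjson, matching the jar referenced by the submit script -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
        <scope>provided</scope>
    </dependency>
    <!-- Neo4j Java driver, matching the jar referenced by the submit script -->
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
        <version>4.1.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>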
The Scala code is as follows.
package com.itechthink.recommend.spark

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory

/**
 * Computes each host's content rating over the last 30 days (run weekly).
 * Ratings are graded S, A, B, C, D.
 * Hosts whose most recent sessions are all rated A or above get flag = 1 on
 * their User node in Neo4j.
 */
object UpdateLiveInfoScala {

  private val logger = LoggerFactory.getLogger("UpdateLiveInfoScala")

  def main(args: Array[String]): Unit = {
    var masterUrl = "local"
    var appName = "UpdateLiveInfoScala"
    var filePath = "hdfs://172.16.185.176:9000/data/live_info/20240101"
    var neo4jUrl = "neo4j://172.16.185.176:7687"
    var username = "neo4j"
    var password = "12345678"
    // When running on the cluster, take the configuration from the command-line arguments
    if (args.length > 0) {
      masterUrl = args(0)
      appName = args(1)
      filePath = args(2)
      neo4jUrl = args(3)
      username = args(4)
      password = args(5)
    }
    // This block runs on the Driver: reset flag=1 back to 0 before recomputing
    // Connect to Neo4j
    val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
    // Open a session
    val session = driver.session()
    // Reset all flag=1 nodes to flag=0 before the job writes new flags
    session.run("match(a:User) where a.flag = 1 set a.flag = 0")
    // Close the session
    session.close()
    // Close the connection
    driver.close()
    // Create the SparkContext
    val conf = new SparkConf()
      .setMaster(masterUrl)
      .setAppName(appName)
    val sc = new SparkContext(conf)
    // Read the content-rating data
    val linesRDD = sc.textFile(filePath)
    // Parse uid, rating and timestamp from each line
    val tup3RDD = linesRDD.map(line => {
      try {
        val jsonObj = JSON.parseObject(line)
        val uid = jsonObj.getString("uid")
        val rating = jsonObj.getString("rating")
        val timestamp: Long = jsonObj.getLong("timestamp")
        (uid, rating, timestamp)
      } catch {
        case _: Exception => logger.error("failed to parse JSON line: " + line)
          ("0", "0", 0L)
      }
    })
    // Filter out malformed records
    val filterRDD = tup3RDD.filter(_._2 != "0")
    // Collect the rating info of each host's 3 most recent live sessions
    val top3RDD = filterRDD.groupBy(_._1).map(group => {
      // Take the 3 most recent sessions and join them into one tab-separated string:
      // (uid,rating,timestamp) \t (uid,rating,timestamp) \t (uid,rating,timestamp)
      val top3 = group._2.toList.sortBy(_._3).reverse.take(3).mkString("\t")
      (group._1, top3)
    })
    // Keep only hosts whose last 3 sessions are all rated A or above
    val top3ARDD = top3RDD.filter(tup => {
      var flag = false
      val fields = tup._2.split("\t")
      if (fields.length == 3) {
        // "3 sessions rated A or above" means none of them is B, C or D
        val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
        if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
          flag = true
        }
      }
      flag
    })
    // Write the qualifying hosts back to Neo4j (set flag=1)
    // The host must also be at level >= 15
    top3ARDD.foreachPartition(it => {
      // Connect to Neo4j once per partition
      val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
      // Open a session
      val session = driver.session()
      it.foreach(tup => {
        session.run("match (a:User {uid:'" + tup._1 + "'}) where a.level >= 15 set a.flag = 1")
      })
      // Close the session
      session.close()
      // Close the connection
      driver.close()
    })
  }
}
The job-submission script for the Spark cluster is as follows.
> cd /home/work/recommend/jobs
> vi updateLiveInfo.sh
#!/bin/bash
# Build the input directory list for the last 30 days
filepath=""
for((i=1;i<=30;i++))
do
    filepath+="hdfs://172.16.185.176:9000/data/live_info/"`date -d "$i days ago" +"%Y%m%d"`,
done
# Default to the previous day's date
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
    dt=$1
fi
# HDFS input data path
filePath="hdfs://172.16.185.176:9000/data/live_info/${dt}"
# Use the full 30-day list instead: strip the trailing comma with ${filepath:0:-1}
# (sc.textFile accepts a comma-separated list of paths)
filePath=${filepath:0:-1}
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="UpdateLiveInfoScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
dependencies="hdfs://172.16.185.176:9000/dependencies"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.itechthink.recommend.spark.UpdateLiveInfoScala \
--jars ${dependencies}/fastjson-1.2.68.jar,${dependencies}/neo4j-java-driver-4.1.1.jar,${dependencies}/reactive-streams-1.0.3.jar \
/home/work/recommend/jobs/update_live_info-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${neo4jUrl} ${username} ${password}
# Check the final status of the job
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "Job failed"
    # Send an SMS or e-mail alert here
else
    echo "Job succeeded"
fi
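Because the rating is recalculated weekly, the script can be driven by cron. A minimal sketch, where the Monday 03:00 run time and the log path are both assumptions:
# Assumed schedule: every Monday at 03:00; the log path is also an assumption
0 3 * * 1 sh /home/work/recommend/jobs/updateLiveInfo.sh >> /home/work/recommend/jobs/updateLiveInfo.log 2>&1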
Flink
A Flink job runs on the same weekly schedule to compute each host's content rating over the last 30 days and then updates the result in Neo4j.
Its dependencies and build/packaging plugins are the same as in the previous task (see the packaging note below).
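One packaging note: the submit script below expects a single fat jar (*-jar-with-dependencies.jar). A minimal maven-assembly-plugin sketch that would produce such a jar (the plugin version is an assumption):
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <descriptorRefs>
            <!-- produces the *-jar-with-dependencies.jar used by the submit script -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>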
The Scala code is as follows.
package com.itechthink.recommend.flink

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.{Logger, LoggerFactory}

/**
 * Computes each host's content rating over the last 30 days (run weekly).
 * Ratings are graded S, A, B, C, D.
 * Hosts whose most recent sessions are all rated A or above get flag = 1 in Neo4j.
 * Flink's readTextFile cannot read multiple HDFS directories at once,
 * but the union operator can be used to work around this.
 */
object UpdateLiveInfoScala {

  private val logger = LoggerFactory.getLogger("UpdateLiveInfoScala")

  def main(args: Array[String]): Unit = {
    var filePath = "hdfs://172.16.185.176:9000/data/live_info/20240101,hdfs://172.16.185.176:9000/data/live_info/20240102"
    var neo4jUrl = "neo4j://172.16.185.176:7687"
    var username = "neo4j"
    var password = "12345678"
    // When running on the cluster, take the configuration from the command-line arguments
    if (args.length > 0) {
      filePath = args(0)
      neo4jUrl = args(1)
      username = args(2)
      password = args(3)
    }
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Import the implicit conversions required by the Scala DataSet API
    import org.apache.flink.api.scala._
    // Use union to read from multiple HDFS directories
    // (the seed element "test" fails JSON parsing and is filtered out later)
    val files = filePath.split(",")
    var allText: DataSet[String] = env.fromElements("test")
    for (file <- files) {
      val text = env.readTextFile(file)
      allText = allText.union(text)
    }
    println("total number of records: " + allText.count())
    //// Reading a single HDFS directory would simply be:
    //val text = env.readTextFile(filePath)
    // This block runs on the client (driver) side: reset flag=1 back to 0 before recomputing
    // Connect to Neo4j
    val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
    // Open a session
    val session = driver.session()
    session.run("match (a:User) where a.flag = 1 set a.flag = 0")
    // Close the session
    session.close()
    // Close the connection
    driver.close()
    // Parse uid, rating and timestamp from every input file
    // (replace allText with text below to parse a single directory instead)
    // val tup3Set = text.map(line => {
    val tup3Set = allText.map(line => {
      try {
        val jsonObj = JSON.parseObject(line)
        val uid = jsonObj.getString("uid")
        val rating = jsonObj.getString("rating")
        val timestamp: Long = jsonObj.getLong("timestamp")
        (uid, rating, timestamp)
      } catch {
        case _: Exception => logger.error("failed to parse JSON line: " + line)
          ("0", "0", 0L)
      }
    })
    // Filter out malformed records
    val filterSet = tup3Set.filter(_._2 != "0")
    // Collect the rating info of each host's 3 most recent live sessions
    val top3Set = filterSet.groupBy(0)
      .sortGroup(2, Order.DESCENDING)
      .reduceGroup(it => {
        val list = it.toList
        // (uid,rating,timestamp) \t (uid,rating,timestamp) \t (uid,rating,timestamp)
        val top3 = list.take(3).mkString("\t")
        // e.g. (2002, (2002,A,1769913940002)\t(2002,A,1769913940001)\t(2002,A,1769913940000))
        (list.head._1, top3)
      })
    // Keep only hosts whose last 3 sessions are all rated A or above
    val top3ASet = top3Set.filter(tup => {
      var flag = false
      val fields = tup._2.split("\t")
      if (fields.length == 3) {
        // "3 sessions rated A or above" means none of them is B, C or D
        val tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)
        if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {
          flag = true
        }
      }
      flag
    })
    // Write the qualifying hosts back to Neo4j (set flag=1); the host must also be at level >= 15
    top3ASet.mapPartition(it => {
      // Connect to Neo4j once per partition
      val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
      // Open a session
      val session = driver.session()
      it.foreach(tup => {
        session.run("match (a:User {uid:'" + tup._1 + "'}) where a.level >= 15 set a.flag = 1")
      })
      // Close the session
      session.close()
      // Close the connection
      driver.close()
      // Return an empty string (no records emitted); the print() sink below triggers execution
      ""
    }).print()
  }
}
The job-submission script for the Flink cluster is as follows.
> cd /home/work/recommend/jobs
> vi updateLiveInfo.sh
#!/bin/bash
# Build the input directory list for the last 30 days
filepath=""
for((i=1;i<=30;i++))
do
    filepath+="hdfs://172.16.185.176:9000/data/live_info/"`date -d "$i days ago" +"%Y%m%d"`,
done
# Default to the previous day's date
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "x$1" != "x" ]
then
    dt=$1
fi
# HDFS input data path
filePath="hdfs://172.16.185.176:9000/data/live_info/${dt}"
# Use the full 30-day list instead: strip the trailing comma with ${filepath:0:-1}
filePath=${filepath:0:-1}
masterUrl="yarn-cluster"
appName="UpdateLiveInfoScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.UpdateLiveInfoScala \
/home/work/recommend/jobs/update_live_info-1.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${neo4jUrl} ${username} ${password}
# Check the final status: two applications run for this submission, so both must report SUCCEEDED
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{tmp=$8;getline;print tmp","$8}'`
if [ "${appStatus}" != "SUCCEEDED,SUCCEEDED" ]
then
    echo "Job failed"
    # Send an SMS or e-mail alert here
else
    echo "Job succeeded"
fi
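Once either job has finished, the result can be spot-checked directly in Neo4j. A minimal sketch using cypher-shell (whether cypher-shell is installed on this machine is an assumption; the address and credentials are the ones used above):
> cypher-shell -a neo4j://172.16.185.176:7687 -u neo4j -p 12345678 \
  "match (a:User) where a.flag = 1 return count(a)"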