Task 6: Computing Third-Degree Relationships
The data for this part comes from the results generated in Neo4j in the previous steps, and the computed results are saved to the /data/recommend/ directory in HDFS.
Spark
A Spark job runs on a weekly schedule to compute the third-degree relationship lists of anchors who were active within the past week, then saves the results to HDFS.
Its dependencies and the build and packaging plugins are the same as in the previous task.
The Scala code is as follows.
package com.itechthink.recommend.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
// Required for Neo4j(sc) below (from the neo4j-spark-connector)
import org.neo4j.spark.Neo4j
import scala.collection.mutable.ArrayBuffer
/**
 * Weekly scheduled job: compute the third-degree relationship lists of anchors active within the past week
 * The candidate anchor must have been active within the past week
 * The candidate anchor's level must be > 10
 * The candidate anchor's video rating over the last 30 days must be 3A+
 * The fan-list follow overlap with the candidate anchor must be > 2
 *
 */
object GetRecommendListScala {
def main(args: Array[String]): Unit = {
var masterUrl = "local"
var appName = "GetRecommendListScala"
var neo4jUrl = "neo4j://172.16.185.176:7687"
var username = "neo4j"
var password = "admin"
// Timestamp threshold: active within the past week
var timestamp = 0L
// Follow overlap threshold between fan lists
var duplicateNum = 2
// Anchor level threshold
var level = 10
var outputPath = "hdfs://172.16.185.176:9000/data/recommend/20240101"
// When running on the cluster, read the parameters from the command line
if (args.length > 0) {
masterUrl = args(0)
appName = args(1)
neo4jUrl = args(2)
username = args(3)
password = args(4)
timestamp = args(5).toLong
duplicateNum = args(6).toInt
level = args(7).toInt
outputPath = args(8)
}
// Build the SparkContext
val conf = new SparkConf()
.setAppName(appName)
.setMaster(masterUrl)
// Allow creating multiple contexts
.set("spark.driver.allowMultipleContexts", "true")
// Neo4j address
.set("spark.neo4j.url", neo4jUrl)
// Neo4j username
.set("spark.neo4j.user", username)
// Neo4j password
.set("spark.neo4j.password", password)
val sc = new SparkContext(conf)
// Query anchors that were active within the past week with level greater than 10
var params = Map[String, Long]()
params += ("timestamp" -> timestamp)
// level is an Int, so widen it explicitly to fit the Map[String, Long]
params += ("level" -> level.toLong)
// Neo4j 4.x uses the $param placeholder syntax (the older {param} form was removed)
val neo4j = Neo4j(sc).cypher("match (a:User) where a.timestamp >= $timestamp and a.level >= $level return a.uid").params(params)
// Repartition to 7 so that 7 tasks query the Neo4j database in parallel
val rowRDD = neo4j.loadRowRdd.repartition(7)
// Process one partition (one batch) at a time
// Keep pairs whose fan-list follow overlap is > 2, ordered by overlap descending
// Output format: anchor id, candidate anchor id
val mapRDD = rowRDD.mapPartitions(it => {
// Get a Neo4j driver connection
val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
// Open a session
val session = driver.session()
// Collect the computed results
val resultArr = ArrayBuffer[String]()
it.foreach(row => {
val uid = row.getString(0)
// Compute one user's third-degree relationships
// val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid, c.uid as cuid, count(c.uid) as sum order by sum desc limit 30")
// Filter b and c by active time, then filter c by its level and flag values
val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User) " +
"where b.timestamp >= " + timestamp + " and c.timestamp >= " + timestamp + " and c.level >= " + level + " and c.flag = 1 " +
"return a.uid as auid, c.uid as cuid, count(c.uid) as sum order by sum desc limit 30")
while (result.hasNext) {
val record = result.next()
val sum = record.get("sum").asInt()
if (sum > duplicateNum) {
resultArr += record.get("auid").asString() + "\t" + record.get("cuid").asString()
}
}
})
// Close the session
session.close()
// Close the driver connection
driver.close()
resultArr.iterator
}).persist(StorageLevel.MEMORY_AND_DISK) // Cache the RDD data
// Convert each line into a Tuple2
val tup2RDD = mapRDD.map(line => {
val splits = line.split("\t")
(splits(0), splits(1))
})
// Group by anchor id to get each anchor's recommendation candidate list
val reduceRDD = tup2RDD.reduceByKey((v1, v2) => {
v1 + "," + v2
})
// Assemble the result as: 1001 1002,1003,1004
reduceRDD.map(tup => {
tup._1 + "\t" + tup._2
}).repartition(1).saveAsTextFile(outputPath)
}
}
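After the job finishes, each output line has the form anchor id, a tab, then the comma-separated candidate list. As a quick spot check, here is a minimal sketch (not part of the job itself) that reads the result back from HDFS and parses it into a map; the object name CheckRecommendListScala and the date path are illustrative.
package com.itechthink.recommend.spark
import org.apache.spark.{SparkConf, SparkContext}
object CheckRecommendListScala {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("CheckRecommendListScala").setMaster("local"))
// Each output line looks like: 1001\t1002,1003,1004
val recommendMap = sc.textFile("hdfs://172.16.185.176:9000/data/recommend/20240101")
.map(line => {
val splits = line.split("\t")
// anchor id -> list of candidate anchor ids
(splits(0), splits(1).split(",").toList)
})
.collectAsMap()
recommendMap.foreach(println)
sc.stop()
}
}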
The script that submits the job to the Spark cluster is as follows.
> cd /home/work/recommend/jobs
> vi getRecommendList.sh
#!/bin/bash
# Default to the date 7 days ago (last Monday when the script runs weekly on Mondays)
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="GetRecommendListScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
# Timestamp of last Monday (in milliseconds)
timestamp=`date --date="${dt}" +%s`000
# Follow overlap threshold between fan lists
duplicateNum=2
# Anchor level threshold
level=10
# Output data path
outputPath="hdfs://172.16.185.176:9000/data/recommend/${dt}"
dependencies="hdfs://172.16.185.176:9000/dependencies"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.itechthink.recommend.spark.GetRecommendListScala \
--jars ${dependencies}/neo4j-java-driver-4.1.1.jar,${dependencies}/reactive-streams-1.0.3.jar,${dependencies}/neo4j-spark-connector-2.4.5-M1.jar \
/home/work/recommend/jobs/get_recommend_list-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${neo4jUrl} ${username} ${password} ${timestamp} ${duplicateNum} ${level} ${outputPath}
# Check the job's final status
# For Spark jobs the Application-Type is a single word, so Final-State is field 7
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "Job failed"
# Send an SMS or email alert here
else
echo "Job succeeded"
fi
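Because the computation runs on a weekly schedule, the script only needs to be triggered once a week, for example from crontab with an entry like 0 2 * * 1 sh /home/work/recommend/jobs/getRecommendList.sh (the time of day here is illustrative).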
Flink
A Flink job runs on a weekly schedule to compute the third-degree relationship lists of anchors who were active within the past week, then saves the results to HDFS.
Its dependencies and the build and packaging plugins are the same as in the previous task.
The Scala code is as follows.
package com.itechthink.recommend.flink
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
/**
 * Weekly scheduled job: compute the third-degree relationship lists of anchors active within the past week
 * The candidate anchor must have been active within the past week
 * The candidate anchor's level must be > 10
 * The candidate anchor's video rating over the last 30 days must be 3A+
 * The fan-list follow overlap with the candidate anchor must be > 2
 *
 */
object GetRecommendListScala {
def main(args: Array[String]): Unit = {
var appName = "GetRecommendListScala"
var neo4jUrl = "neo4j://172.16.185.176:7687"
var username = "neo4j"
var password = "12345678"
// Timestamp threshold: active within the past week
var timestamp = 0L
// Follow overlap threshold between fan lists
var duplicateNum = 2
// Anchor level threshold
var level = 10
var outputPath = "hdfs://172.16.185.176:9000/data/recommend/20240101"
// When running on the cluster, read the parameters from the command line
if (args.length > 0) {
appName = args(0)
neo4jUrl = args(1)
username = args(2)
password = args(3)
timestamp = args(4).toLong
duplicateNum = args(5).toInt
level = args(6).toInt
outputPath = args(7)
}
val env = ExecutionEnvironment.getExecutionEnvironment
// Add the implicit conversions
import org.apache.flink.api.scala._
// Use the same key ("neo4jUrl") that Neo4jInputFormat reads below
val param = Map("neo4jUrl" -> neo4jUrl, "username" -> username, "password" -> password, "timestamp" -> timestamp.toString, "level" -> level.toString)
// Custom input source: this is the batch ExecutionEnvironment rather than a StreamExecutionEnvironment, so addSource is unavailable
// Read from Neo4j the anchors that were active within the past week with level greater than 10
val uidSet = env.createInput(new Neo4jInputFormat(param))
// Process one partition (one batch) at a time
// Keep pairs whose fan-list follow overlap is > 2, ordered by overlap descending
// Output format: anchor id, candidate anchor id
val mapSet = uidSet.mapPartition(it => {
// Connect to Neo4j
val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
// Open a session
val session = driver.session()
// Collect the computed results
val resultArr = ArrayBuffer[String]()
it.foreach(uid => {
// Compute the anchor's third-degree relationships
// val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User) return a.uid as auid, c.uid as cuid ,count(c.uid) as sum order by sum desc limit 30")
// Filter b and c by active time, then filter c by its level and flag values
val result = session.run("match (a:User {uid:'" + uid + "'}) <-[:follow]- (b:User) -[:follow]-> (c:User)" +
" where b.timestamp >= " + timestamp + " and c.timestamp >= " + timestamp + " and c.level >= " + level +
" and c.flag = 1 return a.uid as auid, c.uid as cuid, count(c.uid) as sum order by sum desc limit 30")
while (result.hasNext) {
val record = result.next()
val sum = record.get("sum").asInt()
if (sum > duplicateNum) {
resultArr += record.get("auid").asString() + "\t" + record.get("cuid").asString()
}
}
})
// Close the session
session.close()
// Close the driver connection
driver.close()
resultArr.iterator
})
// Convert each line into a Tuple2
val tup2Set = mapSet.map(line => {
val splits = line.split("\t")
(splits(0), splits(1))
})
// Group by anchor id to get each anchor's recommendation candidate list
val reduceSet = tup2Set.groupBy(_._1).reduceGroup(it => {
val list = it.toList
val tmpList = ListBuffer[String]()
for (l <- list) {
tmpList += l._2
}
// Assemble the result as: 1001 1002,1003,1004
(list.head._1, tmpList.toList.mkString(","))
})
// writeAsCsv can only save tuple-typed data
// writeAsText supports any type; for objects it calls toString before writing to the file
reduceSet.writeAsCsv(outputPath, "\n", "\t")
// 执行任务
env.execute(appName)
}
}
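The custom Neo4jInputFormat passed to createInput above is implemented as follows.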
package com.itechthink.recommend.flink
import org.apache.flink.api.common.io.statistics.BaseStatistics
import org.apache.flink.api.common.io.{DefaultInputSplitAssigner, NonParallelInput, RichInputFormat}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.{GenericInputSplit, InputSplit, InputSplitAssigner}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Result, Session}
/**
 * Query the anchors that satisfy the conditions from Neo4j:
 * active within the past week and with an anchor level greater than 10
 *
 */
class Neo4jInputFormat extends RichInputFormat[String, InputSplit] with NonParallelInput {
// "with NonParallelInput" means this component does not support a parallelism greater than 1
// Holds the Neo4j-related configuration parameters
var param: Map[String, String] = Map()
var driver: Driver = _
var session: Session = _
var result: Result = _
/**
 * Constructor
 * Receives the Neo4j-related configuration parameters
 *
 */
def this(param: Map[String, String]) {
this()
this.param = param
}
/**
 * Configure this input format
 *
 */
override def configure(parameters: Configuration): Unit = {}
/**
 * Get basic statistics about the input data
 *
 */
override def getStatistics(cachedStatistics: BaseStatistics): BaseStatistics = {
cachedStatistics
}
/**
 * Split the input data into input splits
 *
 */
override def createInputSplits(minNumSplits: Int): Array[InputSplit] = {
Array(new GenericInputSplit(0, 1))
}
/**
 * Return the assigner that hands out the input splits
 *
 */
override def getInputSplitAssigner(inputSplits: Array[InputSplit]): InputSplitAssigner = {
new DefaultInputSplitAssigner(inputSplits)
}
/**
 * Initialization method, executed only once per task:
 * obtain the Neo4j connection and open a session
 *
 */
override def openInputFormat(): Unit = {
// Initialize the Neo4j connection
this.driver = GraphDatabase.driver(param("neo4jUrl"), AuthTokens.basic(param("username"), param("password")))
// Open a session
this.session = driver.session()
}
/**
 * Close the Neo4j connection
 *
 */
override def closeInputFormat(): Unit = {
if (driver != null) {
driver.close()
}
}
/**
 * This method is also executed only once (per input split)
 *
 */
override def open(split: InputSplit): Unit = {
this.result = session.run("match (a:User) where a.timestamp >= " + param("timestamp") +
" and a.level >= " + param("level") + " return a.uid")
}
/**
 * Must return true once all data has been read
 *
 */
override def reachedEnd(): Boolean = {
!result.hasNext
}
/**
 * Read the result data, one record at a time
 *
 */
override def nextRecord(reuse: String): String = {
val record = result.next()
val uid = record.get(0).asString()
uid
}
/**
 * Close the session
 *
 */
override def close(): Unit = {
if (session != null) {
session.close()
}
}
}
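For reference, Flink drives this input format through a fixed lifecycle: openInputFormat runs once when the task starts, open runs once per input split, reachedEnd and nextRecord are then called alternately until all records are consumed, and close and closeInputFormat finally release the session and the driver.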
The script that submits the job to the Flink cluster is as follows.
> cd /home/work/recommend/jobs
> vi getRecommendList.sh
#!/bin/bash
# Default to the date 7 days ago (last Monday when the script runs weekly on Mondays)
dt=`date -d "7 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
dt=`date -d "7 days ago $1" +"%Y%m%d"`
fi
masterUrl="yarn-cluster"
appName="GetRecommendListScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
# Timestamp of last Monday (in milliseconds)
timestamp=`date --date="${dt}" +%s`000
# Follow overlap threshold between fan lists
duplicateNum=2
# Anchor level threshold
level=10
# Output data path
outputPath="hdfs://172.16.185.176:9000/data/recommend/${dt}"
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.GetRecommendListScala \
/home/work/recommend2/jobs/get_recommend_list-2.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${neo4jUrl} ${username} ${password} ${timestamp} ${duplicateNum} ${level} ${outputPath}
# Check the job's final status
# The Application-Type of a Flink job is "Apache Flink" (two words), so Final-State is field 8 here
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
echo "Job failed"
# Send an SMS or email alert here
else
echo "Job succeeded"
fi