Task 3: Computing User Active Time
The data for this part comes from the user_active records that clients report to HDFS, and the computed results are saved to Neo4j.
The data format is as follows.
{
    "uid": "1",
    "version": "1.2.3",
    "country": "China",
    "ip": "102.247.59.36",
    "timestamp": 1718102763768,
    "type": "user_active"
}
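Before wiring this format into a job, it can help to check how a single record parses. Below is a minimal standalone sketch using the same fastjson dependency the jobs rely on; the object name ParseUserActiveExample is illustrative, not part of the original project.
import com.alibaba.fastjson.JSON

object ParseUserActiveExample {
    def main(args: Array[String]): Unit = {
        // One record in the format shown above
        val line = """{"uid":"1","version":"1.2.3","country":"China","ip":"102.247.59.36","timestamp":1718102763768,"type":"user_active"}"""
        val jsonObj = JSON.parseObject(line)
        // The jobs below only need uid and timestamp
        val uid = jsonObj.getString("uid")
        val timestamp = jsonObj.getLongValue("timestamp")
        println(s"uid=$uid, timestamp=$timestamp")
    }
}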
Spark
Spark computes and updates each user's active time on a daily schedule and writes the results to Neo4j.
The dependencies and the compile/package plugins are the same as in the previous task.
The Scala code is as follows.
package com.itechthink.recommend.spark

import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
 * Update user active time on a daily schedule
 */
object UpdateUserActiveScala {

    def main(args: Array[String]): Unit = {
        var masterUrl = "local"
        var appName = "UpdateUserActiveScala"
        var filePath = "hdfs://172.16.185.176:9000/data/user_active/20240101"
        var neo4jUrl = "neo4j://172.16.185.176:7687"
        var username = "neo4j"
        var password = "12345678"
        // Override the defaults when running on the cluster
        if (args.length > 0) {
            masterUrl = args(0)
            appName = args(1)
            filePath = args(2)
            neo4jUrl = args(3)
            username = args(4)
            password = args(5)
        }
        // Create the SparkContext
        val conf = new SparkConf()
          .setMaster(masterUrl)
          .setAppName(appName)
        val sc = new SparkContext(conf)
        // Read the user_active data
        val linesRDD = sc.textFile(filePath)
        // Process the data, opening one Neo4j connection per partition
        linesRDD.foreachPartition(it => {
            // Connect to Neo4j
            val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
            // Open a session
            val session = driver.session()
            it.foreach(line => {
                val jsonObj = JSON.parseObject(line)
                val uid = jsonObj.getString("uid")
                val timestamp = jsonObj.getString("timestamp")
                // Upsert the user's active time
                session.run("merge (u:User {uid:'" + uid + "'}) set u.timestamp = " + timestamp)
            })
            // Close the session
            session.close()
            // Close the driver
            driver.close()
        })
        // Stop the SparkContext once all partitions have been processed
        sc.stop()
    }

}
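One caveat: the Cypher statement above is assembled by string concatenation, which breaks if uid ever contains a quote and is open to injection. A minimal sketch of the same upsert in the driver's parameterized form (same neo4j-java-driver as above; only the run call inside the per-partition loop changes):
import org.neo4j.driver.Values

// Replaces the concatenated statement inside the per-partition loop
session.run(
    "MERGE (u:User {uid: $uid}) SET u.timestamp = $ts",
    Values.parameters("uid", uid, "ts", timestamp.toLong)
)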
The job-submission script for the Spark cluster is as follows.
> cd /home/work/recommend/jobs
> vi updateUserActive.sh
#!/bin/bash
# Default to the previous day's date
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
    dt=$1
fi
# HDFS input path
filePath="hdfs://172.16.185.176:9000/data/user_active/${dt}"
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="UpdateUserActiveScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
dependencies="hdfs://172.16.185.176:9000/dependencies"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.itechthink.recommend.spark.UpdateUserActiveScala \
--jars ${dependencies}/fastjson-1.2.68.jar,${dependencies}/neo4j-java-driver-4.1.1.jar,${dependencies}/reactive-streams-1.0.3.jar \
/home/work/recommend/jobs/update_user_active-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${neo4jUrl} ${username} ${password}
# Check the job's final status
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $7}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "Job failed"
    # Send an SMS or email alert here
else
    echo "Job succeeded"
fi
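The script takes an optional date argument: with no argument it processes yesterday's data, and a specific day can be backfilled by passing the date explicitly.
> sh updateUserActive.sh 20240101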
Flink
As with the Spark version, Flink computes and updates each user's active time on a daily schedule and writes the results to Neo4j.
The dependencies and the compile/package plugins are the same as in the previous task.
The Scala code is as follows.
package com.itechthink.recommend.flink

import com.alibaba.fastjson.JSON
import org.apache.flink.api.scala.ExecutionEnvironment
import org.neo4j.driver.{AuthTokens, GraphDatabase}

/**
 * Update user active time on a daily schedule
 */
object UpdateUserActiveScala {

    def main(args: Array[String]): Unit = {
        var filePath = "hdfs://172.16.185.176:9000/data/user_active/20240101"
        var neo4jUrl = "neo4j://172.16.185.176:7687"
        var username = "neo4j"
        var password = "12345678"
        // Override the defaults when running on the cluster
        if (args.length > 0) {
            filePath = args(0)
            neo4jUrl = args(1)
            username = args(2)
            password = args(3)
        }
        val env = ExecutionEnvironment.getExecutionEnvironment
        // Read the data from HDFS
        val text = env.readTextFile(filePath)
        // Import the implicit conversions required by the Scala DataSet API
        import org.apache.flink.api.scala._
        // Process the data, opening one Neo4j connection per partition
        text.mapPartition(it => {
            // Connect to Neo4j
            val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
            // Open a session
            val session = driver.session()
            it.foreach(line => {
                val jsonObj = JSON.parseObject(line)
                val uid = jsonObj.getString("uid")
                val timestamp = jsonObj.getString("timestamp")
                // Upsert the user's active time
                session.run("merge (u:User {uid:'" + uid + "'}) set u.timestamp = " + timestamp)
            })
            // Close the session
            session.close()
            // Close the driver
            driver.close()
            // Return a placeholder so that print() can trigger execution
            ""
        }).print()
    }

}
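Returning the empty string here exists only to satisfy mapPartition's return type so that print() can trigger execution. One alternative, sketched below rather than taken from the original, is to return the number of records each partition wrote, so that print() also reports how much work was done:
// Same pipeline as above, but each partition emits its record count
text.mapPartition(it => {
    val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
    val session = driver.session()
    var count = 0
    it.foreach(line => {
        val jsonObj = JSON.parseObject(line)
        val uid = jsonObj.getString("uid")
        val timestamp = jsonObj.getString("timestamp")
        session.run("merge (u:User {uid:'" + uid + "'}) set u.timestamp = " + timestamp)
        count += 1
    })
    session.close()
    driver.close()
    // One count per partition; print() triggers execution and shows them
    Iterator.single(count)
}).print()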
The job-submission script for the Flink cluster is as follows.
> cd /home/work/recommend2/jobs
> vi updateUserActive.sh
#!/bin/bash
# Default to the previous day's date
dt=`date -d "1 days ago" +"%Y%m%d"`
if [ "d$1" != "d" ]
then
    dt=$1
fi
# HDFS input path
filePath="hdfs://172.16.185.176:9000/data/user_active/${dt}"
masterUrl="yarn-cluster"
appName="UpdateUserActiveScala"`date +%s`
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.UpdateUserActiveScala \
/home/work/recommend2/jobs/update_user_active-2.0-SNAPSHOT-jar-with-dependencies.jar ${filePath} ${neo4jUrl} ${username} ${password}
# Check the job's final status; Flink on YARN reports the application type
# "Apache Flink", whose embedded space shifts the final status from $7 to $8
appStatus=`yarn application -appStates FINISHED -list | grep ${appName} | awk '{print $8}'`
if [ "${appStatus}" != "SUCCEEDED" ]
then
    echo "Job failed"
    # Send an SMS or email alert here
else
    echo "Job succeeded"
fi
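Both jobs are meant to run once a day. Assuming the script paths above, one illustrative way to schedule the Flink version is a crontab entry like the following (the 02:00 run time is an assumption; pick a time after the previous day's data has landed in HDFS).
> crontab -e
# Run the job every day at 02:00
0 2 * * * sh /home/work/recommend2/jobs/updateUserActive.sh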