原创大约 2 分钟
原创大约 8 分钟
原创大约 6 分钟
原创大约 3 分钟
原创大约 3 分钟
整体的数据计算指标需求如下。
-
任务1:将数据库中的粉丝历史关注数据
follow_00
~follow_09
批量导入到Neo4j中,这个任务在初始化的时候只需要执行一次。 -
任务2:通过Spark实时维护粉丝的关注与取消关注数据,它的数据源自于Kafka中的
user_follow
这个Topic
,它的计算结果也保存到Neo4j。 -
任务3:通过Spark每天计算并更新用户的活跃时间,如果当天活跃过,就更新最近活跃时间为当前日期,这是针对主播和粉丝都会进行的计算任务,它的数据源自于HDFS中的
user_active
,计算结果保存到Neo4j。 -
任务4:通过Spark每天计算并更新主播等级数据,这项计算任务仅针对主播,它的数据源自于HDFS中的
user_level
,计算结果保存到Neo4j。 -
任务5:通过Spark每周计算主播最近30天的内容评级,它的数据源自于HDFS中的
live_info
,计算结果保存到Neo4j。 -
任务6:通过Spark每周计算主播近一个周的三度关系列表,参与计算的主播需要满足下面几个前提条件。
-
主播近一周内处于活跃状态。
-
主播等级 > 10。
-
主播近30天至少3条内容评价满足A+。
-
主播粉丝列表关注重合度 > 2。
-
-
任务7:每周将计算结果导出到MySQL。
原创大约 2 分钟
# 将之前导出的数据文件follower_0${i}.log拷贝到neo4j的import目录
> cp /var/logs/follower_*.log /home/work/neo4j-community-5.23.0/import
> cd /home/work/neo4j-community-5.23.0
> ./bin/cypher-shell -a neo4j://172.16.185.176:7687 -u root -p 12345678
# 创建索引
neo4j> CREATE CONSTRAINT ON (u:User) ASSERT u.uid IS UNIQUE;
# 逐个导入数据
neo4j> LOAD CSV FROM 'file:///follower_00.log' AS row
MERGE (viewer:User {uid: toString(row[1])})
MERGE (anchor:User {uid: toString(row[2])})
MERGE (viewer) -[:follow]-> (anchor);
neo4j> LOAD CSV FROM 'file:///follower_01.log' AS row
MERGE (viewer:User {uid: toString(row[1])})
MERGE (anchor:User {uid: toString(row[2])})
MERGE (viewer) -[:follow]-> (anchor);
......
neo4j> LOAD CSV FROM 'file:///follower_09.log' AS row
MERGE (viewer:User {uid: toString(row[1])})
MERGE (anchor:User {uid: toString(row[2])})
MERGE (viewer) -[:follow]-> (anchor);
原创小于 1 分钟
这一部分的数据来源于Kafka中的user_follow
这个Topic
,它的计算结果会被保存到Neo4j。
它的数据格式如下。
{
"fid":"1",
"uid":"2",
"time":1718102763768,
"type":"follow", # follow表示关注,或者为unfollow,表示取消关注
"desc":"follow" # 同上
}
原创大约 6 分钟