Task 2: Maintaining Real-Time Follow Data
The data for this task comes from the user_follow topic in Kafka, and the computed results are saved to Neo4j. In each message, fid is the id of the follower and uid is the id of the user being followed. The data format is as follows.
{
    "fid":"1",
    "uid":"2",
    "time":1718102763768,
    "type":"follow",   # "follow" means a follow action, "unfollow" means unfollowing
    "desc":"follow"    # same as type
}
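For a quick end-to-end test, a message in this format can be pushed into the topic with Kafka's console producer. This is a minimal sketch: it assumes the Kafka command-line tools are on the PATH and reuses the broker address from this environment.
> kafka-console-producer.sh --broker-list 172.16.185.176:9092 --topic user_follow
{"fid":"1","uid":"2","time":1718102763768,"type":"follow","desc":"follow"}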
Spark
Use Spark Streaming to maintain the follower relationship data in real time.
First, add the required dependencies.
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<!-- Neo4j dependencies -->
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>neo4j-contrib</groupId>
<artifactId>neo4j-spark-connector</artifactId>
<version>2.4.5-M1</version>
</dependency>
Then add the compile and packaging plugins.
<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
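After the dependencies and plugins are in place, package the module with Maven. The thin jar produced here is the one referenced by the submit script further down, while the third-party jars are passed to Spark separately via --jars. A sketch, assuming the Maven module is named real_time_follow:
> mvn clean package -DskipTests
# the jar ends up at target/real_time_follow-1.0-SNAPSHOT.jar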
Since Scala is far more convenient than Java for developing Spark applications, the code here is written in Scala.
package com.itechthink.recommend.spark

import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase, Transaction, TransactionWork}

/**
 * Maintains the follower relationship data in real time
 *
 */
object RealTimeFollowScala {

    def main(args: Array[String]): Unit = {
        var masterUrl = "local"
        var appName = "RealTimeFollowScala"
        var seconds = 5
        var kafkaBrokers = "172.16.185.176:9092"
        var groupId = "group_id_1"
        var topic = "user_follow"
        var neo4jUrl = "neo4j://172.16.185.176:7687"
        var username = "neo4j"
        var password = "12345678"
        // when running on the cluster, take the parameters from the command line
        if (args.length > 0) {
            masterUrl = args(0)
            appName = args(1)
            seconds = args(2).toInt
            kafkaBrokers = args(3)
            groupId = args(4)
            topic = args(5)
            neo4jUrl = args(6)
            username = args(7)
            password = args(8)
        }

        // create the StreamingContext
        val conf = new SparkConf().setMaster(masterUrl).setAppName(appName)
        val ssc = new StreamingContext(conf, Seconds(seconds))

        // the topic(s) to read from
        val topics = Array(topic)

        // Kafka consumer configuration
        val kafkaParams = Map[String, Object](
            // broker addresses
            "bootstrap.servers" -> kafkaBrokers,
            // key deserializer
            "key.deserializer" -> classOf[StringDeserializer],
            // value deserializer
            "value.deserializer" -> classOf[StringDeserializer],
            // consumer group id
            "group.id" -> groupId,
            // where to start when there is no committed offset
            "auto.offset.reset" -> "latest",
            // commit offsets automatically
            "enable.auto.commit" -> (true: java.lang.Boolean)
        )

        // create the direct stream that consumes from Kafka
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )

        // turn each batch of the stream into an RDD and process it with foreachPartition
        kafkaStream.foreachRDD(rdd => {
            // handle one partition at a time
            rdd.foreachPartition(it => {
                // open one Neo4j connection per partition
                val driver = GraphDatabase.driver(neo4jUrl, AuthTokens.basic(username, password))
                // open a session
                val session = driver.session()
                it.foreach(record => {
                    // raw follow event
                    val line = record.value()
                    // parse the JSON
                    val userFollow = JSON.parseObject(line)
                    // event type: follow or unfollow
                    val followType = userFollow.getString("type")
                    // follower id
                    val fid = userFollow.getString("fid")
                    // id of the user being followed
                    val uid = userFollow.getString("uid")
                    if ("follow".equals(followType)) {
                        // add a follow: several statements are involved, so run them in one transaction
                        session.writeTransaction(new TransactionWork[Unit]() {
                            override def execute(tx: Transaction): Unit = {
                                try {
                                    tx.run("merge (:User {uid:'" + fid + "'})")
                                    tx.run("merge (:User {uid:'" + uid + "'})")
                                    tx.run("match (a:User {uid:'" + fid + "'}),(b:User {uid:'" + uid + "'}) merge (a) -[:follow]-> (b)")
                                    // commit the transaction
                                    tx.commit()
                                } catch {
                                    // roll back on any exception
                                    case ex: Exception => tx.rollback()
                                }
                            }
                        })
                    } else {
                        // unfollow
                        session.run("match (:User {uid:'" + fid + "'}) -[r:follow]-> (:User {uid:'" + uid + "'}) delete r")
                    }
                })
                // close the session
                session.close()
                // close the connection
                driver.close()
            })
        })

        // start the job
        ssc.start()
        // wait for it to stop
        ssc.awaitTermination()
    }

}
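After the job is submitted (the script is shown below) and a few follow events have been produced, the result can be checked in the Neo4j Browser. For example, for the sample message above, where user 1 follows user 2:
MATCH (a:User {uid:'1'})-[r:follow]->(b:User) RETURN a.uid, b.uid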
The script used to submit the job to the Spark cluster is as follows.
> mkdir /home/work/recommend/jobs
> cd /home/work/recommend/jobs
> vi realTimeFollow.sh
#!/bin/bash
masterUrl="yarn-cluster"
master=`echo ${masterUrl} | awk -F'-' '{print $1}'`
deployMode=`echo ${masterUrl} | awk -F'-' '{print $2}'`
appName="RealTimeFollowScala"
seconds=5
kafkaBrokers="172.16.185.176:9092"
groupId="group_id_1"
topic="user_follow"
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
dependencies="hdfs://172.16.185.176:9000/dependencies"
spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.itechthink.recommend.spark.RealTimeFollowScala \
--jars ${dependencies}/fastjson-1.2.68.jar,${dependencies}/spark-streaming-kafka-0-10_2.11-2.4.3.jar,${dependencies}/neo4j-java-driver-4.1.1.jar,${dependencies}/kafka-clients-2.4.1.jar,${dependencies}/reactive-streams-1.0.3.jar \
/home/work/recommend/jobs/real_time_follow-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${seconds} ${kafkaBrokers} ${groupId} ${topic} ${neo4jUrl} ${username} ${password}
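The jars listed after --jars must already exist on HDFS under the dependencies path before the script runs. A sketch of uploading them, assuming the jar files sit in the current local directory:
> hdfs dfs -mkdir -p /dependencies
> hdfs dfs -put fastjson-1.2.68.jar spark-streaming-kafka-0-10_2.11-2.4.3.jar neo4j-java-driver-4.1.1.jar kafka-clients-2.4.1.jar reactive-streams-1.0.3.jar /dependencies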
Flink
Use Flink to maintain the follower relationship data in real time.
First, add the required dependencies.
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
Then add the compile and packaging plugins.
<build>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.11</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- assembly plugin (builds a jar that bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- optionally set the jar's main class -->
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
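With the assembly plugin configured, a regular Maven build produces the self-contained jar that the Flink submit script below references. A sketch, assuming the module is named real_time_follow with version 2.0-SNAPSHOT, as in that script:
> mvn clean package -DskipTests
# produces target/real_time_follow-2.0-SNAPSHOT-jar-with-dependencies.jar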
The Scala code is as follows.
package com.itechthink.recommend.flink

import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * Maintains the follower relationship data in real time
 * Consumes the user_follow data from Kafka and updates the follower relationships
 *
 */
object RealTimeFollowScala {

    def main(args: Array[String]): Unit = {
        var appName = "RealTimeFollowScala"
        var kafkaBrokers = "172.16.185.176:9092"
        var groupId = "group_id_1"
        var topic = "user_follow"
        var neo4jUrl = "neo4j://172.16.185.176:7687"
        var username = "neo4j"
        var password = "12345678"
        // when running on the cluster, take the parameters from the command line
        if (args.length > 0) {
            appName = args(0)
            kafkaBrokers = args(1)
            groupId = args(2)
            topic = args(3)
            neo4jUrl = args(4)
            username = args(5)
            password = args(6)
        }

        // this is a real-time computation, so a StreamExecutionEnvironment is used
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // FlinkKafkaConsumer configuration
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", kafkaBrokers)
        properties.setProperty("group.id", groupId)
        val kafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)
        // start consuming from the group's committed offsets
        kafkaConsumer.setStartFromGroupOffsets()

        // import the implicit conversions for the Scala API
        import org.apache.flink.api.scala._

        // use Kafka as the source
        val text = env.addSource(kafkaConsumer)

        // extract the core fields from the JSON data
        val tupStream = text.map(line => {
            val jsonObj = JSON.parseObject(line)
            val desc = jsonObj.getString("type")
            val fid = jsonObj.getString("fid")
            val uid = jsonObj.getString("uid")
            (desc, fid, uid)
        })

        // to avoid creating a Neo4j connection for every record, a custom Neo4jSink (defined below) maintains the follow data
        val param = Map("neo4jUrl" -> neo4jUrl, "username" -> username, "password" -> password)
        tupStream.addSink(new Neo4jSink(param))

        env.execute(appName)
    }

}
package com.itechthink.recommend.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.neo4j.driver.{AuthTokens, Driver, GraphDatabase, Transaction, TransactionWork}

/**
 * Maintains the follower relationships in Neo4j in real time
 *
 */
class Neo4jSink extends RichSinkFunction[(String, String, String)] {

    // Neo4j configuration parameters
    var param: Map[String, String] = Map()
    var driver: Driver = _

    /**
     * Auxiliary constructor that receives the Neo4j configuration parameters
     *
     */
    def this(param: Map[String, String]) {
        this()
        this.param = param
    }

    /**
     * Initialization, executed only once per sink instance
     *
     */
    override def open(parameters: Configuration): Unit = {
        this.driver = GraphDatabase.driver(param("neo4jUrl"), AuthTokens.basic(param("username"), param("password")))
    }

    /**
     * Core logic, executed once for every incoming record
     *
     */
    override def invoke(value: (String, String, String), context: SinkFunction.Context): Unit = {
        // open a session
        val session = driver.session()
        val desc = value._1
        val fid = value._2
        val uid = value._3
        if ("follow".equals(desc)) {
            // add a follow: several statements are involved, so run them in one transaction
            session.writeTransaction(new TransactionWork[Unit]() {
                override def execute(tx: Transaction): Unit = {
                    try {
                        tx.run("merge (:User {uid:'" + fid + "'})")
                        tx.run("merge (:User {uid:'" + uid + "'})")
                        tx.run("match (a:User {uid:'" + fid + "'}),(b:User {uid:'" + uid + "'}) merge (a) -[:follow]-> (b)")
                        tx.commit()
                    } catch {
                        case ex: Exception => tx.rollback()
                    }
                }
            })
        } else {
            // unfollow: the relationship starts at the follower (fid) and points to the followed user (uid)
            session.run("match (:User {uid:'" + fid + "'}) -[r:follow]-> (:User {uid:'" + uid + "'}) delete r")
        }
        // close the session
        session.close()
    }

    /**
     * Called when the job shuts down
     *
     */
    override def close(): Unit = {
        // close the connection
        if (driver != null) {
            driver.close()
        }
    }

}
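As a quick local check, the sink can be fed a single hard-coded event instead of reading from Kafka. A minimal sketch: the object name Neo4jSinkLocalTest is hypothetical, and the connection parameters are the ones used throughout this article.
package com.itechthink.recommend.flink

import org.apache.flink.streaming.api.scala._

object Neo4jSinkLocalTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val param = Map("neo4jUrl" -> "neo4j://172.16.185.176:7687", "username" -> "neo4j", "password" -> "12345678")
        // feed one (type, fid, uid) tuple straight into the sink
        env.fromElements(("follow", "1", "2"))
           .addSink(new Neo4jSink(param))
        env.execute("Neo4jSinkLocalTest")
    }
}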
The script used to submit the job to the Flink cluster is as follows.
> mkdir /home/work/recommend2/jobs
> cd /home/work/recommend2/jobs
> vi realTimeFollow.sh
#!/bin/bash
masterUrl="yarn-cluster"
appName="RealTimeFollowScala"
kafkaBrokers="172.16.185.176:9092"
groupId="group_id_1"
topic="user_follow"
neo4jUrl="neo4j://172.16.185.176:7687"
username="neo4j"
password="12345678"
flink run \
-m ${masterUrl} \
-ynm ${appName} \
-yqu default \
-yjm 1024 \
-ytm 1024 \
-ys 1 \
-p 5 \
-c com.itechthink.recommend.flink.RealTimeFollowScala \
/home/work/recommend2/jobs/real_time_follow-2.0-SNAPSHOT-jar-with-dependencies.jar ${appName} ${kafkaBrokers} ${groupId} ${topic} ${neo4jUrl} ${username} ${password}