高级特性
宽依赖和窄依赖
窄依赖
Scala语言版窄依赖API(Narrow Dependency API)。
Java语言版窄依赖API(Narrow Dependency API)。
不管是哪种语言版本的窄依赖(Narrow Dependency)
,它们都指的是父RDD的一个或多个分区被子RDD的一个或多个分区所使用,例如map
、filter
、union
、join
、cogroup
、cartesian
等算子。

宽依赖
Scala语言版宽依赖API(Shuffle Dependency API)。
Java语言版宽依赖API(Shuffle Dependency API)。
宽依赖(Shuffle Dependency)
指的是父RDD的每个分区的一部分被多个子RDD所使用,例如reduceByKey
、groupByKey
等算子。


Stage
RDD每执行一次Action
操作,就会启动一个Job
,而如果这个Job
有Shuffle(也就是宽依赖(Shuffle Dependency)
)操作,则这个Job
又会被划分成前后两个Stage
,每个Stage
都由一组并行的Task
组成。
Stage
会将一批Task
用TaskSet
封装起来,然后提交给TaskScheduler
分配,最后发送给Executor
去执行。
Stage
的划分过程是从后往前进行的,也就是从最后一个RDD开始倒推,每遇到一个Shuffle操作就划分。但它的执行过程却是从前往后,也就是按照正常的代码顺序来执行的。

Job的三种提交模式
Standalone模式
> ./bin/spark-submit --master spark://hadoop:7077 ......
yarn clint模式
> ./bin/spark-submit --yarn --deploy-mode client ......
yarn cluster模式
> ./bin/spark-submit --yarn --deploy-mode cluster ......
三种模式的区别如下图。

Shuffle
在Spark中,Shuffle指的是将数据重新分区并重新分发到不同的节点上进行处理的过程。
reduceByKey
、groupByKey
、sortByKey
、countByKey
(xxxByKey
类的算子)和join
等算子会产生Shuffle。
Shuffle的算法经过了几种不同的变化。
未优化的
Hash Based Shuffle
。优化后的
Hash Based Shuffle
。Sort-Based Shuffle
。
Checkpoint
当Spark的计算任务较为复杂,且运行时间较长时,比较适合使用Checkpoint功能,这是Spark的一个容灾备份机制,也可以把它理解为Spark任务执行过程中产生的快照。
Checkpoint和持久化
有两个关键区别。
持久化
将数据存储在内存中,一旦宕机就会丢失。而Checkpoint则是将数据保存在磁盘或高可用的分布式存储系统中,基本上不会有数据丢失的情况。RDD被持久化之后其相互依赖关系(又称为
血缘关系
)仍然存在,而一旦被Checkpoint存为快照之后,它所有的血缘关系
全部丢失。
所以,如果某个RDD需要执行Checkpoint,那么可以先执行一次持久化
操作,例如persist(StorageLevel.DISK_ONLY)
。
这样一来,当后面进行Checkpoint操作时就会直接从磁盘上读取RDD的数据,而不再需要重新计算一次。
调用Checkpoint的方法也很简单。
Scala 2调用Checkpoint方法的代码。
......
// 1. 设置checkpoint目录
sc.setCheckpointDir("hdfs://hadoop:9000/checkpoint/")
val linesRDD = sc.textFile("/wordcount.txt")
// 2. 先执行RDD的持久化方法
linesRDD.persist(StorageLevel.DISK_ONLY)
// 3. 再对RDD调用checkpoint()方法
linesRDD.checkpoint()
......
Java调用Checkpoint方法的代码。
......
// 1. 设置checkpoint目录
sc.setCheckpointDir("hdfs://hadoop:9000/checkpoint/");
JavaRDD<String> linesRDD = sc.textFile("/wordcount.txt");
// 2. 先执行RDD的持久化方法
linesRDD.persist(StorageLevel.DISK_ONLY());
// 3. 再对RDD调用checkpoint()方法
linesRDD.checkpoint();
......
感谢支持
更多内容,请移步《超级个体》。