The commonly used Transformation operators listed in the official documentation.

Scala code
package com.itechthink.rdd

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

/**
 * Commonly used Transformation operators in Scala
 *
 * map:         multiply every element in the collection by 2
 * filter:      keep only the even numbers in the collection
 * flatMap:     split each line into words
 * groupByKey:  group the values by key
 * reduceByKey: aggregate (here: sum) the values per key
 * sortByKey:   sort the records by key
 * join:        inner-join two pair RDDs on their keys
 * distinct:    remove duplicate elements
 */
object ScalaTransformationOperator {

    private def getSparkContext: SparkContext = {
        val conf = new SparkConf()
          // Set the application name
          .setAppName("ScalaTransformationOperator")
          // Set the Spark run mode
          .setMaster("local")
        new SparkContext(conf)
    }
    def main(args: Array[String]): Unit = {
        // 1. First create the SparkContext
        val sc = getSparkContext

        // map: multiply every element in the collection by 2
        //mapOperator(sc)
        // filter: keep only the even numbers in the collection
        //filterOperator(sc)
        // flatMap: split each line into words
        //flatMapOperator(sc)
        // groupByKey: group the values by key
        //groupByKeyOperator(sc)
        // reduceByKey: aggregate (here: sum) the values per key
        //reduceByKeyOperator(sc)
        // sortByKey: sort the records by key
        //sortByKeyOperator(sc)
        // join: inner-join two pair RDDs on their keys
        //joinOperator(sc)
        // distinct: remove duplicate elements
        distinctOperator(sc)

        // 2. Finally stop the SparkContext
        sc.stop()
    }
    private def distinctOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(1, 2, 2, 4, 5))
        // distinct() shuffles the data, so the output order is not guaranteed
        rdd.distinct().foreach(println)
    }
    private def joinOperator(sc: SparkContext): Unit = {
        val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
        val rdd2 = sc.parallelize(Array(("a", 4), ("b", 5), ("c", 6)))
        // Inner join on the keys, e.g. (a,(1,4)), (b,(2,5)), (c,(3,6))
        rdd1.join(rdd2).foreach(println)
    }
    private def sortByKeyOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
        // sortByKey uses a RangePartitioner internally, which is what makes a global sort possible
        val pairs = rdd.sortByKey()
        // Build a RangePartitioner explicitly (2 partitions, descending) to see which partition a key falls into
        val part = new RangePartitioner(2, pairs, false)
        println(part.getPartition("a"))
        println(part.getPartition("b"))
        rdd.sortByKey().foreach(println)
    }
    private def reduceByKeyOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
        // Sum the values for each key, e.g. (a,5), (b,7), (c,3)
        rdd.reduceByKey(_ + _).foreach(println)
    }
    private def groupByKeyOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
        // Collect all values of the same key into an Iterable, e.g. (a,CompactBuffer(1, 4))
        rdd.groupByKey().foreach(println)
    }
    private def flatMapOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array("hello world", "hello scala", "hello java"))
        // Split every line on spaces and flatten the results into a single RDD of words
        rdd.flatMap(_.split(" ")).foreach(println)
    }
    private def filterOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
        // Keep only the even numbers
        rdd.filter(_ % 2 == 0).foreach(println)
    }
    private def mapOperator(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
        // Multiply every element by 2
        rdd.map(_ * 2).foreach(println)
    }
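
    // Not part of the original listing: a small sketch contrasting reduceByKey with groupByKey.
    // Both produce the same per-key totals here, but reduceByKey combines values map-side
    // before the shuffle, while groupByKey ships every individual value across the network first.
    private def reduceVsGroupByKeySketch(sc: SparkContext): Unit = {
        val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 4)))
        rdd.reduceByKey(_ + _).foreach(println)             // (a,5), (b,2)
        rdd.groupByKey().mapValues(_.sum).foreach(println)  // same totals via grouping then summing
    }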
}
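One caveat worth noting: every example above prints with foreach(println), which runs inside the executors. In local mode the executor shares the driver's JVM, so the output appears in the console, but on a real cluster it would end up in the executor logs instead. A minimal sketch of collecting a small result back to the driver first (reusing the sc created by getSparkContext above; the name summed is only for illustration):

val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 4)))
val summed = rdd.reduceByKey(_ + _).collect()   // collect() is an Action and brings the result array to the driver
summed.foreach(println)                         // prints (a,5) and (b,2) locally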