除了Word Count,在大数据分析中另一类需求也有着和它差不多的地位:排名取前N
(或者分组后再排名取前N
),也就是将一组数据经过指定的排序之后取前N个
数据,这类需求被统称为TopN
。
例如,状元
、榜眼
和探花
,就是古代科举考试成绩经过排名后取前三的结果(而且这个结果一定是经过分组后再汇总计算得出的,例如考场
分组和省份
分组)。
除了Word Count,在大数据分析中另一类需求也有着和它差不多的地位:排名取前N
(或者分组后再排名取前N
),也就是将一组数据经过指定的排序之后取前N个
数据,这类需求被统称为TopN
。
例如,状元
、榜眼
和探花
,就是古代科举考试成绩经过排名后取前三的结果(而且这个结果一定是经过分组后再汇总计算得出的,例如考场
分组和省份
分组)。
Spark SQL是Spark的一个模块,主要就是用于大数据中结构化数据的处理。
Spark SQL最核心的组件就是DataFrame
,可以把它理解为MySQL中的表,它可以通过多种不同的数据源来构建数据,例如文本文件、HDFS中的文件、Hive中的表、MySQL中的表、Spark自己的RDD等。
既然Spark是一个基于内存执行计算的大数据系统,那么必须知道它是如何使用内存空间的。
可以通过实际的代码执行效果来估算Spark程序的内存使用情况。
使用Scala 2预估内存的执行情况。
官方列举的常用Action算子。
package com.itechthink.rdd
import org.apache.spark.{SparkConf, SparkContext}
/**
* Scala中常用的Action算子
*
* reduce: 聚合计算
* collect: 获取元素集合
* take(n): 获取前n个元素
* count: 获取元素总数
* saveAsTextFile: 保存文件
* countByKey: 统计相同的key出现多少次
*/
object ScalaActionOperator {
private def getSparkContext: SparkContext = {
val conf = new SparkConf()
// 设置应用程序名称
.setAppName("ScalaActionOperator")
// 设置Spark运行模式
.setMaster("local")
new SparkContext(conf)
}
def main(args: Array[String]): Unit = {
// 1. 先创建SparkContext
val sc = getSparkContext
// reduce:聚合计算
//reduceOperator(sc);
// collect:获取元素集合
//collectOperator(sc);
// take(n):获取前n个元素
//takeOperator(sc);
// count:获取元素总数
//countOperator(sc);
// saveAsTextFile:保存文件
//saveAsTextFileOperator(sc);
// countByKey:统计相同的key出现多少次
//countByKeyOperator(sc);
// 2. 关闭SparkContext
sc.stop()
}
private def countByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
println(rdd.countByKey())
}
private def saveAsTextFileOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(1 to 10)
rdd.saveAsTextFile("/Users/bear/Downloads/result")
}
private def countOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(1 to 10)
println(rdd.count())
}
private def takeOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(1 to 10)
println(rdd.take(5).mkString(","))
}
private def collectOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(1 to 10)
println(rdd.collect().mkString(","))
}
private def reduceOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(1 to 10)
println(rdd.reduce(_ + _))
}
}
官方列举的常用Transformation算子。
package com.itechthink.rdd
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
/**
* Scala中常用的Transformation算子
*
* map: 对集合中每个元素乘以2
* filter: 过滤出集合中的偶数
* flatMap: 将行拆分为单词
* groupByKey: 对数据进行分组
* reduceByKey: 分组统计数量
* sortByKey: 分组进行排序
* join: 两个集合的笛卡尔积
* distinct: 数据去重
*/
object ScalaTransformationOperator {
private def getSparkContext: SparkContext = {
val conf = new SparkConf()
// 设置应用程序名称
.setAppName("ScalaTransformationOperator")
// 设置Spark运行模式
.setMaster("local")
new SparkContext(conf)
}
def main(args: Array[String]): Unit = {
// 1. 先创建SparkContext
val sc = getSparkContext
// map:对集合中每个元素乘以2
//mapOperator(sc)
// filter:过滤出集合中的偶数
//filterOperator(sc)
// flatMap:将行拆分为单词
//flatMapOperator(sc)
// groupByKey:对数据进行分组
//groupByKeyOperator(sc)
// reduceByKey:分组统计数量
//reduceByKeyOperator(sc)
// sortByKey:分组进行排序
//sortByKeyOperator(sc)
// join:两个集合的笛卡尔积
//joinOperator(sc)
// distinct:数据去重
distinctOperator(sc)
// 2. 关闭SparkContext
sc.stop()
}
private def distinctOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 2, 4, 5))
rdd.distinct().foreach(println)
}
private def joinOperator(sc: SparkContext): Unit = {
val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Array(("a", 4), ("b", 5), ("c", 6)))
rdd1.join(rdd2).foreach(println)
}
private def sortByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
// 通过RangePartitioner可以实现全局排序
val pairs = rdd.sortByKey()
val part = new RangePartitioner(2, pairs, false)
println(part.getPartition(("a")))
println(part.getPartition(("b")))
rdd.sortByKey().foreach(println)
}
private def reduceByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
rdd.reduceByKey(_ + _).foreach(println)
}
private def groupByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
rdd.groupByKey().foreach(println)
}
private def flatMapOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array("hello world", "hello scala", "hello java"))
rdd.flatMap(_.split(" ")).foreach(println)
}
private def filterOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.filter(_ % 2 == 0).foreach(println)
}
private def mapOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.map(_ * 2).foreach(println)
}
}
每个新手软件工程师肯定知道什么是Hello World。
而Word Count在大数据领域的地位,就相当于Hello World在编程语言中的地位。
RDD(Resilient Distributed Datasets,弹性分布式数据集
)是Spark提供的一个非常核心的抽象概念,它是一种可以被分为多个分区
(Partition
)的数据元素的集合
。
默认情况下,RDD的数据是存放在内存中的,而当内存资源不足时,Spark会自动将RDD中的数据写入磁盘(这是它之所以具有弹性
的原因)。
分区
分布在集群中的不同节点上,所以RDD中的数据可以被并行操作(这是它之所以具有分布式
的原因)。
RDD最重要的特性就是容错性
,它可以自动从节点失败中恢复过来。即如果某个节点上的分区
因为节点故障,导致数据丢失,那么RDD会自动通过自己的数据来源重新计算分区
,这一切对使用者都是透明的。
可以这样来理解:RDD = 数据 + 算子。