Action Operations
These are the commonly used Action operators listed in the official documentation.
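Unlike transformations such as map, which are lazy and only record lineage, an action forces Spark to actually execute the job and return a result to the driver (or write it to storage). A minimal sketch of the difference, assuming a SparkContext named sc is already available:

// Transformation: nothing runs yet, only the lineage is recorded
val doubled = sc.parallelize(1 to 3).map(_ * 2)
// Action: triggers the job and prints 12 (2 + 4 + 6)
println(doubled.reduce(_ + _))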
Scala code
package com.itechthink.rdd

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Commonly used Action operators in Scala
 *
 * reduce: aggregate the elements
 * collect: fetch all elements back to the driver
 * take(n): fetch the first n elements
 * count: count the total number of elements
 * saveAsTextFile: save the RDD as text files
 * countByKey: count how many times each key appears
 */
object ScalaActionOperator {

  private def getSparkContext: SparkContext = {
    val conf = new SparkConf()
      // Set the application name
      .setAppName("ScalaActionOperator")
      // Set the Spark run mode
      .setMaster("local")
    new SparkContext(conf)
  }

  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext first
    val sc = getSparkContext
    // reduce: aggregate the elements
    //reduceOperator(sc)
    // collect: fetch all elements back to the driver
    //collectOperator(sc)
    // take(n): fetch the first n elements
    //takeOperator(sc)
    // count: count the total number of elements
    //countOperator(sc)
    // saveAsTextFile: save the RDD as text files
    //saveAsTextFileOperator(sc)
    // countByKey: count how many times each key appears
    countByKeyOperator(sc)
    // 2. Stop the SparkContext
    sc.stop()
  }

  private def countByKeyOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
    // Prints Map(a -> 2, b -> 2)
    println(rdd.countByKey())
  }

  private def saveAsTextFileOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    // The output directory must not already exist, otherwise the job fails
    rdd.saveAsTextFile("/Users/bear/Downloads/result")
  }

  private def countOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    println(rdd.count())
  }

  private def takeOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    println(rdd.take(5).mkString(","))
  }

  private def collectOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    println(rdd.collect().mkString(","))
  }

  private def reduceOperator(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(1 to 10)
    println(rdd.reduce(_ + _))
  }
}
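For reference, running each helper locally should print roughly the following (saveAsTextFileOperator produces no console output; it writes part files under /Users/bear/Downloads/result instead):

reduceOperator: 55
collectOperator: 1,2,3,4,5,6,7,8,9,10
takeOperator: 1,2,3,4,5
countOperator: 10
countByKeyOperator: Map(a -> 2, b -> 2)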
Java code
package com.itechthink.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Commonly used Action operators in Java
 *
 * reduce: aggregate the elements
 * collect: fetch all elements back to the driver
 * take(n): fetch the first n elements
 * count: count the total number of elements
 * saveAsTextFile: save the RDD as text files
 * countByKey: count how many times each key appears
 */
public class JavaActionOperator {

    private static JavaSparkContext getSparkContext() {
        SparkConf conf = new SparkConf()
                // Set the application name
                .setAppName("JavaActionOperator")
                // Set the Spark run mode
                .setMaster("local");
        return new JavaSparkContext(conf);
    }

    public static void main(String[] args) {
        // 1. Create the SparkContext first
        JavaSparkContext sc = getSparkContext();
        // reduce: aggregate the elements
        //reduceOperator(sc);
        // collect: fetch all elements back to the driver
        //collectOperator(sc);
        // take(n): fetch the first n elements
        //takeOperator(sc);
        // count: count the total number of elements
        //countOperator(sc);
        // saveAsTextFile: save the RDD as text files
        //saveAsTextFileOperator(sc);
        // countByKey: count how many times each key appears
        countByKeyOperator(sc);
        // 2. Stop the SparkContext
        sc.stop();
    }

    private static void countByKeyOperator(JavaSparkContext sc) {
        Tuple2<String, Integer> t1 = new Tuple2<>("hello", 1);
        Tuple2<String, Integer> t2 = new Tuple2<>("world", 2);
        Tuple2<String, Integer> t3 = new Tuple2<>("hello", 3);
        Tuple2<String, Integer> t4 = new Tuple2<>("world", 4);
        Tuple2<String, Integer> t5 = new Tuple2<>("hello", 5);
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4, t5));
        // Map each tuple to (key, 1), then count occurrences of each key
        dataRDD.mapToPair(t -> new Tuple2<>(t._1, 1))
               .countByKey()
               .forEach((k, v) -> System.out.println(k + ":" + v));
    }

    private static void saveAsTextFileOperator(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        // The output directory must not already exist, otherwise the job fails
        dataRDD.saveAsTextFile("/Users/bear/Downloads/result");
    }

    private static void countOperator(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(dataRDD.count());
    }

    private static void takeOperator(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(dataRDD.take(3));
    }

    private static void collectOperator(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(dataRDD.collect());
    }

    private static void reduceOperator(JavaSparkContext sc) {
        JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(dataRDD.reduce((a, b) -> a + b));
    }
}
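As written, the Java version runs only countByKeyOperator: "hello" appears three times and "world" twice among the five tuples, so it should print hello:3 and world:2 (the iteration order of the returned map is not guaranteed).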