Transformation Operations
The commonly used Transformation operators listed in the official documentation.
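Before the full listings, a minimal sketch (not part of the original code, and assuming a local SparkContext named sc like the one created below): transformations such as map only record the computation in the RDD lineage; nothing runs until an action such as foreach or collect is called.
val nums = sc.parallelize(Array(1, 2, 3))
// map is a transformation: it only builds the lineage
val doubled = nums.map(_ * 2)
// foreach is an action: it triggers the actual execution
doubled.foreach(println)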
Scala code
package com.itechthink.rdd
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
/**
* Commonly used Transformation operators in Scala
*
* map: multiply each element of the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each line into words
* groupByKey: group the values by key
* reduceByKey: aggregate (sum) the values by key
* sortByKey: sort the pairs by key
* join: join two pair RDDs by key (an inner join, not a Cartesian product)
* distinct: remove duplicate elements
*/
object ScalaTransformationOperator {
private def getSparkContext: SparkContext = {
val conf = new SparkConf()
// Set the application name
.setAppName("ScalaTransformationOperator")
// Set the Spark run mode
.setMaster("local")
new SparkContext(conf)
}
def main(args: Array[String]): Unit = {
// 1. Create the SparkContext first
val sc = getSparkContext
// map: multiply each element of the collection by 2
//mapOperator(sc)
// filter: keep only the even numbers in the collection
//filterOperator(sc)
// flatMap: split each line into words
//flatMapOperator(sc)
// groupByKey: group the values by key
//groupByKeyOperator(sc)
// reduceByKey: aggregate (sum) the values by key
//reduceByKeyOperator(sc)
// sortByKey: sort the pairs by key
//sortByKeyOperator(sc)
// join: join two pair RDDs by key (inner join)
//joinOperator(sc)
// distinct: remove duplicate elements
distinctOperator(sc)
// 2. Stop the SparkContext
sc.stop()
}
private def distinctOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 2, 4, 5))
rdd.distinct().foreach(println)
}
private def joinOperator(sc: SparkContext): Unit = {
val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(Array(("a", 4), ("b", 5), ("c", 6)))
rdd1.join(rdd2).foreach(println)
}
private def sortByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
// A RangePartitioner can be used to achieve a global sort across partitions
val pairs = rdd.sortByKey()
val part = new RangePartitioner(2, pairs, false)
println(part.getPartition("a"))
println(part.getPartition("b"))
rdd.sortByKey().foreach(println)
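// Added sketch: foreach runs in executor tasks, so console order is not guaranteed across
// partitions even after sortByKey; collecting to the driver first shows the global order.
println(rdd.sortByKey().collect().mkString(", "))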
}
private def reduceByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
rdd.reduceByKey(_ + _).foreach(println)
}
private def groupByKeyOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))
rdd.groupByKey().foreach(println)
}
private def flatMapOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array("hello world", "hello scala", "hello java"))
rdd.flatMap(_.split(" ")).foreach(println)
}
private def filterOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.filter(_ % 2 == 0).foreach(println)
}
private def mapOperator(sc: SparkContext): Unit = {
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd.map(_ * 2).foreach(println)
}
}
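Note that join matches records by key (an inner join); a true Cartesian product is produced by the separate cartesian operator. A minimal sketch of the difference (not part of the original listing, reusing a SparkContext sc as above):
val left = sc.parallelize(Array(("a", 1), ("b", 2)))
val right = sc.parallelize(Array(("a", "x"), ("c", "y")))
// join keeps only keys present in both RDDs, here ("a", (1, "x"))
left.join(right).foreach(println)
// cartesian pairs every element of left with every element of right: 2 x 2 = 4 tuples
left.cartesian(right).foreach(println)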
Java code
package com.itechthink.rdd;
import org.apache.spark.RangePartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Commonly used Transformation operators in Java
*
* map: multiply each element of the collection by 2
* filter: keep only the even numbers in the collection
* flatMap: split each line into words
* groupByKey: group the values by key
* reduceByKey: aggregate (sum) the values by key
* sortByKey: sort the pairs by key
* join: join two pair RDDs by key (an inner join, not a Cartesian product)
* distinct: remove duplicate elements
*/
public class JavaTransformationOperator {
private static JavaSparkContext getSparkContext() {
SparkConf conf = new SparkConf()
// Set the application name
.setAppName("JavaTransformationOperator")
// Set the Spark run mode
.setMaster("local");
return new JavaSparkContext(conf);
}
public static void main(String[] args) {
// 1. Create the SparkContext first
JavaSparkContext sc = getSparkContext();
// map: multiply each element of the collection by 2
//mapOperator(sc);
// filter: keep only the even numbers in the collection
//filterOperator(sc);
// flatMap: split each line into words
//flatMapOperator(sc);
// groupByKey: group the values by key
//groupByKeyOperator(sc);
// reduceByKey: aggregate (sum) the values by key
//reduceByKeyOperator(sc);
// sortByKey: sort the pairs by key (a RangePartitioner can be used to achieve a global sort)
//sortByKeyOperator(sc);
// join: join two pair RDDs by key (inner join)
//joinOperator(sc);
// distinct: remove duplicate elements
distinctOperator(sc);
// 2. Stop the SparkContext
sc.stop();
}
private static void distinctOperator(JavaSparkContext sc) {
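// Split each line into words with flatMap, then drop duplicate words with distinct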
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello world", "hello scala", "hello java"));
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String input) throws Exception {
return Arrays.asList(input.split(" ")).iterator();
}
}).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String input) throws Exception {
System.out.println(input);
}
});
}
private static void joinOperator(JavaSparkContext sc) {
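// join matches pairs from the two RDDs by key (inner join); each joined value is a Tuple2<Integer, String>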
Tuple2<String, Integer> t1 = new Tuple2<>("hello", 1);
Tuple2<String, Integer> t2 = new Tuple2<>("world", 2);
Tuple2<String, Integer> t3 = new Tuple2<>("hello", 3);
Tuple2<String, Integer> t4 = new Tuple2<>("world", 4);
Tuple2<String, String> t5 = new Tuple2<>("world", "1");
Tuple2<String, String> t6 = new Tuple2<>("hello", "2");
Tuple2<String, String> t7 = new Tuple2<>("world", "3");
Tuple2<String, String> t8 = new Tuple2<>("hello", "4");
JavaRDD<Tuple2<String, Integer>> dataRDD1 = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
JavaRDD<Tuple2<String, String>> dataRDD2 = sc.parallelize(Arrays.asList(t5, t6, t7, t8));
JavaPairRDD<String, Integer> pairRDD1 = dataRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> input) throws Exception {
return new Tuple2<>(input._1, input._2);
}
});
JavaPairRDD<String, String> pairRDD2 = dataRDD2.mapToPair(new PairFunction<Tuple2<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(Tuple2<String, String> input) throws Exception {
return new Tuple2<>(input._1, input._2);
}
});
pairRDD1.join(pairRDD2).foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, String>>>() {
@Override
public void call(Tuple2<String, Tuple2<Integer, String>> input) throws Exception {
System.out.println(input._1 + " -> " + "<" + input._2._1 + ":" + input._2._2 + ">");
}
});
}
private static void sortByKeyOperator(JavaSparkContext sc) {
Tuple2<String, Integer> t1 = new Tuple2<>("hello", 1);
Tuple2<String, Integer> t2 = new Tuple2<>("world", 2);
Tuple2<String, Integer> t3 = new Tuple2<>("hello", 3);
Tuple2<String, Integer> t4 = new Tuple2<>("world", 4);
Tuple2<String, Integer> t5 = new Tuple2<>("hello", 5);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4, t5));
JavaPairRDD<String, Integer> pairRDD = dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> input) throws Exception {
return new Tuple2<>(input._1, input._2);
}
})
.sortByKey(true);
// A RangePartitioner can be used to achieve a global sort across partitions
// Manually construct and apply a RangePartitioner here
RDD<Tuple2<String, Integer>> prdd = JavaPairRDD.toRDD(pairRDD);
RangePartitioner<String, Integer> rangePartitioner = new RangePartitioner<>(
2,
prdd,
true,
scala.math.Ordering.String$.MODULE$,
scala.reflect.ClassTag$.MODULE$.apply(String.class));
// Apply the RangePartitioner via partitionBy
JavaPairRDD<String, Integer> rangeRDD = pairRDD.partitionBy(rangePartitioner).cache();
System.out.println("partitionBy后分区数:" + rangeRDD.getNumPartitions());
System.out.println("partitionBy后分区划分器:" + rangeRDD.partitioner().toString());
pairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> input) throws Exception {
System.out.println(input._1 + ":" + input._2);
}
});
}
private static void reduceByKeyOperator(JavaSparkContext sc) {
Tuple2<String, Integer> t1 = new Tuple2<>("hello", 1);
Tuple2<String, Integer> t2 = new Tuple2<>("hello", 2);
Tuple2<String, Integer> t3 = new Tuple2<>("world", 4);
Tuple2<String, Integer> t4 = new Tuple2<>("world", 9);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
dataRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<String, Integer> input) throws Exception {
return new Tuple2<>(input._1, input._2);
}
})
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> input) throws Exception {
System.out.println(input._1 + ":" + input._2);
}
});
}
private static void groupByKeyOperator(JavaSparkContext sc) {
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello world", "hello scala", "hello java"));
dataRDD.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String input) throws Exception {
String[] line = input.split(" ");
return new Tuple2<>(line[0], line[1]);
}
}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
@Override
public void call(Tuple2<String, Iterable<String>> input) throws Exception {
System.out.println(input._1 + ":" + input._2);
}
});
}
private static void flatMapOperator(JavaSparkContext sc) {
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello world", "hello scala", "hello java"));
dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String input) throws Exception {
return Arrays.asList(input.split(" ")).iterator();
}
}).foreach(new VoidFunction<String>() {
@Override
public void call(String input) throws Exception {
System.out.println(input);
}
});
}
private static void filterOperator(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer input) throws Exception {
return input % 2 == 0;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer input) throws Exception {
System.out.println(input);
}
});
}
private static void mapOperator(JavaSparkContext sc) {
JavaRDD<Integer> dataRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
dataRDD.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer input) throws Exception {
return input * 2;
}
}).foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer input) throws Exception {
System.out.println(input);
}
});
}
}