TopN:排名取前N
除了Word Count,大数据分析中还有另一类需求有着和它差不多的地位:排名取前N(或者分组后再排名取前N),也就是将一组数据按指定方式排序之后取前N个,这类需求被统称为TopN。
例如,状元、榜眼和探花,就是古代科举考试成绩经过排名后取前三的结果(而且这个结果一定是经过分组后再汇总计算得出的,例如按考场分组和按省份分组)。
现在有一份电商购物订单记录和一份订单详情记录,需要根据它们来计算每个地区用户当天消费的TopN。
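为了便于阅读后面的代码,先说明两份日志的格式:每个文件每行是一条JSON记录,订单信息(order_info)包含oid、uid、area三个字段,订单详情(order_details)包含oid、money两个字段。下面是几行示意数据(字段名来自代码中实际解析的字段,取值是假设的,仅用于说明格式):
order_info.log(每行一条订单信息)
{"oid": "1001", "uid": "u0001", "area": "CN"}
{"oid": "1002", "uid": "u0002", "area": "US"}
order_details.log(每行一条订单详情,同一个oid可能出现多行)
{"oid": "1001", "money": 100}
{"oid": "1001", "money": 50}
{"oid": "1002", "money": 80}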
RDD实现
下面是用RDD实现TopN需求的代码。
package com.itechthink.rdd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
/**
* Java版用RDD实现TopN
* 计算每个地区用户当天消费TopN
*
* 1. 先获取订单信息和订单详情中的核心字段(使用fastjson解析)
* 订单信息(order_info):oid、uid、area,放入tuple2:(oid, (uid, area))
* 订单详情(order_details):oid、money,放入tuple2:(oid, money)
*
* 2. 对订单详情数据进行聚合,对相同oid的记录求和,因为用户在某天可能会用到多个优惠券:(oid, money_sum)
*
* 3. 使用join连接两个RDD:(oid, ((uid, area), money_sum))
*
* 4. 使用map迭代join之后的数据,获取到oid、area和money_sum。用户每次可能购买多个商品,需要基于uid、area再做聚合:
* (oid, ((uid, area), money_sum)) ---> ((uid, area), money_sum)
*
* 5. 使用reduceByKey算子对数据进行聚合:((uid, area), money_sum_all)
*
* 6. 使用map转换,根据地区分组:((uid, area), money_sum_all) ---> (area, (uid, money_sum_all))
*
* 7. 使用groupByKey算子对数据进行分组:(area, (uid, money_sum_all)) ---> (area, <(uid, money_sum_all), (uid, money_sum_all), (uid, money_sum_all)>)
*
* 8. 使用map迭代每个分组内的数据,按照消费金额数量倒序排序,取前N条记录,最终输出:(area, (uid, money_sum_all))
*
* 9. 使用foreach算子打印到控制台,或者使用saveAsTextFile算子,将结果保存到HDFS
*/
public class JavaRDDTopN {
public static void main(String[] args) {
// 1. 先创建SparkContext
SparkConf conf = new SparkConf()
// 设置应用程序名称
.setAppName("JavaRDDTopN")
// 设置Spark运行模式
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 2. 先获取订单信息和订单详情中的核心字段(使用fastjson解析)
// 2.1 加载数据
String orderInfoPath = "/Users/bear/Downloads/order_info.log";
String orderDetailsPath = "/Users/bear/Downloads/order_details.log";
if (args.length == 2) {
// "hdfs://hadoop:9000/spark/data/order_info.log"
orderInfoPath = args[0];
// "hdfs://hadoop:9000/spark/data/order_details.log"
orderDetailsPath = args[1];
}
JavaRDD<String> orderInfoRDD = sc.textFile(orderInfoPath);
JavaRDD<String> orderDetailsRDD = sc.textFile(orderDetailsPath);
// 2.2 使用fastjson解析数据
// (oid, (uid, area))
JavaPairRDD<String, Tuple2<String, String>> orderInfoJsonRDD = orderInfoRDD.mapToPair(new PairFunction<String, String, Tuple2<String, String>>() {
@Override
public Tuple2<String, Tuple2<String, String>> call(String input) throws Exception {
// 获取订单信息中的核心字段
JSONObject orderInfo = JSON.parseObject(input);
String oid = orderInfo.getString("oid");
String uid = orderInfo.getString("uid");
String area = orderInfo.getString("area");
return new Tuple2<String, Tuple2<String, String>>(oid, new Tuple2<String, String>(uid, area));
}
});
// (oid, money)
JavaPairRDD<String, Integer> orderDetailsJsonRDD = orderDetailsRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String input) throws Exception {
// 获取详情信息中的核心字段
JSONObject orderDetails = JSON.parseObject(input);
String oid = orderDetails.getString("oid");
int money = orderDetails.getInteger("money");
return new Tuple2<String, Integer>(oid, money);
}
});
// 3. 对订单详情数据进行聚合,对相同oid的记录求和,因为用户在某天可能会用到多个优惠券:
// (oid, money_sum)
JavaPairRDD<String, Integer> orderDetailsJsonSumRDD = orderDetailsJsonRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
// 4. 使用join连接两个RDD:
// (oid, ((uid, area), money_sum))
JavaPairRDD<String, Tuple2<Tuple2<String, String>, Integer>> orderJoinRDD = orderInfoJsonRDD.join(orderDetailsJsonSumRDD);
// 5. 使用map迭代join之后的数据,获取到oid、area和money_sum。用户每次可能购买多个商品,需要基于uid、area再做聚合:
// (oid, ((uid, area), money_sum)) ---> ((uid, area), money_sum)
JavaPairRDD<Tuple2<String, String>, Integer> orderJoinMapRDD = orderJoinRDD.mapToPair(new PairFunction<Tuple2<String, Tuple2<Tuple2<String, String>, Integer>>, Tuple2<String, String>, Integer>() {
@Override
public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String, Tuple2<Tuple2<String, String>, Integer>> input) throws Exception {
return new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<String, String>(input._2._1._1, input._2._1._2), input._2._2);
}
});
// 6. 使用reduceByKey算子对数据进行聚合:((uid, area), money_sum_all)
JavaPairRDD<Tuple2<String, String>, Integer> orderJoinMapReduceRDD = orderJoinMapRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
// 7. 使用map转换,根据地区分组:((uid, area), money_sum_all) ---> (area, (uid, money_sum_all))
JavaPairRDD<String, Tuple2<String, Integer>> orderJoinMapReduceMapRDD = orderJoinMapReduceRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> input) throws Exception {
return new Tuple2<String, Tuple2<String, Integer>>(input._1._2, new Tuple2<String, Integer>(input._1._1, input._2));
}
});
// 8. 使用groupByKey算子对数据进行分组:(area, (uid, money_sum_all)) ---> (area, <(uid, money_sum_all), (uid, money_sum_all), (uid, money_sum_all)>)
JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> orderJoinGroupRDD = orderJoinMapReduceMapRDD.groupByKey();
// 9. 使用map迭代每个分组内的数据,按照消费金额数量倒序排序,取前N条记录,最终输出:(area, (uid, money_sum_all))
// orderResultRDD
JavaRDD<Tuple2<String, List<Tuple2<String, Integer>>>> orderResultRDD = orderJoinGroupRDD.map(new Function<Tuple2<String, Iterable<Tuple2<String, Integer>>>, Tuple2<String, List<Tuple2<String, Integer>>>>() {
@Override
public Tuple2<String, List<Tuple2<String, Integer>>> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> input) throws Exception {
// 获取area
String area = input._1;
// 将 Iterable<Tuple2<String, Integer>> 转换为根据Integer排序的且取前N的 Tuple2<String, Integer>
Iterable<Tuple2<String, Integer>> iterable = input._2;
ArrayList<Tuple2<String, Integer>> arrayList = Lists.newArrayList(iterable);
// 排序
arrayList.sort(new Comparator<Tuple2<String, Integer>>() {
@Override
public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
// 按消费金额倒序(用Integer.compare避免相减可能的溢出)
return Integer.compare(t2._2, t1._2);
}
});
// 取前N条记录
// List<Tuple2<String, Integer>> list = arrayList.subList(0, 3);
List<Tuple2<String, Integer>> result = Lists.newArrayList();
for (int i = 0; i < arrayList.size(); i++) {
if (i < 3) {
result.add(arrayList.get(i));
}
}
return new Tuple2<String, List<Tuple2<String, Integer>>>(area, result);
}
});
// 10. 使用foreach算子打印到控制台,或者使用saveAsTextFile算子,将结果保存到HDFS
orderResultRDD.foreach(new VoidFunction<Tuple2<String, List<Tuple2<String, Integer>>>>() {
@Override
public void call(Tuple2<String, List<Tuple2<String, Integer>>> input) throws Exception {
System.out.println(input._1 + " -> ");
List<Tuple2<String, Integer>> list = input._2;
for (Tuple2<String, Integer> t : list) {
System.out.println(t._1 + ":" + t._2);
}
}
});
// 11. 关闭SparkContext
sc.stop();
}
}
输出结果如下。
CN ->
8407173251008:120
8407173251003:60
8407173251014:50
ID ->
8407173251005:160
8407173251010:140
8407173251002:70
US ->
8407173251015:180
8407173251012:70
8407173251001:60
SQL实现
相较于RDD算子,用SQL来实现TopN的代码会简单很多,但SQL查询本身会复杂一点,需要用到Spark Window Functions(开窗函数)。
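开窗函数的核心写法大致如下。这只是一个示意片段,假设已经得到一张聚合后的表t(包含area、uid、money_sum_all三个字段),完整的SQL请看后面的代码:
SELECT area, uid, money_sum_all,
       row_number() OVER (PARTITION BY area ORDER BY money_sum_all DESC) AS rn
FROM t
row_number()会在每个area分组内按money_sum_all从大到小编号,外层查询再用WHERE rn <= 3过滤,就能取出每个地区消费金额最高的前3名。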
下面是用SQL实现TopN需求的Scala 2代码。
package com.itechthink.sql
import org.apache.spark.sql.{Row, SparkSession}
/**
* Scala版用SQL实现TopN
* 计算每个地区用户当天消费TopN
*
* 1. 先创建SparkSession
*
* 2. 使用SparkSession中的load操作加载两份json数据
*
* 3. 将这两份数据注册到临时表中
*
* 4. 执行SQL计算TopN消费数据
*
* 5. 使用foreach将结果打印到控制台
*/
object ScalaSQLTopN {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaSQLTopN")
.master("local")
.getOrCreate()
// 2. 加载数据
val orderInfo = session.read.format("json").load("/Users/bear/Downloads/order_info.log")
val orderDetails = session.read.format("json").load("/Users/bear/Downloads/order_details.log")
// 3. 注册临时表
orderInfo.createOrReplaceTempView("order_info")
orderDetails.createOrReplaceTempView("order_details")
// 4. 执行SQL计算TopN消费数据
// 订单信息(order_info):oid、uid、area
// 订单详情(order_details):oid、money
val sql = {
// 行转列
"SELECT " +
"t4.area, CONCAT_WS(',', COLLECT_LIST(t4.TopN)) AS TopN " +
"FROM (" +
"SELECT " +
// 取TopN
"t3.area, CONCAT(t3.uid, ':', CAST(t3.money_sum_all AS INT)) AS TopN " +
"FROM ( " +
"SELECT " +
// 用开窗函数实现分组排序
"t2.uid, t2.area, t2.money_sum_all, row_number() over (PARTITION BY area ORDER BY money_sum_all DESC) AS total " +
"FROM ( " +
"SELECT " +
"t1.uid, max(t1.area) AS area, sum(t1.money_sum) AS money_sum_all " +
"FROM ( " +
"SELECT " +
"i.oid, i.uid, i.area, d.money_sum " +
"FROM " +
"order_info AS i " +
"JOIN ( " +
"SELECT " +
"oid, sum(money) AS money_sum " +
"FROM " +
"order_details " +
"GROUP BY oid " +
") AS d " +
"ON i.oid = d.oid " +
") AS t1 " +
"GROUP BY t1.uid " +
") AS t2 " +
") AS t3 " +
"WHERE t3.total <= 3 " +
") AS t4 " +
"GROUP BY t4.area"
}
// 执行SQL
val result = session.sql(sql)
// 5. 使用foreach将结果打印到控制台
result.foreach(row => {
System.out.println(row.getString(0) + ":" + row.getString(1))
})
// 或者使用这种方式更简单:result.foreach(println(_))
// 6. 关闭SparkSession
session.stop()
}
}
下面是用SQL实现TopN需求的Java代码。
package com.itechthink.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Java版用SQL实现TopN
* 计算每个地区用户当天消费TopN
*
* 1. 先创建SparkSession
*
* 2. 使用SparkSession中的load操作加载两份json数据
*
* 3. 将这两份数据注册到临时表中
*
* 4. 执行SQL计算TopN消费数据
*
* 5. 使用foreach将结果打印到控制台
*/
public class JavaSQLTopN {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaSQLTopN")
.master("local")
.getOrCreate();
// 2. 加载数据
Dataset<Row> orderInfo = session.read().format("json").load("/Users/bear/Downloads/order_info.log");
Dataset<Row> orderDetails = session.read().format("json").load("/Users/bear/Downloads/order_details.log");
// 3. 注册临时表
orderInfo.createOrReplaceTempView("order_info");
orderDetails.createOrReplaceTempView("order_details");
// 4. 执行SQL计算TopN消费数据
// 订单信息(order_info):oid、uid、area
// 订单详情(order_details):oid、money
String sql =
// 行转列
"SELECT " +
"t4.area, CONCAT_WS(',', COLLECT_LIST(t4.TopN)) AS TopN " +
"FROM (" +
"SELECT " +
// 取TopN
"t3.area, CONCAT(t3.uid, ':', CAST(t3.money_sum_all AS INT)) AS TopN " +
"FROM ( " +
"SELECT " +
// 用开窗函数实现分组排序
"t2.uid, t2.area, t2.money_sum_all, row_number() over (PARTITION BY area ORDER BY money_sum_all DESC) AS total " +
"FROM ( " +
"SELECT " +
"t1.uid, max(t1.area) AS area, sum(t1.money_sum) AS money_sum_all " +
"FROM ( " +
"SELECT " +
"i.oid, i.uid, i.area, d.money_sum " +
"FROM " +
"order_info AS i " +
"JOIN ( " +
"SELECT " +
"oid, sum(money) AS money_sum " +
"FROM " +
"order_details " +
"GROUP BY oid " +
") AS d " +
"ON i.oid = d.oid " +
") AS t1 " +
"GROUP BY t1.uid " +
") AS t2 " +
") AS t3 " +
"WHERE t3.total <= 3 " +
") AS t4 " +
"GROUP BY t4.area";
// 执行SQL
Dataset<Row> result = session.sql(sql);
// 5. 使用foreach将结果打印到控制台
result.foreach(row -> {
System.out.println(row.getString(0) + ":" + row.getString(1));
});
// 6. 关闭SparkSession
session.stop();
}
}
输出结果如下。
CN:8407173251008:120, 8407173251003:60, 8407173251014:50
ID:8407173251005:160, 8407173251010:140, 8407173251002:70
US:8407173251015:180, 8407173251012:70, 8407173251001:60