SQL操作
原创大约 9 分钟
基本概念
Spark SQL是Spark的一个模块,主要就是用于大数据中结构化数据的处理。
Spark SQL最核心的组件就是DataFrame
,可以把它理解为MySQL中的表,它可以通过多种不同的数据源来构建数据,例如文本文件、HDFS中的文件、Hive中的表、MySQL中的表、Spark自己的RDD等。
要编写Spark SQL代码,需要引入相关依赖。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.1</version>
</dependency>
一个简单的使用Spark SQL的小栗子。
Scala 2代码。
package com.itechthink.sql
import org.apache.spark.sql.SparkSession
/**
* Scala版Spark SQL
*
*/
object ScalaSQLDemo {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaSQLDemo")
.master("local")
.getOrCreate()
// 2. 加载数据
val json = session.read.json("/user.json")
// 3. 查看Dataframe中的数据
json.show()
// 4. 关闭SparkSession
session.stop()
}
}
Java代码。
package com.itechthink.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Java版Spark SQL
*
*/
public class JavaSQLDemo {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("ScalaSQLDemo")
.master("local")
.getOrCreate();
// 2. 加载数据
Dataset<Row> json = session.read().json("/user.json");
// 3. 查看Dataframe中的数据
json.show();
// 4. 关闭SparkSession
session.stop();
}
}
它们运行后的结果相同。
+---+------+---------+
|age|gender| name|
+---+------+---------+
| 19| male|lixingyun|
| 18| male| wanglin|
| 20| male| xiaoyan|
| 22| male| luofeng|
| 21| male| qinyu|
+---+------+---------+
常见算子
package com.itechthink.sql
import org.apache.spark.sql.SparkSession
/**
* Scala版Spark SQL常见算子
*
* printSchema(): 打印Dataframe的Schema
* show(): 打印Dataframe中的数据
* select(): 选择列
* filter(): filter过滤
* where(): where过滤
* groupBy(): 分组
* count(): 统计
*/
object ScalaSQLOperator {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaSQLOperator")
.master("local")
.getOrCreate()
// 2. 加载数据
val dataframe = session.read.json("/user.json")
// 打印Dataframe的Schema
//dataframe.printSchema()
// 打印Dataframe中的数据,默认显示所有数据,可以通过参数控制显示多少条
//dataframe.show(3)
// 选择列
//dataframe.select("name", "age").show()
// 使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
//import session.implicits._
//dataframe.select($"name", $"age" + 1).show()
// filter过滤
//dataframe.filter("age > 20").show()
// 或者通过隐式转换函数进行
//import session.implicits._
//dataframe.filter($"age" > 20).show()
// where过滤
//dataframe.where("age > 20").show()
// 或者通过隐式转换函数进行
//import session.implicits._
//dataframe.where($"age" > 20).show()
// 分组
dataframe.groupBy("age").count().show()
// 3. 关闭SparkSession
session.stop()
}
}
package com.itechthink.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
/**
* Java版Spark SQL常见算子
*
* printSchema(): 打印Dataframe的Schema
* show(): 打印Dataframe中的数据
* select(): 选择列
* filter(): filter过滤
* where(): where过滤
* groupBy(): 分组
* count(): 统计
*/
public class JavaSQLOperator {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaSQLOperator")
.master("local")
.getOrCreate();
// 2. 加载数据
Dataset<Row> dataset = session.read().json("/user.json");
// 打印Dataframe的Schema
//dataset.printSchema();
// 打印Dataframe中的数据,默认显示所有数据,可以通过参数控制显示多少条
//dataset.show(3);
// 选择列
//dataset.select("name", "age").show();
// 使用select的时候可以对数据做一些操作,需要引入额外的包 import static org.apache.spark.sql.functions.col;
//dataset.select(col("name"), col("age").plus(1)).show();
// filter过滤
//dataset.filter("age > 20").show();
//dataset.filter(col("age").gt(20)).show();
// where过滤
//dataset.where("age > 20").show();
//dataset.where(col("age").gt(20)).show();
// 分组
dataset.groupBy("age").count().show();
// 3. 关闭SparkSession
session.stop();
}
}
SQL语句
上面是通过算子的形式使用的Spark SQL。
DataFrame也支持直接使用SQL
语句进行查询。
通过DataFrame实现SQL
查询的Scala 2实例代码。
package com.itechthink.sql
import org.apache.spark.sql.SparkSession
/**
* Scala版Spark SQL查询
*
*/
object ScalaSQLQuery {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaSQLOperator")
.master("local")
.getOrCreate()
// 2. 加载数据
val dataframe = session.read.json("/user.json")
// 将DataFrame注册为临时视图
dataframe.createOrReplaceTempView("user")
// 使用SQL查询临时表中的数据
session.sql("select * from user").show()
// 3. 关闭SparkSession
session.stop()
}
}
package com.itechthink.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Java版Spark SQL查询
*
*/
public class JavaSQLQuery {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaSQLOperator")
.master("local")
.getOrCreate();
// 2. 加载数据
Dataset<Row> dataset = session.read().json("/user.json");
// 将DataFrame注册为临时视图
dataset.createOrReplaceTempView("user");
// 使用SQL查询临时表中的数据
session.sql("select * from user").show();
// 3. 关闭SparkSession
session.stop();
}
}
RDD转换为DataFrame
Spark支持两种方式将RDD转换为DataFrame:反射方式
和编程方式
。
反射方式
package com.itechthink.sql
import org.apache.spark.sql.SparkSession
/**
* Scala版通过反射的方式实现RDD转换为DataFrame
*
*/
object ScalaRDD2DataFrameByReflection {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaRDD2DataFrameByReflection")
.master("local")
.getOrCreate()
// 2. 获取SparkContext
val sc = session.sparkContext
// 3. 创建RDD
val dataRDD = sc.parallelize(Array(("lixingyun", 18), ("wanglin", 19), ("xiaoyan", 18), ("luofeng", 21)))
// 4. 将RDD转换为DataFrame
// 需要导入隐式转换函数
import session.implicits._
val dataframe = dataRDD.map(x => User(x._1, x._2)).toDF()
// 通过DataFrame操作RDD中的数据
dataframe.createOrReplaceTempView("user")
val res = session.sql("select * from user where age > 18")
res.show()
// 再将DataFrame转换为RDD
val resRDD = res.rdd.map(x => User(x.getString(0), x.getInt(1)))
resRDD.collect().foreach(println)
// 5. 关闭SparkSession
session.stop()
}
}
// 定义User类
case class User(name: String, age: Int)
package com.itechthink.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
* Java版通过反射的方式实现RDD转换为DataFrame
*
*/
public class JavaRDD2DataFrameByReflection {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaRDD2DataFrameByReflection")
.master("local")
.getOrCreate();
// 2. 获取SparkContext
JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
// 3. 创建RDD
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("lixingyun", 18);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("wanglin", 19);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("xiaoyan", 18);
Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("luofeng", 21);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
// 4. 将RDD转换为DataFrame
JavaRDD<User2> mapRDD = dataRDD.map(new Function<Tuple2<String, Integer>, User2>() {
@Override
public User2 call(Tuple2<String, Integer> input) throws Exception {
return new User2(input._1, input._2);
}
});
Dataset<Row> dataframe = session.createDataFrame(mapRDD, User2.class);
// 通过DataFrame操作RDD中的数据
dataframe.createOrReplaceTempView("user");
Dataset<Row> res = session.sql("select * from user where age > 18");
res.show();
// 再将DataFrame转换为RDD
JavaRDD<Row> resRDD = res.javaRDD();
List<User2> list = resRDD.map(new Function<Row, User2>() {
@Override
public User2 call(Row row) throws Exception {
return new User2(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString()));
}
}).collect();
for (User2 user : list) {
System.out.println(user);
}
// 5. 关闭SparkSession
session.stop();
}
}
public class User2 implements Serializable {
private String name;
private int age;
public User2(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User2{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
编程方式
这种方式比较适合需要动态构建元数据的场景。
package com.itechthink.sql
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* Scala版通过编程的方式实现RDD转换为DataFrame
*
*/
object ScalaRDD2DataFrameByProgramming {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaRDD2DataFrameByProgramming")
.master("local")
.getOrCreate()
// 2. 获取SparkContext
val sc = session.sparkContext
// 3. 创建RDD
val dataRDD = sc.parallelize(Array(("lixingyun", 18), ("wanglin", 19), ("xiaoyan", 18), ("luofeng", 21)))
// 4. 将RDD转换为DataFrame
// 因为不知道元数据信息,所以需要组装rowRDD
val rowRDD = dataRDD.map(x => Row(x._1, x._2))
// 指定schema元数据信息,这个元数据信息可以动态从外部获取
val schema = StructType(Array(
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))
// 组装DataFrame
val dataframe = session.createDataFrame(rowRDD, schema)
// 通过DataFrame来操作dataRDD只能够的数据
dataframe.createOrReplaceTempView("user")
val res = session.sql("select * from user where age > 18")
res.show()
// 再将DataFrame转换为RDD
val resRDD = res.rdd.map(x => (x.getString(0), x.getInt(1)))
resRDD.collect().foreach(println)
// 5. 关闭SparkSession
session.stop()
}
}
package com.itechthink.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
* Java版通过编程的方式实现RDD转换为DataFrame
*
*/
public class JavaRDD2DataFrameByProgramming {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaRDD2DataFrameByProgramming")
.master("local")
.getOrCreate();
// 2. 获取SparkContext
JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
// 3. 创建RDD
Tuple2<String, Integer> t1 = new Tuple2<String, Integer>("lixingyun", 18);
Tuple2<String, Integer> t2 = new Tuple2<String, Integer>("wanglin", 19);
Tuple2<String, Integer> t3 = new Tuple2<String, Integer>("xiaoyan", 18);
Tuple2<String, Integer> t4 = new Tuple2<String, Integer>("luofeng", 21);
JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(Arrays.asList(t1, t2, t3, t4));
// 4. 将RDD转换为DataFrame
// 因为不知道元数据信息,所以需要组装rowRDD
JavaRDD<Row> rowRDD = dataRDD.map(new Function<Tuple2<String, Integer>, Row>() {
@Override
public Row call(Tuple2<String, Integer> input) throws Exception {
return RowFactory.create(input._1, input._2);
}
});
// 指定schema元数据信息,这个元数据信息可以动态从外部获取
List<StructField> columns = new ArrayList<>();
columns.add(DataTypes.createStructField("name", StringType, true));
columns.add(DataTypes.createStructField("age", IntegerType, true));
StructType schema = DataTypes.createStructType(columns);
// 组装DataFrame
Dataset<Row> dataframe = session.createDataFrame(rowRDD, schema);
// 通过DataFrame操作RDD中的数据
dataframe.createOrReplaceTempView("user");
Dataset<Row> res = session.sql("select * from user where age > 18");
res.show();
// 再将DataFrame转换为RDD
JavaRDD<Row> resRDD = res.javaRDD();
List<Tuple2<String, Integer>> list = resRDD.map(new Function<Row, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2(row.getAs("name").toString(), Integer.parseInt(row.getAs("age").toString()));
}
}).collect();
for (Tuple2<String, Integer> tuple2 : list) {
System.out.println(tuple2._1 + ":" + tuple2._2);
}
// 5. 关闭SparkSession
session.stop();
}
}
Load和Save
除了使用SparkContext
从RDD转换数据,DataFrame也有自己的数据加载和保存方法,那就是Load
操作和Save
操作。
Load
操作和Save
的Scala 2实例代码。
package com.itechthink.sql
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
* Scala版load和save操作
*
*/
object ScalaLoadAndSaveData {
def main(args: Array[String]): Unit = {
// 1. 先创建SparkSession
val session = SparkSession.builder()
.appName("ScalaLoadAndSaveData")
.master("local")
.getOrCreate()
// 2. 加载数据
val dataframe = session.read.format("json").load("/Users/bear/Downloads/user.json")
// 3. 查询并保存数据
dataframe.select("name", "age")
.write
.format("csv")
// 指定不同的保存模式:SaveMode.Overwrite、SaveMode.Append、SaveMode.Ignore、SaveMode.ErrorIfExists
.mode(SaveMode.Overwrite)
.save("/Users/bear/Downloads/user")
// 4. 关闭SparkSession
session.stop()
}
}
Load
操作和Save
的Java实例代码。
package com.itechthink.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
/**
* Java版load和save操作
*
*/
public class JavaLoadAndSaveData {
public static void main(String[] args) {
// 1. 先创建SparkSession
SparkSession session = SparkSession.builder()
.appName("JavaLoadAndSaveData")
.master("local")
.getOrCreate();
// 2. 加载数据
Dataset<Row> dataset = session.read().json("/Users/bear/Downloads/user.json");
// 3. 查询并保存数据
dataset.select("name", "age")
.write()
.format("csv")
// 指定不同的保存模式:SaveMode.Overwrite、SaveMode.Append、SaveMode.Ignore、SaveMode.ErrorIfExists
.mode(SaveMode.Overwrite)
.save("/Users/bear/Downloads/user");
// 4. 关闭SparkSession
session.stop();
}
}
感谢支持
更多内容,请移步《超级个体》。