WordCount:词频统计
每个新手软件工程师肯定知道什么是Hello World。
而Word Count在大数据领域的地位,就相当于Hello World在编程语言中的地位。
所谓Word Count,就是用大数据系统来读取某个文件中所有的(文本)内容,然后统计其中每个单词出现的次数。
这里统一引入开发Spark应用程序所需要的依赖。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
Scala实现单词统计
package com.itechthink.rdd
import org.apache.spark.{SparkConf, SparkContext}
/**
* Scala版单词统计
*
*/
object ScalaWordCount {
// 判断是否以字母结尾
private def endsWithLetter(s: String): Boolean = {
s.lastOption.exists(_.isLetter)
}
def main(args: Array[String]): Unit = {
// 1. 先创建SparkContext
val conf = new SparkConf()
// 设置应用程序名称
.setAppName("WordCount")
// 设置Spark运行模式
.setMaster("local")
val context = new SparkContext(conf)
// 2. 加载数据
val linesRDD = context.textFile("/Users/bear/Downloads/wordcount.txt")
// 3. transformation操作
// 对数据进行切割
val wordsRDD = linesRDD.flatMap(_.split(" "))
// 将每个单词转换成(word, 1)的形式
val wordCountRDD = wordsRDD.map(word => {
// 判断结尾是否是字母
if (endsWithLetter(word)) {
(word, 1)
} else {
// 去除尾部非字母字符
(word.dropRight(1), 1)
}
// 按照key分组并对._2元素进行累加求和
}).reduceByKey(_ + _)
// 4. action操作
wordCountRDD.foreach(println)
// 5. 关闭SparkContext
context.stop()
}
}
Java实现单词统计
package com.itechthink.rdd;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Java版单词统计
*
*/
public class JavaWordCount {
public static void main(String[] args) {
// 1. 先创建SparkContext
SparkConf conf = new SparkConf()
// 设置应用程序名称
.setAppName("JavaWordCount")
// 设置Spark运行模式
.setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);
// 2. 加载数据
JavaRDD<String> linesRDD = context.textFile("/Users/bear/Downloads/wordcount.txt");
// 3. transformation操作
JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});
JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String words) throws Exception {
// 判断是否以字母结尾
boolean endsWithLetter = Character.isLetter(words.charAt(words.length() - 1));
// 如果不是以字母结尾,则去掉最后一位
if (!endsWithLetter) {
words = words.substring(0, words.length() - 1);
}
return new Tuple2<>(words, 1);
}
});
// 分组聚合
JavaPairRDD<String, Integer> resultRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 4. action操作
resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> input) throws Exception {
System.out.println(input._1 + " : " + input._2);
}
});
// 5. 关闭SparkContext
context.close();
}
}
从代码行数可以看出来,Scala的代码量明显比Java的代码量少,而且更简洁易懂。
提交任务
有三种方式可以将任务提交到Spark中执行。
第一种方式:直接在IDEA编辑器中运行任务代码。
第二种方式:将代码打包,然后使用spark-submit将打好的
jar
包提交到Spark集群中执行。修改之前的添加的依赖(增加
<scope>provided</scope>
)项,并添加新的编译和打包插件配置内容。<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <!-- scala编译插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>4.8.0</version> <configuration> <scalaCompatVersion>2.12</scalaCompatVersion> <scalaVersion>2.12.19</scalaVersion> </configuration> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- 打包插件 --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
修改代码中
textFile("...")
中的文件路径,例如,用HDFS文件的存储路径替代。或者可以通过命令行参数args
动态指定。执行打包命令
mvn clean package -DskipTests
。提交任务的脚本如下。
> ./bin/spark-submit \ --class com.itechthink.ScalaWordCount \ # 如果是部署到YARN,则可以输入`yarn` --master spark://hadoop:7077 \ # 这里可以选择`client`、`standalone`或`cluster` --deploy-mode client \ --executor-memory 1G \ --num-executors 1 \ /home/work/volumes/spark/job/ScalaJavaSpark-1.0-SNAPSHOT-jar-with-dependencies.jar \ # 指定数据源路径 hdfs://hadoop:9000/spark/job/wordcount.txt
第三种方式:使用Spark Shell在集群环境调试代码。执行Spark Shell命令,默认会在当前的机器上启动一个
local
本地集群环境。
> cd /home/work/spark-3.2.1
> ./bin/spark-shell
Spark context Web UI available at http://hadoop:4040
Spark context available as 'sc' (master = local[*], app id = local-1680786598577).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_401)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
启动Spark Shell后的几行信息表明如下事实。
- Spark会在当前机器的
4040
端口启动一个Spark context Web UI
服务,也就是在浏览器中输入http://hadoop:4040就能访问本机的Spark Web UI,通过它就能查看任务调试的进度及实时运行状况。

已经创建好了一个名为
sc
的Spark Context,可以直接使用了。在当前的机器上启动一个
local
本地集群环境:master = local[*], app id = local-1680786598577
。当前任务的会话名称是
spark
(Spark session available as 'spark'.
)。只能使用Scala代码来调试。
可以输入
:help
来查看帮助。
下面是一个完整的Spark Shell调试过程。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
scala> val linesRDD = sc.textFile("hdfs://hadoop:9000/spark/job/wordcount.txt")
linesRDD: org.apache.spark.rdd.RDD[String] = hdfs://hadoop:9000/spark/job/wordcount.txt MapPartitionsRDD[6] at textFile at <console>:23
scala> val wordsRDD = linesRDD.flatMap(_.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at flatMap at <console>:23
scala> def endsWithLetter(s: String): Boolean = {s.lastOption.exists(_.isLetter)}
endsWithLetter: (s: String)Boolean
scala> val pairsRDD = wordsRDD.map(word => {if (endsWithLetter(word)) {(word, 1)} else {(word.dropRight(1), 1)}}).reduceByKey(_ + _)
pairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:24
scala> pairsRDD.foreach(println)
(sweet,1)> (0 + 2) / 2]
(And,1)
(largess,1)
(So,1)
(free,1)
......
scala> sc.stop()
这也是为什么要想真正把Spark用好,就必须先学好Scala 2的根本原因。
当然,如果想一次调试多行,同样可以通过Scala 2的:paste
命令来实现。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
scala> :paste
// Entering paste mode (ctrl-D to finish)
......
// Nothing pasted, nothing gained.
关于使用Spark Shell的更多方式,可以通过spark-shell -h
命令来查看。
开启HistoryServer服务
如果使用YARN
模式执行Spark任务,默认是没有日志信息输出的。
如果希望有日志输出,可以通过开启Spark的History Server服务的方式实现。
可以选择集群中任意一台机器,或者在客户端节点上修改Spark的配置。
> cd /home/work/spark-3.2.1
> cp spark-defaults.conf.template spark-defaults.conf
> vi spark-defaults.conf
# 在文件末尾添加内容
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs://hadoop:9000/spark/logs
spark.history.fs.logDirectory=hdfs://hadoop:9000/spark/logs
# 这里的hadoop机器可以和前面的不是同一台,也就是指定在哪台机器上开启historyServer服务
spark.yarn.historyServer.address=http://hadoop:18080
然后修改配置文件spark-env.sh
。
> vi spark-env.sh
# 在文件末尾添加内容
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop:9000/spark/logs"
> cd /home/work/spark-3.2.1
> ./sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/work/spark-3.2.1/logs/spark-root-org.apache.spark.deploy.history.HistoryServer-1-hadoop.out
[root@hadoop spark-3.2.1]# jps
219283 ResourceManager
227409 Jps
219446 NodeManager
219019 SecondaryNameNode
218591 NameNode
218798 DataNode
227358 HistoryServer
171820 Worker
171709 Master
如果能够看到HistoryServer
进程,说明已经启动成功。
修改之前的spark-submit任务命令,将--master spark://hadoop:7077 \
改为--master yarn \
,然后执行。
此时就可以在本机Hadoop的http://localhost:8088/cluster/apps中看到正在执行的任务了。
鼠标左键单击Tracking UI
列中的ApplicationMaster
链接,就能看到Spark的任务运行进程信息(因为这里没有配置Spark的YARN集群模式,所以查看不了)。
感谢支持
更多内容,请移步《超级个体》。