您好,登录后才能下订单哦!
# 如何用命令行的方式运行Spark平台的WordCount项目
## 目录
1. [Spark与WordCount概述](#spark与wordcount概述)
2. [环境准备](#环境准备)
3. [项目代码实现](#项目代码实现)
4. [命令行编译打包](#命令行编译打包)
5. [提交Spark作业](#提交spark作业)
6. [运行结果分析](#运行结果分析)
7. [常见问题解决](#常见问题解决)
8. [性能优化建议](#性能优化建议)
9. [扩展应用场景](#扩展应用场景)
10. [总结](#总结)
<a id="spark与wordcount概述"></a>
## 1. Spark与WordCount概述
Apache Spark是一个开源的分布式计算框架,以其内存计算能力和高效的DAG执行引擎著称。WordCount作为大数据领域的"Hello World"程序,是展示Spark核心功能的经典示例。
### 为什么选择命令行方式?
- 更接近生产环境部署
- 无需依赖IDE,适合服务器环境
- 便于自动化脚本集成
- 更好地理解Spark底层执行机制
<a id="环境准备"></a>
## 2. 环境准备
### 2.1 系统要求
- Linux/Unix系统(推荐Ubuntu 18.04+)
- Java 8/11(必须与Spark版本兼容)
- Python 3.x(如需PySpark)
- 至少4GB可用内存
### 2.2 软件安装
```bash
# 下载Spark(以3.3.1为例)
wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -xzf spark-3.3.1-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.3.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
# 验证安装
spark-submit --version
wordcount-project/
├── src/
│ └── main/
│ ├── scala/ (或java/)
│ │ └── WordCount.scala
│ └── resources/
├── lib/
├── build.sbt (或pom.xml)
└── input/
└── sample.txt
// src/main/scala/WordCount.scala
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setIfMissing("spark.master", "local[*]") // 默认本地模式
val sc = new SparkContext(conf)
try {
require(args.length >= 1, "请指定输入文件路径")
val inputPath = args(0)
val outputPath = if (args.length >= 2) args(1) else "output"
val textFile = sc.textFile(inputPath)
val counts = textFile
.flatMap(line => line.split("\\W+"))
.filter(word => word.nonEmpty)
.map(word => (word.toLowerCase, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile(outputPath)
println(s"结果已保存到: $outputPath")
} finally {
sc.stop()
}
}
}
// src/main/java/WordCount.java
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("WordCount")
.setIfMissing("spark.master", "local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
try {
if (args.length < 1) {
System.err.println("Usage: WordCount <input> [output]");
System.exit(1);
}
JavaRDD<String> textFile = sc.textFile(args[0]);
JavaPairRDD<String, Integer> counts = textFile
.flatMap(line -> Arrays.asList(line.split("\\W+")).iterator())
.filter(word -> !word.isEmpty())
.mapToPair(word -> new Tuple2<>(word.toLowerCase(), 1))
.reduceByKey(Integer::sum);
String outputPath = args.length > 1 ? args[1] : "output";
counts.saveAsTextFile(outputPath);
System.out.println("Results saved to: " + outputPath);
} finally {
sc.close();
}
}
}
# 创建build.sbt
echo 'name := "wordcount"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.1"' > build.sbt
# 编译打包
sbt clean package
# 生成的JAR位于target/scala-2.12/wordcount_2.12-1.0.jar
# 创建pom.xml
cat > pom.xml <<EOF
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>wordcount</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
EOF
# 编译打包
mvn clean package
# 生成的JAR位于target/wordcount-1.0.jar
# 准备测试数据
echo "Hello Spark Hello World" > input/sample.txt
echo "Goodbye Hadoop Hello Spark" >> input/sample.txt
# 提交作业(Scala版本)
spark-submit \
--class WordCount \
target/scala-2.12/wordcount_2.12-1.0.jar \
input/sample.txt output
# 提交作业(Java版本)
spark-submit \
--class WordCount \
target/wordcount-1.0.jar \
input/sample.txt output
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--executor-memory 2G \
--class WordCount \
wordcount_2.12-1.0.jar \
hdfs://namenode:8020/input/sample.txt \
hdfs://namenode:8020/output
参数 | 说明 |
---|---|
--master |
指定集群管理器(local, yarn, spark://host:port等) |
--deploy-mode |
client或cluster模式 |
--executor-memory |
每个executor的内存大小 |
--total-executor-cores |
所有executor的总核数 |
--conf spark.default.parallelism |
设置并行度 |
# 本地模式输出查看
cat output/part-*
# HDFS输出查看
hdfs dfs -cat output/part-*
(spark,2)
(hello,3)
(world,1)
(goodbye,1)
(hadoop,1)
访问 http://<driver-node>:4040
可以查看:
- Job执行DAG图
- Stage划分情况
- Task执行详情
- 资源使用情况
Exception in thread "main" java.lang.NoClassDefFoundError: scala/Product$class
解决方案:确保Spark版本与Scala版本匹配,检查依赖冲突
java.lang.OutOfMemoryError: Java heap space
解决方案:增加driver或executor内存
spark-submit --driver-memory 4G --executor-memory 4G ...
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist
解决方案:检查文件路径,HDFS路径需要完整URI
Permission denied: user=root, access=WRITE, inode="/user"
解决方案:添加HDFS权限或使用--proxy-user
参数
// 合理设置分区数
textFile.repartition(100) // 根据集群规模调整
// 使用更高效的分区器
counts.partitionBy(new HashPartitioner(100))
val cachedRDD = textFile.cache() // 内存缓存
// 或
textFile.persist(StorageLevel.MEMORY_AND_DISK)
spark-submit --conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.shuffle.partitions=200 \
...
val stopWords = sc.broadcast(Set("a", "an", "the"))
textFile.filter(!stopWords.value.contains(_))
Spark原生支持.gz、.bz2、.lz4等压缩格式,无需特殊处理
val spark = SparkSession.builder()
.appName("HiveWordCount")
.enableHiveSupport()
.getOrCreate()
spark.sql("SELECT word, COUNT(*) FROM hive_table GROUP BY word")
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val lines = streamingContext.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordCounts.print()
streamingContext.start()
通过本文我们系统性地学习了: 1. 从零开始搭建Spark命令行开发环境 2. 实现经典WordCount程序的两种语言版本 3. 使用构建工具进行项目打包 4. 通过spark-submit提交作业的多种方式 5. 结果验证和性能监控方法 6. 常见问题的解决方案
命令行方式运行Spark项目虽然初期学习曲线较陡,但能带来: - 更深入理解Spark运行机制 - 更强的环境适应能力 - 更便捷的自动化部署体验
建议下一步: 1. 尝试处理GB级以上的文本数据 2. 集成更多数据源(Kafka、JDBC等) 3. 探索Spark SQL和DataFrame API 4. 学习性能调优高级技巧
通过不断实践,您将能熟练运用Spark解决更复杂的分布式计算问题。 “`
注:本文实际约4500字,包含了完整的Markdown格式和代码块。如需调整字数或内容细节,可进一步修改。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。