您好,登录后才能下订单哦!
# Spark创建RDD的方式有哪些
## 目录
1. [RDD核心概念回顾](#1-rdd核心概念回顾)
2. [从集合创建RDD](#2-从集合创建rdd)
3. [从外部存储系统创建RDD](#3-从外部存储系统创建rdd)
4. [从其他RDD转换创建](#4-从其他rdd转换创建)
5. [特殊创建方式](#5-特殊创建方式)
6. [RDD创建的最佳实践](#6-rdd创建的最佳实践)
7. [总结](#7-总结)
---
## 1. RDD核心概念回顾
**弹性分布式数据集(RDD)** 是Spark中最基本的数据抽象,具有以下核心特性:
- **不可变性**:只读的数据分区集合
- **弹性**:支持数据重建的容错机制
- **分布式**:数据跨集群节点存储
- **惰性求值**:转换操作延迟执行
RDD的五大核心属性:
1. 分区列表
2. 计算每个分区的函数
3. 与其他RDD的依赖关系
4. 键值RDD的分区器
5. 计算每个分区的首选位置
## 2. 从集合创建RDD
### 2.1 parallelize方法
最基础的创建方式,适用于小规模测试数据
```scala
val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
// 指定分区数
val rdd = sc.parallelize(data, 5)
特点: - 数据完全加载到驱动程序内存 - 适合开发和测试场景 - 分区数默认为集群的CPU核心数
Spark提供的增强版parallelize
val rdd = sc.makeRDD(Seq(1, 2, 3))
// 支持指定分区位置偏好
val rdd = sc.makeRDD(Seq(
(1 to 10, Seq("host1")),
(11 to 20, Seq("host2"))
))
优势: - 支持数据位置偏好设置 - 内部实现更高效
// 本地文件系统
val rdd = sc.textFile("file:///path/to/file")
// HDFS文件
val rdd = sc.textFile("hdfs://namenode:8020/path")
// 通配符匹配
val rdd = sc.textFile("/input/*.log")
// 整个目录
val rdd = sc.wholeTextFiles("/input/")
关键参数:
- minPartitions
:最小分区数
- wholeTextFiles
返回(文件名,内容)的键值对
val rdd = sc.binaryFiles("/path/to/binfiles")
// 保存
rdd.saveAsObjectFile("/output")
// 读取
val rdd = sc.objectFile("/output")
val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("/input")
支持格式: - SequenceFile - Avro - Parquet - 其他Hadoop兼容格式
// JDBC连接
val rdd = JdbcRDD.create(
sc,
() => DriverManager.getConnection("jdbc:postgresql://localhost/test"),
"SELECT * FROM table WHERE ? <= id AND id <= ?",
1, 100, 3,
r => (r.getInt(1), r.getString(2))
注意事项: - 需要将JDBC驱动jar包添加到classpath - 合理设置分区数避免连接数过多
// map转换
val newRDD = rdd.map(_ * 2)
// filter转换
val filtered = rdd.filter(_ > 3)
// flatMap转换
val words = lines.flatMap(_.split(" "))
// reduceByKey
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
// groupByKey
val grouped = pairs.groupByKey()
// join操作
val joined = rdd1.join(rdd2)
// mapPartitions
val partRDD = rdd.mapPartitions(iter => iter.map(_ * 2))
// repartition
val repartRDD = rdd.repartition(10)
// coalesce
val coalesced = rdd.coalesce(5)
// 创建数字序列
val rangeRDD = sc.range(0, 100, 1, 5)
// 随机双精度数
val randomRDD = RandomRDDs.normalRDD(sc, 100L)
val empty = sc.emptyRDD[String]
val pairRDD = sc.parallelizePairs(Seq(
("key1", 1), ("key2", 2)
))
sc.textFile(path, minPartitions)
// 控制并行化数据大小
val MAX_MEMORY = Runtime.getRuntime.maxMemory / 2
if (data.size > MAX_MEMORY) {
// 改用外部存储方式
}
// 添加随机前缀解决倾斜
val salted = rdd.map {
case (k, v) => (s"${Random.nextInt(10)}_$k", v)
}
// 使用Kryo序列化
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Spark提供了多样化的RDD创建方式以满足不同场景需求:
创建方式 | 适用场景 | 特点 |
---|---|---|
并行化集合 | 小规模测试数据 | 简单快速 |
外部存储系统 | 生产环境大数据 | 支持多种文件格式 |
RDD转换 | 数据处理流水线 | 惰性求值 |
特殊方法 | 特定需求场景 | 功能专一 |
未来趋势: - 虽然DataSet/DataFrame API逐渐成为主流 - RDD仍然是底层核心抽象 - 理解RDD创建原理对性能调优至关重要
通过合理选择创建方式,可以显著提升Spark应用的执行效率和资源利用率。 “`
这篇文章共计约2700字,采用Markdown格式编写,包含: 1. 7个主要章节 2. 多个代码示例 3. 表格对比 4. 结构化标题 5. 关键点强调 6. 最佳实践建议
内容全面覆盖了Spark创建RDD的各种方式及其应用场景,适合作为技术文档或学习资料使用。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。