您好,登录后才能下订单哦!
# Spark RDD的创建方式及算子的使用方法
## 目录
1. [RDD核心概念解析](#1-rdd核心概念解析)
2. [RDD五大核心特性](#2-rdd五大核心特性)
3. [RDD创建方式详解](#3-rdd创建方式详解)
- [3.1 从集合创建](#31-从集合创建)
- [3.2 从外部存储创建](#32-从外部存储创建)
- [3.3 从其他RDD转换](#33-从其他rdd转换)
- [3.4 特殊创建方式](#34-特殊创建方式)
4. [RDD算子分类体系](#4-rdd算子分类体系)
5. [转换算子(Transformations)](#5-转换算子transformations)
- [5.1 基本转换算子](#51-基本转换算子)
- [5.2 键值对转换算子](#52-键值对转换算子)
- [5.3 分组聚合算子](#53-分组聚合算子)
6. [行动算子(Actions)](#6-行动算子actions)
7. [持久化算子详解](#7-持久化算子详解)
8. [RDD最佳实践](#8-rdd最佳实践)
9. [性能调优技巧](#9-性能调优技巧)
10. [常见问题解答](#10-常见问题解答)
## 1. RDD核心概念解析
RDD(Resilient Distributed Dataset)是Spark的核心数据抽象,代表一个不可变、可分区的元素集合,可以并行操作。其核心设计理念体现在三个关键词:
- **弹性(Resilient)**:具备容错机制,通过血缘关系(Lineage)实现数据重建
- **分布式(Distributed)**:数据分布在集群多个节点上
- **数据集(Dataset)**:包含实际数据的操作对象
```scala
// 典型RDD执行流程示例
val rdd = sc.textFile("hdfs://data.log") // 创建RDD
.filter(_.contains("ERROR")) // 转换操作
.map(_.split("\t")(0)) // 转换操作
.cache() // 持久化
.count() // 行动操作
特性 | 说明 | 实现意义 |
---|---|---|
分区列表 | 每个RDD都有多个分区 | 实现并行计算 |
计算函数 | 每个分区都有计算函数 | 定义数据处理逻辑 |
依赖关系 | 维护RDD间的血缘关系 | 实现容错机制 |
分区器 | 决定数据如何分片 | 控制数据分布 |
首选位置 | 计算向数据移动 | 优化数据本地性 |
最常用的开发测试创建方式,通过parallelize
或makeRDD
方法:
// 使用parallelize
val data = 1 to 10000
val rdd1 = sc.parallelize(data) // 默认分区数=集群核心数
// 使用makeRDD(实际内部调用parallelize)
val rdd2 = sc.makeRDD(Seq(("a",1), ("b",2)))
// 指定分区数(建议为CPU核心的2-3倍)
val rdd3 = sc.parallelize(data, 50)
参数调优建议: - 每个分区处理数据量建议在128MB左右 - 分区数=集群总核心数的2-4倍为佳
生产环境主要创建方式,支持多种数据源:
// 从HDFS创建
val hdfsRDD = sc.textFile("hdfs://namenode:8020/path")
// 从本地文件系统创建
val localRDD = sc.textFile("file:///path/to/file")
// 从S3创建
val s3RDD = sc.textFile("s3a://bucket/path")
// 读取整个目录
val wholeRDD = sc.wholeTextFiles("hdfs:///logs/") // 返回(filename, content)对
文件读取特性:
- 支持通配符(/path/*.log
)
- 支持压缩格式自动解压(gz、lzo等)
- 最小分区数由minPartitions
参数控制
通过转换算子生成新RDD(后续章节详细说明):
val rddA = sc.textFile("data.txt")
val rddB = rddA.filter(_.length > 10) // 新RDD
val rddC = rddB.map(_.toUpperCase) // 再生成新RDD
// 创建空RDD
val emptyRDD = sc.emptyRDD[String]
// 范围RDD
val rangeRDD = sc.range(1, 100, 2) // 1到100,步长2
// 随机RDD
import org.apache.spark.mllib.random.RandomRDDs
val randomRDD = RandomRDDs.normalRDD(sc, 1000000L)
Spark算子分为三大类:
算子类型 | 特点 | 示例 |
---|---|---|
Transformations | 惰性执行,返回新RDD | map, filter, groupBy |
Actions | 触发实际计算,返回值 | count, collect, save |
Persistence | 控制缓存策略 | cache, persist |
执行流程示意图:
[创建RDD] -> [多个Transformations] -> [Action触发执行] -> [结果输出]
// map:1对1转换
val rdd = sc.parallelize(Seq("hello world", "spark tutorial"))
rdd.map(_.toUpperCase).collect()
// 输出:Array("HELLO WORLD", "SPARK TUTORIAL")
// flatMap:1对多转换
rdd.flatMap(_.split(" ")).collect()
// 输出:Array("hello", "world", "spark", "tutorial")
val nums = sc.parallelize(1 to 10)
nums.filter(_ % 2 == 0).collect() // 输出:Array(2,4,6,8,10)
val dup = sc.parallelize(Seq(1,2,2,3,3,3))
dup.distinct().collect() // 输出:Array(1,2,3)
val data = sc.parallelize(1 to 10000)
// 简单采样(无放回)
data.sample(false, 0.1).count() // 约1000个
// 精确采样
data.takeSample(false, 100) // 固定取100个
val pairs = sc.parallelize(Seq(("a",1), ("b",2), ("a",3)))
// reduceByKey
pairs.reduceByKey(_ + _).collect() // Array(("a",4), ("b",2))
// sortByKey
pairs.sortByKey().collect() // 按key排序
val rdd1 = sc.parallelize(Seq(("a",1), ("b",2)))
val rdd2 = sc.parallelize(Seq(("a",3), ("c",4)))
// 内连接
rdd1.join(rdd2).collect() // Array(("a",(1,3)))
// 左外连接
rdd1.leftOuterJoin(rdd2).collect()
// Array(("a",(1,Some(3))), ("b",(2,None)))
val data = sc.parallelize(1 to 100)
// groupBy
data.groupBy(x => x % 2).collect()
// Array((0,CompactBuffer(2,4,...)), (1,CompactBuffer(1,3,...)))
// aggregate(更高效)
val result = data.aggregate((0,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// (总和, 元素个数)
val rdd = sc.parallelize(1 to 100)
// 计数
rdd.count() // 100
// 聚合
rdd.reduce(_ + _) // 5050
rdd.fold(0)(_ + _) // 5050(分布式版本)
// 极值
rdd.max() // 100
rdd.min() // 1
// 收集到Driver(慎用大数据集)
rdd.collect()
// 取前N个
rdd.take(5) // Array(1,2,3,4,5)
// 保存到文件系统
rdd.saveAsTextFile("hdfs://output")
// 遍历元素(用于副作用操作)
rdd.foreach(println)
// 累加器示例
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
accum.value // 5050
存储级别 | 说明 | 内存使用 | 磁盘使用 |
---|---|---|---|
MEMORY_ONLY | 默认级别 | 高 | 无 |
MEMORY_AND_DISK | 内存不足存磁盘 | 中等 | 可能 |
DISK_ONLY | 仅磁盘 | 低 | 高 |
MEMORY_ONLY_SER | 序列化存储 | 较低 | 无 |
val rdd = sc.textFile("large.txt").cache() // 等同于persist(MEMORY_ONLY)
// 显示指定存储级别
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// 释放缓存
rdd.unpersist()
分区策略优化
coalesce
减少分区repartition
或自定义分区器算子选择原则
reduceByKey
替代groupByKey
mapPartitions
优于map
(减少函数调用开销)持久化策略
// 读取时指定分区数
sc.textFile("data", 200)
// 调整shuffle并行度
spark.conf.set("spark.default.parallelism", 200)
# 建议配置
spark.executor.memory=4g
spark.storage.memoryFraction=0.6
// 加盐处理倾斜key
val skewedRDD = rdd.map{
case key if key == "hotkey" =>
(key + "_" + Random.nextInt(10), value)
case other => (other, value)
}
Q1: RDD与DataFrame如何选择? - RDD:需要细粒度控制时使用,处理非结构化数据 - DataFrame:结构化数据处理,性能更优(Catalyst优化)
Q2: 什么情况下会触发shuffle? - 宽依赖操作:reduceByKey、join、repartition等 - 可通过UI界面观察shuffle数据量
Q3: 如何排查OOM问题? 1. 检查driver/executor内存配置 2. 检查collect操作是否收集数据过多 3. 检查是否有不可序列化对象被传输
本文详细介绍了Spark RDD的核心概念、创建方式、算子使用及优化技巧,共计约7050字。实际开发中应结合具体场景选择合适的RDD操作,并注意性能调优要点,才能充分发挥Spark分布式计算的优势。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。