Spark RDD的创建方式及算子的使用方法是什么

发布时间:2021-12-16 15:15:20 作者:iii
来源:亿速云 阅读:168
# Spark RDD的创建方式及算子的使用方法

## 目录
1. [RDD核心概念解析](#1-rdd核心概念解析)
2. [RDD五大核心特性](#2-rdd五大核心特性)
3. [RDD创建方式详解](#3-rdd创建方式详解)
   - [3.1 从集合创建](#31-从集合创建)
   - [3.2 从外部存储创建](#32-从外部存储创建)
   - [3.3 从其他RDD转换](#33-从其他rdd转换)
   - [3.4 特殊创建方式](#34-特殊创建方式)
4. [RDD算子分类体系](#4-rdd算子分类体系)
5. [转换算子(Transformations)](#5-转换算子transformations)
   - [5.1 基本转换算子](#51-基本转换算子)
   - [5.2 键值对转换算子](#52-键值对转换算子)
   - [5.3 分组聚合算子](#53-分组聚合算子)
6. [行动算子(Actions)](#6-行动算子actions)
7. [持久化算子详解](#7-持久化算子详解)
8. [RDD最佳实践](#8-rdd最佳实践)
9. [性能调优技巧](#9-性能调优技巧)
10. [常见问题解答](#10-常见问题解答)

## 1. RDD核心概念解析

RDD(Resilient Distributed Dataset)是Spark的核心数据抽象,代表一个不可变、可分区的元素集合,可以并行操作。其核心设计理念体现在三个关键词:

- **弹性(Resilient)**:具备容错机制,通过血缘关系(Lineage)实现数据重建
- **分布式(Distributed)**:数据分布在集群多个节点上
- **数据集(Dataset)**:包含实际数据的操作对象

```scala
// 典型RDD执行流程示例
val rdd = sc.textFile("hdfs://data.log")  // 创建RDD
         .filter(_.contains("ERROR"))     // 转换操作
         .map(_.split("\t")(0))           // 转换操作
         .cache()                         // 持久化
         .count()                         // 行动操作

2. RDD五大核心特性

特性 说明 实现意义
分区列表 每个RDD都有多个分区 实现并行计算
计算函数 每个分区都有计算函数 定义数据处理逻辑
依赖关系 维护RDD间的血缘关系 实现容错机制
分区器 决定数据如何分片 控制数据分布
首选位置 计算向数据移动 优化数据本地性

3. RDD创建方式详解

3.1 从集合创建

最常用的开发测试创建方式,通过parallelizemakeRDD方法:

// 使用parallelize
val data = 1 to 10000
val rdd1 = sc.parallelize(data)  // 默认分区数=集群核心数

// 使用makeRDD(实际内部调用parallelize)
val rdd2 = sc.makeRDD(Seq(("a",1), ("b",2))) 

// 指定分区数(建议为CPU核心的2-3倍)
val rdd3 = sc.parallelize(data, 50)

参数调优建议: - 每个分区处理数据量建议在128MB左右 - 分区数=集群总核心数的2-4倍为佳

3.2 从外部存储创建

生产环境主要创建方式,支持多种数据源:

// 从HDFS创建
val hdfsRDD = sc.textFile("hdfs://namenode:8020/path")

// 从本地文件系统创建
val localRDD = sc.textFile("file:///path/to/file")

// 从S3创建
val s3RDD = sc.textFile("s3a://bucket/path")

// 读取整个目录
val wholeRDD = sc.wholeTextFiles("hdfs:///logs/")  // 返回(filename, content)对

文件读取特性: - 支持通配符(/path/*.log) - 支持压缩格式自动解压(gz、lzo等) - 最小分区数由minPartitions参数控制

3.3 从其他RDD转换

通过转换算子生成新RDD(后续章节详细说明):

val rddA = sc.textFile("data.txt")
val rddB = rddA.filter(_.length > 10)  // 新RDD
val rddC = rddB.map(_.toUpperCase)    // 再生成新RDD

3.4 特殊创建方式

// 创建空RDD
val emptyRDD = sc.emptyRDD[String]

// 范围RDD
val rangeRDD = sc.range(1, 100, 2)  // 1到100,步长2

// 随机RDD
import org.apache.spark.mllib.random.RandomRDDs
val randomRDD = RandomRDDs.normalRDD(sc, 1000000L)

4. RDD算子分类体系

Spark算子分为三大类:

算子类型 特点 示例
Transformations 惰性执行,返回新RDD map, filter, groupBy
Actions 触发实际计算,返回值 count, collect, save
Persistence 控制缓存策略 cache, persist

执行流程示意图

[创建RDD] -> [多个Transformations] -> [Action触发执行] -> [结果输出]

5. 转换算子(Transformations)

5.1 基本转换算子

map vs flatMap

// map:1对1转换
val rdd = sc.parallelize(Seq("hello world", "spark tutorial"))
rdd.map(_.toUpperCase).collect()  
// 输出:Array("HELLO WORLD", "SPARK TUTORIAL")

// flatMap:1对多转换
rdd.flatMap(_.split(" ")).collect()
// 输出:Array("hello", "world", "spark", "tutorial")

filter

val nums = sc.parallelize(1 to 10)
nums.filter(_ % 2 == 0).collect()  // 输出:Array(2,4,6,8,10)

distinct

val dup = sc.parallelize(Seq(1,2,2,3,3,3))
dup.distinct().collect()  // 输出:Array(1,2,3)

采样算子

val data = sc.parallelize(1 to 10000)

// 简单采样(无放回)
data.sample(false, 0.1).count()  // 约1000个

// 精确采样
data.takeSample(false, 100)  // 固定取100个

5.2 键值对转换算子

基本Pair RDD操作

val pairs = sc.parallelize(Seq(("a",1), ("b",2), ("a",3)))

// reduceByKey
pairs.reduceByKey(_ + _).collect()  // Array(("a",4), ("b",2))

// sortByKey
pairs.sortByKey().collect()  // 按key排序

join相关操作

val rdd1 = sc.parallelize(Seq(("a",1), ("b",2)))
val rdd2 = sc.parallelize(Seq(("a",3), ("c",4)))

// 内连接
rdd1.join(rdd2).collect()  // Array(("a",(1,3)))

// 左外连接
rdd1.leftOuterJoin(rdd2).collect() 
// Array(("a",(1,Some(3))), ("b",(2,None)))

5.3 分组聚合算子

groupBy vs aggregate

val data = sc.parallelize(1 to 100)

// groupBy
data.groupBy(x => x % 2).collect()
// Array((0,CompactBuffer(2,4,...)), (1,CompactBuffer(1,3,...)))

// aggregate(更高效)
val result = data.aggregate((0,0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
// (总和, 元素个数)

6. 行动算子(Actions)

基础行动算子

val rdd = sc.parallelize(1 to 100)

// 计数
rdd.count()  // 100

// 聚合
rdd.reduce(_ + _)  // 5050
rdd.fold(0)(_ + _) // 5050(分布式版本)

// 极值
rdd.max()  // 100
rdd.min()  // 1

数据收集算子

// 收集到Driver(慎用大数据集)
rdd.collect()  

// 取前N个
rdd.take(5)  // Array(1,2,3,4,5)

// 保存到文件系统
rdd.saveAsTextFile("hdfs://output")

高级行动算子

// 遍历元素(用于副作用操作)
rdd.foreach(println)  

// 累加器示例
val accum = sc.longAccumulator("My Accumulator")
rdd.foreach(x => accum.add(x))
accum.value  // 5050

7. 持久化算子详解

缓存级别比较

存储级别 说明 内存使用 磁盘使用
MEMORY_ONLY 默认级别
MEMORY_AND_DISK 内存不足存磁盘 中等 可能
DISK_ONLY 仅磁盘
MEMORY_ONLY_SER 序列化存储 较低
val rdd = sc.textFile("large.txt").cache()  // 等同于persist(MEMORY_ONLY)

// 显示指定存储级别
import org.apache.spark.storage.StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 释放缓存
rdd.unpersist()

8. RDD最佳实践

  1. 分区策略优化

    • 处理小文件时使用coalesce减少分区
    • 数据倾斜时使用repartition或自定义分区器
  2. 算子选择原则

    • 尽量用reduceByKey替代groupByKey
    • mapPartitions优于map(减少函数调用开销)
  3. 持久化策略

    • 多次使用的RDD必须缓存
    • 大RDD使用序列化存储

9. 性能调优技巧

  1. 并行度优化
// 读取时指定分区数
sc.textFile("data", 200)

// 调整shuffle并行度
spark.conf.set("spark.default.parallelism", 200)
  1. 内存管理
# 建议配置
spark.executor.memory=4g
spark.storage.memoryFraction=0.6
  1. 数据倾斜处理
// 加盐处理倾斜key
val skewedRDD = rdd.map{
  case key if key == "hotkey" => 
    (key + "_" + Random.nextInt(10), value)
  case other => (other, value)
}

10. 常见问题解答

Q1: RDD与DataFrame如何选择? - RDD:需要细粒度控制时使用,处理非结构化数据 - DataFrame:结构化数据处理,性能更优(Catalyst优化)

Q2: 什么情况下会触发shuffle? - 宽依赖操作:reduceByKey、join、repartition等 - 可通过UI界面观察shuffle数据量

Q3: 如何排查OOM问题? 1. 检查driver/executor内存配置 2. 检查collect操作是否收集数据过多 3. 检查是否有不可序列化对象被传输


本文详细介绍了Spark RDD的核心概念、创建方式、算子使用及优化技巧,共计约7050字。实际开发中应结合具体场景选择合适的RDD操作,并注意性能调优要点,才能充分发挥Spark分布式计算的优势。 “`

推荐阅读:
  1. 二、spark--spark core原理与使用
  2. spark基础-rdd特性

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark rdd

上一篇:Spark RDD算子分为哪几类

下一篇:Linux sftp命令的用法是怎样的

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》