Spark Streaming运行流程是怎样的

发布时间:2021-12-16 16:29:38 作者:iii
来源:亿速云 阅读:143
# Spark Streaming运行流程是怎样的

## 一、Spark Streaming概述

Spark Streaming是Apache Spark核心API的扩展,用于构建可扩展、高吞吐量、容错的实时数据流处理系统。它能够将来自Kafka、Flume、Kinesis等数据源的实时数据流进行高效处理,并以微批次(Micro-Batch)的方式进行处理,最终将结果输出到文件系统、数据库或实时仪表盘。

### 1.1 核心特点
- **微批处理架构**:将实时数据流切分为小批次(通常0.5~2秒),转换为Spark的RDD进行处理
- **Exactly-Once语义**:通过检查点(Checkpoint)和预写日志(WAL)保证数据一致性
- **与Spark生态无缝集成**:可直接复用Spark的机器学习(MLlib)、图计算(GraphX)等能力
- **多语言支持**:提供Scala、Java、Python API

### 1.2 基本概念
| 术语          | 说明                                                                 |
|---------------|----------------------------------------------------------------------|
| DStream       | 离散化流(Discretized Stream),Spark Streaming的基础抽象              |
| Batch Interval| 批次时间间隔(如1秒),决定微批次的划分粒度                            |
| Receiver      | 数据接收器,负责从外部源获取数据并存储到Spark内存中                    |

## 二、系统架构与核心组件

### 2.1 整体架构
```mermaid
graph TD
    A[数据源] --> B[Spark Streaming]
    B --> C{核心组件}
    C --> D[Receiver]
    C --> E[DStream]
    C --> F[JobGenerator]
    B --> G[输出操作]

2.2 关键组件详解

2.2.1 Driver端组件

  1. StreamingContext

    • 所有功能的入口点
    • 负责创建DStream和调度作业
    • 示例代码:
      
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      
  2. JobGenerator

    • 定时器触发批次生成
    • 维护作业依赖关系(DStream Graph)
    • 关键参数:
      • spark.streaming.gracefulStopTimeout:优雅停止超时时间
  3. ReceiverTracker

    • 管理所有Receiver的生命周期
    • 记录元数据(如数据块位置)

2.2.2 Executor端组件

  1. Receiver

    • 实现类举例:
      • KafkaReceiver
      • FlumeReceiver
    • 数据存储策略:
      • 默认MEMORY_ONLY
      • 可配置为MEMORY_AND_DISK_SER
  2. BlockGenerator

    • 将接收的数据组合成块
    • 内部维护缓冲区:
      
      class BlockGenerator:
       def __init__(self):
           self.currentBuffer = []
           self.blockInterval = 200ms  # 默认值
      

三、详细运行流程

3.1 初始化阶段

  1. 创建StreamingContext

    JavaStreamingContext jssc = new JavaStreamingContext(
       new SparkConf().setAppName("NetworkWordCount"),
       Durations.seconds(1)
    );
    
  2. 定义输入源(以Socket为例):

    val lines = ssc.socketTextStream("localhost", 9999)
    
  3. 构建DStream转换操作:

    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b)
    

3.2 数据接收流程

  1. Receiver启动过程

    • Driver通过ReceiverTracker发送StartReceiver消息
    • Executor启动ReceiverSupervisor
    • 实际Receiver开始接收数据
  2. 数据分块存储

    • 每批次数据被划分为多个Block
    • 存储位置信息上报给ReceiverTracker
    • 数据块默认复制策略:MEMORY_ONLY_2
  3. 容错机制

    • 检查点(Checkpoint)周期:
      
      ssc.checkpoint("hdfs://checkpoint_dir")
      
    • 预写日志(WAL)启用:
      
      sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
      

3.3 作业调度流程

  1. 时间窗口划分

    • 滑动窗口操作示例:
      
      val windowedWordCounts = wordCounts.window(Seconds(30), Seconds(10))
      
  2. 作业生成时序

    timeline
       title 批次处理时序
       section Batch Interval
       批次N-1 : 2023-01-01 12:00:00
       批次N   : 2023-01-01 12:00:01
       批次N+1 : 2023-01-01 12:00:02
    
  3. 任务执行阶段

    • Stage划分依据:Shuffle依赖
    • 任务调度优先级:
      1. Receiver所在Executor优先
      2. 数据本地性(PROCESS_LOCAL > NODE_LOCAL)

3.4 输出操作

常见输出方式对比:

输出方式 特点 示例代码
print() 调试使用,打印前10条记录 wordCounts.print()
saveAsTextFiles() 保存到HDFS dstream.saveAsTextFiles("hdfs://output")
foreachRDD 灵活自定义输出逻辑 见下方代码示例

foreachRDD典型用法:

wordCounts.foreachRDD(rdd -> {
    rdd.foreachPartition(partition -> {
        // 创建数据库连接
        Connection conn = createConnection();
        while (partition.hasNext()) {
            Tuple2<String, Integer> record = partition.next();
            // 写入数据库
            insertRecord(conn, record); 
        }
        conn.close();
    });
});

四、性能优化要点

4.1 资源配置建议

  1. Executor配置

    • 核数:至少4-8个专用于Receiver
    • 内存:接收数据量的2倍以上
  2. 并行度调整

    sc.setLogLevel("WARN")  // 减少日志量
    ssc.sparkContext.setCheckpointDir("/tmp")
    

4.2 关键参数调优

参数名 推荐值 说明
spark.streaming.blockInterval 200ms 数据块生成间隔
spark.streaming.receiver.maxRate 10000 单个Receiver最大接收速率(条/秒)
spark.streaming.backpressure.enabled true 启用反压机制

4.3 常见问题解决方案

  1. 数据积压

    • 增加批次间隔
    • 启用动态资源分配:
      
      spark.dynamicAllocation.enabled=true
      
  2. Receiver故障

    • 配置多个Receiver:
      
      val kafkaStreams = (1 to 3).map(_ => KafkaUtils.createStream(...))
      val unifiedStream = ssc.union(kafkaStreams)
      

五、与Structured Streaming对比

5.1 架构差异

特性 Spark Streaming Structured Streaming
处理模型 微批处理 微批/持续处理
API层级 RDD级 DataFrame/DataSet级
事件时间处理 需手动实现 原生支持

5.2 迁移建议

六、总结

Spark Streaming通过将流数据离散化为一系列小批次RDD,实现了: 1. 高吞吐量的实时处理能力 2. 与批处理统一编程模型 3. 强大的容错机制

典型应用场景包括: - 实时监控告警系统 - 用户行为实时分析 - IoT设备数据处理

随着Spark 3.0的发布,虽然Structured Streaming成为主流,但理解Spark Streaming的运行机制仍是掌握Spark流处理体系的重要基础。 “`

推荐阅读:
  1. 五、spark--spark streaming原理和使用
  2. 1.spark简介

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

spark streaming

上一篇:Flex程序设计中正则表达式的属性和方法有哪些

下一篇:怎么解析Python中的Dict

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》