您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Spark Streaming运行流程是怎样的
## 一、Spark Streaming概述
Spark Streaming是Apache Spark核心API的扩展,用于构建可扩展、高吞吐量、容错的实时数据流处理系统。它能够将来自Kafka、Flume、Kinesis等数据源的实时数据流进行高效处理,并以微批次(Micro-Batch)的方式进行处理,最终将结果输出到文件系统、数据库或实时仪表盘。
### 1.1 核心特点
- **微批处理架构**:将实时数据流切分为小批次(通常0.5~2秒),转换为Spark的RDD进行处理
- **Exactly-Once语义**:通过检查点(Checkpoint)和预写日志(WAL)保证数据一致性
- **与Spark生态无缝集成**:可直接复用Spark的机器学习(MLlib)、图计算(GraphX)等能力
- **多语言支持**:提供Scala、Java、Python API
### 1.2 基本概念
| 术语 | 说明 |
|---------------|----------------------------------------------------------------------|
| DStream | 离散化流(Discretized Stream),Spark Streaming的基础抽象 |
| Batch Interval| 批次时间间隔(如1秒),决定微批次的划分粒度 |
| Receiver | 数据接收器,负责从外部源获取数据并存储到Spark内存中 |
## 二、系统架构与核心组件
### 2.1 整体架构
```mermaid
graph TD
A[数据源] --> B[Spark Streaming]
B --> C{核心组件}
C --> D[Receiver]
C --> E[DStream]
C --> F[JobGenerator]
B --> G[输出操作]
StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(1))
JobGenerator
spark.streaming.gracefulStopTimeout
:优雅停止超时时间ReceiverTracker
Receiver
BlockGenerator
class BlockGenerator:
def __init__(self):
self.currentBuffer = []
self.blockInterval = 200ms # 默认值
创建StreamingContext
JavaStreamingContext jssc = new JavaStreamingContext(
new SparkConf().setAppName("NetworkWordCount"),
Durations.seconds(1)
);
定义输入源(以Socket为例):
val lines = ssc.socketTextStream("localhost", 9999)
构建DStream转换操作:
words = lines.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b)
Receiver启动过程:
数据分块存储:
容错机制:
ssc.checkpoint("hdfs://checkpoint_dir")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
时间窗口划分:
val windowedWordCounts = wordCounts.window(Seconds(30), Seconds(10))
作业生成时序:
timeline
title 批次处理时序
section Batch Interval
批次N-1 : 2023-01-01 12:00:00
批次N : 2023-01-01 12:00:01
批次N+1 : 2023-01-01 12:00:02
任务执行阶段:
常见输出方式对比:
输出方式 | 特点 | 示例代码 |
---|---|---|
print() | 调试使用,打印前10条记录 | wordCounts.print() |
saveAsTextFiles() | 保存到HDFS | dstream.saveAsTextFiles("hdfs://output") |
foreachRDD | 灵活自定义输出逻辑 | 见下方代码示例 |
foreachRDD典型用法:
wordCounts.foreachRDD(rdd -> {
rdd.foreachPartition(partition -> {
// 创建数据库连接
Connection conn = createConnection();
while (partition.hasNext()) {
Tuple2<String, Integer> record = partition.next();
// 写入数据库
insertRecord(conn, record);
}
conn.close();
});
});
Executor配置:
并行度调整:
sc.setLogLevel("WARN") // 减少日志量
ssc.sparkContext.setCheckpointDir("/tmp")
参数名 | 推荐值 | 说明 |
---|---|---|
spark.streaming.blockInterval | 200ms | 数据块生成间隔 |
spark.streaming.receiver.maxRate | 10000 | 单个Receiver最大接收速率(条/秒) |
spark.streaming.backpressure.enabled | true | 启用反压机制 |
数据积压:
spark.dynamicAllocation.enabled=true
Receiver故障:
val kafkaStreams = (1 to 3).map(_ => KafkaUtils.createStream(...))
val unifiedStream = ssc.union(kafkaStreams)
特性 | Spark Streaming | Structured Streaming |
---|---|---|
处理模型 | 微批处理 | 微批/持续处理 |
API层级 | RDD级 | DataFrame/DataSet级 |
事件时间处理 | 需手动实现 | 原生支持 |
graph LR
A[Spark Streaming] --> B[兼容模式]
B --> C[完全切换]
Spark Streaming通过将流数据离散化为一系列小批次RDD,实现了: 1. 高吞吐量的实时处理能力 2. 与批处理统一编程模型 3. 强大的容错机制
典型应用场景包括: - 实时监控告警系统 - 用户行为实时分析 - IoT设备数据处理
随着Spark 3.0的发布,虽然Structured Streaming成为主流,但理解Spark Streaming的运行机制仍是掌握Spark流处理体系的重要基础。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。