# How to Use Flink DataStream and DataSet
Apache Flink is a new-generation distributed big data processing framework. Its core abstractions, DataStream (streaming data) and DataSet (batch data), give developers powerful data processing capabilities. This article walks through how to use both APIs and the associated best practices.
## 1. Overview of the Flink Data Processing Model
### 1.1 A Unified Stream-Batch Architecture
Apache Flink uses a unified stream-batch architecture: a single engine supports both processing modes.
- **DataStream API**: processes unbounded data streams (stream processing mode)
- **DataSet API**: processes bounded data sets (batch processing mode)
```java
// Stream processing environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Batch processing environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
```
### 1.2 DataStream vs. DataSet

| Feature | DataStream | DataSet |
|---|---|---|
| Data model | Unbounded or bounded streams | Bounded data sets |
| Execution mode | Streaming or batch execution | Batch execution only |
| Fault tolerance | Checkpointing | Re-execution of failed tasks |
| Optimized for | Low latency | High throughput |
| Typical use cases | Real-time monitoring, CEP | Offline analytics, ETL |
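To make the fault-tolerance row concrete: checkpointing has to be switched on explicitly in the DataStream API. A minimal sketch (the 5-second interval is an illustrative value, not from the original article):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 5 seconds (interval chosen for illustration)
env.enableCheckpointing(5000);
// Exactly-once is the default guarantee; set explicitly here for clarity
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
```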
## 2. Using the DataStream API
### 2.1 Basic Usage: Streaming WordCount
```java
// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism
env.setParallelism(4);
// Connect a source (Kafka in this example)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));
// Transformations
DataStream<Tuple2<String, Integer>> processed = stream
    .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
        String[] words = value.split("\\s+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    // Lambdas with generic output types need an explicit type hint
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .sum(1);
// Sink
processed.print();
// Trigger execution
env.execute("Streaming WordCount");
```
### 2.2 Data Source Types
```java
// 1. Collection source
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4);
// 2. File source
DataStream<String> fileStream = env.readTextFile("hdfs://path/to/file");
// 3. Socket source
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
// 4. Custom source
DataStream<Event> customSource = env.addSource(new SourceFunction<Event>() {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(generateEvent());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
});
```
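The Event type used by the custom source (and by the event-time example later on) is never defined in the article. A minimal, hypothetical POJO and generator that would satisfy those examples might look like this; every field except the getTimestamp() accessor is an assumption:

```java
// Hypothetical event type; only getTimestamp() is actually required by the examples
public class Event {
    private String id;
    private long timestamp;

    public Event() {} // no-arg constructor keeps this a valid Flink POJO

    public Event(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

// Illustrative generator referenced by the custom source above
private static Event generateEvent() {
    return new Event(java.util.UUID.randomUUID().toString(), System.currentTimeMillis());
}
```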
### 2.3 Common Transformations
```java
// Map: one-to-one transformation
DataStream<Integer> squared = intStream.map(x -> x * x);
// FlatMap: one-to-many transformation
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }).returns(Types.STRING); // type hint required for lambdas that use a Collector
// Filter: keep only matching elements
DataStream<Integer> evens = intStream.filter(x -> x % 2 == 0);
// KeyBy: logical partitioning (a key selector yields the declared String key type)
KeyedStream<Tuple2<String, Integer>, String> keyed = pairs.keyBy(value -> value.f0);
// Reduce: rolling aggregation
DataStream<Tuple2<String, Integer>> wordCounts = keyed.reduce(
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));
```
### 2.4 Working with Keyed State
```java
// A counter implemented with Keyed State
public static class CounterMapper
        extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("counter", Integer.class);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
            throws Exception {
        Integer current = state.value();
        if (current == null) {
            current = 0;
        }
        current++;
        state.update(current);
        out.collect(new Tuple2<>(value, current));
    }
}
```
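Keyed state only exists on a keyed stream, so CounterMapper has to be applied after a keyBy. A minimal wiring sketch, assuming a wordStream of strings:

```java
// Count occurrences per word; the state is automatically scoped to the current key
DataStream<Tuple2<String, Integer>> counted = wordStream
    .keyBy(word -> word)
    .flatMap(new CounterMapper());
```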
### 2.5 Event Time and Watermarks
```java
// Use event time (required before Flink 1.12; it is the default in later versions)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Assign timestamps and watermarks
DataStream<Event> timedStream = stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getTimestamp();
            }
        });
```
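BoundedOutOfOrdernessTimestampExtractor is deprecated in recent Flink releases in favor of WatermarkStrategy. A sketch of the equivalent assignment with the newer API (Flink 1.11+), reusing the same Event type:

```java
DataStream<Event> timedStream = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy
        // Tolerate events that arrive up to 5 seconds late
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // Extract the event-time timestamp from each record
        .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp()));
```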
## 3. Using the DataSet API
### 3.1 Basic Usage: Batch WordCount
```java
// Create the batch execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read a text file
DataSet<String> text = env.readTextFile("hdfs://path/to/file");
// Transformations
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new LineSplitter())
    .groupBy(0)
    .sum(1);
// Write the result
counts.writeAsCsv("hdfs://path/to/output", "\n", " ");
// Trigger execution
env.execute("Batch WordCount");
```
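The LineSplitter used above is not shown in the article; a minimal implementation consistent with how it is used would be:

```java
// Splits each line into (word, 1) pairs
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```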
### 3.2 Sources and Sinks
```java
// 1. Collection source
DataSet<String> letters = env.fromCollection(
    Arrays.asList("A", "B", "C", "D"));
// 2. Recursive file reading (recursion must be enabled explicitly)
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<String> recursiveFiles = env.readTextFile("hdfs://path/to/folder")
    .withParameters(parameters);
// 3. Writing to a database (JDBCOutputFormat is built with its builder and consumes Row records)
counts.map(t -> Row.of(t.f0, t.f1))
    .returns(Types.ROW(Types.STRING, Types.INT))
    .output(JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername("com.mysql.jdbc.Driver") // driver class assumed for MySQL
        .setDBUrl("jdbc:mysql://localhost:3306/test")
        .setQuery("INSERT INTO word_counts (word, count) VALUES (?, ?)")
        .finish());
```
### 3.3 Join and CoGroup
```java
// Join
DataSet<Tuple2<Integer, String>> users = ...;
DataSet<Tuple2<Integer, String>> transactions = ...;

DataSet<Tuple4<Integer, String, Integer, String>> joined =
    users.join(transactions)
        .where(0)   // key of users
        .equalTo(0) // key of transactions
        .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                Tuple4<Integer, String, Integer, String>>() {
            @Override
            public Tuple4<Integer, String, Integer, String> join(
                    Tuple2<Integer, String> user, Tuple2<Integer, String> tx) {
                // Custom join logic: concatenate both sides
                return new Tuple4<>(user.f0, user.f1, tx.f0, tx.f1);
            }
        });

// CoGroup
DataSet<Tuple3<Integer, String, String>> coGrouped =
    users.coGroup(transactions)
        .where(0)
        .equalTo(0)
        .with(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>,
                Tuple3<Integer, String, String>>() {
            @Override
            public void coGroup(Iterable<Tuple2<Integer, String>> first,
                                Iterable<Tuple2<Integer, String>> second,
                                Collector<Tuple3<Integer, String, String>> out) {
                // Custom per-key grouping/aggregation logic
            }
        });
```
### 3.4 Iterations
```java
// Iterative computation: apply the map ten times
IterativeDataSet<Integer> initial = env.fromElements(0, 1, 2).iterate(10);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value + 1;
    }
});

DataSet<Integer> result = initial.closeWith(iteration);
```
### 3.5 Partitioning
```java
// 1. Hash partitioning
DataSet<Tuple2<String, Integer>> hashPartitioned = data.partitionByHash(0);
// 2. Range partitioning
DataSet<Tuple2<String, Integer>> rangePartitioned = data.partitionByRange(1);
// 3. Custom partitioning (on field 1, the Integer field, to match the Partitioner type)
data.partitionCustom(new Partitioner<Integer>() {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key % numPartitions;
    }
}, 1);
```
### 3.6 Broadcast Variables
```java
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    private List<Integer> broadcastData;

    @Override
    public void open(Configuration parameters) {
        // Retrieve the broadcast set registered under "broadcastSet"
        broadcastData = getRuntimeContext().getBroadcastVariable("broadcastSet");
    }

    @Override
    public String map(String value) {
        return value + broadcastData.toString();
    }
}).withBroadcastSet(toBroadcast, "broadcastSet");
```
## 4. Integrating with the Table API and SQL
```java
// Streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Register a table backed by a stream
tableEnv.createTemporaryView("orders",
    env.addSource(new KafkaSource(...)));
// Run a SQL query
Table result = tableEnv.sqlQuery(
    "SELECT product, COUNT(*) FROM orders GROUP BY product");
// Convert back to a DataStream; the aggregation produces updates, so a retract stream is required
DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(result, Row.class);
```
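For Flink 1.13 and later, the changelog-aware conversion is the recommended replacement for the append/retract methods. A sketch, assuming the same result table:

```java
// Emits inserts, updates, and deletes as RowKind-annotated rows
DataStream<Row> changelog = tableEnv.toChangelogStream(result);
changelog.print();
```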
## 5. Sharing Logic Between Stream and Batch Jobs
```java
// The same business logic can process both streaming and batch data
public static <T> DataStream<Tuple2<String, Integer>> processStream(DataStream<T> input) {
    return input.flatMap(new MyFlatMapFunction())
        .keyBy(0)
        .sum(1);
}

public static <T> DataSet<Tuple2<String, Integer>> processBatch(DataSet<T> input) {
    return input.flatMap(new MyFlatMapFunction())
        .groupBy(0)
        .sum(1);
}
```
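A usage sketch of the two helpers, assuming MyFlatMapFunction turns each input string into (word, 1) pairs and reusing the env and batchEnv environments created earlier:

```java
// Streaming path
DataStream<String> streamInput = env.socketTextStream("localhost", 9999);
processStream(streamInput).print();
env.execute("shared-logic-streaming");

// Batch path (DataSet print() triggers execution on its own)
DataSet<String> batchInput = batchEnv.fromElements("a b", "a c");
processBatch(batchInput).print();
```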
## 6. Production Configuration
```yaml
# flink-conf.yaml example
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8

# State backend configuration
state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.savepoints.dir: hdfs://namenode:8020/flink/savepoints
```
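The same state backend and checkpoint storage can also be set per job in code. A sketch using the Flink 1.13+ classes, reusing the HDFS path from the configuration above:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// RocksDB-backed state (requires the flink-statebackend-rocksdb dependency)
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// Store checkpoints in HDFS
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
```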
## 7. Performance and Outlook
In production, watch the key monitoring metrics and the common performance problems; data skew, for instance, can often be mitigated with rebalance() or a custom partitioner.

Looking ahead, the main trends are:
- continued enhancement of the DataStream API;
- the ongoing evolution of the DataSet API;
- a single, unified API for both stream and batch workloads:
```java
// Unified API example (Flink 1.14+)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Switch to batch execution mode
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// The same API can now process either streaming or batch data
DataStream<String> stream = ...;
```
## Conclusion
This article has walked through the Flink DataStream and DataSet APIs in a systematic way. For real projects, the recommendations are:
1. Prefer the DataStream API for real-time scenarios.
2. For batch workloads, choose between the DataSet API and batch-mode DataStream depending on your Flink version.
3. For new projects, adopt the unified API to get stream-batch convergence from the start.
As the Flink community continues to develop, the unified stream-batch programming model is becoming mainstream, and developers should keep an eye on how the APIs evolve.
---
**Further Reading**:
- [Flink official documentation](https://flink.apache.org/)
- [Flink Stateful Functions design](https://github.com/apache/flink-statefun)
- [Flink CDC connectors](https://ververica.github.io/flink-cdc-connectors/)