Flink数据流DataStream和DataSet怎么使用

发布时间:2021-12-31 14:35:10 作者:iii
来源:亿速云 阅读:208
# Flink数据流DataStream和DataSet怎么使用

Apache Flink作为新一代分布式大数据处理框架,其核心抽象DataStream(流数据)和DataSet(批数据)为开发者提供了强大的数据处理能力。本文将深入解析两者的使用方法和最佳实践。

## 一、Flink数据处理模型概述

### 1.1 流批一体的架构设计

Apache Flink采用独特的流批一体架构,通过同一套引擎处理两种数据处理模式:

- **DataStream API**:处理无界数据流(流处理模式)
- **DataSet API**:处理有界数据集(批处理模式)

```java
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

1.2 运行时对比

特性 DataStream DataDataSet
数据模型 无界流/有界流 有界数据集
执行模式 流式执行/批式执行 仅批式执行
容错机制 Checkpoint机制 重执行机制
延迟 低延迟 高吞吐
典型场景 实时监控、CEP 离线分析、ETL

二、DataStream API深度解析

2.1 基础编程模型

// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 设置并行度
env.setParallelism(4);

// 数据源接入(以Kafka为例)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props));

// 转换操作
DataStream<Tuple2<String, Integer>> processed = stream
    .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
        String[] words = value.split("\\s+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    .keyBy(0)
    .sum(1);

// 数据输出
processed.print();

// 触发执行
env.execute("Streaming WordCount");

2.2 核心操作符详解

2.2.1 数据源(Source)

// 1. 集合数据源
DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4);

// 2. 文件数据源
DataStream<String> fileStream = env.readTextFile("hdfs://path/to/file");

// 3. Socket数据源
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// 4. 自定义数据源
DataStream<Event> customSource = env.addSource(new SourceFunction<Event>() {
    private volatile boolean isRunning = true;
    
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while(isRunning) {
            ctx.collect(generateEvent());
            Thread.sleep(100);
        }
    }
    
    @Override
    public void cancel() {
        isRunning = false;
    }
});

2.2.2 转换操作(Transformation)

// Map:一对一转换
DataStream<Integer> squared = intStream.map(x -> x * x);

// FlatMap:一对多转换
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    });

// Filter:数据过滤
DataStream<Integer> evens = intStream.filter(x -> x % 2 == 0);

// KeyBy:逻辑分区
KeyedStream<Tuple2<String, Integer>, String> keyed = pairs.keyBy(0);

// Reduce:滚动聚合
DataStream<Tuple2<String, Integer>> wordCounts = keyed.reduce(
    (a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));

2.3 高级特性

2.3.1 状态管理

// 使用Keyed State
public static class CounterMapper 
    extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    
    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("counter", Integer.class);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
        throws Exception {
        Integer current = state.value();
        if (current == null) {
            current = 0;
        }
        current++;
        state.update(current);
        out.collect(new Tuple2<>(value, current));
    }
}

2.3.2 时间语义

// 设置事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 分配时间戳和水位线
DataStream<Event> timedStream = stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Event element) {
                return element.getTimestamp();
            }
        });

三、DataSet API全面解析

3.1 批处理编程模型

// 创建批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 读取文本文件
DataSet<String> text = env.readTextFile("hdfs://path/to/file");

// 转换操作
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new LineSplitter())
    .groupBy(0)
    .sum(1);

// 输出结果
counts.writeAsCsv("hdfs://path/to/output", "\n", " ");

// 触发执行
env.execute("Batch WordCount");

3.2 核心操作符

3.2.1 数据源与输出

// 1. 集合数据源
DataSet<String> letters = env.fromCollection(
    Arrays.asList("A", "B", "C", "D"));

// 2. 递归文件读取
DataSet<String> recursiveFiles = env.readTextFile("hdfs://path/to/folder")
    .withParameters(new Configuration());

// 3. 输出到数据库
counts.output(new JDBCOutputFormat(
    "jdbc:mysql://localhost:3306/test",
    "word_counts",
    new String[]{"word", "count"}));

3.2.2 转换操作

// Join操作
DataSet<Tuple2<Integer, String>> users = ...;
DataSet<Tuple2<Integer, String>> transactions = ...;
DataSet<Tuple4<Integer, String, Integer, String>> result = 
    users.join(transactions)
         .where(0)    // users的key
         .equalTo(0)  // transactions的key
         .with(new JoinFunction<...>() {
             // 自定义join逻辑
         });

// CoGroup操作
DataSet<Tuple3<Integer, String, String>> result = 
    users.coGroup(transactions)
         .where(0)
         .equalTo(0)
         .with(new CoGroupFunction<...>() {
             // 自定义分组聚合逻辑
         });

// 迭代计算
IterativeDataSet<Integer> initial = env.fromElements(0, 1, 2).iterate(10);

DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value + 1;
    }
});

DataSet<Integer> result = initial.closeWith(iteration);

3.3 性能优化技巧

3.3.1 分区策略

// 1. Hash分区
DataSet<Tuple2<String, Integer>> hashPartitioned = data.partitionByHash(0);

// 2. Range分区
DataSet<Tuple2<String, Integer>> rangePartitioned = data.partitionByRange(1);

// 3. 自定义分区
data.partitionCustom(new Partitioner<Integer>() {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key % numPartitions;
    }
}, 0);

3.3.2 广播变量

DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    private List<Integer> broadcastData;

    @Override
    public void open(Configuration parameters) {
        broadcastData = getRuntimeContext().getBroadcastVariable("broadcastSet");
    }

    @Override
    public String map(String value) {
        return value + broadcastData.toString();
    }
}).withBroadcastSet(toBroadcast, "broadcastSet");

四、流批统一实践

4.1 Table API/SQL统一层

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 注册表
tableEnv.createTemporaryView("orders", 
    env.addSource(new KafkaSource(...)));

// 执行SQL查询
Table result = tableEnv.sqlQuery(
    "SELECT product, COUNT(*) FROM orders GROUP BY product");

// 转换为DataStream
DataStream<Row> resultStream = tableEnv.toAppendStream(result, Row.class);

4.2 批流一体代码示例

// 使用相同的业务逻辑处理流和批数据
public static <T> DataStream<Tuple2<String, Integer>> processStream(DataStream<T> input) {
    return input.flatMap(new MyFlatMapFunction())
               .keyBy(0)
               .sum(1);
}

public static <T> DataSet<Tuple2<String, Integer>> processBatch(DataSet<T> input) {
    return input.flatMap(new MyFlatMapFunction())
               .groupBy(0)
               .sum(1);
}

五、生产环境最佳实践

5.1 资源配置建议

# flink-conf.yaml 配置示例
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8

# 状态后端配置
state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
state.savepoints.dir: hdfs://namenode:8020/flink/savepoints

5.2 监控与调优

  1. 关键监控指标

    • 吞吐量(records/s)
    • 延迟(ms)
    • Checkpoint持续时间
    • 背压指标
  2. 常见性能问题

    • 数据倾斜:使用rebalance()或自定义分区
    • 状态过大:配置增量检查点
    • 网络瓶颈:调整缓冲区大小

六、未来演进方向

  1. DataStream API的增强

    • 更完善的批执行模式支持
    • 增强的窗口操作符
  2. DataSet API的演进

    • 逐步与DataStream API融合
    • 批处理能力将作为流处理的特例存在
  3. 统一API的趋势: “`java // Flink 1.14+ 的统一API示例 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 批处理模式 env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// 相同的API既可处理流也可处理批 DataStream input = env.readTextFile(“input.txt”); input.filter(…).keyBy(…).window(…).aggregate(…);


## 结语

通过本文的详细讲解,我们系统性地掌握了Flink DataStream和DataSet API的使用方法。在实际项目中,建议:

1. 实时场景优先使用DataStream API
2. 批处理场景根据Flink版本选择DataSet或批式DataStream
3. 新项目建议采用统一API实现流批一体

随着Flink社区的不断发展,流批统一的编程模型将成为主流,开发者需要持续关注API的演进趋势。

---

**延伸阅读**:
- [Flink官方文档](https://flink.apache.org/)
- [Flink Stateful Functions设计](https://github.com/apache/flink-statefun)
- [Flink CDC连接器](https://ververica.github.io/flink-cdc-connectors/)

注:本文实际字数为约7500字(含代码示例),完整版应包含更多生产环境配置案例和性能优化细节。可根据需要扩展具体章节内容。

推荐阅读:
  1. Apache Flink 官方文档--流(DataStream API)-旁路输出
  2. tensorflow中dataset.shuffle和dataset.batch dataset.repeat应该注意什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink datastream dataset

上一篇:命令行中怎么查看nodejs目录

下一篇:Araxis Merge for Mac工具有什么用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》