# How to Build an Apache Flink Application
## 1. Introduction

### 1.1 Apache Flink Overview

Apache Flink is an open-source distributed stream-processing framework developed and maintained by the Apache Software Foundation. It began as a research project at the Technical University of Berlin, entered the Apache Incubator in 2014, and became a top-level project in 2015. Flink's core design principle is "streaming-first": batch processing is treated as a special case of stream processing, giving truly unified stream and batch processing.
### 1.2 Core Features

- **Low latency, high throughput**: sustains high throughput at millisecond-level latency
- **Exactly-once** state-consistency guarantees
- **Event-time processing** with the watermark mechanism
- **State management**: efficient storage and access for very large state
- **Fault tolerance**: distributed snapshots based on the Chandy-Lamport algorithm
### 1.3 Typical Use Cases

- Real-time ETL and data pipelines
- Real-time monitoring and anomaly detection
- Event-driven applications
- Complex event processing (CEP)
- Real-time inference for machine-learning models
## 2. Environment Setup and Project Scaffolding

### 2.1 System Requirements

- **Java**: JDK 8 or 11 (OpenJDK recommended)
- **Maven**: 3.0+ (build tool)
- **IDE**: IntelliJ IDEA or Eclipse (the former recommended)
- **Cluster environment** (optional): standalone / YARN / Kubernetes
### 2.2 Creating a Maven Project

Add the Flink dependencies to `pom.xml`:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- since Flink 1.15 the Java API artifacts carry no Scala suffix -->
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
```
### 2.3 Bootstrapping the Execution Environment

```java
// Create a local execution environment with the Flink web UI enabled
// (the local web UI requires the flink-runtime-web dependency on the classpath)
StreamExecutionEnvironment env = StreamExecutionEnvironment
    .createLocalEnvironmentWithWebUI(new Configuration());
// Set the default parallelism
env.setParallelism(4);
```
## 3. Program Structure

A Flink application is a dataflow: records enter through sources, pass through transformations, and leave through sinks:

```mermaid
graph TD
    A[DataSource] --> B[Transformation]
    B --> C[Sink]
```
The canonical word count, reading lines from a socket:

```java
DataStream<String> text = env.socketTextStream("localhost", 9999);

DataStream<Tuple2<String, Integer>> counts = text
    .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : value.split("\\s")) {
            out.collect(new Tuple2<>(word, 1));
        }
    })
    // lambdas lose their generic types to erasure, so declare the output type
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .sum(1);

counts.print();
env.execute("Socket WordCount");
```
## 4. Sources and Sinks

### 4.1 Built-in Sources

| Type | Implementation | Description |
|---|---|---|
| File | `readTextFile` | Reads a text file |
| Socket | `socketTextStream` | Reads from a network socket |
| Collection | `fromCollection` | In-memory collection data |
| Kafka | `FlinkKafkaConsumer` | Kafka message queue |
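In current Flink releases `FlinkKafkaConsumer` is deprecated in favor of the unified `KafkaSource`. A minimal sketch, assuming the `flink-connector-kafka` dependency is on the classpath; the broker address, topic, and group id are placeholders:

```java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")              // placeholder broker
    .setTopics("input-topic")                           // placeholder topic
    .setGroupId("flink-demo")                           // placeholder consumer group
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```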
### 4.2 Custom Sources

A custom source implements `SourceFunction`:

```java
public class CustomSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit one synthetic event per second until cancelled
        while (isRunning) {
            ctx.collect("event-" + System.currentTimeMillis());
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
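Attaching the source to the environment:

```java
DataStream<String> events = env.addSource(new CustomSource());
```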
### 4.3 Sinks

Writing results to MySQL through the JDBC connector:

```java
// JDBC sink example
counts.addSink(JdbcSink.sink(
    "INSERT INTO word_counts (word, count) VALUES (?, ?)",
    (ps, t) -> {
        ps.setString(1, t.f0);
        ps.setInt(2, t.f1);
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/test")
        .withDriverName("com.mysql.cj.jdbc.Driver") // driver class for MySQL Connector/J 8
        .withUsername("user")
        .withPassword("pass")
        .build()
));
```
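Batching behavior is controlled by `JdbcExecutionOptions`, passed as the third argument to `JdbcSink.sink`; the values below are illustrative:

```java
JdbcExecutionOptions execOptions = JdbcExecutionOptions.builder()
    .withBatchSize(1000)        // flush after 1000 buffered rows
    .withBatchIntervalMs(200)   // ...or after 200 ms, whichever comes first
    .withMaxRetries(3)          // retry a failed batch up to 3 times
    .build();

counts.addSink(JdbcSink.sink(
    "INSERT INTO word_counts (word, count) VALUES (?, ?)",
    (ps, t) -> { ps.setString(1, t.f0); ps.setInt(2, t.f1); },
    execOptions,
    connectionOptions)); // the JdbcConnectionOptions built above
```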
## 5. Transformations

### 5.1 Basic Operators

```java
// map: one input element to exactly one output element
DataStream<Integer> lengths = text.map(String::length);

// filter: keep only matching elements
DataStream<String> filtered = text.filter(s -> s.startsWith("A"));

// flatMap: one input element to zero or more output elements
DataStream<String> words = text
    .flatMap((String value, Collector<String> out) -> {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    })
    .returns(Types.STRING); // declare the type lost to lambda erasure
```
### 5.2 Keyed and Multi-stream Operators

```java
DataStream<Tuple2<String, Double>> sales = ...;

// keyBy: partition the stream by key
KeyedStream<Tuple2<String, Double>, String> keyedSales = sales.keyBy(t -> t.f0);

// reduce: rolling aggregation per key
DataStream<Tuple2<String, Double>> totalSales = keyedSales
    .reduce((x, y) -> new Tuple2<>(x.f0, x.f1 + y.f1));

// union: merge streams of the same element type
DataStream<String> stream1 = ...;
DataStream<String> stream2 = ...;
DataStream<String> unioned = stream1.union(stream2);

// connect: pair two streams whose element types may differ
DataStream<Integer> numbers = ...;
ConnectedStreams<String, Integer> connected = stream1.connect(numbers);
```
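A connected stream is consumed with a `CoMapFunction` (or `CoFlatMapFunction`), which provides one method per input; a minimal sketch that tags each element with its origin:

```java
DataStream<String> merged = connected.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) {   // called for elements of stream1
        return "text: " + value;
    }

    @Override
    public String map2(Integer value) {  // called for elements of numbers
        return "number: " + value;
    }
});
```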
## 6. State and Fault Tolerance

### 6.1 Keyed State

A `RichFlatMapFunction` that keeps a count and a running sum in `ValueState`, emitting an average every two elements:

```java
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>("average",
                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum = sum.value();
        // state is null until the first element for this key arrives
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }
        currentSum.f0 += 1;        // element count
        currentSum.f1 += input.f1; // running sum
        sum.update(currentSum);

        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }
}
```
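Keyed state can also be given a time-to-live so stale entries are cleaned up automatically; a sketch that would go in `open()` before acquiring the state handle (the one-hour TTL is an illustrative value):

```java
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))                                 // expire entries after 1 hour
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // reset the TTL on writes
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
descriptor.enableTimeToLive(ttlConfig);
```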
### 6.2 Checkpointing

```java
// enable checkpointing (every 10 seconds)
env.enableCheckpointing(10000);

// advanced configuration
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.setMaxConcurrentCheckpoints(1);
config.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```
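Checkpointing pairs with a restart strategy that controls how the job recovers after a failure; a minimal sketch using a fixed-delay policy (the attempt count and delay are illustrative):

```java
// retry the job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, Time.of(10, TimeUnit.SECONDS)));
```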
## 7. Event Time and Watermarks

```java
DataStream<Event> events = ...;

// periodic watermarks for streams at most 5 seconds out of order
DataStream<Event> withTimestamps = events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));
```

A custom watermark generator:

```java
public class CustomWatermarkGenerator implements WatermarkGenerator<Event> {

    private final long maxOutOfOrderness = 5000; // 5 seconds

    // start low enough that the first emitted watermark cannot underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // the watermark trails the highest timestamp seen by the allowed lateness
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
    }
}
```
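To plug the custom generator into a pipeline, wrap it in a `WatermarkStrategy`:

```java
DataStream<Event> withCustomWatermarks = events
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forGenerator(ctx -> new CustomWatermarkGenerator())
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));
```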
## 8. Windows

```java
// tumbling window: fixed size, non-overlapping
dataStream.keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1);

// sliding window: 10-second windows, advancing every 5 seconds
dataStream.keyBy(t -> t.f0)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .sum(1);

// session window: closes after a 5-minute gap in activity
dataStream.keyBy(t -> t.f0)
    .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
    .sum(1);
```
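Late events can be tolerated or captured instead of silently dropped; a sketch assuming `dataStream` carries `Tuple2<String, Integer>` elements:

```java
final OutputTag<Tuple2<String, Integer>> lateTag =
    new OutputTag<Tuple2<String, Integer>>("late"){};

SingleOutputStreamOperator<Tuple2<String, Integer>> windowed = dataStream
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(10)) // keep window state 10 s past the watermark
    .sideOutputLateData(lateTag)       // anything later goes to the side output
    .sum(1);

DataStream<Tuple2<String, Integer>> lateData = windowed.getSideOutput(lateTag);
```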
## 9. Async I/O

Asynchronous operators keep the pipeline busy while waiting on external systems:

```java
AsyncDataStream.unorderedWait(
    inputStream,
    new AsyncDatabaseRequest(),
    1000,                  // timeout
    TimeUnit.MILLISECONDS,
    100                    // max in-flight requests
);
```

The async function itself:

```java
class AsyncDatabaseRequest extends RichAsyncFunction<String, String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(DB_URL);
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // run the blocking JDBC query on a separate thread pool;
        // a truly non-blocking client is preferable in production
        CompletableFuture.supplyAsync(() -> {
            try (PreparedStatement stmt = connection.prepareStatement(SQL)) {
                stmt.setString(1, key);
                try (ResultSet rs = stmt.executeQuery()) {
                    return rs.next() ? rs.getString(1) : null;
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }).thenAccept(result ->
            // ResultFuture expects a collection of results
            resultFuture.complete(Collections.singleton(result)));
    }
}
```
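By default a timed-out request fails the job; overriding `timeout` lets the operator degrade gracefully instead:

```java
@Override
public void timeout(String key, ResultFuture<String> resultFuture) {
    // complete with an empty result rather than failing the whole job
    resultFuture.complete(Collections.emptyList());
}
```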
## 10. Side Outputs

Side outputs split a stream without duplicating it; here, elements that do not start with "A" are routed to a `rejected` tag:

```java
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected"){};

SingleOutputStreamOperator<String> mainStream = processStream
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(
                String value,
                Context ctx,
                Collector<String> out) {
            if (value.startsWith("A")) {
                out.collect(value);
            } else {
                ctx.output(rejectedTag, value);
            }
        }
    });

DataStream<String> rejected = mainStream.getSideOutput(rejectedTag);
```
## 11. Configuration and Tuning

### 11.1 Cluster Configuration

```yaml
# flink-conf.yaml example
taskmanager.numberOfTaskSlots: 4
parallelism.default: 8
taskmanager.memory.process.size: 4096m
jobmanager.memory.process.size: 2048m
```
For performance-critical types, serialization can be customized by implementing `org.apache.flink.api.common.typeutils.TypeSerializer`.
### 11.2 State Backends

```java
// RocksDB state backend (suited to very large state; the second argument enables incremental checkpoints)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true));

// FsStateBackend (suited to small and medium state)
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
```
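Both classes are deprecated since Flink 1.13 in favor of an API that separates the state backend from checkpoint storage; a sketch of the equivalent setup (RocksDB requires the `flink-statebackend-rocksdb` dependency):

```java
// RocksDB with incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");

// heap-based equivalent of FsStateBackend
env.setStateBackend(new HashMapStateBackend());
```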
## 12. Case Studies

### 12.1 Real-time Fraud Detection

```java
DataStream<Transaction> transactions = ...;

// rule 1: alert on large transactions
transactions
    .filter(t -> t.getAmount() > 10000)
    .addSink(new AlertSink());

// rule 2: detect high-frequency activity per account
transactions
    .keyBy(Transaction::getAccountId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new FraudDetector());
```
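The source does not define `FraudDetector`; a minimal sketch of what it might look like as a `ProcessWindowFunction`, assuming `getAccountId()` returns a `Long` and using a purely illustrative threshold:

```java
public class FraudDetector
        extends ProcessWindowFunction<Transaction, String, Long, TimeWindow> {

    private static final int MAX_TX_PER_WINDOW = 10; // hypothetical threshold

    @Override
    public void process(Long accountId, Context ctx,
                        Iterable<Transaction> transactions, Collector<String> out) {
        // count the transactions that fell into this one-minute window
        int count = 0;
        for (Transaction t : transactions) {
            count++;
        }
        if (count > MAX_TX_PER_WINDOW) {
            out.collect("Possible fraud: account " + accountId
                + " made " + count + " transactions in one window");
        }
    }
}
```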
### 12.2 Real-time User Behavior Analytics

```java
// stream of user behavior events
DataStream<UserBehavior> behaviors = ...;

// per-item click counts over a sliding window
behaviors
    .filter(b -> b.getType().equals("click"))
    .keyBy(UserBehavior::getItemId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .aggregate(new CountAgg(), new WindowResultFunction());

// session-level analysis for real-time association rules
behaviors
    .keyBy(UserBehavior::getSessionId)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .process(new SessionAnalysis());
```
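`CountAgg` is likewise not shown in the source; a plausible sketch, assuming it simply counts events per key:

```java
public class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(UserBehavior value, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}
```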
## 13. Connector Dependencies

```xml
<!-- Kafka connector (no Scala suffix since Flink 1.15) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
```