# How to Use the Table API & SQL in Flink
## 1. Overview
As a unified batch and streaming distributed compute engine, Apache Flink offers two high-level relational abstractions, the Table API and SQL, which let developers process structured data declaratively. Both interfaces simplify development, and Flink's optimizer automatically derives an efficient execution plan from them.
### 1.1 How the Table API and SQL Relate
- **Table API**: a language-embedded, object-oriented interface that composes table operations through method chaining
- **SQL**: standard ANSI SQL syntax, a natural fit for users who already know SQL
- **Unified underneath**: both are ultimately translated into the same logical plan
### 1.2 Key Advantages
1. **High development efficiency**: far less boilerplate than the DataStream API
2. **Automatic optimization**: a built-in optimizer selects an efficient execution plan
3. **Unified batch and streaming**: one syntax covers both batch and stream processing
4. **Broad ecosystem support**: connectors for many external systems (Kafka, JDBC, etc.)
## 2. Environment Setup
### 2.1 Adding Dependencies
```xml
<!-- Flink Table API (Java); the Scala suffix was dropped from the
     Java API artifacts in Flink 1.15+ -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
<!-- Planner, needed for local execution; the "blink" planner became the
     default and the artifact was renamed to flink-table-planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
<!-- Needed for user-defined functions -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.16.0</version>
    <scope>provided</scope>
</dependency>
```
### 2.2 Creating a TableEnvironment
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Streaming environment
EnvironmentSettings streamSettings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    .build();
TableEnvironment streamTableEnv = TableEnvironment.create(streamSettings);

// Batch environment
EnvironmentSettings batchSettings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment batchTableEnv = TableEnvironment.create(batchSettings);
```
## 3. Basic Usage
### 3.1 Converting a DataStream to a Table
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Define a DataStream (User is a POJO with name and age fields)
DataStream<User> userStream = env.fromElements(
    new User("Alice", 12),
    new User("Bob", 10)
);

// Convert it to a Table and register it as a temporary view
Table userTable = tableEnv.fromDataStream(userStream);
tableEnv.createTemporaryView("Users", userTable);
```
### 3.2 Creating Tables with SQL DDL
```java
// executeSql takes a single string; a Java text block (Java 15+)
// keeps multi-line SQL readable
tableEnv.executeSql("""
    CREATE TABLE Orders (
        order_id STRING,
        product STRING,
        amount INT,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )""");
```
### 3.3 Querying with the Table API
```java
import static org.apache.flink.table.api.Expressions.$;

Table result = tableEnv.from("Orders")
    .filter($("amount").gt(100))
    .groupBy($("product"))
    .select($("product"), $("amount").sum().as("total_amount"));
```
### 3.4 Querying with SQL
```java
// sqlQuery returns a Table that can be composed further
Table sqlResult = tableEnv.sqlQuery("""
    SELECT product, SUM(amount) AS total_amount
    FROM Orders
    WHERE amount > 100
    GROUP BY product""");
```
### 3.5 Writing Results to a Sink
```java
// Print sink for local debugging
tableEnv.executeSql("""
    CREATE TABLE ConsoleOutput (
        product STRING,
        total_amount BIGINT
    ) WITH (
        'connector' = 'print'
    )""");

// Run the query and write its result to the sink
tableEnv.executeSql("""
    INSERT INTO ConsoleOutput
    SELECT product, SUM(amount)
    FROM Orders
    GROUP BY product""");
```
## 4. Time Attributes and Windows
### 4.1 Defining Event Time and Watermarks
```java
import org.apache.flink.table.api.Schema;

// DDL: declare the event-time column and watermark in CREATE TABLE
tableEnv.executeSql("""
    CREATE TABLE Events (
        user_id STRING,
        event_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
    ) WITH (...)""");

// DataStream conversion: declare the schema explicitly
Table eventTable = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .column("user_id", "STRING")
        .column("event_time", "TIMESTAMP(3)")
        .watermark("event_time", "event_time - INTERVAL '10' SECOND")
        .build()
);
```
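Both declarations implement a bounded-out-of-orderness watermark: the watermark trails the largest event timestamp seen so far by a fixed delay, and records whose timestamp falls below the watermark are treated as late. A minimal plain-Java sketch of that rule, independent of Flink (all names here are hypothetical):

```java
// Sketch of bounded-out-of-orderness watermarking:
// watermark = max timestamp seen so far - fixed delay.
public class WatermarkSketch {
    private final long delayMs;
    private long maxTimestamp = Long.MIN_VALUE;

    public WatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Observe an event timestamp and return the resulting watermark.
    public long onEvent(long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
        return maxTimestamp - delayMs;
    }

    // An event is late if it is older than the current watermark.
    public boolean isLate(long timestampMs) {
        return timestampMs < maxTimestamp - delayMs;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(10_000); // 10 s delay, as in the DDL
        System.out.println(wm.onEvent(100_000)); // watermark advances to 90000
        System.out.println(wm.onEvent(95_000));  // out-of-order event; watermark stays 90000
        System.out.println(wm.isLate(85_000));   // older than the watermark: late
    }
}
```

Out-of-order events within the delay are still aggregated correctly; only events older than the watermark are dropped or routed to late handling.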
### 4.2 Tumbling Windows
```java
// Fixed-size, non-overlapping one-hour windows
Table tumbleResult = tableEnv.sqlQuery("""
    SELECT
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
        COUNT(*) AS event_count
    FROM Events
    GROUP BY
        user_id,
        TUMBLE(event_time, INTERVAL '1' HOUR)""");
```
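TUMBLE assigns each row to exactly one window: the row's timestamp rounded down to a multiple of the window size. A plain-Java illustration of that assignment, assuming epoch-aligned windows (names hypothetical):

```java
public class TumbleSketch {
    // Start of the tumbling window that contains the given timestamp.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // 01:30:00 (5,400,000 ms) falls in the [01:00, 02:00) window.
        System.out.println(windowStart(5_400_000L, hour)); // 3600000
    }
}
```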
### 4.3 Sliding (Hop) Windows
```java
// One-hour windows that slide (hop) every 30 seconds
Table hopResult = tableEnv.sqlQuery("""
    SELECT
        product,
        HOP_START(order_time, INTERVAL '30' SECOND, INTERVAL '1' HOUR) AS window_start,
        SUM(amount) AS total_amount
    FROM Orders
    GROUP BY
        product,
        HOP(order_time, INTERVAL '30' SECOND, INTERVAL '1' HOUR)""");
```
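Unlike TUMBLE, a HOP window overlaps: with size 1 hour and slide 30 seconds, every row lands in size/slide = 120 windows. This plain-Java sketch enumerates the window starts covering a timestamp, assuming epoch-aligned windows (names hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class HopSketch {
    // Starts of all hop windows (given slide and size) containing the timestamp,
    // from the most recent window start backwards.
    public static List<Long> windowStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // last window start at or before ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            if (start >= 0) {
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: each timestamp belongs to two windows.
        System.out.println(windowStarts(12_000, 5_000, 10_000)); // [10000, 5000]
    }
}
```

The overlap factor is why hop windows over high-cardinality keys need more state than tumbling windows.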
## 5. User-Defined Functions
### 5.1 Scalar Functions
```java
import org.apache.flink.table.functions.ScalarFunction;

// Function implementation
public class MyUpperFunction extends ScalarFunction {
    public String eval(String str) {
        return str.toUpperCase();
    }
}

// Register the function
tableEnv.createTemporarySystemFunction("MY_UPPER", MyUpperFunction.class);

// Use it in SQL
tableEnv.executeSql("SELECT MY_UPPER(name) FROM Users");
```
### 5.2 Aggregate Functions
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;

// Weighted average: the accumulator holds (weighted sum, total weight)
public class WeightedAvg extends AggregateFunction<Double, Tuple2<Double, Integer>> {
    @Override
    public Tuple2<Double, Integer> createAccumulator() {
        return Tuple2.of(0.0, 0);
    }

    public void accumulate(Tuple2<Double, Integer> acc, Double value, Integer weight) {
        acc.f0 += value * weight;
        acc.f1 += weight;
    }

    @Override
    public Double getValue(Tuple2<Double, Integer> acc) {
        return acc.f1 == 0 ? null : acc.f0 / acc.f1;
    }
}
```
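The accumulator logic can be exercised outside Flink. This plain-Java sketch mirrors `accumulate`/`getValue` with two local fields in place of the `Tuple2` accumulator (the class name is hypothetical):

```java
public class WeightedAvgSketch {
    private double weightedSum = 0.0; // corresponds to acc.f0
    private int totalWeight = 0;      // corresponds to acc.f1

    // Mirrors WeightedAvg.accumulate
    public void accumulate(double value, int weight) {
        weightedSum += value * weight;
        totalWeight += weight;
    }

    // Mirrors WeightedAvg.getValue; null when no rows were accumulated
    public Double getValue() {
        return totalWeight == 0 ? null : weightedSum / totalWeight;
    }

    public static void main(String[] args) {
        WeightedAvgSketch avg = new WeightedAvgSketch();
        avg.accumulate(10.0, 1);
        avg.accumulate(20.0, 3);
        System.out.println(avg.getValue()); // (10 + 60) / 4 = 17.5
    }
}
```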
## 6. Example: Streaming Unique-Visitor (UV) Counts
```java
// Kafka source table of click events
tableEnv.executeSql("""
    CREATE TABLE clicks (
        user_id STRING,
        url STRING,
        click_time TIMESTAMP(3),
        WATERMARK FOR click_time AS click_time - INTERVAL '10' SECOND
    ) WITH (...)""");

// UV over 10-minute windows, refreshed every 5 seconds
tableEnv.executeSql("""
    CREATE VIEW uv_counts AS
    SELECT
        HOP_START(click_time, INTERVAL '5' SECOND, INTERVAL '10' MINUTE) AS window_start,
        COUNT(DISTINCT user_id) AS uv
    FROM clicks
    GROUP BY HOP(click_time, INTERVAL '5' SECOND, INTERVAL '10' MINUTE)""");

// Write to Elasticsearch (es_uv_output is a sink table defined elsewhere)
tableEnv.executeSql("""
    INSERT INTO es_uv_output
    SELECT * FROM uv_counts""");
```
## 7. Connectors
### 7.1 Kafka Source
```java
tableEnv.executeSql("""
    CREATE TABLE kafka_source (
        user_id STRING,
        behavior STRING,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_behavior',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )""");
```
### 7.2 JDBC Sink
```java
// PRIMARY KEY ... NOT ENFORCED enables upsert writes
tableEnv.executeSql("""
    CREATE TABLE jdbc_sink (
        product_id STRING,
        sales BIGINT,
        PRIMARY KEY (product_id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysql:3306/db',
        'table-name' = 'product_sales',
        'username' = 'user',
        'password' = 'password'
    )""");
```
### 7.3 Filesystem Source
```java
tableEnv.executeSql("""
    CREATE TABLE fs_source (
        id INT,
        name STRING,
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///path/to/input',
        'format' = 'csv'
    )""");
```
## 8. Configuration and Tuning
```java
// Default parallelism
tableEnv.getConfig().set("parallelism.default", "4");

// Mini-batch aggregation (allow-latency must be set alongside size)
tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
tableEnv.getConfig().set("table.exec.mini-batch.size", "5000");

// State TTL: idle state is dropped after one hour
tableEnv.getConfig().set("table.exec.state.ttl", "1 h");

// Broadcast-join threshold in bytes (10 MB)
tableEnv.getConfig().set("table.optimizer.join.broadcast-threshold", "10485760");
```
```java
// Configure the state backend; each SET must be its own statement,
// since executeSql accepts only one statement per call
tableEnv.executeSql("SET 'state.backend' = 'rocksdb'");
tableEnv.executeSql("SET 'state.checkpoints.dir' = 'file:///checkpoints'");
tableEnv.executeSql("SET 'state.backend.incremental' = 'true'");
```
## 9. Common Pitfalls
1. **Time semantics**: event time, processing time, and ingestion time are easy to confuse.
   Fix: declare the time attribute explicitly and define a correct watermark.
2. **Unbounded state**: streaming aggregations can grow state without limit.
   Fix: set a reasonable state TTL, or scope the aggregation with windows.
3. **Type mismatches**: the SQL and Java type systems do not always line up.
   Fix: convert explicitly with CAST, or define a custom type mapping.
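State TTL addresses the second pitfall by dropping entries that have not been accessed within the configured interval. Conceptually it behaves like the following plain-Java sketch (a real state backend expires entries lazily; all names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TtlStateSketch {
    private final long ttlMs;
    private final Map<String, Long> lastAccess = new HashMap<>();
    private final Map<String, Long> counts = new HashMap<>();

    public TtlStateSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Update a key's aggregate and refresh its TTL timestamp.
    public void add(String key, long nowMs) {
        counts.merge(key, 1L, Long::sum);
        lastAccess.put(key, nowMs);
    }

    // Drop entries whose last access is older than the TTL.
    public void expire(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastAccess.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > ttlMs) {
                counts.remove(e.getKey());
                it.remove();
            }
        }
    }

    public int size() {
        return counts.size();
    }
}
```

The trade-off: a key that goes quiet for longer than the TTL restarts its aggregate from scratch, so pick a TTL longer than the longest expected gap between updates.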
## 10. Summary
Flink's Table API and SQL provide powerful, declarative processing of structured data, covering environment setup, table creation, windowed aggregation, user-defined functions, and connectors to external systems.
In real projects, start with simple queries, adopt advanced features incrementally, and keep tuning with the help of monitoring metrics to get the most out of Flink's unified batch and streaming model.