# FLINK 1.12 upsertSql Usage Guide
## Table of Contents
1. [Upsert Concepts](#upsert-concepts)
2. [Upsert Mode in Flink SQL](#upsert-mode-in-flink-sql)
3. [upsertSql Syntax](#upsertsql-syntax)
4. [Hands-On: Upsert from Kafka to MySQL](#hands-on-upsert-from-kafka-to-mysql)
5. [Performance Tuning and Common Issues](#performance-tuning-and-common-issues)
6. [Integration with CDC](#integration-with-cdc)
7. [Version Compatibility Notes](#version-compatibility-notes)
8. [Best Practices Summary](#best-practices-summary)
---
## Upsert Concepts
### What Is Upsert
Upsert (Update + Insert) is a hybrid operation: if a record with the same key already exists it is updated, otherwise it is inserted. In distributed systems it is a key building block for keeping data consistent.
```sql
-- Traditional SQL (MySQL dialect): insert, or update the row on key conflict
INSERT INTO t (id, value)
VALUES (1, 'a')
ON DUPLICATE KEY UPDATE value = 'a';
```
## Upsert Mode in Flink SQL

Flink SQL has no explicit `ON DUPLICATE KEY UPDATE` clause. A sink operates in upsert mode when its table declares a `PRIMARY KEY`: Flink then writes changelog records keyed on it, inserting new keys and updating existing ones.

```sql
-- Enable upsert mode by declaring a primary key on the sink table
CREATE TABLE upsert_table (
  id INT,
  name STRING,
  cnt BIGINT,
  PRIMARY KEY (id) NOT ENFORCED  -- Flink keys are declarative, hence NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'target_table',
  'username' = 'user',
  'password' = 'pass'
);
```
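As a quick illustration of the mode, here is a minimal write into the table above, assuming a hypothetical source table `some_source` with compatible columns; the `GROUP BY` produces an updating stream that the JDBC sink resolves against `id`:

```sql
-- The aggregate emits updates per id; the sink applies them as insert-or-update
INSERT INTO upsert_table
SELECT id, MAX(name) AS name, COUNT(*) AS cnt
FROM some_source  -- hypothetical source table
GROUP BY id;
```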
Connector support for upsert in Flink 1.12:

| Connector | Upsert supported | Required configuration |
|---|---|---|
| JDBC | ✓ | `PRIMARY KEY` on the table definition |
| Kafka | ✓ | `upsert-kafka` connector with `key.format` |
| HBase | ✓ | rowkey column |
| Elasticsearch | ✓ | document id derived from the primary key; `document-id.key-delimiter` for composite keys |
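For the Kafka row in the table above, Flink 1.12 ships a dedicated `upsert-kafka` connector that keys the topic on the declared primary key. A minimal sketch, with the topic name and formats assumed for illustration:

```sql
CREATE TABLE kafka_upsert_sink (
  user_id STRING,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_counts',                       -- hypothetical topic
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',                         -- serializes the primary key
  'value.format' = 'json'
);
```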
## upsertSql Syntax

The statement itself is a plain `INSERT INTO`; the upsert behavior comes from the primary key declared on the sink table:

```sql
INSERT INTO [catalog_name.][db_name.]table_name
{ VALUES (value1 [, value2]*) | SELECT query }
```

For a MySQL sink, the JDBC connector translates each upsert write into an `INSERT ... ON DUPLICATE KEY UPDATE` statement on the database side.
Scenario 1: update on primary-key conflict

```sql
INSERT INTO user_actions
SELECT user_id, action_time, action_type
FROM kafka_source;
```

With `user_id` declared as the primary key of `user_actions`, a later row for an existing user overwrites its stored `action_time` and `action_type`.
Scenario 2: conditional update (keep only the latest state per key)

Flink SQL cannot express a conditional `IF(...)` conflict clause; the idiomatic equivalent is a deduplication (Top-N) query that keeps the row with the greatest `update_time` per key:

```sql
INSERT INTO order_states
SELECT order_id, status, update_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY order_id
           ORDER BY update_time DESC) AS rn
  FROM source_stream
)
WHERE rn = 1;
```
## Hands-On: Upsert from Kafka to MySQL

```sql
-- 1. Create the Kafka source table
CREATE TABLE kafka_orders (
  order_id STRING,
  product_id STRING,
  amount DECIMAL(10,2),
  order_time TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-orders',   -- consumer group id (illustrative)
  'scan.startup.mode' = 'earliest-offset',  -- read the topic from the beginning
  'format' = 'json'
);
```
```sql
-- 2. Create the MySQL target table
CREATE TABLE mysql_order_summary (
  product_id STRING,
  total_amount DECIMAL(10,2),
  last_order_time TIMESTAMP(3),
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/db',
  'table-name' = 'order_summary',
  'username' = 'user',
  'password' = 'pass'
);
```
```sql
-- 3. Run the upsert: the aggregate emits one updating row per product_id,
--    which the JDBC sink applies as insert-or-update on the primary key
INSERT INTO mysql_order_summary
SELECT
  product_id,
  SUM(amount) AS total_amount,
  MAX(order_time) AS last_order_time
FROM kafka_orders
GROUP BY product_id;
```
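On the database side, the upsert resolves against MySQL's own primary key. An assumed MySQL schema matching the Flink definition above:

```sql
-- MySQL side (assumed schema): the key that the generated
-- INSERT ... ON DUPLICATE KEY UPDATE statements resolve against
CREATE TABLE order_summary (
  product_id      VARCHAR(64) PRIMARY KEY,
  total_amount    DECIMAL(10,2),
  last_order_time DATETIME(3)
);
```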
## Performance Tuning and Common Issues

### Tuning knobs

Batched writes (JDBC sink options):

```
'sink.buffer-flush.interval' = '1s',
'sink.buffer-flush.max-rows' = '100'
```

Sink parallelism:

```
'sink.parallelism' = '4'
```

State backend (flink-conf.yaml):

```yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
```

These knobs can be combined on a single sink, as sketched below.
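A sketch combining the options above on one JDBC sink; the table name, URL, and credentials are illustrative, and `sink.max-retries` is the connector's standard retry option:

```sql
CREATE TABLE tuned_sink (
  id INT,
  val STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/db',     -- illustrative URL
  'table-name' = 'tuned_target',            -- illustrative table
  'username' = 'user',
  'password' = 'pass',
  'sink.buffer-flush.interval' = '1s',      -- flush at least every second
  'sink.buffer-flush.max-rows' = '100',     -- ...or every 100 buffered rows
  'sink.max-retries' = '3',                 -- retry transient write failures
  'sink.parallelism' = '4'                  -- parallel sink tasks
);
```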
### Common errors

Missing primary key:

```
org.apache.flink.table.api.TableException: Table sink requires primary key but no primary key is defined
```

Fix: declare a `PRIMARY KEY ... NOT ENFORCED` on the target table definition.

Data type mismatch:

```
java.sql.SQLException: Incorrect datetime value
```

Fix: check the TIMESTAMP precision against the MySQL column type.
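For the datetime error, one frequent cause is a precision mismatch between Flink's `TIMESTAMP(3)` and a second-precision MySQL `DATETIME` column. A sketch of aligning precision on the Flink side, reusing the tables from the walkthrough above:

```sql
-- Truncate to second precision when the MySQL column is plain DATETIME (assumed)
INSERT INTO mysql_order_summary
SELECT
  product_id,
  SUM(amount) AS total_amount,
  CAST(MAX(order_time) AS TIMESTAMP(0)) AS last_order_time
FROM kafka_orders
GROUP BY product_id;
```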
## Integration with CDC

```sql
CREATE TABLE cdc_source (
  id INT,
  name STRING,
  description STRING,
  update_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'inventory.db.customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-cdc',  -- consumer group id (illustrative)
  'format' = 'debezium-json'
);
```
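The statement below writes into `doris_output`, which is never defined here. A minimal sketch under the assumption that the Doris FE exposes its MySQL-compatible endpoint so the plain JDBC connector can address it; a dedicated flink-doris-connector would replace the WITH options:

```sql
CREATE TABLE doris_output (
  id INT,
  name STRING,
  description STRING,
  update_ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://doris-fe:9030/inventory',  -- assumed FE address/port
  'table-name' = 'customers',                      -- assumed target table
  'username' = 'root',
  'password' = ''
);
```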
```sql
-- Sync CDC changes into the OLAP system: the debezium-json changelog
-- (inserts, updates, deletes) is applied against the sink's primary key
INSERT INTO doris_output
SELECT * FROM cdc_source;
```
How the changelog operations are handled:

| Change type | Handling |
|---|---|
| INSERT | inserted directly |
| UPDATE | applied as an upsert on the primary key |
| DELETE | deleted by primary key; some sinks expose a `sink.ignore-delete`-style option to drop deletes |
## Version Compatibility Notes

| Feature | Community edition | Enterprise enhancements |
|---|---|---|
| Basic upsert | ✓ | ✓ |
| Automatic retry | × | ✓ |
| Conflict resolution | simple overwrite | conditional update |
## Best Practices Summary

- Use a Savepoint when migrating or upgrading jobs so upsert state carries over.
- Monitor the sink metrics `sink.num-records-out` and `sink.num-records-out-errors` to catch write failures early.