FLINK 1.12 upsertSql怎么使用

发布时间:2021-12-22 11:46:42 作者:iii
来源:亿速云 阅读:758
# FLINK 1.12 upsertSql 使用详解

## 目录
1. [Upsert 概念解析](#upsert-概念解析)  
2. [Flink SQL 中的 Upsert 模式](#flink-sql-中的-upsert-模式)  
3. [upsertSql 语法详解](#upsertsql-语法详解)  
4. [实战:Kafka 到 MySQL 的 Upsert 示例](#实战kafka-到-mysql-的-upsert-示例)  
5. [性能优化与常见问题](#性能优化与常见问题)  
6. [与 CDC 的集成实践](#与-cdc-的集成实践)  
7. [版本兼容性说明](#版本兼容性说明)  
8. [最佳实践总结](#最佳实践总结)  

---

## Upsert 概念解析

### 什么是 Upsert
Upsert(Update + Insert)是一种混合操作,当记录存在时更新,不存在时插入。在分布式系统中,这是保证数据一致性的关键操作。

```sql
-- 传统SQL示例
INSERT INTO table (id, value) 
VALUES (1, 'a') 
ON DUPLICATE KEY UPDATE value = 'a';

Flink 中的实现挑战

  1. 无界数据流:需要处理持续变化的键状态
  2. 精确一次语义:确保在故障恢复时不重复处理
  3. 状态管理:高效维护键值索引

Flink SQL 中的 Upsert 模式

核心配置参数

-- 启用upsert模式
CREATE TABLE upsert_table (
    id INT PRIMARY KEY,
    name STRING,
    cnt BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/test',
    'table-name' = 'target_table',
    'username' = 'user',
    'password' = 'pass',
    'sink.upsert-materialize' = 'NONE' -- 可选:FORCE/NONE
);

支持的 Connector 类型

Connector 是否支持 特殊配置
JDBC 需定义PRIMARY KEY
Kafka 需配置key.format
HBase 需配置rowkey
Elasticsearch 需配置document-id.key-delimiter

upsertSql 语法详解

完整语法结构

INSERT INTO [catalog_name.][db_name.]table_name
  { VALUES (value1 [, value2]*) | SELECT query }
  [ON DUPLICATE KEY UPDATE col1 = val1 [, col2 = val2]*]

典型场景示例

场景1:主键冲突更新

INSERT INTO user_actions
SELECT user_id, action_time, action_type 
FROM kafka_source
ON DUPLICATE KEY UPDATE 
    action_time = VALUES(action_time),
    action_type = VALUES(action_type);

场景2:条件更新

INSERT INTO order_states
SELECT order_id, status, update_time
FROM source_stream
ON DUPLICATE KEY UPDATE
    status = IF(VALUES(update_time) > update_time, 
              VALUES(status), 
              status);

实战:Kafka 到 MySQL 的 Upsert 示例

完整 Pipeline 示例

-- 1. 创建Kafka源表
CREATE TABLE kafka_orders (
    order_id STRING,
    product_id STRING,
    amount DECIMAL(10,2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 2. 创建MySQL目标表
CREATE TABLE mysql_order_summary (
    product_id STRING PRIMARY KEY,
    total_amount DECIMAL(10,2),
    last_order_time TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/db',
    'table-name' = 'order_summary',
    'username' = 'user',
    'password' = 'pass'
);

-- 3. 执行Upsert操作
INSERT INTO mysql_order_summary
SELECT 
    product_id,
    SUM(amount) as total_amount,
    MAX(order_time) as last_order_time
FROM kafka_orders
GROUP BY product_id
ON DUPLICATE KEY UPDATE
    total_amount = VALUES(total_amount),
    last_order_time = VALUES(last_order_time);

关键配置说明

  1. kafka.source.auto.offset.reset: latest/earliest
  2. jdbc.sink.max-retries: 建议设置为3
  3. table.exec.state.ttl: 设置状态保留时间

性能优化与常见问题

优化技巧

  1. 批量写入

    'sink.buffer-flush.interval' = '1s',
    'sink.buffer-flush.max-rows' = '100'
    
  2. 异步提交

    'sink.parallelism' = '4'
    
  3. 状态后端选择

    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints
    

常见错误排查

  1. 主键冲突

    org.apache.flink.table.api.TableException: Table sink requires primary key but no primary key is defined
    

    解决方案:确认目标表PRIMARY KEY定义

  2. 数据类型不匹配

    java.sql.SQLException: Incorrect datetime value
    

    解决方案:检查TIMESTAMP精度设置


与 CDC 的集成实践

Debezium + Flink Upsert 示例

CREATE TABLE cdc_source (
    id INT,
    name STRING,
    description STRING,
    update_ts TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'inventory.db.customers',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'debezium-json'
);

-- 将CDC变更同步到OLAP系统
INSERT INTO doris_output
SELECT * FROM cdc_source
ON DUPLICATE KEY UPDATE
    name = VALUES(name),
    description = VALUES(description);

变更日志处理策略

操作类型 处理方式
INSERT 直接插入
UPDATE 根据主键更新
DELETE 需配置sink.ignore-delete

版本兼容性说明

Flink 1.12 特性矩阵

功能 社区版 企业版增强
基本Upsert
自动重试 ×
冲突解决策略 简单覆盖 条件更新

升级注意事项

  1. 从1.11升级:需要重建状态
  2. 与Checkpoint兼容性:建议使用Savepoint迁移
  3. Connector API变化:部分参数需要调整

最佳实践总结

设计原则

  1. 主键设计:选择业务不可变字段
  2. 更新策略:明确业务冲突解决逻辑
  3. 监控指标:关注sink.num-records-outsink.num-records-out-errors

典型应用场景

  1. 实时聚合结果更新
  2. 维度表变更同步
  3. 跨系统数据一致性保证

扩展阅读

”`

注:实际文档应包含更多示例代码、性能测试数据和架构图。本文档框架可根据需要扩展具体章节的详细内容。

推荐阅读:
  1. Flink与数据库集成方法是什么
  2. flink1.12怎么通过kerberos认证读取kafka数据

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink

上一篇:如何绘制UML状态机图

下一篇:绘制UML图时应避免的问题有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》