# How to Compute Same-Day PV in Real Time with Flink SQL and Write It to MySQL
## Introduction
In real-time big data processing, Apache Flink has become a de facto standard. Flink SQL in particular, with its declarative programming model and low barrier to entry, lets developers implement real-time processing requirements quickly. This article walks through how to use Flink SQL to compute the current day's page views (PV) in real time and write the result to a MySQL database.
## 1. Environment Setup
### 1.1 Required Components
- Apache Flink 1.13+ (with Table/SQL support)
- MySQL 5.7+
- Kafka (as the data source)
### 1.2 Maven Dependencies
```xml
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- MySQL connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
</dependencies>
```

## 2. Defining the Kafka Source Table
Assume we have a Kafka topic `user_events` containing user access logs in JSON form:
```json
{
  "user_id": "user123",
  "page_url": "/product/123",
  "access_time": "2023-07-20 14:30:00",
  "ip_address": "192.168.1.1"
}
```
The corresponding Flink SQL source table, including an event-time watermark, is declared as follows:
```sql
CREATE TABLE user_events (
    user_id STRING,
    page_url STRING,
    access_time TIMESTAMP(3),
    ip_address STRING,
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'pv-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
```
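Before wiring up the sink, it helps to sanity-check the source table from the Flink SQL Client. A minimal probe query, using only the fields defined above, might look like this:
```sql
-- Quick smoke test: confirm JSON records are parsed and the
-- event-time column is populated as expected.
SELECT user_id, page_url, access_time
FROM user_events;
```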
## 3. Defining the MySQL Sink Table
Next, declare a JDBC sink table that the job will upsert into:
```sql
CREATE TABLE daily_pv (
    calc_date DATE,
    pv_count BIGINT,
    update_time TIMESTAMP(3),
    PRIMARY KEY (calc_date) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/analytics',
    'table-name' = 'daily_pv_stats',
    'username' = 'flink_user',
    'password' = 'flink_pwd',
    'sink.buffer-flush.interval' = '1s',
    'sink.buffer-flush.max-rows' = '100'
);
```
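The physical table must already exist in MySQL. A minimal sketch of what `analytics.daily_pv_stats` could look like (the column types are assumptions mirroring the Flink schema above):
```sql
-- Hypothetical MySQL-side DDL for the target table; adjust types and engine as needed.
CREATE TABLE analytics.daily_pv_stats (
    calc_date   DATE   NOT NULL,
    pv_count    BIGINT NOT NULL,
    update_time TIMESTAMP(3) NULL,
    PRIMARY KEY (calc_date)
) ENGINE = InnoDB;
```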
## 4. Computing the Daily PV
With both tables in place, the PV computation itself is a single INSERT statement:
```sql
-- Compute the current day's PV (by calendar day)
INSERT INTO daily_pv
SELECT
    CAST(access_time AS DATE) AS calc_date,
    COUNT(*) AS pv_count,
    CURRENT_TIMESTAMP AS update_time
FROM user_events
WHERE
    -- Optional: filter out invalid records
    user_id IS NOT NULL
    AND page_url IS NOT NULL
    -- Only keep today's data (drop this condition for a T+1 style computation)
    AND CAST(access_time AS DATE) = CURRENT_DATE
GROUP BY CAST(access_time AS DATE);
```
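Because this is a continuous, non-windowed aggregation, the count for the current day is updated in place: every new event produces an updated row for the same `calc_date`, which the JDBC sink upserts into MySQL. Querying the target table while the job is running should therefore show one row per day, for example:
```sql
-- Run against MySQL while the Flink job is running; the row keeps getting updated.
SELECT calc_date, pv_count, update_time
FROM analytics.daily_pv_stats
WHERE calc_date = CURRENT_DATE();
```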
## 5. Submitting the Job with Java
The DDL and DML above can be wired together in a small Java program:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RealtimePVCalculator {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Enable checkpointing (required for consistent recovery after failures)
        env.enableCheckpointing(30000);

        // 3. Register the source and sink tables
        //    (substitute the full DDL from the sections above for "...")
        tableEnv.executeSql("CREATE TABLE user_events (...)");
        tableEnv.executeSql("CREATE TABLE daily_pv (...)");

        // 4. executeSql on an INSERT statement submits the job itself,
        //    so no separate env.execute() call is needed here.
        tableEnv.executeSql("INSERT INTO daily_pv SELECT ...");
    }
}
```
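If you prefer not to write any Java, the same statements can also be submitted through Flink's SQL Client. A rough sketch (the script name is hypothetical, and the exact `SET` syntax varies slightly between Flink versions):
```sql
-- pv_job.sql, submitted with: bin/sql-client.sh -f pv_job.sql
SET 'execution.checkpointing.interval' = '30s';

-- Followed by the full statements shown earlier:
-- CREATE TABLE user_events (...);
-- CREATE TABLE daily_pv (...);
-- INSERT INTO daily_pv SELECT ...;
```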
## 6. Handling Late Data
```sql
-- The source table already declares a watermark that tolerates
-- events arriving up to 5 seconds late:
CREATE TABLE user_events (
    ...
    WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND
) WITH (...);

-- Alternatively, aggregate with a daily tumbling event-time window:
INSERT INTO daily_pv
SELECT
    CAST(window_date AS DATE) AS calc_date,
    pv_count,
    CURRENT_TIMESTAMP AS update_time
FROM (
    SELECT
        TUMBLE_START(access_time, INTERVAL '1' DAY) AS window_date,
        COUNT(*) AS pv_count
    FROM user_events
    GROUP BY TUMBLE(access_time, INTERVAL '1' DAY)
) AS t;
```
## 7. Idempotent Writes to MySQL
The MySQL result table needs a primary key (already declared above); with it, the Flink JDBC connector automatically performs upsert writes instead of plain inserts:
```sql
PRIMARY KEY (calc_date) NOT ENFORCED
```
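Under the hood, the connector's MySQL dialect issues an upsert statement for keyed tables, roughly of the following form (shown here only to illustrate the behavior):
```sql
-- Approximate statement executed by the JDBC sink for each buffered row.
INSERT INTO daily_pv_stats (calc_date, pv_count, update_time)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
    pv_count = VALUES(pv_count),
    update_time = VALUES(update_time);
```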
## 8. Performance Tuning
### 8.1 Parallelism
```java
env.setParallelism(4);
```
### 8.2 State Backend
RocksDB is suited to large state (requires the `flink-statebackend-rocksdb` dependency):
```java
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints"));
```
### 8.3 Batched Writes to MySQL
```sql
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '2s'
```
## 9. Monitoring
Monitor the job through the Flink Web UI or a metrics reporter, watching at least:
- the number of input records
- the number of output records
- processing latency
## 10. Fault Tolerance
### 10.1 Kafka Consumer Offset Management
```sql
'scan.startup.mode' = 'group-offsets'
```
### 10.2 MySQL Write Retries
```sql
'sink.max-retries' = '3'
```
### 10.3 Checkpoint Timeout
```java
env.getCheckpointConfig().setCheckpointTimeout(60000);
```
## 11. Production Best Practices
- Data accuracy validation (see the reconciliation sketch below)
- Resource isolation
- Version management
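For data accuracy validation, a common approach is to reconcile the real-time figure against an offline batch count over the raw events. A hedged sketch, assuming the raw logs also land in a batch table (the table name `ods.user_events_raw` is hypothetical):
```sql
-- Hypothetical reconciliation query: the batch count for a given day should
-- match (or closely track) pv_count in analytics.daily_pv_stats.
SELECT
    CAST(access_time AS DATE) AS calc_date,
    COUNT(*) AS batch_pv
FROM ods.user_events_raw
WHERE CAST(access_time AS DATE) = DATE '2023-07-20'
GROUP BY CAST(access_time AS DATE);
```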
## Summary
This article has walked through a complete Flink SQL solution for computing the current day's PV in real time and writing it to MySQL, covering everything from environment setup and table definitions to late-data handling, tuning, and fault tolerance.
In production you will also need to plan for traffic peaks, cluster resource allocation, and similar concerns. Hopefully this article provides a useful reference for your own real-time computing work.