您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Flink提交任务的方法是什么
## 1. 引言
Apache Flink作为当今最流行的分布式流处理框架之一,其任务提交机制是每个Flink开发者必须掌握的核心技能。本文将全面剖析Flink任务的多种提交方式,从本地调试到生产环境部署,涵盖YARN、Kubernetess等多种集群模式,并深入探讨其底层原理和最佳实践。
## 2. Flink任务提交概述
### 2.1 任务提交的基本流程
Flink任务提交遵循标准化的执行流程:
1. **客户端阶段**:解析配置、构建执行图
2. **JobManager分配**:资源协商与调度
3. **TaskManager执行**:实际任务部署
```java
// 典型代码执行流程示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer<>())
.keyBy(...)
.process(new MyProcessFunction())
.addSink(new FileSink<>());
env.execute("MyFlinkJob");
组件 | 职责 | 关键配置 |
---|---|---|
Client | 提交作业、获取结果 | parallelism.default |
JobManager | 协调任务执行 | jobmanager.memory.process.size |
TaskManager | 执行具体任务 | taskmanager.numberOfTaskSlots |
开发阶段最常用的调试方式:
# IntelliJ IDEA运行配置示例
VM Options: -Dlog4j.configuration=file:log4j.properties
Program arguments: --input hdfs://path/to/input
启动本地迷你集群:
# 启动本地集群
./bin/start-cluster.sh
# 提交作业
./bin/flink run -c com.MainClass ./examples/MyJob.jar
性能调优建议:
- 设置env.setParallelism(4)
匹配CPU核心数
- 启用enableCheckpointing(5000)
进行状态管理
关键配置文件conf/flink-conf.yaml
:
# JobManager配置
jobmanager.rpc.address: 192.168.1.100
jobmanager.memory.process.size: 1600m
# TaskManager配置
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4096m
完整参数示例:
./bin/flink run \
-m yarn-cluster \
-yn 2 \
-ys 4 \
-yjm 1024 \
-ytm 4096 \
-c com.YourMainClass \
./your-job.jar \
--input kafka://topic1 \
--output hdfs://path/output
# 启动YARN会话
./bin/yarn-session.sh -nm FlinkSession -d
# 提交任务到会话
./bin/flink run -m yarn-cluster -yid application_123 ./job.jar
资源分配策略:
- 固定资源池 vs 动态资源申请
- 建议生产环境使用-ys
指定每个TM的slot数
./bin/flink run \
-m yarn-cluster \
-ynm "MyProductionJob" \
-yqu team1 \
./production-job.jar
优势比较:
模式 | 资源隔离 | 启动延迟 | 适用场景 |
---|---|---|---|
Session | 弱 | 低 | 开发测试 |
Per-Job | 强 | 高 | 生产环境 |
kubectl create -f job-cluster-service.yaml
./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.container.image=flink:1.15 \
local:///opt/flink/usrlib/my-job.jar
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: wordcount
spec:
image: flink:1.15
flinkVersion: v1_15
serviceAccount: flink
jobManager:
resource: {memory: "2048m", cpu: 1}
taskManager:
resource: {memory: "4096m", cpu: 2}
job:
jarURI: local:///opt/flink/usrlib/wordcount.jar
parallelism: 4
import requests
url = "http://jobmanager:8081/jars/upload"
files = {'jarfile': open('myjob.jar','rb')}
r = requests.post(url, files=files)
jar_id = r.json()['filename'].split('/')[-1]
run_url = f"http://jobmanager:8081/jars/{jar_id}/run"
requests.post(run_url, json={"entryClass": "com.Main"})
-- 启动SQL客户端
./bin/sql-client.sh
-- 提交SQL作业
SET 'execution.runtime-mode' = 'streaming';
CREATE TABLE kafka_source (...);
INSERT INTO hbase_sink SELECT * FROM kafka_source;
参数 | 推荐值 | 说明 |
---|---|---|
taskmanager.memory.task.heap.size | 70%总内存 | 堆内存分配 |
taskmanager.network.memory.fraction | 0.1 | 网络缓冲区 |
state.backend | rocksdb | 大状态处理 |
# 从保存点恢复
./bin/flink run -s hdfs://savepoints/1 \
-n -p 8 \
./updated-job.jar
export KEYTAB_PATH=/path/to/user.keytab
export PRINCIPAL=user@REALM
./bin/flink run \
-m yarn-cluster \
-yD security.kerberos.login.keytab=$KEYTAB_PATH \
-yD security.kerberos.login.principal=$PRINCIPAL \
./secure-job.jar
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999
# 查看JobManager日志
kubectl logs -f flink-jobmanager-0 | grep ERROR
# 获取背压信息
curl http://taskmanager:9999/backpressure
生产环境推荐方案: 1. 使用Per-Job模式配合Kubernetes部署 2. 配置至少3个JobManager实现HA 3. 设置合理的checkpoint间隔(30s-1min)
版本兼容性矩阵:
Flink版本 | Hadoop要求 | Java版本 |
---|---|---|
1.13+ | 2.7+/3.x | 8⁄11 |
1.15+ | 3.2+ | 11+ |
”`
注:本文为技术文档示例,实际部署时请根据具体环境调整参数。建议结合官方文档和集群监控数据进行调优。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。