Flink提交任务的方法是什么

发布时间:2021-12-31 14:32:07 作者:iii
来源:亿速云 阅读:418
# Flink提交任务的方法是什么

## 1. 引言

Apache Flink作为当今最流行的分布式流处理框架之一,其任务提交机制是每个Flink开发者必须掌握的核心技能。本文将全面剖析Flink任务的多种提交方式,从本地调试到生产环境部署,涵盖YARN、Kubernetess等多种集群模式,并深入探讨其底层原理和最佳实践。

## 2. Flink任务提交概述

### 2.1 任务提交的基本流程

Flink任务提交遵循标准化的执行流程:
1. **客户端阶段**:解析配置、构建执行图
2. **JobManager分配**:资源协商与调度
3. **TaskManager执行**:实际任务部署

```java
// 典型代码执行流程示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer<>())
   .keyBy(...)
   .process(new MyProcessFunction())
   .addSink(new FileSink<>());

env.execute("MyFlinkJob");

2.2 任务提交组件架构

组件 职责 关键配置
Client 提交作业、获取结果 parallelism.default
JobManager 协调任务执行 jobmanager.memory.process.size
TaskManager 执行具体任务 taskmanager.numberOfTaskSlots

3. 本地模式提交

3.1 IDE直接运行

开发阶段最常用的调试方式:

# IntelliJ IDEA运行配置示例
VM Options: -Dlog4j.configuration=file:log4j.properties
Program arguments: --input hdfs://path/to/input

3.2 本地Standalone集群

启动本地迷你集群:

# 启动本地集群
./bin/start-cluster.sh

# 提交作业
./bin/flink run -c com.MainClass ./examples/MyJob.jar

性能调优建议: - 设置env.setParallelism(4)匹配CPU核心数 - 启用enableCheckpointing(5000)进行状态管理

4. Standalone集群提交

4.1 集群配置

关键配置文件conf/flink-conf.yaml

# JobManager配置
jobmanager.rpc.address: 192.168.1.100
jobmanager.memory.process.size: 1600m

# TaskManager配置
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 4096m

4.2 任务提交命令

完整参数示例:

./bin/flink run \
  -m yarn-cluster \
  -yn 2 \
  -ys 4 \
  -yjm 1024 \
  -ytm 4096 \
  -c com.YourMainClass \
  ./your-job.jar \
  --input kafka://topic1 \
  --output hdfs://path/output

5. YARN模式提交

5.1 会话模式(Session Mode)

# 启动YARN会话
./bin/yarn-session.sh -nm FlinkSession -d

# 提交任务到会话
./bin/flink run -m yarn-cluster -yid application_123 ./job.jar

资源分配策略: - 固定资源池 vs 动态资源申请 - 建议生产环境使用-ys指定每个TM的slot数

5.2 单作业模式(Per-Job Mode)

./bin/flink run \
  -m yarn-cluster \
  -ynm "MyProductionJob" \
  -yqu team1 \
  ./production-job.jar

优势比较

模式 资源隔离 启动延迟 适用场景
Session 开发测试
Per-Job 生产环境

6. Kubernetes模式提交

6.1 原生K8s部署

kubectl create -f job-cluster-service.yaml
./bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=flink-cluster \
  -Dkubernetes.container.image=flink:1.15 \
  local:///opt/flink/usrlib/my-job.jar

6.2 使用Operator管理

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: wordcount
spec:
  image: flink:1.15
  flinkVersion: v1_15
  serviceAccount: flink
  jobManager:
    resource: {memory: "2048m", cpu: 1}
  taskManager:
    resource: {memory: "4096m", cpu: 2}
  job:
    jarURI: local:///opt/flink/usrlib/wordcount.jar
    parallelism: 4

7. 高级提交方式

7.1 REST API提交

import requests

url = "http://jobmanager:8081/jars/upload"
files = {'jarfile': open('myjob.jar','rb')}
r = requests.post(url, files=files)

jar_id = r.json()['filename'].split('/')[-1]
run_url = f"http://jobmanager:8081/jars/{jar_id}/run"
requests.post(run_url, json={"entryClass": "com.Main"})

7.2 SQL客户端提交

-- 启动SQL客户端
./bin/sql-client.sh

-- 提交SQL作业
SET 'execution.runtime-mode' = 'streaming';
CREATE TABLE kafka_source (...);
INSERT INTO hbase_sink SELECT * FROM kafka_source;

8. 参数调优指南

8.1 关键配置参数

参数 推荐值 说明
taskmanager.memory.task.heap.size 70%总内存 堆内存分配
taskmanager.network.memory.fraction 0.1 网络缓冲区
state.backend rocksdb 大状态处理

8.2 故障处理策略

# 从保存点恢复
./bin/flink run -s hdfs://savepoints/1 \
  -n -p 8 \
  ./updated-job.jar

9. 安全认证配置

9.1 Kerberos认证

export KEYTAB_PATH=/path/to/user.keytab
export PRINCIPAL=user@REALM

./bin/flink run \
  -m yarn-cluster \
  -yD security.kerberos.login.keytab=$KEYTAB_PATH \
  -yD security.kerberos.login.principal=$PRINCIPAL \
  ./secure-job.jar

10. 监控与运维

10.1 指标收集配置

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9999

10.2 日志排查技巧

# 查看JobManager日志
kubectl logs -f flink-jobmanager-0 | grep ERROR

# 获取背压信息
curl http://taskmanager:9999/backpressure

11. 总结与最佳实践

生产环境推荐方案: 1. 使用Per-Job模式配合Kubernetes部署 2. 配置至少3个JobManager实现HA 3. 设置合理的checkpoint间隔(30s-1min)

版本兼容性矩阵

Flink版本 Hadoop要求 Java版本
1.13+ 2.7+/3.x 811
1.15+ 3.2+ 11+

”`

注:本文为技术文档示例,实际部署时请根据具体环境调整参数。建议结合官方文档和集群监控数据进行调优。

推荐阅读:
  1. Spark提交任务
  2. 如何在IDEA上运行Flink任务

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink

上一篇:用于MAC的PDF和OCR识别软件Readiris 17怎么用

下一篇:linux如何替换字符串

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》