# How to Use the Task Scheduling Powerhouse Airflow
## 1. What Is Airflow
Apache Airflow is an open-source workflow management platform written in Python. Originally developed at Airbnb, it entered the Apache Incubator in 2016. Workflows are defined as **directed acyclic graphs (DAGs)** that express task dependencies, and a web UI visualizes and monitors execution state, making Airflow one of the core tools of modern data engineering.
### Core Features
1. **Visual workflows**: the web UI shows task dependencies and execution status at a glance
2. **Flexible scheduling**: tasks can be triggered by time schedules, external events, and more
3. **Extensibility**: a rich set of Operators and Hooks integrates with many external systems
4. **Workflows as code**: every workflow is defined in Python, which makes version control straightforward
## 2. Core Concepts
### 1. DAG (Directed Acyclic Graph)
```python
from airflow import DAG
from datetime import datetime

dag = DAG(
    'my_first_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
```
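Beyond presets such as `@daily`, `schedule_interval` also accepts standard cron expressions and `timedelta` objects; a small sketch of common values (purely illustrative):

```python
from datetime import timedelta

schedule_interval = '@hourly'           # preset: once an hour
schedule_interval = '30 6 * * *'        # cron expression: every day at 06:30
schedule_interval = timedelta(hours=4)  # fixed interval measured from start_date
schedule_interval = None                # no automatic schedule; manual or external triggers only
```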
### 2. Operator
An Operator describes a single unit of work in a workflow. Common types:
- `BashOperator`: runs a shell command
- `PythonOperator`: calls a Python function
- `EmailOperator`: sends a notification email
- Database operators such as `PostgresOperator` and `MySqlOperator`
```python
from airflow.operators.bash import BashOperator

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)
```
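For comparison, a minimal `PythonOperator` sketch; the `greet` callable and its arguments are illustrative, and naming the task `task2` lets the dependency example below refer to it:

```python
from airflow.operators.python import PythonOperator

def greet(name):
    # Ordinary Python function executed when the task runs
    print(f"Hello, {name}!")

task2 = PythonOperator(
    task_id='greet',
    python_callable=greet,
    op_kwargs={'name': 'Airflow'},
    dag=dag
)
```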
### 3. Task Dependencies
```python
task1 >> task2             # task2 runs after task1
# Equivalent to:
task2.set_upstream(task1)
```
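Dependencies can also fan out to lists of tasks, or be declared with the `chain` helper; a short sketch using the hypothetical task names `extract`, `transform_a`, `transform_b`, and `load`:

```python
from airflow.models.baseoperator import chain

# Fan-out: both transforms wait for extract
extract >> [transform_a, transform_b]

# chain() is equivalent to extract >> transform_a >> load
chain(extract, transform_a, load)
```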
## 3. Installation and Setup

```bash
# Python 3.8+ environment
sudo apt-get install python3 python3-pip

# A virtual environment is recommended
python3 -m venv airflow_env
source airflow_env/bin/activate

# Install Airflow pinned with the official constraints file
pip install "apache-airflow==2.6.1" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.8.txt"
```

Initialize the metadata database and create an admin user:

```bash
airflow db init

airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@example.com
```

Start the web server and the scheduler (typically in two separate terminals):

```bash
airflow webserver -p 8080
airflow scheduler
```
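By default Airflow stores its configuration, logs, and SQLite metadata database under `~/airflow`; to keep them elsewhere, a common approach is to export `AIRFLOW_HOME` before initializing the database (the path below is just an example):

```bash
# Hypothetical custom home directory
export AIRFLOW_HOME=/opt/airflow
airflow db init   # airflow.cfg, logs/ and the metadata DB are created under $AIRFLOW_HOME
```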
## 4. Hands-On Example: Daily E-commerce Data Pipeline

Build a pipeline that runs once a day and:

1. Downloads the previous day's order data
2. Cleans and transforms the data
3. Loads it into the data warehouse
4. Generates a sales report
5. Sends a notification email
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator

# Arguments shared by every task in the DAG
default_args = {
    'owner': 'ecom_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

def process_data(**context):
    import pandas as pd
    execution_date = context['execution_date']
    # Data-cleaning logic goes here
    print(f"Processing data for {execution_date}")

with DAG(
    'ecommerce_pipeline',
    default_args=default_args,
    start_date=datetime(2023, 6, 1),
    schedule_interval='0 3 * * *',  # every day at 03:00
    catchup=False
) as dag:

    download = BashOperator(
        task_id='download_orders',
        bash_command='wget https://example.com/orders/{{ ds }}.csv -O /tmp/orders.csv'
    )

    # In Airflow 2.x the task context is passed automatically,
    # so provide_context=True is no longer needed
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    load = BashOperator(
        task_id='load_to_dw',
        bash_command='psql -c "COPY orders FROM \'/tmp/orders.csv\' CSV HEADER"'
    )

    report = BashOperator(
        task_id='generate_report',
        bash_command='python /scripts/generate_report.py {{ ds }}'
    )

    notify = EmailOperator(
        task_id='send_email',
        to='team@example.com',
        subject='Daily Report {{ ds }}',
        html_content="<h1>Daily ETL Completed</h1>"
    )

    # report and notify both run after the warehouse load finishes
    download >> process >> load >> [report, notify]
```
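Before turning the schedule on, the whole pipeline can be exercised once for a given logical date with `airflow dags test`, which runs every task locally without recording a scheduled run:

```bash
airflow dags test ecommerce_pipeline 2023-06-01
```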
## 5. Advanced Techniques

### 1. Variables and Connections

```python
from airflow.models import Variable
from airflow.hooks.base import BaseHook

# Read a variable stored in the metadata database
api_key = Variable.get("MY_API_KEY")

# Look up a stored connection and build a connection string from it
conn = BaseHook.get_connection("postgres_default")
conn_string = f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
```
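Variables and connections are usually created in the web UI (Admin menu) or via the CLI; a sketch of the CLI route, with placeholder names and credentials:

```bash
# Store a variable in the metadata database
airflow variables set MY_API_KEY "super-secret-value"

# Register a Postgres connection by URI
airflow connections add postgres_default \
    --conn-uri "postgresql://user:password@db-host:5432/sales"
```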
### 2. Passing Data Between Tasks with XCom

```python
# Producer task
def push_data(**context):
    context['ti'].xcom_push(key='my_key', value='secret_data')

# Consumer task
def pull_data(**context):
    value = context['ti'].xcom_pull(key='my_key')
```
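Wired into a DAG, each function above backs its own `PythonOperator`; a minimal sketch (the DAG id and task ids are illustrative):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG('xcom_demo', start_date=datetime(2023, 1, 1), schedule_interval=None) as xcom_dag:
    push = PythonOperator(task_id='push_task', python_callable=push_data)
    pull = PythonOperator(task_id='pull_task', python_callable=pull_data)
    # The consumer must run downstream of the producer so the XCom value exists
    push >> pull
```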
### 3. Dynamically Generating DAGs

A single Python file can emit one DAG per region by registering each DAG object in `globals()` so the scheduler can discover it:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

def process_region(region):
    print(f"Processing data for region: {region}")

for region in ['north', 'south', 'east', 'west']:
    dag_id = f'process_{region}_data'
    with DAG(dag_id, start_date=datetime(2023, 1, 1), schedule_interval='@daily') as region_dag:
        start = DummyOperator(task_id='start')
        process = PythonOperator(
            task_id=f'process_{region}',
            python_callable=process_region,
            op_kwargs={'region': region}
        )
        start >> process
    globals()[dag_id] = region_dag
```
### 4. Custom Operators

```python
from airflow.models import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # Called by the worker when the task instance runs
        print(f"Running with param: {self.my_param}")
```
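Once the class is importable from your DAG files (for example from a plugins folder or a local package; the module path below is hypothetical), it is used like any built-in operator:

```python
# Hypothetical import path; adjust to wherever the class actually lives
from my_plugins.my_custom_operator import MyCustomOperator

custom = MyCustomOperator(
    task_id='run_custom_logic',
    my_param='hello',
    dag=dag
)
```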
## 6. Production Best Practices

### Performance Tuning
- Use `pool` and `priority_weight` to control task concurrency and scheduling priority, as in the sketch below
- Tune the `parallelism` parameter to cap how many task instances the scheduler runs at once
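A small sketch of both knobs on a task, assuming a pool named `etl_pool` has already been created (for example with `airflow pools set etl_pool 4 "ETL slots"`):

```python
from airflow.operators.bash import BashOperator

heavy_job = BashOperator(
    task_id='heavy_job',
    bash_command='python /scripts/heavy_job.py',   # hypothetical script
    pool='etl_pool',       # competes for the pool's 4 slots
    priority_weight=10,    # scheduled ahead of lower-weight tasks when slots are scarce
    dag=dag                # assumes a `dag` object as in the earlier examples
)
```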
### Logging and Debugging

Task logs can be viewed in the web UI or found under the `logs/` directory inside `AIRFLOW_HOME`. A single task can also be run in isolation from the command line:

```bash
# Run one task for a given logical date, outside the scheduler
airflow tasks test my_dag my_task 2023-07-01
```
### Security
- Encrypt connection credentials stored in the metadata database with a `Fernet` key

### Choosing an Executor

| Executor | Typical use | Trade-offs |
|---|---|---|
| SequentialExecutor | Development and testing | Simple, but no parallelism |
| LocalExecutor | Small production setups | Parallel execution, but a single point of failure |
| CeleryExecutor | Medium to large production | Distributed, but requires maintaining Redis |
| KubernetesExecutor | Cloud-native environments | Elastic scaling, but more complex to configure |
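The executor is chosen in `airflow.cfg` or through the matching `AIRFLOW__SECTION__KEY` environment variables; a sketch for a Celery deployment with Redis as broker and PostgreSQL as result backend (hosts and credentials are placeholders):

```bash
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://redis-host:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@pg-host:5432/airflow
```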
### High-Availability Deployment

A typical highly available topology places redundant web servers and schedulers behind a load balancer, backed by an HA PostgreSQL metadata database and a Redis cluster (as the Celery broker):

```
                +-----------------+
                |  Load Balancer  |
                +--------+--------+
                         |
        +----------------+----------------+
        |                                 |
+-------+------------+          +---------+----------+
|    Web Server 1    |          |    Web Server 2    |
+-------+------------+          +---------+----------+
        |                                 |
+-------+------------+          +---------+----------+
|    Scheduler 1     |          |    Scheduler 2     |
+--------------------+          +--------------------+
        |                                 |
        +----------------+----------------+
                         |
                +--------+--------+
                |  PostgreSQL HA  |
                +--------+--------+
                         |
                +--------+--------+
                |  Redis Cluster  |
                +-----------------+
```
## 7. Ecosystem and Integrations

### Cloud Platform Providers

```bash
pip install apache-airflow-providers-google
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-microsoft-azure
```
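With the Amazon provider installed, its hooks and operators become importable. A minimal sketch that uploads a file to S3 from a `PythonOperator` callable; the connection id, bucket, and paths are placeholders:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def upload_report(**context):
    # 'aws_default' must exist as an Airflow connection with valid credentials
    hook = S3Hook(aws_conn_id='aws_default')
    hook.load_file(
        filename='/tmp/report.csv',             # local file produced earlier
        key=f"reports/{context['ds']}.csv",     # S3 object key, partitioned by date
        bucket_name='my-data-bucket',
        replace=True
    )
```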
### Big Data and Monitoring
- Big data integrations: `SparkSubmitOperator` (Spark jobs), `HiveOperator` (Hive queries), `KafkaProducerOperator`
- Monitoring: `airflow-exporter` (exports Airflow metrics to Prometheus)
## Summary

With the walkthrough above you should now have a working grasp of Airflow's core concepts and hands-on techniques. Start by practicing with simple DAGs and build up to more complex data pipelines over time. Airflow's real strength is its flexibility: the deeper you go, the more kinds of automated workflows you will find it can handle.