内容纲要
用 DAG 设计工作流是把任务与依赖关系用图结构表达出来,清晰、可并行化、易于重跑与监控。
下面提供一套实战化、工程化的步骤、设计原则、常见模式与示例代码(Airflow + Prefect 风格),可以直接拿来用或改造。
核心概念(一分钟速懂)
- 节点(Task):单个工作单元(脚本、API 调用、SQL、数据迁移等)。
- 有向边(Dependency):表示执行顺序(A → B 表示 B 依赖 A 完成)。
- 无环(Acyclic):不能有循环依赖,保证可调度和能终止。
- DAG = 任务集合 + 依赖关系 + 调度策略。
设计原则(必须牢记)
- 单一职责:每个 Task 做一件事(便于重试与重用)。
- 幂等性:Task 可重复运行而不引起错误(use idempotent operations)。
- 显式依赖:不要靠隐式时间等待(sleep)来保证顺序。
- 可重跑/断点续跑:把中间产物持久化(对象存储、数据库、parquet 等)。
- 资源隔离:重 IO/CPU 任务分配独立资源/队列。
- 小文件/批次控制:避免生成大量小文件造成元数据压力。
- 监控与告警:任务失败、延迟、重试等必须可观测。
- 版本化与审计:DAG 代码、配置与 schema 都需版本控制。
设计流程(从想法到生产)
- 画出高层数据流图:列出输入、输出、关键转换、关键依赖。
- 拆解任务:把大步骤拆成原子 Task(提取/清洗/转换/加载/报告)。
- 定义边界与契约:每个 Task 的输入/输出格式与位置(schema、文件名模板)。
- 选择调度/编排工具:Airflow、Prefect、Dagster、Argo Workflows 等。
- 实现任务与依赖:本地用小数据测试,写单元测试 + 集成测试。
- 加上重试/超时/并发限制:为每个 task 配置重试策略、超时、并发上限。
- 部署 & 监控:日志、指标(成功率、延迟)、告警(失败/异常延迟)。
- 运维 & 迭代:性能瓶颈、失败模式分析、重构 DAG(拆分或合并任务)。
常见设计模式(摘取常用的)
- 线性流水线:A → B → C(简单 ETL)。
- 分支/条件执行:基于结果走不同分支(if/else)。
- 并行化 + 聚合:多个独立任务并行跑,最后聚合结果。
- 动态 DAG:运行时生成任务(例如根据输入表数目)。
- 子 DAG / 子流程:把复杂子流程封装成可复用模块。
- 事件驱动触发:用文件到达或消息队列触发 DAG(而非固定时间)。
- 幂等增量:增量处理(基于 watermark/offset)避免全量重跑。
失败与重试策略建议
- 幂等 + 并发保护:对于可能被重试的 task,确保不会重复写脏数据(使用临时路径 + 原子重命名/事务)。
- 分级重试:短暂网络抖动重试短次数;外部系统故障延长重试间隔或上报人工。
- 退避策略(exponential backoff):避免短时间内大量重试压垮下游。
- 告警策略:严重失败直接发短信/钉钉/Slack,非关键失败只通知日志/看板。
监测与可观测(必做)
- 每个 Task 记录:开始时间、结束时间、输入摘要、输出位置、耗时、消耗资源。
- 指标:DAG 成功率、平均时长、延迟(SLA)、重试次数分布。
- Log 聚合(ELK/Graylog/Cloud logs)、Trace(若可用),并对异常设置告警阈值。
文件夹与代码组织建议
dags/
pipelines/
daily_sales.py # DAG 定义
utils/
db.py
s3.py
common.py
tasks/
extract/
fetch_orders.py
transform/
clean_orders.py
load/
load_to_db.py
tests/
unit/
integration/
configs/
env_dev.yml
env_prod.yml
实战示例:Airflow DAG(Python)
下面是一个小的 Airflow DAG 示例:每日跑数据提取 → 清洗 → 加载 → 生成报告。你可以直接复制到 dags/
目录运行(需 Airflow 环境)。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
"owner": "data-eng",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["oncall@example.com"],
}
def extract(**ctx):
# 幂等实现:写入临时文件 temp/<run_id>/orders.json,然后原子移动到 final/
run_id = ctx["run_id"]
# 模拟提取
path = f"/data/raw/orders/{run_id}.json"
with open(path, "w") as f:
f.write("...")
def transform(**ctx):
# 读取上一步输出并清洗,输出到 /data/processed/
pass
def load(**ctx):
# 把 processed 写入数据库,使用事务/幂等键
pass
with DAG(
dag_id="daily_orders_pipeline",
default_args=default_args,
description="每日订单 ETL",
schedule_interval="0 2 * * *",
start_date=datetime(2025, 1, 1),
catchup=False,
max_active_runs=1,
) as dag:
t1 = PythonOperator(task_id="extract_orders", python_callable=extract)
t2 = PythonOperator(task_id="transform_orders", python_callable=transform)
t3 = PythonOperator(task_id="load_orders", python_callable=load)
t1 >> t2 >> t3
另一个轻量示例:Prefect(更简单、现代)
from prefect import flow, task
@task(retries=2, retry_delay_seconds=60)
def extract():
return "raw data"
@task
def transform(raw):
return raw.upper()
@task
def load(data):
print("loaded", data)
@flow
def daily_flow():
raw = extract()
processed = transform(raw)
load(processed)
if __name__ == "__main__":
daily_flow()
测试与本地调试要点
- 单元测试:每个 task 的函数可在本地用小样本测试。
- 集成测试:用一套测试数据在隔离环境跑完整 DAG。
- 回归测试:schema 变更时确保失败可被捕获并回滚。
- Dry-run:Airflow 的
airflow tasks test
可以在本地执行单个任务。
常见反模式(要避免)
- 把一整个 ETL 写成一个超大 Task(不可复用,难以监控)。
- 使用时间睡眠(sleep)作为依赖控制。
- 未持久化中间产物导致失败无法断点续跑。
- 把调度逻辑写死在任务内(难以动态伸缩)。
实用清单(部署时逐项核对)
- 确认每个 task 是否幂等。
- 输入/输出契约文档化(路径、schema)。
- 为每个 Task 配置合理的重试与超时。
- 设置资源队列/池(避免过载 DB/外部接口)。
- 日志、指标、告警接入。
- 安全与凭据管理(Secrets 管理)。
- DAG 版本化与变更审批流程。