如何使用DAG(有向无环图)设计一个工作流

内容纲要

用 DAG 设计工作流是把任务与依赖关系用图结构表达出来,清晰、可并行化、易于重跑与监控。
下面提供一套实战化、工程化的步骤、设计原则、常见模式与示例代码(Airflow + Prefect 风格),可以直接拿来用或改造。

核心概念(一分钟速懂)

  • 节点(Task):单个工作单元(脚本、API 调用、SQL、数据迁移等)。
  • 有向边(Dependency):表示执行顺序(A → B 表示 B 依赖 A 完成)。
  • 无环(Acyclic):不能有循环依赖,保证可调度和能终止。
  • DAG = 任务集合 + 依赖关系 + 调度策略

设计原则(必须牢记)

  1. 单一职责:每个 Task 做一件事(便于重试与重用)。
  2. 幂等性:Task 可重复运行而不引起错误(use idempotent operations)。
  3. 显式依赖:不要靠隐式时间等待(sleep)来保证顺序。
  4. 可重跑/断点续跑:把中间产物持久化(对象存储、数据库、parquet 等)。
  5. 资源隔离:重 IO/CPU 任务分配独立资源/队列。
  6. 小文件/批次控制:避免生成大量小文件造成元数据压力。
  7. 监控与告警:任务失败、延迟、重试等必须可观测。
  8. 版本化与审计:DAG 代码、配置与 schema 都需版本控制。

设计流程(从想法到生产)

  1. 画出高层数据流图:列出输入、输出、关键转换、关键依赖。
  2. 拆解任务:把大步骤拆成原子 Task(提取/清洗/转换/加载/报告)。
  3. 定义边界与契约:每个 Task 的输入/输出格式与位置(schema、文件名模板)。
  4. 选择调度/编排工具:Airflow、Prefect、Dagster、Argo Workflows 等。
  5. 实现任务与依赖:本地用小数据测试,写单元测试 + 集成测试。
  6. 加上重试/超时/并发限制:为每个 task 配置重试策略、超时、并发上限。
  7. 部署 & 监控:日志、指标(成功率、延迟)、告警(失败/异常延迟)。
  8. 运维 & 迭代:性能瓶颈、失败模式分析、重构 DAG(拆分或合并任务)。

常见设计模式(摘取常用的)

  • 线性流水线:A → B → C(简单 ETL)。
  • 分支/条件执行:基于结果走不同分支(if/else)。
  • 并行化 + 聚合:多个独立任务并行跑,最后聚合结果。
  • 动态 DAG:运行时生成任务(例如根据输入表数目)。
  • 子 DAG / 子流程:把复杂子流程封装成可复用模块。
  • 事件驱动触发:用文件到达或消息队列触发 DAG(而非固定时间)。
  • 幂等增量:增量处理(基于 watermark/offset)避免全量重跑。

失败与重试策略建议

  • 幂等 + 并发保护:对于可能被重试的 task,确保不会重复写脏数据(使用临时路径 + 原子重命名/事务)。
  • 分级重试:短暂网络抖动重试短次数;外部系统故障延长重试间隔或上报人工。
  • 退避策略(exponential backoff):避免短时间内大量重试压垮下游。
  • 告警策略:严重失败直接发短信/钉钉/Slack,非关键失败只通知日志/看板。

监测与可观测(必做)

  • 每个 Task 记录:开始时间、结束时间、输入摘要、输出位置、耗时、消耗资源。
  • 指标:DAG 成功率、平均时长、延迟(SLA)、重试次数分布。
  • Log 聚合(ELK/Graylog/Cloud logs)、Trace(若可用),并对异常设置告警阈值。

文件夹与代码组织建议

dags/
  pipelines/
    daily_sales.py        # DAG 定义
  utils/
    db.py
    s3.py
    common.py
tasks/
  extract/
    fetch_orders.py
  transform/
    clean_orders.py
  load/
    load_to_db.py
tests/
  unit/
  integration/
configs/
  env_dev.yml
  env_prod.yml

实战示例:Airflow DAG(Python)

下面是一个小的 Airflow DAG 示例:每日跑数据提取 → 清洗 → 加载 → 生成报告。你可以直接复制到 dags/ 目录运行(需 Airflow 环境)。

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-eng",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["oncall@example.com"],
}

def extract(**ctx):
    # 幂等实现:写入临时文件 temp/<run_id>/orders.json,然后原子移动到 final/
    run_id = ctx["run_id"]
    # 模拟提取
    path = f"/data/raw/orders/{run_id}.json"
    with open(path, "w") as f:
        f.write("...")

def transform(**ctx):
    # 读取上一步输出并清洗,输出到 /data/processed/
    pass

def load(**ctx):
    # 把 processed 写入数据库,使用事务/幂等键
    pass

with DAG(
    dag_id="daily_orders_pipeline",
    default_args=default_args,
    description="每日订单 ETL",
    schedule_interval="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:
    t1 = PythonOperator(task_id="extract_orders", python_callable=extract)
    t2 = PythonOperator(task_id="transform_orders", python_callable=transform)
    t3 = PythonOperator(task_id="load_orders", python_callable=load)

    t1 >> t2 >> t3

另一个轻量示例:Prefect(更简单、现代)

from prefect import flow, task

@task(retries=2, retry_delay_seconds=60)
def extract():
    return "raw data"

@task
def transform(raw):
    return raw.upper()

@task
def load(data):
    print("loaded", data)

@flow
def daily_flow():
    raw = extract()
    processed = transform(raw)
    load(processed)

if __name__ == "__main__":
    daily_flow()

测试与本地调试要点

  • 单元测试:每个 task 的函数可在本地用小样本测试。
  • 集成测试:用一套测试数据在隔离环境跑完整 DAG。
  • 回归测试:schema 变更时确保失败可被捕获并回滚。
  • Dry-run:Airflow 的 airflow tasks test 可以在本地执行单个任务。

常见反模式(要避免)

  • 把一整个 ETL 写成一个超大 Task(不可复用,难以监控)。
  • 使用时间睡眠(sleep)作为依赖控制。
  • 未持久化中间产物导致失败无法断点续跑。
  • 把调度逻辑写死在任务内(难以动态伸缩)。

实用清单(部署时逐项核对)

  • 确认每个 task 是否幂等。
  • 输入/输出契约文档化(路径、schema)。
  • 为每个 Task 配置合理的重试与超时。
  • 设置资源队列/池(避免过载 DB/外部接口)。
  • 日志、指标、告警接入。
  • 安全与凭据管理(Secrets 管理)。
  • DAG 版本化与变更审批流程。

Leave a Comment

您的电子邮箱地址不会被公开。 必填项已用*标注

close
arrow_upward