【AI Agent 知识库】03-工作流编排-详解版

内容纲要

模块三:工作流编排(详解版)

覆盖:DAG、FSM、LangGraph、CrewAI、Supervisor 模式


目录


必须掌握的概念

3.1 DAG(有向无环图)

定义:
DAG 是一种图结构,节点表示任务,边表示依赖关系,保证没有循环依赖。

核心属性:

  • 节点:任务
  • 边:依赖关系(A → B 表示 B 依赖 A)
  • 无环:没有循环依赖

示例:

任务流程:
A(数据准备)
    ↓
B(数据清洗)
    ↓
C(特征工程)
    ↓
D(模型训练)
    ↓
E(模型评估)

DAG 表示:
A → B → C → D → E

3.2 FSM(有限状态机)

定义:
FSM 通过有限个状态和状态之间的转换来控制流程。

核心要素:

  • 状态:流程的不同阶段
  • 事件:触发状态转换的条件
  • 转换:状态到状态的映射
  • 动作:状态转换时执行的操作

3.3 LangGraph

定义:
LangChain 的图编排库,用于构建有状态的、循环的 Agent 应用。

核心概念:

  • Node(节点):执行特定操作的函数
  • Edge(边):节点之间的连接
  • State(状态):在节点间传递的数据
  • Conditional Edge(条件边):根据状态选择下一个节点

关键设计点

3.1 LangGraph 完整实现

# workflow/langgraph_demo.py
"""
LangGraph 工作流完整实现
包含:状态定义、节点设计、条件边、执行
"""

from typing import TypedDict, Annotated, Literal, List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
import operator

# ============ 状态定义 ============

class GraphState(TypedDict):
    """工作流状态"""
    query: str                              # 用户查询
    search_results: List[str]              # 搜索结果
    analysis: str                           # 分析结果
    summary: str                             # 最终总结
    attempts: Annotated[int, operator.add]  # 尝试次数

# ============ 节点定义 ============

def search_node(state: GraphState) -> Dict[str, Any]:
    """搜索节点:执行信息搜索"""
    print("🔍 执行搜索...")

    # 模拟搜索
    search_results = [
        f"关于 {state['query']} 的相关资料 1",
        f"关于 {state['query']} 的相关资料 2",
        f"关于 {state['query']} 的相关资料 3"
    ]

    print(f"✅ 搜索到 {len(search_results)} 条结果")

    return {"search_results": search_results}

def analysis_node(state: GraphState) -> Dict[str, Any]:
    """分析节点:分析搜索结果"""
    print("🔍 执行分析...")

    # 模拟分析
    if not state["search_results"]:
        analysis = "搜索结果为空,无法分析"
    else:
        analysis = f"基于 {len(state['search_results'])} 条搜索结果进行分析,发现..."

    print(f"✅ 分析完成:{analysis[:50]}...")

    return {"analysis": analysis}

def summary_node(state: GraphState) -> Dict[str, Any]:
    """总结节点:生成最终总结"""
    print("🔍 生成总结...")

    # 模拟总结
    summary = f"针对问题 '{state['query']}' 的总结:\n"
    summary += f"搜索结果:{', '.join(state['search_results'])}\n"
    summary += f"分析结论:{state['analysis']}"

    print("✅ 总结生成完成")

    return {"summary": summary}

# ============ 边定义 ============

def should_continue(state: GraphState) -> Literal["continue", "analyze", "end"]:
    """判断是否继续"""
    attempts = state.get("attempts", 0)

    print(f"📊 尝试次数:{attempts}")

    # 超过最大次数
    if attempts >= 3:
        print("⏹ 超过最大尝试次数,结束")
        return "end"

    # 判断是否有搜索结果
    if not state.get("search_results"):
        return "continue"
    else:
        return "analyze"

def is_search_empty(state: GraphState) -> bool:
    """检查搜索是否为空"""
    return len(state.get("search_results", [])) == 0

# ============ 构建图 ============

def build_workflow():
    """构建 LangGraph 工作流"""
    from langgraph.graph import StateGraph, END

    # 创建图
    workflow = StateGraph(GraphState)

    # 添加节点
    workflow.add_node("search", search_node)
    workflow.add_node("analysis", analysis_node)
    workflow.add_node("summary", summary_node)

    # 设置入口
    workflow.set_entry_point("search")

    # 添加条件边
    workflow.add_conditional_edges(
        "search",
        should_continue,
        {
            "continue": "search",      # 继续搜索
            "analyze": "analysis",    # 进入分析
            "end": END                 # 结束
        }
    )

    # 添加普通边
    workflow.add_edge("analysis", "summary")
    workflow.add_edge("summary", END)

    # 编译图
    return workflow.compile()

# ============ 可视化 ============

def visualize_workflow(workflow):
    """可视化工作流"""
    try:
        from IPython.display import Image, display
        image_data = workflow.get_graph().draw_png()
        display(Image(image_data))
    except ImportError:
        print("无法显示图片,请安装 IPython")
        print(workflow.get_graph().print_ascii())

# ============ 执行工作流 ============

def execute_workflow(query: str):
    """执行工作流"""
    print("=" * 60)
    print(f"开始执行工作流,查询:{query}")
    print("=" * 60)

    # 构建工作流
    workflow = build_workflow()

    # 可视化
    print("\n工作流图:")
    visualize_workflow(workflow)

    # 初始状态
    initial_state = {
        "query": query,
        "search_results": [],
        "analysis": "",
        "summary": "",
        "attempts": 0
    }

    # 执行
    final_state = workflow.invoke(initial_state)

    print("\n" + "=" * 60)
    print("最终结果:")
    print("=" * 60)
    print(final_state.get("summary", "处理失败"))

    return final_state

# ============ 使用示例 ============

if __name__ == "__main__":
    # 执行工作流
    query = "Python 并发编程的最佳实践"
    result = execute_workflow(query)

3.2 Multi-Agent 编排(Supervisor 模式)

# workflow/multi_agent_workflow.py
"""
Multi-Agent 工作流
包含:Supervisor、专业 Agents、任务分配
"""

from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import json
from datetime import datetime

# ============ 数据结构 ============

class AgentType(Enum):
    RESEARCHER = "researcher"
    ANALYZER = "analyzer"
    WRITER = "writer"
    CODER = "coder"

@dataclass
class Task:
    id: str
    description: str
    agent_type: Optional[AgentType] = None
    dependencies: List[str] = None
    status: str = "pending"
    result: Optional[str] = None
    error: Optional[str] = None

@dataclass
class AgentMessage:
    sender: str
    receiver: str
    task: Task
    message_type: str  # request/response/complete
    timestamp: datetime = None

# ============ Agent 定义 ============

class BaseAgent:
    def __init__(self, name: str, agent_type: AgentType):
        self.name = name
        self.agent_type = agent_type

    def can_handle(self, task: Task) -> bool:
        """判断是否能处理该任务"""
        raise NotImplementedError

    def process(self, task: Task) -> str:
        """处理任务"""
        raise NotImplementedError

class ResearcherAgent(BaseAgent):
    def __init__(self):
        super().__init__("researcher", AgentType.RESEARCHER)

    def can_handle(self, task: Task) -> bool:
        keywords = ["搜索", "查找", "研究", "搜集"]
        return any(kw in task.description.lower() for kw in keywords)

    def process(self, task: Task) -> str:
        print(f"[{self.name}] 处理任务:{task.description}")
        # 模拟搜索
        result = f"已搜索并找到关于 {task.description} 的相关资料"
        print(f"[{self.name}] 完成:{result}")
        return result

class AnalyzerAgent(BaseAgent):
    def __init__(self):
        super().__init__("analyzer", AgentType.ANALYZER)

    def can_handle(self, task: Task) -> bool:
        keywords = ["分析", "评估", "统计"]
        return any(kw in task.description.lower() for kw in keywords)

    def process(self, task: Task) -> str:
        print(f"[{self.name}] 处理任务:{task.description}")
        result = f"已完成对 {task.description} 的分析"
        print(f"[{self.name}] 完成:{result}")
        return result

class WriterAgent(BaseAgent):
    def __init__(self):
        super().__init__("writer", AgentType.WRITER)

    def can_handle(self, task: Task) -> bool:
        keywords = ["写", "生成", "总结", "文档"]
        return any(kw in task.description.lower() for kw in keywords)

    def process(self, task: Task) -> str:
        print(f"[{self.name}] 处理任务:{task.description}")
        result = f"已生成关于 {task.description} 的文档"
        print(f"[{self.name}] 完成:{result}")
        return result

# ============ Supervisor ============

class Supervisor:
    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents

    def route_task(self, task: Task) -> BaseAgent:
        """路由任务到合适的 Agent"""
        print(f"\n[Supervisor] 路由任务:{task.description}")

        # 计算每个 Agent 的置信度
        scores = []
        for agent in self.agents:
            if agent.can_handle(task):
                score = self._calculate_confidence(task, agent)
                scores.append((agent, score))

        if not scores:
            print("[Supervisor] ⚠ 没有合适的 Agent")
            return None

        # 选择置信度最高的 Agent
        selected = max(scores, key=lambda x: x[1])
        print(f"[Supervisor] 选择 Agent:{selected[0].name} (置信度:{selected[1]:.2f})")

        return selected[0]

    def _calculate_confidence(self, task: Task, agent: BaseAgent) -> float:
        """计算置信度"""
        # 简单实现:关键词匹配度
        keywords = task.description.split()
        agent_name_lower = agent.name.lower()
        matches = sum(1 for kw in keywords if kw.lower() in agent_name_lower)
        return min(matches / len(keywords) * 2, 1.0)

    def execute_workflow(self, tasks: List[Task]) -> Dict:
        """执行工作流"""
        results = {}

        # 简单执行(实际应该使用 DAG)
        for task in tasks:
            agent = self.route_task(task)

            if agent is None:
                task.status = "failed"
                task.error = "没有合适的 Agent"
            else:
                try:
                    task.agent_type = agent.agent_type
                    task.result = agent.process(task)
                    task.status = "completed"
                except Exception as e:
                    task.status = "failed"
                    task.error = str(e)

            results[task.id] = {
                "status": task.status,
                "agent": agent.name if agent else "none",
                "result": task.result,
                "error": task.error
            }

        return results

# ============ 使用示例 ============

if __name__ == "__main__":
    # 创建 Agents
    agents = [
        ResearcherAgent(),
        AnalyzerAgent(),
        WriterAgent()
    ]

    # 创建 Supervisor
    supervisor = Supervisor(agents)

    # 创建任务
    tasks = [
        Task(id="1", description="搜索 Python 并发编程资料"),
        Task(id="2", description="分析搜索结果"),
        Task(id="3", description="生成总结文档")
    ]

    # 执行工作流
    print("=" * 60)
    print("Multi-Agent 工作流执行")
    print("=" * 60)

    results = supervisor.execute_workflow(tasks)

    print("\n" + "=" * 60)
    print("执行结果:")
    print("=" * 60)
    for task_id, result in results.items():
        print(f"任务 {task_id}:")
        print(f"  状态:{result['status']}")
        print(f"  Agent:{result['agent']}")
        print(f"  结果:{result.get('result', 'N/A')}")
        print(f"  错误:{result.get('error', 'N/A')}")
        print()

常见坑与解决方案

3.1 DAG 循环依赖

问题:

A → B → C → A (循环依赖!

解决方案:

import networkx as nx

def check_dag(tasks):
    """检查 DAG 是否有环"""
    graph = nx.DiGraph()

    # 构建图
    for task in tasks:
        graph.add_node(task.id)
        for dep in task.dependencies:
            graph.add_edge(dep, task.id)

    # 检测环
    if nx.is_directed_cyclic_graph(graph):
        cycles = list(nx.simple_cycles(graph))
        raise ValueError(f"发现循环依赖:{cycles}")

    return True

# 执行拓扑排序
execution_order = list(nx.topological_sort(graph))

3.2 死锁问题

问题:
两个 Agent 互相等待

解决方案:

class TimeoutExecutor:
    def __init__(self, timeout: int = 30):
        self.timeout = timeout

    def execute_with_timeout(self, func, *args, **kwargs):
        """超时执行"""
        import threading

        result = [None]
        exception = [None]

        def worker():
            try:
                result[0] = func(*args, **kwargs)
            except Exception as e:
                exception[0] = e

        thread = threading.Thread(target=worker)
        thread.start()
        thread.join(timeout=self.timeout)

        if thread.is_alive():
            raise TimeoutError("执行超时")

        if exception[0]:
            raise exception[0]

        return result[0]

面试高频问法

Q1: LangGraph 和传统 Chain 的区别?

标准回答:

LangGraph vs 传统 Chain

【核心区别】

维度           | Chain             | LangGraph
---------------|-------------------|-------------------
结构           | 线性(串行)      | 图结构(复杂依赖)
可视化        | 无                | ✅ 内置
状态管理        | 简陋              | ✅ TypedDict
循环支持        | 难以实现          | ✅ 天然支持
条件分支        | 条件链            | ✅ 条件边
并行执行        | 无                | ✅ 支持

【何时使用】

场景          | Channel 推荐 | LangGraph 推荐
--------------|-------------|---------------
简单任务      | ✅           | ×
复杂依赖      | ×           | ✅
需要循环      | ×           | ✅
需要可视化    | ×           | ✅
多 Agent 协作   | ×           | ✅
状态追踪      | ×           | ✅

【架构对比】

Chain:
Query → Transform → Split → Reduce → Answer
(线性,无法分支)

LangGraph:
        Query
         ↓
    ┌────┴────┐
    ↓         ↓         ↓
Search   Direct  Database
    ├─────────┴────────┘
         ↓
      Re-rank
         ↓
    ┌────┴────┐
    ↓         ↓
 Answer  Fallback
(图结构,灵活分支)

记忆要点

工作流编排口诀:

DAG 设计要防环
拓扑排序定执行
状态定义用 TypedDict
条件边控制分支

LangGraph 优势:
图结构支持循环
TypedDict 管理状态
可视化辅助调试

Multi-Agent:
Supervisor 协调
Agent 接口标准化
任务路由动态化

实战场景

场景:自动化报告生成系统

需求:

  • 搜集数据
  • 分析数据
  • 生成图表
  • 编写报告
  • 邮件分发

实现:

# 见 workflow/langgraph_demo.py

文档版本: 1.0

close
arrow_upward