内容纲要
模块三:工作流编排(详解版)
覆盖:DAG、FSM、LangGraph、CrewAI、Supervisor 模式
目录
必须掌握的概念
3.1 DAG(有向无环图)
定义:
DAG 是一种图结构,节点表示任务,边表示依赖关系,保证没有循环依赖。
核心属性:
- 节点:任务
- 边:依赖关系(A → B 表示 B 依赖 A)
- 无环:没有循环依赖
示例:
任务流程:
A(数据准备)
↓
B(数据清洗)
↓
C(特征工程)
↓
D(模型训练)
↓
E(模型评估)
DAG 表示:
A → B → C → D → E
3.2 FSM(有限状态机)
定义:
FSM 通过有限个状态和状态之间的转换来控制流程。
核心要素:
- 状态:流程的不同阶段
- 事件:触发状态转换的条件
- 转换:状态到状态的映射
- 动作:状态转换时执行的操作
3.3 LangGraph
定义:
LangChain 的图编排库,用于构建有状态的、循环的 Agent 应用。
核心概念:
- Node(节点):执行特定操作的函数
- Edge(边):节点之间的连接
- State(状态):在节点间传递的数据
- Conditional Edge(条件边):根据状态选择下一个节点
关键设计点
3.1 LangGraph 完整实现
# workflow/langgraph_demo.py
"""
LangGraph 工作流完整实现
包含:状态定义、节点设计、条件边、执行
"""
from typing import TypedDict, Annotated, Literal, List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
import operator
# ============ 状态定义 ============
class GraphState(TypedDict):
"""工作流状态"""
query: str # 用户查询
search_results: List[str] # 搜索结果
analysis: str # 分析结果
summary: str # 最终总结
attempts: Annotated[int, operator.add] # 尝试次数
# ============ 节点定义 ============
def search_node(state: GraphState) -> Dict[str, Any]:
"""搜索节点:执行信息搜索"""
print("🔍 执行搜索...")
# 模拟搜索
search_results = [
f"关于 {state['query']} 的相关资料 1",
f"关于 {state['query']} 的相关资料 2",
f"关于 {state['query']} 的相关资料 3"
]
print(f"✅ 搜索到 {len(search_results)} 条结果")
return {"search_results": search_results}
def analysis_node(state: GraphState) -> Dict[str, Any]:
"""分析节点:分析搜索结果"""
print("🔍 执行分析...")
# 模拟分析
if not state["search_results"]:
analysis = "搜索结果为空,无法分析"
else:
analysis = f"基于 {len(state['search_results'])} 条搜索结果进行分析,发现..."
print(f"✅ 分析完成:{analysis[:50]}...")
return {"analysis": analysis}
def summary_node(state: GraphState) -> Dict[str, Any]:
"""总结节点:生成最终总结"""
print("🔍 生成总结...")
# 模拟总结
summary = f"针对问题 '{state['query']}' 的总结:\n"
summary += f"搜索结果:{', '.join(state['search_results'])}\n"
summary += f"分析结论:{state['analysis']}"
print("✅ 总结生成完成")
return {"summary": summary}
# ============ 边定义 ============
def should_continue(state: GraphState) -> Literal["continue", "analyze", "end"]:
"""判断是否继续"""
attempts = state.get("attempts", 0)
print(f"📊 尝试次数:{attempts}")
# 超过最大次数
if attempts >= 3:
print("⏹ 超过最大尝试次数,结束")
return "end"
# 判断是否有搜索结果
if not state.get("search_results"):
return "continue"
else:
return "analyze"
def is_search_empty(state: GraphState) -> bool:
"""检查搜索是否为空"""
return len(state.get("search_results", [])) == 0
# ============ 构建图 ============
def build_workflow():
"""构建 LangGraph 工作流"""
from langgraph.graph import StateGraph, END
# 创建图
workflow = StateGraph(GraphState)
# 添加节点
workflow.add_node("search", search_node)
workflow.add_node("analysis", analysis_node)
workflow.add_node("summary", summary_node)
# 设置入口
workflow.set_entry_point("search")
# 添加条件边
workflow.add_conditional_edges(
"search",
should_continue,
{
"continue": "search", # 继续搜索
"analyze": "analysis", # 进入分析
"end": END # 结束
}
)
# 添加普通边
workflow.add_edge("analysis", "summary")
workflow.add_edge("summary", END)
# 编译图
return workflow.compile()
# ============ 可视化 ============
def visualize_workflow(workflow):
"""可视化工作流"""
try:
from IPython.display import Image, display
image_data = workflow.get_graph().draw_png()
display(Image(image_data))
except ImportError:
print("无法显示图片,请安装 IPython")
print(workflow.get_graph().print_ascii())
# ============ 执行工作流 ============
def execute_workflow(query: str):
"""执行工作流"""
print("=" * 60)
print(f"开始执行工作流,查询:{query}")
print("=" * 60)
# 构建工作流
workflow = build_workflow()
# 可视化
print("\n工作流图:")
visualize_workflow(workflow)
# 初始状态
initial_state = {
"query": query,
"search_results": [],
"analysis": "",
"summary": "",
"attempts": 0
}
# 执行
final_state = workflow.invoke(initial_state)
print("\n" + "=" * 60)
print("最终结果:")
print("=" * 60)
print(final_state.get("summary", "处理失败"))
return final_state
# ============ 使用示例 ============
if __name__ == "__main__":
# 执行工作流
query = "Python 并发编程的最佳实践"
result = execute_workflow(query)
3.2 Multi-Agent 编排(Supervisor 模式)
# workflow/multi_agent_workflow.py
"""
Multi-Agent 工作流
包含:Supervisor、专业 Agents、任务分配
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import json
from datetime import datetime
# ============ 数据结构 ============
class AgentType(Enum):
RESEARCHER = "researcher"
ANALYZER = "analyzer"
WRITER = "writer"
CODER = "coder"
@dataclass
class Task:
id: str
description: str
agent_type: Optional[AgentType] = None
dependencies: List[str] = None
status: str = "pending"
result: Optional[str] = None
error: Optional[str] = None
@dataclass
class AgentMessage:
sender: str
receiver: str
task: Task
message_type: str # request/response/complete
timestamp: datetime = None
# ============ Agent 定义 ============
class BaseAgent:
def __init__(self, name: str, agent_type: AgentType):
self.name = name
self.agent_type = agent_type
def can_handle(self, task: Task) -> bool:
"""判断是否能处理该任务"""
raise NotImplementedError
def process(self, task: Task) -> str:
"""处理任务"""
raise NotImplementedError
class ResearcherAgent(BaseAgent):
def __init__(self):
super().__init__("researcher", AgentType.RESEARCHER)
def can_handle(self, task: Task) -> bool:
keywords = ["搜索", "查找", "研究", "搜集"]
return any(kw in task.description.lower() for kw in keywords)
def process(self, task: Task) -> str:
print(f"[{self.name}] 处理任务:{task.description}")
# 模拟搜索
result = f"已搜索并找到关于 {task.description} 的相关资料"
print(f"[{self.name}] 完成:{result}")
return result
class AnalyzerAgent(BaseAgent):
def __init__(self):
super().__init__("analyzer", AgentType.ANALYZER)
def can_handle(self, task: Task) -> bool:
keywords = ["分析", "评估", "统计"]
return any(kw in task.description.lower() for kw in keywords)
def process(self, task: Task) -> str:
print(f"[{self.name}] 处理任务:{task.description}")
result = f"已完成对 {task.description} 的分析"
print(f"[{self.name}] 完成:{result}")
return result
class WriterAgent(BaseAgent):
def __init__(self):
super().__init__("writer", AgentType.WRITER)
def can_handle(self, task: Task) -> bool:
keywords = ["写", "生成", "总结", "文档"]
return any(kw in task.description.lower() for kw in keywords)
def process(self, task: Task) -> str:
print(f"[{self.name}] 处理任务:{task.description}")
result = f"已生成关于 {task.description} 的文档"
print(f"[{self.name}] 完成:{result}")
return result
# ============ Supervisor ============
class Supervisor:
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
def route_task(self, task: Task) -> BaseAgent:
"""路由任务到合适的 Agent"""
print(f"\n[Supervisor] 路由任务:{task.description}")
# 计算每个 Agent 的置信度
scores = []
for agent in self.agents:
if agent.can_handle(task):
score = self._calculate_confidence(task, agent)
scores.append((agent, score))
if not scores:
print("[Supervisor] ⚠ 没有合适的 Agent")
return None
# 选择置信度最高的 Agent
selected = max(scores, key=lambda x: x[1])
print(f"[Supervisor] 选择 Agent:{selected[0].name} (置信度:{selected[1]:.2f})")
return selected[0]
def _calculate_confidence(self, task: Task, agent: BaseAgent) -> float:
"""计算置信度"""
# 简单实现:关键词匹配度
keywords = task.description.split()
agent_name_lower = agent.name.lower()
matches = sum(1 for kw in keywords if kw.lower() in agent_name_lower)
return min(matches / len(keywords) * 2, 1.0)
def execute_workflow(self, tasks: List[Task]) -> Dict:
"""执行工作流"""
results = {}
# 简单执行(实际应该使用 DAG)
for task in tasks:
agent = self.route_task(task)
if agent is None:
task.status = "failed"
task.error = "没有合适的 Agent"
else:
try:
task.agent_type = agent.agent_type
task.result = agent.process(task)
task.status = "completed"
except Exception as e:
task.status = "failed"
task.error = str(e)
results[task.id] = {
"status": task.status,
"agent": agent.name if agent else "none",
"result": task.result,
"error": task.error
}
return results
# ============ 使用示例 ============
if __name__ == "__main__":
# 创建 Agents
agents = [
ResearcherAgent(),
AnalyzerAgent(),
WriterAgent()
]
# 创建 Supervisor
supervisor = Supervisor(agents)
# 创建任务
tasks = [
Task(id="1", description="搜索 Python 并发编程资料"),
Task(id="2", description="分析搜索结果"),
Task(id="3", description="生成总结文档")
]
# 执行工作流
print("=" * 60)
print("Multi-Agent 工作流执行")
print("=" * 60)
results = supervisor.execute_workflow(tasks)
print("\n" + "=" * 60)
print("执行结果:")
print("=" * 60)
for task_id, result in results.items():
print(f"任务 {task_id}:")
print(f" 状态:{result['status']}")
print(f" Agent:{result['agent']}")
print(f" 结果:{result.get('result', 'N/A')}")
print(f" 错误:{result.get('error', 'N/A')}")
print()
常见坑与解决方案
3.1 DAG 循环依赖
问题:
A → B → C → A (循环依赖!
解决方案:
import networkx as nx
def check_dag(tasks):
"""检查 DAG 是否有环"""
graph = nx.DiGraph()
# 构建图
for task in tasks:
graph.add_node(task.id)
for dep in task.dependencies:
graph.add_edge(dep, task.id)
# 检测环
if nx.is_directed_cyclic_graph(graph):
cycles = list(nx.simple_cycles(graph))
raise ValueError(f"发现循环依赖:{cycles}")
return True
# 执行拓扑排序
execution_order = list(nx.topological_sort(graph))
3.2 死锁问题
问题:
两个 Agent 互相等待
解决方案:
class TimeoutExecutor:
def __init__(self, timeout: int = 30):
self.timeout = timeout
def execute_with_timeout(self, func, *args, **kwargs):
"""超时执行"""
import threading
result = [None]
exception = [None]
def worker():
try:
result[0] = func(*args, **kwargs)
except Exception as e:
exception[0] = e
thread = threading.Thread(target=worker)
thread.start()
thread.join(timeout=self.timeout)
if thread.is_alive():
raise TimeoutError("执行超时")
if exception[0]:
raise exception[0]
return result[0]
面试高频问法
Q1: LangGraph 和传统 Chain 的区别?
标准回答:
LangGraph vs 传统 Chain
【核心区别】
维度 | Chain | LangGraph
---------------|-------------------|-------------------
结构 | 线性(串行) | 图结构(复杂依赖)
可视化 | 无 | ✅ 内置
状态管理 | 简陋 | ✅ TypedDict
循环支持 | 难以实现 | ✅ 天然支持
条件分支 | 条件链 | ✅ 条件边
并行执行 | 无 | ✅ 支持
【何时使用】
场景 | Channel 推荐 | LangGraph 推荐
--------------|-------------|---------------
简单任务 | ✅ | ×
复杂依赖 | × | ✅
需要循环 | × | ✅
需要可视化 | × | ✅
多 Agent 协作 | × | ✅
状态追踪 | × | ✅
【架构对比】
Chain:
Query → Transform → Split → Reduce → Answer
(线性,无法分支)
LangGraph:
Query
↓
┌────┴────┐
↓ ↓ ↓
Search Direct Database
├─────────┴────────┘
↓
Re-rank
↓
┌────┴────┐
↓ ↓
Answer Fallback
(图结构,灵活分支)
记忆要点
工作流编排口诀:
DAG 设计要防环
拓扑排序定执行
状态定义用 TypedDict
条件边控制分支
LangGraph 优势:
图结构支持循环
TypedDict 管理状态
可视化辅助调试
Multi-Agent:
Supervisor 协调
Agent 接口标准化
任务路由动态化
实战场景
场景:自动化报告生成系统
需求:
- 搜集数据
- 分析数据
- 生成图表
- 编写报告
- 邮件分发
实现:
# 见 workflow/langgraph_demo.py
文档版本: 1.0