内容纲要
模块一:Agent 基础范式(详解版)
覆盖:ReAct、Plan&Execute、Tool-Calling、Multi-Agent、Memory、Reflection
目录
必须掌握的概念
1.1 ReAct(Reasoning + Acting)
定义:
ReAct 是一种 Agent 交互范式,通过"思考→行动→观察"的循环来完成任务。
核心流程:
User Query → Thought 1 → Action 1 → Observation 1
↓
Thought 2 → Action 2 → Observation 2
↓
↓
Final Answer
关键要素:
- Thought(思考):分析当前情况,决定下一步行动
- Action(行动):执行具体操作(调用工具、查询等)
- Observation(观察):行动得到的结果
- Final Answer:最终答案
优势:
- 灵活适应动态环境
- 实时决策和纠错
- 适合探索性任务
劣势:
- Token 消耗大(每步都思考)
- 执行不可预测
- 难以复用
适用场景:
- 探索性任务(信息搜集、调试)
- 需要实时交互
- 任务路径不明确
1.2 Plan & Execute
定义:
先制定完整计划,然后按计划执行的双阶段模式。
核心流程:
User Query → Planning (DAG) → Execution → Final Answer
关键要素:
- Planning(规划):分析任务,生成执行计划(DAG)
- Execution(执行):按计划逐步执行
- Plan(计划):任务列表,包含依赖关系
优势:
- Plan 可复用和存档
- 执行过程可追踪
- 支持并行执行(DAG 并行)
- 执行失败可重试特定节点
劣势:
- 需要完整信息才能规划
- 规划错误需要重新规划
- 灵活性较低
适用场景:
- 结构化任务(工作流)
- 需要执行记录存档
- 多步骤依赖关系明确
1.3 Tool Calling / Function Calling
定义:
LLM 通过调用外部函数/工具来扩展能力。
Tool Calling(通用):
{
"name": "search",
"description": "搜索互联网",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索查询"}
},
"required": ["query"]
}
}
Function Calling(OpenAI 标准):
functions = [{
"name": "get_weather",
"description": "获取天气",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}]
response = chat.completions.create(
model="gpt-4",
messages=[...],
functions=functions
)
if response.choices[0].function_call:
# 执行工具
核心要素:
- 工具定义:名称、描述、参数 schema
- 工具注册:工具到名称的映射
- 参数验证:确保调用参数符合 schema
- 权限控制:工具访问权限
1.4 Multi-Agent
定义:
多个专业化的 Agent 协作完成复杂任务。
架构模式:
| 模式 | 说明 | 适用场景 |
|---|---|---|
| Supervisor | 协调器 Agent 分配任务 | 单一复杂任务 |
| Sequential | 顺序执行多个 Agent | 流水线任务 |
| Parallel | 并行执行多个 Agent | 独立任务 |
| Hierarchical | 层级分工(主从) | 组织结构化 |
| Routing | 根据任务类型路由 | 任务分类明确 |
Supervisor 模式:
Supervisor
├─ Researcher (信息搜集)
├─ Analyst (数据分析)
├─ Writer (文档生成)
└─ Reviewer (质量审核)
1.5 Memory(记忆机制)
Memory 类型:
| 类型 | 说明 | 存储方式 | 用途 |
|---|---|---|---|
| 短期记忆 | 对话历史 | 内存(列表) | 最近交互 |
| 长期记忆 | 历史信息 | 向量数据库 | 知识检索 |
| 结构化记忆 | 用户偏好等 | Redis/数据库 | 持久化状态 |
| 情景记忆 | 当前任务上下文 | 共享变量 | 任务执行 |
记忆操作:
add(message): 添加消息get_all(): 获取所有消息get_recent(n): 获取最近 n 条search(query): 语义搜索clear(): 清空记忆
1.6 Reflection(反思机制)
定义:
Agent 对自己的执行结果进行评估和改进。
Reflection 循环:
执行任务 → 评估结果 → 判断满意度
↓ ↓
满意?←否→改进策略
↓
最终结果
核心要素:
- 评估标准:准确性、完整性、相关性
- 反馈生成:给出改进建议
- 策略调整:根据反馈调整执行策略
- 重试机制:最多 N 次反思
关键设计点
2.1 ReAct Agent 接口设计
# agent/interfaces.py
from typing import Protocol, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
class Action(Enum):
FINAL = "final"
SEARCH = "search"
CALCULATE = "calculate"
CODE = "code"
@dataclass
class Message:
"""消息基类"""
role: str # user/assistant/tool
content: str
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict = field(default_factory=dict)
@dataclass
class ToolResult:
"""工具执行结果"""
success: bool
result: str
error: Optional[str] = None
metadata: Dict = field(default_factory=dict)
@dataclass
class AgentState:
"""Agent 状态"""
query: str # 用户查询
thoughts: List[str] = field(default_factory=list) # 思考历史
actions: List[Dict] = field(default_factory=list) # 行动历史
observations: List[str] = field(default_factory=list) # 观察历史
current_iteration: int = 0 # 当前迭代次数
max_iterations: int = 10 # 最大迭代次数
final_answer: Optional[str] = None # 最终答案
error: Optional[str] = None # 错误信息
# LLM 接口
class LLM(Protocol):
def generate(
self,
prompt: str,
**kwargs
) -> str:
"""生成文本"""
...
def generate_with_history(
self,
messages: List[Message],
**kwargs
) -> str:
"""基于历史生成"""
...
# Tool 接口
class Tool(Protocol):
name: str
description: str
parameters_schema: Dict
def execute(self, **kwargs) -> ToolResult:
"""执行工具"""
...
# Memory 接口
class Memory(Protocol):
def add(self, message: Message) -> None:
"""添加消息"""
...
def get_all(self) -> List[Message]:
"""获取所有消息"""
...
def get_recent(self, n: int) -> List[Message]:
"""获取最近 n 条"""
...
def search(self, query: str, k: int = 5) -> List[Message]:
"""语义搜索"""
...
def clear(self) -> None:
"""清空记忆"""
...
# Agent 接口
class Agent(Protocol):
def run(self, query: str) -> AgentState:
"""执行查询"""
...
def _think(self, state: AgentState) -> str:
"""生成思考"""
...
def _act(self, state: AgentState) -> ToolResult:
"""执行行动"""
...
def _should_stop(self, state: AgentState) -> bool:
"""判断是否停止"""
...
2.2 ReAct Agent 完整实现
# agent/react_agent.py
"""
ReAct Agent 完整实现
包含:工具注册、思考-行动循环、状态管理、异常处理
"""
import re
import json
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
# LLM 模拟(实际使用 OpenAI/Anthropic 等)
class MockLLM:
def generate(self, prompt: str) -> str:
"""模拟 LLM 响应"""
# 实际项目使用:openai.ChatCompletion.create()
if "搜索" in prompt.lower():
return "我需要搜索相关信息。搜索(Python 并发编程)"
elif "python" in prompt.lower() and "并发" in prompt.lower():
return "很好,现在我有了信息。最终答案:Python 并发编程是指多个线程同时执行代码,需要使用同步机制确保线程安全。"
elif "最新" in prompt.lower():
return "需要搜索最新版本。搜索(Python 最新版本)"
elif "python 3.13" in prompt.lower():
return "Python 3.13 是当前最新版本,发布了更好的错误消息等特性。最终答案:Python 最新版本是 3.13,主要特性包括更好的错误消息。"
else:
return "最终答案:我不知道。"
# 工具基类
@dataclass
class Tool:
name: str
description: str
parameters: Dict
func: Callable
def execute(self, **kwargs) -> Dict:
try:
result = self.func(**kwargs)
return {"success": True, "result": result}
except Exception as e:
return {"success": False, "error": str(e)}
# 工具注册器
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, Tool] = {}
def register(self, tool: Tool):
self._tools[tool.name] = tool
def get(self, name: str) -> Optional[Tool]:
return self._tools.get(name)
def list_tools(self) -> str:
"""列出所有工具"""
return "\n".join([
f"- {t.name}: {t.description}"
for t in self._tools.values()
])
def get_tool_schemas(self) -> List[Dict]:
"""获取所有工具的 schema"""
return [
{
"name": t.name,
"description": t.description,
"parameters": t.parameters
}
for t in self._tools.values()
]
# 内部工具实现
def search_internet(query: str) -> str:
"""搜索互联网(模拟)"""
results = {
"Python 并发编程": "Python 提供了 thread、threading、multiprocessing 等并发模块。threading 是高级接口,thread 是低级接口。",
"Python 最新版本": "Python 3.13 是当前最新版本,主要特性包括更好的错误消息、性能改进等。",
"异步编程": "Python 的 async/await 语法让异步编程更简洁。asyncio 是标准库的异步框架。"
}
return results.get(query, f"未找到关于 '{query}' 的信息")
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
# 安全的 eval
result = eval(expression, {"__builtins__": {}}, {})
return str(result)
except Exception as e:
return f"计算错误: {e}"
# ReAct Agent
class ReActAgent:
def __init__(
self,
llm: Callable[[str], str],
max_iterations: int = 10
):
self.llm = llm
self.max_iterations = max_iterations
self.tool_registry = ToolRegistry()
self._register_tools()
def _register_tools(self):
"""注册默认工具"""
self.tool_registry.register(Tool(
name="search",
description="搜索互联网获取信息",
parameters={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索查询"
}
},
"required": ["query"]
},
func=lambda query: search_internet(query)
))
self.tool_registry.register(Tool(
name="calculate",
description="执行数学计算",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式"
}
},
"required": ["expression"]
},
func=lambda expr: calculate(expr)
))
def run(self, query: str) -> AgentState:
"""执行查询"""
state = AgentState(query=query, max_iterations=self.max_iterations)
try:
# 思考-行动循环
for iteration in range(self.max_iterations):
state.current_iteration = iteration + 1
print(f"\n{'='* 50}")
print(f"=== 迭代 {state.current_iteration} ===")
# 1. 思考
thought = self._think(state)
state.thoughts.append(thought)
print(f"思考: {thought}")
# 2. 判断是否停止
if self._should_stop(thought):
state.final_answer = self._extract_final_answer(thought)
print(f"✓ 完成,给出最终答案")
break
# 3. 解析并执行行动
action, params = self._parse_action(thought)
if action == "unknown":
print(f"⚠ 无法解析行动,停止")
state.error = "无法解析行动"
break
# 4. 执行行动
observation = self._execute_action(action, params)
state.actions.append({"action": action, "params": params})
state.observations.append(observation)
print(f"行动: {action}({params})")
print(f"观察: {observation}")
except Exception as e:
state.error = str(e)
print(f"✗ 错误: {e}")
return state
def _think(self, state: AgentState) -> str:
"""生成思考"""
tools_info = self.tool_registry.list_tools()
history = self._format_history(state)
prompt = f"""你是一个智能助手,可以思考和使用工具解决问题。
可用工具:
{tools_info}
历史对话:
{history}
请分析当前情况并给出下一步行动。
思考格式:
- 如果需要查询信息:搜索(query)
- 如果需要计算:计算(expression)
- 如果可以给出最终答案:最终答案(your answer)
重要:
1. 只输出一行,包含行动或最终答案
2. 最终答案要包含完整的信息
3. 不要输出无关内容
当前用户问题: {state.query}
请给出你的思考:"""
return self.llm(prompt).strip()
def _should_stop(self, thought: str) -> bool:
"""判断是否应该停止"""
return (
"最终答案" in thought or
"Final Answer" in thought.upper() or
"answer:" in thought.lower()
)
def _extract_final_answer(self, thought: str) -> str:
"""提取最终答案"""
# 尝试多种格式
patterns = [
r"最终答案[::](.+?)(?:$|\n)", # 中文冒号
r"Final Answer[::](.+?)(?:$|\n)", # 英文
r"answer[::](.+?)(?:$|\n)", # 简短
]
for pattern in patterns:
match = re.search(pattern, thought, re.IGNORECASE)
if match:
return match.group(1).strip()
# 如果都没匹配,返回整个 thought
return thought
def _parse_action(self, thought: str) -> tuple:
"""解析行动"""
# 尝试匹配 搜索(...)
search_match = re.search(r"搜索[((](.+?)[))]", thought)
if search_match:
return "search", {"query": search_match.group(1).strip()}
# 尝试匹配 search(...)
search_match = re.search(r"search[((](.+?)[))]", thought)
if search_match:
return "search", {"query": search_match.group(1).strip()}
# 尝试匹配 计算(...)
calc_match = re.search(r"计算[((](.+?)[))]", thought)
if calc_match:
return "calculate", {"expression": calc_match.group(1).strip()}
# 尝试匹配 calculate(...)
calc_match = re.search(r"calculate[((](.+?)[))]", thought)
if calc_match:
return "calculate", {"expression": calc_match.group(1).strip()}
return "unknown", {}
def _execute_action(self, action: str, params: Dict) -> str:
"""执行行动"""
tool = self.tool_registry.get(action)
if tool is None:
return f"未知工具: {action}"
result = tool.execute(**params)
if result.get("success"):
return result["result"]
else:
return f"工具执行失败: {result.get('error')}"
def _format_history(self, state: AgentState) -> List[str]:
"""格式化历史"""
lines = []
for i, (thought, obs) in enumerate(zip(state.thoughts, state.observations)):
lines.append(f"步骤 {i + 1}:")
lines.append(f" 思考: {thought}")
lines.append(f" 观察: {obs}")
return "\n".join(lines) if lines else "(无历史)"
# 使用示例
if __name__ == "__main__":
# 创建 Agent
agent = ReActAgent(llm=MockLLM(), max_iterations=10)
# 执行查询
print("🤖 ReAct Agent 演示")
print("=" * 50)
# 测试用例
test_queries = [
"Python 并发编程是什么?",
"Python 最新版本是什么?"
]
for query in test_queries:
print(f"\n\n{'#' * 50}")
print(f"# 用户问题: {query}")
print(f"{'#' * 50}")
state = agent.run(query)
print(f"\n{'=' * 50}")
print("=== 执行摘要 ===")
print(f"迭代次数: {state.current_iteration}")
print(f"思考步骤: {len(state.thoughts)}")
print(f"行动次数: {len(state.actions)}")
print(f"\n最终答案:")
print(state.final_answer or state.error)
2.3 Plan & Execute Agent
# agent/plan_execute_agent.py
"""
Plan & Execute Agent 实现
包含:任务规划、DAG 执行、并行执行
"""
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import networkx as nx
@dataclass
class Task:
"""任务定义"""
id: str
description: str
dependencies: List[str] = field(default_factory=list)
status: str = "pending" # pending/running/completed/failed
result: Optional[str] = None
error: Optional[str] = None
@dataclass
class Plan:
"""执行计划"""
tasks: List[Task]
reasoning: str
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class ExecutionResult:
"""执行结果"""
plan: Plan
executed_tasks: List[Task]
final_result: str
execution_time: float
class Planner:
"""规划器:生成任务计划"""
def __init__(self, llm: Callable[[str], str]):
self.llm = llm
def plan(self, query: str) -> Plan:
"""生成计划"""
prompt = f"""分析以下任务,生成执行计划。
任务: {query}
请以以下 JSON 格式输出:
{{
"reasoning": "你的规划思路",
"tasks": [
{{
"id": "task-1",
"description": "任务描述",
"dependencies": []
}},
{{
"id": "task-2",
"description": "任务描述",
"dependencies": ["task-1"]
}}
]
}}
要求:
1. 将复杂任务拆解为可执行的子任务
2. 明确任务之间的依赖关系
3. 依赖关系用 task id 表示
4. 确保没有循环依赖
"""
response = self.llm(prompt)
return self._parse_plan(response)
def _parse_plan(self, response: str) -> Plan:
"""解析计划"""
try:
import json
data = json.loads(response)
tasks = []
for t in data.get("tasks", []):
tasks.append(Task(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", [])
))
return Plan(
tasks=tasks,
reasoning=data.get("reasoning", "")
)
except Exception as e:
# 解析失败,返回默认计划
return Plan(
tasks=[Task(id="default", description=response)],
reasoning="无法解析,将整个响应作为单个任务"
)
class Executor:
"""执行器:执行任务计划"""
def __init__(self, tool_registry):
self.tool_registry = tool_registry
def execute_plan(
self,
plan: Plan,
max_parallel: int = 3
) -> ExecutionResult:
"""执行计划"""
start_time = datetime.now()
# 1. 验证 DAG
if not self._validate_dag(plan.tasks):
raise ValueError("计划包含循环依赖")
# 2. 拓扑排序
execution_order = self._topological_sort(plan.tasks)
# 3. 执行任务
executed_tasks = []
for batch in self._group_by_dependencies(execution_order, max_parallel):
# 并行执行一批任务
results = self._execute_batch(batch)
executed_tasks.extend(results)
# 4. 生成最终结果
final_result = self._generate_final_result(plan, executed_tasks)
execution_time = (datetime.now() - start_time).total_seconds()
return ExecutionResult(
plan=plan,
executed_tasks=executed_tasks,
final_result=final_result,
execution_time=execution_time
)
def _validate_dag(self, tasks: List[Task]) -> bool:
"""验证是否有循环依赖"""
# 构建 Graph
graph = nx.DiGraph()
for task in tasks:
graph.add_node(task.id)
for dep in task.dependencies:
graph.add_edge(dep, task.id)
# 检查是否有环
return not nx.is_directed_cyclic_graph(graph)
def _topological_sort(self, tasks: List[Task]) -> List[Task]:
"""拓扑排序"""
# 构建 Graph
graph = nx.DiGraph()
task_map = {t.id: t for t in tasks}
for task in tasks:
graph.add_node(task.id)
for dep in task.dependencies:
graph.add_edge(dep, task.id)
# 拓扑排序
sorted_ids = list(nx.topological_sort(graph))
return [task_map[task_id] for task_id in sorted_ids if task_id in task_map]
def _group_by_dependencies(
self,
tasks: List[Task],
max_parallel: int
) -> List[List[Task]]:
"""分组并行执行"""
batches = []
current_batch = []
current_deps = set()
for task in tasks:
# 检查是否可以加入当前批次
task_deps = set(task.dependencies)
if not current_deps.intersection(task_deps):
# 可以并行执行
current_batch.append(task)
current_deps.update(task_deps)
if len(current_batch) >= max_parallel:
batches.append(current_batch)
current_batch = []
current_deps = set()
else:
# 必依赖前一批,开始新批次
if current_batch:
batches.append(current_batch)
current_batch = [task]
current_deps = set(task_deps)
if current_batch:
batches.append(current_batch)
return batches
def _execute_batch(self, tasks: List[Task]) -> List[Task]:
"""并行执行一批任务"""
import concurrent.futures
executed = []
with concurrent.futures.ThreadPoolExecutor(max_workers=len(tasks)) as executor:
futures = {
executor.submit(self._execute_task, task): task
for task in tasks
}
for future, task in concurrent.futures.as_completed(futures.items()):
try:
result = future.result()
task.status = "completed"
task.result = result
except Exception as e:
task.status = "failed"
task.error = str(e)
executed.append(task)
return executed
def _execute_task(self, task: Task) -> str:
"""执行单个任务"""
# 根据任务描述执行
if "搜索" in task.description:
tool = self.tool_registry.get("search")
if tool:
result = tool.execute(query=task.description.split("搜索")[-1].strip())
return result.get("result", "")
# 模拟执行
return f"完成任务: {task.description}"
def _generate_final_result(
self,
plan: Plan,
executed_tasks: List[Task]
) -> str:
"""生成最终结果"""
# 聚合所有任务结果
results = []
for task in executed_tasks:
if task.status == "completed" and task.result:
results.append(f"任务 {task.id}: {task.result}")
return "\n".join(results) if results else "所有任务已完成"
# Plan & Execute Agent
class PlanExecuteAgent:
def __init__(self, llm, tool_registry):
self.planner = Planner(llm)
self.executor = Executor(tool_registry)
def run(self, query: str) -> ExecutionResult:
"""运行 Plan & Execute"""
print(f"\n{'='* 50}")
print("=== 阶段 1: 规划 ===")
# 1. 规划
plan = self.planner.plan(query)
print(f"规划思路: {plan.reasoning}")
print(f"任务数量: {len(plan.tasks)}")
for i, task in enumerate(plan.tasks, 1):
deps = f" 依赖: {', '.join(task.dependencies)}" if task.dependencies else ""
print(f" 任务{i} [{task.id}]: {task.description}{deps}")
print(f"\n{'='* 50}")
print("=== 阶段 2: 执行 ===")
# 2. 执行
result = self.executor.execute_plan(plan)
print(f"执行时间: {result.execution_time:.2f}秒")
print(f"\n最终结果:\n{result.final_result}")
return result
# 使用示例
if __name__ == "__main__":
from agent.react_agent import ToolRegistry, Tool
# 创建工具注册器
tool_registry = ToolRegistry()
tool_registry.register(Tool(
name="search",
description="搜索互联网",
parameters={"query": {"type": "string"}},
func=lambda query: {"result": f"搜索结果: {query}"}
))
# 创建 Agent
agent = PlanExecuteAgent(llm=MockLLM(), tool_registry=tool_registry)
# 执行查询
result = agent.run("帮我研究 Python 的最新并发特性并写一份总结")
2.4 Multi-Agent 设计
# agent/multi_agent.py
"""
Multi-Agent 系统
包含:Supervisor 模式、Agent 路由、状态管理
"""
from typing import Dict, List, Optional, Protocol
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import json
from concurrent.futures import ThreadPoolExecutor
@dataclass
class AgentMessage:
"""Agent 之间的消息"""
sender: str
receiver: str
content: str
message_type: str # request/response/notification
@dataclass
class AgentContext:
"""共享上下文"""
user_query: str
messages: List[AgentMessage] = field(default_factory=list)
shared_data: Dict = field(default_factory=dict)
status: str = "running"
class BaseAgent(ABC):
"""Agent 基类"""
def __init__(
self,
name: str,
description: str,
capabilities: List[str]
):
self.name = name
self.description = description
self.capabilities = capabilities
@abstractmethod
def can_handle(self, query: str) -> float:
"""判断是否能处理该任务,返回置信度 [0, 1]"""
...
@abstractmethod
def process(self, context: AgentContext) -> str:
"""处理任务"""
...
class ResearcherAgent(BaseAgent):
"""研究 Agent:信息搜集"""
def __init__(self):
super().__init__(
name="researcher",
description="擅长信息搜集和分析",
capabilities=["搜索", "信息分析"]
)
def can_handle(self, query: str) -> float:
keywords = ["搜索", "查找", "研究", "分析", "信息"]
if any(kw in query.lower() for kw in keywords):
return 0.9
return 0.1
def process(self, context: AgentContext) -> str:
print(f"[{self.name}] 处理任务: {context.user_query}")
# 模拟搜索
result = f"已搜索并分析: {context.user_query} 相关的信息"
print(f"[{self.name}] 完成: {result}")
return result
class WriterAgent(BaseAgent):
"""写作 Agent:文档生成"""
def __init__(self):
super().__init__(
name="writer",
description="擅长内容生成和总结",
capabilities=["写作", "总结", "文档生成"]
)
def can_handle(self, query: str) -> float:
keywords = ["写", "总结", "文档", "生成", "报告"]
if any(kw in query.lower() for kw in keywords):
return 0.9
return 0.1
def process(self, context: AgentContext) -> str:
print(f"[{self.name}] 处理任务: {context.user_query}")
# 获取其他 Agent 的结果
research_results = [
msg.content for msg in context.messages
if msg.sender == "researcher"
]
if research_results:
content = "\n".join(research_results)
result = f"基于研究结果,生成总结文档:\n{content}"
else:
result = f"生成文档: {context.user_query}"
print(f"[{self.name}] 完成: {result}")
return result
class CoderAgent(BaseAgent):
"""编程 Agent:代码生成"""
def __init__(self):
super().__init__(
name="coder",
description="擅长编程和代码分析",
capabilities=["编程", "代码", "算法"]
)
def can_handle(self, query: str) -> float:
keywords = ["代码", "编程", "算法", "函数", "类"]
if any(kw in query.lower() for kw in keywords):
return 0.9
return 0.1
def process(self, context: AgentContext) -> str:
print(f"[{self.name}] 处理任务: {context.user_query}")
result = f"生成代码: {context.user_query}"
print(f"[{self.name}] 完成: {result}")
return result
class SupervisorAgent:
"""Supervisor Agent:协调其他 Agent"""
def __init__(self, agents: List[BaseAgent]):
self.agents = agents
self._routing_cache = {}
def route(
self,
query: str,
context: AgentContext
) -> List[BaseAgent]:
"""路由任务到合适的 Agent"""
# 方案 1: 基于关键词规则路由
selected = []
for agent in self.agents:
confidence = agent.can_handle(query)
if confidence > 0.5:
selected.append((agent, confidence))
# 按置信度排序
selected.sort(key=lambda x: x[1], reverse=True)
# 返回前 N 个
return [agent for agent, in selected]
def execute(
self,
query: str,
max_agents: int = 2
) -> AgentContext:
"""执行任务"""
context = AgentContext(user_query=query)
# 路由
selected_agents = self.route(query, context)[:max_agents]
print(f"\n{'='* 50}")
print(f"[Supervisor] 路由到: {[a.name for a in selected_agents]}")
# 并行执行
with ThreadPoolExecutor(max_workers=len(selected_agents)) as executor:
futures = {
executor.submit(agent.process, context): agent
for agent in selected_agents
}
for future, agent in futures.items():
try:
result = future.result()
# 记录消息
message = AgentMessage(
sender=agent.name,
receiver="supervisor",
content=result,
message_type="response"
)
context.messages.append(message)
except Exception as e:
print(f"[Supervisor] Agent {agent.name} 执行失败: {e}")
return context
# 使用示例
if __name__ == "__main__":
# 创建 Agents
agents = [
ResearcherAgent(),
WriterAgent(),
CoderAgent()
]
# 创建 Supervisor
supervisor = SupervisorAgent(agents)
# 执行任务
tasks = [
"帮我研究 Python 的最新并发特性",
"编写一个 Python 并发示例",
"总结 Python 的并发编程最佳实践"
]
for task in tasks:
print(f"\n\n{'#' * 50}")
print(f"# 任务: {task}")
print(f"{'#' * 50}")
context = supervisor.execute(task, max_agents=2)
print(f"\n最终结果:")
for msg in context.messages:
print(f"[{msg.sender}]: {msg.content}")
常见坑与解决方案
3.1 ReAct 无限循环
问题:
思考: 需要搜索
行动: search(xxx)
观察: 无结果
思考: 需要搜索 # 循环!
行动: search(xxx)
...
原因:
- 没有设置最大迭代次数
- 停止条件不正确
- LLM 陷入循环
解决方案:
class SafeReActAgent(ReActAgent):
def run(self, query: str) -> AgentState:
state = super().run(query)
# 检查是否达到最大迭代次数
if state.current_iteration >= state.max_iterations:
state.error = f"超过最大迭代次数 {state.max_iterations}"
return state
3.2 工具调用错误
问题:
- LLM 调用不存在的工具
- LLM 传递错误参数
解决方案:
class SafeToolRegistry(ToolRegistry):
def call_safely(
self,
tool_name: str,
**kwargs
) -> Dict:
"""安全调用工具"""
# 1. 检查工具存在
tool = self.get(tool_name)
if tool is None:
return {
"success": False,
"error": f"工具不存在: {tool_name}"
}
# 2. 验证参数
validation = self._validate_params(tool, kwargs)
if not validation["valid"]:
return {
"success": False,
"error": f"参数验证失败: {validation['error']}"
}
# 3. 执行工具
try:
return tool.execute(**kwargs)
except Exception as e:
return {
"success": False,
"error": str(e)
}
def _validate_params(self, tool: Tool, params: Dict) -> Dict:
"""验证参数"""
schema = tool.parameters
required = schema.get("required", [])
# 检查必需参数
for req in required:
if req not in params:
return {
"valid": False,
"error": f"缺少必需参数: {req}"
}
return {"valid": True}
3.3 Memory 爆炸
问题:
# 对话不断增长,内存 OOM
memory.add(message) # 每次都添加
解决方案:
class SafeMemory:
def __init__(self, max_size: int = 100):
self._messages = []
self.max_size = max_size
def add(self, message: Message):
self._messages.append(message)
# 滑动窗口:删除旧消息
if len(self._messages) > self.max_size:
self._messages.pop(0)
def compress(self) -> None:
"""压缩旧消息"""
if len(self._messages) > 50:
# 前半部分用 LLM 压缩
old_messages = self._messages[:50]
summary = self._summarize(old_messages)
self._messages = [summary] + self._messages[50:]
面试高频问法
Q1: ReAct 和 Plan & Execute 的区别是什么?什么时候用哪个?
标准回答:
ReAct(Reasoning + Acting)vs Plan & Execute
【核心区别】
维度 | ReAct | Plan & Execute
---------------|------------------------|---------------------------
决策时机 | 每步实时思考 | 一次性规划
可复用性 | 低(依赖上下文) | 高(Plan 可存档)
执行追踪 | 难以追踪 | Plan 就是追踪
Token 消耗 | 高(每步都思考) | 低(规划一次)
纠错能力 | 实时纠错 | 需要重新规划
适用场景 | 探索性、交互式任务 | 结构化、可复用任务
【工程选择决策】
场景 | 推荐 | 原因
------------------------|-------|-----------------------
简单问答 | ReAct | 快速响应,无需规划
探索性任务 | ReAct | 需要实时调整策略
需要执行记录存档 | Plan | Plan 可追溯、复用
复杂任务拆解 | Plan | DAG 结构清晰
需要并行执行 | Plan | Plan 可识别并行任务
多轮对话 | ReAct | 实时响应用户反馈
工作流任务 | Plan | 依赖关系明确
【边界条件】
ReAct 的风险:
- 无限循环:需要设置最大迭代次数(建议 10-20)
- Token 消耗爆炸:长对话成本高
- 状态不一致:思考步骤可能偏离
Plan & Execute 的风险:
- 规划失败:初始规划错误无法执行
- 信息不完整:规划时需要所有信息
- 灵活性差:无法中途调整计划
【混合策略】
实际项目中可以混合使用:
1. 先用 Plan 规划大方向
2. 执行过程中用 ReAct 动态调整
3. Plan 失败时退回 ReAct 模式
【面试关键点】
1. 能清楚说出两者的优缺点
2. 能根据场景选择合适的模式
3. 知道各自的边界条件和风险
4. 有实际项目中的选择经验
记忆要点
ReAct 模式
口诀:
思考-行动-观察循环
最大次数防无限
停止条件要明确
Checklist:
□ 设置最大迭代次数(默认10)
□ 定义停止条件关键词
□ 工具调用失败降级
□ 超时控制
□ Token 消耗监控
Plan & Execute 模式
口诀:
先规划再执行
DAG 依赖要清晰
拓扑排序防循环
Checklist:
□ DAG 验证(无环)
□ 拓扑排序
□ 并行执行优化
□ 任务超时控制
□ 失败重试策略
Tool Calling
口诀:
工具要注册
参数要验证
权限要检查
调用要审计
Checklist:
□ 工具白名单
□ 参数 Schema 验证
□ 权限 RBAC 检查
□ 限流控制
□ 审计日志
□ 异常捕获
Memory
口诀:
短期用内存
长期用向量
混合检索最好
Checklist:
□ 最大窗口大小
□ 压缩策略
□ 去重机制
□ 缓存热点查询
□ 异步写入
最小 Demo(完整可运行)
见上文代码,包含:
- ReAct Agent 完整实现
- Plan & Execute Agent 实现
- Multi-Agent Supervisor 实现
实战场景
场景:企业智能问答系统
需求:
企业需要一个智能问答系统,回答:
- 公司政策文档
- 技术文档
- 产品手册
- 员工手册
要求:
- 回答准确,提供来源
- 支持多轮对话
- 数据更新及时
- 成本可控
架构设计:
┌─────────────────────────────────────────┐
│ API Gateway │
├─────────────────────────────────────────┤
│ Agent Orchestrator │
│ - 意图识别(RAG / 工具调用) │
│ - Agent 路由 │
│ - 对话管理(Memory) │
└──────────────┬──────────────────────────┘
│
┌────────┴────────┬────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ RAG │ │ Tool │ │General │
│ Agent │ │ Agent │ │ Agent │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Vector │ │ 企业 │ │ LLM │
│ Store │ │ 工具 │ │ (API) │
└─────────┘ └─────────┘ └─────────┘
数据流:
# 1. 用户提问
query = "公司的年假政策是什么?"
# 2. Orchestrator 路由
agent_type = orchestrator.detect_type(query)
# → "RAG"
# 3. RAG Agent 执行
context = rag_agent.retrieve(query)
answer = rag_agent.generate(query, context)
# 4. 返回结果
response = {
"answer": answer,
"sources": context.documents,
"agent_type": "RAG"
}
文档版本: 1.0