【AI Agent 知识库】01-Agent基础范式-详解版

内容纲要

模块一:Agent 基础范式(详解版)

覆盖:ReAct、Plan&Execute、Tool-Calling、Multi-Agent、Memory、Reflection


目录


必须掌握的概念

1.1 ReAct(Reasoning + Acting)

定义:
ReAct 是一种 Agent 交互范式,通过"思考→行动→观察"的循环来完成任务。

核心流程:

User Query → Thought 1 → Action 1 → Observation 1
                    ↓
              Thought 2 → Action 2 → Observation 2
                    ↓
                            ↓
                    Final Answer

关键要素:

  • Thought(思考):分析当前情况,决定下一步行动
  • Action(行动):执行具体操作(调用工具、查询等)
  • Observation(观察):行动得到的结果
  • Final Answer:最终答案

优势:

  • 灵活适应动态环境
  • 实时决策和纠错
  • 适合探索性任务

劣势:

  • Token 消耗大(每步都思考)
  • 执行不可预测
  • 难以复用

适用场景:

  • 探索性任务(信息搜集、调试)
  • 需要实时交互
  • 任务路径不明确

1.2 Plan & Execute

定义:
先制定完整计划,然后按计划执行的双阶段模式。

核心流程:

User Query → Planning (DAG) → Execution → Final Answer

关键要素:

  • Planning(规划):分析任务,生成执行计划(DAG)
  • Execution(执行):按计划逐步执行
  • Plan(计划):任务列表,包含依赖关系

优势:

  • Plan 可复用和存档
  • 执行过程可追踪
  • 支持并行执行(DAG 并行)
  • 执行失败可重试特定节点

劣势:

  • 需要完整信息才能规划
  • 规划错误需要重新规划
  • 灵活性较低

适用场景:

  • 结构化任务(工作流)
  • 需要执行记录存档
  • 多步骤依赖关系明确

1.3 Tool Calling / Function Calling

定义:
LLM 通过调用外部函数/工具来扩展能力。

Tool Calling(通用):

{
    "name": "search",
    "description": "搜索互联网",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "搜索查询"}
        },
        "required": ["query"]
    }
}

Function Calling(OpenAI 标准):

functions = [{
    "name": "get_weather",
    "description": "获取天气",
    "parameters": {
        "type": "object",
        "properties": {
            "location": {"type": "string"}
        },
        "required": ["location"]
    }
}]

response = chat.completions.create(
    model="gpt-4",
    messages=[...],
    functions=functions
)

if response.choices[0].function_call:
    # 执行工具

核心要素:

  • 工具定义:名称、描述、参数 schema
  • 工具注册:工具到名称的映射
  • 参数验证:确保调用参数符合 schema
  • 权限控制:工具访问权限

1.4 Multi-Agent

定义:
多个专业化的 Agent 协作完成复杂任务。

架构模式:

模式 说明 适用场景
Supervisor 协调器 Agent 分配任务 单一复杂任务
Sequential 顺序执行多个 Agent 流水线任务
Parallel 并行执行多个 Agent 独立任务
Hierarchical 层级分工(主从) 组织结构化
Routing 根据任务类型路由 任务分类明确

Supervisor 模式:

Supervisor
    ├─ Researcher (信息搜集)
    ├─ Analyst (数据分析)
    ├─ Writer (文档生成)
    └─ Reviewer (质量审核)

1.5 Memory(记忆机制)

Memory 类型:

类型 说明 存储方式 用途
短期记忆 对话历史 内存(列表) 最近交互
长期记忆 历史信息 向量数据库 知识检索
结构化记忆 用户偏好等 Redis/数据库 持久化状态
情景记忆 当前任务上下文 共享变量 任务执行

记忆操作:

  • add(message): 添加消息
  • get_all(): 获取所有消息
  • get_recent(n): 获取最近 n 条
  • search(query): 语义搜索
  • clear(): 清空记忆

1.6 Reflection(反思机制)

定义:
Agent 对自己的执行结果进行评估和改进。

Reflection 循环:

执行任务 → 评估结果 → 判断满意度
                  ↓         ↓
              满意?←否→改进策略
                  ↓
              最终结果

核心要素:

  • 评估标准:准确性、完整性、相关性
  • 反馈生成:给出改进建议
  • 策略调整:根据反馈调整执行策略
  • 重试机制:最多 N 次反思

关键设计点

2.1 ReAct Agent 接口设计

# agent/interfaces.py
from typing import Protocol, Callable, Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime

class Action(Enum):
    FINAL = "final"
    SEARCH = "search"
    CALCULATE = "calculate"
    CODE = "code"

@dataclass
class Message:
    """消息基类"""
    role: str  # user/assistant/tool
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict = field(default_factory=dict)

@dataclass
class ToolResult:
    """工具执行结果"""
    success: bool
    result: str
    error: Optional[str] = None
    metadata: Dict = field(default_factory=dict)

@dataclass
class AgentState:
    """Agent 状态"""
    query: str                              # 用户查询
    thoughts: List[str] = field(default_factory=list)  # 思考历史
    actions: List[Dict] = field(default_factory=list)    # 行动历史
    observations: List[str] = field(default_factory=list)  # 观察历史
    current_iteration: int = 0           # 当前迭代次数
    max_iterations: int = 10            # 最大迭代次数
    final_answer: Optional[str] = None     # 最终答案
    error: Optional[str] = None           # 错误信息

# LLM 接口
class LLM(Protocol):
    def generate(
        self,
        prompt: str,
        **kwargs
    ) -> str:
        """生成文本"""
        ...

    def generate_with_history(
        self,
        messages: List[Message],
        **kwargs
    ) -> str:
        """基于历史生成"""
        ...

# Tool 接口
class Tool(Protocol):
    name: str
    description: str
    parameters_schema: Dict

    def execute(self, **kwargs) -> ToolResult:
        """执行工具"""
        ...

# Memory 接口
class Memory(Protocol):
    def add(self, message: Message) -> None:
        """添加消息"""
        ...

    def get_all(self) -> List[Message]:
        """获取所有消息"""
        ...

    def get_recent(self, n: int) -> List[Message]:
        """获取最近 n 条"""
        ...

    def search(self, query: str, k: int = 5) -> List[Message]:
        """语义搜索"""
        ...

    def clear(self) -> None:
        """清空记忆"""
        ...

# Agent 接口
class Agent(Protocol):
    def run(self, query: str) -> AgentState:
        """执行查询"""
        ...

    def _think(self, state: AgentState) -> str:
        """生成思考"""
        ...

    def _act(self, state: AgentState) -> ToolResult:
        """执行行动"""
        ...

    def _should_stop(self, state: AgentState) -> bool:
        """判断是否停止"""
        ...

2.2 ReAct Agent 完整实现

# agent/react_agent.py
"""
ReAct Agent 完整实现
包含:工具注册、思考-行动循环、状态管理、异常处理
"""

import re
import json
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime

# LLM 模拟(实际使用 OpenAI/Anthropic 等)
class MockLLM:
    def generate(self, prompt: str) -> str:
        """模拟 LLM 响应"""
        # 实际项目使用:openai.ChatCompletion.create()
        if "搜索" in prompt.lower():
            return "我需要搜索相关信息。搜索(Python 并发编程)"
        elif "python" in prompt.lower() and "并发" in prompt.lower():
            return "很好,现在我有了信息。最终答案:Python 并发编程是指多个线程同时执行代码,需要使用同步机制确保线程安全。"
        elif "最新" in prompt.lower():
            return "需要搜索最新版本。搜索(Python 最新版本)"
        elif "python 3.13" in prompt.lower():
            return "Python 3.13 是当前最新版本,发布了更好的错误消息等特性。最终答案:Python 最新版本是 3.13,主要特性包括更好的错误消息。"
        else:
            return "最终答案:我不知道。"

# 工具基类
@dataclass
class Tool:
    name: str
    description: str
    parameters: Dict
    func: Callable

    def execute(self, **kwargs) -> Dict:
        try:
            result = self.func(**kwargs)
            return {"success": True, "result": result}
        except Exception as e:
            return {"success": False, "error": str(e)}

# 工具注册器
class ToolRegistry:
    def __init__(self):
        self._tools: Dict[str, Tool] = {}

    def register(self, tool: Tool):
        self._tools[tool.name] = tool

    def get(self, name: str) -> Optional[Tool]:
        return self._tools.get(name)

    def list_tools(self) -> str:
        """列出所有工具"""
        return "\n".join([
            f"- {t.name}: {t.description}"
            for t in self._tools.values()
        ])

    def get_tool_schemas(self) -> List[Dict]:
        """获取所有工具的 schema"""
        return [
            {
                "name": t.name,
                "description": t.description,
                "parameters": t.parameters
            }
            for t in self._tools.values()
        ]

# 内部工具实现
def search_internet(query: str) -> str:
    """搜索互联网(模拟)"""
    results = {
        "Python 并发编程": "Python 提供了 thread、threading、multiprocessing 等并发模块。threading 是高级接口,thread 是低级接口。",
        "Python 最新版本": "Python 3.13 是当前最新版本,主要特性包括更好的错误消息、性能改进等。",
        "异步编程": "Python 的 async/await 语法让异步编程更简洁。asyncio 是标准库的异步框架。"
    }
    return results.get(query, f"未找到关于 '{query}' 的信息")

def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        # 安全的 eval
        result = eval(expression, {"__builtins__": {}}, {})
        return str(result)
    except Exception as e:
        return f"计算错误: {e}"

# ReAct Agent
class ReActAgent:
    def __init__(
        self,
        llm: Callable[[str], str],
        max_iterations: int = 10
    ):
        self.llm = llm
        self.max_iterations = max_iterations
        self.tool_registry = ToolRegistry()
        self._register_tools()

    def _register_tools(self):
        """注册默认工具"""
        self.tool_registry.register(Tool(
            name="search",
            description="搜索互联网获取信息",
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "搜索查询"
                    }
                },
                "required": ["query"]
            },
            func=lambda query: search_internet(query)
        ))

        self.tool_registry.register(Tool(
            name="calculate",
            description="执行数学计算",
            parameters={
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "数学表达式"
                    }
                },
                "required": ["expression"]
            },
            func=lambda expr: calculate(expr)
        ))

    def run(self, query: str) -> AgentState:
        """执行查询"""
        state = AgentState(query=query, max_iterations=self.max_iterations)

        try:
            # 思考-行动循环
            for iteration in range(self.max_iterations):
                state.current_iteration = iteration + 1
                print(f"\n{'='* 50}")
                print(f"=== 迭代 {state.current_iteration} ===")

                # 1. 思考
                thought = self._think(state)
                state.thoughts.append(thought)
                print(f"思考: {thought}")

                # 2. 判断是否停止
                if self._should_stop(thought):
                    state.final_answer = self._extract_final_answer(thought)
                    print(f"✓ 完成,给出最终答案")
                    break

                # 3. 解析并执行行动
                action, params = self._parse_action(thought)
                if action == "unknown":
                    print(f"⚠ 无法解析行动,停止")
                    state.error = "无法解析行动"
                    break

                # 4. 执行行动
                observation = self._execute_action(action, params)

                state.actions.append({"action": action, "params": params})
                state.observations.append(observation)
                print(f"行动: {action}({params})")
                print(f"观察: {observation}")

        except Exception as e:
            state.error = str(e)
            print(f"✗ 错误: {e}")

        return state

    def _think(self, state: AgentState) -> str:
        """生成思考"""
        tools_info = self.tool_registry.list_tools()
        history = self._format_history(state)

        prompt = f"""你是一个智能助手,可以思考和使用工具解决问题。

可用工具:
{tools_info}

历史对话:
{history}

请分析当前情况并给出下一步行动。
思考格式:
- 如果需要查询信息:搜索(query)
- 如果需要计算:计算(expression)
- 如果可以给出最终答案:最终答案(your answer)

重要:
1. 只输出一行,包含行动或最终答案
2. 最终答案要包含完整的信息
3. 不要输出无关内容

当前用户问题: {state.query}

请给出你的思考:"""

        return self.llm(prompt).strip()

    def _should_stop(self, thought: str) -> bool:
        """判断是否应该停止"""
        return (
            "最终答案" in thought or
            "Final Answer" in thought.upper() or
            "answer:" in thought.lower()
        )

    def _extract_final_answer(self, thought: str) -> str:
        """提取最终答案"""
        # 尝试多种格式
        patterns = [
            r"最终答案[::](.+?)(?:$|\n)",  # 中文冒号
            r"Final Answer[::](.+?)(?:$|\n)",  # 英文
            r"answer[::](.+?)(?:$|\n)",  # 简短
        ]

        for pattern in patterns:
            match = re.search(pattern, thought, re.IGNORECASE)
            if match:
                return match.group(1).strip()

        # 如果都没匹配,返回整个 thought
        return thought

    def _parse_action(self, thought: str) -> tuple:
        """解析行动"""
        # 尝试匹配 搜索(...)
        search_match = re.search(r"搜索[((](.+?)[))]", thought)
        if search_match:
            return "search", {"query": search_match.group(1).strip()}

        # 尝试匹配 search(...)
        search_match = re.search(r"search[((](.+?)[))]", thought)
        if search_match:
            return "search", {"query": search_match.group(1).strip()}

        # 尝试匹配 计算(...)
        calc_match = re.search(r"计算[((](.+?)[))]", thought)
        if calc_match:
            return "calculate", {"expression": calc_match.group(1).strip()}

        # 尝试匹配 calculate(...)
        calc_match = re.search(r"calculate[((](.+?)[))]", thought)
        if calc_match:
            return "calculate", {"expression": calc_match.group(1).strip()}

        return "unknown", {}

    def _execute_action(self, action: str, params: Dict) -> str:
        """执行行动"""
        tool = self.tool_registry.get(action)
        if tool is None:
            return f"未知工具: {action}"

        result = tool.execute(**params)

        if result.get("success"):
            return result["result"]
        else:
            return f"工具执行失败: {result.get('error')}"

    def _format_history(self, state: AgentState) -> List[str]:
        """格式化历史"""
        lines = []
        for i, (thought, obs) in enumerate(zip(state.thoughts, state.observations)):
            lines.append(f"步骤 {i + 1}:")
            lines.append(f"  思考: {thought}")
            lines.append(f"  观察: {obs}")
        return "\n".join(lines) if lines else "(无历史)"

# 使用示例
if __name__ == "__main__":
    # 创建 Agent
    agent = ReActAgent(llm=MockLLM(), max_iterations=10)

    # 执行查询
    print("🤖 ReAct Agent 演示")
    print("=" * 50)

    # 测试用例
    test_queries = [
        "Python 并发编程是什么?",
        "Python 最新版本是什么?"
    ]

    for query in test_queries:
        print(f"\n\n{'#' * 50}")
        print(f"# 用户问题: {query}")
        print(f"{'#' * 50}")

        state = agent.run(query)

        print(f"\n{'=' * 50}")
        print("=== 执行摘要 ===")
        print(f"迭代次数: {state.current_iteration}")
        print(f"思考步骤: {len(state.thoughts)}")
        print(f"行动次数: {len(state.actions)}")
        print(f"\n最终答案:")
        print(state.final_answer or state.error)

2.3 Plan & Execute Agent

# agent/plan_execute_agent.py
"""
Plan & Execute Agent 实现
包含:任务规划、DAG 执行、并行执行
"""

from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
import networkx as nx

@dataclass
class Task:
    """任务定义"""
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)
    status: str = "pending"  # pending/running/completed/failed
    result: Optional[str] = None
    error: Optional[str] = None

@dataclass
class Plan:
    """执行计划"""
    tasks: List[Task]
    reasoning: str
    created_at: datetime = field(default_factory=datetime.now)

@dataclass
class ExecutionResult:
    """执行结果"""
    plan: Plan
    executed_tasks: List[Task]
    final_result: str
    execution_time: float

class Planner:
    """规划器:生成任务计划"""

    def __init__(self, llm: Callable[[str], str]):
        self.llm = llm

    def plan(self, query: str) -> Plan:
        """生成计划"""
        prompt = f"""分析以下任务,生成执行计划。

任务: {query}

请以以下 JSON 格式输出:
{{
    "reasoning": "你的规划思路",
    "tasks": [
        {{
            "id": "task-1",
            "description": "任务描述",
            "dependencies": []
        }},
        {{
            "id": "task-2",
            "description": "任务描述",
            "dependencies": ["task-1"]
        }}
    ]
}}

要求:
1. 将复杂任务拆解为可执行的子任务
2. 明确任务之间的依赖关系
3. 依赖关系用 task id 表示
4. 确保没有循环依赖
"""

        response = self.llm(prompt)
        return self._parse_plan(response)

    def _parse_plan(self, response: str) -> Plan:
        """解析计划"""
        try:
            import json
            data = json.loads(response)

            tasks = []
            for t in data.get("tasks", []):
                tasks.append(Task(
                    id=t["id"],
                    description=t["description"],
                    dependencies=t.get("dependencies", [])
                ))

            return Plan(
                tasks=tasks,
                reasoning=data.get("reasoning", "")
            )

        except Exception as e:
            # 解析失败,返回默认计划
            return Plan(
                tasks=[Task(id="default", description=response)],
                reasoning="无法解析,将整个响应作为单个任务"
            )

class Executor:
    """执行器:执行任务计划"""

    def __init__(self, tool_registry):
        self.tool_registry = tool_registry

    def execute_plan(
        self,
        plan: Plan,
        max_parallel: int = 3
    ) -> ExecutionResult:
        """执行计划"""
        start_time = datetime.now()

        # 1. 验证 DAG
        if not self._validate_dag(plan.tasks):
            raise ValueError("计划包含循环依赖")

        # 2. 拓扑排序
        execution_order = self._topological_sort(plan.tasks)

        # 3. 执行任务
        executed_tasks = []

        for batch in self._group_by_dependencies(execution_order, max_parallel):
            # 并行执行一批任务
            results = self._execute_batch(batch)
            executed_tasks.extend(results)

        # 4. 生成最终结果
        final_result = self._generate_final_result(plan, executed_tasks)

        execution_time = (datetime.now() - start_time).total_seconds()

        return ExecutionResult(
            plan=plan,
            executed_tasks=executed_tasks,
            final_result=final_result,
            execution_time=execution_time
        )

    def _validate_dag(self, tasks: List[Task]) -> bool:
        """验证是否有循环依赖"""
        # 构建 Graph
        graph = nx.DiGraph()

        for task in tasks:
            graph.add_node(task.id)
            for dep in task.dependencies:
                graph.add_edge(dep, task.id)

        # 检查是否有环
        return not nx.is_directed_cyclic_graph(graph)

    def _topological_sort(self, tasks: List[Task]) -> List[Task]:
        """拓扑排序"""
        # 构建 Graph
        graph = nx.DiGraph()
        task_map = {t.id: t for t in tasks}

        for task in tasks:
            graph.add_node(task.id)
            for dep in task.dependencies:
                graph.add_edge(dep, task.id)

        # 拓扑排序
        sorted_ids = list(nx.topological_sort(graph))

        return [task_map[task_id] for task_id in sorted_ids if task_id in task_map]

    def _group_by_dependencies(
        self,
        tasks: List[Task],
        max_parallel: int
    ) -> List[List[Task]]:
        """分组并行执行"""
        batches = []
        current_batch = []
        current_deps = set()

        for task in tasks:
            # 检查是否可以加入当前批次
            task_deps = set(task.dependencies)

            if not current_deps.intersection(task_deps):
                # 可以并行执行
                current_batch.append(task)
                current_deps.update(task_deps)

                if len(current_batch) >= max_parallel:
                    batches.append(current_batch)
                    current_batch = []
                    current_deps = set()
            else:
                # 必依赖前一批,开始新批次
                if current_batch:
                    batches.append(current_batch)
                current_batch = [task]
                current_deps = set(task_deps)

        if current_batch:
            batches.append(current_batch)

        return batches

    def _execute_batch(self, tasks: List[Task]) -> List[Task]:
        """并行执行一批任务"""
        import concurrent.futures

        executed = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=len(tasks)) as executor:
            futures = {
                executor.submit(self._execute_task, task): task
                for task in tasks
            }

            for future, task in concurrent.futures.as_completed(futures.items()):
                try:
                    result = future.result()
                    task.status = "completed"
                    task.result = result
                except Exception as e:
                    task.status = "failed"
                    task.error = str(e)

                executed.append(task)

        return executed

    def _execute_task(self, task: Task) -> str:
        """执行单个任务"""
        # 根据任务描述执行
        if "搜索" in task.description:
            tool = self.tool_registry.get("search")
            if tool:
                result = tool.execute(query=task.description.split("搜索")[-1].strip())
                return result.get("result", "")

        # 模拟执行
        return f"完成任务: {task.description}"

    def _generate_final_result(
        self,
        plan: Plan,
        executed_tasks: List[Task]
    ) -> str:
        """生成最终结果"""
        # 聚合所有任务结果
        results = []
        for task in executed_tasks:
            if task.status == "completed" and task.result:
                results.append(f"任务 {task.id}: {task.result}")

        return "\n".join(results) if results else "所有任务已完成"

# Plan & Execute Agent
class PlanExecuteAgent:
    def __init__(self, llm, tool_registry):
        self.planner = Planner(llm)
        self.executor = Executor(tool_registry)

    def run(self, query: str) -> ExecutionResult:
        """运行 Plan & Execute"""
        print(f"\n{'='* 50}")
        print("=== 阶段 1: 规划 ===")

        # 1. 规划
        plan = self.planner.plan(query)
        print(f"规划思路: {plan.reasoning}")
        print(f"任务数量: {len(plan.tasks)}")

        for i, task in enumerate(plan.tasks, 1):
            deps = f" 依赖: {', '.join(task.dependencies)}" if task.dependencies else ""
            print(f"  任务{i} [{task.id}]: {task.description}{deps}")

        print(f"\n{'='* 50}")
        print("=== 阶段 2: 执行 ===")

        # 2. 执行
        result = self.executor.execute_plan(plan)

        print(f"执行时间: {result.execution_time:.2f}秒")
        print(f"\n最终结果:\n{result.final_result}")

        return result

# 使用示例
if __name__ == "__main__":
    from agent.react_agent import ToolRegistry, Tool

    # 创建工具注册器
    tool_registry = ToolRegistry()
    tool_registry.register(Tool(
        name="search",
        description="搜索互联网",
        parameters={"query": {"type": "string"}},
        func=lambda query: {"result": f"搜索结果: {query}"}
    ))

    # 创建 Agent
    agent = PlanExecuteAgent(llm=MockLLM(), tool_registry=tool_registry)

    # 执行查询
    result = agent.run("帮我研究 Python 的最新并发特性并写一份总结")

2.4 Multi-Agent 设计

# agent/multi_agent.py
"""
Multi-Agent 系统
包含:Supervisor 模式、Agent 路由、状态管理
"""

from typing import Dict, List, Optional, Protocol
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import json
from concurrent.futures import ThreadPoolExecutor

@dataclass
class AgentMessage:
    """Agent 之间的消息"""
    sender: str
    receiver: str
    content: str
    message_type: str  # request/response/notification

@dataclass
class AgentContext:
    """共享上下文"""
    user_query: str
    messages: List[AgentMessage] = field(default_factory=list)
    shared_data: Dict = field(default_factory=dict)
    status: str = "running"

class BaseAgent(ABC):
    """Agent 基类"""

    def __init__(
        self,
        name: str,
        description: str,
        capabilities: List[str]
    ):
        self.name = name
        self.description = description
        self.capabilities = capabilities

    @abstractmethod
    def can_handle(self, query: str) -> float:
        """判断是否能处理该任务,返回置信度 [0, 1]"""
        ...

    @abstractmethod
    def process(self, context: AgentContext) -> str:
        """处理任务"""
        ...

class ResearcherAgent(BaseAgent):
    """研究 Agent:信息搜集"""

    def __init__(self):
        super().__init__(
            name="researcher",
            description="擅长信息搜集和分析",
            capabilities=["搜索", "信息分析"]
        )

    def can_handle(self, query: str) -> float:
        keywords = ["搜索", "查找", "研究", "分析", "信息"]
        if any(kw in query.lower() for kw in keywords):
            return 0.9
        return 0.1

    def process(self, context: AgentContext) -> str:
        print(f"[{self.name}] 处理任务: {context.user_query}")

        # 模拟搜索
        result = f"已搜索并分析: {context.user_query} 相关的信息"
        print(f"[{self.name}] 完成: {result}")

        return result

class WriterAgent(BaseAgent):
    """写作 Agent:文档生成"""

    def __init__(self):
        super().__init__(
            name="writer",
            description="擅长内容生成和总结",
            capabilities=["写作", "总结", "文档生成"]
        )

    def can_handle(self, query: str) -> float:
        keywords = ["写", "总结", "文档", "生成", "报告"]
        if any(kw in query.lower() for kw in keywords):
            return 0.9
        return 0.1

    def process(self, context: AgentContext) -> str:
        print(f"[{self.name}] 处理任务: {context.user_query}")

        # 获取其他 Agent 的结果
        research_results = [
            msg.content for msg in context.messages
            if msg.sender == "researcher"
        ]

        if research_results:
            content = "\n".join(research_results)
            result = f"基于研究结果,生成总结文档:\n{content}"
        else:
            result = f"生成文档: {context.user_query}"

        print(f"[{self.name}] 完成: {result}")

        return result

class CoderAgent(BaseAgent):
    """编程 Agent:代码生成"""

    def __init__(self):
        super().__init__(
            name="coder",
            description="擅长编程和代码分析",
            capabilities=["编程", "代码", "算法"]
        )

    def can_handle(self, query: str) -> float:
        keywords = ["代码", "编程", "算法", "函数", "类"]
        if any(kw in query.lower() for kw in keywords):
            return 0.9
        return 0.1

    def process(self, context: AgentContext) -> str:
        print(f"[{self.name}] 处理任务: {context.user_query}")
        result = f"生成代码: {context.user_query}"
        print(f"[{self.name}] 完成: {result}")
        return result

class SupervisorAgent:
    """Supervisor Agent:协调其他 Agent"""

    def __init__(self, agents: List[BaseAgent]):
        self.agents = agents
        self._routing_cache = {}

    def route(
        self,
        query: str,
        context: AgentContext
    ) -> List[BaseAgent]:
        """路由任务到合适的 Agent"""
        # 方案 1: 基于关键词规则路由
        selected = []
        for agent in self.agents:
            confidence = agent.can_handle(query)
            if confidence > 0.5:
                selected.append((agent, confidence))

        # 按置信度排序
        selected.sort(key=lambda x: x[1], reverse=True)

        # 返回前 N 个
        return [agent for agent, in selected]

    def execute(
        self,
        query: str,
        max_agents: int = 2
    ) -> AgentContext:
        """执行任务"""
        context = AgentContext(user_query=query)

        # 路由
        selected_agents = self.route(query, context)[:max_agents]

        print(f"\n{'='* 50}")
        print(f"[Supervisor] 路由到: {[a.name for a in selected_agents]}")

        # 并行执行
        with ThreadPoolExecutor(max_workers=len(selected_agents)) as executor:
            futures = {
                executor.submit(agent.process, context): agent
                for agent in selected_agents
            }

            for future, agent in futures.items():
                try:
                    result = future.result()

                    # 记录消息
                    message = AgentMessage(
                        sender=agent.name,
                        receiver="supervisor",
                        content=result,
                        message_type="response"
                    )
                    context.messages.append(message)

                except Exception as e:
                    print(f"[Supervisor] Agent {agent.name} 执行失败: {e}")

        return context

# 使用示例
if __name__ == "__main__":
    # 创建 Agents
    agents = [
        ResearcherAgent(),
        WriterAgent(),
        CoderAgent()
    ]

    # 创建 Supervisor
    supervisor = SupervisorAgent(agents)

    # 执行任务
    tasks = [
        "帮我研究 Python 的最新并发特性",
        "编写一个 Python 并发示例",
        "总结 Python 的并发编程最佳实践"
    ]

    for task in tasks:
        print(f"\n\n{'#' * 50}")
        print(f"# 任务: {task}")
        print(f"{'#' * 50}")

        context = supervisor.execute(task, max_agents=2)

        print(f"\n最终结果:")
        for msg in context.messages:
            print(f"[{msg.sender}]: {msg.content}")

常见坑与解决方案

3.1 ReAct 无限循环

问题:

思考: 需要搜索
行动: search(xxx)
观察: 无结果
思考: 需要搜索  # 循环!
行动: search(xxx)
...

原因:

  • 没有设置最大迭代次数
  • 停止条件不正确
  • LLM 陷入循环

解决方案:

class SafeReActAgent(ReActAgent):
    def run(self, query: str) -> AgentState:
        state = super().run(query)

        # 检查是否达到最大迭代次数
        if state.current_iteration >= state.max_iterations:
            state.error = f"超过最大迭代次数 {state.max_iterations}"

        return state

3.2 工具调用错误

问题:

  • LLM 调用不存在的工具
  • LLM 传递错误参数

解决方案:

class SafeToolRegistry(ToolRegistry):
    def call_safely(
        self,
        tool_name: str,
        **kwargs
    ) -> Dict:
        """安全调用工具"""
        # 1. 检查工具存在
        tool = self.get(tool_name)
        if tool is None:
            return {
                "success": False,
                "error": f"工具不存在: {tool_name}"
            }

        # 2. 验证参数
        validation = self._validate_params(tool, kwargs)
        if not validation["valid"]:
            return {
                "success": False,
                "error": f"参数验证失败: {validation['error']}"
            }

        # 3. 执行工具
        try:
            return tool.execute(**kwargs)
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def _validate_params(self, tool: Tool, params: Dict) -> Dict:
        """验证参数"""
        schema = tool.parameters
        required = schema.get("required", [])

        # 检查必需参数
        for req in required:
            if req not in params:
                return {
                    "valid": False,
                    "error": f"缺少必需参数: {req}"
                }

        return {"valid": True}

3.3 Memory 爆炸

问题:

# 对话不断增长,内存 OOM
memory.add(message)  # 每次都添加

解决方案:

class SafeMemory:
    def __init__(self, max_size: int = 100):
        self._messages = []
        self.max_size = max_size

    def add(self, message: Message):
        self._messages.append(message)

        # 滑动窗口:删除旧消息
        if len(self._messages) > self.max_size:
            self._messages.pop(0)

    def compress(self) -> None:
        """压缩旧消息"""
        if len(self._messages) > 50:
            # 前半部分用 LLM 压缩
            old_messages = self._messages[:50]
            summary = self._summarize(old_messages)
            self._messages = [summary] + self._messages[50:]

面试高频问法

Q1: ReAct 和 Plan & Execute 的区别是什么?什么时候用哪个?

标准回答:

ReAct(Reasoning + Acting)vs Plan & Execute

【核心区别】

维度           | ReAct                  | Plan & Execute
---------------|------------------------|---------------------------
决策时机        | 每步实时思考          | 一次性规划
可复用性        | 低(依赖上下文)       | 高(Plan 可存档)
执行追踪        | 难以追踪              | Plan 就是追踪
Token 消耗      | 高(每步都思考)       | 低(规划一次)
纠错能力        | 实时纠错              | 需要重新规划
适用场景        | 探索性、交互式任务    | 结构化、可复用任务

【工程选择决策】

场景                    | 推荐 | 原因
------------------------|-------|-----------------------
简单问答              | ReAct | 快速响应,无需规划
探索性任务            | ReAct | 需要实时调整策略
需要执行记录存档      | Plan  | Plan 可追溯、复用
复杂任务拆解          | Plan  | DAG 结构清晰
需要并行执行          | Plan  | Plan 可识别并行任务
多轮对话              | ReAct | 实时响应用户反馈
工作流任务            | Plan  | 依赖关系明确

【边界条件】

ReAct 的风险:
- 无限循环:需要设置最大迭代次数(建议 10-20)
- Token 消耗爆炸:长对话成本高
- 状态不一致:思考步骤可能偏离

Plan & Execute 的风险:
- 规划失败:初始规划错误无法执行
- 信息不完整:规划时需要所有信息
- 灵活性差:无法中途调整计划

【混合策略】

实际项目中可以混合使用:
1. 先用 Plan 规划大方向
2. 执行过程中用 ReAct 动态调整
3. Plan 失败时退回 ReAct 模式

【面试关键点】

1. 能清楚说出两者的优缺点
2. 能根据场景选择合适的模式
3. 知道各自的边界条件和风险
4. 有实际项目中的选择经验

记忆要点

ReAct 模式

口诀:
思考-行动-观察循环
最大次数防无限
停止条件要明确

Checklist:
□ 设置最大迭代次数(默认10)
□ 定义停止条件关键词
□ 工具调用失败降级
□ 超时控制
□ Token 消耗监控

Plan & Execute 模式

口诀:
先规划再执行
DAG 依赖要清晰
拓扑排序防循环

Checklist:
□ DAG 验证(无环)
□ 拓扑排序
□ 并行执行优化
□ 任务超时控制
□ 失败重试策略

Tool Calling

口诀:
工具要注册
参数要验证
权限要检查
调用要审计

Checklist:
□ 工具白名单
□ 参数 Schema 验证
□ 权限 RBAC 检查
□ 限流控制
□ 审计日志
□ 异常捕获

Memory

口诀:
短期用内存
长期用向量
混合检索最好

Checklist:
□ 最大窗口大小
□ 压缩策略
□ 去重机制
□ 缓存热点查询
□ 异步写入

最小 Demo(完整可运行)

见上文代码,包含:

  1. ReAct Agent 完整实现
  2. Plan & Execute Agent 实现
  3. Multi-Agent Supervisor 实现

实战场景

场景:企业智能问答系统

需求:
企业需要一个智能问答系统,回答:

  • 公司政策文档
  • 技术文档
  • 产品手册
  • 员工手册

要求:

  • 回答准确,提供来源
  • 支持多轮对话
  • 数据更新及时
  • 成本可控

架构设计:

┌─────────────────────────────────────────┐
│             API Gateway               │
├─────────────────────────────────────────┤
│         Agent Orchestrator             │
│  - 意图识别(RAG / 工具调用)        │
│  - Agent 路由                         │
│  - 对话管理(Memory)                 │
└──────────────┬──────────────────────────┘
               │
      ┌────────┴────────┬────────┐
      ▼                 ▼        ▼
┌─────────┐   ┌─────────┐ ┌─────────┐
│ RAG     │   │ Tool    │ │General  │
│ Agent   │   │ Agent   │ │ Agent   │
└────┬────┘   └────┬────┘ └────┬────┘
     │               │           │
     ▼               ▼           ▼
┌─────────┐   ┌─────────┐   ┌─────────┐
│ Vector  │   │ 企业    │   │  LLM    │
│  Store  │   │  工具    │   │  (API)  │
└─────────┘   └─────────┘   └─────────┘

数据流:

# 1. 用户提问
query = "公司的年假政策是什么?"

# 2. Orchestrator 路由
agent_type = orchestrator.detect_type(query)
# → "RAG"

# 3. RAG Agent 执行
context = rag_agent.retrieve(query)
answer = rag_agent.generate(query, context)

# 4. 返回结果
response = {
    "answer": answer,
    "sources": context.documents,
    "agent_type": "RAG"
}

文档版本: 1.0

close
arrow_upward