【AI Agent 知识库】AI Agent面试与实战-强化记忆版

内容纲要

AI Agent 开发:面试与实战强化记忆版

面试应答 + 记忆要点 + 实战场景


目录


模块一:Agent 基础范式

面试问答

Q1: ReAct 和 Plan & Execute 的区别是什么?什么时候用哪个?

标准回答:

ReAct(Reasoning + Acting)vs Plan & Execute

【核心区别】

维度           | ReAct                  | Plan & Execute
---------------|------------------------|---------------------------
决策时机        | 每步实时思考          | 一次性规划
可复用性        | 低(依赖上下文)       | 高(Plan 可存档)
执行追踪        | 难以追踪              | Plan 就是追踪
Token 消耗      | 高(每步都思考)       | 低(规划一次)
纠错能力        | 实时纠错              | 需要重新规划
适用场景        | 探索性、交互式任务    | 结构化、可复用任务

【工程选择决策】

场景                    | 推荐 | 原因
------------------------|-------|-----------------------
简单问答              | ReAct | 快速响应,无需规划
探索性任务            | ReAct | 需要实时调整策略
需要执行记录存档      | Plan  | Plan 可追溯、复用
复杂任务拆解          | Plan  | DAG 结构清晰
需要并行执行          | Plan  | Plan 可识别并行任务
多轮对话              | ReAct | 实时响应用户反馈

【边界条件】

ReAct 的风险:
- 无限循环:需要设置最大迭代次数
- Token 消耗爆炸:长对话成本高
- 状态不一致:思考步骤可能偏离

Plan & Execute 的风险:
- 规划失败:初始规划错误无法执行
- 信息不完整:规划时需要所有信息
- 灵活性差:无法中途调整计划

【实战建议】

1. 混合模式:先用 Plan 规划,执行时用 ReAct 动态调整
2. 超时控制:Plan 执行设置超时,超时后回退到 ReAct
3. 缓存 Plan:相同任务的 Plan 可复用
4. Plan 校验:规划后进行语法和依赖检查

记忆要点:

口诀:
ReAct 实时思考探索性
Plan 一次性规划可复用

选择决策:
简单交互 → ReAct
复杂可追踪 → Plan
需要存档 → Plan

边界:
ReAct 防循环、防 token 爆炸
Plan  防规划失败、防信息不全

Checklist:

# ReAct 检查清单
ReAct_Checklist = [
    "设置最大迭代次数(默认10)",
    "设置超时时间(默认30s)",
    "添加停止条件关键词",
    "Token 计数监控",
    "异常捕获和降级"
]

# Plan & Execute 检查清单
Plan_Checklist = [
    "规划前信息完整性检查",
    "Plan 依赖关系验证",
    "Plan 语法校验",
    "任务执行超时控制",
    "失败任务重试策略",
    "Plan 版本管理"
]

Q2: 如何设计一个可扩展的 Multi-Agent 系统?

标准回答:

Multi-Agent 系统设计原则

【1. 架构分层】

┌─────────────────────────────────────────────┐
│         Orchestrator(编排层)           │
│  - 任务分解                              │
│  - Agent 路由                            │
│  - 状态管理                              │
│  - 结果聚合                              │
└──────────────┬──────────────────────────┘
               │
        ┌──────┴──────┬──────┬──────┐
        ▼             ▼      ▼      ▼
    Researcher    Writer  Coder  Tester
    (研究 Agent)  (写作)  (编程)  (测试)
        │             │      │      │
        └──────┬──────┴──────┴──────┘
               │
               ▼
         Shared Memory
         (共享记忆)

【2. Agent 接口标准化】

class Agent(Protocol):
    name: str
    description: str
    capabilities: List[str]

    def execute(self, task: Task) -> Result:
        """执行任务,返回结构化结果"""
        ...

    def can_handle(self, task: Task) -> bool:
        """判断是否能处理该任务"""
        ...

# 任意 Agent 实现此接口即可接入系统

【3. 通信协议定义】

@dataclass
class AgentMessage:
    id: str
    sender: str           # 发送 Agent
    receiver: str         # 接收 Agent
    task: Task
    context: Dict[str, Any]
    timestamp: datetime

@dataclass
class AgentResponse:
    message_id: str
    status: str          # success/failed/partial
    result: Any
    error: Optional[str]
    metadata: Dict

【4. 路由机制】

class AgentRouter(Protocol):
    def route(self, task: Task) -> str:
        """路由任务到合适的 Agent"""
        ...

# 路由策略实现

class RuleBasedRouter(AgentRouter):
    def route(self, task: Task) -> str:
        """基于规则的路由"""
        if "search" in task.keywords:
            return "researcher"
        elif "write" in task.keywords:
            return "writer"
        elif "code" in task.keywords:
            return "coder"
        return "general"

class VectorRouter(AgentRouter):
    def __init__(self):
        self.embedder = EmbeddingModel()
        self.agent_embeddings = {
            "researcher": self.embedder("搜索、查询、研究"),
            "writer": self.embedder("写作、总结、文档"),
            "coder": self.embedder("编程、代码、开发")
        }

    def route(self, task: Task) -> str:
        """基于语义相似度路由"""
        task_embedding = self.embedder(task.description)
        similarities = {
            agent: cosine(task_embedding, emb)
            for agent, emb in self.agent_embeddings.items()
        }
        return max(similarities, key=similarities.get)

class LLMRouter(AgentRouter):
    def route(self, task: Task) -> str:
        """基于 LLM 的路由"""
        prompt = f"""
        任务描述:{task.description}

        可用 Agents:
        - researcher: 搜索和信息分析
        - writer: 文档生成和总结
        - coder: 编程任务

        请选择最合适的 Agent。
        """
        decision = llm.generate(prompt)
        return decision.strip()

【5. 状态管理】

class SharedState:
    def __init__(self):
        self._store = {}
        self._lock = Lock()

    def get(self, key: str) -> Any:
        with self._lock:
            return self._store.get(key)

    def set(self, key: str, value: Any):
        with self._lock:
            self._store[key] = value

    def publish(self, event: str, data: Any):
        """发布事件,通知所有 Agent"""
        ...

    def subscribe(self, agent: str, event: str):
        """Agent 订阅事件"""
        ...

【6. 超时与容错】

class AgentOrchestrator:
    def __init__(self, timeout: int = 30, max_retries: int = 3):
        self.timeout = timeout
        self.max_retries = max_retries

    def execute_task(self, task: Task) -> Result:
        agent = self.router.route(task)

        for attempt in range(self.max_retries):
            try:
                # 超时控制
                result = with_timeout(
                    agent.execute(task),
                    timeout=self.timeout
                )
                return result

            except TimeoutError:
                log.warning(f"Agent {agent} 超时,重试 {attempt + 1}")

            except Exception as e:
                log.error(f"Agent {agent} 执行失败: {e}")

        # 所有重试失败,降级处理
        return self.fallback(task)

【7. 可观测性】

class AgentMetrics:
    def __init__(self):
        self._start_times = {}
        self._counts = defaultdict(int)
        self._errors = defaultdict(int)

    def record_start(self, agent: str, task_id: str):
        self._start_times[(agent, task_id)] = time.time()

    def record_end(self, agent: str, task_id: str):
        duration = time.time() - self._start_times.pop((agent, task_id))
        self._counts[agent] += 1
        return duration

    def record_error(self, agent: str):
        self._errors[agent] += 1

    def get_stats(self, agent: str) -> Dict:
        return {
            "total_calls": self._counts[agent],
            "errors": self._errors[agent],
            "success_rate": (
                (self._counts[agent] - self._errors[agent]) /
                max(1, self._counts[agent])
            )
        }

记忆要点:

架构口诀:
上 Orchestrator(编排中枢)
下 Agents(专业分工)
中间 Shared Memory(状态共享)

设计原则:
1. Agent 接口标准化
2. 通信协议清晰
3. 路由策略可插拔
4. 状态管理集中
5. 超时容错完善
6. 可观测性充分

扩展方式:
新增 Agent = 实现接口 + 注册到路由表
新增路由 = 实现 Router + 配置

Q3: Memory 怎么设计才能兼顾性能和效果?

标准回答:

Memory 设计:性能 vs 效果权衡

【三层 Memory 架构】

┌─────────────────────────────────────────┐
│         应用层(Agent 使用)           │
│    agent.memory.get(query, k=5)        │
└──────────────┬────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│      Memory Manager(管理层)          │
│  - 合并检索结果                         │
│  - 去重                                 │
│  - 排序                                 │
│  - 缓存策略                             │
└──────┬────────┬────────────────────────┘
       │        │
       ▼        ▼
┌──────────┐  ┌────────────────┐
│  短期     │  │   长期         │
│  Memory   │  │   Memory       │
│ (内存)   │  │  (向量库)      │
│ FIFO/LRU │  │ 查询 + 聚类   │
└──────────┘  └────────────────┘

【1. 短期 Memory 设计】

class ShortTermMemory:
    def __init__(self, max_size: int = 100, strategy: str = "FIFO"):
        self.max_size = max_size
        self.strategy = strategy
        self._messages = []

    def add(self, message: Message):
        if self.strategy == "FIFO":
            self._messages.append(message)
            if len(self._messages) > self.max_size:
                self._messages.pop(0)  # 队首出

        elif self.strategy == "LRU":
            # 查询时更新访问时间
            if message in self._messages:
                self._messages.remove(message)
            self._messages.append(message)
            if len(self._messages) > self.max_size:
                self._messages.pop(0)

    def get_recent(self, n: int) -> List[Message]:
        return self._messages[-n:]

# 优化:
# - 最大窗口:限制 token 数量(如 4000 tokens)
# - 压缩策略:多条消息用 LLM 压缩成摘要
# - 去重:相同内容只保留一条

【2. 长期 Memory 设计】

class LongTermMemory:
    def __init__(
        self,
        vector_store: VectorStore,
        embedder: EmbeddingModel,
        chunk_size: int = 500,
        overlap: int = 50
    ):
        self.store = vector_store
        self.embedder = embedder
        self.chunk_size = chunk_size
        self.overlap = overlap

    def add_document(self, text: str, metadata: Dict):
        # 1. 切分文档(保留语义)
        chunks = self._chunk(text, self.chunk_size, self.overlap)

        # 2. 批量嵌入
        embeddings = self.embedder.embed_batch(chunks)

        # 3. 存储到向量库
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            self.store.add(
                id=f"{metadata['doc_id']}_{i}",
                vector=embedding,
                metadata={
                    "content": chunk,
                    **metadata,
                    "chunk_index": i
                }
            )

    def search(
        self,
        query: str,
        top_k: int = 5,
        filters: Dict = None
    ) -> List[Message]:
        # 1. 嵌入查询
        query_embedding = self.embedder.embed(query)

        # 2. 向量搜索
        results = self.store.search(
            query_vector=query_embedding,
            top_k=top_k * 2,  # 多检索一些
            filters=filters
        )

        # 3. 去重(同一文档的多个 chunk 只保留最好的)
        deduped = self._deduplicate_by_doc(results)

        # 4. 返回 top_k
        return deduped[:top_k]

    def _chunk(
        self,
        text: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """语义切分"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            chunks.append(chunk)
            start = end - overlap
        return chunks

# 优化:
# - 嵌入缓存:相同查询复用嵌入结果
# - 批量操作:减少 API 调用
# - 异步写入:不阻塞主流程
# - 索引优化:向量库建立索引加速搜索

【3. Memory Manager(合并策略)】

class MemoryManager:
    def __init__(self):
        self.short_term = ShortTermMemory(max_size=50)
        self.long_term = LongTermMemory(...)
        self._query_cache = LRUCache(maxsize=1000)

    def retrieve(
        self,
        query: str,
        k: int = 5,
        use_cache: bool = True
    ) -> List[Message]:
        # 1. 缓存命中
        cache_key = f"{query}:{k}"
        if use_cache and cache_key in self._query_cache:
            return self._query_cache[cache_key]

        # 2. 并行检索
        with ThreadPoolExecutor(max_workers=2) as executor:
            short_future = executor.submit(
                self.short_term.get_recent, min(3, k)
            )
            long_future = executor.submit(
                self.long_term.search, query, max(k, 5)
            )

            short_results = short_future.result()
            long_results = long_future.result()

        # 3. 合并去重
        all_results = self._merge_and_deduplicate(
            short_results,
            long_results
        )

        # 4. 排序(按相关性)
        sorted_results = self._rerank(query, all_results)

        # 5. 截断
        final_results = sorted_results[:k]

        # 6. 缓存
        if use_cache:
            self._query_cache[cache_key] = final_results

        return final_results

    def _merge_and_deduplicate(
        self,
        short: List[Message],
        long: List[Message]
    ) -> List[Message]:
        # 优先短期记忆
        seen_ids = {m.id for m in short}
        deduped = list(short)

        # 添加长期记忆中未重复的
        for m in long:
            if m.id not in seen_ids:
                deduped.append(m)
                seen_ids.add(m.id)

        return deduped

    def _rerank(
        self,
        query: str,
        messages: List[Message]
    ) -> List[Message]:
        # 使用 Cross-Encoder 重排
        pairs = [(query, m.content) for m in messages]
        scores = self.reranker.predict(pairs)

        for m, score in zip(messages, scores):
            m.score = score

        return sorted(messages, key=lambda m: m.score, reverse=True)

【性能优化清单】

优化项                | 短期 Memory    | 长期 Memory
----------------------|----------------|----------------
嵌入缓存              | N/A            | ✅ LRU 缓存
批量嵌入              | N/A            | ✅ 批量 API 调用
异步写入              | N/A            | ✅ 后台队列
索引优化              | N/A            | ✅ HNSW 索引
查询压缩              | ✅ LLM 压缩   | N/A
去重策略              | ✅ 内容哈希   | ✅ 文档级去重
并发检索              | N/A            | ✅ 混合检索

记忆要点:

Memory 三层:
1. 短期:内存、FIFO/LRU、快速
2. 长期:向量库、语义检索、持久
3. 管理:合并、去重、排序、缓存

优化口诀:
短期用内存,长期用向量
查询先缓存,写入要异步
批量做嵌入,混合去重排

Q4: Tool Calling 的安全风险有哪些?如何防御?

标准回答:

Tool Calling 安全风险与防御

【1. 风险清单】

风险类型            | 危害程度 | 场景
--------------------|----------|----------------------
注入攻击             | 高       | 恶意 Prompt 修改工具调用
越权调用             | 高       | Agent 调用无权限工具
参数注入             | 中       | 参数包含 SQL/命令注入
无限循环             |   中   | 工具互相调用
敏感信息泄露         | 高       | 工具返回包含 PII
资源耗尽             | 中       | 高频调用消耗配额

【2. 防御方案】

方案 ①:工具白名单

class SecureToolManager:
    def __init__(self, allowed_tools: Set[str]):
        self._allowed_tools = allowed_tools
        self._dangerous_patterns = [
            "delete", "drop", "truncate", "exec", "eval"
        ]

    def call(self, tool_name: str, **kwargs) -> Any:
        # 检查白名单
        if tool_name not in self._allowed_tools:
            raise SecurityError(f"工具未授权: {tool_name}")

        # 检查危险关键词
        if any(p in tool_name.lower() for p in self._dangerous_patterns):
            self._log_security_event(tool_name, kwargs)
            raise SecurityError(f"危险工具: {tool_name}")

        return self._execute_tool(tool_name, kwargs)

方案 ②:参数校验

from pydantic import BaseModel, Field, validator

class SearchInput(BaseModel):
    query: str = Field(..., min_length=1, max_length=1000)

    @validator('query')
    def validate_no_sql(cls, v):
        sql_keywords = ['select', 'insert', 'update', 'delete', 'drop']
        if any(kw in v.lower() for kw in sql_keywords):
            raise ValueError("查询包含 SQL 关键词")
        return v

class WeatherInput(BaseModel):
    location: str = Field(..., pattern=r'^[a-zA-Z\s]+$')
    unit: str = Field(default="celsius", regex="^(celsius|fahrenheit)$")

    @validator('location')
    def validate_location_length(cls, v):
        if len(v) > 100:
            raise ValueError("地点名称过长")
        return v

方案 ③:权限控制

class ToolPermissionManager:
    def __init__(self):
        self._tool_permissions = {
            "get_weather": {Permission.READ},
            "get_user_info": {Permission.READ},
            "update_user": {Permission.WRITE},
            "delete_user": {Permission.ADMIN}
        }

    def check_permission(
        self,
        tool_name: str,
        user_permissions: Set[Permission]
    ) -> bool:
        required = self._tool_permissions.get(tool_name, set())
        return required.issubset(user_permissions)

方案 ④:限流

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client

    def check_rate_limit(
        self,
        tool_name: str,
        user_id: str,
        limit: int = 10,
        window: int = 60
    ) -> bool:
        key = f"rate:{tool_name}:{user_id}"
        count = self.redis.incr(key)

        if count == 1:
            self.redis.expire(key, window)

        return count <= limit

方案 ⑤:审计日志

class AuditLogger:
    def log_tool_call(
        self,
        tool_name: str,
        user_id: str,
        parameters: Dict,
        result: Any,
        status: str
    ):
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "tool_name": tool_name,
            "user_id": user_id,
            "parameters": self._sanitize_params(parameters),
            "result_status": status,
            "success": status == "success"
        }
        self._write_log(log_entry)

    def _sanitize_params(self, params: Dict) -> Dict:
        """脱敏处理"""
        sanitized = params.copy()
        for key in ['password', 'token', 'secret', 'api_key']:
            if key in sanitized:
                sanitized[key] = "***REDACTED***"
        return sanitized

方案 ⑥:输出过滤

class OutputFilter:
    def __init__(self):
        self._pii_patterns = [
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            r'\b\d{3}-\d{4}-\d{4}\b',  # 手机号
            r'\b\d{18}\b'  # 身份证
        ]

    def filter(self, text: str) -> str:
        for pattern in self._pii_patterns:
            text = re.sub(pattern, '[已脱敏]', text)
        return text

【3. 综合防御实现】

class SecureAgentToolManager:
    def __init__(
        self,
        allowed_tools: Set[str],
        audit_logger: AuditLogger
    ):
        self._tool_registry = ToolRegistry()
        self._allowed_tools = allowed_tools
        self._permission_manager = ToolPermissionManager()
        self._rate_limiter = RateLimiter(redis)
        self._audit_logger = audit_logger
        self._output_filter = OutputFilter()

    def call_tool(
        self,
        tool_name: str,
        user_id: str,
        user_permissions: Set[Permission],
        **kwargs
    ) -> str:
        # 1. 检查白名单
        if tool_name not in self._allowed_tools:
            raise SecurityError(f"工具未授权: {tool_name}")

       udi.logger = AuditLogger(redis_client)

# 审计日志
audit_logger.log_tool_call(
    tool_name="get_user",
    user_id="user123",
    parameters={"id": 456},
    result="用户信息",
    status="success"
)

# 输出过滤
output_filter = OutputFilter()
filtered = output_filter.filter("联系邮箱 test@example.com")
# 输出: "联系邮箱 [已脱敏]"

记忆要点:

安全六道防线:
1. 白名单:只允许注册的工具
2. 参数校验:Pydantic 验证 + 正则
3. 权限控制:RBAC 检查
4. 限流:防止滥用
5. 审计:记录所有调用
6. 输出过滤:PII 脱敏

口诀:
白名单要控制
参数要校验
权限要检查
调用要限流
操作要审计
输出要脱敏

Q5: Agent 的 Reflection 机制如何实现?有什么应用场景?

标准回答:

Agent Reflection 机制

【1. Reflection 原理】

Reflection = Agent 自我评估 + 自我改进

┌─────────────────────────────────────────┐
│         Reflection 循环               │
│                                         │
│   执行任务 → 评估结果 → 判断满意度   │
│       ↓         ↓         ↓           │
│   满意?←是→结束  ←否→改进策略      │
│       ↑                   ↑           │
│       └────重新执行──────┘           │
└─────────────────────────────────────────┘

【2. 核心实现】

from typing import Tuple, List
from dataclasses import dataclass

@dataclass
class ReflectionResult:
    is_satisfactory: bool
    confidence: float
    feedback: str
    suggestions: List[str]

class ReflectiveAgent:
    def __init__(
        self,
        llm,
        max_reflections: int = 3,
        satisfaction_threshold: float = 0.8
    ):
        self.llm = llm
        self.max_reflections = max_reflections
        self.threshold = satisfaction_threshold

    def execute_with_reflection(
        self,
        task: str,
        execute_func: Callable
    ) -> Tuple[Any, List[Dict]]:
        """带反思的执行"""
        history = []

        for attempt in range(self.max_reflections + 1):
            # 执行任务
            result = execute_func(task)

            # 第一次直接返回
            if attempt == 0:
                history.append({
                    "attempt": attempt,
                    "result": result,
                    "reflection": None
                })
                return result, history

            # 反思评估
            reflection = self._reflect(task, result, history)
            history.append({
                "attempt": attempt,
                "result": result,
                "reflection": reflection
            })

            # 判断是否满意
            if reflection.is_satisfactory:
                return result, history

            # 改进任务描述
            task = self._improve_task(task, reflection.feedback)

        # 达到最大次数,返回最后一次
        return result, history

    def _reflect(
        self,
        original_task: str,
        result: str,
        history: List[Dict]
    ) -> ReflectionResult:
        """反思评估"""
        # 构建反思 Prompt
        prompt = self._build_reflection_prompt(
            original_task, result, history
        )

        response = self.llm.generate(prompt)
        return self._parse_reflection(response)

    def _build_reflection_prompt(
        self,
        task: str,
        result: str,
        history: List[Dict]
    ) -> str:
        """构建反思 Prompt"""
        attempts_summary = "\n".join([
            f"尝试 {h['attempt'] + 1}: {h['result'][:200]}..."
            for h in history
        ])

        return f"""请评估以下任务执行结果:

原始任务:{task}

执行历史:
{attempts_summary}

当前结果:
{result}

请从以下角度评估:
1. 结果是否完整回答了问题?
2. 结果是否准确可信?
3. 是否需要更多信息?
4. 有哪些可以改进的地方?

请以以下 JSON 格式回答:
{{
    "is_satisfactory": true/false,
    "confidence": 0.0-1.0,
    "feedback": "评估反馈",
    "suggestions": ["改进建议1", "改进建议2"]
}}"""

    def _improve_task(self, task: str, feedback: str) -> str:
        """根据反馈改进任务"""
        prompt = f"""基于以下反馈,改进任务描述:

原始任务:{task}

反馈:{feedback}

请生成改进后的任务描述,更明确地指导执行。"""

        improved_task = self.llm.generate(prompt)
        return improved_task.strip()

    def _parse_reflection(self, response: str) -> ReflectionResult:
        """解析反思结果"""
        try:
            import json
            data = json.loads(response)
            return ReflectionResult(**data)
        except:
            # 解析失败,返回默认反思
            return ReflectionResult(
                is_satisfactory=False,
                confidence=0.5,
                feedback="无法解析评估结果",
                suggestions=[]
            )

【3. 应用场景】

场景①:代码生成

def generate_code_with_reflection(prompt: str):
    agent = ReflectiveAgent(llm=code_llm, max_reflections=2)

    def generate(p: str):
        return code_llm.generate(f"生成代码:{p}")

    result, history = agent.execute_with_reflection(prompt, generate)

    for h in history:
        if h['reflection']:
            print(f"反思 {h['attempt']}: {h['reflection'].feedback}")

    return result

场景②:RAG 检索优化

class RAGWithReflection:
    def __init__(__(self, rag_engine):
        self.rag = rag_engine
        self.agent = ReflectiveAgent(llm=eval_llm)

    def query(self, question: str) -> str:
        def retrieve_and_answer(q: str):
            return self.rag.query(q)

        result, history = self.agent.execute_with_reflection(
            question,
            retrieve_and_answer
        )

        return result

场景③:搜索策略优化

class SearchReflector:
    def search_with_reflection(
        self,
        query: str,
        max_attempts: int = 3
    ):
        for attempt in range(max_attempts):
            # 执行搜索
            results = search_engine.search(query)

            # 反思
            reflection = self._reflect_search_quality(query, results)

            if reflection.is_satisfactory:
                return results

            # 改进查询
            query = self._improve_query(query, reflection.feedback)

        return results

【4. Reflection 成本控制】

优化项                | 策略
----------------------|---------------------------
最大反思次数          | 默认 2-3 次,避免无限循环
缓存反思结果          | 相似任务复用反思
异步反思              | 非关键场景异步评估
阈值控制              | 设置满意度阈值

记忆要点:

Reflection 三步:
1. 执行任务
2. 评估结果
3. 改进重试

应用场景:
- 代码生成:改进代码质量
- RAG 优化:调整检索策略
- 搜索优化:改进查询语句

成本控制:
最大次数:2-3 次
满意度阈值:0.8
缓存复用:相似任务

实战场景:企业智能问答系统

需求:

企业需要一个 AI 问答系统,用户可以问各种业务问题:

  • 公司政策文档
  • 技术文档
  • 产品手册
  • 员工手册

要求:

  • 回答准确,提供来源
  • 支持多轮对话
  • 数据更新及时
  • 成本可控

架构设计:

┌─────────────────────────────────────────┐
│            用户层                       │
│  Web / Mobile / Internal App          │
└──────────────┬────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│           API Gateway                 │
│  - 认证授权                            │
│  - 限流管理                            │
│  - 请求路由                            │
└──────────────┬────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│         Agent Orchestrator            │
│  - 意图识别                            │
│  - Agent 路由                          │
│  - 对话管理                            │
│  - 状态同步                            │
└──────┬────────────────────────────────┘
       │
   ┌───┴──────────────────┬────────────┐
   ▼                      ▼            ▼
┌─────────┐          ┌─────────┐   ┌──────────────┐
│  RAG    │          │ General  │   │  Knowledge   │
│  Agent  │          │ Agent   │   │  Graph Agent │
└────┬────┘          └────┬────┘   └──────┬───────┘
     │                     │                │
     ▼                     ▼                ▼
┌─────────┐          ┌─────────┐     ┌──────────┐
│ Vector  │          │  LLM    │     │  Neo4j   │
│  Store  │          │ (API)   │     │  (图谱)  │
└─────────┘          └─────────┘     └──────────┘

数据流:

# 1. 用户提问
user_query = "公司年假政策是怎样的?"

# 2. Orchestrator �由到 RAG Agent
agent = orchestrator.route(user_query)

# 3. RAG Agent 检索相关文档
retrieved_docs = rag_agent.retrieve(user_query, top_k=5)

# 4. 构建 Prompt
context = "\n\n".join([doc.content for doc in retrieved_docs])
prompt = f"""基于以下信息回答问题:

{context}

问题:{user_query}

请提供准确的答案,并标注信息来源。"""

# 5. LLM 生成答案
answer = llm.generate(prompt)

# 6. 返回结构化响应
response = {
    "answer": answer,
    "sources": retrieved_docs,
    "confidence": calculate_confidence(retrieved_docs)
}

接口设计:

# api/routes.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="企业智能问答系统")

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=500)
    conversation_id: Optional[str] = None
    user_id: str

class QueryResponse(BaseModel):
    answer: str
    sources: List[dict]
    conversation_id: str
    confidence: float
    tokens_used: int

@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    # 1. 检查限流
    if not rate_limiter.check(request.user_id):
        raise HTTPException(429, "请求过于频繁")

    # 2. 获取或创建会话
    conversation_id = request.conversation_id or generate_id()
    conversation = conversation_manager.get(conversation_id)

    # 3. 执行查询
    try:
        response = orchestrator.query(
            question=request.question,
            conversation=conversation,
            user_id=request.user_id
        )

        # 4. 更新对话历史
        conversation.add_message(
            role="user",
            content=request.question
        )
        conversation.add_message(
            role="assistant",
            content=response.answer
        )

        return QueryResponse(
            answer=response.answer,
            sources=response.sources,
            conversation_id=conversation_id,
            confidence=response.confidence,
            tokens_used=response.tokens_used
        )

    except Exception as e:
        # 降级处理
        log.error(f"查询失败: {e}")
        return fallback_query(request.question)

评估:

# evaluation/qa_eval.py
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevance,
    context_precision
)

def build_golden_set():
    """构建评估集"""
    return [
        {
            "question": "公司年假政策是怎样的?",
            "ground_truth": "员工每年享有5天年假,满1年增加1天,最多15天",
            "contexts": ["HR文档/年假政策.txt"],
            "metadata": {"category": "HR", "difficulty": "easy"}
        },
        # ... 更多测试用例
    ]

def evaluate_qa_system():
    """评估问答系统"""
    # 1. 加载 Golden Set
    golden_set = build_golden_set()

    # 2. 运行测试
    results = []
    for test_case in golden_set:
        response = orchestrator.query(test_case["question"])

        results.append({
            "question": test_case["question"],
            "ground_truth": test_case["ground_truth"],
            "answer": response.answer,
            "contexts": [doc.content for doc in response.sources]
        })

    # 3. 计算 RAGAS 指标
    metrics = evaluate(
        dataset=results,
        metrics=[
            faithfulness,
            answer_relevance,
            context_precision
        ]
    )

    return metrics

# 离线评估
metrics = evaluate_qa_system()
print(f"Faithfulness: {metrics['faithfulness']:.3f}")
print(f"Answer Relevance: {metrics['answer_relevance']:.3f}")

上线:

# 1. 灰度发布(5% 流量)
kubectl patch deployment qa-agent \
  -p '{"spec":{"template":{"spec":{"containers":[{"name":"qa-agent","env":[{"name":"TRAFFIC_RATIO","value":"0.05"}]}}]}}}'

# 2. 监控指标
watch kubectl get pods -l app=qa-agent

# 3. 检查错误日志
kubectl logs -l app=qa-agent --tail=100 | grep ERROR

# 4. 观察用户反馈
# 查看 analytics dashboard

# 5. 逐步放量
# 10% -> 25% -> 50% -> 100%

监控:

# monitoring/prometheus.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: qa-agent-metrics
data:
  metrics.yaml: |
    # QPS
    metric: qa_agent_requests_total
    labels: [agent_type, user_type]

    # 延迟
    metric: qa_agent_request_duration_seconds
    labels: [agent_type, operation]

    # 错误率
    metric: qa_agent_errors_total
    labels: [agent_type, error_type]

    # Token 使用
    metric: qa_agent_tokens_used_total
    labels: [model_name]

    # 缓存命中率
    metric: qa_agent_cache_hit_rate

    # RAG 指标
    metric: qa_agent_rag_retrieved_docs
    metric: qa_agent_rag_context_precision

回滚:

# 检测到问题时立即回滚
kubectl rollout undo deployment/qa-agent

# 或使用 GitOps
git revert HEAD
git push origin main
# ArgoCD 自动回滚

模块二:RAG 与检索优化

面试问答

Q6: 如何提升 RAG 的召回率?

标准回答:

RAG 召回率提升策略

【召回率低的原因分析】

原因                    | 诊断方法              | 解决方案
------------------------|----------------------|----------------------
查询和文档语义不匹配  | 分析检索结果          | 查询扩展
嵌入模型效果差        | 对比不同模型          | 换更好的模型
切分粒度不当          | 查看检索到的 chunk  | 调整切分策略
元数据过滤过严        | 检查过滤条件          | 放宽过滤
索引未更新            | 检查索引时间戳        | 实时索引
查询表述不清          | 分析失败查询          | 查询重写

【1. 查询扩展】

class QueryExpander:
    def __init__(self, llm, embeddings):
        self.llm = llm
        self.embeddings = embeddings

    def expand(self, query: str, n: int = 3) -> List[str]:
        """生成相关查询"""
        prompt = f"""为以下问题生成 {n} 个相关的查询变体:

原问题:{query}

要求:
1. 与原问题语义相同
2. 使用不同的表述方式
3. 包含相关的同义词或关联词

请只输出查询,每行一个:"""

        related_queries = self.llm.generate(prompt).strip().split('\n')
        return [query] + related_queries[:n]

    def expand_with_synonyms(self, query: str) -> List[str]:
        """同义词扩展"""
        # 使用同义词库
        synonyms = self._get_synonyms(query)
        expanded = [query]
        for synonym in synonyms:
            expanded.append(query.replace(synonym.original, synonym.word))
        return list(set(expanded))

# 使用
expander = QueryExpander(llm, embeddings)
queries = expander.expand("Python 并发编程", n=3)
" -> [
    "Python 并发编程",
    "Python 多线程编程",
    "Python 异步编程",
    "Python concurrency"
]

【2. 混合检索】

class HybridRetriever:
    def __init__(
        self,
        vector_store,
        keyword_store,
        alpha: float = 0.7
    ):
        self.vector_store = vector_store
        self.keyword_store = keyword_store
        self.alpha = alpha  # 向量权重

    def retrieve(
        self,
        query: str,
        top_k: int = 10
    ) -> List[RetrievedResult]:
        """混合检索"""
        # 1. 向量检索
        vector_results = self.vector_store.search(query, top_k=top_k * 2)
        # 归一化分数
        for r in vector_results:
            r.score = self._normalize_vector_score(r.score)

        # 2. 关键词检索
        keyword_results = self.keyword_store.search(query, top_k=top_k * 2)
        for r in keyword_results:
            r.score = self._normalize_keyword_score(r.score)

        # 3. 合并打分
        merged = self._merge_and_rerank(
            vector_results,
            keyword_results,
            alpha=self.alpha
        )

        # 4. 返回 top_k
        return merged[:top_k]

    def _merge_and_rerank(
        self,
        vector_results: List[Result],
        keyword_results: List[Result],
        alpha: float
    ) -> List[Result]:
        """合并并重新打分"""
        # 创建文档 -> 分数映射
        doc_scores = {}

        # 向量分数
        for r in vector_results:
            if r.doc_id not in doc_scores:
                doc_scores[r.doc_id] = {"vector": 0, "keyword": 0}
            doc_scores[r.doc_id]["vector"] = r.score

        # 关键词分数
        for r in keyword_results:
            if r.doc_id not in doc_scores:
                doc_scores[r.doc_id] = {"vector": 0, "keyword": 0}
            doc_scores[r.doc_id]["keyword"] = r.score

        # 混合打分
        merged = []
        for doc_id, scores in doc_scores.items():
            hybrid_score = (
                alpha * scores["vector"] +
                (1 - alpha) * scores["keyword"]
            )
            merged.append(Result(
                doc_id=doc_id,
                score=hybrid_score
            ))

        # 按混合分数排序
        return sorted(merged, key=lambda x: x.score, reverse=True)

【3. 嵌入模型调优】

# 对比不同嵌入模型
embedding_models = {
    "bge-small": "BAAI/bge-small-zh-v1.5",
    "bge-large": "BAAI/bge-large-zh-v1.5",
    "jina-small": "jina-embeddings-v2-small-zh",
    "m3e": "moka-ai/m3e-base"
}

def evaluate_embeddings(models: Dict, test_set: List):
    """评估不同嵌入模型"""
    results = {}

    for name, model_name in models.items():
        embedder = EmbeddingModel(model_name)
        vector_store = VectorStore(embedder=embedder)

        # 测试
        metrics = evaluate_rag(vector_store, test_set)

        results[name] = metrics

    return results

# 领域微调
def fine_tune_embedder(
    base_model: str,
    domain_data: List[Pair]
):
    """领域数据微调嵌入模型"""
    from sentence_transformers import SentenceTransformer, InputExample

    model = SentenceTransformer(base_model)

    # 准备训练数据
    train_examples = [
        InputExample(texts=[query, doc], label=1.0)
        for query, doc in domain_data
    ]

    # 训练
    train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
    train_loss = losses.CosineSimilarityLoss(model=model)

    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=5,
        warmup_steps=100
    )

    return model

【4. Chunking 策略优化】

class AdaptiveChunker:
    def __init__(self, embedder):
        self.embedder = embedder

    def chunk(self, text: str, max_tokens: int = 500) -> List[str]:
        """自适应切分"""
        # 1. 检测文档类型
        doc_type = self._detect_document_type(text)

        # 2. 根据类型选择策略
        if doc_type == "markdown":
            return self._chunk_markdown(text, max_tokens)
        elif doc_type == "code":
            return self._chunk_code(text, max_tokens)
        else:
            return self._chunk_semantic(text, max_tokens)

    def _chunk_semantic(
        self,
        text: str,
        max_tokens: int
    ) -> List[str]:
        """语义切分:在句子边界切分"""
        sentences = self._split_sentences(text)

        chunks = []
        current_chunk = []
        current_tokens = 0

        for sentence in sentences:
            sentence_tokens = self._estimate_tokens(sentence)

            if current_tokens + sentence_tokens <= max_tokens:
                current_chunk.append(sentence)
                current_tokens += sentence_tokens
            else:
                if current_chunk:
                    chunks.append("".join(current_chunk))
                current_chunk = [sentence]
                current_tokens = sentence_tokens

        if current_chunk:
            chunks.append("".join(current_chunk))

        return chunks

    def _detect_document_type(self, text: str) -> str:
        """检测文档类型"""
        if text.startswith("# ") or "## " in text[:500]:
            return "markdown"
        elif text.strip().startswith(("def ", "class ", "import ")):
            return "code"
        else:
            return "plain"

【5. 多路召回 + 重排】

class MultiPathRetriever:
    def __init__(self, retrievers: List[Retriever], reranker):
        self.retrievers = retrievers
        self.reranker = reranker

    def retrieve(self, query: str, top_k: int = 10) -> List[Result]:
        """多路召回"""
        all_results = []

        retriever.execute(1)

        # 并行召回
        with ThreadPoolExecutor(max_workers=len(self.retrievers)) as executor:
            futures = [
                executor.submit(r.retrieve, query, top_k * 2)
                for r in self.retrievers
            ]

            for future in futures:
                all_results.extend(future.result())

        retriever.end()

        # 去重
        deduped = self._deduplicate(all_results)

        # 重排
        reranked = self.reranker.rerank(query, deduped)

        return reranked[:top_k]

【召回率提升 Checklist】

提升手段              | 优先级 | 预期效果
----------------------|--------|----------
查询扩展              | 高     | +15-30%
混合检索              | 高     | +20-40%
更好的嵌入模型        | 高     | +10-25%
优化切分策略          | 中     | +10-20%
Re-ranking             | 中     | +5-15%
领域微调              | 低     | +5-10%

记忆要点:

召回提升口诀:
先扩展查询,再混合检索
嵌入选好的,切分要精细
多路并行召,重排精度提

Checklist:
- 查询扩展:同义词 + LLM 生成
- 混合检索:向量 + 关键词
- 嵌入模型:bge-large > bge-small
- 切分策略:语义 > 固定
- 重排:Cross-encoder 必需

Q7: RAG 的上下文窗口管理怎么做?

标准回答:

RAG 上下文窗口管理

【问题背景】

LLM 上下文窗口有限:
- GPT-3.5: 4K / 16K tokens
- GPT-4: 8K / 32K tokens
- Claude: 100K / 200K tokens

检索到的文档可能远超限制

【1. 截断策略】

class TruncationStrategy:
    def truncate_by_score(
        self,
        results: List[RetrievedResult],
        max_tokens: int
    ) -> List[RetrievedResult]:
        """按分数截断:保留高分文档"""
        selected = []
        total_tokens = 0

        for result in sorted(results, key=lambda r: r.score, reverse=True):
            tokens = self._estimate_tokens(result.content)
            if total_tokens + tokens <= max_tokens:
                selected.append(result)
                total_tokens += tokens
            else:
                break

        return selected

    def truncate_by_recency(
        self,
        results: List[RetrievedResult],
        max_tokens: int,
        recency_bonus: float = 0.1
    ) -> List[RetrievedResult]:
        """按时效性截断:新文档优先"""
        now = datetime.now()

        def recency_score(result: RetrievedResult) -> float:
            age_days = (now - result.timestamp).days
            return result.score * (1 - min(age_days * recency_bonus, 0.5))

        scored = sorted(results, key=recency_score, reverse=True)
        return self.truncate_by_score(scored, max_tokens)

    def truncate_with_overlap(
        self,
        results: List[RetrievedResult],
        max_tokens: int,
        overlap_tokens: int = 100
    ) -> List[RetrievedResult]:
        """保留部分内容"""
        selected = []
        total_tokens = 0

        for result in results:
            tokens = self._estimate_tokens(result.content)

            if total_tokens + tokens <= max_tokens:
                selected.append(result)
                total_tokens += tokens
            else:
                # 剩余空间有限,保留最后 N tokens
                remaining = max_tokens - total_tokens
                if remaining > overlap_tokens:
                    truncated = result.content[
                        -self._tokens_to_chars(remaining):
                    ]
                    selected.append(RetrievedResult(
                        content=truncated,
                        score=result.score
                    ))
                break

        return selected

【2. 压缩策略】

class ContextCompressor:
    def __init__(self, llm):
        self.llm = llm

    def compress_documents(
        self,
        documents: List[str],
        target_tokens: int
    ) -> List[str]:
        """压缩文档集合"""
        # 计算当前 tokens
        current_tokens = sum(self._estimate_tokens(d) for d in documents)

        if current_tokens <= target_tokens:
            return documents

        # 需要压缩
        compression_ratio = target_tokens / current_tokens

        if compression_ratio > 0.5:
            # 轻度压缩:每篇文档压缩摘要
            return self._summarize_each(documents)
        else:
            # 重度压缩:合并后统一压缩
            return self._summarize_all(documents)

    def _summarize_each(self, documents: List[str]) -> List[str]:
        """每篇文档生成摘要"""
        summaries = []
        for doc in documents:
            summary = self.llm.generate(
                f"用最简练的语言总结以下内容:\n{doc}"
            )
            summaries.append(summary)
        return summaries

    def _summarize_all(self, documents: List[str]) -> List[str]:
        """合并后生成摘要"""
        combined = "\n\n".join(documents)
        summary = self.llm.generate(
            f"总结以下内容的关键信息:\n{combined}"
        )
        return [summary]

【3. 智能选择策略】

class SmartContextSelector:
    def __init__(self, llm):
        self.llm = llm

    def select_context(
        self,
        query: str,
        candidates: List[RetrievedResult],
        max_tokens: int
    ) -> List[RetrievedResult]:
        """智能选择:用 LLM 判断重要性"""
        # 先按分数预筛选
        preselected = sorted(candidates, key=lambda r: r.score, reverse=True)[:max_tokens * 2]

        # 构建 Prompt
        prompt = self._build_selection_prompt(query, preselected, max_tokens)

        # LLM 选择
        response = self.llm.generate(prompt)
        selected_indices = self._parse_selection(response)

        return [preselected[i] for i in selected_indices if i < len(preselected)]

    def _build_selection_prompt(
        self,
        query: str,
        candidates: List[RetrievedResult],
        max_tokens: int
    ) -> str:
        candidates_text = "\n".join([
            f"{i}. {c.content[:200]}..."
            for i, c in enumerate(candidates)
        ])

        return f"""用户问题:{query}

候选文档({len(candidates)} 篇):
{candidates_text}

请选择最能回答问题的文档编号。
总 token 预算:{max_tokens}

请输出选择的文档编号,用逗号分隔。"""

【4. 分片策略】

class ChunkedContextManager:
    def __init__(self, max_context_tokens: int = 4000):
        self.max_tokens = max_context_tokens

    def generate_chunks(
        self,
        query: str,
        documents: List[RetrievedResult]
    ) -> List[str]:
        """生成上下文分片"""
        # 计算单次能容纳的文档数
        avg_doc_tokens = sum(self._estimate_tokens(d.content) for d in documents) / len(documents)
        docs_per_chunk = int(self.max_tokens / avg_doc_tokens * 0.8)

        # 分片
        chunks = []
        for i in range(0, len(documents), docs_per_chunk):
            chunk_docs = documents[i:i + docs_per_chunk]
            context = "\n\n".join([d.content for d in chunk_docs])
            chunks.append(context)

        return chunks

    def process_with_chunks(
        self,
        query: str,
        documents: List[RetrievedResult]
    ) -> str:
        """分片处理并合并结果"""
        chunks = self.generate_chunks(query, documents)

        results = []
        for i, chunk in enumerate(chunks):
            prompt = f"""基于以下信息回答问题:

{chunk}

问题:{query}

{('如果是第一部分,请开始回答。' if i == 0 else '请继续完成回答。')}"""

            result = self.llm.generate(prompt)
            results.append(result)

        # 合并结果
        return " ".join(results)

【上下文窗口管理 Checklist】

策略              | 适用场景              | 优缺点
------------------|----------------------|----------
按分数截断        | 通用场景              | 简单,可能丢失重要信息
按时效截断        | 时效性敏感            | 新信息优先
保留部分内容      | 长文档                | 保留上下文
每篇压缩          | 中等压缩              | 保持结构
统一压缩          | 重度压缩              | 简洁但丢失细节
LLM 智能选择      | 复杂推理              | 准确但成本高
分片处理          | 超长上下文            | 支持无限长度

记忆要点:

窗口管理四策:
1. 截断:按分数/时效/重叠
2. 压缩:每篇摘要/统一压缩
3. 智选:LLM 判断重要性
4. 分片:分次处理合并

选择决策:
简单截断 → 通用
需要精度 → LLM 智选
超长内容 → 分片处理

实战场景:技术文档问答系统

需求:

为技术团队搭建一个智能文档问答系统:

  • 源数据:Git 仓库、Wiki、API 文档
  • 准确性高,必须引用原始文档
  • 支持代码示例查询
  • 实时索引更新

架构设计:

数据源:
├── GitLab 仓库
│   ├── 代码文件
│   └── README/注释
├── Confluence Wiki
│   └── 文档页面
└── API 文档(Swagger/OpenAPI)

ETL 流程:
├── Git webhook → 触发索引
├── 数据提取
├── 代码分块(函数/类级别)
├── 文档分块(章节/段落)
├── 嵌入
└── 向量库更新

检索流程:
├── 意图识别
│   ├── 代码查询 → Code Agent
│   └── 文档查询 → RAG Agent
├── 混合检索
│   ├── 代码语义检索
│   └── 关键词检索
├── Re-ranking
└── 返回答案 + 引用

数据流:

# etl/git_indexer.py
class GitIndexer:
    def __init__(self, vector_store, code_embedder):
        self.vector_store = vector_store
        self.code_embedder = code_embedder

    def index_repository(self, repo_url: str, branch: str = "main"):
        """索引 Git 仓库"""
        # 1. Clone 仓库
        repo_dir = self._clone_repo(repo_url, branch)

        # 2. 遍历文件
        for file_path in self._walk_files(repo_dir):
            if file_path.endswith(('.py', '.js', '.java', '.go')):
                self._index_code_file(file_path)
            elif file_path.endswith(('.md', '.txt', '.rst')):
                self._index_doc_file(file_path)

    def _index_code_file(self, file_path: str):
        """索引代码文件"""
        # 解析 AST
        ast = self._parse_ast(file_path)

        # 提取函数/类
        for function in ast.functions:
            # 生成文档字符串
            doc = f"""函数:{function.name}
参数:{', '.join(function.params)}
描述:{function.docstring}
代码:{function.code}"""

            # 嵌入
            embedding = self.code_embedder.embed(doc)

            # 存储
            self.vector_store.add(
                id=f"{file_path}:{function.name}",
                vector=embedding,
                metadata={
                    "file_path": file_path,
                    "type": "function",
                    "name": function.name,
                    "line": function.line_number
                }
            )

# retrieval/code_retriever.py
class CodeRetriever:
    def retrieve(
        self,
        query: str,
        language: Optional[str] = None,
        top_k: int = 5
    ) -> List[CodeResult]:
        """检索代码"""
        # 1. 嵌入查询
        query_embedding = self.code_embedder.embed(query)

        # 2. 构建过滤条件
        filters = {}
        if language:
            filters["language"] = language

        # 3. 向量搜索
        results = self.vector_store.search(
            query_vector=query_embedding,
            top_k=top_k * 2,
            filters=filters
        )

        # 4. 重排
        reranked = self.code_reranker.rerank(query, results)

        return reranked[:top_k]

完整 Demo 代码详见技术文档问答系统目录。


模块三 - 八(简要)

由于篇幅限制,以下为剩余模块的要点摘要。

模块三:工作流编排

Q: LangGraph 和传统 Chain 的区别?

LangGraph 优势:
- 图结构:支持复杂依赖
- 状态机:状态明确
- 可视化:工作流图
- 并行:天然支持

传统 Chain:
- 线性:顺序执行
- 简单:适合简单任务

模块四:工具接入与权限控制

Q: 如何防止 Agent 调用危险工具?

防御措施:
1. 工具白名单
2. 参数校验(Pydantic)
3. 权限检查(RBAC)
4. 审计日志
5. 输出过滤(PII 脱敏)

模块五:Prompt 工程与约束输出

Q: 如何保证 LLM 输出符合预期格式?

约束方法:
1. JSON Mode(强制 JSON)
2. Function Calling(自动格式)
3. Pydantic 验证(类型检查)
4. Few-shot(示例引导)
5. 重试机制(失败重试)

模块六:评估与回归测试

Q: 如何构建 RAG 评估的 Golden Set?

构建步骤:
1. 收集真实查询
2. 人工标注标准答案
3. 标记相关文档
4. 难度分级
5. 持续维护

规模:
- 开发:50-100 对
- 测试:200-500 对
- 生产:1000+ 对

模块七:安全与风控

Q: 如何防御 Prompt Injection 攻击?

防御措施:
1. 输入验证(检测危险关键词)
2. 分隔符隔离用户输入
3. System Prompt 强化
4. 输出过滤
5. 审计监控

模块八:性能与成本

Q: 如何降低 LLM 调用成本?

成本优化:
1. 缓存(相同Redis缓存)
2. 模型路由(简单用小模型)
3. Token 优化(精简 Prompt)
4. 批量处理(合并请求)
5. 异步处理(非核心异步)
6. 本地部署(高频用本地)

记忆要点总表

模块 核心口诀 面试重点
Agent 基础 ReAct 实时、Plan 可复用 模式选择、Memory 设计
RAG 查询扩展、混合检索、Re-ranking 召回率、上下文管理
工作流 图结构、状态机 LangGraph vs Chain
工具 白名单、权限控制、审计 安全防御、参数校验
Prompt JSON Mode、Pydantic 验证 输出约束
评估 Golden Set、RAGAS 指标 离线/在线评估
安全 输入验证、输出过滤 Prompt Injection 防御
性能 缓存、模型路由、批量 成本优化、延迟控制

文档版本: 1.0
最后更新: 2026-01-19

close
arrow_upward