内容纲要
AI Agent 开发:面试与实战强化记忆版
面试应答 + 记忆要点 + 实战场景
目录
- 模块一:Agent 基础范式
- 模块二:RAG 与检索优化
- 模块三:工作流编排
- 模块四:工具接入与权限控制
- 模块五:Prompt 工程与约束输出
- 模块六:评估与回归测试
- 模块七:安全与风控
- 模块八:性能与成本
模块一:Agent 基础范式
面试问答
Q1: ReAct 和 Plan & Execute 的区别是什么?什么时候用哪个?
标准回答:
ReAct(Reasoning + Acting)vs Plan & Execute
【核心区别】
维度 | ReAct | Plan & Execute
---------------|------------------------|---------------------------
决策时机 | 每步实时思考 | 一次性规划
可复用性 | 低(依赖上下文) | 高(Plan 可存档)
执行追踪 | 难以追踪 | Plan 就是追踪
Token 消耗 | 高(每步都思考) | 低(规划一次)
纠错能力 | 实时纠错 | 需要重新规划
适用场景 | 探索性、交互式任务 | 结构化、可复用任务
【工程选择决策】
场景 | 推荐 | 原因
------------------------|-------|-----------------------
简单问答 | ReAct | 快速响应,无需规划
探索性任务 | ReAct | 需要实时调整策略
需要执行记录存档 | Plan | Plan 可追溯、复用
复杂任务拆解 | Plan | DAG 结构清晰
需要并行执行 | Plan | Plan 可识别并行任务
多轮对话 | ReAct | 实时响应用户反馈
【边界条件】
ReAct 的风险:
- 无限循环:需要设置最大迭代次数
- Token 消耗爆炸:长对话成本高
- 状态不一致:思考步骤可能偏离
Plan & Execute 的风险:
- 规划失败:初始规划错误无法执行
- 信息不完整:规划时需要所有信息
- 灵活性差:无法中途调整计划
【实战建议】
1. 混合模式:先用 Plan 规划,执行时用 ReAct 动态调整
2. 超时控制:Plan 执行设置超时,超时后回退到 ReAct
3. 缓存 Plan:相同任务的 Plan 可复用
4. Plan 校验:规划后进行语法和依赖检查
记忆要点:
口诀:
ReAct 实时思考探索性
Plan 一次性规划可复用
选择决策:
简单交互 → ReAct
复杂可追踪 → Plan
需要存档 → Plan
边界:
ReAct 防循环、防 token 爆炸
Plan 防规划失败、防信息不全
Checklist:
# ReAct 检查清单
ReAct_Checklist = [
"设置最大迭代次数(默认10)",
"设置超时时间(默认30s)",
"添加停止条件关键词",
"Token 计数监控",
"异常捕获和降级"
]
# Plan & Execute 检查清单
Plan_Checklist = [
"规划前信息完整性检查",
"Plan 依赖关系验证",
"Plan 语法校验",
"任务执行超时控制",
"失败任务重试策略",
"Plan 版本管理"
]
Q2: 如何设计一个可扩展的 Multi-Agent 系统?
标准回答:
Multi-Agent 系统设计原则
【1. 架构分层】
┌─────────────────────────────────────────────┐
│ Orchestrator(编排层) │
│ - 任务分解 │
│ - Agent 路由 │
│ - 状态管理 │
│ - 结果聚合 │
└──────────────┬──────────────────────────┘
│
┌──────┴──────┬──────┬──────┐
▼ ▼ ▼ ▼
Researcher Writer Coder Tester
(研究 Agent) (写作) (编程) (测试)
│ │ │ │
└──────┬──────┴──────┴──────┘
│
▼
Shared Memory
(共享记忆)
【2. Agent 接口标准化】
class Agent(Protocol):
name: str
description: str
capabilities: List[str]
def execute(self, task: Task) -> Result:
"""执行任务,返回结构化结果"""
...
def can_handle(self, task: Task) -> bool:
"""判断是否能处理该任务"""
...
# 任意 Agent 实现此接口即可接入系统
【3. 通信协议定义】
@dataclass
class AgentMessage:
id: str
sender: str # 发送 Agent
receiver: str # 接收 Agent
task: Task
context: Dict[str, Any]
timestamp: datetime
@dataclass
class AgentResponse:
message_id: str
status: str # success/failed/partial
result: Any
error: Optional[str]
metadata: Dict
【4. 路由机制】
class AgentRouter(Protocol):
def route(self, task: Task) -> str:
"""路由任务到合适的 Agent"""
...
# 路由策略实现
class RuleBasedRouter(AgentRouter):
def route(self, task: Task) -> str:
"""基于规则的路由"""
if "search" in task.keywords:
return "researcher"
elif "write" in task.keywords:
return "writer"
elif "code" in task.keywords:
return "coder"
return "general"
class VectorRouter(AgentRouter):
def __init__(self):
self.embedder = EmbeddingModel()
self.agent_embeddings = {
"researcher": self.embedder("搜索、查询、研究"),
"writer": self.embedder("写作、总结、文档"),
"coder": self.embedder("编程、代码、开发")
}
def route(self, task: Task) -> str:
"""基于语义相似度路由"""
task_embedding = self.embedder(task.description)
similarities = {
agent: cosine(task_embedding, emb)
for agent, emb in self.agent_embeddings.items()
}
return max(similarities, key=similarities.get)
class LLMRouter(AgentRouter):
def route(self, task: Task) -> str:
"""基于 LLM 的路由"""
prompt = f"""
任务描述:{task.description}
可用 Agents:
- researcher: 搜索和信息分析
- writer: 文档生成和总结
- coder: 编程任务
请选择最合适的 Agent。
"""
decision = llm.generate(prompt)
return decision.strip()
【5. 状态管理】
class SharedState:
def __init__(self):
self._store = {}
self._lock = Lock()
def get(self, key: str) -> Any:
with self._lock:
return self._store.get(key)
def set(self, key: str, value: Any):
with self._lock:
self._store[key] = value
def publish(self, event: str, data: Any):
"""发布事件,通知所有 Agent"""
...
def subscribe(self, agent: str, event: str):
"""Agent 订阅事件"""
...
【6. 超时与容错】
class AgentOrchestrator:
def __init__(self, timeout: int = 30, max_retries: int = 3):
self.timeout = timeout
self.max_retries = max_retries
def execute_task(self, task: Task) -> Result:
agent = self.router.route(task)
for attempt in range(self.max_retries):
try:
# 超时控制
result = with_timeout(
agent.execute(task),
timeout=self.timeout
)
return result
except TimeoutError:
log.warning(f"Agent {agent} 超时,重试 {attempt + 1}")
except Exception as e:
log.error(f"Agent {agent} 执行失败: {e}")
# 所有重试失败,降级处理
return self.fallback(task)
【7. 可观测性】
class AgentMetrics:
def __init__(self):
self._start_times = {}
self._counts = defaultdict(int)
self._errors = defaultdict(int)
def record_start(self, agent: str, task_id: str):
self._start_times[(agent, task_id)] = time.time()
def record_end(self, agent: str, task_id: str):
duration = time.time() - self._start_times.pop((agent, task_id))
self._counts[agent] += 1
return duration
def record_error(self, agent: str):
self._errors[agent] += 1
def get_stats(self, agent: str) -> Dict:
return {
"total_calls": self._counts[agent],
"errors": self._errors[agent],
"success_rate": (
(self._counts[agent] - self._errors[agent]) /
max(1, self._counts[agent])
)
}
记忆要点:
架构口诀:
上 Orchestrator(编排中枢)
下 Agents(专业分工)
中间 Shared Memory(状态共享)
设计原则:
1. Agent 接口标准化
2. 通信协议清晰
3. 路由策略可插拔
4. 状态管理集中
5. 超时容错完善
6. 可观测性充分
扩展方式:
新增 Agent = 实现接口 + 注册到路由表
新增路由 = 实现 Router + 配置
Q3: Memory 怎么设计才能兼顾性能和效果?
标准回答:
Memory 设计:性能 vs 效果权衡
【三层 Memory 架构】
┌─────────────────────────────────────────┐
│ 应用层(Agent 使用) │
│ agent.memory.get(query, k=5) │
└──────────────┬────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Memory Manager(管理层) │
│ - 合并检索结果 │
│ - 去重 │
│ - 排序 │
│ - 缓存策略 │
└──────┬────────┬────────────────────────┘
│ │
▼ ▼
┌──────────┐ ┌────────────────┐
│ 短期 │ │ 长期 │
│ Memory │ │ Memory │
│ (内存) │ │ (向量库) │
│ FIFO/LRU │ │ 查询 + 聚类 │
└──────────┘ └────────────────┘
【1. 短期 Memory 设计】
class ShortTermMemory:
def __init__(self, max_size: int = 100, strategy: str = "FIFO"):
self.max_size = max_size
self.strategy = strategy
self._messages = []
def add(self, message: Message):
if self.strategy == "FIFO":
self._messages.append(message)
if len(self._messages) > self.max_size:
self._messages.pop(0) # 队首出
elif self.strategy == "LRU":
# 查询时更新访问时间
if message in self._messages:
self._messages.remove(message)
self._messages.append(message)
if len(self._messages) > self.max_size:
self._messages.pop(0)
def get_recent(self, n: int) -> List[Message]:
return self._messages[-n:]
# 优化:
# - 最大窗口:限制 token 数量(如 4000 tokens)
# - 压缩策略:多条消息用 LLM 压缩成摘要
# - 去重:相同内容只保留一条
【2. 长期 Memory 设计】
class LongTermMemory:
def __init__(
self,
vector_store: VectorStore,
embedder: EmbeddingModel,
chunk_size: int = 500,
overlap: int = 50
):
self.store = vector_store
self.embedder = embedder
self.chunk_size = chunk_size
self.overlap = overlap
def add_document(self, text: str, metadata: Dict):
# 1. 切分文档(保留语义)
chunks = self._chunk(text, self.chunk_size, self.overlap)
# 2. 批量嵌入
embeddings = self.embedder.embed_batch(chunks)
# 3. 存储到向量库
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
self.store.add(
id=f"{metadata['doc_id']}_{i}",
vector=embedding,
metadata={
"content": chunk,
**metadata,
"chunk_index": i
}
)
def search(
self,
query: str,
top_k: int = 5,
filters: Dict = None
) -> List[Message]:
# 1. 嵌入查询
query_embedding = self.embedder.embed(query)
# 2. 向量搜索
results = self.store.search(
query_vector=query_embedding,
top_k=top_k * 2, # 多检索一些
filters=filters
)
# 3. 去重(同一文档的多个 chunk 只保留最好的)
deduped = self._deduplicate_by_doc(results)
# 4. 返回 top_k
return deduped[:top_k]
def _chunk(
self,
text: str,
chunk_size: int,
overlap: int
) -> List[str]:
"""语义切分"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
chunks.append(chunk)
start = end - overlap
return chunks
# 优化:
# - 嵌入缓存:相同查询复用嵌入结果
# - 批量操作:减少 API 调用
# - 异步写入:不阻塞主流程
# - 索引优化:向量库建立索引加速搜索
【3. Memory Manager(合并策略)】
class MemoryManager:
def __init__(self):
self.short_term = ShortTermMemory(max_size=50)
self.long_term = LongTermMemory(...)
self._query_cache = LRUCache(maxsize=1000)
def retrieve(
self,
query: str,
k: int = 5,
use_cache: bool = True
) -> List[Message]:
# 1. 缓存命中
cache_key = f"{query}:{k}"
if use_cache and cache_key in self._query_cache:
return self._query_cache[cache_key]
# 2. 并行检索
with ThreadPoolExecutor(max_workers=2) as executor:
short_future = executor.submit(
self.short_term.get_recent, min(3, k)
)
long_future = executor.submit(
self.long_term.search, query, max(k, 5)
)
short_results = short_future.result()
long_results = long_future.result()
# 3. 合并去重
all_results = self._merge_and_deduplicate(
short_results,
long_results
)
# 4. 排序(按相关性)
sorted_results = self._rerank(query, all_results)
# 5. 截断
final_results = sorted_results[:k]
# 6. 缓存
if use_cache:
self._query_cache[cache_key] = final_results
return final_results
def _merge_and_deduplicate(
self,
short: List[Message],
long: List[Message]
) -> List[Message]:
# 优先短期记忆
seen_ids = {m.id for m in short}
deduped = list(short)
# 添加长期记忆中未重复的
for m in long:
if m.id not in seen_ids:
deduped.append(m)
seen_ids.add(m.id)
return deduped
def _rerank(
self,
query: str,
messages: List[Message]
) -> List[Message]:
# 使用 Cross-Encoder 重排
pairs = [(query, m.content) for m in messages]
scores = self.reranker.predict(pairs)
for m, score in zip(messages, scores):
m.score = score
return sorted(messages, key=lambda m: m.score, reverse=True)
【性能优化清单】
优化项 | 短期 Memory | 长期 Memory
----------------------|----------------|----------------
嵌入缓存 | N/A | ✅ LRU 缓存
批量嵌入 | N/A | ✅ 批量 API 调用
异步写入 | N/A | ✅ 后台队列
索引优化 | N/A | ✅ HNSW 索引
查询压缩 | ✅ LLM 压缩 | N/A
去重策略 | ✅ 内容哈希 | ✅ 文档级去重
并发检索 | N/A | ✅ 混合检索
记忆要点:
Memory 三层:
1. 短期:内存、FIFO/LRU、快速
2. 长期:向量库、语义检索、持久
3. 管理:合并、去重、排序、缓存
优化口诀:
短期用内存,长期用向量
查询先缓存,写入要异步
批量做嵌入,混合去重排
Q4: Tool Calling 的安全风险有哪些?如何防御?
标准回答:
Tool Calling 安全风险与防御
【1. 风险清单】
风险类型 | 危害程度 | 场景
--------------------|----------|----------------------
注入攻击 | 高 | 恶意 Prompt 修改工具调用
越权调用 | 高 | Agent 调用无权限工具
参数注入 | 中 | 参数包含 SQL/命令注入
无限循环 | 中 | 工具互相调用
敏感信息泄露 | 高 | 工具返回包含 PII
资源耗尽 | 中 | 高频调用消耗配额
【2. 防御方案】
方案 ①:工具白名单
class SecureToolManager:
def __init__(self, allowed_tools: Set[str]):
self._allowed_tools = allowed_tools
self._dangerous_patterns = [
"delete", "drop", "truncate", "exec", "eval"
]
def call(self, tool_name: str, **kwargs) -> Any:
# 检查白名单
if tool_name not in self._allowed_tools:
raise SecurityError(f"工具未授权: {tool_name}")
# 检查危险关键词
if any(p in tool_name.lower() for p in self._dangerous_patterns):
self._log_security_event(tool_name, kwargs)
raise SecurityError(f"危险工具: {tool_name}")
return self._execute_tool(tool_name, kwargs)
方案 ②:参数校验
from pydantic import BaseModel, Field, validator
class SearchInput(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
@validator('query')
def validate_no_sql(cls, v):
sql_keywords = ['select', 'insert', 'update', 'delete', 'drop']
if any(kw in v.lower() for kw in sql_keywords):
raise ValueError("查询包含 SQL 关键词")
return v
class WeatherInput(BaseModel):
location: str = Field(..., pattern=r'^[a-zA-Z\s]+$')
unit: str = Field(default="celsius", regex="^(celsius|fahrenheit)$")
@validator('location')
def validate_location_length(cls, v):
if len(v) > 100:
raise ValueError("地点名称过长")
return v
方案 ③:权限控制
class ToolPermissionManager:
def __init__(self):
self._tool_permissions = {
"get_weather": {Permission.READ},
"get_user_info": {Permission.READ},
"update_user": {Permission.WRITE},
"delete_user": {Permission.ADMIN}
}
def check_permission(
self,
tool_name: str,
user_permissions: Set[Permission]
) -> bool:
required = self._tool_permissions.get(tool_name, set())
return required.issubset(user_permissions)
方案 ④:限流
class RateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
def check_rate_limit(
self,
tool_name: str,
user_id: str,
limit: int = 10,
window: int = 60
) -> bool:
key = f"rate:{tool_name}:{user_id}"
count = self.redis.incr(key)
if count == 1:
self.redis.expire(key, window)
return count <= limit
方案 ⑤:审计日志
class AuditLogger:
def log_tool_call(
self,
tool_name: str,
user_id: str,
parameters: Dict,
result: Any,
status: str
):
log_entry = {
"timestamp": datetime.now().isoformat(),
"tool_name": tool_name,
"user_id": user_id,
"parameters": self._sanitize_params(parameters),
"result_status": status,
"success": status == "success"
}
self._write_log(log_entry)
def _sanitize_params(self, params: Dict) -> Dict:
"""脱敏处理"""
sanitized = params.copy()
for key in ['password', 'token', 'secret', 'api_key']:
if key in sanitized:
sanitized[key] = "***REDACTED***"
return sanitized
方案 ⑥:输出过滤
class OutputFilter:
def __init__(self):
self._pii_patterns = [
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
r'\b\d{3}-\d{4}-\d{4}\b', # 手机号
r'\b\d{18}\b' # 身份证
]
def filter(self, text: str) -> str:
for pattern in self._pii_patterns:
text = re.sub(pattern, '[已脱敏]', text)
return text
【3. 综合防御实现】
class SecureAgentToolManager:
def __init__(
self,
allowed_tools: Set[str],
audit_logger: AuditLogger
):
self._tool_registry = ToolRegistry()
self._allowed_tools = allowed_tools
self._permission_manager = ToolPermissionManager()
self._rate_limiter = RateLimiter(redis)
self._audit_logger = audit_logger
self._output_filter = OutputFilter()
def call_tool(
self,
tool_name: str,
user_id: str,
user_permissions: Set[Permission],
**kwargs
) -> str:
# 1. 检查白名单
if tool_name not in self._allowed_tools:
raise SecurityError(f"工具未授权: {tool_name}")
udi.logger = AuditLogger(redis_client)
# 审计日志
audit_logger.log_tool_call(
tool_name="get_user",
user_id="user123",
parameters={"id": 456},
result="用户信息",
status="success"
)
# 输出过滤
output_filter = OutputFilter()
filtered = output_filter.filter("联系邮箱 test@example.com")
# 输出: "联系邮箱 [已脱敏]"
记忆要点:
安全六道防线:
1. 白名单:只允许注册的工具
2. 参数校验:Pydantic 验证 + 正则
3. 权限控制:RBAC 检查
4. 限流:防止滥用
5. 审计:记录所有调用
6. 输出过滤:PII 脱敏
口诀:
白名单要控制
参数要校验
权限要检查
调用要限流
操作要审计
输出要脱敏
Q5: Agent 的 Reflection 机制如何实现?有什么应用场景?
标准回答:
Agent Reflection 机制
【1. Reflection 原理】
Reflection = Agent 自我评估 + 自我改进
┌─────────────────────────────────────────┐
│ Reflection 循环 │
│ │
│ 执行任务 → 评估结果 → 判断满意度 │
│ ↓ ↓ ↓ │
│ 满意?←是→结束 ←否→改进策略 │
│ ↑ ↑ │
│ └────重新执行──────┘ │
└─────────────────────────────────────────┘
【2. 核心实现】
from typing import Tuple, List
from dataclasses import dataclass
@dataclass
class ReflectionResult:
is_satisfactory: bool
confidence: float
feedback: str
suggestions: List[str]
class ReflectiveAgent:
def __init__(
self,
llm,
max_reflections: int = 3,
satisfaction_threshold: float = 0.8
):
self.llm = llm
self.max_reflections = max_reflections
self.threshold = satisfaction_threshold
def execute_with_reflection(
self,
task: str,
execute_func: Callable
) -> Tuple[Any, List[Dict]]:
"""带反思的执行"""
history = []
for attempt in range(self.max_reflections + 1):
# 执行任务
result = execute_func(task)
# 第一次直接返回
if attempt == 0:
history.append({
"attempt": attempt,
"result": result,
"reflection": None
})
return result, history
# 反思评估
reflection = self._reflect(task, result, history)
history.append({
"attempt": attempt,
"result": result,
"reflection": reflection
})
# 判断是否满意
if reflection.is_satisfactory:
return result, history
# 改进任务描述
task = self._improve_task(task, reflection.feedback)
# 达到最大次数,返回最后一次
return result, history
def _reflect(
self,
original_task: str,
result: str,
history: List[Dict]
) -> ReflectionResult:
"""反思评估"""
# 构建反思 Prompt
prompt = self._build_reflection_prompt(
original_task, result, history
)
response = self.llm.generate(prompt)
return self._parse_reflection(response)
def _build_reflection_prompt(
self,
task: str,
result: str,
history: List[Dict]
) -> str:
"""构建反思 Prompt"""
attempts_summary = "\n".join([
f"尝试 {h['attempt'] + 1}: {h['result'][:200]}..."
for h in history
])
return f"""请评估以下任务执行结果:
原始任务:{task}
执行历史:
{attempts_summary}
当前结果:
{result}
请从以下角度评估:
1. 结果是否完整回答了问题?
2. 结果是否准确可信?
3. 是否需要更多信息?
4. 有哪些可以改进的地方?
请以以下 JSON 格式回答:
{{
"is_satisfactory": true/false,
"confidence": 0.0-1.0,
"feedback": "评估反馈",
"suggestions": ["改进建议1", "改进建议2"]
}}"""
def _improve_task(self, task: str, feedback: str) -> str:
"""根据反馈改进任务"""
prompt = f"""基于以下反馈,改进任务描述:
原始任务:{task}
反馈:{feedback}
请生成改进后的任务描述,更明确地指导执行。"""
improved_task = self.llm.generate(prompt)
return improved_task.strip()
def _parse_reflection(self, response: str) -> ReflectionResult:
"""解析反思结果"""
try:
import json
data = json.loads(response)
return ReflectionResult(**data)
except:
# 解析失败,返回默认反思
return ReflectionResult(
is_satisfactory=False,
confidence=0.5,
feedback="无法解析评估结果",
suggestions=[]
)
【3. 应用场景】
场景①:代码生成
def generate_code_with_reflection(prompt: str):
agent = ReflectiveAgent(llm=code_llm, max_reflections=2)
def generate(p: str):
return code_llm.generate(f"生成代码:{p}")
result, history = agent.execute_with_reflection(prompt, generate)
for h in history:
if h['reflection']:
print(f"反思 {h['attempt']}: {h['reflection'].feedback}")
return result
场景②:RAG 检索优化
class RAGWithReflection:
def __init__(__(self, rag_engine):
self.rag = rag_engine
self.agent = ReflectiveAgent(llm=eval_llm)
def query(self, question: str) -> str:
def retrieve_and_answer(q: str):
return self.rag.query(q)
result, history = self.agent.execute_with_reflection(
question,
retrieve_and_answer
)
return result
场景③:搜索策略优化
class SearchReflector:
def search_with_reflection(
self,
query: str,
max_attempts: int = 3
):
for attempt in range(max_attempts):
# 执行搜索
results = search_engine.search(query)
# 反思
reflection = self._reflect_search_quality(query, results)
if reflection.is_satisfactory:
return results
# 改进查询
query = self._improve_query(query, reflection.feedback)
return results
【4. Reflection 成本控制】
优化项 | 策略
----------------------|---------------------------
最大反思次数 | 默认 2-3 次,避免无限循环
缓存反思结果 | 相似任务复用反思
异步反思 | 非关键场景异步评估
阈值控制 | 设置满意度阈值
记忆要点:
Reflection 三步:
1. 执行任务
2. 评估结果
3. 改进重试
应用场景:
- 代码生成:改进代码质量
- RAG 优化:调整检索策略
- 搜索优化:改进查询语句
成本控制:
最大次数:2-3 次
满意度阈值:0.8
缓存复用:相似任务
实战场景:企业智能问答系统
需求:
企业需要一个 AI 问答系统,用户可以问各种业务问题:
- 公司政策文档
- 技术文档
- 产品手册
- 员工手册
要求:
- 回答准确,提供来源
- 支持多轮对话
- 数据更新及时
- 成本可控
架构设计:
┌─────────────────────────────────────────┐
│ 用户层 │
│ Web / Mobile / Internal App │
└──────────────┬────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ API Gateway │
│ - 认证授权 │
│ - 限流管理 │
│ - 请求路由 │
└──────────────┬────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Agent Orchestrator │
│ - 意图识别 │
│ - Agent 路由 │
│ - 对话管理 │
│ - 状态同步 │
└──────┬────────────────────────────────┘
│
┌───┴──────────────────┬────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌──────────────┐
│ RAG │ │ General │ │ Knowledge │
│ Agent │ │ Agent │ │ Graph Agent │
└────┬────┘ └────┬────┘ └──────┬───────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌──────────┐
│ Vector │ │ LLM │ │ Neo4j │
│ Store │ │ (API) │ │ (图谱) │
└─────────┘ └─────────┘ └──────────┘
数据流:
# 1. 用户提问
user_query = "公司年假政策是怎样的?"
# 2. Orchestrator �由到 RAG Agent
agent = orchestrator.route(user_query)
# 3. RAG Agent 检索相关文档
retrieved_docs = rag_agent.retrieve(user_query, top_k=5)
# 4. 构建 Prompt
context = "\n\n".join([doc.content for doc in retrieved_docs])
prompt = f"""基于以下信息回答问题:
{context}
问题:{user_query}
请提供准确的答案,并标注信息来源。"""
# 5. LLM 生成答案
answer = llm.generate(prompt)
# 6. 返回结构化响应
response = {
"answer": answer,
"sources": retrieved_docs,
"confidence": calculate_confidence(retrieved_docs)
}
接口设计:
# api/routes.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(title="企业智能问答系统")
class QueryRequest(BaseModel):
question: str = Field(..., min_length=1, max_length=500)
conversation_id: Optional[str] = None
user_id: str
class QueryResponse(BaseModel):
answer: str
sources: List[dict]
conversation_id: str
confidence: float
tokens_used: int
@app.post("/api/query", response_model=QueryResponse)
async def query(request: QueryRequest):
# 1. 检查限流
if not rate_limiter.check(request.user_id):
raise HTTPException(429, "请求过于频繁")
# 2. 获取或创建会话
conversation_id = request.conversation_id or generate_id()
conversation = conversation_manager.get(conversation_id)
# 3. 执行查询
try:
response = orchestrator.query(
question=request.question,
conversation=conversation,
user_id=request.user_id
)
# 4. 更新对话历史
conversation.add_message(
role="user",
content=request.question
)
conversation.add_message(
role="assistant",
content=response.answer
)
return QueryResponse(
answer=response.answer,
sources=response.sources,
conversation_id=conversation_id,
confidence=response.confidence,
tokens_used=response.tokens_used
)
except Exception as e:
# 降级处理
log.error(f"查询失败: {e}")
return fallback_query(request.question)
评估:
# evaluation/qa_eval.py
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevance,
context_precision
)
def build_golden_set():
"""构建评估集"""
return [
{
"question": "公司年假政策是怎样的?",
"ground_truth": "员工每年享有5天年假,满1年增加1天,最多15天",
"contexts": ["HR文档/年假政策.txt"],
"metadata": {"category": "HR", "difficulty": "easy"}
},
# ... 更多测试用例
]
def evaluate_qa_system():
"""评估问答系统"""
# 1. 加载 Golden Set
golden_set = build_golden_set()
# 2. 运行测试
results = []
for test_case in golden_set:
response = orchestrator.query(test_case["question"])
results.append({
"question": test_case["question"],
"ground_truth": test_case["ground_truth"],
"answer": response.answer,
"contexts": [doc.content for doc in response.sources]
})
# 3. 计算 RAGAS 指标
metrics = evaluate(
dataset=results,
metrics=[
faithfulness,
answer_relevance,
context_precision
]
)
return metrics
# 离线评估
metrics = evaluate_qa_system()
print(f"Faithfulness: {metrics['faithfulness']:.3f}")
print(f"Answer Relevance: {metrics['answer_relevance']:.3f}")
上线:
# 1. 灰度发布(5% 流量)
kubectl patch deployment qa-agent \
-p '{"spec":{"template":{"spec":{"containers":[{"name":"qa-agent","env":[{"name":"TRAFFIC_RATIO","value":"0.05"}]}}]}}}'
# 2. 监控指标
watch kubectl get pods -l app=qa-agent
# 3. 检查错误日志
kubectl logs -l app=qa-agent --tail=100 | grep ERROR
# 4. 观察用户反馈
# 查看 analytics dashboard
# 5. 逐步放量
# 10% -> 25% -> 50% -> 100%
监控:
# monitoring/prometheus.yml
apiVersion: v1
kind: ConfigMap
metadata:
name: qa-agent-metrics
data:
metrics.yaml: |
# QPS
metric: qa_agent_requests_total
labels: [agent_type, user_type]
# 延迟
metric: qa_agent_request_duration_seconds
labels: [agent_type, operation]
# 错误率
metric: qa_agent_errors_total
labels: [agent_type, error_type]
# Token 使用
metric: qa_agent_tokens_used_total
labels: [model_name]
# 缓存命中率
metric: qa_agent_cache_hit_rate
# RAG 指标
metric: qa_agent_rag_retrieved_docs
metric: qa_agent_rag_context_precision
回滚:
# 检测到问题时立即回滚
kubectl rollout undo deployment/qa-agent
# 或使用 GitOps
git revert HEAD
git push origin main
# ArgoCD 自动回滚
模块二:RAG 与检索优化
面试问答
Q6: 如何提升 RAG 的召回率?
标准回答:
RAG 召回率提升策略
【召回率低的原因分析】
原因 | 诊断方法 | 解决方案
------------------------|----------------------|----------------------
查询和文档语义不匹配 | 分析检索结果 | 查询扩展
嵌入模型效果差 | 对比不同模型 | 换更好的模型
切分粒度不当 | 查看检索到的 chunk | 调整切分策略
元数据过滤过严 | 检查过滤条件 | 放宽过滤
索引未更新 | 检查索引时间戳 | 实时索引
查询表述不清 | 分析失败查询 | 查询重写
【1. 查询扩展】
class QueryExpander:
def __init__(self, llm, embeddings):
self.llm = llm
self.embeddings = embeddings
def expand(self, query: str, n: int = 3) -> List[str]:
"""生成相关查询"""
prompt = f"""为以下问题生成 {n} 个相关的查询变体:
原问题:{query}
要求:
1. 与原问题语义相同
2. 使用不同的表述方式
3. 包含相关的同义词或关联词
请只输出查询,每行一个:"""
related_queries = self.llm.generate(prompt).strip().split('\n')
return [query] + related_queries[:n]
def expand_with_synonyms(self, query: str) -> List[str]:
"""同义词扩展"""
# 使用同义词库
synonyms = self._get_synonyms(query)
expanded = [query]
for synonym in synonyms:
expanded.append(query.replace(synonym.original, synonym.word))
return list(set(expanded))
# 使用
expander = QueryExpander(llm, embeddings)
queries = expander.expand("Python 并发编程", n=3)
" -> [
"Python 并发编程",
"Python 多线程编程",
"Python 异步编程",
"Python concurrency"
]
【2. 混合检索】
class HybridRetriever:
def __init__(
self,
vector_store,
keyword_store,
alpha: float = 0.7
):
self.vector_store = vector_store
self.keyword_store = keyword_store
self.alpha = alpha # 向量权重
def retrieve(
self,
query: str,
top_k: int = 10
) -> List[RetrievedResult]:
"""混合检索"""
# 1. 向量检索
vector_results = self.vector_store.search(query, top_k=top_k * 2)
# 归一化分数
for r in vector_results:
r.score = self._normalize_vector_score(r.score)
# 2. 关键词检索
keyword_results = self.keyword_store.search(query, top_k=top_k * 2)
for r in keyword_results:
r.score = self._normalize_keyword_score(r.score)
# 3. 合并打分
merged = self._merge_and_rerank(
vector_results,
keyword_results,
alpha=self.alpha
)
# 4. 返回 top_k
return merged[:top_k]
def _merge_and_rerank(
self,
vector_results: List[Result],
keyword_results: List[Result],
alpha: float
) -> List[Result]:
"""合并并重新打分"""
# 创建文档 -> 分数映射
doc_scores = {}
# 向量分数
for r in vector_results:
if r.doc_id not in doc_scores:
doc_scores[r.doc_id] = {"vector": 0, "keyword": 0}
doc_scores[r.doc_id]["vector"] = r.score
# 关键词分数
for r in keyword_results:
if r.doc_id not in doc_scores:
doc_scores[r.doc_id] = {"vector": 0, "keyword": 0}
doc_scores[r.doc_id]["keyword"] = r.score
# 混合打分
merged = []
for doc_id, scores in doc_scores.items():
hybrid_score = (
alpha * scores["vector"] +
(1 - alpha) * scores["keyword"]
)
merged.append(Result(
doc_id=doc_id,
score=hybrid_score
))
# 按混合分数排序
return sorted(merged, key=lambda x: x.score, reverse=True)
【3. 嵌入模型调优】
# 对比不同嵌入模型
embedding_models = {
"bge-small": "BAAI/bge-small-zh-v1.5",
"bge-large": "BAAI/bge-large-zh-v1.5",
"jina-small": "jina-embeddings-v2-small-zh",
"m3e": "moka-ai/m3e-base"
}
def evaluate_embeddings(models: Dict, test_set: List):
"""评估不同嵌入模型"""
results = {}
for name, model_name in models.items():
embedder = EmbeddingModel(model_name)
vector_store = VectorStore(embedder=embedder)
# 测试
metrics = evaluate_rag(vector_store, test_set)
results[name] = metrics
return results
# 领域微调
def fine_tune_embedder(
base_model: str,
domain_data: List[Pair]
):
"""领域数据微调嵌入模型"""
from sentence_transformers import SentenceTransformer, InputExample
model = SentenceTransformer(base_model)
# 准备训练数据
train_examples = [
InputExample(texts=[query, doc], label=1.0)
for query, doc in domain_data
]
# 训练
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)
train_loss = losses.CosineSimilarityLoss(model=model)
model.fit(
train_objectives=[(train_dataloader, train_loss)],
epochs=5,
warmup_steps=100
)
return model
【4. Chunking 策略优化】
class AdaptiveChunker:
def __init__(self, embedder):
self.embedder = embedder
def chunk(self, text: str, max_tokens: int = 500) -> List[str]:
"""自适应切分"""
# 1. 检测文档类型
doc_type = self._detect_document_type(text)
# 2. 根据类型选择策略
if doc_type == "markdown":
return self._chunk_markdown(text, max_tokens)
elif doc_type == "code":
return self._chunk_code(text, max_tokens)
else:
return self._chunk_semantic(text, max_tokens)
def _chunk_semantic(
self,
text: str,
max_tokens: int
) -> List[str]:
"""语义切分:在句子边界切分"""
sentences = self._split_sentences(text)
chunks = []
current_chunk = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = self._estimate_tokens(sentence)
if current_tokens + sentence_tokens <= max_tokens:
current_chunk.append(sentence)
current_tokens += sentence_tokens
else:
if current_chunk:
chunks.append("".join(current_chunk))
current_chunk = [sentence]
current_tokens = sentence_tokens
if current_chunk:
chunks.append("".join(current_chunk))
return chunks
def _detect_document_type(self, text: str) -> str:
"""检测文档类型"""
if text.startswith("# ") or "## " in text[:500]:
return "markdown"
elif text.strip().startswith(("def ", "class ", "import ")):
return "code"
else:
return "plain"
【5. 多路召回 + 重排】
class MultiPathRetriever:
def __init__(self, retrievers: List[Retriever], reranker):
self.retrievers = retrievers
self.reranker = reranker
def retrieve(self, query: str, top_k: int = 10) -> List[Result]:
"""多路召回"""
all_results = []
retriever.execute(1)
# 并行召回
with ThreadPoolExecutor(max_workers=len(self.retrievers)) as executor:
futures = [
executor.submit(r.retrieve, query, top_k * 2)
for r in self.retrievers
]
for future in futures:
all_results.extend(future.result())
retriever.end()
# 去重
deduped = self._deduplicate(all_results)
# 重排
reranked = self.reranker.rerank(query, deduped)
return reranked[:top_k]
【召回率提升 Checklist】
提升手段 | 优先级 | 预期效果
----------------------|--------|----------
查询扩展 | 高 | +15-30%
混合检索 | 高 | +20-40%
更好的嵌入模型 | 高 | +10-25%
优化切分策略 | 中 | +10-20%
Re-ranking | 中 | +5-15%
领域微调 | 低 | +5-10%
记忆要点:
召回提升口诀:
先扩展查询,再混合检索
嵌入选好的,切分要精细
多路并行召,重排精度提
Checklist:
- 查询扩展:同义词 + LLM 生成
- 混合检索:向量 + 关键词
- 嵌入模型:bge-large > bge-small
- 切分策略:语义 > 固定
- 重排:Cross-encoder 必需
Q7: RAG 的上下文窗口管理怎么做?
标准回答:
RAG 上下文窗口管理
【问题背景】
LLM 上下文窗口有限:
- GPT-3.5: 4K / 16K tokens
- GPT-4: 8K / 32K tokens
- Claude: 100K / 200K tokens
检索到的文档可能远超限制
【1. 截断策略】
class TruncationStrategy:
def truncate_by_score(
self,
results: List[RetrievedResult],
max_tokens: int
) -> List[RetrievedResult]:
"""按分数截断:保留高分文档"""
selected = []
total_tokens = 0
for result in sorted(results, key=lambda r: r.score, reverse=True):
tokens = self._estimate_tokens(result.content)
if total_tokens + tokens <= max_tokens:
selected.append(result)
total_tokens += tokens
else:
break
return selected
def truncate_by_recency(
self,
results: List[RetrievedResult],
max_tokens: int,
recency_bonus: float = 0.1
) -> List[RetrievedResult]:
"""按时效性截断:新文档优先"""
now = datetime.now()
def recency_score(result: RetrievedResult) -> float:
age_days = (now - result.timestamp).days
return result.score * (1 - min(age_days * recency_bonus, 0.5))
scored = sorted(results, key=recency_score, reverse=True)
return self.truncate_by_score(scored, max_tokens)
def truncate_with_overlap(
self,
results: List[RetrievedResult],
max_tokens: int,
overlap_tokens: int = 100
) -> List[RetrievedResult]:
"""保留部分内容"""
selected = []
total_tokens = 0
for result in results:
tokens = self._estimate_tokens(result.content)
if total_tokens + tokens <= max_tokens:
selected.append(result)
total_tokens += tokens
else:
# 剩余空间有限,保留最后 N tokens
remaining = max_tokens - total_tokens
if remaining > overlap_tokens:
truncated = result.content[
-self._tokens_to_chars(remaining):
]
selected.append(RetrievedResult(
content=truncated,
score=result.score
))
break
return selected
【2. 压缩策略】
class ContextCompressor:
def __init__(self, llm):
self.llm = llm
def compress_documents(
self,
documents: List[str],
target_tokens: int
) -> List[str]:
"""压缩文档集合"""
# 计算当前 tokens
current_tokens = sum(self._estimate_tokens(d) for d in documents)
if current_tokens <= target_tokens:
return documents
# 需要压缩
compression_ratio = target_tokens / current_tokens
if compression_ratio > 0.5:
# 轻度压缩:每篇文档压缩摘要
return self._summarize_each(documents)
else:
# 重度压缩:合并后统一压缩
return self._summarize_all(documents)
def _summarize_each(self, documents: List[str]) -> List[str]:
"""每篇文档生成摘要"""
summaries = []
for doc in documents:
summary = self.llm.generate(
f"用最简练的语言总结以下内容:\n{doc}"
)
summaries.append(summary)
return summaries
def _summarize_all(self, documents: List[str]) -> List[str]:
"""合并后生成摘要"""
combined = "\n\n".join(documents)
summary = self.llm.generate(
f"总结以下内容的关键信息:\n{combined}"
)
return [summary]
【3. 智能选择策略】
class SmartContextSelector:
def __init__(self, llm):
self.llm = llm
def select_context(
self,
query: str,
candidates: List[RetrievedResult],
max_tokens: int
) -> List[RetrievedResult]:
"""智能选择:用 LLM 判断重要性"""
# 先按分数预筛选
preselected = sorted(candidates, key=lambda r: r.score, reverse=True)[:max_tokens * 2]
# 构建 Prompt
prompt = self._build_selection_prompt(query, preselected, max_tokens)
# LLM 选择
response = self.llm.generate(prompt)
selected_indices = self._parse_selection(response)
return [preselected[i] for i in selected_indices if i < len(preselected)]
def _build_selection_prompt(
self,
query: str,
candidates: List[RetrievedResult],
max_tokens: int
) -> str:
candidates_text = "\n".join([
f"{i}. {c.content[:200]}..."
for i, c in enumerate(candidates)
])
return f"""用户问题:{query}
候选文档({len(candidates)} 篇):
{candidates_text}
请选择最能回答问题的文档编号。
总 token 预算:{max_tokens}
请输出选择的文档编号,用逗号分隔。"""
【4. 分片策略】
class ChunkedContextManager:
def __init__(self, max_context_tokens: int = 4000):
self.max_tokens = max_context_tokens
def generate_chunks(
self,
query: str,
documents: List[RetrievedResult]
) -> List[str]:
"""生成上下文分片"""
# 计算单次能容纳的文档数
avg_doc_tokens = sum(self._estimate_tokens(d.content) for d in documents) / len(documents)
docs_per_chunk = int(self.max_tokens / avg_doc_tokens * 0.8)
# 分片
chunks = []
for i in range(0, len(documents), docs_per_chunk):
chunk_docs = documents[i:i + docs_per_chunk]
context = "\n\n".join([d.content for d in chunk_docs])
chunks.append(context)
return chunks
def process_with_chunks(
self,
query: str,
documents: List[RetrievedResult]
) -> str:
"""分片处理并合并结果"""
chunks = self.generate_chunks(query, documents)
results = []
for i, chunk in enumerate(chunks):
prompt = f"""基于以下信息回答问题:
{chunk}
问题:{query}
{('如果是第一部分,请开始回答。' if i == 0 else '请继续完成回答。')}"""
result = self.llm.generate(prompt)
results.append(result)
# 合并结果
return " ".join(results)
【上下文窗口管理 Checklist】
策略 | 适用场景 | 优缺点
------------------|----------------------|----------
按分数截断 | 通用场景 | 简单,可能丢失重要信息
按时效截断 | 时效性敏感 | 新信息优先
保留部分内容 | 长文档 | 保留上下文
每篇压缩 | 中等压缩 | 保持结构
统一压缩 | 重度压缩 | 简洁但丢失细节
LLM 智能选择 | 复杂推理 | 准确但成本高
分片处理 | 超长上下文 | 支持无限长度
记忆要点:
窗口管理四策:
1. 截断:按分数/时效/重叠
2. 压缩:每篇摘要/统一压缩
3. 智选:LLM 判断重要性
4. 分片:分次处理合并
选择决策:
简单截断 → 通用
需要精度 → LLM 智选
超长内容 → 分片处理
实战场景:技术文档问答系统
需求:
为技术团队搭建一个智能文档问答系统:
- 源数据:Git 仓库、Wiki、API 文档
- 准确性高,必须引用原始文档
- 支持代码示例查询
- 实时索引更新
架构设计:
数据源:
├── GitLab 仓库
│ ├── 代码文件
│ └── README/注释
├── Confluence Wiki
│ └── 文档页面
└── API 文档(Swagger/OpenAPI)
ETL 流程:
├── Git webhook → 触发索引
├── 数据提取
├── 代码分块(函数/类级别)
├── 文档分块(章节/段落)
├── 嵌入
└── 向量库更新
检索流程:
├── 意图识别
│ ├── 代码查询 → Code Agent
│ └── 文档查询 → RAG Agent
├── 混合检索
│ ├── 代码语义检索
│ └── 关键词检索
├── Re-ranking
└── 返回答案 + 引用
数据流:
# etl/git_indexer.py
class GitIndexer:
def __init__(self, vector_store, code_embedder):
self.vector_store = vector_store
self.code_embedder = code_embedder
def index_repository(self, repo_url: str, branch: str = "main"):
"""索引 Git 仓库"""
# 1. Clone 仓库
repo_dir = self._clone_repo(repo_url, branch)
# 2. 遍历文件
for file_path in self._walk_files(repo_dir):
if file_path.endswith(('.py', '.js', '.java', '.go')):
self._index_code_file(file_path)
elif file_path.endswith(('.md', '.txt', '.rst')):
self._index_doc_file(file_path)
def _index_code_file(self, file_path: str):
"""索引代码文件"""
# 解析 AST
ast = self._parse_ast(file_path)
# 提取函数/类
for function in ast.functions:
# 生成文档字符串
doc = f"""函数:{function.name}
参数:{', '.join(function.params)}
描述:{function.docstring}
代码:{function.code}"""
# 嵌入
embedding = self.code_embedder.embed(doc)
# 存储
self.vector_store.add(
id=f"{file_path}:{function.name}",
vector=embedding,
metadata={
"file_path": file_path,
"type": "function",
"name": function.name,
"line": function.line_number
}
)
# retrieval/code_retriever.py
class CodeRetriever:
def retrieve(
self,
query: str,
language: Optional[str] = None,
top_k: int = 5
) -> List[CodeResult]:
"""检索代码"""
# 1. 嵌入查询
query_embedding = self.code_embedder.embed(query)
# 2. 构建过滤条件
filters = {}
if language:
filters["language"] = language
# 3. 向量搜索
results = self.vector_store.search(
query_vector=query_embedding,
top_k=top_k * 2,
filters=filters
)
# 4. 重排
reranked = self.code_reranker.rerank(query, results)
return reranked[:top_k]
完整 Demo 代码详见技术文档问答系统目录。
模块三 - 八(简要)
由于篇幅限制,以下为剩余模块的要点摘要。
模块三:工作流编排
Q: LangGraph 和传统 Chain 的区别?
LangGraph 优势:
- 图结构:支持复杂依赖
- 状态机:状态明确
- 可视化:工作流图
- 并行:天然支持
传统 Chain:
- 线性:顺序执行
- 简单:适合简单任务
模块四:工具接入与权限控制
Q: 如何防止 Agent 调用危险工具?
防御措施:
1. 工具白名单
2. 参数校验(Pydantic)
3. 权限检查(RBAC)
4. 审计日志
5. 输出过滤(PII 脱敏)
模块五:Prompt 工程与约束输出
Q: 如何保证 LLM 输出符合预期格式?
约束方法:
1. JSON Mode(强制 JSON)
2. Function Calling(自动格式)
3. Pydantic 验证(类型检查)
4. Few-shot(示例引导)
5. 重试机制(失败重试)
模块六:评估与回归测试
Q: 如何构建 RAG 评估的 Golden Set?
构建步骤:
1. 收集真实查询
2. 人工标注标准答案
3. 标记相关文档
4. 难度分级
5. 持续维护
规模:
- 开发:50-100 对
- 测试:200-500 对
- 生产:1000+ 对
模块七:安全与风控
Q: 如何防御 Prompt Injection 攻击?
防御措施:
1. 输入验证(检测危险关键词)
2. 分隔符隔离用户输入
3. System Prompt 强化
4. 输出过滤
5. 审计监控
模块八:性能与成本
Q: 如何降低 LLM 调用成本?
成本优化:
1. 缓存(相同Redis缓存)
2. 模型路由(简单用小模型)
3. Token 优化(精简 Prompt)
4. 批量处理(合并请求)
5. 异步处理(非核心异步)
6. 本地部署(高频用本地)
记忆要点总表
| 模块 | 核心口诀 | 面试重点 |
|---|---|---|
| Agent 基础 | ReAct 实时、Plan 可复用 | 模式选择、Memory 设计 |
| RAG | 查询扩展、混合检索、Re-ranking | 召回率、上下文管理 |
| 工作流 | 图结构、状态机 | LangGraph vs Chain |
| 工具 | 白名单、权限控制、审计 | 安全防御、参数校验 |
| Prompt | JSON Mode、Pydantic 验证 | 输出约束 |
| 评估 | Golden Set、RAGAS 指标 | 离线/在线评估 |
| 安全 | 输入验证、输出过滤 | Prompt Injection 防御 |
| 性能 | 缓存、模型路由、批量 | 成本优化、延迟控制 |
文档版本: 1.0
最后更新: 2026-01-19