# Context Engineering and Compression

## 1. Context Engineering Overview

### 1.1 What Is Context Engineering
Context engineering is the practice of managing all the information passed to an LLM, including:

- The system prompt
- Conversation history
- Retrieved documents
- Tool-call results
- User input
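As a minimal illustration (not part of the original notes), the sketch below assembles these components into an OpenAI-style message list; the variable names (`system_prompt`, `retrieved_docs`, `history`, `user_input`) are placeholders.

```python
# A minimal sketch: assembling the context components into a chat-style message list.
# All variable names here are illustrative placeholders.
system_prompt = "You are a helpful assistant."
retrieved_docs = ["Doc A ...", "Doc B ..."]              # retrieved documents
history = [{"role": "user", "content": "Earlier question"},
           {"role": "assistant", "content": "Earlier answer"}]
user_input = "Current question"

messages = [{"role": "system", "content": system_prompt}]
if retrieved_docs:
    messages.append({"role": "system",
                     "content": "Reference material:\n" + "\n\n".join(retrieved_docs)})
messages.extend(history)                                  # conversation history
messages.append({"role": "user", "content": user_input})
```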
### 1.2 Context Limits

| Model | Context window | Recommended safe usage |
|---|---|---|
| GPT-3.5-Turbo | 16K | 12K |
| GPT-4 | 8K | 6K |
| GPT-4-Turbo | 128K | 100K |
| GPT-4o | 128K | 100K |
| Claude 3 Haiku | 200K | 150K |
| Claude 3 Sonnet | 200K | 150K |
| Claude 3 Opus | 200K | 150K |
### 1.3 Why Context Engineering Matters

┌──────────────────────────────────────────────────┐
│ The value of context engineering                 │
├──────────────────────────────────────────────────┤
│ ✓ Keeps key information from being truncated     │
│ ✓ Controls token cost                            │
│ ✓ Improves response quality (less noise)         │
│ ✓ Keeps the context coherent                     │
│ ✓ Supports long-document processing              │
└──────────────────────────────────────────────────┘
## 2. Context Window Management

### 2.1 Basic Concepts

```python
from typing import List, Dict, Optional
from dataclasses import dataclass

import tiktoken


@dataclass
class ContextWindow:
    model: str
    max_tokens: int
    current_tokens: int = 0

    def remaining(self) -> int:
        return self.max_tokens - self.current_tokens

    def can_fit(self, text: str) -> bool:
        tokens = count_tokens(text, self.model)
        return self.current_tokens + tokens <= self.max_tokens


def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count the number of tokens in a text."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except KeyError:
        # Fall back to cl100k_base (the encoding used by GPT-4 / GPT-4-Turbo)
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))
```
### 2.2 Conversation History Management

```python
from typing import Dict, List, Optional
from dataclasses import dataclass


@dataclass
class Message:
    role: str  # "system", "user", "assistant", "tool"
    content: str
    tokens: int = 0


class ConversationManager:
    def __init__(
        self,
        model: str = "gpt-4",
        max_tokens: int = 8000,
        system_prompt: Optional[str] = None,
    ):
        self.model = model
        self.window = ContextWindow(model=model, max_tokens=max_tokens)
        self.system_prompt = system_prompt
        self.messages: List[Message] = []
        # Account for the tokens used by the system prompt
        if system_prompt:
            sys_tokens = count_tokens(system_prompt, model)
            self.window.current_tokens = sys_tokens

    def add_message(self, role: str, content: str) -> bool:
        """Add a message; returns whether it succeeded."""
        tokens = count_tokens(content, self.model)
        if not self.window.can_fit(content):
            # Try compressing the history first, then re-check
            self._compress_history()
            if not self.window.can_fit(content):
                return False
        message = Message(role=role, content=content, tokens=tokens)
        self.messages.append(message)
        self.window.current_tokens += tokens
        return True

    def _compress_history(self) -> bool:
        """Compress the conversation history (placeholder; see the sketch after this class).

        Strategy 1: drop the oldest non-system messages
        Strategy 2: summarize long messages
        Strategy 3: sliding window
        """
        pass

    def get_messages(self) -> List[Dict]:
        """Return the messages in LLM API format."""
        result = []
        if self.system_prompt:
            result.append({"role": "system", "content": self.system_prompt})
        for msg in self.messages:
            result.append({"role": msg.role, "content": msg.content})
        return result

    def summarize(self) -> str:
        """Summarize the conversation history (placeholder; see the sketch after this class)."""
        pass
```
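The two methods above are left as placeholders in the original notes. Below is one possible sketch, via an illustrative subclass: strategy 1 (drop the oldest messages) for `_compress_history`, and a plain concatenation for `summarize` that an LLM call (e.g. the `SummarizationCompressor` in section 3.2) could shorten further. Treat it as a sketch, not the canonical implementation.

```python
# Illustrative sketch only: one way to fill in ConversationManager's placeholders.
class CompressingConversationManager(ConversationManager):
    def _compress_history(self) -> bool:
        """Strategy 1: drop the oldest messages until ~50% of the window is free."""
        target = self.window.max_tokens // 2
        compressed = False
        while self.window.current_tokens > target and self.messages:
            oldest = self.messages.pop(0)             # oldest conversation message first
            self.window.current_tokens -= oldest.tokens
            compressed = True
        return compressed

    def summarize(self) -> str:
        """Concatenate the history into one string; an LLM summarizer could shorten it further."""
        return "\n".join(f"{m.role}: {m.content}" for m in self.messages)
```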
### 2.3 Sliding-Window Strategy

```python
class SlidingWindowManager:
    def __init__(
        self,
        max_messages: int = 10,
        keep_system: bool = True,
        keep_first_user: bool = True,
    ):
        self.max_messages = max_messages
        self.keep_system = keep_system
        self.keep_first_user = keep_first_user
        self.messages: List[Message] = []

    def add_message(self, message: Message):
        self.messages.append(message)
        # Enforce the message limit
        if len(self.messages) > self.max_messages:
            self._prune()

    def _prune(self):
        """Prune messages while keeping the important ones."""
        # Keep system messages
        system_msgs = [m for m in self.messages if m.role == "system"]
        # Keep the first user message (usually the initial request)
        first_user = None
        if self.keep_first_user:
            for m in self.messages:
                if m.role == "user":
                    first_user = m
                    break
        # Keep the most recent N messages
        recent_count = self.max_messages - len(system_msgs) - (1 if first_user else 0)
        recent = self.messages[-recent_count:] if recent_count > 0 else []
        # Rebuild the message list
        self.messages = []
        self.messages.extend(system_msgs)
        if first_user and first_user not in recent:
            self.messages.append(first_user)
        self.messages.extend(recent)
```
## 3. Context Compression Strategies

### 3.1 Strategy Overview

| Strategy | Typical use case | Pros | Cons |
|---|---|---|---|
| Summarization | Long documents, long conversations | Large token savings | May lose details |
| Keyword extraction | Document retrieval | Keeps key terms | May lose surrounding context |
| Hierarchical compression | Structured documents | Preserves structural relations | More complex to implement |
| Intelligent sampling | Retrieval results | Preserves diverse information | Requires similarity scores |
| Entity extraction | Information-dense text | Preserves factual content | Loses linguistic nuance |
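As a hedged routing sketch (not part of the original notes), one could pick among the compressors defined later in this section based on the scenario; the mapping below is illustrative and assumes those classes are already in scope.

```python
# Illustrative routing sketch, assuming the compressor classes defined later in this
# document (SummarizationCompressor, KeywordExtractor, HierarchicalCompressor,
# IntelligentSamplingCompressor) are already defined or imported.
def pick_compressor(scenario: str):
    routes = {
        "long_conversation": SummarizationCompressor,        # summarization
        "document_retrieval": KeywordExtractor,              # keyword extraction
        "structured_document": HierarchicalCompressor,       # hierarchical compression
        "retrieval_results": IntelligentSamplingCompressor,  # intelligent sampling
    }
    return routes.get(scenario, SummarizationCompressor)()
```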
### 3.2 Summarization Compression

```python
import openai


class SummarizationCompressor:
    def __init__(self, llm_client=None):
        self.llm = llm_client or openai.ChatCompletion

    async def compress(
        self,
        text: str,
        target_ratio: float = 0.3,
        max_tokens: int = 1000,
    ) -> str:
        """Compress text down to a target ratio.

        Args:
            text: the original text
            target_ratio: target compression ratio (0.3 = 30%)
            max_tokens: maximum number of output tokens
        """
        original_tokens = count_tokens(text)
        target_tokens = int(original_tokens * target_ratio)
        prompt = f"""Compress the following text while keeping the core information and main points.
Target length: about {target_tokens} tokens
Original length: {original_tokens} tokens

Text:
{text}

Output the compressed version:"""
        response = await self.llm.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=0.3,  # low temperature for faithfulness
        )
        return response.choices[0].message.content

    async def compress_messages(
        self,
        messages: List[Message],
        summary_threshold: int = 10,
    ) -> List[Message]:
        """Compress a message list by summarizing old messages into a single summary message."""
        if len(messages) < summary_threshold:
            return messages
        # Split into old messages and recent messages
        old_messages = messages[:-5]
        recent_messages = messages[-5:]
        # Summarize the old messages
        old_text = "\n".join([
            f"{m.role}: {m.content}" for m in old_messages
        ])
        summary = await self.compress(old_text)
        # Return the summary + the recent messages
        result = [Message(role="system", content=summary)]
        result.extend(recent_messages)
        return result
```
### 3.3 Keyword Extraction

```python
import re
from collections import Counter
from typing import Dict, List


class KeywordExtractor:
    def __init__(self):
        # Chinese and English stopwords
        self.stopwords = {
            "的", "了", "是", "在", "我", "有", "和", "就", "不", "人",
            "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去",
            "你", "会", "着", "没有", "看", "好", "自己", "这", "the", "a",
            "an", "is", "are", "was", "were", "be", "been", "being",
            "have", "has", "had", "do", "does", "did", "will", "would",
        }

    def extract(
        self,
        text: str,
        top_k: int = 20,
        min_length: int = 2,
    ) -> List[str]:
        """Extract keywords.

        Args:
            text: input text
            top_k: number of keywords to return
            min_length: minimum word length
        """
        # Naive tokenization (a proper NLP tokenizer could be used instead)
        words = re.findall(r'\b\w+\b', text.lower())
        # Filter out stopwords
        words = [w for w in words if w not in self.stopwords and len(w) >= min_length]
        # Count word frequencies
        counter = Counter(words)
        # Return the top_k words
        return [word for word, _ in counter.most_common(top_k)]

    def extract_with_positions(
        self,
        text: str,
        top_k: int = 10,
    ) -> List[Dict]:
        """Extract keywords together with their positions."""
        keywords = self.extract(text, top_k * 2)  # extract a few extra
        result = []
        for keyword in keywords[:top_k]:
            # Find where the keyword occurs
            positions = []
            for match in re.finditer(re.escape(keyword), text, re.IGNORECASE):
                positions.append(match.start())
            if positions:
                result.append({
                    "keyword": keyword,
                    "positions": positions,
                    "count": len(positions),
                })
        return sorted(result, key=lambda x: x["count"], reverse=True)

    def build_context(self, text: str, keywords: List[str]) -> str:
        """Build a context from keywords by keeping the sentences that contain them."""
        sentences = re.split(r'[.!?。!?\n]', text)
        # Keep only sentences containing at least one keyword
        keyword_set = set(k.lower() for k in keywords)
        selected = []
        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in keyword_set):
                selected.append(sentence.strip())
        return " ".join(selected)
```
### 3.4 Hierarchical Compression

```python
import re
from typing import Dict, List

import tiktoken


class HierarchicalCompressor:
    """Hierarchical compression, suitable for Markdown / structured documents."""

    def compress(
        self,
        text: str,
        max_tokens: int,
        model: str = "gpt-4",
    ) -> Dict:
        """Hierarchically compress a document.

        1. Parse the document structure
        2. Compress lower-level content
        3. Preserve the higher-level structure
        """
        # Parse into a hierarchical structure
        structure = self._parse_structure(text)
        # Compress section by section
        compressed = self._compress_structure(structure, max_tokens, model)
        return {
            "summary": compressed["summary"],
            "structure": compressed["structure"],
            "tokens": self._count_tokens_compressed(compressed["structure"]),
        }

    def _parse_structure(self, text: str) -> List[Dict]:
        """Parse the document into a hierarchical structure."""
        # Simple Markdown parsing
        sections = []
        current_section = {
            "level": 0,
            "title": "Root",
            "content": "",
            "children": [],
        }
        lines = text.split('\n')
        i = 0
        while i < len(lines):
            line = lines[i]
            # Detect headings
            header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
            if header_match:
                level = len(header_match.group(1))
                title = header_match.group(2)
                # A heading starts a new section: save the current one
                sections.append(current_section)
                current_section = {
                    "level": level,
                    "title": title,
                    "content": "",
                    "children": [],
                }
            else:
                current_section["content"] += line + "\n"
            i += 1
        sections.append(current_section)
        return sections

    def _compress_structure(
        self,
        structure: List[Dict],
        max_tokens: int,
        model: str,
    ) -> Dict:
        """Compress the structure, visiting sections in priority order."""
        total_tokens = 0
        compressed_structure = []
        # Start with high-priority sections (lower level number = higher in the hierarchy)
        for section in sorted(structure, key=lambda x: x["level"]):
            section_tokens = count_tokens(section["content"], model)
            if total_tokens + section_tokens <= max_tokens:
                # Keep the section in full
                compressed_structure.append({
                    "title": section["title"],
                    "content": section["content"],
                    "compressed": False,
                })
                total_tokens += section_tokens
            else:
                # Compress the content
                compressed_content = self._compress_content(
                    section["content"],
                    max_tokens - total_tokens,
                    model,
                )
                compressed_structure.append({
                    "title": section["title"],
                    "content": compressed_content,
                    "compressed": True,
                })
                break
        return {
            "structure": compressed_structure,
            "summary": self._generate_summary(compressed_structure),
        }

    def _compress_content(self, content: str, max_tokens: int, model: str) -> str:
        """Compress a single section."""
        # Simple truncation (an LLM summary could be used instead)
        if count_tokens(content, model) <= max_tokens:
            return content
        # Strategy: keep the beginning and the end
        tokens_per_side = max_tokens // 2
        # Split by tokens
        encoder = tiktoken.encoding_for_model(model)
        tokens = encoder.encode(content)
        head = tokens[:tokens_per_side]
        tail = tokens[-tokens_per_side:] if tokens_per_side > 0 else []
        return encoder.decode(head) + "\n[...]\n" + encoder.decode(tail)

    def _generate_summary(self, structure: List[Dict]) -> str:
        """Generate a summary of the structure."""
        summary = []
        for section in structure:
            flag = "[compressed]" if section["compressed"] else "[full]"
            summary.append(f"{flag} {section['title']}")
        return "\n".join(summary)

    def _count_tokens_compressed(self, structure: List[Dict], model: str = "gpt-4") -> int:
        """Count the tokens of the compressed structure."""
        return sum(count_tokens(s["content"], model) for s in structure)
```
### 3.5 Intelligent Sampling Compression

```python
class IntelligentSamplingCompressor:
    """Intelligent sampling based on scores and diversity."""

    def compress(
        self,
        results: List[Dict],
        max_items: int,
        diversity_factor: float = 0.3,
    ) -> List[Dict]:
        """Intelligently sample retrieval results.

        Args:
            results: retrieval results, each of the form {"content": str, "score": float, "metadata": dict}
            max_items: maximum number of items to keep
            diversity_factor: diversity factor; higher values favor diversity
        """
        if len(results) <= max_items:
            return results
        # 1. Sort by score
        scored = sorted(results, key=lambda x: x["score"], reverse=True)
        # 2. Split the budget between top-scoring and diverse items
        diversity_count = int(max_items * diversity_factor)
        top_count = max_items - diversity_count
        # 3. Take the top-scoring results
        top_results = scored[:top_count]
        # 4. Select diverse samples from the remainder
        remaining = scored[top_count:]
        diverse_results = self._select_diverse(
            remaining,
            diversity_count,
            top_results,
        )
        # 5. Merge
        return top_results + diverse_results

    def _select_diverse(
        self,
        candidates: List[Dict],
        count: int,
        existing: List[Dict],
    ) -> List[Dict]:
        """Select diverse samples."""
        if len(candidates) <= count:
            return candidates
        selected = []
        existing_texts = [r["content"] for r in existing]
        for _ in range(count):
            if not candidates:
                break
            # Pick the candidate least similar to everything selected so far
            idx = self._select_least_similar(
                candidates,
                existing_texts + [r["content"] for r in selected],
            )
            selected.append(candidates.pop(idx))
        return selected

    def _select_least_similar(
        self,
        candidates: List[Dict],
        existing: List[str],
    ) -> int:
        """Return the index of the candidate least similar to the existing samples."""
        min_similarity = float('inf')
        best_index = 0
        for i, candidate in enumerate(candidates):
            similarity = self._compute_similarity(candidate["content"], existing)
            if similarity < min_similarity:
                min_similarity = similarity
                best_index = i
        return best_index

    def _compute_similarity(self, text: str, existing_texts: List[str]) -> float:
        """Compute the similarity between a text and the existing texts."""
        # Simple implementation: Jaccard similarity over word sets
        text_words = set(text.lower().split())
        max_sim = 0.0
        for existing in existing_texts:
            existing_words = set(existing.lower().split())
            intersection = len(text_words & existing_words)
            union = len(text_words | existing_words)
            sim = intersection / union if union > 0 else 0
            max_sim = max(max_sim, sim)
        return max_sim
```
## 4. Long-Context Handling

### 4.1 Chunked Processing

```python
from typing import Callable, Generator, List, Optional

import tiktoken


class LongContextHandler:
    def __init__(self, model: str = "gpt-4"):
        self.model = model
        self.max_tokens = self._get_model_limit(model)

    def _get_model_limit(self, model: str) -> int:
        limits = {
            "gpt-4": 8000,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16000,
            "claude-3-opus": 200000,
        }
        return limits.get(model, 8000)

    def split_into_chunks(
        self,
        text: str,
        chunk_size: Optional[int] = None,
        overlap: int = 200,
    ) -> Generator[str, None, None]:
        """Split a long text into chunks that fit the model.

        Args:
            text: input text
            chunk_size: tokens per chunk; defaults to 80% of the model limit
            overlap: number of overlapping tokens between chunks
        """
        if chunk_size is None:
            chunk_size = int(self.max_tokens * 0.8)
        encoder = tiktoken.encoding_for_model(self.model)
        tokens = encoder.encode(text)
        start = 0
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            chunk_tokens = tokens[start:end]
            yield encoder.decode(chunk_tokens)
            if end == len(tokens):
                break  # last chunk reached; stop to avoid re-yielding the tail
            start = end - overlap

    def process_long_document(
        self,
        document: str,
        query: str,
        process_func: Callable,
    ) -> str:
        """Process a long document.

        Strategy:
        1. Split the document into chunks
        2. Process each chunk independently
        3. Merge the results
        """
        chunks = list(self.split_into_chunks(document))
        results = []
        for i, chunk in enumerate(chunks):
            # Add contextual information to each chunk
            context_info = f"[Document chunk {i+1}/{len(chunks)}]"
            enriched_chunk = f"{context_info}\n\n{chunk}"
            result = process_func(query, enriched_chunk)
            results.append(result)
        # Merge the results (these could be summarized further)
        return "\n\n---\n\n".join(results)
```
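A brief usage sketch follows; the `summarize_chunk` callback is a hypothetical stand-in for a real per-chunk LLM call, added here for illustration only.

```python
# Illustrative usage of LongContextHandler with a dummy per-chunk callback.
def summarize_chunk(query: str, chunk: str) -> str:
    # A real implementation would call an LLM here; this stub just reports chunk size.
    return f"(processed {len(chunk)} characters for query: {query})"

handler = LongContextHandler(model="gpt-4")
long_text = "some very long document ... " * 2000
merged = handler.process_long_document(long_text, "What are the key points?", summarize_chunk)
print(merged[:200])
```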
### 4.2 Recursive RAG

```python
class RecursiveRAG:
    """Progressive RAG over long documents.

    1. First retrieve document indexes / summaries
    2. Decide which documents are needed
    3. Retrieve the full document contents
    4. Generate the final answer
    """

    def __init__(
        self,
        index_retriever,     # retriever for indexes / summaries
        document_retriever,  # retriever for full documents
        llm,
    ):
        self.index_retriever = index_retriever
        self.document_retriever = document_retriever
        self.llm = llm

    def query(self, query: str) -> Dict:
        # Step 1: retrieve relevant document indexes
        index_results = self.index_retriever.retrieve(query, top_k=10)
        # Step 2: decide which full documents are needed
        document_ids = self._select_documents(query, index_results)
        # Step 3: retrieve the full contents
        documents = self.document_retriever.retrieve_by_ids(document_ids)
        # Step 4: generate the answer
        answer = self._generate_answer(query, documents)
        return {
            "answer": answer,
            "sources": documents,
            "index_results": index_results,
        }

    def _select_documents(self, query: str, index_results: List) -> List[str]:
        """Based on the query and index results, choose which documents to retrieve in full."""
        # An LLM could judge which documents are relevant,
        # or a score threshold can be used, as below.
        selected = []
        for result in index_results:
            if result.score > 0.7:  # threshold
                selected.append(result.document_id)
        return selected

    def _generate_answer(self, query: str, documents: List) -> str:
        context = "\n\n".join([doc.content for doc in documents])
        prompt = f"""Answer the question based on the following documents:

Documents:
{context}

Question: {query}

Answer based on the document content:"""
        response = self.llm.generate(prompt)
        return response
```
### 4.3 Map-Reduce Pattern

```python
class MapReduce:
    """Map-Reduce pattern for long documents.

    Map: process each chunk independently
    Reduce: merge all the results
    """

    def __init__(self, llm, chunk_size: int = 4000):
        self.llm = llm
        self.chunk_size = chunk_size

    def map(self, document: str, query: str) -> List[str]:
        """Map phase: process each chunk."""
        handler = LongContextHandler()
        chunks = list(handler.split_into_chunks(document, self.chunk_size))
        results = []
        for chunk in chunks:
            result = self._process_chunk(query, chunk)
            results.append(result)
        return results

    def reduce(self, mapped_results: List[str], query: str) -> str:
        """Reduce phase: merge the results."""
        combined = "\n\n".join(mapped_results)
        prompt = f"""Based on the following partial analyses, give a consolidated answer.

Partial analyses:
{combined}

Original question: {query}

Give the final, consolidated answer:"""
        return self.llm.generate(prompt)

    def process(self, document: str, query: str) -> str:
        """Full Map-Reduce pipeline."""
        # Map
        mapped = self.map(document, query)
        # Reduce
        reduced = self.reduce(mapped, query)
        return reduced

    def _process_chunk(self, query: str, chunk: str) -> str:
        prompt = f"""Analyze the following document chunk and answer the question.

Document chunk:
{chunk}

Question: {query}

Give your analysis of this chunk:"""
        return self.llm.generate(prompt)
```
## 5. Context Optimization Best Practices

### 5.1 Structuring the Context

```python
class ContextBuilder:
    """Structured context builder."""

    def build(
        self,
        system_prompt: str,
        user_query: str,
        retrieved_docs: List[str],
        conversation_history: List[Dict],
        max_tokens: int = 8000,
    ) -> List[Dict]:
        """Build an optimized context.

        Layout:
        1. System prompt (fixed)
        2. Retrieved documents (compressible)
        3. Conversation history (sliding window)
        4. User query (must be included)
        """
        messages = []
        # 1. System prompt: must be included
        messages.append({"role": "system", "content": system_prompt})
        # 2. Retrieved documents: compressed intelligently
        docs_context = self._build_docs_context(retrieved_docs, max_tokens)
        if docs_context:
            messages.append({
                "role": "system",
                "content": f"Relevant information:\n{docs_context}",
            })
        # 3. Conversation history: sliding window
        history_messages = self._select_history(
            conversation_history,
            max_tokens - self._count_messages_tokens(messages),
        )
        messages.extend(history_messages)
        # 4. User query: must be included
        messages.append({"role": "user", "content": user_query})
        return messages

    def _build_docs_context(self, docs: List[str], max_tokens: int) -> str:
        """Build the document context."""
        # Strategy: prefer the most relevant documents.
        # Simplified here: concatenate all documents.
        combined = "\n\n---\n\n".join(docs)
        if count_tokens(combined) <= max_tokens:
            return combined
        # Compress
        compressor = HierarchicalCompressor()
        result = compressor.compress(combined, max_tokens)
        # The compressor returns a structured result; join the (possibly compressed) section contents
        return "\n\n".join(s["content"] for s in result["structure"])

    def _select_history(self, history: List[Dict], max_tokens: int) -> List[Dict]:
        """Select history messages."""
        # Simple implementation: keep the most recent messages that fit
        selected = []
        total_tokens = 0
        for msg in reversed(history):
            tokens = count_tokens(msg.get("content", ""))
            if total_tokens + tokens <= max_tokens:
                selected.insert(0, msg)
                total_tokens += tokens
            else:
                break
        return selected

    def _count_messages_tokens(self, messages: List[Dict]) -> int:
        """Count the tokens in a message list."""
        return sum(
            count_tokens(msg.get("content", ""))
            for msg in messages
        )
```
### 5.2 Dynamic Context Adjustment

```python
class DynamicContextManager:
    """Dynamic context manager.

    Adjusts the context strategy according to task complexity.
    """

    def __init__(self, llm):
        self.llm = llm

    def estimate_complexity(self, query: str, context: str) -> float:
        """Estimate task complexity.

        Returns:
            A complexity score between 0 and 1.
        """
        factors = []
        # Factor 1: query length
        query_len = len(query.split())
        factors.append(min(query_len / 100, 1))
        # Factor 2: context size
        context_len = len(context.split())
        factors.append(min(context_len / 5000, 1))
        # Factor 3: question type (classified by an LLM)
        question_type = self._classify_question(query)
        complexity_score = {
            "simple": 0.2,
            "medium": 0.5,
            "complex": 0.9,
        }
        factors.append(complexity_score.get(question_type, 0.5))
        return sum(factors) / len(factors)

    def _classify_question(self, query: str) -> str:
        """Classify question complexity with an LLM."""
        prompt = f"""Classify the following question as simple/medium/complex:

Question: {query}

Output only the label:"""
        return self.llm.generate(prompt).strip().lower()

    def get_context_strategy(self, complexity: float) -> Dict:
        """Pick a context strategy based on the complexity score."""
        if complexity < 0.3:
            return {
                "max_docs": 3,
                "max_history": 2,
                "compress_docs": False,
                "model": "gpt-3.5-turbo",
            }
        elif complexity < 0.7:
            return {
                "max_docs": 5,
                "max_history": 5,
                "compress_docs": True,
                "model": "gpt-4",
            }
        else:
            return {
                "max_docs": 10,
                "max_history": 10,
                "compress_docs": True,
                "model": "gpt-4",
                "use_recursion": True,
            }
```
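A small usage sketch; the `StubLLM` class is a hypothetical stand-in for a real client (anything exposing a `.generate()` method), added here for illustration only.

```python
# Illustrative usage of DynamicContextManager with a stub LLM client.
class StubLLM:
    def generate(self, prompt: str) -> str:
        # A real client would call a model; this stub always answers "medium".
        return "medium"

dyn = DynamicContextManager(StubLLM())
complexity = dyn.estimate_complexity("Summarize the release notes", "release notes " * 300)
strategy = dyn.get_context_strategy(complexity)
print(complexity, strategy)  # prints the score and the chosen strategy dict
```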
## 6. Token Optimization Techniques

### 6.1 Token Counting and Optimization

```python
import re

import tiktoken


class TokenOptimizer:
    """Token optimization utilities."""

    @staticmethod
    def optimize_prompt(prompt: str, target_tokens: int, model: str = "gpt-4") -> str:
        """Optimize a prompt to fit a target token budget.

        Strategy:
        1. Remove redundant blank lines
        2. Compress repeated content
        3. Shorten examples
        """
        # 1. Remove extra blank lines
        optimized = re.sub(r'\n{3,}', '\n\n', prompt.strip())
        # 2. Check whether further compression is needed
        if count_tokens(optimized, model) <= target_tokens:
            return optimized
        # 3. Smart truncation (keep the beginning and the end)
        encoder = tiktoken.encoding_for_model(model)
        tokens = encoder.encode(optimized)
        if len(tokens) > target_tokens:
            # Keep the first 70% and the last 30% of the budget
            keep_start = int(target_tokens * 0.7)
            keep_end = int(target_tokens * 0.3)
            start_tokens = tokens[:keep_start]
            end_tokens = tokens[-keep_end:] if keep_end > 0 else []
            # Add an ellipsis marker
            ellipsis_tokens = encoder.encode("\n\n[...]\n\n")
            final_tokens = start_tokens + ellipsis_tokens + end_tokens
            optimized = encoder.decode(final_tokens)
        return optimized

    @staticmethod
    def estimate_response_tokens(
        input_tokens: int,
        desired_output_length: str = "medium",
    ) -> int:
        """Estimate the number of output tokens needed.

        Args:
            input_tokens: number of input tokens
            desired_output_length: short/medium/long
        """
        multipliers = {
            "short": 0.5,
            "medium": 1.0,
            "long": 2.0,
        }
        multiplier = multipliers.get(desired_output_length, 1.0)
        return int(input_tokens * multiplier)

    @staticmethod
    def is_safe_for_model(
        total_tokens: int,
        model: str = "gpt-4",
    ) -> tuple[bool, int]:
        """Check whether a token count is safe for the model.

        Returns:
            (is_safe, remaining_tokens)
        """
        limits = {
            "gpt-4": 8192,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16384,
        }
        limit = limits.get(model, 8192)
        # Keep a 20% safety margin
        safe_limit = int(limit * 0.8)
        remaining = max(0, safe_limit - total_tokens)
        return total_tokens <= safe_limit, remaining
```
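A quick usage sketch of the utilities above (illustrative only; the sample prompt is a placeholder):

```python
# Illustrative usage of TokenOptimizer together with count_tokens from section 2.1.
prompt = "Explain context engineering.\n\n\n\nKeep it brief."
trimmed = TokenOptimizer.optimize_prompt(prompt, target_tokens=50)
ok, remaining = TokenOptimizer.is_safe_for_model(count_tokens(trimmed), model="gpt-4")
print(ok, remaining)  # e.g. True and the remaining safe budget
```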
### 6.2 System Prompt Optimization

```python
class SystemPromptOptimizer:
    """System prompt optimizer."""

    OPTIMIZED_TEMPLATES = {
        "concise": """
You are a professional AI assistant.
Answer questions directly and concisely.
""",
        "detailed": """
You are a professional AI assistant, skilled at providing detailed, accurate information.
Requirements:
1. Complete and accurate content
2. Clear, readable structure
3. Examples where necessary
Answer based on the provided information; if it is insufficient, say so.
""",
        "rag": """
You are a professional question-answering assistant that answers based on retrieved information.
Requirements:
1. Answer strictly from the provided information
2. Do not fabricate information
3. State clearly when the information is insufficient
4. Cite sources for important facts
Answer the question based on the information below.
""",
        "code": """
You are a coding assistant, skilled at solving programming problems.
Requirements:
1. Provide complete, runnable code
2. Include the necessary comments
3. Explain the key logic
4. Provide usage examples
Solve the problem with code and explain it.
""",
    }

    @classmethod
    def get_optimized_prompt(cls, task_type: str) -> str:
        """Return an optimized system prompt for the task type."""
        return cls.OPTIMIZED_TEMPLATES.get(task_type, cls.OPTIMIZED_TEMPLATES["concise"])

    @classmethod
    def merge_prompts(cls, *prompts: str) -> str:
        """Merge multiple system prompts."""
        # Deduplicate exact matches
        unique = set(p.strip() for p in prompts if p.strip())
        # Filtering near-duplicates is not implemented here (see the sketch below)
        # Merge
        return "\n\n".join(sorted(unique))
```
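The near-duplicate filtering step is left unimplemented in the original notes. One possible sketch, reusing the same word-overlap (Jaccard) idea as `IntelligentSamplingCompressor`, is shown below; the function name and threshold are illustrative.

```python
# Illustrative sketch: drop prompts that heavily overlap with one already kept.
def filter_similar_prompts(prompts: list[str], threshold: float = 0.8) -> list[str]:
    kept: list[str] = []
    for p in prompts:
        words = set(p.lower().split())
        is_duplicate = False
        for existing in kept:
            existing_words = set(existing.lower().split())
            union = words | existing_words
            overlap = len(words & existing_words) / len(union) if union else 0.0
            if overlap >= threshold:  # too similar to something already kept
                is_duplicate = True
                break
        if not is_duplicate:
            kept.append(p)
    return kept
```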
## 7. Implementation Example

### 7.1 A Complete Context Manager

```python
"""
A complete context manager implementation.

It brings together:
- conversation history management
- compression of retrieval results
- dynamic strategy adjustment
- token optimization
"""
import tiktoken
from typing import List, Dict, Optional
from dataclasses import dataclass
import openai


class SmartContextManager:
    """Smart context manager.

    Features:
    1. Automatic conversation-history management
    2. Intelligent compression of retrieval results
    3. Dynamic strategy adjustment
    4. Token counting and optimization
    """

    def __init__(
        self,
        model: str = "gpt-4",
        system_prompt: Optional[str] = None,
        max_tokens: Optional[int] = None,
    ):
        self.model = model
        self.max_tokens = max_tokens or self._get_model_limit(model)
        self.safe_limit = int(self.max_tokens * 0.8)
        self.system_prompt = system_prompt
        self.conversation_history: List[Dict] = []
        # Initialize components
        self.compressor = HierarchicalCompressor()
        # Note: DynamicContextManager calls .generate() on this object; an adapter is assumed.
        self.dynamic_manager = DynamicContextManager(openai.ChatCompletion)
        # Account for the system prompt
        self.system_tokens = (
            count_tokens(system_prompt, model) if system_prompt else 0
        )

    def _get_model_limit(self, model: str) -> int:
        limits = {
            "gpt-4": 8192,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16384,
        }
        return limits.get(model, 8192)

    def build_context(
        self,
        user_query: str,
        retrieved_docs: Optional[List[str]] = None,
        complexity: Optional[float] = None,
    ) -> List[Dict]:
        """Build an optimized context.

        Args:
            user_query: the user query
            retrieved_docs: list of retrieved documents
            complexity: task complexity (0-1); estimated automatically if not given

        Returns:
            A message list in LLM API format.
        """
        # Estimate complexity
        if complexity is None:
            context = "\n\n".join(retrieved_docs) if retrieved_docs else ""
            complexity = self.dynamic_manager.estimate_complexity(
                user_query, context
            )
        # Pick a strategy
        strategy = self.dynamic_manager.get_context_strategy(complexity)
        # Build the messages
        messages = []
        # 1. System prompt
        if self.system_prompt:
            messages.append({
                "role": "system",
                "content": self.system_prompt,
            })
        # 2. Retrieved documents
        if retrieved_docs:
            docs_context = self._build_docs_context(
                retrieved_docs,
                strategy["max_docs"],
                strategy["compress_docs"],
            )
            if docs_context:
                messages.append({
                    "role": "system",
                    "content": f"Reference information:\n{docs_context}",
                })
        # 3. Conversation history
        history_messages = self._select_history(
            strategy["max_history"]
        )
        messages.extend(history_messages)
        # 4. User query
        messages.append({
            "role": "user",
            "content": user_query,
        })
        return messages

    def _build_docs_context(
        self,
        docs: List[str],
        max_docs: int,
        compress: bool,
    ) -> str:
        """Build the document context."""
        # Limit the number of documents
        selected_docs = docs[:max_docs]
        combined = "\n\n---\n\n".join(selected_docs)
        tokens = count_tokens(combined, self.model)
        available = self.safe_limit - self.system_tokens - 500  # leave room for the query
        if tokens <= available:
            return combined
        # Compress
        if compress:
            result = self.compressor.compress(combined, available, self.model)
            # Join the (possibly compressed) section contents into a string
            return "\n\n".join(s["content"] for s in result["structure"])
        # Simple truncation
        return TokenOptimizer.optimize_prompt(combined, available, self.model)

    def _select_history(self, max_messages: int) -> List[Dict]:
        """Select history messages."""
        # Keep the most recent N messages
        count = min(max_messages, len(self.conversation_history))
        return self.conversation_history[-count:] if count > 0 else []

    def add_message(self, role: str, content: str):
        """Append a message to the history."""
        self.conversation_history.append({
            "role": role,
            "content": content,
        })

    def get_token_info(self, messages: List[Dict]) -> Dict:
        """Report token usage for a message list."""
        total = sum(count_tokens(m["content"], self.model) for m in messages)
        return {
            "total_tokens": total,
            "remaining": self.safe_limit - total,
            "percentage": total / self.safe_limit * 100,
            "is_safe": total <= self.safe_limit,
        }

    def clear_history(self):
        """Clear the conversation history."""
        self.conversation_history = []

    def trim_history(self, target_tokens: int):
        """Trim the history down to a target token count."""
        current_tokens = sum(
            count_tokens(m["content"], self.model)
            for m in self.conversation_history
        )
        if current_tokens <= target_tokens:
            return
        # Remove messages starting from the oldest
        while current_tokens > target_tokens and self.conversation_history:
            removed = self.conversation_history.pop(0)
            current_tokens -= count_tokens(removed["content"], self.model)
```
```python
# ============== Usage example ==============
if __name__ == "__main__":
    # Initialize the manager
    manager = SmartContextManager(
        model="gpt-4",
        system_prompt="You are a professional question-answering assistant.",
    )
    # Simulated retrieval results
    docs = [
        "Python is a high-level programming language created by Guido van Rossum.",
        "Python 3.12 was released in 2023 and brought new features such as type parameters.",
        "Python is widely used in web development, data science, artificial intelligence, and more.",
    ] * 3  # simulate more documents
    # Build the context
    messages = manager.build_context(
        user_query="What's new in Python 3.12?",
        retrieved_docs=docs,
        # Pass complexity explicitly so the example runs without an LLM call;
        # the automatic estimate requires an llm object with a .generate() method.
        complexity=0.5,
    )
    # Inspect token usage
    token_info = manager.get_token_info(messages)
    print(f"Token usage: {token_info}")
    # Call the API
    # response = openai.ChatCompletion.create(
    #     model="gpt-4",
    #     messages=messages
    # )
    # Append the exchange to the history
    # manager.add_message("user", "What's new in Python 3.12?")
    # manager.add_message("assistant", response.choices[0].message.content)
```
## Frequently Asked Interview Questions

### Q1: How do you handle input that exceeds the context window?

Standard answer:

Strategies:

1. Sliding window
   - Keep the most recent N messages
   - Keep the system prompt
   - Keep key early messages (e.g. the initial request)
2. Content compression
   - Summarization: summarize long content with an LLM
   - Keyword extraction: keep the key terms and sentences
   - Hierarchical compression: keep the structure, compress the content
3. Explicit prioritization
   - System prompt: must be kept
   - User query: must be kept
   - Retrieved documents: sort by relevance, keep the highest-scoring
   - Conversation history: sliding window
4. Chunked processing
   - Map-Reduce: process chunks, then merge
   - Progressive RAG: index first, then details

Implementation:

```python
def build_context(query, docs, history, max_tokens):
    messages = []
    # 1. System prompt (required)
    messages.append({"role": "system", "content": system_prompt})
    # 2. Retrieved documents (compressed)
    docs_tokens = sum(count_tokens(d) for d in docs)
    if docs_tokens > max_tokens * 0.5:
        docs = compress_docs(docs, max_tokens * 0.5)
    messages.append({"role": "system", "content": format_docs(docs)})
    # 3. Conversation history (sliding window)
    history = sliding_window(history, max_history)
    messages.extend(history)
    # 4. User query (required)
    messages.append({"role": "user", "content": query})
    return messages
```
### Q2: How do you optimize prompts to reduce token usage?

Standard answer:

Optimization strategies:

1. Simplify the language
   - Remove redundant explanations
   - Use concise instructions
   - Avoid repeating yourself
2. Optimize examples
   - Reduce the number of examples
   - Shorten each example
   - Keep only representative examples
3. Structure the prompt
   - Use JSON / list formats
   - Avoid repetitive natural language
   - Use symbols instead of prose where possible
4. Use templates
   - Define templates and fill them dynamically
   - Avoid hard-coded text
5. Token-level tricks
   - Token efficiency varies by language and tokenizer; measure with the actual encoder rather than assuming
   - Write numbers as digits rather than words
   - Drop unnecessary punctuation

Example:

```python
# Before optimization
prompt = """You are a coding assistant. Please help me write a Python function
that takes a list of numbers as a parameter and returns the average of all numbers in the list.
Please make sure the function handles the empty-list case and add an appropriate docstring."""

# After optimization
prompt = """Coding assistant. Write a function:
Input: list of numbers
Output: average
Requirements: handle empty list, add docstring
Language: Python"""
```

Comparison: roughly 150 tokens before, roughly 60 tokens after.
### Q3: How do you design context management for long documents?

Standard answer:

Long-document strategies:

1. Chunking
   - Split on semantic boundaries (paragraphs / sections)
   - Control chunk size (e.g. 4K tokens)
   - Add overlap (200-500 tokens)
2. Progressive RAG
   - Build document indexes / summaries first
   - Retrieve to decide which documents are relevant
   - Fetch the full document contents
   - Generate the final answer
3. Map-Reduce
   - Map: process each chunk independently
   - Reduce: merge the per-chunk analyses
4. Hierarchical compression
   - Preserve the document structure
   - Compress lower-level content
   - Keep high-level summaries

Implementation:

```python
def process_long_document(doc, query):
    # Option 1: Map-Reduce
    chunks = split_document(doc, chunk_size=4000)
    # Map
    results = [process_chunk(query, c) for c in chunks]
    # Reduce
    final = combine_results(results, query)
    return final

# Option 2: Progressive RAG
def recursive_rag(query):
    # Step 1: retrieve from the index
    index_results = index_retriever.search(query)
    # Step 2: select documents
    doc_ids = select_relevant(index_results)
    # Step 3: fetch the full contents
    docs = doc_retriever.get(doc_ids)
    # Step 4: generate
    return generate(query, docs)
```

---
## Summary

### Core Points of Context Engineering

| Topic | Strategies |
|------|------|
| **Window management** | Sliding window, dynamic adjustment, prioritization |
| **Content compression** | Summarization, keywords, hierarchical compression |
| **Token optimization** | Simplified language, templates, structured prompts |
| **Long documents** | Chunking, progressive retrieval, Map-Reduce |
| **System prompts** | Concise, clear, only what is necessary |

### Best Practices

1. **Keep a safety margin**: use about 80% of the context limit
2. **Make priorities explicit**: system prompt > query > documents > history
3. **Adapt dynamically**: adjust the strategy to task complexity
4. **Monitor tokens**: track usage in real time
5. **Test and verify**: make sure key information is never lost