[AI Agent Knowledge Base] 20 - Context Engineering and Compression

Content Outline

Context Engineering and Compression

Table of Contents

  1. Context Engineering Overview
  2. Context Window Management
  3. Context Compression Strategies
  4. Long-Context Handling
  5. Context Optimization Best Practices
  6. Token Optimization Techniques
  7. Implementation Example

1. Context Engineering Overview

1.1 What Is Context Engineering

Context engineering is the practice of managing all of the information passed to the LLM, including:

  • System prompt
  • Conversation history
  • Retrieved documents
  • Tool-call results
  • User input

1.2 Context Limits

| Model | Context Window | Recommended Safe Usage |
|-------|----------------|------------------------|
| GPT-3.5-Turbo | 16K | 12K |
| GPT-4 | 8K | 6K |
| GPT-4-Turbo | 128K | 100K |
| GPT-4o | 128K | 100K |
| Claude 3 Haiku | 200K | 150K |
| Claude 3 Sonnet | 200K | 150K |
| Claude 3 Opus | 200K | 150K |
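
The "recommended safe usage" column reflects a rule of thumb: reserve roughly 20-25% of the window for the model's reply and for counting error. A minimal sketch of that rule (the limits mirror the table above and will drift as vendors update models; treat them as assumptions):

# Context-window sizes from the table above, in tokens
# (vendor figures change over time; treat these as assumptions)
CONTEXT_WINDOWS = {
    "gpt-3.5-turbo": 16_000,
    "gpt-4": 8_000,
    "gpt-4-turbo": 128_000,
    "gpt-4o": 128_000,
    "claude-3-opus": 200_000,
}

def safe_budget(model: str, ratio: float = 0.75) -> int:
    """Apply the ~75% rule of thumb to get a safe input budget."""
    return int(CONTEXT_WINDOWS.get(model, 8_000) * ratio)

print(safe_budget("gpt-4-turbo"))  # 96000, close to the 100K in the table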

1.3 Why Context Engineering Matters

┌─────────────────────────────────────────────────┐
│         The Value of Context Engineering        │
├─────────────────────────────────────────────────┤
│  ✓ Ensures key information is not truncated     │
│  ✓ Controls token costs                         │
│  ✓ Improves response quality (less noise)       │
│  ✓ Maintains contextual coherence               │
│  ✓ Supports long-document processing            │
└─────────────────────────────────────────────────┘

2. Context Window Management

2.1 Basic Concepts

from typing import List, Dict, Optional
from dataclasses import dataclass
import tiktoken

@dataclass
class ContextWindow:
    """Tracks how much of a model's context window is in use."""
    model: str
    max_tokens: int
    current_tokens: int = 0

    def remaining(self) -> int:
        """Tokens still available in the window."""
        return self.max_tokens - self.current_tokens

    def can_fit(self, text: str) -> bool:
        """Whether `text` fits in the remaining space."""
        tokens = count_tokens(text, self.model)
        return self.current_tokens + tokens <= self.max_tokens

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count the tokens in a text for the given model."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except KeyError:
        # Fall back to cl100k_base (the GPT-4/-Turbo encoding)
        encoding = tiktoken.get_encoding("cl100k_base")
        return len(encoding.encode(text))
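
A quick sanity check of the pieces above (a sketch; exact token counts depend on the tiktoken version):

window = ContextWindow(model="gpt-4", max_tokens=8000)
text = "Context engineering keeps the important tokens and drops the rest."
print(count_tokens(text))     # exact count depends on the tiktoken version
print(window.can_fit(text))   # True
window.current_tokens += count_tokens(text)
print(window.remaining())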

2.2 Conversation History Management

from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class Message:
    role: str  # "system", "user", "assistant", "tool"
    content: str
    tokens: int = 0

class ConversationManager:
    def __init__(
        self,
        model: str = "gpt-4",
        max_tokens: int = 8000,
        system_prompt: Optional[str] = None
    ):
        self.model = model
        self.window = ContextWindow(model=model, max_tokens=max_tokens)
        self.system_prompt = system_prompt
        self.messages: List[Message] = []

        # Account for the system prompt's tokens
        if system_prompt:
            sys_tokens = count_tokens(system_prompt, model)
            self.window.current_tokens = sys_tokens

    def add_message(self, role: str, content: str) -> bool:
        """Append a message; returns False if it cannot fit even after compression."""
        tokens = count_tokens(content, self.model)

        if not self.window.can_fit(content):
            # Try to free space, then check again
            self._compress_history()
            if not self.window.can_fit(content):
                return False

        message = Message(role=role, content=content, tokens=tokens)
        self.messages.append(message)
        self.window.current_tokens += tokens
        return True

    def _compress_history(self) -> bool:
        """Free space by dropping the oldest messages.

        Other options: summarize long messages, or apply a sliding window.
        """
        target = int(self.window.max_tokens * 0.5)
        freed = False
        while self.messages and self.window.current_tokens > target:
            removed = self.messages.pop(0)
            self.window.current_tokens -= removed.tokens
            freed = True
        return freed

    def get_messages(self) -> List[Dict]:
        """Return the messages in LLM API format."""
        result = []
        if self.system_prompt:
            result.append({"role": "system", "content": self.system_prompt})

        for msg in self.messages:
            result.append({"role": msg.role, "content": msg.content})

        return result

    def summarize(self) -> str:
        """Plain-text digest of the history (a stand-in for an LLM-generated summary)."""
        return "\n".join(f"{m.role}: {m.content[:80]}" for m in self.messages)
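
A usage sketch of the manager above (the model name and limit are illustrative):

mgr = ConversationManager(
    model="gpt-4",
    max_tokens=8000,
    system_prompt="You are a helpful assistant."
)
mgr.add_message("user", "Summarize context engineering in one sentence.")
print(mgr.window.remaining(), "tokens left")
print(mgr.get_messages())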

2.3 Sliding-Window Strategy

class SlidingWindowManager:
    def __init__(
        self,
        max_messages: int = 10,
        keep_system: bool = True,
        keep_first_user: bool = True
    ):
        self.max_messages = max_messages
        self.keep_system = keep_system
        self.keep_first_user = keep_first_user
        self.messages: List[Message] = []

    def add_message(self, message: Message):
        self.messages.append(message)

        # Enforce the message cap
        if len(self.messages) > self.max_messages:
            self._prune()

    def _prune(self):
        """Trim messages while keeping the important ones."""
        # Keep system messages
        system_msgs = [m for m in self.messages if m.role == "system"] if self.keep_system else []

        # Keep the first user message (usually the initial request)
        first_user = None
        if self.keep_first_user:
            for m in self.messages:
                if m.role == "user":
                    first_user = m
                    break

        # Keep the most recent N messages, excluding the pinned ones to avoid duplicates
        pinned = {id(m) for m in system_msgs}
        if first_user:
            pinned.add(id(first_user))
        rest = [m for m in self.messages if id(m) not in pinned]

        recent_count = self.max_messages - len(system_msgs) - (1 if first_user else 0)
        recent = rest[-recent_count:] if recent_count > 0 else []

        # Rebuild the message list
        self.messages = []
        self.messages.extend(system_msgs)
        if first_user:
            self.messages.append(first_user)
        self.messages.extend(recent)
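
A usage sketch of the sliding window (uses the `Message` dataclass from 2.2; the output shown assumes keep_system and keep_first_user are both True):

swm = SlidingWindowManager(max_messages=4)
swm.add_message(Message(role="system", content="You are helpful."))
for i in range(6):
    swm.add_message(Message(role="user", content=f"turn {i}"))

# The system message and the first user turn are pinned;
# the rest is the most recent tail of the conversation
print([m.content for m in swm.messages])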

3. Context Compression Strategies

3.1 Strategy Overview

| Strategy | Best For | Pros | Cons |
|----------|----------|------|------|
| Summarization | Long documents/conversations | Large token savings | May lose details |
| Keyword extraction | Document retrieval | Keeps key information | May lose context |
| Hierarchical compression | Structured documents | Preserves structure | Complex to implement |
| Intelligent sampling | Retrieval results | Preserves diverse information | Needs similarity scores |
| Entity extraction | Information-dense text | Keeps factual information | Loses linguistic nuance |
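
The table reads as a decision rule. A minimal selector sketch (the scenario labels are illustrative, not a fixed taxonomy):

def pick_compression_strategy(scenario: str, has_scores: bool = False) -> str:
    """Map a scenario label to a strategy name from the table above."""
    if scenario in ("long_document", "long_conversation"):
        return "summarization"
    if scenario == "retrieval":
        return "intelligent_sampling" if has_scores else "keyword_extraction"
    if scenario == "structured_document":
        return "hierarchical"
    return "summarization"  # a reasonable default

print(pick_compression_strategy("structured_document"))  # hierarchical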

3.2 Summarization Compression

import openai  # legacy (pre-1.0) SDK interface, used throughout this article

class SummarizationCompressor:
    def __init__(self, llm_client=None):
        self.llm = llm_client or openai.ChatCompletion

    async def compress(
        self,
        text: str,
        target_ratio: float = 0.3,
        max_tokens: int = 1000
    ) -> str:
        """
        Compress text to a target ratio.

        Args:
            text: the original text
            target_ratio: target compression ratio (0.3 = 30%)
            max_tokens: maximum output tokens
        """
        original_tokens = count_tokens(text)
        target_tokens = int(original_tokens * target_ratio)

        prompt = f"""Compress the following text while keeping the core information and main points.

Target length: about {target_tokens} tokens
Original length: {original_tokens} tokens

Text:
{text}

Output the compressed version:"""

        # acreate is the async variant of create in the legacy SDK
        response = await self.llm.acreate(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=0.3  # low temperature for faithfulness
        )

        return response.choices[0].message.content

    async def compress_messages(
        self,
        messages: List[Message],
        summary_threshold: int = 10
    ) -> List[Message]:
        """
        Compress a message list.

        Summarizes the older messages into a single summary message.
        """
        if len(messages) < summary_threshold:
            return messages

        # Separate old messages from recent ones
        old_messages = messages[:-5]
        recent_messages = messages[-5:]

        # Summarize the old messages
        old_text = "\n".join([
            f"{m.role}: {m.content}" for m in old_messages
        ])
        summary = await self.compress(old_text)

        # Return the summary plus the recent messages
        result = [Message(role="system", content=summary)]
        result.extend(recent_messages)
        return result
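
A usage sketch for the compressor (assumes the legacy pre-1.0 openai SDK and a configured API key, so the call is left commented out):

import asyncio

async def demo():
    compressor = SummarizationCompressor()
    long_text = "Context engineering manages everything sent to the model. " * 50
    summary = await compressor.compress(long_text, target_ratio=0.2)
    print(summary)

# asyncio.run(demo())  # uncomment once an API key is configured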

3.3 Keyword Extraction

import re
from collections import Counter

class KeywordExtractor:
    def __init__(self):
        # Chinese and English stopwords
        self.stopwords = {
            "的", "了", "是", "在", "我", "有", "和", "就", "不", "人",
            "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去",
            "你", "会", "着", "没有", "看", "好", "自己", "这", "the", "a",
            "an", "is", "are", "was", "were", "be", "been", "being",
            "have", "has", "had", "do", "does", "did", "will", "would"
        }

    def extract(
        self,
        text: str,
        top_k: int = 20,
        min_length: int = 2
    ) -> List[str]:
        """
        Extract keywords.

        Args:
            text: input text
            top_k: number of keywords to return
            min_length: minimum word length
        """
        # Naive tokenization (\w+ does not segment Chinese;
        # use a proper NLP tool such as jieba for production Chinese text)
        words = re.findall(r'\b\w+\b', text.lower())

        # Filter stopwords and short words
        words = [w for w in words if w not in self.stopwords and len(w) >= min_length]

        # Count frequencies
        counter = Counter(words)

        # Return the top_k
        return [word for word, _ in counter.most_common(top_k)]

    def extract_with_positions(
        self,
        text: str,
        top_k: int = 10
    ) -> List[Dict]:
        """Extract keywords together with their positions."""
        keywords = self.extract(text, top_k * 2)  # over-extract, then trim

        result = []
        for keyword in keywords[:top_k]:
            # Find each occurrence (escape the keyword: it is data, not a pattern)
            positions = []
            for match in re.finditer(re.escape(keyword), text, re.IGNORECASE):
                positions.append(match.start())

            if positions:
                result.append({
                    "keyword": keyword,
                    "positions": positions,
                    "count": len(positions)
                })

        return sorted(result, key=lambda x: x["count"], reverse=True)

    def build_context(self, text: str, keywords: List[str]) -> str:
        """
        Build a context from keywords.

        Keeps only the sentences that contain a keyword.
        """
        sentences = re.split(r'[.!?。!?\n]', text)

        # Select sentences containing at least one keyword
        keyword_set = set(k.lower() for k in keywords)
        selected = []

        for sentence in sentences:
            sentence_lower = sentence.lower()
            if any(keyword in sentence_lower for keyword in keyword_set):
                selected.append(sentence.strip())

        return " ".join(selected)
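
A quick usage sketch (English text, since the naive `\w+` tokenizer does not segment Chinese):

extractor = KeywordExtractor()
text = (
    "Context engineering manages the information passed to an LLM. "
    "Context compression reduces tokens. Compression keeps the key sentences."
)
print(extractor.extract(text, top_k=5))
print(extractor.build_context(text, ["compression"]))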

3.4 Hierarchical Compression

import re
import tiktoken
from typing import Dict, List

class HierarchicalCompressor:
    """Hierarchical compression - suited to Markdown/structured documents"""

    def compress(
        self,
        text: str,
        max_tokens: int,
        model: str = "gpt-4"
    ) -> Dict:
        """
        Hierarchically compress a document:

        1. Parse the document structure
        2. Compress lower-level content
        3. Preserve the higher-level structure
        """
        # Parse into a hierarchy
        structure = self._parse_structure(text)

        # Compress recursively
        compressed = self._compress_structure(structure, max_tokens, model)

        return {
            "summary": compressed["summary"],
            "structure": compressed["structure"],
            "tokens": self._count_tokens_compressed(compressed["structure"], model)
        }

    def _parse_structure(self, text: str) -> List[Dict]:
        """Parse the document into a flat list of header-delimited sections."""
        sections = []

        current_section = {
            "level": 0,
            "title": "Root",
            "content": "",
            "children": []
        }

        for line in text.split('\n'):
            # Detect a Markdown header
            header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
            if header_match:
                # A new header starts a new section; store the current one
                sections.append(current_section)

                current_section = {
                    "level": len(header_match.group(1)),
                    "title": header_match.group(2),
                    "content": "",
                    "children": []
                }
            else:
                current_section["content"] += line + "\n"

        sections.append(current_section)
        return sections

    def _compress_structure(
        self,
        structure: List[Dict],
        max_tokens: int,
        model: str
    ) -> Dict:
        """Fill the budget with whole sections, compressing the one that overflows."""
        total_tokens = 0
        compressed_structure = []

        # Highest-level sections (smallest `level`) get priority
        for section in sorted(structure, key=lambda x: x["level"]):
            section_tokens = count_tokens(section["content"], model)

            if total_tokens + section_tokens <= max_tokens:
                # Keep in full
                compressed_structure.append({
                    "title": section["title"],
                    "content": section["content"],
                    "compressed": False
                })
                total_tokens += section_tokens
            else:
                # Compress this section's content into the remaining budget
                compressed_content = self._compress_content(
                    section["content"],
                    max_tokens - total_tokens,
                    model
                )
                compressed_structure.append({
                    "title": section["title"],
                    "content": compressed_content,
                    "compressed": True
                })
                break

        return {
            "structure": compressed_structure,
            "summary": self._generate_summary(compressed_structure)
        }

    def _compress_content(self, content: str, max_tokens: int, model: str) -> str:
        """Compress a single section (simple truncation; an LLM summary also works)."""
        if count_tokens(content, model) <= max_tokens:
            return content

        # Strategy: keep the beginning and the end
        tokens_per_side = max_tokens // 2

        encoder = tiktoken.encoding_for_model(model)
        tokens = encoder.encode(content)

        head = tokens[:tokens_per_side]
        tail = tokens[-tokens_per_side:] if tokens_per_side > 0 else []
        return encoder.decode(head) + "\n[...]\n" + encoder.decode(tail)

    def _generate_summary(self, structure: List[Dict]) -> str:
        """Produce a one-line-per-section outline of what was kept."""
        summary = []
        for section in structure:
            flag = "[compressed]" if section["compressed"] else "[full]"
            summary.append(f"{flag} {section['title']}")
        return "\n".join(summary)

    def _count_tokens_compressed(self, structure: List[Dict], model: str = "gpt-4") -> int:
        """Token count of the compressed structure."""
        return sum(count_tokens(s["content"], model) for s in structure)
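
A usage sketch on a small Markdown document (the token budget is arbitrary; requires tiktoken and the count_tokens helper from 2.1):

compressor = HierarchicalCompressor()
doc = "\n".join([
    "# Guide",
    "Overview paragraph. " * 30,
    "## Setup",
    "Setup details. " * 150,
    "## Usage",
    "Usage details. " * 150,
])
result = compressor.compress(doc, max_tokens=250)
print(result["summary"])                     # which sections were kept vs. compressed
print("compressed size:", result["tokens"])  # token count after compression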

3.5 Intelligent Sampling Compression

class IntelligentSamplingCompressor:
    """Intelligent sampling - balances scores against diversity"""

    def compress(
        self,
        results: List[Dict],
        max_items: int,
        diversity_factor: float = 0.3
    ) -> List[Dict]:
        """
        Intelligently sample retrieval results.

        Args:
            results: retrieval results, each {"content": str, "score": float, "metadata": dict}
            max_items: maximum number to keep
            diversity_factor: higher values favor diversity over raw score
        """
        if len(results) <= max_items:
            return results

        # 1. Sort by score
        scored = sorted(results, key=lambda x: x["score"], reverse=True)

        # 2. Split the budget between top-scored and diverse picks
        diversity_count = int(max_items * diversity_factor)
        top_count = max_items - diversity_count

        # 3. Take the top-scored results
        top_results = scored[:top_count]

        # 4. Fill the rest with diverse samples from the remainder
        remaining = scored[top_count:]
        diverse_results = self._select_diverse(
            remaining,
            diversity_count,
            top_results
        )

        # 5. Merge
        return top_results + diverse_results

    def _select_diverse(
        self,
        candidates: List[Dict],
        count: int,
        existing: List[Dict]
    ) -> List[Dict]:
        """Select diverse samples."""
        if len(candidates) <= count:
            return candidates

        selected = []
        existing_texts = [r["content"] for r in existing]

        for _ in range(count):
            if not candidates:
                break

            # Pick the candidate least similar to everything selected so far
            pick_index = self._select_least_similar(
                candidates,
                existing_texts + [r["content"] for r in selected]
            )

            selected.append(candidates.pop(pick_index))

        return selected

    def _select_least_similar(
        self,
        candidates: List[Dict],
        existing: List[str]
    ) -> int:
        """Index of the candidate least similar to the existing samples."""
        min_similarity = float('inf')
        best_index = 0

        for i, candidate in enumerate(candidates):
            similarity = self._compute_similarity(candidate["content"], existing)
            if similarity < min_similarity:
                min_similarity = similarity
                best_index = i

        return best_index

    def _compute_similarity(self, text: str, existing_texts: List[str]) -> float:
        """Similarity of `text` to the closest existing text."""
        # Simple implementation: Jaccard similarity over word sets
        text_words = set(text.lower().split())

        max_sim = 0
        for existing in existing_texts:
            existing_words = set(existing.lower().split())
            intersection = len(text_words & existing_words)
            union = len(text_words | existing_words)
            sim = intersection / union if union > 0 else 0
            max_sim = max(max_sim, sim)

        return max_sim
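
A toy run (the scores and contents are fabricated to show the score/diversity split):

sampler = IntelligentSamplingCompressor()
results = [
    {"content": f"document about topic {i % 3}", "score": 1.0 - i * 0.1, "metadata": {}}
    for i in range(8)
]
for r in sampler.compress(results, max_items=4, diversity_factor=0.5):
    print(round(r["score"], 2), r["content"])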

4. Long-Context Handling

4.1 Chunked Processing

from typing import Generator, List, Optional
import tiktoken

class LongContextHandler:
    def __init__(self, model: str = "gpt-4"):
        self.model = model
        self.max_tokens = self._get_model_limit(model)

    def _get_model_limit(self, model: str) -> int:
        limits = {
            "gpt-4": 8000,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16000,
            "claude-3-opus": 200000,
        }
        return limits.get(model, 8000)

    def split_into_chunks(
        self,
        text: str,
        chunk_size: Optional[int] = None,
        overlap: int = 200
    ) -> Generator[str, None, None]:
        """
        Split a long text into chunks the model can handle.

        Args:
            text: input text
            chunk_size: tokens per chunk; defaults to 80% of the model limit
            overlap: overlapping tokens between consecutive chunks
        """
        if chunk_size is None:
            chunk_size = int(self.max_tokens * 0.8)

        encoder = tiktoken.encoding_for_model(self.model)
        tokens = encoder.encode(text)

        start = 0
        while start < len(tokens):
            end = min(start + chunk_size, len(tokens))
            yield encoder.decode(tokens[start:end])
            if end == len(tokens):
                break  # stop after the final chunk; stepping back would loop forever
            start = end - overlap

    def process_long_document(
        self,
        document: str,
        query: str,
        process_func: callable
    ) -> str:
        """
        Process a long document.

        Strategy:
        1. Split the document into chunks
        2. Process each chunk independently
        3. Merge the results
        """
        chunks = list(self.split_into_chunks(document))

        results = []
        for i, chunk in enumerate(chunks):
            # Give each chunk some positional context
            context_info = f"[Document chunk {i+1}/{len(chunks)}]"
            enriched_chunk = f"{context_info}\n\n{chunk}"

            result = process_func(query, enriched_chunk)
            results.append(result)

        # Merge the results (a further summarization pass is possible)
        return "\n\n---\n\n".join(results)
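
A chunking sketch with a stub analyzer standing in for a real LLM call:

handler = LongContextHandler(model="gpt-4")
text = "word " * 5000

chunks = list(handler.split_into_chunks(text, chunk_size=1000, overlap=100))
print(len(chunks), "chunks; consecutive chunks share ~100 tokens")

def stub_analyzer(query, chunk):
    # Stand-in for a real LLM call
    return f"[{query}] analyzed {len(chunk)} characters"

print(handler.process_long_document(text, "demo query", stub_analyzer))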

4.2 Recursive RAG

class RecursiveRAG:
    """
    Coarse-to-fine RAG for long documents:

    1. First retrieve document indexes/summaries
    2. Decide which documents are needed
    3. Retrieve the full document content
    4. Generate the final answer
    """

    def __init__(
        self,
        index_retriever,  # retriever over indexes/summaries
        document_retriever,  # retriever over full documents
        llm
    ):
        self.index_retriever = index_retriever
        self.document_retriever = document_retriever
        self.llm = llm

    def query(self, query: str) -> Dict:
        # Step 1: retrieve relevant index entries
        index_results = self.index_retriever.retrieve(query, top_k=10)

        # Step 2: decide which full documents are needed
        document_ids = self._select_documents(query, index_results)

        # Step 3: retrieve the full content
        documents = self.document_retriever.retrieve_by_ids(document_ids)

        # Step 4: generate the answer
        answer = self._generate_answer(query, documents)

        return {
            "answer": answer,
            "sources": documents,
            "index_results": index_results
        }

    def _select_documents(self, query: str, index_results: List) -> List[str]:
        """
        Select which documents to fetch in full, based on the query and index results.
        """
        # An LLM could judge relevance here;
        # this version uses a simple score threshold
        selected = []
        for result in index_results:
            if result.score > 0.7:  # threshold
                selected.append(result.document_id)
        return selected

    def _generate_answer(self, query: str, documents: List) -> str:
        context = "\n\n".join([doc.content for doc in documents])

        prompt = f"""Answer the question based on the documents below.

Documents:
{context}

Question: {query}

Answer using only the document content:"""

        response = self.llm.generate(prompt)
        return response

4.3 The Map-Reduce Pattern

class MapReduce:
    """
    Map-Reduce pattern for long documents.

    Map: process each chunk independently
    Reduce: merge all of the results
    """

    def __init__(self, llm, chunk_size: int = 4000):
        self.llm = llm
        self.chunk_size = chunk_size

    def map(self, document: str, query: str) -> List[str]:
        """Map phase: process each chunk."""
        handler = LongContextHandler()
        chunks = list(handler.split_into_chunks(document, self.chunk_size))

        results = []
        for chunk in chunks:
            result = self._process_chunk(query, chunk)
            results.append(result)

        return results

    def reduce(self, mapped_results: List[str], query: str) -> str:
        """Reduce phase: merge the results."""
        combined = "\n\n".join(mapped_results)

        prompt = f"""Based on the analyses below, give a consolidated answer.

Per-chunk analyses:
{combined}

Original question: {query}

Give the final, consolidated answer:"""

        return self.llm.generate(prompt)

    def process(self, document: str, query: str) -> str:
        """The full Map-Reduce pipeline."""
        # Map
        mapped = self.map(document, query)

        # Reduce
        reduced = self.reduce(mapped, query)

        return reduced

    def _process_chunk(self, query: str, chunk: str) -> str:
        prompt = f"""Analyze the following document chunk and address the question.

Document chunk:
{chunk}

Question: {query}

Give your analysis of this chunk:"""
        return self.llm.generate(prompt)

5. Context Optimization Best Practices

5.1 Structuring the Context

class ContextBuilder:
    """Structured context builder"""

    def build(
        self,
        system_prompt: str,
        user_query: str,
        retrieved_docs: List[str],
        conversation_history: List[Dict],
        max_tokens: int = 8000
    ) -> List[Dict]:
        """
        Build an optimized context.

        Layout:
        1. System prompt (fixed)
        2. Retrieved documents (compressible)
        3. Conversation history (sliding window)
        4. User query (must be included)
        """
        messages = []

        # 1. System prompt - always included
        messages.append({"role": "system", "content": system_prompt})

        # 2. Retrieved documents - compressed intelligently
        docs_context = self._build_docs_context(retrieved_docs, max_tokens)
        if docs_context:
            messages.append({
                "role": "system",
                "content": f"Relevant information:\n{docs_context}"
            })

        # 3. Conversation history - sliding window
        history_messages = self._select_history(
            conversation_history,
            max_tokens - self._count_messages_tokens(messages)
        )
        messages.extend(history_messages)

        # 4. User query - always included
        messages.append({"role": "user", "content": user_query})

        return messages

    def _build_docs_context(self, docs: List[str], max_tokens: int) -> str:
        """Build the document context."""
        # Ideally, keep the most relevant documents first;
        # simplified here: concatenate everything
        combined = "\n\n---\n\n".join(docs)

        if count_tokens(combined) <= max_tokens:
            return combined

        # Compress, then render the structure back into text
        compressor = HierarchicalCompressor()
        result = compressor.compress(combined, max_tokens)

        return "\n\n".join(
            f"{s['title']}\n{s['content']}" for s in result["structure"]
        )

    def _select_history(self, history: List[Dict], max_tokens: int) -> List[Dict]:
        """Select history messages."""
        # Simple approach: keep the most recent messages that fit
        selected = []
        total_tokens = 0

        for msg in reversed(history):
            tokens = count_tokens(msg.get("content", ""))
            if total_tokens + tokens <= max_tokens:
                selected.insert(0, msg)
                total_tokens += tokens
            else:
                break

        return selected

    def _count_messages_tokens(self, messages: List[Dict]) -> int:
        """Total token count of a message list."""
        return sum(
            count_tokens(msg.get("content", ""))
            for msg in messages
        )

5.2 Dynamic Context Adjustment

class DynamicContextManager:
    """
    Dynamic context manager.

    Adjusts the context strategy to the task's complexity.
    """

    def __init__(self, llm):
        self.llm = llm

    def estimate_complexity(self, query: str, context: str) -> float:
        """
        Estimate task complexity.

        Returns:
            a complexity score in [0, 1]
        """
        factors = []

        # Factor 1: query length
        query_len = len(query.split())
        factors.append(min(query_len / 100, 1))

        # Factor 2: context size
        context_len = len(context.split())
        factors.append(min(context_len / 5000, 1))

        # Factor 3: question type (judged by the LLM)
        question_type = self._classify_question(query)
        complexity_score = {
            "simple": 0.2,
            "medium": 0.5,
            "complex": 0.9
        }
        factors.append(complexity_score.get(question_type, 0.5))

        return sum(factors) / len(factors)

    def _classify_question(self, query: str) -> str:
        """Classify question complexity with the LLM."""
        prompt = f"""Classify the following question as simple/medium/complex:

Question: {query}

Output only the label:"""

        return self.llm.generate(prompt).strip().lower()

    def get_context_strategy(self, complexity: float) -> Dict:
        """
        Pick a context strategy for the given complexity.
        """
        if complexity < 0.3:
            return {
                "max_docs": 3,
                "max_history": 2,
                "compress_docs": False,
                "model": "gpt-3.5-turbo"
            }
        elif complexity < 0.7:
            return {
                "max_docs": 5,
                "max_history": 5,
                "compress_docs": True,
                "model": "gpt-4"
            }
        else:
            return {
                "max_docs": 10,
                "max_history": 10,
                "compress_docs": True,
                "model": "gpt-4",
                "use_recursion": True
            }
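
A usage sketch with a stub LLM (a real classifier call would replace StubLLM.generate):

class StubLLM:
    def generate(self, prompt: str) -> str:
        return "medium"  # pretend the LLM judged the question as medium

dm = DynamicContextManager(StubLLM())
score = dm.estimate_complexity(
    "Compare three retrieval strategies and justify the tradeoffs",
    "retrieved context " * 300
)
print(round(score, 2), dm.get_context_strategy(score))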

6. Token Optimization Techniques

6.1 Token Counting and Optimization

import re
import tiktoken

class TokenOptimizer:
    """Token optimization utilities"""

    @staticmethod
    def optimize_prompt(prompt: str, target_tokens: int, model: str = "gpt-4") -> str:
        """
        Optimize a prompt to fit a token budget.

        Strategies:
        1. Remove redundant blank lines
        2. Deduplicate repeated content
        3. Truncate smartly, keeping the head and tail
        """
        # 1. Collapse runs of blank lines
        optimized = re.sub(r'\n{3,}', '\n\n', prompt.strip())

        # 2. Done if it already fits
        if count_tokens(optimized, model) <= target_tokens:
            return optimized

        # 3. Smart truncation (keep the beginning and the end)
        encoder = tiktoken.encoding_for_model(model)
        tokens = encoder.encode(optimized)

        if len(tokens) > target_tokens:
            # Budget the ellipsis marker first, then keep ~70% head / ~30% tail
            ellipsis_tokens = encoder.encode("\n\n[...]\n\n")
            budget = max(target_tokens - len(ellipsis_tokens), 0)
            keep_start = int(budget * 0.7)
            keep_end = budget - keep_start

            start_tokens = tokens[:keep_start]
            end_tokens = tokens[-keep_end:] if keep_end > 0 else []

            optimized = encoder.decode(start_tokens + ellipsis_tokens + end_tokens)

        return optimized

    @staticmethod
    def estimate_response_tokens(
        input_tokens: int,
        desired_output_length: str = "medium"
    ) -> int:
        """
        Estimate the output token budget.

        Args:
            input_tokens: number of input tokens
            desired_output_length: short/medium/long
        """
        multipliers = {
            "short": 0.5,
            "medium": 1.0,
            "long": 2.0
        }

        multiplier = multipliers.get(desired_output_length, 1.0)
        return int(input_tokens * multiplier)

    @staticmethod
    def is_safe_for_model(
        total_tokens: int,
        model: str = "gpt-4"
    ) -> tuple[bool, int]:
        """
        Check whether a token count is safe for the model.

        Returns:
            (is_safe, remaining_tokens)
        """
        limits = {
            "gpt-4": 8192,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16384,
        }

        limit = limits.get(model, 8192)
        # Keep a 20% safety margin
        safe_limit = int(limit * 0.8)

        remaining = max(0, safe_limit - total_tokens)
        return total_tokens <= safe_limit, remaining
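
A quick check of the optimizer (counts are approximate and tokenizer-dependent):

long_prompt = "Background information. " * 800
short_prompt = TokenOptimizer.optimize_prompt(long_prompt, target_tokens=500)
print(count_tokens(short_prompt))  # close to 500

print(TokenOptimizer.estimate_response_tokens(1200, "short"))  # 600
print(TokenOptimizer.is_safe_for_model(7000, "gpt-4"))         # (False, 0): over the 80% margin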

6.2 System Prompt Optimization

class SystemPromptOptimizer:
    """System prompt optimizer"""

    OPTIMIZED_TEMPLATES = {
        "concise": """
You are a professional AI assistant.
Answer directly and keep it concise.
""",

        "detailed": """
You are a professional AI assistant who provides detailed, accurate information.

Requirements:
1. Complete and accurate content
2. Clear, readable structure
3. Examples where necessary

Answer based on the information provided; say so if it is insufficient.
""",

        "rag": """
You are a professional Q&A assistant who answers from retrieved information.

Requirements:
1. Answer strictly from the provided information
2. Do not fabricate information
3. State clearly when the information is insufficient
4. Cite sources for important facts

Answer the question using the information below.
""",

        "code": """
You are a programming assistant who solves coding problems.

Requirements:
1. Provide complete, runnable code
2. Include necessary comments
3. Explain the key logic
4. Provide a usage example

Solve the problem with code and explain it.
"""
    }

    @classmethod
    def get_optimized_prompt(cls, task_type: str) -> str:
        """Get an optimized system prompt."""
        return cls.OPTIMIZED_TEMPLATES.get(task_type, cls.OPTIMIZED_TEMPLATES["concise"])

    @classmethod
    def merge_prompts(cls, *prompts: str) -> str:
        """Merge several system prompts."""
        # Deduplicate
        unique = set(p.strip() for p in prompts if p.strip())

        # (Near-duplicate filtering could go here)
        # Merge, sorted for deterministic output
        return "\n\n".join(sorted(unique))

7. Implementation Example

7.1 A Complete Context Manager

"""
完整的上下文管理器实现

整合了:
- 对话历史管理
- 检索结果压缩
- 动态策略调整
- Token优化
"""

import tiktoken
from typing import List, Dict, Optional
from dataclasses import dataclass
import openai

class SmartContextManager:
    """
    智能上下文管理器

    功能:
    1. 自动管理对话历史
    2. 智能压缩检索结果
    3. 动态调整策略
    4. Token计数优化
    """

    def __init__(
        self,
        model: str = "gpt-4",
        system_prompt: Optional[str] = None,
        max_tokens: Optional[int] = None
    ):
        self.model = model
        self.max_tokens = max_tokens or self._get_model_limit(model)
        self.safe_limit = int(self.max_tokens * 0.8)

        self.system_prompt = system_prompt
        self.conversation_history: List[Dict] = []

        # Initialize components
        self.compressor = HierarchicalCompressor()
        # DynamicContextManager expects an object with a .generate(prompt) method;
        # the legacy openai.ChatCompletion class is used here only as a placeholder
        self.dynamic_manager = DynamicContextManager(openai.ChatCompletion)

        # Account for the system prompt
        self.system_tokens = (
            count_tokens(system_prompt, model) if system_prompt else 0
        )

    def _get_model_limit(self, model: str) -> int:
        limits = {
            "gpt-4": 8192,
            "gpt-4-turbo": 128000,
            "gpt-4o": 128000,
            "gpt-3.5-turbo": 16384,
        }
        return limits.get(model, 8192)

    def build_context(
        self,
        user_query: str,
        retrieved_docs: Optional[List[str]] = None,
        complexity: Optional[float] = None
    ) -> List[Dict]:
        """
        Build an optimized context.

        Args:
            user_query: the user's query
            retrieved_docs: list of retrieved documents
            complexity: task complexity (0-1); estimated automatically if omitted

        Returns:
            a message list in LLM API format
        """
        # Estimate complexity
        if complexity is None:
            context = "\n\n".join(retrieved_docs) if retrieved_docs else ""
            complexity = self.dynamic_manager.estimate_complexity(
                user_query, context
            )

        # Pick a strategy
        strategy = self.dynamic_manager.get_context_strategy(complexity)

        # Assemble the messages
        messages = []

        # 1. System prompt
        if self.system_prompt:
            messages.append({
                "role": "system",
                "content": self.system_prompt
            })

        # 2. Retrieved documents
        if retrieved_docs:
            docs_context = self._build_docs_context(
                retrieved_docs,
                strategy["max_docs"],
                strategy["compress_docs"]
            )
            if docs_context:
                messages.append({
                    "role": "system",
                    "content": f"Reference information:\n{docs_context}"
                })

        # 3. Conversation history
        history_messages = self._select_history(
            strategy["max_history"]
        )
        messages.extend(history_messages)

        # 4. User query
        messages.append({
            "role": "user",
            "content": user_query
        })

        return messages

    def _build_docs_context(
        self,
        docs: List[str],
        max_docs: int,
        compress: bool
    ) -> str:
        """Build the document context."""
        # Cap the number of documents
        selected_docs = docs[:max_docs]

        combined = "\n\n---\n\n".join(selected_docs)
        tokens = count_tokens(combined, self.model)

        available = self.safe_limit - self.system_tokens - 500  # reserve room for the query

        if tokens <= available:
            return combined

        # Compress, rendering the structure back into text
        if compress:
            result = self.compressor.compress(combined, available, self.model)
            return "\n\n".join(
                f"{s['title']}\n{s['content']}" for s in result["structure"]
            )

        # Otherwise, simple truncation
        return TokenOptimizer.optimize_prompt(combined, available, self.model)

    def _select_history(self, max_messages: int) -> List[Dict]:
        """Select history messages."""
        # Keep the most recent N
        count = min(max_messages, len(self.conversation_history))
        if count <= 0:
            return []
        return self.conversation_history[-count:]

    def add_message(self, role: str, content: str):
        """Append a message to the history."""
        self.conversation_history.append({
            "role": role,
            "content": content
        })

    def get_token_info(self, messages: List[Dict]) -> Dict:
        """Report token usage for a message list."""
        total = sum(count_tokens(m["content"], self.model) for m in messages)
        return {
            "total_tokens": total,
            "remaining": self.safe_limit - total,
            "percentage": total / self.safe_limit * 100,
            "is_safe": total <= self.safe_limit
        }

    def clear_history(self):
        """Clear the conversation history."""
        self.conversation_history = []

    def trim_history(self, target_tokens: int):
        """Trim the history down to a target token count."""
        current_tokens = sum(
            count_tokens(m["content"], self.model)
            for m in self.conversation_history
        )

        if current_tokens <= target_tokens:
            return

        # Drop the oldest messages first
        while current_tokens > target_tokens and self.conversation_history:
            removed = self.conversation_history.pop(0)
            current_tokens -= count_tokens(removed["content"], self.model)

# ============== Usage example ==============

if __name__ == "__main__":
    # Initialize the manager
    manager = SmartContextManager(
        model="gpt-4",
        system_prompt="You are a professional Q&A assistant."
    )

    # Simulated retrieval results
    docs = [
        "Python is a high-level programming language created by Guido van Rossum.",
        "Python 3.12 was released in 2023 and added features such as type parameters.",
        "Python is widely used in web development, data science, and AI.",
    ] * 3  # simulate a larger result set

    # Build the context
    messages = manager.build_context(
        user_query="What's new in Python 3.12?",
        retrieved_docs=docs
    )

    # Inspect token usage
    token_info = manager.get_token_info(messages)
    print(f"Token usage: {token_info}")

    # Make the API call
    # response = openai.ChatCompletion.create(
    #     model="gpt-4",
    #     messages=messages
    # )

    # Record the turn in the history
    # manager.add_message("user", "What's new in Python 3.12?")
    # manager.add_message("assistant", response.choices[0].message.content)

Frequently Asked Interview Questions

Q1: How do you handle input that exceeds the context window?

Model answer:

Handling strategies:

1. Sliding window
   - Keep the most recent N messages
   - Keep the system prompt
   - Keep key early messages (such as the initial request)

2. Content compression
   - Summarization: use an LLM to summarize long content
   - Keyword extraction: keep the key words and sentences
   - Hierarchical compression: keep the structure, compress the content

3. Explicit priorities
   - System prompt: must be kept
   - User query: must be kept
   - Retrieved documents: rank by relevance, keep the highest-scoring
   - Conversation history: sliding window

4. Chunked processing
   - Map-Reduce: process chunks, then merge
   - Recursive RAG: index first, details second

Implementation:
```python
def build_context(query, docs, history, system_prompt, max_tokens):
    messages = []

    # 1. System prompt (required)
    messages.append({"role": "system", "content": system_prompt})

    # 2. Retrieved documents (compress if oversized)
    available = max_tokens - count_tokens(system_prompt) - count_tokens(query)
    if sum(count_tokens(d) for d in docs) > available * 0.5:
        docs = compress_docs(docs, available * 0.5)
    messages.append({"role": "system", "content": format_docs(docs)})

    # 3. Conversation history (sliding window)
    messages.extend(sliding_window(history, max_messages=10))

    # 4. User query (required)
    messages.append({"role": "user", "content": query})

    return messages
```
Q2: How do you optimize prompts to reduce token usage?

Model answer:

Optimization strategies:

1. Simplify the language
   - Remove redundant explanations
   - Use concise instructions
   - Avoid repeating yourself

2. Optimize the examples
   - Use fewer examples
   - Shorten each example
   - Keep only the representative ones

3. Structure the prompt
   - Use JSON/list formats
   - Avoid natural-language repetition
   - Use symbols in place of prose

4. Use templates
   - Maintain reusable templates
   - Fill them in dynamically
   - Avoid hard-coding

5. Token-level tricks
   - Token efficiency varies by language and tokenizer; measure with the real tokenizer instead of assuming
   - Write numbers as digits, not words
   - Drop unnecessary punctuation

Example:
# Before
prompt = """You are a programming assistant. Please write a Python function
that takes a list of numbers as its parameter and returns the average of
all the numbers in the list. Make sure the function handles the empty-list
case, and add an appropriate docstring."""

# After
prompt = """Programming assistant. Write a function:
Input: list of numbers
Output: their average
Requirements: handle empty list, add docstring
Language: Python"""

Comparison: roughly 150 tokens before vs. about 60 after.
Q3: How do you design context management for long documents?

Model answer:

Long-document strategies:

1. Chunking
   - Split on semantic boundaries (paragraphs/sections)
   - Control chunk size (e.g., 4K tokens)
   - Add overlap (200-500 tokens)

2. Recursive RAG
   - Build a document index/summary first
   - Retrieve against it to find the relevant documents
   - Fetch the full document content
   - Generate the final answer

3. Map-Reduce
   - Map: process each chunk independently
   - Reduce: merge the per-chunk analyses

4. Hierarchical compression
   - Preserve the document structure
   - Compress the lower-level content
   - Keep the higher-level summaries

Implementation:
```python
# Option 1: Map-Reduce
def process_long_document(doc, query):
    chunks = split_document(doc, chunk_size=4000)

    # Map
    results = [process_chunk(query, c) for c in chunks]

    # Reduce
    return combine_results(results, query)

# Option 2: recursive RAG
def recursive_rag(query):
    # Step 1: search the index
    index_results = index_retriever.search(query)

    # Step 2: select documents
    doc_ids = select_relevant(index_results)

    # Step 3: fetch the full content
    docs = doc_retriever.get(doc_ids)

    # Step 4: generate
    return generate(query, docs)
```

---

## Summary

### Core Points of Context Engineering

| Area | Strategies |
|------|------|
| **Window management** | Sliding window, dynamic adjustment, priority ordering |
| **Content compression** | Summarization, keywords, hierarchical compression |
| **Token optimization** | Simpler language, templating, structured prompts |
| **Long documents** | Chunking, recursive RAG, Map-Reduce |
| **System prompts** | Lean, clear, only what is necessary |

### Best Practices

1. **Leave a safety margin**: use ~80% of the context limit
2. **Set clear priorities**: system > query > documents > history
3. **Adjust dynamically**: match the strategy to task complexity
4. **Monitor tokens**: track usage in real time
5. **Test and verify**: make sure key information is not lost