【AI Agent 知识库】02-RAG与检索优化-详解版

内容纲要

模块二:RAG 与检索优化(详解版)

覆盖:Embedding、Vector DB、Chunking、Re-ranking、混合检索、评估


目录


必须掌握的概念

2.1 RAG(Retrieval-Augmented Generation)

定义:
检索增强生成,通过检索相关文档来增强 LLM 的回答能力。

核心价值:

  • 知识时效性:无需重新训练模型即可更新知识
  • 减少幻觉:基于检索事实回答
  • 可解释性:提供答案来源
  • 领域知识:使用私有/领域数据

完整流程:

用户查询
    │
    ├─► 检索阶段
    │   ├─ 查询扩展
    │   ├─ 向量检索
    │   ├─ 关键词检索
    │   └─ 混合检索
    │
    ▼
文档检索结果
    │
    ├─► 重排阶段
    │   ├─ Cross-encoder
    │   ├─ LLM 重排
    │   └─ 规则重排
    │
    ▼
排序后的文档
    │
    ├─► 上下文构建
    │   ├─ 去重
    │   ├─ 截断/压缩
    │   └─ 引用标注
    │
    ▼
最终上下文
    │
    ▼
LLM 生成
    │
    ▼
答案 + 引用

2.2 Embedding(嵌入)

定义:
将文本映射到高维向量空间,语义相近的文本在空间中距离较近。

嵌入模型对比:

模型 维度 性能 适用场景
BGE-small-zh-v1.5 512 通用,成本敏感
BGE-base-zh-v1.5 768 平衡性能和效果
BGE-large-zh-v1.5 1024 高精度要求
Jina-embeddings-v2 768 多语言
OpenAI text-embedding-3 3072 慢/贵 高质量要求

代码示例:

from sentence_transformers import SentenceTransformer

# 初始化嵌入模型
embedder = SentenceTransformer('BAAI/bge-small-zh-v1.5')

# 单个文本嵌入
embedding = embedder.encode("Python 是一种编程语言")
# 输出: array([0.123, -0.456, ...], shape=(512,))

# 批量嵌入(推荐)
texts = ["Python", "Java", "Go", "JavaScript"]
embeddings = embedder.encode_batch(texts)
# 输出: shape=(4, 512)

# 计算相似度
from sklearn.metrics.pairwise import cosine_similarity

similarity = cosine_similarity([embedding], embeddings)[0]
# 输出: [0.95, 0.72, 0.65, 0.88]

2.3 Vector Database(向量数据库)

向量数据库对比:

数据库 开源 HNSW索引 云服务 推荐场景
Milvus 企业部署,高性能
Pinecone 快速开始,云原生
Weaviate 简单易用
Qdrant 混合检索
Chroma 本地开发
pgvector 已有 PostgreSQL

Milvus 示例:

from pymilvus import MilvusClient

client = MilvusClient("http://localhost:19530")

# 创建 Collection
client.create_collection(
    collection_name="documents",
    dimension=512,  # 嵌入维度
    metric_type="IP",  # 内积相似度
    index_type="IVF_FLAT",  # 索引类型
    index_param={"nlist": 128}
)

# 插入文档
documents = [
    {
        "id": "1",
        "vector": embedding1,
        "metadata": {"title": "Python 教程", "category": "技术"}
    },
    {
        "id": "2",
        "vector": embedding2,
        "metadata": {"title": "Java 教程", "category": "技术"}
    }
]
client.insert(collection_name="documents", data=documents)

# 搜索
results = client.search(
    collection_name="documents",
    data=[query_embedding],
    limit=5,
    output_fields=["title", "category"]
)

2.4 Chunking(文档切分)

切分策略对比:

策略 优点 缺点 适用场景
Fixed-size 简单 可能切断语义 通用场景
Semantic 保持语义完整 计算成本高 长文档
Markdown 保留结构 只适合 Markdown Markdown 文档
Recursive 保留层级 复杂 结构化文档

2.5 Re-ranking(重排)

重排方法:

方法 优点 缺点 成本
Cross-encoder 精度高
LLM rerank 最准确 很慢
规则重排 效果差

2.6 Hybrid Search(混合检索)

混合检索类型:

类型 组合方式 适用场景
Dense + Sparse 向量 + 关键词 通用
Dense + Dense 多个向量模型 多模态
Sparse + Sparse 多个关键词索引 精确匹配

关键设计点

2.1 RAG Engine 完整架构

# rag/rag_engine.py
"""
RAG Engine 完整实现
包含:查询处理、检索、重排、上下文构建、生成
"""

from typing import List, Dict, Optional
from dataclasses import dataclass
import numpy as np
from sentence_transformers import SentenceTransformer

# ============ 数据结构 ============

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict
    embedding: Optional[np.ndarray] = None
    score: Optional[float] = 0.0

@dataclass
class RetrievedResult:
    document: Document
    score: float
    explanation: Optional[str] = None

@dataclass
class RAGResponse:
    answer: str
    sources: List[Document]
    usage: Dict[str, Any]
    intermediate: Optional[Dict[str, Any]] = None

# ============ 核心组件 ============

class EmbeddingModel:
    """嵌入模型"""
    def __init__(self, model_name: str = "BAAI/bge-small-zh-v1.5"):
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()

    def encode(self, text: str) -> np.ndarray:
        return self.model.encode(text)

    def encode_batch(self, texts: List[str]) -> np.ndarray:
        return self.model.encode_batch(texts)

class VectorStore:
    """向量存储(抽象)"""
    def add(self, documents: List[Document]) -> None:
        """添加文档"""
        raise NotImplementedError

    def search(
        self,
        query_vector: np.ndarray,
        top_k: int = 10,
        filters: Optional[Dict] = None
    ) -> List[RetrievedResult]:
        """向量搜索"""
        raise NotImplementedError

    def delete(self, ids: List[str]) -> None:
        """删除文档"""
        raise NotImplementedError

class SimpleVectorStore(VectorStore):
    """简单的内存向量存储"""
    def __init__(self, dimension: int = 512):
        self.documents: Dict[str, Document] = {}
        self.embeddings: Dict[str, np.ndarray] = {}
        self.dimension = dimension

    def add(self, documents: List[Document]) -> None:
        for doc in documents:
            self.documents[doc.id] = doc
            if doc.embedding is not None:
                self.embeddings[doc.id] = doc.embedding

    def search(
        self,
        query_vector: np.ndarray,
        top_k: int = 10,
        filters: Optional[Dict] = None
    ) -> List[RetrievedResult]:
        results = []

        for doc_id, embedding in self.embeddings.items():
            # 计算余弦相似度
            similarity = np.dot(query_vector, embedding) / (
                np.linalg.norm(query_vector) * np.linalg.norm(embedding)
            )

            # 过滤
            doc = self.documents[doc_id]
            if filters is not None:
                if not all(doc.metadata.get(k) == v for k, v in filters.items()):
                    continue

            results.append(RetrievedResult(
                document=doc,
                score=similarity
            ))

        # 排序并返回 top_k
        results.sort(key=lambda x: x.score, reverse=True)
        return results[:top_k]

class Retriever:
    """检索器"""
    def __init__(
        self,
        embedder: EmbeddingModel,
        vector_store: VectorStore,
        reranker: Optional['Reranker'] = None
    ):
        self.embedder = embedder
        self.vector_store = vector_store
        self.reranker = reranker

    def retrieve(
        self,
        query: str,
        top_k: int = 5,
        use_reranker: bool = True
    ) -> List[RetrievedResult]:
        # 1. 嵌入查询
        query_embedding = self.embedder.encode(query)

        # 2. 向量检索
        results = self.vector_store.search(
            query_vector=query_embedding,
            top_k=top_k * 2  # 检索更多候选
        )

        # 3. 去重
        results = self._deduplicate(results)

        # 4. 重排
        if use_reranker and self.reranker:
            results = self.reranker.rerank(query, results)

        # 5. 返回 top_k
        return results[:top_k]

    def _deduplicate(
        self,
        results: List[RetrievedResult]
    ) -> List[RetrievedResult]:
        """按文档 ID 去重"""
        seen = set()
        deduped = []
        for result in results:
            doc_id = result.document.id
            if doc_id not in seen:
                seen.add(doc_id)
                deduped.append(result)
        return deduped

class Reranker:
    """重排器"""
    def rerank(
        self,
        query: str,
        results: List[RetrievedResult]
    ) -> List[RetrievedResult]:
        raise NotImplementedError

class CrossEncoderReranker(Reranker):
    """Cross-encoder 重排"""
    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        results: List[RetrievedResult]
    ) -> List[RetrievedResult]:
        if not results:
            return results

        # 准备输入对
        pairs = [
            (query, result.document.content)
            for result in results
        ]

        # 计算分数
        scores = self.model.predict(pairs)

        # 更新分数
        for result, score in zip(results, scores):
            result.score = float(score)

        # 排序
        results.sort(key=lambda x: x.score, reverse=True)
        return results

class ContextBuilder:
    """上下文构建器"""
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def build(
        self,
        results: List[RetrievedResult]
    ) -> str:
        # 简单实现:连接所有文档
        contexts = []
        total_tokens = 0

        for result in results:
            content = result.document.content
            tokens = self._estimate_tokens(content)

            if total_tokens + tokens <= self.max_tokens:
                contexts.append(f"[文档 {result.document.id}]\n{content}")
                total_tokens += tokens
            else:
                break

        return "\n\n".join(contexts)

    def _estimate_tokens(self, text: str) -> int:
        """粗略估算 token 数"""
        # 中文约 1.5 字符 = 1 token
        # 英文约 4 字符 = 1 token
        chinese = len([c for c in text if '\u4e00' <= c <= '\u9fff'])
        english = len(text) - chinese
        return int(chinese / 1.5 + english / 4)

# ============ RAG Engine ============

class RAGEngine:
    def __init__(
        self,
        embedder: EmbeddingModel,
        vector_store: VectorStore,
        retriever: Optional[Retriever] = None,
        llm: Optional[Callable] = None,
        max_tokens: int = 4000
    ):
        self.embedder = embedder
        self.vector_store = vector_store

        if retriever is None:
            retriever = Retriever(embedder, vector_store)
        self.retriever = retriever

        self.llm = llm
        self.context_builder = ContextBuilder(max_tokens=max_tokens)

    def index_documents(self, texts: List[Dict]) -> None:
        """索引文档"""
        documents = []
        for i, text in enumerate(texts):
            doc_id = text.get("id", f"doc_{i}")
            content = text["content"]
            metadata = text.get("metadata", {})
            embedding = self.embedder.encode(content)

            documents.append(Document(
                id=doc_id,
                content=content,
                metadata=metadata,
                embedding=embedding
            ))

        self.vector_store.add(documents)

    def query(
        self,
        query: str,
        top_k: int = 5,
        use_reranker: bool = True
    ) -> RAGResponse:
        """执行 RAG 查询"""
        # 1. 检索
        results = self.retriever.retrieve(
            query=query,
            top_k=top_k,
            use_reranker=use_reranker
        )

        # 2. 构建上下文
        context = self.context_builder.build(results)

        # 3. 生成(如果有 LLM)
        if self.llm is not None:
            answer = self._generate(query, context)
        else:
            answer = context  # 返回上下文

        return RAGResponse(
            answer=answer,
            sources=[r.document for r in results],
            usage={
                "retrieved": len(results),
                "selected": len(results),
                "tokens": self.context_builder._estimate_tokens(context)
            }
        )

    def _generate(self, query: str, context: str) -> str:
        """生成答案"""
        prompt = f"""基于以下信息回答问题:

{context}

问题:{query}

请基于上述信息回答,如果信息不足请说明。"""

        return self.llm(prompt)

# ============ 使用示例 ============

if __name__ == "__main__":
    # 初始化组件
    embedder = EmbeddingModel()
    vector_store = SimpleVectorStore(dimension=embedder.dimension)
    reranker = CrossEncoderReranker()

    retriever = Retriever(
        embedder=embedder,
        vector_store=vector_store,
        reranker=reranker
    )

    # 创建 RAG Engine
    rag = RAGEngine(
        embedder=embedder,
        vector_store=vector_store,
        retriever=retriever,
        llm=lambda p: "基于文档的答案"
    )

    # 索引文档
    documents = [
        {
            "id": "1",
            "content": "Python 是一种高级编程语言,由 Guido van Rossum 创建。",
            "metadata": {"category": "编程语言", "year": 1991}
        },
        {
            "id": "2",
            "content": "JavaScript 是 Web 开发的主要语言之一。",
            "metadata": {"category": "编程语言", "type": "Web"}
        },
        {
            "id": "3",
            "content": "TypeScript 是 JavaScript 的超集,添加了类型系统。",
            "metadata": {"category": "编程语言", "type": "Web"}}
        }
    ]
    rag.index_documents(documents)

    # 查询
    query = "Python 是什么?"
    response = rag.query(query, top_k=2)

    print("=" * 50)
    print(f"问题: {query}")
    print("=" * 50)
    print(f"答案: {response.answer}")
    print("=" * 50)
    print(f"使用信息: {response.usage}")
    print("=" * 50)
    print("来源:")
    for source in response.sources:
        print(f"  - [{source.id}] {source.content[:50]}...")

2.2 Chunking 策略实现

# rag/chunking.py
"""
文档切分策略实现
"""

from typing import List
import re

class Chunker:
    def __init__(self, strategy: str = "semantic"):
        self.strategy = strategy

    def chunk(self, text: str, max_size: int = 500) -> List[str]:
        """切分文档"""
        if self.strategy == "fixed":
            return self._fixed_size_chunk(text, max_size)
        elif self.strategy == "semantic":
            return self._semantic_chunk(text, max_size)
        elif self.strategy == "markdown":
            return self._markdown_chunk(text, max_size)
        else:
            raise ValueError(f"未知策略: {self.strategy}")

    def _fixed_size_chunk(self, text: str, size: int) -> List[str]:
        """固定大小切分"""
        chunks = []
        for i in range(0, len(text), size):
            chunks.append(text[i:i + size])
        return chunks

    def _semantic_chunk(self, text: str, max_size: int) -> List[str]:
        """语义切分(按段落)"""
        # 按段落分割
        paragraphs = re.split(r'\n\n+', text.strip())

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) + 2 <= max_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks

    def _markdown_chunk(self, text: str, max_size: int) -> List[str]:
        """Markdown 切分(按标题)"""
        # 匹配 Markdown 标题
        pattern = r'^(#{1,6})\s+(.+)$'
        headers = list(re.finditer(pattern, text, re.MULTILINE))

        if not headers:
            return self._semantic_chunk(text, max_size)

        chunks = []
        last_pos = 0

        for match in headers:
            header_pos = match.start()

            # 检查是否需要切分
            if header_pos - last_pos > max_size:
                # 当前部分太大,切分
                section = text[last_pos:header_pos]
                sub_chunks = self._semantic_chunk(section, max_size)
                chunks.extend(sub_chunks)
            else:
                chunks.append(text[last_pos:header_pos].strip())

            last_pos = header_pos

        # 最后一部分
        if last_pos < len(text):
            chunks.append(text[last_pos:].strip())

        return chunks

# 使用示例
if __name__ == "__main__":
    text = """
# Python 简介

Python 是一种高级编程语言。

## 历史

Python 由 Guido van Rossum 于 1991 年创建。

## 特性

Python 以其简洁的语法和强大的功能而闻名。

# 用途

Python 广泛应用于 Web 开发、数据科学、人工智能等领域。
"""

    chunker = Chunker(strategy="markdown")
    chunks = chunker.chunk(text, max_size=100)

    for i, chunk in enumerate(chunks, 1):
        print(f"Chunk {i}:")
        print(chunk)
        print("-" * 50)

常见坑与解决方案

3.1 召回率低

问题现象:

  • 查询相关问题检索不到
  • 相关文档排名很低

原因分析:

  1. 查询和文档语义不匹配
  2. 嵌入模型效果差
  3. 切分粒度不当
  4. 元数据过滤过严

解决方案:

class RecallOptimizer:
    def __init__(self, retriever):
        self.retriever = retriever

    def optimize_retrieval(self, query: str, top_k: int = 10) -> List[RetrievedResult]:
        """召回优化"""

        # 策略 1: 查询扩展
        expanded_queries = self._expand_query(query)

        # 策略 2: 多模型检索
        all_results = []

        # 粗确模型
        precise_embedder = EmbeddingModel("BAAI/bge-large-zh-v1.5")
        for q in expanded_queries:
            results = self.retriever.retrieve_with_embedder(
                query=q,
                embedder=precise_embedder,
                top_k=top_k
            )
            all_results.extend(results)

        # 快速模型
        fast_embedder = EmbeddingModel("BAAI/bge-small-zh-v1.5")
        for q in expanded_queries:
            results = self.retriever.retrieve_with_embedder(
                query=q,
                embedder=fast_embedder,
                top_k=top_k
            )
            all_results.extend(results)

        # 策略 3: 去重
        deduped = self._deduplicate(all_results)

        # 策略 4: 混合检索(如果有)
        keyword_results = self._keyword_search(query)
        deduped.extend(keyword_results)

        return deduped[:top_k]

    def _expand_query(self, query: str) -> List[str]:
        """查询扩展"""
        # 同义词扩展
        synonyms = {
            "AI": ["人工智能", "机器学习", "深度学习"],
            "Python": ["python", "蟒蛇"],
            "并发": ["多线程", "异步"]
        }

        expanded = [query]

        for key, values in synonyms.items():
            if key.lower() in query.lower():
                for value in values:
                    expanded.append(query.replace(key, value))

        return list(set(expanded))

    def _deduplicate(self, results: List[RetrievedResult]) -> List[RetrievedResult]:
        """去重"""
        seen = set()
        deduped = []
        for r in results:
            if r.document.id not in seen:
                seen.add(r.document.id)
                deduped.append(r)
        return deduped

    def _keyword_search(self, query: str) -> List[RetrievedResult]:
        """关键词检索(如果支持)"""
        # 实现关键词搜索
        return []

3.2 精度低

问题现象:

  • 检索结果不相关
  • 高分文档实际不相关

解决方案:

class PrecisionOptimizer:
    def __init__(self, cross_encoder):
        self.cross_encoder = cross_encoder

    def improve_precision(
        self,
        query: str,
        candidates: List[RetrievedResult]
    ) -> List[RetrievedResult]:
        """提升精度"""

        # 策略 1: 增加检索候选
        if len(candidates) < 20:
            return candidates

        # 策略 2: Cross-encoder 重排
        reranked = self._cross_encoder_rerank(query, candidates)

        # 策略 3: LLM 重排(可选)
        if len(reranked) > 5:
            reranked = self._llm_rerank(query, reranked[:10])

        return reranked

    def _cross_encoder_rerank(self, query: str, results: List[RetrievedResult]):
        """Cross-encoder 重排"""
        pairs = [(query, r.document.content) for r in results]
        scores = self.cross_encoder.predict(pairs)

        for r, score in zip(results, scores):
            r.score = float(score)

        return sorted(results, key=lambda x: x.score, reverse=True)

    def _llm_rerank(self, query: str, results: List[RetrievedResult]):
        """LLM 重排"""
        docs_text = "\n\n".join([
            f"{i+1}. {r.document.content}"
            for i, r in enumerate(results)
        ])

        prompt = f"""请评估以下文档与问题的相关性。

问题:{query}

文档:
{docs_text}

请输出最相关的文档编号,用逗号分隔(例如:1,3,5)。"""

        response = llm.generate(prompt)
        selected_indices = [int(i.strip()) for i in response.split(",")]

        return [results[i-1] for i in selected_indices if i <= len(results)]

3.3 上下文截断

问题现象:

  • 相关信息被截断
  • 答案不完整

解决方案:

class ContextManager:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def build_context(
        self,
        results: List[RetrievedResult]
    ) -> str:
        """智能上下文构建"""

        # 策略 1: 按分数排序
        sorted_results = sorted(results, key=lambda r: r.score, reverse=True)

        # 策略 2: 优先保留高分文档
        selected = []
        total_tokens = 0

        for r in sorted_results:
            tokens = self._estimate_tokens(r.document.content)

            if total_tokens + tokens <= self.max_tokens:
                selected.append(r)
                total_tokens += tokens
            else:
                # 策略 3: 尝试截断
                remaining = self.max_tokens - total_tokens
                if remaining > 100:  # 至少保留一些内容
                    truncated = self._truncate_by_tokens(
                        r.document.content,
                        remaining
                    )
                    selected.append(RetrievedResult(
                        document=Document(
                            id=r.document.id,
                            content=truncated,
                            metadata=r.document.metadata
                        ),
                        score=r.score
                    ))
                break

        return self._format_context(selected)

    def _truncate_by_tokens(self, text: str, max_tokens: int) -> str:
        """按 token 数截断"""
        chars_per_token = 2  # 粗略估计
        max_chars = int(max_tokens * chars_per_token)
        return text[:max_chars] + "..."

    def _estimate_tokens(self, text: str) -> int:
        """估算 token 数"""
        chinese = len([c for c in text if '\u4e00' <= c <= '\u9fff'])
        english = len(text) - chinese
        return int(chinese / 1.5 + english / 4)

    def _format_context(self, results: List[RetrievedResult]) -> str:
        """格式化上下文"""
        return "\n\n".join([
            f"[文档 {r.document.id}]\n{r.document.content}"
            for r in results
        ])

面试高频问法

Q1: 如何提升 RAG 的召回率?

标准回答:

召回率提升策略:

【1. 查询层面】

a) 查询扩展
   - 同义词扩展:AI → 人工智能、机器学习
   - LLM 生成:让 LLM 生成相关查询
   - 查询改写:明确意图,补充背景

示例:
```python
expanded = expand_query("Python 并发")
# → ["Python 并发", "Python 多线程", "Python 异步", "Python concurrency"]</code></pre>
<p>b) 多查询并行</p>
<ul>
<li>同时执行多个扩展查询</li>
<li>合并并去重结果</li>
</ul>
<p>【2. 检索层面】</p>
<p>a) 混合检索</p>
<ul>
<li>向量检索 + 关键词检索(BM25)</li>
<li>加权融合:score = α <em> vector_score + (1-α) </em> keyword_score</li>
</ul>
<p>b) 扩大检索候选</p>
<ul>
<li>先检索 2*K 个候选</li>
<li>再用 Re-ranking 筛选出 K 个</li>
</ul>
<p>c) 多模型检索</p>
<ul>
<li>精确模型 + 快速模型</li>
<li>合并结果</li>
</ul>
<p>【3. 数据层面】</p>
<p>a) 更好的嵌入模型</p>
<ul>
<li>换大模型:bge-large > bge-small</li>
<li>领域微调:用领域数据微调</li>
</ul>
<p>b) 优化切分策略</p>
<ul>
<li>语义切分:保持语义块完整</li>
<li>Markdown 切分:保留标题结构</li>
<li>递归切分:保留层级关系</li>
</ul>
<p>【4. 重排层面】</p>
<p>a) Cross-encoder</p>
<ul>
<li>使用专门的重排模型</li>
<li>精度提升:+10-30%</li>
</ul>
<p>b) LLM rerank(可选)</p>
<ul>
<li>用 LLM 评估相关性</li>
<li>成本高,但最准确</li>
</ul>
<p>【工程实践】</p>
<ol>
<li>默认使用混合检索</li>
<li>向量检索 top_k = 20</li>
<li>Cross-encoder 重排到 top_k = 5</li>
<li>严格测试:构建 Golden Set</li>
<li>监控指标:Recall@5, Recall@10
<pre><code></code></pre></li>
</ol>
<h3>Q2: RAG 的上下文窗口管理怎么做?</h3>
<p><strong>标准回答:</strong></p>
<pre><code>问题:LLM 上下文窗口有限,检索到的文档可能超出限制

解决方案:

【1. 截断策略】

按分数截断:
- 优先保留高分文档
- 简单有效

按时效截断:
- 新文档优先
- 适合时效性敏感场景

保留部分内容:
- 超长文档截断尾部
- 用 "..." 标记

【2. 压缩策略】

摘要压缩:
```python
def compress_documents(docs):
    for i, doc in enumerate(docs):
        if len(doc) > 1000:
            summary = llm.generate(f"用100字总结:{doc}")
            docs[i] = summary

每篇压缩 vs 整体压缩:

  • 每篇:保留每篇的摘要
  • 整体:所有文档合并后压缩

【3. 优先级策略】

动态优先级:

  • 相关性权重:70%
  • 时效性权重:20%
  • 用户偏好权重:10%

综合评分 = 0.7 relevance + 0.2 recency + 0.1 * preference

【4. 分片策略】

分次请求:

  • 超长上下文分多块
  • 每块独立处理
  • 最后合并结果

递归 RAG:

  • 先检索索引(标题)
  • 再检索详情(正文)
  • 减少单次请求数据量

【实现示例】

class ContextBuilder:
    def build(self, results, max_tokens=4000):
        selected = []
        total = 0

        for r in sorted(results, key=lambda x: x.score, reverse=True):
            tokens = estimate(r.content)
            if total + tokens <= max_tokens:
                selected.append(r)
                total += tokens
            else:
                # 尝试截断
                remaining = max_tokens - total
                if remaining > 100:
                    truncated = truncate(r.content, remaining)
                    selected.append(truncated)
        break

        return selected

【最佳实践】

  1. 先按分数排序
  2. 计算每个文档的 token 数
  3. 累加直到接近限制
  4. 留微超额时截断
  5. 粗确估算:中文 1.5 字符/token,英文 4 字符/token

记忆要点

RAG 优化口诀:

召回:查询扩展、混合检索
精度:重排、多模型
上下文:压缩、智能截断

Checklist:
□ 查询扩展(同义词 + LLM)
□ 混合检索(向量 + 关键词)
□ Re-ranking(Cross-encoder)
□ 上下文压缩
□ Token 精确估算

实战场景

场景:企业技术文档问答系统

需求:
企业需要一个技术文档问答系统:

  • 数据源:Git 仓库、Wiki、API 文档
  • 要求:回答准确、提供来源、支持多轮对话

架构:

用户查询
    │
    ▼
┌─────────────────┐
│   Query Router  │  → 意图识别
└────────┬────────┘
         │
    ┌────┴────┬────┐
    ▼         ▼    ▼
┌──────┐ ┌──────┐ ┌──────┐
│ RAG  │ │ Code  │ │General│
│Agent │ │Search│ │Agent  │
└───┬──┘ └───┬──┘ └───┬──┘
    │         │        │
    ▼         ▼        ▼
┌──────────────────────────┐
│    Knowledge Graph       │
│  - Tech Docs             │
│  - Code Base             │
│  - API Specs             │
└──────────────────────────┘

数据流:

# 1. 文档摄入
ingest_documents([
    {"source": "git", "repo": "github.com/company/docs"},
    {"source": "wiki", "url": "wiki.company.com"},
    {"source": "api", "specs": "swagger.yaml"}
])

# 2. 增强索引
for doc in documents:
    chunks = chunker.chunk(doc.content)
    for chunk in chunks:
        embedding = embedder.encode(chunk)
        vector_store.add(Document(
            id=generate_id(),
            content=chunk,
            embedding=embedding,
            metadata={"source": doc.source, ...}
        ))

# 3. 查询处理
query = "如何配置数据库连接池?"
results = rag_engine.query(
    query=query,
    top_k=5,
    filters={"source": "docs"}  # 只搜索文档
)

# 4. 生成答案
answer = llm.generate(
    prompt=f"基于以下信息回答:\n{context}\n问题:{query}"
)

文档版本: 1.0

close
arrow_upward