【AI Agent 知识库】30-Faiss向量检索架构详解

内容纲要

模块30: Faiss 向量检索架构详解

对应JD需求:微软中国 AI 知识库检索架构岗
核心技术:Faiss、HNSW、IVF、DiskANN、Embedding、RAG优化


目录

  1. 向量检索基础
  2. Faiss框架架构
  3. 核心索引算法详解
  4. 生产环境优化
  5. RAG检索优化实战
  6. 面试高频问题

1. 向量检索基础

1.1 向量空间模型

# 向量检索的本质:在高维空间中寻找最相似的向量

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# 基础相似度计算
def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

def euclidean_distance(vec1, vec2):
    return np.linalg.norm(vec1 - vec2)

def dot_product_similarity(vec1, vec2):
    return np.dot(vec1, vec2)

1.2 向量数据库核心指标

指标 说明 优化目标
召回率 (Recall) 返回结果中包含真实最近邻的比例 >95%
查询延迟 (Latency) 单次查询的响应时间 <50ms
吞吐量 (QPS) 每秒处理查询数量 >1000
索引内存 索引占用的内存空间 最小化
构建时间 建立索引所需时间 可接受

1.3 精确检索 vs 近似检索

# 精确检索:暴力搜索,保证100%准确
def exact_search(query, vectors, k=5):
    """暴力搜索,时间复杂度 O(N*D)"""
    distances = [cosine_similarity(query, v) for v in vectors]
    top_k_indices = np.argsort(distances)[-k:][::-1]
    return [(idx, distances[idx]) for idx in top_k_indices]

# 近似检索:Faiss,牺牲小部分精度换取速度
import faiss

def approximate_search(query, index, k=5):
    """Faiss近似检索,时间复杂度 O(logN*D)"""
    query = np.array([query]).astype('float32')
    distances, indices = index.search(query, k)
    return list(zip(indices[0], distances[0]))

2. Faiss框架架构

2.1 Faiss核心概念

import faiss
import numpy as np

# 1. 数据准备
dimension = 768  # 向量维度(如bert-base-768)
n_vectors = 100000
vectors = np.random.rand(n_vectors, dimension).astype('float32')

# 2. 索引创建
# Flat索引:精确检索,适合小数据集
index = faiss.IndexFlatL2(dimension)

# 3. 添加向量
index.add(vectors)
print(f"索引包含 {index.ntotal} 个向量")

# 4. 查询
query = np.random.rand(1, dimension).astype('float32')
k = 10
distances, indices = index.search(query, k)

2.2 Faiss索引类型树

Faiss索引类型
├── 精确索引
│   ├── IndexFlatL2        # L2距离
│   ├── IndexFlatIP        # 内积
│   └── IndexFlatCosine    # 余弦相似度
├── 基于量化的索引
│   ├── IndexIVFFlat       # 倒排+平坦
│   ├── IndexIVFPQ         # 倒排+乘积量化
│   └── IndexHNSW          # 分层小世界图
└── 索引组合
    ├── IndexIVF           # 倒排框架
    ├── IndexRefine        # 精化索引
    └── IndexIDMap         # 带ID映射

2.3 Faiss GPU支持

import faiss

# GPU索引创建
def create_gpu_index(vectors, dimension):
    cpu_index = faiss.IndexFlatL2(dimension)
    cpu_index.add(vectors)

    # 迁移到GPU
    gpu_index = faiss.index_cpu_to_gpu(
        faiss.StandardGpuResources(), 0, cpu_index
    )
    return gpu_index

# 多GPU查询
def multi_gpu_search(queries, indexes, k=10):
    results = []
    for gpu_id, index in enumerate(indexes):
        distances, indices = index.search(queries, k)
        results.append((distances, indices))
    # 合并并重排序结果
    return merge_and_rerank(results)

3. 核心索引算法详解

3.1 HNSW (Hierarchical Navigable Small World)

HNSW是目前最流行的近似最近邻算法之一,结合了分层导航和跳表思想。

import faiss
import numpy as np

class HNSWIndex:
    """HNSW索引封装"""

    def __init__(self, dimension, ef_construction=200, M=16, ef_search=50):
        """
        参数说明:
        - ef_construction: 构建时每层保留的邻居数,越大精度越高,构建越慢
        - M: 每层每个节点的最大连接数,影响图密度
        - ef_search: 查询时动态扩展的候选列表大小
        """
        self.dimension = dimension
        self.index = faiss.IndexHNSWFlat(dimension, M)
        self.index.hnsw.efConstruction = ef_construction
        self.index.hnsw.efSearch = ef_search

    def add(self, vectors, ids=None):
        """添加向量"""
        if ids is not None:
            self.index.add_with_ids(vectors, ids)
        else:
            self.index.add(vectors)

    def search(self, query, k=10):
        """搜索"""
        if isinstance(query, np.ndarray) and query.ndim == 1:
            query = query.reshape(1, -1)
        distances, indices = self.index.search(query, k)
        return distances[0], indices[0]

    def search_batch(self, queries, k=10):
        """批量搜索"""
        distances, indices = self.index.search(queries, k)
        return distances, indices

    def save(self, path):
        """保存索引"""
        faiss.write_index(self.index, path)

    def load(self, path):
        """加载索引"""
        self.index = faiss.read_index(path)

# 使用示例
def hnsw_example():
    dimension = 768
    n_vectors = 10000
    vectors = np.random.rand(n_vectors, dimension).astype('float32')

    # 创建HNSW索引
    hnsw = HNSWIndex(
        dimension,
        ef_construction=200,  # 构建参数
        M=16,                  # 连接度
        ef_search=50           # 查询参数
    )
    hnsw.add(vectors)

    # 查询
    query = np.random.rand(dimension).astype('float32')
    distances, indices = hnsw.search(query, k=5)

    return distances, indices

3.2 HNSW算法原理

HNSW索引构建过程:

第0层(最底层)
┌─────────────────────────────────────┐
│  节点1 ──── 节点2 ──── 节点3      │  ← 基础图,所有点
│    │         │         │         │     边数最多(M个邻居)
│  节点4 ──── 节点5 ──── 节点6      │
└─────────────────────────────────────┘

第1层
┌─────────────────────────────────────┐
│  节点1 ──────────── 节点5          │  ← 稀疏图,部分点
│                    │               │     边数较少
│               节点8 ──── 节点9      │
└─────────────────────────────────────┘
         ↓ (指向更靠近查询的点)

第2层(最顶层)
┌─────────────────────────────────────┐
│  节点1 ────────────────── 节点9      │  ← 最稀疏,快速定位
└─────────────────────────────────────┘
# HNSW查询流程
def hnsw_search_process(query, index):
    """
    HNSW查询流程:
    1. 从顶层入口点开始
    2. 每层进行贪婪搜索,找到最近邻
    3. 逐层向下,最后在底层精确搜索
    """
    # 概念代码,展示查询流程
    current_point = index.entry_point

    # 从顶层向下搜索
    for level in reversed(range(index.max_level)):
        # 当前层的贪婪搜索
        candidates = index.get_neighbors(current_point, level)
        current_point = min(candidates, key=lambda p: distance(query, p))

    # 底层精细搜索(ef_search扩展)
    visited = set()
    results = []
    candidates = index.get_neighbors(current_point, 0)

    while candidates:
        point = candidates.pop()
        if point in visited:
            continue
        visited.add(point)

        # 计算距离并更新结果
        dist = distance(query, point)
        if len(results) < index.k or dist < results[-1].distance:
            results.append(Result(point, dist))
            results.sort(key=lambda r: r.distance)
            results = results[:index.k]

        # 动态扩展候选集
        if len(visited) < index.ef_search:
            candidates.extend(index.get_neighbors(point, 0))

    return results

3.3 IVF (Inverted File Index)

IVF将向量空间划分为多个聚类,搜索时只搜索相关聚类。

import faiss
import numpy as np

class IVFIndex:
    """IVF索引封装"""

    def __init__(self, dimension, nlist=100, nprobe=10, quantizer=None):
        """
        参数说明:
        - nlist: 聚类中心数量,通常取 sqrt(N) 或 N/100
        - nprobe: 搜索时探测的聚类数,影响精度/速度权衡
        - quantizer: 量化器,默认使用FlatL2
        """
        self.dimension = dimension
        self.nlist = nlist

        # 使用FlatL2作为quantizer
        quantizer = faiss.IndexFlatL2(dimension)

        # 创建IVF索引
        self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
        self.index.nprobe = nprobe

    def train(self, vectors):
        """训练聚类(必须先训练再添加)"""
        self.index.train(vectors)

    def add(self, vectors):
        """添加向量"""
        if not self.index.is_trained:
            self.train(vectors)
        self.index.add(vectors)

    def search(self, query, k=10):
        """搜索"""
        if isinstance(query, np.ndarray) and query.ndim == 1:
            query = query.reshape(1, -1)
        distances, indices = self.index.search(query, k)
        return distances[0], indices[0]

# 使用示例
def ivf_example():
    dimension = 768
    n_vectors = 100000
    vectors = np.random.rand(n_vectors, dimension).astype('float32')

    # 创建IVF索引
    ivf = IVFIndex(
        dimension,
        nlist=1000,   # 聚类数
        nprobe=100     # 搜索时探测的聚类数
    )

    # 先训练再添加
    ivf.train(vectors)
    ivf.add(vectors)

    # 查询
    query = np.random.rand(dimension).astype('float32')
    distances, indices = ivf.search(query, k=5)

    return distances, indices

3.4 IVF索引原理

IVF索引结构:

训练阶段:
┌─────────────────────────────────────────┐
│  向量集合 V = {v1, v2, ..., vN}         │
│                                         │
│  使用K-means聚类:                      │
│  - 随机选择nlist个聚类中心C1...Cnlist    │
│  - 将每个向量分配到最近的聚类          │
│  - 迭代优化聚类中心位置                │
└─────────────────────────────────────────┘

索引结构:
┌─────────────────────────────────────────┐
│  倒排文件(Inverted File):            │
│                                         │
│  Cluster 1: [v1, v15, v23, ...]        │
│  Cluster 2: [v5, v18, v30, ...]       │
│  Cluster 3: [v7, v9, v12, ...]         │
│  ...                                    │
│  Cluster nlist: [v3, v11, v45, ...]    │
└─────────────────────────────────────────┘

查询阶段:
┌─────────────────────────────────────────┐
│  1. 找到离查询q最近的聚类中心          │
│  2. 在最近的nprobe个聚类中搜索         │
│  3. 合并结果并返回top-k                │
└─────────────────────────────────────────┘

3.5 IVFPQ (Product Quantization)

IVFPQ在IVF基础上使用乘积量化,大幅减少内存占用。

import faiss
import numpy as np

class IVFPQIndex:
    """IVFPQ索引封装"""

    def __init__(self, dimension, nlist=1000, m=8, nbits=8, nprobe=100):
        """
        参数说明:
        - nlist: 聚类数量
        - m: PQ量化子空间数量,通常为维度/子空间大小
        - nbits: 每个子空间的编码位数(bit数),决定codebook大小
        - nprobe: 搜索时探测的聚类数
        """
        self.dimension = dimension
        self.m = m
        self.nbits = nbits

        # 创建quantizer
        quantizer = faiss.IndexFlatL2(dimension)

        # 创建IVFPQ索引
        self.index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)
        self.index.nprobe = nprobe

    def train(self, vectors):
        """训练聚类和量化器"""
        self.index.train(vectors)

    def add(self, vectors):
        """添加向量"""
        if not self.index.is_trained:
            self.train(vectors)
        self.index.add(vectors)

    def search(self, query, k=10):
        """搜索"""
        if isinstance(query, np.ndarray) and query.ndim == 1:
            query = query.reshape(1, -1)
        distances, indices = self.index.search(query, k)
        return distances[0], indices[0]

    def get_memory_usage(self):
        """获取内存使用信息"""
        return {
            'total_memory': self.index.memory_usage(),
            'stored_vectors': self.index.ntotal,
            'quantization_bits': self.nbits,
            'subspaces': self.m
        }

# 使用示例
def ivfpq_example():
    dimension = 768
    n_vectors = 1000000
    vectors = np.random.rand(n_vectors, dimension).astype('float32')

    # 创建IVFPQ索引(适合超大规模数据)
    ivfpq = IVFPQIndex(
        dimension,
        nlist=1000,    # 聚类数
        m=8,          # 量化子空间数(768/8=96维/子空间)
        nbits=8,      # 每个子空间8bit(256个codebook向量)
        nprobe=100    # 搜索时探测100个聚类
    )

    ivfpq.train(vectors)
    ivfpq.add(vectors)

    # 查询
    query = np.random.rand(dimension).astype('float32')
    distances, indices = ivfpq.search(query, k=5)

    # 内存使用
    memory = ivfpq.get_memory_usage()

    return distances, indices, memory

3.6 PQ量化原理

乘积量化(Product Quantization):

原始向量(D维):
[v1, v2, v3, ..., vD]

划分为m个子空间(每个D/m维):
Subspace 1: [v1, v2, ..., vD/m]
Subspace 2: [vD/m+1, ..., v2D/m]
...
Subspace m: [..., vD]

每个子空间独立量化:
┌─────────────────────────────────────────┐
│  Subspace 1:                           │
│  - 使用K-means聚类得到2^nbits个中心点   │
│  - 原始向量替换为最近的中心点ID         │
│                                         │
│  Subspace 2:                           │
│  - 同样操作...                         │
└─────────────────────────────────────────┘

编码结果:
[code1, code2, ..., codem]  (每个code是nbits位)

压缩比例:
原始: D × 4 bytes (float32)
编码: m × nbits bits
压缩比 = (D × 32) / (m × nbits)

3.7 DiskANN (Disk-based Approximate Nearest Neighbor)

DiskANN专为超大规模数据(>1亿向量)设计,支持SSD上的索引。

# 注意:Faiss的DiskANN支持是实验性的
# 生产环境建议使用Microsoft的DiskANN实现

class DiskANN:
    """DiskANN概念实现"""

    def __init__(self, dimension, max_degree=64, search_list_size=100):
        """
        参数说明:
        - max_degree: 图中每个节点的最大度数
        - search_list_size: 搜索时的候选列表大小
        """
        self.dimension = dimension
        self.max_degree = max_degree
        self.search_list_size = search_list_size

    def build_index(self, vectors, storage_path):
        """
        构建索引并存储到磁盘

        关键技术:
        1. Vamana图构建(类似HNSW)
        2. 节点数据存储在SSD
        3. 索引结构常驻内存
        """
        # 概念代码
        self.graph = build_vamana_graph(
            vectors,
            max_degree=self.max_degree
        )

        # 将向量数据存到磁盘
        self.vector_storage = DiskVectorStorage(storage_path)
        self.vector_storage.write(vectors)

        # 图索引常驻内存(轻量级)
        self.index_size = len(self.graph) * max_degree * 4  # 指针大小

    def search(self, query, k=10):
        """
        磁盘感知的搜索算法

        优化策略:
        1. 预取:预加载可能访问的向量数据
        2. 缓存:热点数据常驻内存
        3. 批量:减少磁盘I/O次数
        """
        # 1. 在图上找到候选节点(内存操作)
        candidate_ids = self.graph_search(query, self.search_list_size)

        # 2. 批量加载向量数据(磁盘操作)
        vectors = self.vector_storage.read_batch(candidate_ids)

        # 3. 精确计算距离并排序
        distances = [distance(query, v) for v in vectors]
        results = sorted(zip(candidate_ids, distances), key=lambda x: x[1])[:k]

        return results

# 实际使用建议:使用Microsoft的DiskANN或ScaNN

4. 生产环境优化

4.1 索引选择决策树

数据规模决策:
┌─────────────────────────────────────────┐
│  N < 10K:                               │
│    → IndexFlatL2 (精确搜索)             │
│                                         │
│  10K <= N < 1M:                        │
│    如果内存充足 → IndexHNSW              │
│    如果内存受限 → IndexIVFPQ             │
│                                         │
│  1M <= N < 10M:                        │
│    高精度需求 → IndexHNSW + GPU          │
│    平衡需求 → IndexIVFPQ                 │
│                                         │
│  N >= 10M:                              │
│    可全内存 → IVFPQ + GPU                │
│    需用磁盘 → DiskANN / ScaNN           │
└─────────────────────────────────────────┘

4.2 生产级Faiss封装

import faiss
import numpy as np
from typing import Optional, List, Tuple
import pickle
import os

class ProductionVectorIndex:
    """生产级向量索引"""

    def __init__(
        self,
        dimension: int,
        index_type: str = "hnsw",
        metric_type: str = "l2",
        **kwargs
    ):
        """
        参数:
        - dimension: 向量维度
        - index_type: 索引类型 (flat, hnsw, ivf, ivfpq)
        - metric_type: 距离类型 (l2, ip, cosine)
        - kwargs: 索引特定参数
        """
        self.dimension = dimension
        self.index_type = index_type
        self.metric_type = metric_type
        self.id_map = {}  # 外部ID到内部ID的映射
        self.next_id = 0

        self.index = self._create_index(**kwargs)

    def _create_index(self, **kwargs):
        """创建索引"""
        index = None

        if self.index_type == "flat":
            index = self._create_flat_index()

        elif self.index_type == "hnsw":
            index = self._create_hnsw_index(**kwargs)

        elif self.index_type == "ivf":
            index = self._create_ivf_index(**kwargs)

        elif self.index_type == "ivfpq":
            index = self._create_ivfpq_index(**kwargs)

        else:
            raise ValueError(f"Unknown index type: {self.index_type}")

        # 应用余弦距离(如果需要)
        if self.metric_type == "cosine":
            index = self._apply_cosine_wrapper(index)

        return index

    def _create_flat_index(self):
        """创建精确索引"""
        if self.metric_type == "l2":
"    return faiss.IndexFlatL2(self.dimension)
        elif self.metric_type == "ip":
            return faiss.IndexFlatIP(self.dimension)
        else:
            return faiss.IndexFlatL2(self.dimension)

    def _create_hnsw_index(self, M=16, ef_construction=200, ef_search=50):
        """创建HNSW索引"""
        index = faiss.IndexHNSWFlat(self.dimension, M)
        index.hnsw.efConstruction = ef_construction
        index.hnsw.efSearch = ef_search
        return index

    def _create_ivf_index(self, nlist=100, nprobe=10):
        """创建IVF索引"""
        quantizer = faiss.IndexFlatL2(self.dimension)
        index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
        index.nprobe = nprobe
        return index

    def _create_ivfpq_index(self, nlist=1000, m=8, nbits=8, nprobe=100):
        """创建IVFPQ索引"""
        quantizer = faiss.IndexFlatL2(self.dimension)
        index = faiss.IndexIVFPQ(quantizer, self.dimension, nlist, m, nbits)
        index.nprobe = nprobe
        return index

    def _apply_cosine_wrapper(self, index):
        """应用余弦距离(归一化包装器)"""
        return faiss.IndexIDMap(
            faiss.IndexPreTransform(
                faiss.NormalizationTransform(self.dimension),
                index
            )
        )

    def add(self, vectors: np.ndarray, ids: Optional[List[int]] = None):
        """添加向量"""
        if ids is None:
            ids = list(range(self.next_id, self.next_id + len(vectors)))

        # 记录ID映射
        for i, external_id in enumerate(ids):
            self.id_map[external_id] = self.next_id + i

        self.next_id += len(vectors)

        # 归一化(余弦距离)
        if self.metric_type == "cosine":
            faiss.normalize_L2(vectors)

        # 如果需要训练
        if not self.index.is_trained and hasattr(self.index, 'train'):
            self.index.train(vectors)

        # 添加向量
        self.index.add(vectors)

    def search(
        self,
        query: np.ndarray,
        k: int = 10,
        return_ids: bool = True
    ) -> Tuple[np.ndarray, np.ndarray]:
        """搜索"""
        # 归一化查询
        if self.metric_type == "cosine":
            query = query.copy()
            faiss.normalize_L2(query)

        # 确保是2D
        if query.ndim == 1:
            query = query.reshape(1, -1)

        # 搜索
        distances, indices = self.index.search(query, k)

        # 转换回外部ID
        if return_ids:
            external_ids = []
            for idx in indices[0]:
                if idx >= 0:  # -1表示无结果
                    external_id = self._get_external_id(idx)
                    external_ids.append(external_id)
                else:
                    external_ids.append(-1)
            return distances[0], np.array(external_ids)

        return distances[0], indices[0]

    def _get_external_id(self, internal_id: int) -> int:
        """获取外部ID"""
        for ext_id, int_id in self.id_map.items():
            if int_id == internal_id:
                return ext_id
        return -1

    def save(self, path: str):
        """保存索引"""
        os.makedirs(os.path.dirname(path), exist_ok=True)

        # 保存Faiss索引
        faiss.write_index(self.index, f"{path}.index")

        # 保存元数据
        metadata = {
            'id_map': self.id_map,
            'next_id': self.next_id,
            'dimension': self.dimension,
            'index_type': self.index_type,
            'metric_type': self.metric_type
        }
        with open(f"{path}.metadata", 'wb') as f:
            pickle.dump(metadata, f)

    def load(self, path: str):
        """加载索引"""
        # 加载Faiss索引
        self.index = faiss.read_index(f"{path}.index")

        # 加载元数据
        with open(f"{path}.metadata", 'rb') as f:
            metadata = pickle.load(f)

        self.id_map = metadata['id_map']
        self.next_id = metadata['next_id']
        self.dimension = metadata['dimension']
        self.index_type = metadata['index_type']
        self.metric_type = metadata['metric_type']

    def get_stats(self) -> dict:
        """获取统计信息"""
        return {
            'total_vectors': self.index.ntotal,
            'dimension': self.dimension,
            'index_type': self.index_type,
            'metric_type': self.metric_type,
            'memory_usage': self.index.memory_usage(),
            'is_trained': self.index.is_trained
        }

4.3 索引对比测试

import time
import matplotlib.pyplot as plt
from sklearn.metrics import average_precision_score

class IndexBenchmark:
    """索引性能测试"""

    def __init__(self, dimension=768, n_vectors=100000, n_queries=1000):
        self.dimension = dimension
        self.n_vectors = n_vectors
        self.n_queries = n_queries

        # 生成测试数据
        self.vectors = np.random.rand(n_vectors, dimension).astype('float32')
        self.queries = np.random.rand(n_queries, dimension).astype('float32')
        self.ground_truth = self._compute_ground_truth()

    def _compute_ground_truth(self, k=10):
        """计算精确搜索结果(基准)"""
        index = faiss.IndexFlatL2(self.dimension)
        index.add(self.vectors)
        distances, indices = index.search(self.queries, k)
        return list(zip(distances, indices))

    def benchmark_index(self, index_type: str, **kwargs) -> dict:
        """测试单个索引"""
        results = {}

        # 构建时间
        start = time.time()
        index = ProductionVectorIndex(self.dimension, index_type, **kwargs)
        index.add(self.vectors)
        build_time = time.time() - start
        results['build_time'] = build_time

        # 查询时间
        start = time.time()
        distances, indices = index.search(self.queries, k=10)
        query_time = time.time() - start
        results['query_time'] = query_time
        results['qps'] = self.n_queries / query_time

        # 内存使用
        results['memory'] = index.get_stats()['memory_usage']

        # 召回率计算
        recall = self._compute_recall(indices, self.ground_truth)
        results['recall@10'] = recall

        return results

    def _compute_recall(self, predicted, ground_truth):
        """计算召回率"""
        total = 0
        correct = 0

        for pred, gt in zip(predicted, ground_truth):
            gt_set = set(gt[1][0])  # ground truth indices
            pred_set = set(pred)
            intersection = gt_set & pred_set
            correct += len(intersection)
            total += len(gt_set)

        return correct / total if total > 0 else 0

    def compare_indexes(self, index_configs: dict) -> None:
        """对比多个索引配置"""
        results = {}

        for name, config in index_configs.items():
            print(f"Benchmarking {name}...")
            results[name] = self.benchmark_index(**config)

        # 打印结果
        print("\n" + "="*80)
        print(f"{'Index':<20} {'Build(s)':<10} {'Query(ms)':<12} {'QPS':<10} {'Recall@10':<12} {'Memory(MB)':<12}")
        print("="*80)

        for name, res in results.items():
            print(f"{name:<20} {res['build_time']:<10.2f} "
                  f"{res['query_time']*1000/self.n_queries:<12.2f} "
                  f"{res['qps']:<10.0f} {res['recall@10']:<12.4f} "
                  f"{res['memory']/1024/1024:<12.2f}")

        return results

# 运行基准测试
def run_benchmark():
    configs = {
        'Flat': {'index_type': 'flat'},
        'HNSW(standard)': {
            'index_type': 'hnsw',
            'M': 16,
            'ef_construction': 200,
            'ef_search': 50
        },
        'HNSW(fast)': {
            'index_type': 'hnsw',
            'M': 8,
            'ef_construction': 100,
            'ef_search': 30
        },
        'IVF': {
            'index_type': 'ivf',
            'nlist': 100,
            'nprobe': 10
        },
        'IVFPQ': {
            'index_type': 'ivfpq',
            'nlist': 1000,
            'm': 8,
            'nbits': 8,
            'nprobe': 100
        }
    }

    benchmark = IndexBenchmark(n_vectors=100000, n_queries=1000)
    results = benchmark.compare_indexes(configs)

    return results

4.4 实时更新索引

import faiss
import numpy as np
from threading import Lock

class RealTimeVectorIndex:
    """支持实时更新的向量索引"""

    def __init__(self, dimension, index_type="hnsw", **kwargs):
        self.dimension = dimension
        self.index_type = index_type
        self.lock = Lock()

        # 主索引
        self.main_index = ProductionVectorIndex(dimension, index_type, **kwargs)

        # 缓冲区(新向量暂存)
        self.buffer_vectors = []
        self.buffer_ids = []
        self.buffer_size = 1000  # 缓冲区大小

        # 重建索引阈值
        self.rebuild_threshold = 100000

    def add(self, vectors, ids=None):
        """添加向量(线程安全)"""
        with self.lock:
            if ids is None:
                ids = list(range(self._next_id(), self._next_id() + len(vectors)))

            self.buffer_vectors.extend(vectors)
            self.buffer_ids.extend(ids)

            # 缓冲区满时刷新
            if len(self.buffer_vectors) >= self.buffer_size:
                self._flush_buffer()

    def search(self, query, k=10):
        """搜索(合并主索引和缓冲区结果)"""
        with self.lock:
            # 搜索主索引
            main_distances, main_ids = self.main_index.search(query, k=k)

            # 搜索缓冲区
            buffer_results = []
            if self.buffer_vectors:
                buffer_results = self._search_buffer(query, k)

            # 合并结果
            combined = list(zip(main_distances.tolist(), main_ids.tolist()))
            combined.extend(buffer_results)

            # 排序并返回top-k
            combined.sort(key=lambda x: x[0])
            combined = combined[:k]

            distances = [x[0] for x in combined]
            ids = [x[1] for x in combined]

            return np.array(distances), np.array(ids)

    def _flush_buffer(self):
        """刷新缓冲区到主索引"""
        if not self.buffer_vectors:
            return

        # 添加到主索引
        vectors = np.array(self.buffer_vectors, dtype='float32')
        self.main_index.add(vectors, self.buffer_ids)

        # 清空缓冲区
        self.buffer_vectors = []
        self.buffer_ids = []

    def _search_buffer(self, query, k):
        """在缓冲区中搜索"""
        if not self.buffer_vectors:
            return []

        buffer_vectors = np.array(self.buffer_vectors, dtype='float32')
        distances = np.linalg.norm(buffer_vectors - query, axis=1)

        # 获取top-k
        top_k_indices = np.argsort(distances)[:k]
        results = []
        for idx in top_k_indices:
            results.append((float(distances[idx]), self.buffer_ids[idx]))

        return results

    def _next_id(self):
        """获取下一个ID"""
        return self.main_index.next_id + len(self.buffer_ids)

    def rebuild_index(self):
        """重建索引(当向量数量超过阈值时)"""
        with self.lock:
            print("Rebuilding index...")

            # 合并所有向量
            all_vectors = []
            all_ids = []

            # 这里需要从主索引导出所有向量
            # (Faiss不直接支持,需要预先保存)

            # 重建索引
            new_index = ProductionVectorIndex(
                self.dimension,
                self.index_type
            )

            if all_vectors:
                vectors = np.array(all_vectors, dtype='float32')
                new_index.add(vectors, all_ids)

            self.main_index = new_index

5. RAG检索优化实战

5.1 混合检索

import faiss
import numpy as np
from typing import List, Tuple, Dict

class HybridSearchRAG:
    """混合检索RAG:向量检索 + 关键词检索"""

    def __init__(self, dimension, embedding_model, bm25_index=None):
        self.dimension = dimension
        self.embedding_model = embedding_model

        # 向量索引
        self.vector_index = ProductionVectorIndex(dimension, "hnsw")

        # BM25索引(关键词检索)
        self.bm25_index = bm25_index

        # 文档存储
        self.documents = {}  # id -> document

    def add_documents(self, documents: List[Dict]):
        """添加文档"""
        for doc in documents:
            doc_id = doc['id']

            # 生成向量
            text = doc['text']
            vector = self.embedding_model.encode(text)

            # 添加到向量索引
            self.vector_index.add(
                np.array([vector], dtype='float32'),
                [doc_id]
            )

            # 添加到BM25
            if self.bm25_index:
                self.bm25_index.add(doc_id, text)

            # 存储文档
            self.documents[doc_id] = doc

    def search(
        self,
        query: str,
        k: int = 10,
        alpha: float = 0.7,  # 向量权重
        use_rerank: bool = True
    ) -> List[Dict]:
        """
        混合检索

        参数:
        - alpha: 向量检索权重 (1-alpha为关键词权重)
        - use_rerank: 是否使用重排序
        """
        # 获取查询向量
        query_vector = self.embedding_model.encode(query)

        # 1. 向量检索
        vec_distances, vec_ids = self.vector_index.search(
            np.array([query_vector], dtype='float32'),
            k=k*2
        )

        # 归一化向量距离分数
        vec_scores = {}
        for dist, doc_id in zip(vec_distances, vec_ids):
            vec_scores[doc_id] = 1.0 / (1.0 + dist)

        # 2. 关键词检索
        kw_scores = {}
        if self.bm25_index:
            kw_results = self.bm25_index.search(query, k=k*2)
            for doc_id, score in kw_results:
                kw_scores[doc_id] = score

        # 3. 混合评分
        combined_scores = {}
        all_ids = set(vec_scores.keys()) | set(kw_scores.keys())

        for doc_id in all_ids:
            vec_score = vec_scores.get(doc_id, 0)
            kw_score = kw_scores.get(doc_id, 0)

            # 线性加权
            combined_scores[doc_id] = alpha * vec_score + (1 - alpha) * kw_score

        # 4. 排序
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:k]

        # 5. 重排序
        if use_rerank:
            sorted_results = self._rerank(query, sorted_results)

        # 6. 返回文档
        results = []
        for doc_id, score in sorted_results:
            doc = self.documents.get(doc_id, {})
            doc['score'] = score
            doc['retrieval_method'] = 'hybrid'
            results.append(doc)

        return results

    def _rerank(self, query, results):
        """重排序(使用LLM或交叉编码器)"""
        # 简化版:可以用交叉编码器重新计算相关性
        reranked = []
        for doc_id, score in results:
            doc = self.documents.get(doc_id, {})
            # 这里可以使用更精确的相关性模型
            reranked.append((doc_id, score))

        return sorted(reranked, key=lambda x: x[1], reverse=True)

5.2 查询扩展与重写

import re
from typing import List

class QueryProcessor:
    """查询处理与扩展"""

    def __init__(self, llm_client=None):
        self.llm_client = llm_client

    def expand_query(
        self,
        query: str,
        method: str = "llm"
    ) -> List[str]:
        """
        查询扩展

        方法:
        - llm: 使用LLM生成扩展查询
        - synonyms: 同义词扩展
        - hyponyms: 下位词扩展
        """
        if method == "llm":
            return self._llm_expand(query)
        elif method == "synonyms":
            return self._synonym_expand(query)
        else:
            return [query]

    def _llm_expand(self, query: str) -> List[str]:
        """使用LLM扩展查询"""
        prompt = f"""
        为以下查询生成3-5个等价或相关的查询,用于信息检索:

        原查询: {query}

        扩展查询(仅返回查询列表,每行一个):
        """

        if self.llm_client:
            response = self.llm_client.complete(prompt)
            queries = [q.strip() for q in response.split('\n') if q.strip()]
            return [query] + queries
        else:
            return [query]

    def _synonym_expand(self, query: str) -> List[str]:
        """同义词扩展(简化版)"""
        # 实际应用中应使用词库或词向量
        synonym_dict = {
            'AI': ['人工智能', 'artificial intelligence', '机器学习', 'ML'],
            'LLM': ['大语言模型', 'large language model', 'GPT', '语言模型'],
            '检索': ['搜索', 'search', '信息检索'],
        }

        expanded = [query]
        for term, synonyms in synonym_dict.items():
            if term in query:
                for syn in synonyms:
                    expanded_query = query.replace(term, syn)
                    expanded.append(expanded_query)

        return list(set(expanded))

    def rewrite_query_for_retrieval(self, query: str) -> str:
        """
        为检索重写查询

        目标:使查询更适合向量检索
        - 分解复合查询
        - 补充缺失上下文
        - 标准化表述
        """
        prompt = f"""
        重写以下查询,使其更适合在知识库中检索相关文档。
        保持原意,但使用更精确、更完整的表述。

        原查询: {query}

        重写后的查询:
        """

        if self.llm_client:
            return self.llm_client.complete(prompt)
        else:
            return query

class MultiQueryRAG:
    """多查询RAG"""

    def __init__(self, vector_index, query_processor=None):
        self.vector_index = vector_index
        self.query_processor = query_processor or QueryProcessor()

    def search(self, query: str, k: int = 10) -> List[Dict]:
        """使用多个扩展查询进行检索"""
        # 1. 生成扩展查询
        expanded_queries = self.query_processor.expand_query(
            query,
            method="synonyms"
        )

        # 2. 对每个查询检索
        all_results = []
        for expanded_q in expanded_queries:
            # 将查询编码为向量
            query_vector = self._encode_query(expanded_q)

            # 检索
            distances, ids = self.vector_index.search(
                query_vector,
                k=k
            )

            all_results.extend(list(zip(ids, distances.tolist())))

        # 3. 去重和排序
        seen = set()
        unique_results = []
        for doc_id, dist in all_results:
            if doc_id not in seen:
                unique_results.append((doc_id, dist))
                seen.add(doc_id)

        unique_results.sort(key=lambda x: x[1])

        return unique_results[:k]

    def _encode_query(self, query: str) -> np.ndarray:
        """编码查询(简化)"""
        # 实际应用中应使用embedding模型
        return np.random.rand(768).astype('float32')

5.3 重排序(Reranking)

import numpy as np
from typing import List, Tuple

class Reranker:
    """重排序器"""

    def __init__(self, method="cross_encoder"):
        self.method = method

    def rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int = 10
    ) -> List[Dict]:
        """
        重排序文档

        方法:
        - cross_encoder: 使用交叉编码器
        - llm_rerank: 使用LLM打分
        - keyword_boost: 关键词加权
        """
        if self.method == "cross_encoder":
            return self._cross_encoder_rerank(query, documents, top_k)
        elif self.method == "llm_rerank":
            return self._llm_rerank(query, documents, top_k)
        elif self.method == "keyword_boost":
            return self._keyword_boost_rerank(query, documents, top_k)
        else:
            return documents[:top_k]

    def _cross_encoder_rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int
    ) -> List[Dict]:
        """使用交叉编码器重排序"""
        scores = []

        for doc in documents:
            # 计算查询-文档对的分数
            # 简化版:实际应用使用BERT/ColBERT等
            score = self._compute_cross_encoder_score(query, doc['text'])
            scores.append(score)

        # 按分数排序
        reranked = list(zip(documents, scores))
        reranked.sort(key=lambda x: x[1], reverse=True)

        return [doc for doc, _ in reranked[:top_k]]

    def _compute_cross_encoder_score(self, query: str, document: str) -> float:
        """计算交叉编码器分数(简化)"""
        # 实际应用中应使用预训练模型
        query_terms = set(query.lower().split())
        doc_terms = set(document.lower().split())

        overlap = len(query_terms & doc_terms)
        return overlap / len(query_terms) if query_terms else 0

    def _llm_rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int
    ) -> List[Dict]:
        """使用LLM重排序"""
        scores = []

        for doc in documents:
            prompt = f"""
            评估以下文档与查询的相关性,返回0-1的分数:

            查询: {query}

            文档: {doc['text'][:500]}

            相关性分数(0-1):
            """

            # 简化版:实际应用中应调用LLM
            score = self._compute_cross_encoder_score(query, doc['text'])
            scores.append(score)

        # 排序
        reranked = list(zip(documents, scores))
        reranked.sort(key=lambda x: x[1], reverse=True)

        return [doc for doc, _ in reranked[:top_k]]

    def _keyword_boost_rerank(
        self,
        query: str,
        documents: List[Dict],
        top_k: int
    ) -> List[Dict]:
        """关键词加权重排序"""
        query_terms = set(query.lower().split())

        for doc in documents:
            # 计算原始分数(来自检索)
            base_score = doc.get('score', 0)

            # 计算关键词匹配分数
            doc_text = doc.get('text', '').lower()
            keyword_score = 0
            for term in query_terms:
                if term in doc_text:
                    keyword_score += 1

            # 混合分数
            doc['rerank_score'] = 0.7 * base_score + 0.3 * keyword_score

        # 排序
        documents.sort(key=lambda x: x.get('rerank_score', 0), reverse=True)

        return documents[:top_k]

5.4 RAG检索评估

from typing import List, Dict
import numpy as np

class RAGEvaluator:
    """RAG检索评估器"""

    def __init__(self):
        self.metrics = []

    def evaluate_retrieval(
        self,
        queries: List[str],
        ground_truth: List[List[int]],  # 每个查询的相关文档ID
        retrieved: List[List[Tuple[int, float]]],  # 检索结果
        k_values: List[int] = [1, 5, 10, 20]
    ) -> Dict[str, float]:
        """
        评估检索质量

        指标:
        - Recall@k: 召回率
        - Precision@k: 精确率
        - MAP@k: 平均精度
        - MRR@k: 平均倒数排名
        - NDCG@k: 归一化折损累积增益
        """
        metrics = {}

        for k in k_values:
            recall = self._compute_recall_at_k(queries, ground_truth, retrieved, k)
            precision = self._compute_precision_at_k(queries, ground_truth, retrieved, k)
            map_score = self._compute_map_at_k(queries, ground_truth, retrieved, k)
            mrr = self._compute_mrr(queries, ground_truth, retrieved, k)
            ndcg = self._compute_ndcg_at_k(queries, ground_truth, retrieved, k)

            metrics[f'recall@{k}'] = recall
            metrics[f'precision@{k}'] = precision
            metrics[f'map@{k}'] = map_score
            metrics[f'mrr@{k}'] = mrr
            metrics[f'ndcg@{k}'] = ndcg

        return metrics

    def _compute_recall_at_k(
        self,
        queries: List[str],
        ground_truth: List[List[int]],
        retrieved: List[List[Tuple[int, float]]],
        k: int
    ) -> float:
        """计算Recall@k"""
        total_recall = 0

        for gt_docs, ret_docs in zip(ground_truth, retrieved):
            gt_set = set(gt_docs)
            ret_set = set([doc_id for doc_id, _ in ret_docs[:k]])

            if gt_set:
                recall = len(gt_set & ret_set) / len(gt_set)
                total_recall += recall

        return total_recall / len(queries) if queries else 0

    def _compute_precision_at_k(
        self,
        queries: List[str],
        ground_truth: List[List[int]],
        retrieved: List[List[Tuple[int, float]]],
        k: int
    ) -> float:
        """计算Precision@k"""
        total_precision = 0

        for gt_docs, ret_docs in zip(ground_truth, retrieved):
            gt_set = set(gt_docs)
            ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]

            relevant = sum(1 for doc_id in ret_top_k if doc_id in gt_set)
            precision = relevant / k
            total_precision += precision

        return total_precision / len(queries) if queries else 0

    def _compute_map_at_k(
        self,
        queries: List[str],
        ground_truth: List[List[int]],
        retrieved: List[List[Tuple[int, float]]],
        k: int
    ) -> float:
        """计算MAP@k"""
        total_ap = 0

        for gt_docs, ret_docs in zip(ground_truth, retrieved):
            gt_set = set(gt_docs)
            ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]

            # 计算AP
            hits = 0
            precision_sum = 0
            for i, doc_id in enumerate(ret_top_k):
                if doc_id in gt_set:
                    hits += 1
                    precision_sum += hits / (i + 1)

            ap = precision_sum / len(gt_set) if gt_set else 0
            total_ap += ap

        return total_ap / len(queries) if queries else 0

    def _compute_mrr(
        self,
        queries: List[str],
        ground_truth: List[List[int]],
        retrieved: List[List[Tuple[int, float]]],
        k: int
    ) -> float:
        """计算MRR"""
        total_rr = 0

        for gt_docs, ret_docs in zip(ground_truth, retrieved):
            gt_set = set(gt_docs)
            ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]

            # 找到第一个相关文档的位置
            rr = 0
            for i, doc_id in enumerate(ret_top_k):
                if doc_id in gt_set:
                    rr = 1 / (i + 1)
                    break

            total_rr += rr

        return total_rr / len(queries) if queries else 0

    def _compute_ndcg_at_k(
        self,
        queries: List[str],
        ground_truth: List[List[int]],
        retrieved: List[List[Tuple[int, float]]],
        k: int
    ) -> float:
        """计算NDCG@k"""
        total_ndcg = 0

        for gt_docs, ret_docs in zip(ground_truth, retrieved):
            # 创建理想排序(所有相关文档在前)
            ideal_gain = [1] * len(gt_docs) + [0] * (k - len(gt_docs))
            dcg_ideal = sum(
                gain / np.log2(i + 2) for i, gain in enumerate(ideal_gain[:k])
            )

            # 计算实际DCG
            gt_set = set(gt_docs)
            actual_gain = [
                1 if doc_id in gt_set else 0
                for doc_id, _ in ret_docs[:k]
            ]
            dcg_actual = sum(
                gain / np.log2(i + 2) for i, gain in enumerate(actual_gain)
            )

            # NDCG
            ndcg = dcg_actual / dcg_ideal if dcg_ideal > 0 else 0
            total_ndcg += ndcg

        return total_ndcg / len(queries) if queries else 0

    def evaluate_end_to_end(
        self,
        rag_system,
        test_dataset: List[Dict],
        evaluator_model=None
    ) -> Dict[str, float]:
        """
        端到端评估(生成质量)

        指标:
        - Answer Relevance: 答案相关性
        - Faithfulness: 答案忠实度(基于检索的上下文)
        - Context Relevance: 上下文相关性
        """
        results = {
            'answer_relevance': [],
            'faithfulness': [],
            'context_relevance': []
        }

        for item in test_dataset:
            query = item['query']
            ground_truth_answer = item['answer']

            # 使用RAG系统生成答案
            response = rag_system.generate(query)
            answer = response['answer']
            context = response['context']

            # 评估答案相关性
            answer_relevance = self._evaluate_answer_relevance(
                query, answer, ground_truth_answer, evaluator_model
            )
            results['answer_relevance'].append(answer_relevance)

            # 评估忠实度
            faithfulness = self._evaluate_faithfulness(
                answer, context, evaluator_model
            )
            results['faithfulness'].append(faithfulness)

            # 评估上下文相关性
            context_relevance = self._evaluate_context_relevance(
                query, context, evaluator_model
            )
            results['context_relevance'].append(context_relevance)

        # 计算平均分
        return {
            key: np.mean(values)
            for key, values in results.items()
        }

    def _evaluate_answer_relevance(self, query, answer, ground_truth, model=None):
        """评估答案相关性"""
        # 简化版:实际应用中应使用嵌入相似度或LLM评估
        if not model:
            return 0.8  # 示例值
        return 0

    def _evaluate_faithfulness(self, answer, context, model=None):
        """评估忠实度(答案是否基于检索的上下文)"""
        # 简化版:实际应用中应使用LLM评估
        if not model:
            return 0.8  # 示例值
        return 0

    def _evaluate_context_relevance(self, query, context, model=None):
        """评估上下文相关性"""
        # 简化版:实际应用中应使用嵌入相似度或LLM评估
        if not model:
            return 0.8  # 示例值
        return 0

6. 面试高频问题

Q1: 精确检索和近似检索有什么区别?什么时候用哪个?

回答要点:

  1. 精确检索(暴力搜索)

    • 算法:计算查询与所有向量的距离
    • 时间复杂度:O(N×D)
    • 优点:100%准确,不需要训练
    • 缺点:数据量大时性能差
  2. 近似检索(ANN)

    • 算法:HNSW、IVF、IVFPQ等
    • 时间复杂度:O(logN×D)
    • 优点:查询快,支持大规模数据
    • 缺点:牺牲部分精度(召回率)
  3. 使用场景

    精确检索:
    - 数据量小(<10K)
    - 对召回率要求极高(100%)
    - 实时构建索引
    
    近似检索:
    - 数据量大(>10K)
    - 对查询延迟有要求(<50ms)
    - 接受95%-99%召回率

Q2: HNSW算法的原理是什么?为什么它是最流行的ANN算法?

回答要点:

  1. HNSW核心思想

    • 结合跳表和NSW(Navigable Small World)
    • 多层结构:顶层稀疏,底层稠密
    • 借跳表的思想实现快速定位
  2. 查询流程

    1. 从顶层入口点开始
    2. 在每层执行贪婪搜索(找到最近的节点)
    3. 逐层向下,最终在底层精确搜索
    4. 底层使用ef_search参数扩展搜索范围
  3. 优势

    • 对数级查询复杂度:O(logN)
    • 高召回率:接近精确检索
    • 支持动态插入删除
    • 内存占用可控
  4. 关键参数

    M: 每层连接数(影响召回率和构建时间)
    ef_construction: 构建时邻居数
    ef_search: 查询时候选列表大小

Q3: IVF索引的原理是什么?nlist和nprobe如何选择?

回答要点:

  1. IVF核心思想

    • 倒排索引(Inverted File)
    • 使用K-means聚类划分向量空间
    • 查询时只搜索相关聚类
  2. 参数选择

    nlist(聚类数):
    - 通常取 sqrt(N) 或 N/100
    - N=1M时,nlist=1000-10000
    - 太小:每个聚类太多向量,搜索慢
    - 太大:聚类中心存储开销大
    
    nprobe(探测聚类数):
    - 影响精度和速度
    - 一般取 nlist/10 到 nlist/2
    - nprobe越大召回率越高,但查询越慢
  3. 优化建议

    高精度场景:nprobe = nlist/2
    平衡场景:nprobe = nlist/10
    低延迟场景:nprobe = nlist/100

Q4: IVFPQ中的Product Quantization是如何工作的?

回答要点:

  1. PQ原理

    • 将D维向量分成m个子空间(每个D/m维)
    • 每个子空间独立量化(使用K-means聚类)
    • 用子空间中心点ID代替原始向量
  2. 编码过程

    原始向量 [v1, v2, ..., v768]
                  ↓
    划分为8个子空间,每个96维
                  ↓
    每个子空间量化为8位(256个中心点)
                  ↓
    编码结果 [c1, c2, ..., c8] (每个c是0-255)
  3. 压缩比

    原始: 768 × 4 bytes = 3072 bytes
    PQ编码: 8 × 1 byte = 8 bytes
    压缩比: 384x
  4. 距离计算优化

    • 使用查表法(Lookup Table)加速
    • 避免实数运算

Q5: 如何提高向量检索的召回率?

回答要点:

  1. 索引参数调优

    # HNSW: 增加ef_search
    index.hnsw.efSearch = 100  # 默认50
    
    # IVF: 增加nprobe
    index.nprobe = 100  # 默认10
    
    # IVFPQ: 增加nbits和nprobe
  2. 混合检索

    • 向量检索 + 关键词检索(BM25)
    • 融合多种相似度
  3. 查询优化

    • 查询扩展(同义词、相关词)
    • 查询重写(更精确的表述)
  4. 重排序

    • 使用交叉编码器(Cross Encoder)
    • LLM打分
  5. 使用Refine索引

    # 先用快速索引检索更多结果
    base_index = faiss.IndexIVFFlat(...)
    # 再用精确索引重排序
    refine_index = faiss.IndexRefineFlat(base_index, exact_index)

Q6: 如何处理大规模向量数据库(>1亿)?

回答要点:

  1. 磁盘索引

    • DiskANN:专为SSD设计
    • ScaNN:Google的ANN库
    • 分片策略:水平分片
  2. 内存优化

    • IVFPQ:量化压缩
    • 热数据常驻内存
    • 冷数据存磁盘
  3. 分布式架构

    方案1: 数据分片
    - 按向量ID范围分片
    - 每个节点管理一个分片
    
    方案2: 查询并行
    - 同时查询所有分片
    - 合并并重排序结果
    
    方案3: 混合架构
    - 元数据在数据库
    - 向量在向量存储
  4. 更新策略

    • 批量更新
    • 增量索引
    • 异步重建

Q7: RAG系统中如何优化检索质量?

回答要点:

  1. 检索前优化

    - 查询扩展:生成多个相关查询
    - 查询重写:使用LLM改写查询
    - 元数据过滤:基于标签、时间等过滤
  2. 检索中优化

    - 混合检索:向量 + BM25
    - 调整k值:检索更多候选
    - 使用高召回率索引配置
    
    # 示例
    index.hnsw.efSearch = 100
    k = 50  # 检索50个候选
  3. 检索后优化

    - 重排序:Cross Encoder / LLM
    - 去重:合并相似文档
    - 多样性:确保结果多样性
  4. 评估与迭代

    - 离线评估:Recall、Precision、MAP、MRR、NDCG
    - 在线评估:点击率、停留时间
    - A/B测试:对比不同策略

Q8: Faiss和Milvus有什么区别?

回答要点:

特性 Faiss Milvus
定位 向量检索库 向量数据库
功能 检索算法 存储+检索+AR
元数据 需自行管理 原生支持
CRUD 仅Add 完整CRUD
分布式 需自行实现 原生支持
持久化 需自行处理 内置
语言 C++/Python Go/Python
使用场景 嵌入式应用 独立部署

选择建议:

  • 需要完整向量数据库功能 → Milvus
  • 需要嵌入到现有系统 → Faiss
  • 需要元数据查询 → Milvus
  • 需要极致性能定制 → Faiss

总结

本模块深入讲解了Faiss向量检索架构的核心内容:

关键技术点

  1. Faiss索引类型

    • 精确索引:Flat
    • 近似索引:HNSW、IVF、IVFPQ
    • 分布式索引:DiskANN
  2. 核心算法

    • HNSW:分层小世界图
    • IVF:倒排聚类
    • PQ:乘积量化
  3. 生产优化

    • 索引选择决策
    • 参数调优(ef_search、nprobe)
    • 混合检索、重排序
    • 查询扩展
  4. RAG应用

    • 检索-生成流水线
    • 质量评估指标
    • 优化策略

下一步学习

close
arrow_upward