内容纲要
模块30: Faiss 向量检索架构详解
对应JD需求:微软中国 AI 知识库检索架构岗
核心技术:Faiss、HNSW、IVF、DiskANN、Embedding、RAG优化
目录
1. 向量检索基础
1.1 向量空间模型
# 向量检索的本质:在高维空间中寻找最相似的向量
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
# 基础相似度计算
def cosine_similarity(vec1, vec2):
return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
def euclidean_distance(vec1, vec2):
return np.linalg.norm(vec1 - vec2)
def dot_product_similarity(vec1, vec2):
return np.dot(vec1, vec2)
1.2 向量数据库核心指标
| 指标 | 说明 | 优化目标 |
|---|---|---|
| 召回率 (Recall) | 返回结果中包含真实最近邻的比例 | >95% |
| 查询延迟 (Latency) | 单次查询的响应时间 | <50ms |
| 吞吐量 (QPS) | 每秒处理查询数量 | >1000 |
| 索引内存 | 索引占用的内存空间 | 最小化 |
| 构建时间 | 建立索引所需时间 | 可接受 |
1.3 精确检索 vs 近似检索
# 精确检索:暴力搜索,保证100%准确
def exact_search(query, vectors, k=5):
"""暴力搜索,时间复杂度 O(N*D)"""
distances = [cosine_similarity(query, v) for v in vectors]
top_k_indices = np.argsort(distances)[-k:][::-1]
return [(idx, distances[idx]) for idx in top_k_indices]
# 近似检索:Faiss,牺牲小部分精度换取速度
import faiss
def approximate_search(query, index, k=5):
"""Faiss近似检索,时间复杂度 O(logN*D)"""
query = np.array([query]).astype('float32')
distances, indices = index.search(query, k)
return list(zip(indices[0], distances[0]))
2. Faiss框架架构
2.1 Faiss核心概念
import faiss
import numpy as np
# 1. 数据准备
dimension = 768 # 向量维度(如bert-base-768)
n_vectors = 100000
vectors = np.random.rand(n_vectors, dimension).astype('float32')
# 2. 索引创建
# Flat索引:精确检索,适合小数据集
index = faiss.IndexFlatL2(dimension)
# 3. 添加向量
index.add(vectors)
print(f"索引包含 {index.ntotal} 个向量")
# 4. 查询
query = np.random.rand(1, dimension).astype('float32')
k = 10
distances, indices = index.search(query, k)
2.2 Faiss索引类型树
Faiss索引类型
├── 精确索引
│ ├── IndexFlatL2 # L2距离
│ ├── IndexFlatIP # 内积
│ └── IndexFlatCosine # 余弦相似度
├── 基于量化的索引
│ ├── IndexIVFFlat # 倒排+平坦
│ ├── IndexIVFPQ # 倒排+乘积量化
│ └── IndexHNSW # 分层小世界图
└── 索引组合
├── IndexIVF # 倒排框架
├── IndexRefine # 精化索引
└── IndexIDMap # 带ID映射
2.3 Faiss GPU支持
import faiss
# GPU索引创建
def create_gpu_index(vectors, dimension):
cpu_index = faiss.IndexFlatL2(dimension)
cpu_index.add(vectors)
# 迁移到GPU
gpu_index = faiss.index_cpu_to_gpu(
faiss.StandardGpuResources(), 0, cpu_index
)
return gpu_index
# 多GPU查询
def multi_gpu_search(queries, indexes, k=10):
results = []
for gpu_id, index in enumerate(indexes):
distances, indices = index.search(queries, k)
results.append((distances, indices))
# 合并并重排序结果
return merge_and_rerank(results)
3. 核心索引算法详解
3.1 HNSW (Hierarchical Navigable Small World)
HNSW是目前最流行的近似最近邻算法之一,结合了分层导航和跳表思想。
import faiss
import numpy as np
class HNSWIndex:
"""HNSW索引封装"""
def __init__(self, dimension, ef_construction=200, M=16, ef_search=50):
"""
参数说明:
- ef_construction: 构建时每层保留的邻居数,越大精度越高,构建越慢
- M: 每层每个节点的最大连接数,影响图密度
- ef_search: 查询时动态扩展的候选列表大小
"""
self.dimension = dimension
self.index = faiss.IndexHNSWFlat(dimension, M)
self.index.hnsw.efConstruction = ef_construction
self.index.hnsw.efSearch = ef_search
def add(self, vectors, ids=None):
"""添加向量"""
if ids is not None:
self.index.add_with_ids(vectors, ids)
else:
self.index.add(vectors)
def search(self, query, k=10):
"""搜索"""
if isinstance(query, np.ndarray) and query.ndim == 1:
query = query.reshape(1, -1)
distances, indices = self.index.search(query, k)
return distances[0], indices[0]
def search_batch(self, queries, k=10):
"""批量搜索"""
distances, indices = self.index.search(queries, k)
return distances, indices
def save(self, path):
"""保存索引"""
faiss.write_index(self.index, path)
def load(self, path):
"""加载索引"""
self.index = faiss.read_index(path)
# 使用示例
def hnsw_example():
dimension = 768
n_vectors = 10000
vectors = np.random.rand(n_vectors, dimension).astype('float32')
# 创建HNSW索引
hnsw = HNSWIndex(
dimension,
ef_construction=200, # 构建参数
M=16, # 连接度
ef_search=50 # 查询参数
)
hnsw.add(vectors)
# 查询
query = np.random.rand(dimension).astype('float32')
distances, indices = hnsw.search(query, k=5)
return distances, indices
3.2 HNSW算法原理
HNSW索引构建过程:
第0层(最底层)
┌─────────────────────────────────────┐
│ 节点1 ──── 节点2 ──── 节点3 │ ← 基础图,所有点
│ │ │ │ │ 边数最多(M个邻居)
│ 节点4 ──── 节点5 ──── 节点6 │
└─────────────────────────────────────┘
第1层
┌─────────────────────────────────────┐
│ 节点1 ──────────── 节点5 │ ← 稀疏图,部分点
│ │ │ 边数较少
│ 节点8 ──── 节点9 │
└─────────────────────────────────────┘
↓ (指向更靠近查询的点)
第2层(最顶层)
┌─────────────────────────────────────┐
│ 节点1 ────────────────── 节点9 │ ← 最稀疏,快速定位
└─────────────────────────────────────┘
# HNSW查询流程
def hnsw_search_process(query, index):
"""
HNSW查询流程:
1. 从顶层入口点开始
2. 每层进行贪婪搜索,找到最近邻
3. 逐层向下,最后在底层精确搜索
"""
# 概念代码,展示查询流程
current_point = index.entry_point
# 从顶层向下搜索
for level in reversed(range(index.max_level)):
# 当前层的贪婪搜索
candidates = index.get_neighbors(current_point, level)
current_point = min(candidates, key=lambda p: distance(query, p))
# 底层精细搜索(ef_search扩展)
visited = set()
results = []
candidates = index.get_neighbors(current_point, 0)
while candidates:
point = candidates.pop()
if point in visited:
continue
visited.add(point)
# 计算距离并更新结果
dist = distance(query, point)
if len(results) < index.k or dist < results[-1].distance:
results.append(Result(point, dist))
results.sort(key=lambda r: r.distance)
results = results[:index.k]
# 动态扩展候选集
if len(visited) < index.ef_search:
candidates.extend(index.get_neighbors(point, 0))
return results
3.3 IVF (Inverted File Index)
IVF将向量空间划分为多个聚类,搜索时只搜索相关聚类。
import faiss
import numpy as np
class IVFIndex:
"""IVF索引封装"""
def __init__(self, dimension, nlist=100, nprobe=10, quantizer=None):
"""
参数说明:
- nlist: 聚类中心数量,通常取 sqrt(N) 或 N/100
- nprobe: 搜索时探测的聚类数,影响精度/速度权衡
- quantizer: 量化器,默认使用FlatL2
"""
self.dimension = dimension
self.nlist = nlist
# 使用FlatL2作为quantizer
quantizer = faiss.IndexFlatL2(dimension)
# 创建IVF索引
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
self.index.nprobe = nprobe
def train(self, vectors):
"""训练聚类(必须先训练再添加)"""
self.index.train(vectors)
def add(self, vectors):
"""添加向量"""
if not self.index.is_trained:
self.train(vectors)
self.index.add(vectors)
def search(self, query, k=10):
"""搜索"""
if isinstance(query, np.ndarray) and query.ndim == 1:
query = query.reshape(1, -1)
distances, indices = self.index.search(query, k)
return distances[0], indices[0]
# 使用示例
def ivf_example():
dimension = 768
n_vectors = 100000
vectors = np.random.rand(n_vectors, dimension).astype('float32')
# 创建IVF索引
ivf = IVFIndex(
dimension,
nlist=1000, # 聚类数
nprobe=100 # 搜索时探测的聚类数
)
# 先训练再添加
ivf.train(vectors)
ivf.add(vectors)
# 查询
query = np.random.rand(dimension).astype('float32')
distances, indices = ivf.search(query, k=5)
return distances, indices
3.4 IVF索引原理
IVF索引结构:
训练阶段:
┌─────────────────────────────────────────┐
│ 向量集合 V = {v1, v2, ..., vN} │
│ │
│ 使用K-means聚类: │
│ - 随机选择nlist个聚类中心C1...Cnlist │
│ - 将每个向量分配到最近的聚类 │
│ - 迭代优化聚类中心位置 │
└─────────────────────────────────────────┘
索引结构:
┌─────────────────────────────────────────┐
│ 倒排文件(Inverted File): │
│ │
│ Cluster 1: [v1, v15, v23, ...] │
│ Cluster 2: [v5, v18, v30, ...] │
│ Cluster 3: [v7, v9, v12, ...] │
│ ... │
│ Cluster nlist: [v3, v11, v45, ...] │
└─────────────────────────────────────────┘
查询阶段:
┌─────────────────────────────────────────┐
│ 1. 找到离查询q最近的聚类中心 │
│ 2. 在最近的nprobe个聚类中搜索 │
│ 3. 合并结果并返回top-k │
└─────────────────────────────────────────┘
3.5 IVFPQ (Product Quantization)
IVFPQ在IVF基础上使用乘积量化,大幅减少内存占用。
import faiss
import numpy as np
class IVFPQIndex:
"""IVFPQ索引封装"""
def __init__(self, dimension, nlist=1000, m=8, nbits=8, nprobe=100):
"""
参数说明:
- nlist: 聚类数量
- m: PQ量化子空间数量,通常为维度/子空间大小
- nbits: 每个子空间的编码位数(bit数),决定codebook大小
- nprobe: 搜索时探测的聚类数
"""
self.dimension = dimension
self.m = m
self.nbits = nbits
# 创建quantizer
quantizer = faiss.IndexFlatL2(dimension)
# 创建IVFPQ索引
self.index = faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)
self.index.nprobe = nprobe
def train(self, vectors):
"""训练聚类和量化器"""
self.index.train(vectors)
def add(self, vectors):
"""添加向量"""
if not self.index.is_trained:
self.train(vectors)
self.index.add(vectors)
def search(self, query, k=10):
"""搜索"""
if isinstance(query, np.ndarray) and query.ndim == 1:
query = query.reshape(1, -1)
distances, indices = self.index.search(query, k)
return distances[0], indices[0]
def get_memory_usage(self):
"""获取内存使用信息"""
return {
'total_memory': self.index.memory_usage(),
'stored_vectors': self.index.ntotal,
'quantization_bits': self.nbits,
'subspaces': self.m
}
# 使用示例
def ivfpq_example():
dimension = 768
n_vectors = 1000000
vectors = np.random.rand(n_vectors, dimension).astype('float32')
# 创建IVFPQ索引(适合超大规模数据)
ivfpq = IVFPQIndex(
dimension,
nlist=1000, # 聚类数
m=8, # 量化子空间数(768/8=96维/子空间)
nbits=8, # 每个子空间8bit(256个codebook向量)
nprobe=100 # 搜索时探测100个聚类
)
ivfpq.train(vectors)
ivfpq.add(vectors)
# 查询
query = np.random.rand(dimension).astype('float32')
distances, indices = ivfpq.search(query, k=5)
# 内存使用
memory = ivfpq.get_memory_usage()
return distances, indices, memory
3.6 PQ量化原理
乘积量化(Product Quantization):
原始向量(D维):
[v1, v2, v3, ..., vD]
划分为m个子空间(每个D/m维):
Subspace 1: [v1, v2, ..., vD/m]
Subspace 2: [vD/m+1, ..., v2D/m]
...
Subspace m: [..., vD]
每个子空间独立量化:
┌─────────────────────────────────────────┐
│ Subspace 1: │
│ - 使用K-means聚类得到2^nbits个中心点 │
│ - 原始向量替换为最近的中心点ID │
│ │
│ Subspace 2: │
│ - 同样操作... │
└─────────────────────────────────────────┘
编码结果:
[code1, code2, ..., codem] (每个code是nbits位)
压缩比例:
原始: D × 4 bytes (float32)
编码: m × nbits bits
压缩比 = (D × 32) / (m × nbits)
3.7 DiskANN (Disk-based Approximate Nearest Neighbor)
DiskANN专为超大规模数据(>1亿向量)设计,支持SSD上的索引。
# 注意:Faiss的DiskANN支持是实验性的
# 生产环境建议使用Microsoft的DiskANN实现
class DiskANN:
"""DiskANN概念实现"""
def __init__(self, dimension, max_degree=64, search_list_size=100):
"""
参数说明:
- max_degree: 图中每个节点的最大度数
- search_list_size: 搜索时的候选列表大小
"""
self.dimension = dimension
self.max_degree = max_degree
self.search_list_size = search_list_size
def build_index(self, vectors, storage_path):
"""
构建索引并存储到磁盘
关键技术:
1. Vamana图构建(类似HNSW)
2. 节点数据存储在SSD
3. 索引结构常驻内存
"""
# 概念代码
self.graph = build_vamana_graph(
vectors,
max_degree=self.max_degree
)
# 将向量数据存到磁盘
self.vector_storage = DiskVectorStorage(storage_path)
self.vector_storage.write(vectors)
# 图索引常驻内存(轻量级)
self.index_size = len(self.graph) * max_degree * 4 # 指针大小
def search(self, query, k=10):
"""
磁盘感知的搜索算法
优化策略:
1. 预取:预加载可能访问的向量数据
2. 缓存:热点数据常驻内存
3. 批量:减少磁盘I/O次数
"""
# 1. 在图上找到候选节点(内存操作)
candidate_ids = self.graph_search(query, self.search_list_size)
# 2. 批量加载向量数据(磁盘操作)
vectors = self.vector_storage.read_batch(candidate_ids)
# 3. 精确计算距离并排序
distances = [distance(query, v) for v in vectors]
results = sorted(zip(candidate_ids, distances), key=lambda x: x[1])[:k]
return results
# 实际使用建议:使用Microsoft的DiskANN或ScaNN
4. 生产环境优化
4.1 索引选择决策树
数据规模决策:
┌─────────────────────────────────────────┐
│ N < 10K: │
│ → IndexFlatL2 (精确搜索) │
│ │
│ 10K <= N < 1M: │
│ 如果内存充足 → IndexHNSW │
│ 如果内存受限 → IndexIVFPQ │
│ │
│ 1M <= N < 10M: │
│ 高精度需求 → IndexHNSW + GPU │
│ 平衡需求 → IndexIVFPQ │
│ │
│ N >= 10M: │
│ 可全内存 → IVFPQ + GPU │
│ 需用磁盘 → DiskANN / ScaNN │
└─────────────────────────────────────────┘
4.2 生产级Faiss封装
import faiss
import numpy as np
from typing import Optional, List, Tuple
import pickle
import os
class ProductionVectorIndex:
"""生产级向量索引"""
def __init__(
self,
dimension: int,
index_type: str = "hnsw",
metric_type: str = "l2",
**kwargs
):
"""
参数:
- dimension: 向量维度
- index_type: 索引类型 (flat, hnsw, ivf, ivfpq)
- metric_type: 距离类型 (l2, ip, cosine)
- kwargs: 索引特定参数
"""
self.dimension = dimension
self.index_type = index_type
self.metric_type = metric_type
self.id_map = {} # 外部ID到内部ID的映射
self.next_id = 0
self.index = self._create_index(**kwargs)
def _create_index(self, **kwargs):
"""创建索引"""
index = None
if self.index_type == "flat":
index = self._create_flat_index()
elif self.index_type == "hnsw":
index = self._create_hnsw_index(**kwargs)
elif self.index_type == "ivf":
index = self._create_ivf_index(**kwargs)
elif self.index_type == "ivfpq":
index = self._create_ivfpq_index(**kwargs)
else:
raise ValueError(f"Unknown index type: {self.index_type}")
# 应用余弦距离(如果需要)
if self.metric_type == "cosine":
index = self._apply_cosine_wrapper(index)
return index
def _create_flat_index(self):
"""创建精确索引"""
if self.metric_type == "l2":
" return faiss.IndexFlatL2(self.dimension)
elif self.metric_type == "ip":
return faiss.IndexFlatIP(self.dimension)
else:
return faiss.IndexFlatL2(self.dimension)
def _create_hnsw_index(self, M=16, ef_construction=200, ef_search=50):
"""创建HNSW索引"""
index = faiss.IndexHNSWFlat(self.dimension, M)
index.hnsw.efConstruction = ef_construction
index.hnsw.efSearch = ef_search
return index
def _create_ivf_index(self, nlist=100, nprobe=10):
"""创建IVF索引"""
quantizer = faiss.IndexFlatL2(self.dimension)
index = faiss.IndexIVFFlat(quantizer, self.dimension, nlist)
index.nprobe = nprobe
return index
def _create_ivfpq_index(self, nlist=1000, m=8, nbits=8, nprobe=100):
"""创建IVFPQ索引"""
quantizer = faiss.IndexFlatL2(self.dimension)
index = faiss.IndexIVFPQ(quantizer, self.dimension, nlist, m, nbits)
index.nprobe = nprobe
return index
def _apply_cosine_wrapper(self, index):
"""应用余弦距离(归一化包装器)"""
return faiss.IndexIDMap(
faiss.IndexPreTransform(
faiss.NormalizationTransform(self.dimension),
index
)
)
def add(self, vectors: np.ndarray, ids: Optional[List[int]] = None):
"""添加向量"""
if ids is None:
ids = list(range(self.next_id, self.next_id + len(vectors)))
# 记录ID映射
for i, external_id in enumerate(ids):
self.id_map[external_id] = self.next_id + i
self.next_id += len(vectors)
# 归一化(余弦距离)
if self.metric_type == "cosine":
faiss.normalize_L2(vectors)
# 如果需要训练
if not self.index.is_trained and hasattr(self.index, 'train'):
self.index.train(vectors)
# 添加向量
self.index.add(vectors)
def search(
self,
query: np.ndarray,
k: int = 10,
return_ids: bool = True
) -> Tuple[np.ndarray, np.ndarray]:
"""搜索"""
# 归一化查询
if self.metric_type == "cosine":
query = query.copy()
faiss.normalize_L2(query)
# 确保是2D
if query.ndim == 1:
query = query.reshape(1, -1)
# 搜索
distances, indices = self.index.search(query, k)
# 转换回外部ID
if return_ids:
external_ids = []
for idx in indices[0]:
if idx >= 0: # -1表示无结果
external_id = self._get_external_id(idx)
external_ids.append(external_id)
else:
external_ids.append(-1)
return distances[0], np.array(external_ids)
return distances[0], indices[0]
def _get_external_id(self, internal_id: int) -> int:
"""获取外部ID"""
for ext_id, int_id in self.id_map.items():
if int_id == internal_id:
return ext_id
return -1
def save(self, path: str):
"""保存索引"""
os.makedirs(os.path.dirname(path), exist_ok=True)
# 保存Faiss索引
faiss.write_index(self.index, f"{path}.index")
# 保存元数据
metadata = {
'id_map': self.id_map,
'next_id': self.next_id,
'dimension': self.dimension,
'index_type': self.index_type,
'metric_type': self.metric_type
}
with open(f"{path}.metadata", 'wb') as f:
pickle.dump(metadata, f)
def load(self, path: str):
"""加载索引"""
# 加载Faiss索引
self.index = faiss.read_index(f"{path}.index")
# 加载元数据
with open(f"{path}.metadata", 'rb') as f:
metadata = pickle.load(f)
self.id_map = metadata['id_map']
self.next_id = metadata['next_id']
self.dimension = metadata['dimension']
self.index_type = metadata['index_type']
self.metric_type = metadata['metric_type']
def get_stats(self) -> dict:
"""获取统计信息"""
return {
'total_vectors': self.index.ntotal,
'dimension': self.dimension,
'index_type': self.index_type,
'metric_type': self.metric_type,
'memory_usage': self.index.memory_usage(),
'is_trained': self.index.is_trained
}
4.3 索引对比测试
import time
import matplotlib.pyplot as plt
from sklearn.metrics import average_precision_score
class IndexBenchmark:
"""索引性能测试"""
def __init__(self, dimension=768, n_vectors=100000, n_queries=1000):
self.dimension = dimension
self.n_vectors = n_vectors
self.n_queries = n_queries
# 生成测试数据
self.vectors = np.random.rand(n_vectors, dimension).astype('float32')
self.queries = np.random.rand(n_queries, dimension).astype('float32')
self.ground_truth = self._compute_ground_truth()
def _compute_ground_truth(self, k=10):
"""计算精确搜索结果(基准)"""
index = faiss.IndexFlatL2(self.dimension)
index.add(self.vectors)
distances, indices = index.search(self.queries, k)
return list(zip(distances, indices))
def benchmark_index(self, index_type: str, **kwargs) -> dict:
"""测试单个索引"""
results = {}
# 构建时间
start = time.time()
index = ProductionVectorIndex(self.dimension, index_type, **kwargs)
index.add(self.vectors)
build_time = time.time() - start
results['build_time'] = build_time
# 查询时间
start = time.time()
distances, indices = index.search(self.queries, k=10)
query_time = time.time() - start
results['query_time'] = query_time
results['qps'] = self.n_queries / query_time
# 内存使用
results['memory'] = index.get_stats()['memory_usage']
# 召回率计算
recall = self._compute_recall(indices, self.ground_truth)
results['recall@10'] = recall
return results
def _compute_recall(self, predicted, ground_truth):
"""计算召回率"""
total = 0
correct = 0
for pred, gt in zip(predicted, ground_truth):
gt_set = set(gt[1][0]) # ground truth indices
pred_set = set(pred)
intersection = gt_set & pred_set
correct += len(intersection)
total += len(gt_set)
return correct / total if total > 0 else 0
def compare_indexes(self, index_configs: dict) -> None:
"""对比多个索引配置"""
results = {}
for name, config in index_configs.items():
print(f"Benchmarking {name}...")
results[name] = self.benchmark_index(**config)
# 打印结果
print("\n" + "="*80)
print(f"{'Index':<20} {'Build(s)':<10} {'Query(ms)':<12} {'QPS':<10} {'Recall@10':<12} {'Memory(MB)':<12}")
print("="*80)
for name, res in results.items():
print(f"{name:<20} {res['build_time']:<10.2f} "
f"{res['query_time']*1000/self.n_queries:<12.2f} "
f"{res['qps']:<10.0f} {res['recall@10']:<12.4f} "
f"{res['memory']/1024/1024:<12.2f}")
return results
# 运行基准测试
def run_benchmark():
configs = {
'Flat': {'index_type': 'flat'},
'HNSW(standard)': {
'index_type': 'hnsw',
'M': 16,
'ef_construction': 200,
'ef_search': 50
},
'HNSW(fast)': {
'index_type': 'hnsw',
'M': 8,
'ef_construction': 100,
'ef_search': 30
},
'IVF': {
'index_type': 'ivf',
'nlist': 100,
'nprobe': 10
},
'IVFPQ': {
'index_type': 'ivfpq',
'nlist': 1000,
'm': 8,
'nbits': 8,
'nprobe': 100
}
}
benchmark = IndexBenchmark(n_vectors=100000, n_queries=1000)
results = benchmark.compare_indexes(configs)
return results
4.4 实时更新索引
import faiss
import numpy as np
from threading import Lock
class RealTimeVectorIndex:
"""支持实时更新的向量索引"""
def __init__(self, dimension, index_type="hnsw", **kwargs):
self.dimension = dimension
self.index_type = index_type
self.lock = Lock()
# 主索引
self.main_index = ProductionVectorIndex(dimension, index_type, **kwargs)
# 缓冲区(新向量暂存)
self.buffer_vectors = []
self.buffer_ids = []
self.buffer_size = 1000 # 缓冲区大小
# 重建索引阈值
self.rebuild_threshold = 100000
def add(self, vectors, ids=None):
"""添加向量(线程安全)"""
with self.lock:
if ids is None:
ids = list(range(self._next_id(), self._next_id() + len(vectors)))
self.buffer_vectors.extend(vectors)
self.buffer_ids.extend(ids)
# 缓冲区满时刷新
if len(self.buffer_vectors) >= self.buffer_size:
self._flush_buffer()
def search(self, query, k=10):
"""搜索(合并主索引和缓冲区结果)"""
with self.lock:
# 搜索主索引
main_distances, main_ids = self.main_index.search(query, k=k)
# 搜索缓冲区
buffer_results = []
if self.buffer_vectors:
buffer_results = self._search_buffer(query, k)
# 合并结果
combined = list(zip(main_distances.tolist(), main_ids.tolist()))
combined.extend(buffer_results)
# 排序并返回top-k
combined.sort(key=lambda x: x[0])
combined = combined[:k]
distances = [x[0] for x in combined]
ids = [x[1] for x in combined]
return np.array(distances), np.array(ids)
def _flush_buffer(self):
"""刷新缓冲区到主索引"""
if not self.buffer_vectors:
return
# 添加到主索引
vectors = np.array(self.buffer_vectors, dtype='float32')
self.main_index.add(vectors, self.buffer_ids)
# 清空缓冲区
self.buffer_vectors = []
self.buffer_ids = []
def _search_buffer(self, query, k):
"""在缓冲区中搜索"""
if not self.buffer_vectors:
return []
buffer_vectors = np.array(self.buffer_vectors, dtype='float32')
distances = np.linalg.norm(buffer_vectors - query, axis=1)
# 获取top-k
top_k_indices = np.argsort(distances)[:k]
results = []
for idx in top_k_indices:
results.append((float(distances[idx]), self.buffer_ids[idx]))
return results
def _next_id(self):
"""获取下一个ID"""
return self.main_index.next_id + len(self.buffer_ids)
def rebuild_index(self):
"""重建索引(当向量数量超过阈值时)"""
with self.lock:
print("Rebuilding index...")
# 合并所有向量
all_vectors = []
all_ids = []
# 这里需要从主索引导出所有向量
# (Faiss不直接支持,需要预先保存)
# 重建索引
new_index = ProductionVectorIndex(
self.dimension,
self.index_type
)
if all_vectors:
vectors = np.array(all_vectors, dtype='float32')
new_index.add(vectors, all_ids)
self.main_index = new_index
5. RAG检索优化实战
5.1 混合检索
import faiss
import numpy as np
from typing import List, Tuple, Dict
class HybridSearchRAG:
"""混合检索RAG:向量检索 + 关键词检索"""
def __init__(self, dimension, embedding_model, bm25_index=None):
self.dimension = dimension
self.embedding_model = embedding_model
# 向量索引
self.vector_index = ProductionVectorIndex(dimension, "hnsw")
# BM25索引(关键词检索)
self.bm25_index = bm25_index
# 文档存储
self.documents = {} # id -> document
def add_documents(self, documents: List[Dict]):
"""添加文档"""
for doc in documents:
doc_id = doc['id']
# 生成向量
text = doc['text']
vector = self.embedding_model.encode(text)
# 添加到向量索引
self.vector_index.add(
np.array([vector], dtype='float32'),
[doc_id]
)
# 添加到BM25
if self.bm25_index:
self.bm25_index.add(doc_id, text)
# 存储文档
self.documents[doc_id] = doc
def search(
self,
query: str,
k: int = 10,
alpha: float = 0.7, # 向量权重
use_rerank: bool = True
) -> List[Dict]:
"""
混合检索
参数:
- alpha: 向量检索权重 (1-alpha为关键词权重)
- use_rerank: 是否使用重排序
"""
# 获取查询向量
query_vector = self.embedding_model.encode(query)
# 1. 向量检索
vec_distances, vec_ids = self.vector_index.search(
np.array([query_vector], dtype='float32'),
k=k*2
)
# 归一化向量距离分数
vec_scores = {}
for dist, doc_id in zip(vec_distances, vec_ids):
vec_scores[doc_id] = 1.0 / (1.0 + dist)
# 2. 关键词检索
kw_scores = {}
if self.bm25_index:
kw_results = self.bm25_index.search(query, k=k*2)
for doc_id, score in kw_results:
kw_scores[doc_id] = score
# 3. 混合评分
combined_scores = {}
all_ids = set(vec_scores.keys()) | set(kw_scores.keys())
for doc_id in all_ids:
vec_score = vec_scores.get(doc_id, 0)
kw_score = kw_scores.get(doc_id, 0)
# 线性加权
combined_scores[doc_id] = alpha * vec_score + (1 - alpha) * kw_score
# 4. 排序
sorted_results = sorted(
combined_scores.items(),
key=lambda x: x[1],
reverse=True
)[:k]
# 5. 重排序
if use_rerank:
sorted_results = self._rerank(query, sorted_results)
# 6. 返回文档
results = []
for doc_id, score in sorted_results:
doc = self.documents.get(doc_id, {})
doc['score'] = score
doc['retrieval_method'] = 'hybrid'
results.append(doc)
return results
def _rerank(self, query, results):
"""重排序(使用LLM或交叉编码器)"""
# 简化版:可以用交叉编码器重新计算相关性
reranked = []
for doc_id, score in results:
doc = self.documents.get(doc_id, {})
# 这里可以使用更精确的相关性模型
reranked.append((doc_id, score))
return sorted(reranked, key=lambda x: x[1], reverse=True)
5.2 查询扩展与重写
import re
from typing import List
class QueryProcessor:
"""查询处理与扩展"""
def __init__(self, llm_client=None):
self.llm_client = llm_client
def expand_query(
self,
query: str,
method: str = "llm"
) -> List[str]:
"""
查询扩展
方法:
- llm: 使用LLM生成扩展查询
- synonyms: 同义词扩展
- hyponyms: 下位词扩展
"""
if method == "llm":
return self._llm_expand(query)
elif method == "synonyms":
return self._synonym_expand(query)
else:
return [query]
def _llm_expand(self, query: str) -> List[str]:
"""使用LLM扩展查询"""
prompt = f"""
为以下查询生成3-5个等价或相关的查询,用于信息检索:
原查询: {query}
扩展查询(仅返回查询列表,每行一个):
"""
if self.llm_client:
response = self.llm_client.complete(prompt)
queries = [q.strip() for q in response.split('\n') if q.strip()]
return [query] + queries
else:
return [query]
def _synonym_expand(self, query: str) -> List[str]:
"""同义词扩展(简化版)"""
# 实际应用中应使用词库或词向量
synonym_dict = {
'AI': ['人工智能', 'artificial intelligence', '机器学习', 'ML'],
'LLM': ['大语言模型', 'large language model', 'GPT', '语言模型'],
'检索': ['搜索', 'search', '信息检索'],
}
expanded = [query]
for term, synonyms in synonym_dict.items():
if term in query:
for syn in synonyms:
expanded_query = query.replace(term, syn)
expanded.append(expanded_query)
return list(set(expanded))
def rewrite_query_for_retrieval(self, query: str) -> str:
"""
为检索重写查询
目标:使查询更适合向量检索
- 分解复合查询
- 补充缺失上下文
- 标准化表述
"""
prompt = f"""
重写以下查询,使其更适合在知识库中检索相关文档。
保持原意,但使用更精确、更完整的表述。
原查询: {query}
重写后的查询:
"""
if self.llm_client:
return self.llm_client.complete(prompt)
else:
return query
class MultiQueryRAG:
"""多查询RAG"""
def __init__(self, vector_index, query_processor=None):
self.vector_index = vector_index
self.query_processor = query_processor or QueryProcessor()
def search(self, query: str, k: int = 10) -> List[Dict]:
"""使用多个扩展查询进行检索"""
# 1. 生成扩展查询
expanded_queries = self.query_processor.expand_query(
query,
method="synonyms"
)
# 2. 对每个查询检索
all_results = []
for expanded_q in expanded_queries:
# 将查询编码为向量
query_vector = self._encode_query(expanded_q)
# 检索
distances, ids = self.vector_index.search(
query_vector,
k=k
)
all_results.extend(list(zip(ids, distances.tolist())))
# 3. 去重和排序
seen = set()
unique_results = []
for doc_id, dist in all_results:
if doc_id not in seen:
unique_results.append((doc_id, dist))
seen.add(doc_id)
unique_results.sort(key=lambda x: x[1])
return unique_results[:k]
def _encode_query(self, query: str) -> np.ndarray:
"""编码查询(简化)"""
# 实际应用中应使用embedding模型
return np.random.rand(768).astype('float32')
5.3 重排序(Reranking)
import numpy as np
from typing import List, Tuple
class Reranker:
"""重排序器"""
def __init__(self, method="cross_encoder"):
self.method = method
def rerank(
self,
query: str,
documents: List[Dict],
top_k: int = 10
) -> List[Dict]:
"""
重排序文档
方法:
- cross_encoder: 使用交叉编码器
- llm_rerank: 使用LLM打分
- keyword_boost: 关键词加权
"""
if self.method == "cross_encoder":
return self._cross_encoder_rerank(query, documents, top_k)
elif self.method == "llm_rerank":
return self._llm_rerank(query, documents, top_k)
elif self.method == "keyword_boost":
return self._keyword_boost_rerank(query, documents, top_k)
else:
return documents[:top_k]
def _cross_encoder_rerank(
self,
query: str,
documents: List[Dict],
top_k: int
) -> List[Dict]:
"""使用交叉编码器重排序"""
scores = []
for doc in documents:
# 计算查询-文档对的分数
# 简化版:实际应用使用BERT/ColBERT等
score = self._compute_cross_encoder_score(query, doc['text'])
scores.append(score)
# 按分数排序
reranked = list(zip(documents, scores))
reranked.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in reranked[:top_k]]
def _compute_cross_encoder_score(self, query: str, document: str) -> float:
"""计算交叉编码器分数(简化)"""
# 实际应用中应使用预训练模型
query_terms = set(query.lower().split())
doc_terms = set(document.lower().split())
overlap = len(query_terms & doc_terms)
return overlap / len(query_terms) if query_terms else 0
def _llm_rerank(
self,
query: str,
documents: List[Dict],
top_k: int
) -> List[Dict]:
"""使用LLM重排序"""
scores = []
for doc in documents:
prompt = f"""
评估以下文档与查询的相关性,返回0-1的分数:
查询: {query}
文档: {doc['text'][:500]}
相关性分数(0-1):
"""
# 简化版:实际应用中应调用LLM
score = self._compute_cross_encoder_score(query, doc['text'])
scores.append(score)
# 排序
reranked = list(zip(documents, scores))
reranked.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in reranked[:top_k]]
def _keyword_boost_rerank(
self,
query: str,
documents: List[Dict],
top_k: int
) -> List[Dict]:
"""关键词加权重排序"""
query_terms = set(query.lower().split())
for doc in documents:
# 计算原始分数(来自检索)
base_score = doc.get('score', 0)
# 计算关键词匹配分数
doc_text = doc.get('text', '').lower()
keyword_score = 0
for term in query_terms:
if term in doc_text:
keyword_score += 1
# 混合分数
doc['rerank_score'] = 0.7 * base_score + 0.3 * keyword_score
# 排序
documents.sort(key=lambda x: x.get('rerank_score', 0), reverse=True)
return documents[:top_k]
5.4 RAG检索评估
from typing import List, Dict
import numpy as np
class RAGEvaluator:
"""RAG检索评估器"""
def __init__(self):
self.metrics = []
def evaluate_retrieval(
self,
queries: List[str],
ground_truth: List[List[int]], # 每个查询的相关文档ID
retrieved: List[List[Tuple[int, float]]], # 检索结果
k_values: List[int] = [1, 5, 10, 20]
) -> Dict[str, float]:
"""
评估检索质量
指标:
- Recall@k: 召回率
- Precision@k: 精确率
- MAP@k: 平均精度
- MRR@k: 平均倒数排名
- NDCG@k: 归一化折损累积增益
"""
metrics = {}
for k in k_values:
recall = self._compute_recall_at_k(queries, ground_truth, retrieved, k)
precision = self._compute_precision_at_k(queries, ground_truth, retrieved, k)
map_score = self._compute_map_at_k(queries, ground_truth, retrieved, k)
mrr = self._compute_mrr(queries, ground_truth, retrieved, k)
ndcg = self._compute_ndcg_at_k(queries, ground_truth, retrieved, k)
metrics[f'recall@{k}'] = recall
metrics[f'precision@{k}'] = precision
metrics[f'map@{k}'] = map_score
metrics[f'mrr@{k}'] = mrr
metrics[f'ndcg@{k}'] = ndcg
return metrics
def _compute_recall_at_k(
self,
queries: List[str],
ground_truth: List[List[int]],
retrieved: List[List[Tuple[int, float]]],
k: int
) -> float:
"""计算Recall@k"""
total_recall = 0
for gt_docs, ret_docs in zip(ground_truth, retrieved):
gt_set = set(gt_docs)
ret_set = set([doc_id for doc_id, _ in ret_docs[:k]])
if gt_set:
recall = len(gt_set & ret_set) / len(gt_set)
total_recall += recall
return total_recall / len(queries) if queries else 0
def _compute_precision_at_k(
self,
queries: List[str],
ground_truth: List[List[int]],
retrieved: List[List[Tuple[int, float]]],
k: int
) -> float:
"""计算Precision@k"""
total_precision = 0
for gt_docs, ret_docs in zip(ground_truth, retrieved):
gt_set = set(gt_docs)
ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]
relevant = sum(1 for doc_id in ret_top_k if doc_id in gt_set)
precision = relevant / k
total_precision += precision
return total_precision / len(queries) if queries else 0
def _compute_map_at_k(
self,
queries: List[str],
ground_truth: List[List[int]],
retrieved: List[List[Tuple[int, float]]],
k: int
) -> float:
"""计算MAP@k"""
total_ap = 0
for gt_docs, ret_docs in zip(ground_truth, retrieved):
gt_set = set(gt_docs)
ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]
# 计算AP
hits = 0
precision_sum = 0
for i, doc_id in enumerate(ret_top_k):
if doc_id in gt_set:
hits += 1
precision_sum += hits / (i + 1)
ap = precision_sum / len(gt_set) if gt_set else 0
total_ap += ap
return total_ap / len(queries) if queries else 0
def _compute_mrr(
self,
queries: List[str],
ground_truth: List[List[int]],
retrieved: List[List[Tuple[int, float]]],
k: int
) -> float:
"""计算MRR"""
total_rr = 0
for gt_docs, ret_docs in zip(ground_truth, retrieved):
gt_set = set(gt_docs)
ret_top_k = [doc_id for doc_id, _ in ret_docs[:k]]
# 找到第一个相关文档的位置
rr = 0
for i, doc_id in enumerate(ret_top_k):
if doc_id in gt_set:
rr = 1 / (i + 1)
break
total_rr += rr
return total_rr / len(queries) if queries else 0
def _compute_ndcg_at_k(
self,
queries: List[str],
ground_truth: List[List[int]],
retrieved: List[List[Tuple[int, float]]],
k: int
) -> float:
"""计算NDCG@k"""
total_ndcg = 0
for gt_docs, ret_docs in zip(ground_truth, retrieved):
# 创建理想排序(所有相关文档在前)
ideal_gain = [1] * len(gt_docs) + [0] * (k - len(gt_docs))
dcg_ideal = sum(
gain / np.log2(i + 2) for i, gain in enumerate(ideal_gain[:k])
)
# 计算实际DCG
gt_set = set(gt_docs)
actual_gain = [
1 if doc_id in gt_set else 0
for doc_id, _ in ret_docs[:k]
]
dcg_actual = sum(
gain / np.log2(i + 2) for i, gain in enumerate(actual_gain)
)
# NDCG
ndcg = dcg_actual / dcg_ideal if dcg_ideal > 0 else 0
total_ndcg += ndcg
return total_ndcg / len(queries) if queries else 0
def evaluate_end_to_end(
self,
rag_system,
test_dataset: List[Dict],
evaluator_model=None
) -> Dict[str, float]:
"""
端到端评估(生成质量)
指标:
- Answer Relevance: 答案相关性
- Faithfulness: 答案忠实度(基于检索的上下文)
- Context Relevance: 上下文相关性
"""
results = {
'answer_relevance': [],
'faithfulness': [],
'context_relevance': []
}
for item in test_dataset:
query = item['query']
ground_truth_answer = item['answer']
# 使用RAG系统生成答案
response = rag_system.generate(query)
answer = response['answer']
context = response['context']
# 评估答案相关性
answer_relevance = self._evaluate_answer_relevance(
query, answer, ground_truth_answer, evaluator_model
)
results['answer_relevance'].append(answer_relevance)
# 评估忠实度
faithfulness = self._evaluate_faithfulness(
answer, context, evaluator_model
)
results['faithfulness'].append(faithfulness)
# 评估上下文相关性
context_relevance = self._evaluate_context_relevance(
query, context, evaluator_model
)
results['context_relevance'].append(context_relevance)
# 计算平均分
return {
key: np.mean(values)
for key, values in results.items()
}
def _evaluate_answer_relevance(self, query, answer, ground_truth, model=None):
"""评估答案相关性"""
# 简化版:实际应用中应使用嵌入相似度或LLM评估
if not model:
return 0.8 # 示例值
return 0
def _evaluate_faithfulness(self, answer, context, model=None):
"""评估忠实度(答案是否基于检索的上下文)"""
# 简化版:实际应用中应使用LLM评估
if not model:
return 0.8 # 示例值
return 0
def _evaluate_context_relevance(self, query, context, model=None):
"""评估上下文相关性"""
# 简化版:实际应用中应使用嵌入相似度或LLM评估
if not model:
return 0.8 # 示例值
return 0
6. 面试高频问题
Q1: 精确检索和近似检索有什么区别?什么时候用哪个?
回答要点:
-
精确检索(暴力搜索)
- 算法:计算查询与所有向量的距离
- 时间复杂度:O(N×D)
- 优点:100%准确,不需要训练
- 缺点:数据量大时性能差
-
近似检索(ANN)
- 算法:HNSW、IVF、IVFPQ等
- 时间复杂度:O(logN×D)
- 优点:查询快,支持大规模数据
- 缺点:牺牲部分精度(召回率)
-
使用场景
精确检索: - 数据量小(<10K) - 对召回率要求极高(100%) - 实时构建索引 近似检索: - 数据量大(>10K) - 对查询延迟有要求(<50ms) - 接受95%-99%召回率
Q2: HNSW算法的原理是什么?为什么它是最流行的ANN算法?
回答要点:
-
HNSW核心思想
- 结合跳表和NSW(Navigable Small World)
- 多层结构:顶层稀疏,底层稠密
- 借跳表的思想实现快速定位
-
查询流程
1. 从顶层入口点开始 2. 在每层执行贪婪搜索(找到最近的节点) 3. 逐层向下,最终在底层精确搜索 4. 底层使用ef_search参数扩展搜索范围 -
优势
- 对数级查询复杂度:O(logN)
- 高召回率:接近精确检索
- 支持动态插入删除
- 内存占用可控
-
关键参数
M: 每层连接数(影响召回率和构建时间) ef_construction: 构建时邻居数 ef_search: 查询时候选列表大小
Q3: IVF索引的原理是什么?nlist和nprobe如何选择?
回答要点:
-
IVF核心思想
- 倒排索引(Inverted File)
- 使用K-means聚类划分向量空间
- 查询时只搜索相关聚类
-
参数选择
nlist(聚类数): - 通常取 sqrt(N) 或 N/100 - N=1M时,nlist=1000-10000 - 太小:每个聚类太多向量,搜索慢 - 太大:聚类中心存储开销大 nprobe(探测聚类数): - 影响精度和速度 - 一般取 nlist/10 到 nlist/2 - nprobe越大召回率越高,但查询越慢 -
优化建议
高精度场景:nprobe = nlist/2 平衡场景:nprobe = nlist/10 低延迟场景:nprobe = nlist/100
Q4: IVFPQ中的Product Quantization是如何工作的?
回答要点:
-
PQ原理
- 将D维向量分成m个子空间(每个D/m维)
- 每个子空间独立量化(使用K-means聚类)
- 用子空间中心点ID代替原始向量
-
编码过程
原始向量 [v1, v2, ..., v768] ↓ 划分为8个子空间,每个96维 ↓ 每个子空间量化为8位(256个中心点) ↓ 编码结果 [c1, c2, ..., c8] (每个c是0-255) -
压缩比
原始: 768 × 4 bytes = 3072 bytes PQ编码: 8 × 1 byte = 8 bytes 压缩比: 384x -
距离计算优化
- 使用查表法(Lookup Table)加速
- 避免实数运算
Q5: 如何提高向量检索的召回率?
回答要点:
-
索引参数调优
# HNSW: 增加ef_search index.hnsw.efSearch = 100 # 默认50 # IVF: 增加nprobe index.nprobe = 100 # 默认10 # IVFPQ: 增加nbits和nprobe -
混合检索
- 向量检索 + 关键词检索(BM25)
- 融合多种相似度
-
查询优化
- 查询扩展(同义词、相关词)
- 查询重写(更精确的表述)
-
重排序
- 使用交叉编码器(Cross Encoder)
- LLM打分
-
使用Refine索引
# 先用快速索引检索更多结果 base_index = faiss.IndexIVFFlat(...) # 再用精确索引重排序 refine_index = faiss.IndexRefineFlat(base_index, exact_index)
Q6: 如何处理大规模向量数据库(>1亿)?
回答要点:
-
磁盘索引
- DiskANN:专为SSD设计
- ScaNN:Google的ANN库
- 分片策略:水平分片
-
内存优化
- IVFPQ:量化压缩
- 热数据常驻内存
- 冷数据存磁盘
-
分布式架构
方案1: 数据分片 - 按向量ID范围分片 - 每个节点管理一个分片 方案2: 查询并行 - 同时查询所有分片 - 合并并重排序结果 方案3: 混合架构 - 元数据在数据库 - 向量在向量存储 -
更新策略
- 批量更新
- 增量索引
- 异步重建
Q7: RAG系统中如何优化检索质量?
回答要点:
-
检索前优化
- 查询扩展:生成多个相关查询 - 查询重写:使用LLM改写查询 - 元数据过滤:基于标签、时间等过滤 -
检索中优化
- 混合检索:向量 + BM25 - 调整k值:检索更多候选 - 使用高召回率索引配置 # 示例 index.hnsw.efSearch = 100 k = 50 # 检索50个候选 -
检索后优化
- 重排序:Cross Encoder / LLM - 去重:合并相似文档 - 多样性:确保结果多样性 -
评估与迭代
- 离线评估:Recall、Precision、MAP、MRR、NDCG - 在线评估:点击率、停留时间 - A/B测试:对比不同策略
Q8: Faiss和Milvus有什么区别?
回答要点:
| 特性 | Faiss | Milvus |
|---|---|---|
| 定位 | 向量检索库 | 向量数据库 |
| 功能 | 检索算法 | 存储+检索+AR |
| 元数据 | 需自行管理 | 原生支持 |
| CRUD | 仅Add | 完整CRUD |
| 分布式 | 需自行实现 | 原生支持 |
| 持久化 | 需自行处理 | 内置 |
| 语言 | C++/Python | Go/Python |
| 使用场景 | 嵌入式应用 | 独立部署 |
选择建议:
- 需要完整向量数据库功能 → Milvus
- 需要嵌入到现有系统 → Faiss
- 需要元数据查询 → Milvus
- 需要极致性能定制 → Faiss
总结
本模块深入讲解了Faiss向量检索架构的核心内容:
关键技术点
-
Faiss索引类型
- 精确索引:Flat
- 近似索引:HNSW、IVF、IVFPQ
- 分布式索引:DiskANN
-
核心算法
- HNSW:分层小世界图
- IVF:倒排聚类
- PQ:乘积量化
-
生产优化
- 索引选择决策
- 参数调优(ef_search、nprobe)
- 混合检索、重排序
- 查询扩展
-
RAG应用
- 检索-生成流水线
- 质量评估指标
- 优化策略