领域知识库建设
1. 知识库概述
1.1 领域知识库架构
┌─────────────────────────────────────────────────────┐
│ 领域知识库架构 │
├─────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 数据采集层 │ │
│ │ - API接口 - 网页抓取 - 文档导入 │ │
│ └────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────▼────────────────────────────┐ │
│ │ 数据处理层 │ │
│ │ - 清洗 - 去重 - 格式化 - 结构化 │ │
│ └────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────▼────────────────────────────┐ │
│ │ 知识提取层 │ │
│ │ - 实体提取 - 关系抽取 - 属性抽取 │ │
│ └────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────▼────────────────────────────┐ │
│ │ 知识存储层 │ │
│ │ - 向量数据库 - 图数据库 - 关系数据库 │ │
│ └────────────────┬────────────────────────────┘ │
│ │ │
│ ┌────────────────▼────────────────────────────┐ │
│ │ 知识检索层 │ │
│ │ - 语义检索 - 关键词检索 - 图检索 │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘
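下面用一小段示意代码把五层串起来(仅为结构示意,使用后文第 2~5 节定义的 DataCollector、DataCleaner、DataTransformer、VectorIndex 组件;知识提取层与图/关系存储在此省略):
```python
# 分层组装示意:采集 -> 处理 -> 结构化 -> 索引 -> 检索
def build_and_query(sources, query: str, top_k: int = 5):
    collector = DataCollector()                      # 数据采集层
    for s in sources:
        collector.add_source(s)
    raw = collector.collect_all()

    cleaner = DataCleaner()                          # 数据处理层
    cleaner.add_cleaner(DataCleaner.remove_empty)
    cleaner.add_cleaner(DataCleaner.remove_duplicates)
    cleaned = cleaner.clean(raw)

    docs = DataTransformer().to_documents(cleaned)   # 结构化为标准文档

    index = VectorIndex()                            # 知识存储/索引层
    index.add_documents(docs)
    return index.search(query, top_k=top_k)          # 知识检索层(语义检索)
```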
1.2 知识库类型
| 类型 | 特点 | 适用场景 |
|---|---|---|
| 文档型 | 非结构化文档 | 通用知识库 |
| 结构化型 | 实体、关系、属性 | 知识图谱 |
| 混合型 | 文档+结构化 | 复杂领域知识 |
| FAQ型 | 问答对 | 客服、帮助中心 |
| 手册型 | 操作指南、流程 | 技术文档 |
1.3 常见领域知识库
┌─────────────────────────────────────────────────┐
│ 常见领域知识库 │
├─────────────────────────────────────────────────┤
│ │
│ 技术领域 │
│ - 编程语言文档(Python、Java、Go等) │
│ - 框架文档(LangChain、Spring等) │
│ - API文档 │
│ │
│ 商业领域 │
│ - 产品手册 │
│ - 销售话术 │
│ - 竞品分析 │
│ │
│ 行业领域 │
│ - 法律法规 │
│ - 行业标准 │
│ - 市场报告 │
│ │
│ 企业内部 │
│ - 规章制度 │
│ - 业务流程 │
│ - 培训材料 │
│ │
└─────────────────────────────────────────────────┘
2. 数据采集
2.1 数据源类型
from typing import List, Dict, Any
from abc import ABC, abstractmethod
import requests
from bs4 import BeautifulSoup
import json
class DataSource(ABC):
"""数据源抽象类"""
@abstractmethod
def collect(self) -> List[Dict]:
"""采集数据"""
pass
class WebScraper(DataSource):
"""网页抓取"""
def __init__(self, urls: List[str]):
self.urls = urls
def collect(self) -> List[Dict]:
results = []
for url in self.urls:
try:
content = self._scrape(url)
results.append({
"url": url,
"content": content,
"source": "web",
"type": "html"
})
except Exception as e:
print(f"抓取失败 {url}: {e}")
return results
def _scrape(self, url: str) -> str:
response = requests.get(url, timeout=30)
soup = BeautifulSoup(response.content, 'html.parser')
# 移除脚本和样式
for script in soup(['script', 'style']):
script.decompose()
return soup.get_text(separator='\n', strip=True)
class APIConnector(DataSource):
"""API数据采集"""
def __init__(
self,
api_url: str,
api_key: str = None,
params: Dict = None
):
self.api_url = api_url
self.api_key = api_key
self.params = params or {}
def collect(self) -> List[Dict]:
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
response = requests.get(
self.api_url,
headers=headers,
params=self.params
)
response.raise_for_status()
data = response.json()
return self._normalize(data)
    def _normalize(self, data: Any) -> List[Dict]:
"""标准化API返回数据"""
if isinstance(data, list):
return data
elif isinstance(data, dict):
return [data]
else:
return []
class FileImporter(DataSource):
"""文件导入"""
def __init__(self, file_paths: List[str]):
self.file_paths = file_paths
def collect(self) -> List[Dict]:
results = []
for file_path in self.file_paths:
content = self._read_file(file_path)
results.append({
"file_path": file_path,
"content": content,
"source": "file",
"type": self._get_file_type(file_path)
})
return results
def _read_file(self, file_path: str) -> str:
"""读取文件内容"""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def _get_file_type(self, file_path: str) -> str:
"""获取文件类型"""
import os
ext = os.path.splitext(file_path)[1].lower()
type_map = {
'.md': 'markdown',
'.txt': 'text',
'.pdf': 'pdf',
'.docx': 'word',
'.json': 'json',
}
return type_map.get(ext, 'unknown')
class DatabaseExporter(DataSource):
"""数据库导出"""
def __init__(
self,
connection_string: str,
query: str,
batch_size: int = 1000
):
self.connection_string = connection_string
self.query = query
self.batch_size = batch_size
def collect(self) -> List[Dict]:
import psycopg2
import pandas as pd
conn = psycopg2.connect(self.connection_string)
df = pd.read_sql_query(self.query, conn)
conn.close()
return df.to_dict('records')
2.2 数据采集器
class DataCollector:
"""数据采集器"""
def __init__(self):
self.sources: List[DataSource] = []
def add_source(self, source: DataSource):
"""添加数据源"""
self.sources.append(source)
def collect_all(self) -> List[Dict]:
"""采集所有数据源"""
all_data = []
for source in self.sources:
try:
data = source.collect()
all_data.extend(data)
except Exception as e:
print(f"采集失败: {e}")
return all_data
def collect_with_progress(self) -> List[Dict]:
"""带进度的采集"""
from tqdm import tqdm
all_data = []
for source in tqdm(self.sources, desc="采集数据"):
data = source.collect()
all_data.extend(data)
tqdm.write(f"采集到 {len(data)} 条数据")
return all_data
# ============== 使用示例 ==============
if __name__ == "__main__":
collector = DataCollector()
# 添加网页抓取源
collector.add_source(WebScraper([
"https://docs.example.com/api",
"https://docs.example.com/guide"
]))
# 添加API源
collector.add_source(APIConnector(
api_url="https://api.example.com/documents",
api_key="your-api-key"
))
# 添加文件源
collector.add_source(FileImporter([
"./docs/README.md",
"./docs/GUIDE.md"
]))
# 采集数据
data = collector.collect_all()
print(f"共采集 {len(data)} 条数据")
2.3 增量采集
from typing import Optional
from datetime import datetime
import hashlib
class IncrementalCollector:
"""增量采集器"""
def __init__(self, state_file: str = ".data_state.json"):
self.state_file = state_file
self.state = self._load_state()
def _load_state(self) -> Dict:
"""加载采集状态"""
import os
if not os.path.exists(self.state_file):
return {
"sources": {},
"last_sync": None
}
with open(self.state_file, 'r') as f:
import json
return json.load(f)
def _save_state(self):
"""保存采集状态"""
import json
with open(self.state_file, 'w') as f:
json.dump(self.state, f, indent=2, ensure_ascii=False)
def collect_incremental(
self,
source: DataSource,
source_id: str
) -> List[Dict]:
"""
增量采集
只采集自上次采集以来变化的数据
"""
# 获取上次采集的hash
last_hash = self.state["sources"].get(source_id, {}).get("hash")
# 采集最新数据
new_data = source.collect()
# 计算新数据的hash
new_hash = self._compute_hash(new_data)
# 比较是否变化
if new_hash == last_hash:
print(f"源 {source_id} 无变化,跳过")
return []
# 更新状态
self.state["sources"][source_id] = {
"hash": new_hash,
"last_sync": datetime.utcnow().isoformat()
}
self.state["last_sync"] = datetime.utcnow().isoformat()
self._save_state()
return new_data
def _compute_hash(self, data: List[Dict]) -> str:
"""计算数据hash"""
import json
data_str = json.dumps(data, sort_keys=True)
return hashlib.md5(data_str.encode()).hexdigest()
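一个最小使用示意(文件路径与 source_id 均为假设,仅演示状态比较的效果):
```python
# 增量采集使用示意:仅当数据源内容发生变化时才返回数据
inc = IncrementalCollector(state_file=".data_state.json")
source = FileImporter(["./docs/README.md"])  # 示例数据源

changed = inc.collect_incremental(source, source_id="local_docs")
if changed:
    print(f"检测到变化,待处理 {len(changed)} 条数据")
else:
    print("数据源无变化,跳过本次处理")
```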
3. 数据处理
3.1 数据清洗
import re
import hashlib
from typing import List, Dict, Callable
class DataCleaner:
"""数据清洗器"""
def __init__(self):
self.cleaners: List[Callable] = []
def add_cleaner(self, cleaner: Callable):
"""添加清洗函数"""
self.cleaners.append(cleaner)
def clean(self, data: List[Dict]) -> List[Dict]:
"""清洗数据"""
cleaned = data.copy()
for cleaner in self.cleaners:
cleaned = cleaner(cleaned)
return cleaned
# 内置清洗器
@staticmethod
def remove_empty(data: List[Dict]) -> List[Dict]:
"""移除空数据"""
return [
item for item in data
if item.get("content") and item["content"].strip()
]
@staticmethod
def remove_duplicates(data: List[Dict]) -> List[Dict]:
"""去重(基于内容hash)"""
seen = set()
unique = []
for item in data:
content = item.get("content", "")
content_hash = hashlib.md5(content.encode()).hexdigest()
if content_hash not in seen:
seen.add(content_hash)
unique.append(item)
return unique
@staticmethod
def normalize_whitespace(data: List[Dict]) -> List[Dict]:
"""规范化空白字符"""
for item in data:
if "content" in item:
# 替换多个空白为单个空格
item["content"] = re.sub(r'\s+', ' ', item["content"])
# 去除首尾空白
item["content"] = item["content"].strip()
return data
@staticmethod
def remove_special_chars(data: List[Dict], chars: str = None) -> List[Dict]:
"""移除特殊字符"""
        if chars is None:
            # 默认移除控制字符(字符区间写法不能经过 re.escape,否则 '-' 会被转义为字面量)
            pattern = re.compile(r'[\x00-\x1F\x7F-\x9F]')
        else:
            pattern = re.compile(f'[{re.escape(chars)}]')
for item in data:
if "content" in item:
item["content"] = pattern.sub('', item["content"])
return data
@staticmethod
def filter_by_length(
data: List[Dict],
min_length: int = 10,
max_length: int = 100000
) -> List[Dict]:
"""按长度过滤"""
return [
item for item in data
if min_length <= len(item.get("content", "")) <= max_length
]
@staticmethod
def filter_by_keywords(
data: List[Dict],
keywords: List[str],
mode: str = "include" # include/exclude
) -> List[Dict]:
"""按关键词过滤"""
keywords_lower = [k.lower() for k in keywords]
if mode == "include":
return [
item for item in data
if any(k in item.get("content", "").lower()
for k in keywords_lower)
]
else:
return [
item for item in data
if not any(k in item.get("content", "").lower()
for k in keywords_lower)
]
# ============== 使用示例 ==============
cleaner = DataCleaner()
# 添加清洗步骤
cleaner.add_cleaner(DataCleaner.remove_empty)
cleaner.add_cleaner(DataCleaner.remove_duplicates)
cleaner.add_cleaner(DataCleaner.normalize_whitespace)
cleaner.add_cleaner(DataCleaner.filter_by_length)
# 清洗数据
cleaned_data = cleaner.clean(raw_data)
3.2 数据转换
from datetime import datetime
import re

class DataTransformer:
    """数据转换器"""
    def to_documents(self, data: List[Dict]) -> List[Dict]:
        """转换为标准文档(字典形式:id / content / metadata,与后文索引层的约定一致)"""
        documents = []
        for i, item in enumerate(data):
            doc = {
                "id": str(i),
                "content": item.get("content", ""),
                "metadata": {
                    "source": item.get("source"),
                    "type": item.get("type"),
                    "url": item.get("url"),
                    "file_path": item.get("file_path"),
                    "created_at": datetime.utcnow().isoformat()
                }
            }
            documents.append(doc)
        return documents
def to_qa_pairs(self, data: List[Dict]) -> List[Dict]:
"""转换为问答对"""
qa_pairs = []
for item in data:
content = item.get("content", "")
# 简单实现:每段生成问答对
# 实际应使用LLM生成
paragraphs = content.split('\n\n')
for para in paragraphs:
if len(para) > 50:
qa_pairs.append({
"question": f"关于{para[:30]}?",
"answer": para,
"source": item.get("source"),
"metadata": item
})
return qa_pairs
def to_knowledge_graph(self, data: List[Dict]) -> Dict:
"""转换为知识图谱"""
graph = {
"nodes": [],
"edges": []
}
# 简化实现:实际应使用NLP提取实体和关系
for item in data:
content = item.get("content", "")
# 提取实体(简化)
entities = self._extract_entities(content)
# 添加节点
for entity in entities:
graph["nodes"].append({
"id": entity,
"label": entity,
"type": "entity"
})
return graph
def _extract_entities(self, text: str) -> List[str]:
"""提取实体(简化)"""
# 实际应使用NER模型
# 这里简单按大写单词提取
return list(set(re.findall(r'\b[A-Z][a-zA-Z]+\b', text)))
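承接 3.1 节清洗后的 cleaned_data,一个简单的转换示意(三种目标形态可按需选用):
```python
# 数据转换使用示意:同一批清洗后的数据可转换为不同形态
transformer = DataTransformer()

documents = transformer.to_documents(cleaned_data)    # 标准文档,供索引层使用
qa_pairs = transformer.to_qa_pairs(cleaned_data)      # 粗粒度问答对(实际建议用LLM生成)
graph = transformer.to_knowledge_graph(cleaned_data)  # 简化的实体图

print(f"文档 {len(documents)} 个,问答对 {len(qa_pairs)} 个,图节点 {len(graph['nodes'])} 个")
```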
4. 知识提取
4.1 实体识别
import re
import json
from typing import List, Tuple, Dict

class EntityExtractor:
    """实体提取器"""

    def __init__(self, llm=None, model_name: str = "BAAI/bge-large-zh"):
        from sentence_transformers import SentenceTransformer
        self.llm = llm  # 可选:extract_with_llm 需要传入LLM客户端
        self.embedder = SentenceTransformer(model_name)  # 预留:可用于基于向量的实体相似度/链接
def extract_entities(self, text: str) -> List[Dict]:
"""
提取实体
Returns:
[
{
"text": "实体文本",
"type": "实体类型",
"start": 起始位置,
"end": 结束位置,
"confidence": 置信度
}
]
"""
# 简化实现,实际应使用NER模型
# 如 spaCy, Hugging Face transformers
entities = []
# 基于规则的实体提取
        rules = [
            # 提取邮箱
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', 'email'),
            # 提取网址
            (r'https?://[^\s]+', 'url'),
            # 提取版本号
            (r'\b\d+\.\d+\.\d+\b', 'version'),
            # 提取日期
            (r'\d{4}-\d{2}-\d{2}', 'date'),
        ]
for pattern, entity_type in rules:
for match in re.finditer(pattern, text):
entities.append({
"text": match.group(),
"type": entity_type,
"start": match.start(),
"end": match.end(),
"confidence": 0.9
})
return entities
def extract_with_llm(self, text: str, schema: Dict) -> List[Dict]:
"""
使用LLM提取实体
Args:
text: 输入文本
schema: 实体schema,如 {"person": "人名", "org": "组织"}
"""
prompt = f"""从以下文本中提取实体。
文本:
{text}
请提取以下类型的实体:
{json.dumps(schema, indent=2)}
请以JSON格式输出:
{{
"entities": [
{{
"text": "实体文本",
"type": "实体类型",
"confidence": 0.95
}}
]
}}"""
response = self.llm.generate(prompt)
data = json.loads(response)
return data.get("entities", [])
def link_entities(self, entities: List[Dict], kb) -> List[Dict]:
"""
实体链接
将提取的实体链接到知识库中的实体
"""
linked = []
for entity in entities:
# 在知识库中搜索相似实体
candidates = kb.search_entities(
query=entity["text"],
top_k=3
)
if candidates:
entity["linked"] = candidates[0]["id"]
entity["link_confidence"] = candidates[0]["score"]
linked.append(entity)
return linked
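规则提取部分的最小使用示意(示例文本为假设;构造 EntityExtractor 时会加载 embedding 模型):
```python
# 基于规则的实体提取示意:可识别邮箱、网址、版本号、日期
extractor = EntityExtractor()
text = "版本 2.10.3 发布于 2024-01-08,反馈请联系 support@example.com,详情见 https://docs.example.com/changelog"

for ent in extractor.extract_entities(text):
    print(ent["type"], ent["text"], ent["start"], ent["end"])
```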
4.2 关系抽取
import re
import json

class RelationExtractor:
    """关系抽取器"""
def __init__(self, llm=None):
self.llm = llm
def extract_relations(
self,
text: str,
entities: List[Dict]
) -> List[Dict]:
"""
抽取实体间的关系
Args:
text: 文本
entities: 实体列表
Returns:
[
{
"subject": "主体实体",
"relation": "关系",
"object": "客体实体",
"confidence": 置信度
}
]
"""
if not self.llm:
# 基于规则的关系抽取
return self._rule_based_extraction(text, entities)
# 使用LLM抽取
return self._llm_based_extraction(text, entities)
def _rule_based_extraction(
self,
text: str,
entities: List[Dict]
) -> List[Dict]:
"""基于规则的关系抽取"""
relations = []
entity_texts = [e["text"] for e in entities]
# 定义关系模式
patterns = [
(r'({})\s*(是|属于)\s*({})', 'is_a'),
(r'({})\s*(位于|在)\s*({})', 'located_in'),
(r'({})\s*(创建|开发)\s*({})', 'created'),
(r'({})\s*(使用|基于)\s*({})', 'uses'),
]
for pattern, relation_type in patterns:
for e1 in entity_texts:
for e2 in entity_texts:
if e1 == e2:
continue
pattern_str = pattern.format(
re.escape(e1),
re.escape(e2)
)
if re.search(pattern_str, text):
relations.append({
"subject": e1,
"relation": relation_type,
"object": e2,
"confidence": 0.8
})
return relations
def _llm_based_extraction(
self,
text: str,
entities: List[Dict]
) -> List[Dict]:
"""使用LLM抽取关系"""
entity_list = "\n".join([
f"- {e['text']} ({e.get('type', 'unknown')})"
for e in entities
])
prompt = f"""从以下文本中抽取实体间的关系。
文本:
{text}
实体:
{entity_list}
请以JSON格式输出:
{{
"relations": [
{{
"subject": "主体实体",
"relation": "关系",
"object": "客体实体",
"confidence": 0.95
}}
]
}}"""
response = self.llm.generate(prompt)
data = json.loads(response)
return data.get("relations", [])
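把 4.1 的实体和 4.2 的关系抽取串起来的最小示意(不传 LLM 时走规则分支;为便于演示,这里手工给出实体列表):
```python
# 实体 + 关系联合抽取示意(纯规则路径)
relation_extractor = RelationExtractor()  # llm=None,使用规则抽取

text = "LangChain 基于 Python 开发"
entities = [
    {"text": "LangChain", "type": "framework"},
    {"text": "Python", "type": "language"},
]

for rel in relation_extractor.extract_relations(text, entities):
    print(rel["subject"], rel["relation"], rel["object"])  # 预期得到 uses 关系
```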
4.3 知识图谱构建
class KnowledgeGraphBuilder:
"""知识图谱构建器"""
def __init__(self):
self.nodes = {}
self.edges = []
def add_node(self, id: str, label: str, **kwargs):
"""添加节点"""
self.nodes[id] = {
"id": id,
"label": label,
**kwargs
}
def add_edge(
self,
source: str,
target: str,
relation: str,
**kwargs
):
"""添加边"""
self.edges.append({
"source": source,
"target": target,
"relation": relation,
**kwargs
})
    def build_from_documents(self, documents: List[Dict]):
"""从文档构建知识图谱"""
extractor = EntityExtractor()
relation_extractor = RelationExtractor()
        for doc in documents:
            content = doc["content"]
            # 提取实体
            entities = extractor.extract_entities(content)
            # 添加节点,并记录实体文本到节点ID的映射,供建边时复用
            text_to_id = {}
            for entity in entities:
                node_id = f"{entity['type']}:{entity['text']}"
                text_to_id[entity["text"]] = node_id
                self.add_node(
                    id=node_id,
                    label=entity["text"],
                    type=entity["type"]
                )
            # 抽取关系
            relations = relation_extractor.extract_relations(
                content,
                entities
            )
            # 添加边(边的端点使用与节点一致的ID)
            for rel in relations:
                source_id = text_to_id.get(rel["subject"], f"entity:{rel['subject']}")
                target_id = text_to_id.get(rel["object"], f"entity:{rel['object']}")
                self.add_edge(
                    source=source_id,
                    target=target_id,
                    relation=rel["relation"]
                )
def to_graph_json(self) -> Dict:
"""导出为图JSON"""
return {
"nodes": list(self.nodes.values()),
"edges": self.edges
}
def save_to_file(self, file_path: str):
"""保存到文件"""
import json
with open(file_path, 'w') as f:
json.dump(self.to_graph_json(), f, indent=2)
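构建器的最小使用示意(文档内容为假设;实体/关系抽取沿用上文的简化实现,结果较为粗糙):
```python
# 知识图谱构建使用示意
kg_builder = KnowledgeGraphBuilder()
kg_builder.build_from_documents([
    {"content": "版本 1.2.3 发布于 2024-01-08,详情见 https://docs.example.com"}
])

graph = kg_builder.to_graph_json()
print(f"节点数: {len(graph['nodes'])},边数: {len(graph['edges'])}")
kg_builder.save_to_file("./knowledge_graph.json")
```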
5. 知识索引
5.1 向量索引构建
from sentence_transformers import SentenceTransformer
import numpy as np
class VectorIndex:
"""向量索引"""
    def __init__(
        self,
        embedding_model: str = "BAAI/bge-small-zh-v1.5",
        dim: int = None
    ):
        self.embedder = SentenceTransformer(embedding_model)
        # 未显式指定时从模型读取向量维度,避免与模型实际输出不一致
        self.dim = dim or self.embedder.get_sentence_embedding_dimension()
        self.documents = {}  # id -> document
        self.embeddings = np.zeros((0, self.dim))  # embedding 矩阵
        self.ids = []  # 与 embeddings 行对齐的文档 id
def add_document(self, doc: Dict):
"""添加文档"""
doc_id = doc["id"]
# 生成embedding
text = doc.get("content", "")
embedding = self.embedder.encode(text)
# 存储
self.documents[doc_id] = doc
self.ids.append(doc_id)
# 添加到embedding矩阵
if len(self.embeddings) == 0:
self.embeddings = embedding.reshape(1, -1)
else:
self.embeddings = np.vstack([self.embeddings, embedding])
def add_documents(self, docs: List[Dict]):
"""批量添加文档"""
for doc in docs:
self.add_document(doc)
def search(
self,
query: str,
top_k: int = 10,
min_score: float = 0.0
) -> List[Dict]:
"""
搜索
Returns:
[
{
"document": {...},
"score": 相似度分数
}
]
"""
# 生成查询embedding
query_embedding = self.embedder.encode(query)
# 计算相似度
scores = np.dot(
self.embeddings,
query_embedding
) / (
np.linalg.norm(self.embeddings, axis=1) *
np.linalg.norm(query_embedding)
)
# 获取top_k
top_indices = np.argsort(scores)[-top_k:][::-1]
results = []
for idx in top_indices:
score = float(scores[idx])
if score >= min_score:
doc_id = self.ids[idx]
results.append({
"document": self.documents[doc_id],
"score": score
})
return results
def save(self, path: str):
"""保存索引"""
import pickle
with open(path, 'wb') as f:
pickle.dump({
"documents": self.documents,
"embeddings": self.embeddings,
"ids": self.ids,
"dim": self.dim
}, f)
def load(self, path: str):
"""加载索引"""
import pickle
with open(path, 'rb') as f:
data = pickle.load(f)
self.documents = data["documents"]
self.embeddings = data["embeddings"]
self.ids = data["ids"]
self.dim = data["dim"]
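向量索引的最小使用示意(文档与查询均为示例;首次运行会下载 embedding 模型):
```python
# 向量索引使用示意:添加文档、语义检索、持久化
index = VectorIndex()
index.add_documents([
    {"id": "1", "content": "Python 列表支持 append、extend、sort 等操作"},
    {"id": "2", "content": "Go 语言使用 goroutine 实现并发"},
])

for hit in index.search("如何向列表追加元素", top_k=1):
    print(f"{hit['score']:.3f} {hit['document']['content']}")

index.save("./tech_docs.index")  # 之后可通过 load() 恢复
```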
5.2 混合索引
class HybridIndex:
"""混合索引 - 向量 + 关键词"""
def __init__(
self,
embedding_model: str = "BAAI/bge-small-zh-v1.5"
):
self.vector_index = VectorIndex(embedding_model)
self.keyword_index = {} # word -> {doc_ids}
def add_document(self, doc: Dict):
"""添加文档"""
doc_id = doc["id"]
content = doc.get("content", "")
# 添加到向量索引
self.vector_index.add_document(doc)
# 添加到关键词索引
words = self._extract_keywords(content)
for word in words:
if word not in self.keyword_index:
self.keyword_index[word] = set()
self.keyword_index[word].add(doc_id)
def search(
self,
query: str,
top_k: int = 10,
vector_weight: float = 0.7,
keyword_weight: float = 0.3
) -> List[Dict]:
"""
        混合搜索
结合向量搜索和关键词搜索
"""
# 向量搜索
vector_results = self.vector_index.search(query, top_k * 2)
# 关键词搜索
keyword_results = self._keyword_search(query, top_k * 2)
# 合并分数
combined_scores = {}
# 向量结果
for result in vector_results:
doc_id = result["document"]["id"]
combined_scores[doc_id] = (
combined_scores.get(doc_id, 0) +
result["score"] * vector_weight
)
# 关键词结果
for result in keyword_results:
doc_id = result["document"]["id"]
combined_scores[doc_id] = (
combined_scores.get(doc_id, 0) +
result["score"] * keyword_weight
)
# 排序
sorted_ids = sorted(
combined_scores.keys(),
key=lambda x: combined_scores[x],
reverse=True
)
# 构建结果
results = []
for doc_id in sorted_ids[:top_k]:
results.append({
"document": self.vector_index.documents[doc_id],
"score": combined_scores[doc_id]
})
return results
def _extract_keywords(self, text: str) -> List[str]:
"""提取关键词"""
# 简化实现:分词
import jieba
words = jieba.lcut(text)
# 过滤停用词
stopwords = {
"的", "了", "是", "在", "我", "有", "和", "就"
}
return [w for w in words if w not in stopwords and len(w) > 1]
def _keyword_search(
self,
query: str,
top_k: int
) -> List[Dict]:
"""关键词搜索"""
keywords = self._extract_keywords(query)
# 计算每个文档的匹配分数
doc_scores = {}
for keyword in keywords:
if keyword in self.keyword_index:
for doc_id in self.keyword_index[keyword]:
doc_scores[doc_id] = (
doc_scores.get(doc_id, 0) + 1
)
# 归一化
max_score = max(doc_scores.values()) if doc_scores else 1
for doc_id in doc_scores:
doc_scores[doc_id] /= max_score
# 构建结果
results = []
for doc_id, score in sorted(
doc_scores.items(),
key=lambda x: x[1],
reverse=True
)[:top_k]:
results.append({
"document": self.vector_index.documents[doc_id],
"score": score
})
return results
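混合索引的最小使用示意(默认权重为向量 0.7 / 关键词 0.3;中文分词依赖 jieba):
```python
# 混合检索使用示意:向量得分与关键词得分加权合并
hybrid = HybridIndex()
hybrid.add_document({"id": "1", "content": "向量数据库适合语义检索场景"})
hybrid.add_document({"id": "2", "content": "倒排索引是关键词检索的基础结构"})

for r in hybrid.search("语义检索用什么存储", top_k=2):
    print(f"{r['score']:.3f} {r['document']['content']}")
```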
6. 知识管理
6.1 知识库管理器
class KnowledgeBase:
"""知识库管理器"""
def __init__(
self,
name: str,
index_path: str = None
):
self.name = name
self.index_path = index_path or f"./indexes/{name}"
# 初始化组件
self.collector = DataCollector()
self.cleaner = DataCleaner()
self.index = VectorIndex()
# 设置默认清洗器
self.cleaner.add_cleaner(DataCleaner.remove_empty)
self.cleaner.add_cleaner(DataCleaner.remove_duplicates)
self.cleaner.add_cleaner(DataCleaner.normalize_whitespace)
def add_source(self, source: DataSource):
"""添加数据源"""
self.collector.add_source(source)
def build(self):
"""构建知识库"""
print(f"开始构建知识库: {self.name}")
        # 1. 采集数据
        print("采集数据...")
        raw_data = self.collector.collect_all()
print(f"采集到 {len(raw_data)} 条原始数据")
# 2. 清洗数据
print("清洗数据...")
cleaned_data = self.cleaner.clean(raw_data)
print(f"清洗后 {len(cleaned_data)} 条数据")
# 3. 转换为文档
transformer = DataTransformer()
documents = transformer.to_documents(cleaned_data)
# 4. 构建索引
print("构建索引...")
self.index.add_documents(documents)
# 5. 保存索引
if self.index_path:
import os
os.makedirs(os.path.dirname(self.index_path), exist_ok=True)
self.index.save(self.index_path)
print(f"索引已保存到: {self.index_path}")
print("知识库构建完成!")
def search(
self,
query: str,
top_k: int = 10
) -> List[Dict]:
"""搜索知识库"""
return self.index.search(query, top_k=top_k)
def load_index(self):
"""加载索引"""
if self.index_path:
self.index.load(self.index_path)
print(f"已加载索引: {self.index_path}")
print(f"索引包含 {len(self.index.documents)} 个文档")
def update(self):
"""更新知识库(增量)"""
# TODO: 实现增量更新
pass
def get_stats(self) -> Dict:
"""获取知识库统计信息"""
return {
"name": self.name,
"document_count": len(self.index.documents),
"index_path": self.index_path
}
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建知识库
kb = KnowledgeBase(name="tech_docs")
# 添加数据源
kb.add_source(WebScraper([
"https://python.org/doc"
]))
    import glob
    kb.add_source(FileImporter(glob.glob("./docs/*.md")))  # FileImporter 接收具体文件路径,通配符需先展开
# 构建知识库
kb.build()
# 搜索
results = kb.search("Python列表操作", top_k=3)
for result in results:
print(f"{result['score']:.2f}: {result['document']['content'][:100]}")
6.2 知识库版本管理
class KnowledgeBaseVersioning:
"""知识库版本管理"""
def __init__(self, kb: KnowledgeBase):
self.kb = kb
self.versions: Dict[str, Dict] = {}
def create_version(self, version: str) -> str:
"""创建版本快照"""
        import hashlib
        import json
        from datetime import datetime
# 计算当前内容的hash
content_hash = hashlib.md5(
json.dumps(self.kb.index.documents).encode()
).hexdigest()
version_id = f"{version}-{content_hash[:8]}"
# 保存快照
snapshot_path = f"{self.kb.index_path}.snapshot.{version_id}"
self.kb.index.save(snapshot_path)
self.versions[version_id] = {
"version": version,
"created_at": datetime.utcnow().isoformat(),
"hash": content_hash,
"snapshot_path": snapshot_path
}
return version_id
def restore_version(self, version_id: str):
"""恢复版本"""
if version_id not in self.versions:
raise ValueError(f"版本不存在: {version_id}")
snapshot_path = self.versions[version_id]["snapshot_path"]
self.kb.index.load(snapshot_path)
def list_versions(self) -> List[Dict]:
"""列出所有版本"""
return list(self.versions.values())
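版本管理的最小使用示意(假设 kb 为已执行过 build() 且保存了索引的 KnowledgeBase 实例):
```python
# 知识库版本管理使用示意
versioning = KnowledgeBaseVersioning(kb)

v1 = versioning.create_version("v1.0")   # 对当前索引打快照
print([v["version"] for v in versioning.list_versions()])

# 知识库更新后如需回退
versioning.restore_version(v1)           # 从快照恢复索引
```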
7. 知识质量评估
7.1 质量指标
class KnowledgeQualityAssessor:
"""知识质量评估器"""
def assess(self, kb: KnowledgeBase) -> Dict:
"""
评估知识库质量
Returns:
{
"completeness": 完整性分数,
"accuracy": 准确性分数,
"consistency": 一致性分数,
"coverage": 覆盖率分数,
"overall": 综合分数
}
"""
metrics = {}
# 完整性
metrics["completeness"] = self._assess_completeness(kb)
# 准确性
metrics["accuracy"] = self._assess_accuracy(kb)
# 一致性
metrics["consistency"] = self._assess_consistency(kb)
# 覆盖率
metrics["coverage"] = self._assess_coverage(kb)
# 综合分数
metrics["overall"] = sum(metrics.values()) / len(metrics)
return metrics
def _assess_completeness(self, kb: KnowledgeBase) -> float:
"""评估完整性"""
# 检查必要字段
required_fields = ["content", "metadata"]
score = 0
total = len(kb.index.documents)
for doc in kb.index.documents.values():
if all(f in doc for f in required_fields):
score += 1
return score / total if total > 0 else 0
def _assess_accuracy(self, kb: KnowledgeBase) -> float:
"""评估准确性"""
# 简化实现:检查文档长度
min_length = 50
score = 0
total = len(kb.index.documents)
for doc in kb.index.documents.values():
if len(doc.get("content", "")) >= min_length:
score += 1
return score / total if total > 0 else 0
def _assess_consistency(self, kb: KnowledgeBase) -> float:
"""评估一致性"""
# 检查元数据一致性
score = 1.0
# TODO: 实现更复杂的一致性检查
return score
def _assess_coverage(self, kb: KnowledgeBase) -> float:
"""评估覆盖率"""
# 检查文档类型分布
types = set()
for doc in kb.index.documents.values():
doc_type = doc.get("metadata", {}).get("type")
if doc_type:
types.add(doc_type)
# 至少3种类型认为覆盖良好
return min(len(types) / 3, 1.0)
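质量评估器的最小使用示意(kb 为已构建的 KnowledgeBase 实例):
```python
# 知识库质量评估使用示意
assessor = KnowledgeQualityAssessor()
metrics = assessor.assess(kb)

for name, score in metrics.items():
    print(f"{name}: {score:.2f}")
```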
8. 实现示例
8.1 完整知识库构建流程
"""
完整领域知识库构建流程
1. 数据采集
2. 数据清洗
3. 知识提取
4. 索引构建
5. 质量评估
"""
class DomainKnowledgeBaseBuilder:
"""领域知识库构建器"""
def __init__(self, domain: str, config: Dict = None):
self.domain = domain
self.config = config or {}
# 初始化组件
self.kb = KnowledgeBase(name=domain)
# 配置清洗器
self._setup_cleaners()
def _setup_cleaners(self):
"""配置数据清洗器"""
self.kb.cleaner.add_cleaner(DataCleaner.remove_empty)
self.kb.cleaner.add_cleaner(DataCleaner.remove_duplicates)
self.kb.cleaner.add_cleaner(DataCleaner.normalize_whitespace)
self.kb.cleaner.add_cleaner(
lambda d: DataCleaner.filter_by_length(d, min_length=50)
)
def add_web_sources(self, urls: List[str]):
"""添加网页数据源"""
self.kb.add_source(WebScraper(urls))
def add_file_sources(self, file_paths: List[str]):
"""添加文件数据源"""
self.kb.add_source(FileImporter(file_paths))
def add_api_source(
self,
api_url: str,
api_key: str = None,
params: Dict = None
):
"""添加API数据源"""
self.kb.add_source(APIConnector(api_url, api_key, params))
def build(self) -> Dict:
"""构建知识库"""
print(f"=== 开始构建 {self.domain} 知识库 ===")
# 1. 数据采集
print("\n[1/5] 数据采集")
raw_data = self.kb.collector.collect_all()
print(f" 采集到 {len(raw_data)} 条原始数据")
# 2. 数据清洗
print("\n[2/5] 数据清洗")
cleaned_data = self.kb.cleaner.clean(raw_data)
print(f" 清洗后 {len(cleaned_data)} 条数据")
# 3. 转换为文档
print("\n[3/5] 文档转换")
transformer = DataTransformer()
documents = transformer.to_documents(cleaned_data)
print(f" 生成 {len(documents)} 个文档")
# 4. 构建索引
print("\n[4/5] 构建索引")
self.kb.index.add_documents(documents)
print(f" 索引构建完成")
# 5. 质量评估
print("\n[5/5] 质量评估")
assessor = KnowledgeQualityAssessor()
quality = assessor.assess(self.kb)
print(f" 完整性: {quality['completeness']:.2f}")
print(f" 准确性: {quality['accuracy']:.2f}")
print(f" 一致性: {quality['consistency']:.2f}")
print(f" 覆盖率: {quality['coverage']:.2f}")
print(f" 综合分数: {quality['overall']:.2f}")
        # 保存索引
        import os
        os.makedirs(os.path.dirname(self.kb.index_path), exist_ok=True)
        self.kb.index.save(self.kb.index_path)
print(f"\n=== {self.domain} 知识库构建完成 ===")
return {
"document_count": len(documents),
"quality": quality
}
def search(self, query: str, top_k: int = 5) -> List[Dict]:
"""搜索知识库"""
return self.kb.search(query, top_k=top_k)
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建构建器
builder = DomainKnowledgeBaseBuilder(domain="python")
# 添加数据源
builder.add_web_sources([
"https://docs.python.org/3/tutorial",
"https://docs.python.org/3/library"
])
    import glob
    builder.add_file_sources(glob.glob("./docs/python/*.md"))  # 通配符需先展开为具体文件路径
# 构建知识库
result = builder.build()
# 搜索
query = "Python列表操作"
results = builder.search(query, top_k=3)
print(f"\n搜索: {query}")
for i, result in enumerate(results, 1):
doc = result["document"]
print(f"\n[{i}] 相关度: {result['score']:.2f}")
print(f"来源: {doc['metadata'].get('source', 'unknown')}")
print(f"内容: {doc['content'][:200]}...")
面试高频问法
Q1: 如何构建一个领域知识库?
标准回答:
领域知识库构建流程:
1. 数据采集
- 确定数据源:文档、API、网页、数据库
- 采集原始数据
- 支持增量更新
2. 数据处理
- 数据清洗:去空、去重、格式化
- 数据转换:标准化、结构化
- 质量检查:长度、格式验证
3. 知识提取
- 实体识别:提取关键实体
- 关系抽取:构建实体关系
- 属性抽取:提取实体属性
4. 索引构建
- 向量索引:语义检索
- 关键词索引:全文检索
- 混合索引:结合两者
5. 质量评估
- 完整性检查
- 准确性验证
- 覆盖率评估
实现:
```python
def build_knowledge_base(sources):
    # Step 1: 采集数据
    collector = DataCollector()
    for source in sources:
        collector.add_source(source)
    raw_data = collector.collect_all()
    # Step 2: 清洗数据
    cleaner = DataCleaner()
    cleaner.add_cleaner(DataCleaner.remove_empty)
    cleaner.add_cleaner(DataCleaner.remove_duplicates)
    cleaned = cleaner.clean(raw_data)
    # Step 3: 转换为标准文档并构建向量索引
    documents = DataTransformer().to_documents(cleaned)
    index = VectorIndex()
    index.add_documents(documents)
    return index
```
Q2: 如何实现知识库的增量更新?
标准回答:
增量更新策略:
1. 变更检测
- 记录上次采集的hash
- 比较当前和上次
- 只处理变化的数据
2. 增量索引
- 添加新文档
- 更新已有文档
- 删除过期文档
3. 状态管理
- 保存同步状态
- 记录文档版本
- 支持回滚
实现:
```python
class IncrementalUpdater:
def __init__(self, kb, state_file):
self.kb = kb
self.state_file = state_file
self.state = self._load_state()
def update(self, new_data):
# 检测变化
new_hash = self._compute_hash(new_data)
last_hash = self.state.get("last_hash")
if new_hash == last_hash:
return "无变化"
# 增量更新
for doc in new_data:
if doc["id"] in self.kb.documents:
# 更新
self.kb.update_document(doc)
else:
# 添加
self.kb.add_document(doc)
# 保存状态
self.state["last_hash"] = new_hash
        self._save_state()
```

Q3: 如何评估知识库的质量?
标准回答:
知识库质量评估维度:
1. 完整性
   - 必要字段是否齐全
   - 文档内容是否完整
   - 元数据是否完整
2. 准确性
   - 内容是否准确
   - 格式是否正确
   - 链接是否有效
3. 一致性
   - 格式是否统一
   - 命名是否一致
   - 分类是否一致
4. 覆盖率
   - 类型分布是否合理
   - 重要主题是否覆盖
   - 更新是否及时
5. 可用性
   - 检索效果
   - 响应速度
   - 错误率
实现:
def assess_quality(kb):
metrics = {}
# 完整性
complete = sum(1 for d in kb.documents if d.get("content"))
metrics["completeness"] = complete / len(kb.documents)
# 准确性
valid_length = sum(1 for d in kb.documents
if len(d.get("content", "")) > 50)
metrics["accuracy"] = valid_length / len(kb.documents)
# 覆盖率
types = set(d.get("type") for d in kb.documents)
metrics["coverage"] = min(len(types) / 5, 1.0)
# 综合分数
metrics["overall"] = sum(metrics.values()) / len(metrics)
return metrics
---
## 总结
### 知识库建设核心要点
| 要点 | 策略 |
|------|------|
| **数据采集** | 多源采集、增量更新 |
| **数据清洗** | 去重、规范化、过滤 |
| **知识提取** | 实体、关系、属性 |
| **索引构建** | 向量、关键词、混合 |
| **质量管理** | 完整性、准确性、覆盖率 |
### 最佳实践
1. **多源采集**:整合不同数据源
2. **增量更新**:支持实时更新
3. **数据清洗**:严格质量把控
4. **混合索引**:向量+关键词
5. **定期评估**:持续优化质量