[AI Agent Knowledge Base] 22 - Building Domain Knowledge Bases

Content Outline

Building Domain Knowledge Bases

Table of Contents

  1. Knowledge Base Overview
  2. Data Collection
  3. Data Processing
  4. Knowledge Extraction
  5. Knowledge Indexing
  6. Knowledge Management
  7. Knowledge Quality Assessment
  8. Implementation Example

1. Knowledge Base Overview

1.1 Domain Knowledge Base Architecture

┌─────────────────────────────────────────────────────┐
│          Domain Knowledge Base Architecture         │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌─────────────────────────────────────────────┐    │
│  │            Data Collection Layer            │    │
│  │  - APIs  - Web scraping  - File import      │    │
│  └────────────────┬────────────────────────────┘    │
│                   │                                 │
│  ┌────────────────▼────────────────────────────┐    │
│  │            Data Processing Layer            │    │
│  │  - Clean  - Dedup  - Format  - Structure    │    │
│  └────────────────┬────────────────────────────┘    │
│                   │                                 │
│  ┌────────────────▼────────────────────────────┐    │
│  │          Knowledge Extraction Layer         │    │
│  │  - Entities  - Relations  - Attributes      │    │
│  └────────────────┬────────────────────────────┘    │
│                   │                                 │
│  ┌────────────────▼────────────────────────────┐    │
│  │           Knowledge Storage Layer           │    │
│  │  - Vector DB  - Graph DB  - Relational DB   │    │
│  └────────────────┬────────────────────────────┘    │
│                   │                                 │
│  ┌────────────────▼────────────────────────────┐    │
│  │          Knowledge Retrieval Layer          │    │
│  │  - Semantic  - Keyword  - Graph search      │    │
│  └─────────────────────────────────────────────┘    │
│                                                     │
└─────────────────────────────────────────────────────┘
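
Wired together, these layers map one-to-one onto the components built in the rest of this chapter. Below is a minimal end-to-end sketch, assuming the DataCollector, DataCleaner, EntityExtractor, and VectorIndex classes defined in sections 2 through 5; the function name and control flow are illustrative, not a fixed API.

def run_pipeline(sources, query):
    collector = DataCollector()                  # collection layer
    for source in sources:
        collector.add_source(source)
    raw = collector.collect_all()

    cleaner = DataCleaner()                      # processing layer
    cleaner.add_cleaner(DataCleaner.remove_empty)
    cleaned = cleaner.clean(raw)

    extractor = EntityExtractor()                # extraction layer
    for item in cleaned:
        item["entities"] = extractor.extract_entities(item["content"])

    index = VectorIndex()                        # storage layer
    for i, item in enumerate(cleaned):
        item.setdefault("id", str(i))
    index.add_documents(cleaned)

    return index.search(query, top_k=5)          # retrieval layer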

1.2 Knowledge Base Types

| Type       | Characteristics                  | Typical Use Cases        |
|------------|----------------------------------|--------------------------|
| Document   | Unstructured documents           | General-purpose KB       |
| Structured | Entities, relations, attributes  | Knowledge graphs         |
| Hybrid     | Documents + structured data      | Complex domain knowledge |
| FAQ        | Question-answer pairs            | Support, help centers    |
| Manual     | How-to guides, procedures        | Technical documentation  |

1.3 Common Domain Knowledge Bases

┌─────────────────────────────────────────────────┐
│          Common Domain Knowledge Bases          │
├─────────────────────────────────────────────────┤
│                                                 │
│  Technical                                      │
│  - Language docs (Python, Java, Go, ...)        │
│  - Framework docs (LangChain, Spring, ...)      │
│  - API documentation                            │
│                                                 │
│  Business                                       │
│  - Product manuals                              │
│  - Sales playbooks                              │
│  - Competitive analysis                         │
│                                                 │
│  Industry                                       │
│  - Laws and regulations                         │
│  - Industry standards                           │
│  - Market reports                               │
│                                                 │
│  Enterprise-internal                            │
│  - Policies and regulations                     │
│  - Business processes                           │
│  - Training materials                           │
│                                                 │
└─────────────────────────────────────────────────┘

2. Data Collection

2.1 Data Source Types

from typing import Any, Dict, List
from abc import ABC, abstractmethod
import requests
from bs4 import BeautifulSoup
import json

class DataSource(ABC):
    """Abstract base class for data sources"""

    @abstractmethod
    def collect(self) -> List[Dict]:
        """Collect data"""
        pass

class WebScraper(DataSource):
    """Web page scraper"""

    def __init__(self, urls: List[str]):
        self.urls = urls

    def collect(self) -> List[Dict]:
        results = []
        for url in self.urls:
            try:
                content = self._scrape(url)
                results.append({
                    "url": url,
                    "content": content,
                    "source": "web",
                    "type": "html"
                })
            except Exception as e:
                print(f"Failed to scrape {url}: {e}")
        return results

    def _scrape(self, url: str) -> str:
        response = requests.get(url, timeout=30)
        soup = BeautifulSoup(response.content, 'html.parser')

        # Strip scripts and styles
        for script in soup(['script', 'style']):
            script.decompose()

        return soup.get_text(separator='\n', strip=True)

class APIConnector(DataSource):
    """API-based data collection"""

    def __init__(
        self,
        api_url: str,
        api_key: str = None,
        params: Dict = None
    ):
        self.api_url = api_url
        self.api_key = api_key
        self.params = params or {}

    def collect(self) -> List[Dict]:
        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        response = requests.get(
            self.api_url,
            headers=headers,
            params=self.params,
            timeout=30
        )
        response.raise_for_status()

        data = response.json()
        return self._normalize(data)

    def _normalize(self, data: Any) -> List[Dict]:
        """Normalize the API response"""
        if isinstance(data, list):
            return data
        elif isinstance(data, dict):
            return [data]
        else:
            return []

class FileImporter(DataSource):
    """File import"""

    def __init__(self, file_paths: List[str]):
        self.file_paths = file_paths

    def collect(self) -> List[Dict]:
        results = []
        for file_path in self.file_paths:
            content = self._read_file(file_path)
            results.append({
                "file_path": file_path,
                "content": content,
                "source": "file",
                "type": self._get_file_type(file_path)
            })
        return results

    def _read_file(self, file_path: str) -> str:
        """Read file contents (plain text; PDF/Word need dedicated parsers)"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def _get_file_type(self, file_path: str) -> str:
        """Infer the file type from the extension"""
        import os
        ext = os.path.splitext(file_path)[1].lower()
        type_map = {
            '.md': 'markdown',
            '.txt': 'text',
            '.pdf': 'pdf',
            '.docx': 'word',
            '.json': 'json',
        }
        return type_map.get(ext, 'unknown')

class DatabaseExporter(DataSource):
    """Database export"""

    def __init__(
        self,
        connection_string: str,
        query: str,
        batch_size: int = 1000
    ):
        self.connection_string = connection_string
        self.query = query
        self.batch_size = batch_size

    def collect(self) -> List[Dict]:
        import psycopg2
        import pandas as pd

        conn = psycopg2.connect(self.connection_string)
        try:
            df = pd.read_sql_query(self.query, conn)
        finally:
            conn.close()

        return df.to_dict('records')

2.2 Data Collector

class DataCollector:
    """Aggregates multiple data sources"""

    def __init__(self):
        self.sources: List[DataSource] = []

    def add_source(self, source: DataSource):
        """Register a data source"""
        self.sources.append(source)

    def collect_all(self) -> List[Dict]:
        """Collect from all registered sources"""
        all_data = []
        for source in self.sources:
            try:
                data = source.collect()
                all_data.extend(data)
            except Exception as e:
                print(f"Collection failed: {e}")
        return all_data

    def collect_with_progress(self) -> List[Dict]:
        """Collect with a progress bar"""
        from tqdm import tqdm

        all_data = []
        for source in tqdm(self.sources, desc="Collecting"):
            data = source.collect()
            all_data.extend(data)
            tqdm.write(f"Collected {len(data)} records")

        return all_data

# ============== Usage Example ==============

if __name__ == "__main__":
    collector = DataCollector()

    # Web scraping source
    collector.add_source(WebScraper([
        "https://docs.example.com/api",
        "https://docs.example.com/guide"
    ]))

    # API source
    collector.add_source(APIConnector(
        api_url="https://api.example.com/documents",
        api_key="your-api-key"
    ))

    # File source
    collector.add_source(FileImporter([
        "./docs/README.md",
        "./docs/GUIDE.md"
    ]))

    # Collect everything
    data = collector.collect_all()
    print(f"Collected {len(data)} records in total")

2.3 Incremental Collection

from typing import Dict, List, Optional
from datetime import datetime
import hashlib
import json
import os

class IncrementalCollector:
    """Incremental collector"""

    def __init__(self, state_file: str = ".data_state.json"):
        self.state_file = state_file
        self.state = self._load_state()

    def _load_state(self) -> Dict:
        """Load the collection state"""
        if not os.path.exists(self.state_file):
            return {
                "sources": {},
                "last_sync": None
            }

        with open(self.state_file, 'r') as f:
            return json.load(f)

    def _save_state(self):
        """Persist the collection state"""
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f, indent=2, ensure_ascii=False)

    def collect_incremental(
        self,
        source: DataSource,
        source_id: str
    ) -> List[Dict]:
        """
        Incremental collection

        Only returns data when the source has changed since the last run.
        """
        # Hash recorded on the previous run
        last_hash = self.state["sources"].get(source_id, {}).get("hash")

        # Collect the latest data
        new_data = source.collect()

        # Hash of the fresh data
        new_hash = self._compute_hash(new_data)

        # Skip if nothing changed
        if new_hash == last_hash:
            print(f"Source {source_id} unchanged, skipping")
            return []

        # Update state
        now = datetime.utcnow().isoformat()
        self.state["sources"][source_id] = {
            "hash": new_hash,
            "last_sync": now
        }
        self.state["last_sync"] = now
        self._save_state()

        return new_data

    def _compute_hash(self, data: List[Dict]) -> str:
        """Compute a content hash over the collected data"""
        data_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
        return hashlib.md5(data_str.encode()).hexdigest()
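
A brief usage sketch, reusing the WebScraper from 2.1; the URL and source_id are illustrative:

# Illustrative usage: only re-process a source when its hash has changed.
inc = IncrementalCollector(state_file=".data_state.json")
scraper = WebScraper(["https://docs.example.com/api"])

changed = inc.collect_incremental(scraper, source_id="example_docs")
if changed:
    print(f"{len(changed)} changed records to re-index")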

3. Data Processing

3.1 Data Cleaning

import hashlib
import re
from typing import Callable, Dict, List

class DataCleaner:
    """Data cleaner"""

    def __init__(self):
        self.cleaners: List[Callable] = []

    def add_cleaner(self, cleaner: Callable):
        """Register a cleaning function"""
        self.cleaners.append(cleaner)

    def clean(self, data: List[Dict]) -> List[Dict]:
        """Run all cleaning functions in order"""
        cleaned = data.copy()

        for cleaner in self.cleaners:
            cleaned = cleaner(cleaned)

        return cleaned

    # Built-in cleaners
    @staticmethod
    def remove_empty(data: List[Dict]) -> List[Dict]:
        """Drop records with empty content"""
        return [
            item for item in data
            if item.get("content") and item["content"].strip()
        ]

    @staticmethod
    def remove_duplicates(data: List[Dict]) -> List[Dict]:
        """Deduplicate by content hash"""
        seen = set()
        unique = []

        for item in data:
            content = item.get("content", "")
            content_hash = hashlib.md5(content.encode()).hexdigest()

            if content_hash not in seen:
                seen.add(content_hash)
                unique.append(item)

        return unique

    @staticmethod
    def normalize_whitespace(data: List[Dict]) -> List[Dict]:
        """Normalize whitespace"""
        for item in data:
            if "content" in item:
                # Collapse runs of whitespace into a single space
                item["content"] = re.sub(r'\s+', ' ', item["content"])
                # Trim leading/trailing whitespace
                item["content"] = item["content"].strip()

        return data

    @staticmethod
    def remove_special_chars(data: List[Dict], chars: str = None) -> List[Dict]:
        """Remove special characters"""
        if chars is None:
            # Default: strip control characters; the range must not be
            # passed through re.escape, or the '-' would become literal
            pattern = re.compile(r'[\x00-\x1F\x7F-\x9F]')
        else:
            pattern = re.compile(f'[{re.escape(chars)}]')

        for item in data:
            if "content" in item:
                item["content"] = pattern.sub('', item["content"])

        return data

    @staticmethod
    def filter_by_length(
        data: List[Dict],
        min_length: int = 10,
        max_length: int = 100000
    ) -> List[Dict]:
        """Filter by content length"""
        return [
            item for item in data
            if min_length <= len(item.get("content", "")) <= max_length
        ]

    @staticmethod
    def filter_by_keywords(
        data: List[Dict],
        keywords: List[str],
        mode: str = "include"  # include/exclude
    ) -> List[Dict]:
        """Filter by keywords"""
        keywords_lower = [k.lower() for k in keywords]

        if mode == "include":
            return [
                item for item in data
                if any(k in item.get("content", "").lower()
                      for k in keywords_lower)
            ]
        else:
            return [
                item for item in data
                if not any(k in item.get("content", "").lower()
                         for k in keywords_lower)
            ]

# ============== Usage Example ==============

cleaner = DataCleaner()

# Register cleaning steps
cleaner.add_cleaner(DataCleaner.remove_empty)
cleaner.add_cleaner(DataCleaner.remove_duplicates)
cleaner.add_cleaner(DataCleaner.normalize_whitespace)
cleaner.add_cleaner(DataCleaner.filter_by_length)

# Clean the data (raw_data as collected in section 2)
cleaned_data = cleaner.clean(raw_data)

3.2 Data Transformation

from datetime import datetime

class DataTransformer:
    """Data transformer"""

    def to_documents(self, data: List[Dict]) -> List[Dict]:
        """Convert raw records into index-ready documents.

        Returns plain dicts with the shape the indexes in section 5
        expect ({"id", "content", "metadata"}); a Document dataclass
        from the RAG chapter could be substituted.
        """
        documents = []
        for i, item in enumerate(data):
            documents.append({
                "id": str(i),
                "content": item.get("content", ""),
                "metadata": {
                    "source": item.get("source"),
                    "type": item.get("type"),
                    "url": item.get("url"),
                    "file_path": item.get("file_path"),
                    "created_at": datetime.utcnow().isoformat()
                }
            })

        return documents

    def to_qa_pairs(self, data: List[Dict]) -> List[Dict]:
        """Convert into question-answer pairs"""
        qa_pairs = []

        for item in data:
            content = item.get("content", "")

            # Naive implementation: one QA pair per paragraph;
            # a real pipeline would generate questions with an LLM
            paragraphs = content.split('\n\n')
            for para in paragraphs:
                if len(para) > 50:
                    qa_pairs.append({
                        "question": f"About {para[:30]}?",
                        "answer": para,
                        "source": item.get("source"),
                        "metadata": item
                    })

        return qa_pairs

    def to_knowledge_graph(self, data: List[Dict]) -> Dict:
        """Convert into a knowledge graph"""
        graph = {
            "nodes": [],
            "edges": []
        }

        # Simplified: a real pipeline would use NLP to extract
        # entities and relations
        for item in data:
            content = item.get("content", "")

            # Extract entities (simplified)
            entities = self._extract_entities(content)

            # Add nodes
            for entity in entities:
                graph["nodes"].append({
                    "id": entity,
                    "label": entity,
                    "type": "entity"
                })

        return graph

    def _extract_entities(self, text: str) -> List[str]:
        """Extract entities (simplified)"""
        # A real pipeline would use an NER model;
        # here we just pick out capitalized words
        return list(set(re.findall(r'\b[A-Z][a-zA-Z]+\b', text)))
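
A brief usage sketch, assuming the cleaned_data produced by the 3.1 example:

transformer = DataTransformer()

# Index-ready documents
documents = transformer.to_documents(cleaned_data)

# Naive QA pairs (a production pipeline would generate questions with an LLM)
qa_pairs = transformer.to_qa_pairs(cleaned_data)
print(f"{len(documents)} documents, {len(qa_pairs)} QA pairs")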

4. Knowledge Extraction

4.1 Entity Recognition

import json
import re
from typing import Dict, List

class EntityExtractor:
    """Entity extractor"""

    def __init__(self, model_name: str = "BAAI/bge-large-zh", llm=None):
        from sentence_transformers import SentenceTransformer
        self.embedder = SentenceTransformer(model_name)
        self.llm = llm  # optional LLM client, used by extract_with_llm

    def extract_entities(self, text: str) -> List[Dict]:
        """
        Extract entities

        Returns:
            [
                {
                    "text": entity text,
                    "type": entity type,
                    "start": start offset,
                    "end": end offset,
                    "confidence": confidence score
                }
            ]
        """
        # Simplified implementation; a real system would use an NER
        # model such as spaCy or Hugging Face transformers

        entities = []

        # Rule-based entity extraction
        rules = [
            # Email addresses
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', 'email'),
            # URLs
            (r'https?://[^\s]+', 'url'),
            # Version numbers
            (r'\b\d+\.\d+\.\d+\b', 'version'),
            # Dates
            (r'\d{4}-\d{2}-\d{2}', 'date'),
        ]

        for pattern, entity_type in rules:
            for match in re.finditer(pattern, text):
                entities.append({
                    "text": match.group(),
                    "type": entity_type,
                    "start": match.start(),
                    "end": match.end(),
                    "confidence": 0.9
                })

        return entities

    def extract_with_llm(self, text: str, schema: Dict) -> List[Dict]:
        """
        Extract entities with an LLM

        Args:
            text: input text
            schema: entity schema, e.g. {"person": "person name", "org": "organization"}
        """
        prompt = f"""Extract entities from the following text.

Text:
{text}

Extract entities of these types:
{json.dumps(schema, indent=2)}

Output JSON in this form:
{{
    "entities": [
        {{
            "text": "entity text",
            "type": "entity type",
            "confidence": 0.95
        }}
    ]
}}"""

        response = self.llm.generate(prompt)
        data = json.loads(response)
        return data.get("entities", [])

    def link_entities(self, entities: List[Dict], kb) -> List[Dict]:
        """
        Entity linking

        Link extracted entities to entities already in the knowledge base.
        """
        linked = []

        for entity in entities:
            # Search the knowledge base for similar entities
            candidates = kb.search_entities(
                query=entity["text"],
                top_k=3
            )

            if candidates:
                entity["linked"] = candidates[0]["id"]
                entity["link_confidence"] = candidates[0]["score"]

            linked.append(entity)

        return linked
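
A quick check of the rule-based path with a made-up sentence (note the constructor downloads the embedding model, which only the linking step needs):

extractor = EntityExtractor()
text = ("Contact support@example.com; docs at https://example.com "
        "(released 2024-01-15, version 1.2.3)")
for ent in extractor.extract_entities(text):
    print(f"{ent['type']:>8}: {ent['text']}")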

4.2 Relation Extraction

class RelationExtractor:
    """Relation extractor"""

    def __init__(self, llm=None):
        self.llm = llm

    def extract_relations(
        self,
        text: str,
        entities: List[Dict]
    ) -> List[Dict]:
        """
        Extract relations between entities

        Args:
            text: input text
            entities: entity list

        Returns:
            [
                {
                    "subject": subject entity,
                    "relation": relation type,
                    "object": object entity,
                    "confidence": confidence score
                }
            ]
        """
        if not self.llm:
            # Rule-based relation extraction
            return self._rule_based_extraction(text, entities)

        # LLM-based extraction
        return self._llm_based_extraction(text, entities)

    def _rule_based_extraction(
        self,
        text: str,
        entities: List[Dict]
    ) -> List[Dict]:
        """Rule-based relation extraction (patterns target Chinese connectives)"""
        relations = []
        entity_texts = [e["text"] for e in entities]

        # Relation patterns
        patterns = [
            (r'({})\s*(是|属于)\s*({})', 'is_a'),
            (r'({})\s*(位于|在)\s*({})', 'located_in'),
            (r'({})\s*(创建|开发)\s*({})', 'created'),
            (r'({})\s*(使用|基于)\s*({})', 'uses'),
        ]

        for pattern, relation_type in patterns:
            for e1 in entity_texts:
                for e2 in entity_texts:
                    if e1 == e2:
                        continue

                    pattern_str = pattern.format(
                        re.escape(e1),
                        re.escape(e2)
                    )

                    if re.search(pattern_str, text):
                        relations.append({
                            "subject": e1,
                            "relation": relation_type,
                            "object": e2,
                            "confidence": 0.8
                        })

        return relations

    def _llm_based_extraction(
        self,
        text: str,
        entities: List[Dict]
    ) -> List[Dict]:
        """LLM-based relation extraction"""
        entity_list = "\n".join([
            f"- {e['text']} ({e.get('type', 'unknown')})"
            for e in entities
        ])

        prompt = f"""Extract the relations between entities in the following text.

Text:
{text}

Entities:
{entity_list}

Output JSON in this form:
{{
    "relations": [
        {{
            "subject": "subject entity",
            "relation": "relation",
            "object": "object entity",
            "confidence": 0.95
        }}
    ]
}}"""

        response = self.llm.generate(prompt)
        data = json.loads(response)
        return data.get("relations", [])
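
A minimal rule-based run; with no llm passed, extract_relations takes the pattern path. The entities and sentence are illustrative, and the sentence is Chinese because the patterns match Chinese connectives:

rel_extractor = RelationExtractor()  # no llm -> rule-based extraction
entities = [
    {"text": "LangChain", "type": "framework"},
    {"text": "Python", "type": "language"},
]
relations = rel_extractor.extract_relations("LangChain 基于 Python", entities)
print(relations)
# [{'subject': 'LangChain', 'relation': 'uses', 'object': 'Python', 'confidence': 0.8}]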

4.3 Knowledge Graph Construction

class KnowledgeGraphBuilder:
    """Knowledge graph builder"""

    def __init__(self):
        self.nodes = {}
        self.edges = []

    def add_node(self, id: str, label: str, **kwargs):
        """Add a node"""
        self.nodes[id] = {
            "id": id,
            "label": label,
            **kwargs
        }

    def add_edge(
        self,
        source: str,
        target: str,
        relation: str,
        **kwargs
    ):
        """Add an edge"""
        self.edges.append({
            "source": source,
            "target": target,
            "relation": relation,
            **kwargs
        })

    def from_documents(self, documents: List[Dict]):
        """Build a knowledge graph from documents"""
        extractor = EntityExtractor()
        relation_extractor = RelationExtractor()

        for doc in documents:
            content = doc["content"]

            # Extract entities
            entities = extractor.extract_entities(content)

            # Add nodes, remembering text -> node id so that edges
            # reference the nodes actually created
            text_to_id = {}
            for entity in entities:
                node_id = f"{entity['type']}:{entity['text']}"
                text_to_id[entity["text"]] = node_id
                self.add_node(
                    id=node_id,
                    label=entity["text"],
                    type=entity["type"]
                )

            # Extract relations
            relations = relation_extractor.extract_relations(
                content,
                entities
            )

            # Add edges
            for rel in relations:
                source_id = text_to_id.get(rel["subject"], f"entity:{rel['subject']}")
                target_id = text_to_id.get(rel["object"], f"entity:{rel['object']}")
                self.add_edge(
                    source=source_id,
                    target=target_id,
                    relation=rel["relation"]
                )

    def to_graph_json(self) -> Dict:
        """Export as graph JSON"""
        return {
            "nodes": list(self.nodes.values()),
            "edges": self.edges
        }

    def save_to_file(self, file_path: str):
        """Save to a file"""
        import json
        with open(file_path, 'w') as f:
            json.dump(self.to_graph_json(), f, indent=2, ensure_ascii=False)
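
A short usage sketch with a made-up document; the entities come from the rule-based extractor, and since the text contains no Chinese connectives the rule-based relation pass produces no edges:

builder = KnowledgeGraphBuilder()
builder.from_documents([
    {"content": "Docs live at https://example.com, contact admin@example.com"}
])
builder.save_to_file("graph.json")
print(builder.to_graph_json())  # two nodes (url, email), no edges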

5. Knowledge Indexing

5.1 Building a Vector Index

from sentence_transformers import SentenceTransformer
import numpy as np

class VectorIndex:
    """Vector index"""

    def __init__(
        self,
        embedding_model: str = "BAAI/bge-small-zh-v1.5",
        dim: int = None
    ):
        self.embedder = SentenceTransformer(embedding_model)
        # Derive the dimension from the model unless given explicitly;
        # hardcoding 768 breaks for models with a different output size
        self.dim = dim or self.embedder.get_sentence_embedding_dimension()
        self.documents = {}  # id -> document
        self.embeddings = np.zeros((0, self.dim))  # embedding matrix
        self.ids = []  # ids aligned with embedding rows

    def add_document(self, doc: Dict):
        """Add a document"""
        doc_id = doc["id"]

        # Compute the embedding
        text = doc.get("content", "")
        embedding = self.embedder.encode(text)

        # Store
        self.documents[doc_id] = doc
        self.ids.append(doc_id)

        # Append to the embedding matrix
        if len(self.embeddings) == 0:
            self.embeddings = embedding.reshape(1, -1)
        else:
            self.embeddings = np.vstack([self.embeddings, embedding])

    def add_documents(self, docs: List[Dict]):
        """Add documents in bulk"""
        for doc in docs:
            self.add_document(doc)

    def search(
        self,
        query: str,
        top_k: int = 10,
        min_score: float = 0.0
    ) -> List[Dict]:
        """
        Search

        Returns:
            [
                {
                    "document": {...},
                    "score": similarity score
                }
            ]
        """
        # Embed the query
        query_embedding = self.embedder.encode(query)

        # Cosine similarity against all stored embeddings
        scores = np.dot(
            self.embeddings,
            query_embedding
        ) / (
            np.linalg.norm(self.embeddings, axis=1) *
            np.linalg.norm(query_embedding)
        )

        # Take the top_k highest-scoring documents
        top_indices = np.argsort(scores)[-top_k:][::-1]

        results = []
        for idx in top_indices:
            score = float(scores[idx])
            if score >= min_score:
                doc_id = self.ids[idx]
                results.append({
                    "document": self.documents[doc_id],
                    "score": score
                })

        return results

    def save(self, path: str):
        """Persist the index"""
        import pickle
        with open(path, 'wb') as f:
            pickle.dump({
                "documents": self.documents,
                "embeddings": self.embeddings,
                "ids": self.ids,
                "dim": self.dim
            }, f)

    def load(self, path: str):
        """Load a persisted index"""
        import pickle
        with open(path, 'rb') as f:
            data = pickle.load(f)
            self.documents = data["documents"]
            self.embeddings = data["embeddings"]
            self.ids = data["ids"]
            self.dim = data["dim"]
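
A minimal usage sketch; the documents and query are made up, and the first call downloads the embedding model:

index = VectorIndex()
index.add_documents([
    {"id": "1", "content": "Python lists support append, insert and slicing."},
    {"id": "2", "content": "Go channels synchronize goroutines."},
])
for hit in index.search("appending to a Python list", top_k=1):
    print(f"{hit['score']:.3f} {hit['document']['content']}")
index.save("vector.idx")  # persist for a later load()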

5.2 Hybrid Index

class HybridIndex:
    """Hybrid index - vector + keyword"""

    def __init__(
        self,
        embedding_model: str = "BAAI/bge-small-zh-v1.5"
    ):
        self.vector_index = VectorIndex(embedding_model)
        self.keyword_index = {}  # word -> {doc_ids}

    def add_document(self, doc: Dict):
        """Add a document"""
        doc_id = doc["id"]
        content = doc.get("content", "")

        # Add to the vector index
        self.vector_index.add_document(doc)

        # Add to the keyword index
        words = self._extract_keywords(content)
        for word in words:
            if word not in self.keyword_index:
                self.keyword_index[word] = set()
            self.keyword_index[word].add(doc_id)

    def search(
        self,
        query: str,
        top_k: int = 10,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[Dict]:
        """
        Hybrid search

        Combines vector search and keyword search.
        """
        # Vector search
        vector_results = self.vector_index.search(query, top_k * 2)

        # Keyword search
        keyword_results = self._keyword_search(query, top_k * 2)

        # Merge scores
        combined_scores = {}

        # Vector results
        for result in vector_results:
            doc_id = result["document"]["id"]
            combined_scores[doc_id] = (
                combined_scores.get(doc_id, 0) +
                result["score"] * vector_weight
            )

        # Keyword results
        for result in keyword_results:
            doc_id = result["document"]["id"]
            combined_scores[doc_id] = (
                combined_scores.get(doc_id, 0) +
                result["score"] * keyword_weight
            )

        # Sort by combined score
        sorted_ids = sorted(
            combined_scores.keys(),
            key=lambda x: combined_scores[x],
            reverse=True
        )

        # Build the result list
        results = []
        for doc_id in sorted_ids[:top_k]:
            results.append({
                "document": self.vector_index.documents[doc_id],
                "score": combined_scores[doc_id]
            })

        return results

    def _extract_keywords(self, text: str) -> List[str]:
        """Extract keywords"""
        # Simplified: tokenize with jieba (targets Chinese text)
        import jieba
        words = jieba.lcut(text)

        # Filter stopwords
        stopwords = {
            "的", "了", "是", "在", "我", "有", "和", "就"
        }
        return [w for w in words if w not in stopwords and len(w) > 1]

    def _keyword_search(
        self,
        query: str,
        top_k: int
    ) -> List[Dict]:
        """Keyword search"""
        keywords = self._extract_keywords(query)

        # Count keyword matches per document
        doc_scores = {}
        for keyword in keywords:
            if keyword in self.keyword_index:
                for doc_id in self.keyword_index[keyword]:
                    doc_scores[doc_id] = (
                        doc_scores.get(doc_id, 0) + 1
                    )

        # Normalize
        max_score = max(doc_scores.values()) if doc_scores else 1
        for doc_id in doc_scores:
            doc_scores[doc_id] /= max_score

        # Build the result list
        results = []
        for doc_id, score in sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]:
            results.append({
                "document": self.vector_index.documents[doc_id],
                "score": score
            })

        return results
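
A quick sketch of the hybrid path with made-up Chinese documents (jieba tokenization targets Chinese text):

hybrid = HybridIndex()
hybrid.add_document({"id": "1", "content": "Python 列表支持追加和切片操作"})
hybrid.add_document({"id": "2", "content": "Go 通道用于协程间通信"})

for r in hybrid.search("Python 列表操作", top_k=2):
    print(f"{r['score']:.3f} {r['document']['content']}")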

6. Knowledge Management

6.1 Knowledge Base Manager

class KnowledgeBase:
    """Knowledge base manager"""

    def __init__(
        self,
        name: str,
        index_path: str = None
    ):
        self.name = name
        self.index_path = index_path or f"./indexes/{name}"

        # Initialize components
        self.collector = DataCollector()
        self.cleaner = DataCleaner()
        self.index = VectorIndex()

        # Default cleaners
        self.cleaner.add_cleaner(DataCleaner.remove_empty)
        self.cleaner.add_cleaner(DataCleaner.remove_duplicates)
        self.cleaner.add_cleaner(DataCleaner.normalize_whitespace)

    def add_source(self, source: DataSource):
        """Register a data source"""
        self.collector.add_source(source)

    def build(self):
        """Build the knowledge base"""
        print(f"Building knowledge base: {self.name}")

        # 1. Collect data
        print("Collecting data...")
        raw_data = self.collector.collect_all()
        print(f"Collected {len(raw_data)} raw records")

        # 2. Clean data
        print("Cleaning data...")
        cleaned_data = self.cleaner.clean(raw_data)
        print(f"{len(cleaned_data)} records after cleaning")

        # 3. Convert into documents
        transformer = DataTransformer()
        documents = transformer.to_documents(cleaned_data)

        # 4. Build the index
        print("Building index...")
        self.index.add_documents(documents)

        # 5. Persist the index
        if self.index_path:
            import os
            os.makedirs(os.path.dirname(self.index_path), exist_ok=True)
            self.index.save(self.index_path)
            print(f"Index saved to: {self.index_path}")

        print("Knowledge base build complete!")

    def search(
        self,
        query: str,
        top_k: int = 10
    ) -> List[Dict]:
        """Search the knowledge base"""
        return self.index.search(query, top_k=top_k)

    def load_index(self):
        """Load a persisted index"""
        if self.index_path:
            self.index.load(self.index_path)
            print(f"Index loaded: {self.index_path}")
            print(f"Index contains {len(self.index.documents)} documents")

    def update(self):
        """Update the knowledge base (incremental)"""
        # TODO: implement incremental updates
        pass

    def get_stats(self) -> Dict:
        """Knowledge base statistics"""
        return {
            "name": self.name,
            "document_count": len(self.index.documents),
            "index_path": self.index_path
        }

# ============== Usage Example ==============

if __name__ == "__main__":
    import glob

    # Create a knowledge base
    kb = KnowledgeBase(name="tech_docs")

    # Register data sources
    kb.add_source(WebScraper([
        "https://python.org/doc"
    ]))

    # FileImporter takes concrete paths, so expand the glob first
    kb.add_source(FileImporter(glob.glob("./docs/*.md")))

    # Build the knowledge base
    kb.build()

    # Search (the query is Chinese: "Python list operations")
    results = kb.search("Python列表操作", top_k=3)
    for result in results:
        print(f"{result['score']:.2f}: {result['document']['content'][:100]}")

6.2 Knowledge Base Version Management

class KnowledgeBaseVersioning:
    """Knowledge base version management"""

    def __init__(self, kb: KnowledgeBase):
        self.kb = kb
        self.versions: Dict[str, Dict] = {}

    def create_version(self, version: str) -> str:
        """Create a version snapshot"""
        import hashlib
        import json
        from datetime import datetime

        # Hash of the current content (sorted keys for determinism)
        content_hash = hashlib.md5(
            json.dumps(self.kb.index.documents, sort_keys=True).encode()
        ).hexdigest()

        version_id = f"{version}-{content_hash[:8]}"

        # Save the snapshot
        snapshot_path = f"{self.kb.index_path}.snapshot.{version_id}"
        self.kb.index.save(snapshot_path)

        self.versions[version_id] = {
            "version": version,
            "created_at": datetime.utcnow().isoformat(),
            "hash": content_hash,
            "snapshot_path": snapshot_path
        }

        return version_id

    def restore_version(self, version_id: str):
        """Restore a version"""
        if version_id not in self.versions:
            raise ValueError(f"Unknown version: {version_id}")

        snapshot_path = self.versions[version_id]["snapshot_path"]
        self.kb.index.load(snapshot_path)

    def list_versions(self) -> List[Dict]:
        """List all versions"""
        return list(self.versions.values())
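
A short usage sketch, assuming kb was built as in 6.1:

versioning = KnowledgeBaseVersioning(kb)

v1 = versioning.create_version("v1.0")   # snapshot the current index
# ... rebuild or modify the knowledge base ...
versioning.restore_version(v1)           # roll back to the snapshot
print(versioning.list_versions())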

7. Knowledge Quality Assessment

7.1 Quality Metrics

class KnowledgeQualityAssessor:
    """Knowledge quality assessor"""

    def assess(self, kb: KnowledgeBase) -> Dict:
        """
        Assess knowledge base quality

        Returns:
            {
                "completeness": completeness score,
                "accuracy": accuracy score,
                "consistency": consistency score,
                "coverage": coverage score,
                "overall": overall score
            }
        """
        metrics = {}

        # Completeness
        metrics["completeness"] = self._assess_completeness(kb)

        # Accuracy
        metrics["accuracy"] = self._assess_accuracy(kb)

        # Consistency
        metrics["consistency"] = self._assess_consistency(kb)

        # Coverage
        metrics["coverage"] = self._assess_coverage(kb)

        # Overall score
        metrics["overall"] = sum(metrics.values()) / len(metrics)

        return metrics

    def _assess_completeness(self, kb: KnowledgeBase) -> float:
        """Assess completeness"""
        # Check that required fields are present
        required_fields = ["content", "metadata"]
        score = 0
        total = len(kb.index.documents)

        for doc in kb.index.documents.values():
            if all(f in doc for f in required_fields):
                score += 1

        return score / total if total > 0 else 0

    def _assess_accuracy(self, kb: KnowledgeBase) -> float:
        """Assess accuracy"""
        # Simplified proxy: check document length
        min_length = 50
        score = 0
        total = len(kb.index.documents)

        for doc in kb.index.documents.values():
            if len(doc.get("content", "")) >= min_length:
                score += 1

        return score / total if total > 0 else 0

    def _assess_consistency(self, kb: KnowledgeBase) -> float:
        """Assess consistency"""
        # Check metadata consistency
        score = 1.0
        # TODO: implement more thorough consistency checks
        return score

    def _assess_coverage(self, kb: KnowledgeBase) -> float:
        """Assess coverage"""
        # Check the distribution of document types
        types = set()
        for doc in kb.index.documents.values():
            doc_type = doc.get("metadata", {}).get("type")
            if doc_type:
                types.add(doc_type)

        # Three or more types counts as good coverage
        return min(len(types) / 3, 1.0)
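
Running the assessor against a built knowledge base, again assuming kb from 6.1:

assessor = KnowledgeQualityAssessor()
quality = assessor.assess(kb)
for metric, score in quality.items():
    print(f"{metric:>12}: {score:.2f}")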

8. Implementation Example

8.1 End-to-End Knowledge Base Build

"""
End-to-end domain knowledge base build

1. Data collection
2. Data cleaning
3. Knowledge extraction
4. Index construction
5. Quality assessment
"""

class DomainKnowledgeBaseBuilder:
    """Domain knowledge base builder"""

    def __init__(self, domain: str, config: Dict = None):
        self.domain = domain
        self.config = config or {}

        # Initialize components
        self.kb = KnowledgeBase(name=domain)

        # Configure cleaners
        self._setup_cleaners()

    def _setup_cleaners(self):
        """Configure the data cleaners"""
        # The first three are already registered by KnowledgeBase;
        # running them twice is harmless but redundant
        self.kb.cleaner.add_cleaner(DataCleaner.remove_empty)
        self.kb.cleaner.add_cleaner(DataCleaner.remove_duplicates)
        self.kb.cleaner.add_cleaner(DataCleaner.normalize_whitespace)
        self.kb.cleaner.add_cleaner(
            lambda d: DataCleaner.filter_by_length(d, min_length=50)
        )

    def add_web_sources(self, urls: List[str]):
        """Add web page sources"""
        self.kb.add_source(WebScraper(urls))

    def add_file_sources(self, file_paths: List[str]):
        """Add file sources"""
        self.kb.add_source(FileImporter(file_paths))

    def add_api_source(
        self,
        api_url: str,
        api_key: str = None,
        params: Dict = None
    ):
        """Add an API source"""
        self.kb.add_source(APIConnector(api_url, api_key, params))

    def build(self) -> Dict:
        """Build the knowledge base"""
        print(f"=== Building the {self.domain} knowledge base ===")

        # 1. Data collection
        print("\n[1/5] Collecting data")
        raw_data = self.kb.collector.collect_all()
        print(f"    Collected {len(raw_data)} raw records")

        # 2. Data cleaning
        print("\n[2/5] Cleaning data")
        cleaned_data = self.kb.cleaner.clean(raw_data)
        print(f"    {len(cleaned_data)} records after cleaning")

        # 3. Convert into documents
        print("\n[3/5] Converting documents")
        transformer = DataTransformer()
        documents = transformer.to_documents(cleaned_data)
        print(f"    Produced {len(documents)} documents")

        # 4. Build the index
        print("\n[4/5] Building index")
        self.kb.index.add_documents(documents)
        print("    Index built")

        # 5. Quality assessment
        print("\n[5/5] Assessing quality")
        assessor = KnowledgeQualityAssessor()
        quality = assessor.assess(self.kb)
        print(f"    Completeness: {quality['completeness']:.2f}")
        print(f"    Accuracy:     {quality['accuracy']:.2f}")
        print(f"    Consistency:  {quality['consistency']:.2f}")
        print(f"    Coverage:     {quality['coverage']:.2f}")
        print(f"    Overall:      {quality['overall']:.2f}")

        # Persist the index (save lives on the index, not the KB)
        self.kb.index.save(self.kb.index_path)

        print(f"\n=== {self.domain} knowledge base build complete ===")

        return {
            "document_count": len(documents),
            "quality": quality
        }

    def search(self, query: str, top_k: int = 5) -> List[Dict]:
        """Search the knowledge base"""
        return self.kb.search(query, top_k=top_k)

# ============== Usage Example ==============

if __name__ == "__main__":
    import glob

    # Create the builder
    builder = DomainKnowledgeBaseBuilder(domain="python")

    # Register data sources
    builder.add_web_sources([
        "https://docs.python.org/3/tutorial",
        "https://docs.python.org/3/library"
    ])

    # FileImporter expects concrete paths, so expand the glob first
    builder.add_file_sources(glob.glob("./docs/python/*.md"))

    # Build the knowledge base
    result = builder.build()

    # Search (the query is Chinese: "Python list operations")
    query = "Python列表操作"
    results = builder.search(query, top_k=3)

    print(f"\nSearch: {query}")
    for i, result in enumerate(results, 1):
        doc = result["document"]
        print(f"\n[{i}] Relevance: {result['score']:.2f}")
        print(f"Source: {doc['metadata'].get('source', 'unknown')}")
        print(f"Content: {doc['content'][:200]}...")

Frequently Asked Interview Questions

Q1: How do you build a domain knowledge base?

Model answer:

Domain knowledge base construction pipeline:

1. Data collection
   - Identify data sources: documents, APIs, web pages, databases
   - Collect the raw data
   - Support incremental updates

2. Data processing
   - Cleaning: drop empties, deduplicate, normalize
   - Transformation: standardize, structure
   - Quality checks: length and format validation

3. Knowledge extraction
   - Entity recognition: extract key entities
   - Relation extraction: build entity relations
   - Attribute extraction: extract entity attributes

4. Index construction
   - Vector index: semantic retrieval
   - Keyword index: full-text retrieval
   - Hybrid index: combine both

5. Quality assessment
   - Completeness checks
   - Accuracy validation
   - Coverage evaluation

Implementation:
```python
def build_knowledge_base(sources):
    # Step 1: collect data
    collector = DataCollector()
    for source in sources:
        collector.add_source(source)
    raw_data = collector.collect_all()

    # Step 2: clean data
    cleaner = DataCleaner()
    cleaned = cleaner.clean(raw_data)

    # Step 3: build the index (documents need an id)
    index = VectorIndex()
    for i, doc in enumerate(cleaned):
        doc.setdefault("id", str(i))
    index.add_documents(cleaned)

    return index
```
Q2: How do you implement incremental updates for a knowledge base?

Model answer:

Incremental update strategy:

1. Change detection
   - Record the hash from the last collection run
   - Compare the current state with the previous one
   - Only process data that changed

2. Incremental indexing
   - Add new documents
   - Update existing documents
   - Remove stale documents

3. State management
   - Persist the sync state
   - Record document versions
   - Support rollback

Implementation:
```python
class IncrementalUpdater:
    def __init__(self, kb, state_file):
        self.kb = kb
        self.state_file = state_file
        self.state = self._load_state()

    def update(self, new_data):
        # Detect changes
        new_hash = self._compute_hash(new_data)
        last_hash = self.state.get("last_hash")

        if new_hash == last_hash:
            return "no changes"

        # Incremental update
        for doc in new_data:
            if doc["id"] in self.kb.documents:
                # Update
                self.kb.update_document(doc)
            else:
                # Add
                self.kb.add_document(doc)

        # Persist state
        self.state["last_hash"] = new_hash
        self._save_state()
```

Q3: How do you assess the quality of a knowledge base?

Model answer:

Quality assessment dimensions:

  1. Completeness

    • Are required fields present?
    • Is the document content complete?
    • Is the metadata complete?
  2. Accuracy

    • Is the content correct?
    • Is the format valid?
    • Are links still live?
  3. Consistency

    • Is the formatting uniform?
    • Is the naming consistent?
    • Is the categorization consistent?
  4. Coverage

    • Is the type distribution reasonable?
    • Are the important topics covered?
    • Are updates timely?
  5. Usability

    • Retrieval quality
    • Response latency
    • Error rate

Implementation:

def assess_quality(kb):
    metrics = {}

    # Completeness
    complete = sum(1 for d in kb.documents if d.get("content"))
    metrics["completeness"] = complete / len(kb.documents)

    # Accuracy
    valid_length = sum(1 for d in kb.documents
                      if len(d.get("content", "")) > 50)
    metrics["accuracy"] = valid_length / len(kb.documents)

    # Coverage
    types = set(d.get("type") for d in kb.documents)
    metrics["coverage"] = min(len(types) / 5, 1.0)

    # Overall score
    metrics["overall"] = sum(metrics.values()) / len(metrics)

    return metrics


---

## Summary

### Key Points for Knowledge Base Construction

| Area | Strategy |
|------|----------|
| **Data collection** | Multi-source collection, incremental updates |
| **Data cleaning** | Deduplication, normalization, filtering |
| **Knowledge extraction** | Entities, relations, attributes |
| **Index construction** | Vector, keyword, hybrid |
| **Quality management** | Completeness, accuracy, coverage |

### Best Practices

1. **Multi-source collection**: integrate heterogeneous data sources
2. **Incremental updates**: keep the knowledge base fresh
3. **Data cleaning**: enforce strict quality control
4. **Hybrid indexing**: combine vector and keyword retrieval
5. **Regular assessment**: continuously evaluate and improve quality