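[AI Agent Knowledge Base] 24 - Knowledge Graph Construction and Applications

Outline

Knowledge Graph Construction and Applications

Contents

  1. Knowledge Graph Overview
  2. Entity Recognition and Extraction
  3. Relation Extraction
  4. Knowledge Graph Storage
  5. Knowledge Reasoning
  6. Knowledge Graph Applications
  7. Knowledge Graph Quality Evaluation
  8. Implementation Examples

1. Knowledge Graph Overview

1.1 Definition

A knowledge graph is a system that represents knowledge as a graph: entities are the nodes, relations are the edges, and both can carry properties.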

┌─────────────────────────────────────────────────┐
│        Basic structure of a knowledge graph     │
├─────────────────────────────────────────────────┤
│                                                 │
│   (entity)       (relation)       (entity)      │
│    Guido   ─────── created ──────►  Python      │
│    NumPy   ─────── uses ─────────►  Python      │
│                                                 │
│   Entity + Relation + Property                  │
│                                                 │
└─────────────────────────────────────────────────┘
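
As a rough illustration of the structure above, a graph can be held as a list of (subject, relation, object) triples plus an index of outgoing edges. This is only a minimal sketch; the facts and the helper name objects_of are made up for the example.

```python
from collections import defaultdict

# Hypothetical toy facts; each triple is (subject, relation, object).
triples = [
    ("Guido", "created", "Python"),
    ("NumPy", "uses", "Python"),
]

# Index outgoing edges by subject so lookups avoid scanning every triple.
outgoing = defaultdict(list)
for subj, rel, obj in triples:
    outgoing[subj].append((rel, obj))

def objects_of(subject: str, relation: str) -> list:
    """Return all objects linked from `subject` by `relation`."""
    return [obj for rel, obj in outgoing.get(subject, []) if rel == relation]

print(objects_of("Guido", "created"))  # ['Python']
```

Properties would normally live in a separate mapping keyed by entity or relation ID, which is the approach the later sections take.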

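1.2 Types of Knowledge Graphs

Type                Characteristics               Typical use
Domain graph        Focused on a single domain    Vertical-domain knowledge
General graph       Broad coverage                General Q&A, search
Commonsense graph   Commonsense knowledge         Commonsense Q&A
Enterprise graph    Internal company knowledge    Enterprise knowledge management
Temporal graph      Includes time information     Event tracking, provenance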

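1.3 Value of Knowledge Graphs

┌─────────────────────────────────────────────────┐
│           Value of a knowledge graph            │
├─────────────────────────────────────────────────┤
│                                                 │
│  ✓ Structured knowledge storage                 │
│  ✓ Supports complex reasoning                   │
│  ✓ Provides explainability                      │
│  ✓ Supports knowledge fusion                    │
│  ✓ Improves retrieval                           │
│  ✓ Supports knowledge evolution                 │
│                                                 │
└─────────────────────────────────────────────────┘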

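2. Entity Recognition and Extraction

2.1 Entity Type Definitions

from typing import List, Dict, Set, Optional
from dataclasses import dataclass
from enum import Enum

class EntityType(Enum):
    """Entity types."""
    PERSON = "person"              # person
    ORGANIZATION = "organization"  # organization
    LOCATION = "location"          # place
    PRODUCT = "product"            # product
    CONCEPT = "concept"            # concept
    EVENT = "event"                # event
    DATE = "date"                  # date
    NUMBER = "number"              # number
    URL = "url"                    # web address
    EMAIL = "email"                # email address
    PHONE = "phone"                # phone number (used by the rule-based extractor)
    CUSTOM = "custom"              # user-defined

@dataclass
class Entity:
    """An entity mention."""
    id: str
    text: str                          # surface text of the entity
    type: EntityType                   # entity type
    start_pos: int                     # start offset in the source text
    end_pos: int                       # end offset (exclusive)
    properties: Optional[Dict] = None  # properties
    aliases: Optional[List[str]] = None  # aliases
    confidence: float = 1.0            # confidence score
    source: Optional[str] = None       # provenance

    def __post_init__(self):
        # Replace None with fresh containers so instances never share state
        if self.properties is None:
            self.properties = {}
        if self.aliases is None:
            self.aliases = []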

2.2 Rule-Based Entity Recognition

import re
from datetime import datetime

class RuleBasedEntityExtractor:
    """Rule-based entity extractor."""

    def __init__(self):
        # Regex rules for each entity type
        self.rules = {
            EntityType.EMAIL: [
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            ],
            EntityType.URL: [
                r'https?://[^\s<>"{}|\\^`\[\]]+',
                r'www\.[^\s<>"{}|\\^`\[\]]+',
                # Bare domains; broad, so it also matches the domain part of emails
                r'[A-Za-z0-9.-]+\.[A-Za-z]{2,}[^\s]*'
            ],
            EntityType.NUMBER: [
                r'\b\d+\.?\d*\b',
                r'\b\d{1,3}(,\d{3})*(\.\d+)?\b'
            ],
            EntityType.DATE: [
                r'\b\d{4}-\d{1,2}-\d{1,2}\b',  # YYYY-MM-DD
                r'\b\d{1,2}/\d{1,2}/\d{4}\b',  # MM/DD/YYYY
                # Chinese dates such as 2024年1月15日. \b is omitted because CJK
                # characters are word characters, so adjacent text would block the match
                r'(?<!\d)\d{4}年\d{1,2}月\d{1,2}日'
            ],
            EntityType.PHONE: [
                r'\b1[3-9]\d{9}\b',        # mainland China mobile numbers
                r'\b\d{3}-\d{4}-\d{4}\b'
            ]
        }

    def extract(self, text: str) -> List[Entity]:
        """
        Extract entities.

        Args:
            text: input text

        Returns:
            List of entities
        """
        entities = []

        for entity_type, patterns in self.rules.items():
            for pattern in patterns:
                for match in re.finditer(pattern, text):
                    entity = Entity(
                        id=self._generate_id(),
                        text=match.group(),
                        type=entity_type,
                        start_pos=match.start(),
                        end_pos=match.end(),
                        properties=self._extract_properties(match.group(), entity_type)
                    )
                    entities.append(entity)

        # Deduplicate
        entities = self._deduplicate(entities)

        return entities

    def _generate_id(self) -> str:
        """Generate an entity ID."""
        import uuid
        return str(uuid.uuid4())

    def _extract_properties(self, text: str, entity_type: EntityType) -> Dict:
        """Extract properties from the matched text."""
        properties = {}

        if entity_type == EntityType.EMAIL:
            # Email domain
            if '@' in text:
                properties['domain'] = text.split('@')[1]

        elif entity_type == EntityType.URL:
            # URL domain; urlparse only fills netloc when a scheme or a
            # leading "//" is present, so scheme-less URLs get "//" prepended
            from urllib.parse import urlparse
            try:
                parsed = urlparse(text if '://' in text else '//' + text)
                properties['domain'] = parsed.netloc
            except ValueError:
                pass

        elif entity_type == EntityType.DATE:
            # Try to parse the date (python-dateutil is a third-party package)
            try:
                from dateutil.parser import parse
                parsed_date = parse(text)
                properties['date'] = parsed_date.isoformat()
            except (ImportError, ValueError, OverflowError):
                pass

        return properties

    def _deduplicate(self, entities: List[Entity]) -> List[Entity]:
        """Deduplicate by text and position; the first match for a span wins."""
        seen = set()
        unique = []

        for entity in entities:
            key = (entity.text, entity.start_pos, entity.end_pos)
            if key not in seen:
                seen.add(key)
                unique.append(entity)

        return unique
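
Deduplicating on exact text and position does not remove nested matches, for example a number found inside a date. One possible refinement, sketched here under the assumption that the longest match should win, is to drop any span that overlaps a longer one. The RULES table and function names are illustrative only.

```python
import re
from typing import List, Tuple

# Hypothetical mini rule set in the same style as the extractor's rules.
RULES = {
    "date": r"\d{4}-\d{1,2}-\d{1,2}",
    "number": r"\d+",
}

Span = Tuple[int, int, str, str]  # (start, end, label, text)

def find_spans(text: str) -> List[Span]:
    """Collect every regex match as a labeled span."""
    spans = []
    for label, pattern in RULES.items():
        for m in re.finditer(pattern, text):
            spans.append((m.start(), m.end(), label, m.group()))
    return spans

def drop_nested(spans: List[Span]) -> List[Span]:
    """Keep the longest span wherever matches overlap."""
    kept: List[Span] = []
    # Visit longest spans first so shorter overlapping matches are rejected.
    for span in sorted(spans, key=lambda s: (-(s[1] - s[0]), s[0])):
        if all(span[1] <= k[0] or span[0] >= k[1] for k in kept):
            kept.append(span)
    return sorted(kept)

text = "Released on 2024-01-15 with 42 fixes"
result = drop_nested(find_spans(text))
print([(label, value) for _, _, label, value in result])
# [('date', '2024-01-15'), ('number', '42')]
```

Preferring the longest span is a heuristic; a priority order over entity types would be an equally reasonable policy.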

2.3 Model-Based Entity Recognition

class ModelBasedEntityExtractor:
    """Model-based entity extractor."""

    def __init__(self, model_name: str = None, use_llm: bool = False):
        self.use_llm = use_llm

        if use_llm:
            # Extract with an LLM. Uses the openai>=1.0 client, which reads
            # OPENAI_API_KEY from the environment; openai.ChatCompletion was
            # removed in that release
            from openai import OpenAI
            self.llm = OpenAI().chat.completions
        else:
            # Extract with a token-classification (NER) model
            try:
                from transformers import pipeline
            except ImportError:
                raise ImportError("transformers not installed")

            model_name = model_name or "dbmdz/bert-large-cased-finetuned-conll03-english"
            self.ner_pipeline = pipeline(
                "ner",
                model=model_name,
                tokenizer=model_name,
                aggregation_strategy="simple"
            )

    def extract(self, text: str) -> List[Entity]:
        """Extract entities."""
        if self.use_llm:
            return self._extract_with_llm(text)
        else:
            return self._extract_with_ner(text)

    def _extract_with_llm(self, text: str) -> List[Entity]:
        """Extract entities with an LLM."""
        prompt = f"""Extract the entities from the following text.

Text:
{text}

Identify entities of these types:
- person
- organization
- location
- product
- concept
- date
- url
- email

Output JSON in this format:
{{
    "entities": [
        {{
            "text": "entity text",
            "type": "entity type",
            "start_pos": start offset,
            "end_pos": end offset
        }}
    ]
}}"""

        # JSON mode (response_format) is only accepted by models that support
        # it; change the model name if the chosen model rejects the parameter
        response = self.llm.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        import json
        data = json.loads(response.choices[0].message.content)
        entities = []

        for entity_data in data.get("entities", []):
            try:
                entity_type = EntityType(entity_data.get("type"))
            except ValueError:
                # Unknown or missing type
                entity_type = EntityType.CUSTOM

            # Offsets reported by an LLM are approximate; -1 marks a missing one
            entity = Entity(
                id=self._generate_id(),
                text=entity_data.get("text", ""),
                type=entity_type,
                start_pos=entity_data.get("start_pos", -1),
                end_pos=entity_data.get("end_pos", -1)
            )
            entities.append(entity)

        return entities

    def _extract_with_ner(self, text: str) -> List[Entity]:
        """Extract entities with the NER model."""
        results = self.ner_pipeline(text)
        entities = []

        # Map model labels to entity types
        type_mapping = {
            'PER': EntityType.PERSON,
            'ORG': EntityType.ORGANIZATION,
            'LOC': EntityType.LOCATION,
            'MISC': EntityType.CONCEPT,
            'DATE': EntityType.DATE,
            'NUMBER': EntityType.NUMBER
        }

        for result in results:
            if result['entity_group'] == 'O':
                continue

            entity_type = type_mapping.get(
                result['entity_group'],
                EntityType.CUSTOM
            )

            entity = Entity(
                id=self._generate_id(),
                text=result['word'],
                type=entity_type,
                start_pos=result['start'],
                end_pos=result['end'],
                confidence=result['score']
            )
            entities.append(entity)

        return entities

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())
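
Character offsets returned by an LLM are often wrong, so it can help to recompute them from the source text and to discard entities that do not literally occur in it. The following is a minimal sketch of that idea; realign is a hypothetical helper, and the reply string stands in for a model response.

```python
import json
from typing import Dict, List

def realign(text: str, raw_json: str) -> List[Dict]:
    """Parse an LLM JSON reply and recompute offsets from the source text."""
    entities = []
    cursor = 0
    for item in json.loads(raw_json).get("entities", []):
        surface = item.get("text", "")
        if not surface:
            continue
        # Search from the previous match first so repeated mentions map to
        # successive occurrences, then fall back to a search from the start.
        start = text.find(surface, cursor)
        if start == -1:
            start = text.find(surface)
        if start == -1:
            continue  # not literally present in the text: drop it
        end = start + len(surface)
        entities.append({
            "text": surface,
            "type": item.get("type", "custom"),
            "start_pos": start,
            "end_pos": end,
        })
        cursor = end
    return entities

# Stand-in for a model reply: one wrong offset and one entity not in the text.
reply = json.dumps({"entities": [
    {"text": "Guido", "type": "person", "start_pos": 3, "end_pos": 9},
    {"text": "Atlantis", "type": "location", "start_pos": 0, "end_pos": 8},
]})
fixed = realign("Python was created by Guido.", reply)
print(fixed)
```

Exact substring search is deliberately strict; a fuzzy match would recover more entities at the cost of occasional wrong alignments.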

2.4 Entity Linking

class EntityLinker:
    """Entity linker."""

    def __init__(self, knowledge_base=None):
        self.knowledge_base = knowledge_base

    def link_entities(
        self,
        entities: List[Entity],
        candidates: List[Entity] = None
    ) -> List[Entity]:
        """
        Link entities to the knowledge base.

        Args:
            entities: entities to link
            candidates: candidate entities (from the knowledge base)

        Returns:
            The entities, with link fields set where a match was found
        """
        if candidates is None and self.knowledge_base:
            # Assumes the knowledge base exposes search_all_entities()
            candidates = self.knowledge_base.search_all_entities()
        candidates = candidates or []

        for entity in entities:
            # Find the most similar candidates
            matches = self._find_matches(entity, candidates)

            if matches:
                # Link to the best match; each match is a dict with
                # "candidate" and "score" keys
                best_match = matches[0]
                entity.linked_id = best_match["candidate"].id
                entity.linked_text = best_match["candidate"].text
                entity.link_confidence = best_match["score"]

        return entities

    def _find_matches(
        self,
        entity: Entity,
        candidates: List[Entity],
        top_k: int = 3
    ) -> List[Dict]:
        """Find candidate entities that match."""
        matches = []

        for candidate in candidates:
            # Text similarity
            text_sim = self._text_similarity(entity.text, candidate.text)

            # Type match
            type_match = entity.type == candidate.type

            if type_match and text_sim > 0.8:
                matches.append({
                    "candidate": candidate,
                    "score": text_sim
                })

        # Sort by score, best first
        matches.sort(key=lambda x: x['score'], reverse=True)

        return matches[:top_k]

    def _text_similarity(self, text1: str, text2: str) -> float:
        """Compute text similarity."""
        # Simplified implementation: Jaccard similarity over whitespace-separated
        # tokens (text without spaces is treated as a single token)
        set1 = set(text1.lower().split())
        set2 = set(text2.lower().split())

        if not set1 or not set2:
            return 0.0

        intersection = len(set1 & set2)
        union = len(set1 | set2)

        return intersection / union
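
Word-level Jaccard similarity treats "OpenAI" and "Open AI" as completely different, and it cannot compare text that has no spaces, such as Chinese. One alternative, shown here only as a sketch, is Jaccard similarity over character n-grams; the function names and the choice of n = 2 are assumptions for the example.

```python
def char_ngrams(text: str, n: int = 2) -> set:
    """Return the character n-grams of `text`, lowercased with whitespace removed."""
    s = "".join(text.lower().split())
    if len(s) < n:
        return {s} if s else set()
    return {s[i:i + n] for i in range(len(s) - n + 1)}

def ngram_jaccard(a: str, b: str, n: int = 2) -> float:
    """Jaccard similarity between the character n-gram sets of two strings."""
    grams_a, grams_b = char_ngrams(a, n), char_ngrams(b, n)
    if not grams_a or not grams_b:
        return 0.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)

print(ngram_jaccard("OpenAI", "Open AI"))       # 1.0
print(ngram_jaccard("北京大学", "北京大学出版社"))  # 0.5
```

With the 0.8 threshold used by the linker, the second pair would still be rejected, which is the desired outcome because the two names refer to different organizations.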

3. Relation Extraction

3.1 Relation Definitions

from enum import Enum

class RelationType(Enum):
    """Relation types."""
    # General relations
    RELATED_TO = "related_to"
    PART_OF = "part_of"
    INSTANCE_OF = "instance_of"
    SIMILAR_TO = "similar_to"

    # Person relations
    WORKS_AT = "works_at"
    FOUNDED = "founded"
    COLLABORATES_WITH = "collaborates_with"
    STUDENT_OF = "student_of"

    # Organization relations
    SUBSIDIARY_OF = "subsidiary_of"
    PARTNERS_WITH = "partners_with"
    COMPETITOR_OF = "competitor_of"

    # Product relations
    VERSION_OF = "version_of"
    DEPENDS_ON = "depends_on"
    COMPATIBLE_WITH = "compatible_with"

    # Temporal relations
    HAPPENED_BEFORE = "happened_before"
    HAPPENED_AFTER = "happened_after"

    # User-defined
    CUSTOM = "custom"

@dataclass
class Relation:
    """A relation between two entities."""
    id: str
    subject_id: str                    # ID of the subject entity
    object_id: str                     # ID of the object entity
    relation_type: RelationType        # relation type
    properties: Optional[Dict] = None  # relation properties
    confidence: float = 1.0            # confidence score
    source: Optional[str] = None       # provenance

    def __post_init__(self):
        if self.properties is None:
            self.properties = {}

3.2 Rule-Based Relation Extraction

class RuleBasedRelationExtractor:
    """Rule-based relation extractor."""

    def __init__(self):
        # Relation templates. {subj} and {obj} are replaced with the escaped
        # text of a candidate entity pair before matching
        self.patterns = [
            # Person-organization relations
            {
                "type": RelationType.WORKS_AT,
                "patterns": [
                    r'{subj}\s+(?:is|are|was|were)\s+(?:a|an|the|at)\s+{obj}',
                    r'{subj}\s+(?:works|worked)\s+(?:at|for)\s+{obj}'
                ],
                "subject_type": EntityType.PERSON,
                "object_type": EntityType.ORGANIZATION
            },
            # Founding relations
            {
                "type": RelationType.FOUNDED,
                "patterns": [
                    r'{subj}\s+founded\s+{obj}',
                    # Passive voice: the organization comes first in the text
                    r'{obj}\s+was\s+founded\s+by\s+{subj}'
                ],
                "subject_type": EntityType.PERSON,
                "object_type": EntityType.ORGANIZATION
            },
            # Dependency relations
            {
                "type": RelationType.DEPENDS_ON,
                "patterns": [
                    r'{subj}\s+(?:uses|used|depends\s+on)\s+{obj}'
                ],
                "subject_type": EntityType.PRODUCT,
                "object_type": EntityType.PRODUCT
            },
            # Version relations
            {
                "type": RelationType.VERSION_OF,
                "patterns": [
                    r'{subj}\s+is\s+(?:a|an)\s+version\s+of\s+{obj}',
                    r'{subj}\s+v\d+(?:\.\d+)*\s+-\s+{obj}'
                ],
                "subject_type": EntityType.PRODUCT,
                "object_type": EntityType.PRODUCT
            }
        ]

    def extract(
        self,
        text: str,
        entities: List[Entity]
    ) -> List[Relation]:
        """
        Extract relations.

        Args:
            text: input text
            entities: entities already recognized in the text

        Returns:
            List of relations
        """
        relations = []

        # Group entities by type
        entities_by_type = {}
        for entity in entities:
            entities_by_type.setdefault(entity.type, []).append(entity)

        for relation_config in self.patterns:
            subjects = entities_by_type.get(relation_config["subject_type"], [])
            objects = entities_by_type.get(relation_config["object_type"], [])

            # Try every subject/object pair against every template
            for subj in subjects:
                for obj in objects:
                    if subj.id == obj.id:
                        continue
                    for pattern in relation_config["patterns"]:
                        relation = self._match_pattern(
                            text, pattern, subj, obj, relation_config["type"]
                        )
                        if relation:
                            relations.append(relation)

        # Deduplicate
        return self._deduplicate(relations)

    def _match_pattern(
        self,
        text: str,
        pattern: str,
        subj: Entity,
        obj: Entity,
        relation_type: RelationType
    ) -> Optional[Relation]:
        """Match one template for one entity pair."""
        regex_pattern = (
            pattern
            .replace('{subj}', re.escape(subj.text))
            .replace('{obj}', re.escape(obj.text))
        )
        match = re.search(regex_pattern, text, re.IGNORECASE)
        if not match:
            return None

        return Relation(
            id=self._generate_id(),
            subject_id=subj.id,
            object_id=obj.id,
            relation_type=relation_type,
            properties={
                "subject_text": subj.text,
                "object_text": obj.text,
                "pattern": pattern,
                "evidence": match.group()
            },
            confidence=0.8
        )

    def _deduplicate(self, relations: List[Relation]) -> List[Relation]:
        """Deduplicate by (subject, object, relation type)."""
        seen = set()
        unique = []

        for relation in relations:
            key = (
                relation.subject_id,
                relation.object_id,
                relation.relation_type
            )
            if key not in seen:
                seen.add(key)
                unique.append(relation)

        return unique

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())
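
To tie a pattern match back to entities that are already known, one option is to substitute each candidate pair into a template and search for the result. The sketch below shows that idea in isolation; the TEMPLATES table, the placeholder names and match_pairs are assumptions made for the example.

```python
import re
from typing import List, Tuple

# Hypothetical templates; {subj} and {obj} are filled in per entity pair.
TEMPLATES = {
    "works_at": r"{subj}\s+(?:works|worked)\s+(?:at|for)\s+{obj}",
    "founded": r"{subj}\s+founded\s+{obj}",
}

def match_pairs(text: str, people: List[str], orgs: List[str]) -> List[Tuple[str, str, str]]:
    """Return (subject, relation, object) triples that the text supports."""
    found = []
    for relation, template in TEMPLATES.items():
        for person in people:
            for org in orgs:
                # Escape entity text so characters like "." are matched literally.
                regex = (template
                         .replace("{subj}", re.escape(person))
                         .replace("{obj}", re.escape(org)))
                if re.search(regex, text, re.IGNORECASE):
                    found.append((person, relation, org))
    return found

text = "Alice works at Acme. Bob founded Acme."
pairs = match_pairs(text, ["Alice", "Bob"], ["Acme"])
print(pairs)  # [('Alice', 'works_at', 'Acme'), ('Bob', 'founded', 'Acme')]
```

The cost grows with the number of entity pairs, so for long documents it is usually worth restricting pairs to entities that appear in the same sentence.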

3.3 Model-Based Relation Extraction

class ModelBasedRelationExtractor:
    """Model-based relation extractor."""

    def __init__(self, llm=None):
        # llm must expose create(model=..., messages=..., response_format=...),
        # for example the object stored as self.llm in section 2.3
        self.llm = llm

    def extract(
        self,
        text: str,
        entities: List[Entity]
    ) -> List[Relation]:
        """
        Extract relations with an LLM.

        Args:
            text: input text
            entities: entities already recognized in the text

        Returns:
            List of relations
        """
        if not entities:
            return []

        # Build the numbered entity list for the prompt
        entity_list = "\n".join([
            f"{i+1}. {e.text} ({e.type.value})"
            for i, e in enumerate(entities)
        ])

        prompt = f"""Extract the relations between entities in the following text.

Text:
{text}

Entities:
{entity_list}

Identify the relations between these entities and give each relation's type.

Relation types:
- works_at: is employed at
- founded: founded
- collaborates_with: collaborates with
- part_of: is part of
- depends_on: depends on
- version_of: is a version of
- related_to: is related to

Output JSON in this format:
{{
    "relations": [
        {{
            "subject": "subject entity text",
            "object": "object entity text",
            "relation_type": "relation type",
            "confidence": 0.95
        }}
    ]
}}"""

        # JSON mode (response_format) is only accepted by models that support
        # it; change the model name if the chosen model rejects the parameter
        response = self.llm.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        import json
        data = json.loads(response.choices[0].message.content)
        relations = []

        # Map entity text to entity ID
        entity_map = {e.text: e.id for e in entities}

        for rel_data in data.get("relations", []):
            try:
                relation_type = RelationType(rel_data.get("relation_type"))
            except ValueError:
                relation_type = RelationType.CUSTOM

            subject_id = entity_map.get(rel_data.get("subject"))
            object_id = entity_map.get(rel_data.get("object"))

            # Keep only relations whose endpoints are both known entities
            if subject_id and object_id:
                relation = Relation(
                    id=self._generate_id(),
                    subject_id=subject_id,
                    object_id=object_id,
                    relation_type=relation_type,
                    confidence=rel_data.get("confidence", 1.0)
                )
                relations.append(relation)

        return relations

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())

4. Knowledge Graph Storage

4.1 In-Memory Storage

from typing import Dict, List, Optional, Set, Tuple

class InMemoryKnowledgeGraph:
    """In-memory knowledge graph."""

    def __init__(self):
        self.entities: Dict[str, Entity] = {}
        self.relations: Dict[str, Relation] = {}
        self.adjacency_list: Dict[str, Set[str]] = {}  # subject -> objects

    def add_entity(self, entity: Entity) -> bool:
        """Add an entity; return False if its ID already exists."""
        if entity.id in self.entities:
            return False

        self.entities[entity.id] = entity
        self.adjacency_list[entity.id] = set()
        return True

    def add_relation(self, relation: Relation) -> bool:
        """Add a relation; both endpoints must already be in the graph."""
        if relation.id in self.relations:
            return False

        if relation.subject_id not in self.entities or \
           relation.object_id not in self.entities:
            return False

        self.relations[relation.id] = relation
        self.adjacency_list[relation.subject_id].add(relation.object_id)
        return True

    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the entity with the given ID, or None."""
        return self.entities.get(entity_id)

    def get_entities(
        self,
        entity_type: EntityType = None
    ) -> List[Entity]:
        """Return all entities, optionally filtered by type."""
        entities = list(self.entities.values())

        if entity_type:
            entities = [e for e in entities if e.type == entity_type]

        return entities

    def get_relations(
        self,
        subject_id: str = None,
        object_id: str = None,
        relation_type: RelationType = None
    ) -> List[Relation]:
        """Return the relations that match every filter given."""
        relations = list(self.relations.values())

        if subject_id:
            relations = [r for r in relations if r.subject_id == subject_id]

        if object_id:
            relations = [r for r in relations if r.object_id == object_id]

        if relation_type:
            relations = [r for r in relations if r.relation_type == relation_type]

        return relations

    def get_neighbors(self, entity_id: str) -> List[Entity]:
        """Return the entities reachable over one outgoing edge."""
        if entity_id not in self.adjacency_list:
            return []

        neighbor_ids = self.adjacency_list[entity_id]
        return [self.entities[eid] for eid in neighbor_ids if eid in self.entities]

    def find_path(
        self,
        start_id: str,
        end_id: str,
        max_depth: int = 5
    ) -> List[str]:
        """Breadth-first search for a shortest directed path of at most max_depth nodes."""
        from collections import deque

        if start_id not in self.entities or end_id not in self.entities:
            return []

        queue = deque([(start_id, [start_id])])
        visited = {start_id}

        while queue:
            current_id, path = queue.popleft()

            if current_id == end_id:
                return path

            if len(path) >= max_depth:
                continue

            for neighbor_id in self.adjacency_list.get(current_id, []):
                if neighbor_id not in visited:
                    visited.add(neighbor_id)
                    queue.append((neighbor_id, path + [neighbor_id]))

        return []

    def to_json(self) -> Dict:
        """Export the graph as a JSON-serializable dict."""
        return {
            "entities": [
                {
                    "id": e.id,
                    "text": e.text,
                    "type": e.type.value,
                    "properties": e.properties
                }
                for e in self.entities.values()
            ],
            "relations": [
                {
                    "id": r.id,
                    "subject_id": r.subject_id,
                    "object_id": r.object_id,
                    "type": r.relation_type.value,
                    "properties": r.properties
                }
                for r in self.relations.values()
            ]
        }
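
The path search above is a breadth-first search over the subject-to-object adjacency map. The same idea can be tried without the surrounding classes; this is a minimal sketch with a made-up toy graph.

```python
from collections import deque
from typing import Dict, List, Set

def shortest_path(adj: Dict[str, Set[str]], start: str, goal: str,
                  max_depth: int = 5) -> List[str]:
    """Breadth-first search over a directed adjacency map.

    Returns the nodes of a shortest path, or [] if no path with at most
    `max_depth` nodes exists.
    """
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == goal:
            return path
        if len(path) >= max_depth:
            continue  # do not expand beyond the depth limit
        for nxt in sorted(adj.get(node, ())):
            if nxt not in visited:
                visited.add(nxt)
                queue.append(path + [nxt])
    return []

# Hypothetical toy graph: edges point from subject to object.
adj = {"NumPy": {"Python"}, "Python": {"Guido"}, "Guido": set()}
print(shortest_path(adj, "NumPy", "Guido"))  # ['NumPy', 'Python', 'Guido']
print(shortest_path(adj, "Guido", "NumPy"))  # [] because edges are directed
```

Because only outgoing edges are indexed, a path exists only in the direction the relations were stated; an undirected search would need a reverse index as well.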

4.2 Neo4j Storage

import json

class Neo4jKnowledgeGraph:
    """Knowledge graph backed by Neo4j."""

    def __init__(
        self,
        uri: str = "bolt://localhost:7687",
        username: str = "neo4j",
        password: str = None
    ):
        from neo4j import GraphDatabase
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def add_entity(self, entity: Entity) -> bool:
        """Add an entity (node)."""
        try:
            with self.driver.session() as session:
                # Neo4j property values cannot be maps, so the properties
                # dict is stored as a JSON string
                cypher = """
                MERGE (e:Entity {id: $id})
                SET e.text = $text,
                    e.type = $type,
                    e.properties = $properties
                RETURN e
                """
                result = session.run(
                    cypher,
                    id=entity.id,
                    text=entity.text,
                    type=entity.type.value,
                    properties=json.dumps(entity.properties, ensure_ascii=False)
                )
                return result.single() is not None
        except Exception as e:
            print(f"Failed to add entity: {e}")
            return False

    def add_relation(self, relation: Relation) -> bool:
        """Add a relation (edge)."""
        try:
            with self.driver.session() as session:
                # MERGE on the ID only, then SET the other fields, so that
                # re-adding a relation updates it instead of duplicating it
                cypher = """
                MATCH (s:Entity {id: $subject_id})
                MATCH (o:Entity {id: $object_id})
                MERGE (s)-[r:RELATION {id: $id}]->(o)
                SET r.type = $type,
                    r.properties = $properties
                RETURN r
                """
                result = session.run(
                    cypher,
                    subject_id=relation.subject_id,
                    object_id=relation.object_id,
                    id=relation.id,
                    type=relation.relation_type.value,
                    properties=json.dumps(relation.properties, ensure_ascii=False)
                )
                return result.single() is not None
        except Exception as e:
            print(f"Failed to add relation: {e}")
            return False

    def get_entity(self, entity_id: str) -> Optional[Dict]:
        """Return the entity node as a dict, or None."""
        try:
            with self.driver.session() as session:
                cypher = """
                MATCH (e:Entity {id: $id})
                RETURN e
                """
                result = session.run(cypher, id=entity_id)
                record = result.single()
                return dict(record["e"]) if record else None
        except Exception as e:
            print(f"Failed to get entity: {e}")
            return None

    def get_relations(
        self,
        subject_id: str = None,
        relation_type: str = None
    ) -> List[Dict]:
        """Return relations, optionally filtered by subject and type."""
        try:
            with self.driver.session() as session:
                cypher = """
                MATCH (s:Entity)-[r:RELATION]->(o:Entity)
                """

                conditions = []
                params = {}

                if subject_id:
                    conditions.append("s.id = $subject_id")
                    params["subject_id"] = subject_id

                if relation_type:
                    conditions.append("r.type = $type")
                    params["type"] = relation_type

                if conditions:
                    cypher += " WHERE " + " AND ".join(conditions)

                cypher += """
                RETURN s, r, o
                """

                result = session.run(cypher, **params)
                return [dict(record) for record in result]
        except Exception as e:
            print(f"Failed to get relations: {e}")
            return []

    def query(self, cypher: str, **params) -> List[Dict]:
        """Run a Cypher query and return the records as dicts."""
        try:
            with self.driver.session() as session:
                result = session.run(cypher, **params)
                return [dict(record) for record in result]
        except Exception as e:
            print(f"Query failed: {e}")
            return []

    def close(self):
        """Close the driver connection."""
        self.driver.close()
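
Neo4j stores only primitive values and homogeneous arrays of them as properties, so a free-form properties dict has to be converted before it is written. One possible conversion, sketched here with a hypothetical helper and an assumed "prop_" key prefix, keeps simple values queryable and serializes everything else to JSON.

```python
import json
from typing import Any, Dict

PRIMITIVES = (str, int, float, bool)

def to_neo4j_props(properties: Dict[str, Any], prefix: str = "prop_") -> Dict[str, Any]:
    """Convert an arbitrary dict into values that can be stored on a node."""
    flat: Dict[str, Any] = {}
    for key, value in properties.items():
        if isinstance(value, PRIMITIVES):
            flat[prefix + key] = value
        elif (isinstance(value, list) and value
              and isinstance(value[0], PRIMITIVES)
              and all(type(v) is type(value[0]) for v in value)):
            flat[prefix + key] = value  # homogeneous arrays are allowed
        else:
            # Nested dicts, mixed lists, None, etc. become JSON strings.
            flat[prefix + key] = json.dumps(value, ensure_ascii=False)
    return flat

props = to_neo4j_props({
    "domain": "example.com",
    "tags": ["a", "b"],
    "meta": {"k": 1},
})
print(props)
```

The resulting dict could then be passed as a single query parameter and applied with a Cypher clause such as SET e += $props; the prefix is there to avoid clashing with fields like id or text.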

5. Knowledge Reasoning

5.1 Rule-Based Reasoning

class RuleBasedReasoner:
    """Rule-based reasoner."""

    def __init__(self):
        # Inference rules
        self.rules = [
            # Transitivity rule
            {
                "name": "transitivity",
                "description": "If A related_to B and B related_to C, then A related_to C",
                "pattern": {
                    "relation1": RelationType.RELATED_TO,
                    "relation2": RelationType.RELATED_TO,
                    "inferred": RelationType.RELATED_TO
                },
                "confidence": 0.6
            },
            # Hierarchy rule
            {
                "name": "hierarchy",
                "description": "If A part_of B and B part_of C, then A part_of C",
                "pattern": {
                    "relation1": RelationType.PART_OF,
                    "relation2": RelationType.PART_OF,
                    "inferred": RelationType.PART_OF
                },
                "confidence": 0.8
            }
        ]

    def infer(
        self,
        kg: InMemoryKnowledgeGraph
    ) -> List[Relation]:
        """
        Run one pass of inference.

        Args:
            kg: knowledge graph

        Returns:
            Newly inferred relations (not added to the graph)
        """
        inferred_relations = []
        seen = set()  # (subject_id, object_id, relation_type) already inferred

        for rule in self.rules:
            pattern = rule["pattern"]

            # Find relation pairs that match the rule
            relations = kg.get_relations(
                relation_type=pattern["relation1"]
            )

            for rel1 in relations:
                # Look for a second relation that starts where the first ends
                rel2_candidates = kg.get_relations(
                    subject_id=rel1.object_id,
                    relation_type=pattern["relation2"]
                )

                for rel2 in rel2_candidates:
                    # Skip self-loops such as A -> B -> A
                    if rel1.subject_id == rel2.object_id:
                        continue

                    # Skip conclusions already reached through another path
                    key = (rel1.subject_id, rel2.object_id, pattern["inferred"])
                    if key in seen:
                        continue

                    # Check whether the inferred relation already exists
                    existing = kg.get_relations(
                        subject_id=rel1.subject_id,
                        object_id=rel2.object_id,
                        relation_type=pattern["inferred"]
                    )

                    if not existing:
                        seen.add(key)
                        # Create the inferred relation
                        inferred_relation = Relation(
                            id=self._generate_id(),
                            subject_id=rel1.subject_id,
                            object_id=rel2.object_id,
                            relation_type=pattern["inferred"],
                            properties={
                                "inferred": True,
                                "rule": rule["name"],
                                "source_relations": [rel1.id, rel2.id]
                            },
                            confidence=rule["confidence"]
                        )
                        inferred_relations.append(inferred_relation)

        return inferred_relations

    def _generate_id(self) -> str:
        import uuid
        return str(uuid.uuid4())
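
A single pass of a two-hop rule only reaches conclusions that are two edges apart. To cover longer chains, the rule can be reapplied until nothing new appears. The sketch below shows that fixpoint loop on plain (subject, object) pairs; the part_of data is invented for the example.

```python
from typing import Set, Tuple

Edge = Tuple[str, str]

def transitive_closure(edges: Set[Edge]) -> Set[Edge]:
    """Repeat the two-hop rule until no new edge appears."""
    closure = set(edges)
    while True:
        new_edges = {
            (a, d)
            for a, b in closure
            for c, d in closure
            if b == c and a != d and (a, d) not in closure
        }
        if not new_edges:
            return closure
        closure |= new_edges

# Hypothetical part_of facts forming a chain of length three.
part_of = {("wheel", "car"), ("car", "fleet"), ("fleet", "company")}
inferred = transitive_closure(part_of) - part_of
print(sorted(inferred))
# [('car', 'company'), ('wheel', 'company'), ('wheel', 'fleet')]
```

This version compares every pair of edges on each pass, which is fine for small graphs; a per-subject index would be the natural next step for larger ones. Confidence would also normally decay with each additional hop.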

5.2 Path-Based Reasoning

class PathBasedReasoner:
    """Path-based reasoner."""

    def __init__(self):
        pass

    def infer_by_path(
        self,
        kg: InMemoryKnowledgeGraph,
        start_id: str,
        path_pattern: List[RelationType],
        max_depth: int = 5
    ) -> List[Tuple[str, List[Relation]]]:
        """
        Follow a sequence of relation types from a start entity.

        Args:
            kg: knowledge graph
            start_id: ID of the start entity
            path_pattern: relation types to follow, in order
                [RELATION_TYPE1, RELATION_TYPE2, ...]
            max_depth: maximum depth

        Returns:
            [(end_id, path_relations), ...]
        """
        if not path_pattern:
            return []

        results = []

        # Search for paths recursively
        def dfs(current_id, current_depth, current_path):
            if current_depth >= len(path_pattern):
                # The whole pattern has been matched
                results.append((current_id, current_path))
                return

            if current_depth >= max_depth:
                return

            # Look up relations for the next step of the pattern
            target_relation = path_pattern[current_depth]
            relations = kg.get_relations(
                subject_id=current_id,
                relation_type=target_relation
            )

            for relation in relations:
                dfs(
                    relation.object_id,
                    current_depth + 1,
                    current_path + [relation]
                )

        dfs(start_id, 0, [])
        return results

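    def find_common_type(
        self,
        kg: InMemoryKnowledgeGraph,
        entity_ids: List[str]
    ) -> Optional[str]:
        """
        Find a type shared by all the given entities.

        Args:
            kg: knowledge graph
            entity_ids: list of entity IDs

        Returns:
            ID of the shared type entity, or None if there is none
        """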
        if len(entity_ids) < 2:
            return None

        # Look up the first entity
        first_entity = kg.get_entity(entity_ids[0])
        if not first_entity:
            return None

        # Collect its instance_of relations
        type_relations = kg.get_relations(
            subject_id=first_entity.id,
            relation_type=RelationType.INSTANCE_OF
        )

        candidate_type_ids = [r.object_id for r in type_relations]

        # Check whether every other entity has the same type
        for type_id in candidate_type_ids:
            all_have_type = True

            for entity_id in entity_ids[1:]:
                relations = kg.get_relations(
                    subject_id=entity_id,
                    object_id=type_id,
                    relation_type=RelationType.INSTANCE_OF
                )

                if not relations:
                    all_have_type = False
                    break

            if all_have_type:
                return type_id

        return None
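
The pattern-constrained search above can be tried out without the full `InMemoryKnowledgeGraph` class. The following is a minimal sketch, assuming a toy index built from hypothetical `(subject, relation, object)` triples; `build_index`, `find_by_pattern`, and the relation names are illustrative and are not part of the article's API.

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical toy triples (subject, relation, object), for illustration only
TRIPLES = [
    ("NumPy", "written_in", "Python"),
    ("Pandas", "written_in", "Python"),
    ("Python", "created_by", "Guido"),
    ("Guido", "worked_at", "Google"),
    ("Guido", "worked_at", "Dropbox"),
]

def build_index(triples):
    """Index outgoing edges as subject -> relation -> [objects]."""
    index: Dict[str, Dict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
    for subject, relation, obj in triples:
        index[subject][relation].append(obj)
    return index

def find_by_pattern(index, start, pattern):
    """Return (end_node, path) for each path whose relation types match pattern in order."""
    results = []

    def dfs(node, depth, path):
        if depth == len(pattern):
            # The whole pattern has been matched
            results.append((node, path))
            return
        # Only follow edges whose type is the one expected at this hop
        for nxt in index.get(node, {}).get(pattern[depth], []):
            dfs(nxt, depth + 1, path + [(node, pattern[depth], nxt)])

    dfs(start, 0, [])
    return results

index = build_index(TRIPLES)
matches = find_by_pattern(index, "NumPy", ["written_in", "created_by", "worked_at"])
for end, path in matches:
    print(end, path)
```

Because the pattern fixes the number of hops, the recursion depth is bounded by `len(pattern)`, so no separate cycle guard is needed in this sketch.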

6. Knowledge Graph Applications

6.1 Knowledge-Enhanced Retrieval

class KnowledgeEnhancedRetriever:
    """Knowledge-enhanced retriever."""

    def __init__(self, kg: InMemoryKnowledgeGraph):
        self.kg = kg

    def retrieve_with_knowledge(
        self,
        query: str,
        top_k: int = 10
    ) -> List[Dict]:
        """
        Retrieve with help from the knowledge graph.

        1. Entity recognition
        2. Related-entity lookup
        3. Expanded retrieval
        """
        # 1. Recognize entities in the query
        extractor = ModelBasedEntityExtractor()
        entities = extractor.extract(query)

        if not entities:
            return []

        # 2. Collect related entities
        related_entities = []

        for entity in entities:
            # Direct neighbors
            neighbors = self.kg.get_neighbors(entity.id)
            related_entities.extend(neighbors)

            # Objects of outgoing relations (may overlap with the neighbors;
            # duplicates are removed below)
            related = self.kg.get_relations(subject_id=entity.id)
            for rel in related:
                neighbor = self.kg.get_entity(rel.object_id)
                if neighbor:
                    related_entities.append(neighbor)

        # Deduplicate by entity ID
        unique_entities = {}
        for e in related_entities:
            if e.id not in unique_entities:
                unique_entities[e.id] = e

        # 3. Build results
        results = []
        for entity_id, entity in unique_entities.items():
            # Relevance score between the query and the entity text
            score = self._calculate_relevance(query, entity)

            if score > 0.5:
                # Attach a short textual context for the entity
                results.append({
                    "entity": entity,
                    "score": score,
                    "context": self._get_entity_context(entity)
                })

        # Sort by score, highest first
        results.sort(key=lambda x: x["score"], reverse=True)

        return results[:top_k]

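    def _calculate_relevance(self, query: str, entity: Entity) -> float:
        """Compute a relevance score (Jaccard similarity of word sets)."""
        # Simplified: whitespace tokenization, so text without spaces
        # (e.g. Chinese) is treated as a single token
        query_words = set(query.lower().split())
        entity_words = set(entity.text.lower().split())

        if not query_words or not entity_words:
            return 0.0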

        intersection = len(query_words & entity_words)
        union = len(query_words | entity_words)

        return intersection / union

    def _get_entity_context(self, entity: Entity) -> str:
        """Build a short context string for an entity."""
        # Outgoing relations of the entity
        relations = self.kg.get_relations(subject_id=entity.id)

        context_parts = [f"{entity.text} ({entity.type.value})"]

        for rel in relations[:3]:  # at most 3 relations
            neighbor = self.kg.get_entity(rel.object_id)
            if neighbor:
                context_parts.append(
                    f"{rel.relation_type.value} {neighbor.text}"
                )

        return ", ".join(context_parts)
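
The word-set Jaccard score in `_calculate_relevance` relies on whitespace splitting, which yields a single token for text written without spaces. One possible workaround, sketched here under the assumption that character bigrams are an acceptable unit of comparison, is to compute Jaccard similarity over character n-grams instead. The helper names `char_ngrams`, `jaccard`, and `relevance` are hypothetical.

```python
def char_ngrams(text: str, n: int = 2) -> set:
    """Lowercase the text, drop whitespace, and return its set of character n-grams."""
    compact = "".join(text.lower().split())
    if len(compact) < n:
        return {compact} if compact else set()
    return {compact[i:i + n] for i in range(len(compact) - n + 1)}

def jaccard(a: set, b: set) -> float:
    """Jaccard similarity |a ∩ b| / |a ∪ b|, defined as 0.0 when either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)

def relevance(query: str, entity_text: str) -> float:
    """Character-bigram relevance between a query and an entity name."""
    return jaccard(char_ngrams(query), char_ngrams(entity_text))

# Whitespace tokenization would score this pair 0.0, because the query is one token
print(relevance("Guido在哪里工作?", "Guido"))  # 0.4
```

Scores from this variant are distributed differently from word-level scores, so the `0.5` cutoff used in `retrieve_with_knowledge` would probably need re-tuning.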

6.2 Knowledge Question Answering

class KnowledgeQA:
    """Question answering over the knowledge graph."""

    def __init__(self, kg: InMemoryKnowledgeGraph, llm=None):
        self.kg = kg
        self.llm = llm

    def answer(
        self,
        question: str,
        use_reasoning: bool = True
    ) -> Dict:
        """
        Answer a question using the knowledge graph.

        Args:
            question: The question.
            use_reasoning: Whether to apply rule-based reasoning.

        Returns:
            {
                "answer": str,
                "entities": List[Entity],
                "relations": List[Relation],
                "reasoning_path": List[str]
            }
        """
        # 1. Recognize entities in the question
        extractor = ModelBasedEntityExtractor()
        entities = extractor.extract(question)

        if not entities:
            return {
                "answer": "No relevant entity could be identified",
                "entities": [],
                "relations": [],
                "reasoning_path": []
            }

        # 2. Look up relations of the main entity (copied, so that extending
        #    the list below cannot modify the graph's own storage)
        main_entity = entities[0]
        relations = list(self.kg.get_relations(subject_id=main_entity.id))

        # 3. Reasoning
        reasoning_path = []
        if use_reasoning:
            reasoner = RuleBasedReasoner()
            inferred = reasoner.infer(self.kg)

            # Keep only inferred relations that involve the main entity
            relevant_inferred = [
                r for r in inferred
                if r.subject_id == main_entity.id or
                   r.object_id == main_entity.id
            ]
            relations.extend(relevant_inferred)

            if relevant_inferred:
                reasoning_path = [r.relation_type.value for r in relevant_inferred]

        # 4. Build the context
        context = self._build_context(main_entity, relations)

        # 5. Generate the answer, with the LLM if one is configured
        if self.llm:
            answer = self._generate_with_llm(question, context)
        else:
            answer = self._generate_simple(context)

        return {
            "answer": answer,
            "entities": [main_entity] + [
                self.kg.get_entity(r.object_id)
                for r in relations[:5]
                if self.kg.get_entity(r.object_id)
            ],
            "relations": relations[:5],
            "reasoning_path": reasoning_path
        }

    def _build_context(
        self,
        entity: Entity,
        relations: List[Relation]
    ) -> str:
        """Build the context string passed to answer generation."""
        context_parts = [f"{entity.text} ({entity.type.value})"]

        for rel in relations[:10]:
            neighbor = self.kg.get_entity(rel.object_id)
            if neighbor:
                context_parts.append(
                    f"- {rel.relation_type.value}: {neighbor.text} ({neighbor.type.value})"
                )

        return "\n".join(context_parts)

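    def _generate_with_llm(self, question: str, context: str) -> str:
        """Generate an answer with the LLM."""
        prompt = f"""Answer the question based on the knowledge below.

Knowledge:
{context}

Question: {question}

Answer using only the knowledge above; if it is insufficient, say so."""

        # Assumes self.llm exposes a chat-completions-style create() method
        response = self.llm.create(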
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )

        return response.choices[0].message.content

    def _generate_simple(self, context: str) -> str:
        """Generate a simple answer without an LLM."""
        return f"Based on the knowledge base: {context}"
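
The reasoning step in `answer` delegates to `RuleBasedReasoner`. As a rough illustration of what a single transitivity rule might derive, the sketch below computes the relations implied by a transitive relation over plain triples. It is not the article's reasoner: `infer_transitive` and the `located_in` relation are hypothetical names chosen for the example.

```python
def infer_transitive(triples, relation):
    """Return triples implied by transitivity of `relation` that are not already present."""
    known = {(s, o) for s, r, o in triples if r == relation}
    closure = set(known)
    changed = True
    # Repeat until no new pair can be derived (fixed point)
    while changed:
        changed = False
        for a, b in list(closure):
            for c, d in list(closure):
                # (a -> b) and (b -> d) imply (a -> d); skip self-loops
                if b == c and a != d and (a, d) not in closure:
                    closure.add((a, d))
                    changed = True
    return sorted((s, relation, o) for s, o in closure - known)

triples = [
    ("Beijing", "located_in", "China"),
    ("China", "located_in", "Asia"),
    ("Haidian", "located_in", "Beijing"),
    ("Python", "created_by", "Guido"),
]
inferred = infer_transitive(triples, "located_in")
for triple in inferred:
    print(triple)
```

Only the newly derived triples are returned, which mirrors how `answer` appends inferred relations to the ones read from the graph.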

7. Knowledge Graph Quality Assessment

7.1 Quality Metrics

class KnowledgeGraphQualityAssessor:
    """Knowledge graph quality assessor."""

    def __init__(self):
        pass

    def assess(self, kg: InMemoryKnowledgeGraph) -> Dict:
        """
        Assess the quality of a knowledge graph.

        Returns:
            {
                "completeness": completeness score,
                "consistency": consistency score,
                "accuracy": accuracy score,
                "connectivity": connectivity score,
                "overall": overall score
            }
        """
        metrics = {}

        # Completeness
        metrics["completeness"] = self._assess_completeness(kg)

        # Consistency
        metrics["consistency"] = self._assess_consistency(kg)

        # Accuracy
        metrics["accuracy"] = self._assess_accuracy(kg)

        # Connectivity
        metrics["connectivity"] = self._assess_connectivity(kg)

        # Overall score: unweighted mean of the four metrics
        metrics["overall"] = sum(metrics.values()) / len(metrics)

        return metrics

    def _assess_completeness(self, kg: InMemoryKnowledgeGraph) -> float:
        """Assess completeness."""
        # Share of entities that have both text and properties
        entities = kg.get_entities()

        valid_count = sum(
            1 for e in entities
            if e.text and e.properties
        )

        return valid_count / len(entities) if entities else 1.0

    def _assess_consistency(self, kg: InMemoryKnowledgeGraph) -> float:
        """Assess consistency."""
        # Share of relations whose subject and object both exist in the graph
        relations = kg.get_relations()

        valid_count = 0
        for rel in relations:
            subject = kg.get_entity(rel.subject_id)
            object_entity = kg.get_entity(rel.object_id)

            if subject and object_entity:
                valid_count += 1

        return valid_count / len(relations) if relations else 1.0

    def _assess_accuracy(self, kg: InMemoryKnowledgeGraph) -> float:
        """Assess accuracy."""
        # Simplified proxy: checks text length only, not factual correctness
        entities = kg.get_entities()

        valid_count = sum(
            1 for e in entities
            if len(e.text) >= 2  # at least 2 characters
        )

        return valid_count / len(entities) if entities else 1.0

    def _assess_connectivity(self, kg: InMemoryKnowledgeGraph) -> float:
        """Assess connectivity: the fraction of entities reachable from the first one."""
        entities = kg.get_entities()

        if len(entities) <= 1:
            return 1.0

        # Iterative DFS with an explicit stack, so large graphs cannot
        # exceed Python's recursion limit
        visited = set()
        stack = [entities[0].id]

        while stack:
            entity_id = stack.pop()
            if entity_id in visited:
                continue

            visited.add(entity_id)

            for neighbor in kg.get_neighbors(entity_id):
                if neighbor.id not in visited:
                    stack.append(neighbor.id)

        connectivity = len(visited) / len(entities)

        return connectivity
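
`_assess_connectivity` measures reachability from a single start entity, so its result depends on which entity happens to come first. A complementary view, sketched below on plain node and edge lists with hypothetical data, counts connected components and isolated nodes. Edges are treated as undirected, which is an assumption of this sketch rather than a property of `InMemoryKnowledgeGraph`.

```python
from collections import defaultdict, deque

def connected_components(nodes, edges):
    """Return the components of the undirected graph as a list of sets."""
    adjacency = defaultdict(set)
    for a, b in edges:
        adjacency[a].add(b)
        adjacency[b].add(a)

    seen = set()
    components = []
    for start in nodes:
        if start in seen:
            continue
        # Breadth-first search from each node not yet assigned to a component
        component = set()
        queue = deque([start])
        seen.add(start)
        while queue:
            node = queue.popleft()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    return components

def connectivity_report(nodes, edges):
    """Summarize component count, largest-component ratio, and isolated-node ratio."""
    components = connected_components(nodes, edges)
    largest = max((len(c) for c in components), default=0)
    isolated = sum(1 for c in components if len(c) == 1)
    total = len(nodes)
    return {
        "components": len(components),
        "largest_component_ratio": largest / total if total else 1.0,
        "isolated_ratio": isolated / total if total else 0.0,
    }

nodes = ["Python", "Guido", "Google", "NumPy", "Rust"]
edges = [("Python", "Guido"), ("Guido", "Google"), ("NumPy", "Python")]
report = connectivity_report(nodes, edges)
print(report)
```

Here `"Rust"` has no edges, so it forms its own component and counts as the one isolated node.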

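8. Implementation Example

8.1 End-to-End Knowledge Graph Construction

"""
End-to-end knowledge graph construction

1. Data collection
2. Entity recognition
3. Relation extraction
4. Knowledge storage
5. Knowledge reasoning
6. Quality assessment
"""

class KnowledgeGraphBuilder:
    """Knowledge graph builder."""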

    def __init__(
        self,
        storage_type: str = "memory"  # memory/neo4j
    ):
        self.storage_type = storage_type

        # Initialize the extractors
        self.entity_extractor = ModelBasedEntityExtractor()
        self.relation_extractor = ModelBasedRelationExtractor()

        # Initialize the storage backend
        if storage_type == "memory":
            self.kg = InMemoryKnowledgeGraph()
        elif storage_type == "neo4j":
            self.kg = Neo4jKnowledgeGraph()
        else:
            raise ValueError(f"Unknown storage type: {storage_type}")

    def build_from_text(self, text: str) -> Dict:
        """
        Build a knowledge graph from text.

        Returns:
            {
                "entities": number of entities,
                "relations": number of relations,
                "quality": quality metrics (None for non-memory storage)
            }
        """
        print("=== Building knowledge graph ===")

        # 1. Entity recognition
        print("\n[1/5] Entity recognition")
        entities = self.entity_extractor.extract(text)
        print(f"    Found {len(entities)} entities")
        for entity in entities:
            print(f"      - {entity.text} ({entity.type.value})")

        # 2. Add entities to the knowledge graph
        print("\n[2/5] Adding entities")
        for entity in entities:
            self.kg.add_entity(entity)

        # 3. Relation extraction
        print("\n[3/5] Relation extraction")
        relations = self.relation_extractor.extract(text, entities)
        print(f"    Extracted {len(relations)} relations")
        for relation in relations[:5]:
            subject = self.kg.get_entity(relation.subject_id)
            object_entity = self.kg.get_entity(relation.object_id)
            # Skip relations whose endpoints are not in the graph
            if subject and object_entity:
                print(f"      - {subject.text} -> {relation.relation_type.value} -> {object_entity.text}")

        # 4. Add relations to the knowledge graph
        print("\n[4/5] Adding relations")
        for relation in relations:
            self.kg.add_relation(relation)

        # 5. Quality assessment (only run for in-memory storage)
        print("\n[5/5] Quality assessment")
        if self.storage_type == "memory":
            assessor = KnowledgeGraphQualityAssessor()
            quality = assessor.assess(self.kg)
            print(f"    Completeness: {quality['completeness']:.2f}")
            print(f"    Consistency: {quality['consistency']:.2f}")
            print(f"    Accuracy: {quality['accuracy']:.2f}")
            print(f"    Connectivity: {quality['connectivity']:.2f}")
            print(f"    Overall: {quality['overall']:.2f}")
        else:
            quality = None

        print("\n=== Knowledge graph built ===")

        return {
            "entities": len(entities),
            "relations": len(relations),
            "quality": quality
        }

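    def query(self, query: str) -> Dict:
        """Query the knowledge graph."""
        # Recognize entities in the query
        entities = self.entity_extractor.extract(query)

        if not entities:
            return {"answer": "No relevant entity could be identified"}

        main_entity = entities[0]

        # Neighbors of the main entity
        neighbors = self.kg.get_neighbors(main_entity.id)

        # Outgoing relations of the main entity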
        relations = self.kg.get_relations(subject_id=main_entity.id)

        return {
            "query": query,
            "entity": {
                "text": main_entity.text,
                "type": main_entity.type.value
            },
            "neighbors": [
                {"text": n.text, "type": n.type.value}
                for n in neighbors
            ],
            "relations": len(relations)
        }

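# ============== Usage example ==============

if __name__ == "__main__":
    # Create the builder
    builder = KnowledgeGraphBuilder(storage_type="memory")

    # Sample text
    text = """
    Python is a high-level programming language created by Guido van Rossum in 1989.
    Guido developed Python at the CWI research institute in the Netherlands.
    Python is widely used in web development, data science, and artificial intelligence.
    Guido worked at Google and later joined Dropbox.
    The latest version of Python is 3.12.
    """

    # Build the knowledge graph
    result = builder.build_from_text(text)

    # Query
    query = "Where does Guido work?"
    print(f"\nQuery: {query}")
    answer = builder.query(query)
    print(f"Result: {answer}")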

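Common Interview Questions

Q1: How do you build a knowledge graph?

Standard answer:

Knowledge graph construction pipeline:

1. Data collection
   - Document data
   - Structured data (databases, APIs)
   - Web data

2. Entity recognition
   - Rule-based: regular-expression matching
   - Model-based: NER models, LLMs
   - Entity linking: disambiguation, linking to a knowledge base

3. Relation extraction
   - Rule-based: pattern matching
   - Model-based: relation classification models, LLMs
   - Multi-hop relations: path extraction

4. Knowledge storage
   - In-memory storage: small scale, fast
   - Graph databases: Neo4j, JanusGraph
   - Relational databases: PostgreSQL

5. Knowledge reasoning
   - Rule-based reasoning: transitivity, hierarchy
   - Path reasoning: path finding
   - Logical reasoning: predicate logic

Implementation: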
```python
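# 1. Entity recognition
extractor = ModelBasedEntityExtractor()
entities = extractor.extract(text)

# 2. Relation extraction
relation_extractor = ModelBasedRelationExtractor()
relations = relation_extractor.extract(text, entities)

# 3. Storage
kg = InMemoryKnowledgeGraph()
for entity in entities:
    kg.add_entity(entity)

for relation in relations:
    kg.add_relation(relation)
```

### Q2: How are knowledge graphs used in RAG?

Standard answer:

Knowledge-graph-enhanced RAG:

1. Entity recognition
   - Recognize entities in the query
   - Recognize entities in the documents

2. Relation expansion
   - Look up relations between entities
   - Expand to related entities
   - Build an entity graph

3. Path retrieval
   - Find paths between entities
   - Collect information along the paths
   - Provide multi-hop context

4. Hierarchical retrieval
   - Entity level: direct matching
   - Relation level: related entities
   - Neighbor level: expanded retrieval

Implementation: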
```python
def kg_enhanced_rag(query, kg, vector_db):
    # Step 1: recognize entities in the query
    entities = extract_entities(query)

    # Step 2: look up related entities
    related_entities = []
    for entity in entities:
        related_entities.extend(
            kg.get_neighbors(entity.id)
        )

    # Step 3: expand the query with the related entities' text
    expanded_query = query
    for entity in related_entities:
        expanded_query += " " + entity.text

    # Step 4: vector retrieval
    results = vector_db.search(expanded_query)

    return results
```
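
The query-expansion step can be exercised end to end with small stand-ins. The sketch below assumes a toy graph (`ToyKG`), a substring-based extractor, and a cap on added terms (`max_terms`); all of these are hypothetical and only mirror the `get_neighbors` call used above. The vector search itself is left out, since the expanded string is simply what would be passed to `vector_db.search`.

```python
from dataclasses import dataclass
from typing import Dict, List

@dataclass(frozen=True)
class ToyEntity:
    id: str
    text: str

class ToyKG:
    """Hypothetical stand-in exposing get_neighbors() like the kg argument above."""
    def __init__(self, entities: List[ToyEntity], edges: Dict[str, List[str]]):
        self.entities = {e.id: e for e in entities}
        self.edges = edges

    def get_neighbors(self, entity_id: str) -> List[ToyEntity]:
        return [self.entities[i] for i in self.edges.get(entity_id, [])]

def extract_entities(query: str, kg: ToyKG) -> List[ToyEntity]:
    """Hypothetical extractor: case-insensitive substring match on entity names."""
    return [e for e in kg.entities.values() if e.text.lower() in query.lower()]

def expand_query(query: str, kg: ToyKG, max_terms: int = 3) -> str:
    """Append up to max_terms distinct neighbor names not already in the query."""
    matched = extract_entities(query, kg)
    seen = {e.id for e in matched}
    extra = []
    for entity in matched:
        for neighbor in kg.get_neighbors(entity.id):
            if neighbor.id not in seen and len(extra) < max_terms:
                seen.add(neighbor.id)
                extra.append(neighbor.text)
    return " ".join([query] + extra)

kg = ToyKG(
    [ToyEntity("e1", "Python"), ToyEntity("e2", "Guido"),
     ToyEntity("e3", "NumPy"), ToyEntity("e4", "CWI")],
    {"e1": ["e2", "e3"], "e2": ["e1", "e4"]},
)
print(expand_query("Who created Python?", kg))  # Who created Python? Guido NumPy
```

Deduplicating and capping the added terms keeps a densely connected entity from drowning out the original query, which the unbounded concatenation in `kg_enhanced_rag` does not guard against.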

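### Q3: How do you evaluate knowledge graph quality?

Standard answer:

Quality assessment dimensions:

1. Completeness
   - Entity coverage
   - Relation coverage
   - Attribute completeness

2. Consistency
   - No contradictory relations
   - Consistent data types
   - Constraints satisfied

3. Accuracy
   - Entity recognition accuracy
   - Relation extraction accuracy
   - Attribute value accuracy

4. Connectivity
   - Number of connected components
   - Proportion of isolated nodes
   - Average path length

5. Usability
   - Query performance
   - Update performance
   - Storage efficiency

Implementation: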

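def assess_kg_quality(kg):
    metrics = {}

    # Completeness: share of entities with both text and properties
    entities = kg.get_entities()
    valid_entities = sum(
        1 for e in entities
        if e.text and e.properties
    )
    metrics["completeness"] = valid_entities / len(entities) if entities else 1.0

    # Consistency: share of relations whose endpoints both exist
    relations = kg.get_relations()
    valid_relations = sum(
        1 for r in relations
        if kg.get_entity(r.subject_id) and
           kg.get_entity(r.object_id)
    )
    metrics["consistency"] = valid_relations / len(relations) if relations else 1.0

    # Connectivity: share of entities reachable from the first one (iterative DFS)
    visited = set()
    stack = [entities[0].id] if entities else []
    while stack:
        entity_id = stack.pop()
        if entity_id in visited:
            continue
        visited.add(entity_id)
        stack.extend(n.id for n in kg.get_neighbors(entity_id))
    metrics["connectivity"] = len(visited) / len(entities) if entities else 1.0

    # Overall score: unweighted mean of the metrics above
    metrics["overall"] = sum(metrics.values()) / len(metrics)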

    return metrics
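
The consistency metric above only detects relations with missing endpoints. The "no contradictory relations" item from the list could be approximated by checking functional relations, meaning relation types that should have at most one object per subject. The sketch below assumes a hypothetical `FUNCTIONAL_RELATIONS` set and plain triples; which relation types are functional is a modeling decision and is not defined in the original.

```python
from collections import defaultdict

# Hypothetical relation types assumed to allow only one object per subject
FUNCTIONAL_RELATIONS = {"born_in", "created_by"}

def find_conflicts(triples, functional=FUNCTIONAL_RELATIONS):
    """Return {(subject, relation): sorted objects} where a functional relation has several objects."""
    objects = defaultdict(set)
    for subject, relation, obj in triples:
        if relation in functional:
            objects[(subject, relation)].add(obj)
    return {key: sorted(vals) for key, vals in objects.items() if len(vals) > 1}

def consistency_score(triples, functional=FUNCTIONAL_RELATIONS):
    """Share of triples that are not involved in any conflict."""
    if not triples:
        return 1.0
    conflicts = find_conflicts(triples, functional)
    bad = sum(1 for s, r, _ in triples if (s, r) in conflicts)
    return 1 - bad / len(triples)

triples = [
    ("Python", "created_by", "Guido"),
    ("Python", "created_by", "Larry"),   # deliberately wrong, to trigger a conflict
    ("Guido", "born_in", "Netherlands"),
    ("Guido", "worked_at", "Google"),
    ("Guido", "worked_at", "Dropbox"),   # worked_at is not functional, so no conflict
]
conflicts = find_conflicts(triples)
print(conflicts)
print(consistency_score(triples))
```

A check like this flags candidates for review rather than deciding which of the conflicting objects is correct.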


---

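## Summary

### Key Points

| Topic | Strategy |
|------|------|
| **Entity recognition** | Combine rules and models |
| **Relation extraction** | Pattern matching + LLM |
| **Knowledge storage** | Prefer a graph database |
| **Knowledge reasoning** | Rules + paths |
| **Quality assessment** | Multi-dimensional metrics |

### Best Practices

1. **Build in stages**: entities → relations → validation
2. **Fuse multiple sources**: integrate different data sources
3. **Update incrementally**: support knowledge evolution
4. **Control quality**: assess and optimize continuously
5. **Design for the application**: shape the graph around the use case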