【AI Agent 知识库】23-业务评测体系


目录

  1. 评测体系概述
  2. 离线评测
  3. 在线评测
  4. 评测指标
  5. 评测工具
  6. A/B测试
  7. 评测流程
  8. 实现示例

1. 评测体系概述

1.1 评测架构

┌─────────────────────────────────────────────────────┐
│              业务评测体系架构                        │
├─────────────────────────────────────────────────────┤
│                                                     │
│  ┌──────────────┐    ┌──────────────┐            │
│  │  离线评测    │    │  在线评测     │            │
│  │ (Offline)   │    │  (Online)   │            │
│  └──────┬───────┘    └──────┬───────┘            │
│         │                    │                    │
│         ▼                    ▼                    │
│  ┌──────────────────────────────────────┐         │
│  │           评测数据集               │         │
│  │  - Golden Set                    │         │
│  │  - 真实用户数据                    │         │
│  │  - 人工标注数据                    │         │
│  └──────────────────────────────────────┘         │
│         │                    ▲                    │
│         ▼                    │                    │
│  ┌──────────────────────────────────────┐         │
│  │           评测指标                 │         │
│  │  - 准确性指标                    │         │
│  │  - 性能指标                      │         │
│  │  - 业务指标                      │         │
│  └──────────────────────────────────────┘         │
│         │                                          │
│         ▼                                          │
│  ┌──────────────────────────────────────┐         │
│  │           评测报告                 │         │
│  └──────────────────────────────────────┘         │
│                                                     │
└─────────────────────────────────────────────────────┘

1.2 评测类型对比

| 类型 | 特点 | 使用场景 | 数据来源 |
| --- | --- | --- | --- |
| 离线评测 | 批量处理、快速迭代 | 模型调优、功能验证 | Golden Set |
| 在线评测 | 真实用户、实时反馈 | 上线后监控、A/B测试 | 用户日志 |
| 人工评测 | 高质量、成本高 | 关键功能验证 | 人工标注 |
| 自动评测 | 效率高、可重复 | 日常测试、回归 | 自动生成数据 |

1.3 评测目标

┌─────────────────────────────────────────────────┐
│              评测目标                          │
├─────────────────────────────────────────────────┤
│                                                     │
│  准确性目标                                         │
│  ✓ 答案正确率 > 90%                               │
│  ✓ 忠实度 > 85%                                    │
│  ✓ 相关性 > 90%                                    │
│                                                     │
│  性能目标                                           │
│  ✓ P95延迟 < 3s                                    │
│  ✓ 吞吐量 > 100 QPS                                 │
│  ✓ 错误率 < 1%                                     │
│                                                     │
│  业务目标                                           │
│  ✓ 用户满意度 > 85%                                 │
│  ✓ 问题解决率 > 80%                                │
│  ✓ 成本控制在预算内                                 │
│                                                     │
└─────────────────────────────────────────────────┘
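
上述目标可以进一步沉淀为自动化的达标检查,在评测流程末尾统一判断是否放行。下面是一个最小示意:阈值沿用上图,指标键名(accuracy、latency_p95 等)为示例约定,实际需与自身评测报告的字段对齐。

# 示意:把评测目标固化为可执行的达标检查
EVAL_TARGETS = {
    "accuracy":     {"threshold": 0.90, "higher_better": True},   # 答案正确率
    "faithfulness": {"threshold": 0.85, "higher_better": True},   # 忠实度
    "relevance":    {"threshold": 0.90, "higher_better": True},   # 相关性
    "latency_p95":  {"threshold": 3.0,  "higher_better": False},  # P95延迟(秒)
    "error_rate":   {"threshold": 0.01, "higher_better": False},  # 错误率
}

def check_targets(metrics: dict) -> dict:
    """逐项对比评测结果与目标,返回每项的数值、阈值与是否达标"""
    results = {}
    for name, target in EVAL_TARGETS.items():
        if name not in metrics:
            continue
        value = metrics[name]
        if target["higher_better"]:
            passed = value >= target["threshold"]
        else:
            passed = value <= target["threshold"]
        results[name] = {"value": value, "target": target["threshold"], "passed": passed}
    return results

# 示例:check_targets({"accuracy": 0.92, "latency_p95": 2.4}) 中两项均达标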

2. 离线评测

2.1 Golden Set构建

from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class TestCase:
    """测试用例"""
    id: str
    query: str  # 问题
    ground_truth: Dict  # 标准答案
    category: str  # 类别
    difficulty: str  # 难度:easy/medium/hard
    metadata: Optional[Dict] = None
    created_at: Optional[datetime] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        if self.created_at is None:
            self.created_at = datetime.utcnow()

class GoldenSet:
    """Golden Set - 标准测试集"""

    def __init__(self, name: str):
        self.name = name
        self.test_cases: Dict[str, TestCase] = {}

    def add_case(self, case: TestCase):
        """添加测试用例"""
        self.test_cases[case.id] = case

    def get_case(self, case_id: str) -> Optional[TestCase]:
        """获取测试用例"""
        return self.test_cases.get(case_id)

    def get_by_category(self, category: str) -> List[TestCase]:
        """按类别获取"""
        return [
            case for case in self.test_cases.values()
            if case.category == category
        ]

    def get_by_difficulty(self, difficulty: str) -> List[TestCase]:
        """按难度获取"""
        return [
            case for case in self.test_cases.values()
            if case.difficulty == difficulty
        ]

    def get_all(self) -> List[TestCase]:
        """获取所有用例"""
        return list(self.test_cases.values())

    def count(self) -> int:
        """用例数量"""
        return len(self.test_cases)

    def save(self, file_path: str):
        """保存到文件"""
        data = {
            "name": self.name,
            "test_cases": [
                {
                    "id": case.id,
                    "query": case.query,
                    "ground_truth": case.ground_truth,
                    "category": case.category,
                    "difficulty": case.difficulty,
                    "metadata": case.metadata,
                    "created_at": case.created_at.isoformat()
                }
                for case in self.test_cases.values()
            ]
        }

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    @classmethod
    def load(cls, file_path: str) -> 'GoldenSet':
        """从文件加载"""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        golden_set = cls(data["name"])

        for case_data in data["test_cases"]:
            case = TestCase(
                id=case_data["id"],
                query=case_data["query"],
                ground_truth=case_data["ground_truth"],
                category=case_data["category"],
                difficulty=case_data["difficulty"],
                metadata=case_data.get("metadata", {}),
                created_at=datetime.fromisoformat(case_data["created_at"])
            )
            golden_set.add_case(case)

        return golden_set

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建Golden Set
    golden_set = GoldenSet(name="llm_agent_eval")

    # 添加测试用例
    golden_set.add_case(TestCase(
        id="case_001",
        query="Python是什么?",
        ground_truth={
            "answer": "Python是一种高级编程语言",
            "type": "factual",
            "confidence": 1.0
        },
        category="programming",
        difficulty="easy"
    ))

    golden_set.add_case(TestCase(
        id="case_002",
        query="如何用Python读取文件?",
        ground_truth={
            "answer": "使用open()函数",
            "code_example": "with open('file.txt', 'r') as f:\n    content = f.read()",
            "type": "procedural"
        },
        category="programming",
        difficulty="medium"
    ))

    # 保存
    golden_set.save("./golden_set.json")

    # 加载
    loaded_set = GoldenSet.load("./golden_set.json")
    print(f"加载了 {loaded_set.count()} 个测试用例")

2.2 RAG评测

from typing import List, Dict

class RAGEvaluator:
    """RAG评测器"""

    def __init__(self, rag_system):
        self.rag_system = rag_system

    def evaluate(
        self,
        golden_set: GoldenSet
    ) -> Dict:
        """
        评测RAG系统

        Returns:
            {
                "retrieval_metrics": {...},
                "generation_metrics": {...},
                "overall_score": float
            }
        """
        results = []

        for case in golden_set.get_all():
            # 执行RAG查询
            response = self.rag_system.query(case.query)

            # 评估单个用例
            case_result = self._evaluate_case(case, response)
            results.append(case_result)

        # 计算聚合指标
        return self._aggregate_results(results)

    def _evaluate_case(
        self,
        case: TestCase,
        response: Dict
    ) -> Dict:
        """评估单个用例"""
        ground_truth = case.ground_truth

        # 检索评估
        retrieval_metrics = self._evaluate_retrieval(
            ground_truth,
            response
        )

        # 生成评估
        generation_metrics = self._evaluate_generation(
            ground_truth,
            response
        )

        return {
            "case_id": case.id,
            "retrieval_metrics": retrieval_metrics,
            "generation_metrics": generation_metrics,
            "overall": (
                retrieval_metrics["recall"] * 0.4 +
                retrieval_metrics["precision"] * 0.3 +
                generation_metrics["bleu"] * 0.3
            )
        }

    def _evaluate_retrieval(
        self,
        ground_truth: Dict,
        response: Dict
    ) -> Dict:
        """评估检索质量"""
        retrieved_docs = response.get("sources", [])
        expected_docs = ground_truth.get("relevant_docs", [])

        if not expected_docs:
            return {"recall": 1.0, "precision": 1.0, "ndcg": 1.0}

        # 计算Recall
        retrieved_ids = {doc.get("id") for doc in retrieved_docs}
        expected_ids = set(expected_docs)
        hit = len(retrieved_ids & expected_ids)

        recall = hit / len(expected_ids) if expected_ids else 0
        precision = hit / len(retrieved_ids) if retrieved_docs else 0

        # 计算NDCG
        ndcg = self._calculate_ndcg(retrieved_docs, expected_ids)

        return {
            "recall": recall,
            "precision": precision,
            "ndcg": ndcg
        }

    def _calculate_ndcg(
        self,
        retrieved_docs: List[Dict],
        expected_ids: set
    ) -> float:
        """计算NDCG"""
        dcg = 0
        for i, doc in enumerate(retrieved_docs):
            if doc.get("id") in expected_ids:
                dcg += 1 / (i + 1)

        # 理想DCG
        ideal_dcg = sum(1 / (i + 1) for i in range(len(expected_ids)))

        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    def _evaluate_generation(
        self,
        ground_truth: Dict,
        response: Dict
    ) -> Dict:
        """评估生成质量"""
        predicted = response.get("answer", "")
        reference = ground_truth.get("answer", "")

        # BLEU分数
        bleu = self._calculate_bleu(predicted, reference)

        # 准确性
        accuracy = self._calculate_accuracy(predicted, reference)

        return {
            "bleu": bleu,
            "accuracy": accuracy
        }

    def _calculate_bleu(self, predicted: str, reference: str) -> float:
        """计算BLEU分数(简化)"""
        # 简化实现,实际应使用nltk或sacrebleu
        pred_words = predicted.split()
        ref_words = reference.split()

        if not pred_words:
            return 0

        # 1-gram匹配
        pred_set = set(pred_words)
        ref_set = set(ref_words)
        match = len(pred_set & ref_set)

        precision = match / len(pred_set)
        recall = match / len(ref_set) if ref_set else 0

        # F1
        return 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    def _calculate_accuracy(self, predicted: str, reference: str) -> float:
        """计算准确率(基于关键词匹配)"""
        import re
        ref_keywords = set(re.findall(r'\w+', reference.lower()))
        pred_keywords = set(re.findall(r'\w+', predicted.lower()))

        if not ref_keywords:
            return 1.0

        match = len(ref_keywords & pred_keywords)
        return match / len(ref_keywords)

    def _aggregate_results(self, results: List[Dict]) -> Dict:
        """聚合所有结果"""
        retrieval_metrics = {
            "recall": [],
            "precision": [],
            "ndcg": []
        }

        generation_metrics = {
            "bleu": [],
            "accuracy": []
        }

        for result in results:
            for key in retrieval_metrics:
                retrieval_metrics[key].append(
                    result["retrieval_metrics"][key]
                )

            for key in generation_metrics:
                generation_metrics[key].append(
                    result["generation_metrics"][key]
                )

        # 计算平均值
        def mean(values):
            return sum(values) / len(values) if values else 0

        return {
            "retrieval_metrics": {
                "recall": mean(retrieval_metrics["recall"]),
                "precision": mean(retrieval_metrics["precision"]),
                "ndcg": mean(retrieval_metrics["ndcg"])
            },
            "generation_metrics": {
                "bleu": mean(generation_metrics["bleu"]),
                "accuracy": mean(generation_metrics["accuracy"])
            },
            "overall_score": mean([r["overall"] for r in results])
        }
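
下面给出 RAGEvaluator 的一个最小调用示例。其中 StubRAG 是假设的桩实现,仅用于说明被评测系统需要提供的 query() 返回格式(answer + sources),实际评测时应替换为真实 RAG 系统:

# ============== 使用示例 ==============

if __name__ == "__main__":
    class StubRAG:
        """桩RAG系统:固定返回一条答案和一个来源文档"""
        def query(self, question: str) -> Dict:
            return {
                "answer": "Python是一种高级编程语言",
                "sources": [{"id": "doc_001", "content": "Python简介"}]
            }

    golden_set = GoldenSet(name="rag_smoke_test")
    golden_set.add_case(TestCase(
        id="case_001",
        query="Python是什么?",
        ground_truth={
            "answer": "Python是一种高级编程语言",
            "relevant_docs": ["doc_001"]
        },
        category="programming",
        difficulty="easy"
    ))

    evaluator = RAGEvaluator(StubRAG())
    report = evaluator.evaluate(golden_set)
    print(report["retrieval_metrics"])
    print(report["generation_metrics"])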

2.3 Agent评测

class AgentEvaluator:
    """Agent评测器"""

    def __init__(self, agent):
        self.agent = agent

    def evaluate(
        self,
        golden_set: GoldenSet
    ) -> Dict:
        """评测Agent"""
        results = []

        for case in golden_set.get_all():
            result = self._evaluate_case(case)
            results.append(result)

        return self._aggregate_results(results)

    def _evaluate_case(self, case: TestCase) -> Dict:
        """评估单个用例"""
        # 执行Agent
        agent_response = self.agent.run(case.query)

        # 获取工具调用
        tool_calls = agent_response.get("tool_calls", [])
        final_answer = agent_response.get("final_answer", "")

        # 评估
        metrics = {
            "tool_selection": self._evaluate_tool_selection(
                case,
                tool_calls
            ),
            "answer_quality": self._evaluate_answer_quality(
                case,
                final_answer
            ),
            "efficiency": self._evaluate_efficiency(
                agent_response
            ),
            "safety": self._evaluate_safety(agent_response)
        }

        # 计算综合分数
        metrics["overall"] = (
            metrics["tool_selection"]["score"] * 0.25 +
            metrics["answer_quality"]["score"] * 0.35 +
            metrics["efficiency"]["score"] * 0.2 +
            metrics["safety"]["score"] * 0.2
        )

        return {
            "case_id": case.id,
            "metrics": metrics,
            "agent_response": agent_response
        }

    def _evaluate_tool_selection(
        self,
        case: TestCase,
        tool_calls: List[Dict]
    ) -> Dict:
        """评估工具选择"""
        expected_tools = case.ground_truth.get("expected_tools", [])

        if not expected_tools:
            return {"score": 1.0, "reason": "无预期工具"}

        called_tools = [call.get("tool") for call in tool_calls]

        # 计算召回率
        expected_set = set(expected_tools)
        called_set = set(called_tools)

        recall = len(expected_set & called_set) / len(expected_set)
        precision = len(expected_set & called_set) / len(called_set) if called_set else 0

        return {
            "score": (recall + precision) / 2,
            "recall": recall,
            "precision": precision,
            "expected": expected_tools,
            "called": called_tools
        }

    def _evaluate_answer_quality(
        self,
        case: TestCase,
        answer: str
    ) -> Dict:
        """评估答案质量"""
        expected_answer = case.ground_truth.get("answer", "")

        # 相似度
        similarity = self._semantic_similarity(answer, expected_answer)

        # 包含关键信息
        key_info = case.ground_truth.get("key_info", [])
        key_info_coverage = sum(
            1 for info in key_info if info in answer
        ) / len(key_info) if key_info else 1.0

        return {
            "score": (similarity + key_info_coverage) / 2,
            "similarity": similarity,
            "key_info_coverage": key_info_coverage
        }

    def _evaluate_efficiency(
        self,
        response: Dict
    ) -> Dict:
        """评估效率"""
        iterations = response.get("iterations", 0)
        tokens_used = response.get("tokens_used", 0)

        # 迭代次数评分(越少越好)
        iteration_score = max(0, 1 - iterations / 10)

        # Token使用评分
        token_score = max(0, 1 - tokens_used / 5000)

        return {
            "score": (iteration_score + token_score) / 2,
            "iterations": iterations,
            "tokens_used": tokens_used
        }

    def _evaluate_safety(self, response: Dict) -> Dict:
        """评估安全性"""
        # 检查是否有危险操作
        dangerous_calls = [
            call for call in response.get("tool_calls", [])
            if call.get("tool") in ["delete", "execute", "eval"]
        ]

        # 检查答案是否包含敏感信息
        answer = response.get("final_answer", "")
        sensitive_patterns = [
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',  # 邮箱
            r'\b1[3-9]\d{9}\b',  # 手机号
        ]
        has_sensitive = any(
            re.search(pattern, answer)
            for pattern in sensitive_patterns
        )

        score = 1.0
        if dangerous_calls:
            score -= 0.5
        if has_sensitive:
            score -= 0.5

        return {
            "score": max(0, score),
            "dangerous_calls": len(dangerous_calls),
            "has_sensitive_info": has_sensitive
        }

    def _semantic_similarity(self, text1: str, text2: str) -> float:
        """计算语义相似度"""
        # 简化实现:使用关键词重叠
        import re
        words1 = set(re.findall(r'\w+', text1.lower()))
        words2 = set(re.findall(r'\w+', text2.lower()))

        if not words1 or not words2:
            return 0

        intersection = len(words1 & words2)
        union = len(words1 | words2)

        return intersection / union
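
    def _aggregate_results(self, results: List[Dict]) -> Dict:
        """
        聚合所有用例的评测结果

        说明:这是一个最小示意实现(仅统计用例数与平均综合分),
        可按需扩展为按工具选择、答案质量、效率等维度分别聚合。
        """
        if not results:
            return {}

        overall_scores = [r["metrics"]["overall"] for r in results]
        return {
            "case_count": len(results),
            "avg_overall": sum(overall_scores) / len(overall_scores),
            "per_case": results
        }

下面用一个假设的桩 Agent(StubAgent)演示评测调用方式,其返回结构(tool_calls / final_answer / iterations / tokens_used)为本文约定的格式,实际使用时替换为真实 Agent 即可:

# ============== 使用示例 ==============

if __name__ == "__main__":
    class StubAgent:
        """桩Agent:固定返回一次工具调用和最终答案"""
        def run(self, query: str) -> Dict:
            return {
                "tool_calls": [{"tool": "file_read", "args": {"path": "demo.txt"}}],
                "final_answer": "使用open()函数读取文件",
                "iterations": 2,
                "tokens_used": 800
            }

    golden_set = GoldenSet(name="agent_smoke_test")
    golden_set.add_case(TestCase(
        id="case_001",
        query="如何用Python读取文件?",
        ground_truth={
            "answer": "使用open()函数",
            "expected_tools": ["file_read"],
            "key_info": ["open()"]
        },
        category="programming",
        difficulty="medium"
    ))

    evaluator = AgentEvaluator(StubAgent())
    report = evaluator.evaluate(golden_set)
    print(f"用例数: {report['case_count']}, 平均综合分: {report['avg_overall']:.2f}")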

3. 在线评测

3.1 用户反馈收集

from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum

class FeedbackType(Enum):
    """反馈类型"""
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"

class UserFeedbackCollector:
    """用户反馈收集器"""

    def __init__(self, storage=None):
        self.storage = storage or {}
        self._feedbacks = []

    def add_feedback(
        self,
        user_id: str,
        session_id: str,
        query: str,
        answer: str,
        feedback_type: FeedbackType,
        score: Optional[int] = None,
        comment: Optional[str] = None,
        metadata: Dict = None
    ):
        """添加用户反馈"""
        feedback = {
            "id": self._generate_id(),
            "user_id": user_id,
            "session_id": session_id,
            "query": query,
            "answer": answer,
            "feedback_type": feedback_type.value,
            "score": score,
            "comment": comment,
            "metadata": metadata or {},
            "created_at": datetime.utcnow().isoformat()
        }

        self._feedbacks.append(feedback)
        self._save_feedback(feedback)

    def get_feedback(
        self,
        user_id: str = None,
        session_id: str = None,
        feedback_type: FeedbackType = None,
        limit: int = 100
    ) -> List[Dict]:
        """获取反馈"""
        filtered = self._feedbacks.copy()

        if user_id:
            filtered = [f for f in filtered if f["user_id"] == user_id]

        if session_id:
            filtered = [f for f in filtered if f["session_id"] == session_id]

        if feedback_type:
            filtered = [f for f in filtered if f["feedback_type"] == feedback_type.value]

        return filtered[-limit:]

    def calculate_metrics(
        self,
        start_time: datetime = None,
        end_time: datetime = None
    ) -> Dict:
        """计算指标"""
        feedbacks = self._feedbacks

        # 时间过滤
        if start_time or end_time:
            feedbacks = [
                f for f in feedbacks
                if self._is_in_time_range(f, start_time, end_time)
            ]

        if not feedbacks:
            return {}

        total = len(feedbacks)
        positive = sum(1 for f in feedbacks if f["feedback_type"] == FeedbackType.POSITIVE.value)
        negative = sum(1 for f in feedbacks if f["feedback_type"] == FeedbackType.NEGATIVE.value)

        # 平均分数
        scores = [f["score"] for f in feedbacks if f["score"] is not None]
        avg_score = sum(scores) / len(scores) if scores else None

        return {
            "total_feedbacks": total,
            "positive_rate": positive / total if total > 0 else 0,
            "negative_rate": negative / total if total > 0 else 0,
            "average_score": avg_score
        }

    def _generate_id(self) -> str:
        """生成ID"""
        import uuid
        return str(uuid.uuid4())

    def _save_feedback(self, feedback: Dict):
        """保存反馈"""
        # 简化实现:实际应存储到数据库
        pass

    def _is_in_time_range(
        self,
        feedback: Dict,
        start_time: datetime = None,
        end_time: datetime = None
    ) -> bool:
        """检查是否在时间范围内"""
        feedback_time = datetime.fromisoformat(feedback["created_at"])

        if start_time and feedback_time < start_time:
            return False

        if end_time and feedback_time > end_time:
            return False

        return True
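
下面是 UserFeedbackCollector 的最小调用示例,用户与反馈数据均为虚构:

# ============== 使用示例 ==============

if __name__ == "__main__":
    collector = UserFeedbackCollector()

    # 记录一条正向反馈
    collector.add_feedback(
        user_id="user_123",
        session_id="session_abc",
        query="Python是什么?",
        answer="Python是一种高级编程语言",
        feedback_type=FeedbackType.POSITIVE,
        score=5,
        comment="回答准确"
    )

    # 统计整体反馈指标
    print(collector.calculate_metrics())
    # 形如: {'total_feedbacks': 1, 'positive_rate': 1.0, 'negative_rate': 0.0, 'average_score': 5.0}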

3.2 实时监控

import time
from collections import deque
from threading import Thread
from typing import Callable, Dict, List

class RealTimeMonitor:
    """实时监控器"""

    def __init__(
        self,
        alert_thresholds: Dict = None
    ):
        self.alert_thresholds = alert_thresholds or {
            "error_rate": 0.05,
            "latency_p95": 5.0,
            "qps": 100
        }

        # 指标存储
        self.metrics = {
            "requests": deque(maxlen=10000),
            "latencies": deque(maxlen=10000),
            "errors": deque(maxlen=10000),
        }

        # 告警回调
        self.alert_callbacks: List[Callable] = []

        # 监控线程
        self._running = False
        self._monitor_thread = None

    def record_request(self, latency: float, error: bool = False):
        """记录记录请求"""
        timestamp = time.time()

        self.metrics["requests"].append(timestamp)
        self.metrics["latencies"].append(latency)

        if error:
            self.metrics["errors"].append(timestamp)

    def get_current_metrics(self, window: int = 60) -> Dict:
        """获取当前指标"""
        current_time = time.time()
        window_start = current_time - window

        # 统计窗口内的请求
        requests = [
            t for t in self.metrics["requests"]
            if t >= window_start
        ]

        # 统计延迟
        latencies = [
            l for l, t in zip(
                self.metrics["latencies"],
                self.metrics["requests"]
            ) if t >= window_start
        ]

        # 统计错误
        errors = [
            t for t in self.metrics["errors"]
            if t >= window_start
        ]

        # 计算指标
        qps = len(requests) / window if window > 0 else 0
        error_rate = len(errors) / len(requests) if requests else 0

        if latencies:
            avg_latency = sum(latencies) / len(latencies)
            sorted_latencies = sorted(latencies)
            p50_latency = sorted_latencies[len(sorted_latencies) // 2]
            p95_latency = sorted_latencies[int(len(sorted_latencies) * 0.95)]
        else:
            avg_latency = 0
            p50_latency = 0
            p95_latency = 0

        return {
            "qps": qps,
            "avg_latency": avg_latency,
            "p50_latency": p50_latency,
            "p95_latency": p95_latency,
            "error_rate": error_rate,
            "total_requests": len(requests),
            "total_errors": len(errors)
        }

    def add_alert_callback(self, callback: Callable):
        """添加告警回调"""
        self.alert_callbacks.append(callback)

    def start_monitoring(self, interval: int = 5):
        """开始监控"""
        self._running = True
        self._monitor_thread = Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self._monitor_thread.start()

    def stop_monitoring(self):
        """停止监控"""
        self._running = False
        if self._monitor_thread:
            self._monitor_thread.join()

    def _monitor_loop(self, interval: int):
        """监控循环"""
        while self._running:
            metrics = self.get_current_metrics()
            self._check_alerts(metrics)
            time.sleep(interval)

    def _check_alerts(self, metrics: Dict):
        """检查告警"""
        alerts = []

        # 错误率告警
        if metrics["error_rate"] > self.alert_thresholds["error_rate"]:
            alerts.append({
                "type": "error_rate",
                "value": metrics["error_rate"],
                "threshold": self.alert_thresholds["error_rate"]
            })

        # 延迟告警
        if metrics["p95_latency"] > self.alert_thresholds["latency_p95"]:
            alerts.append({
                "type": "latency",
                "value": metrics["p95_latency"],
                "threshold": self.alert_thresholds["latency_p95"]
            })

        # QPS告警
        if metrics["qps"] < self.alert_thresholds["qps"]:
            alerts.append({
                "type": "low_qps",
                "value": metrics["qps"],
                "threshold": self.alert_thresholds["qps"]
            })

        # 触发告警回调
        for alert in alerts:
            for callback in self.alert_callbacks:
                callback(alert)
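
下面的示例演示 RealTimeMonitor 的基本用法:先模拟一批请求,再读取窗口指标,并运行约一个周期的后台告警检查。阈值仅为演示用途,latency_p95 故意设低以便观察告警输出:

# ============== 使用示例 ==============

if __name__ == "__main__":
    import random

    monitor = RealTimeMonitor(alert_thresholds={
        "error_rate": 0.05,
        "latency_p95": 2.0,
        "qps": 1
    })
    monitor.add_alert_callback(lambda alert: print(f"[ALERT] {alert}"))

    # 模拟一批请求:延迟0.1~3秒,约5%出错
    for _ in range(200):
        monitor.record_request(
            latency=random.uniform(0.1, 3.0),
            error=random.random() < 0.05
        )

    metrics = monitor.get_current_metrics(window=60)
    print(f"QPS={metrics['qps']:.1f}, P95={metrics['p95_latency']:.2f}s, 错误率={metrics['error_rate']:.2%}")

    # 启动后台告警检查线程,运行约一个周期后停止
    monitor.start_monitoring(interval=1)
    time.sleep(1.5)
    monitor.stop_monitoring()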

4. 评测指标

4.1 指标定义

from typing import Dict, Optional

class MetricsRegistry:
    """指标注册表"""

    # RAG指标
    RAG_METRICS = {
        "recall": {
            "name": "召回率",
            "description": "正确文档被检索到的比例",
            "range": [0, 1],
            "higher_better": True
        },
        "precision": {
            "name": "精确率",
            "description": "检索结果中正确文档的比例",
            "range": [0, 1],
            "higher_better": True
        },
        "ndcg": {
            "name": "NDCG",
            "description": "归一化折损累积增益",
            "range": [0, 1],
            "higher_better": True
        },
        "faithfulness": {
            "name": "忠实度",
            "description": "答案基于检索内容的程度",
            "range": [0, 1],
            "higher_better": True
        },
        "answer_relevance": {
            "name": "答案相关性",
            "description": "答案与问题的相关程度",
            "range": [0, 1],
            "higher_better": True
        }
    }

    # Agent指标
    AGENT_METRICS = {
        "task_completion": {
            "name": "任务完成率",
            "description": "成功完成任务的比例",
            "range": [0, 1],
            "higher_better": True
        },
        "tool_accuracy": {
            "name": "工具选择准确率",
            "description": "正确选择工具的比例",
            "range": [0, 1],
            "higher_better": True
        },
        "efficiency": {
            "name": "效率分数",
            "description": "任务执行效率",
            "range": [0, 1],
            "higher_better": True
        },
        "safety": {
            "name": "安全性分数",
            "description": "安全操作的比例",
            "range": [0, 1],
            "higher_better": True
        }
    }

    # 性能指标
    PERFORMANCE_METRICS = {
        "latency_p50": {
            "name": "P50延迟",
            "description": "50分位延迟",
            "range": [0, float('inf')],
            "higher_better": False
        },
        "latency_p95": {
            "name": "P95延迟",
            "description": "95分位延迟",
            "range": [0, float('inf')],
            "higher_better": False
        },
        "qps": {
            "name": "每秒查询数",
            "description": "系统吞吐量",
            "range": [0, float('inf')],
            "higher_better": True
        },
        "error_rate": {
            "name": "错误率",
            "description": "请求错误的比例",
            "range": [0, 1],
            "higher_better": False
        }
    }

    @classmethod
    def get_metric(cls, metric_name: str) -> Optional[Dict]:
        """获取指标定义"""
        all_metrics = {
            **cls.RAG_METRICS,
            **cls.AGENT_METRICS,
            **cls.PERFORMANCE_METRICS
        }
        return all_metrics.get(metric_name)
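
指标注册表可用于在生成报告或做达标判断时查询指标含义与优化方向,例如:

# ============== 使用示例 ==============

if __name__ == "__main__":
    metric = MetricsRegistry.get_metric("faithfulness")
    if metric:
        direction = "越高越好" if metric["higher_better"] else "越低越好"
        print(f"{metric['name']}: {metric['description']}({direction})")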

4.2 指标计算

import numpy as np
from typing import List

class MetricsCalculator:
    """指标计算器"""

    @staticmethod
    def calculate_recall(
        retrieved: List[str],
        relevant: List[str]
    ) -> float:
        """计算召回率"""
        if not relevant:
            return 1.0

        retrieved_set = set(retrieved)
        relevant_set = set(relevant)

        hit = len(retrieved_set & relevant_set)
        return hit / len(relevant_set)

    @staticmethod
    def calculate_precision(
        retrieved: List[str],
        relevant: List[str]
    ) -> float:
        """计算精确率"""
        if not retrieved:
            return 0.0

        retrieved_set = set(retrieved)
        relevant_set = set(relevant)

        hit = len(retrieved_set & relevant_set)
        return hit / len(retrieved)

    @staticmethod
    def calculate_f1(recall: float, precision: float) -> float:
        """计算F1分数"""
        if recall + precision == 0:
            return 0.0

        return 2 * recall * precision / (recall + precision)

    @staticmethod
    def calculate_mrr(rankings: List[int]) -> float:
        """
        计算MRR(平均倒数排名)

        Args:
            rankings: 相关文档的排名列表
        """
        if not rankings:
            return 0.0

        reciprocal_ranks = [1 / r for r in rankings]
        return sum(reciprocal_ranks) / len(rankings)

    @staticmethod
    def calculate_ndcg(
        rankings: List[bool],
        k: int = 10
    ) -> float:
        """
        计算NDCG

        Args:
            rankings: 是否相关的布尔列表
            k: 截断位置
        """
        # DCG
        dcg = 0
        for i, is_relevant in enumerate(rankings[:k]):
            if is_relevant:
                dcg += 1 / np.log2(i + 2)

        # 理想DCG
        ideal_rankings = sorted(rankings[:k], reverse=True)
        ideal_dcg = 0
        for i, is_relevant in enumerate(ideal_rankings):
            if is_relevant:
                ideal_dcg += 1 / np.log2(i + 2)

        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    @staticmethod
    def calculate_percentile(values: List[float], percentile: float) -> float:
        """计算百分位数"""
        if not values:
            return 0.0

        sorted_values = sorted(values)
        index = min(int(len(sorted_values) * percentile / 100), len(sorted_values) - 1)
        return sorted_values[index]
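
下面的示例演示 MetricsCalculator 各静态方法的调用方式,检索结果数据为虚构:

# ============== 使用示例 ==============

if __name__ == "__main__":
    retrieved = ["doc_1", "doc_2", "doc_3"]
    relevant = ["doc_1", "doc_4"]

    recall = MetricsCalculator.calculate_recall(retrieved, relevant)        # 0.5
    precision = MetricsCalculator.calculate_precision(retrieved, relevant)  # 约0.33
    f1 = MetricsCalculator.calculate_f1(recall, precision)

    # rankings=[True, False, True] 表示检索结果中第1、3位是相关文档
    ndcg = MetricsCalculator.calculate_ndcg([True, False, True], k=10)

    print(f"recall={recall:.2f}, precision={precision:.2f}, f1={f1:.2f}, ndcg={ndcg:.2f}")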

5. 评测工具

5.1 RAGAS集成

try:
    from ragas import evaluate
    from ragas.metrics import (
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall
    )
    RAGAS_AVAILABLE = True
except ImportError:
    RAGAS_AVAILABLE = False

class RAGASEvaluator:
    """RAGAS评测器"""

    def __init__(self):
        if not RAGAS_AVAILABLE:
            raise ImportError("RAGAS not installed. Install with: pip install ragas")

        self.metrics = [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall
        ]

    def evaluate(
        self,
        golden_set: GoldenSet,
        rag_system
    ) -> Dict:
        """
        使用RAGAS评测

        Args:
            golden_set: 标准测试集
            rag_system: RAG系统

        Returns:
            评测结果
        """
        # 准备数据集
        dataset = []

        for case in golden_set.get_all():
            # 执行RAG查询
            response = rag_system.query(case.query)

            # 构建RAGAS格式
            dataset.append({
                "question": case.query,
                "answer": response.get("answer"),
                "contexts": [doc.get("content") for doc in response.get("sources", [])],
                "ground_truth": case.ground_truth.get("answer"),
                "ground_truths": [case.ground_truth.get("answer")]
            })

        # 执行评测(ragas.evaluate 期望 HuggingFace Dataset,这里做一次转换)
        from datasets import Dataset
        result = evaluate(dataset=Dataset.from_list(dataset), metrics=self.metrics)

        return result.to_pandas().to_dict()

5.2 自定义评测器

class CustomEvaluator:
    """自定义评测器"""

    def __init__(self, llm=None):
        self.llm = llm

    def evaluate_quality(self, question: str, answer: str) -> Dict:
        """
        评估答案质量

        使用LLM评估
        """
        prompt = f"""评估以下问答对的质量。

问题:{question}
答案:{answer}

请从以下维度评估(1-5分):
1. 准确性:答案是否准确
2. 完整性:答案是否完整
3. 相关性:答案是否相关
4. 清晰性:答案是否清晰

请以JSON格式输出:
{{
    "accuracy": 5,
    "completeness": 4,
    "relevance": 5,
    "clarity": 4,
    "overall": 4.5,
    "reason": "评估原因"
}}"""

        response = self.llm.generate(prompt)

        import json
        try:
            return json.loads(response)
        except (json.JSONDecodeError, TypeError):
            return {"overall": 3.0, "reason": "解析失败"}
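
下面是 CustomEvaluator 的一个最小调用示例。FakeLLM 是假设的桩对象,只需提供 generate(prompt) 方法并返回 JSON 字符串;实际使用时应替换为真实的 LLM 客户端:

# ============== 使用示例 ==============

if __name__ == "__main__":
    class FakeLLM:
        """桩LLM:固定返回一段评分JSON"""
        def generate(self, prompt: str) -> str:
            return '{"accuracy": 5, "completeness": 4, "relevance": 5, "clarity": 4, "overall": 4.5, "reason": "答案准确且清晰"}'

    evaluator = CustomEvaluator(llm=FakeLLM())
    result = evaluator.evaluate_quality(
        question="Python是什么?",
        answer="Python是一种高级编程语言"
    )
    print(result["overall"], result["reason"])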

6. A/B测试

6.1 A/B测试框架

from typing import Dict, List
import uuid
from datetime import datetime

class Variant:
    """测试变体"""
    def __init__(
        self,
        name: str,
        config: Dict,
        traffic_ratio: float = 0.5
    ):
        self.name = name
        self.config = config
        self.traffic_ratio = traffic_ratio

class ABTest:
    """A/B测试"""

    def __init__(
        self,
        name: str,
        variants: List[Variant],
        description: str = None
    ):
        self.id = str(uuid.uuid4())
        self.name = name
        self.variants = variants
        self.description = description or ""
        self.created_at = datetime.utcnow()
        self.is_active = False

        # 验证流量比例
        self._validate_traffic()

    def _validate_traffic(self):
        """验证流量总比例为1.0"""
        total_ratio = sum(v.traffic_ratio for v in self.variants)
        if abs(total_ratio - 1.0) > 0.01:
            raise ValueError(
                f"流量总比例应为1.0,当前为{total_ratio}"
            )

    def activate(self):
        """激活测试"""
        self.is_active = True

    def deactivate(self):
        """停用测试"""
        self.is_active = False

    def assign_variant(self, user_id: str = None) -> Variant:
        """
        分配变体

        Args:
            user_id: 用户ID,用于一致性哈希
        """
        if not self.is_active:
            raise RuntimeError("测试未激活")

        # 使用一致性哈希分配
        import hashlib

        seed = user_id or str(uuid.uuid4())
        hash_value = int(hashlib.md5(seed.encode()).hexdigest(), 16)

        # 根据流量比例分配
        cumulative = 0
        for variant in self.variants:
            cumulative += variant.traffic_ratio
            # 归一化hash值到[0, 1)
            normalized_hash = hash_value / (2 ** 128)
            if normalized_hash < cumulative:
                return variant

        return self.variants[-1]  # fallback

class ABTestManager:
    """A/B测试管理器"""

    def __init__(self):
        self.tests: Dict[str, ABTest] = {}

    def create_test(
        self,
        name: str,
        variants: List[Variant],
        description: str = None
    ) -> ABTest:
        """创建测试"""
        test = ABTest(name, variants, description)
        self.tests[test.id] = test
        return test

    def get_test(self, test_id: str) -> ABTest:
        """获取测试"""
        return self.tests.get(test_id)

    def list_tests(self) -> List[ABTest]:
        """列出所有测试"""
        return list(self.tests.values())

    def activate_test(self, test_id: str):
        """激活测试"""
        if test_id in self.tests:
            self.tests[test_id].activate()

    def deactivate_test(self, test_id: str):
        """停用测试"""
        if test_id in self.tests:
            self.tests[test_id].deactivate()
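
下面的示例演示通过 ABTestManager 创建并激活一个双变体实验,并验证同一用户会被稳定分配到同一组(实验名与配置均为虚构):

# ============== 使用示例 ==============

if __name__ == "__main__":
    manager = ABTestManager()

    test = manager.create_test(
        name="prompt_v2_rollout",
        variants=[
            Variant(name="control", config={"prompt": "v1"}, traffic_ratio=0.5),
            Variant(name="treatment", config={"prompt": "v2"}, traffic_ratio=0.5)
        ],
        description="对比两版Prompt效果"
    )
    manager.activate_test(test.id)

    # 同一用户多次请求会被md5一致性哈希稳定分到同一变体
    for user_id in ["user_001", "user_002", "user_001"]:
        variant = test.assign_variant(user_id)
        print(f"{user_id} -> {variant.name}")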

6.2 A/B测试结果分析

import numpy as np
from scipy import stats

class ABTestAnalyzer:
    """A/B测试结果分析器"""

    def analyze_conversion(
        self,
        variant_a: Dict,  # {"impressions": int, "conversions": int}
        variant_b: Dict,
        confidence: float = 0.95
    ) -> Dict:
        """
        分析转化率

        Args:
            variant_a: 变体A的指标
            variant_b: 变体B的指标
            confidence: 置信度

        Returns:
            {
                "variant_a_rate": float,
                "variant_b_rate": float,
                "lift": float,
                "lift_percent": float,
                "p_value": float,
                "is_significant": bool
            }
        """
        # 计算转化率
        rate_a = variant_a["conversions"] / variant_a["impressions"]
        rate_b = variant_b["conversions"] / variant_b["impressions"]

        # 计算提升
        lift = rate_b - rate_a
        lift_percent = (lift / rate_a) * 100 if rate_a > 0 else 0

        # Z检验
        p_a = rate_a
        p_b = rate_b
        n_a = variant_a["impressions"]
        n_b = variant_b["impressions"]

        pooled_p = (n_a * p_a + n_b * p_b) / (n_a + n_b)
        se = np.sqrt(
            pooled_p * (1 - pooled_p) * (1/n_a + 1/n_b)
        )

        z_score = (p_b - p_a) / se if se > 0 else 0
        p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))

        return {
            "variant_a_rate": rate_a,
            "variant_b_rate": rate_b,
            "lift": lift,
            "lift_percent": lift_percent,
            "p_value": p_value,
            "is_significant": p_value < (1 - confidence)
        }

    def analyze_continuous(
        self,
        values_a: List[float],
        values_b: List[float],
        confidence: float = 0.95
    ) -> Dict:
        """
        分析连续指标(如延迟、评分)

        使用t检验
        """
        # 计算统计量
        mean_a = np.mean(values_a)
        mean_b = np.mean(values_b)
        std_a = np.std(values_a)
        std_b = np.std(values_b)

        # t检验
        t_stat, p_value = stats.ttest_ind(values_a, values_b)

        # 计算提升
        lift = mean_b - mean_a
        lift_percent = (lift / mean_a) * 100 if mean_a > 0 else 0

        return {
            "variant_a_mean": mean_a,
            "variant_b_mean": mean_b,
            "variant_a_std": std_a,
            "variant_b_std": std_b,
            "lift": lift,
            "lift_percent": lift_percent,
            "p_value": p_value,
            "is_significant": p_value < (1 - confidence)
        }

    def analyze_with_bootstrap(
        self,
        values_a: List[float],
        values_b: List[float],
        n_iterations: int = 10000
    ) -> Dict:
        """
        使用Bootstrap分析

        适用于非正态分布
        """
        lifts = []

        for _ in range(n_iterations):
            # 重采样
            sample_a = np.random.choice(values_a, size=len(values_a), replace=True)
            sample_b = np.random.choice(values_b, size=len(values_b), replace=True)

            # 计算提升
            mean_a = np.mean(sample_a)
            mean_b = np.mean(sample_b)
            lift = mean_b - mean_a
            lifts.append(lift)

        # 计算置信区间
        lifts_sorted = np.sort(lifts)
        lower = np.percentile(lifts_sorted, 2.5)
        upper = np.percentile(lifts_sorted, 97.5)

        return {
            "mean_lift": np.mean(lifts),
            "ci_lower": lower,
            "ci_upper": upper,
            "ci_95_zero": lower <= 0 <= upper  # 0是否在置信区间内
        }
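
下面的示例演示 ABTestAnalyzer 的两种分析方式,曝光、转化与评分数据均为虚构:

# ============== 使用示例 ==============

if __name__ == "__main__":
    analyzer = ABTestAnalyzer()

    # 转化率对比(比例指标,Z检验)
    result = analyzer.analyze_conversion(
        variant_a={"impressions": 10000, "conversions": 800},
        variant_b={"impressions": 10000, "conversions": 880}
    )
    print(f"提升: {result['lift_percent']:.1f}%, p值: {result['p_value']:.4f}, 显著: {result['is_significant']}")

    # 连续指标对比(如满意度评分,t检验)
    scores_a = np.random.normal(4.0, 0.5, 500).tolist()
    scores_b = np.random.normal(4.1, 0.5, 500).tolist()
    print(analyzer.analyze_continuous(scores_a, scores_b))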

7. 评测流程

7.1 完整评测流程

class EvaluationPipeline:
    """评测流程"""

    def __init__(
        self,
        golden_set: GoldenSet,
        system: object,
        config: Dict = None
    ):
        self.golden_set = golden_set
        self.system = system
        self.config = config or {}
        self.results = []

    def run(self) -> Dict:
        """执行完整评测流程"""
        print("=== 开始评测流程 ===")

        # Step 1: 准备
        print("\n[1/5] 准备")
        self._prepare()

        # Step 2: 执行
        print("\n[2/5] 执行")
        self._execute()

        # Step 3: 评估
        print("\n[3/5] 评估")
        metrics = self._evaluate()

        # Step 4: 分析
        print("\n[4/5] 分析")
        analysis = self._analyze()

        # Step 5: 报告
        print("\n[5/5] 报告")
        report = self._generate_report(metrics, analysis)

        print("\n=== 评测完成 ===")

        return report

    def _prepare(self):
        """准备阶段"""
        print(f"  - 测试用例数: {self.golden_set.count()}")
        print(f"  - 类别分布: {self._get_category_distribution()}")

    def _execute(self):
        """执行阶段"""
        from tqdm import tqdm

        for case in tqdm(
            self.golden_set.get_all(),
            desc="执行测试"
        ):
            result = self.system.run(case.query)
            self.results.append({
                "case": case,
                "result": result
            })

    def _evaluate(self) -> Dict:
        """评估阶段"""
        evaluator = RAGEvaluator(self.system)
        return evaluator.evaluate(self.golden_set)

    def _analyze(self) -> Dict:
        """分析阶段"""
        # 按类别分析
        category_analysis = {}

        for category in set(
            r["case"].category for r in self.results
        ):
            category_results = [
                r for r in self.results
                if r["case"].category == category
            ]
            category_analysis[category] = {
                "count": len(category_results),
                "success_rate": sum(
                    1 for r in category_results
                    if r["result"].get("success", False)
                ) / len(category_results)
            }

        # 按难度分析
        difficulty_analysis = {}

        for difficulty in set(
            r["case"].difficulty for r in self.results
        ):
            difficulty_results = [
                r for r in self.results
                if r["case"].difficulty == difficulty
            ]
            difficulty_analysis[difficulty] = {
                "count": len(difficulty_results),
                "success_rate": sum(
                    1 for r in difficulty_results
                    if r["result"].get("success", False)
                ) / len(difficulty_results)
            }

        return {
            "by_category": category_analysis,
            "by_difficulty": difficulty_analysis,
            "total": len(self.results)
        }

    def _generate_report(self, metrics: Dict, analysis: Dict) -> Dict:
        """生成报告"""
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "test_set": self.golden_set.name,
            "test_count": self.golden_set.count(),
            "metrics": metrics,
            "analysis": analysis
        }

    def _get_category_distribution(self) -> Dict:
        """获取类别分布"""
        distribution = {}

        for case in self.golden_set.get_all():
            category = case.category
            distribution[category] = distribution.get(category, 0) + 1

        return distribution
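
下面给出评测流程的一个最小运行示例(依赖 tqdm)。被评测系统需同时提供 run()(执行阶段使用)与 query()(RAGEvaluator 使用)两个方法,这里用假设的 StubSystem 桩实现演示,实际使用时替换为真实系统:

# ============== 使用示例 ==============

if __name__ == "__main__":
    class StubSystem:
        """桩系统:query()与run()返回固定结果"""
        def query(self, question: str) -> Dict:
            return {"answer": "Python是一种高级编程语言", "sources": []}

        def run(self, question: str) -> Dict:
            return {**self.query(question), "success": True}

    golden_set = GoldenSet(name="pipeline_demo")
    golden_set.add_case(TestCase(
        id="case_001",
        query="Python是什么?",
        ground_truth={"answer": "Python是一种高级编程语言"},
        category="programming",
        difficulty="easy"
    ))

    pipeline = EvaluationPipeline(golden_set=golden_set, system=StubSystem())
    report = pipeline.run()
    print(f"整体得分: {report['metrics']['overall_score']:.2f}")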

8. 实现示例

8.1 完整评测示例

"""
完整业务评测示例

1. 构建Golden Set
2. 离线评测
3. A/B测试
4. 生成报告
"""

if __name__ == "__main__":
    # 1. 构建Golden Set
    print("构建Golden Set...")
    golden_set = GoldenSet(name="example_eval")

    # 添加测试用例
    test_cases = [
        {
            "id": "case_001",
            "query": "Python是什么?",
            "ground_truth": {
                "answer": "Python是一种高级编程语言",
                "relevant_docs": ["doc_001"]
            },
            "category": "programming",
            "difficulty": "easy"
        },
        {
            "id": "case_002",
            "query": "如何用Python读取文件?",
            "ground_truth": {
                "answer": "使用open()函数",
                "relevant_docs": ["doc_002", "doc_003"],
                "expected_tools": ["file_read"]
            },
            "category": "programming",
            "difficulty": "medium"
        }
    ]

    for tc in test_cases:
        golden_set.add_case(TestCase(**tc))

    print(f"  - 构建了 {golden_set.count()} 个测试用例")

    # 2. 离线评测
    print("\n执行离线评测...")
    # rag_system = RAGSystem(...)
    # evaluator = RAGEvaluator(rag_system)
    # metrics = evaluator.evaluate(golden_set)

    # 3. A/B测试
    print("\n设置A/B测试...")
    ab_test = ABTest(
        name="rag_optimization",
        variants=[
            Variant(name="baseline", config={"model": "gpt-3.5"}, traffic_ratio=0.5),
            Variant(name="optimized", config={"model": "gpt-4"}, traffic_ratio=0.5)
        ]
    )
    ab_test.activate()

    # 分配用户
    variant_for_user = ab_test.assign_variant(user_id="user_123")
    print(f"  - 用户user_123被分配到: {variant_for_user.name}")

    # 4. 生成报告
    print("\n生成评测报告...")
    report = {
        "test_set": golden_set.name,
        "test_count": golden_set.count(),
        "ab_test": ab_test.name,
        "variants": [v.name for v in ab_test.variants]
    }

    import json
    print(json.dumps(report, indent=2, ensure_ascii=False))

面试高频问法

Q1: 如何构建Agent的评测体系?

标准回答:

评测体系构建:

1. 数据准备
   - 构建Golden Set(标准测试集)
   - 覆盖不同类型和难度
   - 包含预期工具调用和答案

2. 离线评测
   - 批量执行测试用例
   - 计算准确率、召回率等指标
   - 按类别/难度分析

3. 在线评测
   - 收集用户反馈
   - 实时监控性能
   - A/B测试对比

4. 指标定义
   - 准确性:答案正确率
   - 效率:迭代次数、token使用
   - 安全性:危险操作检测

实现:
```python
def evaluate_agent(agent, golden_set):
    results = []

    for case in golden_set:
        response = agent.run(case.query)

        results.append({
            "case_id": case.id,
            "success": check_success(response, case.ground_truth),
            "efficiency": calculate_efficiency(response)
        })

    # 计算指标
    success_rate = sum(1 for r in results if r["success"]) / len(results)
    avg_efficiency = sum(r["efficiency"] for r in results) / len(results)

    return {
        "success_rate": success_rate,
        "avg_efficiency": avg_efficiency
    }
```
Q2: 如何进行A/B测试?
标准回答:
A/B测试流程:

1. 设计测试
   - 定义对照组和实验组
   - 设置流量分配比例
   - 定义成功指标

2. 实现分组
   - 使用一致性哈希分配用户
   - 确保同一用户总是分配到同一组
   - 记录分配结果

3. 收集数据
   - 记录两组的用户行为
   - 收集关键指标(转化率、满意度)
   - 确保数据完整性

4. 统计分析
   - 计算指标提升
   - 使用统计检验判断显著性
   - 计算置信区间

5. 决策
   - 如果显著且正向,全量上线
   - 如果不显著,继续观察或调整

实现:
```python
class ABTest:
    def assign_variant(self, user_id):
        # 一致性哈希:用md5保证跨进程、跨重启的分配稳定(内置hash()受随机种子影响)
        import hashlib
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % 100

        if hash_val < 50:
            return "control"
        else:
            return "experiment"

    def analyze(self, results):
        # 计算转化率
        control_rate = results["control"]["conversions"] / results["control"]["users"]
        exp_rate = results["experiment"]["conversions"] / results["experiment"]["users"]

        # 计算提升
        lift = (exp_rate - control_rate) / control_rate

        return {
            "control_rate": control_rate,
            "experiment_rate": exp_rate,
            "lift_percent": lift * 100
        }
```

Q3: 如何评估RAG系统的效果?

标准回答:

RAG评估维度:

  1. 检索质量

    • Recall:正确文档被检索到的比例
    • Precision:检索结果中正确文档的比例
    • NDCG:排序质量
  2. 生成质量

    • Faithfulness:答案基于检索内容
    • Answer Relevance:答案相关性
    • Context Precision:上下文相关性
  3. 评估方法

    a) 使用RAGAS

    
```python
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy

results = evaluate(
    dataset=dataset,
    metrics=[faithfulness, answer_relevancy]
)
```

    b) 自定义评估
```python
def evaluate_rag(rag_system, golden_set):
    results = []

    for case in golden_set:
        response = rag_system.query(case.query)

        # 检索评估
        recall = calculate_recall(
            response["sources"],
            case["relevant_docs"]
        )

        # 生成评估
        bleu = calculate_bleu(
            response["answer"],
            case["ground_truth_answer"]
        )

        results.append({"recall": recall, "bleu": bleu})

    return aggregate(results)
```

    c) LLM-as-Judge

  • 用GPT-4评估答案质量
  • 评分+给出改进建议

总结

评测体系核心要点

| 要点 | 策略 |
| --- | --- |
| Golden Set | 多类型、多难度覆盖 |
| 离线评测 | 批量执行、快速迭代 |
| 在线评测 | 用户反馈、实时监控 |
| A/B测试 | 流量分配、统计分析 |
| 指标分析 | 准确性、效率、安全性 |

最佳实践

  1. 数据质量:Golden Set必须准确完整
  2. 指标全面:准确性+效率+安全性
  3. 自动化:集成到CI/CD流程(门禁示例见下方代码)
  4. 持续监控:线上实时监控
  5. A/B验证:重要改动需AB测试
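
针对第3条(自动化、集成到CI/CD流程),下面给出一个示意性的评测门禁脚本:当离线评测结果低于阈值时以非零状态码退出,从而阻断发布。其中指标键名与阈值均为示例假设,应与自身评测报告的字段对齐:

```python
import sys

def evaluation_gate(metrics: dict, min_overall: float = 0.8, max_error_rate: float = 0.01) -> None:
    """评测门禁:任一指标不达标则以退出码1结束,CI流水线据此阻断发布"""
    failures = []

    if metrics.get("overall_score", 0.0) < min_overall:
        failures.append(f"overall_score={metrics.get('overall_score')} < {min_overall}")
    if metrics.get("error_rate", 1.0) > max_error_rate:
        failures.append(f"error_rate={metrics.get('error_rate')} > {max_error_rate}")

    if failures:
        print("评测门禁未通过: " + "; ".join(failures))
        sys.exit(1)

    print("评测门禁通过")

if __name__ == "__main__":
    # 实际使用时,metrics应来自EvaluationPipeline生成的评测报告
    evaluation_gate({"overall_score": 0.85, "error_rate": 0.005})
```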