Business Evaluation System
1. Evaluation System Overview
1.1 Evaluation Architecture
┌─────────────────────────────────────────────────────┐
│       Business Evaluation System Architecture       │
├─────────────────────────────────────────────────────┤
│                                                     │
│   ┌──────────────┐          ┌──────────────┐        │
│   │   Offline    │          │    Online    │        │
│   │  Evaluation  │          │  Evaluation  │        │
│   └──────┬───────┘          └──────┬───────┘        │
│          │                         │                │
│          ▼                         ▼                │
│   ┌──────────────────────────────────────┐          │
│   │         Evaluation Datasets          │          │
│   │         - Golden Set                 │          │
│   │         - Real user data             │          │
│   │         - Human-annotated data       │          │
│   └──────────────────────────────────────┘          │
│          │                         ▲                │
│          ▼                         │                │
│   ┌──────────────────────────────────────┐          │
│   │         Evaluation Metrics           │          │
│   │         - Accuracy metrics           │          │
│   │         - Performance metrics        │          │
│   │         - Business metrics           │          │
│   └──────────────────────────────────────┘          │
│          │                                          │
│          ▼                                          │
│   ┌──────────────────────────────────────┐          │
│   │          Evaluation Report           │          │
│   └──────────────────────────────────────┘          │
│                                                     │
└─────────────────────────────────────────────────────┘
1.2 Evaluation Type Comparison
| Type | Characteristics | Use Cases | Data Source |
|---|---|---|---|
| Offline evaluation | Batch processing, fast iteration | Model tuning, feature validation | Golden Set |
| Online evaluation | Real users, real-time feedback | Post-launch monitoring, A/B testing | User logs |
| Human evaluation | High quality, high cost | Validation of critical features | Human annotation |
| Automated evaluation | Efficient, repeatable | Routine testing, regression | Auto-generated data |
1.3 Evaluation Targets
┌─────────────────────────────────────────────────┐
│              Evaluation Targets                 │
├─────────────────────────────────────────────────┤
│                                                 │
│  Accuracy targets                               │
│  ✓ Answer correctness > 90%                     │
│  ✓ Faithfulness > 85%                           │
│  ✓ Relevance > 90%                              │
│                                                 │
│  Performance targets                            │
│  ✓ P95 latency < 3s                             │
│  ✓ Throughput > 100 QPS                         │
│  ✓ Error rate < 1%                              │
│                                                 │
│  Business targets                               │
│  ✓ User satisfaction > 85%                      │
│  ✓ Problem resolution rate > 80%                │
│  ✓ Cost within budget                           │
│                                                 │
└─────────────────────────────────────────────────┘
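These targets are easiest to enforce when encoded as data. Below is a minimal sketch that gates a metrics report against the thresholds above; the metric names and the shape of the `metrics` dict are illustrative assumptions, not part of the original system.

```python
# Minimal sketch: gate a metrics report against the targets above.
# Metric names and the shape of `metrics` are illustrative assumptions.
TARGETS = {
    "answer_correctness": (">", 0.90),
    "faithfulness":       (">", 0.85),
    "relevance":          (">", 0.90),
    "latency_p95":        ("<", 3.0),   # seconds
    "qps":                (">", 100.0),
    "error_rate":         ("<", 0.01),
}

def check_targets(metrics: dict) -> list:
    """Return human-readable violations; an empty list means all targets are met."""
    violations = []
    for name, (op, threshold) in TARGETS.items():
        value = metrics.get(name)
        if value is None:
            continue  # metric not reported; skip rather than fail
        ok = value > threshold if op == ">" else value < threshold
        if not ok:
            violations.append(f"{name}={value} violates {op} {threshold}")
    return violations

print(check_targets({"faithfulness": 0.80, "latency_p95": 2.1}))
# ['faithfulness=0.8 violates > 0.85']
```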
2. Offline Evaluation
2.1 Building the Golden Set
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class TestCase:
"""测试用例"""
id: str
query: str # 问题
ground_truth: Dict # 标准答案
category: str # 类别
difficulty: str # 难度:easy/medium/hard
metadata: Dict = None
created_at: datetime = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
if self.created_at is None:
self.created_at = datetime.utcnow()
class GoldenSet:
"""Golden Set - 标准测试集"""
def __init__(self, name: str):
self.name = name
self.test_cases: Dict[str, TestCase] = {}
def add_case(self, case: TestCase):
"""添加测试用例"""
self.test_cases[case.id] = case
def get_case(self, case_id: str) -> Optional[TestCase]:
"""获取测试用例"""
return self.test_cases.get(case_id)
def get_by_category(self, category: str) -> List[TestCase]:
"""按类别获取"""
return [
case for case in self.test_cases.values()
if case.category == category
]
def get_by_difficulty(self, difficulty: str) -> List[TestCase]:
"""按难度获取"""
return [
case for case in self.test_cases.values()
if case.difficulty == difficulty
]
def get_all(self) -> List[TestCase]:
"""获取所有用例"""
return list(self.test_cases.values())
def count(self) -> int:
"""用例数量"""
return len(self.test_cases)
def save(self, file_path: str):
"""保存到文件"""
data = {
"name": self.name,
"test_cases": [
{
"id": case.id,
"query": case.query,
"ground_truth": case.ground_truth,
"category": case.category,
"difficulty": case.difficulty,
"metadata": case.metadata,
"created_at": case.created_at.isoformat()
}
for case in self.test_cases.values()
]
}
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
@classmethod
def load(cls, file_path: str) -> 'GoldenSet':
"""从文件加载"""
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
golden_set = cls(data["name"])
for case_data in data["test_cases"]:
case = TestCase(
id=case_data["id"],
query=case_data["query"],
ground_truth=case_data["ground_truth"],
category=case_data["category"],
difficulty=case_data["difficulty"],
metadata=case_data.get("metadata", {}),
created_at=datetime.fromisoformat(case_data["created_at"])
)
golden_set.add_case(case)
return golden_set
# ============== Usage example ==============
if __name__ == "__main__":
# 创建Golden Set
golden_set = GoldenSet(name="llm_agent_eval")
# 添加测试用例
golden_set.add_case(TestCase(
id="case_001",
query="Python是什么?",
ground_truth={
"answer": "Python是一种高级编程语言",
"type": "factual",
"confidence": 1.0
},
category="programming",
difficulty="easy"
))
golden_set.add_case(TestCase(
id="case_002",
query="如何用Python读取文件?",
ground_truth={
"answer": "使用open()函数",
"code_example": "with open('file.txt', 'r') as f:\n content = f.read()",
"type": "procedural"
},
category="programming",
difficulty="medium"
))
# 保存
golden_set.save("./golden_set.json")
# 加载
loaded_set = GoldenSet.load("./golden_set.json")
print(f"加载了 {loaded_set.count()} 个测试用例")
2.2 RAG Evaluation
from typing import List, Dict
class RAGEvaluator:
"""RAG评测器"""
def __init__(self, rag_system):
self.rag_system = rag_system
def evaluate(
self,
golden_set: GoldenSet
) -> Dict:
"""
评测RAG系统
Returns:
{
"retrieval_metrics": {...},
"generation_metrics": {...},
"overall_score": float
}
"""
        results = []

        for case in golden_set.get_all():
            # Run the RAG query
            response = self.rag_system.query(case.query)

            # Score the single case
            case_result = self._evaluate_case(case, response)
            results.append(case_result)

        # Aggregate metrics across cases
        return self._aggregate_results(results)
def _evaluate_case(
self,
case: TestCase,
response: Dict
) -> Dict:
"""评估单个用例"""
ground_truth = case.ground_truth
# 检索评估
retrieval_metrics = self._evaluate_retrieval(
ground_truth,
response
)
# 生成评估
generation_metrics = self._evaluate_generation(
ground_truth,
response
)
return {
"case_id": case.id,
"retrieval_metrics": retrieval_metrics,
"generation_metrics": generation_metrics,
"overall": (
retrieval_metrics["recall"] * 0.4 +
retrieval_metrics["precision"] * 0.3 +
generation_metrics["bleu"] * 0.3
)
}
def _evaluate_retrieval(
self,
ground_truth: Dict,
response: Dict
) -> Dict:
"""评估检索质量"""
retrieved_docs = response.get("sources", [])
expected_docs = ground_truth.get("relevant_docs", [])
if not expected_docs:
return {"recall": 1.0, "precision": 1.0, "ndcg": 1.0}
# 计算Recall
retrieved_ids = {doc.get("id") for doc in retrieved_docs}
expected_ids = set(expected_docs)
hit = len(retrieved_ids & expected_ids)
recall = hit / len(expected_ids) if expected_ids else 0
precision = hit / len(retrieved_ids) if retrieved_docs else 0
# 计算NDCG
ndcg = self._calculate_ndcg(retrieved_docs, expected_ids)
return {
"recall": recall,
"precision": precision,
"ndcg": ndcg
}
def _calculate_ndcg(
self,
retrieved_docs: List[Dict],
expected_ids: set
) -> float:
"""计算NDCG"""
dcg = 0
for i, doc in enumerate(retrieved_docs):
if doc.get("id") in expected_ids:
dcg += 1 / (i + 1)
# 理想DCG
ideal_dcg = sum(1 / (i + 1) for i in range(len(expected_ids)))
return dcg / ideal_dcg if ideal_dcg > 0 else 0
def _evaluate_generation(
self,
ground_truth: Dict,
response: Dict
) -> Dict:
"""评估生成质量"""
predicted = response.get("answer", "")
reference = ground_truth.get("answer", "")
# BLEU分数
bleu = self._calculate_bleu(predicted, reference)
# 准确性
accuracy = self._calculate_accuracy(predicted, reference)
return {
"bleu": bleu,
"accuracy": accuracy
}
def _calculate_bleu(self, predicted: str, reference: str) -> float:
"""计算BLEU分数(简化)"""
# 简化实现,实际应使用nltk或sacreble
pred_words = predicted.split()
ref_words = reference.split()
if not pred_words:
return 0
# 1-gram匹配
pred_set = set(pred_words)
ref_set = set(ref_words)
match = len(pred_set & ref_set)
precision = match / len(pred_set)
recall = match / len(ref_set) if ref_set else 0
# F1
return 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
def _calculate_accuracy(self, predicted: str, reference: str) -> float:
"""计算准确率(基于关键词匹配)"""
import re
ref_keywords = set(re.findall(r'\w+', reference.lower()))
pred_keywords = set(re.findall(r'\w+', predicted.lower()))
if not ref_keywords:
return 1.0
match = len(ref_keywords & pred_keywords)
return match / len(ref_keywords)
def _aggregate_results(self, results: List[Dict]) -> Dict:
"""聚合所有结果"""
retrieval_metrics = {
"recall": [],
"precision": [],
"ndcg": []
}
generation_metrics = {
"bleu": [],
"accuracy": []
}
for result in results:
for key in retrieval_metrics:
retrieval_metrics[key].append(
result["retrieval_metrics"][key]
)
for key in generation_metrics:
generation_metrics[key].append(
result["generation_metrics"][key]
)
        # Mean helper
def mean(values):
return sum(values) / len(values) if values else 0
return {
"retrieval_metrics": {
"recall": mean(retrieval_metrics["recall"]),
"precision": mean(retrieval_metrics["precision"]),
"ndcg": mean(retrieval_metrics["ndcg"])
},
"generation_metrics": {
"bleu": mean(generation_metrics["bleu"]),
"accuracy": mean(generation_metrics["accuracy"])
},
"overall_score": mean([r["overall"] for r in results])
}
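The `_calculate_bleu` above is only a unigram-F1 stand-in. When a real BLEU score is needed, a library can be dropped in; a minimal sketch assuming sacrebleu is installed (`pip install sacrebleu`):

```python
# Hedged sketch of real BLEU via sacrebleu (an assumption; any BLEU library works).
# sacrebleu scores on a 0-100 scale, so divide by 100 to match RAGEvaluator's 0-1 range.
import sacrebleu

def calculate_bleu(predicted: str, reference: str) -> float:
    return sacrebleu.sentence_bleu(predicted, [reference]).score / 100.0

print(calculate_bleu("use the open() function", "use the open() function to read files"))
```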
2.3 Agent Evaluation
import re


class AgentEvaluator:
    """Evaluator for agents."""
def __init__(self, agent):
self.agent = agent
def evaluate(
self,
golden_set: GoldenSet
) -> Dict:
"""评测Agent"""
results = []
for case in golden_set.get_all():
result = self._evaluate_case(case)
results.append(result)
return self._aggregate_results(results)
    def _evaluate_case(self, case: TestCase) -> Dict:
        """Score a single test case."""
        # Run the agent
        agent_response = self.agent.run(case.query)

        # Extract tool calls and the final answer
        tool_calls = agent_response.get("tool_calls", [])
        final_answer = agent_response.get("final_answer", "")

        # Score each dimension
metrics = {
"tool_selection": self._evaluate_tool_selection(
case,
tool_calls
),
"answer_quality": self._evaluate_answer_quality(
case,
final_answer
),
"efficiency": self._evaluate_efficiency(
agent_response
),
"safety": self._evaluate_safety(agent_response)
}
        # Weighted overall score
metrics["overall"] = (
metrics["tool_selection"]["score"] * 0.25 +
metrics["answer_quality"]["score"] * 0.35 +
metrics["efficiency"]["score"] * 0.2 +
metrics["safety"]["score"] * 0.2
)
return {
"case_id": case.id,
"metrics": metrics,
"agent_response": agent_response
}
def _evaluate_tool_selection(
self,
case: TestCase,
tool_calls: List[Dict]
) -> Dict:
"""评估工具选择"""
expected_tools = case.ground_truth.get("expected_tools", [])
if not expected_tools:
return {"score": 1.0, "reason": "无预期工具"}
called_tools = [call.get("tool") for call in tool_calls]
        # Recall and precision over the tool sets
expected_set = set(expected_tools)
called_set = set(called_tools)
recall = len(expected_set & called_set) / len(expected_set)
precision = len(expected_set & called_set) / len(called_set) if called_set else 0
return {
"score": (recall + precision) / 2,
"recall": recall,
"precision": precision,
"expected": expected_tools,
"called": called_tools
}
def _evaluate_answer_quality(
self,
case: TestCase,
answer: str
) -> Dict:
"""评估答案质量"""
expected_answer = case.ground_truth.get("answer", "")
# 相似度
similarity = self._semantic_similarity(answer, expected_answer)
# 包含关键信息
key_info = case.ground_truth.get("key_info", [])
key_info_coverage = sum(
1 for info in key_info if info in answer
) / len(key_info) if key_info else 1.0
return {
"score": (similarity + key_info_coverage) / 2,
"similarity": similarity,
"key_info_coverage": key_info_coverage
}
def _evaluate_efficiency(
self,
response: Dict
) -> Dict:
"""评估效率"""
iterations = response.get("iterations", 0)
tokens_used = response.get("tokens_used", 0)
# 迭代次数评分(越少越好)
iteration_score = max(0, 1 - iterations / 10)
# Token使用评分
token_score = max(0, 1 - tokens_used / 5000)
return {
"score": (iteration_score + token_score) / 2,
"iterations": iterations,
"tokens_used": tokens_used
}
def _evaluate_safety(self, response: Dict) -> Dict:
"""评估安全性"""
# 检查是否有危险操作
dangerous_calls = [
call for call in response.get("tool_calls", [])
if call.get("tool") in ["delete", "execute", "eval"]
]
# 检查答案是否包含敏感信息
answer = response.get("final_answer", "")
sensitive_patterns = [
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # 邮箱
r'\b1[3-9]\d{9}\b', # 手机号
]
has_sensitive = any(
re.search(pattern, answer)
for pattern in sensitive_patterns
)
score = 1.0
if dangerous_calls:
score -= 0.5
if has_sensitive:
score -= 0.5
return {
"score": max(0, score),
"dangerous_calls": len(dangerous_calls),
"has_sensitive_info": has_sensitive
}
def _semantic_similarity(self, text1: str, text2: str) -> float:
"""计算语义相似度"""
# 简化实现:使用关键词重叠
import re
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union
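Keyword overlap is a weak proxy for semantic similarity. If an embedding model is available, cosine similarity over sentence embeddings is usually a better drop-in for `_semantic_similarity`; a sketch assuming the sentence-transformers package (`pip install sentence-transformers`) and an illustrative model choice:

```python
# Hedged sketch of embedding-based similarity; the model name is illustrative.
from sentence_transformers import SentenceTransformer, util

_model = SentenceTransformer("all-MiniLM-L6-v2")

def semantic_similarity(text1: str, text2: str) -> float:
    emb = _model.encode([text1, text2], convert_to_tensor=True)
    # cos_sim returns a 1x1 tensor for two single embeddings
    return float(util.cos_sim(emb[0], emb[1]))
```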
3. Online Evaluation
3.1 Collecting User Feedback
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum
class FeedbackType(Enum):
"""反馈类型"""
POSITIVE = "positive"
NEGATIVE = "negative"
NEUTRAL = "neutral"
class UserFeedbackCollector:
"""用户反馈收集器"""
def __init__(self, storage=None):
self.storage = storage or {}
self._feedbacks = []
def add_feedback(
self,
user_id: str,
session_id: str,
query: str,
answer: str,
feedback_type: FeedbackType,
score: Optional[int] = None,
comment: Optional[str] = None,
metadata: Dict = None
):
"""添加用户反馈"""
feedback = {
"id": self._generate_id(),
"user_id": user_id,
"session_id": session_id,
"query": query,
"answer": answer,
"feedback_type": feedback_type.value,
"score": score,
"comment": comment,
"metadata": metadata or {},
"created_at": datetime.utcnow().isoformat()
}
self._feedbacks.append(feedback)
self._save_feedback(feedback)
def get_feedback(
self,
user_id: str = None,
session_id: str = None,
feedback_type: FeedbackType = None,
limit: int = 100
) -> List[Dict]:
"""获取反馈"""
filtered = self._feedbacks.copy()
if user_id:
filtered = [f for f in filtered if f["user_id"] == user_id]
if session_id:
filtered = [f for f in filtered if f["session_id"] == session_id]
if feedback_type:
filtered = [f for f in filtered if f["feedback_type"] == feedback_type.value]
return filtered[-limit:]
def calculate_metrics(
self,
start_time: datetime = None,
end_time: datetime = None
) -> Dict:
"""计算指标"""
feedbacks = self._feedbacks
# 时间过滤
if start_time or end_time:
feedbacks = [
f for f in feedbacks
if self._is_in_time_range(f, start_time, end_time)
]
if not feedbacks:
return {}
total = len(feedbacks)
positive = sum(1 for f in feedbacks if f["feedback_type"] == FeedbackType.POSITIVE.value)
negative = sum(1 for f in feedbacks if f["feedback_type"] == FeedbackType.NEGATIVE.value)
        # Average score
scores = [f["score"] for f in feedbacks if f["score"] is not None]
avg_score = sum(scores) / len(scores) if scores else None
return {
"total_feedbacks": total,
"positive_rate": positive / total if total > 0 else 0,
"negative_rate": negative / total if total > 0 else 0,
"average_score": avg_score
}
def _generate_id(self) -> str:
"""生成ID"""
import uuid
return str(uuid.uuid4())
def _save_feedback(self, feedback: Dict):
"""保存反馈"""
# 简化实现:实际应存储到数据库
pass
def _is_in_time_range(
self,
feedback: Dict,
start_time: datetime = None,
end_time: datetime = None
) -> bool:
"""检查是否在时间范围内"""
feedback_time = datetime.fromisoformat(feedback["created_at"])
if start_time and feedback_time < start_time:
return False
if end_time and feedback_time > end_time:
return False
return True
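A short usage sketch of the collector; the ids and the 1-5 score scale are made up for illustration:

```python
# Usage sketch; ids and the 1-5 score scale are illustrative.
collector = UserFeedbackCollector()
collector.add_feedback(
    user_id="user_123",
    session_id="sess_001",
    query="What is Python?",
    answer="Python is a high-level programming language",
    feedback_type=FeedbackType.POSITIVE,
    score=5,
)
print(collector.calculate_metrics())
# {'total_feedbacks': 1, 'positive_rate': 1.0, 'negative_rate': 0.0, 'average_score': 5.0}
```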
3.2 Real-Time Monitoring
import time
from collections import deque
from threading import Thread
from typing import Callable, Dict, List
class RealTimeMonitor:
"""实时监控器"""
def __init__(
self,
alert_thresholds: Dict = None
):
self.alert_thresholds = alert_thresholds or {
"error_rate": 0.05,
"latency_p95": 5.0,
"qps": 100
}
        # Metric storage (ring buffers)
self.metrics = {
"requests": deque(maxlen=10000),
"latencies": deque(maxlen=10000),
"errors": deque(maxlen=10000),
}
        # Alert callbacks
        self.alert_callbacks: List[Callable] = []

        # Monitoring thread
        self._running = False
        self._monitor_thread = None
def record_request(self, latency: float, error: bool = False):
"""记录记录请求"""
timestamp = time.time()
self.metrics["requests"].append(timestamp)
self.metrics["latencies"].append(latency)
if error:
self.metrics["errors"].append(timestamp)
def get_current_metrics(self, window: int = 60) -> Dict:
"""获取当前指标"""
current_time = time.time()
window_start = current_time - window
# 统计窗口内的请求
requests = [
t for t in self.metrics["requests"]
if t >= window_start
]
        # Latencies inside the window
latencies = [
l for l, t in zip(
self.metrics["latencies"],
self.metrics["requests"]
) if t >= window_start
]
        # Errors inside the window
errors = [
t for t in self.metrics["errors"]
if t >= window_start
]
        # Derived metrics
qps = len(requests) / window if window > 0 else 0
error_rate = len(errors) / len(requests) if requests else 0
if latencies:
avg_latency = sum(latencies) / len(latencies)
sorted_latencies = sorted(latencies)
p50_latency = sorted_latencies[len(sorted_latencies) // 2]
p95_latency = sorted_latencies[int(len(sorted_latencies) * 0.95)]
else:
avg_latency = 0
p50_latency = 0
p95_latency = 0
return {
"qps": qps,
"avg_latency": avg_latency,
"p50_latency": p50_latency,
"p95_latency": p95_latency,
"error_rate": error_rate,
"total_requests": len(requests),
"total_errors": len(errors)
}
def add_alert_callback(self, callback: Callable):
"""添加告警回调"""
self.alert_callbacks.append(callback)
def start_monitoring(self, interval: int = 5):
"""开始监控"""
self._running = True
self._monitor_thread = Thread(
target=self._monitor_loop,
args=(interval,),
daemon=True
)
self._monitor_thread.start()
def stop_monitoring(self):
"""停止监控"""
self._running = False
if self._monitor_thread:
self._monitor_thread.join()
def _monitor_loop(self, interval: int):
"""监控循环"""
while self._running:
metrics = self.get_current_metrics()
self._check_alerts(metrics)
time.sleep(interval)
def _check_alerts(self, metrics: Dict):
"""检查告警"""
alerts = []
# 错误率告警
if metrics["error_rate"] > self.alert_thresholds["error_rate"]:
alerts.append({
"type": "error_rate",
"value": metrics["error_rate"],
"threshold": self.alert_thresholds["error_rate"]
})
        # Latency alert
if metrics["p95_latency"] > self.alert_thresholds["latency_p95"]:
alerts.append({
"type": "latency",
"value": metrics["p95_latency"],
"threshold": self.alert_thresholds["latency_p95"]
})
        # Low-QPS alert
if metrics["qps"] < self.alert_thresholds["qps"]:
alerts.append({
"type": "low_qps",
"value": metrics["qps"],
"threshold": self.alert_thresholds["qps"]
})
        # Fire alert callbacks
for alert in alerts:
for callback in self.alert_callbacks:
callback(alert)
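A usage sketch of the monitor; the thresholds and simulated traffic are arbitrary. Alert callbacks only fire once `start_monitoring()` spins up the background thread:

```python
# Usage sketch; thresholds and simulated traffic are arbitrary.
monitor = RealTimeMonitor(
    alert_thresholds={"error_rate": 0.05, "latency_p95": 2.0, "qps": 1}
)
monitor.add_alert_callback(lambda alert: print("ALERT:", alert))

for i in range(200):
    monitor.record_request(latency=0.3 + (i % 10) * 0.05, error=(i % 50 == 0))

print(monitor.get_current_metrics(window=60))
monitor.start_monitoring(interval=5)  # begins periodic alert checks
```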
4. Evaluation Metrics
4.1 Metric Definitions
from typing import Dict, Optional


class MetricsRegistry:
    """Registry of metric definitions."""

    # RAG metrics
    RAG_METRICS = {
        "recall": {
            "name": "Recall",
            "description": "Fraction of relevant documents that were retrieved",
            "range": [0, 1],
            "higher_better": True
        },
        "precision": {
            "name": "Precision",
            "description": "Fraction of retrieved documents that are relevant",
            "range": [0, 1],
            "higher_better": True
        },
        "ndcg": {
            "name": "NDCG",
            "description": "Normalized discounted cumulative gain",
            "range": [0, 1],
            "higher_better": True
        },
        "faithfulness": {
            "name": "Faithfulness",
            "description": "Degree to which the answer is grounded in the retrieved context",
            "range": [0, 1],
            "higher_better": True
        },
        "answer_relevance": {
            "name": "Answer relevance",
            "description": "Degree to which the answer addresses the question",
            "range": [0, 1],
            "higher_better": True
        }
    }

    # Agent metrics
    AGENT_METRICS = {
        "task_completion": {
            "name": "Task completion rate",
            "description": "Fraction of tasks completed successfully",
            "range": [0, 1],
            "higher_better": True
        },
        "tool_accuracy": {
            "name": "Tool-selection accuracy",
            "description": "Fraction of tool choices that were correct",
            "range": [0, 1],
            "higher_better": True
        },
        "efficiency": {
            "name": "Efficiency score",
            "description": "Efficiency of task execution",
            "range": [0, 1],
            "higher_better": True
        },
        "safety": {
            "name": "Safety score",
            "description": "Fraction of operations that were safe",
            "range": [0, 1],
            "higher_better": True
        }
    }

    # Performance metrics
    PERFORMANCE_METRICS = {
        "latency_p50": {
            "name": "P50 latency",
            "description": "50th-percentile latency",
            "range": [0, float('inf')],
            "higher_better": False
        },
        "latency_p95": {
            "name": "P95 latency",
            "description": "95th-percentile latency",
            "range": [0, float('inf')],
            "higher_better": False
        },
        "qps": {
            "name": "Queries per second",
            "description": "System throughput",
            "range": [0, float('inf')],
            "higher_better": True
        },
        "error_rate": {
            "name": "Error rate",
            "description": "Fraction of requests that failed",
            "range": [0, 1],
            "higher_better": False
        }
    }

    @classmethod
    def get_metric(cls, metric_name: str) -> Optional[Dict]:
        """Look up a metric definition by name."""
        all_metrics = {
            **cls.RAG_METRICS,
            **cls.AGENT_METRICS,
            **cls.PERFORMANCE_METRICS
        }
        return all_metrics.get(metric_name)
4.2 Metric Calculation
import numpy as np
from typing import List
class MetricsCalculator:
"""指标计算器"""
@staticmethod
def calculate_recall(
retrieved: List[str],
relevant: List[str]
) -> float:
"""计算召回率"""
if not relevant:
return 1.0
retrieved_set = set(retrieved)
relevant_set = set(relevant)
hit = len(retrieved_set & relevant_set)
return hit / len(relevant_set)
@staticmethod
def calculate_precision(
retrieved: List[str],
relevant: List[str]
) -> float:
"""计算精确率"""
if not retrieved:
return 0.0
retrieved_set = set(retrieved)
relevant_set = set(relevant)
hit = len(retrieved_set & relevant_set)
return hit / len(retrieved)
@staticmethod
def calculate_f1(recall: float, precision: float) -> float:
"""计算F1分数"""
if recall + precision == 0:
return 0.0
return 2 * recall * precision / (recall + precision)
@staticmethod
def calculate_mrr(rankings: List[int]) -> float:
"""
计算MRR(平均倒数排名)
Args:
rankings: 相关文档的排名列表
"""
if not rankings:
return 0.0
reciprocal_ranks = [1 / r for r in rankings]
return sum(reciprocal_ranks) / len(rankings)
@staticmethod
def calculate_ndcg(
rankings: List[bool],
k: int = 10
) -> float:
"""
计算NDCG
Args:
rankings: 是否相关的布尔列表
k: 截断位置
"""
# DCG
dcg = 0
for i, is_relevant in enumerate(rankings[:k]):
if is_relevant:
dcg += 1 / np.log2(i + 2)
        # Ideal DCG
ideal_rankings = sorted(rankings[:k], reverse=True)
ideal_dcg = 0
for i, is_relevant in enumerate(ideal_rankings):
if is_relevant:
ideal_dcg += 1 / np.log2(i + 2)
return dcg / ideal_dcg if ideal_dcg > 0 else 0
@staticmethod
def calculate_percentile(values: List[float], percentile: float) -> float:
"""计算百分位数"""
if not values:
return 0.0
sorted_values = sorted(values)
index = int(len(sorted_values) * percentile / 100)
return sorted_values[index]
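A quick usage sketch of the calculators with toy data:

```python
# Quick usage sketch with toy data.
retrieved = ["doc_1", "doc_2", "doc_3"]
relevant = ["doc_2", "doc_4"]

recall = MetricsCalculator.calculate_recall(retrieved, relevant)         # 0.5
precision = MetricsCalculator.calculate_precision(retrieved, relevant)   # ~0.33
print(MetricsCalculator.calculate_f1(recall, precision))                 # 0.4
print(MetricsCalculator.calculate_mrr([2, 1, 4]))                        # (1/2 + 1 + 1/4) / 3
print(MetricsCalculator.calculate_ndcg([True, False, True], k=10))
print(MetricsCalculator.calculate_percentile([0.2, 0.5, 0.9, 1.4], 95))
```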
5. Evaluation Tools
5.1 RAGAS Integration
try:
    from ragas import evaluate
    from ragas.metrics import (
        faithfulness,
        answer_relevancy,
        context_precision,
        context_recall
    )
    RAGAS_AVAILABLE = True
except ImportError:
    RAGAS_AVAILABLE = False


class RAGASEvaluator:
    """Evaluator backed by RAGAS."""

    def __init__(self):
        if not RAGAS_AVAILABLE:
            raise ImportError("RAGAS not installed. Install with: pip install ragas")
        self.metrics = [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall
        ]

    def evaluate(
        self,
        golden_set: GoldenSet,
        rag_system
    ) -> Dict:
        """
        Evaluate with RAGAS.

        Args:
            golden_set: the standard test set
            rag_system: the RAG system under test

        Returns:
            Evaluation results
        """
        # Build the dataset
        records = []
        for case in golden_set.get_all():
            # Run the RAG query
            response = rag_system.query(case.query)

            # Convert to the RAGAS record format
            records.append({
                "question": case.query,
                "answer": response.get("answer"),
                "contexts": [doc.get("content") for doc in response.get("sources", [])],
                "ground_truth": case.ground_truth.get("answer"),
                # older ragas versions expect a list under "ground_truths"
                "ground_truths": [case.ground_truth.get("answer")]
            })

        # RAGAS expects a HuggingFace Dataset
        from datasets import Dataset
        dataset = Dataset.from_list(records)

        # Run the evaluation
        result = evaluate(dataset=dataset, metrics=self.metrics)
        return result.to_pandas().to_dict()
5.2 Custom Evaluator
import json


class CustomEvaluator:
    """Custom LLM-as-judge evaluator."""

    def __init__(self, llm=None):
        self.llm = llm

    def evaluate_quality(self, question: str, answer: str) -> Dict:
        """
        Score answer quality, using an LLM as the judge.
        """
        prompt = f"""Rate the quality of the following question-answer pair.

Question: {question}
Answer: {answer}

Score each dimension from 1 to 5:
1. Accuracy: is the answer correct?
2. Completeness: is the answer complete?
3. Relevance: is the answer on topic?
4. Clarity: is the answer clearly written?

Respond in JSON:
{{
    "accuracy": 5,
    "completeness": 4,
    "relevance": 5,
    "clarity": 4,
    "overall": 4.5,
    "reason": "rationale for the scores"
}}"""

        response = self.llm.generate(prompt)

        try:
            return json.loads(response)
        except json.JSONDecodeError:
            return {"overall": 3.0, "reason": "failed to parse judge output"}
6. A/B Testing
6.1 A/B Testing Framework
from typing import Dict, List
import uuid
from datetime import datetime


class Variant:
    """A test variant."""
def __init__(
self,
name: str,
config: Dict,
traffic_ratio: float = 0.5
):
self.name = name
self.config = config
self.traffic_ratio = traffic_ratio
class ABTest:
"""A/B测试"""
def __init__(
self,
name: str,
variants: List[Variant],
description: str = None
):
self.id = str(uuid.uuid4())
self.name = name
self.variants = variants
self.description = description or ""
self.created_at = datetime.utcnow()
self.is_active = False
        # Validate the traffic split
        self._validate_traffic()

    def _validate_traffic(self):
        """Validate that traffic ratios sum to 1.0."""
        total_ratio = sum(v.traffic_ratio for v in self.variants)
        if abs(total_ratio - 1.0) > 0.01:
            raise ValueError(
                f"Traffic ratios must sum to 1.0, got {total_ratio}"
            )
def activate(self):
"""激活测试"""
self.is_active = True
def deactivate(self):
"""停用测试"""
self.is_active = False
def assign_variant(self, user_id: str = None) -> Variant:
"""
分配变体
Args:
user_id: 用户ID,用于一致性哈希
"""
if not self.is_active:
raise RuntimeError("测试未激活")
# 使用一致性哈希分配
import hashlib
seed = user_id or str(uuid.uuid4())
hash_value = int(hashlib.md5(seed.encode()).hexdigest(), 16)
# 根据流量比例分配
cumulative = 0
for variant in self.variants:
cumulative += variant.traffic_ratio
# 归一化hash值到[0, 1)
normalized_hash = hash_value / (2 ** 128)
if normalized_hash < cumulative:
return variant
return self.variants[-1] # fallback
class ABTestManager:
"""A/B测试管理器"""
def __init__(self):
self.tests: Dict[str, ABTest] = {}
def create_test(
self,
name: str,
variants: List[Variant],
description: str = None
) -> ABTest:
"""创建测试"""
test = ABTest(name, variants, description)
self.tests[test.id] = test
return test
def get_test(self, test_id: str) -> ABTest:
"""获取测试"""
return self.tests.get(test_id)
def list_tests(self) -> List[ABTest]:
"""列出所有测试"""
return list(self.tests.values())
def activate_test(self, test_id: str):
"""激活测试"""
if test_id in self.tests:
self.tests[test_id].activate()
def deactivate_test(self, test_id: str):
"""停用测试"""
if test_id in self.tests:
self.tests[test_id].deactivate()
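A usage sketch of the framework; variant names and configs are illustrative:

```python
# Usage sketch; variant names and configs are illustrative.
manager = ABTestManager()
test = manager.create_test(
    name="prompt_v2_rollout",
    variants=[
        Variant(name="control", config={"prompt": "v1"}, traffic_ratio=0.5),
        Variant(name="treatment", config={"prompt": "v2"}, traffic_ratio=0.5),
    ],
)
manager.activate_test(test.id)

# Consistent hashing: the same user id always maps to the same variant
print(test.assign_variant(user_id="user_42").name)
print(test.assign_variant(user_id="user_42").name)  # same as above
```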
6.2 Analyzing A/B Test Results
import numpy as np
from scipy import stats
class ABTestAnalyzer:
"""A/B测试结果分析器"""
def analyze_conversion(
self,
variant_a: Dict, # {"impressions": int, "conversions": int}
variant_b: Dict,
confidence: float = 0.95
) -> Dict:
"""
分析转化率
Args:
variant_a: 变体A的指标
variant_b: 变体B的指标
confidence: 置信度
Returns:
{
"variant_a_rate": float,
"variant_b_rate": float,
"lift": float,
"lift_percent": float,
"p_value": float,
"is_significant": bool
}
"""
# 计算转化率
rate_a = variant_a["conversions"] / variant_a["impressions"]
rate_b = variant_b["conversions"] / variant_b["impressions"]
# 计算提升
lift = rate_b - rate_a
lift_percent = (lift / rate_a) * 100 if rate_a > 0 else 0
# Z检验
p_a = rate_a
p_b = rate_b
n_a = variant_a["impressions"]
n_b = variant_b["impressions"]
pooled_p = (n_a * p_a + n_b * p) / (n_a + n_b)
se = np.sqrt(
pooled_p * (1 - pooled_p) * (1/n_a + 1/n_b)
)
z_score = (p_b - p_a) / se if se > 0 else 0
p_value = 2 * (1 - stats.norm.cdf(abs(z_score)))
return {
"variant_a_rate": rate_a,
"variant_b_rate": rate_b,
"lift": lift,
"lift_percent": lift_percent,
"p_value": p_value,
"is_significant": p_value < (1 - confidence)
}
def analyze_continuous(
self,
values_a: List[float],
values_b: List[float],
confidence: float = 0.95
) -> Dict:
"""
分析连续指标(如延迟、评分)
使用t检验
"""
# 计算统计量
mean_a = np.mean(values_a)
mean_b = np.mean(values_b)
std_a = np.std(values_a)
std_b = np.std(values_b)
# t检验
t_stat, p_value = stats.ttest_ind(values_a, values_b)
# 计算提升
lift = mean_b - mean_a
lift_percent = (lift / mean_a) * 100 if mean_a > 0 else 0
return {
"variant_a_mean": mean_a,
"variant_b_mean": mean_b,
"variant_a_std": std_a,
"variant_b_std": std_b,
"lift": lift,
"lift_percent": lift_percent,
"p_value": p_value,
"is_significant": p_value < (1 - confidence)
}
def analyze_with_bootstrap(
self,
values_a: List[float],
values_b: List[float],
n_iterations: int = 10000
) -> Dict:
"""
使用Bootstrap分析
适用于非正态分布
"""
lifts = []
for _ in range(n_iterations):
# 重采样
sample_a = np.random.choice(values_a, size=len(values_a), replace=True)
sample_b = np.random.choice(values_b, size=len(values_b), replace=True)
# 计算提升
mean_a = np.mean(sample_a)
mean_b = np.mean(sample_b)
lift = mean_b - mean_a
lifts.append(lift)
# 计算置信区间
lifts_sorted = np.sort(lifts)
lower = np.percentile(lifts_sorted, 2.5)
upper = np.percentile(lifts_sorted, 97.5)
return {
"mean_lift": np.mean(lifts),
"ci_lower": lower,
"ci_upper": upper,
"ci_95_zero": lower <= 0 <= upper # 0是否在置信区间内
}
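A usage sketch of the analyzer with made-up counts:

```python
# Usage sketch with made-up counts.
analyzer = ABTestAnalyzer()
result = analyzer.analyze_conversion(
    variant_a={"impressions": 10000, "conversions": 820},
    variant_b={"impressions": 10000, "conversions": 905},
)
print(result["lift_percent"], result["p_value"], result["is_significant"])
```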
7. Evaluation Workflow
7.1 End-to-End Evaluation Workflow
from datetime import datetime


class EvaluationPipeline:
    """End-to-end evaluation pipeline."""
def __init__(
self,
golden_set: GoldenSet,
system: object,
config: Dict = None
):
self.golden_set = golden_set
self.system = system
self.config = config or {}
self.results = []
    def run(self) -> Dict:
        """Run the full evaluation workflow."""
        print("=== Starting evaluation ===")

        # Step 1: prepare
        print("\n[1/5] Prepare")
        self._prepare()

        # Step 2: execute
        print("\n[2/5] Execute")
        self._execute()

        # Step 3: evaluate
        print("\n[3/5] Evaluate")
        metrics = self._evaluate()

        # Step 4: analyze
        print("\n[4/5] Analyze")
        analysis = self._analyze()

        # Step 5: report
        print("\n[5/5] Report")
        report = self._generate_report(metrics, analysis)

        print("\n=== Evaluation complete ===")
        return report
    def _prepare(self):
        """Preparation stage."""
        print(f"  - test cases: {self.golden_set.count()}")
        print(f"  - category distribution: {self._get_category_distribution()}")
    def _execute(self):
        """Execution stage."""
        from tqdm import tqdm

        for case in tqdm(
            self.golden_set.get_all(),
            desc="Running tests"
        ):
result = self.system.run(case.query)
self.results.append({
"case": case,
"result": result
})
    def _evaluate(self) -> Dict:
        """Evaluation stage (note that RAGEvaluator re-runs each query itself)."""
        evaluator = RAGEvaluator(self.system)
        return evaluator.evaluate(self.golden_set)
    def _analyze(self) -> Dict:
        """Analysis stage."""
        # Break down by category
category_analysis = {}
for category in set(
r["case"].category for r in self.results
):
category_results = [
r for r in self.results
if r["case"].category == category
]
category_analysis[category] = {
"count": len(category_results),
"success_rate": sum(
1 for r in category_results
if r["result"].get("success", False)
) / len(category_results)
}
        # Break down by difficulty
difficulty_analysis = {}
for difficulty in set(
r["case"].difficulty for r in self.results
):
difficulty_results = [
r for r in self.results
if r["case"].difficulty == difficulty
]
difficulty_analysis[difficulty] = {
"count": len(difficulty_results),
"success_rate": sum(
1 for r in difficulty_results
if r["result"].get("success", False)
) / len(difficulty_results)
}
return {
"by_category": category_analysis,
"by_difficulty": difficulty_analysis,
"total": len(self.results)
}
def _generate_report(self, metrics: Dict, analysis: Dict) -> Dict:
"""生成报告"""
return {
"timestamp": datetime.utcnow().isoformat(),
"test_set": self.golden_set.name,
"test_count": self.golden_set.count(),
"metrics": metrics,
"analysis": analysis
}
def _get_category_distribution(self) -> Dict:
"""获取类别分布"""
distribution = {}
for case in self.golden_set.get_all():
category = case.category
distribution[category] = distribution.get(category, 0) + 1
return distribution
8. Implementation Example
8.1 Full Evaluation Example
"""
完整业务评测示例
1. 构建Golden Set
2. 离线评测
3. A/B测试
4. 生成报告
"""
if __name__ == "__main__":
# 1. 构建Golden Set
print("构建Golden Set...")
golden_set = GoldenSet(name="example_eval")
# 添加测试用例
test_cases = [
{
"id": "case_001",
"query": "Python是什么?",
"ground_truth": {
"answer": "Python是一种高级编程语言",
"relevant_docs": ["doc_001"]
},
"category": "programming",
"difficulty": "easy"
},
{
"id": "case_002",
"query": "如何用Python读取文件?",
"ground_truth": {
"answer": "使用open()函数",
"relevant_docs": ["doc_002", "doc_003"],
"expected_tools": ["file_read"]
},
"category": "programming",
"difficulty": "medium"
}
]
for tc in test_cases:
golden_set.add_case(TestCase(**tc))
print(f" - 构建了 {golden_set.count()} 个测试用例")
# 2. 离线评测
print("\n执行离线评测...")
# rag_system = RAGSystem(...)
# evaluator = RAGEvaluator(rag_system)
# metrics = evaluator.evaluate(golden_set)
# 3. A/B测试
print("\n设置A/B测试...")
ab_test = ABTest(
name="rag_optimization",
variants=[
Variant(name="baseline", config={"model": "gpt-3.5"}, traffic_ratio=0.5),
Variant(name="optimized", config={"model": "gpt-4"}, traffic_ratio=0.5)
]
)
ab_test.activate()
# 分配用户
variant_for_user = ab_test.assign_variant(user_id="user_123")
print(f" - 用户user_123被分配到: {variant_for_user.name}")
# 4. 生成报告
print("\n生成评测报告...")
report = {
"test_set": golden_set.name,
"test_count": golden_set.count(),
"ab_test": ab_test.name,
"variants": [v.name for v in ab_test.variants]
}
import json
print(json.dumps(report, indent=2, ensure_ascii=False))
Frequently Asked Interview Questions
Q1: How do you build an evaluation system for an agent?
Model answer:
Building the evaluation system:
1. Data preparation
   - Build a golden set (a standard test collection)
   - Cover different categories and difficulties
   - Include expected tool calls and answers
2. Offline evaluation
   - Run test cases in batch
   - Compute accuracy, recall, and other metrics
   - Break results down by category / difficulty
3. Online evaluation
   - Collect user feedback
   - Monitor performance in real time
   - Compare variants with A/B tests
4. Metric definitions
   - Accuracy: answer correctness
   - Efficiency: iteration count, token usage
   - Safety: detection of dangerous operations
Implementation:
```python
def evaluate_agent(agent, golden_set):
    results = []
    for case in golden_set:
        response = agent.run(case.query)
        results.append({
            "case_id": case.id,
            "success": check_success(response, case.ground_truth),
            "efficiency": calculate_efficiency(response)
        })

    # Aggregate metrics
    success_rate = sum(1 for r in results if r["success"]) / len(results)
    avg_efficiency = sum(r["efficiency"] for r in results) / len(results)

    return {
        "success_rate": success_rate,
        "avg_efficiency": avg_efficiency
    }
```
Q2: How do you run an A/B test?
Model answer:
A/B testing workflow:
1. Design the test
   - Define the control and experiment groups
   - Set the traffic split
   - Define the success metrics
2. Implement the split
   - Assign users with consistent hashing
   - Make sure the same user always lands in the same group
   - Log the assignments
3. Collect data
   - Record user behavior in both groups
   - Collect the key metrics (conversion rate, satisfaction)
   - Ensure data completeness
4. Statistical analysis
   - Compute the metric lift
   - Judge significance with a statistical test
   - Compute confidence intervals
5. Decide
   - If significant and positive, roll out fully
   - If not significant, keep observing or iterate
Implementation:
```python
class ABTest:
    def assign_variant(self, user_id):
        # Hash into two 50/50 buckets
        # (note: Python's built-in hash() is salted per process;
        # use hashlib for hashing that is stable across runs)
        hash_val = hash(user_id) % 100
        if hash_val < 50:
            return "control"
        else:
            return "experiment"

    def analyze(self, results):
        # Conversion rates
        control_rate = results["control"]["conversions"] / results["control"]["users"]
        exp_rate = results["experiment"]["conversions"] / results["experiment"]["users"]

        # Lift
        lift = (exp_rate - control_rate) / control_rate

        return {
            "control_rate": control_rate,
            "experiment_rate": exp_rate,
            "lift_percent": lift * 100
        }
```
Q3: How do you evaluate a RAG system?
Model answer:
RAG evaluation dimensions:
1. Retrieval quality
   - Recall: fraction of relevant documents that are retrieved
   - Precision: fraction of retrieved documents that are relevant
   - NDCG: ranking quality
2. Generation quality
   - Faithfulness: the answer is grounded in the retrieved context
   - Answer Relevance: the answer addresses the question
   - Context Precision: relevance of the retrieved context
3. Evaluation methods
a) With RAGAS
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy
results = evaluate(
dataset=dataset,
    metrics=[faithfulness, answer_relevancy]
)
b) Custom evaluation
```python
def evaluate_rag(rag_system, golden_set):
    results = []
    for case in golden_set:
        response = rag_system.query(case.query)

        # Retrieval evaluation
        recall = calculate_recall(
            response["sources"],
            case.ground_truth["relevant_docs"]
        )

        # Generation evaluation
        bleu = calculate_bleu(
            response["answer"],
            case.ground_truth["answer"]
        )

        results.append({"recall": recall, "bleu": bleu})

    return aggregate(results)
```
c) LLM-as-Judge
- Use a strong model such as GPT-4 to grade answer quality
- Produce a score plus improvement suggestions
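A minimal LLM-as-judge sketch; the `llm.generate()` interface is an assumption here (see CustomEvaluator in Section 5.2 for a fuller version):

```python
# Minimal LLM-as-judge sketch; llm.generate() is an assumed interface.
import json

def judge(llm, question: str, answer: str) -> dict:
    prompt = (
        "Rate this answer from 1 to 5 and suggest one improvement.\n"
        f"Question: {question}\nAnswer: {answer}\n"
        'Reply as JSON: {"score": 4, "suggestion": "..."}'
    )
    try:
        return json.loads(llm.generate(prompt))
    except json.JSONDecodeError:
        return {"score": None, "suggestion": "unparseable judge output"}
```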
Summary
Key Points of the Evaluation System
| Topic | Strategy |
|---|---|
| Golden Set | Cover multiple types and difficulties |
| Offline evaluation | Batch execution, fast iteration |
| Online evaluation | User feedback, real-time monitoring |
| A/B testing | Traffic splitting, statistical analysis |
| Metric analysis | Accuracy, efficiency, safety |
Best Practices
- Data quality: the golden set must be accurate and complete
- Comprehensive metrics: accuracy + efficiency + safety
- Automation: integrate evaluation into the CI/CD pipeline
- Continuous monitoring: watch production in real time
- A/B validation: significant changes must pass an A/B test