内容纲要
模块四:工具接入与权限控制(详解版)
覆盖:Tool Calling、权限系统、限流、审计日志
目录
必须掌握的概念
4.1 Tool Calling(工具调用)
定义:
LLM 通过调用外部函数/工具来扩展能力,执行搜索、计算、API 调用等操作。
标准流程:
用户查询
↓
LLM 分析
↓
决定调用工具
↓
返回 function_call
↓
执行工具
↓
返回结果给 LLM
↓
LLM 生成最终答案
4.2 Permission Control(权限控制)
权限类型:
| 权限 | 说明 | 示例 |
|---|---|---|
| READ | 读取数据 | 获取用户信息 |
| WRITE | 写入数据 | 更新用户配置 |
| EXECUTE | 执行操作 | 运行命令 |
| ADMIN | 管理员权限 | 删除用户 |
RBAC 模型:
- 用户 → 角色 → 权限
- 一个用户可以有多个角色
- 一个角色有多个权限
4.3 Rate Limiting(限流)
限流类型:
| 类型 | 说明 | 适用场景 |
|---|---|---|
| Token | 令牌桶 | API 调用 |
| Sliding Window | 滑动窗口 | 防止突发流量 |
| Fixed Window | 固定窗口 | 简单限流 |
| Leaky Bucket | 漏桶 | 漏水限流 |
4.4 Audit Log(审计日志)
审计内容:
timestamp:时间戳user_id:用户 IDtool_name:工具名称parameters:调用参数(脱敏后)result:执行结果status:成功/失败duration_ms:执行耗时
关键设计点
4.1 工具注册与管理
# tools/tool_registry.py
"""
工具注册器完整实现
包含:工具白名单、参数验证、权限检查、限流控制、审计日志
"""
from typing import Callable, Dict, List, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
import time
import threading
from abc import ABC, abstractmethod
from datetime import datetime
from pydantic import BaseModel, Field, validator
# ============ 枚举定义 ============
class Permission(Enum):
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
DELETE = "delete"
class ToolStatus(Enum):
ACTIVE = "active"
INACTIVE = "inactive"
DEPRECATED = "deprecated"
# ============ 数据结构 ============
@dataclass
class ToolCall:
"""工具调用记录"""
tool_name: str
user_id: str
parameters: Dict
timestamp: datetime = field(default_factory=datetime.now)
duration_ms: Optional[float] = None
status: Optional[str] = None
error: Optional[str] = None
@dataclass
class ToolMetrics:
"""工具指标"""
total_calls: int = 0
success_calls: int = 0
failed_calls: int = 0
avg_duration_ms: float = 0.0
total_duration_ms: float = 0kt: str, user_id: str, **kwargs) -> Any:
"""安全调用工具"""
start_time = time.time()
call = ToolCall(tool_name=tool_name, user_id=user_id, parameters=kwargs)
try:
# 1. 检查工具存在
tool = self._get_tool(tool_name)
if tool is None:
raise ToolNotFoundError(f"工具不存在: {tool_name}")
# 2. 检查工具状态
if tool.status != ToolStatus.ACTIVE:
raise ToolInactiveError(f"工具未激活: {tool_name}")
# 3. 检查权限
self._check_permissions(tool_name, user_permissions)
# 4. 检查限流
self._check_rate_limit(tool_name, user_id)
# 5. 参数验证
validated_params = self._validate_parameters(tool_name, kwargs)
# 6. 执行工具
result = tool.execute(**validated_params)
# 7. 记录指标
duration_ms = (time.time() - start_time) * 1000
self._update_metrics(tool_name, duration_ms, True)
# 8. 审计日志
call.duration_ms = duration_ms
call.status = "success"
self._audit_log.log_call(call)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
self._update_metrics(tool_name, duration_ms, False)
call.duration_ms = duration_ms
call.status = "failed"
call.error = str(e)
self._audit_log.log_call(call)
raise
def _get_tool(self, name: str) -> Optional[BaseTool]:
"""获取工具"""
with self._lock:
return self._tools.get(name)
def _check_permissions(
self,
tool_name: str,
user_permissions: Set[Permission]
):
"""检查权限"""
tool = self._get_tool(tool_name)
if tool is None:
raise ToolNotFoundError(f"工具不存在: {tool_name}")
required = tool.required_permissions
if not all(p in user_permissions for p in required):
missing = [p.value for p in required if p not in user_permissions]
raise PermissionError(
f"权限不足,需要权限: {', '.join(missing)}"
)
def _check_rate_limit(self, tool_name: str, user_id: str):
"""检查限流"""
rate_limit = self._rate_limits.get(tool_name)
if rate is None:
return # 无限流限制
current_time = time.time()
key = f"{tool_name}:{user_id}"
with self._rate_lock:
# 清理过期记录
if key not in self._call_history:
self._call_history[key] = []
self._call_history[key] = [
t for t in self._call_history[key]
if current_time - t < rate_limit.window
]
# 检查是否超限
if len(self._call_history[key]) >= rate_limit.max_calls:
raise RateLimitError(
f"超过限流:{rate_limit.max_calls} 次/{rate_limit.window} 秒"
)
# 记录调用
self._call_history[key].append(current_time)
def _validate_parameters(self, tool_name: str, params: Dict) -> Dict:
"""验证参数"""
tool = self._get_tool(tool_name)
if tool is None:
raise ToolNotFoundError(f"工具不存在: {tool_name}")
# 使用 Pydantic 验证
try:
return tool.parameters_schema.parse_obj(params)
except Exception as e:
raise ParameterValidationError(f"参数验证失败: {e}")
def _update_metrics(self, tool_name: str, duration_ms: float, success: bool):
"""更新指标"""
with self._metrics_lock:
metrics = self._metrics.setdefault(tool_name, ToolMetrics())
metrics.total_calls += 1
metrics.total_duration_ms += duration_ms
if success:
metrics.success_calls += 1
else:
metrics.failed_calls += 1
# 计算平均耗时
metrics.avg_duration_ms = (
metrics.total_duration_ms / metrics.total_calls
)
def get_metrics(self, tool_name: str) -> Optional[ToolMetrics]:
"""获取工具指标"""
with self._metrics_lock:
return self._metrics.get(tool_name)
def get_all_metrics(self) -> Dict[str, ToolMetrics]:
"""获取所有工具指标"""
with self._metrics_lock:
return self._metrics.copy()
# ============ 错误类 ============
class ToolError(Exception):
"""工具错误基类"""
pass
class ToolNotFoundError(ToolError):
"""工具不存在"""
pass
class ToolInactiveError(ToolError):
"""工具未激活"""
pass
class PermissionError(ToolError):
"""权限错误"""
pass
class RateLimitError(ToolError):
"""限流错误"""
pass
class ParameterValidationError(ToolError):
"""参数验证错误"""
pass
# ============ 使用示例 ============main__":
# 1. 创建工具注册器
registry = ToolRegistry()
# 2. 注册工具
@registry.register_tool(
name="get_user_info",
description="获取用户信息",
status=ToolStatus.ACTIVE,
required_permissions=[{Permission.READ}]
)
def get_user_info(user_id: int) -> Dict:
"""获取用户信息"""
print(f"调用 get_user_info({user_id})")
return {"id": user_id, "name": f"User{user_id}", "email": f"user{user_id}@example.com"}
@registry.register_tool(
name="update_user",
description="更新用户信息",
status=ToolStatus.ACTIVE,
required_permissions=[{Permission.WRITE}]
)
def update_user(user_id: int, name: str, email: str) -> Dict:
"""更新用户信息"""
print(f"调用 update_user({user_id}, {name}, {email})")
return {"id": user_id, "name": name, "email": email, "updated": True}
@registry.register_tool(
name="delete_user",
description="删除用户",
status=ToolStatus.ACTIVE,
required_permissions=[{Permission.DELETE}]
)
def delete_user(user_id: int) -> Dict:
"""删除用户"""
print(f"调用 delete_user({user_id})")
return {"id": user_id, "deleted": True}
# 3. 设置限流
registry.set_rate_limit(
"get_user_info",
RateLimit(max_calls=10, window=60) # 每分钟60次
)
registry.set_rate_limit(
"update_user",
RateLimit(max_calls=5, window=60)
)
# 4. 用户权限
admin_permissions = {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN}
user_permissions = {Permission.READ, Permission.WRITE}
read_only_permissions = {Permission.READ}
# 5. 测试调用
print("=" * 60)
print("工具调用测试")
print("=" * 60)
# 测试1: 普通用户读取
print("\n测试1: 普通用户读取(成功)")
result = registry.call_tool(
tool_name="get_user_info",
user_permissions=user_permissions,
user_id=1
)
print(f"结果: {result}")
# 测试2: 普通用户删除(失败)
print("\n测试2: 普通用户删除(权限不足)")
try:
result = registry.call_tool(
tool_name="delete_user",
user_permissions=user_permissions,
user_id=1
)
except PermissionError as e:
print(f"✗ 权限错误: {e}")
# 测试3: 管理员删除(成功)
print("\n测试3: 管理员删除(成功)")
result = registry.call_tool(
tool_name="delete_user",
user_permissions=admin_permissions,
user_id=1
)
print(f"结果: {result}")
# 测试4: 限流
print("\n测试4: 限流测试(10次调用,前5次成功)")
for i in range(10):
try:
result = registry.call_tool(
tool_name="get_user_info",
user_permissions=admin_permissions,
user_id=i
)
print(f" 第 {i+1} 次调用: ✅ 成功")
except RateLimitError as e:
print(f" 第 {i+1} 次调用: ✗ 限流")
# 查看指标
print("\n" + "=" * 60)
print("工具指标")
print("=" * 60)
metrics = registry.get_all_metrics()
for tool_name, tool_metrics in metrics.items():
print(f"\n工具: {tool_name}")
print(f" 总调用次数: {tool_metrics.total_calls}")
print(f" 成功次数: {tool_metrics.success_calls}")
print(f" 失败次数: {tool_metrics.failed_calls}")
print(f" 平均耗时: {tool_metrics.avg_duration_ms:.2f}ms")
常见坑与解决方案
4.1 参数注入
问题:
用户通过工具参数注入恶意代码
# 危险示例
def execute_query(query: str):
return db.execute(query) # SQL 注入!
execute_query("SELECT * FROM users; DROP TABLE users;")
解决方案:
from pydantic import BaseModel, Field
class QueryInput(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
@validator('query')
def validate_no_sql(cls, v):
# 检查 SQL 关键词
sql_keywords = ['select', 'insert', 'update', 'delete', 'drop']
if any(kw in v.lower() for kw in sql_keywords):
raise ValueError("不允许包含 SQL 关键词")
return v
4.2 权限提升(权限逃逸)
问题:
用户通过漏洞提升权限
解决方案:
- 最小权限原则:每个用户只给必要权限
- 定期审计:检查用户权限
- 权限分离:读写分离、操作分离
4.3 限流绕过
问题:
用户通过多 IP/多账号绕过限流
解决方案:
# 多维限流
class MultiDimensionRateLimiter:
def check(self, user_id: str, ip: str) -> bool:
# 检查用户级限流
if not self._check_user(user_id):
return False
# 检查 IP 级限流
if not self._check_ip(ip):
return False
# 检查用户+IP 组合限流
if not self._check_user_ip(user_id, ip):
return False
return True
面试高频问法
Q1: 如何防止 Agent 调用危险工具?
标准回答:
安全措施:
1. 工具白名单
- 只允许经过注册和审核的工具
- 禁止危险工具(delete、drop、exec)
2. 参数验证
- 使用 Pydantic 定义参数 schema
- 类型检查、范围检查、格式验证
- 检测 SQL 注入、命令注入
3. 权限控制
- RBAC:用户-角色-权限
- 工具级别权限检查
- 最小权限原则
4. 限流
- 用户级限流
- 工具级限流
- IP 级限流
- 滑动窗口策略
5. 审计日志
- 记录所有工具调用
- 记录调用者、参数、结果
- 参数脱敏(密码、token)
- 异常告警
6. 实时监控
- 检测异常调用模式
- 流量监控
- 自动封禁
7. 工具沙箱
- 限制执行环境
- 超时控制
- 禁用危险操作
工程实现:
```python
@tool_registry.register(
name="execute_query",
permissions=[Permission.READ], # 只读权限
rate_limit=RateLimit(max_calls=10, window=60) # 限流
)
def execute_query(query: QueryInput) -> Dict:
# 参数已通过 Pydantic 验证
# 只读权限,不能执行修改操作
return database.execute(query)
```
记忆要点
工具安全六道防线:
1. 白名单:只允许注册的工具
2. 参数验证:Pydantic + 正则
3. 权限控制:RBAC + 最小权限
4. 限流:用户 + 工具 + IP
5. 审计日志:记录 + 脱敏
6. 实时监控:异常 + 自动封禁
最小 Demo
见上文完整实现
实战场景
场景:企业工具调用平台
需求:
企业内部工具平台,各种业务系统通过 Agent 调用
安全要求:
- 细粒权限控制
- 完整审计日志
- 性能监控
- 成本追踪
文档版本: 1.0