[AI Agent Knowledge Base] 08 - Performance and Cost (Detailed Edition)

Content Outline

Module 8: Performance and Cost (Detailed Edition)

Covers: caching strategies, model routing, asynchronous processing, deployment optimization


Must-Know Concepts

8.1 Caching Strategies

Definition:
Use caching to avoid repeated LLM calls, reducing both latency and cost.

Cache types:

Type               Description            Use case
Exact Match        Exact key matching     Inputs are exactly identical
Semantic Cache     Meaning-based match    Inputs have similar meaning
Hybrid Cache       Exact + semantic       Combine both lookup paths
Distributed Cache  Shared cache store     Shared across multiple instances

Eviction and expiry algorithms:

  • LRU (Least Recently Used)
  • LFU (Least Frequently Used)
  • TTL (Time To Live)
  • LRU-K (weighs the last K accesses; see the sketch below)
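
Where plain LRU tracks only the most recent access, LRU-K evicts by the K-th most recent access, which keeps one-off scans from flushing hot entries. A minimal LRU-2 sketch (a hypothetical standalone helper, separate from the implementation later in this module):

# Minimal LRU-2 sketch: evict the entry whose 2nd-most-recent access is
# oldest; entries accessed fewer than K times are evicted first.
import time
from collections import defaultdict, deque

class LRUKCache:
    def __init__(self, capacity: int, k: int = 2):
        self.capacity = capacity
        self.k = k
        self.data: dict = {}
        # Keep the timestamps of the last k accesses per key
        self.history = defaultdict(lambda: deque(maxlen=k))

    def get(self, key):
        if key not in self.data:
            return None
        self.history[key].append(time.monotonic())
        return self.data[key]

    def put(self, key, value):
        if key not in self.data and len(self.data) >= self.capacity:
            # Backward K-distance: the oldest of the last k accesses,
            # treated as -infinity when a key has < k recorded accesses
            victim = min(
                self.data,
                key=lambda c: self.history[c][0]
                if len(self.history[c]) >= self.k else float("-inf"),
            )
            del self.data[victim]
            self.history.pop(victim, None)
        self.data[key] = value
        self.history[key].append(time.monotonic())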

8.2 Model Routing

Definition:
Dynamically select an appropriate model based on task complexity, balancing quality against cost.

Routing strategies:

Strategy    Description                     Model selection
Simple      Small models for easy tasks     Query < 100 tokens → small model
Confidence  Confidence-based escalation     Small-model confidence < threshold → large model
Cascading   Cascaded routing                Start small, upgrade when needed
Cost-Aware  Cost-aware selection            Optimal choice under a budget constraint

8.3 Asynchronous Processing

Definition:
Run time-consuming operations asynchronously to increase system throughput.

Async patterns:

  • Task Queue: processing via a message queue
  • Stream: streaming responses
  • Batch: batched processing
  • Background: background jobs

8.4 Deployment Optimization

Key metrics:

  • Latency: P50, P95, P99 (percentile sketch below)
  • Throughput: QPS, RPS
  • Concurrency: number of concurrent connections
  • Cost: price per 1K tokens
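
Percentile latencies are just order statistics over collected samples; a minimal nearest-rank sketch (the sample values in milliseconds are assumed for illustration):

# Nearest-rank percentile over latency samples (ms). Production systems
# usually keep a streaming sketch (e.g. t-digest) instead of raw samples.
import math

def percentile(samples: list, p: float) -> float:
    ordered = sorted(samples)
    # Smallest value such that at least p% of samples are <= it
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

latencies = [120, 95, 310, 88, 150, 2400, 101, 97, 180, 220]
for p in (50, 95, 99):
    print(f"P{p}: {percentile(latencies, p):.0f} ms")
# A single 2400 ms outlier dominates P95/P99 while leaving P50 untouched,
# which is why tail percentiles matter more than averages.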

Deployment strategies:

  • Horizontal scaling
  • Vertical scaling
  • Regional deployment
  • Edge computing

Key Design Points

8.1 Cache System Implementation

# performance/cache_system.py
"""
Complete cache system implementation.
Includes: exact cache, semantic cache, hybrid cache.
"""

from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
import time
import hashlib
import json
from collections import OrderedDict
from abc import ABC, abstractmethod
import threading
import functools

# ============ Enums ============

class CacheType(Enum):
    EXACT = "exact"
    SEMANTIC = "semantic"
    HYBRID = "hybrid"

class CacheStrategy(Enum):
    LRU = "lru"
    LFU = "lfu"
    FIFO = "fifo"

# ============ Data structures ============

@dataclass
class CacheEntry:
    """缓存条目"""
    key: str
    value: Any
    timestamp: float
    hits: int = 0
    ttl: Optional[float] = None

    def is_expired(self) -> bool:
        """检查是否过期"""
        if self.ttl is None:
            return False
        return time.time() - self.timestamp > self.ttl

@dataclass
class CacheStats:
    """缓存统计"""
    hits: int = 0
    misses: int = 0
    evictions: int = 0
    total_requests: int = 0

    @property
    def hit_rate(self) -> float:
        """命中率"""
        if self.total_requests == 0:
            return 0.0
        return self.hits / self.total_requests

# ============ Cache storage ============

class CacheStorage(ABC):
    """缓存存储抽象"""

    @abstractmethod
    def get(self, key: str) -> Optional[CacheEntry]:
        pass

    @abstractmethod
    def put(self, entry: CacheEntry):
        pass

    @abstractmethod
    def delete(self, key: str):
        pass

    @abstractmethod
    def clear(self):
        pass

    @abstractmethod
    def size(self) -> int:
        pass

class InMemoryLRUCache(CacheStorage):
    """LRU 内存缓存"""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self.cache: OrderedDict[str, CacheEntry] = OrderedDict()
        self.lock = threading.Lock()
        self.stats = CacheStats()

    def get(self, key: str) -> Optional[CacheEntry]:
        with self.lock:
            self.stats.total_requests += 1

            if key not in self.cache:
                self.stats.misses += 1
                return None

            entry = self.cache[key]

            # Drop the entry if it has expired
            if entry.is_expired():
                del self.cache[key]
                self.stats.misses += 1
                return None

            # Move to the end (most recently used)
            self.cache.move_to_end(key)

            entry.hits += 1
            self.stats.hits += 1

            return entry

    def put(self, entry: CacheEntry):
        with self.lock:
            # If the key already exists, remove it first
            if entry.key in self.cache:
                del self.cache[entry.key]

            # Evict the oldest entries while over capacity
            while len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)
                self.stats.evictions += 1

            # Insert the new entry
            self.cache[entry.key] = entry

    def delete(self, key: str):
        with self.lock:
            if key in self.cache:
                del self.cache[key]

    def clear(self):
        with self.lock:
            self.cache.clear()

    def size(self) -> int:
        with self.lock:
            return len(self.cache)

    def get_stats(self) -> CacheStats:
        with self.lock:
            return CacheStats(
                hits=self.stats.hits,
                misses=self.stats.misses,
                evictions=self.stats.evictions,
                total_requests=self.stats.total_requests
            )

# ============ Semantic cache ============

class SemanticCache:
    """Semantic cache."""

    def __init__(self, storage: CacheStorage, embedder=None, similarity_threshold: float = 0.9):
        self.storage = storage
        self.embedder = embedder
        self.similarity_threshold = similarity_threshold
        # Map cache key -> original query text, so semantic lookup can
        # embed the text instead of the opaque hash key
        self.key_to_query: Dict[str, str] = {}

    def _generate_key(self, text: str) -> str:
        """Generate a cache key."""
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, query: str) -> Optional[CacheEntry]:
        """Look up the cache."""
        # 1. Try an exact match first
        exact_key = self._generate_key(query)
        entry = self.storage.get(exact_key)
        if entry:
            return entry

        # 2. Without an exact hit, fall back to a semantic match
        if self.embedder:
            query_embedding = self.embedder.embed(query)

            # Linear scan over all cached queries (production systems
            # should use a vector index; see the sketch after this listing)
            for key, cached_query in list(self.key_to_query.items()):
                cached_entry = self.storage.get(key)
                if cached_entry:
                    cached_embedding = self.embedder.embed(cached_query)
                    similarity = self._cosine_similarity(query_embedding, cached_embedding)

                    if similarity >= self.similarity_threshold:
                        print(f"Semantic cache hit, similarity: {similarity:.3f}")
                        return cached_entry

        return None

    def put(self, query: str, value: Any, ttl: Optional[float] = None):
        """Store a value in the cache."""
        key = self._generate_key(query)
        self.key_to_query[key] = query
        entry = CacheEntry(
            key=key,
            value=value,
            timestamp=time.time(),
            ttl=ttl
        )
        self.storage.put(entry)

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Cosine similarity between two vectors."""
        import numpy as np
        a = np.array(a)
        b = np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# ============ Hybrid cache ============

class HybridCache:
    """Hybrid cache (exact + semantic)."""

    def __init__(
        self,
        exact_cache: CacheStorage,
        semantic_cache: SemanticCache
    ):
        self.exact_cache = exact_cache
        self.semantic_cache = semantic_cache

    def _key(self, query: str) -> str:
        return hashlib.md5(query.encode()).hexdigest()

    def get(self, query: str) -> Optional[CacheEntry]:
        # Check the exact cache first (keyed by the query hash)
        entry = self.exact_cache.get(self._key(query))
        if entry:
            return entry

        # Then fall back to the semantic cache
        return self.semantic_cache.get(query)

    def put(self, query: str, value: Any, ttl: Optional[float] = None):
        # Write through to both caches
        self.exact_cache.put(CacheEntry(
            key=self._key(query),
            value=value,
            timestamp=time.time(),
            ttl=ttl
        ))
        self.semantic_cache.put(query, value, ttl)

# ============ Cache decorator ============

def cached(cache: CacheStorage, ttl: Optional[float] = None):
    """Caching decorator."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key from the function name and arguments
            cache_key = _generate_cache_key(func.__name__, args, kwargs)

            # Try the cache first
            entry = cache.get(cache_key)
            if entry and not entry.is_expired():
                print(f"Cache hit: {cache_key}")
                return entry.value

            # Cache miss: call the wrapped function
            result = func(*args, **kwargs)

            # Store the result
            cache.put(CacheEntry(
                key=cache_key,
                value=result,
                timestamp=time.time(),
                ttl=ttl
            ))

            return result

        return wrapper
    return decorator

def _generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
    """生成缓存键"""
    key_data = {
        "func": func_name,
        "args": str(args),
        "kwargs": str(sorted(kwargs.items()))
    }
    return hashlib.md5(json.dumps(key_data, sort_keys=True).encode()).hexdigest()

# ============ Usage example ============

if __name__ == "__main__":
    # 创建缓存
    cache = InMemoryLRUCache(max_size=100)
    semantic_cache = SemanticCache(cache, similarity_threshold=0.9)
    hybrid_cache = HybridCache(ExactCache(1000), semantic_cache)

    print("=" * 60)
    print("缓存系统测试")
    print("=" * 60)

    # 测试1:基本缓存
    print("\n测试1:基本缓存")
    cache.put(CacheEntry(key="test", value="Hello", timestamp=time.time()))
    result = cache.get("test")
    print(f"结果:{result.value if result else 'None'}")

    # 测试2:缓存统计
    print("\n测试2:缓存统计")
    for i in range(10):
        cache.get(f"key_{i % 3}")  # 模拟访问

    stats = cache.get_stats()
    print(f"命中率:{stats.hit_rate:.2%}")
    print(f"命中次数:{stats.hits}")
    print(f"未命中次数:{stats.misses}")

    # 测试3:装饰器
    print("\n测试3:缓存装饰器")

    @cached(cache, ttl=60)
    def expensive_computation(x: int) -> int:
        print(f"执行计算:{x}")
        return x * x

    # 第一次调用(实际执行)
    print("第一次调用:")
    result1 = expensive_computation(5)

    # 第二次调用(从缓存)
    print("第二次调用:")
    result2 = expensive_computation(5)

    print(f"结果相同:{result1 == result2}")

8.2 Model Routing System

# performance/model_router.py
"""
Complete model routing system implementation.
Includes: cascading routing, cost-aware routing, confidence routing.
"""

from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass
from enum import Enum
import time

# ============ Enums ============

class ModelTier(Enum):
    TINY = "tiny"          # Cheapest, fastest
    SMALL = "small"        # Cheap, fairly fast
    MEDIUM = "medium"      # Mid-range
    LARGE = "large"        # High quality, expensive
    HUGE = "huge"          # Best quality, most expensive

class RoutingStrategy(Enum):
    SIMPLE = "simple"          # Simple rules
    CONFIDENCE = "confidence"  # Confidence-based
    CASCADING = "cascading"    # Cascading
    COST_AWARE = "cost_aware"  # Cost-aware

# ============ Data structures ============

@dataclass
class ModelConfig:
    """Model configuration."""
    name: str
    tier: ModelTier
    input_cost_per_1k: float   # Input price (per 1K tokens)
    output_cost: float         # Output price (per 1K tokens)
    avg_latency_ms: float      # Average latency
    max_tokens: int            # Maximum token count
    confidence_threshold: float = 0.8

@dataclass
class RoutingDecision:
    """路由决策"""
    model: str
    reason: str
    confidence: float
    estimated_cost: float
    estimated_latency: float

@dataclass
class ModelResult:
    """模型结果"""
    output: str
    model_used: str
    tokens_used: int
    cost: float
    latency_ms: float
    confidence: Optional[float] = None
    need_upgrade: bool = False

# ============ Model router ============

class ModelRouter:
    """Model router."""

    def __init__(self, models: Dict[str, ModelConfig]):
        self.models = models
        self.routing_history: List[Dict] = []

    def route(
        self,
        query: str,
        strategy: RoutingStrategy = RoutingStrategy.CASCADING,
        budget: Optional[float] = None
    ) -> RoutingDecision:
        """Make a routing decision."""

        if strategy == RoutingStrategy.SIMPLE:
            decision = self._simple_route(query)

        elif strategy == RoutingStrategy.CONFIDENCE:
            decision = self._confidence_route(query)

        elif strategy == RoutingStrategy.CASCADING:
            decision = self._cascading_route(query)

        elif strategy == RoutingStrategy.COST_AWARE:
            decision = self._cost_aware_route(query, budget)

        else:
            decision = self._simple_route(query)

        # Record the decision for later cost accounting
        self.routing_history.append({
            "model": decision.model,
            "strategy": strategy.value,
            "cost": decision.estimated_cost,
        })
        return decision

    def _simple_route(self, query: str) -> RoutingDecision:
        """Simple routing based on query length (characters)."""
        query_length = len(query)

        if query_length < 50:
            model = self._get_best_model(ModelTier.TINY)
            reason = "Short query; using a TINY model"

        elif query_length < 200:
            model = self._get_best_model(ModelTier.SMALL)
            reason = "Medium query; using a SMALL model"

        elif query_length < 500:
            model = self._get_best_model(ModelTier.MEDIUM)
            reason = "Fairly complex query; using a MEDIUM model"

        else:
            model = self._get_best_model(ModelTier.LARGE)
            reason = "Complex query; using a LARGE model"

        return RoutingDecision(
            model=model,
            reason=reason,
            confidence=0.8,
            estimated_cost=self._estimate_cost(model, query),
            estimated_latency=self._estimate_latency(model)
        )

    def _confidence_route(self, query: str) -> RoutingDecision:
        """Confidence routing: escalate when the small model is unsure."""
        # Try the small model first
        small_model = self._get_best_model(ModelTier.SMALL)
        result = self._call_model(small_model, query)

        # If confidence is too low, actually escalate to the large model
        if result.confidence is not None and result.confidence < 0.8:
            reason = f"Small-model confidence too low ({result.confidence:.2f}); upgraded to LARGE"
            large_model = self._get_best_model(ModelTier.LARGE)
            result = self._call_model(large_model, query)
        else:
            reason = f"Small-model confidence sufficient ({result.confidence:.2f})"

        return RoutingDecision(
            model=result.model_used,
            reason=reason,
            confidence=result.confidence or 0.8,
            estimated_cost=result.cost,
            estimated_latency=result.latency_ms
        )

    def _cascading_route(self, query: str) -> RoutingDecision:
        """Cascading routing: start small and upgrade step by step."""
        tiers = [ModelTier.TINY, ModelTier.SMALL, ModelTier.MEDIUM, ModelTier.LARGE]

        for tier in tiers:
            model = self._get_best_model(tier)
            result = self._call_model(model, query)

            if not result.need_upgrade:
                return RoutingDecision(
                    model=model,
                    reason=f"The {tier.value} model is sufficient",
                    confidence=result.confidence or 0.8,
                    estimated_cost=result.cost,
                    estimated_latency=result.latency_ms
                )

        # No tier was sufficient; fall back to the largest model
        largest = self._get_best_model(ModelTier.HUGE)
        return RoutingDecision(
            model=largest,
            reason="No tier was sufficient; using the largest model",
            confidence=1.0,
            estimated_cost=self._estimate_cost(largest, query),
            estimated_latency=self._estimate_latency(largest)
        )

    def _cost_aware_route(self, query: str, budget: Optional[float]) -> RoutingDecision:
        """Cost-aware routing: best model that fits the budget."""
        if budget is None:
            return self._cascading_route(query)

        # Sort models by input cost, cheapest first
        sorted_models = sorted(
            self.models.items(),
            key=lambda x: x[1].input_cost_per_1k
        )

        # Prefer the most capable (most expensive) model within budget;
        # iterating cheapest-first would make the budget meaningless
        for model_name, config in reversed(sorted_models):
            estimated_cost = self._estimate_cost(model_name, query)
            if estimated_cost <= budget:
                return RoutingDecision(
                    model=model_name,
                    reason=f"Chose {model_name} within budget {budget}",
                    confidence=0.8,
                    estimated_cost=estimated_cost,
                    estimated_latency=self._estimate_latency(model_name)
                )

        # Budget too small for any model; use the cheapest
        cheapest = sorted_models[0][0]
        return RoutingDecision(
            model=cheapest,
            reason=f"Budget insufficient; using the cheapest model {cheapest}",
            confidence=0.7,
            estimated_cost=self._estimate_cost(cheapest, query),
            estimated_latency=self._estimate_latency(cheapest)
        )

    def _get_best_model(self, tier: ModelTier) -> str:
        """Get the best model in a given tier."""
        candidates = [name for name, config in self.models.items() if config.tier == tier]
        if not candidates:
            # No model in this tier; fall back to the first registered model
            return list(self.models.keys())[0]
        return candidates[0]

    def _estimate_cost(self, model: str, query: str) -> float:
        """Estimate the cost of a call."""
        config = self.models[model]
        input_tokens = len(query.split())  # Rough token count
        output_tokens = input_tokens * 2   # Assume output is 2x the input
        return (input_tokens / 1000 * config.input_cost_per_1k +
                output_tokens / 1000 * config.output_cost)

    def _estimate_latency(self, model: str) -> float:
        """Estimate the latency of a call."""
        return self.models[model].avg_latency_ms

    def _call_model(self, model: str, query: str) -> ModelResult:
        """Call the model (simulated)."""
        # A real implementation would call the LLM API here
        start = time.time()

        # Simulated result
        config = self.models[model]

        # Small models may need an upgrade
        need_upgrade = config.tier in [ModelTier.TINY, ModelTier.SMALL]

        # Simulate latency
        time.sleep(config.avg_latency_ms / 1000 * 0.1)

        return ModelResult(
            output=f"Answer ({model})",
            model_used=model,
            tokens_used=100,
            cost=self._estimate_cost(model, query),
            latency_ms=(time.time() - start) * 1000,
            confidence=0.9 if config.tier != ModelTier.TINY else 0.6,
            need_upgrade=need_upgrade
        )

# ============ Usage example ============

if __name__ == "__main__":
    # Define the models
    models = {
        "gpt-4o-mini": ModelConfig(
            name="gpt-4o-mini",
            tier=ModelTier.TINY,
            input_cost_per_1k=0.00015,
            output_cost=0.0006,
            avg_latency_ms=200,
            max_tokens=16000
        ),
        "gpt-4o": ModelConfig(
            name="gpt-4o",
            tier=ModelTier.LARGE,
            input_cost_per_1k=0.0025,
            output_cost=0.01,
            avg_latency_ms=800,
            max_tokens=128000
        ),
        "claude-3.5-sonnet": ModelConfig(
            name="claude-3.5-sonnet",
            tier=ModelTier.MEDIUM,
            input_cost_per_1k=0.003,
            output_cost=0.015,
            avg_latency_ms=600,
            max_tokens=200000
        ),
    }

    # Create the router
    router = ModelRouter(models)

    print("=" * 60)
    print("Model routing test")
    print("=" * 60)

    # Test queries
    queries = [
        "1+1=?",
        "Explain how Python decorators work",
        "Analyze and optimize this complex ML model architecture, including hyperparameter tuning and deployment strategy"
    ]

    for strategy in [RoutingStrategy.SIMPLE, RoutingStrategy.CASCADING]:
        print(f"\n{strategy.value.upper()} strategy:")
        print("─" * 60)

        for query in queries:
            decision = router.route(query, strategy=strategy)
            print(f"\nQuery: {query[:50]}...")
            print(f"  Model: {decision.model}")
            print(f"  Reason: {decision.reason}")
            print(f"  Estimated cost: ${decision.estimated_cost:.6f}")
            print(f"  Estimated latency: {decision.estimated_latency:.0f}ms")

    # Test cost-aware routing
    print("\nCOST_AWARE strategy:")
    print("─" * 60)

    for budget in [0.0001, 0.001, 0.01]:
        query = "Explain how Python decorators work"
        decision = router.route(query, strategy=RoutingStrategy.COST_AWARE, budget=budget)
        print(f"\nBudget: ${budget:.6f}")
        print(f"  Chosen model: {decision.model}")
        print(f"  Estimated cost: ${decision.estimated_cost:.6f}")

8.3 Asynchronous Processing System

# performance/async_processor.py
"""
Complete asynchronous processing implementation.
Includes: task queue, streaming responses, batch processing.
"""

from typing import Optional, Dict, Any, List, Callable, AsyncIterator
from dataclasses import dataclass
from enum import Enum
import asyncio
import time
from datetime import datetime
import uuid

# ============ Enums ============

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# ============ Data structures ============

@dataclass
class Task:
    """任务"""
    id: str
    name: str
    func: Callable
    args: tuple = ()
    kwargs: dict = None
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    created_at: datetime = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        if self.kwargs is None:
            self.kwargs = {}

@dataclass
class Batch:
    """批次"""
    id: str
    tasks: List[Task]
    max_size: int
    timeout: float
    created_at: datetime = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()

    def is_full(self) -> bool:
        return len(self.tasks) >= self.max_size

    def is_timeout(self) -> bool:
        return (datetime.now() - self.created_at).total_seconds() > self.timeout

# ============ Async task queue ============

class AsyncTaskQueue:
    """Asynchronous task queue."""

    def __init__(self, max_concurrent: int = 10):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.running_tasks: Dict[str, Task] = {}
        self.completed_tasks: Dict[str, Task] = {}
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.worker_task: Optional[asyncio.Task] = None

    async def submit(
        self,
        name: str,
        func: Callable,
        *args,
        **kwargs
    ) -> str:
        """提交任务"""
        task = Task(
            id=str(uuid.uuid4()),
            name=name,
            func=func,
            args=args,
            kwargs=kwargs
        )

        await self.queue.put(task)
        return task.id

    async def get_result(self, task_id: str, timeout: Optional[float] = None) -> Optional[Task]:
        """Fetch a task's result, polling until it completes or times out."""
        # Check completed tasks first
        if task_id in self.completed_tasks:
            return self.completed_tasks[task_id]

        # Poll until the task completes
        start = time.time()
        while True:
            if task_id in self.completed_tasks:
                return self.completed_tasks[task_id]

            if timeout and (time.time() - start) > timeout:
                return None

            await asyncio.sleep(0.1)

    async def start_worker(self):
        """Start the dispatcher coroutine."""
        if self.worker_task is None:
            self.worker_task = asyncio.create_task(self._worker())

    async def stop_worker(self):
        """Stop the dispatcher coroutine."""
        if self.worker_task:
            self.worker_task.cancel()
            try:
                await self.worker_task
            except asyncio.CancelledError:
                pass
            self.worker_task = None

    async def _worker(self):
        """Dispatcher coroutine: pull tasks and run them concurrently."""
        while True:
            try:
                task = await self.queue.get()
                # Spawn each task so up to max_concurrent run in parallel;
                # awaiting it inline here would serialize everything
                asyncio.create_task(self._run_task(task))

            except asyncio.CancelledError:
                break

    async def _run_task(self, task: Task):
        """Execute a single task under the concurrency limit."""
        async with self.semaphore:
            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()
            self.running_tasks[task.id] = task

            try:
                if asyncio.iscoroutinefunction(task.func):
                    result = await task.func(*task.args, **task.kwargs)
                else:
                    # Run blocking functions in a thread so the event loop stays free
                    result = await asyncio.to_thread(task.func, *task.args, **task.kwargs)

                task.result = result
                task.status = TaskStatus.COMPLETED
            except Exception as e:
                task.error = str(e)
                task.status = TaskStatus.FAILED
            finally:
                task.completed_at = datetime.now()
                self.completed_tasks[task.id] = task
                self.running_tasks.pop(task.id, None)

# ============ Streaming response ============

class StreamResponse:
    """Streaming response."""

    def __init__(self, chunk_size: int = 10):
        self.chunk_size = chunk_size

    async def generate(self, text: str) -> AsyncIterator[str]:
        """Produce a streamed response."""
        # Simulate streamed output, chunk by chunk
        for i in range(0, len(text), self.chunk_size):
            chunk = text[i:i + self.chunk_size]
            yield chunk
            await asyncio.sleep(0.05)  # Simulated generation latency

    async def stream(self, text: str) -> AsyncIterator[str]:
        """Stream the text out."""
        async for chunk in self.generate(text):
            yield chunk

# ============ Batch processor ============

class BatchProcessor:
    """Batch processor."""

    def __init__(self, max_batch_size: int = 10, timeout: float = 1.0):
        self.max_batch_size = max_batch_size
        self.timeout = timeout
        self.current_batch: Optional[Batch] = None
        self.processed_batches: List[Batch] = []
        self.lock = asyncio.Lock()

    async def add_task(self, task: Task) -> str:
        """Add a task to the current batch."""
        async with self.lock:
            if self.current_batch is None:
                self.current_batch = Batch(
                    id=str(uuid.uuid4()),
                    tasks=[],
                    max_size=self.max_batch_size,
                    timeout=self.timeout
                )

            self.current_batch.tasks.append(task)

            # Flush immediately once the batch is full
            if self.current_batch.is_full():
                await self._process_batch()
                return task.id

            # Schedule a timeout check so partial batches still flush
            asyncio.create_task(self._check_timeout())

            return task.id

    async def _check_timeout(self):
        """Flush the current batch if it has timed out."""
        await asyncio.sleep(self.timeout)

        async with self.lock:
            if self.current_batch and self.current_batch.is_timeout():
                await self._process_batch()

    async def _process_batch(self):
        """Process the current batch."""
        if not self.current_batch:
            return

        batch = self.current_batch
        self.processed_batches.append(batch)

        # Run every task in the batch
        for task in batch.tasks:
            try:
                if asyncio.iscoroutinefunction(task.func):
                    task.result = await task.func(*task.args, **task.kwargs)
                else:
                    task.result = task.func(*task.args, **task.kwargs)
                task.status = TaskStatus.COMPLETED
            except Exception as e:
                task.error = str(e)
                task.status = TaskStatus.FAILED

        # Reset for the next batch
        self.current_batch = None

# ============ Usage example ============

if __name__ == "__main__":
    async def demo_async_queue():
        """Demo: async task queue"""
        print("=" * 60)
        print("Async task queue demo")
        print("=" * 60)

        queue = AsyncTaskQueue(max_concurrent=3)
        await queue.start_worker()

        # Submit tasks
        def compute(x: int) -> int:
            print(f"Computing {x}...")
            time.sleep(1)
            return x * x

        task_ids = []
        for i in range(5):
            task_id = await queue.submit(f"compute_{i}", compute, i)
            task_ids.append(task_id)
            print(f"Submitted task {task_id}")

        # Collect results
        print("\nWaiting for results...")
        for task_id in task_ids:
            task = await queue.get_result(task_id)
            print(f"Task {task_id}: {task.result}")

        await queue.stop_worker()

    async def demo_stream():
        """Demo: streaming response"""
        print("\n" + "=" * 60)
        print("Streaming response demo")
        print("=" * 60)

        stream = StreamResponse()
        text = "This is a long piece of text that we emit chunk by chunk via streaming."

        print("\nStreamed output:")
        async for chunk in stream.stream(text):
            print(chunk, end="", flush=True)

        print()  # newline

    async def demo_batch():
        """Demo: batch processing"""
        print("\n" + "=" * 60)
        print("Batch processing demo")
        print("=" * 60)

        processor = BatchProcessor(max_batch_size=3, timeout=2.0)

        def process(x: int) -> int:
            print(f"Processing {x}...")
            return x * 2

        # Add tasks
        task_ids = []
        for i in range(5):
            task = Task(id=str(i), name=f"task_{i}", func=process, args=(i,))
            task_id = await processor.add_task(task)
            task_ids.append(task_id)

        # Wait for processing
        await asyncio.sleep(3)

        print(f"\nProcessed {len(processor.processed_batches)} batches")

    async def main():
        await demo_async_queue()
        await demo_stream()
        await demo_batch()

    asyncio.run(main())

Common Pitfalls and Solutions

8.1 Cache Penetration

Problem:
A flood of requests for keys that do not exist bypasses the cache and hits the backing store directly.

Solution:

class CacheWithBloomFilter:
    def __init__(self):
        self.cache = {}
        # BloomFilter is a stand-in for any Bloom-filter implementation
        # (e.g. the pybloom-live package)
        self.bloom_filter = BloomFilter(10000)

    def put(self, key: str, value):
        # Every stored key must be registered in the filter
        self.bloom_filter.add(key)
        self.cache[key] = value

    def get(self, key: str):
        # Check the Bloom filter first
        if not self.bloom_filter.contains(key):
            return None  # Definitely absent; skip the backing store

        # Then check the cache itself
        return self.cache.get(key)
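
A complementary fix is negative caching: store an explicit miss marker with a short TTL, so repeated lookups for the same absent key are absorbed by the cache. A sketch reusing InMemoryLRUCache and CacheEntry from the listing above:

# Sketch: negative caching — cache known misses with a short TTL.
_MISS = object()  # sentinel marking "known absent"

def get_with_negative_cache(cache, key, loader, miss_ttl=30.0):
    entry = cache.get(key)
    if entry is not None:
        return None if entry.value is _MISS else entry.value

    value = loader(key)  # hit the backing store exactly once
    cache.put(CacheEntry(
        key=key,
        value=_MISS if value is None else value,
        timestamp=time.time(),
        ttl=miss_ttl if value is None else None,
    ))
    return value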

8.2 The Cascading-Routing "Cost Trap"

Problem:
Cascading routing may call several models for a single query, which can end up costing more than calling the large model directly.

Solution:

def smart_cascade(query: str) -> ModelResult:
    # First use a very small, fast model to judge complexity
    # (judge_complexity and call_model are placeholders)
    complexity = judge_complexity(query)

    # Jump straight to the appropriate tier instead of walking every one
    if complexity == "simple":
        return call_model("tiny")
    elif complexity == "medium":
        return call_model("medium")
    else:
        return call_model("large")

Frequently Asked Interview Questions

Q1: How do you optimize the performance of an Agent system?

Model answer:

Six dimensions of performance optimization:

1. Caching
   - Exact cache: identical inputs return immediately
   - Semantic cache: similar inputs reuse results
   - Distributed cache: Redis/Memcached
   - Cache warming: preload hot data

2. Model routing
   - Cascading: small models first, upgrade on demand
   - Cost-aware: optimal choice under a budget constraint
   - Confidence-based: upgrade only when quality falls short
   - Hybrid: combine strategies per scenario

3. Asynchronous processing
   - Streaming: return tokens as they are generated
   - Task queue: execute work in the background
   - Batching: aggregate requests to cut cost
   - Parallelism: run independent tasks concurrently

4. Prompt optimization
   - Compress prompts: remove redundancy
   - Structured output: fewer tokens
   - Few-shot tuning: trim examples

5. Model selection
   - Task classification: small models for simple tasks
   - Knowledge distillation: train a small model from a large one
   - Quantization: FP16 → INT8

6. Infrastructure
   - Regional deployment: serve users from nearby regions
   - Edge computing: faster responses
   - Load balancing: scale horizontally

Q2: How do you monitor the health of an Agent system?

Model answer:

Monitoring metric hierarchy:

1. Performance metrics
   - P50/P95/P99 latency
   - QPS (queries per second)
   - Concurrent connections
   - Error rate

2. Cost metrics
   - Token usage
   - API spend
   - Per-model usage share

3. Quality metrics
   - Model confidence
   - User satisfaction
   - Task completion rate

4. Cache metrics
   - Hit rate
   - Cache size
   - Expired-entry count

5. Security metrics
   - Detected attacks
   - Banned users
   - Compliance violations

Alerting policy:
- Error rate > 5%: alert immediately
- P99 latency > 5s: alert
- Cost over budget: alert

Memory Aids

Performance optimization "CSM":
Cache
Semantic (semantic caching)
Model (model routing)

Four routing strategies:
Simple (rules)
Confidence
Cascading
Cost-Aware

Four async patterns:
Queue
Stream
Batch
Background

Minimal Demo

See the complete implementations above.


Practical Scenario

Scenario: High-Performance Enterprise Assistant

Requirements:

  • Low-latency responses
  • Controllable cost
  • High-concurrency support

Implementation:

class HighPerformanceAssistant:
    def __init__(self):
        self.cache = HybridCache(...)
        self.router = ModelRouter(...)
        self.queue = AsyncTaskQueue(...)
        self.stream = StreamResponse(...)

    async def process(self, query: str) -> AsyncIterator[str]:
        # 1. Check the cache
        cached = self.cache.get(query)
        if cached:
            yield cached.value
            return

        # 2. Route to a model
        decision = self.router.route(query)

        # 3. Stream the response (generate is a placeholder for the
        #    actual model call using decision.model)
        chunks = []
        async for chunk in self.stream.stream(generate(query, model=decision.model)):
            chunks.append(chunk)
            yield chunk

        # 4. Cache the full response, not each individual chunk
        self.cache.put(query, "".join(chunks))
Document version: 1.0
