[AI Agent Knowledge Base] 19 - LLM Application Deployment and Operations

Content Outline

LLM Application Deployment and Operations

Table of Contents

  1. Deployment Architecture Design
  2. Docker Containerized Deployment
  3. Kubernetes Orchestration
  4. Monitoring and Alerting
  5. CI/CD Pipeline
  6. Performance Optimization
  7. High Availability Design
  8. Cost Optimization
  9. Security Operations
  10. Troubleshooting

1. Deployment Architecture Design

1.1 Typical Architecture Components

┌─────────────────────────────────────────────────────────┐
│                   Load Balancer Layer                    │
│                    Nginx / ALB / SLB                     │
└───────────────────────┬─────────────────────────────────┘
                        │
┌───────────────────────┴─────────────────────────────────┐
│                    API Gateway Layer                     │
│                Kong / Traefik / API Gateway              │
│     (auth, rate limiting, routing, protocol mapping)     │
└───────────────────────┬─────────────────────────────────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
┌───────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐
│ Agent Service│ │  Retrieval  │ │Tool Service │
│   (Python)   │ │Service (RAG)│ │  (Go/Rust)  │
└───────┬──────┘ └──────┬──────┘ └──────┬──────┘
        │              │              │
┌───────┴──────────────┴──────────────┴──────┐
│        Message Queue / Cache Layer          │
│         Redis / Kafka / RabbitMQ            │
└───────┬─────────────────────────────────────┘
        │
┌───────┴─────────────────────────────────────┐
│            Vector Database Layer             │
│         Milvus / Qdrant / Pinecone           │
└───────┬─────────────────────────────────────┘
        │
┌───────┴─────────────────────────────────────┐
│           External LLM Services              │
│       OpenAI / Claude / Local Models         │
└─────────────────────────────────────────────┘

1.2 Service Decomposition Strategy

Service Type            | Language      | Responsibilities                             | Deployment
Agent core service      | Python        | Conversation management, task orchestration | StatefulSet + PodDisruptionBudget
Retrieval service       | Python        | Vector retrieval, reranking                  | Deployment + HPA
Tool execution service  | Go/Rust       | High-performance tool invocation             | Deployment
Model inference service | Python/C++    | Local model inference                        | Deployment + GPU
Admin console           | TypeScript/Go | Configuration management, log queries        | Deployment

1.3 Resource Planning

# Example: resource spec for the Agent service
resources:
  requests:
    cpu: "2"
    memory: "4Gi"
  limits:
    cpu: "4"
    memory: "8Gi"

# Resources for the local model inference service
gpu_resources:
  requests:
    nvidia.com/gpu: "1"
    cpu: "4"
    memory: "16Gi"
  limits:
    nvidia.com/gpu: "1"
    cpu: "8"
    memory: "32Gi"

2. Docker Containerized Deployment

2.1 Base Image Selection

# Python service - multi-stage build
FROM python:3.11-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

FROM python:3.11-slim

WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .

ENV PATH=/root/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# Go service
FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server

FROM alpine:latest

RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/

COPY --from=builder /app/server .
EXPOSE 8080
CMD ["./server"]

# GPU-enabled inference service
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip3 install --no-cache-dir torch transformers vllm

COPY . .
EXPOSE 8000
CMD ["python3", "serve.py"]

2.2 Docker Compose Development Environment

version: '3.8'

services:
  agent:
    build: ./agent
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - VECTORDB_URL=http://vector:8080
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./agent:/app
      - /tmp/cache:/tmp/cache
    depends_on:
      - redis
      - vector

  retriever:
    build: ./retriever
    ports:
      - "8001:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - VECTORDB_URL=http://vector:8080
    depends_on:
      - redis
      - vector

  vector:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  qdrant_data:
  redis_data:
  grafana_data:

2.3 Health Checks

# Dockerfile health check (requires curl to be present in the image)
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# FastAPI health check endpoint
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    try:
        # Check dependent services
        # await check_redis()
        # await check_vector_db()
        return JSONResponse({"status": "healthy"}, status_code=200)
    except Exception as e:
        return JSONResponse(
            {"status": "unhealthy", "error": str(e)},
            status_code=503
        )
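
The Kubernetes manifests in section 3 also probe a separate /ready path. A minimal readiness endpoint sketch is shown below; check_redis and check_vector_db are assumed helper functions standing in for real dependency checks.

# Readiness endpoint sketch: report 503 while dependencies are unavailable
@app.get("/ready")
async def readiness_check():
    checks = {
        # Uncomment once the helpers exist:
        # "redis": await check_redis(),
        # "vector_db": await check_vector_db(),
    }
    ready = all(checks.values())
    return JSONResponse(
        {"status": "ready" if ready else "not_ready", "checks": checks},
        status_code=200 if ready else 503,
    )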

3. Kubernetes Orchestration

3.1 Core Resource Configuration

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
  namespace: llm-app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
        version: v1.0.0
    spec:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - agent-service
              topologyKey: kubernetes.io/hostname
      containers:
      - name: agent
        image: registry.example.com/llm/agent:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"
          limits:
            cpu: "4"
            memory: "8Gi"
        env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: redis-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: openai-api-key
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

3.2 HPA Autoscaling

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-service-hpa
  namespace: llm-app
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: concurrent_requests
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 60
      - type: Pods
        value: 4
        periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 120
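
The Pods metric above (concurrent_requests) only works if the cluster exposes it through a custom-metrics pipeline such as prometheus-adapter; that wiring is assumed here. A sketch of the per-pod gauge such an adapter would read:

# Per-pod in-flight request gauge backing the HPA's concurrent_requests metric (sketch)
from fastapi import FastAPI, Request
from prometheus_client import Gauge

app = FastAPI()  # mirrors the service's existing app object
concurrent_requests = Gauge("concurrent_requests", "In-flight requests in this pod")

@app.middleware("http")
async def track_in_flight(request: Request, call_next):
    concurrent_requests.inc()
    try:
        return await call_next(request)
    finally:
        concurrent_requests.dec()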

3.3 StatefulSet Deployment (Stateful Services)

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: vector-db
  namespace: llm-app
spec:
  serviceName: vector-db
  replicas: 3
  selector:
    matchLabels:
      app: vector-db
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 100Gi
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: vector-db
    spec:
      containers:
      - name: qdrant
        image: qdrant/qdrant:latest
        ports:
        - containerPort: 6333
        volumeMounts:
        - name: data
          mountPath: /qdrant/storage
        env:
        - name: QDRANT__SERVICE__GRPC_PORT
          value: "6334"

3.4 Configuration Management with ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
  namespace: llm-app
data:
  config.yaml: |
    agent:
      name: "production-agent"
      model: "gpt-4"
      max_tokens: 4096
      temperature: 0.7

    retriever:
      top_k: 10
      score_threshold: 0.7
      rerank_model: "cohere/rerank-multilingual-v2"

    cache:
      enabled: true
      ttl: 3600
      max_size: 10000

    rate_limit:
      requests_per_minute: 60
      tokens_per_minute: 100000
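
A minimal sketch of how the service can consume this ConfigMap, assuming it is mounted into the pod at /etc/agent/config.yaml (the volume mount itself is not shown in the Deployment above):

# Load the mounted ConfigMap at startup (sketch; the path is an assumption)
import yaml

CONFIG_PATH = "/etc/agent/config.yaml"

def load_config(path: str = CONFIG_PATH) -> dict:
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

config = load_config()
model_name = config["agent"]["model"]   # "gpt-4"
top_k = config["retriever"]["top_k"]    # 10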

3.5 Secret Management

apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
  namespace: llm-app
type: Opaque
stringData:
  openai-api-key: "sk-..."
  redis-url: "redis://redis:6379"
  vector-db-url: "http://vector-db:6333"

3.6 Service and Ingress

apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: llm-app
spec:
  selector:
    app: agent-service
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: agent-ingress
  namespace: llm-app
  annotations:
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
spec:
  ingressClassName: nginx
  rules:
  - host: api.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: agent-service
            port:
              number: 80
  tls:
  - hosts:
    - api.example.com
    secretName: tls-secret

4. Monitoring and Alerting

4.1 Prometheus Metrics Collection

from prometheus_client import Counter, Histogram, Gauge

# Business metrics
request_counter = Counter(
    'agent_requests_total',
    'Total number of agent requests',
    ['endpoint', 'status']
)

request_duration = Histogram(
    'agent_request_duration_seconds',
    'Request duration in seconds',
    ['endpoint']
)

active_connections = Gauge(
    'agent_active_connections',
    'Number of active connections'
)

token_usage = Counter(
    'agent_token_usage_total',
    'Total token usage',
    ['model', 'type']  # type: input/output
)

cache_hits = Counter(
    'agent_cache_hits_total',
    'Total cache hits',
    ['cache_type']
)

retrieval_latency = Histogram(
    'agent_retrieval_latency_seconds',
    'Retrieval latency in seconds',
    ['retriever_type']
)

tool_execution_time = Histogram(
    'agent_tool_execution_seconds',
    'Tool execution time in seconds',
    ['tool_name']
)
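
These collectors are only useful if Prometheus can scrape them. One way to expose them, assuming the FastAPI app object used throughout this document, is to mount prometheus_client's ASGI app at /metrics:

# Expose the collectors above for Prometheus scraping (sketch)
from prometheus_client import make_asgi_app

app.mount("/metrics", make_asgi_app())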

4.2 Middleware Integration

from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    endpoint = request.url.path
    status = str(response.status_code)  # Prometheus label values should be strings

    request_counter.labels(endpoint=endpoint, status=status).inc()
    request_duration.labels(endpoint=endpoint).observe(duration)

    return response

4.3 Grafana Dashboard Configuration

{
  "title": "LLM Agent Dashboard",
  "panels": [
    {
      "title": "Request Rate",
      "targets": [
        {
          "expr": "rate(agent_requests_total[5m])"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Request Duration (P95)",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Token Usage",
      "targets": [
        {
          "expr": "sum(rate(agent_token_usage_total[1h])) by (model)"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Cache Hit Rate",
      "targets": [
        {
          "expr": "rate(agent_cache_hits_total[5m]) / rate(agent_requests_total[5m])"
        }
      ],
      "type": "gauge"
    },
    {
      "title": "Retrieval Latency",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, rate(agent_retrieval_latency_seconds_bucket[5m]))"
        }
      ],
      "type": "graph"
    }
  ]
}

4.4 Alerting Rules

groups:
- name: llm_agent_alerts
  rules:
  - alert: HighErrorRate
    expr: |
      sum(rate(agent_requests_total{status=~"5.."}[5m])) /
      sum(rate(agent_requests_total[5m])) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }}"

  - alert: HighLatency
    expr: |
      histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m])) > 30
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "High request latency"
      description: "P95 latency is {{ $value }}s"

  - alert: LowCacheHitRate
    expr: |
      sum(rate(agent_cache_hits_total[10m])) /
      sum(rate(agent_requests_total[10m])) < 0.3
    for: 15m
    labels:
      severity: info
    annotations:
      summary: "Low cache hit rate"
      description: "Cache hit rate is {{ $value | humanizePercentage }}"

  - alert: HighTokenUsage
    expr: |
      sum(increase(agent_token_usage_total[1h])) > 1000000
    for: 1h
    labels:
      severity: warning
    annotations:
      summary: "High token usage detected"
      description: "{{ $value }} tokens consumed in the last hour"

4.5 Log Collection

import structlog
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# Structured logger (FastAPIInstrumentor.instrument_app(app) adds request tracing)
logger = structlog.get_logger()

# Example: a log entry carrying the current trace ID
async def process_request(request_id: str):
    logger.info(
        "processing_request",
        request_id=request_id,
        trace_id=trace.get_current_span().get_span_context().trace_id,
    )

    try:
        result = await do_work()
        logger.info("request_completed", request_id=request_id, status="success")
        return result
    except Exception as e:
        logger.error(
            "request_failed",
            request_id=request_id,
            error=str(e),
            exc_info=True
        )
        raise
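
The snippet above assumes structlog has already been configured. A minimal JSON-output configuration, as one possible sketch, could look like this:

# Minimal structlog configuration emitting JSON lines with ISO timestamps (sketch)
import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)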

5. CI/CD Pipeline

5.1 GitHub Actions Configuration

name: LLM Agent CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: registry.example.com
  IMAGE_NAME: llm/agent

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.11'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Run tests
      run: |
        pytest --cov=app --cov-report=xml --cov-report=html

    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
    - uses: actions/checkout@v4

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3

    - name: Login to registry
      uses: docker/login-action@v3
      with:
        registry: ${{ env.REGISTRY }}
        username: ${{ secrets.REGISTRY_USERNAME }}
        password: ${{ secrets.REGISTRY_PASSWORD }}

    - name: Build and push
      uses: docker/build-push-action@v5
      with:
        context: .
        push: true
        tags: |
          ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
          ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
        cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
        cache-to: type=inline

  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
    - name: Deploy to staging
      run: |
        kubectl set image deployment/agent-service \
          agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
          --namespace=llm-app-staging

  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
    - name: Deploy to production
      run: |
        # Blue-green deployment: roll the new image out to the idle color first
        kubectl set image deployment/agent-service-blue \
          agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
          --namespace=llm-app-prod

        # Switch traffic only after the new pods pass their health checks
        kubectl wait --for=condition=available \
          deployment/agent-service-blue \
          --namespace=llm-app-prod --timeout=300s

        kubectl patch service agent-service --namespace=llm-app-prod \
          -p '{"spec":{"selector":{"app":"agent-service","version":"blue"}}}'

5.2 GitLab CI Configuration

stages:
  - test
  - build
  - deploy

variables:
  REGISTRY: registry.example.com
  IMAGE_NAME: llm/agent

test:
  stage: test
  image: python:3.11
  script:
    - pip install -r requirements.txt
    - pip install pytest pytest-cov
    - pytest --cov=app --cov-report=xml
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml

build:
  stage: build
  image: docker:24
  services:
    - docker:24-dind
  script:
    - docker login -u $REGISTRY_USERNAME -p $REGISTRY_PASSWORD $REGISTRY
    - docker build -t $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA .
    - docker push $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA
    - docker tag $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA $REGISTRY/$IMAGE_NAME:latest
    - docker push $REGISTRY/$IMAGE_NAME:latest

deploy:
  stage: deploy
  image: bitnami/kubectl:latest
  environment:
    name: production
    url: https://api.example.com
  script:
    - kubectl config use-context $KUBE_CONTEXT
    - |
      kubectl set image deployment/agent-service \
        agent=$REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA \
        --namespace=llm-app
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

5.3 Blue-Green Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service-blue
  namespace: llm-app
spec:
  replicas: 5
  selector:
    matchLabels:
      app: agent-service
      version: blue
  template:
    metadata:
      labels:
        app: agent-service
        version: blue
    spec:
      containers:
      - name: agent
        image: registry.example.com/llm/agent:blue-v1.0.0

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service-green
  namespace: llm-app
spec:
  replicas: 5
  selector:
    matchLabels:
      app: agent-service
      version: green
  template:
    metadata:
      labels:
        app: agent-service
        version: green
    spec:
      containers:
      - name: agent
        image: registry.example.com/llm/agent:green-v1.0.0

---
apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: llm-app
spec:
  selector:
    app: agent-service
    version: blue  # flip this to green to switch traffic
  ports:
  - port: 80
    targetPort: 8000
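
The traffic switch can also be automated. A sketch using the official kubernetes Python client, assuming kubeconfig access and the Service and labels defined above:

# Flip the blue/green Service selector programmatically (sketch)
from kubernetes import client, config

def switch_traffic(color: str, namespace: str = "llm-app") -> None:
    config.load_kube_config()  # or config.load_incluster_config() inside the cluster
    core_v1 = client.CoreV1Api()
    core_v1.patch_namespaced_service(
        name="agent-service",
        namespace=namespace,
        body={"spec": {"selector": {"app": "agent-service", "version": color}}},
    )

switch_traffic("green")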

6. Performance Optimization

6.1 Concurrency

import asyncio
from typing import List
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

# Thread pool for CPU-bound work
executor = ThreadPoolExecutor(max_workers=10)

# Asynchronous batch retrieval
async def batch_retrieve(queries: List[str], batch_size: int = 10):
    results = []
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        tasks = [retrieve_single(q) for q in batch]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
    return results

# Run synchronous work in the thread pool
def process_sync(data):
    # CPU-bound computation
    return expensive_computation(data)

@app.post("/process")
async def process_endpoint(data: dict):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, process_sync, data)
    return result

6.2 Connection Pool Configuration

import httpx
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import create_async_engine

# Redis connection pool
redis_pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=100,
    decode_responses=True
)
redis_client = redis.Redis(connection_pool=redis_pool)

# HTTP client connection pool
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=100,
        max_connections=200,
        keepalive_expiry=30
    ),
    timeout=httpx.Timeout(30.0)
)

# Database connection pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=40,
    pool_pre_ping=True,
    pool_recycle=3600
)

6.3 Caching Strategy

from functools import lru_cache
from datetime import timedelta
from typing import List

# Multi-level cache
class MultiLevelCache:
    def __init__(self):
        self.l1 = {}  # in-process L1 cache
        self.l2 = redis_client
        self.l1_size = 1000

    async def get(self, key: str):
        # L1: in-process cache
        if key in self.l1:
            return self.l1[key]

        # L2: Redis
        value = await self.l2.get(key)
        if value:
            self.l1[key] = value
            return value

        return None

    async def set(self, key: str, value: str, ttl: timedelta):
        # Write through to both L1 and L2
        self.l1[key] = value
        await self.l2.setex(key, int(ttl.total_seconds()), value)

        # Evict the oldest L1 entry (simple FIFO)
        if len(self.l1) > self.l1_size:
            self.l1.pop(next(iter(self.l1)))

# Per-process cache for embedding lookups
@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> List[float]:
    return get_embedding(text)

6.4 Batch Processing

# Batch tool invocation
async def batch_tool_calls(tools: List[ToolCall]):
    # Group calls by tool
    grouped = {}
    for tool_call in tools:
        tool_name = tool_call.tool_name
        if tool_name not in grouped:
            grouped[tool_name] = []
        grouped[tool_name].append(tool_call)

    # Execute one batch per tool concurrently
    tasks = []
    for tool_name, calls in grouped.items():
        task = execute_batch(tool_name, calls)
        tasks.append(task)

    batch_results = await asyncio.gather(*tasks)

    return batch_results

# Batch vector search
async def batch_search(queries: List[str], k: int = 10):
    embeddings = await get_embeddings_batch(queries)
    results = await vector_db.search_batch(embeddings, k=k)
    return results

7. High Availability Design

7.1 Circuit Breaker Pattern

from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm_api(prompt: str):
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # The circuit breaker records the failure automatically
        raise

# Usage
try:
    result = await call_llm_api("Hello")
except CircuitBreakerError:
    # Circuit is open; fall back to a degraded response
    result = await fallback_response()

7.2 Retry Strategy

import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((openai.APITimeoutError, openai.APIConnectionError))
)
async def reliable_llm_call(prompt: str):
    return await call_llm_api(prompt)

7.3 Rate Limiting

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from fastapi import Request, FastAPI

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("60/minute")
async def chat(request: Request, message: str):
    return await process_message(message)

# Token-based rate limiting
import time
from fastapi import HTTPException

class TokenRateLimiter:
    def __init__(self, tokens_per_minute: int):
        self.tokens = tokens_per_minute
        self.last_reset = time.time()
        self.used = 0

    async def acquire(self, tokens: int):
        now = time.time()
        if now - self.last_reset > 60:
            self.used = 0
            self.last_reset = now

        if self.used + tokens > self.tokens:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

        self.used += tokens
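
A sketch of wiring the limiter in, assuming a rough 4-characters-per-token estimate and the process_message helper used in the slowapi example above:

# Reserve an estimated token budget before each LLM call (sketch)
token_limiter = TokenRateLimiter(tokens_per_minute=100_000)

@app.post("/chat/limited")
@limiter.limit("60/minute")
async def limited_chat(request: Request, message: str):
    estimated_tokens = max(1, len(message) // 4)  # crude heuristic, not a real tokenizer
    await token_limiter.acquire(estimated_tokens)
    return await process_message(message)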

7.4 Fallback Strategy

class FallbackStrategy:
    async def get_response(self, query: str):
        try:
            # Primary path
            return await self.primary_response(query)
        except Exception as e:
            logger.warning("Primary failed, trying fallback", error=str(e))

            try:
                # Fallback 1: a simpler, faster model
                return await self.simplified_model_response(query)
            except Exception:
                # Fallback 2: a cached response
                return await self.cached_response(query)

    async def simplified_model_response(self, query: str):
        # Use a faster, cheaper model
        return await call_llm_api(query, model="gpt-3.5-turbo")

    async def cached_response(self, query: str):
        # Return a predefined or cached response
        return self.fallback_responses.get(query, "Sorry, I'm having trouble right now.")

8. Cost Optimization

8.1 Token Usage Tracking

from collections import defaultdict

class TokenTracker:
    def __init__(self):
        self.usage = defaultdict(lambda: {"input": 0, "output": 0})

    def track(self, model: str, input_tokens: int, output_tokens: int):
        self.usage[model]["input"] += input_tokens
        self.usage[model]["output"] += output_tokens

        # Export to the monitoring system
        token_usage.labels(model=model, type="input").inc(input_tokens)
        token_usage.labels(model=model, type="output").inc(output_tokens)

    def get_cost(self):
        cost = 0
        for model, usage in self.usage.items():
            price = MODEL_PRICES.get(model, {})
            cost += (
                usage["input"] * price.get("input", 0) +
                usage["output"] * price.get("output", 0)
            )
        return cost

# Illustrative USD prices per token (list price per 1K tokens / 1000); verify current pricing
MODEL_PRICES = {
    "gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
    "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
    "gpt-3.5-turbo": {"input": 0.0005 / 1000, "output": 0.0015 / 1000},
}
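
A sketch of feeding the tracker from an OpenAI chat completion, assuming the async openai_client used elsewhere in this document and the response.usage fields of the v1 SDK:

# Record real token usage from the API response (sketch)
tracker = TokenTracker()

async def tracked_completion(prompt: str, model: str = "gpt-4"):
    response = await openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    usage = response.usage
    tracker.track(model, usage.prompt_tokens, usage.completion_tokens)
    return response.choices[0].message.content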

8.2 Smart Model Selection

async def select_model_for_task(task: dict) -> str:
    task_complexity = estimate_complexity(task)

    if task_complexity < 0.3:
        return "gpt-3.5-turbo"  # simple task
    elif task_complexity < 0.7:
        return "gpt-4-turbo"    # moderate task
    else:
        return "gpt-4"          # complex task

def estimate_complexity(task: dict) -> float:
    # Estimate complexity from task features
    factors = []
    factors.append(len(task.get("description", "")) / 1000)
    factors.append(len(task.get("context", "")) / 5000)
    factors.append(len(task.get("tools", [])) / 10)

    return min(sum(factors), 1.0)

8.3 Context Compression

async def compress_context(context: List[Document], max_tokens: int) -> List[Document]:
    total_tokens = sum(len(doc.content.split()) for doc in context)

    if total_tokens <= max_tokens:
        return context

    # Strategy 1: rank by relevance
    scored = [(doc, calculate_relevance(doc)) for doc in context]
    scored.sort(key=lambda x: x[1], reverse=True)

    # Strategy 2: recursively summarize what remains
    compressed = []
    current_tokens = 0

    for doc, score in scored:
        if current_tokens + len(doc.content.split()) > max_tokens:
            break

        if len(doc.content.split()) > 1000:
            # Summarize long documents
            summary = await summarize_document(doc)
            compressed.append(Document(content=summary, metadata=doc.metadata))
            current_tokens += len(summary.split())
        else:
            compressed.append(doc)
            current_tokens += len(doc.content.split())

    return compressed

9. Security Operations

9.1 API Key Management

from pydantic_settings import BaseSettings
import os

class Settings(BaseSettings):
    openai_api_key: str = ""
    anthropic_api_key: str = ""

    # Resolve secrets through a secrets manager instead of hard-coding them
    def __init__(self):
        super().__init__()
        self.openai_api_key = self.get_secret("OPENAI_API_KEY")
        self.anthropic_api_key = self.get_secret("ANTHROPIC_API_KEY")

    @staticmethod
    def get_secret(key: str) -> str:
        # Prefer environment variables
        if key in os.environ:
            return os.environ[key]

        # Otherwise read from the secrets management service (get_from_vault is a placeholder)
        return get_from_vault(key)

# API key rotation
class APIKeyRotator:
    def __init__(self):
        self.keys = []
        self.current_index = 0

    def rotate(self):
        self.current_index = (self.current_index + 1) % len(self.keys)
        return self.keys[self.current_index]

    def add_key(self, key: str):
        self.keys.append(key)
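
One possible get_from_vault implementation, sketched against HashiCorp Vault's KV v2 engine via the hvac client; the VAULT_ADDR/VAULT_TOKEN variables and the llm-app/api-keys secret path are assumptions:

# Hypothetical Vault-backed lookup for get_secret's fallback path (sketch)
import os
import hvac

def get_from_vault(key: str) -> str:
    client = hvac.Client(
        url=os.environ["VAULT_ADDR"],
        token=os.environ["VAULT_TOKEN"],
    )
    secret = client.secrets.kv.v2.read_secret_version(path="llm-app/api-keys")
    return secret["data"]["data"][key.lower()]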

9.2 Input Filtering

from fastapi import HTTPException
import re

class InputValidator:
    MAX_INPUT_LENGTH = 10000
    DISALLOWED_PATTERNS = [
        r'<script.*?>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',
    ]

    def validate(self, input_text: str) -> bool:
        # Length check
        if len(input_text) > self.MAX_INPUT_LENGTH:
            raise HTTPException(
                status_code=400,
                detail="Input too long"
            )

        # Pattern check
        for pattern in self.DISALLOWED_PATTERNS:
            if re.search(pattern, input_text, re.IGNORECASE):
                raise HTTPException(
                    status_code=400,
                    detail="Invalid input pattern"
                )

        return True

9.3 Audit Logging

from datetime import datetime, timezone
import structlog

class AuditLogger:
    def __init__(self):
        self.logger = structlog.get_logger("audit")

    async def log_request(self, user_id: str, action: str, details: dict):
        self.logger.info(
            "audit_log",
            user_id=user_id,
            action=action,
            details=details,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    async def log_llm_call(
        self,
        user_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost: float
    ):
        await self.log_request(
            user_id,
            "llm_call",
            {
                "model": model,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost": cost
            }
        )

10. Troubleshooting

10.1 Diagnosing Common Issues

class Diagnostic:
    @staticmethod
    async def check_system():
        results = {}

        # Check the Redis connection
        try:
            await redis_client.ping()
            results["redis"] = "OK"
        except Exception as e:
            results["redis"] = f"FAILED: {e}"

        # Check the vector database
        try:
            await vector_client.get_collections()
            results["vector_db"] = "OK"
        except Exception as e:
            results["vector_db"] = f"FAILED: {e}"

        # Check the LLM API
        try:
            await openai_client.models.list()
            results["llm_api"] = "OK"
        except Exception as e:
            results["llm_api"] = f"FAILED: {e}"

        return results

    @staticmethod
    async def check_performance():
        import psutil

        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage("/").percent,
            "process_count": len(psutil.pids()),
        }
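
These checks become most useful when on-call engineers can hit them directly. A sketch of an internal endpoint exposing them, assuming the FastAPI app from earlier sections:

# Internal diagnostics endpoint for on-call debugging (sketch)
@app.get("/internal/diagnostics")
async def diagnostics():
    return {
        "dependencies": await Diagnostic.check_system(),
        "host": await Diagnostic.check_performance(),
    }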

10.2 Debugging Tools

import json
from functools import wraps

class DebugTools:
    @staticmethod
    def trace_function(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            print(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
            result = await func(*args, **kwargs)
            print(f"{func.__name__} returned: {result}")
            return result
        return wrapper

    @staticmethod
    def log_request_response(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            print(f"Request: {request}")
            response = await func(request, *args, **kwargs)
            print(f"Response: {response}")
            return response
        return wrapper

# Usage
@DebugTools.trace_function
async def process_agent_request(request):
    # function body goes here
    pass

Best Practices Summary

Deployment Best Practices

  1. Containerization: use multi-stage builds to keep images small
  2. Orchestration: rely on Kubernetes autoscaling and self-healing
  3. Configuration management: separate config from code with ConfigMaps and Secrets
  4. Health checks: implement solid liveness and readiness probes

Monitoring Best Practices

  1. Metrics: collect the key business and technical indicators
  2. Alerting: set sensible thresholds and notification policies
  3. Logging: use structured logs that carry trace IDs
  4. Visualization: build comprehensive dashboards

Performance Best Practices

  1. Concurrency: make full use of async programming and connection pools
  2. Caching: implement a multi-level caching strategy
  3. Batching: batch work to cut down on API calls
  4. Compression: trim context size to reduce token consumption

Reliability Best Practices

  1. Circuit breaking: prevent cascading failures
  2. Retries: retry transient errors
  3. Fallbacks: provide degraded paths that preserve core functionality
  4. Rate limiting: protect the system from overload

Security Best Practices

  1. Key management: use a secrets manager and rotate keys regularly
  2. Input validation: strictly validate and filter all input
  3. Auditing: log every critical operation
  4. Least privilege: follow the principle of least privilege