LLM Application Deployment and Operations
1. Deployment Architecture Design
1.1 Typical Architecture Components
┌─────────────────────────────────────────────────────────┐
│                  Load Balancing Layer                    │
│                    Nginx / ALB / SLB                      │
└───────────────────────┬─────────────────────────────────┘
                        │
┌───────────────────────┴─────────────────────────────────┐
│                    API Gateway Layer                      │
│               Kong / Traefik / API Gateway                │
│    (auth, rate limiting, routing, protocol translation)   │
└───────────────────────┬─────────────────────────────────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
┌───────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐
│Agent Service │ │  Retrieval  │ │Tool Services│
│   (Python)   │ │Service (RAG)│ │  (Go/Rust)  │
└───────┬──────┘ └──────┬──────┘ └──────┬──────┘
        │               │               │
┌───────┴───────────────┴───────────────┴─────┐
│        Message Queue / Cache Layer           │
│          Redis / Kafka / RabbitMQ            │
└───────┬──────────────────────────────────────┘
        │
┌───────┴──────────────────────────────────────┐
│            Vector Database Layer              │
│          Milvus / Qdrant / Pinecone           │
└───────┬──────────────────────────────────────┘
        │
┌───────┴──────────────────────────────────────┐
│            External LLM Services              │
│        OpenAI / Claude / local models         │
└──────────────────────────────────────────────┘
1.2 Service Decomposition Strategy

| Service Type | Recommended Language | Responsibilities | Deployment |
|---|---|---|---|
| Agent core service | Python | Conversation management, task orchestration | StatefulSet + PodDisruptionBudget (sketch below) |
| Retrieval service | Python | Vector retrieval, reranking | Deployment + HPA |
| Tool execution service | Go/Rust | High-performance tool invocation | Deployment |
| Model inference service | Python/C++ | Local model inference | Deployment + GPU |
| Admin console | TypeScript/Go | Configuration management, log queries | Deployment |
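The table pairs the Agent core service with a PodDisruptionBudget, which is not shown elsewhere in this document; a minimal sketch, assuming the agent-service labels and the llm-app namespace used in the Kubernetes manifests below:

apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-service-pdb
  namespace: llm-app
spec:
  minAvailable: 2        # keep at least 2 pods up during voluntary disruptions (drains, upgrades)
  selector:
    matchLabels:
      app: agent-service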
1.3 Resource Planning

# Example: resource spec for the Agent service
resources:
  requests:
    cpu: "2"
    memory: "4Gi"
  limits:
    cpu: "4"
    memory: "8Gi"

# Resources for the local model inference service
gpu_resources:
  requests:
    nvidia.com/gpu: "1"
    cpu: "4"
    memory: "16Gi"
  limits:
    nvidia.com/gpu: "1"
    cpu: "8"
    memory: "32Gi"
2. Docker Containerization
2.1 Base Image Selection

# Python service: multi-stage build
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .
ENV PATH=/root/.local/bin:$PATH
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Go service
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o server
FROM alpine:latest
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/
COPY --from=builder /app/server .
EXPOSE 8080
CMD ["./server"]
# GPU inference service
FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y \
    python3.11 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
# requirements.txt is expected to pin torch, transformers and vllm
RUN pip3 install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python3", "serve.py"]
2.2 Docker Compose Development Environment

version: '3.8'

services:
  agent:
    build: ./agent
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - VECTORDB_URL=http://vector:6333
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./agent:/app
      - /tmp/cache:/tmp/cache
    depends_on:
      - redis
      - vector

  retriever:
    build: ./retriever
    ports:
      - "8001:8000"
    environment:
      - REDIS_URL=redis://redis:6379
      - VECTORDB_URL=http://vector:6333
    depends_on:
      - redis
      - vector

  vector:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  kafka:
    image: confluentinc/cp-kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  qdrant_data:
  redis_data:
  grafana_data:
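The Compose file above mounts a ./prometheus.yml that is not shown; a minimal scrape configuration consistent with the services defined above (the job names and the 15s interval are assumptions):

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: agent
    static_configs:
      - targets: ["agent:8000"]      # FastAPI service exposing /metrics (see section 4)
  - job_name: retriever
    static_configs:
      - targets: ["retriever:8000"]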
2.3 Health Checks

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# FastAPI health check endpoint
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/health")
async def health_check():
    try:
        # Check dependent services
        # await check_redis()
        # await check_vector_db()
        return JSONResponse({"status": "healthy"}, status_code=200)
    except Exception as e:
        return JSONResponse(
            {"status": "unhealthy", "error": str(e)},
            status_code=503
        )
3. Kubernetes Orchestration
3.1 Core Resource Configuration

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
  namespace: llm-app
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
        version: v1.0.0
    spec:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - agent-service
                topologyKey: kubernetes.io/hostname
      containers:
        - name: agent
          image: registry.example.com/llm/agent:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "8Gi"
          env:
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: redis-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: openai-api-key
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
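The readiness probe above targets a /ready path in addition to /health; a minimal sketch of that endpoint, assuming the same FastAPI app and dependency-check helpers as the health endpoint in section 2.3:

@app.get("/ready")
async def readiness_check():
    # Readiness should confirm the pod can actually serve traffic,
    # i.e. its downstream dependencies are reachable.
    try:
        # await check_redis()
        # await check_vector_db()
        return JSONResponse({"status": "ready"}, status_code=200)
    except Exception as e:
        return JSONResponse({"status": "not ready", "error": str(e)}, status_code=503)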
3.2 HPA Autoscaling

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-service-hpa
  namespace: llm-app
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: concurrent_requests
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 60
        - type: Pods
          value: 4
          periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 50
          periodSeconds: 120
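The concurrent_requests pod metric is not built into Kubernetes; it assumes a custom-metrics adapter such as prometheus-adapter exposing a per-pod series. A hypothetical adapter rule mapping the agent_active_connections gauge from section 4.1 (the rule below is an assumption, not a shipped configuration):

rules:
  - seriesQuery: 'agent_active_connections{namespace!="",pod!=""}'
    resources:
      overrides:
        namespace: {resource: "namespace"}
        pod: {resource: "pod"}
    name:
      as: "concurrent_requests"
    metricsQuery: 'sum(agent_active_connections{<<.LabelMatchers>>}) by (<<.GroupBy>>)'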
3.3 StatefulSet Deployment (Stateful Services)

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: vector-db
  namespace: llm-app
spec:
  serviceName: vector-db
  replicas: 3
  podManagementPolicy: Parallel
  selector:
    matchLabels:
      app: vector-db
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
  template:
    metadata:
      labels:
        app: vector-db
    spec:
      containers:
        - name: qdrant
          image: qdrant/qdrant:latest
          ports:
            - containerPort: 6333
          volumeMounts:
            - name: data
              mountPath: /qdrant/storage
          env:
            - name: QDRANT__SERVICE__GRPC_PORT
              value: "6334"
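serviceName: vector-db refers to a headless Service that gives each StatefulSet pod a stable DNS name; it is not shown above, so here is a minimal sketch consistent with the names used:

apiVersion: v1
kind: Service
metadata:
  name: vector-db
  namespace: llm-app
spec:
  clusterIP: None        # headless: per-pod DNS records instead of a virtual IP
  selector:
    app: vector-db
  ports:
    - name: http
      port: 6333
    - name: grpc
      port: 6334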
3.4 Configuration Management with ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
  namespace: llm-app
data:
  config.yaml: |
    agent:
      name: "production-agent"
      model: "gpt-4"
      max_tokens: 4096
      temperature: 0.7
    retriever:
      top_k: 10
      score_threshold: 0.7
      rerank_model: "cohere/rerank-multilingual-v2"
    cache:
      enabled: true
      ttl: 3600
      max_size: 10000
    rate_limit:
      requests_per_minute: 60
      tokens_per_minute: 100000
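To consume this ConfigMap, the agent-service Deployment from section 3.1 would mount it as a file; a sketch of the additional stanzas (the /app/config mount path is an assumption):

# Under spec.template.spec of the agent-service Deployment
volumes:
  - name: agent-config
    configMap:
      name: agent-config
# Under the agent container
volumeMounts:
  - name: agent-config
    mountPath: /app/config
    readOnly: true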
3.5 Secret Management

apiVersion: v1
kind: Secret
metadata:
  name: app-secrets
  namespace: llm-app
type: Opaque
stringData:
  openai-api-key: "sk-..."
  redis-url: "redis://redis:6379"
  vector-db-url: "http://vector-db:6333"
3.6 Service and Ingress

apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: llm-app
spec:
  selector:
    app: agent-service
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: agent-ingress
  namespace: llm-app
  annotations:
    nginx.ingress.kubernetes.io/limit-rpm: "100"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "300"
spec:
  ingressClassName: nginx
  rules:
    - host: api.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: agent-service
                port:
                  number: 80
  tls:
    - hosts:
        - api.example.com
      secretName: tls-secret
4. Monitoring and Alerting
4.1 Prometheus Metrics Collection

from prometheus_client import Counter, Histogram, Gauge, Summary

# Business metrics
request_counter = Counter(
    'agent_requests_total',
    'Total number of agent requests',
    ['endpoint', 'status']
)

request_duration = Histogram(
    'agent_request_duration_seconds',
    'Request duration in seconds',
    ['endpoint']
)

active_connections = Gauge(
    'agent_active_connections',
    'Number of active connections'
)

token_usage = Counter(
    'agent_token_usage_total',
    'Total token usage',
    ['model', 'type']  # type: input/output
)

cache_hits = Counter(
    'agent_cache_hits_total',
    'Total cache hits',
    ['cache_type']
)

retrieval_latency = Histogram(
    'agent_retrieval_latency_seconds',
    'Retrieval latency in seconds',
    ['retriever_type']
)

tool_execution_time = Histogram(
    'agent_tool_execution_seconds',
    'Tool execution time in seconds',
    ['tool_name']
)
4.2 Middleware Integration

from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    endpoint = request.url.path
    status = str(response.status_code)
    request_counter.labels(endpoint=endpoint, status=status).inc()
    request_duration.labels(endpoint=endpoint).observe(duration)
    return response
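For Prometheus to scrape these metrics, the application also has to expose them; prometheus_client ships an ASGI app that can be mounted on the existing FastAPI instance (a minimal sketch):

from prometheus_client import make_asgi_app

# Expose all registered metrics at /metrics for Prometheus to scrape
app.mount("/metrics", make_asgi_app())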
4.3 Grafana Dashboard Configuration

{
  "title": "LLM Agent Dashboard",
  "panels": [
    {
      "title": "Request Rate",
      "targets": [
        {
          "expr": "rate(agent_requests_total[5m])"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Request Duration (P95)",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m]))"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Token Usage",
      "targets": [
        {
          "expr": "sum(rate(agent_token_usage_total[1h])) by (model)"
        }
      ],
      "type": "graph"
    },
    {
      "title": "Cache Hit Rate",
      "targets": [
        {
          "expr": "sum(rate(agent_cache_hits_total[5m])) / sum(rate(agent_requests_total[5m]))"
        }
      ],
      "type": "gauge"
    },
    {
      "title": "Retrieval Latency",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, rate(agent_retrieval_latency_seconds_bucket[5m]))"
        }
      ],
      "type": "graph"
    }
  ]
}
4.4 Alerting Rules

groups:
  - name: llm_agent_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(agent_requests_total{status=~"5.."}[5m])) /
          sum(rate(agent_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }}"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(agent_request_duration_seconds_bucket[5m])) > 30
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High request latency"
          description: "P95 latency is {{ $value }}s"

      - alert: LowCacheHitRate
        expr: |
          sum(rate(agent_cache_hits_total[10m])) /
          sum(rate(agent_requests_total[10m])) < 0.3
        for: 15m
        labels:
          severity: info
        annotations:
          summary: "Low cache hit rate"
          description: "Cache hit rate is {{ $value | humanizePercentage }}"

      - alert: HighTokenUsage
        expr: |
          sum(increase(agent_token_usage_total[1h])) > 1000000
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "High token usage detected"
          description: "Token usage is {{ $value }} tokens over the last hour"
4.5 Log Collection

import structlog
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

# Structured logging configuration
logger = structlog.get_logger()

# Example: logging with a trace ID
async def process_request(request_id: str):
    logger.info(
        "processing_request",
        request_id=request_id,
        trace_id=format(trace.get_current_span().get_span_context().trace_id, "032x"),
    )
    try:
        result = await do_work()
        logger.info("request_completed", request_id=request_id, status="success")
        return result
    except Exception as e:
        logger.error(
            "request_failed",
            request_id=request_id,
            error=str(e),
            exc_info=True
        )
        raise
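The snippet above assumes structlog has been configured to emit machine-parseable JSON; a minimal configuration sketch (the exact processor chain is an assumption):

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # pull in request-scoped context
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),       # one JSON object per line for log collectors
    ]
)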
5. CI/CD Pipeline
5.1 GitHub Actions Configuration

name: LLM Agent CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  REGISTRY: registry.example.com
  IMAGE_NAME: llm/agent

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        run: |
          pytest --cov=app --cov-report=xml --cov-report=html
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
      - uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Login to registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ secrets.REGISTRY_USERNAME }}
          password: ${{ secrets.REGISTRY_PASSWORD }}
      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
          cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
          cache-to: type=inline

  deploy-staging:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-service \
            agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
            --namespace=llm-app-staging

  deploy-production:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - name: Deploy to production
        run: |
          # Blue-green deployment: roll the new image onto the blue Deployment
          kubectl set image deployment/agent-service-blue \
            agent=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }} \
            --namespace=llm-app-prod
          # Switch traffic once the rollout is healthy
          kubectl wait --for=condition=available \
            deployment/agent-service-blue \
            --namespace=llm-app-prod --timeout=300s
          kubectl patch service agent-service -n llm-app-prod \
            -p '{"spec":{"selector":{"app":"agent-service","version":"blue"}}}'
5.2 GitLab CI Configuration

stages:
  - test
  - build
  - deploy

variables:
  REGISTRY: registry.example.com
  IMAGE_NAME: llm/agent

test:
  stage: test
  image: python:3.11
  script:
    - pip install -r requirements.txt
    - pip install pytest pytest-cov
    - pytest --cov=app --cov-report=xml
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      coverage_report:
        coverage_format: cobertura
        path: coverage.xml

build:
  stage: build
  image: docker:24
  services:
    - docker:24-dind
  script:
    - docker login -u $REGISTRY_USERNAME -p $REGISTRY_PASSWORD $REGISTRY
    - docker build -t $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA .
    - docker push $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA
    - docker tag $REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA $REGISTRY/$IMAGE_NAME:latest
    - docker push $REGISTRY/$IMAGE_NAME:latest

deploy:
  stage: deploy
  image: bitnami/kubectl:latest
  environment:
    name: production
    url: https://api.example.com
  script:
    - kubectl config use-context $KUBE_CONTEXT
    - >
      kubectl set image deployment/agent-service
      agent=$REGISTRY/$IMAGE_NAME:$CI_COMMIT_SHA
      --namespace=llm-app
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
5.3 Blue-Green Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service-blue
  namespace: llm-app
spec:
  replicas: 5
  selector:
    matchLabels:
      app: agent-service
      version: blue
  template:
    metadata:
      labels:
        app: agent-service
        version: blue
    spec:
      containers:
        - name: agent
          image: registry.example.com/llm/agent:blue-v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service-green
  namespace: llm-app
spec:
  replicas: 5
  selector:
    matchLabels:
      app: agent-service
      version: green
  template:
    metadata:
      labels:
        app: agent-service
        version: green
    spec:
      containers:
        - name: agent
          image: registry.example.com/llm/agent:green-v1.0.0
---
apiVersion: v1
kind: Service
metadata:
  name: agent-service
  namespace: llm-app
spec:
  selector:
    app: agent-service
    version: blue  # switch this to green to cut traffic over
  ports:
    - port: 80
      targetPort: 8000
6. Performance Optimization
6.1 Concurrency

import asyncio
from typing import List
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

# Thread pool for CPU-bound work
executor = ThreadPoolExecutor(max_workers=10)

# Asynchronous batching
async def batch_retrieve(queries: List[str], batch_size: int = 10):
    results = []
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        tasks = [retrieve_single(q) for q in batch]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
    return results

# Run synchronous work in the thread pool
def process_sync(data):
    # CPU-bound operation
    return expensive_computation(data)

@app.post("/process")
async def process_endpoint(data: dict):
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, process_sync, data)
    return result
6.2 Connection Pool Configuration

import httpx
import redis.asyncio as redis
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine

# Redis connection pool
redis_pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=100,
    decode_responses=True
)
redis_client = redis.Redis(connection_pool=redis_pool)

# HTTP client connection pool
http_client = AsyncClient(
    limits=httpx.Limits(
        max_keepalive_connections=100,
        max_connections=200,
        keepalive_expiry=30
    ),
    timeout=httpx.Timeout(30.0)
)

# Database connection pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=40,
    pool_pre_ping=True,
    pool_recycle=3600
)
6.3 Caching Strategy

from functools import lru_cache
from datetime import timedelta
from typing import List

# Multi-level cache
class MultiLevelCache:
    def __init__(self):
        self.l1 = {}              # in-process memory cache
        self.l2 = redis_client    # shared Redis cache
        self.l1_size = 1000

    async def get(self, key: str):
        # L1 cache
        if key in self.l1:
            return self.l1[key]
        # L2 cache
        value = await self.l2.get(key)
        if value:
            self.l1[key] = value
            return value
        return None

    async def set(self, key: str, value: str, ttl: timedelta):
        # Write to both L1 and L2
        self.l1[key] = value
        await self.l2.setex(key, int(ttl.total_seconds()), value)
        # Evict oldest L1 entry when the cache grows too large
        if len(self.l1) > self.l1_size:
            self.l1.pop(next(iter(self.l1)))

# Cache request results
@lru_cache(maxsize=1000)
def cached_embedding(text: str) -> List[float]:
    return get_embedding(text)
6.4 Batch Processing

# Batch tool calls
async def batch_tool_calls(tools: List[ToolCall]):
    # Group calls by tool
    grouped = {}
    for tool_call in tools:
        tool_name = tool_call.tool_name
        if tool_name not in grouped:
            grouped[tool_name] = []
        grouped[tool_name].append(tool_call)

    # Execute the groups concurrently
    tasks = []
    for tool_name, calls in grouped.items():
        tasks.append(execute_batch(tool_name, calls))
    batch_results = await asyncio.gather(*tasks)
    return batch_results

# Batch vector search
async def batch_search(queries: List[str], k: int = 10):
    embeddings = await get_embeddings_batch(queries)
    results = await vector_db.search_batch(embeddings, k=k)
    return results
7. High-Availability Design
7.1 Circuit Breaker Pattern

from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm_api(prompt: str):
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    except Exception as e:
        # The circuit breaker records the failure automatically
        raise

# Usage (inside an async handler)
try:
    result = await call_llm_api("Hello")
except CircuitBreakerError:
    # Circuit is open, fall back to a degraded response
    result = await fallback_response()
7.2 Retry Strategy

import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((openai.APITimeoutError, openai.APIConnectionError))
)
async def reliable_llm_call(prompt: str):
    return await call_llm_api(prompt)
7.3 Rate Limiting

import time
from fastapi import FastAPI, HTTPException, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("60/minute")
async def chat(request: Request, message: str):
    return await process_message(message)

# Token-based rate limiting
class TokenRateLimiter:
    def __init__(self, tokens_per_minute: int):
        self.tokens = tokens_per_minute
        self.last_reset = time.time()
        self.used = 0

    async def acquire(self, tokens: int):
        now = time.time()
        if now - self.last_reset > 60:
            self.used = 0
            self.last_reset = now
        if self.used + tokens > self.tokens:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")
        self.used += tokens
7.4 Degradation Strategy

class FallbackStrategy:
    async def get_response(self, query: str):
        try:
            # Primary strategy
            return await self.primary_response(query)
        except Exception as e:
            logger.warning("Primary failed, trying fallback", error=str(e))
            try:
                # Fallback 1: a smaller, faster model
                return await self.simplified_model_response(query)
            except Exception:
                # Fallback 2: cached or canned response
                return await self.cached_response(query)

    async def simplified_model_response(self, query: str):
        # Use a faster model
        return await call_llm_api(query, model="gpt-3.5-turbo")

    async def cached_response(self, query: str):
        # Return a predefined or cached response
        return self.fallback_responses.get(query, "Sorry, I'm having trouble right now.")
8. Cost Optimization
8.1 Token Usage Tracking

from collections import defaultdict

class TokenTracker:
    def __init__(self):
        self.usage = defaultdict(lambda: {"input": 0, "output": 0})

    def track(self, model: str, input_tokens: int, output_tokens: int):
        self.usage[model]["input"] += input_tokens
        self.usage[model]["output"] += output_tokens
        # Emit to the monitoring system
        token_usage.labels(model=model, type="input").inc(input_tokens)
        token_usage.labels(model=model, type="output").inc(output_tokens)

    def get_cost(self):
        cost = 0
        for model, usage in self.usage.items():
            price = MODEL_PRICES.get(model, {})
            cost += (
                usage["input"] * price.get("input", 0) +
                usage["output"] * price.get("output", 0)
            )
        return cost

# Prices per token (listed here per 1K tokens)
MODEL_PRICES = {
    "gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
    "gpt-4-turbo": {"input": 0.01 / 1000, "output": 0.03 / 1000},
    "gpt-3.5-turbo": {"input": 0.0005 / 1000, "output": 0.0015 / 1000},
}
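A sketch of how the tracker might be fed from a chat completion call, assuming an async OpenAI client named openai_client (as in section 7.1) and the SDK's usage fields prompt_tokens / completion_tokens:

tracker = TokenTracker()

async def tracked_completion(prompt: str, model: str = "gpt-4"):
    response = await openai_client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    # response.usage reports prompt and completion token counts
    tracker.track(model, response.usage.prompt_tokens, response.usage.completion_tokens)
    return response.choices[0].message.content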
8.2 Model Selection by Task Complexity

async def select_model_for_task(task: dict) -> str:
    task_complexity = estimate_complexity(task)
    if task_complexity < 0.3:
        return "gpt-3.5-turbo"   # simple tasks
    elif task_complexity < 0.7:
        return "gpt-4-turbo"     # moderate tasks
    else:
        return "gpt-4"           # complex tasks

def estimate_complexity(task: dict) -> float:
    # Estimate complexity from task features
    factors = []
    factors.append(len(task.get("description", "")) / 1000)
    factors.append(len(task.get("context", "")) / 5000)
    factors.append(len(task.get("tools", [])) / 10)
    return min(sum(factors), 1.0)
8.3 Context Compression

async def compress_context(context: List[Document], max_tokens: int) -> List[Document]:
    total_tokens = sum(len(doc.content.split()) for doc in context)
    if total_tokens <= max_tokens:
        return context

    # Strategy 1: rank documents by relevance
    scored = [(doc, calculate_relevance(doc)) for doc in context]
    scored.sort(key=lambda x: x[1], reverse=True)

    # Strategy 2: recursively summarize long documents
    compressed = []
    current_tokens = 0
    for doc, score in scored:
        if current_tokens + len(doc.content.split()) > max_tokens:
            break
        if len(doc.content.split()) > 1000:
            # Summarize long documents
            summary = await summarize_document(doc)
            compressed.append(Document(content=summary, metadata=doc.metadata))
            current_tokens += len(summary.split())
        else:
            compressed.append(doc)
            current_tokens += len(doc.content.split())
    return compressed
9. Security Operations
9.1 API Key Management

import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    openai_api_key: str
    anthropic_api_key: str

    # Resolve secrets through a secrets-management system
    def __init__(self):
        super().__init__()
        self.openai_api_key = self.get_secret("OPENAI_API_KEY")
        self.anthropic_api_key = self.get_secret("ANTHROPIC_API_KEY")

    @staticmethod
    def get_secret(key: str) -> str:
        # Prefer environment variables
        if key in os.environ:
            return os.environ[key]
        # Otherwise read from the secrets manager
        return get_from_vault(key)

# Key rotation
class APIKeyRotator:
    def __init__(self):
        self.keys = []
        self.current_index = 0

    def rotate(self):
        self.current_index = (self.current_index + 1) % len(self.keys)
        return self.keys[self.current_index]

    def add_key(self, key: str):
        self.keys.append(key)
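get_from_vault is referenced but not defined above; one possible implementation against HashiCorp Vault's KV v2 engine with the hvac client (the Vault address, token handling, and the llm-app secret path are assumptions):

import os
import hvac

def get_from_vault(key: str) -> str:
    client = hvac.Client(
        url=os.environ.get("VAULT_ADDR", "http://vault:8200"),
        token=os.environ["VAULT_TOKEN"],
    )
    # KV v2 nests the payload under data.data
    secret = client.secrets.kv.v2.read_secret_version(path="llm-app")
    return secret["data"]["data"][key]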
9.2 Input Filtering

import re
from fastapi import HTTPException

class InputValidator:
    MAX_INPUT_LENGTH = 10000
    DISALLOWED_PATTERNS = [
        r'<script.*?>.*?</script>',
        r'javascript:',
        r'on\w+\s*=',
    ]

    def validate(self, input_text: str) -> bool:
        # Length check
        if len(input_text) > self.MAX_INPUT_LENGTH:
            raise HTTPException(
                status_code=400,
                detail="Input too long"
            )
        # Pattern check
        for pattern in self.DISALLOWED_PATTERNS:
            if re.search(pattern, input_text, re.IGNORECASE):
                raise HTTPException(
                    status_code=400,
                    detail="Invalid input pattern"
                )
        return True
9.3 Audit Logging

import structlog
from datetime import datetime

class AuditLogger:
    def __init__(self):
        self.logger = structlog.get_logger("audit")

    async def log_request(self, user_id: str, action: str, details: dict):
        self.logger.info(
            "audit_log",
            user_id=user_id,
            action=action,
            details=details,
            timestamp=datetime.utcnow().isoformat()
        )

    async def log_llm_call(
        self,
        user_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cost: float
    ):
        await self.log_request(
            user_id,
            "llm_call",
            {
                "model": model,
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost": cost
            }
        )
10. Troubleshooting
10.1 Diagnosing Common Issues

class Diagnostic:
    @staticmethod
    async def check_system():
        results = {}

        # Check Redis connectivity
        try:
            await redis_client.ping()
            results["redis"] = "OK"
        except Exception as e:
            results["redis"] = f"FAILED: {e}"

        # Check the vector database
        try:
            await vector_client.get_collections()
            results["vector_db"] = "OK"
        except Exception as e:
            results["vector_db"] = f"FAILED: {e}"

        # Check the LLM API
        try:
            await openai_client.models.list()
            results["llm_api"] = "OK"
        except Exception as e:
            results["llm_api"] = f"FAILED: {e}"

        return results

    @staticmethod
    async def check_performance():
        import psutil
        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage("/").percent,
            "process_count": len(psutil.pids()),
        }
10.2 Debugging Tools

from functools import wraps

class DebugTools:
    @staticmethod
    def trace_function(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            print(f"Calling {func.__name__} with args={args}, kwargs={kwargs}")
            result = await func(*args, **kwargs)
            print(f"{func.__name__} returned: {result}")
            return result
        return wrapper

    @staticmethod
    def log_request_response(func):
        @wraps(func)
        async def wrapper(request, *args, **kwargs):
            print(f"Request: {request}")
            response = await func(request, *args, **kwargs)
            print(f"Response: {response}")
            return response
        return wrapper

# Usage example
@DebugTools.trace_function
async def process_agent_request(request):
    # function body
    pass
Best Practices Summary

Deployment Best Practices
- Containerization: use multi-stage builds to keep images small
- Orchestration: rely on Kubernetes autoscaling and self-healing
- Configuration management: separate config from code with ConfigMaps and Secrets
- Health checks: implement solid liveness and readiness probes

Monitoring Best Practices
- Metrics: collect the key business and technical metrics
- Alerting: set sensible thresholds and notification policies
- Logging: use structured logs that carry trace IDs
- Visualization: build comprehensive monitoring dashboards

Performance Best Practices
- Concurrency: make full use of async programming and connection pools
- Caching: implement a multi-level caching strategy
- Batching: batch work to reduce the number of API calls
- Compression: trim context size to cut token consumption

Reliability Best Practices
- Circuit breaking: prevent cascading failures
- Retries: retry transient errors
- Degradation: provide fallbacks that preserve core functionality
- Rate limiting: protect the system from overload

Security Best Practices
- Key management: use a secrets manager and rotate keys regularly
- Input validation: strictly validate and filter all input
- Auditing: log every critical operation
- Least privilege: follow the principle of least privilege