Building an MLOps Platform
End-to-end automation of model training, inference, and monitoring
Core Concepts
| Concept | Definition | Core Value |
|---|---|---|
| MLOps | Machine Learning Operations | Full model lifecycle management |
| ML Pipeline | Machine learning pipeline | Automated training workflow |
| Model Registry | Model repository | Model version management |
| Feature Store | Feature storage | Feature reuse and consistency |
| Inference Service | Inference serving | Online model serving |
| Model Monitoring | Model monitoring | Performance tracking and drift detection |
| A/B Testing | A/B testing | Comparing model performance |
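These concepts compose into one lifecycle: features feed a pipeline, the pipeline produces a registered model, the registry feeds serving, and monitoring closes the loop. The sketch below wires that flow together against the Protocol interfaces defined in section 1.2; it is illustrative glue code only, and the concrete registry / training / inference objects are assumed to exist elsewhere.

# Illustrative lifecycle glue; relies on the interfaces defined in section 1.2 below.
from mlplatform import GPUResource, InferenceService, ModelRegistry, TrainingService

def run_lifecycle(registry: ModelRegistry,
                  training: TrainingService,
                  inference: InferenceService,
                  config: dict) -> str:
    """Train a model, then expose it for serving; monitoring and A/B testing follow in production."""
    job = training.submit_training_job(
        config=config,
        resources=GPUResource(model="A100", count=1, memory_gb=40),
    )
    # In a real pipeline the trained artifact is registered via registry.register_model(...)
    # and promoted with registry.promote_model(...) before it reaches serving.
    return inference.deploy_model(model_id=job.model_name, replicas=1)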
Platform Architecture Design
1.1 Overall Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ MLOps Platform │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web Portal │ │ API Gateway │ │ Auth/SSO │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ┌──────▼─────────────────▼─────────────────▼──────────────┐ │
│ │ Core Services Layer │ │
│ ├────────────────────────────────────────────────────────────┤ │
│ │ Training Service │ Inference Service │ Model Store │ │
│ │ Workflow Engine │ Feature Store │ Registry │ │
│ └─────────────────────┴─────────────────────┴─────────────┘ │
│ │ │
│ ┌────────────────────────────▼────────────────────────────┐ │
│ │ Orchestration Layer │ │
│ ├─────────────────────────────────────────────────────────┤ │
│ │ Argo Workflows │ Kueue │ Volcano │ Operator │ │
│ └────────────────────┬────────────────────┬─────────────┘ │
│ │ │ │
│ ┌────────────────────▼────────────────────▼──────────────┐ │
│ │ Infrastructure Layer │ │
│ ├─────────────────────────────────────────────────────────┤ │
│ │ Kubernetes Cluster │ GPU Nodes │ Storage │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
1.2 Core Components
# mlplatform/__init__.py
"""
Core component definitions for the MLOps platform
"""
from typing import Any, Dict, List, Optional, Protocol
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
# ============== 数据模型 ==============
class ModelStatus(Enum):
"""模型状态"""
UPLOADING = "uploading"
REGISTERED = "registered"
TRAINING = "training"
TRAINED = "trained"
EVALUATING = "evaluating"
PUBLISHED = "published"
DEPRECATED = "deprecated"
FAILED = "failed"
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
class ResourceType(Enum):
"""资源类型"""
CPU = "cpu"
MEMORY = "memory"
GPU = "gpu"
@dataclass
class GPUResource:
"""GPU资源"""
model: str # A100, V100, T4, etc.
count: int
memory_gb: int
sharded: bool = False # 是否使用模型并行
@dataclass
class ModelMetadata:
"""模型元数据"""
name: str
version: str
framework: str # pytorch, tensorflow, onnx
task_type: str # classification, generation, embedding
created_at: datetime
author: str
tags: List[str]
metrics: Dict[str, float]
file_size_mb: int
hash: str
@dataclass
class TrainingJob:
"""训练任务"""
job_id: str
model_name: str
config: Dict
status: TaskStatus
resources: GPUResource
created_at: datetime
started_at: Optional[datetime]
finished_at: Optional[datetime]
metrics: Dict[str, float]
# ============== 核心接口 ==============
class ModelRegistry(Protocol):
"""模型仓库接口"""
def register_model(
self,
model_id: str,
metadata: ModelMetadata,
model_path: str
) -> str:
"""注册模型"""
...
def get_model(self, model_id: str, version: str = None) -> ModelMetadata:
"""获取模型"""
...
def list_models(
self,
task_type: str = None,
tags: List[str] = None
) -> List[ModelMetadata]:
"""列出模型"""
...
def promote_model(
self,
model_id: str,
version: str,
stage: str # staging, production
) -> bool:
"""提升模型到指定环境"""
...
class TrainingService(Protocol):
"""训练服务接口"""
def submit_training_job(
self,
config: Dict,
resources: GPUResource,
priority: int = 0
) -> TrainingJob:
"""提交训练任务"""
...
def get_job_status(self, job_id: str) -> TrainingJob:
"""获取任务状态"""
...
def cancel_job(self, job_id: str) -> bool:
"""取消任务"""
...
def get_job_logs(self, job_id: str) -> str:
"""获取任务日志"""
...
class InferenceService(Protocol):
"""推理服务接口"""
def deploy_model(
self,
model_id: str,
replicas: int = 1,
resources: GPUResource = None
) -> str:
"""部署模型"""
...
def scale_model(
self,
deployment_id: str,
replicas: int
) -> bool:
"""扩缩容"""
...
def undeploy_model(self, deployment_id: str) -> bool:
"""下线模型"""
...
def get_deployment_status(self, deployment_id: str) -> Dict:
"""获取部署状态"""
...
class FeatureStore(Protocol):
"""特征存储接口"""
def write_features(
self,
entity_id: str,
feature_name: str,
        value: Any,
timestamp: datetime = None
) -> bool:
"""写入特征"""
...
def read_features(
self,
entity_ids: List[str],
feature_names: List[str]
    ) -> Dict[str, Dict[str, Any]]:
"""读取特征"""
...
def get_training_dataset(
self,
feature_names: List[str],
start_time: datetime,
end_time: datetime
) -> Dict:
"""获取训练数据集"""
...
class WorkflowEngine(Protocol):
"""工作流引擎接口"""
def submit_workflow(
self,
workflow_def: Dict,
parameters: Dict = None
) -> str:
"""提交工作流"""
...
def get_workflow_status(self, workflow_id: str) -> Dict:
"""获取工作流状态"""
...
class ResourceScheduler(Protocol):
"""资源调度器接口"""
def schedule_job(
self,
job_id: str,
resources: GPUResource,
priority: int = 0
) -> bool:
"""调度任务"""
...
def get_available_resources(self) -> Dict[ResourceType, int]:
"""获取可用资源"""
...
def reserve_resources(
self,
job_id: str,
resources: GPUResource
) -> bool:
"""预留资源"""
...
def release_resources(self, job_id: str) -> bool:
"""释放资源"""
...
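The sections below implement TrainingService, InferenceService, WorkflowEngine and ResourceScheduler, but ModelRegistry is only defined as a Protocol. A minimal in-memory sketch that satisfies it (purely illustrative: no persistence, no artifact storage; the file name and stage handling are assumptions) could look like this:

# mlplatform/registry.py -- illustrative in-memory ModelRegistry (assumed module name)
from typing import Dict, List, Tuple

from mlplatform import ModelMetadata

class InMemoryModelRegistry:
    """Keeps metadata in a dict keyed by (model_id, version); satisfies the ModelRegistry Protocol."""

    def __init__(self):
        self._models: Dict[Tuple[str, str], ModelMetadata] = {}
        self._paths: Dict[Tuple[str, str], str] = {}
        self._stages: Dict[Tuple[str, str], str] = {}

    def register_model(self, model_id: str, metadata: ModelMetadata, model_path: str) -> str:
        key = (model_id, metadata.version)
        self._models[key] = metadata
        self._paths[key] = model_path
        return f"{model_id}:{metadata.version}"

    def get_model(self, model_id: str, version: str = None) -> ModelMetadata:
        if version is None:  # fall back to the latest registered version
            versions = sorted(v for (m, v) in self._models if m == model_id)
            if not versions:
                raise KeyError(f"model {model_id} not registered")
            version = versions[-1]
        return self._models[(model_id, version)]

    def list_models(self, task_type: str = None, tags: List[str] = None) -> List[ModelMetadata]:
        result = list(self._models.values())
        if task_type:
            result = [m for m in result if m.task_type == task_type]
        if tags:
            result = [m for m in result if set(tags) <= set(m.tags)]
        return result

    def promote_model(self, model_id: str, version: str, stage: str) -> bool:
        if (model_id, version) not in self._models:
            return False
        self._stages[(model_id, version)] = stage  # staging / production
        return True

A production registry would back these dicts with a database plus object storage for model_path, which is the gap tools like MLflow's model registry fill.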
Model Training Platform
2.1 Training Service Implementation
# mlplatform/training_service.py
"""
Training service implementation
Covers: job submission, resource scheduling, training execution
"""
import uuid
from datetime import datetime
from typing import Dict, Optional

import yaml
from kubernetes import client, config
from kubernetes.client import V1Container, V1Job, V1ObjectMeta, V1PodTemplateSpec

from mlplatform import GPUResource, TaskStatus, TrainingJob, TrainingService
class KubernetesTrainingService(TrainingService):
"""基于Kubernetes的训练服务"""
def __init__(
self,
namespace: str = "ml-training",
image: str = "ml-train-runner:latest",
storage_class: str = "standard"
):
self.namespace = namespace
self.base_image = image
self.storage_class = storage_class
# 初始化K8s客户端
config.load_kube_config()
self.batch_api = client.BatchV1Api()
self.core_api = client.CoreV1Api()
self._create_namespace_if_not_exists()
def submit_training_job(
self,
config: Dict,
resources: GPUResource,
priority: int = 0
) -> TrainingJob:
"""提交训练任务"""
job_id = str(uuid.uuid4())
# 创建PVC
pvc_name = f"{job_id}-data"
self._create_pvc(pvc_name)
# 创建Job
k8s_job = self._build_job(
job_id=job_id,
config=config,
resources=resources,
pvc_name=pvc_name,
priority=priority
)
# 提交到K8s
self.batch_api.create_namespaced_job(
namespace=self.namespace,
body=k8s_job
)
return TrainingJob(
job_id=job_id,
model_name=config.get("model_name", "unknown"),
config=config,
status=TaskStatus.PENDING,
resources=resources,
created_at=datetime.now(),
started_at=None,
finished_at=None,
metrics={}
)
def get_job_status(self, job_id: str) -> TrainingJob:
"""获取任务状态"""
try:
job = self.batch_api.read_namespaced_job(
name=job_id,
namespace=self.namespace
)
status = self._parse_job_status(job)
return status
except client.ApiException as e:
if e.status == 404:
raise ValueError(f"Job {job_id} not found")
raise
def cancel_job(self, job_id: str) -> bool:
"""取消任务"""
try:
self.batch_api.delete_namespaced_job(
name=job_id,
namespace=self.namespace
)
return True
except client.ApiException:
return False
def get_job_logs(self, job_id: str) -> str:
"""获取任务日志"""
try:
pods = self.core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={job_id}"
)
if not pods.items:
return ""
# 获取主pod的日志
pod_name = pods.items[0].metadata.name
logs = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace
)
return logs
except client.ApiException:
return ""
def _create_namespace_if_not_exists(self):
"""创建命名空间"""
try:
self.core_api.create_namespace(
client.V1Namespace(
metadata=client.V1ObjectMeta(name=self.namespace)
)
)
except client.ApiException as e:
if e.status != 409: # Already exists
raise
def _create_pvc(self, name: str, size_gb: int = 100):
"""创建PVC"""
pvc = client.V1PersistentVolumeClaim(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteOnce"],
resources=client.V1ResourceRequirements(
requests={"storage": f"{size_gb}Gi"}
),
storage_class_name=self.storage_class
)
)
try:
self.core_api.create_namespaced_persistent_volume_claim(
namespace=self.namespace,
body=pvc
)
except client.ApiException as e:
if e.status != 409:
raise
def _build_job(
self,
job_id: str,
config: Dict,
resources: GPUResource,
pvc_name: str,
priority: int
) -> V1Job:
"""构建K8s Job"""
# 资源请求
resource_requests = {
"cpu": "4",
"memory": "16G"
}
resource_limits = {
"cpu": "8",
"memory": "32G"
}
if resources.count > 0:
resource_requests["nvidia.com/gpu"] = str(resources.count)
resource_limits["nvidia.com/gpu"] = str(resources.count)
# 容器定义
container = V1Container(
name="trainer",
image=self.base_image,
command=["python", "train.py"],
env=[
client.V1EnvVar(
name="JOB_CONFIG",
value=yaml.dump(config)
),
client.V1EnvVar(
name="DATA_PATH",
value=f"/data/{job_id}"
)
],
volume_mounts=[
client.V1VolumeMount(
name="data",
mount_path="/data"
)
],
resources=client.V1ResourceRequirements(
requests=resource_requests,
limits=resource_limits
)
)
        # Pod template (containers/volumes go under spec, not on the template itself)
        pod_template = V1PodTemplateSpec(
            spec=client.V1PodSpec(
                containers=[container],
                volumes=[
                    client.V1Volume(
                        name="data",
                        persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                            claim_name=pvc_name
                        )
                    )
                ],
                restart_policy="Never"
            )
        )
        # Priority
        if priority > 0:
            pod_template.spec.priority_class_name = "high-priority"
        # Job definition
        job = V1Job(
            metadata=V1ObjectMeta(
                name=job_id,
                labels={
                    "app": "ml-training",
                    "model": config.get("model_name", "unknown"),
                    "priority": str(priority)
                }
            ),
            spec=client.V1JobSpec(
                template=pod_template,
                backoff_limit=3,
                ttl_seconds_after_finished=86400  # clean up 24 hours after completion
            )
        )
return job
def _parse_job_status(self, job: V1Job) -> TrainingJob:
"""解析Job状态"""
status = job.status
# 解析任务状态
task_status = TaskStatus.PENDING
if status.active:
task_status = TaskStatus.RUNNING
elif status.succeeded:
task_status = TaskStatus.SUCCEEDED
elif status.failed:
task_status = TaskStatus.FAILED
return TrainingJob(
job_id=job.metadata.name,
model_name=job.metadata.labels.get("model", "unknown"),
config={},
status=task_status,
resources=GPUResource(model="unknown", count=0, memory_gb=0),
created_at=job.metadata.creation_timestamp,
started_at=status.start_time,
finished_at=status.completion_time,
metrics={}
)
# ============== 分布式训练支持 ==============
class DistributedTrainingJob:
"""分布式训练任务管理器"""
def __init__(self, training_service: TrainingService):
self.training_service = training_service
def submit_distributed_job(
self,
config: Dict,
strategy: str, # data_parallel, tensor_parallel, pipeline_parallel
num_nodes: int,
gpus_per_node: int
) -> str:
"""
提交分布式训练任务
Args:
strategy: 并行策略
num_nodes: 节点数量
gpus_per_node: 每节点GPU数
"""
# 设置分布式配置
config["distributed"] = {
"strategy": strategy,
"world_size": num_nodes * gpus_per_node,
"backend": "nccl" # NCCL for NVIDIA GPUs
}
# 提交多节点训练
resources = GPUResource(
model="unknown",
count=gpus_per_node,
memory_gb=40 * gpus_per_node
)
if strategy == "tensor_parallel":
# 张量并行需要更多通信,降低并发度
# 这里简化处理
pass
job = self.training_service.submit_training_job(
config=config,
resources=resources,
priority=1 # 分布式训练通常优先级较高
)
return job.job_id
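# --- Illustrative helper (an assumption, not part of the original design): how the
# --- "distributed" config above could be mapped onto the environment variables that
# --- torch.distributed / torchrun workers expect on each node.
def build_worker_env(
    config: Dict,
    node_rank: int,
    master_addr: str,
    master_port: int = 29500
) -> Dict[str, str]:
    """Translate the distributed config into per-worker env vars (sketch only)."""
    dist = config.get("distributed", {})
    return {
        "WORLD_SIZE": str(dist.get("world_size", 1)),
        "NODE_RANK": str(node_rank),
        "MASTER_ADDR": master_addr,    # typically the rank-0 pod's headless-service DNS name
        "MASTER_PORT": str(master_port),
        "NCCL_DEBUG": "WARN",          # surface NCCL issues without full debug logging
    }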
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建训练服务
training_service = KubernetesTrainingService(
namespace="ml-training",
image="ml-train-runner:latest"
)
# 提交训练任务
config = {
"model_name": "gpt2-finetune",
"dataset_path": "/data/dataset",
"output_path": "/data/output",
"hyperparameters": {
"learning_rate": 1e-4,
"batch_size": 32,
"epochs": 3,
"max_seq_length": 512
},
"checkpointing": {
"save_steps": 500,
"save_total_limit": 3
}
}
resources = GPUResource(
model="A100",
count=4,
memory_gb=160
)
job = training_service.submit_training_job(
config=config,
resources=resources,
priority=1
)
print(f"Submitted training job: {job.job_id}")
# 监控任务
import time
while True:
status = training_service.get_job_status(job.job_id)
print(f"Job status: {status.status}")
if status.status in [TaskStatus.SUCCEEDED, TaskStatus.FAILED]:
break
time.sleep(30)
# 获取日志
logs = training_service.get_job_logs(job.job_id)
print(f"Logs:\n{logs}")
Model Inference Service
3.1 Inference Service Implementation
# mlplatform/inference_service.py
"""
Model inference service implementation
Covers: model deployment, autoscaling, request routing
"""
from typing import Dict, List, Optional

from kubernetes import client, config
from kubernetes.client import V1Deployment, V1Service

from mlplatform import GPUResource, InferenceService
class KubernetesInferenceService(InferenceService):
"""基于Kubernetes的推理服务"""
def __init__(
self,
namespace: str = "ml-inference",
base_image: str = "ml-inference-server:latest"
):
self.namespace = namespace
self.base_image = base_image
        config.load_kube_config()
        self.apps_api = client.AppsV1Api()
        self.core_api = client.CoreV1Api()
        self.networking_api = client.NetworkingV1Api()
        self.autoscaling_api = client.AutoscalingV2Api()
def deploy_model(
self,
model_id: str,
replicas: int = 1,
resources: GPUResource = None,
autoscaling: bool = True
) -> str:
"""部署模型"""
deployment_name = f"model-{model_id}"
# 创建Deployment
deployment = self._build_deployment(
name=deployment_name,
model_id=model_id,
replicas=replicas,
resources=resources
)
self.apps_api.create_namespaced_deployment(
namespace=self.namespace,
body=deployment
)
# 创建Service
service = self._build_service(
name=deployment_name,
selector={"app": deployment_name}
)
self.core_api.create_namespaced_service(
namespace=self.namespace,
body=service
)
# 创建HPA(自动扩缩容)
if autoscaling and resources:
self._create_hpa(
name=deployment_name,
target_cpu=70,
min_replicas=replicas,
max_replicas=replicas * 4
)
return deployment_name
def scale_model(
self,
deployment_id: str,
replicas: int
) -> bool:
"""扩缩容"""
try:
# 读取当前deployment
deployment = self.apps_api.read_namespaced_deployment(
name=deployment_id,
namespace=self.namespace
)
# 更新副本数
deployment.spec.replicas = replicas
# 应用更新
self.apps_api.patch_namespaced_deployment(
name=deployment_id,
namespace=self.namespace,
body=deployment
)
return True
except client.ApiException:
return False
def undeploy_model(self, deployment_id: str) -> bool:
"""下线模型"""
try:
            # Delete the HPA first (it may not exist if autoscaling was disabled)
            try:
                self.autoscaling_api.delete_namespaced_horizontal_pod_autoscaler(
                    name=deployment_id,
                    namespace=self.namespace
                )
            except client.ApiException:
                pass
# 删除Service
self.core_api.delete_namespaced_service(
name=deployment_id,
namespace=self.namespace
)
# 删除Deployment
self.apps_api.delete_namespaced_deployment(
name=deployment_id,
namespace=self.namespace
)
return True
except client.ApiException:
return False
def get_deployment_status(self, deployment_id: str) -> Dict:
"""获取部署状态"""
try:
deployment = self.apps_api.read_namespaced_deployment(
name=deployment_id,
namespace=self.namespace
)
pods = self.core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"app={deployment_id}"
)
return {
"name": deployment_id,
"replicas": deployment.spec.replicas,
"ready_replicas": deployment.status.ready_replicas,
"available_replicas": deployment.status.available_replicas,
"updated_replicas": deployment.status.updated_replicas,
"pods": [
{
"name": pod.metadata.name,
"status": pod.status.phase,
"ready": all(
cs.ready for cs in (pod.status.container_statuses or [])
)
}
for pod in pods.items
]
}
except client.ApiException as e:
if e.status == 404:
return {"name": deployment_id, "exists": False}
raise
def _build_deployment(
self,
name: str,
model_id: str,
replicas: int,
resources: GPUResource
) -> V1Deployment:
"""构建Deployment"""
resource_requests = {
"cpu": "2",
"memory": "8G"
}
resource_limits = {
"cpu": "4",
"memory": "16G"
}
if resources:
if resources.count > 0:
resource_limits["nvidia.com/gpu"] = str(resources.count)
container = client.V1Container(
name="inference",
image=self.base_image,
env=[
client.V1EnvVar(name="MODEL_ID", value=model_id),
client.V1EnvVar(name="PORT", value="8000")
],
ports=[client.V1ContainerPort(container_port=8000)],
resources=client.V1ResourceRequirements(
requests=resource_requests,
limits=resource_limits
),
liveness_probe=client.V1Probe(
http_get=client.V1HTTPGetAction(
path="/health",
port=8000
),
initial_delay_seconds=30,
period_seconds=10
),
readiness_probe=client.V1Probe(
http_get=client.V1HTTPGetAction(
path="/ready",
port=8000
),
initial_delay_seconds=10,
period_seconds=5
)
)
deployment = V1Deployment(
metadata=client.V1ObjectMeta(
name=name,
labels={"app": name, "model": model_id}
),
spec=client.V1DeploymentSpec(
replicas=replicas,
selector=client.V1LabelSelector(match_labels={"app": name}),
template=client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(
labels={"app": name, "model": model_id}
),
spec=client.V1PodSpec(
containers=[container]
)
)
)
)
return deployment
def _build_service(
self,
name: str,
selector: Dict[str, str]
) -> V1Service:
"""构建Service"""
service = V1Service(
metadata=client.V1ObjectMeta(name=name),
spec=client.V1ServiceSpec(
selector=selector,
ports=[client.V1ServicePort(
port=80,
target_port=8000,
protocol="TCP"
)],
type="ClusterIP"
)
)
return service
def _create_hpa(
self,
name: str,
target_cpu: int,
min_replicas: int,
max_replicas: int
):
"""创建HPA"""
        hpa = client.V2HorizontalPodAutoscaler(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V2HorizontalPodAutoscalerSpec(
                scale_target_ref=client.V2CrossVersionObjectReference(
                    api_version="apps/v1",
                    kind="Deployment",
                    name=name
                ),
                min_replicas=min_replicas,
                max_replicas=max_replicas,
                metrics=[
                    client.V2MetricSpec(
                        type="Resource",
                        resource=client.V2ResourceMetricSource(
                            name="cpu",
                            target=client.V2MetricTarget(
                                type="Utilization",
                                average_utilization=target_cpu
                            )
                        )
                    )
                ]
            )
        )
self.autoscaling_api.create_namespaced_horizontal_pod_autoscaler(
namespace=self.namespace,
body=hpa
)
# ============== 模型路由器 ==============
class ModelRouter:
"""模型路由器"""
    def __init__(self):
        self.routes: Dict[str, Dict] = {}
        self.load_balancer = LoadBalancer()
def register_model(
self,
model_id: str,
endpoint: str,
priority: int = 0
):
"""注册模型路由"""
self.routes[model_id] = {
"endpoint": endpoint,
"priority": priority,
"healthy": True
}
def get_endpoint(self, model_id: str) -> Optional[str]:
"""获取模型端点"""
if model_id not in self.routes:
return None
route = self.routes[model_id]
if route["healthy"]:
return route["endpoint"]
return None
def route_request(
self,
model_id: str,
request: Dict
) -> Dict:
"""路由请求"""
endpoint = self.get_endpoint(model_id)
if not endpoint:
raise ValueError(f"Model {model_id} not available")
return self.load_balancer.send_request(endpoint, request)
class LoadBalancer:
    """Round-robin load balancer over registered endpoints"""
    def __init__(self, strategy: str = "round_robin"):
        self.strategy = strategy
        self.endpoints: List[str] = []
        self.current_index = 0
    def add_endpoint(self, endpoint: str):
        """Register an endpoint"""
        if endpoint not in self.endpoints:
            self.endpoints.append(endpoint)
    def remove_endpoint(self, endpoint: str):
        """Remove an endpoint"""
        if endpoint in self.endpoints:
            self.endpoints.remove(endpoint)
    def next_endpoint(self) -> Optional[str]:
        """Pick the next endpoint (only round-robin is implemented here)"""
        if not self.endpoints:
            return None
        endpoint = self.endpoints[self.current_index % len(self.endpoints)]
        self.current_index += 1
        return endpoint
    def send_request(self, endpoint: str, request: Dict) -> Dict:
        """POST the request to the endpoint's /predict route"""
        import requests
        response = requests.post(
            f"{endpoint}/predict",
            json=request,
            timeout=30
        )
        response.raise_for_status()
        return response.json()
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建推理服务
inference_service = KubernetesInferenceService(
namespace="ml-inference"
)
# 部署模型
resources = GPUResource(
model="A100",
count=2,
memory_gb=80
)
deployment_id = inference_service.deploy_model(
model_id="gpt2-finetune-v1",
replicas=2,
resources=resources,
autoscaling=True
)
print(f"Deployed model: {deployment_id}")
# 检查状态
import time
time.sleep(10)
status = inference_service.get_deployment_status(deployment_id)
print(f"Deployment status: {status}")
# 扩容
inference_service.scale_model(deployment_id, replicas=4)
print("Scaled to 4 replicas")
# 下线
# inference_service.undeploy_model(deployment_id)
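The concepts table lists A/B testing, but the router above only resolves a single model. A small sketch of weighted traffic splitting between two deployed versions (the WeightedRouter name and API are assumptions layered on top of ModelRouter, not part of the service above):

# weighted_router.py -- illustrative A/B split between a baseline and a candidate model
import random

class WeightedRouter:
    """Sends a configurable share of traffic to the candidate, the rest to the baseline."""

    def __init__(self, baseline_endpoint: str, candidate_endpoint: str, candidate_share: float = 0.1):
        self.baseline = baseline_endpoint
        self.candidate = candidate_endpoint
        self.candidate_share = candidate_share  # e.g. 0.1 => 10% of requests hit the candidate

    def pick_endpoint(self) -> str:
        return self.candidate if random.random() < self.candidate_share else self.baseline

# Usage sketch: pick an endpoint, then send the payload with LoadBalancer.send_request()
# router = WeightedRouter("http://model-gpt2-v1", "http://model-gpt2-v2", candidate_share=0.2)
# endpoint = router.pick_endpoint()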
Workflow Orchestration
4.1 Argo Workflows Integration
# mlplatform/workflow_engine.py
"""
Workflow engine implementation
Built on Argo Workflows
"""
from typing import Any, Dict, List, Optional

import requests
import yaml
class ArgoWorkflowEngine:
"""Argo Workflows引擎"""
def __init__(
self,
server_url: str,
namespace: str = "ml-workflows",
token: str = None
):
self.server_url = server_url
self.namespace = namespace
self.token = token
def submit_workflow(
self,
workflow_def: Dict,
parameters: Dict = None,
labels: Dict = None
) -> str:
"""提交工作流"""
# 设置参数
if parameters:
for param_name, param_value in parameters.items():
self._set_workflow_parameter(workflow_def, param_name, param_value)
# 设置标签
if labels:
if "metadata" not in workflow_def:
workflow_def["metadata"] = {}
if "labels" not in workflow_def["metadata"]:
workflow_def["metadata"]["labels"] = {}
workflow_def["metadata"]["labels"].update(labels)
# 提交
response = requests.post(
f"{self.server_url}/api/v1/workflows/{self.namespace}",
headers=self._get_headers(),
json=workflow_def
)
if response.status_code != 200:
raise RuntimeError(f"Failed to submit workflow: {response.text}")
data = response.json()
return data["metadata"]["name"]
def get_workflow_status(self, workflow_id: str) -> Dict:
"""获取工作流状态"""
response = requests.get(
f"{self.server_url}/api/v1/workflows/{self.namespace}/{workflow_id}",
headers=self._get_headers()
)
if response.status_code != 200:
raise RuntimeError(f"Failed to get workflow: {response.text}")
return response.json()
def list_workflows(
self,
labels: Dict = None,
phase: str = None
) -> List[Dict]:
"""列出工作流"""
params = []
if labels:
label_selector = ",".join([f"{k}={v}" for k, v in labels.items()])
params.append(f"labelSelector={label_selector}")
if phase:
params.append(f"phase={phase}")
url = f"{self.server_url}/api/v1/workflows/{self.namespace}"
if params:
url += "?" + "&".join(params)
response = requests.get(url, headers=self._get_headers())
if response.status_code != 200:
raise RuntimeError(f"Failed to list workflows: {response.text}")
data = response.json()
return data["items"]
def retry_workflow(self, workflow_id: str, node_id: str = None) -> str:
"""重试工作流"""
response = requests.put(
f"{self.server_url}/api/v1/workflows/{self.namespace}/{workflow_id}/retry",
headers=self._get_headers(),
json={"nodeFieldSelector": f"nodeId={node_id}" if node_id else ""}
)
if response.status_code != 200:
raise RuntimeError(f"Failed to retry workflow: {response.text}")
data = response.json()
return data["metadata"]["name"]
def _get_headers(self) -> Dict[str, str]:
"""获取请求头"""
headers = {"Content-Type": "application/json"}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
return headers
    def _set_workflow_parameter(self, workflow: Dict, name: str, value: Any):
"""设置工作流参数"""
if "spec" not in workflow:
workflow["spec"] = {}
if "arguments" not in workflow["spec"]:
workflow["spec"]["arguments"] = {}
if "parameters" not in workflow["spec"]["arguments"]:
workflow["spec"]["arguments"]["parameters"] = []
# 检查参数是否已存在
for param in workflow["spec"]["arguments"]["parameters"]:
if param["name"] == name:
param["value"] = str(value)
return
# 添加新参数
workflow["spec"]["arguments"]["parameters"].append({
"name": name,
"value": str(value)
})
# ============== ML Pipeline构建器 ==============
class MLPipelineBuilder:
"""ML Pipeline构建器"""
def __init__(self, name: str):
self.name = name
self.steps: List[Dict] = []
self.parameters: List[Dict] = []
    def add_parameter(self, name: str, default: Any = None) -> "MLPipelineBuilder":
        """Add a workflow parameter (leave default as None for required parameters)"""
        param = {"name": name}
        if default is not None:
            param["value"] = str(default)
        self.parameters.append(param)
        return self
def add_data_preparation(
self,
image: str,
script: str,
arguments: Dict = None
) -> "MLPipelineBuilder":
"""添加数据准备步骤"""
step = {
"name": "data-prep",
"template": "data-prep",
"arguments": self._build_arguments(arguments)
}
self.steps.append(step)
return self
def add_training(
self,
image: str,
resources: Dict = None,
arguments: Dict = None
) -> "MLPipelineBuilder":
"""添加训练步骤"""
step = {
"name": "training",
"template": "training",
"arguments": self._build_arguments(arguments)
}
self.steps.append(step)
return self
def add_evaluation(
self,
image: str,
metrics: List[str],
arguments: Dict = None
) -> "MLPipelineBuilder":
"""添加评估步骤"""
step = {
"name": "evaluation",
"template": "evaluation",
"arguments": self._build_arguments({
"metrics": ",".join(metrics),
**(arguments or {})
})
}
self.steps.append(step)
return self
def add_model_publish(
self,
registry: str,
arguments: Dict = None
) -> "MLPipelineBuilder":
"""添加模型发布步骤"""
step = {
"name": "publish",
"template": "publish",
"arguments": self._build_arguments({
"registry": registry,
**(arguments or {})
})
}
self.steps.append(step)
return self
    def build(self) -> Dict:
        """Build the Argo Workflow definition.

        Note: the step templates referenced here (data-prep, training, evaluation,
        publish) must be supplied separately, e.g. merged in from
        STANDARD_ML_PIPELINE_TEMPLATE below or referenced via templateRef.
        """
        workflow = {
            "apiVersion": "argoproj.io/v1alpha1",
            "kind": "Workflow",
            "metadata": {
                "generateName": f"{self.name}-"
            },
            "spec": {
                "entrypoint": "ml-pipeline",
                "arguments": {
                    "parameters": self.parameters
                },
                "templates": [
                    {
                        "name": "ml-pipeline",
                        # Argo steps are a list of parallel groups; wrap each step
                        # in its own group so the steps run sequentially.
                        "steps": [[step] for step in self.steps]
                    }
                ]
            }
        }
        return workflow
    def _build_arguments(self, params: Dict) -> Dict:
        """Build an Argo arguments block: {"parameters": [{"name": ..., "value": ...}]}"""
        if not params:
            return {}
        return {"parameters": [{"name": k, "value": str(v)} for k, v in params.items()]}
# ============== 标准ML Pipeline模板 ==============
STANDARD_ML_PIPELINE_TEMPLATE = """
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
spec:
  entrypoint: ml-pipeline
  arguments:
    parameters:
      - name: model-name
      - name: dataset-path
      - name: learning-rate
        value: "0.001"
      - name: batch-size
        value: "32"
      - name: epochs
        value: "10"
  templates:
    - name: ml-pipeline
      steps:
        - - name: data-prep
            template: data-prep
            arguments:
              parameters:
                - name: dataset-path
                  value: "{{workflow.parameters.dataset-path}}"
        - - name: training
            template: training
            arguments:
              parameters:
                - name: model-name
                  value: "{{workflow.parameters.model-name}}"
                - name: learning-rate
                  value: "{{workflow.parameters.learning-rate}}"
                - name: batch-size
                  value: "{{workflow.parameters.batch-size}}"
                - name: epochs
                  value: "{{workflow.parameters.epochs}}"
                - name: data-path
                  value: "{{steps.data-prep.outputs.parameters.processed-data}}"
        - - name: evaluation
            template: evaluation
            arguments:
              parameters:
                - name: model-path
                  value: "{{steps.training.outputs.parameters.model-path}}"
        - - name: publish
            template: publish
            arguments:
              parameters:
                - name: model-path
                  value: "{{steps.training.outputs.parameters.model-path}}"
                - name: metrics
                  value: "{{steps.evaluation.outputs.parameters.metrics}}"

    - name: data-prep
      inputs:
        parameters:
          - name: dataset-path
      container:
        image: python:3.9
        command: ["python", "scripts/data_prep.py"]
        args:
          - --dataset-path={{inputs.parameters.dataset-path}}
          - --output-path=/data/processed
      outputs:
        parameters:
          - name: processed-data
            valueFrom: {path: /data/processed}

    - name: training
      inputs:
        parameters:
          - name: model-name
          - name: learning-rate
          - name: batch-size
          - name: epochs
          - name: data-path
      container:
        image: python:3.9-gpu
        command: ["python", "scripts/train.py"]
        args:
          - --model-name={{inputs.parameters.model-name}}
          - --lr={{inputs.parameters.learning-rate}}
          - --batch-size={{inputs.parameters.batch-size}}
          - --epochs={{inputs.parameters.epochs}}
          - --data-path={{inputs.parameters.data-path}}
          - --output-path=/output
        resources:
          limits:
            nvidia.com/gpu: 4
      outputs:
        parameters:
          - name: model-path
            valueFrom: {path: /output/model}

    - name: evaluation
      inputs:
        parameters:
          - name: model-path
      container:
        image: python:3.9
        command: ["python", "scripts/evaluate.py"]
        args:
          - --model-path={{inputs.parameters.model-path}}
          - --output=/output/metrics.json
      outputs:
        parameters:
          - name: metrics
            valueFrom: {path: /output/metrics.json}

    - name: publish
      inputs:
        parameters:
          - name: model-path
          - name: metrics
      container:
        image: python:3.9
        command: ["python", "scripts/publish.py"]
        args:
          - --model-path={{inputs.parameters.model-path}}
          - --metrics={{inputs.parameters.metrics}}
"""
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建工作流引擎
workflow_engine = ArgoWorkflowEngine(
server_url="http://argo-server.argo.svc:2746",
namespace="ml-workflows"
)
# 加载标准模板
workflow_def = yaml.safe_load(STANDARD_ML_PIPELINE_TEMPLATE)
# 提交工作流
workflow_id = workflow_engine.submit_workflow(
workflow_def=workflow_def,
parameters={
"model-name": "gpt2-finetune",
"dataset-path": "/datasets/train",
"learning-rate": "0.0001",
"batch-size": "64",
"epochs": "5"
},
labels={
"project": "ml-platform",
"model": "gpt2"
}
)
print(f"Submitted workflow: {workflow_id}")
# 监控工作流
import time
while True:
status = workflow_engine.get_workflow_status(workflow_id)
phase = status["status"]["phase"]
print(f"Workflow phase: {phase}")
if phase in ["Succeeded", "Failed", "Error"]:
break
time.sleep(10)
# 使用Pipeline构建器
builder = MLPipelineBuilder("custom-pipeline") \
.add_parameter("model-name", "custom-model") \
.add_parameter("dataset-path") \
.add_parameter("learning-rate", "0.001")
workflow_def = builder.build()
workflow_id = workflow_engine.submit_workflow(workflow_def)
print(f"Custom workflow: {workflow_id}")
Resource Scheduling
5.1 GPU Resource Management
# mlplatform/resource_scheduler.py
"""
Resource scheduler implementation
Supports GPU resource isolation and priority scheduling
"""
import heapq
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional

from mlplatform import GPUResource, ResourceScheduler, ResourceType
class SchedulingPolicy(Enum):
"""调度策略"""
FIFO = "fifo"
PRIORITY = "priority"
BIN_PACKING = "bin_packing" # 资源优化分配
FAIR_SHARE = "fair_share" # 公平共享
@dataclass
class Node:
"""计算节点"""
name: str
total_gpus: int
gpu_memory_gb: int
available_gpus: int
assigned_jobs: List[str] = field(default_factory=list)
def can_allocate(self, gpu_count: int, memory_gb: int) -> bool:
"""检查是否可以分配"""
return (
self.available_gpus >= gpu_count and
self.gpu_memory_gb >= memory_gb
)
def allocate(self, job_id: str, gpu_count: int):
"""分配资源"""
self.available_gpus -= gpu_count
self.assigned_jobs.append(job_id)
def release(self, job_id: str, gpu_count: int):
"""释放资源"""
self.available_gpus += gpu_count
if job_id in self.assigned_jobs:
self.assigned_jobs.remove(job_id)
@dataclass(order=True)
class PendingJob:
    """A job waiting to be scheduled (ordered by priority, then submit time)"""
    priority: int
    submit_time: float
    job_id: str = field(compare=False)
    gpu_count: int = field(compare=False)
    memory_gb: int = field(compare=False)
class GPUResourceScheduler(ResourceScheduler):
"""GPU资源调度器"""
def __init__(self, policy: SchedulingPolicy = SchedulingPolicy.PRIORITY):
self.policy = policy
self.nodes: Dict[str, Node] = {}
self.pending_queue: List[PendingJob] = []
self.allocated: Dict[str, Dict] = {} # job_id -> {node_name, gpu_count}
def add_node(self, name: str, gpus: int, memory_gb: int):
"""添加计算节点"""
self.nodes[name] = Node(
name=name,
total_gpus=gpus,
gpu_memory_gb=memory_gb,
available_gpus=gpus
)
def schedule_job(
self,
job_id: str,
resources: GPUResource,
priority: int = 0
) -> bool:
"""调度任务"""
        pending_job = PendingJob(
            priority=-priority,  # heapq is a min-heap; negate so higher priority pops first
            submit_time=time.time(),
            job_id=job_id,
            gpu_count=resources.count,
            memory_gb=resources.memory_gb
        )
        # Try to place the job immediately
        if self._try_schedule(pending_job):
            return True
        # No node can take it right now; park it in the pending queue
        heapq.heappush(self.pending_queue, pending_job)
        return False
def release_resources(self, job_id: str) -> bool:
"""释放资源"""
if job_id not in self.allocated:
return False
allocation = self.allocated.pop(job_id)
node = self.nodes[allocation["node_name"]]
node.release(job_id, allocation["gpu_count"])
# 尝试调度等待队列中的任务
self._reconcile()
return True
def get_available_resources(self) -> Dict[ResourceType, int]:
"""获取可用资源"""
total_gpus = sum(node.available_gpus for node in self.nodes.values())
total_memory = sum(node.gpu_memory_gb for node in self.nodes.values())
return {
ResourceType.GPU: total_gpus,
ResourceType.MEMORY: total_memory
}
    def get_queue_status(self) -> Dict:
        """Snapshot of queue and cluster utilisation"""
        return {
            "pending_count": len(self.pending_queue),
            "allocated_jobs": len(self.allocated),
            "total_nodes": len(self.nodes),
            "total_gpus": sum(node.total_gpus for node in self.nodes.values()),
            "available_gpus": sum(node.available_gpus for node in self.nodes.values())
        }
def _try_schedule(self, job: PendingJob) -> bool:
"""尝试调度任务"""
if self.policy == SchedulingPolicy.PRIORITY:
return self._schedule_priority(job)
elif self.policy == SchedulingPolicy.BIN_PACKING:
return self._schedule_bin_packing(job)
else:
return self._schedule_fifo(job)
def _schedule_priority(self, job: PendingJob) -> bool:
"""优先级调度"""
# 找到第一个能容纳的节点
for node in sorted(
self.nodes.values(),
key=lambda n: n.available_gpus,
reverse=True
):
if node.can_allocate(job.gpu_count, job.memory_gb):
node.allocate(job.job_id, job.gpu_count)
self.allocated[job.job_id] = {
"node_name": node.name,
"gpu_count": job.gpu_count
}
return True
return False
def _schedule_bin_packing(self, job: PendingJob) -> bool:
"""装箱算法(资源优化)"""
# 找到最小满足条件的节点
best_node = None
min_waste = float('inf')
for node in self.nodes.values():
if node.can_allocate(job.gpu_count, job.memory_gb):
waste = node.available_gpus - job.gpu_count
if waste < min_waste:
min_waste = waste
best_node = node
if best_node:
best_node.allocate(job.job_id, job.gpu_count)
self.allocated[job.job_id] = {
"node_name": best_node.name,
"gpu_count": job.gpu_count
}
return True
return False
def _schedule_fifo(self, job: PendingJob) -> bool:
"""FIFO调度"""
for node in self.nodes.values():
if node.can_allocate(job.gpu_count, job.memory_gb):
node.allocate(job.job_id, job.gpu_count)
self.allocated[job.job_id] = {
"node_name": node.name,
"gpu_count": job.gpu_count
}
return True
return False
def _reconcile(self):
"""重新调度等待队列"""
scheduled = []
remaining = []
while self.pending_queue:
job = heapq.heappop(self.pending_queue)
if self._try_schedule(job):
scheduled.append(job.job_id)
else:
remaining.append(job)
self.pending_queue = remaining
# ============== Kueue集成 ==============
class KueueScheduler:
"""Kueue调度器集成"""
def __init__(self, namespace: str = "kueue-system"):
from kubernetes import client, config
config.load_kube_config()
self.namespace = namespace
self.custom_api = client.CustomObjectsApi()
def create_cluster_queue(
self,
name: str,
gpu_capacity: int,
cpu_capacity: str = "1000",
memory_capacity: str = "4000G"
):
"""创建集群队列"""
        cluster_queue = {
            "apiVersion": "kueue.x-k8s.io/v1beta1",
            "kind": "ClusterQueue",
            "metadata": {
                "name": name
            },
            "spec": {
                "namespaceSelector": {},  # admit workloads from any namespace
                "resourceGroups": [
                    {
                        "coveredResources": ["cpu", "memory", "nvidia.com/gpu"],
                        "flavors": [
                            {
                                # assumes a ResourceFlavor named "default-flavor" exists
                                "name": "default-flavor",
                                "resources": [
                                    {"name": "cpu", "nominalQuota": cpu_capacity},
                                    {"name": "memory", "nominalQuota": memory_capacity},
                                    {"name": "nvidia.com/gpu", "nominalQuota": str(gpu_capacity)}
                                ]
                            }
                        ]
                    }
                ]
            }
        }
self.custom_api.create_cluster_custom_object(
group="kueue.x-k8s.io",
version="v1beta1",
plural="clusterqueues",
body=cluster_queue
)
def create_local_queue(
self,
name: str,
cluster_queue: str,
namespace: str = "default"
):
"""创建本地队列"""
local_queue = {
"apiVersion": "kueue.x-k8s.io/v1beta1",
"kind": "LocalQueue",
"metadata": {
"name": name,
"namespace": namespace
},
"spec": {
"clusterQueue": cluster_queue
}
}
self.custom_api.create_namespaced_custom_object(
group="kueue.x-k8s.io",
version="v1beta1",
plural="localqueues",
namespace=namespace,
body=local_queue
)
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建GPU调度器
scheduler = GPUResourceScheduler(
policy=SchedulingPolicy.PRIORITY
)
# 添加节点
scheduler.add_node("node-1", gpus=8, memory_gb=320) # 8x A100 40GB
scheduler.add_node("node-2", gpus=4, memory_gb=160) # 4x A100 40GB
# 提交任务
import time
job1 = scheduler.schedule_job(
job_id="train-job-1",
resources=GPUResource(model="A100", count=4, memory_gb=160),
priority=1
)
print(f"Job 1 scheduled: {job1}")
job2 = scheduler.schedule_job(
job_id="train-job-2",
resources=GPUResource(model="A100", count=8, memory_gb=320),
priority=2 # 高优先级
)
print(f"Job 2 scheduled: {job2}")
job3 = scheduler.schedule_job(
job_id="train-job-3",
resources=GPUResource(model="A100", count=4, memory_gb=160),
priority=0
)
print(f"Job 3 scheduled: {job3}")
# 查看状态
status = scheduler.get_queue_status()
print(f"Queue status: {status}")
# 释放资源
scheduler.release_resources("train-job-1")
print("Released job-1 resources")
# 使用Kueue
kueue = KueueScheduler()
# kueue.create_cluster_queue("default-queue", gpu_capacity=16)
# kueue.create_local_queue("training", "default-queue")
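With Kueue, jobs do not talk to a scheduler API directly: a Job opts into a LocalQueue via the kueue.x-k8s.io/queue-name label and is created suspended, and Kueue unsuspends it once quota is available. A minimal sketch of such a manifest as a Python dict (names and image are placeholders; "training" refers to the LocalQueue created above):

# Illustrative Kueue-managed training Job manifest
queued_job = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "name": "train-job-4",
        "labels": {"kueue.x-k8s.io/queue-name": "training"},  # LocalQueue name
    },
    "spec": {
        "suspend": True,  # created suspended; Kueue flips this once the workload is admitted
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "trainer",
                    "image": "ml-train-runner:latest",
                    "resources": {"limits": {"nvidia.com/gpu": "4"}},
                }],
            },
        },
    },
}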
Monitoring and Observability
6.1 Model Monitoring System
# mlplatform/monitoring.py
"""
Model monitoring and observability
Covers: metric collection, drift detection, alerting
"""
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional

import numpy as np
@dataclass
class MetricRecord:
"""指标记录"""
model_id: str
metric_name: str
value: float
timestamp: datetime
tags: Dict[str, str] = None
@dataclass
class DriftReport:
"""漂移报告"""
model_id: str
drift_detected: bool
drift_score: float
drift_type: str
timestamp: datetime
details: Dict = None
class ModelMonitor:
"""模型监控器"""
def __init__(self):
self.metrics: List[MetricRecord] = []
self.baselines: Dict[str, Dict[str, float]] = {}
def record_metric(
self,
model_id: str,
metric_name: str,
value: float,
tags: Dict[str, str] = None
):
"""记录指标"""
record = MetricRecord(
model_id=model_id,
metric_name=metric_name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics.append(record)
def set_baseline(
self,
model_id: str,
metrics: Dict[str, float]
):
"""设置基准指标"""
self.baselines[model_id] = metrics
def get_metrics(
self,
model_id: str,
metric_name: str,
start_time: datetime,
end_time: datetime
) -> List[MetricRecord]:
"""获取指标"""
return [
m for m in self.metrics
if m.model_id == model_id
and m.metric_name == metric_name
and start_time <= m.timestamp <= end_time
]
def get_average_metric(
self,
model_id: str,
metric_name: str,
window_minutes: int = 60
) -> float:
"""获取平均指标"""
start_time = datetime.now() - timedelta(minutes=window_minutes)
records = self.get_metrics(model_id, metric_name, start_time, datetime.now())
if not records:
return 0.0
return sum(m.value for m in records) / len(records)
class DriftDetector:
"""数据漂移检测器"""
def __init__(self, threshold: float = 0.3):
self.threshold = threshold
def detect_drift(
self,
baseline: np.ndarray,
current: np.ndarray
) -> DriftReport:
"""
检测数据漂移
Returns:
DriftReport: 漂移报告
"""
# 方法1: KL散度
kl_divergence = self._calculate_kl_divergence(baseline, current)
# 方法2: PSI (Population Stability Index)
psi_score = self._calculate_psi(baseline, current)
# 方法3: Wasserstein距离
wasserstein_dist = self._calculate_wasserstein(baseline, current)
# 综合判断
max_score = max(abs(kl_divergence), psi_score, wasserstein_dist)
drift_detected = max_score > self.threshold
# 确定漂移类型
drift_type = "none"
if drift_detected:
if wasserstein_dist > self.threshold:
drift_type = "distribution_shift"
elif psi_score > self.threshold:
drift_type = "population_stability"
else:
drift_type = "feature_drift"
return DriftReport(
model_id="unknown",
drift_detected=drift_detected,
drift_score=max_score,
drift_type=drift_type,
timestamp=datetime.now(),
details={
"kl_divergence": kl_divergence,
"psi": psi_score,
"wasserstein": wasserstein_dist
}
)
def _calculate_kl_divergence(
self,
p: np.ndarray,
q: np.ndarray
) -> float:
"""计算KL散度"""
# 归一化为概率分布
p_normalized = p / np.sum(p)
q_normalized = q / np.sum(q)
# 添加小值避免log(0)
eps = 1e-10
p_normalized = np.clip(p_normalized, eps, 1)
q_normalized = np.clip(q_normalized, eps, 1)
return np.sum(p_normalized * np.log(p_normalized / q_normalized))
def _calculate_psi(
self,
expected: np.ndarray,
actual: np.ndarray
) -> float:
"""计算PSI"""
# 分桶
bins = np.percentile(expected, np.linspace(0, 100, 11))
expected_hist, _ = np.histogram(expected, bins=bins)
actual_hist, _ = np.histogram(actual, bins=bins)
# 归一化
expected_pct = expected_hist / np.sum(expected_hist)
actual_pct = actual_hist / np.sum(actual_hist)
# 避免除零
expected_pct = np.clip(expected_pct, 1e-10, 1)
actual_pct = np.clip(actual_pct, 1e-10, 1)
# 计算PSI
psi = np.sum((expected_pct - actual_pct) * np.log(expected_pct / actual_pct))
return psi
def _calculate_wasserstein(
self,
p: np.ndarray,
q: np.ndarray
) -> float:
"""计算Wasserstein距离"""
from scipy.stats import wasserstein_distance
return wasserstein_distance(p, q)
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.latencies: Dict[str, List[float]] = {}
self.errors: Dict[str, List[Dict]] = {}
def record_latency(
self,
model_id: str,
latency_ms: float,
request_id: str = None
):
"""记录延迟"""
if model_id not in self.latencies:
self.latencies[model_id] = []
self.latencies[model_id].append(latency_ms)
def record_error(
self,
model_id: str,
error_type: str,
error_message: str
):
"""记录错误"""
if model_id not in self.errors:
self.errors[model_id] = []
self.errors[model_id].append({
"type": error_type,
"message": error_message,
"timestamp": datetime.now()
})
def get_p95_latency(self, model_id: str) -> float:
"""获取P95延迟"""
if model_id not in self.latencies or not self.latencies[model_id]:
return 0.0
latencies = self.latencies[model_id]
return np.percentile(latencies, 95)
def get_p99_latency(self, model_id: str) -> float:
"""获取P99延迟"""
if model_id not in self.latencies or not self.latencies[model_id]:
return 0.0
latencies = self.latencies[model_id]
return np.percentile(latencies, 99)
def get_error_rate(
self,
model_id: str,
window_minutes: int = 5
) -> float:
"""获取错误率"""
if model_id not in self.errors:
return 0.0
cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
recent_errors = [
e for e in self.errors[model_id]
if e["timestamp"] >= cutoff_time
]
        total_requests = len(self.latencies.get(model_id, []))
        if total_requests == 0:
            return 0.0
        return len(recent_errors) / total_requests
class AlertManager:
"""告警管理器"""
def __init__(self):
self.alerts: List[Dict] = []
self.alert_rules: Dict[str, Dict] = {}
    def add_alert_rule(
        self,
        rule_id: str,
        condition: Callable[[Dict], bool],
        severity: str,
        channels: List[str]  # email, slack, webhook
    ):
"""添加告警规则"""
self.alert_rules[rule_id] = {
"condition": condition,
"severity": severity,
"channels": channels
}
def check_and_fire_alerts(self, metrics: Dict) -> List[Dict]:
"""检查并触发告警"""
fired_alerts = []
for rule_id, rule in self.alert_rules.items():
if rule["condition"](metrics):
alert = {
"rule_id": rule_id,
"severity": rule["severity"],
"timestamp": datetime.now(),
"metrics": metrics
}
self.alerts.append(alert)
fired_alerts.append(alert)
# 发送告警
self._send_alert(alert, rule["channels"])
return fired_alerts
    def _send_alert(self, alert: Dict, channels: List[str]):
        """Dispatch the alert to each configured channel"""
        for channel in channels:
            if channel == "slack":
                self._send_to_slack(alert)
            elif channel == "email":
                self._send_to_email(alert)
            elif channel == "webhook":
                self._send_to_webhook(alert)
    def _send_to_slack(self, alert: Dict):
        """Send the alert to Slack via an incoming webhook"""
        import requests
        webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
        if webhook_url:
            message = {
                "text": f"🚨 Alert: {alert['rule_id']}\n"
                        f"Severity: {alert['severity']}\n"
                        f"Timestamp: {alert['timestamp']}"
            }
            requests.post(webhook_url, json=message)
    def _send_to_email(self, alert: Dict):
        """Placeholder: hook up an SMTP or mail-service integration here"""
        pass
    def _send_to_webhook(self, alert: Dict):
        """Placeholder: POST the alert to a generic webhook endpoint"""
        pass
# ============== 使用示例 ==============
if __name__ == "__main__":
# 创建监控器
monitor = ModelMonitor()
drift_detector = DriftDetector(threshold=0.3)
perf_monitor = PerformanceMonitor()
# 设置基准
monitor.set_baseline("model-1", {
"accuracy": 0.95,
"latency_p95": 50,
"error_rate": 0.01
})
# 记录指标
monitor.record_metric("model-1", "accuracy", 0.94)
monitor.record_metric("model-1", "latency_p95", 45)
monitor.record_metric("model-1", "error_rate", 0.015)
# 记录性能
perf_monitor.record_latency("model-1", 42.5)
perf_monitor.record_latency("model-1", 38.2)
perf_monitor.record_latency("model-1", 51.7)
perf_monitor.record_latency("model-1", 125.3) # 慢请求
print(f"P95 Latency: {perf_monitor.get_p95_latency('model-1')} ms")
print(f"P99 Latency: {perf_monitor.get_p99_latency('model-1')} ms")
# 检测漂移
baseline_data = np.random.normal(0, 1, 1000)
current_data = np.random.normal(0.5, 1.2, 1000) # 漂移数据
drift_report = drift_detector.detect_drift(baseline_data, current_data)
print(f"Drift detected: {drift_report.drift_detected}")
print(f"Drift score: {drift_report.drift_score}")
print(f"Drift type: {drift_report.drift_type}")
# 设置告警
alert_manager = AlertManager()
alert_manager.add_alert_rule(
rule_id="high-latency",
condition=lambda m: m.get("latency_p95", 0) > 100,
severity="warning",
channels=["slack"]
)
alert_manager.add_alert_rule(
rule_id="high-error-rate",
condition=lambda m: m.get("error_rate", 0) > 0.05,
severity="critical",
channels=["slack", "email"]
)
# 检查告警
current_metrics = {
"latency_p95": perf_monitor.get_p95_latency("model-1"),
"error_rate": monitor.get_average_metric("model-1", "error_rate")
}
alerts = alert_manager.check_and_fire_alerts(current_metrics)
print(f"Fired alerts: {len(alerts)}")
References
Kubernetes Scheduling Frameworks
- Argo Workflows - Kubernetes-native workflow engine
- Kueue - Kubernetes job queueing and GPU quota management
- Volcano - Batch job scheduling
- Koordinator - QoS-aware scheduling
MLOps Tools
- MLflow - Experiment tracking and model management
- Kubeflow - Machine learning platform on Kubernetes
- ClearML - MLOps platform
- Weights & Biases - Experiment tracking
Document version: 1.0
Last updated: 2026-01-22