【AI Agent 知识库】28-MLOps平台构建

内容纲要

MLOps 平台构建

模型训练、推理、监控全流程自动化


目录


核心概念

概念 定义 核心价值
MLOps Machine Learning Operations 模型全生命周期管理
ML Pipeline 机器学习流水线 自动化训练流程
Model Registry 模型仓库 模型版本管理
Feature Store 特征存储 特征复用、一致性
Inference Service 推理服务 模型在线服务
Model Monitoring 模型监控 性能追踪、漂移检测
A/B Testing A/B测试 模型效果对比

平台架构设计

1.1 整体架构

┌─────────────────────────────────────────────────────────────────────┐
│                        MLOps Platform                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐          │
│  │  Web Portal  │  │  API Gateway │  │  Auth/SSO   │          │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘          │
│         │                 │                 │                    │
│  ┌──────▼─────────────────▼─────────────────▼──────────────┐      │
│  │                  Core Services Layer                 │      │
│  ├────────────────────────────────────────────────────────────┤      │
│  │  Training Service  │  Inference Service  │  Model Store │      │
│  │  Workflow Engine  │  Feature Store     │  Registry    │      │
│  └─────────────────────┴─────────────────────┴─────────────┘      │
│                              │                                    │
│  ┌────────────────────────────▼────────────────────────────┐      │
│  │                 Orchestration Layer                  │      │
│  ├─────────────────────────────────────────────────────────┤      │
│  │  Argo Workflows  │  Kueue  │  Volcano  │  Operator  │      │
│  └────────────────────┬────────────────────┬─────────────┘      │
│                       │                    │                      │
│  ┌────────────────────▼────────────────────▼──────────────┐      │
│  │                  Infrastructure Layer                │      │
│  ├─────────────────────────────────────────────────────────┤      │
│  │  Kubernetes Cluster  │  GPU Nodes  │  Storage       │      │
│  └─────────────────────────────────────────────────────────┘      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

1.2 核心组件

# mlplatform/__init__.py
"""
MLOps 平台核心组件定义
"""

from typing import Dict, List, Optional, Protocol
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

# ============== 数据模型 ==============

class ModelStatus(Enum):
    """模型状态"""
    UPLOADING = "uploading"
    REGISTERED = "registered"
    TRAINING = "training"
    TRAINED = "trained"
    EVALUATING = "evaluating"
    PUBLISHED = "published"
    DEPRECATED = "deprecated"
    FAILED = "failed"

class TaskStatus(Enum):
    """任务状态"""
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"

class ResourceType(Enum):
    """资源类型"""
    CPU = "cpu"
    MEMORY = "memory"
    GPU = "gpu"

@dataclass
class GPUResource:
    """GPU资源"""
    model: str  # A100, V100, T4, etc.
    count: int
    memory_gb: int
    sharded: bool = False  # 是否使用模型并行

@dataclass
class ModelMetadata:
    """模型元数据"""
    name: str
    version: str
    framework: str  # pytorch, tensorflow, onnx
    task_type: str  # classification, generation, embedding
    created_at: datetime
    author: str
    tags: List[str]
    metrics: Dict[str, float]
    file_size_mb: int
    hash: str

@dataclass
class TrainingJob:
    """训练任务"""
    job_id: str
    model_name: str
    config: Dict
    status: TaskStatus
    resources: GPUResource
    created_at: datetime
    started_at: Optional[datetime]
    finished_at: Optional[datetime]
    metrics: Dict[str, float]

# ============== 核心接口 ==============

class ModelRegistry(Protocol):
    """模型仓库接口"""

    def register_model(
        self,
        model_id: str,
        metadata: ModelMetadata,
        model_path: str
    ) -> str:
        """注册模型"""
        ...

    def get_model(self, model_id: str, version: str = None) -> ModelMetadata:
        """获取模型"""
        ...

    def list_models(
        self,
        task_type: str = None,
        tags: List[str] = None
    ) -> List[ModelMetadata]:
        """列出模型"""
        ...

    def promote_model(
        self,
        model_id: str,
        version: str,
        stage: str  # staging, production
    ) -> bool:
        """提升模型到指定环境"""
        ...

class TrainingService(Protocol):
    """训练服务接口"""

    def submit_training_job(
        self,
        config: Dict,
        resources: GPUResource,
        priority: int = 0
    ) -> TrainingJob:
        """提交训练任务"""
        ...

    def get_job_status(self, job_id: str) -> TrainingJob:
        """获取任务状态"""
        ...

    def cancel_job(self, job_id: str) -> bool:
        """取消任务"""
        ...

    def get_job_logs(self, job_id: str) -> str:
        """获取任务日志"""
        ...

class InferenceService(Protocol):
    """推理服务接口"""

    def deploy_model(
        self,
        model_id: str,
        replicas: int = 1,
        resources: GPUResource = None
    ) -> str:
        """部署模型"""
        ...

    def scale_model(
        self,
        deployment_id: str,
        replicas: int
    ) -> bool:
        """扩缩容"""
        ...

    def undeploy_model(self, deployment_id: str) -> bool:
        """下线模型"""
        ...

    def get_deployment_status(self, deployment_id: str) -> Dict:
        """获取部署状态"""
        ...

class FeatureStore(Protocol):
    """特征存储接口"""

    def write_features(
        self,
        entity_id: str,
        feature_name: str,
        value: any,
        timestamp: datetime = None
    ) -> bool:
        """写入特征"""
        ...

    def read_features(
        self,
        entity_ids: List[str],
        feature_names: List[str]
    ) -> Dict[str, Dict[str, any]]:
        """读取特征"""
        ...

    def get_training_dataset(
        self,
        feature_names: List[str],
        start_time: datetime,
        end_time: datetime
    ) -> Dict:
        """获取训练数据集"""
        ...

class WorkflowEngine(Protocol):
    """工作流引擎接口"""

    def submit_workflow(
        self,
        workflow_def: Dict,
        parameters: Dict = None
    ) -> str:
        """提交工作流"""
        ...

    def get_workflow_status(self, workflow_id: str) -> Dict:
        """获取工作流状态"""
        ...

class ResourceScheduler(Protocol):
    """资源调度器接口"""

    def schedule_job(
        self,
        job_id: str,
        resources: GPUResource,
        priority: int = 0
    ) -> bool:
        """调度任务"""
        ...

    def get_available_resources(self) -> Dict[ResourceType, int]:
        """获取可用资源"""
        ...

    def reserve_resources(
        self,
        job_id: str,
        resources: GPUResource
    ) -> bool:
        """预留资源"""
        ...

    def release_resources(self, job_id: str) -> bool:
        """释放资源"""
        ...

模型训练平台

2.1 训练服务实现

# mlplatform/training_service.py
"""
模型训练服务实现
包含:任务提交、资源调度、训练执行
"""

import uuid
from typing import Dict, Optional
from kubernetes import client, config
    from kubernetes.client import V1Job, V1ObjectMeta, V1PodTemplateSpec, V1Container
import yaml

class KubernetesTrainingService(TrainingService):
    """基于Kubernetes的训练服务"""

    def __init__(
        self,
        namespace: str = "ml-training",
        image: str = "ml-train-runner:latest",
        storage_class: str = "standard"
    ):
        self.namespace = namespace
        self.base_image = image
        self.storage_class = storage_class

        # 初始化K8s客户端
        config.load_kube_config()
        self.batch_api = client.BatchV1Api()
        self.core_api = client.CoreV1Api()

        self._create_namespace_if_not_exists()

    def submit_training_job(
        self,
        config: Dict,
        resources: GPUResource,
        priority: int = 0
    ) -> TrainingJob:
        """提交训练任务"""
        job_id = str(uuid.uuid4())

        # 创建PVC
        pvc_name = f"{job_id}-data"
        self._create_pvc(pvc_name)

        # 创建Job
        k8s_job = self._build_job(
            job_id=job_id,
            config=config,
            resources=resources,
            pvc_name=pvc_name,
            priority=priority
        )

        # 提交到K8s
        self.batch_api.create_namespaced_job(
            namespace=self.namespace,
            body=k8s_job
        )

        return TrainingJob(
            job_id=job_id,
            model_name=config.get("model_name", "unknown"),
            config=config,
            status=TaskStatus.PENDING,
            resources=resources,
            created_at=datetime.now(),
            started_at=None,
            finished_at=None,
            metrics={}
        )

    def get_job_status(self, job_id: str) -> TrainingJob:
        """获取任务状态"""
        try:
            job = self.batch_api.read_namespaced_job(
                name=job_id,
                namespace=self.namespace
            )

            status = self._parse_job_status(job)
            return status

        except client.ApiException as e:
            if e.status == 404:
                raise ValueError(f"Job {job_id} not found")
            raise

    def cancel_job(self, job_id: str) -> bool:
        """取消任务"""
        try:
            self.batch_api.delete_namespaced_job(
                name=job_id,
                namespace=self.namespace
            )
            return True
        except client.ApiException:
            return False

    def get_job_logs(self, job_id: str) -> str:
        """获取任务日志"""
        try:
            pods = self.core_api.list_namespaced_pod(
                namespace=self.namespace,
                label_selector=f"job-name={job_id}"
            )

            if not pods.items:
                return ""

            # 获取主pod的日志
            pod_name = pods.items[0].metadata.name
            logs = self.core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=self.namespace
            )

            return logs

        except client.ApiException:
            return ""

    def _create_namespace_if_not_exists(self):
        """创建命名空间"""
        try:
            self.core_api.create_namespace(
                client.V1Namespace(
                    metadata=client.V1ObjectMeta(name=self.namespace)
                )
            )
        except client.ApiException as e:
            if e.status != 409:  # Already exists
                raise

    def _create_pvc(self, name: str, size_gb: int = 100):
        """创建PVC"""
        pvc = client.V1PersistentVolumeClaim(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteOnce"],
                resources=client.V1ResourceRequirements(
                    requests={"storage": f"{size_gb}Gi"}
                ),
                storage_class_name=self.storage_class
            )
        )

        try:
            self.core_api.create_namespaced_persistent_volume_claim(
                namespace=self.namespace,
                body=pvc
            )
        except client.ApiException as e:
            if e.status != 409:
                raise

    def _build_job(
        self,
        job_id: str,
        config: Dict,
        resources: GPUResource,
        pvc_name: str,
        priority: int
    ) -> V1Job:
        """构建K8s Job"""
        # 资源请求
        resource_requests = {
            "cpu": "4",
            "memory": "16G"
        }

        resource_limits = {
            "cpu": "8",
            "memory": "32G"
        }

        if resources.count > 0:
            resource_requests["nvidia.com/gpu"] = str(resources.count)
            resource_limits["nvidia.com/gpu"] = str(resources.count)

        # 容器定义
        container = V1Container(
            name="trainer",
            image=self.base_image,
            command=["python", "train.py"],
            env=[
                client.V1EnvVar(
                    name="JOB_CONFIG",
                    value=yaml.dump(config)
                ),
                client.V1EnvVar(
                    name="DATA_PATH",
                    value=f"/data/{job_id}"
                )
            ],
            volume_mounts=[
                client.V1VolumeMount(
                    name="data",
                    mount_path="/data"
                )
            ],
            resources=client.V1ResourceRequirements(
                requests=resource_requests,
                limits=resource_limits
            )
        )

        # Pod模板
        pod_template = V1PodTemplateSpec(
            containers=[container],
            volumes=[
                client.V1Volume(
                    name="data",
                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                        claim_name=pvc_name
                    )
                )
            ],
            restart_policy="Never"
        )

        # 优先级
        if priority > 0:
            pod_template.priority_class_name = "high-priority"

        # Job定义
        job = V1Job(
            metadata=V1ObjectMeta(
                name=job_id,
                labels={
                    "app": "ml-training",
                    "model": config.get("model_name", "unknown"),
                    "priority": str(priority)
                }
            ),
            spec=client.V1JobSpec(
                pod_spec=pod_template,
                backoff_limit=3,
                ttl_seconds_after_finished=86400  # 24小时后清理
            )
        )

        return job

    def _parse_job_status(self, job: V1Job) -> TrainingJob:
        """解析Job状态"""
        status = job.status

        # 解析任务状态
        task_status = TaskStatus.PENDING
        if status.active:
            task_status = TaskStatus.RUNNING
        elif status.succeeded:
            task_status = TaskStatus.SUCCEEDED
        elif status.failed:
            task_status = TaskStatus.FAILED

        return TrainingJob(
            job_id=job.metadata.name,
            model_name=job.metadata.labels.get("model", "unknown"),
            config={},
            status=task_status,
            resources=GPUResource(model="unknown", count=0, memory_gb=0),
            created_at=job.metadata.creation_timestamp,
            started_at=status.start_time,
            finished_at=status.completion_time,
            metrics={}
        )

# ============== 分布式训练支持 ==============

class DistributedTrainingJob:
    """分布式训练任务管理器"""

    def __init__(self, training_service: TrainingService):
        self.training_service = training_service

    def submit_distributed_job(
        self,
        config: Dict,
        strategy: str,  # data_parallel, tensor_parallel, pipeline_parallel
        num_nodes: int,
        gpus_per_node: int
    ) -> str:
        """
        提交分布式训练任务

        Args:
            strategy: 并行策略
            num_nodes: 节点数量
            gpus_per_node: 每节点GPU数
        """
        # 设置分布式配置
        config["distributed"] = {
            "strategy": strategy,
            "world_size": num_nodes * gpus_per_node,
            "backend": "nccl"  # NCCL for NVIDIA GPUs
        }

        # 提交多节点训练
        resources = GPUResource(
            model="unknown",
            count=gpus_per_node,
            memory_gb=40 * gpus_per_node
        )

        if strategy == "tensor_parallel":
            # 张量并行需要更多通信,降低并发度
            # 这里简化处理
            pass

        job = self.training_service.submit_training_job(
            config=config,
            resources=resources,
            priority=1  # 分布式训练通常优先级较高
        )

        return job.job_id

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建训练服务
    training_service = KubernetesTrainingService(
        namespace="ml-training",
        image="ml-train-runner:latest"
    )

    # 提交训练任务
    config = {
        "model_name": "gpt2-finetune",
        "dataset_path": "/data/dataset",
        "output_path": "/data/output",
        "hyperparameters": {
            "learning_rate": 1e-4,
            "batch_size": 32,
            "epochs": 3,
            "max_seq_length": 512
        },
        "checkpointing": {
            "save_steps": 500,
            "save_total_limit": 3
        }
    }

    resources = GPUResource(
        model="A100",
        count=4,
        memory_gb=160
    )

    job = training_service.submit_training_job(
        config=config,
        resources=resources,
        priority=1
    )

    print(f"Submitted training job: {job.job_id}")

    # 监控任务
    import time
    while True:
        status = training_service.get_job_status(job.job_id)
        print(f"Job status: {status.status}")

        if status.status in [TaskStatus.SUCCEEDED, TaskStatus.FAILED]:
            break

        time.sleep(30)

    # 获取日志
    logs = training_service.get_job_logs(job.job_id)
    print(f"Logs:\n{logs}")

模型推理服务

3.1 推理服务实现

# mlplatform/inference_service.py
"""
模型推理服务实现
包含:模型部署、自动扩缩容、请求路由
"""

from typing import Dict, List, Optional
from kubernetes import client, config
from kubernetes.client import V1Deployment, V1Service, V1Ingress
import yaml

class KubernetesInferenceService(InferenceService):
    """基于Kubernetes的推理服务"""

    def __init__(
        self,
        namespace: str = "ml-inference",
        base_image: str = "ml-inference-server:latest"
    ):
        self.namespace = namespace
        self.base_image = base_image

        config.load_kube_config()
        self.apps_api = client.AppsV1Api()
        self.core_api = client.CoreV1Api()
        self.networking_api = client.NetworkingV1Api()

    def deploy_model(
        self,
        model_id: str,
        replicas: int = 1,
        resources: GPUResource = None,
        autoscaling: bool = True
    ) -> str:
        """部署模型"""
        deployment_name = f"model-{model_id}"

        # 创建Deployment
        deployment = self._build_deployment(
            name=deployment_name,
            model_id=model_id,
            replicas=replicas,
            resources=resources
        )

        self.apps_api.create_namespaced_deployment(
            namespace=self.namespace,
            body=deployment
        )

        # 创建Service
        service = self._build_service(
            name=deployment_name,
            selector={"app": deployment_name}
        )

        self.core_api.create_namespaced_service(
            namespace=self.namespace,
            body=service
        )

        # 创建HPA(自动扩缩容)
        if autoscaling and resources:
            self._create_hpa(
                name=deployment_name,
                target_cpu=70,
                min_replicas=replicas,
                max_replicas=replicas * 4
            )

        return deployment_name

    def scale_model(
        self,
        deployment_id: str,
        replicas: int
    ) -> bool:
        """扩缩容"""
        try:
            # 读取当前deployment
            deployment = self.apps_api.read_namespaced_deployment(
                name=deployment_id,
                namespace=self.namespace
            )

            # 更新副本数
            deployment.spec.replicas = replicas

            # 应用更新
            self.apps_api.patch_namespaced_deployment(
                name=deployment_id,
                namespace=self.namespace,
                body=deployment
            )

            return True

        except client.ApiException:
            return False

    def undeploy_model(self, deployment_id: str) -> bool:
        """下线模型"""
        try:
            # 删除HPA
            try:
                self.autoscaling_api.delete_namespaced_horizontal_pod_autoscaler(
                    name=deployment_id,
                    namespace=self.namespace
                )
            except:
                pass

            # 删除Service
            self.core_api.delete_namespaced_service(
                name=deployment_id,
                namespace=self.namespace
            )

            # 删除Deployment
            self.apps_api.delete_namespaced_deployment(
                name=deployment_id,
                namespace=self.namespace
            )

            return True

        except client.ApiException:
            return False

    def get_deployment_status(self, deployment_id: str) -> Dict:
        """获取部署状态"""
        try:
            deployment = self.apps_api.read_namespaced_deployment(
                name=deployment_id,
                namespace=self.namespace
            )

            pods = self.core_api.list_namespaced_pod(
                namespace=self.namespace,
                label_selector=f"app={deployment_id}"
            )

            return {
                "name": deployment_id,
                "replicas": deployment.spec.replicas,
                "ready_replicas": deployment.status.ready_replicas,
                "available_replicas": deployment.status.available_replicas,
                "updated_replicas": deployment.status.updated_replicas,
                "pods": [
                    {
                        "name": pod.metadata.name,
                        "status": pod.status.phase,
                        "ready": all(
                            cs.ready for cs in (pod.status.container_statuses or [])
                        )
                    }
                    for pod in pods.items
                ]
            }

        except client.ApiException as e:
            if e.status == 404:
                return {"name": deployment_id, "exists": False}
            raise

    def _build_deployment(
        self,
        name: str,
        model_id: str,
        replicas: int,
        resources: GPUResource
    ) -> V1Deployment:
        """构建Deployment"""
        resource_requests = {
            "cpu": "2",
            "memory": "8G"
        }

        resource_limits = {
            "cpu": "4",
            "memory": "16G"
        }

        if resources:
            if resources.count > 0:
                resource_limits["nvidia.com/gpu"] = str(resources.count)

        container = client.V1Container(
            name="inference",
            image=self.base_image,
            env=[
                client.V1EnvVar(name="MODEL_ID", value=model_id),
                client.V1EnvVar(name="PORT", value="8000")
            ],
            ports=[client.V1ContainerPort(container_port=8000)],
            resources=client.V1ResourceRequirements(
                requests=resource_requests,
                limits=resource_limits
            ),
            liveness_probe=client.V1Probe(
                http_get=client.V1HTTPGetAction(
                    path="/health",
                    port=8000
                ),
                initial_delay_seconds=30,
                period_seconds=10
            ),
            readiness_probe=client.V1Probe(
                http_get=client.V1HTTPGetAction(
                    path="/ready",
                    port=8000
                ),
                initial_delay_seconds=10,
                period_seconds=5
            )
        )

        deployment = V1Deployment(
            metadata=client.V1ObjectMeta(
                name=name,
                labels={"app": name, "model": model_id}
            ),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels={"app": name}),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": name, "model": model_id}
                    ),
                    spec=client.V1PodSpec(
                        containers=[container]
                    )
                )
            )
        )

        return deployment

    def _build_service(
        self,
        name: str,
        selector: Dict[str, str]
    ) -> V1Service:
        """构建Service"""
        service = V1Service(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1ServiceSpec(
                selector=selector,
                ports=[client.V1ServicePort(
                    port=80,
                    target_port=8000,
                    protocol="TCP"
                )],
                type="ClusterIP"
            )
        )

        return service

    def _create_hpa(
        self,
        name: str,
        target_cpu: int,
        min_replicas: int,
        max_replicas: int
    ):
        """创建HPA"""
        hpa = client.V1HorizontalPodAutoscaler(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V2HorizontalPodAutoscalerSpec(
                scale_target_ref=client.V2CrossVersionObjectReference(
                    kind="Deployment",
                    name=name
                ),
                min_replicas=min_replicas,
                max_replicas=max_replicas,
                metrics=[
                    client.V2MetricSpec(
                        type="Resource",
                        resource=client.V2ResourceMetricSource(
                            name="cpu",
                            target=client.V2MetricTarget(
                                type="Utilization",
                                average_utilization=target_cpu
                            )
                        )
                    )
                ]
            )
        )

        self.autoscaling_api.create_namespaced_horizontal_pod_autoscaler(
            namespace=self.namespace,
            body=hpa
        )

# ============== 模型路由器 ==============

class ModelRouter:
    """模型路由器"""

    def __init__(self):
        self.routes: Dict[str, str] = {}
        self.load_balancer = LoadBalancer()

    def register_model(
        self,
        model_id: str,
        endpoint: str,
        priority: int = 0
    ):
        """注册模型路由"""
        self.routes[model_id] = {
            "endpoint": endpoint,
            "priority": priority,
            "healthy": True
        }

    def get_endpoint(self, model_id: str) -> Optional[str]:
        """获取模型端点"""
        if model_id not in self.routes:
            return None

        route = self.routes[model_id]
        if route["healthy"]:
            return route["endpoint"]
        return None

    def route_request(
        self,
        model_id: str,
        request: Dict
    ) -> Dict:
        """路由请求"""
        endpoint = self.get_endpoint(model_id)
        if not endpoint:
            raise ValueError(f"Model {model_id} not available")

        return self.load_balancer.send_request(endpoint, request)

class LoadBalancer:
    """负载均衡器"""

    def __init__(self, strategy: str = "round_robin"):
        self.strategy = strategy
        self.endpoints: List[str] = []
        self.current_index = 0

    def add_endpoint(self, endpoint: str):
        """添加端点"""
        if endpoint not in self.endpoints:
            self.endpoints.append(endpoint)

    def remove_endpoint(self, endpoint: str):
        """移除端点"""
        if endpoint in self.endpoints:
            self.endpoints.remove(endpoint)

    def send_request(self, endpoint: str, request: Dict) -> Dict:
        """发送请求"""
        import requests

        response = requests.post(
            f"{endpoint}/predict",
            json=request,
            timeout=30
        )

        return response.json()

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建推理服务
    inference_service = KubernetesInferenceService(
        namespace="ml-inference"
    )

    # 部署模型
    resources = GPUResource(
        model="A100",
        count=2,
        memory_gb=80
    )

    deployment_id = inference_service.deploy_model(
        model_id="gpt2-finetune-v1",
        replicas=2,
        resources=resources,
        autoscaling=True
    )

    print(f"Deployed model: {deployment_id}")

    # 检查状态
    import time
    time.sleep(10)

    status = inference_service.get_deployment_status(deployment_id)
    print(f"Deployment status: {status}")

    # 扩容
    inference_service.scale_model(deployment_id, replicas=4)
    print("Scaled to 4 replicas")

    # 下线
    # inference_service.undeploy_model(deployment_id)

工作流编排

4.1 Argo Workflows 集成

# mlplatform/workflow_engine.py
"""
工作流引擎实现
基于Argo Workflows
"""

from typing import Dict, List, Optional
import yaml
import requests

class ArgoWorkflowEngine:
    """Argo Workflows引擎"""

    def __init__(
        self,
        server_url: str,
        namespace: str = "ml-workflows",
        token: str = None
    ):
        self.server_url = server_url
        self.namespace = namespace
        self.token = token

    def submit_workflow(
        self,
        workflow_def: Dict,
        parameters: Dict = None,
        labels: Dict = None
    ) -> str:
        """提交工作流"""
        # 设置参数
        if parameters:
            for param_name, param_value in parameters.items():
                self._set_workflow_parameter(workflow_def, param_name, param_value)

        # 设置标签
        if labels:
            if "metadata" not in workflow_def:
                workflow_def["metadata"] = {}
            if "labels" not in workflow_def["metadata"]:
                workflow_def["metadata"]["labels"] = {}
            workflow_def["metadata"]["labels"].update(labels)

        # 提交
        response = requests.post(
            f"{self.server_url}/api/v1/workflows/{self.namespace}",
            headers=self._get_headers(),
            json=workflow_def
        )

        if response.status_code != 200:
            raise RuntimeError(f"Failed to submit workflow: {response.text}")

        data = response.json()
        return data["metadata"]["name"]

    def get_workflow_status(self, workflow_id: str) -> Dict:
        """获取工作流状态"""
        response = requests.get(
            f"{self.server_url}/api/v1/workflows/{self.namespace}/{workflow_id}",
            headers=self._get_headers()
        )

        if response.status_code != 200:
            raise RuntimeError(f"Failed to get workflow: {response.text}")

        return response.json()

    def list_workflows(
        self,
        labels: Dict = None,
        phase: str = None
    ) -> List[Dict]:
        """列出工作流"""
        params = []

        if labels:
            label_selector = ",".join([f"{k}={v}" for k, v in labels.items()])
            params.append(f"labelSelector={label_selector}")

        if phase:
            params.append(f"phase={phase}")

        url = f"{self.server_url}/api/v1/workflows/{self.namespace}"
        if params:
            url += "?" + "&".join(params)

        response = requests.get(url, headers=self._get_headers())

        if response.status_code != 200:
            raise RuntimeError(f"Failed to list workflows: {response.text}")

        data = response.json()
        return data["items"]

    def retry_workflow(self, workflow_id: str, node_id: str = None) -> str:
        """重试工作流"""
        response = requests.put(
            f"{self.server_url}/api/v1/workflows/{self.namespace}/{workflow_id}/retry",
            headers=self._get_headers(),
            json={"nodeFieldSelector": f"nodeId={node_id}" if node_id else ""}
        )

        if response.status_code != 200:
            raise RuntimeError(f"Failed to retry workflow: {response.text}")

        data = response.json()
        return data["metadata"]["name"]

    def _get_headers(self) -> Dict[str, str]:
        """获取请求头"""
        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        return headers

    def _set_workflow_parameter(self, workflow: Dict, name: str, value: any):
        """设置工作流参数"""
        if "spec" not in workflow:
            workflow["spec"] = {}
        if "arguments" not in workflow["spec"]:
            workflow["spec"]["arguments"] = {}
        if "parameters" not in workflow["spec"]["arguments"]:
            workflow["spec"]["arguments"]["parameters"] = []

        # 检查参数是否已存在
        for param in workflow["spec"]["arguments"]["parameters"]:
            if param["name"] == name:
                param["value"] = str(value)
                return

        # 添加新参数
        workflow["spec"]["arguments"]["parameters"].append({
            "name": name,
            "value": str(value)
        })

# ============== ML Pipeline构建器 ==============

class MLPipelineBuilder:
    """ML Pipeline构建器"""

    def __init__(self, name: str):
        self.name = name
        self.steps: List[Dict] = []
        self.parameters: List[Dict] = []

    def add_parameter(self, name: str, default: any = None) -> "MLPipelineBuilder":
        """添加参数"""
        self.parameters.append({
            "name": name,
            "value": str(default) if default is not None else None
        })
        return self

    def add_data_preparation(
        self,
        image: str,
        script: str,
        arguments: Dict = None
    ) -> "MLPipelineBuilder":
        """添加数据准备步骤"""
        step = {
            "name": "data-prep",
            "template": "data-prep",
            "arguments": self._build_arguments(arguments)
        }
        self.steps.append(step)
        return self

    def add_training(
        self,
        image: str,
        resources: Dict = None,
        arguments: Dict = None
    ) -> "MLPipelineBuilder":
        """添加训练步骤"""
        step = {
            "name": "training",
            "template": "training",
            "arguments": self._build_arguments(arguments)
        }
        self.steps.append(step)
        return self

    def add_evaluation(
        self,
        image: str,
        metrics: List[str],
        arguments: Dict = None
    ) -> "MLPipelineBuilder":
        """添加评估步骤"""
        step = {
            "name": "evaluation",
            "template": "evaluation",
            "arguments": self._build_arguments({
                "metrics": ",".join(metrics),
                **(arguments or {})
            })
        }
        self.steps.append(step)
        return self

    def add_model_publish(
        self,
        registry: str,
        arguments: Dict = None
    ) -> "MLPipelineBuilder":
        """添加模型发布步骤"""
        step = {
            "name": "publish",
            "template": "publish",
            "arguments": self._build_arguments({
                "registry": registry,
                **(arguments or {})
            })
        }
        self.steps.append(step)
        return self

    def build(self) -> Dict:
        """构建Argo Workflow定义"""
        workflow = {
            "apiVersion": "argoproj.io/v1alpha1",
            "kind": "Workflow",
            "metadata": {
                "generateName": f"{self.name}-"
            },
            "spec": {
                "entrypoint": "ml-pipeline",
                "arguments": {
                    "parameters": self.parameters
                },
                "templates": [
                    {
                        "name": "ml-pipeline",
                        "steps": self.steps
                    }
                ]
            }
        }

        return workflow

    def _build_arguments(self, params: Dict) -> List[Dict]:
        """构建参数列表"""
        if not params:
            return []
        return [{"name": k, "value": str(v)} for k, v in params.items()]

# ============== 标准ML Pipeline模板 ==============

STANDARD_ML_PIPELINE_TEMPLATE = """
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
spec:
  entrypoint: ml-pipeline
  arguments:
    parameters:
      - name: model-name
      - name: dataset-path
      - name: learning-rate
        value: "0.001"
      - name: batch-size
        value: "32"
      - name: epochs
        value: "10"
  templates:
    - name: ml-pipeline
      steps:
        - - name: data-prep
            template: data-prep
            arguments:
              - name: dataset-path
                value: "{{workflow.parameters.dataset-path}}"

        - - name: training
            template: training
            arguments:
              - name: model-name
                value: "{{workflow.parameters.model-name}}"
              - name: learning-rate
                value: "{{workflow.parameters.learning-rate}}"
              - name: batch-size
                value: "{{workflow.parameters.batch-size}}"
              - name: epochs
                value: "{{workflow.parameters.epochs}}"
              - name: data-path
                value: "{{steps.data-prep.outputs.parameters.processed-data}}"

        - - name: evaluation
            template: evaluation
            arguments:
              - name: model-path
                value: "{{steps.training.outputs.parameters.model-path}}"

        - - name: publish
            template: publish
            arguments:
              - name: model-path
                value: "{{steps.evaluation.outputs.parameters.model-path}}"
              - name: metrics
                value: "{{steps.evaluation.outputs.parameters.metrics}}"

    - name: data-prep
      container:
        image: python:3.9
        command: ["python", "scripts/data_prep.py"]
        args:
          - --dataset-path={{inputs.parameters.dataset-path}}
          - --output-path=/data/processed
      inputs:
        parameters:
          - name: dataset-path
      outputs:
        parameters:
          - name: processed-data
            valueFrom: {path: /data/processed}

    - name: training
      container:
        image: python:3.9-gpu
        command: ["python", "scripts/train.py"]
        args:
          - --model-name={{inputs.parameters.model-name}}
          - --lr={{inputs.parameters.learning-rate}}
          - --batch-size={{inputs.parameters.batch-size}}
          - --epochs={{inputs.parameters.epochs}}
          - --data-path={{inputs.parameters.data-path}}
          - --output-path=/output
        resources:
          limits:
            nvidia.com/gpu: 4
      inputs:
        parameters:
          - name: model-name
          - name: learning-rate
          - name: batch-size
          - name: epochs
          - name: data-path
      outputs:
        parameters:
          - name: model-path
            valueFrom: {path: /output/model}

    - name: evaluation
      container:
        image: python:3.9
        command: ["python", "scripts/evaluate.py"]
        args:
          - --model-path={{inputs.parameters.model-path}}
          - --output=/output/metrics.json
      inputs:
        parameters:
          - name: model-path
      outputs:
        parameters:
          - name: model-path
            value: "{{inputs.parameters.model-path}}"
          - name: metrics
            valueFrom: {path: /output/metrics.json}

    - name: publish
      container:
        image: python:3.9
        command: ["python", "scripts/publish.py"]
        args:
          - --model-path={{inputs.parameters.model-path}}
          - --metrics={{inputs.parameters.metrics}}
      inputs:
        parameters:
          - name: model-path
          - name: metrics
"""

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建工作流引擎
    workflow_engine = ArgoWorkflowEngine(
        server_url="http://argo-server.argo.svc:2746",
        namespace="ml-workflows"
    )

    # 加载标准模板
    workflow_def = yaml.safe_load(STANDARD_ML_PIPELINE_TEMPLATE)

    # 提交工作流
    workflow_id = workflow_engine.submit_workflow(
        workflow_def=workflow_def,
        parameters={
            "model-name": "gpt2-finetune",
            "dataset-path": "/datasets/train",
            "learning-rate": "0.0001",
            "batch-size": "64",
            "epochs": "5"
        },
        labels={
            "project": "ml-platform",
            "model": "gpt2"
        }
    )

    print(f"Submitted workflow: {workflow_id}")

    # 监控工作流
    import time
    while True:
        status = workflow_engine.get_workflow_status(workflow_id)
        phase = status["status"]["phase"]
        print(f"Workflow phase: {phase}")

        if phase in ["Succeeded", "Failed", "Error"]:
            break

        time.sleep(10)

    # 使用Pipeline构建器
    builder = MLPipelineBuilder("custom-pipeline") \
        .add_parameter("model-name", "custom-model") \
        .add_parameter("dataset-path") \
        .add_parameter("learning-rate", "0.001")

    workflow_def = builder.build()
    workflow_id = workflow_engine.submit_workflow(workflow_def)
    print(f"Custom workflow: {workflow_id}")

资源调度

5.1 GPU资源管理

# mlplatform/resource_scheduler.py
"""
资源调度器实现
支持GPU资源隔离、优先级调度
"""

from typing import Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum
import heapq

class SchedulingPolicy(Enum):
    """调度策略"""
    FIFO = "fifo"
    PRIORITY = "priority"
    BIN_PACKING = "bin_packing"  # 资源优化分配
    FAIR_SHARE = "fair_share"  # 公平共享

@dataclass
class Node:
    """计算节点"""
    name: str
    total_gpus: int
    gpu_memory_gb: int
    available_gpus: int
    assigned_jobs: List[str] = field(default_factory=list)

    def can_allocate(self, gpu_count: int, memory_gb: int) -> bool:
        """检查是否可以分配"""
        return (
            self.available_gpus >= gpu_count and
            self.gpu_memory_gb >= memory_gb
        )

    def allocate(self, job_id: str, gpu_count: int):
        """分配资源"""
        self.available_gpus -= gpu_count
        self.assigned_jobs.append(job_id)

    def release(self, job_id: str, gpu_count: int):
        """释放资源"""
        self.available_gpus += gpu_count
        if job_id in self.assigned_jobs:
            self.assigned_jobs.remove(job_id)

@dataclass(order=True)
class PendingJob:
    """等待调度的任务"""
    priority: int
    job_id: str
    gpu_count: int
    memory_gb: int
    timestamp: float
    submit_time: float

class GPUResourceScheduler(ResourceScheduler):
    """GPU资源调度器"""

    def __init__(self, policy: SchedulingPolicy = SchedulingPolicy.PRIORITY):
        self.policy = policy
        self.nodes: Dict[str, Node] = {}
        self.pending_queue: List[PendingJob] = []
        self.allocated: Dict[str, Dict] = {}  # job_id -> {node_name, gpu_count}

    def add_node(self, name: str, gpus: int, memory_gb: int):
        """添加计算节点"""
        self.nodes[name] = Node(
            name=name,
            total_gpus=gpus,
            gpu_memory_gb=memory_gb,
            available_gpus=gpus
        )

    def schedule_job(
        self,
        job_id: str,
        resources: GPUResource,
        priority: int = 0
    ) -> bool:
        """调度任务"""
        pending_job = PendingJob(
            priority=-priority,  # 最小堆,负数实现最大堆
            job_id=job_id,
            gpu_count=resources.count,
            memory_gb=resources.memory_gb,
            timestamp=time.time(),
            submit_time=time.time()
        )

        # 尝试立即调度
        if self._try_schedule(pending_job):
            return True

        # 无法立即调度,加入等待队列
        heapq.push(self.pending_queue, pending_job)
        return False

    def release_resources(self, job_id: str) -> bool:
        """释放资源"""
        if job_id not in self.allocated:
            return False

        allocation = self.allocated.pop(job_id)
        node = self.nodes[allocation["node_name"]]
        node.release(job_id, allocation["gpu_count"])

        # 尝试调度等待队列中的任务
        self._reconcile()

        return True

    def get_available_resources(self) -> Dict[ResourceType, int]:
        """获取可用资源"""
        total_gpus = sum(node.available_gpus for node in self.nodes.values())
        total_memory = sum(node.gpu_memory_gb for node in self.nodes.values())

        return {
            ResourceType.GPU: total_gpus,
            ResourceType.MEMORY: total_memory
        }

    def get_queue_status(self) -> Dict:
        """获取队列状态"""
        return {
            "pending_count": len(self.pending_queue),
            "allocated_jobs": len(self.allocated),
            "total_nodes": capacity,
            "total_gpus": total_gpus,
            "available_gpus": sum(node.available_gpus for node in self.nodes.values())
        }

    def _try_schedule(self, job: PendingJob) -> bool:
        """尝试调度任务"""
        if self.policy == SchedulingPolicy.PRIORITY:
            return self._schedule_priority(job)
        elif self.policy == SchedulingPolicy.BIN_PACKING:
            return self._schedule_bin_packing(job)
        else:
            return self._schedule_fifo(job)

    def _schedule_priority(self, job: PendingJob) -> bool:
        """优先级调度"""
        # 找到第一个能容纳的节点
        for node in sorted(
            self.nodes.values(),
            key=lambda n: n.available_gpus,
            reverse=True
        ):
            if node.can_allocate(job.gpu_count, job.memory_gb):
                node.allocate(job.job_id, job.gpu_count)
                self.allocated[job.job_id] = {
                    "node_name": node.name,
                    "gpu_count": job.gpu_count
                }
                return True
        return False

    def _schedule_bin_packing(self, job: PendingJob) -> bool:
        """装箱算法(资源优化)"""
        # 找到最小满足条件的节点
        best_node = None
        min_waste = float('inf')

        for node in self.nodes.values():
            if node.can_allocate(job.gpu_count, job.memory_gb):
                waste = node.available_gpus - job.gpu_count
                if waste < min_waste:
                    min_waste = waste
                    best_node = node

        if best_node:
            best_node.allocate(job.job_id, job.gpu_count)
            self.allocated[job.job_id] = {
                "node_name": best_node.name,
                "gpu_count": job.gpu_count
            }
            return True

        return False

    def _schedule_fifo(self, job: PendingJob) -> bool:
        """FIFO调度"""
        for node in self.nodes.values():
            if node.can_allocate(job.gpu_count, job.memory_gb):
                node.allocate(job.job_id, job.gpu_count)
                self.allocated[job.job_id] = {
                    "node_name": node.name,
                    "gpu_count": job.gpu_count
                }
                return True
        return False

    def _reconcile(self):
        """重新调度等待队列"""
        scheduled = []
        remaining = []

        while self.pending_queue:
            job = heapq.heappop(self.pending_queue)
            if self._try_schedule(job):
                scheduled.append(job.job_id)
            else:
                remaining.append(job)

        self.pending_queue = remaining

# ============== Kueue集成 ==============

class KueueScheduler:
    """Kueue调度器集成"""

    def __init__(self, namespace: str = "kueue-system"):
        from kubernetes import client, config
        config.load_kube_config()

        self.namespace = namespace
        self.custom_api = client.CustomObjectsApi()

    def create_cluster_queue(
        self,
        name: str,
        gpu_capacity: int,
        cpu_capacity: str = "1000",
        memory_capacity: str = "4000G"
    ):
        """创建集群队列"""
        cluster_queue = {
            "apiVersion": "kueue.x-k8s.io/v1beta1",
            "kind": "ClusterQueue",
            "metadata": {
                "name": name
            },
            "spec": {
                "capacity": [
                    {
                        "name": "gpu",
                        "quotas": [{
                            "name": "nvidia.com/gpu",
                            "min": str(gpu_capacity)
                        }]
                    },
                    {
                        "name": "cpu",
                        "nominalQuota": cpu_capacity
                    },
                    {
                        "name": "memory",
                        "nominalQuota": memory_capacity
                    }
                ]
            }
        }

        self.custom_api.create_cluster_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="clusterqueues",
            body=cluster_queue
        )

    def create_local_queue(
        self,
        name: str,
        cluster_queue: str,
        namespace: str = "default"
    ):
        """创建本地队列"""
        local_queue = {
            "apiVersion": "kueue.x-k8s.io/v1beta1",
            "kind": "LocalQueue",
            "metadata": {
                "name": name,
                "namespace": namespace
            },
            "spec": {
                "clusterQueue": cluster_queue
            }
        }

        self.custom_api.create_namespaced_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="localqueues",
            namespace=namespace,
            body=local_queue
        )

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建GPU调度器
    scheduler = GPUResourceScheduler(
        policy=SchedulingPolicy.PRIORITY
    )

    # 添加节点
    scheduler.add_node("node-1", gpus=8, memory_gb=320)  # 8x A100 40GB
    scheduler.add_node("node-2", gpus=4, memory_gb=160)  # 4x A100 40GB

    # 提交任务
    import time

    job1 = scheduler.schedule_job(
        job_id="train-job-1",
        resources=GPUResource(model="A100", count=4, memory_gb=160),
        priority=1
    )
    print(f"Job 1 scheduled: {job1}")

    job2 = scheduler.schedule_job(
        job_id="train-job-2",
        resources=GPUResource(model="A100", count=8, memory_gb=320),
        priority=2  # 高优先级
    )
    print(f"Job 2 scheduled: {job2}")

    job3 = scheduler.schedule_job(
        job_id="train-job-3",
        resources=GPUResource(model="A100", count=4, memory_gb=160),
        priority=0
    )
    print(f"Job 3 scheduled: {job3}")

    # 查看状态
    status = scheduler.get_queue_status()
    print(f"Queue status: {status}")

    # 释放资源
    scheduler.release_resources("train-job-1")
    print("Released job-1 resources")

    # 使用Kueue
    kueue = KueueScheduler()
    # kueue.create_cluster_queue("default-queue", gpu_capacity=16)
    # kueue.create_local_queue("training", "default-queue")

监控与可观测性

6.1 模型监控系统

# mlplatform/monitoring.py
"""
模型监控与可观测性
包含:指标收集、漂移检测、告警
"""

from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np

@dataclass
class MetricRecord:
    """指标记录"""
    model_id: str
    metric_name: str
    value: float
    timestamp: datetime
    tags: Dict[str, str] = None

@dataclass
class DriftReport:
    """漂移报告"""
    model_id: str
    drift_detected: bool
    drift_score: float
    drift_type: str
    timestamp: datetime
    details: Dict = None

class ModelMonitor:
    """模型监控器"""

    def __init__(self):
        self.metrics: List[MetricRecord] = []
        self.baselines: Dict[str, Dict[str, float]] = {}

    def record_metric(
        self,
        model_id: str,
        metric_name: str,
        value: float,
        tags: Dict[str, str] = None
    ):
        """记录指标"""
        record = MetricRecord(
            model_id=model_id,
            metric_name=metric_name,
            value=value,
            timestamp=datetime.now(),
            tags=tags or {}
        )
        self.metrics.append(record)

    def set_baseline(
        self,
        model_id: str,
        metrics: Dict[str, float]
    ):
        """设置基准指标"""
        self.baselines[model_id] = metrics

    def get_metrics(
        self,
        model_id: str,
        metric_name: str,
        start_time: datetime,
        end_time: datetime
    ) -> List[MetricRecord]:
        """获取指标"""
        return [
            m for m in self.metrics
            if m.model_id == model_id
            and m.metric_name == metric_name
            and start_time <= m.timestamp <= end_time
        ]

    def get_average_metric(
        self,
        model_id: str,
        metric_name: str,
        window_minutes: int = 60
    ) -> float:
        """获取平均指标"""
        start_time = datetime.now() - timedelta(minutes=window_minutes)
        records = self.get_metrics(model_id, metric_name, start_time, datetime.now())

        if not records:
            return 0.0

        return sum(m.value for m in records) / len(records)

class DriftDetector:
    """数据漂移检测器"""

    def __init__(self, threshold: float = 0.3):
        self.threshold = threshold

    def detect_drift(
        self,
        baseline: np.ndarray,
        current: np.ndarray
    ) -> DriftReport:
        """
        检测数据漂移

        Returns:
            DriftReport: 漂移报告
        """
        # 方法1: KL散度
        kl_divergence = self._calculate_kl_divergence(baseline, current)

        # 方法2: PSI (Population Stability Index)
        psi_score = self._calculate_psi(baseline, current)

        # 方法3: Wasserstein距离
        wasserstein_dist = self._calculate_wasserstein(baseline, current)

        # 综合判断
        max_score = max(abs(kl_divergence), psi_score, wasserstein_dist)
        drift_detected = max_score > self.threshold

        # 确定漂移类型
        drift_type = "none"
        if drift_detected:
            if wasserstein_dist > self.threshold:
                drift_type = "distribution_shift"
            elif psi_score > self.threshold:
                drift_type = "population_stability"
            else:
                drift_type = "feature_drift"

        return DriftReport(
            model_id="unknown",
            drift_detected=drift_detected,
            drift_score=max_score,
            drift_type=drift_type,
            timestamp=datetime.now(),
            details={
                "kl_divergence": kl_divergence,
                "psi": psi_score,
                "wasserstein": wasserstein_dist
            }
        )

    def _calculate_kl_divergence(
        self,
        p: np.ndarray,
        q: np.ndarray
    ) -> float:
        """计算KL散度"""
        # 归一化为概率分布
        p_normalized = p / np.sum(p)
        q_normalized = q / np.sum(q)

        # 添加小值避免log(0)
        eps = 1e-10
        p_normalized = np.clip(p_normalized, eps, 1)
        q_normalized = np.clip(q_normalized, eps, 1)

        return np.sum(p_normalized * np.log(p_normalized / q_normalized))

    def _calculate_psi(
        self,
        expected: np.ndarray,
        actual: np.ndarray
    ) -> float:
        """计算PSI"""
        # 分桶
        bins = np.percentile(expected, np.linspace(0, 100, 11))
        expected_hist, _ = np.histogram(expected, bins=bins)
        actual_hist, _ = np.histogram(actual, bins=bins)

        # 归一化
        expected_pct = expected_hist / np.sum(expected_hist)
        actual_pct = actual_hist / np.sum(actual_hist)

        # 避免除零
        expected_pct = np.clip(expected_pct, 1e-10, 1)
        actual_pct = np.clip(actual_pct, 1e-10, 1)

        # 计算PSI
        psi = np.sum((expected_pct - actual_pct) * np.log(expected_pct / actual_pct))

        return psi

    def _calculate_wasserstein(
        self,
        p: np.ndarray,
        q: np.ndarray
    ) -> float:
        """计算Wasserstein距离"""
        from scipy.stats import wasserstein_distance
        return wasserstein_distance(p, q)

class PerformanceMonitor:
    """性能监控器"""

    def __init__(self):
        self.latencies: Dict[str, List[float]] = {}
        self.errors: Dict[str, List[Dict]] = {}

    def record_latency(
        self,
        model_id: str,
        latency_ms: float,
        request_id: str = None
    ):
        """记录延迟"""
        if model_id not in self.latencies:
            self.latencies[model_id] = []
        self.latencies[model_id].append(latency_ms)

    def record_error(
        self,
        model_id: str,
        error_type: str,
        error_message: str
    ):
        """记录错误"""
        if model_id not in self.errors:
            self.errors[model_id] = []
        self.errors[model_id].append({
            "type": error_type,
            "message": error_message,
            "timestamp": datetime.now()
        })

    def get_p95_latency(self, model_id: str) -> float:
        """获取P95延迟"""
        if model_id not in self.latencies or not self.latencies[model_id]:
            return 0.0

        latencies = self.latencies[model_id]
        return np.percentile(latencies, 95)

    def get_p99_latency(self, model_id: str) -> float:
        """获取P99延迟"""
        if model_id not in self.latencies or not self.latencies[model_id]:
            return 0.0

        latencies = self.latencies[model_id]
        return np.percentile(latencies, 99)

    def get_error_rate(
        self,
        model_id: str,
        window_minutes: int = 5
    ) -> float:
        """获取错误率"""
        if model_id not in self.errors:
            return 0.0

        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
        recent_errors = [
            e for e in self.errors[model_id]
            if e["timestamp"] >= cutoff_time
        ]

        total_requests = len(self.latencies[model_id])
        if total_requests == 0:
            return 0.0

        return len(recent_errors) / total_requests

class AlertManager:
    """告警管理器"""

    def __init__(self):
        self.alerts: List[Dict] = []
        self.alert_rules: Dict[str, Dict] = {}

    def add_alert_rule(
        self,
        rule_id: str,
        condition: callable,
        severity: str,
        channels: List[str]  # email, slack, webhook
    ):
        """添加告警规则"""
        self.alert_rules[rule_id] = {
            "condition": condition,
            "severity": severity,
            "channels": channels
        }

    def check_and_fire_alerts(self, metrics: Dict) -> List[Dict]:
        """检查并触发告警"""
        fired_alerts = []

        for rule_id, rule in self.alert_rules.items():
            if rule["condition"](metrics):
                alert = {
                    "rule_id": rule_id,
                    "severity": rule["severity"],
                    "timestamp": datetime.now(),
                    "metrics": metrics
                }
                self.alerts.append(alert)
                fired_alerts.append(alert)

                # 发送告警
                self._send_alert(alert, rule["channels"])

        return fired_alerts

    def _send_alert(self, alert: Dict, channels: List[str]):
        """发送告警"""
        for channel in channels:
            if channel == "slack":
                self._send_to_slack(alert)
            elif channel == "email":
                self._send_to_email(alert)
            elif channel == "webhook":
                self._send_to_webhook(alert)

    def _send_to_slack(self, alert: Dict):
        """发送Slack告警"""
        import requests
        webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
        if webhook_url:
            message = {
                "text": f"🚨 Alert: {alert['rule_id']}\n"
                        f"Severity: {alert['severity']}\n"
                        f"Timestamp: {alert['timestamp']}"
            }
            requests.post(webhook_url, json=message)

# ============== 使用示例 ==============

if __name__ == "__main__":
    # 创建监控器
    monitor = ModelMonitor()
    drift_detector = DriftDetector(threshold=0.3)
    perf_monitor = PerformanceMonitor()

    # 设置基准
    monitor.set_baseline("model-1", {
        "accuracy": 0.95,
        "latency_p95": 50,
        "error_rate": 0.01
    })

    # 记录指标
    monitor.record_metric("model-1", "accuracy", 0.94)
    monitor.record_metric("model-1", "latency_p95", 45)
    monitor.record_metric("model-1", "error_rate", 0.015)

    # 记录性能
    perf_monitor.record_latency("model-1", 42.5)
    perf_monitor.record_latency("model-1", 38.2)
    perf_monitor.record_latency("model-1", 51.7)
    perf_monitor.record_latency("model-1", 125.3)  # 慢请求

    print(f"P95 Latency: {perf_monitor.get_p95_latency('model-1')} ms")
    print(f"P99 Latency: {perf_monitor.get_p99_latency('model-1')} ms")

    # 检测漂移
    baseline_data = np.random.normal(0, 1, 1000)
    current_data = np.random.normal(0.5, 1.2, 1000)  # 漂移数据

    drift_report = drift_detector.detect_drift(baseline_data, current_data)
    print(f"Drift detected: {drift_report.drift_detected}")
    print(f"Drift score: {drift_report.drift_score}")
    print(f"Drift type: {drift_report.drift_type}")

    # 设置告警
    alert_manager = AlertManager()

    alert_manager.add_alert_rule(
        rule_id="high-latency",
        condition=lambda m: m.get("latency_p95", 0) > 100,
        severity="warning",
        channels=["slack"]
    )

    alert_manager.add_alert_rule(
        rule_id="high-error-rate",
        condition=lambda m: m.get("error_rate", 0) > 0.05,
        severity="critical",
        channels=["slack", "email"]
    )

    # 检查告警
    current_metrics = {
        "latency_p95": perf_monitor.get_p95_latency("model-1"),
        "error_rate": monitor.get_average_metric("model-1", "error_rate")
    }

    alerts = alert_manager.check_and_fire_alerts(current_metrics)
    print(f"Fired alerts: {len(alerts)}")

参考资料

K8s调度框架

MLOps工具


文档版本: 1.0
最后更新: 2026-01-22

close
arrow_upward