内容纲要
模块31: 多模态大模型应用开发
对应JD需求:寒武纪人工智能项目、北京寰京AI技术落地
核心技术:文生图、图生视频、多模态融合、视觉识别、语音处理
1. 多模态AI基础
1.1 多模态学习范式
from enum import Enum
from typing import Dict, Any
class Modality(Enum):
"""模态类型"""
TEXT = "text"
IMAGE = "image"
AUDIO = "audio"
VIDEO = "video"
TABULAR = "tabular"
class MultiModalTask(Enum):
"""多模态任务类型"""
# 生成任务
TEXT_TO_IMAGE = "text_to_image" # 文生图
IMAGE_TO_TEXT = "image_to_text" # 图生文
TEXT_TO_VIDEO = "text_to_video" # 文生视频
IMAGE_TO_VIDEO = "image_to_video" # 图生视频
TEXT_TO_AUDIO = "text_to_audio" # 文生语音
# 理解任务
VISUAL_QUESTION_ANSWERING = "vqa" # 视觉问答
IMAGE_CLASSIFICATION = "image_cls" # 图像分类
OBJECT_DETECTION = "object_detection" # 目标检测
IMAGE_CAPTIONING = "image_captioning" # 图像描述
# 编辑任务
IMAGE_INPAINTING = "image_inpainting" # 图像修复
IMAGE_EDITING = "image_editing" # 图像编辑
VIDEO_EDITING = "video_editing" # 视频编辑
class MultiModalInput:
"""多模态输入封装"""
def __init__(self, data: Dict[Modality, Any]):
self.data = data
def get(self, modality: Modality) -> Any:
"""获取指定模态的数据"""
return self.data.get(modality)
def has(self, modality: Modality) -> bool:
"""检查是否包含指定模态"""
return modality in self.data
def modalities(self) -> list:
"""返回包含的模态列表"""
return list(self.data.keys())
# 使用示例
def create_multimodal_input():
# 创建多模态输入
input_data = MultiModalInput({
Modality.TEXT: "一只可爱的猫坐在沙发上",
Modality.IMAGE: "path/to/image.jpg", # 或numpy array
Modality.AUDIO: "path/to/audio.wav"
})
print(f"包含模态: {input_data.modalities()}")
return input_data
1.2 多模态架构类型
┌─────────────────────────────────────────────────────────────┐
│ 多模态架构类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Early Fusion (早期融合) │
│ ┌──────────────┐ │
│ │ Text Embed │ ┌─────────────────┐ │
│ └──────┬───────┘ │ │ │
│ │ │ Joint Model │ │
│ ┌──────▼───────┐ │ │ │
│ │ Image Embed │───▶│ (Fusion) │───▶ Output │
│ └──────────────┘ │ │ │
│ └─────────────────┘ │
│ │
│ 2. Late Fusion (晚期融合) │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Text Embed │───▶│ Text Model │ │
│ └──────────────┘ └───────┬──────┘ │
│ Text Output │ │
│ │ ┌──────────────┐ │
│ └─────▶│ Fusion │───▶ Output
│ └───────▲──────┘ │
│ ┌──────────────┐ ┌──────────────┐ │ │
│ │ Image Embed │───▶│ Image Model │────────┘ │
│ └──────────────┘ └───────┬──────┘ │
│ Image Output│ │
│ │
│ 3. Cross-Attention Fusion (交叉注意力融合) │
│ ┌──────────────┐ │
│ │ Text Embed │─────────────┐ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Transformer │ │
│ │ Cross-Attn │───▶ Output │
│ └───────▲───────┘ │
│ ┌──────────────┐ │ │
│ │ Image Embed │────────┘ │
│ └──────────────┘ │
│ │
│ 4. Encoder-Decoder (编码器-解码器) │
│ ┌──────────────┐ ┌─────────────────┐ ┌───────────┐ │
│ │ Input Modal │───▶│ Shared │──▶│ Output │ │
│ │ Encoder │ │ Encoder │ │ Decoder │ │
│ └──────────────┘ └─────────────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
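为帮助理解上图中 Early Fusion 与 Late Fusion 的差异,下面补充一个极简的 PyTorch 对比示意(特征维度、类别数与融合方式均为示例假设,仅用于说明两种范式的结构差别):
import torch
import torch.nn as nn

class EarlyFusionClassifier(nn.Module):
    """早期融合:先拼接各模态特征,再由一个联合网络处理"""
    def __init__(self, text_dim: int = 768, image_dim: int = 768, num_classes: int = 10):
        super().__init__()
        self.joint = nn.Sequential(
            nn.Linear(text_dim + image_dim, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )
    def forward(self, text_feat, image_feat):
        fused = torch.cat([text_feat, image_feat], dim=-1)  # 特征级拼接
        return self.joint(fused)

class LateFusionClassifier(nn.Module):
    """晚期融合:各模态独立建模,最后融合各自输出"""
    def __init__(self, text_dim: int = 768, image_dim: int = 768, num_classes: int = 10):
        super().__init__()
        self.text_head = nn.Linear(text_dim, num_classes)
        self.image_head = nn.Linear(image_dim, num_classes)
    def forward(self, text_feat, image_feat):
        # 决策级融合:此处简单对两路logits取平均
        return (self.text_head(text_feat) + self.image_head(image_feat)) / 2

# 使用示例
text_feat, image_feat = torch.randn(2, 768), torch.randn(2, 768)
print(EarlyFusionClassifier()(text_feat, image_feat).shape)  # torch.Size([2, 10])
print(LateFusionClassifier()(text_feat, image_feat).shape)   # torch.Size([2, 10])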
1.3 多模态嵌入对齐
import torch
import torch.nn as nn
from typing import Dict, Optional
class MultiModalEmbedding(nn.Module):
"""多模态嵌入对齐层"""
def __init__(
self,
text_dim: int = 768,
image_dim: int = 768,
audio_dim: int = 512,
output_dim: int = 768
):
super().__init__()
# 投影层:将不同模态嵌入对齐到统一维度
self.text_projector = nn.Linear(text_dim, output_dim)
self.image_projector = nn.Linear(image_dim, output_dim)
self.audio_projector = nn.Linear(audio_dim, output_dim)
# 可选:添加模态标识符
self.modality_tokens = nn.Parameter(
torch.randn(4, output_dim) # 4种模态
)
# 层归一化
self.layer_norm = nn.LayerNorm(output_dim)
def forward(
self,
text_embed: Optional[torch.Tensor] = None,
image_embed: Optional[torch.Tensor] = None,
audio_embed: Optional[torch.Tensor] = None
) -> Dict[str, torch.Tensor]:
"""
参数:
- text_embed: [batch, seq_len, text_dim]
- image_embed: [batch, num_patches, image_dim]
- audio_embed: [batch, audio_len, audio_dim]
"""
outputs = {}
# 文本嵌入投影
if text_embed is not None:
projected = self.text_projector(text_embed)
# 添加模态标识符
token = self.modality_tokens[0].unsqueeze(0).unsqueeze(1)
projected = torch.cat([token.expand_as(projected[:, :1, :]), projected], dim=1)
outputs['text'] = self.layer_norm(projected)
# 图像嵌入投影
if image_embed is not None:
projected = self.image_projector(image_embed)
token = self.modality_tokens[1].unsqueeze(0).unsqueeze(1)
projected = torch.cat([token.expand_as(projected[:, :1, :]), projected], dim=1)
outputs['image'] = self.layer_norm(projected)
# 音频嵌入投影
if audio_embed is not None:
projected = self.audio_projector(audio_embed)
token = self.modality_tokens[2].unsqueeze(0).unsqueeze(1)
projected = torch.cat([token.expand_as(projected[:, :1, :]), projected], dim=1)
outputs['audio'] = self.layer_norm(projected)
return outputs
class CrossModalAttention(nn.Module):
"""跨模态注意力机制"""
def __init__(self, dim: int = 768, num_heads: int = 8):
super().__init__()
self.dim = dim
self.num_heads = num_heads
self.head_dim = dim // num_heads
# 交叉注意力层
self.cross_attn = nn.MultiheadAttention(dim, num_heads, batch_first=True)
self.layer_norm1 = nn.LayerNorm(dim)
self.layer_norm2 = nn.LayerNorm(dim)
# 前馈网络
self.ffn = nn.Sequential(
nn.Linear(dim, dim * 4),
nn.GELU(),
            nn.Linear(dim * 4, dim)  # 注意:输入维度须与上一层输出(dim*4)一致
)
def forward(
self,
query_modal: torch.Tensor, # 主模态
context_modal: torch.Tensor # 上下文模态
) -> torch.Tensor:
"""
使用context_modal的上下文信息增强query_modal
"""
# 残差连接 + 交叉注意力
attn_out, _ = self.cross_attn(
query_modal, # Query
context_modal, # Key & Value
context_modal
)
query_modal = self.layer_norm1(query_modal + attn_out)
# 残差连接 + FFN
ffn_out = self.ffn(query_modal)
query_modal = self.layer_norm2(query_modal + ffn_out)
return query_modal
class MultiModalFusion(nn.Module):
"""多模态融合模块"""
def __init__(self, dim: int = 768):
super().__init__()
# 跨模态注意力(双向)
self.text_to_image_attn = CrossModalAttention(dim)
self.image_to_text_attn = CrossModalAttention(dim)
# 融合后的处理
self.fusion_layer = nn.TransformerEncoderLayer(
d_model=dim,
nhead=8,
dim_feedforward=dim * 4,
dropout=0.1,
batch_first=True
)
def forward(
self,
text_embed: torch.Tensor,
image_embed: torch.Tensor
) -> torch.Tensor:
"""
融合文本和图像嵌入
参数:
- text_embed: [batch, seq_len, dim]
- image_embed: [batch, num_patches, dim]
返回:
- fused: [batch, seq_len + num_patches, dim]
"""
# 跨模态注意力增强
enhanced_text = self.text_to_image_attn(text_embed, image_embed)
enhanced_image = self.image_to_text_attn(image_embed, text_embed)
# 拼接
fused = torch.cat([enhanced_text, enhanced_image], dim=1)
# 融合层处理
fused = self.fusion_layer(fused)
return fused
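以下补充一个把MultiModalEmbedding与MultiModalFusion串起来的最小使用示例(输入用随机张量模拟上游文本/图像编码器的输出):
# 使用示例
def fusion_example():
    embedder = MultiModalEmbedding(text_dim=768, image_dim=768, output_dim=768)
    fusion = MultiModalFusion(dim=768)
    # 假设上游编码器已产出原始嵌入
    text_embed = torch.randn(2, 16, 768)   # [batch, seq_len, text_dim]
    image_embed = torch.randn(2, 49, 768)  # [batch, num_patches, image_dim]
    aligned = embedder(text_embed=text_embed, image_embed=image_embed)
    fused = fusion(aligned['text'], aligned['image'])
    # 每个模态前面各多了一个模态标识token: [2, (16+1) + (49+1), 768]
    print(fused.shape)
    return fused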
2. 文生图应用开发
2.1 Stable Diffusion API封装
import torch
from diffusers import (
StableDiffusionPipeline,
    StableDiffusionXLPipeline,
    StableDiffusionXLImg2ImgPipeline,
StableDiffusionImg2ImgPipeline,
StableDiffusionControlNetPipeline,
ControlNetModel,
    DPMSolverMultistepScheduler
)
from PIL import Image
from typing import Optional, List, Dict, Union
import io
import base64
class TextToImageGenerator:
"""文生图生成器"""
def __init__(
self,
model_id: str = "runwayml/stable-diffusion-v1-5",
device: str = "cuda",
enable_xformers: bool = True,
enable_safety_checker: bool = False
):
self.device = device
self.model_id = model_id
# 加载模型
self.pipeline = self._load_pipeline(
model_id,
device,
enable_xformers,
enable_safety_checker
)
def _load_pipeline(
self,
model_id: str,
device: str,
enable_xformers: bool,
enable_safety_checker: bool
) -> StableDiffusionPipeline:
"""加载管道"""
        # 加载基础模型
        load_kwargs = {
            "torch_dtype": torch.float16 if device == "cuda" else torch.float32
        }
        if not enable_safety_checker:
            load_kwargs["safety_checker"] = None  # 显式关闭安全检查器
        pipeline = StableDiffusionPipeline.from_pretrained(
            model_id,
            **load_kwargs
        ).to(device)
        # 优化:xformers高效注意力(需已安装xformers)
        if enable_xformers:
            try:
                pipeline.enable_xformers_memory_efficient_attention()
            except Exception:
                pass
# 启用内存优化
if device == "cuda":
pipeline.enable_model_cpu_offload()
return pipeline
def generate(
self,
prompt: str,
negative_prompt: Optional[str] = None,
        width: int = 512,
        height: int = 512,
        num_inference_steps: int = 50,
        guidance_scale: float = 7.5,
        seed: Optional[int] = None,
        num_images: int = 1,
return_pil: bool = True
) -> Union[List[Image.Image], List[torch.Tensor]]:
"""
生成图像
参数:
- prompt: 提示词
- negative_prompt: 负面提示词(避免的内容)
- width, height: 图像尺寸
- num_inference_steps: 生成步数(越多越精细但越慢)
- guidance_scale: 引导系数(越高越遵循提示词)
- seed: 随机种子(用于复现)
- num_images: 生成数量
- return_pil: 是否返回PIL图像
"""
# 设置随机种子
if seed is not None:
torch.manual_seed(seed)
# 生成参数
kwargs = {
"prompt": [prompt] * num_images,
"width": width,
"height": height,
"num_inference_steps": num_inference_steps,
"guidance_scale": guidance_scale,
"return_dict": False
}
if negative_prompt:
kwargs["negative_prompt"] = [negative_prompt] * num_images
        # 如需返回张量而非PIL图像,设置输出类型
        if not return_pil:
            kwargs["output_type"] = "pt"
        # 生成
        outputs = self.pipeline(**kwargs)
        return list(outputs[0])
def generate_base64(
self,
prompt: str,
**kwargs
) -> List[str]:
"""生成并返回base64编码"""
images = self.generate(prompt, return_pil=True, **kwargs)
base64_images = []
for img in images:
buffer = io.BytesIO()
img.save(buffer, format="PNG")
base64_images.append(base64.b64encode(buffer.getvalue()).decode())
return base64_images
def edit_image(
self,
image: Union[Image.Image, str, torch.Tensor],
prompt: str,
        strength: float = 0.8,
**kwargs
) -> Image.Image:
"""图像编辑(图生图)"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 创建图生图管道
img2img_pipeline = StableDiffusionImg2ImgPipeline(
**self.pipeline.components
).to(self.device)
# 编辑
result = img2img_pipeline(
image=image,
prompt=prompt,
strength=strength, # 编辑强度(0-1)
**kwargs
)
return result.images[0]
class SDXLGenerator(TextToImageGenerator):
"""SDXL文生图生成器(更高分辨率)"""
def __init__(
self,
model_id: str = "stabilityai/stable-diffusion-xl-base-1.0",
refiner_id: Optional[str] = "stabilityai/stable-diffusion-xl-refiner-1.0",
**kwargs
):
super().__init__(model_id, **kwargs)
        # 加载精炼器(refiner本质是SDXL的图生图管道;基础模型实际部署时也应以StableDiffusionXLPipeline加载)
        if refiner_id:
            self.refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained(
                refiner_id,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32
            ).to(self.device)
        else:
            self.refiner = None
def generate(
self,
prompt: str,
use_refiner: bool = True,
**kwargs
) -> List[Image.Image]:
"""使用精炼器生成高质量图像"""
# 基础生成
images = super().generate(prompt, **kwargs)
if use_refiner and self.refiner:
            # 精炼:对基础输出做一次低强度图生图,细化纹理与细节
            refined = self.refiner(
                prompt=[prompt] * len(images),
                image=images,
                strength=0.3
            )
            return refined.images
return images
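以下补充TextToImageGenerator的基本调用示例(提示词、参数与输出文件名仅作演示):
# 使用示例
def text_to_image_example():
    generator = TextToImageGenerator(model_id="runwayml/stable-diffusion-v1-5")
    images = generator.generate(
        prompt="一只可爱的猫坐在沙发上,高清,柔和光线",
        negative_prompt="模糊,低质量,变形",
        width=512,
        height=512,
        num_inference_steps=30,
        guidance_scale=7.5,
        seed=42
    )
    images[0].save("t2i_output.png")
    # 如需通过HTTP接口返回,可改用base64编码
    # b64_list = generator.generate_base64("一只可爱的猫坐在沙发上")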
2.2 ControlNet应用
from diffusers import (
ControlNetModel,
StableDiffusionControlNetPipeline
)
import cv2
import numpy as np
class ControlNetImageGenerator:
"""ControlNet图像生成器(可控生成)"""
def __init__(
self,
base_model_id: str = "runwayml/stable-diffusion-v1-5",
controlnet_id: str = "lllyasviel/sd-controlnet-canny",
device: str = "cuda"
):
self.device = device
# 加载ControlNet
self.controlnet = ControlNetModel.from_pretrained(
controlnet_id,
torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device)
        # 加载基础管道,并挂载ControlNet
        self.pipeline = StableDiffusionControlNetPipeline.from_pretrained(
            base_model_id,
            controlnet=self.controlnet,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32
        ).to(device)
        # 优化
        try:
            self.pipeline.enable_xformers_memory_efficient_attention()
        except Exception:
            pass
        self.pipeline.enable_model_cpu_offload()
def generate_from_canny(
self,
prompt: str,
control_image: Union[str, Image.Image, np.ndarray],
**kwargs
) -> Image.Image:
"""
使用Canny边缘图控制生成
参数:
- prompt: 文本提示
- control_image: 控制图(Canny边缘)
"""
# 处理控制图
if isinstance(control_image, str):
control_image = Image.open(control_image)
elif isinstance(control_image, np.ndarray):
control_image = Image.fromarray(control_image)
# 生成
result = self.pipeline(
prompt=prompt,
image=control_image,
**kwargs
)
return result.images[0]
def generate_from_depth(
self,
prompt: str,
depth_map: Union[str, Image.Image, np.ndarray],
controlnet_id: str = "lllyasviel/sd-controlnet-depth",
**kwargs
) -> Image.Image:
"""使用深度图控制生成"""
# 需要切换ControlNet
# ... (类似于Canny)
pass
class CannyEdgeDetector:
"""Canny边缘检测器"""
def __init__(
self,
low_threshold: int = 50,
high_threshold: int = 150
):
self.low_threshold = low_threshold
self.high_threshold = high_threshold
def detect(
self,
image: Union[str, Image.Image]
) -> Image.Image:
"""检测Canny边缘"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 转换为numpy
np_image = np.array(image)
# 转为灰度
if len(np_image.shape) == 3:
np_image = cv2.cvtColor(np_image, cv2.COLOR_RGB2GRAY)
# Canny边缘检测
edges = cv2.Canny(
np_image,
self.low_threshold,
self.high_threshold
)
# 转换为PIL
edges_image = Image.fromarray(edges)
return edges_image
# 使用示例
def controlnet_example():
# 创建边缘检测器
canny = CannyEdgeDetector(low_threshold=50, high_threshold=150)
# 检测边缘
edges = canny.detect("input_image.jpg")
# 使用ControlNet生成
generator = ControlNetImageGenerator(
controlnet_id="lllyasviel/sd-controlnet-canny"
)
result = generator.generate_from_canny(
prompt="一只可爱的猫,白色,坐在红色沙发上",
control_image=edges,
num_inference_steps=30,
guidance_scale=7.5
)
result.save("controlnet_output.jpg")
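上面的generate_from_depth留作占位。下面给出一个深度图控制生成的完整示意(深度估计模型Intel/dpt-large与depth版ControlNet均为示例选择;生成调用逻辑与Canny完全相同,只是控制图换成深度图):
# 深度图控制生成示意
from transformers import pipeline as hf_pipeline

def controlnet_depth_example():
    # 1. 用单目深度估计模型生成深度图
    depth_estimator = hf_pipeline("depth-estimation", model="Intel/dpt-large")
    depth_map = depth_estimator(Image.open("input_image.jpg"))["depth"]  # PIL图像
    # 2. 加载depth版ControlNet并生成(调用流程与Canny一致)
    generator = ControlNetImageGenerator(
        controlnet_id="lllyasviel/sd-controlnet-depth"
    )
    result = generator.generate_from_canny(
        prompt="一间现代风格的客厅,自然光",
        control_image=depth_map,
        num_inference_steps=30,
        guidance_scale=7.5
    )
    result.save("controlnet_depth_output.jpg")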
2.3 图像修复(Inpainting)
from diffusers import StableDiffusionInpaintPipeline
from PIL import Image, ImageDraw
import numpy as np
class ImageInpainter:
"""图像修复器"""
def __init__(
self,
model_id: str = "runwayml/stable-diffusion-inpainting",
device: str = "cuda"
):
self.device = device
self.pipeline = StableDiffusionInpaintPipeline.from_pretrained(
model_id,
torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device)
# 优化
self.pipeline.enable_xformers_memory_efficient_attention()
def inpaint(
self,
image: Union[str, Image.Image],
mask: Union[str, Image.Image, np.ndarray],
prompt: str,
num_inference_steps: int = 50,
guidance_scale: float = 7.5
) -> Image.Image:
"""
图像修复
参数:
- image: 原始图像
- mask: 掩码图(白色区域将被修复)
- prompt: 修复内容的描述
"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 处理掩码
if isinstance(mask, str):
mask = Image.open(mask)
elif isinstance(mask, np.ndarray):
mask = Image.fromarray((mask * 255).astype(np.uint8))
# 修复
result = self.pipeline(
image=image,
mask_image=mask,
prompt=prompt,
num_inference_steps=num_inference_steps,
guidance_scale=guidance_scale
)
return result.images[0]
def create_rectangle_mask(
self,
image_size: tuple,
rect_coords: tuple # (x, y, width, height)
) -> Image.Image:
"""创建矩形掩码"""
mask = Image.new('L', image_size, 0)
draw = ImageDraw.Draw(mask)
x, y, w, h = rect_coords
draw.rectangle([x, y, x + w, y + h], fill=255)
return mask
def remove_object(
self,
image: Image.Image,
object_coords: tuple,
prompt: str = "seamless background"
) -> Image.Image:
"""移除对象"""
mask = self.create_rectangle_mask(image.size, object_coords)
return self.inpaint(image, mask, prompt)
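以下补充一个简单的调用示例(文件名与矩形区域坐标均为演示假设):
# 使用示例
def inpainting_example():
    inpainter = ImageInpainter()
    image = Image.open("photo.jpg")
    # 将指定矩形区域修复为新内容
    mask = inpainter.create_rectangle_mask(image.size, (100, 100, 200, 150))
    result = inpainter.inpaint(image, mask, prompt="一盆绿色植物")
    result.save("inpaint_output.jpg")
    # 或直接移除对象,用背景填充
    cleaned = inpainter.remove_object(image, (100, 100, 200, 150))
    cleaned.save("object_removed.jpg")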
2.4 图像风格迁移
from transformers import (
BlipProcessor,
BlipForConditionalGeneration,
CLIPProcessor,
CLIPModel
)
class StyleTransfer:
"""风格迁移"""
def __init__(
self,
style_model_id: str = "stabilityai/stable-diffusion-xl-base-1.0",
device: str = "cuda"
):
self.device = device
self.generator = SDXLGenerator(style_model_id, device=device)
# 加载CLIP模型(用于风格提取)
self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
def extract_style_prompt(
self,
style_image: Image.Image
) -> str:
"""从图像中提取风格描述"""
# 简化版:实际应用可以用BLIP生成图像描述
inputs = self.clip_processor(
images=style_image,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
image_features = self.clip_model.get_image_features(**inputs)
# 可以用image_features在风格向量数据库中查找相似风格
# 这里简化返回预设风格
return "oil painting, detailed brushstrokes, warm colors"
def transfer_style(
self,
content_image: Image.Image,
style_image: Image.Image,
style_strength: float = 0.7
) -> Image.Image:
"""
风格迁移
参数:
- content_image: 内容图像
- style_image: 风格图像
- style_strength: 风格强度(0-1)
"""
# 提取风格描述
style_prompt = self.extract_style_prompt(style_image)
# 图生图实现风格迁移
result = self.generator.edit_image(
image=content_image,
prompt=style_prompt,
strength=style_strength
)
return result
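extract_style_prompt目前返回的是预设风格。下面给出一个用BLIP图像描述近似提取风格提示词的示意(模型选择与末尾拼接的风格后缀均为示例假设,实际效果取决于具体风格库):
# 用BLIP生成风格描述的示意实现
def extract_style_prompt_with_blip(style_image: Image.Image, device: str = "cuda") -> str:
    processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
    model = BlipForConditionalGeneration.from_pretrained(
        "Salesforce/blip-image-captioning-base"
    ).to(device)
    inputs = processor(images=style_image, return_tensors="pt").to(device)
    with torch.no_grad():
        output = model.generate(**inputs, max_length=30)
    caption = processor.decode(output[0], skip_special_tokens=True)
    # 将描述与通用风格关键词拼接,作为图生图的风格提示词
    return f"{caption}, in the style of the reference image, detailed, high quality"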
3. 图像处理与理解
3.1 CLIP图像理解
from transformers import CLIPProcessor, CLIPModel
from typing import List, Tuple
import torch
class CLIPImageEncoder:
"""CLIP图像编码器(多模态对齐)"""
def __init__(
self,
model_id: str = "openai/clip-vit-base-patch32",
device: str = "cuda"
):
self.device = device
self.processor = CLIPProcessor.from_pretrained(model_id)
self.model = CLIPModel.from_pretrained(model_id).to(device)
def encode_image(self, image: Union[str, Image.Image]) -> torch.Tensor:
"""编码图像为嵌入向量"""
if isinstance(image, str):
image = Image.open(image)
inputs = self.processor(
images=image,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
image_features = self.model.get_image_features(**inputs)
# 归一化
image_features = image_features / image_features.norm(dim=-1, keepdim=True)
return image_features
def encode_text(self, text: str) -> torch.Tensor:
"""编码文本为嵌入向量"""
inputs = self.processor(
text=text,
return_tensors="pt"
).to(self.device)
with torch.no_grad():
text_features = self.model.get_text_features(**inputs)
# 归一化
text_features = text_features / text_features.norm(dim=-1, keepdim=True)
return text_features
def compute_similarity(
self,
image: Union[str, Image.Image],
text: str
) -> float:
"""计算图像-文本相似度"""
image_features = self.encode_image(image)
text_features = self.encode_text(text)
# 余弦相似度
similarity = (image_features @ text_features.T).item()
return similarity
def classify_image(
self,
image: Union[str, Image.Image],
labels: List[str]
) -> Tuple[str, float]:
"""图像分类(基于文本标签)"""
# 编码图像
image_features = self.encode_image(image)
# 编码所有标签
text_features = []
for label in labels:
text_features.append(self.encode_text(label))
text_features = torch.cat(text_features)
# 计算相似度
similarities = (image_features @ text_features.T).squeeze()
# 获取最高分
best_idx = similarities.argmax()
best_label = labels[best_idx]
best_score = similarities[best_idx].item()
return best_label, best_score
def search_images(
self,
query_text: str,
image_paths: List[str],
top_k: int = 10
) -> List[Tuple[str, float]]:
"""用文本查询相似图像"""
# 编码查询文本
text_features = self.encode_text(query_text)
# 编码所有图像
image_features = []
for path in image_paths:
image_features.append(self.encode_image(path))
image_features = torch.cat(image_features)
# 计算相似度
similarities = (text_features @ image_features.T).squeeze()
# 获取top-k
top_k_indices = similarities.topk(top_k).indices
top_k_scores = similarities.topk(top_k).values
results = [
(image_paths[idx], score.item())
for idx, score in zip(top_k_indices, top_k_scores)
]
return results
# 使用示例
def clip_example():
clip = CLIPImageEncoder()
# 图像-文本相似度
similarity = clip.compute_similarity(
image="cat.jpg",
text="a cute cat"
)
print(f"Similarity: {similarity}")
# 图像分类
labels = ["cat", "dog", "bird", "fish"]
label, score = clip.classify_image("cat.jpg", labels)
print(f"Classification: {label} (score: {score})")
# 图像搜索
images = ["img1.jpg", "img2.jpg", "img3.jpg", "img4.jpg"]
results = clip.search_images("a beautiful sunset", images, top_k=2)
print(f"Search results: {results}")
3.2 视觉问答(VQA)
from transformers import (
BlipProcessor,
BlipForQuestionAnswering,
ViltProcessor,
ViltForQuestionAnswering
)
class VisualQuestionAnswering:
"""视觉问答"""
def __init__(
self,
model_id: str = "Salesforce/blip-vqa-base",
device: str = "cuda"
):
self.device = device
self.processor = BlipProcessor.from_pretrained(model_id)
self.model = BlipForQuestionAnswering.from_pretrained(model_id).to(device)
def answer(
self,
image: Union[str, Image.Image],
question: str,
max_length: int = 20
) -> str:
"""回答关于图像的问题"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 处理输入
inputs = self.processor(
            images=image,
text=question,
return_tensors="pt"
).to(self.device)
# 生成回答
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=max_length
)
# 解码
answer = self.processor.decode(outputs[0], skip_special_tokens=True)
return answer
def answer_batch(
self,
image: Union[str, Image.Image],
questions: List[str],
max_length: int = 20
) -> List[str]:
"""批量回答"""
answers = []
for question in questions:
answer = self.answer(image, question, max_length)
answers.append(answer)
return answers
# 使用示例
def vqa_example():
vqa = VisualQuestionAnswering()
# 单问题
answer = vqa.answer(
image="scene.jpg",
question="What color is the car?"
)
print(f"Answer: {answer}")
# 多问题
questions = [
"What color is the car?",
"How many people are there?",
"Is it day or night?"
]
answers = vqa.answer_batch("scene.jpg", questions)
for q, a in zip(questions, answers):
print(f"Q: {q} A: {a}")
3.3 图像描述生成
from transformers import BlipForConditionalGeneration
class ImageCaptioning:
"""图像描述生成"""
def __init__(
self,
model_id: str = "Salesforce/blip-image-captioning-base",
device: str = "cuda"
):
self.device = device
self.processor = BlipProcessor.from_pretrained(model_id)
self.model = BlipForConditionalGeneration.from_pretrained(model_id).to(device)
def caption(
self,
image: Union[str, Image.Image],
max_length: int = 50,
num_captions: int = 1
) -> Union[str, List[str]]:
"""生成图像描述"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 处理
inputs = self.processor(
            images=image,
return_tensors="pt"
).to(self.device)
# 生成
captions = []
for _ in range(num_captions):
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=max_length,
do_sample=True,
top_k=50,
temperature=0.7
)
caption = self.processor.decode(outputs[0], skip_special_tokens=True)
captions.append(caption)
if num_captions == 1:
return captions[0]
return captions
def caption_with_condition(
self,
image: Union[str, Image.Image],
text_prompt: str,
max_length: int = 50
) -> str:
"""根据文本提示生成描述"""
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 处理
inputs = self.processor(
            images=image,
text=text_prompt,
return_tensors="pt"
).to(self.device)
# 生成
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=max_length
)
caption = self.processor.decode(outputs[0], skip_special_tokens=True)
return caption
# 使用示例
def captioning_example():
captioner = ImageCaptioning()
# 基本描述
caption = captioner.caption("image.jpg")
print(f"Caption: {caption}")
# 多样性描述
captions = captioner.caption("image.jpg", num_captions=3)
for i, cap in enumerate(captions):
print(f"Caption {i+1}: {cap}")
4. 语音处理与合成
4.1 语音识别(ASR)
from transformers import (
WhisperProcessor,
WhisperForConditionalGeneration,
AutomaticSpeechRecognitionPipeline
)
import torchaudio
class SpeechRecognition:
"""语音识别"""
def __init__(
self,
model_id: str = "openai/whisper-large-v3",
device: str = "cuda"
):
self.device = device
self.processor = WhisperProcessor.from_pretrained(model_id)
self.model = WhisperForConditionalGeneration.from_pretrained(model_id).to(device)
def transcribe(
self,
audio_path: str,
language: Optional[str] = "zh", # None表示自动检测
task: str = "transcribe", # 或 "translate"(翻译为英文)
temperature: float = 0.0 # 0表示确定性输出
) -> str:
"""
语音转文本
参数:
- audio_path: 音频文件路径
- language: 语言代码(zh, en, ja等)
- task: transcribe或translate
- temperature: 生成温度
"""
# 加载音频
waveform, sample_rate = torchaudio.load(audio_path)
# 重采样(如果需要)
if sample_rate != 16000:
resampler = torchaudio.transforms.Resample(sample_rate, 16000)
waveform = resampler(waveform)
sample_rate = 16000
# 处理
inputs = self.processor(
waveform.squeeze(),
sampling_rate=sample_rate,
return_tensors="pt"
).to(self.device)
# 强制语言和任务
forced_decoder_ids = self.processor.get_decoder_prompt_ids(
language=language,
task=task
)
# 生成
with torch.no_grad():
predicted_ids = self.model.generate(
**inputs,
forced_decoder_ids=forced_decoder_ids,
temperature=temperature
)
# 解码
transcription = self.processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
return transcription
def transcribe_with_timestamps(
self,
audio_path: str,
language: Optional[str] = "zh"
) -> List[Dict]:
"""带时间戳的转录"""
# 加载音频
waveform, sample_rate = torchaudio.load(audio_path)
# 处理
inputs = self.processor(
waveform.squeeze(),
sampling_rate=sample_rate,
return_tensors="pt"
).to(self.device)
# 设置返回时间戳
forced_decoder_ids = self.processor.get_decoder_prompt_ids(
language=language,
task="transcribe"
)
# 生成
with torch.no_grad():
predicted_ids = self.model.generate(
**inputs,
forced_decoder_ids=forced_decoder_ids,
output_scores=True,
return_timestamps=True
)
# 解码并提取时间戳
result = self.processor.decode(predicted_ids[0], skip_special_tokens=True)
# 解析时间戳(简化版)
# 实际实现需要从输出中提取
return [
{
"text": result,
"start": 0.0,
"end": len(waveform) / sample_rate
}
]
# 使用示例
def asr_example():
asr = SpeechRecognition()
# 基本转录
text = asr.transcribe("audio.wav", language="zh")
print(f"Transcript: {text}")
# 带时间戳转录
segments = asr.transcribe_with_timestamps("audio.wav")
for seg in segments:
print(f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}")
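上面的transcribe_with_timestamps只是简化实现;如需真实的分段时间戳,可以直接使用transformers的ASR pipeline(以下为示意,chunk_length_s、device等参数按实际环境调整):
# 基于transformers pipeline的带时间戳转录示意
from transformers import pipeline as asr_pipeline

def asr_with_timestamps_example():
    pipe = asr_pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-large-v3",
        device=0,           # GPU编号,纯CPU环境可设为-1
        chunk_length_s=30   # 长音频分块处理
    )
    result = pipe("audio.wav", return_timestamps=True)
    for chunk in result["chunks"]:
        start, end = chunk["timestamp"]
        print(f"[{start:.2f}-{end:.2f}] {chunk['text']}")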
4.2 语音合成(TTS)
from TTS.api import TTS
from typing import Optional, List
class TextToSpeech:
"""文本转语音"""
def __init__(
self,
model_name: str = "tts_models/multilingual/multi-dataset/xtts_v2",
device: str = "cuda"
):
self.tts = TTS(model_name).to(device)
def synthesize(
self,
text: str,
output_path: str,
speaker_wav: Optional[str] = None,
language: str = "zh-cn"
) -> None:
"""
文本转语音
参数:
- text: 输入文本
- output_path: 输出音频路径
- speaker_wav: 参考音频(用于音色克隆)
- language: 语言代码
"""
kwargs = {
"text": text,
"file_path": output_path,
"language": language
}
if speaker_wav:
kwargs["speaker_wav"] = speaker_wav
self.tts.tts_to_file(**kwargs)
def synthesize_speaker(
self,
text: str,
speaker_name: str,
output_path: str,
language: str = "zh-cn"
) -> None:
"""使用预定义说话人合成"""
self.tts.tts_to_file(
text=text,
speaker=speaker_name,
language=language,
file_path=output_path
)
def clone_voice(
self,
text: str,
reference_audio: str,
output_path: str,
language: str = "zh-cn"
) -> None:
"""音色克隆"""
self.synthesize(
text=text,
output_path=output_path,
speaker_wav=reference_audio,
language=language
)
def list_speakers(self) -> List[str]:
"""列出可用说话人"""
return self.tts.speakers
# 使用示例
def tts_example():
tts = TextToSpeech()
# 基本合成
tts.synthesize(
text="你好,我是AI助手,很高兴为您服务。",
output_path="output.wav",
language="zh-cn"
)
# 音色克隆
tts.clone_voice(
text="这是克隆的音色说的话。",
reference_audio="reference_voice.wav",
output_path="cloned_voice.wav"
)
4.3 语音增强与降噪
import torch
import torchaudio
from torchaudio.transforms import Spectrogram, InverseSpectrogram
class SpeechEnhancer:
"""语音增强与降噪"""
def __init__(
self,
        model_id: Optional[str] = None,  # 可选:外部语音增强模型(如DeepFilterNet),默认使用下面的谱减法
device: str = "cuda"
):
self.device = device
# 加载语音增强模型
# ...(具体实现取决于使用的模型库)
def denoise(
self,
audio_path: str,
output_path: str
) -> None:
"""语音降噪"""
# 加载音频
waveform, sample_rate = torchaudio.load(audio_path)
# 重采样
if sample_rate != 16000:
resampler = torchaudio.transforms.Resample(sample_rate, 16000)
waveform = resampler(waveform)
# 应用降噪模型
enhanced = self._apply_denoising(waveform)
# 保存
torchaudio.save(output_path, enhanced, 16000)
def _apply_denoising(self, waveform: torch.Tensor) -> torch.Tensor:
"""应用降噪算法"""
# 简化版:使用谱减法
# 实际应用可用DeepFilterNet、VoiceFixer等模型
# 计算STFT
stft = torch.stft(
waveform.squeeze(),
n_fft=512,
hop_length=256,
return_complex=True
)
# 计算功率谱
power = torch.abs(stft) ** 2
        # 估计噪声(取前10帧的平均功率作为噪声谱估计)
        noise_power = power[:, :10].mean(dim=1, keepdim=True)
# 谱减
alpha = 2.0 # 过减因子
enhanced_power = torch.clamp(power - alpha * noise_power, min=0)
# 重建STFT
enhanced_stft = stft / (torch.abs(stft) + 1e-10) * torch.sqrt(enhanced_power)
# ISTFT
enhanced = torch.istft(
enhanced_stft,
n_fft=512,
hop_length=256
)
return enhanced.unsqueeze(0)
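以下为一个简单的调用示例(输入/输出文件名仅作演示):
# 使用示例
def speech_enhancement_example():
    enhancer = SpeechEnhancer()
    enhancer.denoise(
        audio_path="noisy_audio.wav",
        output_path="enhanced_audio.wav"
    )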
5. 视频生成与处理
5.1 文生视频(SVD / AnimateDiff)
from diffusers import (
StableVideoDiffusionPipeline,
AnimateDiffPipeline,
MotionAdapter
)
class VideoGenerator:
"""视频生成器"""
def __init__(
self,
model_type: str = "svd", # svd 或 animatediff
device: str = "cuda"
):
self.device = device
self.model_type = model_type
if model_type == "svd":
self.pipeline = self._load_svd(device)
elif model_type == "animatediff":
self.pipeline = self._load_animatediff(device)
def _load_svd(self, device: str) -> StableVideoDiffusionPipeline:
"""加载SVD(图生视频)"""
pipeline = StableVideoDiffusionPipeline.from_pretrained(
"stabilityai/stable-video-diffusion-img2vid-xt",
torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device)
# 优化
pipeline.enable_xformers_memory_efficient_attention()
pipeline.enable_model_cpu_offload()
return pipeline
def _load_animatediff(self, device: str) -> AnimateDiffPipeline:
"""加载AnimateDiff(文生/图生视频)"""
motion_adapter = MotionAdapter.from_pretrained(
"guoyww/animatediff-motion-adapter-v1-5-2"
)
        pipeline = AnimateDiffPipeline.from_pretrained(
"runwayml/stable-diffusion-v1-5",
motion_adapter=motion_adapter,
torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device)
# 优化
pipeline.enable_xformers_memory_efficient_attention()
return pipeline
def image_to_video(
self,
image: Union[str, Image.Image],
num_frames: int = 25,
fps: int = 7,
motion_bucket_id: int = 127,
output_path: str = "output.mp4"
) -> None:
"""
图生视频(SVD)
参数:
- image: 输入图像
- num_frames: 视频帧数
- fps: 帧率
- motion_bucket_id: 运动强度(0-255)
"""
if self.model_type != "svd":
raise ValueError("Use SVD model for image-to-video")
# 加载图像
if isinstance(image, str):
image = Image.open(image)
# 生成视频
result = self.pipeline(
image=image,
num_frames=num_frames,
motion_bucket_id=motion_bucket_id
)
# 保存视频
frames = result.frames[0]
self._save_video(frames, fps, output_path)
def text_to_video(
self,
prompt: str,
        num_frames: int = 16,
width: int = 512,
height: int = 512,
guidance_scale: float = 7.5,
output_path: str = "output.mp4"
) -> None:
"""
文生视频(AnimateDiff)
参数:
- prompt: 文本提示
        - num_frames: 视频帧数
- width, height: 视频尺寸
"""
if self.model_type != "animatediff":
raise ValueError("Use AnimateDiff model for text-to-video")
# 生成
result = self.pipeline(
prompt=prompt,
num_inference_steps=25,
guidance_scale=guidance_scale,
width=width,
height=height,
num_frames=num_frames
)
# 保存
frames = result.frames[0]
self._save_video(frames, 8, output_path)
def _save_video(
self,
frames: List[Image.Image],
fps: int,
output_path: str
) -> None:
"""保存视频"""
import imageio
# 转换为numpy数组
frame_arrays = [np.array(frame) for frame in frames]
# 保存
imageio.mimwrite(output_path, frame_arrays, fps=fps)
# 使用示例
def video_generation_example():
# 图生视频
svd = VideoGenerator(model_type="svd")
svd.image_to_video(
image="input.jpg",
num_frames=25,
fps=7,
motion_bucket_id=150,
output_path="svd_output.mp4"
)
# 文生视频
animatediff = VideoGenerator(model_type="animatediff")
animatediff.text_to_video(
prompt="一只猫在花园里玩耍,阳光明媚",
num_frames=16,
output_path="animatediff_output.mp4"
)
5.2 视频编辑
class VideoEditor:
"""视频编辑器"""
def __init__(self):
pass
def replace_object(
self,
video_path: str,
object_mask: str, # 对象掩码
replacement_prompt: str,
output_path: str
) -> None:
"""替换视频中的对象"""
# 加载视频
import cv2
cap = cv2.VideoCapture(video_path)
# 加载掩码
mask = cv2.imread(object_mask, cv2.IMREAD_GRAYSCALE)
# 帧处理
frames = []
while True:
ret, frame = cap.read()
if not ret:
break
# 使用inpainting替换对象
# ... (类似图像修复)
frames.append(frame)
cap.release()
# 保存
self._save_video(frames, output_path)
def change_background(
self,
video_path: str,
new_background: Union[str, Image.Image],
output_path: str
) -> None:
"""更换背景"""
# 使用背景分割模型(如MiDaS、RVM)
# 然后用新背景合成
pass
def stabilize_video(
self,
video_path: str,
output_path: str
) -> None:
"""视频防抖"""
# 使用视频防抖算法
pass
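change_background在上面留作占位。下面给出一个基于torchvision DeepLabV3人像分割的换背景示意(模型选择、person类别ID=15、逐帧处理方式均为示例假设,生产中可换用RVM等专用抠像模型):
# 人像换背景示意
import cv2
import numpy as np
import torch
from torchvision import transforms
from torchvision.models.segmentation import deeplabv3_resnet50

def change_background_example(video_path: str, background_path: str, output_path: str):
    # 加载语义分割模型(PASCAL VOC中person类别ID为15)
    model = deeplabv3_resnet50(weights="DEFAULT").eval().cuda()
    preprocess = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
    background = cv2.resize(cv2.imread(background_path), (w, h))
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        with torch.no_grad():
            out = model(preprocess(rgb).unsqueeze(0).cuda())["out"][0]
        mask = (out.argmax(0) == 15).cpu().numpy().astype(np.uint8)  # 人像掩码
        mask3 = np.repeat(mask[:, :, None], 3, axis=2)
        composite = frame * mask3 + background * (1 - mask3)  # 前景保留,背景替换
        writer.write(composite.astype(np.uint8))
    cap.release()
    writer.release()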
6. 多模态融合架构
6.1 多模态Agent架构
from typing import Dict, Any, Optional, List
class MultiModalAgent:
"""多模态AI Agent"""
def __init__(
self,
llm_client,
image_generator: Optional[TextToImageGenerator] = None,
asr: Optional[SpeechRecognition] = None,
tts: Optional[TextToSpeech] = None,
vqa: Optional[VisualQuestionAnswering] = None
):
self.llm = llm_client
self.image_generator = image_generator
self.asr = asr
self.tts = tts
self.vqa = vqa
def process(
self,
input_data: Dict[str, Any],
task: str
) -> Dict[str, Any]:
"""
处理多模态输入
参数:
- input_data: 包含不同模态的数据
- task: 任务类型
"""
# 解析输入
text = input_data.get('text')
image = input_data.get('image')
audio = input_data.get('audio')
# 根据任务处理
if task == 'text_to_image':
return self._text_to_image(text, input_data)
elif task == 'image_to_text':
return self._image_to_text(image, input_data)
elif task == 'vqa':
return self._visual_qa(image, text, input_data)
elif task == 'multimodal_chat':
return self._multimodal_chat(input_data)
else:
raise ValueError(f"Unknown task: {task}")
def _text_to_image(
self,
text: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""文生图任务"""
if not self.image_generator:
raise ValueError("Image generator not initialized")
# 使用LLM优化提示词
optimized_prompt = self._optimize_image_prompt(text)
# 生成图像
images = self.image_generator.generate(
prompt=optimized_prompt,
**params.get('generation_params', {})
)
return {
'images': images,
'prompt': optimized_prompt
}
def _image_to_text(
self,
image: Union[str, Image.Image],
params: Dict[str, Any]
) -> Dict[str, Any]:
"""图生文任务"""
        # 复用前文定义的ImageCaptioning(BLIP)生成图像描述
        captioner = ImageCaptioning()
        description = captioner.caption(image)
        return {
            'text': description
        }
def _visual_qa(
self,
image: Union[str, Image.Image],
question: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""视觉问答任务"""
if not self.vqa:
raise ValueError("VQA not initialized")
answer = self.vqa.answer(image, question)
return {
'answer': answer,
'question': question
}
def _multimodal_chat(
self,
input_data: Dict[str, Any]
) -> Dict[str, Any]:
"""多模态对话"""
# 构建包含多模态的prompt
prompt_parts = []
if 'text' in input_data:
prompt_parts.append(f"Text: {input_data['text']}")
if 'image' in input_data:
# 提取图像信息
image = input_data['image']
if self.vqa:
description = self.vqa.answer(
image,
"Describe this image in detail."
)
prompt_parts.append(f"Image description: {description}")
if 'audio' in input_data:
# 转录音频
if self.asr:
transcript = self.asr.transcribe(input_data['audio'])
prompt_parts.append(f"Audio transcript: {transcript}")
# 组合prompt
combined_prompt = "\n".join(prompt_parts)
# 使用LLM生成响应
response = self.llm.complete(combined_prompt)
# 语音合成(如果需要)
audio_response = None
if input_data.get('return_audio', False) and self.tts:
temp_path = "temp_response.wav"
self.tts.synthesize(response, temp_path)
audio_response = temp_path
return {
'text': response,
'audio': audio_response
}
def _optimize_image_prompt(
self,
user_prompt: str
) -> str:
"""使用LLM优化图像生成提示词"""
optimization_prompt = f"""
优化以下图像生成提示词,使其更适合AI绘画模型。
添加细节描述、风格、构图等元素。
用户提示: {user_prompt}
优化后的提示词:
"""
optimized = self.llm.complete(optimization_prompt)
return optimized.strip()
# 使用示例
def multimodal_agent_example():
# 初始化各组件
llm = ... # LLM客户端
image_gen = TextToImageGenerator()
asr = SpeechRecognition()
tts = TextToSpeech()
vqa = VisualQuestionAnswering()
# 创建多模态Agent
agent = MultiModalAgent(
llm_client=llm,
image_generator=image_gen,
asr=asr,
tts=tts,
vqa=vqa
)
# 文生图
result = agent.process(
input_data={'text': '一只可爱的猫坐在窗边,阳光洒进来'},
task='text_to_image'
)
# 视觉问答
result = agent.process(
input_data={
'image': 'image.jpg',
'text': '图片中有几个人?'
},
task='vqa'
)
# 多模态对话
result = agent.process(
input_data={
'image': 'image.jpg',
'audio': 'question.wav',
'return_audio': True
},
task='multimodal_chat'
)
6.2 多模态RAG系统
class MultiModalRAG:
"""多模态检索增强生成系统"""
def __init__(
self,
llm_client,
vector_store, # 文本向量存储
image_store, # 图像向量存储
image_encoder, # CLIP图像编码器
image_generator
):
self.llm = llm_client
self.vector_store = vector_store
self.image_store = image_store
self.image_encoder = image_encoder
self.image_generator = image_generator
def query(
self,
query_text: str,
query_image: Optional[Union[str, Image.Image]] = None,
use_hybrid: bool = True
) -> Dict[str, Any]:
"""
多模态查询
参数:
- query_text: 文本查询
- query_image: 图像查询(可选)
- use_hybrid: 是否使用混合检索
"""
# 1. 文本检索
text_results = self.vector_store.search(query_text, k=5)
# 2. 图像检索(如果有查询图)
image_results = []
if query_image:
query_image_embed = self.image_encoder.encode_image(query_image)
image_results = self.image_store.search(query_image_embed, k=5)
# 3. 跨模态检索(用文本查图像)
cross_modal_images = []
if use_hybrid:
# 用文本查相似图像
cross_modal_images = self._search_images_with_text(query_text, k=3)
# 4. 构建上下文
context = self._build_context(text_results, image_results, cross_modal_images)
# 5. 生成响应
response = self.llm.complete(f"""
基于以下信息回答问题:
问题: {query_text}
参考信息:
{context}
回答:
""")
# 6. 可选:生成相关图像
if query_image is None and not cross_modal_images:
generated_image = self.image_generator.generate(query_text)
else:
generated_image = None
return {
'answer': response,
'context': context,
'retrieved_images': cross_modal_images,
'generated_image': generated_image
}
def _search_images_with_text(
self,
query_text: str,
k: int = 5
) -> List[str]:
"""用文本查询图像"""
# 使用CLIP的文本嵌入查询图像存储
text_embed = self.image_encoder.encode_text(query_text)
results = self.image_store.search(text_embed, k=k)
return [r['path'] for r in results]
def _build_context(
self,
text_results: List[Dict],
image_results: List[Dict],
cross_modal_images: List[str]
) -> str:
"""构建上下文"""
context_parts = []
# 添加文本上下文
if text_results:
context_parts.append("文本参考:")
for r in text_results:
context_parts.append(f"- {r['text']}")
# 添加图像上下文
if image_results or cross_modal_images:
context_parts.append("\n相关图像:")
for r in image_results:
context_parts.append(f"- {r['path']}")
for img_path in cross_modal_images:
context_parts.append(f"- {img_path}")
return "\n".join(context_parts)
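上面的MultiModalRAG依赖外部传入的image_store,文中未给出实现。下面是一个基于前文CLIPImageEncoder的极简内存图像向量库示意(search返回{'path', 'score'}字典列表是按上文用法做的接口假设,生产环境应替换为FAISS/Milvus等向量数据库):
class SimpleImageStore:
    """极简内存图像向量库:用CLIP嵌入做余弦相似度检索"""
    def __init__(self, image_encoder: CLIPImageEncoder):
        self.encoder = image_encoder
        self.paths: List[str] = []
        self.embeddings: Optional[torch.Tensor] = None
    def add_images(self, image_paths: List[str]) -> None:
        # encode_image内部已做L2归一化,内积即余弦相似度
        embeds = torch.cat([self.encoder.encode_image(p) for p in image_paths], dim=0)
        self.paths.extend(image_paths)
        self.embeddings = embeds if self.embeddings is None else torch.cat(
            [self.embeddings, embeds], dim=0
        )
    def search(self, query_embed: torch.Tensor, k: int = 5) -> List[Dict]:
        # query_embed: [1, dim],已归一化
        scores = (query_embed @ self.embeddings.T).squeeze(0)
        top = scores.topk(min(k, len(self.paths)))
        return [
            {"path": self.paths[i], "score": s.item()}
            for i, s in zip(top.indices.tolist(), top.values)
        ]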
7. 面试高频问题
Q1: 什么是多模态学习?有哪些主要架构类型?
回答要点:
- 多模态学习定义
  - 处理多种类型数据(文本、图像、音频、视频等)
  - 学习模态间的关联和互补信息
  - 比单模态提供更丰富的上下文
- 主要架构类型
| 架构类型 | 特点 | 适用场景 |
|---|---|---|
| Early Fusion | 早期融合,联合特征学习 | 多模态特征强关联 |
| Late Fusion | 晚期融合,独立处理后合并 | 各模态独立性强 |
| Cross-Attention | 交叉注意力机制 | 需要精细模态交互 |
| Encoder-Decoder | 共享编码器+独立解码器 | 多模态到多模态生成 |
Q2: CLIP是如何实现图像-文本对齐的?
回答要点:
- CLIP核心思想
  - 对比学习:最大化配对图文的相似度,最小化非配对图文的相似度
  - 共享嵌入空间:图像和文本映射到同一向量空间
- 训练过程
  # 概念代码
  for batch in dataloader:
      images, texts = batch
      # 编码
      image_features = vision_encoder(images)
      text_features = text_encoder(texts)
      # 归一化
      image_features = F.normalize(image_features)
      text_features = F.normalize(text_features)
      # 计算相似度矩阵
      similarity = image_features @ text_features.T
      # 对比损失
      loss = contrastive_loss(similarity)
      # 反向传播
      loss.backward()
- 优势
- 零样本能力:无需微调即可分类
- 多任务能力:检索、分类、VQA等
Q3: Stable Diffusion的生成过程是怎样的?
回答要点:
- 核心组件
  - VAE: 潜空间编码解码
  - U-Net: 去噪网络
  - Text Encoder: CLIP文本编码
  - Scheduler: 噪声调度
- 生成流程(概念代码见下)
  1. 初始化随机噪声 (latent)
  2. 编码文本提示词
  3. 迭代去噪:
     a. 用U-Net预测噪声
     b. 用Scheduler更新latent
     c. 重复N次
  4. 用VAE解码latent
  5. 输出图像
- 关键参数
  - num_inference_steps: 去噪步数
  - guidance_scale: 提示词引导强度
  - seed: 随机种子
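与上述流程对应,下面用diffusers的底层组件手写一个简化的去噪循环作为示意(模型、调度器与参数均为演示假设,实际使用直接调用StableDiffusionPipeline即可):
# 概念代码:手写SD去噪主循环(示意)
import torch
from diffusers import AutoencoderKL, UNet2DConditionModel, DDIMScheduler
from transformers import CLIPTextModel, CLIPTokenizer

repo, device = "runwayml/stable-diffusion-v1-5", "cuda"
vae = AutoencoderKL.from_pretrained(repo, subfolder="vae").to(device)
unet = UNet2DConditionModel.from_pretrained(repo, subfolder="unet").to(device)
text_encoder = CLIPTextModel.from_pretrained(repo, subfolder="text_encoder").to(device)
tokenizer = CLIPTokenizer.from_pretrained(repo, subfolder="tokenizer")
scheduler = DDIMScheduler.from_pretrained(repo, subfolder="scheduler")

prompt, guidance_scale, num_inference_steps = "a cute cat on a sofa", 7.5, 30

def encode(text):
    tokens = tokenizer(text, padding="max_length", max_length=77, return_tensors="pt")
    return text_encoder(tokens.input_ids.to(device))[0]

with torch.no_grad():
    cond, uncond = encode(prompt), encode("")               # 2. 编码文本提示词(含空提示)
    latents = torch.randn(1, 4, 64, 64, device=device)      # 1. 初始化潜空间噪声
    latents = latents * scheduler.init_noise_sigma
    scheduler.set_timesteps(num_inference_steps)
    for t in scheduler.timesteps:                            # 3. 迭代去噪
        latent_in = scheduler.scale_model_input(latents, t)
        noise_uncond = unet(latent_in, t, encoder_hidden_states=uncond).sample
        noise_cond = unet(latent_in, t, encoder_hidden_states=cond).sample
        # classifier-free guidance:guidance_scale越大越贴近提示词
        noise_pred = noise_uncond + guidance_scale * (noise_cond - noise_uncond)
        latents = scheduler.step(noise_pred, t, latents).prev_sample
    image = vae.decode(latents / vae.config.scaling_factor).sample  # 4. VAE解码到像素空间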
Q4: ControlNet是如何实现可控生成的?
回答要点:
- ControlNet原理
  - 复制基础U-Net权重
  - 添加零卷积层初始化
  - 通过控制图调节生成
- 架构
  基础U-Net与ControlNet分支并行:控制图经ControlNet分支编码,其输出通过零卷积与基础U-Net对应层的特征求和后,再输入后续层
- 控制类型
- Canny: 边缘控制
- Depth: 深度控制
- Pose: 姿态控制
- Segmentation: 分割控制
- Normal: 法向量控制
Q5: 如何评估多模态模型的效果?
回答要点:
- 文本到图像评估
- FID (Fréchet Inception Distance): 图像质量
- CLIP Score: 文本-图像对齐度
- IS (Inception Score): 图像多样性
- 视觉问答评估
- 准确率
- BLEU/ROUGE
- 人工评估
- 端到端评估
- 用户满意度
- 任务完成率
- A/B测试
Q6: 多模态RAG与文本RAG有什么区别?
回答要点:
| 特性 | 文本RAG | 多模态RAG |
|---|---|---|
| 检索内容 | 纯文本 | 文本+图像+视频 |
| 编码器 | 文本嵌入 | CLIP多模态嵌入 |
| 检索方式 | 向量检索 | 跨模态检索 |
| 上下文 | 文本片段 | 多模态描述 |
| 生成能力 | 文本 | 文本+图像 |
Q7: 如何处理多模态输入的对齐问题?
回答要点:
- 特征对齐
- 投影层对齐维度
- 归一化
- 模态标识符
- 时间对齐(音频、视频)
- 统一采样率
- 时间戳同步
- 动态时间规整(DTW)
- 语义对齐
- 对比学习
- 跨模态注意力
- 联合训练
总结
本模块深入讲解了多模态大模型应用开发的核心内容:
关键技术点
- 多模态架构
- Early/Late Fusion
- Cross-Attention
- Encoder-Decoder
- 文生图
- Stable Diffusion
- ControlNet
- 图像编辑与修复
- 图像理解
- CLIP
- 视觉问答(VQA)
- 图像描述
- 语音处理
- ASR (Whisper)
- TTS
- 语音增强
- 视频处理
- SVD (图生视频)
- AnimateDiff (文生视频)
- 视频编辑
- 多模态Agent
- 多模态输入处理
- 多模态RAG
- 跨模态交互