[AI Agent Knowledge Base] 15 - Multimodal Interaction Systems

Outline

Multimodal Interaction Systems

Goal: master the development of multimodal (text, image, audio, video) Agents




Multimodal Overview

What Is a Multimodal Agent?

[Definition]

A multimodal Agent is an agent that can understand and process multiple types of data
(text, images, audio, video, etc.), providing a richer interaction experience.

[Modality Types]

1. Text
   - Conversations, documents, code

2. Image
   - Image understanding and generation
   - OCR, visual question answering

3. Audio
   - Speech recognition (ASR)
   - Speech synthesis (TTS)
   - Voice interaction

4. Video
   - Video understanding
   - Video generation
   - Keyframe extraction

5. Other modalities
   - 3D models
   - Sensor data
   - Time-series data

Application Scenarios

[Text + Image]

- Visual question answering (VQA)
- Image caption generation
- Scanned document understanding
- Chart analysis

[Audio + Text]

- Voice assistants
- Meeting transcription
- Real-time translation
- Voice search

[Video + Text]

- Video search
- Video summarization
- Video question answering
- Content moderation

[All Modalities]

- Multimodal search
- Creative generation
- Content creation

Text Interaction

1. Text Understanding

import json

class TextProcessor:
    """Text processor"""

    def __init__(self, llm):
        self.llm = llm

    def understand(self, text: str) -> dict:
        """Understand the content of a text"""
        prompt = f"""
        Analyze the following text and extract the key information:

        Text: {text}

        Return the result in JSON format:
        {{
            "summary": "text summary",
            "key_points": ["key point 1", "key point 2"],
            "sentiment": "positive/negative/neutral",
            "entities": [{{"type": "type", "value": "value"}}],
            "topics": ["topic 1", "topic 2"]
        }}
        """

        response = self.llm.generate(prompt)
        return json.loads(response)

    def extract(self, text: str, schema: dict) -> dict:
        """Extract information according to a schema"""
        prompt = f"""
        Extract the specified information from the following text:

        Text: {text}

        Fields to extract:
        {json.dumps(schema, indent=2, ensure_ascii=False)}

        Return the extracted result in JSON format.
        """

        response = self.llm.generate(prompt)
        return json.loads(response)

2. Text Generation

class TextGenerator:
    """Text generator"""

    def __init__(self, llm):
        self.llm = llm

    def generate(self, prompt: str, **kwargs) -> str:
        """Generate text"""
        return self.llm.generate(prompt, **kwargs)

    def generate_with_few_shot(self, prompt: str, examples: list) -> str:
        """Few-shot generation"""
        examples_text = "\n".join([
            f"Input: {ex['input']}\nOutput: {ex['output']}"
            for ex in examples
        ])

        full_prompt = f"""
        Examples:
        {examples_text}

        Input: {prompt}
        Output:
        """

        return self.llm.generate(full_prompt)

    def generate_structured(self, prompt: str, schema: dict) -> dict:
        """Generate structured output"""
        schema_prompt = f"""
        Generate content for the following request; the output must conform to this JSON Schema:

        Request: {prompt}

        JSON Schema:
        {json.dumps(schema, indent=2)}

        Output only the JSON, nothing else.
        """

        response = self.llm.generate(schema_prompt, response_format="json")
        return json.loads(response)

Image Interaction

1. Image Understanding

from openai import OpenAI
import base64
import json
from typing import List

class VisionProcessor:
    """Vision processor"""

    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)

    def encode_image(self, image_path: str) -> str:
        """Encode an image as base64"""
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    def describe_image(self, image_path: str) -> str:
        """Describe the content of an image"""
        base64_image = self.encode_image(image_path)

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": "Please describe the content of this image."},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }]
        )

        return response.choices[0].message.content

    def analyze_image(self, image_path: str, analysis_type: str) -> str:
        """Analyze an image"""
        base64_image = self.encode_image(image_path)

        prompts = {
            "describe": "Please describe this image in detail.",
            "detect": "Please detect and identify all objects in the image.",
            "ocr": "Please extract all text from the image.",
            "sentiment": "Please analyze the emotion and atmosphere conveyed by the image.",
            "technical": "Please analyze this image from a technical perspective (composition, color, lighting, etc.)."
        }

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": prompts.get(analysis_type, prompts["describe"])},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }]
        )

        return response.choices[0].message.content

    def visual_qa(self, image_path: str, question: str) -> str:
        """Visual question answering"""
        base64_image = self.encode_image(image_path)

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "text", "text": f"Please answer the following question about the image: {question}"},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }]
        )

        return response.choices[0].message.content

    def extract_text_from_image(self, image_path: str) -> str:
        """Extract text from an image (OCR)"""
        return self.analyze_image(image_path, "ocr")

    def detect_objects(self, image_path: str) -> List[dict]:
        """Detect objects in an image"""
        result = self.analyze_image(image_path, "detect")

        # The model replies in free text; only parse it when it is actually valid JSON
        # (for reliable structured output, ask for JSON explicitly in the prompt)
        try:
            return json.loads(result) if "{" in result or "[" in result else []
        except json.JSONDecodeError:
            return []

2. Image Generation

class ImageGenerator:
    """Image generator"""

    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)

    def generate_image(self, prompt: str, **kwargs) -> dict:
        """Generate an image"""
        response = self.client.images.generate(
            model=kwargs.get("model", "dall-e-3"),
            prompt=prompt,
            size=kwargs.get("size", "1024x1024"),
            quality=kwargs.get("quality", "standard"),
            n=kwargs.get("n", 1),
            style=kwargs.get("style", "vivid")
        )

        return {
            "url": response.data[0].url,
            "revised_prompt": response.data[0].revised_prompt
        }

    def edit_image(self, image_path: str, prompt: str, mask_path: str = None) -> dict:
        """Edit an image"""
        # The API expects file objects rather than raw bytes
        extra = {}
        if mask_path:
            extra["mask"] = open(mask_path, "rb")

        with open(image_path, "rb") as img_file:
            response = self.client.images.edit(
                image=img_file,
                prompt=prompt,
                size="1024x1024",
                n=1,
                **extra
            )

        if "mask" in extra:
            extra["mask"].close()

        return {"url": response.data[0].url}

    def create_variation(self, image_path: str) -> dict:
        """Create an image variation"""
        with open(image_path, "rb") as img_file:
            response = self.client.images.create_variation(
                image=img_file,
                size="1024x1024",
                n=1
            )

        return {"url": response.data[0].url}

3. Multi-Image Processing

class MultiImageProcessor(VisionProcessor):
    """Multi-image processor (reuses encode_image / describe_image from VisionProcessor)"""

    def compare_images(self, image_paths: List[str]) -> dict:
        """Compare multiple images"""
        content = [{"type": "text", "text": "Please compare the following images in terms of content, style, composition, and other similarities and differences."}]

        for i, image_path in enumerate(image_paths, 1):
            base64_image = self.encode_image(image_path)
            content.append({"type": "text", "text": f"\n--- Image {i} ---\n"})
            content.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/jpeg;base64,{base64_image}"
                }
            })

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": content}]
        )

        return {
            "comparison": response.choices[0].message.content
        }

    def image_retrieval(self, query_image: str, database_images: List[str], top_k: int = 5) -> List[dict]:
        """Image-based retrieval"""
        query_description = self.describe_image(query_image)

        # Generate descriptions for all database images
        db_descriptions = []
        for db_image in database_images:
            desc = self.describe_image(db_image)
            db_descriptions.append({
                "image_path": db_image,
                "description": desc
            })

        # Score similarity with the LLM
        ranked_images = []
        for db_desc in db_descriptions:
            similarity_prompt = f"""
            Query image description: {query_description}
            Database image description: {db_desc['description']}

            Rate the similarity of these two images as a float between 0 and 1. Output only the number.
            """

            score = float(self.client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": similarity_prompt}]
            ).choices[0].message.content.strip())

            ranked_images.append({
                "image_path": db_desc["image_path"],
                "description": db_desc["description"],
                "score": score
            })

        # Sort by similarity
        ranked_images.sort(key=lambda x: x["score"], reverse=True)

        return ranked_images[:top_k]

Speech Interaction

1. Speech Recognition (ASR)

import os
import tempfile

import speech_recognition as sr

class SpeechRecognizer:
    """Speech recognizer"""

    def __init__(self, language: str = "zh-CN"):
        self.language = language
        self.recognizer = sr.Recognizer()

    def recognize_from_file(self, audio_file: str) -> dict:
        """Recognize speech from a file"""
        with sr.AudioFile(audio_file) as source:
            audio_data = self.recognizer.record(source)

        try:
            # Try several recognition engines
            text = self._try_multiple_engines(audio_data)
            return {
                "success": True,
                "text": text,
                "language": self.language
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def _try_multiple_engines(self, audio_data):
        """Try multiple recognition engines"""
        engines = [
            self._recognize_google,
            self._recognize_whisper,
        ]

        for engine in engines:
            try:
                text = engine(audio_data)
                if text and len(text.strip()) > 0:
                    return text
            except Exception:
                continue

        raise Exception("All recognition engines failed")

    def _recognize_google(self, audio_data) -> str:
        """Google speech recognition"""
        return self.recognizer.recognize_google(
            audio_data,
            language=self.language
        )

    def _recognize_whisper(self, audio_data) -> str:
        """Whisper speech recognition"""
        # Requires openai-whisper; Whisper expects a file path, so dump the audio to a temporary WAV file first
        import whisper
        model = whisper.load_model("base")
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            f.write(audio_data.get_wav_data())
            wav_path = f.name
        try:
            result = model.transcribe(wav_path)
        finally:
            os.remove(wav_path)
        return result["text"]

    def recognize_from_microphone(self, duration: int = 5) -> dict:
        """Recognize speech from the microphone"""
        with sr.Microphone() as source:
            print(f"Recording... ({duration} seconds)")
            audio_data = self.recognizer.listen(source, phrase_time_limit=duration)
            print("Recording finished")

        try:
            text = self._try_multiple_engines(audio_data)
            return {"success": True, "text": text}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def continuous_recognition(self, callback):
        """Continuous speech recognition"""
        microphone = sr.Microphone()
        with microphone as source:
            print("Starting continuous recognition...")
            self.recognizer.adjust_for_ambient_noise(source, duration=1)

        # listen_in_background opens the source itself in a background thread and
        # returns a function that stops listening when called
        stop_listening = self.recognizer.listen_in_background(microphone, callback)
        return stop_listening

2. Speech Synthesis (TTS)

import os
import tempfile

import openai
import pygame

class SpeechSynthesizer:
    """Speech synthesizer"""

    def __init__(self, api_key: str):
        self.client = openai.OpenAI(api_key=api_key)
        pygame.mixer.init()

    def generate_speech(self, text: str, voice: str = "alloy", **kwargs) -> bytes:
        """Generate speech audio"""
        response = self.client.audio.speech.create(
            model=kwargs.get("model", "tts-1"),
            voice=voice,
            input=text,
            response_format=kwargs.get("format", "mp3")
        )

        return response.content

    def speak(self, text: str, voice: str = "alloy"):
        """Read text aloud"""
        # Generate the audio
        audio_data = self.generate_speech(text, voice)

        # Play it
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
            temp_file.write(audio_data)
            temp_file.flush()

            sound = pygame.mixer.Sound(temp_file.name)
            sound.play()

            # Wait for playback to finish
            while pygame.mixer.get_busy():
                pygame.time.Clock().tick(10)

            # Clean up
            pygame.mixer.stop()

        os.unlink(temp_file.name)

    def save_speech(self, text: str, output_file: str, voice: str = "alloy"):
        """Save speech to a file"""
        audio_data = self.generate_speech(text, voice)

        with open(output_file, "wb") as f:
            f.write(audio_data)

    def get_available_voices(self) -> list:
        """List available voices"""
        return [
            {"id": "alloy", "name": "Alloy", "gender": "female"},
            {"id": "echo", "name": "Echo", "gender": "male"},
            {"id": "fable", "name": "Fable", "gender": "male"},
            {"id": "onyx", "name": "Onyx", "gender": "male"},
            {"id": "nova", "name": "Nova", "gender": "female"},
            {"id": "shimmer", "name": "Shimmer", "gender": "female"}
        ]

3. Voice Dialogue System

class VoiceAssistant:
    """Voice assistant"""

    def __init__(self, api_key: str, llm):
        self.tts = SpeechSynthesizer(api_key)
        self.asr = SpeechRecognizer()
        self.llm = llm
        self.conversation_history = []

    def speak_and_listen(self, prompt: str, duration: int = 5) -> dict:
        """Speak a prompt, then listen"""
        # 1. Play the prompt via TTS
        print(f"Assistant: {prompt}")
        self.tts.speak(prompt)

        # 2. Recognize the user's reply via ASR
        result = self.asr.recognize_from_microphone(duration)

        if result["success"]:
            user_input = result["text"]
            print(f"User: {user_input}")

            # 3. Update the conversation history
            self.conversation_history.append({"role": "user", "content": user_input})

            return {"success": True, "text": user_input}

        return {"success": False, "error": result["error"]}

    def process_and_respond(self, user_input: str) -> str:
        """Process the input and respond"""
        # Record the user turn (unless speak_and_listen already added it)
        if not self.conversation_history or self.conversation_history[-1] != {"role": "user", "content": user_input}:
            self.conversation_history.append({"role": "user", "content": user_input})

        # Build the message history
        messages = [{"role": "system", "content": "You are a voice assistant."}]
        messages.extend(self.conversation_history)

        # Generate the response
        response = self.llm.generate(messages)

        # Update the history
        self.conversation_history.append({"role": "assistant", "content": response})

        # Play the response via TTS
        print(f"Assistant: {response}")
        self.tts.speak(response)

        return response

    def run_voice_chat(self, max_turns: int = 10):
        """Run a voice conversation"""
        print("=== Voice Dialogue System ===")
        print("Start speaking; press Ctrl+C to exit\n")

        # Initial greeting
        self.speak_and_listen("Hello! I'm a voice assistant. Ask me anything.")

        for turn in range(max_turns):
            try:
                # Listen for user input
                result = self.asr.recognize_from_microphone(5)

                if result["success"]:
                    user_input = result["text"]
                    print(f"You: {user_input}")

                    # Process and respond
                    self.process_and_respond(user_input)
                else:
                    print(f"Recognition failed: {result['error']}")

            except KeyboardInterrupt:
                print("\nGoodbye!")
                self.tts.speak("Goodbye!")
                break

Video Interaction

1. Video Understanding

import cv2
from typing import List, Optional
from moviepy.editor import VideoFileClip  # for moviepy >= 2.0, the import path is: from moviepy import VideoFileClip

class VideoProcessor:
    """Video processor"""

    def __init__(self, vision_client):
        self.vision = vision_client

    def extract_keyframes(self, video_path: str, num_frames: int = 10) -> List[str]:
        """Extract keyframes"""
        # Extract frames with OpenCV
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Compute the sampling interval
        interval = max(1, total_frames // num_frames)

        keyframes = []
        for i in range(0, total_frames, interval):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()

            if ret:
                # Save the frame to disk
                frame_path = f"frame_{i}.jpg"
                cv2.imwrite(frame_path, frame)
                keyframes.append(frame_path)

            if len(keyframes) >= num_frames:
                break

        cap.release()
        return keyframes

    def describe_video(self, video_path: str) -> dict:
        """Describe the content of a video"""
        # Extract keyframes
        keyframes = self.extract_keyframes(video_path, num_frames=5)

        # Get basic video info
        clip = VideoFileClip(video_path)
        duration = clip.duration
        fps = clip.fps

        # Describe each keyframe with the vision model
        frame_descriptions = []
        for frame_path in keyframes:
            desc = self.vision.describe_image(frame_path)
            frame_descriptions.append(desc)

        # Generate a video summary
        summary = self._generate_video_summary(
            frame_descriptions,
            duration,
            fps
        )

        return {
            "duration": duration,
            "fps": fps,
            "keyframe_descriptions": frame_descriptions,
            "summary": summary
        }

    def _generate_video_summary(self, frame_descriptions: List[str], duration: float, fps: float) -> str:
        """Generate a video summary"""
        frames_text = "\n".join([
            f"Frame {i+1}: {desc}"
            for i, desc in enumerate(frame_descriptions)
        ])

        prompt = f"""
        Based on the following keyframe descriptions, summarize the video:

        Video info:
        - Duration: {duration:.2f} seconds
        - Frame rate: {fps:.2f} FPS

        Keyframe descriptions:
        {frames_text}

        Please provide a concise summary of the video content.
        """

        # Generate the summary with the LLM (assumes the vision client exposes an llm attribute)
        return self.vision.llm.generate(prompt)

    def video_qa(self, video_path: str, question: str) -> str:
        """Video question answering"""
        # Extract keyframes
        keyframes = self.extract_keyframes(video_path, num_frames=3)

        # Answer with the vision model (only the first keyframe is used here)
        return self.vision.visual_qa(keyframes[0], question)

    def extract_audio_from_video(self, video_path: str, output_path: str) -> Optional[str]:
        """Extract the audio track from a video"""
        clip = VideoFileClip(video_path)
        audio = clip.audio

        if audio:
            audio.write_audiofile(output_path)
            return output_path

        return None

2. Video Analysis

class VideoAnalyzer(VideoProcessor):
    """Video analyzer (inherits keyframe extraction and the vision client from VideoProcessor)"""

    def analyze_activities(self, video_path: str) -> dict:
        """Analyze the activities in a video"""
        # Extract keyframes
        keyframes = self.extract_keyframes(video_path, num_frames=10)

        # Analyze the activity in each frame
        activities = []
        for frame_path in keyframes:
            # Detect activities with the vision model
            # (analyze_image falls back to its generic "describe" prompt for unknown analysis types)
            activity = self.vision.analyze_image(
                frame_path,
                "detect_activities"
            )
            activities.append(activity)

        return {"activities": activities}

    def detect_scenes(self, video_path: str, threshold: float = 0.3) -> List[dict]:
        """Detect scene changes"""
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        prev_frame = None
        scenes = []
        frame_count = 0
        current_scene_start = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Compute the difference between consecutive frames
            if prev_frame is not None:
                diff = cv2.absdiff(prev_frame, frame)
                diff_score = cv2.mean(diff)[0] / 255.0

                if diff_score > threshold:
                    # Scene change
                    scenes.append({
                        "start_frame": current_scene_start,
                        "end_frame": frame_count - 1,
                        "start_time": current_scene_start / fps,
                        "end_time": (frame_count - 1) / fps
                    })
                    current_scene_start = frame_count

            prev_frame = frame
            frame_count += 1

        # The last scene
        if current_scene_start < frame_count:
            scenes.append({
                "start_frame": current_scene_start,
                "end_frame": frame_count - 1,
                "start_time": current_scene_start / fps,
                "end_time": (frame_count - 1) / fps
            })

        cap.release()
        return scenes

Multimodal Fusion

1. Multimodal Agent

class MultimodalAgent:
    """Multimodal agent"""

    def __init__(self, vision_client, tts_client, asr_client, llm):
        self.vision = vision_client
        self.tts = tts_client
        self.asr = asr_client
        self.llm = llm

    def process_multimodal_query(self, query: str, **modal_data) -> dict:
        """Process a multimodal query"""
        context = []

        # Text query
        if query:
            context.append({"type": "text", "content": query})

        # Image input
        if "image" in modal_data:
            image_desc = self.vision.describe_image(modal_data["image"])
            context.append({
                "type": "image",
                "content": f"Image description: {image_desc}"
            })

        # Audio input
        if "audio" in modal_data:
            asr_result = self.asr.recognize_from_file(modal_data["audio"])
            audio_text = asr_result.get("text", "") if asr_result.get("success") else ""
            context.append({
                "type": "audio",
                "content": f"Audio transcript: {audio_text}"
            })

        # Video input (assumes the vision client also exposes describe_video, e.g. via a VideoProcessor)
        if "video" in modal_data:
            video_desc = self.vision.describe_video(modal_data["video"])
            context.append({
                "type": "video",
                "content": f"Video summary: {video_desc['summary']}"
            })

        # Generate a combined response
        response = self._generate_response(context)

        return {
            "context": context,
            "response": response
        }

    def _generate_response(self, context: list) -> str:
        """Generate the response"""
        context_text = "\n".join([
            f"{c['type']}: {c['content']}"
            for c in context
        ])

        prompt = f"""
        Based on the following multimodal context, give a single integrated answer:

        {context_text}

        Please answer the user's question.
        """

        return self.llm.generate(prompt)

2. Multimodal RAG

import uuid
from typing import List

class MultimodalRAG:
    """Multimodal RAG"""

    def __init__(self, vision_client, vector_store, llm):
        self.vision = vision_client
        self.vector_store = vector_store
        self.llm = llm

    def _generate_id(self) -> str:
        """Generate a document ID"""
        return uuid.uuid4().hex

    def index_multimodal_document(self, document: str, images: List[str] = None) -> str:
        """Index a multimodal document"""
        # Describe the images
        image_descriptions = []
        if images:
            for image_path in images:
                desc = self.vision.describe_image(image_path)
                image_descriptions.append(desc)

        # Combine the text and the image descriptions into one context
        combined_context = document
        if image_descriptions:
            combined_context += "\n\nImage descriptions:\n" + "\n".join(image_descriptions)

        # Create the embedding
        combined_embedding = self.llm.embed(combined_context)

        # Store in the vector store
        doc_id = self._generate_id()
        self.vector_store.add({
            "id": doc_id,
            "content": combined_context,
            "embedding": combined_embedding,
            "metadata": {
                "text": document,
                "image_descriptions": image_descriptions
            }
        })

        return doc_id

    def query_multimodal(self, query: str, image: str = None) -> dict:
        """Multimodal query"""
        # Build the query context
        if image:
            image_desc = self.vision.describe_image(image)
            combined_query = f"{query}\n\nReference image description: {image_desc}"
        else:
            combined_query = query

        # Retrieve
        query_embedding = self.llm.embed(combined_query)
        results = self.vector_store.search(query_embedding, top_k=5)

        # Generate the response
        context = "\n".join([r["content"] for r in results])
        response = self.llm.generate(f"""
        Query: {combined_query}

        Relevant information:
        {context}

        Please answer the query based on the information above.
        """)

        return {
            "query": combined_query,
            "results": results,
            "response": response
        }

Practical Cases

Case: Image-Assisted Question Answering System

class ImageAssistantQA:
    """Image-assisted question answering system"""

    def __init__(self, vision_client, rag_engine, llm):
        self.vision = vision_client
        self.rag = rag_engine
        self.llm = llm

    def answer_with_image_context(self, question: str, image: str) -> dict:
        """Answer a question using image context"""

        # 1. Understand the image
        image_info = self.vision.describe_image(image)

        # 2. Retrieve relevant documents based on the image information
        rag_results = self.rag.query(image_info, top_k=3)

        # 3. Generate an integrated answer
        context = f"""
        Image information: {image_info}

        Relevant documents:
        {chr(10).join([r['content'] for r in rag_results['sources']])}
        """

        prompt = f"""
        User question: {question}

        Context:
        {context}

        Please answer the question using both the image information and the relevant documents.
        """

        answer = self.llm.generate(prompt)

        return {
            "image_info": image_info,
            "rag_results": rag_results,
            "answer": answer
        }

Case: Multimodal Document Search

import os
import uuid
from typing import List

class MultimodalDocumentSearch:
    """Multimodal document search"""

    def __init__(self, vision_client, vector_store, llm):
        self.vision = vision_client
        self.vector_store = vector_store
        self.llm = llm

    def _generate_id(self) -> str:
        """Generate a document ID"""
        return uuid.uuid4().hex

    def index_document_with_images(self, doc_path: str, image_folder: str) -> str:
        """Index a document together with its images"""
        # Read the document content
        with open(doc_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Collect all images belonging to the document
        images = []
        if os.path.exists(image_folder):
            for img_file in os.listdir(image_folder):
                if img_file.endswith((".jpg", ".png", ".jpeg")):
                    img_path = os.path.join(image_folder, img_file)
                    images.append(img_path)

        # Describe each image
        image_descriptions = []
        for img_path in images:
            desc = self.vision.describe_image(img_path)
            image_descriptions.append({
                "path": img_path,
                "description": desc
            })

        # Create a combined embedding
        combined_content = f"{content}\n\n" + "\n".join([
            f"Image {i+1}: {img['description']}"
            for i, img in enumerate(image_descriptions)
        ])

        embedding = self.llm.embed(combined_content)

        # Store in the vector store
        doc_id = self._generate_id()
        self.vector_store.add({
            "id": doc_id,
            "content": content,
            "embedding": embedding,
            "metadata": {
                "images": image_descriptions,
                "source": doc_path
            }
        })

        return doc_id

    def search(self, query: str, query_image: str = None) -> dict:
        """Multimodal search"""
        # Build the query
        if query_image:
            image_desc = self.vision.describe_image(query_image)
            combined_query = f"{query}\n\nReference image: {image_desc}"
        else:
            combined_query = query

        # Retrieve
        query_embedding = self.llm.embed(combined_query)
        results = self.vector_store.search(query_embedding, top_k=5)

        return {
            "query": combined_query,
            "results": results
        }

Common Interview Questions

Q1: How do you build an image + text Agent?

[Model Answer]

Implementation approach (a code sketch follows the framework list below):

1. Vision model integration
   - Use a vision-capable model such as GPT-4o / GPT-4V
   - Pass the image and the text together
   - Get a single unified response

2. Multimodal input handling
   - Encode the image (base64)
   - Build the text prompt
   - Combine them into one request

3. RAG extension
   - Use image descriptions as documents
   - Store image features in a vector store
   - Retrieve over image + text

4. Output handling
   - Text responses
   - Image generation (when needed)

Framework support:
- LangChain: multimodal message support
- LlamaIndex: image nodes / multimodal indexes
- OpenAI: GPT-4o / GPT-4V API
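
A minimal sketch of steps 1 and 2 above, assuming the OpenAI Python SDK with an OPENAI_API_KEY set in the environment; the file path and question in the usage comment are placeholders:

import base64
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

def ask_about_image(image_path: str, question: str) -> str:
    """Send one image plus a text question to a vision-capable model."""
    # Step 2: encode the image as base64
    with open(image_path, "rb") as f:
        image_b64 = base64.b64encode(f.read()).decode("utf-8")

    # Step 1: combine the text and the image into a single user message
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "image_url",
                 "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"}},
            ],
        }],
    )
    return response.choices[0].message.content

# Hypothetical usage:
# print(ask_about_image("chart.png", "What trend does this chart show?"))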

Key Takeaways

[Multimodal]

Text, images, audio, video

[Image Processing]

Understanding: description, detection, OCR
Generation: DALL-E 3
Retrieval: image similarity

[Speech Processing]

Recognition: Google / Whisper
Synthesis: TTS API
Dialogue: ASR + TTS + LLM

[Video Processing]

Understanding: keyframe extraction
Analysis: scene detection
QA: VQA

[Multimodal Fusion]

Context construction
Vector embeddings
Integrated generation

Document version: 1.0
Last updated: 2026-01-21
