【AI Agent 知识库】09-多Agent系统设计与开发

内容纲要

多Agent系统设计与开发(详解版)

目标:精通多Agent协作、架构设计与工程实现


目录


多Agent系统概述

1. 为什么需要多Agent?

【单Agent的局限】

1. 能力瓶颈
   - 上下文容量有限
   - 工具调用数量受限
   - 并发处理能力弱

2. 复杂任务困难
   - 多步骤任务难以规划
   - 知识覆盖面有限
   - 缺乏专业化分工

3. 可扩展性差
   - 功能扩展需要重新训练
   - 模型切换成本高
   - 难以模块化升级

【多Agent的优势】

1. 专业化分工
   - 每个Agent专注特定领域
   - 知识更深入
   - 准确率更高

2. 并行处理
   - 多任务并行执行
   - 减少等待时间
   - 提升整体效率

3. 容错能力
   - 单个Agent失败不影响整体
   - 自动重试和fallback
   - 系统更稳定

4. 可扩展性强
   - 模块化设计
   - 动态增减Agent
   - 独立升级优化

2. 典型应用场景

【代码开发场景】

需求分析Agent → 技术设计Agent → 代码实现Agent → 测试Agent → 文档Agent

【客服场景】

意图识别Agent → 常见问题Agent → 订单查询Agent → 人工接管Agent

【研究场景】

搜索Agent → 总结Agent → 分析Agent → 报告Agent

【医疗场景】

症状分析Agent → 诊断Agent → 治疗建议Agent → 风险评估Agent

协作模式

1. 顺序协作

【模式特点】

- Agent按顺序执行
- 前一个Agent的输出是后一个的输入
- 适合有明确依赖关系的任务

【架构图】

用户请求
    ↓
Agent A (任务A)
    ↓ (输出A)
Agent B (任务B) ← 使用输出A
    ↓ (输出B)
Agent C (任务C) ← 使用输出B
    ↓ (输出C)
最终结果

【代码示例】

from langchain.agents import AgentExecutor, LLMSingleActionAgent
from langchain.chains import SequentialChain

# 定义Agent
agent_a = AgentExecutor(
    agent=agent_a,
    tools=[search_tool],
    verbose=True
)

agent_b = AgentExecutor(
    agent=agent_b,
    tools=[calculator_tool],
    verbose=True
)

# 顺序执行
result_a = agent_a.invoke({"input": user_query})
result_b = agent_b.invoke({"input": result_a["output"]})

2. 并行协作

【模式特点】

- 多个Agent同时执行
- 互不依赖,可并行处理
- 最后汇总结果

【架构图】

用户请求
    ↓
┌────────┬────────┬────────┐
│Agent A │Agent B │Agent C │
└────────┴────────┴────────┘
    ↓       ↓       ↓
输出A   输出B   输出C
    └───────┬───────┘
            ↓
        汇总Agent
            ↓
        最终结果

【代码示例】

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def parallel_execute(agents, input_data):
    """并行执行多个Agent"""
    tasks = [agent.invoke(input_data) for agent in agents]
    results = await asyncio.gather(*tasks)
    return results

# 使用
agents = [agent_a, agent_b, agent_c]
results = await parallel_execute(agents, user_input)

3. 层次协作

【模式特点】

- 有主从关系
- Master负责任务分发和结果汇总
- Worker执行具体任务

【架构图】

用户请求
    ↓
┌──────────────────────┐
│   Master Agent      │  任务调度
│  - 任务分解        │  结果汇总
│  - 负载均衡        │  容错处理
└──────────┬──────────┘
           ↓
    ┌──────┴──────┐
    │  Worker池   │
    │  Agent A    │
    │  Agent B    │
    │  Agent C    │
    └─────────────┘
           ↓
        最终结果

【代码示例】

class MasterAgent:
    def __init__(self, workers):
        self.workers = workers
        self.task_queue = Queue()

    def distribute_task(self, task):
        """分发任务"""
        # 选择合适的Worker
        worker = self.select_worker(task)
        result = worker.execute(task)
        return result

    def orchestrate(self, user_request):
        """编排整个任务"""
        # 1. 任务分解
        subtasks = self.decompose(user_request)

        # 2. 并行执行
        results = []
        for task in subtasks:
            result = self.distribute_task(task)
            results.append(result)

        # 3. 结果汇总
        return self.aggregate(results)

4. 议论协作

【模式特点】

- 多个Agent讨论同一个问题
- 通过对话达成共识
- 适合需要多方案对比的场景

【架构图】

用户请求
    ↓
┌──────────────────────────┐
│      议论协调器        │
└──────────┬───────────────┘
           │
   ┌───────┼───────┐
   ↓       ↓       ↓
┌──────┐┌──────┐┌──────┐
│Agent 1││Agent 2││Agent 3│
│观点A ││观点B ││观点C │
└───┬──┘└───┬──┘└───┬──┘
    │      │      │
    └──────┼──────┘
           ↓
        评估Agent
        综合意见
           ↓
        最终方案

【代码示例】

class DiscussionAgent:
    def __init__(self, agents, max_rounds=3):
        self.agents = agents
        self.max_rounds = max_rounds

    def discuss(self, topic):
        """组织讨论"""
        context = {"topic": topic, "messages": []}

        for round in range(self.max_rounds):
            # 每个Agent发言
            for agent in self.agents:
                response = agent.speak(context)
                context["messages"].append({
                    "agent": agent.name,
                    "message": response
                })

            # 检查是否达成共识
            if self.check_consensus(context):
                break

        return self.synthesize(context)

    def check_consensus(self, context):
        """检查是否达成共识"""
        # 实现共识检测逻辑
        pass

    def synthesize(self, context):
        """综合各方意见"""
        # 实现意见综合逻辑
        pass

架构设计

1. 系统架构

【分层架构】

┌─────────────────────────────────────┐
│         用户交互层               │
│  - Web界面                      │
│  - API接口                      │
│  - 消息网关                      │
└─────────────────┬───────────────┘
                  │
┌─────────────────┴───────────────┐
│         编排层                  │
│  - 任务编排器                    │
│  - 工作流引擎                    │
│  - 状态管理                      │
└─────────────────┬───────────────┘
                  │
┌─────────────────┴───────────────┐
│         Agent层                │
│  ┌──────┬──────┬──────┐      │
│  │Agent A│Agent B│Agent C│ ... │
│  └──────┴──────┴──────┘      │
└─────────────────┬───────────────┘
                  │
┌─────────────────┴───────────────┐
│         服务层                  │
│  - LLM服务                      │
│  - 工具服务                      │
│  - 存储服务                      │
└─────────────────┬───────────────┘
                  │
┌─────────────────┴───────────────┐
│         基础设施层              │
│  - 向量数据库                    │
│  - 消息队列                      │
│  - 缓存                          │
└───────────────────────────────────┘

2. 核心组件

【Agent Registry(Agent注册中心)】

职责:
- 管理所有Agent的注册信息
- 提供Agent查询和发现
- 管理Agent生命周期

代码实现:

class AgentRegistry:
    def __init__(self):
        self.agents = {}

    def register(self, agent):
        """注册Agent"""
        self.agents[agent.id] = agent
        agent.start()

    def unregister(self, agent_id):
        """注销Agent"""
        agent = self.agents.get(agent_id)
        if agent:
            agent.stop()
            del self.agents[agent_id]

    def get_agent(self, agent_id):
        """获取Agent"""
        return self.agents.get(agent_id)

    def find_agents(self, capability):
        """根据能力查找Agent"""
        return [
            agent for agent in self.agents.values()
            if agent.has_capability(capability)
        ]

【Task Planner(任务规划器)】

职责:
- 解析用户请求
- 分解为子任务
- 确定执行顺序

代码实现:

class TaskPlanner:
    def __init__(self, llm):
        self.llm = llm

    def plan(self, user_request):
        """规划任务"""
        prompt = f"""
        分析以下用户请求,将其分解为可执行的子任务:

        用户请求:{user_request}

        请以JSON格式返回任务列表,每个任务包含:
        - name: 任务名称
        - description: 任务描述
        - required_capabilities: 所需能力
        - dependencies: 依赖的任务ID
        """

        response = self.llm.generate(prompt)
        return self.parse_tasks(response)

【Workflow Engine(工作流引擎)】

职责:
- 执行工作流
- 管理任务状态
- 处理任务依赖

代码实现:

class WorkflowEngine:
    def __init__(self, agent_registry):
        self.registry = agent_registry
        self.task_queue = Queue()
        self.results = {}

    def execute_workflow(self, tasks):
        """执行工作流"""
        # 拓扑排序
        sorted_tasks = self.topological_sort(tasks)

        for task in sorted_tasks:
            # 等待依赖完成
            self.wait_for_dependencies(task)

            # 选择合适的Agent
            agent = self.select_agent(task)

            # 执行任务
            result = agent.execute(task)
            self.results[task.id] = result

        return self.results

    def select_agent(self, task):
        """选择执行任务的Agent"""
        candidates = self.registry.find_agents(task.capability)
        # 实现选择策略(负载均衡、能力匹配等)
        return self.balance_select(candidates)

【Message Bus(消息总线)】

职责:
- Agent间通信
- 消息路由
- 消息持久化

代码实现:

class MessageBus:
    def __init__(self):
        self.subscribers = {}
        self.message_queue = Queue()

    def subscribe(self, agent, topic):
        """订阅消息"""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(agent)

    def publish(self, message):
        """发布消息"""
        self.message_queue.put(message)

    def start(self):
        """启动消息循环"""
        while True:
            message = self.message_queue.get()
            subscribers = self.subscribers.get(message.topic, [])
            for subscriber in subscribers:
                subscriber.receive(message)

3. 状态管理

【任务状态】

class TaskState:
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

【任务追踪】

class TaskTracker:
    def __init__(self):
        self.tasks = {}

    def create_task(self, task):
        """创建任务"""
        self.tasks[task.id] = {
            "task": task,
            "state": TaskState.PENDING,
            "start_time": None,
            "end_time": None,
            "result": None,
            "error": None
        }

    def update_state(self, task_id, state, **kwargs):
        """更新任务状态"""
        if task_id in self.tasks:
            self.tasks[task_id].update({
                "state": state,
                **kwargs
            })

    def get_state(self, task_id):
        """获取任务状态"""
        return self.tasks.get(task_id)

【分布式状态】

使用Redis实现分布式状态管理:

class DistributedTaskTracker(TaskTracker):
    def __init__(self, redis_client):
        self.redis = redis_client
        self.key_prefix = "task:"

    def create_task(self, task):
        """创建任务(分布式)"""
        key = f"{self.key_prefix}{task.id}"
        data = {
            "task": task.to_dict(),
            "state": TaskState.PENDING,
            "start_time": None,
            "end_time": None,
            "result": None,
            "error": None
        }
        self.redis.hset(key, mapping=data)

    def update_state(self, task_id, state, **kwargs):
        """更新状态(分布式)"""
        key = f"{self.key_prefix}{task_id}"
        self.redis.hset(key, mapping={"state": state, **kwargs})

通信机制

1. 同步通信

【直接调用】

适合场景:
- 简单协作
- 响应时间要求高
- 串行执行

实现:

class SynchronousCommunication:
    def send(self, receiver, message):
        """发送消息并等待响应"""
        response = receiver.receive(message)
        return response

# 使用
agent_a.send(agent_b, Message(content="Hello"))

2. 异步通信

【消息队列】

适合场景:
- 解耦Agent
- 缓冲消息
- 可靠传输

架构:

Agent A
    ↓ 发送消息
┌─────────────────┐
│   消息队列       │  (RabbitMQ/Kafka)
│  消息持久化      │
│  消息路由        │
└─────────────────┘
    ↓ 消费消息
Agent B

实现:

import pika

class AsyncCommunication:
    def __init__(self, queue_url):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(queue_url)
        )
        self.channel = self.connection.channel()

    def send(self, queue_name, message):
        """发送消息(异步)"""
        self.channel.queue_declare(queue=queue_name)
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message)
        )

    def receive(self, queue_name, callback):
        """接收消息(异步)"""
        self.channel.queue_declare(queue=queue_name)
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=True
        )
        self.channel.start_consuming()

3. 共享内存

【共享状态】

适合场景:
- 高频交互
- 状态同步
- 低延迟要求

实现:

import threading

class SharedMemory:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()

    def write(self, key, value):
        """写入数据(线程安全)"""
        with self.lock:
            self.data[key] = value

    def read(self, key):
        """读取数据(线程安全)"""
        with self.lock:
            return self.data.get(key)

# 使用
shared_memory = SharedMemory()
agent_a.write("status", "processing")
status = agent_b.read("status")

任务调度

1. 调度策略

【FIFO(先进先出)】

class FIFOScheduler:
    def __init__(self):
        self.queue = Queue()

    def schedule(self, task):
        """调度任务"""
        self.queue.put(task)

    def get_next(self):
        """获取下一个任务"""
        return self.queue.get()

【优先级调度】

class PriorityScheduler:
    def __init__(self):
        self.queue = PriorityQueue()

    def schedule(self, task):
        """按优先级调度"""
        self.queue.put((task.priority, task))

    def get_next(self):
        """获取优先级最高的任务"""
        return self.queue.get()[1]

【负载均衡调度】

class LoadBalancerScheduler:
    def __init__(self, agents):
        self.agents = agents
        self.task_counts = {agent.id: 0 for agent in agents}

    def select_agent(self, task):
        """选择负载最低的Agent"""
        min_agent = min(
            self.task_counts.items(),
            key=lambda x: x[1]
        )[0]
        return self.agents[min_agent]

    def update_load(self, agent_id):
        """更新负载"""
        self.task_counts[agent_id] += 1

2. 任务依赖

【DAG(有向无环图)】

class TaskDAG:
    def __init__(self):
        self.tasks = {}
        self.edges = {}

    def add_task(self, task):
        """添加任务"""
        self.tasks[task.id] = task
        self.edges[task.id] = []

    def add_dependency(self, from_task, to_task):
        """添加依赖"""
        self.edges[from_task].append(to_task)

    def topological_sort(self):
        """拓扑排序"""
        # 使用Kahn算法
        in_degree = {task: 0 for task in self.tasks}
        for task_id in self.tasks:
            for dependent in self.edges[task_id]:
                in_degree[dependent] += 1

        queue = deque([
            task for task, degree in in_degree.items()
            if degree == 0
        ])
        result = []

        while queue:
            task = queue.popleft()
            result.append(task)

            for dependent in self.edges[task]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)

        return result

工程实现

1. LangChain Multi-Agent

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain.chains import SequentialChain, LLMChain

# 定义工具
search_tool = Tool(
    name="Search",
    func=search_function,
    description="搜索网络信息"
)

calculator_tool = Tool(
    name="Calculator",
    func=calculate,
    description="进行数学计算"
)

# 创建Agent
agent1 = create_openai_functions_agent(
    llm=llm,
    tools=[search_tool]
)

agent2 = create_openai_functions_agent(
    llm=llm,
    tools=[calculator_tool]
)

# 创建执行器
executor1 = AgentExecutor(agent=agent1, tools=[search_tool])
executor2 = AgentExecutor(agent=agent2, tools=[calculator_tool])

# 顺序执行
chain = SequentialChain(
    chains=[executor1, executor2],
    input_variables=["query"]
)

result = chain.invoke({"query": user_query})

2. AutoGPT

from autogpt import AutoGPT

# 配置AutoGPT
config = {
    "openai_api_key": "your-api-key",
    "llm_model": "gpt-4",
    "max_iterations": 10,
    "tools": [
        "web_search",
        "file_operations",
        "code_interpreter"
    ]
}

    "agent_name": "CodingAgent",
    "agent_role": "AI coding assistant",
    "agent_goals": [
        "Write clean code",
        "Follow best practices",
        "Add tests"
    ]
}

# 创建AutoGPT
autogpt = AutoGPT(**config)

# 运行
result = autogpt.run("Create a REST API for user management")

3. BabyAGI

from babyagi import BabyAGI

# 定义任务列表
def initial_task():
    return "Research and write about quantum computing"

# 配置BabyAGI
baby_agi = BabyAGI(
    llm=llm,
    vector_db=vector_db,
    tools=[search_tool, write_tool],
    max_loops=5
)

# 运行
result = baby_agi.run(initial_task())

4. 自定义Multi-Agent框架

class MultiAgentFramework:
    def __init__(self, config):
        self.config = config
        self.agents = {}
        self.message_bus = MessageBus()
        self.scheduler = TaskScheduler()
        self.tracker = TaskTracker()

    def register_agent(self, agent):
        """注册Agent"""
        self.agents[agent.id] = agent
        self.message_bus.subscribe(agent, agent.topic)

    def execute(self, user_request):
        """执行用户请求"""
        # 1. 规划任务
        tasks = self.plan_tasks(user_request)

        # 2. 调度执行
        for task in tasks:
            self.scheduler.schedule(task)

        # 3. 执行任务
        results = []
        while not self.scheduler.empty():
            task = self.scheduler.get_next()
            agent = self.select_agent(task)
            result = agent.execute(task)
            results.append(result)

        return self.aggregate_results(results)

    def select_agent(self, task):
        """选择执行任务的Agent"""
        # 实现Agent选择逻辑
        candidates = [
            agent for agent in self.agents.values()
            if agent.can_handle(task)
        ]
        return self.load_balance_select(candidates)

实战案例

1. 代码开发助手系统

# 定义不同角色的Agent

class RequirementsAnalyst(Agent):
    """需求分析Agent"""
    def __init__(self, llm):
        super().__init__(
            name="RequirementsAnalyst",
            role="分析需求并制定功能规格",
            llm=llm
        )

class Architect(Agent):
    """架构设计Agent"""
    def __init__(self, llm):
        super().__init__(
            name="Architect",
            role="设计系统架构和技术选型",
            llm=llm
        )

class Developer(Agent):
    """代码实现Agent"""
    def __init__(self, llm):
        super().__init__(
            name="Developer",
            role="编写高质量代码",
            tools=[file_tool, search_tool],
            llm=llm
        )

class Tester(Agent):
    """测试Agent"""
    def __init__(self, llm):
        super().__init__(
            name="Tester",
            role="编写并运行测试",
            tools=[test_tool, code_tool],
            llm=llm
        )

# 创建多Agent系统
class CodeDevelopmentSystem:
    def __init__(self, llm):
        self.analyst = RequirementsAnalyst(llm)
        self.architect = Architect(llm)
        self.developer = Developer(llm)
        self.tester = Tester(llm)

    def develop_feature(self, requirement):
        """开发功能"""
        # 1. 分析需求
        specs = self.analyst.execute(
            f"分析需求: {requirement}"
        )

        # 2. 设计架构
        architecture = self.architect.execute(
            f"基于需求设计架构: {specs}"
        )

        # 3. 实现代码
        code = self.developer.execute(
            f"根据架构实现代码: {architecture}"
        )

        # 4. 编写测试
        tests = self.tester.execute(
            f"为代码编写测试: {code}"
        )

        # 5. 运行测试
        test_results = self.tester.execute(
            f"运行测试: {tests}"
        )

        return {
            "specs": specs,
            "architecture": architecture,
            "code": code,
            "tests": tests,
            "results": test_results
        }

2. 智能客服系统

class IntentClassifier(Agent):
    """意图识别Agent"""
    def __init__(self, llm):
        super().__init__(
            name="IntentClassifier",
            role="识别用户意图",
            llm=llm
        )

class FAQAgent(Agent):
    """常见问题Agent"""
    def __init__(self, llm, vector_db):
        super().__init__(
            name="FAQAgent",
            role="回答常见问题",
            tools=[vector_db_tool],
            llm=llm
        )

class OrderAgent(Agent):
    """订单查询Agent"""
    def __init__(self, llm, database):
        super().__init__(
            name="OrderAgent",
            role="查询订单信息",
            tools=[database_tool],
            llm=llm
        )

class HumanAgent(Agent):
    """人工接管Agent"""
    def __init__(self):
        super().__init__(
            name="HumanAgent",
            role="转接人工客服"
        )

class CustomerServiceSystem:
    def __init__(self, llm, vector_db, database):
        self.intent_classifier = IntentClassifier(llm)
        self.faq_agent = FAQAgent(llm, vector_db)
        self.order_agent = OrderAgent(llm, database)
        self.human_agent = HumanAgent()

    def handle_query(self, user_query):
        """处理用户查询"""
        # 1. 识别意图
        intent = self.intent_classifier.execute(user_query)

        # 2. 根据意图路由到对应Agent
        if intent == "faq":
            response = self.faq_agent.execute(user_query)
        elif intent == "order":
            response = self.order_agent.execute(user_query)
        elif intent == "complaint":
            response = self.human_agent.execute(user_query)
        else:
            response = "对不起,我不理解您的问题"

        return response

面试高频问法

Q1: 单Agent vs 多Agent?

【标准回答】

单Agent:
优势:
- 实现简单
- 上下文连续
- 调试容易

劣势:
- 能力有限
- 扩展性差
- 单点故障

多Agent:
优势:
- 专业化分工
- 并行处理
- 容错能力强
- 可扩展性好

劣势:
- 实现复杂
- 协调开销
- 状态管理难

选择建议:
- 简单任务:单Agent
- 复杂任务:多Agent
- 高并发:多Agent

Q2: 多Agent如何通信?

【标准回答】

通信方式:

1. 同步通信
   - 直接调用
   - 适合简单场景
   - 响应快

2. 异步通信
   - 消息队列
   - 解耦Agent
   - 可靠传输

3. 共享内存
   - 共享状态
   - 高频交互
   - 低延迟

选择:
- 低延迟:共享内存
- 高可靠:消息队列
- 简单场景:同步调用

Q3: 如何避免循环依赖?

【标准回答】

循环依赖问题:

1. 检测方法
   - 拓扑排序失败
   - 检测环

2. 解决方案
   - 拆分任务
   - 引入中间Agent
   - 使用共享状态

3. 预防措施
   - DAG设计
   - 依赖检查
   - 限制依赖层级

记忆要点

【多Agent优势】

专业化分工
并行处理
容错能力
可扩展性强

【协作模式】

顺序协作:有依赖关系
并行协作:互不依赖
层次协作:主从关系
议论协作:多方案对比

【架构】

编排层:任务编排、工作流
Agent层:专业分工、协作执行
服务层:LLM、工具、存储
基础层:向量库、消息队列、缓存

【通信机制】

同步:直接调用
异步:消息队列
共享:共享内存

【任务调度】

FIFO:先进先出
优先级:按优先级
负载均衡:最低负载

【框架】

LangChain Multi-Agent
AutoGPT
BabyAGI
自定义框架

文档版本: 1.0

close
arrow_upward