内容纲要
多Agent系统设计与开发(详解版)
目标:精通多Agent协作、架构设计与工程实现
目录
多Agent系统概述
1. 为什么需要多Agent?
【单Agent的局限】
1. 能力瓶颈
- 上下文容量有限
- 工具调用数量受限
- 并发处理能力弱
2. 复杂任务困难
- 多步骤任务难以规划
- 知识覆盖面有限
- 缺乏专业化分工
3. 可扩展性差
- 功能扩展需要重新训练
- 模型切换成本高
- 难以模块化升级
【多Agent的优势】
1. 专业化分工
- 每个Agent专注特定领域
- 知识更深入
- 准确率更高
2. 并行处理
- 多任务并行执行
- 减少等待时间
- 提升整体效率
3. 容错能力
- 单个Agent失败不影响整体
- 自动重试和fallback
- 系统更稳定
4. 可扩展性强
- 模块化设计
- 动态增减Agent
- 独立升级优化
2. 典型应用场景
【代码开发场景】
需求分析Agent → 技术设计Agent → 代码实现Agent → 测试Agent → 文档Agent
【客服场景】
意图识别Agent → 常见问题Agent → 订单查询Agent → 人工接管Agent
【研究场景】
搜索Agent → 总结Agent → 分析Agent → 报告Agent
【医疗场景】
症状分析Agent → 诊断Agent → 治疗建议Agent → 风险评估Agent
协作模式
1. 顺序协作
【模式特点】
- Agent按顺序执行
- 前一个Agent的输出是后一个的输入
- 适合有明确依赖关系的任务
【架构图】
用户请求
↓
Agent A (任务A)
↓ (输出A)
Agent B (任务B) ← 使用输出A
↓ (输出B)
Agent C (任务C) ← 使用输出B
↓ (输出C)
最终结果
【代码示例】
from langchain.agents import AgentExecutor, LLMSingleActionAgent
from langchain.chains import SequentialChain
# 定义Agent
agent_a = AgentExecutor(
agent=agent_a,
tools=[search_tool],
verbose=True
)
agent_b = AgentExecutor(
agent=agent_b,
tools=[calculator_tool],
verbose=True
)
# 顺序执行
result_a = agent_a.invoke({"input": user_query})
result_b = agent_b.invoke({"input": result_a["output"]})
2. 并行协作
【模式特点】
- 多个Agent同时执行
- 互不依赖,可并行处理
- 最后汇总结果
【架构图】
用户请求
↓
┌────────┬────────┬────────┐
│Agent A │Agent B │Agent C │
└────────┴────────┴────────┘
↓ ↓ ↓
输出A 输出B 输出C
└───────┬───────┘
↓
汇总Agent
↓
最终结果
【代码示例】
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def parallel_execute(agents, input_data):
"""并行执行多个Agent"""
tasks = [agent.invoke(input_data) for agent in agents]
results = await asyncio.gather(*tasks)
return results
# 使用
agents = [agent_a, agent_b, agent_c]
results = await parallel_execute(agents, user_input)
3. 层次协作
【模式特点】
- 有主从关系
- Master负责任务分发和结果汇总
- Worker执行具体任务
【架构图】
用户请求
↓
┌──────────────────────┐
│ Master Agent │ 任务调度
│ - 任务分解 │ 结果汇总
│ - 负载均衡 │ 容错处理
└──────────┬──────────┘
↓
┌──────┴──────┐
│ Worker池 │
│ Agent A │
│ Agent B │
│ Agent C │
└─────────────┘
↓
最终结果
【代码示例】
class MasterAgent:
def __init__(self, workers):
self.workers = workers
self.task_queue = Queue()
def distribute_task(self, task):
"""分发任务"""
# 选择合适的Worker
worker = self.select_worker(task)
result = worker.execute(task)
return result
def orchestrate(self, user_request):
"""编排整个任务"""
# 1. 任务分解
subtasks = self.decompose(user_request)
# 2. 并行执行
results = []
for task in subtasks:
result = self.distribute_task(task)
results.append(result)
# 3. 结果汇总
return self.aggregate(results)
4. 议论协作
【模式特点】
- 多个Agent讨论同一个问题
- 通过对话达成共识
- 适合需要多方案对比的场景
【架构图】
用户请求
↓
┌──────────────────────────┐
│ 议论协调器 │
└──────────┬───────────────┘
│
┌───────┼───────┐
↓ ↓ ↓
┌──────┐┌──────┐┌──────┐
│Agent 1││Agent 2││Agent 3│
│观点A ││观点B ││观点C │
└───┬──┘└───┬──┘└───┬──┘
│ │ │
└──────┼──────┘
↓
评估Agent
综合意见
↓
最终方案
【代码示例】
class DiscussionAgent:
def __init__(self, agents, max_rounds=3):
self.agents = agents
self.max_rounds = max_rounds
def discuss(self, topic):
"""组织讨论"""
context = {"topic": topic, "messages": []}
for round in range(self.max_rounds):
# 每个Agent发言
for agent in self.agents:
response = agent.speak(context)
context["messages"].append({
"agent": agent.name,
"message": response
})
# 检查是否达成共识
if self.check_consensus(context):
break
return self.synthesize(context)
def check_consensus(self, context):
"""检查是否达成共识"""
# 实现共识检测逻辑
pass
def synthesize(self, context):
"""综合各方意见"""
# 实现意见综合逻辑
pass
架构设计
1. 系统架构
【分层架构】
┌─────────────────────────────────────┐
│ 用户交互层 │
│ - Web界面 │
│ - API接口 │
│ - 消息网关 │
└─────────────────┬───────────────┘
│
┌─────────────────┴───────────────┐
│ 编排层 │
│ - 任务编排器 │
│ - 工作流引擎 │
│ - 状态管理 │
└─────────────────┬───────────────┘
│
┌─────────────────┴───────────────┐
│ Agent层 │
│ ┌──────┬──────┬──────┐ │
│ │Agent A│Agent B│Agent C│ ... │
│ └──────┴──────┴──────┘ │
└─────────────────┬───────────────┘
│
┌─────────────────┴───────────────┐
│ 服务层 │
│ - LLM服务 │
│ - 工具服务 │
│ - 存储服务 │
└─────────────────┬───────────────┘
│
┌─────────────────┴───────────────┐
│ 基础设施层 │
│ - 向量数据库 │
│ - 消息队列 │
│ - 缓存 │
└───────────────────────────────────┘
2. 核心组件
【Agent Registry(Agent注册中心)】
职责:
- 管理所有Agent的注册信息
- 提供Agent查询和发现
- 管理Agent生命周期
代码实现:
class AgentRegistry:
def __init__(self):
self.agents = {}
def register(self, agent):
"""注册Agent"""
self.agents[agent.id] = agent
agent.start()
def unregister(self, agent_id):
"""注销Agent"""
agent = self.agents.get(agent_id)
if agent:
agent.stop()
del self.agents[agent_id]
def get_agent(self, agent_id):
"""获取Agent"""
return self.agents.get(agent_id)
def find_agents(self, capability):
"""根据能力查找Agent"""
return [
agent for agent in self.agents.values()
if agent.has_capability(capability)
]
【Task Planner(任务规划器)】
职责:
- 解析用户请求
- 分解为子任务
- 确定执行顺序
代码实现:
class TaskPlanner:
def __init__(self, llm):
self.llm = llm
def plan(self, user_request):
"""规划任务"""
prompt = f"""
分析以下用户请求,将其分解为可执行的子任务:
用户请求:{user_request}
请以JSON格式返回任务列表,每个任务包含:
- name: 任务名称
- description: 任务描述
- required_capabilities: 所需能力
- dependencies: 依赖的任务ID
"""
response = self.llm.generate(prompt)
return self.parse_tasks(response)
【Workflow Engine(工作流引擎)】
职责:
- 执行工作流
- 管理任务状态
- 处理任务依赖
代码实现:
class WorkflowEngine:
def __init__(self, agent_registry):
self.registry = agent_registry
self.task_queue = Queue()
self.results = {}
def execute_workflow(self, tasks):
"""执行工作流"""
# 拓扑排序
sorted_tasks = self.topological_sort(tasks)
for task in sorted_tasks:
# 等待依赖完成
self.wait_for_dependencies(task)
# 选择合适的Agent
agent = self.select_agent(task)
# 执行任务
result = agent.execute(task)
self.results[task.id] = result
return self.results
def select_agent(self, task):
"""选择执行任务的Agent"""
candidates = self.registry.find_agents(task.capability)
# 实现选择策略(负载均衡、能力匹配等)
return self.balance_select(candidates)
【Message Bus(消息总线)】
职责:
- Agent间通信
- 消息路由
- 消息持久化
代码实现:
class MessageBus:
def __init__(self):
self.subscribers = {}
self.message_queue = Queue()
def subscribe(self, agent, topic):
"""订阅消息"""
if topic not in self.subscribers:
self.subscribers[topic] = []
self.subscribers[topic].append(agent)
def publish(self, message):
"""发布消息"""
self.message_queue.put(message)
def start(self):
"""启动消息循环"""
while True:
message = self.message_queue.get()
subscribers = self.subscribers.get(message.topic, [])
for subscriber in subscribers:
subscriber.receive(message)
3. 状态管理
【任务状态】
class TaskState:
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
【任务追踪】
class TaskTracker:
def __init__(self):
self.tasks = {}
def create_task(self, task):
"""创建任务"""
self.tasks[task.id] = {
"task": task,
"state": TaskState.PENDING,
"start_time": None,
"end_time": None,
"result": None,
"error": None
}
def update_state(self, task_id, state, **kwargs):
"""更新任务状态"""
if task_id in self.tasks:
self.tasks[task_id].update({
"state": state,
**kwargs
})
def get_state(self, task_id):
"""获取任务状态"""
return self.tasks.get(task_id)
【分布式状态】
使用Redis实现分布式状态管理:
class DistributedTaskTracker(TaskTracker):
def __init__(self, redis_client):
self.redis = redis_client
self.key_prefix = "task:"
def create_task(self, task):
"""创建任务(分布式)"""
key = f"{self.key_prefix}{task.id}"
data = {
"task": task.to_dict(),
"state": TaskState.PENDING,
"start_time": None,
"end_time": None,
"result": None,
"error": None
}
self.redis.hset(key, mapping=data)
def update_state(self, task_id, state, **kwargs):
"""更新状态(分布式)"""
key = f"{self.key_prefix}{task_id}"
self.redis.hset(key, mapping={"state": state, **kwargs})
通信机制
1. 同步通信
【直接调用】
适合场景:
- 简单协作
- 响应时间要求高
- 串行执行
实现:
class SynchronousCommunication:
def send(self, receiver, message):
"""发送消息并等待响应"""
response = receiver.receive(message)
return response
# 使用
agent_a.send(agent_b, Message(content="Hello"))
2. 异步通信
【消息队列】
适合场景:
- 解耦Agent
- 缓冲消息
- 可靠传输
架构:
Agent A
↓ 发送消息
┌─────────────────┐
│ 消息队列 │ (RabbitMQ/Kafka)
│ 消息持久化 │
│ 消息路由 │
└─────────────────┘
↓ 消费消息
Agent B
实现:
import pika
class AsyncCommunication:
def __init__(self, queue_url):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(queue_url)
)
self.channel = self.connection.channel()
def send(self, queue_name, message):
"""发送消息(异步)"""
self.channel.queue_declare(queue=queue_name)
self.channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message)
)
def receive(self, queue_name, callback):
"""接收消息(异步)"""
self.channel.queue_declare(queue=queue_name)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
self.channel.start_consuming()
3. 共享内存
【共享状态】
适合场景:
- 高频交互
- 状态同步
- 低延迟要求
实现:
import threading
class SharedMemory:
def __init__(self):
self.data = {}
self.lock = threading.Lock()
def write(self, key, value):
"""写入数据(线程安全)"""
with self.lock:
self.data[key] = value
def read(self, key):
"""读取数据(线程安全)"""
with self.lock:
return self.data.get(key)
# 使用
shared_memory = SharedMemory()
agent_a.write("status", "processing")
status = agent_b.read("status")
任务调度
1. 调度策略
【FIFO(先进先出)】
class FIFOScheduler:
def __init__(self):
self.queue = Queue()
def schedule(self, task):
"""调度任务"""
self.queue.put(task)
def get_next(self):
"""获取下一个任务"""
return self.queue.get()
【优先级调度】
class PriorityScheduler:
def __init__(self):
self.queue = PriorityQueue()
def schedule(self, task):
"""按优先级调度"""
self.queue.put((task.priority, task))
def get_next(self):
"""获取优先级最高的任务"""
return self.queue.get()[1]
【负载均衡调度】
class LoadBalancerScheduler:
def __init__(self, agents):
self.agents = agents
self.task_counts = {agent.id: 0 for agent in agents}
def select_agent(self, task):
"""选择负载最低的Agent"""
min_agent = min(
self.task_counts.items(),
key=lambda x: x[1]
)[0]
return self.agents[min_agent]
def update_load(self, agent_id):
"""更新负载"""
self.task_counts[agent_id] += 1
2. 任务依赖
【DAG(有向无环图)】
class TaskDAG:
def __init__(self):
self.tasks = {}
self.edges = {}
def add_task(self, task):
"""添加任务"""
self.tasks[task.id] = task
self.edges[task.id] = []
def add_dependency(self, from_task, to_task):
"""添加依赖"""
self.edges[from_task].append(to_task)
def topological_sort(self):
"""拓扑排序"""
# 使用Kahn算法
in_degree = {task: 0 for task in self.tasks}
for task_id in self.tasks:
for dependent in self.edges[task_id]:
in_degree[dependent] += 1
queue = deque([
task for task, degree in in_degree.items()
if degree == 0
])
result = []
while queue:
task = queue.popleft()
result.append(task)
for dependent in self.edges[task]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
return result
工程实现
1. LangChain Multi-Agent
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import Tool
from langchain.chains import SequentialChain, LLMChain
# 定义工具
search_tool = Tool(
name="Search",
func=search_function,
description="搜索网络信息"
)
calculator_tool = Tool(
name="Calculator",
func=calculate,
description="进行数学计算"
)
# 创建Agent
agent1 = create_openai_functions_agent(
llm=llm,
tools=[search_tool]
)
agent2 = create_openai_functions_agent(
llm=llm,
tools=[calculator_tool]
)
# 创建执行器
executor1 = AgentExecutor(agent=agent1, tools=[search_tool])
executor2 = AgentExecutor(agent=agent2, tools=[calculator_tool])
# 顺序执行
chain = SequentialChain(
chains=[executor1, executor2],
input_variables=["query"]
)
result = chain.invoke({"query": user_query})
2. AutoGPT
from autogpt import AutoGPT
# 配置AutoGPT
config = {
"openai_api_key": "your-api-key",
"llm_model": "gpt-4",
"max_iterations": 10,
"tools": [
"web_search",
"file_operations",
"code_interpreter"
]
}
"agent_name": "CodingAgent",
"agent_role": "AI coding assistant",
"agent_goals": [
"Write clean code",
"Follow best practices",
"Add tests"
]
}
# 创建AutoGPT
autogpt = AutoGPT(**config)
# 运行
result = autogpt.run("Create a REST API for user management")
3. BabyAGI
from babyagi import BabyAGI
# 定义任务列表
def initial_task():
return "Research and write about quantum computing"
# 配置BabyAGI
baby_agi = BabyAGI(
llm=llm,
vector_db=vector_db,
tools=[search_tool, write_tool],
max_loops=5
)
# 运行
result = baby_agi.run(initial_task())
4. 自定义Multi-Agent框架
class MultiAgentFramework:
def __init__(self, config):
self.config = config
self.agents = {}
self.message_bus = MessageBus()
self.scheduler = TaskScheduler()
self.tracker = TaskTracker()
def register_agent(self, agent):
"""注册Agent"""
self.agents[agent.id] = agent
self.message_bus.subscribe(agent, agent.topic)
def execute(self, user_request):
"""执行用户请求"""
# 1. 规划任务
tasks = self.plan_tasks(user_request)
# 2. 调度执行
for task in tasks:
self.scheduler.schedule(task)
# 3. 执行任务
results = []
while not self.scheduler.empty():
task = self.scheduler.get_next()
agent = self.select_agent(task)
result = agent.execute(task)
results.append(result)
return self.aggregate_results(results)
def select_agent(self, task):
"""选择执行任务的Agent"""
# 实现Agent选择逻辑
candidates = [
agent for agent in self.agents.values()
if agent.can_handle(task)
]
return self.load_balance_select(candidates)
实战案例
1. 代码开发助手系统
# 定义不同角色的Agent
class RequirementsAnalyst(Agent):
"""需求分析Agent"""
def __init__(self, llm):
super().__init__(
name="RequirementsAnalyst",
role="分析需求并制定功能规格",
llm=llm
)
class Architect(Agent):
"""架构设计Agent"""
def __init__(self, llm):
super().__init__(
name="Architect",
role="设计系统架构和技术选型",
llm=llm
)
class Developer(Agent):
"""代码实现Agent"""
def __init__(self, llm):
super().__init__(
name="Developer",
role="编写高质量代码",
tools=[file_tool, search_tool],
llm=llm
)
class Tester(Agent):
"""测试Agent"""
def __init__(self, llm):
super().__init__(
name="Tester",
role="编写并运行测试",
tools=[test_tool, code_tool],
llm=llm
)
# 创建多Agent系统
class CodeDevelopmentSystem:
def __init__(self, llm):
self.analyst = RequirementsAnalyst(llm)
self.architect = Architect(llm)
self.developer = Developer(llm)
self.tester = Tester(llm)
def develop_feature(self, requirement):
"""开发功能"""
# 1. 分析需求
specs = self.analyst.execute(
f"分析需求: {requirement}"
)
# 2. 设计架构
architecture = self.architect.execute(
f"基于需求设计架构: {specs}"
)
# 3. 实现代码
code = self.developer.execute(
f"根据架构实现代码: {architecture}"
)
# 4. 编写测试
tests = self.tester.execute(
f"为代码编写测试: {code}"
)
# 5. 运行测试
test_results = self.tester.execute(
f"运行测试: {tests}"
)
return {
"specs": specs,
"architecture": architecture,
"code": code,
"tests": tests,
"results": test_results
}
2. 智能客服系统
class IntentClassifier(Agent):
"""意图识别Agent"""
def __init__(self, llm):
super().__init__(
name="IntentClassifier",
role="识别用户意图",
llm=llm
)
class FAQAgent(Agent):
"""常见问题Agent"""
def __init__(self, llm, vector_db):
super().__init__(
name="FAQAgent",
role="回答常见问题",
tools=[vector_db_tool],
llm=llm
)
class OrderAgent(Agent):
"""订单查询Agent"""
def __init__(self, llm, database):
super().__init__(
name="OrderAgent",
role="查询订单信息",
tools=[database_tool],
llm=llm
)
class HumanAgent(Agent):
"""人工接管Agent"""
def __init__(self):
super().__init__(
name="HumanAgent",
role="转接人工客服"
)
class CustomerServiceSystem:
def __init__(self, llm, vector_db, database):
self.intent_classifier = IntentClassifier(llm)
self.faq_agent = FAQAgent(llm, vector_db)
self.order_agent = OrderAgent(llm, database)
self.human_agent = HumanAgent()
def handle_query(self, user_query):
"""处理用户查询"""
# 1. 识别意图
intent = self.intent_classifier.execute(user_query)
# 2. 根据意图路由到对应Agent
if intent == "faq":
response = self.faq_agent.execute(user_query)
elif intent == "order":
response = self.order_agent.execute(user_query)
elif intent == "complaint":
response = self.human_agent.execute(user_query)
else:
response = "对不起,我不理解您的问题"
return response
面试高频问法
Q1: 单Agent vs 多Agent?
【标准回答】
单Agent:
优势:
- 实现简单
- 上下文连续
- 调试容易
劣势:
- 能力有限
- 扩展性差
- 单点故障
多Agent:
优势:
- 专业化分工
- 并行处理
- 容错能力强
- 可扩展性好
劣势:
- 实现复杂
- 协调开销
- 状态管理难
选择建议:
- 简单任务:单Agent
- 复杂任务:多Agent
- 高并发:多Agent
Q2: 多Agent如何通信?
【标准回答】
通信方式:
1. 同步通信
- 直接调用
- 适合简单场景
- 响应快
2. 异步通信
- 消息队列
- 解耦Agent
- 可靠传输
3. 共享内存
- 共享状态
- 高频交互
- 低延迟
选择:
- 低延迟:共享内存
- 高可靠:消息队列
- 简单场景:同步调用
Q3: 如何避免循环依赖?
【标准回答】
循环依赖问题:
1. 检测方法
- 拓扑排序失败
- 检测环
2. 解决方案
- 拆分任务
- 引入中间Agent
- 使用共享状态
3. 预防措施
- DAG设计
- 依赖检查
- 限制依赖层级
记忆要点
【多Agent优势】
专业化分工
并行处理
容错能力
可扩展性强
【协作模式】
顺序协作:有依赖关系
并行协作:互不依赖
层次协作:主从关系
议论协作:多方案对比
【架构】
编排层:任务编排、工作流
Agent层:专业分工、协作执行
服务层:LLM、工具、存储
基础层:向量库、消息队列、缓存
【通信机制】
同步:直接调用
异步:消息队列
共享:共享内存
【任务调度】
FIFO:先进先出
优先级:按优先级
负载均衡:最低负载
【框架】
LangChain Multi-Agent
AutoGPT
BabyAGI
自定义框架
文档版本: 1.0