内容纲要
Agent 通信协议
目标:掌握 Agent 间通信的各种协议和实现方式
目录
通信协议概述
Agent 通信场景
【Agent 间协作】
场景 | 适用协议 | 特点
--------------------|--------------------|------------------
Agent 间调用 | gRPC / HTTP | 高性能 / 简单
实时交互 | WebSocket | 双向通信
事件通知 | 消息队列 | 异步解耦
状态共享 | 共享内存 / Redis | 低延迟
跨服务通信 | HTTP / gRPC | 服务间调用
流式数据传输 | WebSocket / HTTP | 流式输出
协议选择指南
【选择决策树】
需要高性能实时调用?
├─ 是 → 需要类型安全?
│ ├─ 是 → gRPC
│ └─ 否 → WebSocket
└─ 否 → 需要简单通用?
├─ 是 → HTTP REST
└─ 否 → 消息队列
HTTP/HTTPS 通信
1. RESTful API 设计
# HTTP Agent 通信客户端
import json
from dataclasses import dataclass
from typing import Dict, Iterator, Optional

import requests
@dataclass
class AgentResponse:
    """Uniform result envelope returned by the blocking client calls."""

    status: str  # "success" or "error"
    data: Dict  # decoded JSON body; empty dict when the call failed
    error: Optional[str] = None  # stringified exception on failure, else None


class HTTPAgentClient:
    """HTTP client for an Agent service.

    A single ``requests.Session`` is reused for all calls so connections and
    authentication headers are shared.
    """

    def __init__(
        self,
        base_url: str,
        timeout: int = 30,
        api_key: Optional[str] = None
    ):
        """Create the client.

        Args:
            base_url: Service root URL; a trailing slash is stripped.
            timeout: Timeout in seconds for blocking requests.
            api_key: Optional bearer token sent with every request.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            })

    def _post_json(self, url: str, payload: Dict) -> AgentResponse:
        """POST ``payload`` as JSON and wrap the outcome in an AgentResponse."""
        try:
            response = self.session.post(
                url,
                json=payload,
                timeout=self.timeout
            )
            response.raise_for_status()
            return AgentResponse(status="success", data=response.json())
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        except (requests.exceptions.RequestException, ValueError) as e:
            return AgentResponse(status="error", data={}, error=str(e))

    def query(
        self,
        agent_id: str,
        query: str,
        options: Optional[Dict] = None
    ) -> AgentResponse:
        """Call the query endpoint of one agent.

        Returns an AgentResponse; transport and HTTP errors are reported via
        ``status="error"`` instead of being raised.
        """
        url = f"{self.base_url}/api/v1/agents/{agent_id}/query"
        payload = {
            "query": query,
            "options": options or {}
        }
        return self._post_json(url, payload)

    def stream_query(
        self,
        agent_id: str,
        query: str
    ) -> Iterator[Dict]:
        """Stream a query and yield each decoded ``data:`` event.

        Iteration stops at the ``[DONE]`` sentinel. Unlike the blocking
        calls, HTTP errors are raised (``raise_for_status``).
        """
        url = f"{self.base_url}/api/v1/agents/{agent_id}/stream"
        payload = {"query": query}
        with self.session.post(
            url,
            json=payload,
            stream=True,
            # Bound the connect phase, but never time out while reading
            # the (potentially long-lived) stream.
            timeout=(self.timeout, None)
        ) as response:
            response.raise_for_status()
            for raw_line in response.iter_lines():
                # iter_lines() yields bytes by default; decode explicitly as
                # UTF-8 rather than trusting the header-derived encoding.
                if isinstance(raw_line, bytes):
                    line = raw_line.decode("utf-8")
                else:
                    line = raw_line
                if not line.startswith("data:"):
                    continue  # blank separators, comments, other fields
                data = line[len("data:"):].strip()
                if not data:
                    continue
                if data == "[DONE]":
                    break
                yield json.loads(data)

    def batch_query(
        self,
        queries: list
    ) -> AgentResponse:
        """Send several queries in one request to the batch endpoint."""
        url = f"{self.base_url}/api/v1/agents/batch"
        return self._post_json(url, {"queries": queries})

    def health_check(self) -> bool:
        """Return True when ``/health`` answers with HTTP 200."""
        try:
            response = self.session.get(
                f"{self.base_url}/health",
                timeout=5
            )
        except requests.exceptions.RequestException:
            return False
        return response.status_code == 200
# Usage example
if __name__ == "__main__":
    agent_client = HTTPAgentClient(
        base_url="http://localhost:8080",
        api_key="your-api-key",
    )

    # Blocking query
    answer = agent_client.query(agent_id="rag-agent-1", query="什么是 RAG?")
    print(answer)

    # Streaming query: print each chunk as soon as it arrives
    print("\n流式输出:")
    chunk_stream = agent_client.stream_query(
        agent_id="rag-agent-1",
        query="介绍一下 Python 的特性",
    )
    for piece in chunk_stream:
        print(piece.get("chunk", ""), end="", flush=True)

    # Batch query
    pending_queries = [
        {"agent_id": "agent-1", "query": "问题1"},
        {"agent_id": "agent-2", "query": "问题2"},
    ]
    batch_answer = agent_client.batch_query(pending_queries)
    print(f"\n批量结果:{batch_answer}")
2. HTTP Agent 服务端(Go)
// agent_server.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
)
// QueryRequest is the JSON body accepted by the query and stream endpoints.
type QueryRequest struct {
	Query string `json:"query"`
	// Options is omitted from the encoded JSON when empty.
	Options map[string]string `json:"options,omitempty"`
}
// QueryResponse is the JSON body returned by the query endpoint.
type QueryResponse struct {
	ID string `json:"id"`
	Answer string `json:"answer"`
	// Sources and Usage are omitted from the encoded JSON when empty/nil.
	Sources []string `json:"sources,omitempty"`
	Usage *Usage `json:"usage,omitempty"`
}
// Usage holds token usage statistics for one response.
type Usage struct {
	PromptTokens int64 `json:"prompt_tokens"`
	CompletionTokens int64 `json:"completion_tokens"`
	TotalTokens int64 `json:"total_tokens"`
}
// AgentService is the receiver for the HTTP handlers in this file.
type AgentService struct {
	// pythonAgentURL is the address passed to NewAgentService.
	pythonAgentURL string
}
// NewAgentService builds an AgentService that targets the given Python
// agent URL.
func NewAgentService(pythonURL string) *AgentService {
	svc := new(AgentService)
	svc.pythonAgentURL = pythonURL
	return svc
}
// QueryHandler decodes a QueryRequest from the request body, forwards it to
// the agent identified by the ":id" path parameter, and writes the result
// as JSON. A malformed body yields 400; a forwarding failure yields 500.
func (s *AgentService) QueryHandler(c *gin.Context) {
	var body QueryRequest
	bindErr := c.ShouldBindJSON(&body)
	if bindErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": bindErr.Error()})
		return
	}

	// Forward to the Python agent service.
	result, fwdErr := s.forwardToPythonAgent(c.Param("id"), body.Query)
	if fwdErr != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": fwdErr.Error()})
		return
	}

	c.JSON(http.StatusOK, result)
}
// forwardToPythonAgent is the hook for calling the Python agent (over gRPC
// or HTTP). It is currently a stub: both arguments are ignored and a fixed,
// simulated response is returned with a nil error.
func (s *AgentService) forwardToPythonAgent(agentID, query string) (*QueryResponse, error) {
	simulated := &QueryResponse{
		ID:     generateID(),
		Answer: "这是模拟的响应",
		Usage:  &Usage{TotalTokens: 100},
	}
	return simulated, nil
}
// StreamHandler streams a simulated answer to the client as Server-Sent
// Events (SSE).
//
// Each chunk is sent as one event whose data is a JSON object
// {"chunk": ..., "is_final": false}; the stream ends with the literal
// sentinel event "data: [DONE]". A malformed request body yields 400.
func (s *AgentService) StreamHandler(c *gin.Context) {
	agentID := c.Param("id")

	var req QueryRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	log.Printf("Stream request for agent %s", agentID)

	// SSE response headers.
	header := c.Writer.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")

	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		c.String(http.StatusInternalServerError, "Streaming unsupported")
		return
	}

	// Simulated streaming output.
	chunks := []string{"这", "是", "一", "个", "流", "式", "响", "应"}
	for _, chunk := range chunks {
		// Encode the payload as real JSON so clients can json-decode it.
		payload, err := json.Marshal(map[string]any{
			"chunk":    chunk,
			"is_final": false,
		})
		if err != nil {
			log.Printf("Stream encode error: %v", err)
			return
		}
		// An SSE event is terminated by a blank line, hence "\n\n".
		if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", payload); err != nil {
			// Most likely the client disconnected; stop streaming.
			log.Printf("Stream write error: %v", err)
			return
		}
		flusher.Flush()
		time.Sleep(100 * time.Millisecond)
	}

	// End-of-stream sentinel.
	if _, err := c.Writer.WriteString("data: [DONE]\n\n"); err != nil {
		log.Printf("Stream write error: %v", err)
		return
	}
	flusher.Flush()
}
// HealthHandler answers 200 with a small JSON status document containing
// the current time (RFC 3339) and a fixed version string.
func (s *AgentService) HealthHandler(c *gin.Context) {
	now := time.Now().Format(time.RFC3339)
	body := gin.H{
		"status":    "healthy",
		"timestamp": now,
		"version":   "1.0.0",
	}
	c.JSON(http.StatusOK, body)
}
// main wires the routes and serves HTTP on :8080. It exits with a fatal log
// message if the server cannot start or stops with an error.
func main() {
	service := NewAgentService("http://localhost:50051")
	r := gin.Default()

	// Health check.
	r.GET("/health", service.HealthHandler)

	// API routes.
	api := r.Group("/api/v1")
	{
		agents := api.Group("/agents")
		{
			agents.POST("/:id/query", service.QueryHandler)
			agents.POST("/:id/stream", service.StreamHandler)
		}
	}

	log.Println("Server starting on :8080")
	// Run blocks; surface a failure (e.g. port already in use) instead of
	// exiting silently.
	if err := r.Run(":8080"); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}
// generateID returns a request id of the form "req-<unix nanoseconds>".
func generateID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("req-%d", nanos)
}
WebSocket 实时通信
1. WebSocket 客户端
# websocket_client.py
import asyncio
import json
import websockets
from typing import Optional, Callable, Dict, Any
class WebSocketAgentClient:
    """WebSocket client for talking to an Agent service.

    Incoming JSON messages are dispatched by their ``"type"`` field to the
    handler registered for that type; messages without a handler are ignored.
    """

    def __init__(
        self,
        uri: str,
        api_key: Optional[str] = None
    ):
        """Store connection settings; no network I/O happens here."""
        self.uri = uri
        self.api_key = api_key
        self.websocket = None  # set only while connect() is running
        self.message_handlers: Dict[str, Callable] = {}

    async def connect(self) -> None:
        """Connect, authenticate if an API key is set, and process messages.

        This coroutine runs until the connection is closed.
        """
        async with websockets.connect(self.uri) as websocket:
            self.websocket = websocket
            try:
                # Send the authentication message first.
                if self.api_key:
                    await self.send({
                        "type": "auth",
                        "token": self.api_key
                    })
                await self._message_loop()
            finally:
                # Do not keep a reference to a closed connection.
                self.websocket = None

    async def _message_loop(self):
        """Read messages until the socket closes; skip invalid JSON."""
        async for message in self.websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                continue
            await self._handle_message(data)

    async def _handle_message(self, data: Dict[str, Any]):
        """Dispatch one decoded message to its registered handler."""
        import inspect

        message_type = data.get("type", "")
        handler = self.message_handlers.get(message_type)
        if handler is None:
            return
        outcome = handler(data)
        # Accept both coroutine handlers and plain callables.
        if inspect.isawaitable(outcome):
            await outcome

    def register_handler(self, message_type: str, handler: Callable):
        """Register (or replace) the handler for ``message_type``."""
        self.message_handlers[message_type] = handler

    async def send(self, data: Dict[str, Any]) -> None:
        """Send ``data`` as JSON; silently does nothing when not connected."""
        if self.websocket:
            await self.websocket.send(json.dumps(data))

    async def query(
        self,
        agent_id: str,
        query: str,
        on_chunk: Optional[Callable] = None
    ) -> Dict[str, Any]:
        """Send a query request and return ``{"request_id": ...}``.

        If ``on_chunk`` is given it becomes the handler for ``"chunk"``
        messages.
        """
        request_id = generate_id()
        if on_chunk:
            self.register_handler("chunk", on_chunk)
        await self.send({
            "type": "query",
            "id": request_id,
            "agent_id": agent_id,
            "query": query
        })
        return {"request_id": request_id}

    async def stream_query(
        self,
        agent_id: str,
        query: str
    ) -> None:
        """Send a streaming query; print chunks and the joined final result."""
        chunks = []

        async def handle_chunk(data: Dict[str, Any]):
            chunk = data.get("chunk", "")
            if chunk:
                # The final message may itself carry the last chunk, so
                # record it before checking is_final.
                chunks.append(chunk)
                print(chunk, end="", flush=True)
            if data.get("is_final"):
                result = "".join(chunks)
                print(f"\n最终结果:{result}")

        self.register_handler("chunk", handle_chunk)
        await self.send({
            "type": "stream_query",
            "agent_id": agent_id,
            "query": query
        })

    async def notify(self, event: str, data: Dict[str, Any]) -> None:
        """Send a one-way notification message."""
        await self.send({
            "type": "notify",
            "event": event,
            "data": data
        })
# Usage example
async def main():
    """Create a client and run its connection loop until it closes."""
    ws_client = WebSocketAgentClient(uri="ws://localhost:8080/ws", api_key="your-api-key")
    # Open the WebSocket connection
    print("正在连接...")
    await ws_client.connect()
# Generate a unique ID
def generate_id():
    """Return a random UUID4 in its canonical string form."""
    from uuid import uuid4

    new_id = uuid4()
    return str(new_id)
# Run the example client when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(main())
2. WebSocket 服务端(Go)
// websocket_server.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// WSMessage is the JSON envelope exchanged over the WebSocket in both
// directions. Type selects how the message is handled; every other field is
// optional and omitted from the encoded JSON when empty.
type WSMessage struct {
	Type    string                 `json:"type"`
	ID      string                 `json:"id,omitempty"`
	AgentID string                 `json:"agent_id,omitempty"`
	Query   string                 `json:"query,omitempty"`
	Chunk   string                 `json:"chunk,omitempty"`
	IsFinal bool                   `json:"is_final,omitempty"`
	Data    map[string]interface{} `json:"data,omitempty"`
	// Event was declared with the undefined type "stringData"; it is a
	// plain string.
	Event string `json:"event,omitempty"`
}
// upgrader upgrades HTTP requests to WebSocket connections using 1 KiB read
// and write buffers.
var upgrader = websocket.Upgrader{
	ReadBufferSize: 1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true // accepts every origin; production must validate the origin
	},
}
// WebSocketManager tracks open WebSocket connections by connection id.
type WebSocketManager struct {
	connections map[string]*websocket.Conn
	// mu is locked by the manager's methods around access to connections.
	mu sync.RWMutex
}
// NewWebSocketManager returns a manager with an empty connection table.
func NewWebSocketManager() *WebSocketManager {
	table := make(map[string]*websocket.Conn)
	return &WebSocketManager{connections: table}
}
// AddConnection registers conn under id (replacing any previous entry) and
// logs the new connection count.
func (m *WebSocketManager) AddConnection(id string, conn *websocket.Conn) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.connections[id] = conn
	total := len(m.connections)
	log.Printf("Connection added: %s (total: %d)", id, total)
}
// RemoveConnection drops id from the connection table (a no-op when absent)
// and logs the remaining connection count.
func (m *WebSocketManager) RemoveConnection(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.connections, id)
	remaining := len(m.connections)
	log.Printf("Connection removed: %s (total: %d)", id, remaining)
}
// SendToConnection JSON-encodes message and writes it as a text frame to
// the connection registered under id.
//
// It returns an error when encoding fails, when no connection is registered
// under id, or when the write fails.
func (m *WebSocketManager) SendToConnection(id string, message WSMessage) error {
	// Encode before taking the lock to keep the critical section short.
	data, err := json.Marshal(message)
	if err != nil {
		return err
	}

	// A gorilla/websocket connection supports only one concurrent writer,
	// and messages are handled in separate goroutines. Take the exclusive
	// lock (not RLock) so writes issued through the manager are serialized.
	m.mu.Lock()
	defer m.mu.Unlock()

	conn, ok := m.connections[id]
	if !ok {
		return fmt.Errorf("connection not found: %s", id)
	}
	return conn.WriteMessage(websocket.TextMessage, data)
}
// Broadcast JSON-encodes message once and writes it to every registered
// connection. A failed write to one connection is logged and does not stop
// delivery to the others.
func (m *WebSocketManager) Broadcast(message WSMessage) {
	data, err := json.Marshal(message)
	if err != nil {
		// Nothing sensible can be sent; report instead of writing garbage.
		log.Printf("Error encoding broadcast message: %v", err)
		return
	}

	// Exclusive lock: a gorilla/websocket connection supports only one
	// concurrent writer, so broadcasts must not overlap with other writes.
	m.mu.Lock()
	defer m.mu.Unlock()

	for id, conn := range m.connections {
		if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
			log.Printf("Error broadcasting to %s: %v", id, err)
		}
	}
}
// WSHandler owns one upgraded WebSocket connection: it registers it with
// the manager, reads messages until the first read error, and then
// unregisters and closes the connection.
//
// Each JSON text message is handled in its own goroutine; non-text frames
// and messages that fail to decode are skipped.
func WSHandler(ws *websocket.Conn, manager *WebSocketManager) {
	// Deferred first so it runs last, after the connection is unregistered.
	defer ws.Close()

	connID := generateConnID()
	manager.AddConnection(connID, ws)
	defer manager.RemoveConnection(connID)
	log.Printf("WebSocket connected: %s", connID)

	// Message loop.
	for {
		messageType, data, err := ws.ReadMessage()
		if err != nil {
			log.Printf("Read error: %v", err)
			break
		}
		if messageType != websocket.TextMessage {
			continue
		}

		var msg WSMessage
		if err := json.Unmarshal(data, &msg); err != nil {
			log.Printf("JSON unmarshal error: %v", err)
			continue
		}
		go handleWSMessage(connID, msg, manager, ws)
	}
}
// handleWSMessage dispatches one decoded message by its Type:
// "auth" is answered with an "auth_result", "query" and "stream_query" are
// passed to their handlers, and anything else is logged as unknown.
// The ws parameter is currently unused.
func handleWSMessage(connID string, msg WSMessage, manager *WebSocketManager, ws *websocket.Conn) {
	log.Printf("Received message from %s: type=%s", connID, msg.Type)

	switch msg.Type {
	case "auth":
		// Simulated authentication: every token is accepted.
		reply := WSMessage{
			Type: "auth_result",
			Data: map[string]interface{}{
				"success": true,
				"user_id": connID,
			},
		}
		if err := manager.SendToConnection(connID, reply); err != nil {
			log.Printf("Error sending auth result: %v", err)
		}
	case "query":
		handleQuery(connID, msg, manager)
	case "stream_query":
		handleStreamQuery(connID, msg, manager)
	default:
		log.Printf("Unknown message type: %s", msg.Type)
	}
}
// handleQuery simulates a one-second query and replies with a
// "query_result" message that echoes the request ID.
func handleQuery(connID string, msg WSMessage, manager *WebSocketManager) {
	// Simulated processing latency.
	time.Sleep(1 * time.Second)

	usage := map[string]int{"total_tokens": 100}
	payload := map[string]interface{}{
		"answer": "这是查询的模拟响应",
		"usage":  usage,
	}
	reply := WSMessage{Type: "query_result", ID: msg.ID, Data: payload}

	if err := manager.SendToConnection(connID, reply); err != nil {
		log.Printf("Error sending response: %v", err)
	}
}
// handleStreamQuery sends a fixed sequence of "chunk" messages 200ms apart.
// The last message has IsFinal set. Sending stops at the first error.
func handleStreamQuery(connID string, msg WSMessage, manager *WebSocketManager) {
	chunks := []string{"这", "是", "一", "个", "流", "式", "响", "应"}
	last := len(chunks) - 1

	for i := 0; i <= last; i++ {
		time.Sleep(200 * time.Millisecond)

		part := WSMessage{
			Type:    "chunk",
			Chunk:   chunks[i],
			IsFinal: i == last,
		}
		if sendErr := manager.SendToConnection(connID, part); sendErr != nil {
			log.Printf("Error sending chunk: %v", sendErr)
			return
		}
	}
}
// WebSocketHandlerWrapper adapts WSHandler to a gin.HandlerFunc: it upgrades
// the HTTP request to a WebSocket and then serves the connection until it
// closes. A failed upgrade is logged and the request ends.
func WebSocketHandlerWrapper(manager *WebSocketManager) gin.HandlerFunc {
	return func(c *gin.Context) {
		// The origin policy is configured once on the package-level
		// upgrader; reassigning it per request would be a data race.
		ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
		if err != nil {
			log.Printf("WebSocket upgrade error: %v", err)
			return
		}
		WSHandler(ws, manager)
	}
}
// generateConnID returns a connection id of the form
// "conn-<unix nanoseconds>".
func generateConnID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("conn-%d", nanos)
}
// main serves the WebSocket endpoint (/ws) and a health endpoint (/health)
// on :8080, and exits with a fatal log message if the server fails.
func main() {
	manager := NewWebSocketManager()
	r := gin.Default()

	// WebSocket route.
	r.GET("/ws", WebSocketHandlerWrapper(manager))

	// Health check.
	r.GET("/health", func(c *gin.Context) {
		// The connection table is mutated by other goroutines; read its
		// size under the manager's lock.
		manager.mu.RLock()
		active := len(manager.connections)
		manager.mu.RUnlock()

		c.JSON(http.StatusOK, gin.H{
			"status":         "healthy",
			"timestamp":      time.Now().Format(time.RFC3339),
			"ws_connections": active,
		})
	})

	log.Println("Server starting on :8080")
	if err := r.Run(":8080"); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}
gRPC 高性能通信
1. Protocol Buffers 定义
// agent.proto
syntax = "proto3";
package agent;
// AgentService is the RPC surface of an agent.
service AgentService {
  // Unary query: one request, one response.
  rpc Query(QueryRequest) returns (QueryResponse);
  // Server-streaming query: the answer is returned as a stream of chunks.
  rpc StreamQuery(QueryRequest) returns (stream QueryChunk);
  // Batch query: several requests in one call.
  rpc BatchQuery(BatchQueryRequest) returns (BatchQueryResponse);
  // Bidirectional streaming.
  rpc BidirectionalStream(stream StreamMessage) returns (stream StreamMessage);
  // Health check.
  rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
}
// QueryRequest is a single query addressed to one agent.
message QueryRequest {
  string session_id = 1;
  string agent_id = 2;
  string query = 3;
  map<string, string> options = 4;
}
// QueryResponse is the answer to a QueryRequest.
message QueryResponse {
  string id = 1;
  string answer = 2;
  repeated Source sources = 3;
  Usage usage = 4;
  int64 latency_ms = 5;
}
// Source describes one source attached to an answer.
message Source {
  string id = 1;
  string content = 2;
  double score = 3;
  map<string, string> metadata = 4;
}
// Usage holds token usage statistics.
message Usage {
  int64 prompt_tokens = 1;
  int64 completion_tokens = 2;
  int64 total_tokens = 3;
}
// QueryChunk is one piece of a streamed answer.
message QueryChunk {
  string id = 1;
  string chunk = 2;
  bool is_final = 3;
}
// BatchQueryRequest carries several query requests.
message BatchQueryRequest {
  repeated QueryRequest requests = 1;
}
// BatchQueryResponse carries the responses of a batch query.
message BatchQueryResponse {
  repeated QueryResponse responses = 1;
}
// StreamMessage is the message type used in both directions of
// BidirectionalStream.
message StreamMessage {
  string type = 1;
  string content = 2;
  map<string, string> metadata = 3;
}
// HealthCheckRequest is the (empty) health check request.
message HealthCheckRequest {}
// HealthCheckResponse is the health check reply.
message HealthCheckResponse {
  string status = 1;
  string timestamp = 2;
  map<string, string> info = 3;
}
2. gRPC 服务端(Python)
# grpc_server.py
import asyncio
import time
from typing import Iterator, AsyncIterator
import grpc
from concurrent import futures
import agent_pb2
import agent_pb2_grpc
class AgentServicer(agent_pb2_grpc.AgentServiceServicer):
    """Simulated implementation of the AgentService RPCs."""

    def Query(
        self,
        request: agent_pb2.QueryRequest,
        context: grpc.ServicerContext
    ) -> agent_pb2.QueryResponse:
        """Unary query: return a simulated answer with one source.

        Token counts are approximated by string lengths.
        """
        print(f"Received query: {request.query}")
        start_time = time.time()
        # Simulated processing
        answer = f"这是对 '{request.query}' 的响应"
        return agent_pb2.QueryResponse(
            id=str(time.time_ns()),
            answer=answer,
            sources=[
                agent_pb2.Source(
                    id="source-1",
                    content="这是来源内容",
                    score=0.95,
                    metadata={"type": "text"}
                )
            ],
            usage=agent_pb2.Usage(
                prompt_tokens=len(request.query),
                completion_tokens=len(answer),
                total_tokens=len(request.query) + len(answer)
            ),
            latency_ms=int((time.time() - start_time) * 1000)
        )

    def StreamQuery(
        self,
        request: agent_pb2.QueryRequest,
        context: grpc.ServicerContext
    ) -> Iterator[agent_pb2.QueryChunk]:
        """Server-streaming query: yield fixed chunks, the last one final."""
        print(f"Streaming query: {request.query}")
        # Simulated streaming output
        chunks = ["这", "是", "一", "个", "流", "式", "响", "应"]
        last_index = len(chunks) - 1
        for i, chunk in enumerate(chunks):
            yield agent_pb2.QueryChunk(
                id=str(time.time_ns()),
                chunk=chunk,
                is_final=(i == last_index)
            )
            time.sleep(0.2)

    def BatchQuery(
        self,
        request: agent_pb2.BatchQueryRequest,
        context: grpc.ServicerContext
    ) -> agent_pb2.BatchQueryResponse:
        """Batch query: answer every contained request via Query()."""
        print(f"Batch query with {len(request.requests)} requests")
        responses = [self.Query(req, context) for req in request.requests]
        return agent_pb2.BatchQueryResponse(responses=responses)

    async def BidirectionalStream(
        self,
        request_iterator: AsyncIterator[agent_pb2.StreamMessage],
        context: grpc.ServicerContext
    ) -> AsyncIterator[agent_pb2.StreamMessage]:
        """Bidirectional stream: echo every incoming message.

        Declared as an async generator so that its ``async for`` over the
        request iterator runs on the event loop; a plain function that
        merely returns an async generator is not an async handler.
        """
        print("Bidirectional stream started")
        async for request_msg in request_iterator:
            print(f"Received: {request_msg.type} - {request_msg.content}")
            yield agent_pb2.StreamMessage(
                type="response",
                content=f"Echo: {request_msg.content}"
            )

    def HealthCheck(
        self,
        request: agent_pb2.HealthCheckRequest,
        context: grpc.ServicerContext
    ) -> agent_pb2.HealthCheckResponse:
        """Health check: report "healthy" with a UTC ISO-8601 timestamp."""
        from datetime import datetime, timezone

        return agent_pb2.HealthCheckResponse(
            status="healthy",
            # time.time() is a float and has no isoformat(); use datetime.
            timestamp=datetime.now(timezone.utc).isoformat(),
            info={"version": "1.0.0", "service": "agent"}
        )
async def serve():
    """Start the gRPC AsyncIO server with TLS on port 50051 and run it.

    The private key and certificate chain are read from ``server.key`` and
    ``server.crt`` in the working directory. Because the port is secured,
    clients must connect over a secure channel.

    Raises:
        OSError: if a key or certificate file cannot be read.
    """
    server = grpc.aio.server()
    agent_pb2_grpc.add_AgentServiceServicer_to_server(
        AgentServicer(),
        server
    )

    with open("server.key", "rb") as key_file:
        private_key = key_file.read()
    with open("server.crt", "rb") as cert_file:
        certificate_chain = cert_file.read()

    # ssl_server_credentials expects a sequence of (key, cert-chain) pairs,
    # and credentials can only be attached via add_secure_port.
    credentials = grpc.ssl_server_credentials(
        [(private_key, certificate_chain)]
    )
    server.add_secure_port("[::]:50051", credentials)

    print("Server starting on port 50051")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
3. gRPC 客户端(Python)
# grpc_client.py
import asyncio
import grpc
import agent_pb2
import agent_pb2_grpc
class AgentClient:
    """gRPC client for AgentService.

    Every call opens its own channel and closes it when the call finishes.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 50051,
        use_ssl: bool = False
    ):
        """Store the target address; no connection is opened here."""
        self.host = host
        self.port = port
        self.use_ssl = use_ssl

    def _ssl_credentials(self) -> grpc.ChannelCredentials:
        """Build channel credentials from ``client.key`` / ``client.crt``.

        The client key and certificate are presented to the server; server
        verification uses gRPC's default root certificates.
        """
        with open("client.key", "rb") as key_file:
            private_key = key_file.read()
        with open("client.crt", "rb") as cert_file:
            certificate_chain = cert_file.read()
        return grpc.ssl_channel_credentials(
            private_key=private_key,
            certificate_chain=certificate_chain
        )

    def _create_channel(self) -> grpc.Channel:
        """Create a blocking channel (secure when ``use_ssl`` is set)."""
        target = f"{self.host}:{self.port}"
        if self.use_ssl:
            return grpc.secure_channel(target, self._ssl_credentials())
        return grpc.insecure_channel(target)

    def query(
        self,
        agent_id: str,
        query: str,
        session_id: str = None
    ) -> agent_pb2.QueryResponse:
        """Unary query; prints and returns the response.

        Raises:
            grpc.RpcError: re-raised after being printed.
        """
        request = agent_pb2.QueryRequest(
            session_id=session_id or "",
            agent_id=agent_id,
            query=query
        )
        # The context manager closes the channel when the call is done.
        with self._create_channel() as channel:
            stub = agent_pb2_grpc.AgentServiceStub(channel)
            try:
                response = stub.Query(request)
            except grpc.RpcError as e:
                print(f"gRPC error: {e}")
                raise
        print(f"Answer: {response.answer}")
        print(f"Latency: {response.latency_ms}ms")
        return response

    def stream_query(
        self,
        agent_id: str,
        query: str
    ) -> None:
        """Server-streaming query; prints chunks as they arrive.

        Raises:
            grpc.RpcError: re-raised after being printed.
        """
        request = agent_pb2.QueryRequest(
            agent_id=agent_id,
            query=query
        )
        with self._create_channel() as channel:
            stub = agent_pb2_grpc.AgentServiceStub(channel)
            try:
                print("Streamed response:")
                for chunk in stub.StreamQuery(request):
                    print(chunk.chunk, end="", flush=True)
                    if chunk.is_final:
                        print("\nStream completed")
            except grpc.RpcError as e:
                print(f"gRPC error: {e}")
                raise

    async def bidirectional_stream(
        self,
        messages: list
    ) -> None:
        """Send ``messages`` over a bidirectional stream and print replies.

        Uses an AsyncIO channel, since ``write``/``done_writing`` only exist
        on the grpc.aio call object. Sending runs as a separate task so
        replies are read while requests are still being written.
        """
        target = f"{self.host}:{self.port}"
        if self.use_ssl:
            channel = grpc.aio.secure_channel(target, self._ssl_credentials())
        else:
            channel = grpc.aio.insecure_channel(target)

        async with channel:
            stub = agent_pb2_grpc.AgentServiceStub(channel)
            call = stub.BidirectionalStream()

            async def sender():
                for msg in messages:
                    await call.write(agent_pb2.StreamMessage(
                        type="request",
                        content=msg
                    ))
                    print(f"Sent: {msg}")
                await call.done_writing()

            send_task = asyncio.create_task(sender())
            try:
                async for response in call:
                    print(f"Received: {response.type} - {response.content}")
                await send_task
            finally:
                # Do not leave the sender running if reading failed.
                if not send_task.done():
                    send_task.cancel()

    def batch_query(
        self,
        queries: list
    ) -> agent_pb2.BatchQueryResponse:
        """Batch query; each item is a dict with "query" and optional "agent_id".

        Raises:
            grpc.RpcError: re-raised after being printed.
        """
        requests = [
            agent_pb2.QueryRequest(
                agent_id=q.get("agent_id", ""),
                query=q["query"]
            )
            for q in queries
        ]
        batch_request = agent_pb2.BatchQueryRequest(requests=requests)
        with self._create_channel() as channel:
            stub = agent_pb2_grpc.AgentServiceStub(channel)
            try:
                response = stub.BatchQuery(batch_request)
            except grpc.RpcError as e:
                print(f"gRPC error: {e}")
                raise
        print(f"Received {len(response.responses)} responses")
        return response

    def health_check(self) -> bool:
        """Return True when the server reports status "healthy"."""
        with self._create_channel() as channel:
            stub = agent_pb2_grpc.AgentServiceStub(channel)
            try:
                response = stub.HealthCheck(agent_pb2.HealthCheckRequest())
            except grpc.RpcError:
                return False
        return response.status == "healthy"
# Usage example
if __name__ == "__main__":
    grpc_client = AgentClient()

    # Unary query
    print("=== Synchronous Query ===")
    grpc_client.query(agent_id="rag-agent-1", query="什么是 RAG?")

    # Server-streaming query
    print("\n=== Streaming Query ===")
    grpc_client.stream_query(agent_id="rag-agent-1", query="介绍一下 Python")

    # Batch query
    print("\n=== Batch Query ===")
    batch_items = [
        {"agent_id": "agent-1", "query": "问题1"},
        {"agent_id": "agent-2", "query": "问题2"},
    ]
    grpc_client.batch_query(batch_items)

    # Bidirectional stream
    print("\n=== Bidirectional Stream ===")
    outgoing = ["你好", "介绍一下你的能力", "再见"]
    asyncio.run(grpc_client.bidirectional_stream(outgoing))

    # Health check
    print("\n=== Health Check ===")
    is_healthy = grpc_client.health_check()
    print(f"Healthy: {is_healthy}")
MCP 协议
1. MCP 概述
【MCP (Model Context Protocol)】
定义:模型上下文协议(Model Context Protocol),用于规范 LLM 应用(如 Agent)与外部工具、数据源之间的标准化通信。
核心概念:
1. Context:上下文信息
2. Capability:能力声明
3. Tool:工具定义
4. Resource:资源访问
优势:
- 标准化通信
- 类型安全
- 易于扩展
2. MCP 消息格式
# mcp_protocol.py
"""MCP 协议定义"""
from typing import Dict, List, Any, Optional
from enum import Enum
from dataclasses import dataclass
class MessageType(Enum):
    """Message types; each value is the string used in the message's type field."""
    CONTEXT_REQUEST = "context_request"
    CONTEXT_RESPONSE = "context_response"
    CAPABILITY_REQUEST = "capability_request"
    CAPABILITY_RESPONSE = "capability_response"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    ERROR = "error"
@dataclass
class MCPMessage:
    """MCP message envelope."""
    type: MessageType
    id: str
    data: Optional[Dict[str, Any]] = None  # message payload, if any
    error: Optional[str] = None  # error text, if any
@dataclass
class ContextRequest:
    """Context request."""
    session_id: str
    query: str
    context_type: str  # "query", "document", "conversation"
@dataclass
class ContextResponse:
    """Context response."""
    session_id: str
    context: Dict[str, Any]
    capabilities: List[str]
    metadata: Dict[str, Any]
@dataclass
class ToolCall:
    """Tool call: the tool to run, its parameters and optional context."""

    tool_name: str
    parameters: Dict[str, Any]
    # Declared Optional, so callers may omit it; defaults to None.
    context: Optional[Dict[str, Any]] = None
@dataclass
class ToolResult:
    """Tool result: the tool's return value, or an error description."""

    tool_name: str
    result: Any
    error: Optional[str] = None
    # Must have a default: a dataclass field without one may not follow a
    # field with a default (class creation raises TypeError otherwise).
    metadata: Optional[Dict[str, Any]] = None
# Capability definitions: maps a capability id to its display name,
# description and parameter schema.
CAPABILITIES = {
    "text_generation": {
        "name": "Text Generation",
        "description": "Generate text based on context",
        "parameters": {
            "prompt": {"type": "string", "required": True},
            "max_tokens": {"type": "integer", "default": 512}
        }
    },
    "code_execution": {
        "name": "Code Execution",
        "description": "Execute code safely",
        "parameters": {
            "code": {"type": "string", "required": True},
            "language": {"type": "string", "required": True}
        }
    },
    "rag_query": {
        "name": "RAG Query",
        "description": "Query with retrieval augmentation",
        "parameters": {
            "query": {"type": "string", "required": True},
            "top_k": {"type": "integer", "default": 5}
        }
    }
}
3. MCP Agent 实现
# mcp_agent.py
import json
from typing import Dict, Any, List
import uuid
class MCPAgent:
    """Minimal MCP-style agent that answers JSON messages.

    Supported message types are ``context_request`` and ``tool_call``; every
    failure is reported as an ``error`` message rather than raised.
    """

    def __init__(self, name: str, capabilities: List[str]):
        """Create an agent with a name and a list of capability ids."""
        self.name = name
        self.capabilities = capabilities
        self.tools: Dict[str, Any] = {}

    def register_tool(self, name: str, handler: Any):
        """Register (or replace) the callable that implements tool ``name``."""
        self.tools[name] = handler

    def handle_message(self, message: str) -> str:
        """Handle one JSON-encoded message and return the JSON-encoded reply."""
        try:
            data = json.loads(message)
        except (TypeError, ValueError) as e:
            return self._error(str(e))
        if not isinstance(data, dict):
            return self._error("Message must be a JSON object")

        request_id = data.get("id")
        msg_type = data.get("type")
        try:
            if msg_type == "context_request":
                return self._handle_context_request(data)
            if msg_type == "tool_call":
                return self._handle_tool_call(data)
            return self._error(f"Unknown message type: {msg_type}", request_id)
        except Exception as e:
            # Boundary handler: e.g. a tool result that is not JSON
            # serializable. Always answer with an error message.
            return self._error(str(e), request_id)

    def _handle_context_request(self, data: Dict) -> str:
        """Build the ``context_response`` for a context request."""
        context = {
            "agent_name": self.name,
            "session_id": data.get("session_id"),
            "query": data.get("query"),
            "available_tools": list(self.tools.keys()),
            "capabilities": self.capabilities
        }
        response = {
            "type": "context_response",
            "id": data.get("id"),
            "data": {
                "context": context,
                "capabilities": self.capabilities
            }
        }
        return json.dumps(response)

    def _handle_tool_call(self, data: Dict) -> str:
        """Run the requested tool and build the ``tool_result`` reply."""
        request_id = data.get("id")
        call = data.get("data") or {}
        tool_name = call.get("tool_name")
        # A call without parameters invokes the tool with its defaults.
        parameters = call.get("parameters") or {}

        if tool_name not in self.tools:
            return self._error(f"Tool not found: {tool_name}", request_id)

        try:
            result = self.tools[tool_name](**parameters)
        except Exception as e:
            return self._error(f"Tool execution error: {e}", request_id)

        response = {
            "type": "tool_result",
            "id": request_id,
            "data": {
                "tool_name": tool_name,
                "result": result,
                "success": True
            }
        }
        return json.dumps(response)

    def _error(self, error_message: str, request_id: Any = None) -> str:
        """Build an ``error`` reply.

        The reply reuses ``request_id`` when given so the caller can match
        it to its request; otherwise a fresh UUID is generated.
        """
        response = {
            "type": "error",
            "id": request_id if request_id is not None else str(uuid.uuid4()),
            "error": error_message
        }
        return json.dumps(response)
# Usage example
if __name__ == "__main__":
    # Create the agent
    agent = MCPAgent(name="RAG Agent", capabilities=["rag_query", "text_generation"])

    # Register a tool
    def search_tool(query: str, top_k: int = 5) -> Dict:
        """Return two fixed documents together with the requested top_k."""
        documents = [
            {"id": "1", "text": "相关文档1"},
            {"id": "2", "text": "相关文档2"},
        ]
        return {"results": documents, "top_k": top_k}

    agent.register_tool("search", search_tool)

    # Handle one message
    context_request = dict(
        type="context_request",
        id="req-1",
        session_id="session-1",
        query="搜索相关信息",
    )
    encoded_request = json.dumps(context_request)
    response = agent.handle_message(encoded_request)
    print(response)
消息队列通信
1. Redis Stream 实现
# redis_messaging.py
import json
import time
import redis
from typing import Dict, Any, Callable, Optional
class RedisMessaging:
    """Message passing on top of a single Redis Stream.

    Messages are stored as one stream entry with a single ``message`` field
    holding the JSON-encoded envelope ``{id, type, data, timestamp}``.
    """

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        stream_name: str = "agent_messages"
    ):
        """Create the Redis client; responses are decoded to ``str``."""
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )
        self.stream_name = stream_name
        # group name -> list of consumer names subscribed via this object
        self.consumer_groups: Dict[str, Any] = {}

    def publish(self, message_type: str, data: Dict[str, Any]) -> str:
        """Append a message to the stream and return its envelope id."""
        from datetime import datetime, timezone

        message_id = str(time.time_ns())
        message = {
            "id": message_id,
            "type": message_type,
            "data": data,
            # time.time() is a float and has no isoformat(); use datetime.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        self.redis.xadd(
            self.stream_name,
            {"message": json.dumps(message)},
            "*"
        )
        return message_id

    def subscribe(
        self,
        group_name: str,
        consumer_name: str,
        callback: Callable[[Dict], None]
    ) -> None:
        """Join a consumer group and deliver messages to ``callback``.

        The group (and the stream, if missing) is created on demand. The
        consumer itself is created implicitly by the first group read.
        Delivery happens on a background daemon thread.
        """
        try:
            self.redis.xgroup_create(
                self.stream_name, group_name, "0", mkstream=True
            )
        except redis.ResponseError as exc:
            # Only "group already exists" is expected here.
            if "BUSYGROUP" not in str(exc):
                raise

        consumers = self.consumer_groups.setdefault(group_name, [])
        if consumer_name not in consumers:
            consumers.append(consumer_name)

        self._start_listener(group_name, consumer_name, callback)

    def _start_listener(
        self,
        group_name: str,
        consumer_name: str,
        callback: Callable[[Dict], None]
    ) -> None:
        """Start the daemon thread that reads, dispatches and acks messages."""
        def listener():
            while True:
                try:
                    # Reply shape: [(stream, [(entry_id, fields), ...]), ...]
                    batches = self.redis.xreadgroup(
                        group_name,
                        consumer_name,
                        {self.stream_name: ">"},
                        count=10,
                        block=5000  # wait up to 5 seconds for new entries
                    )
                    for _stream, entries in batches or []:
                        for entry_id, fields in entries:
                            try:
                                callback(json.loads(fields["message"]))
                            except Exception as exc:
                                # Keep the listener alive; the entry stays
                                # unacknowledged (pending) for inspection.
                                print(f"Error handling message {entry_id}: {exc}")
                                continue
                            # Acknowledge each entry once it was handled.
                            self.redis.xack(
                                self.stream_name,
                                group_name,
                                entry_id
                            )
                except redis.ConnectionError:
                    time.sleep(1)
                    continue

        import threading
        thread = threading.Thread(target=listener, daemon=True)
        thread.start()

    def get_queue_info(self) -> Dict:
        """Return the stream name and its consumer-group information."""
        groups = self.redis.xinfo_groups(self.stream_name)
        return {
            "stream": self.stream_name,
            "groups": groups
        }
# Usage example
if __name__ == "__main__":
    messaging = RedisMessaging(stream_name="agent_messages")

    # Consumer callback
    def on_message(message: Dict):
        """Print every message delivered to this consumer."""
        print(f"Received message: {message}")

    # Subscribe
    messaging.subscribe("agent_group", "agent_1", on_message)

    # Publish
    query_payload = {"agent_id": "agent-1", "query": "这是查询消息"}
    messaging.publish("query", query_payload)

    # Give the listener thread time to process the message
    time.sleep(2)
共享内存通信
1. Redis 共享状态
# redis_shared_state.py
import json
import redis
from typing import Dict, Any, Optional
class SharedStateManager:
    """Shared per-session state and history stored in Redis.

    State lives as JSON under ``state:<session_id>``; history is a Redis
    list under ``history:<session_id>`` with the newest entry first.
    """

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379
    ):
        """Create the Redis client; responses are decoded to ``str``."""
        self.redis = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=True
        )

    def set_state(
        self,
        session_id: str,
        state: Dict[str, Any],
        ttl: int = 3600
    ) -> bool:
        """Store ``state`` with an expiry of ``ttl`` seconds.

        Returns True on success, False if storing failed.
        """
        key = f"state:{session_id}"
        try:
            # redis-py signature is setex(name, time, value).
            self.redis.setex(key, ttl, json.dumps(state))
            return True
        except Exception as e:
            print(f"Error setting state: {e}")
            return False

    def get_state(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored state, or None when missing or on error."""
        key = f"state:{session_id}"
        try:
            data = self.redis.get(key)
            if data:
                return json.loads(data)
            return None
        except Exception as e:
            print(f"Error getting state: {e}")
            return None

    def update_state(
        self,
        session_id: str,
        updates: Dict[str, Any]
    ) -> bool:
        """Merge ``updates`` into the stored state atomically.

        Uses WATCH/MULTI/EXEC optimistic locking: if another client changes
        the key between the read and the write, the update is retried. The
        key's remaining TTL is preserved.

        Returns False when no state exists for the session or on error.
        """
        key = f"state:{session_id}"
        try:
            with self.redis.pipeline() as pipe:
                while True:
                    try:
                        pipe.watch(key)
                        # After watch() the pipeline executes immediately.
                        current_data = pipe.get(key)
                        if not current_data:
                            return False
                        current_state = json.loads(current_data)
                        current_state.update(updates)
                        pipe.multi()
                        pipe.set(key, json.dumps(current_state), keepttl=True)
                        pipe.execute()
                        return True
                    except redis.WatchError:
                        # Key changed concurrently; re-read and retry.
                        continue
        except Exception as e:
            print(f"Error updating state: {e}")
            return False

    def append_to_history(
        self,
        session_id: str,
        message: Dict[str, Any],
        max_history: int = 100
    ) -> bool:
        """Prepend ``message`` to the history and trim it to ``max_history``.

        Returns True on success, False if the write failed.
        """
        key = f"history:{session_id}"
        try:
            # Push and trim run together in one pipeline execution.
            with self.redis.pipeline() as pipe:
                pipe.lpush(key, json.dumps(message))
                pipe.ltrim(key, 0, max_history - 1)
                pipe.execute()
            return True
        except Exception as e:
            print(f"Error appending to history: {e}")
            return False

    def get_history(
        self,
        session_id: str
    ) -> list:
        """Return the full history (newest first), or [] on error."""
        try:
            history_data = self.redis.lrange(f"history:{session_id}", 0, -1)
            return [json.loads(item) for item in history_data]
        except Exception as e:
            print(f"Error getting history: {e}")
            return []
# Usage example
if __name__ == "__main__":
    manager = SharedStateManager()
    session_id = "session-123"

    # Set the initial state
    initial_state = {
        "status": "active",
        "current_step": "init",
        "agent_id": "agent-1",
    }
    manager.set_state(session_id, initial_state)

    # Update the state
    changes = {"current_step": "processing", "query": "用户查询"}
    manager.update_state(session_id, changes)

    # Read the state back
    state = manager.get_state(session_id)
    print(f"Current state: {state}")
实战案例
案例:Agent 协作系统
# agent_collaboration.py
"""
完整的 Agent 协作系统
使用:WebSocket 实时通信 + Redis 消息队列 + 共享状态
"""
from typing import Dict, Any, List
import json
import asyncio
class AgentCollaborator:
    """Coordinates a task shared by several agents.

    Task assignments and results travel over RedisMessaging; the
    collaboration state is kept in SharedStateManager under the
    collaboration id.
    """

    def __init__(self):
        self.agents: Dict[str, Any] = {}
        self.messaging = RedisMessaging(stream_name="agent_collaboration")
        self.state_manager = SharedStateManager()
        # collaboration id -> state as of start_collaboration()
        self.active_collaborations: Dict[str, Any] = {}

    def register_agent(self, agent_id: str, agent: Any):
        """Register an agent and subscribe to messages on its behalf."""
        self.agents[agent_id] = agent
        self.messaging.subscribe(
            f"agent_{agent_id}",
            f"consumer_{agent_id}",
            lambda msg: self._handle_agent_message(agent_id, msg)
        )

    def start_collaboration(
        self,
        collaboration_id: str,
        task: str,
        participants: List[str]
    ) -> Dict[str, Any]:
        """Create the collaboration state and notify every participant.

        Returns the initial collaboration state.
        """
        collaboration_state = {
            "id": collaboration_id,
            "task": task,
            "participants": participants,
            "status": "started",
            "current_step": 0,
            "results": {}
        }
        self.state_manager.set_state(
            collaboration_id,
            collaboration_state
        )
        self.active_collaborations[collaboration_id] = collaboration_state

        # Notify all participants.
        for agent_id in participants:
            self.messaging.publish(
                "task_assignment",
                {
                    "collaboration_id": collaboration_id,
                    "agent_id": agent_id,
                    "task": task,
                    "role": self._get_agent_role(agent_id)
                }
            )
        return collaboration_state

    @staticmethod
    def _payload(message: Dict) -> Dict:
        """Return the message body.

        RedisMessaging.publish nests the body under "data"; flat messages
        (fields at the top level) are accepted as well.
        """
        body = message.get("data")
        return body if isinstance(body, dict) else message

    def _handle_agent_message(self, agent_id: str, message: Dict):
        """Dispatch an agent message by its type; other types are ignored."""
        msg_type = message.get("type")
        if msg_type == "task_result":
            self._handle_task_result(agent_id, message)
        elif msg_type == "status_update":
            self._handle_status_update(agent_id, message)

    def _handle_task_result(self, agent_id: str, message: Dict):
        """Record an agent's result and finalize once all have reported."""
        payload = self._payload(message)
        collaboration_id = payload.get("collaboration_id")
        result = payload.get("result")

        state = self.state_manager.get_state(collaboration_id)
        if not state:
            return
        state["results"][agent_id] = result
        state["current_step"] += 1
        self.state_manager.set_state(collaboration_id, state)

        if self._is_collaboration_complete(state):
            self._finalize_collaboration(collaboration_id)

    def _handle_status_update(self, agent_id: str, message: Dict):
        """Record the status an agent reported for a collaboration.

        Minimal implementation: stores the reported "status" under
        ``agent_status[agent_id]``; unknown collaborations are ignored.
        """
        payload = self._payload(message)
        collaboration_id = payload.get("collaboration_id")
        state = self.state_manager.get_state(collaboration_id)
        if not state:
            return
        state.setdefault("agent_status", {})[agent_id] = payload.get("status")
        self.state_manager.set_state(collaboration_id, state)

    def _is_collaboration_complete(self, state: Dict) -> bool:
        """Return True when every participant has delivered a result."""
        return len(state["results"]) == len(state["participants"])

    def _finalize_collaboration(self, collaboration_id: str):
        """Mark the collaboration completed and print its results."""
        state = self.state_manager.get_state(collaboration_id)
        if not state:
            # State expired or never existed; nothing to finalize.
            return
        state["status"] = "completed"
        self.state_manager.set_state(collaboration_id, state)
        self.active_collaborations.pop(collaboration_id, None)
        print(f"Collaboration {collaboration_id} completed!")
        print(f"Results: {state['results']}")

    def _get_agent_role(self, agent_id: str) -> str:
        """Derive the role from the agent id (simplified)."""
        for role in ("analyst", "writer"):
            if role in agent_id:
                return role
        return "participant"
# Usage example
if __name__ == "__main__":
    collaborator = AgentCollaborator()

    # Register the agents
    team = ["analyst-1", "writer-1", "reviewer-1"]
    for member_id in team:
        collaborator.register_agent(member_id, None)

    # Start the collaboration
    collaboration = collaborator.start_collaboration(
        collaboration_id="collab-1",
        task="分析市场趋势并撰写报告",
        participants=list(team),
    )
    print(f"Started collaboration: {collaboration['id']}")
面试高频问法
Q1: Agent 间通信方式如何选择?
【标准回答】
选择标准:
1. 同步调用(gRPC)
- 场景:高频、低延迟
- 优势:性能高、类型安全
- 缺点:复杂度高
2. 实时交互(WebSocket)
- 场景:流式输出、双向通信
- 优势:实时性好、推送方便
- 缺点:连接管理复杂
3. 事件通知(消息队列)
- 场景:异步解耦、事件驱动
- 优势:可靠、可扩展
- 缺点:延迟较高
4. 状态共享(Redis)
- 场景:共享状态、低延迟
- 优势:简单、高效
- 缺点:需要序列化
决策树:
需要实时交互?
├─ 是 → WebSocket
└─ 否 → 需要高性能?
├─ 是 → gRPC
└─ 否 → 需要解耦?
├─ 是 → 消息队列
└─ 否 → HTTP REST
记忆要点
【通信方式】
HTTP:简单通用
WebSocket:实时双向
gRPC:高性能类型安全
MCP:模型上下文协议
消息队列:异步解耦
共享内存:低延迟状态共享
【HTTP】
RESTful API
流式响应
批量请求
健康检查
【WebSocket】
实时推送
双向通信
流式输出
连接管理
【gRPC】
Protocol Buffers
流式通信
批量处理
类型安全
【MCP】
Context 交换
Capability 声明
Tool 调用
Resource 访问
【消息队列】
发布订阅
持久化
消息路由
消费组
【共享内存】
Redis 共享
原子操作
过期管理
并发安全
文档版本: 1.0
最后更新: 2026-01-21