Content Outline
Go Backend Service Development
Goal: learn to build high-performance, highly available Agent backend services in Go
Table of Contents
Where Go Fits in Agent Development
Go vs Python: Division of Labor
| Dimension | Go | Python | Typical scenario |
|---|---|---|---|
| Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Go: high-concurrency services; Python: model inference |
| Dev efficiency | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Go: production tooling; Python: prototyping |
| Ecosystem | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Go: web frameworks; Python: AI ecosystem |
| Concurrency | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Go: lightweight goroutines |
| Deployment | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Go: single binary; Python: containers |
Typical Architecture Split
┌─────────────────────────────────────────────────────┐
│                   Frontend Layer                    │
│               React / Vue / Mobile                  │
└────────────────────┬────────────────────────────────┘
                     │
┌────────────────────┴────────────────────────────────┐
│              Go Backend Service Layer               │
│  ┌─────────┬─────────┬──────────┬─────────┐         │
│  │   API   │  Auth   │   Rate   │ Stream  │         │
│  │ Gateway │ Service │ Limiting │ Service │         │
│  └────┬────┴────┬────┴────┬─────┴────┬────┘         │
│       │         │         │          │              │
└───────┼─────────┼─────────┼──────────┼──────────────┘
        │         │         │          │
┌───────┼─────────┼─────────┼──────────┼──────────────┐
│       ▼         ▼         ▼          ▼              │
│  ┌───────────────────────────────────┐              │
│  │     Python Agent Service Layer    │              │
│  │  ┌─────┬──────┬───────┬──────┐    │              │
│  │  │ RAG │ Tool │ Lang  │ Auto │    │              │
│  │  │ Agt │ Mgmt │ Chain │ Gen  │    │              │
│  │  └─────┴──────┴───────┴──────┘    │              │
│  └───────────────────────────────────┘              │
└───────────────────────┬─────────────────────────────┘
                        │
┌───────────────────────┼─────────────────────────────┐
│                       ▼                             │
│  ┌─────────┬───────────┬─────────┬──────────┐       │
│  │   LLM   │ Vector DB │  Redis  │ Postgres │       │
│  └─────────┴───────────┴─────────┴──────────┘       │
└─────────────────────────────────────────────────────┘
Communication Options
// 1. gRPC (high performance)
syntax = "proto3";
service Agent {
  rpc Query(QueryRequest) returns (QueryResponse);
  rpc StreamQuery(QueryRequest) returns (stream QueryResponse);
}
// 2. HTTP REST (general purpose)
GET /api/v1/query
POST /api/v1/query
// 3. WebSocket (real time)
ws://api/v1/stream
Core Architecture Design
1. Layered Architecture
┌─────────────────────────────────────┐
│           Handler Layer             │ ← request handling
├─────────────────────────────────────┤
│           Service Layer             │ ← business logic
├─────────────────────────────────────┤
│          Repository Layer           │ ← data access
├─────────────────────────────────────┤
│        Infrastructure Layer         │ ← infrastructure
└─────────────────────────────────────┘
2. Code Layout
agent-backend/
├── cmd/
│   └── server/
│       └── main.go            ← entry point
├── internal/
│   ├── api/
│   │   ├── handler/           ← HTTP handlers
│   │   │   ├── query.go
│   │   │   ├── health.go
│   │   │   └── websocket.go
│   │   └── middleware/        ← middleware
│   │       ├── auth.go
│   │       ├── logging.go
│   │       └── ratelimit.go
│   ├── service/               ← business service layer
│   │   ├── agent_service.go
│   │   ├── rag_service.go
│   │   └── stream_service.go
│   ├── repository/            ← data access layer
│   │   ├── cache.go
│   │   ├── db.go
│   │   └── vector_store.go
│   ├── domain/                ← domain models
│   │   ├── agent.go
│   │   ├── message.go
│   │   └── session.go
│   └── pkg/                   ← internal packages
│       ├── llm/
│       │   └── client.go
│       └── python/
│           └── agent_client.go
├── config/
│   └── config.go              ← configuration
├── deployments/
│   ├── docker/
│   │   └── Dockerfile
│   └── k8s/
│       └── deployment.yaml
├── go.mod
├── go.sum
└── README.md
3. Core Model Definitions
package domain

import "time"

// Agent is the agent entity.
type Agent struct {
	ID        string         `json:"id"`
	Name      string         `json:"name"`
	Type      string         `json:"type"` // "rag", "tool", "chat"
	Config    map[string]any `json:"config"`
	State     string         `json:"state"` // "active", "inactive"
	CreatedAt time.Time      `json:"created_at"`
	UpdatedAt time.Time      `json:"updated_at"`
}

// Message is a single chat message.
type Message struct {
	ID        string         `json:"id"`
	SessionID string         `json:"session_id"`
	Role      string         `json:"role"` // "user", "assistant", "system"
	Content   string         `json:"content"`
	Metadata  map[string]any `json:"metadata"`
	CreatedAt time.Time      `json:"created_at"`
}

// Session is a conversation session.
type Session struct {
	ID        string         `json:"id"`
	UserID    string         `json:"user_id"`
	AgentID   string         `json:"agent_id"`
	State     string         `json:"state"` // "active", "closed"
	Context   map[string]any `json:"context"`
	CreatedAt time.Time      `json:"created_at"`
	UpdatedAt time.Time      `json:"updated_at"`
}

// QueryRequest is an incoming query.
type QueryRequest struct {
	SessionID string         `json:"session_id,omitempty"`
	AgentID   string         `json:"agent_id"`
	Query     string         `json:"query"`
	Options   map[string]any `json:"options,omitempty"`
}

// QueryResponse is the answer to a query.
type QueryResponse struct {
	ID      string   `json:"id"`
	Answer  string   `json:"answer"`
	Sources []Source `json:"sources,omitempty"`
	Usage   *Usage   `json:"usage,omitempty"`
	Latency int64    `json:"latency_ms"`
}

// Source is a retrieved source document.
type Source struct {
	ID       string         `json:"id"`
	Content  string         `json:"content"`
	Score    float64        `json:"score"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

// Usage records token consumption.
type Usage struct {
	PromptTokens     int64 `json:"prompt_tokens"`
	CompletionTokens int64 `json:"completion_tokens"`
	TotalTokens      int64 `json:"total_tokens"`
}
High-Performance Techniques
1. Concurrent Processing
package service

import (
	"context"
	"sync"
	"time"
)

// ConcurrentExecutor runs tasks in parallel with bounded concurrency.
type ConcurrentExecutor struct {
	workerCount int
	timeout     time.Duration
}

func NewConcurrentExecutor(workers int, timeout time.Duration) *ConcurrentExecutor {
	return &ConcurrentExecutor{
		workerCount: workers,
		timeout:     timeout,
	}
}

// Execute runs tasks in parallel. Go methods cannot declare their own type
// parameters, so this is a package-level generic function rather than a
// method on ConcurrentExecutor.
func Execute[T any](
	ctx context.Context,
	e *ConcurrentExecutor,
	tasks []func(context.Context) (T, error),
) ([]T, error) {
	results := make([]T, len(tasks))
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	sem := make(chan struct{}, e.workerCount) // semaphore bounding concurrency
	for i, task := range tasks {
		wg.Add(1)
		go func(idx int, t func(context.Context) (T, error)) {
			defer wg.Done()
			sem <- struct{}{}        // acquire
			defer func() { <-sem }() // release
			taskCtx, cancel := context.WithTimeout(ctx, e.timeout)
			defer cancel()
			results[idx], errs[idx] = t(taskCtx)
		}(i, task)
	}
	wg.Wait()
	// return the first error, if any
	for _, err := range errs {
		if err != nil {
			return results, err
		}
	}
	return results, nil
}

// Usage: run retrieval, intent classification and a cache lookup in parallel.
func (s *AgentService) ParallelProcess(ctx context.Context, query string) (*QueryResponse, error) {
	executor := NewConcurrentExecutor(3, 5*time.Second)
	results, err := Execute(ctx, executor, []func(context.Context) (any, error){
		func(ctx context.Context) (any, error) {
			return s.vectorStore.Search(ctx, query, 5)
		},
		func(ctx context.Context) (any, error) {
			return s.classifier.Classify(ctx, query)
		},
		func(ctx context.Context) (any, error) {
			return s.cache.Get(ctx, query)
		},
	})
	if err != nil {
		return nil, err
	}
	documents := results[0].([]Document)
	intent := results[1].(string)
	cached, _ := results[2].(*QueryResponse) // nil on cache miss
	// merge results
	// ...
}
2. Connection Pooling
package pool

import (
	"context"
	"errors"
	"sync"
	"time"
)

// Pool is the connection-pool interface.
type Pool[T any] interface {
	Get(ctx context.Context) (T, error)
	Put(T) error
	Close() error
}

// GenericPool is a generic connection pool.
type GenericPool[T any] struct {
	factory     func() (T, error) // creates connections
	closer      func(T) error     // closes connections
	maxIdle     int               // max idle connections
	maxActive   int               // max active connections
	idleTimeout time.Duration     // idle timeout

	mu     sync.Mutex
	active int    // current active connections
	idle   chan T // idle connection queue
	closed bool
}

func NewGenericPool[T any](
	factory func() (T, error),
	closer func(T) error,
	maxIdle, maxActive int,
	idleTimeout time.Duration,
) *GenericPool[T] {
	return &GenericPool[T]{
		factory:     factory,
		closer:      closer,
		maxIdle:     maxIdle,
		maxActive:   maxActive,
		idleTimeout: idleTimeout,
		idle:        make(chan T, maxIdle),
	}
}

func (p *GenericPool[T]) Get(ctx context.Context) (T, error) {
	var zero T
	// wait until we are under the active-connection limit
	for {
		p.mu.Lock()
		if p.closed {
			p.mu.Unlock()
			return zero, errors.New("pool is closed")
		}
		if p.active < p.maxActive {
			break // exits the loop with the lock held
		}
		p.mu.Unlock()
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-time.After(100 * time.Millisecond):
			// retry
		}
	}
	// the lock is held here
	select {
	case conn := <-p.idle: // reuse an idle connection
		p.active++
		p.mu.Unlock()
		return conn, nil
	default:
	}
	// no idle connection: create a new one (outside the lock)
	p.active++
	p.mu.Unlock()
	conn, err := p.factory()
	if err != nil {
		p.mu.Lock()
		p.active--
		p.mu.Unlock()
		return zero, err
	}
	return conn, nil
}

func (p *GenericPool[T]) Put(conn T) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return p.closer(conn)
	}
	p.active--
	// park the connection if the idle queue has room
	select {
	case p.idle <- conn:
		return nil
	default:
		return p.closer(conn)
	}
}

func (p *GenericPool[T]) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	close(p.idle)
	var lastErr error
	// ranging over the closed buffered channel drains the remaining idle connections
	for conn := range p.idle {
		if err := p.closer(conn); err != nil {
			lastErr = err
		}
	}
	return lastErr
}
3. Cache Optimization
package cache

import (
	"container/list"
	"context"
	"fmt"
	"sync"
	"time"
)

// Cache is the cache interface.
type Cache interface {
	Get(ctx context.Context, key string) (any, error)
	Set(ctx context.Context, key string, value any, ttl time.Duration) error
	Delete(ctx context.Context, key string) error
}

// MultiLevelCache layers a local cache over a remote one.
type MultiLevelCache struct {
	l1 *MemoryCache // L1: in-process memory cache
	l2 Cache        // L2: Redis cache
}

func NewMultiLevelCache(l1Size int, l2 Cache) *MultiLevelCache {
	return &MultiLevelCache{
		l1: NewMemoryCache(l1Size),
		l2: l2,
	}
}

func (c *MultiLevelCache) Get(ctx context.Context, key string) (any, error) {
	// L1 hit
	if value, found := c.l1.Get(key); found {
		return value, nil
	}
	// L2 hit
	value, err := c.l2.Get(ctx, key)
	if err == nil {
		// backfill L1
		c.l1.Set(key, value, 5*time.Minute)
		return value, nil
	}
	return nil, err
}

func (c *MultiLevelCache) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
	// write both L1 and L2
	c.l1.Set(key, value, ttl)
	if err := c.l2.Set(ctx, key, value, ttl); err != nil {
		return fmt.Errorf("l2 set error: %w", err)
	}
	return nil
}

// MemoryCache is an in-memory LRU cache with per-entry TTL.
type MemoryCache struct {
	mu     sync.Mutex
	data   map[string]*cacheItem
	lru    *list.List
	maxLen int
}

type cacheItem struct {
	value   any
	expires time.Time
	element *list.Element
}

func NewMemoryCache(maxLen int) *MemoryCache {
	return &MemoryCache{
		data:   make(map[string]*cacheItem),
		lru:    list.New(),
		maxLen: maxLen,
	}
}

func (c *MemoryCache) Get(key string) (any, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	item, found := c.data[key]
	if !found {
		return nil, false
	}
	// expired?
	if time.Now().After(item.expires) {
		c.remove(key)
		return nil, false
	}
	// refresh LRU position
	c.lru.MoveToFront(item.element)
	return item.value, true
}

func (c *MemoryCache) Set(key string, value any, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if item, found := c.data[key]; found {
		// update existing entry
		item.value = value
		item.expires = time.Now().Add(ttl)
		c.lru.MoveToFront(item.element)
		return
	}
	// evict the least recently used entry when full
	if len(c.data) >= c.maxLen {
		c.evict()
	}
	item := &cacheItem{
		value:   value,
		expires: time.Now().Add(ttl),
		element: c.lru.PushFront(key),
	}
	c.data[key] = item
}

// remove deletes an entry; callers must hold c.mu.
func (c *MemoryCache) remove(key string) {
	if item, found := c.data[key]; found {
		c.lru.Remove(item.element)
		delete(c.data, key)
	}
}

// evict drops the least recently used entry; callers must hold c.mu.
func (c *MemoryCache) evict() {
	if back := c.lru.Back(); back != nil {
		c.remove(back.Value.(string))
	}
}
4. Streaming Responses
package handler

import (
	"fmt"
	"net/http"
)

// StreamHandler streams the answer to the client via Server-Sent Events.
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
	// SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	// the ResponseWriter must support flushing
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	// start the streaming generation
	ctx := r.Context()
	resultChan := h.agentService.StreamQuery(ctx, r.URL.Query().Get("query"))
	// forward events
	for chunk := range resultChan {
		event := fmt.Sprintf("data: %s\n\n", chunk)
		if _, err := fmt.Fprint(w, event); err != nil {
			return
		}
		flusher.Flush()
	}
	// terminal event
	fmt.Fprint(w, "data: [DONE]\n\n")
	flusher.Flush()
}

// StreamQuery proxies a streaming query to the Python Agent service.
func (s *AgentService) StreamQuery(ctx context.Context, query string) <-chan string {
	resultChan := make(chan string)
	go func() {
		defer close(resultChan)
		// fetch the streamed result from the Python Agent service
		chunks := s.pythonClient.StreamQuery(ctx, query)
		for chunk := range chunks {
			select {
			case resultChan <- chunk:
			case <-ctx.Done():
				return
			}
		}
	}()
	return resultChan
}
High-Availability Techniques
1. Circuit Breaker
package circuitbreaker

import (
	"context"
	"errors"
	"sync"
	"time"
)

var (
	ErrOpenState    = errors.New("circuit breaker is open")
	ErrHalfOpenWait = errors.New("circuit breaker is half-open")
)

// State is the circuit-breaker state.
type State string

const (
	StateClosed   State = "closed"    // normal operation
	StateOpen     State = "open"      // tripped, requests rejected
	StateHalfOpen State = "half-open" // probing whether to recover
)

// CircuitBreaker trips after consecutive failures and probes for recovery.
type CircuitBreaker struct {
	maxFailures     int           // failures before tripping
	resetTimeout    time.Duration // how long to stay open
	halfOpenTimeout time.Duration // spacing between half-open probes

	mu           sync.Mutex
	state        State
	failures     int
	lastFailTime time.Time
	nextAttempt  time.Time
}

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		maxFailures:     maxFailures,
		resetTimeout:    resetTimeout,
		halfOpenTimeout: time.Minute,
		state:           StateClosed,
	}
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
	if !cb.allowRequest() {
		return ErrOpenState
	}
	err := fn()
	cb.recordResult(err)
	return err
}

func (cb *CircuitBreaker) allowRequest() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	now := time.Now()
	switch cb.state {
	case StateClosed:
		return true
	case StateOpen:
		// after resetTimeout, let one probe through (half-open)
		if now.After(cb.nextAttempt) {
			cb.state = StateHalfOpen
			cb.nextAttempt = now.Add(cb.halfOpenTimeout) // throttle further probes
			return true
		}
		return false
	case StateHalfOpen:
		// allow at most one probe per halfOpenTimeout
		if now.Before(cb.nextAttempt) {
			return false
		}
		cb.nextAttempt = now.Add(cb.halfOpenTimeout)
		return true
	}
	return false
}

func (cb *CircuitBreaker) recordResult(err error) {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if err == nil {
		// success: reset the breaker
		cb.state = StateClosed
		cb.failures = 0
		return
	}
	// failure
	cb.failures++
	cb.lastFailTime = time.Now()
	if cb.failures >= cb.maxFailures || cb.state == StateHalfOpen {
		// trip (or re-trip after a failed half-open probe)
		cb.state = StateOpen
		cb.nextAttempt = time.Now().Add(cb.resetTimeout)
	}
}
2. Rate Limiter
package ratelimit

import (
	"sync"
	"time"
)

// RateLimiter is a token-bucket rate limiter.
type RateLimiter struct {
	rate     float64 // tokens per second
	capacity int     // bucket capacity

	mu       sync.Mutex
	tokens   float64   // current token count
	lastTime time.Time // last refill time
}

func NewRateLimiter(rate float64, capacity int) *RateLimiter {
	return &RateLimiter{
		rate:     rate,
		capacity: capacity,
		tokens:   float64(capacity),
		lastTime: time.Now(),
	}
}

func (rl *RateLimiter) Allow() bool {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	now := time.Now()
	elapsed := now.Sub(rl.lastTime).Seconds()
	// refill tokens
	rl.tokens += elapsed * rl.rate
	if rl.tokens > float64(rl.capacity) {
		rl.tokens = float64(rl.capacity)
	}
	rl.lastTime = now
	// consume a token
	if rl.tokens >= 1.0 {
		rl.tokens -= 1.0
		return true
	}
	return false
}

func (rl *RateLimiter) Wait() {
	for !rl.Allow() {
		time.Sleep(10 * time.Millisecond)
	}
}
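The token-bucket arithmetic above is easy to verify by injecting the clock instead of calling time.Now. A stripped-down, self-contained version of Allow driven by an explicit timestamp (a sketch for illustration, not the limiter itself):

```go
package main

import "fmt"

// bucket is a minimal token bucket: tokens refill at `rate` per second
// up to `capacity`; each allowed request costs one token.
type bucket struct {
	rate, capacity, tokens float64
	last                   float64 // last refill time, in seconds
}

func (b *bucket) allow(now float64) bool {
	b.tokens += (now - b.last) * b.rate // refill proportionally to elapsed time
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

func main() {
	b := &bucket{rate: 2, capacity: 2, tokens: 2}
	// a burst of 2 passes, the third request is rejected
	fmt.Println(b.allow(0), b.allow(0), b.allow(0)) // true true false
	// 0.5 s later one token has been refilled (0.5 s × 2 tokens/s)
	fmt.Println(b.allow(0.5)) // true
}
```

This also shows why token buckets are popular for API limits: they tolerate short bursts up to `capacity` while enforcing the average `rate` over time.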
// SlidingWindowLimiter limits requests within a sliding time window.
type SlidingWindowLimiter struct {
	windowSize  time.Duration // window length
	maxRequests int           // max requests per window

	mu       sync.Mutex
	requests []time.Time // request timestamps
}

func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
	return &SlidingWindowLimiter{
		windowSize:  windowSize,
		maxRequests: maxRequests,
		requests:    make([]time.Time, 0),
	}
}

func (sw *SlidingWindowLimiter) Allow() bool {
	sw.mu.Lock()
	defer sw.mu.Unlock()
	now := time.Now()
	windowStart := now.Add(-sw.windowSize)
	// drop requests that fell out of the window
	var valid []time.Time
	for _, t := range sw.requests {
		if t.After(windowStart) {
			valid = append(valid, t)
		}
	}
	sw.requests = valid
	// over the limit?
	if len(sw.requests) >= sw.maxRequests {
		return false
	}
	// record this request
	sw.requests = append(sw.requests, now)
	return true
}
3. Retry Mechanism
package retry

import (
	"context"
	"fmt"
	"time"
)

// BackoffStrategy maps an attempt number to a delay.
type BackoffStrategy func(attempt int) time.Duration

var (
	// ConstantBackoff waits a fixed interval between attempts.
	ConstantBackoff = func(interval time.Duration) BackoffStrategy {
		return func(_ int) time.Duration {
			return interval
		}
	}
	// LinearBackoff grows the interval linearly with the attempt number.
	LinearBackoff = func(initialInterval time.Duration, multiplier float64) BackoffStrategy {
		return func(attempt int) time.Duration {
			return time.Duration(float64(initialInterval) * (1 + multiplier*float64(attempt)))
		}
	}
	// ExponentialBackoff doubles the interval each attempt, capped at maxInterval.
	ExponentialBackoff = func(initialInterval, maxInterval time.Duration) BackoffStrategy {
		return func(attempt int) time.Duration {
			interval := initialInterval << uint(attempt) // initial * 2^attempt
			if interval > maxInterval {
				interval = maxInterval
			}
			return interval
		}
	}
)

// RetryConfig configures Retry.
type RetryConfig struct {
	MaxAttempts int                          // maximum attempts
	Backoff     BackoffStrategy              // backoff strategy
	RetryIf     func(error) bool             // retry predicate
	OnRetry     func(attempt int, err error) // per-retry callback
}

// Retry runs fn until it succeeds, attempts run out, or ctx is done.
func Retry[T any](ctx context.Context, config *RetryConfig, fn func() (T, error)) (T, error) {
	var lastErr error
	var result T
	for attempt := 0; attempt < config.MaxAttempts; attempt++ {
		result, lastErr = fn()
		if lastErr == nil {
			return result, nil
		}
		// non-retryable error?
		if config.RetryIf != nil && !config.RetryIf(lastErr) {
			break
		}
		// notify the caller
		if config.OnRetry != nil {
			config.OnRetry(attempt, lastErr)
		}
		// no wait after the final attempt
		if attempt == config.MaxAttempts-1 {
			break
		}
		// back off before the next attempt
		backoff := config.Backoff(attempt)
		select {
		case <-time.After(backoff):
			// retry
		case <-ctx.Done():
			var zero T
			return zero, ctx.Err()
		}
	}
	var zero T
	return zero, fmt.Errorf("after %d attempts, last error: %w", config.MaxAttempts, lastErr)
}
4. Health Checks
package health

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// Checker is a single health check.
type Checker interface {
	Check(ctx context.Context) error
	Name() string
}

// HealthCheckResponse is the health endpoint payload.
type HealthCheckResponse struct {
	Status    string            `json:"status"`
	Timestamp string            `json:"timestamp"`
	Checks    map[string]string `json:"checks,omitempty"`
}

// SQLChecker pings a database.
type SQLChecker struct {
	name string
	db   *sql.DB
}

func NewSQLChecker(name string, db *sql.DB) *SQLChecker {
	return &SQLChecker{name: name, db: db}
}

func (c *SQLChecker) Check(ctx context.Context) error {
	return c.db.PingContext(ctx)
}

func (c *SQLChecker) Name() string {
	return c.name
}

// HTTPChecker probes an HTTP service.
type HTTPChecker struct {
	name string
	url  string
}

func NewHTTPChecker(name, url string) *HTTPChecker {
	return &HTTPChecker{name: name, url: url}
}

func (c *HTTPChecker) Check(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 500 {
		return fmt.Errorf("service unavailable: %d", resp.StatusCode)
	}
	return nil
}

// HealthHandler runs all checkers and reports aggregate health.
func HealthHandler(checkers []Checker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		response := HealthCheckResponse{
			Status:    "healthy",
			Timestamp: time.Now().Format(time.RFC3339),
			Checks:    make(map[string]string),
		}
		allHealthy := true
		for _, checker := range checkers {
			if err := checker.Check(ctx); err != nil {
				response.Checks[checker.Name()] = err.Error()
				allHealthy = false
			} else {
				response.Checks[checker.Name()] = "ok"
			}
		}
		if !allHealthy {
			response.Status = "unhealthy"
			w.WriteHeader(http.StatusServiceUnavailable)
		} else {
			w.WriteHeader(http.StatusOK)
		}
		json.NewEncoder(w).Encode(response)
	}
}
Interop with the Python Agent
1. gRPC Communication
// agent.proto
syntax = "proto3";
package agent;

service AgentService {
  // synchronous query
  rpc Query(QueryRequest) returns (QueryResponse);
  // streaming query
  rpc StreamQuery(QueryRequest) returns (stream QueryChunk);
  // batch query
  rpc BatchQuery(BatchQueryRequest) returns (BatchQueryResponse);
}

message QueryRequest {
  string session_id = 1;
  string agent_id = 2;
  string query = 3;
  map<string, string> options = 4;
}

message QueryResponse {
  string id = 1;
  string answer = 2;
  repeated Source sources = 3;
  Usage usage = 4;
  int64 latency_ms = 5;
}

message QueryChunk {
  string id = 1;
  string chunk = 2;
  bool is_final = 3;
}

message Source {
  string id = 1;
  string content = 2;
  double score = 3;
  map<string, string> metadata = 4;
}

message Usage {
  int64 prompt_tokens = 1;
  int64 completion_tokens = 2;
  int64 total_tokens = 3;
}

message BatchQueryRequest {
  repeated QueryRequest requests = 1;
}

message BatchQueryResponse {
  repeated QueryResponse responses = 1;
}
// client.go
package agent

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	// pb is the package protoc generates from agent.proto; the import path
	// depends on the go_package option in your module (placeholder below).
	pb "example.com/agent-backend/gen/agent"
)

type AgentClient struct {
	conn   *grpc.ClientConn
	client pb.AgentServiceClient
}

func NewAgentClient(addr string) (*AgentClient, error) {
	conn, err := grpc.Dial(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(), // block until the connection is established
	)
	if err != nil {
		return nil, err
	}
	return &AgentClient{
		conn:   conn,
		client: pb.NewAgentServiceClient(conn),
	}, nil
}

func (c *AgentClient) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
	return c.client.Query(ctx, req)
}

func (c *AgentClient) StreamQuery(ctx context.Context, req *pb.QueryRequest) (<-chan *pb.QueryChunk, error) {
	stream, err := c.client.StreamQuery(ctx, req)
	if err != nil {
		return nil, err
	}
	resultChan := make(chan *pb.QueryChunk)
	go func() {
		defer close(resultChan)
		for {
			chunk, err := stream.Recv()
			if err != nil { // io.EOF marks the normal end of the stream
				return
			}
			resultChan <- chunk
		}
	}()
	return resultChan, nil
}

func (c *AgentClient) Close() error {
	return c.conn.Close()
}
2. Usage Example
package main

import (
	"context"
	"fmt"
	"log"
	"time"
	// import paths for the agent client package and generated pb package omitted
)

func main() {
	// connect to the Python Agent service
	client, err := agent.NewAgentClient("localhost:50051")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	ctx := context.Background()
	// synchronous query
	req := &pb.QueryRequest{
		SessionId: "session-123",
		AgentId:   "rag-agent-1",
		Query:     "What is RAG?",
	}
	start := time.Now()
	resp, err := client.Query(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Query: %s\n", req.Query)
	fmt.Printf("Answer: %s\n", resp.Answer)
	fmt.Printf("Latency: %dms\n", time.Since(start).Milliseconds())
	// streaming query
	stream, err := client.StreamQuery(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print("\nStreaming: ")
	for chunk := range stream {
		fmt.Print(chunk.Chunk)
		if chunk.IsFinal {
			fmt.Println()
			break
		}
	}
}
Hands-On Case Study
Case: a complete Agent backend service
// cmd/server/main.go
package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gin-gonic/gin"
	// internal packages (service, health, middleware, config) omitted
)

func main() {
	// load configuration
	config := loadConfig()
	// wire up dependencies
	agentClient := initAgentClient(config)
	cache := initCache(config)
	db := initDB(config)
	// build the service layer
	agentService := service.NewAgentService(agentClient, cache, db)
	// build the HTTP router
	router := setupRouter(agentService, config)
	// configure the server
	server := &http.Server{
		Addr:         config.HTTPAddr,
		Handler:      router,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  120 * time.Second,
	}
	// start serving
	go func() {
		log.Printf("Server started on %s", config.HTTPAddr)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Server error: %v", err)
		}
	}()
	// graceful shutdown
	gracefulShutdown(server)
}

func setupRouter(agentService *service.AgentService, config *Config) *gin.Engine {
	r := gin.Default()
	// health endpoint (health.HealthHandler is an http.HandlerFunc, so wrap it)
	r.GET("/health", gin.WrapF(health.HealthHandler([]health.Checker{
		health.NewSQLChecker("database", agentService.DB()),
		health.NewHTTPChecker("python-agent", config.AgentURL),
	})))
	// API routes
	api := r.Group("/api/v1")
	api.Use(middleware.Auth(config.AuthSecret))
	api.Use(middleware.RateLimit(config.RateLimit))
	// Agent routes
	agent := api.Group("/agent")
	{
		agent.POST("/query", agentService.QueryHandler)
		agent.GET("/stream", agentService.StreamHandler)
		agent.POST("/batch", agentService.BatchQueryHandler)
	}
	// session routes
	session := api.Group("/session")
	{
		session.POST("/", agentService.CreateSession)
		session.GET("/:id", agentService.GetSession)
		session.DELETE("/:id", agentService.CloseSession)
	}
	return r
}

func gracefulShutdown(server *http.Server) {
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	log.Println("Shutting down server...")
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Printf("Server shutdown error: %v", err)
	}
	log.Println("Server stopped")
}
Frequent Interview Questions
Q1: What are Go's advantages in Agent development?
[Model answer]
Go's core strengths:
1. High performance
- Native concurrency (goroutines)
- Efficient scheduler
- Low memory footprint
2. High concurrency
- Goroutines are lightweight (≈2 KB initial stack)
- Millions of concurrent goroutines are feasible
- Well suited to handling large volumes of Agent requests
3. Easy deployment
- Single binary
- No dependency hell
- Fast startup
4. Strong typing
- Compile-time checks
- Fewer runtime errors
- Easier refactoring
Typical division of labor:
- Go: API gateway, task scheduling, connection-pool management
- Python: Agent logic, LLM calls, data processing
Q2: How do you make Go and Python communicate efficiently?
[Model answer]
Choosing a channel:
1. gRPC (recommended)
Strengths: high performance, type safety, streaming support
Use for: high-frequency calls, complex protocols
2. HTTP/REST
Strengths: simple, universal
Use for: external APIs, simple calls
3. Message queue
Strengths: decoupling, asynchrony
Use for: task queues, event-driven flows
4. Shared storage
Strengths: no direct communication overhead
Use for: shared state
Best practices:
- Agent calls: gRPC
- External APIs: HTTP
- Long-running tasks: message queue
- Session state: Redis
Q3: How do you keep an Agent service highly available?
[Model answer]
High-availability strategies:
1. Redundant deployment
- Multiple instances
- Load balancing
- Failover
2. Circuit breaking and degradation
- Circuit-breaker protection
- Degradation strategies
- Rate limiting
3. Health checks
- Active health probes
- Automatic restarts
- Alerting
4. Graceful shutdown
- Finish in-flight requests
- Release resources
- Persist state
5. Observability
- Thorough logging
- Metrics monitoring
- Distributed tracing
Key Takeaways
[Go's role]
High-performance backend services
High-concurrency task scheduling
Distributed-system coordination
[Layered architecture]
Handler → Service → Repository → Infrastructure
[High-performance techniques]
Goroutine concurrency
Connection pooling
Multi-level caching
Streaming responses
[High-availability techniques]
Circuit breaker
Rate limiter
Retry mechanism
Health checks
[Interop with Python]
gRPC (high performance)
HTTP (general purpose)
Message queue (async)
Shared storage (state)
Document version: 1.0
Last updated: 2026-01-21