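[AI Agent Knowledge Base] 14 - Go Backend Service Development

Go Backend Service Development

Goal: learn to build high-performance, highly available Agent backend services in Go
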

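Go's Role in Agent Development

Go vs Python: Division of Labor

Dimension         | Go         | Python     | Typical scenario
Performance       | ⭐⭐⭐⭐⭐ | ⭐⭐⭐     | Go: high-concurrency services; Python: model inference
Dev productivity  | ⭐⭐⭐⭐   | ⭐⭐⭐⭐⭐ | Go: productivity tooling; Python: prototyping
Ecosystem         | ⭐⭐⭐⭐   | ⭐⭐⭐⭐⭐ | Go: web frameworks; Python: AI ecosystem
Concurrency       | ⭐⭐⭐⭐⭐ | ⭐⭐⭐     | Go: lightweight goroutines
Deployment        | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐   | Go: single binary; Python: containerized

Typical Architecture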

┌───────────────────────────────────────────────────┐
│                  Frontend layer                   │
│               React / Vue / Mobile                │
└─────────────────────────┬─────────────────────────┘
                          │
┌─────────────────────────┴─────────────────────────┐
│              Go backend service layer             │
│  ┌───────────┬───────────┬───────────┬─────────┐  │
│  │    API    │   Auth    │   Rate    │ Stream  │  │
│  │  Gateway  │  Service  │ Limiting  │ Service │  │
│  └───────────┴───────────┴───────────┴─────────┘  │
└─────────────────────────┬─────────────────────────┘
                          │
┌─────────────────────────┴─────────────────────────┐
│             Python Agent service layer            │
│  ┌───────────┬───────────┬───────────┬─────────┐  │
│  │    RAG    │   Tool    │ LangChain │ AutoGen │  │
│  │   Agent   │   Mgmt    │           │         │  │
│  └───────────┴───────────┴───────────┴─────────┘  │
└─────────────────────────┬─────────────────────────┘
                          │
┌─────────────────────────┴─────────────────────────┐
│  ┌───────────┬───────────┬───────────┬─────────┐  │
│  │    LLM    │ Vector DB │   Redis   │Postgres │  │
│  └───────────┴───────────┴───────────┴─────────┘  │
└───────────────────────────────────────────────────┘

Communication Options

// 1. gRPC (high performance)
syntax = "proto3";

service AgentService {
    rpc Query(QueryRequest) returns (QueryResponse);
    rpc StreamQuery(QueryRequest) returns (stream QueryResponse);
}

// 2. HTTP REST (general purpose)
GET /api/v1/query
POST /api/v1/query

// 3. WebSocket (real time)
ws://<host>/api/v1/stream

Core Architecture Design

1. Layered Architecture

┌─────────────────────────────────────┐
│            Handler Layer            │  ← request handling
├─────────────────────────────────────┤
│            Service Layer            │  ← business logic
├─────────────────────────────────────┤
│          Repository Layer           │  ← data access
├─────────────────────────────────────┤
│        Infrastructure Layer         │  ← infrastructure
└─────────────────────────────────────┘

2. Code Layout

agent-backend/
├── cmd/
│   └── server/
│       └── main.go              ← program entry point
├── internal/
│   ├── api/
│   │   ├── handler/            ← HTTP handlers
│   │   │   ├── query.go
│   │   │   ├── health.go
│   │   │   └── websocket.go
│   │   └── middleware/         ← middleware
│   │       ├── auth.go
│   │       ├── logging.go
│   │       └── ratelimit.go
│   ├── service/                 ← business service layer
│   │   ├── agent_service.go
│   │   ├── rag_service.go
│   │   └── stream_service.go
│   ├── repository/              ← data access layer
│   │   ├── cache.go
│   │   ├── db.go
│   │   └── vector_store.go
│   ├── domain/                   ← domain models
│   │   ├── agent.go
│   │   ├── message.go
│   │   └── session.go
│   └── pkg/                     ← internal packages
│       ├── llm/
│       │   └── client.go
│       └── python/
│           └── agent_client.go
├── config/
│   └── config.go                ← configuration
├── deployments/
│   ├── docker/
│   │   └── Dockerfile
│   └── k8s/
│       └── deployment.yaml
├── go.mod
├── go.sum
└── README.md

3. Core Model Definitions

package domain

import "time"

// Agent is an agent entity.
type Agent struct {
    ID          string            `json:"id"`
    Name        string            `json:"name"`
    Type        string            `json:"type"`        // "rag", "tool", "chat"
    Config      map[string]any    `json:"config"`
    State       string            `json:"state"`       // "active", "inactive"
    CreatedAt   time.Time         `json:"created_at"`
    UpdatedAt   time.Time         `json:"updated_at"`
}

// Message is a single message within a session.
type Message struct {
    ID          string            `json:"id"`
    SessionID   string            `json:"session_id"`
    Role        string            `json:"role"`        // "user", "assistant", "system"
    Content     string            `json:"content"`
    Metadata    map[string]any    `json:"metadata"`
    CreatedAt   time.Time         `json:"created_at"`
}

// Session is a conversation session.
type Session struct {
    ID          string            `json:"id"`
    UserID      string            `json:"user_id"`
    AgentID     string            `json:"agent_id"`
    State       string            `json:"state"`       // "active", "closed"
    Context     map[string]any    `json:"context"`
    CreatedAt   time.Time         `json:"created_at"`
    UpdatedAt   time.Time         `json:"updated_at"`
}

// QueryRequest is a query request.
type QueryRequest struct {
    SessionID   string            `json:"session_id,omitempty"`
    AgentID     string            `json:"agent_id"`
    Query       string            `json:"query"`
    Options     map[string]any    `json:"options,omitempty"`
}

// QueryResponse is a query response.
type QueryResponse struct {
    ID          string            `json:"id"`
    Answer      string            `json:"answer"`
    Sources     []Source          `json:"sources,omitempty"`
    Usage       *Usage            `json:"usage,omitempty"`
    Latency     int64             `json:"latency_ms"`
}

// Source describes a retrieved source that backs an answer.
type Source struct {
    ID          string            `json:"id"`
    Content     string            `json:"content"`
    Score       float64           `json:"score"`
    Metadata    map[string]any    `json:"metadata,omitempty"`
}

// Usage holds token usage statistics.
type Usage struct {
    PromptTokens     int64   `json:"prompt_tokens"`
    CompletionTokens int64   `json:"completion_tokens"`
    TotalTokens      int64   `json:"total_tokens"`
}
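
To make the effect of the `json` struct tags concrete, here is a small sketch that round-trips a trimmed copy of `QueryRequest` through `encoding/json`. The helper names `encodeRequest` and `decodeRequest` are illustrative assumptions, not part of the models above.

```go
package main

import "encoding/json"

// QueryRequest is a trimmed copy of the model above.
type QueryRequest struct {
    SessionID string         `json:"session_id,omitempty"`
    AgentID   string         `json:"agent_id"`
    Query     string         `json:"query"`
    Options   map[string]any `json:"options,omitempty"`
}

// encodeRequest returns the JSON wire form of req.
// Fields tagged omitempty are left out when they hold their zero value.
func encodeRequest(req QueryRequest) (string, error) {
    b, err := json.Marshal(req)
    return string(b), err
}

// decodeRequest parses a JSON body into a QueryRequest.
// Numbers inside map[string]any decode as float64.
func decodeRequest(body string) (QueryRequest, error) {
    var req QueryRequest
    err := json.Unmarshal([]byte(body), &req)
    return req, err
}
```

Encoding a request with only `AgentID` and `Query` set produces a body without `session_id` and `options`. When decoding, a value such as `{"options":{"top_k":5}}` arrives as `float64(5)`, which matters for any code that type-asserts option values.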

High-Performance Implementation

1. Concurrent Processing

package service

import (
    "context"
    "sync"
    "time"
)

// ConcurrentExecutor runs tasks concurrently with a bounded number of workers.
type ConcurrentExecutor struct {
    workerCount int
    timeout     time.Duration
}

func NewConcurrentExecutor(workers int, timeout time.Duration) *ConcurrentExecutor {
    return &ConcurrentExecutor{
        workerCount: workers,
        timeout:     timeout,
    }
}

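// Execute runs the tasks in parallel, with at most e.workerCount running at once.
// Go methods cannot declare type parameters, so this is a package-level
// generic function that takes the executor as an argument.
func Execute[T any](
    ctx context.Context,
    e *ConcurrentExecutor,
    tasks []func(context.Context) (T, error),
) ([]T, error) {
    results := make([]T, len(tasks))
    errs := make([]error, len(tasks))

    var wg sync.WaitGroup
    sem := make(chan struct{}, e.workerCount) // semaphore that bounds concurrency

    for i, task := range tasks {
        wg.Add(1)
        go func(idx int, t func(context.Context) (T, error)) {
            defer wg.Done()

            sem <- struct{}{}        // acquire
            defer func() { <-sem }() // release

            taskCtx, cancel := context.WithTimeout(ctx, e.timeout)
            defer cancel()

            // Each goroutine writes only its own index, so no lock is needed.
            results[idx], errs[idx] = t(taskCtx)
        }(i, task)
    }

    wg.Wait()

    // Return the first error, together with whatever results were produced.
    for _, err := range errs {
        if err != nil {
            return results, err
        }
    }

    return results, nil
}

// Usage example: run retrieval, intent classification and a cache lookup in parallel.
func (s *AgentService) ParallelProcess(ctx context.Context, query string) (*QueryResponse, error) {
    executor := NewConcurrentExecutor(3, 5*time.Second)

    results, err := Execute(ctx, executor, []func(context.Context) (any, error){
        func(ctx context.Context) (any, error) {
            return s.vectorStore.Search(ctx, query, 5)
        },
        func(ctx context.Context) (any, error) {
            return s.classifier.Classify(ctx, query)
        },
        func(ctx context.Context) (any, error) {
            // Treat a cache miss as "no value" so it does not fail the whole batch.
            v, _ := s.cache.Get(ctx, query)
            return v, nil
        },
    })
    if err != nil {
        return nil, err
    }

    // Use the comma-ok form: a plain type assertion panics on a nil or
    // unexpected value (for example, after a cache miss).
    documents, _ := results[0].([]Document)
    intent, _ := results[1].(string)
    if cached, ok := results[2].(*QueryResponse); ok && cached != nil {
        return cached, nil
    }

    // Merge documents and intent into the final response.
    // ...
    _, _ = documents, intent
    return &QueryResponse{}, nil // placeholder until the merge step is filled in
}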

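2. Connection Pool Management

package pool

import (
    "context"
    "errors"
    "sync"
    "time"
)

// Pool is the connection pool interface.
type Pool[T any] interface {
    Get(ctx context.Context) (T, error)
    Put(T) error
    Close() error
}

// GenericPool is a generic connection pool.
type GenericPool[T any] struct {
    factory     func() (T, error) // creates a new connection
    closer      func(T) error     // closes a connection
    maxIdle     int               // maximum number of idle connections
    maxActive   int               // maximum number of checked-out connections
    idleTimeout time.Duration     // idle timeout (not enforced in this sketch)

    mu     sync.Mutex
    active int    // connections currently checked out
    idle   chan T // queue of idle connections
    closed bool
}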

func NewGenericPool[T any](
    factory func() (T, error),
    closer func(T) error,
    maxIdle, maxActive int,
    idleTimeout time.Duration,
) *GenericPool[T] {
    return &GenericPool[T]{
        factory:     factory,
        closer:      closer,
        maxIdle:     maxIdle,
        maxActive:   maxActive,
        idleTimeout:  idleTimeout,
        idle:        make(chan T, maxIdle),
    }
}

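func (p *GenericPool[T]) Get(ctx context.Context) (T, error) {
    var zero T

    // Reserve a slot. Re-check the limit after every wait; waiting once and
    // then proceeding would let the pool exceed maxActive.
    for {
        p.mu.Lock()
        if p.closed {
            p.mu.Unlock()
            return zero, errors.New("pool is closed")
        }
        if p.active < p.maxActive {
            p.active++
            p.mu.Unlock()
            break
        }
        p.mu.Unlock()

        select {
        case <-ctx.Done():
            return zero, ctx.Err()
        case <-time.After(100 * time.Millisecond):
            // retry
        }
    }

    // Prefer an idle connection. The ok check guards against a pool that
    // was closed in the meantime.
    select {
    case conn, ok := <-p.idle:
        if ok {
            return conn, nil
        }
    default:
    }

    // Otherwise create a new one, outside the lock so a slow dial does not
    // block other callers.
    conn, err := p.factory()
    if err != nil {
        p.mu.Lock()
        p.active-- // give the reserved slot back
        p.mu.Unlock()
        return zero, err
    }
    return conn, nil
}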

func (p *GenericPool[T]) Put(conn T) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        return p.closer(conn)
    }

    p.active--

    // Keep the connection if the idle queue has room; otherwise close it.
    select {
    case p.idle <- conn:
        return nil
    default:
        return p.closer(conn)
    }
}

func (p *GenericPool[T]) Close() error {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.closed {
        return nil // already closed; closing the channel twice would panic
    }
    p.closed = true
    close(p.idle)

    // Drain the queue and close every idle connection.
    var lastErr error
    for conn := range p.idle {
        if err := p.closer(conn); err != nil {
            lastErr = err
        }
    }

    return lastErr
}

3. Cache Optimization

package cache

import (
    "container/list"
    "context"
    "fmt"
    "sync"
    "time"
)

// Cache is the cache interface.
type Cache interface {
    Get(ctx context.Context, key string) (any, error)
    Set(ctx context.Context, key string, value any, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
}

// MultiLevelCache is a two-level cache.
type MultiLevelCache struct {
    l1 *MemoryCache // L1: in-process memory cache
    l2 Cache        // L2: Redis cache
}

func NewMultiLevelCache(l1Size int, l2 Cache) *MultiLevelCache {
    return &MultiLevelCache{
        l1: NewMemoryCache(l1Size),
        l2: l2,
    }
}

func (c *MultiLevelCache) Get(ctx context.Context, key string) (any, error) {
    // L1 hit
    if value, found := c.l1.Get(key); found {
        return value, nil
    }

    // L2 hit
    value, err := c.l2.Get(ctx, key)
    if err == nil {
        // Write back to L1 so the next lookup stays in process.
        c.l1.Set(key, value, 5*time.Minute)
        return value, nil
    }

    return nil, err
}

func (c *MultiLevelCache) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    // Write to both L1 and L2.
    c.l1.Set(key, value, ttl)

    if err := c.l2.Set(ctx, key, value, ttl); err != nil {
        return fmt.Errorf("l2 set error: %w", err)
    }

    return nil
}

// MemoryCache is an in-memory LRU cache with per-entry expiry.
type MemoryCache struct {
    mu     sync.RWMutex
    data   map[string]*cacheItem
    lru    *list.List
    maxAge time.Duration
    maxLen int
}

type cacheItem struct {
    value    any
    expires  time.Time
    element  *list.Element
}

func NewMemoryCache(maxLen int) *MemoryCache {
    return &MemoryCache{
        data:   make(map[string]*cacheItem),
        lru:    list.New(),
        maxAge: 5 * time.Minute,
        maxLen: maxLen,
    }
}

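func (c *MemoryCache) Get(key string) (any, bool) {
    // A full lock is required: a hit reorders the LRU list.
    c.mu.Lock()
    defer c.mu.Unlock()

    item, found := c.data[key]
    if !found {
        return nil, false
    }

    // Drop expired entries lazily.
    if time.Now().After(item.expires) {
        c.remove(key)
        return nil, false
    }

    // Mark as most recently used.
    c.lru.MoveToFront(item.element)
    return item.value, true
}

func (c *MemoryCache) Set(key string, value any, ttl time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if item, found := c.data[key]; found {
        // Update the existing entry.
        item.value = value
        item.expires = time.Now().Add(ttl)
        c.lru.MoveToFront(item.element)
        return
    }

    // New entry: make room first if the cache is full.
    if len(c.data) >= c.maxLen {
        c.evict()
    }

    item := &cacheItem{
        value:   value,
        expires: time.Now().Add(ttl),
        element: c.lru.PushFront(key),
    }
    c.data[key] = item
}

// evict drops the least recently used entry. Callers must hold c.mu.
func (c *MemoryCache) evict() {
    if oldest := c.lru.Back(); oldest != nil {
        c.remove(oldest.Value.(string))
    }
}

// remove deletes key from both the map and the LRU list. Callers must hold c.mu.
func (c *MemoryCache) remove(key string) {
    if item, found := c.data[key]; found {
        c.lru.Remove(item.element)
        delete(c.data, key)
    }
}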

4. Streaming Responses

package handler

import (
    "context"
    "fmt"
    "net/http"
)

// StreamHandler streams the answer to the client as Server-Sent Events (SSE).
func (h *Handler) StreamHandler(w http.ResponseWriter, r *http.Request) {
    // SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // The ResponseWriter must support flushing for streaming to work.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    // Start streaming generation. The request context is cancelled when
    // the client disconnects, which stops the producer.
    ctx := r.Context()
    resultChan := h.agentService.StreamQuery(ctx, r.URL.Query().Get("query"))

    // Send one event per chunk and flush it immediately.
    for chunk := range resultChan {
        event := fmt.Sprintf("data: %s\n\n", chunk)
        if _, err := fmt.Fprint(w, event); err != nil {
            return
        }
        flusher.Flush()
    }

    // Send the end-of-stream marker.
    fmt.Fprint(w, "data: [DONE]\n\n")
    flusher.Flush()
}

// StreamQuery forwards chunks from the Python Agent service.
// It belongs to AgentService (package service) and is shown here next to its caller.
func (s *AgentService) StreamQuery(ctx context.Context, query string) <-chan string {
    resultChan := make(chan string)

    go func() {
        defer close(resultChan)

        // Ask the Python Agent service for a streaming result.
        chunks := s.pythonClient.StreamQuery(ctx, query)

        for chunk := range chunks {
            select {
            case resultChan <- chunk:
            case <-ctx.Done():
                return
            }
        }
    }()

    return resultChan
}
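
On the consuming side, the stream produced by the handler above is a sequence of `data:` lines separated by blank lines and terminated by `data: [DONE]`. The following is a minimal parsing sketch under that assumption. `parseSSEData` is a hypothetical helper, and it takes a string for brevity where a real client would read from the HTTP response body.

```go
package main

import (
    "bufio"
    "strings"
)

// parseSSEData extracts the payload of every "data:" line and stops at
// the "[DONE]" sentinel. Blank separator lines and other fields are skipped.
func parseSSEData(stream string) []string {
    var chunks []string
    scanner := bufio.NewScanner(strings.NewReader(stream))
    for scanner.Scan() {
        line := scanner.Text()
        if !strings.HasPrefix(line, "data:") {
            continue
        }
        // Remove the field name and the single optional space after the colon.
        payload := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
        if payload == "[DONE]" {
            break
        }
        chunks = append(chunks, payload)
    }
    return chunks
}
```

Note that a chunk containing a newline would break this framing, so the producer should either split such chunks into several `data:` lines or JSON-encode each payload.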

High-Availability Implementation

1. Circuit Breaker

package circuitbreaker

import (
    "context"
    "errors"
    "sync"
    "time"
)

var (
    ErrOpenState     = errors.New("circuit breaker is open")
    ErrHalfOpenWait = errors.New("circuit breaker is half-open")
)

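// State is the circuit breaker state.
type State string

const (
    StateClosed   State = "closed"    // normal operation
    StateOpen     State = "open"      // tripped: requests are rejected
    StateHalfOpen State = "half-open" // probing whether the dependency has recovered
)

// CircuitBreaker trips after repeated failures and recovers through a probe request.
type CircuitBreaker struct {
    maxFailures     int           // consecutive failures before opening
    resetTimeout    time.Duration // how long to stay open before probing
    halfOpenTimeout time.Duration // how long to wait for a half-open probe

    mu           sync.Mutex
    state        State
    failures     int
    lastFailTime time.Time
    nextAttempt  time.Time
}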

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:   maxFailures,
        resetTimeout:   resetTimeout,
        halfOpenTimeout: time.Minute,
        state:          StateClosed,
    }
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
    if !cb.allowRequest() {
        return ErrOpenState
    }

    err := fn()

    cb.recordResult(err)

    return err
}

func (cb *CircuitBreaker) allowRequest() bool {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()

    switch cb.state {
    case StateClosed:
        return true

    case StateOpen:
        // After resetTimeout, move to half-open and let one probe through.
        if now.After(cb.nextAttempt) {
            cb.state = StateHalfOpen
            cb.nextAttempt = now.Add(cb.halfOpenTimeout)
            return true
        }
        return false

    case StateHalfOpen:
        // A probe is in flight: reject other requests until it reports back
        // or halfOpenTimeout elapses, then allow one more probe.
        if now.Before(cb.nextAttempt) {
            return false
        }
        cb.nextAttempt = now.Add(cb.halfOpenTimeout)
        return true
    }

    return false
}

func (cb *CircuitBreaker) recordResult(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err == nil {
        // Success: close the breaker and reset the failure count.
        cb.state = StateClosed
        cb.failures = 0
        return
    }

    // Failure
    cb.failures++
    cb.lastFailTime = time.Now()

    if cb.state == StateHalfOpen || cb.failures >= cb.maxFailures {
        // The probe failed, or too many consecutive failures: open the breaker.
        cb.state = StateOpen
        cb.nextAttempt = time.Now().Add(cb.resetTimeout)
    }
}

2. Rate Limiter

package ratelimit

import (
    "sync"
    "time"
)

// RateLimiter is a token bucket rate limiter.
type RateLimiter struct {
    rate     float64 // refill rate (requests per second)
    capacity int     // bucket capacity

    mu       sync.Mutex
    tokens   float64   // tokens currently available
    lastTime time.Time // time of the last refill
}

func NewRateLimiter(rate float64, capacity int) *RateLimiter {
    return &RateLimiter{
        rate:     rate,
        capacity: capacity,
        tokens:   float64(capacity),
        lastTime: time.Now(),
    }
}

func (rl *RateLimiter) Allow() bool {
    rl.mu.Lock()
    defer rl.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(rl.lastTime).Seconds()

    // Refill tokens for the elapsed time, capped at capacity.
    rl.tokens += elapsed * rl.rate
    if rl.tokens > float64(rl.capacity) {
        rl.tokens = float64(rl.capacity)
    }
    rl.lastTime = now

    // Consume one token if available.
    if rl.tokens >= 1.0 {
        rl.tokens -= 1.0
        return true
    }

    return false
}

// Wait blocks until a token is available, polling every 10 ms.
func (rl *RateLimiter) Wait() {
    for !rl.Allow() {
        time.Sleep(10 * time.Millisecond)
    }
}

// SlidingWindowLimiter is a sliding window rate limiter.
type SlidingWindowLimiter struct {
    windowSize  time.Duration // window size
    maxRequests int           // maximum requests per window

    mu       sync.Mutex
    requests []time.Time // timestamps of accepted requests
}

func NewSlidingWindowLimiter(windowSize time.Duration, maxRequests int) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{
        windowSize: windowSize,
        maxRequests: maxRequests,
        requests:   make([]time.Time, 0),
    }
}

func (sw *SlidingWindowLimiter) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now()
    windowStart := now.Add(-sw.windowSize)

    // Drop timestamps that have fallen out of the window.
    var valid []time.Time
    for _, t := range sw.requests {
        if t.After(windowStart) {
            valid = append(valid, t)
        }
    }
    sw.requests = valid

    // Reject if the window is already full.
    if len(sw.requests) >= sw.maxRequests {
        return false
    }

    // Record the current request.
    sw.requests = append(sw.requests, now)
    return true
}
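
Both limiters above are global: every caller shares one budget. An Agent API usually needs a budget per user or per API key. One possible shape, sketched here under that assumption, keeps a token bucket per key. `KeyedLimiter` and `bucket` are hypothetical names, not part of the code above.

```go
package main

import (
    "sync"
    "time"
)

// bucket is the token bucket state of one key.
type bucket struct {
    tokens   float64
    lastTime time.Time
}

// KeyedLimiter keeps one token bucket per key (for example, per user ID).
type KeyedLimiter struct {
    rate     float64 // tokens added per second
    capacity float64 // maximum burst size

    mu      sync.Mutex
    buckets map[string]*bucket
}

func NewKeyedLimiter(rate float64, capacity int) *KeyedLimiter {
    return &KeyedLimiter{
        rate:     rate,
        capacity: float64(capacity),
        buckets:  make(map[string]*bucket),
    }
}

// Allow reports whether the caller identified by key may proceed now.
func (l *KeyedLimiter) Allow(key string) bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    b, ok := l.buckets[key]
    if !ok {
        // A new key starts with a full bucket.
        b = &bucket{tokens: l.capacity, lastTime: now}
        l.buckets[key] = b
    }

    // Refill for the elapsed time, capped at capacity.
    b.tokens += now.Sub(b.lastTime).Seconds() * l.rate
    if b.tokens > l.capacity {
        b.tokens = l.capacity
    }
    b.lastTime = now

    if b.tokens >= 1 {
        b.tokens--
        return true
    }
    return false
}
```

This sketch never removes buckets, so a long-running service would also need to expire keys that have been idle for a while.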

3. Retry Mechanism

package retry

import (
    "context"
    "fmt"
    "time"
)

// BackoffStrategy maps an attempt number (starting at 0) to a wait time.
type BackoffStrategy func(attempt int) time.Duration

var (
    // ConstantBackoff waits the same interval before every retry.
    ConstantBackoff = func(interval time.Duration) BackoffStrategy {
        return func(_ int) time.Duration {
            return interval
        }
    }

    // LinearBackoff grows the interval linearly with the attempt number.
    LinearBackoff = func(initialInterval time.Duration, multiplier float64) BackoffStrategy {
        return func(attempt int) time.Duration {
            return time.Duration(float64(initialInterval) * (1 + multiplier*float64(attempt)))
        }
    }

    // ExponentialBackoff doubles the interval on every attempt, capped at maxInterval.
    ExponentialBackoff = func(initialInterval time.Duration, maxInterval time.Duration) BackoffStrategy {
        return func(attempt int) time.Duration {
            // Double step by step: shifting a float64 operand does not compile,
            // and stopping at the cap avoids integer overflow.
            interval := initialInterval
            for i := 0; i < attempt && interval < maxInterval; i++ {
                interval *= 2
            }
            if interval > maxInterval {
                interval = maxInterval
            }
            return interval
        }
    }
)

// RetryConfig configures Retry.
type RetryConfig struct {
    MaxAttempts int                          // maximum number of attempts
    Backoff     BackoffStrategy              // backoff strategy
    RetryIf     func(error) bool             // reports whether an error is retryable
    OnRetry     func(attempt int, err error) // called after each failed attempt
}

// Retry calls fn until it succeeds, the error is not retryable,
// MaxAttempts is reached, or ctx is cancelled.
func Retry[T any](ctx context.Context, config *RetryConfig, fn func() (T, error)) (T, error) {
    var lastErr error
    var result T

    for attempt := 0; attempt < config.MaxAttempts; attempt++ {
        result, lastErr = fn()

        if lastErr == nil {
            return result, nil
        }

        // Stop if the error is not retryable.
        if config.RetryIf != nil && !config.RetryIf(lastErr) {
            break
        }

        // Notify the callback.
        if config.OnRetry != nil {
            config.OnRetry(attempt, lastErr)
        }

        // Do not wait after the last attempt.
        if attempt == config.MaxAttempts-1 {
            break
        }

        // Compute the backoff and wait, unless the context ends first.
        backoff := config.Backoff(attempt)

        select {
        case <-time.After(backoff):
            // try again
        case <-ctx.Done():
            var zero T
            return zero, ctx.Err()
        }
    }

    var zero T
    return zero, fmt.Errorf("after %d attempts, last error: %w", config.MaxAttempts, lastErr)
}

4. Health Checks

package health

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// Checker is the interface implemented by every health check.
type Checker interface {
    Check(ctx context.Context) error
    Name() string
}

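// HealthCheckResponse is the health check response body.
type HealthCheckResponse struct {
    Status    string            `json:"status"`
    Timestamp string            `json:"timestamp"`
    Checks    map[string]string `json:"checks,omitempty"`
}

// SQLChecker checks a database connection.
type SQLChecker struct {
    name string
    db   *sql.DB
}

// NewSQLChecker lets other packages build a checker despite the unexported fields.
func NewSQLChecker(name string, db *sql.DB) *SQLChecker {
    return &SQLChecker{name: name, db: db}
}

func (c *SQLChecker) Check(ctx context.Context) error {
    return c.db.PingContext(ctx)
}

func (c *SQLChecker) Name() string {
    return c.name
}

// HTTPChecker checks an HTTP service.
type HTTPChecker struct {
    name string
    url  string
}

// NewHTTPChecker lets other packages build a checker despite the unexported fields.
func NewHTTPChecker(name, url string) *HTTPChecker {
    return &HTTPChecker{name: name, url: url}
}

func (c *HTTPChecker) Name() string {
    return c.name
}

func (c *HTTPChecker) Check(ctx context.Context) error {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // Only 5xx responses count as unhealthy.
    if resp.StatusCode >= 500 {
        return fmt.Errorf("service unavailable: %d", resp.StatusCode)
    }

    return nil
}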

// HealthHandler runs every checker and reports the combined result.
func HealthHandler(checkers []Checker) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ctx := r.Context()

        response := HealthCheckResponse{
            Status:    "healthy",
            Timestamp: time.Now().Format(time.RFC3339),
            Checks:    make(map[string]string),
        }

        allHealthy := true

        for _, checker := range checkers {
            err := checker.Check(ctx)
            if err != nil {
                response.Checks[checker.Name()] = err.Error()
                allHealthy = false
            } else {
                response.Checks[checker.Name()] = "ok"
            }
        }

        // Headers must be set before WriteHeader is called.
        w.Header().Set("Content-Type", "application/json")
        if !allHealthy {
            response.Status = "unhealthy"
            w.WriteHeader(http.StatusServiceUnavailable)
        } else {
            w.WriteHeader(http.StatusOK)
        }

        json.NewEncoder(w).Encode(response)
    }
}

Interacting with the Python Agent

1. gRPC Communication

// agent.proto
syntax = "proto3";

package agent;

service AgentService {
    // Unary query
    rpc Query(QueryRequest) returns (QueryResponse);

    // Streaming query
    rpc StreamQuery(QueryRequest) returns (stream QueryChunk);

    // Batch query
    rpc BatchQuery(BatchQueryRequest) returns (BatchQueryResponse);
}

message QueryRequest {
    string session_id = 1;
    string agent_id = 2;
    string query = 3;
    map<string, string> options = 4;
}

message QueryResponse {
    string id = 1;
    string answer = 2;
    repeated Source sources = 3;
    Usage usage = 4;
    int64 latency_ms = 5;
}

message QueryChunk {
    string id = 1;
    string chunk = 2;
    bool is_final = 3;
}

message Source {
    string id = 1;
    string content = 2;
    double score = 3;
    map<string, string> metadata = 4;
}

message Usage {
    int64 prompt_tokens = 1;
    int64 completion_tokens = 2;
    int64 total_tokens = 3;
}

message BatchQueryRequest {
    repeated QueryRequest requests = 1;
}

message BatchQueryResponse {
    repeated QueryResponse responses = 1;
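}

// client.go
package agent

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    // Also import the package generated from agent.proto under the name pb.
    // Its import path depends on your module layout.
)

type AgentClient struct {
    conn   *grpc.ClientConn
    client pb.AgentServiceClient
}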

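func NewAgentClient(addr string) (*AgentClient, error) {
    // WithBlock makes Dial wait until the connection is up; without a deadline
    // it can block indefinitely when the Python service is down.
    // Insecure (plaintext) credentials are suitable for local development only.
    // Recent grpc-go releases deprecate Dial and WithBlock in favor of grpc.NewClient.
    conn, err := grpc.Dial(addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
    )
    if err != nil {
        return nil, err
    }

    return &AgentClient{
        conn:   conn,
        client: pb.NewAgentServiceClient(conn),
    }, nil
}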

func (c *AgentClient) Query(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
    return c.client.Query(ctx, req)
}

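func (c *AgentClient) StreamQuery(ctx context.Context, req *pb.QueryRequest) (<-chan *pb.QueryChunk, error) {
    stream, err := c.client.StreamQuery(ctx, req)
    if err != nil {
        return nil, err
    }

    resultChan := make(chan *pb.QueryChunk)

    go func() {
        defer close(resultChan)
        for {
            chunk, err := stream.Recv()
            if err != nil {
                // io.EOF marks the normal end of the stream. Other errors are
                // dropped here; callers that need them should get an error channel.
                return
            }
            select {
            case resultChan <- chunk:
            case <-ctx.Done():
                return // the consumer went away; do not block forever
            }
        }
    }()

    return resultChan, nil
}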

func (c *AgentClient) Close() error {
    return c.conn.Close()
}

2. Usage Example

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    // Also import the agent client package and the generated pb package.
    // Their import paths depend on your module layout.
)

func main() {
    // Connect to the Python Agent service.
    client, err := agent.NewAgentClient("localhost:50051")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx := context.Background()

    // Unary query
    req := &pb.QueryRequest{
        SessionId: "session-123",
        AgentId:   "rag-agent-1",
        Query:     "What is RAG?",
    }

    start := time.Now()
    resp, err := client.Query(ctx, req)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Query: %s\n", req.Query)
    fmt.Printf("Answer: %s\n", resp.Answer)
    fmt.Printf("Latency: %dms\n", time.Since(start).Milliseconds())

    // Streaming query
    stream, err := client.StreamQuery(ctx, req)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Print("\nStreaming: ")
    for chunk := range stream {
        fmt.Print(chunk.Chunk)
        if chunk.IsFinal {
            fmt.Println()
            break
        }
    }
}

Case Study

Case: A Complete Agent Backend Service

// cmd/server/main.go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/gin-gonic/gin"
    // Also import the project's own service, health and middleware packages.
)

func main() {
    // Load configuration.
    config := loadConfig()

    // Initialize dependencies.
    agentClient := initAgentClient(config)
    cache := initCache(config)
    db := initDB(config)

    // Initialize the service layer.
    agentService := service.NewAgentService(agentClient, cache, db)

    // Build the HTTP router.
    router := setupRouter(agentService, config)

    // Configure the HTTP server.
    // Note: WriteTimeout also applies to streaming responses, so an SSE
    // stream that runs longer than 30 seconds is cut off.
    server := &http.Server{
        Addr:         config.HTTPAddr,
        Handler:      router,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        IdleTimeout:  120 * time.Second,
    }

    // Serve in the background.
    go func() {
        log.Printf("Server started on %s", config.HTTPAddr)
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Server error: %v", err)
        }
    }()

    // Block until a signal arrives, then shut down gracefully.
    gracefulShutdown(server)
}

func setupRouter(agentService *service.AgentService, config *Config) *gin.Engine {
    r := gin.Default()

    // Health check. health.HealthHandler returns a net/http handler, so it is
    // adapted with gin.WrapF. The checkers are built through constructors
    // (assumed to exist in package health), because unexported fields such as
    // name and db cannot be set from package main.
    r.GET("/health", gin.WrapF(health.HealthHandler([]health.Checker{
        health.NewSQLChecker("database", agentService.DB()),
        health.NewHTTPChecker("python-agent", config.AgentURL),
    })))

    // API routes
    api := r.Group("/api/v1")
    api.Use(middleware.Auth(config.AuthSecret))
    api.Use(middleware.RateLimit(config.RateLimit))

    // Agent routes (the handlers are assumed to have the gin.HandlerFunc signature)
    agent := api.Group("/agent")
    {
        agent.POST("/query", agentService.QueryHandler)
        agent.GET("/stream", agentService.StreamHandler)
        agent.POST("/batch", agentService.BatchQueryHandler)
    }

    // Session routes
    session := api.Group("/session")
    {
        session.POST("/", agentService.CreateSession)
        session.GET("/:id", agentService.GetSession)
        session.DELETE("/:id", agentService.CloseSession)
    }

    return r
}

func gracefulShutdown(server *http.Server) {
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Shutting down server...")

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    if err := server.Shutdown(ctx); err != nil {
        log.Printf("Server shutdown error: %v", err)
    }

    log.Println("Server stopped")
}

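Common Interview Questions

Q1: What are Go's advantages in Agent development?

[Model answer]

Go's core strengths:

1. High performance
   - Built-in concurrency (goroutines)
   - Efficient runtime scheduler
   - Low memory footprint

2. High concurrency
   - Goroutines are lightweight (initial stack of about 2 KB)
   - Millions of goroutines are feasible given enough memory
   - Well suited to handling large numbers of Agent requests

3. Easy deployment
   - Single binary
   - No runtime dependencies to install
   - Fast startup

4. Static typing
   - Compile-time checks
   - Fewer runtime errors
   - Easier refactoring

Typical division of labor:
- Go: API gateway, task scheduling, connection pool management
- Python: Agent logic, LLM calls, data processing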

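Q2: How do you make communication between Go and Python efficient?

[Model answer]

Choosing a communication method:

1. gRPC (recommended)
   Strengths: high performance, type safety, streaming support
   Use for: frequent calls, complex protocols

2. HTTP/REST
   Strengths: simple, universal
   Use for: external APIs, simple calls

3. Message queue
   Strengths: decoupling, asynchronous processing
   Use for: task queues, event-driven flows

4. Shared storage
   Strengths: no direct communication overhead
   Use for: shared state

Best practices:
- Agent calls: gRPC
- External APIs: HTTP
- Long-running tasks: message queue
- Session state: Redis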

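Q3: How do you keep an Agent service highly available?

[Model answer]

High-availability strategies:

1. Redundant deployment
   - Multiple instances
   - Load balancing
   - Failover

2. Circuit breaking and degradation
   - Circuit breaker protection
   - Degradation (fallback) strategies
   - Rate limiting

3. Health checks
   - Active health checks
   - Automatic restarts
   - Alerting

4. Graceful shutdown
   - Finish in-flight requests
   - Release resources
   - Persist state

5. Observability
   - Thorough logging
   - Metrics monitoring
   - Distributed tracing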
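Key Points to Remember

[Go's role]

High-performance backend services
High-concurrency task scheduling
Distributed system coordination

[Architecture layers]

Handler → Service → Repository → Infrastructure

[High-performance techniques]

Goroutine concurrency
Connection pool management
Multi-level caching
Streaming responses

[High-availability techniques]

Circuit breaker
Rate limiter
Retry mechanism
Health checks

[Interacting with Python]

gRPC (high performance)
HTTP (general purpose)
Message queue (asynchronous)
Shared storage (state)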

Document version: 1.0
Last updated: 2026-01-21