【AI Agent 知识库】26-强化学习在Agent中的应用

内容纲要

强化学习在Agent中的应用

目录

  1. 强化学习概述
  2. Agent环境设计
  3. 奖励函数设计
  4. 策略网络
  5. 训练流程
  6. 推理与部署
  7. 高级技术
  8. 实现示例

1. 强化学习概述

1.1 什么是强化学习

强化学习(Reinforcement Learning, RL)是一种机器学习方法:Agent通过与环境交互获得奖励信号,并据此不断调整自己的行为,最终学习到最优策略。

强化学习循环:

   ┌─────────┐
   │  Agent  │◄──────────────────┐
   └────┬────┘                   │
        │ Action                 │ State / Reward
        ▼                        │
   ┌─────────────┐               │
   │ Environment │───────────────┘
   └─────────────┘

   Policy (π): State → Action
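
把上图落到代码上,就是一个"观察状态 → 选动作 → 环境返回奖励和新状态"的循环。下面给出一个最小示意(DummyEnv与随机策略均为本文假设的玩具实现,仅用于说明数据流):

```python
import random

class DummyEnv:
    """假设的玩具环境:状态为整数计数,动作为0/1,计数达到3即结束"""

    def reset(self):
        self.state = 0
        return self.state

    def step(self, action):
        self.state += 1 if action == 1 else 0
        reward = 1.0 if self.state >= 3 else -0.1
        done = self.state >= 3
        return self.state, reward, done

def policy(state):
    """占位策略:随机选动作,实际中由π(a|s)给出"""
    return random.choice([0, 1])

env = DummyEnv()
state = env.reset()
done = False

while not done:
    action = policy(state)                    # Agent根据状态选择动作
    state, reward, done = env.step(action)    # 环境返回新状态、奖励和是否结束
    print(f"state={state}, reward={reward}")
```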

1.2 RL在Agent中的应用场景

| 场景 | 描述 | RL方法 |
|------|------|--------|
| 任务规划 | 学习最优任务执行顺序 | Multi-agent RL |
| 对话策略 | 学习对话轮次和内容 | PPO、DQN |
| 工具选择 | 学习何时调用哪个工具 | Bandit、Contextual Bandit |
| 参数调优 | 学习提示词、温度等参数 | Bayesian Optimization |
| 轨迹优化 | 优化多轮对话轨迹 | Reward Shaping |

1.3 RL核心概念

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

class ActionType(Enum):
    """动作类型"""
    TEXT_RESPONSE = "text_response"
    TOOL_CALL = "tool_call"
    END_CONVERSATION = "end_conversation"

@dataclass
class State:
    """状态"""
    user_query: str
    conversation_history: List[Dict]
    available_tools: List[str]
    context: Dict[str, Any]

@dataclass
class Action:
    """动作"""
    type: ActionType
    content: str = ""
    tool_name: Optional[str] = None
    tool_args: Optional[Dict] = None

@dataclass
class Transition:
    """转移"""
    state: State
    action: Action
    next_state: State
    reward: float
    done: bool
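
这些数据结构的用法大致如下(查询内容、工具名等均为示例值):

```python
# 构造一条示例Transition,字段内容仅作演示
state = State(
    user_query="北京今天天气怎么样?",
    conversation_history=[],
    available_tools=["get_weather", "search"],
    context={}
)
action = Action(
    type=ActionType.TOOL_CALL,
    content="",
    tool_name="get_weather",
    tool_args={"city": "北京"}
)
next_state = State(
    user_query=state.user_query,
    conversation_history=[{"role": "assistant", "type": "tool_call", "tool_name": "get_weather"}],
    available_tools=state.available_tools,
    context={"last_result": {"temperature": "22°C"}}
)
transition = Transition(state=state, action=action, next_state=next_state, reward=1.0, done=False)
```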

2. Agent环境设计

2.1 基础环境接口

from gym import Env
import gym
from typing import Any, Dict, List

class AgentEnvironment(Env):
    """Agent强化学习环境"""

    def __init__(
        self,
        max_turns: int = 10,
        available_tools: List[str] = None
    ):
        self.max_turns = max_turns
        self.available_tools = available_tools or []
        self.current_turn = 0

        # 状态空间定义
        self.observation_space = gym.spaces.Dict({
            'user_query': gym.spaces.Text(max_length=1000),
            'conversation_length': gym.spaces.Discrete(100),
            'tool_availability': gym.spaces.MultiBinary(len(self.available_tools))
        })

        # 动作空间定义
        self.action_space = gym.spaces.Dict({
            'action_type': gym.spaces.Discrete(3),  # 3种动作类型
            'tool_index': gym.spaces.Discrete(len(self.available_tools) + 1),  # +1 for no tool
            'response_length': gym.spaces.Discrete(500)
        })

    def reset(self, seed=None) -> State:
        """重置环境"""
        self.current_turn = 0
        self.conversation_history = []

        return State(
            user_query="",
            conversation_history=[],
            available_tools=self.available_tools.copy(),
            context={}
        )

    def step(self, action: Action) -> Tuple[State, float, bool, Dict]:
        """
        执行一步

        Returns:
            (next_state, reward, done, info)
        """
        # 执行动作
        result = self._execute_action(action)

        # 计算奖励
        reward = self._calculate_reward(action, result)

        # 检查是否结束
        done = self._is_done(action, result)

        # 更新状态
        next_state = self._get_next_state(action, result)

        # 更新对话历史
        self.conversation_history.append({
            "action": action,
            "result": result
        })
        self.current_turn += 1

        # 额外信息
        info = {
            "turn": self.current_turn,
            "result": result
        }

        return next_state, reward, done, info

    def _execute_action(self, action: Action) -> Dict:
        """执行动作"""
        if action.type == ActionType.TOOL_CALL:
            # 调用工具
            return self._call_tool(action)
        elif action.type == ActionType.TEXT_RESPONSE:
            # 生成文本响应
            return self._generate_response(action)
        else:
            # 结束对话
            return {"status": "ended"}

    def _call_tool(self, action: Action) -> Dict:
        """调用工具(模拟)"""
        tool_name = action.tool_name
        tool_args = action.tool_args or {}

        # 简化实现:实际应调用真实工具
        return {
            "tool_name": tool_name,
            "args": tool_args,
            "result": f"Tool {tool_name} executed",
            "success": True
        }

    def _generate_response(self, action: Action) -> Dict:
        """生成文本响应(模拟)"""
        return {
            "response": action.content,
            "success": True
        }

    def _calculate_reward(self, action: Action, result: Dict) -> float:
        """计算奖励"""
        reward = 0

        # 动作成功奖励
        if result.get("success"):
            reward += 1

        # 工具调用奖励
        if action.type == ActionType.TOOL_CALL:
            reward += 0.5

        # 简洁性奖励
        if action.type == ActionType.TEXT_RESPONSE:
            length = len(action.content)
            if 50 < length < 200:
                reward += 0.3
            elif length > 500:
                reward -= 0.5

        # 轮次惩罚
        if self.current_turn > self.max_turns:
            reward -= 1

        return reward

    def _is_done(self, action: Action, result: Dict) -> bool:
        """判断是否结束"""
        # 明确结束
        if action.type == ActionType.END_CONVERSATION:
            return True

        # 超过最大轮次
        if self.current_turn >= self.max_turns:
            return True

        # 目标达成(示例:问题被解决)
        if result.get("solved"):
            return True

        return False

    def _get_next_state(self, action: Action, result: Dict) -> State:
        """获取下一状态"""
        return State(
            user_query=self._get_current_query(),
            conversation_history=self.conversation_history.copy(),
            available_tools=self.available_tools.copy(),
            context={"last_result": result}
        )

    def _get_current_query(self) -> str:
        """获取当前查询"""
        if self.conversation_history:
            return self.conversation_history[0].get("query", "")
        return ""

    def render(self, mode='human'):
        """渲染环境(可选)"""
        pass
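
该环境的基本用法大致如下(工具名与动作序列仅为示例):

```python
# AgentEnvironment的基本交互示例
env = AgentEnvironment(max_turns=5, available_tools=["search", "calculate"])
state = env.reset()

# 先调用一次工具
next_state, reward, done, info = env.step(Action(
    type=ActionType.TOOL_CALL,
    tool_name="search",
    tool_args={"query": "今天的新闻"}
))
print(f"reward={reward}, done={done}")

# 再给出文本回复
next_state, reward, done, info = env.step(Action(
    type=ActionType.TEXT_RESPONSE,
    content="根据搜索结果,今天的主要新闻是……"
))
print(f"reward={reward}, done={done}")
```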

2.2 对话环境

class ConversationEnvironment(AgentEnvironment):
    """对话环境"""

    def __init__(
        self,
        llm_client,
        user_simulator=None,  # 模拟用户
        max_turns: int = 5
    ):
        super().__init__(max_turns=max_turns)
        self.llm = llm_client
        self.user_simulator = user_simulator
        self.current_query = ""
        self.conversation_goal = ""

    def set_goal(self, goal: str):
        """设置对话目标"""
        self.conversation_goal = goal

    def reset(self, seed=None) -> State:
        """重置环境"""
        super().reset(seed)

        # 初始化对话
        if self.user_simulator:
            self.current_query = self.user_simulator.get_initial_query()
        else:
            self.current_query = "开始对话"

        return State(
            user_query=self.current_query,
            conversation_history=[],
            available_tools=self.available_tools.copy(),
            context={"goal": self.conversation_goal}
        )

    def _execute_action(self, action: Action) -> Dict:
        """执行对话动作"""
        if action.type == ActionType.TOOL_CALL:
            # 调用工具
            tool_result = self._call_tool(action)

            # 将工具结果添加到对话历史
            self.conversation_history.append({
                "role": "assistant",
                "type": "tool_call",
                "tool_name": action.tool_name,
                "result": tool_result
            })

            return tool_result

        elif action.type == ActionType.TEXT_RESPONSE:
            # 获取对话上下文
            messages = self._build_messages()

            # 添加助手回复
            messages.append({
                "role": "assistant",
                "content": action.content
            })

            # 获取用户反馈
            if self.user_simulator:
                user_feedback = self.user_simulator.respond(
                    action.content,
                    self.conversation_history
                )

                if user_feedback.get("satisfied"):
                    return {"status": "satisfied", "solved": True}

                return user_feedback

            return {"status": "continued"}

    def _build_messages(self) -> List[Dict]:
        """构建LLM消息格式"""
        messages = []

        # 添加初始查询
        if self.current_query:
            messages.append({
                "role": "user",
                "content": self.current_query
            })

        # 添加对话历史
        for turn in self.conversation_history:
            if turn.get("type") == "tool_call":
                messages.append({
                    "role": "assistant",
                    "content": f"Tool {turn['tool_name']} returned: {turn['result']}"
                })
            elif turn.get("role") == "user":
                messages.append({
                    "role": "user",
                    "content": turn.get("content", "")
                })
            elif turn.get("role") == "assistant":
                messages.append({
                    "role": "assistant",
                    "content": turn.get("content", "")
                })

        return messages
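
上面的user_simulator只是被引用而没有给出实现;下面按照环境代码推断出的接口(get_initial_query、respond返回含satisfied字段的字典)写一个极简的规则式模拟器示意,其中关键词判断等逻辑均为本文假设:

```python
class SimpleUserSimulator:
    """规则式用户模拟器(示意实现,接口与ConversationEnvironment的假设一致)"""

    def __init__(self, initial_query: str, keywords: List[str]):
        self.initial_query = initial_query
        self.keywords = keywords  # 助手回复中出现这些关键词即视为满意

    def get_initial_query(self) -> str:
        return self.initial_query

    def respond(self, assistant_reply: str, history: List[Dict]) -> Dict:
        # 命中关键词:认为问题被解决
        if any(kw in assistant_reply for kw in self.keywords):
            return {"satisfied": True, "user_feedback": "positive"}
        # 对话太长:给出负反馈
        if len(history) >= 6:
            return {"satisfied": False, "user_feedback": "negative"}
        return {"satisfied": False, "user_feedback": "neutral", "content": "能再具体一点吗?"}

# 用法示意
simulator = SimpleUserSimulator("帮我查一下明天北京的天气", keywords=["晴", "雨", "气温"])
env = ConversationEnvironment(llm_client=None, user_simulator=simulator, max_turns=5)
state = env.reset()
```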

2.3 工具选择环境

import numpy as np

class ToolSelectionEnvironment(Env):
    """工具选择环境(简化为Bandit问题)"""

    def __init__(self, tools: List[str]):
        self.tools = tools
        self.n_tools = len(tools)

        # 每个工具的成功率和其他统计
        self.tool_stats = {
            tool: {
                "successes": 0,
                "total_calls": 0,
                "average_reward": 0
            }
            for tool in tools
        }

        # 动作空间:选择工具
        self.action_space = gym.spaces.Discrete(self.n_tools)
        self.observation_space = gym.spaces.Box(
            low=0, high=1, shape=(self.n_tools,), dtype=np.float32
        )

    def reset(self, seed=None) -> np.ndarray:
        """重置环境"""
        return self._get_observation()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """
        执行工具选择

        Args:
            action: 工具索引

        Returns:
            (observation, reward, done, info)
        """
        tool_name = self.tools[action]

        # 模拟工具执行
        success, reward = self._simulate_tool_call(tool_name)

        # 更新统计
        self.tool_stats[tool_name]["total_calls"] += 1
        if success:
            self.tool_stats[tool_name]["successes"] += 1

        # 更新平均奖励
        stats = self.tool_stats[tool_name]
        n = stats["total_calls"]
        stats["average_reward"] = (
            (n - 1) * stats["average_reward"] + reward
        ) / n

        # 获取观察
        observation = self._get_observation()

        info = {
            "tool_name": tool_name,
            "success": success,
            "stats": self.tool_stats[tool_name].copy()
        }

        return observation, reward, False, info

    def _simulate_tool_call(self, tool_name: str) -> Tuple[bool, float]:
        """模拟工具调用(示例)"""
        # 根据工具类型决定成功率和奖励
        tool_success_rates = {
            "search": 0.8,
            "calculate": 0.9,
            "get_weather": 0.7,
            "file_read": 0.85
        }

        success_rate = tool_success_rates.get(tool_name, 0.7)
        success = np.random.random() < success_rate

        if success:
            reward = np.random.uniform(0.5, 1.0)
        else:
            reward = np.random.uniform(-1.0, -0.1)

        return success, reward

    def _get_observation(self) -> np.ndarray:
        """获取观察(工具成功率)"""
        obs = np.zeros(self.n_tools, dtype=np.float32)

        for i, tool in enumerate(self.tools):
            stats = self.tool_stats[tool]
            if stats["total_calls"] > 0:
                obs[i] = stats["successes"] / stats["total_calls"]
            else:
                obs[i] = 0.5  # 初始探索

        return obs
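
把该环境当作多臂Bandit问题,用ε-greedy策略交互若干步的用法大致如下(步数与ε取值仅为示例):

```python
env = ToolSelectionEnvironment(["search", "calculate", "get_weather", "file_read"])
obs = env.reset()
epsilon = 0.2

for step in range(1000):
    if np.random.random() < epsilon:
        action = env.action_space.sample()   # 探索:随机选一个工具
    else:
        action = int(np.argmax(obs))         # 利用:选当前观测成功率最高的工具
    obs, reward, done, info = env.step(action)

# 查看各工具的统计
for tool, stats in env.tool_stats.items():
    print(tool, stats)
```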

3. 奖励函数设计

3.1 奖励函数类型

from typing import Callable

class RewardFunction(ABC):
    """奖励函数基类"""

    @abstractmethod
    def compute(
        self,
        action: Action,
        result: Dict,
        state: State
    ) -> float:
        """计算奖励"""
        pass

class SuccessReward(RewardFunction):
    """基于成功的奖励"""

    def __init__(self, success_reward: float = 1.0, failure_reward: float = -1.0):
        self.success_reward = success_reward
        self.failure_reward = failure_reward

    def compute(
        self,
        action: Action,
        result: Dict,
        state: State
    ) -> float:
        if result.get("success", False):
            return self.success_reward
        return self.failure_reward

class ShapedReward(RewardFunction):
    """塑造奖励(奖励塑形)"""

    def __init__(
        self,
        success_weight: float = 1.0,
        efficiency_weight: float = 0.3,
        penalty_weight: float = 0.5,
        goal_achievement_weight: float = 2.0
    ):
        self.success_weight = success_weight
        self.efficiency_weight = efficiency_weight
        self.penalty_weight = penalty_weight
        self.goal_achievement_weight = goal_achievement_weight

    def compute(
        self,
        action: Action,
        result: Dict,
        state: State
    ) -> float:
        reward = 0

        # 1. 基础成功奖励
        if result.get("success"):
            reward += self.success_weight
        else:
            reward -= self.success_weight * 0.5

        # 2. 效率奖励(快速完成)
        turns_used = len(state.conversation_history)
        if turns_used < 5:
            reward += self.efficiency_weight
        elif turns_used > 10:
            reward -= self.efficiency_weight * 0.5

        # 3. 惩罚(不必要的操作)
        if action.type == ActionType.TOOL_CALL:
            # 检查是否重复调用同一工具
            last_tool = self._get_last_tool(state)
            if last_tool == action.tool_name:
                reward -= self.penalty_weight

        # 4. 目标达成奖励
        if result.get("solved"):
            reward += self.goal_achievement_weight

        # 5. 惩罚(过长响应)
        if action.type == ActionType.TEXT_RESPONSE:
            length = len(action.content)
            if length > 500:
                reward -= self.penalty_weight * (length / 500)

        return reward

    def _get_last_tool(self, state: State) -> str:
        """获取上次调用的工具"""
        if state.conversation_history:
            last = state.conversation_history[-1]
            if last.get("type") == "tool_call":
                return last.get("tool_name")
        return None

class UserFeedbackReward(RewardFunction):
    """基于用户反馈的奖励"""

    def __init__(
        self,
        positive_reward: float = 1.0,
        negative_reward: float = -1.0,
        neutral_reward: float = 0.0
    ):
        self.positive_reward = positive_reward
        self.negative_reward = negative_reward
        self.neutral_reward = neutral_reward

    def compute(
        self,
        action: Action,
        result: Dict,
        state: State
    ) -> float:
        feedback = result.get("user_feedback")

        if feedback == "positive":
            return self.positive_reward
        elif feedback == "negative":
            return self.negative_reward
        elif feedback == "satisfied":
            return self.positive_reward * 2
        elif feedback == "unsatisfied":
            return self.negative_reward * 2
        else:
            return self.neutral_reward

class CompositeReward(RewardFunction):
    """组合奖励函数"""

    def __init__(self, reward_functions: List[RewardFunction], weights: List[float] = None):
        self.reward_functions = reward_functions

        if weights is None:
            self.weights = [1.0] * len(reward_functions)
        else:
            self.weights = weights

        assert len(self.weights) == len(reward_functions)

    def compute(
        self,
        action: Action,
        result: Dict,
        state: State
    ) -> float:
        total_reward = 0

        for rf, weight in zip(self.reward_functions, self.weights):
            reward = rf.compute(action, result, state)
            total_reward += weight * reward

        return total_reward
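
组合奖励的使用方式大致如下(各子奖励函数的权重仅为示例):

```python
reward_fn = CompositeReward(
    reward_functions=[
        SuccessReward(success_reward=1.0, failure_reward=-1.0),
        ShapedReward(),
        UserFeedbackReward()
    ],
    weights=[1.0, 0.5, 2.0]
)

# 在环境的_calculate_reward中可以直接调用
reward = reward_fn.compute(
    action=Action(type=ActionType.TEXT_RESPONSE, content="已经为您查到结果。"),
    result={"success": True, "user_feedback": "positive"},
    state=State(user_query="查询天气", conversation_history=[], available_tools=[], context={})
)
print(f"composite reward = {reward:.2f}")
```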

3.2 奖励塑形技术

class RewardShaper:
    """奖励塑形器"""

    @staticmethod
    def potential_based_reward(
        raw_reward: float,
        potential_current: float,
        potential_next: float,
        gamma: float = 0.99
    ) -> float:
        """
        基于势能的奖励塑形:r' = r + γ·Φ(s') - Φ(s)

        不改变最优策略,可用于引导探索、加速训练
        """
        return raw_reward + gamma * potential_next - potential_current

    @staticmethod
    def reward_clipping(
        reward: float,
        min_reward: float = -10.0,
        max_reward: float = 10.0
    ) -> float:
        """奖励裁剪"""
        return max(min_reward, min(reward, max_reward))

    @staticmethod
    def normalize_reward(
        reward: float,
        mean: float,
        std: float
    ) -> float:
        """标准化奖励"""
        if std == 0:
            return 0
        return (reward - mean) / std
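
normalize_reward需要奖励的均值和标准差,实践中通常用运行时统计在线维护;下面补充一个Welford式的在线统计示意(类名与实现均为本文假设):

```python
class RunningRewardStats:
    """在线统计奖励均值与方差(Welford算法)"""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0

    def update(self, reward: float):
        self.count += 1
        delta = reward - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (reward - self.mean)

    @property
    def std(self) -> float:
        if self.count < 2:
            return 1.0
        return (self.m2 / (self.count - 1)) ** 0.5

# 用法示意:每收到一个奖励先update,再做标准化
stats = RunningRewardStats()
for r in [1.0, -0.5, 2.0, 0.3]:
    stats.update(r)

normalized = RewardShaper.normalize_reward(2.0, mean=stats.mean, std=stats.std)
```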

4. 策略网络

4.1 简单策略网络

import torch
import torch.nn as nn
import torch.nn.functional as F

class AgentPolicyNetwork(nn.Module):
    """Agent策略网络"""

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dims: List[int] = [256, 128]
    ):
        super().__init__()

        # 构建隐藏层
        layers = []
        input_dim = state_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.2))
            input_dim = hidden_dim

        self.hidden_layers = nn.Sequential(*layers)

        # 输出层
        self.output_layer = nn.Linear(input_dim, action_dim)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """前向传播"""
        x = self.hidden_layers(state)
        action_logits = self.output_layer(x)
        return action_logits

    def get_action(self, state: np.ndarray, deterministic: bool = False) -> int:
        """获取动作"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            logits = self.forward(state_tensor)
            probs = F.softmax(logits, dim=-1)

            if deterministic:
                action = probs.argmax(dim=-1).item()
            else:
                action = torch.multinomial(probs, 1).item()

        return action

4.2 Actor-Critic网络

class ActorCriticNetwork(nn.Module):
    """Actor-Critic网络"""

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dim: int = 256
    ):
        super().__init__()

        # 共享特征提取器
        self.feature_extractor = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2)
        )

        # Actor网络(输出动作概率)
        self.actor = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, action_dim),
            nn.Softmax(dim=-1)
        )

        # Critic网络(输出价值)
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1)
        )

    def forward(self, state: torch.Tensor) -> tuple:
        """前向传播"""
        features = self.feature_extractor(state)
        action_probs = self.actor(features)
        value = self.critic(features)
        return action_probs, value.squeeze(-1)
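
该网络的典型用法是:前向得到动作分布和状态价值,从分布中采样动作并记录log概率,供后文的PPO更新计算ratio使用(维度取值仅为示例):

```python
state_dim, action_dim = 256, 4
net = ActorCriticNetwork(state_dim, action_dim)

state = torch.randn(1, state_dim)            # batch大小为1的状态
action_probs, value = net(state)

dist = torch.distributions.Categorical(action_probs)
action = dist.sample()                       # 按概率采样动作
log_prob = dist.log_prob(action)             # PPO计算ratio时需要

print(action.item(), log_prob.item(), value.item())
```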

5. 训练流程

5.1 PPO训练

class PPOAgent:
    """PPO Agent"""

    def __init__(
        self,
        env: AgentEnvironment,
        state_dim: int,
        action_dim: int,
        lr: float = 3e-4,
        gamma: float = 0.99,
        clip_eps: float = 0.2
    ):
        self.env = env
        self.gamma = gamma
        self.clip_eps = clip_eps

        # 策略网络
        self.policy = ActorCriticNetwork(state_dim, action_dim)

        # 优化器
        self.optimizer = torch.optim.Adam(
            self.policy.parameters(),
            lr=lr
        )

        # 存储经验
        self.episode_rewards = []

    def collect_trajectories(
        self,
        n_episodes: int,
        max_steps: int = 1000
    ) -> List[Dict]:
        """收集轨迹经验"""
        trajectories = []

        for episode in range(n_episodes):
            state = self.env.reset()
            episode_reward = 0
            done = False
            steps = 0

            states = []
            actions = []
            rewards = []
            log_probs = []
            values = []

            while not done and steps < max_steps:
                # 用Actor-Critic网络采样动作,并记录log概率与价值
                state_array = self._state_to_array(state)
                state_tensor = torch.FloatTensor(state_array).unsqueeze(0)

                with torch.no_grad():
                    action_probs, value = self.policy(state_tensor)
                dist = torch.distributions.Categorical(action_probs)
                action_tensor = dist.sample()
                log_prob = dist.log_prob(action_tensor).item()
                action = action_tensor.item()

                # 执行动作(简化为工具调用,假设action_dim与工具数量一致)
                next_state, reward, done, info = self.env.step(Action(
                    type=ActionType.TOOL_CALL,
                    tool_name=self.env.available_tools[action]
                ))

                # 记录经验
                states.append(state)
                actions.append(action)
                rewards.append(reward)
                log_probs.append(log_prob)
                values.append(value.item())
                episode_reward += reward

                state = next_state
                steps += 1

            trajectories.append({
                "states": states,
                "actions": actions,
                "rewards": rewards,
                "log_probs": log_probs,
                "values": values
            })
            self.episode_rewards.append(episode_reward)

        return trajectories

    def update(self, trajectories: List[Dict], n_epochs: int = 10):
        """更新策略"""
        for epoch in range(n_epochs):
            epoch_loss = 0

            for trajectory in trajectories:
                # 计算优势函数
                advantages = self._compute_advantages(trajectory["rewards"])
                returns = self._compute_returns(trajectory["rewards"])

                # PPO更新
                loss = self._ppo_update(
                    trajectory["states"],
                    trajectory["actions"],
                    trajectory["log_probs"],
                    advantages,
                    returns
                )

                epoch_loss += loss

            print(f"Epoch {epoch}, Loss: {epoch_loss:.4f}")

    def _compute_advantages(self, rewards: List[float]) -> List[float]:
        """计算优势函数(简化实现:直接以折扣回报作为优势;
        更严格的做法是减去价值基线或使用GAE,见5.1末尾的示意)"""
        advantages = []
        discounted_sum = 0

        for t in reversed(range(len(rewards))):
            discounted_sum = discounted_sum * self.gamma + rewards[t]
            advantages.append(discounted_sum)

        return list(reversed(advantages))

    def _compute_returns(self, rewards: List[float]) -> List[float]:
        """计算回报"""
        returns = []
        discounted_sum = 0

        for reward in reversed(rewards):
            discounted_sum = discounted_sum * self.gamma + reward
            returns.append(discounted_sum)

        return list(reversed(returns))

    def _ppo_update(
        self,
        states,
        actions,
        old_log_probs,
        advantages,
        returns
    ) -> float:
        """PPO更新步骤(简化)"""
        # 转换为tensor
        states_tensor = torch.stack([
            torch.FloatTensor(self._state_to_array(s))
            for s in states
        ])
        actions_tensor = torch.LongTensor(actions)
        old_log_probs_tensor = torch.FloatTensor(old_log_probs)
        advantages_tensor = torch.FloatTensor(advantages)
        returns_tensor = torch.FloatTensor(returns)

        # 前向传播
        action_probs, values = self.policy(states_tensor)

        # 计算新log prob
        new_log_probs = torch.log(action_probs.gather(1, actions_tensor.unsqueeze(-1)).squeeze(-1))

        # 计算ratio
        ratio = torch.exp(new_log_probs - old_log_probs_tensor)

        # 计算surrogate loss
        surrogate1 = ratio * advantages_tensor
        surrogate2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * advantages_tensor
        policy_loss = -torch.min(surrogate1, surrogate2).mean()

        # 计算价值损失
        value_loss = F.mse_loss(values, returns_tensor)  # values已在forward中squeeze为一维

        # 总损失
        total_loss = policy_loss + 0.5 * value_loss

        # 反向传播
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()

    def _state_to_array(self, state: State) -> np.ndarray:
        """将状态转换为数组(简化实现:实际应编码对话状态、工具可用性等特征)"""
        return np.random.randn(256).astype(np.float32)

    def train(self, total_timesteps: int = 10000):
        """训练"""
        print("=== 开始PPO训练 ===")

        collected_timesteps = 0
        episode_num = 0

        while collected_timesteps < total_timesteps:
            # 收集轨迹
            trajectories = self.collect_trajectories(n_episodes=10)

            # 更新策略
            self.update(trajectories)

            collected_timesteps += sum(len(t["rewards"]) for t in trajectories)
            episode_num += 1

            # 打印进度
            avg_reward = np.mean(self.episode_rewards[-10:])
            print(f"Episode {episode_num}, Avg Reward: {avg_reward:.2f}")

        print("=== 训练完成 ===")

5.2 DQN训练

class DQNAgent:
    """DQN Agent"""

    def __init__(
        self,
        env: AgentEnvironment,
        state_dim: int,
        action_dim: int,
        lr: float = 1e-3,
        gamma: float = 0.99,
        epsilon: float = 0.1,
        buffer_size: int = 10000,
        batch_size: int = 32
    ):
        self.env = env
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.epsilon = epsilon
        self.batch_size = batch_size

        # Q网络
        self.q_network = AgentPolicyNetwork(state_dim, action_dim)
        self.target_network = AgentPolicyNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())

        # 经验回放
        self.replay_buffer = []
        self.buffer_size = buffer_size

        # 优化器
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)

    def select_action(self, state: np.ndarray) -> int:
        """选择动作(epsilon-greedy)"""
        if np.random.random() < self.epsilon:
            return np.random.randint(0, self.action_dim)
        else:
            return self.q_network.get_action(state, deterministic=True)

    def add_experience(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool
    ):
        """添加经验"""
        self.replay_buffer.append({
            "state": state,
            "action": action,
            "reward": reward,
            "next_state": next_state,
            "done": done
        })

        # 限制buffer大小
        if len(self.replay_buffer) > self.buffer_size:
            self.replay_buffer.pop(0)

    def train_step(self):
        """训练一步"""
        if len(self.replay_buffer) < self.batch_size:
            return

        # 随机采样一个batch
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]

        states = torch.FloatTensor(np.array([b["state"] for b in batch]))
        actions = torch.LongTensor([b["action"] for b in batch])
        rewards = torch.FloatTensor([b["reward"] for b in batch])
        next_states = torch.FloatTensor(np.array([b["next_state"] for b in batch]))
        dones = torch.FloatTensor([float(b["done"]) for b in batch])

        # 当前Q值
        current_q = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # 目标Q值
        next_q_values = self.target_network(next_states).max(dim=-1)[0]
        target_q = rewards + self.gamma * next_q_values * (1 - dones)

        # 计算loss
        loss = F.mse_loss(current_q, target_q.detach())

        # 反向传播
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_network(self):
        """更新目标网络"""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def train(self, num_episodes: int = 1000):
        """训练"""
        print("=== 开始DQN训练 ===")

        for episode in range(num_episodes):
            state = self.env.reset()
            episode_reward = 0
            done = False

            while not done:
                # 选择动作
                state_array = self._state_to_array(state)
                action = self.select_action(state_array)

                # 执行动作
                next_state, reward, done, info = self.env.step(Action(
                    type=ActionType.TOOL_CALL,
                    tool_name=self.env.available_tools[action]
                ))

                # 存储经验
                next_state_array = self._state_to_array(next_state)
                self.add_experience(state_array, action, reward, next_state_array, done)

                # 训练
                loss = self.train_step()

                episode_reward += reward
                state = next_state

            # 定期更新目标网络
            if episode % 10 == 0:
                self.update_target_network()

            # 衰减epsilon
            self.epsilon = max(0.01, self.epsilon * 0.995)

            if episode % 100 == 0:
                print(f"Episode {episode}, Reward: {episode_reward:.2f}, Epsilon: {self.epsilon:.3f}")

        print("=== 训练完成 ===")

    def _state_to_array(self, state: State) -> np.ndarray:
        """将状态转换为数组"""
        return np.random.randn(self.state_dim)
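
除了每隔若干episode整体拷贝参数,另一种常见做法是软更新(Polyak平均):每个训练步让目标网络缓慢跟随在线网络。下面是一个示意实现(τ的取值仅供参考):

```python
def soft_update(q_network: nn.Module, target_network: nn.Module, tau: float = 0.005):
    """target = tau * online + (1 - tau) * target"""
    with torch.no_grad():
        for param, target_param in zip(q_network.parameters(), target_network.parameters()):
            target_param.data.mul_(1 - tau)
            target_param.data.add_(tau * param.data)

# 用法示意:在每次train_step之后调用
# soft_update(agent.q_network, agent.target_network, tau=0.005)
```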

6. 推理与部署

6.1 训练后的策略

class TrainedAgent:
    """训练后的Agent"""

    def __init__(
        self,
        policy_network: ActorCriticNetwork,
        env: AgentEnvironment
    ):
        self.policy = policy_network
        self.env = env
        self.policy.eval()

    def run(self, state: State, max_steps: int = 10) -> List[Action]:
        """运行策略"""
        actions = []
        current_state = state

        for _ in range(max_steps):
            # 选择动作
            state_array = self._state_to_array(current_state)
            action = self._get_action(state_array)
            actions.append(action)

            # 执行动作
            next_state, reward, done, info = self.env.step(action)

            if done:
                break

            current_state = next_state

        return actions

    def _get_action(self, state: np.ndarray) -> Action:
        """获取动作(deterministic)"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        with torch.no_grad():
            action_probs, _ = self.policy(state_tensor)
            action = action_probs.argmax(dim=-1).item()

        return Action(
            type=ActionType.TOOL_CALL,
            tool_name=self.env.available_tools[action]
        )

    def _state_to_array(self, state: State) -> np.ndarray:
        return np.random.randn(256)

    def save(self, path: str):
        """保存策略"""
        torch.save({
            "policy_state_dict": self.policy.state_dict(),
            "model_config": {
                "state_dim": self.policy.feature_extractor[0].in_features,
                "action_dim": self.policy.actor[-1].out_features
            }
        }, path)

    @classmethod
    def load(cls, path: str, env: AgentEnvironment) -> 'TrainedAgent':
        """加载策略"""
        checkpoint = torch.load(path)
        model_config = checkpoint["model_config"]

        # 重建网络
        policy = ActorCriticNetwork(
            model_config["state_dim"],
            model_config["action_dim"]
        )
        policy.load_state_dict(checkpoint["policy_state_dict"])

        return cls(policy, env)
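
保存与加载的用法大致如下(路径与维度仅为示例):

```python
env = AgentEnvironment(max_turns=5, available_tools=["search", "calculate"])
policy = ActorCriticNetwork(state_dim=256, action_dim=len(env.available_tools))

agent = TrainedAgent(policy, env)
agent.save("agent_policy.pth")

# 之后可在新进程中恢复并运行
restored = TrainedAgent.load("agent_policy.pth", env)
actions = restored.run(env.reset(), max_steps=5)
```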

7. 高级技术

7.1 分层强化学习(HRL)

class HighLevelAgent:
    """高层Agent(选项层级)"""

    def __init__(self, options: List[str]):
        self.options = options
        # 学习选择哪个选项

    def select_option(self, state) -> str:
        """选择选项"""
        # 使用RL学习到的策略
        pass

class LowLevelAgent:
    """低层Agent(执行层级)"""

    def __init__(self):
        # 学习如何执行选定的选项
        pass

    def execute_option(self, option: str, state) -> List[Action]:
        """执行选项"""
        # 使用RL学习到的策略
        pass
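
上面两个类只是层级结构的骨架;下面给出一个可运行的最小示意,说明"高层选option、低层把option展开为具体动作"的思路(选项集合、动作映射和ε取值均为本文假设):

```python
class SimpleHierarchicalAgent:
    """分层决策的最小示意:高层ε-greedy选option,低层将option映射为动作序列"""

    def __init__(self, options: List[str], epsilon: float = 0.1):
        self.options = options
        self.epsilon = epsilon
        self.option_values = {opt: 0.0 for opt in options}  # 高层的价值估计
        # 低层:每个option对应一个固定动作序列(实际中应由低层策略学习得到)
        self.option_to_actions = {
            "gather_info": [Action(type=ActionType.TOOL_CALL, tool_name="search")],
            "answer": [
                Action(type=ActionType.TEXT_RESPONSE, content="这是我的回答。"),
                Action(type=ActionType.END_CONVERSATION)
            ]
        }

    def select_option(self, state: State) -> str:
        if np.random.random() < self.epsilon:
            return str(np.random.choice(self.options))
        return max(self.option_values, key=self.option_values.get)

    def execute_option(self, option: str) -> List[Action]:
        return self.option_to_actions.get(option, [])

# 用法示意
agent = SimpleHierarchicalAgent(options=["gather_info", "answer"])
option = agent.select_option(State("查天气", [], ["search"], {}))
actions = agent.execute_option(option)
```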

7.2 多智能体RL

class MultiAgentEnvironment(Env):
    """多Agent环境"""

    def __init__(self, n_agents: int):
        self.n_agents = n_agents
        # 每个Agent的状态和动作空间

    def step(self, actions: List[Action]) -> tuple:
        """多Agent步骤"""
        # 处理多个Agent的动作
        # 返回每个Agent的reward
        pass
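
多Agent环境的step通常一次接收所有Agent的动作,并逐Agent返回观测和奖励。下面是一个接口形态的最小示意(合作奖励的规则为本文假设):

```python
class SimpleMultiAgentEnv:
    """多Agent环境接口示意:合作场景下所有Agent共享同一个全局奖励"""

    def __init__(self, n_agents: int):
        self.n_agents = n_agents

    def reset(self) -> List[np.ndarray]:
        return [np.zeros(4, dtype=np.float32) for _ in range(self.n_agents)]

    def step(self, actions: List[int]):
        # 假设的合作奖励:所有Agent选同一动作时获得正奖励
        shared_reward = 1.0 if len(set(actions)) == 1 else -0.1
        observations = [np.random.randn(4).astype(np.float32) for _ in range(self.n_agents)]
        rewards = [shared_reward] * self.n_agents
        dones = [False] * self.n_agents
        return observations, rewards, dones, {}

# 用法示意
env = SimpleMultiAgentEnv(n_agents=3)
obs = env.reset()
obs, rewards, dones, info = env.step([1, 1, 1])
```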

8. 实现示例

8.1 完整训练流程

"""
完整RL Agent训练流程

1. 创建环境
2. 定义奖励函数
3. 构建策略网络
4. 训练Agent
5. 评估Agent
6. 部署Agent
"""

if __name__ == "__main__":
    import os

    import numpy as np

    # 1. 创建环境
    print("=== 创建环境 ===")
    tools = ["search", "calculate", "get_weather", "file_read"]
    env = ToolSelectionEnvironment(tools)

    # 2. 定义奖励函数(本例的ToolSelectionEnvironment在内部自行计算奖励,这里仅演示组合方式)
    reward_fn = CompositeReward([
        SuccessReward(success_reward=1.0, failure_reward=-0.5),
    ])

    # 3. 训练Agent
    print("\n=== 训练Agent ===")
    agent = DQNAgent(
        env=env,
        state_dim=len(tools),  # 观察是各工具的历史成功率,维度等于工具数量
        action_dim=len(tools),
        lr=1e-3,
        gamma=0.99,
        epsilon=0.3
    )

    # 训练:ToolSelectionEnvironment是Bandit环境(动作为工具索引,且不会自行终止),
    # 因此这里不调用面向AgentEnvironment的agent.train,而是写一个固定步数的训练循环
    obs = env.reset()
    for step in range(5000):
        action = agent.select_action(obs)
        next_obs, reward, done, info = env.step(action)
        agent.add_experience(obs, action, reward, next_obs, done)
        agent.train_step()
        obs = next_obs

        if step % 100 == 0:
            agent.update_target_network()                    # 定期更新目标网络
        agent.epsilon = max(0.01, agent.epsilon * 0.999)      # 衰减epsilon

    # 4. 评估Agent
    print("\n=== 评估Agent ===")
    n_test_episodes = 100
    steps_per_episode = 10  # Bandit环境不会终止,固定每个episode的步数
    total_rewards = []

    agent.epsilon = 0.0  # 评估时使用贪心策略
    for _ in range(n_test_episodes):
        obs = env.reset()
        episode_reward = 0

        for _ in range(steps_per_episode):
            action = agent.select_action(obs)
            obs, reward, done, info = env.step(action)
            episode_reward += reward

        total_rewards.append(episode_reward)

    avg_reward = np.mean(total_rewards)
    print(f"平均奖励: {avg_reward:.2f}")
    print(f"成功率: {sum(1 for r in total_rewards if r > 0) / len(total_rewards):.2%}")

    # 5. 保存模型
    print("\n=== 保存模型 ===")
    os.makedirs("./models", exist_ok=True)
    torch.save({
        "q_network_state_dict": agent.q_network.state_dict(),
        "target_network_state_dict": agent.target_network.state_dict()
    }, "./models/tool_selection_dqn.pth")

    print("=== 训练完成 ===")

面试高频问法

Q1: 强化学习如何应用到Agent中?

标准回答:

RL在Agent中的应用场景:

1. 对话策略学习
   - 学习何时结束对话
   - 学习如何引导用户
   - 环境:对话状态
   - 奖励:用户满意度、任务完成度

2. 工具选择优化
   - 学习最优的工具调用顺序
   - 环境:可用工具集
   - 奖励:工具成功率、任务相关性

3. Prompt优化
   - 学习最优的prompt参数
   - 环境:任务类型、复杂度
   - 奖励:输出质量、Token效率

4. 轨迹优化
   - 优化多轮对话轨迹
   - 环境:对话历史
   - 奖励:最终成功率、中间步骤质量

实现要点:
- 定义清晰的状态空间
- 设计合理的奖励函数
- 选择合适的算法(DQN、PPO等)
- 充足的探索(epsilon-greedy)

Q2: 如何设计Agent的奖励函数?

标准回答:

奖励函数设计原则:

1. 稀疏奖励(Sparse Reward)
   - 只在关键事件给出奖励
   - 示例:任务完成+10,失败-10

2. 奖励塑形(Reward Shaping)
   - 引导Agent学习
   - 但不要过度塑造

3. 分解奖励
   reward = 基础奖励 + 效率奖励 + 惩罚

4. 常见奖励组件
   - 成功奖励:任务完成
   - 效率奖励:快速完成
   - 惩罚:不必要操作、过长响应
   - 质量奖励:输出质量

示例:
```python
def compute_reward(action, result, state):
    reward = 0

    # 成功奖励
    if result.get("success"):
        reward += 1

    # 效率奖励
    turns = len(state.history)
    reward += 0.3 if turns < 5 else -0.2

    # 惩罚
    if action.type == "tool_call" and is_redundant(action, state):
        reward -= 0.5

    # 目标达成
    if result.get("solved"):
        reward += 2

    return reward

```

Q3: 如何处理RL训练的不稳定性?

标准回答:

RL训练不稳定的原因和解决方案:

1. 奖励尺度问题
   - 使用奖励标准化
   - 使用归一化奖励(除以标准差)
   - 使用奖励裁剪(clipping)

2. 探索不足
   - 使用epsilon-greedy
   - 使用entropy regularization
   - 使用noisy net

3. 策略更新太快
   - 使用较小的learning rate
   - 使用target network(DQN)
   - 使用gradient clipping

4. 经验回放不足
   - 确保足够的buffer size
   - 使用优先经验回放(PER)
   - 使用重要性采样

5. 超参数调优
   - 调整gamma(折扣因子)
   - 调整batch size
   - 调整network architecture

推荐实践(部分技巧的代码示意见下):
- 使用稳定的算法(PPO优于DQN)
- 充分的训练时间
- 监控reward曲线
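
其中奖励裁剪/标准化和梯度裁剪落到代码上大致是下面几行(参数取值仅供参考,模型仅为占位):

```python
import numpy as np
import torch
import torch.nn as nn

# 1. 奖励裁剪与标准化
rewards = np.array([0.5, 12.0, -30.0, 1.0])
clipped = np.clip(rewards, -10.0, 10.0)
normalized = (clipped - clipped.mean()) / (clipped.std() + 1e-8)

# 2. 梯度裁剪:在loss.backward()之后、optimizer.step()之前
model = nn.Linear(4, 2)                      # 占位模型
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss = model(torch.randn(8, 4)).pow(2).mean()

optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
```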

总结

RL在Agent中的核心要点

| 要点 | 策略 |
|------|------|
| 环境设计 | 状态清晰、动作明确 |
| 奖励设计 | 稀疏性、塑形、分解 |
| 算法选择 | PPO稳定、DQN简单 |
| 训练技巧 | 探索、回放、target网络 |
| 部署推理 | deterministic策略 |

最佳实践

  1. 环境隔离:确保环境可复现
  2. 奖励调试:可视化reward曲线
  3. 逐步训练:先简单任务再复杂
  4. 持续监控:训练中观察行为
  5. A/B测试:RL策略 vs 规则策略