强化学习在Agent中的应用
1. 强化学习概述
1.1 什么是强化学习
强化学习(Reinforcement Learning, RL)是一类机器学习方法:Agent通过与环境交互获得奖励信号,并据此不断调整策略,以最大化长期累积回报。
强化学习循环:

        ┌─────────┐
        │  Agent  │
        └────┬────┘
             │ Action
             ▼
     ┌─────────────┐      ┌──────────┐
     │ Environment │─────►│  Reward  │
     └──────┬──────┘      └──────────┘
            │ State
            ▼
  ┌──────────────────────────────┐
  │  Policy (π): State → Action  │
  └──────────────────────────────┘
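下面用一个最小的交互循环代码示意上图的流程(假设存在一个gym风格的env和一个可调用的policy,命名均为示意):

```python
def run_episode(env, policy, max_steps: int = 100) -> float:
    """最小交互循环:Agent根据状态选动作,环境返回奖励与下一状态"""
    state = env.reset()
    total_reward = 0.0
    for _ in range(max_steps):
        action = policy(state)                          # Policy: State -> Action
        state, reward, done, info = env.step(action)    # 环境反馈奖励与新状态
        total_reward += reward
        if done:
            break
    return total_reward
```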
1.2 RL在Agent中的应用场景
| 场景 | 描述 | RL方法 |
|---|---|---|
| 任务规划 | 学习最优任务执行顺序 | Multi-agent RL |
| 对话策略 | 学习对话轮次和内容 | PPO、DQN |
| 工具选择 | 学习何时调用哪个工具 | Bandit、Contextual Bandit |
| 参数调优 | 学习提示词、温度等参数 | Bayesian Optimization |
| 轨迹优化 | 优化多轮对话轨迹 | Reward Shaping |
1.3 RL核心概念
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class ActionType(Enum):
"""动作类型"""
TEXT_RESPONSE = "text_response"
TOOL_CALL = "tool_call"
END_CONVERSATION = "end_conversation"
@dataclass
class State:
"""状态"""
user_query: str
conversation_history: List[Dict]
available_tools: List[str]
context: Dict[str, Any]
@dataclass
class Action:
    """动作"""
    type: ActionType
    content: str = ""                 # 文本内容(工具调用时可为空)
    tool_name: Optional[str] = None
    tool_args: Optional[Dict] = None
@dataclass
class Transition:
"""转移"""
state: State
action: Action
next_state: State
reward: float
done: bool
2. Agent环境设计
2.1 基础环境接口
import gym
from gym import Env
from typing import Any, Dict, List, Tuple
class AgentEnvironment(Env):
"""Agent强化学习环境"""
def __init__(
self,
max_turns: int = 10,
available_tools: List[str] = None
):
self.max_turns = max_turns
self.available_tools = available_tools or []
self.current_turn = 0
# 状态空间定义
self.observation_space = gym.spaces.Dict({
'user_query': gym.spaces.Text(max_length=1000),
'conversation_length': gym.spaces.Discrete(100),
'tool_availability': gym.spaces.MultiBinary(len(self.available_tools))
})
# 动作空间定义
self.action_space = gym.spaces.Dict({
'action_type': gym.spaces.Discrete(3), # 3种动作类型
'tool_index': gym.spaces.Discrete(len(self.available_tools) + 1), # +1 for no tool
'response_length': gym.spaces.Discrete(500)
})
def reset(self, seed=None) -> State:
"""重置环境"""
self.current_turn = 0
self.conversation_history = []
return State(
user_query="",
conversation_history=[],
available_tools=self.available_tools.copy(),
context={}
)
def step(self, action: Action) -> Tuple[State, float, bool, Dict]:
"""
执行一步
Returns:
(next_state, reward, done, info)
"""
# 执行动作
result = self._execute_action(action)
# 计算奖励
reward = self._calculate_reward(action, result)
# 检查是否结束
done = self._is_done(action, result)
# 更新状态
next_state = self._get_next_state(action, result)
# 更新对话历史
self.conversation_history.append({
"action": action,
"result": result
})
self.current_turn += 1
# 额外信息
info = {
"turn": self.current_turn,
"result": result
}
return next_state, reward, done, info
def _execute_action(self, action: Action) -> Dict:
"""执行动作"""
if action.type == ActionType.TOOL_CALL:
# 调用工具
return self._call_tool(action)
elif action.type == ActionType.TEXT_RESPONSE:
# 生成文本响应
return self._generate_response(action)
else:
# 结束对话
return {"status": "ended"}
def _call_tool(self, action: Action) -> Dict:
"""调用工具(模拟)"""
tool_name = action.tool_name
tool_args = action.tool_args or {}
# 简化实现:实际应调用真实工具
return {
"tool_name": tool_name,
"args": tool_args,
"result": f"Tool {tool_name} executed",
"success": True
}
def _generate_response(self, action: Action) -> Dict:
"""生成文本响应(模拟)"""
return {
"response": action.content,
"success": True
}
def _calculate_reward(self, action: Action, result: Dict) -> float:
"""计算奖励"""
reward = 0
# 动作成功奖励
if result.get("success"):
reward += 1
# 工具调用奖励
if action.type == ActionType.TOOL_CALL:
reward += 0.5
# 简洁性奖励
if action.type == ActionType.TEXT_RESPONSE:
length = len(action.content)
if 50 < length < 200:
reward += 0.3
elif length > 500:
reward -= 0.5
# 轮次惩罚
if self.current_turn > self.max_turns:
reward -= 1
return reward
def _is_done(self, action: Action, result: Dict) -> bool:
"""判断是否结束"""
# 明确结束
if action.type == ActionType.END_CONVERSATION:
return True
# 超过最大轮次
if self.current_turn >= self.max_turns:
return True
# 目标达成(示例:问题被解决)
if result.get("solved"):
return True
return False
def _get_next_state(self, action: Action, result: Dict) -> State:
"""获取下一状态"""
return State(
user_query=self._get_current_query(),
conversation_history=self.conversation_history.copy(),
available_tools=self.available_tools.copy(),
context={"last_result": result}
)
def _get_current_query(self) -> str:
"""获取当前查询"""
if self.conversation_history:
return self.conversation_history[0].get("query", "")
return ""
def render(self, mode='human'):
"""渲染环境(可选)"""
pass
2.2 对话环境
class ConversationEnvironment(AgentEnvironment):
"""对话环境"""
def __init__(
self,
llm_client,
        user_simulator=None,  # 模拟用户
max_turns: int = 5
):
super().__init__(max_turns=max_turns)
self.llm = llm_client
self.user_simulator = user_simulator
self.current_query = ""
self.conversation_goal = ""
def set_goal(self, goal: str):
"""设置对话目标"""
self.conversation_goal = goal
def reset(self, seed=None) -> State:
"""重置环境"""
super().reset(seed)
# 初始化对话
if self.user_simulator:
self.current_query = self.user_simulator.get_initial_query()
else:
self.current_query = "开始对话"
return State(
user_query=self.current_query,
conversation_history=[],
available_tools=self.available_tools.copy(),
context={"goal": self.conversation_goal}
)
def _execute_action(self, action: Action) -> Dict:
"""执行对话动作"""
if action.type == ActionType.TOOL_CALL:
# 调用工具
tool_result = self._call_tool(action)
# 将工具结果添加到对话历史
self.conversation_history.append({
"role": "assistant",
"type": "tool_call",
"tool_name": action.tool_name,
"result": tool_result
})
return tool_result
elif action.type == ActionType.TEXT_RESPONSE:
# 获取对话上下文
messages = self._build_messages()
# 添加助手回复
messages.append({
"role": "assistant",
"content": action.content
})
# 获取用户反馈
if self.user_simulator:
user_feedback = self.user_simulator.respond(
action.content,
self.conversation_history
)
if user_feedback.get("satisfied"):
return {"status": "satisfied", "solved": True}
return user_feedback
return {"status": "continued"}
def _build_messages(self) -> List[Dict]:
"""构建LLM消息格式"""
messages = []
# 添加初始查询
if self.current_query:
messages.append({
"role": "user",
"content": self.current_query
})
# 添加对话历史
for turn in self.conversation_history:
if turn.get("type") == "tool_call":
messages.append({
"role": "assistant",
"content": f"Tool {turn['tool_name']} returned: {turn['result']}"
})
elif turn.get("role") == "user":
messages.append({
"role": "user",
"content": turn.get("content", "")
})
elif turn.get("role") == "assistant":
messages.append({
"role": "assistant",
"content": turn.get("content", "")
})
return messages
2.3 工具选择环境
import numpy as np
class ToolSelectionEnvironment(Env):
"""工具选择环境(简化为Bandit问题)"""
def __init__(self, tools: List[str]):
self.tools = tools
self.n_tools = len(tools)
# 每个工具的成功率和其他统计
self.tool_stats = {
tool: {
"successes": 0,
"total_calls": 0,
"average_reward": 0
}
for tool in tools
}
# 动作空间:选择工具
self.action_space = gym.spaces.Discrete(self.n_tools)
self.observation_space = gym.spaces.Box(
low=0, high=1, shape=(self.n_tools,), dtype=np.float32
)
def reset(self, seed=None) -> np.ndarray:
"""重置环境"""
return self._get_observation()
def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
"""
执行工具选择
Args:
action: 工具索引
Returns:
(observation, reward, done, info)
"""
tool_name = self.tools[action]
# 模拟工具执行
success, reward = self._simulate_tool_call(tool_name)
# 更新统计
self.tool_stats[tool_name]["total_calls"] += 1
if success:
self.tool_stats[tool_name]["successes"] += 1
# 更新平均奖励
stats = self.tool_stats[tool_name]
n = stats["total_calls"]
stats["average_reward"] = (
(n - 1) * stats["average_reward"] + reward
) / n
# 获取观察
observation = self._get_observation()
info = {
"tool_name": tool_name,
"success": success,
"stats": self.tool_stats[tool_name].copy()
}
return observation, reward, False, info
def _simulate_tool_call(self, tool_name: str) -> Tuple[bool, float]:
"""模拟工具调用(示例)"""
# 根据工具类型决定成功率和奖励
tool_success_rates = {
"search": 0.8,
"calculate": 0.9,
"get_weather": 0.7,
"file_read": 0.85
}
success_rate = tool_success_rates.get(tool_name, 0.7)
success = np.random.random() < success_rate
if success:
reward = np.random.uniform(0.5, 1.0)
else:
reward = np.random.uniform(-1.0, -0.1)
return success, reward
def _get_observation(self) -> np.ndarray:
"""获取观察(工具成功率)"""
obs = np.zeros(self.n_tools, dtype=np.float32)
for i, tool in enumerate(self.tools):
stats = self.tool_stats[tool]
if stats["total_calls"] > 0:
obs[i] = stats["successes"] / stats["total_calls"]
else:
obs[i] = 0.5 # 初始探索
return obs
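基于上述Bandit环境,可以用epsilon-greedy策略学习工具选择。下面是一个示意(run_bandit、n_rounds等为本文示意命名,并非固定接口):

```python
def run_bandit(env: ToolSelectionEnvironment, n_rounds: int = 1000, epsilon: float = 0.1):
    """epsilon-greedy工具选择:以epsilon概率随机探索,否则选经验成功率最高的工具"""
    obs = env.reset()
    for _ in range(n_rounds):
        if np.random.random() < epsilon:
            action = np.random.randint(env.n_tools)   # 探索
        else:
            action = int(np.argmax(obs))              # 利用:观察即各工具的经验成功率
        obs, reward, done, info = env.step(action)
    return env.tool_stats

# 用法示意
# stats = run_bandit(ToolSelectionEnvironment(["search", "calculate"]), n_rounds=500)
```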
3. 奖励函数设计
3.1 奖励函数类型
from typing import Callable
class RewardFunction(ABC):
"""奖励函数基类"""
@abstractmethod
def compute(
self,
action: Action,
result: Dict,
state: State
) -> float:
"""计算奖励"""
pass
class SuccessReward(RewardFunction):
"""基于成功的奖励"""
def __init__(self, success_reward: float = 1.0, failure_reward: float = -1.0):
self.success_reward = success_reward
self.failure_reward = failure_reward
def compute(
self,
action: Action,
result: Dict,
state: State
) -> float:
if result.get("success", False):
return self.success_reward
return self.failure_reward
class ShapedReward(RewardFunction):
"""塑造奖励(奖励塑形)"""
def __init__(
self,
success_weight: float = 1.0,
efficiency_weight: float = 0.3,
penalty_weight: float = 0.5,
goal_achievement_weight: float = 2.0
):
self.success_weight = success_weight
self.efficiency_weight = efficiency_weight
self.penalty_weight = penalty_weight
self.goal_achievement_weight = goal_achievement_weight
def compute(
self,
action: Action,
result: Dict,
state: State
) -> float:
reward = 0
# 1. 基础成功奖励
if result.get("success"):
reward += self.success_weight
else:
reward -= self.success_weight * 0.5
# 2. 效率奖励(快速完成)
turns_used = len(state.conversation_history)
if turns_used < 5:
reward += self.efficiency_weight
elif turns_used > 10:
reward -= self.efficiency_weight * 0.5
# 3. 惩罚(不必要的操作)
if action.type == ActionType.TOOL_CALL:
# 检查是否重复调用同一工具
last_tool = self._get_last_tool(state)
if last_tool == action.tool_name:
reward -= self.penalty_weight
# 4. 目标达成奖励
if result.get("solved"):
reward += self.goal_achievement_weight
# 5. 惩罚(过长响应)
if action.type == ActionType.TEXT_RESPONSE:
length = len(action.content)
if length > 500:
reward -= self.penalty_weight * (length / 500)
return reward
def _get_last_tool(self, state: State) -> str:
"""获取上次调用的工具"""
if state.conversation_history:
last = state.conversation_history[-1]
if last.get("type") == "tool_call":
return last.get("tool_name")
return None
class UserFeedbackReward(RewardFunction):
"""基于用户反馈的奖励"""
def __init__(
self,
positive_reward: float = 1.0,
negative_reward: float = -1.0,
neutral_reward: float = 0.0
):
self.positive_reward = positive_reward
self.negative_reward = negative_reward
self.neutral_reward = neutral_reward
def compute(
self,
action: Action,
result: Dict,
state: State
) -> float:
feedback = result.get("user_feedback")
if feedback == "positive":
return self.positive_reward
elif feedback == "negative":
return self.negative_reward
elif feedback == "satisfied":
return self.positive_reward * 2
elif feedback == "unsatisfied":
return self.negative_reward * 2
else:
return self.neutral_reward
class CompositeReward(RewardFunction):
"""组合奖励函数"""
def __init__(self, reward_functions: List[RewardFunction], weights: List[float] = None):
self.reward_functions = reward_functions
if weights is None:
self.weights = [1.0] * len(reward_functions)
else:
self.weights = weights
assert len(self.weights) == len(reward_functions)
def compute(
self,
action: Action,
result: Dict,
state: State
) -> float:
total_reward = 0
for rf, weight in zip(self.reward_functions, self.weights):
reward = rf.compute(action, result, state)
total_reward += weight * reward
return total_reward
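组合奖励的用法示意(各项权重仅为示例取值,应按任务调参):

```python
# 组合成功奖励、塑形奖励与用户反馈奖励,按权重加权求和
reward_fn = CompositeReward(
    reward_functions=[
        SuccessReward(success_reward=1.0, failure_reward=-0.5),
        ShapedReward(efficiency_weight=0.3),
        UserFeedbackReward(positive_reward=1.0)
    ],
    weights=[1.0, 0.5, 2.0]
)
# total = reward_fn.compute(action, result, state)  # 可在环境的_calculate_reward中调用
```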
3.2 奖励塑形技术
class RewardShaper:
"""奖励塑形器"""
    @staticmethod
    def potential_based_reward(
        raw_reward: float,
        potential_current: float,
        potential_next: float,
        gamma: float = 0.99
    ) -> float:
        """
        基于势能的奖励塑形: r' = r + γΦ(s') − Φ(s)
        在不改变最优策略的前提下提供更密集的学习信号,可加速训练
        """
        return raw_reward + gamma * potential_next - potential_current
@staticmethod
def reward_clipping(
reward: float,
min_reward: float = -10.0,
max_reward: float = 10.0
) -> float:
"""奖励裁剪"""
return max(min_reward, min(reward, max_reward))
@staticmethod
def normalize_reward(
reward: float,
mean: float,
std: float
) -> float:
"""标准化奖励"""
if std == 0:
return 0
return (reward - mean) / std
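塑形工具的用法示意(这里假设用"任务进度估计"作为势函数Φ,0.2/0.5为假设取值):

```python
# 势函数Φ(s)取任务进度估计:进度提升时获得正的塑形奖励
shaped = RewardShaper.potential_based_reward(
    raw_reward=0.0,
    potential_current=0.2,   # Φ(s):当前进度估计(假设值)
    potential_next=0.5,      # Φ(s'):执行动作后的进度估计(假设值)
    gamma=0.99
)
clipped = RewardShaper.reward_clipping(shaped, min_reward=-10.0, max_reward=10.0)
```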
4. 策略网络
4.1 简单策略网络
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List
class AgentPolicyNetwork(nn.Module):
"""Agent策略网络"""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dims: List[int] = [256, 128]
):
super().__init__()
# 构建隐藏层
layers = []
input_dim = state_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(input_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
input_dim = hidden_dim
self.hidden_layers = nn.Sequential(*layers)
# 输出层
self.output_layer = nn.Linear(input_dim, action_dim)
def forward(self, state: torch.Tensor) -> torch.Tensor:
"""前向传播"""
x = self.hidden_layers(state)
action_logits = self.output_layer(x)
return action_logits
def get_action(self, state: np.ndarray, deterministic: bool = False) -> int:
"""获取动作"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
logits = self.forward(state_tensor)
probs = F.softmax(logits, dim=-1)
if deterministic:
action = probs.argmax(dim=-1).item()
else:
action = torch.multinomial(probs, 1).item()
return action
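策略网络的用法示意(state_dim=256、action_dim=4仅为示例取值):

```python
# 构建策略网络,对一个随机状态向量分别做采样与贪心选择
policy_net = AgentPolicyNetwork(state_dim=256, action_dim=4)
dummy_state = np.random.randn(256).astype(np.float32)
sampled_action = policy_net.get_action(dummy_state)                     # 按概率采样
greedy_action = policy_net.get_action(dummy_state, deterministic=True)  # 取argmax
```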
4.2 Actor-Critic网络
class ActorCriticNetwork(nn.Module):
"""Actor-Critic网络"""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dim: int = 256
):
super().__init__()
# 共享特征提取器
self.feature_extractor = nn.Sequential(
nn.Linear(state_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2)
)
# Actor网络(输出动作概率)
self.actor = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
            nn.Linear(hidden_dim // 2, action_dim),
nn.Softmax(dim=-1)
)
# Critic网络(输出价值)
self.critic = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1)
)
def forward(self, state: torch.Tensor) -> tuple:
"""前向传播"""
features = self.feature_extractor(state)
action_probs = self.actor(features)
value = self.critic(features)
return action_probs, value.squeeze(-1)
5. 训练流程
5.1 PPO训练
class PPOAgent:
"""PPO Agent"""
def __init__(
self,
env: AgentEnvironment,
state_dim: int,
action_dim: int,
lr: float = 3e-4,
gamma: float = 0.99,
clip_eps: float = 0.2
):
self.env = env
self.gamma = gamma
self.clip_eps = clip_eps
# 策略网络
self.policy = ActorCriticNetwork(state_dim, action_dim)
# 优化器
self.optimizer = torch.optim.Adam(
self.policy.parameters(),
lr=lr
)
# 存储经验
self.episode_rewards = []
    def collect_trajectories(
        self,
        n_episodes: int,
        max_steps: int = 1000
    ) -> List[Dict]:
        """收集轨迹经验"""
        trajectories = []
        for episode in range(n_episodes):
            state = self.env.reset()
            episode_reward = 0
            done = False
            steps = 0
            states, actions, rewards, log_probs = [], [], [], []
            while not done and steps < max_steps:
                # 编码状态并前向计算动作分布
                state_array = self._state_to_array(state)
                state_tensor = torch.FloatTensor(state_array).unsqueeze(0)
                with torch.no_grad():
                    action_probs, _ = self.policy(state_tensor)
                dist = torch.distributions.Categorical(action_probs)
                action = dist.sample()
                action_idx = action.item()
                log_prob = dist.log_prob(action).item()
                # 执行动作(简化:仅演示工具调用类动作)
                next_state, reward, done, info = self.env.step(Action(
                    type=ActionType.TOOL_CALL,
                    tool_name=self.env.available_tools[action_idx]
                ))
                # 记录经验
                states.append(state)
                actions.append(action_idx)
                rewards.append(reward)
                log_probs.append(log_prob)
                episode_reward += reward
                state = next_state
                steps += 1
            # 保存整条轨迹,供update()计算优势和回报
            trajectories.append({
                "states": states,
                "actions": actions,
                "rewards": rewards,
                "log_probs": log_probs
            })
            self.episode_rewards.append(episode_reward)
        return trajectories
def update(self, trajectories: List[Dict], n_epochs: int = 10):
"""更新策略"""
for epoch in range(n_epochs):
epoch_loss = 0
for trajectory in trajectories:
# 计算优势函数
advantages = self._compute_advantages(trajectory["rewards"])
returns = self._compute_returns(trajectory["rewards"])
# PPO更新
loss = self._ppo_update(
trajectory["states"],
trajectory["actions"],
trajectory["log_probs"],
advantages,
returns
)
epoch_loss += loss
            print(f"Epoch {epoch}, Loss: {epoch_loss:.4f}")
    def _compute_advantages(self, rewards: List[float]) -> List[float]:
        """计算优势函数(简化:用折扣回报减去批内均值作为优势估计,未使用价值基线)"""
        returns = self._compute_returns(rewards)
        if not returns:
            return []
        baseline = sum(returns) / len(returns)
        return [r - baseline for r in returns]
def _compute_returns(self, rewards: List[float]) -> List[float]:
"""计算回报"""
returns = []
discounted_sum = 0
for reward in reversed(rewards):
discounted_sum = discounted_sum * self.gamma + reward
returns.append(discounted_sum)
return list(reversed(returns))
def _ppo_update(
self,
states,
actions,
old_log_probs,
advantages,
returns
) -> float:
"""PPO更新步骤(简化)"""
# 转换为tensor
states_tensor = torch.stack([
torch.FloatTensor(self._state_to_array(s))
for s in states
])
actions_tensor = torch.LongTensor(actions)
old_log_probs_tensor = torch.FloatTensor(old_log_probs)
advantages_tensor = torch.FloatTensor(advantages)
returns_tensor = torch.FloatTensor(returns)
# 前向传播
action_probs, values = self.policy(states_tensor)
# 计算新log prob
new_log_probs = torch.log(action_probs.gather(1, actions_tensor.unsqueeze(-1)).squeeze(-1))
# 计算ratio
ratio = torch.exp(new_log_probs - old_log_probs_tensor)
# 计算surrogate loss
surrogate1 = ratio * advantages_tensor
surrogate2 = torch.clamp(ratio, 1 - self.clip_eps, 1 + self.clip_eps) * advantages_tensor
policy_loss = -torch.min(surrogate1, surrogate2).mean()
# 计算价值损失
value_loss = F.mse_loss(values.squeeze(-1), returns_tensor)
# 总损失
total_loss = policy_loss + 0.5 * value_loss
# 反向传播
self.optimizer.zero_grad()
total_loss.backward()
self.optimizer.step()
return total_loss.item()
    def _state_to_array(self, state: State) -> np.ndarray:
        """将状态转换为数组"""
        # 简化实现:实际应使用文本编码器(如句向量)对状态进行编码
        return np.random.randn(256)
def train(self, total_timesteps: int = 10000):
"""训练"""
print("=== 开始PPO训练 ===")
collected_timesteps = 0
episode_num = 0
while collected_timesteps < total_timesteps:
# 收集轨迹
trajectories = self.collect_trajectories(n_episodes=10)
# 更新策略
self.update(trajectories)
collected_timesteps += sum(len(t["rewards"]) for t in trajectories)
episode_num += 1
# 打印进度
avg_reward = np.mean(self.episode_rewards[-10:])
print(f"Episode {episode_num}, Avg Reward: {avg_reward:.2f}")
print("=== 训练完成 ===")
5.2 DQN训练
class DQNAgent:
"""DQN Agent"""
def __init__(
self,
env: AgentEnvironment,
state_dim: int,
action_dim: int,
lr: float = 1e-3,
gamma: float = 0.99,
epsilon: float = 0.1,
buffer_size: int = 10000,
batch_size: int = 32
):
self.env = env
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.epsilon = epsilon
self.batch_size = batch_size
# Q网络
self.q_network = AgentPolicyNetwork(state_dim, action_dim)
self.target_network = AgentPolicyNetwork(state_dim, action_dim)
self.target_network.load_state_dict(self.q_network.state_dict())
# 经验回放
self.replay_buffer = []
self.buffer_size = buffer_size
# 优化器
self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
def select_action(self, state: np.ndarray) -> int:
"""选择动作(epsilon-greedy)"""
if np.random.random() < self.epsilon:
return np.random.randint(0, self.action_dim)
else:
return self.q_network.get_action(state, deterministic=True)
def add_experience(
self,
state: np.ndarray,
action: int,
reward: float,
next_state: np.ndarray,
done: bool
):
"""添加经验"""
self.replay_buffer.append({
"state": state,
"action": action,
"reward": reward,
"next_state": next_state,
"done": done
})
# 限制buffer大小
if len(self.replay_buffer) > self.buffer_size:
self.replay_buffer.pop(0)
def train_step(self):
"""训练一步"""
if len(self.replay_buffer) < self.batch_size:
return
# 随机采样
batch = np.random.choice(self.replay_buffer, self.batch_size, replace=False)
states = torch.FloatTensor([b["state"] for b in batch])
actions = torch.LongTensor([b["action"] for b in batch])
rewards = torch.FloatTensor([b["reward"] for b in batch])
next_states = torch.FloatTensor([b["next_state"] for b in batch])
dones = torch.FloatTensor([b["done"] for b in batch])
# 当前Q值
current_q = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
# 目标Q值
next_q_values = self.target_network(next_states).max(dim=-1)[0]
target_q = rewards + self.gamma * next_q_values * (1 - dones)
# 计算loss
loss = F.mse_loss(current_q, target_q.detach())
# 反向传播
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def update_target_network(self):
"""更新目标网络"""
self.target_network.load_state_dict(self.q_network.state_dict())
def train(self, num_episodes: int = 1000):
"""训练"""
print("=== 开始DQN训练 ===")
for episode in range(num_episodes):
state = self.env.reset()
episode_reward = 0
done = False
while not done:
# 选择动作
state_array = self._state_to_array(state)
action = self.select_action(state_array)
# 执行动作
next_state, reward, done, info = self.env.step(Action(
type=ActionType.TOOL_CALL,
tool_name=self.env.available_tools[action]
))
# 存储经验
next_state_array = self._state_to_array(next_state)
self.add_experience(state_array, action, reward, next_state_array, done)
# 训练
loss = self.train_step()
episode_reward += reward
state = next_state
# 定期更新目标网络
if episode % 10 == 0:
self.update_target_network()
# 衰减epsilon
self.epsilon = max(0.01, self.epsilon * 0.995)
if episode % 100 == 0:
print(f"Episode {episode}, Reward: {episode_reward:.2f}, Epsilon: {self.epsilon:.3f}")
print("=== 训练完成 ===")
def _state_to_array(self, state: State) -> np.ndarray:
"""将状态转换为数组"""
return np.random.randn(self.state_dim)
6. 推理与部署
6.1 训练后的策略
class TrainedAgent:
"""训练后的Agent"""
def __init__(
self,
policy_network: ActorCriticNetwork,
env: AgentEnvironment
):
self.policy = policy_network
self.env = env
self.policy.eval()
def run(self, state: State, max_steps: int = 10) -> List[Action]:
"""运行策略"""
actions = []
current_state = state
for _ in range(max_steps):
# 选择动作
state_array = self._state_to_array(current_state)
action = self._get_action(state_array)
actions.append(action)
# 执行动作
next_state, reward, done, info = self.env.step(action)
if done:
break
current_state = next_state
return actions
def _get_action(self, state: np.ndarray) -> Action:
"""获取动作(deterministic)"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
action_probs, _ = self.policy(state_tensor)
action = action_probs.argmax(dim=-1).item()
return Action(
type=ActionType.TOOL_CALL,
tool_name=self.env.available_tools[action]
)
def _state_to_array(self, state: State) -> np.ndarray:
return np.random.randn(256)
def save(self, path: str):
"""保存策略"""
torch.save({
"policy_state_dict": self.policy.state_dict(),
"model_config": {
"state_dim": self.policy.feature_extractor[0].in_features,
"action_dim": self.policy.actor[-1].out_features
}
}, path)
@classmethod
def load(cls, path: str, env: AgentEnvironment) -> 'TrainedAgent':
"""加载策略"""
checkpoint = torch.load(path)
model_config = checkpoint["model_config"]
# 重建网络
policy = ActorCriticNetwork(
model_config["state_dim"],
model_config["action_dim"]
)
policy.load_state_dict(checkpoint["policy_state_dict"])
return cls(policy, env)
7. 高级技术
7.1 分层强化学习(HRL)
class HighLevelAgent:
"""高层Agent(选项层级)"""
def __init__(self, options: List[str]):
self.options = options
# 学习选择哪个选项
def select_option(self, state) -> str:
"""选择选项"""
# 使用RL学习到的策略
pass
class LowLevelAgent:
"""低层Agent(执行层级)"""
def __init__(self):
# 学习如何执行选定的选项
pass
def execute_option(self, option: str, state) -> List[Action]:
"""执行选项"""
# 使用RL学习到的策略
pass
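下面给出一个极简的分层执行示意,只演示"高层选选项、低层执行原子动作"的结构(hierarchical_step为示意命名,依赖上面两个类的接口约定):

```python
def hierarchical_step(high_agent: HighLevelAgent, low_agent: LowLevelAgent, env, state):
    """高层选择一个选项(子目标),低层在该选项下执行一串原子动作"""
    option = high_agent.select_option(state)           # 例如 "collect_info" / "answer"
    actions = low_agent.execute_option(option, state)  # 低层策略给出动作序列
    total_reward = 0.0
    for action in actions or []:                       # 低层未实现时返回None,这里做兼容
        state, reward, done, info = env.step(action)
        total_reward += reward
        if done:
            break
    return state, total_reward
```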
7.2 多智能体RL
class MultiAgentEnvironment(Env):
"""多Agent环境"""
def __init__(self, n_agents: int):
self.n_agents = n_agents
# 每个Agent的状态和动作空间
def step(self, actions: List[Action]) -> tuple:
"""多Agent步骤"""
# 处理多个Agent的动作
# 返回每个Agent的reward
pass
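一个可运行的极简多Agent环境示意(SimpleMultiAgentEnv为示意类名,奖励规则只为演示"每个Agent各自返回观测/奖励/done"的约定):

```python
class SimpleMultiAgentEnv:
    """多个Agent共享同一环境:step接收动作列表,返回每个Agent各自的观测、奖励与done"""
    def __init__(self, n_agents: int = 2, max_turns: int = 10):
        self.n_agents = n_agents
        self.max_turns = max_turns
        self.turn = 0

    def reset(self) -> List[Dict]:
        self.turn = 0
        return [{"turn": 0} for _ in range(self.n_agents)]

    def step(self, actions: List[Action]):
        self.turn += 1
        # 协作奖励示意:所有Agent都给出有效动作则各得+1,否则各得-0.1
        all_valid = all(a is not None for a in actions)
        rewards = [1.0 if all_valid else -0.1] * self.n_agents
        observations = [{"turn": self.turn} for _ in range(self.n_agents)]
        dones = [self.turn >= self.max_turns] * self.n_agents
        return observations, rewards, dones, {}
```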
8. 实现示例
8.1 完整训练流程
"""
完整RL Agent训练流程
1. 创建环境
2. 定义奖励函数
3. 构建策略网络
4. 训练Agent
5. 评估Agent
6. 部署Agent
"""
if __name__ == "__main__":
    import os
    import numpy as np
# 1. 创建环境
print("=== 创建环境 ===")
tools = ["search", "calculate", "get_weather", "file_read"]
env = ToolSelectionEnvironment(tools)
# 2. 定义奖励函数
reward_fn = CompositeReward([
SuccessReward(success_reward=1.0, failure_reward=-0.5),
])
# 3. 训练Agent
print("\n=== 训练Agent ===")
    agent = DQNAgent(
        env=env,
        state_dim=len(tools),   # 观察维度:每个工具的经验成功率
        action_dim=len(tools),
        lr=1e-3,
        gamma=0.99,
        epsilon=0.3
    )
    # 训练:Bandit环境的step接收工具索引,这里手动驱动训练循环
    num_episodes = 500
    for episode in range(num_episodes):
        obs = env.reset()
        action = agent.select_action(obs)
        next_obs, reward, done, info = env.step(action)
        agent.add_experience(obs, action, reward, next_obs, done)
        agent.train_step()
        # 定期更新目标网络
        if episode % 10 == 0:
            agent.update_target_network()
        # 衰减探索率
        agent.epsilon = max(0.01, agent.epsilon * 0.995)
    # 4. 评估Agent
    print("\n=== 评估Agent ===")
    n_test_episodes = 100
    total_rewards = []
    for _ in range(n_test_episodes):
        obs = env.reset()
        episode_reward = 0
        # Bandit环境没有终止状态,每个测试episode固定执行10次工具选择
        for _ in range(10):
            action = agent.q_network.get_action(obs, deterministic=True)
            obs, reward, done, info = env.step(action)
            episode_reward += reward
        total_rewards.append(episode_reward)
    avg_reward = np.mean(total_rewards)
    print(f"平均奖励: {avg_reward:.2f}")
    print(f"成功率: {sum(1 for r in total_rewards if r > 0) / len(total_rewards):.2%}")
    # 5. 保存模型
    print("\n=== 保存模型 ===")
    os.makedirs("./models", exist_ok=True)
    torch.save({
"q_network_state_dict": agent.q_network.state_dict(),
"target_network_state_dict": agent.target_network.state_dict()
}, "./models/tool_selection_dqn.pth")
print("=== 训练完成 ===")
面试高频问法
Q1: 强化学习如何应用到Agent中?
标准回答:
RL在Agent中的应用场景:
1. 对话策略学习
- 学习何时结束对话
- 学习如何引导用户
- 环境:对话状态
- 奖励:用户满意度、任务完成度
2. 工具选择优化
- 学习最优的工具调用顺序
- 环境:可用工具集
- 奖励:工具成功率、任务相关性
3. Prompt优化
- 学习最优的prompt参数
- 环境:任务类型、复杂度
- 奖励:输出质量、Token效率
4. 轨迹优化
- 优化多轮对话轨迹
- 环境:对话历史
- 奖励:最终成功率、中间步骤质量
实现要点:
- 定义清晰的状态空间
- 设计合理的奖励函数
- 选择合适的算法(DQN、PPO等)
- 充足的探索(epsilon-greedy)
Q2: 如何设计Agent的奖励函数?
标准回答:
奖励函数设计原则:
1. 稀疏奖励(Sparse Reward)
- 只在关键事件给出奖励
- 示例:任务完成+10,失败-10
2. 奖励塑形(Reward Shaping)
- 引导Agent学习
- 但不要过度塑造
3. 分解奖励
reward = 基础奖励 + 效率奖励 + 惩罚
4. 常见奖励组件
- 成功奖励:任务完成
- 效率奖励:快速完成
- 惩罚:不必要操作、过长响应
- 质量奖励:输出质量
示例:
```python
def compute_reward(action, result, state):
reward = 0
# 成功奖励
if result.get("success"):
reward += 1
# 效率奖励
turns = len(state.history)
reward += 0.3 if turns < 5 else -0.2
# 惩罚
if action.type == "tool_call" and is_redundant(action, state):
reward -= 0.5
# 目标达成
if result.get("solved"):
reward += 2
return reward
```
Q3: 如何处理RL训练的不稳定性?
标准回答:
RL训练不稳定的原因和解决方案:
1. 奖励尺度问题
- 使用奖励标准化/归一化(减去均值、除以标准差)
- 使用奖励裁剪(clipping)
2. 探索不足
- 使用epsilon-greedy
- 使用entropy regularization
- 使用noisy net
3. 策略更新太快
- 使用较小的learning rate
- 使用target network(DQN)
- 使用gradient clipping
4. 经验回放不足
- 确保足够的buffer size
- 使用优先经验回放(PER)
- 使用重要性采样
5. 超参数调优
- 调整gamma(折扣因子)
- 调整batch size
- 调整network architecture
推荐实践(梯度裁剪与奖励归一化的代码示意见下):
- 使用稳定的算法(PPO优于DQN)
- 充分的训练时间
- 监控reward曲线
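梯度裁剪与奖励归一化的最小示意(stable_update、RunningRewardNormalizer为示意命名):

```python
import numpy as np
import torch

def stable_update(policy, optimizer, loss, max_grad_norm: float = 0.5):
    """带梯度裁剪的参数更新:限制梯度范数,缓解策略更新过快导致的不稳定"""
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(policy.parameters(), max_grad_norm)
    optimizer.step()

class RunningRewardNormalizer:
    """运行时奖励归一化:维护历史均值/标准差,把奖励缩放到相近尺度"""
    def __init__(self, eps: float = 1e-8):
        self.rewards = []
        self.eps = eps

    def normalize(self, reward: float) -> float:
        self.rewards.append(reward)
        mean, std = np.mean(self.rewards), np.std(self.rewards)
        return float((reward - mean) / (std + self.eps))
```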
总结
RL在Agent中的核心要点
| 要点 | 策略 |
|---|---|
| 环境设计 | 状态清晰、动作明确 |
| 奖励设计 | 稀疏/密集的权衡、塑形、分解 |
| 算法选择 | PPO稳定、DQN简单 |
| 训练技巧 | 探索、回放、target网络 |
| 部署推理 | deterministic策略 |
最佳实践
- 环境隔离:确保环境可复现
- 奖励调试:可视化reward曲线
- 逐步训练:先简单任务再复杂
- 持续监控:训练中观察行为
- A/B测试:RL策略 vs 规则策略