【动手学强化学习】番外8-IPPO应用框架学习与复现
文章目录
- 一、待解决问题
- 1.1 问题描述
- 1.2 解决方法
- 二、方法详述
- 2.1 必要说明
- (1)MAPPO 与 IPPO 算法的区别在于什么地方?
- (2)IPPO 算法应用框架主要参考来源
- 2.2 应用步骤
- 2.2.1 搭建基础环境
- 2.2.2 IPPO 算法实例复现
- (1)源码
- (2)Combat环境补充
- (3)代码结果
- 2.2.3 代码框架理解
- 三、疑问
- 四、总结
一、待解决问题
1.1 问题描述
在Combat环境中应用了MAPPO算法,在同样环境中学习并复现IPPO算法。
1.2 解决方法
(1)搭建基础环境。
(2)IPPO 算法实例复现。
(3)代码框架理解
二、方法详述
2.1 必要说明
(1)MAPPO 与 IPPO 算法的区别在于什么地方?
源文献链接:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games
源文献原文如下:
为清楚起见,我们将具有 集中价值函数 输入的 PPO 称为 MAPPO (Multi-Agent PPO)。
将 策略和价值函数均具有本地输入 的 PPO 称为 IPPO (Independent PPO)。
总结而言,就是critic网络
不再是集中式的了。
因此,IPPO 相对于 MAPPO 可能会更加占用计算、存储资源,毕竟每个agent都会拥有各自的critic网络
。
(2)IPPO 算法应用框架主要参考来源
其一,《动手学强化学习》-chapter 20
其二,深度强化学习(7)多智能体强化学习IPPO、MADDPG
✅非常感谢大佬的分享!!!
2.2 应用步骤
2.2.1 搭建基础环境
这一步骤直接参考上一篇博客,【动手学强化学习】番外7-MAPPO应用框架2学习与复现
2.2.2 IPPO 算法实例复现
(1)源码
智能体对于policy的使用分为separated policy
与shared policy
,即每个agent拥有单独的policy net,所有agent共用一个policy net,二者在源码中都能够使用,对应位置取消注释即可。
与MAPPO
的不同就在于,每个agent拥有单独的value net。
import torch
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import sys
from ma_gym.envs.combat.combat import Combat# PPO算法class PolicyNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):super(PolicyNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return F.softmax(self.fc3(x), dim=1)class ValueNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim):super(ValueNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return self.fc3(x)def compute_advantage(gamma, lmbda, td_delta):td_delta = td_delta.detach().numpy()advantage_list = []advantage = 0.0for delta in td_delta[::-1]:advantage = gamma * lmbda * advantage + deltaadvantage_list.append(advantage)advantage_list.reverse()return torch.tensor(advantage_list, dtype=torch.float)# PPO,采用截断方式
class PPO:def __init__(self, state_dim, hidden_dim, action_dim,actor_lr, critic_lr, lmbda, eps, gamma, device):self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)self.critic = ValueNet(state_dim, hidden_dim).to(device)self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), actor_lr)self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), critic_lr)self.gamma = gammaself.lmbda = lmbdaself.eps = eps # PPO中截断范围的参数self.device = devicedef take_action(self, state):state = torch.tensor([state], dtype=torch.float).to(self.device)probs = self.actor(state)action_dict = torch.distributions.Categorical(probs)action = action_dict.sample()return action.item()def update(self, transition_dict):states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)td_target = rewards + self.gamma * \self.critic(next_states) * (1 - dones)td_delta = td_target - self.critic(states)advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()log_probs = torch.log(self.actor(states).gather(1, actions))ratio = torch.exp(log_probs - old_log_probs)surr1 = ratio * advantagesurr2 = torch.clamp(ratio, 1 - self.eps, 1 +self.eps) * advantage # 截断action_loss = torch.mean(-torch.min(surr1, surr2)) # PPO损失函数critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()action_loss.backward()critic_loss.backward()self.actor_optimizer.step()self.critic_optimizer.step()def show_lineplot(data, name):# 生成 x 轴的索引x = list(range(100))# 创建图形和坐标轴plt.figure(figsize=(20, 6))# 绘制折线图plt.plot(x, data, label=name,marker='o', linestyle='-', linewidth=2)# 添加标题和标签plt.title(name)plt.xlabel('Index')plt.ylabel('Value')plt.legend()# 显示图形plt.grid(True)plt.show()actor_lr = 3e-4
critic_lr = 1e-3
epochs = 10
episode_per_epoch = 1000
hidden_dim = 64
gamma = 0.99
lmbda = 0.97
eps = 0.2
team_size = 2 # 每个team里agent的数量
grid_size = (15, 15) # 二维空间的大小
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")# 创建环境
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n# =============================================================================
# # 创建智能体(不参数共享:separated policy)
# agent1 = PPO(
# state_dim, hidden_dim, action_dim,
# actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# agent2 = PPO(
# state_dim, hidden_dim, action_dim,
# actor_lr, critic_lr, lmbda, eps, gamma, device
# )
# =============================================================================# 创建智能体(参数共享:shared policy)
agent = PPO(state_dim, hidden_dim, action_dim,actor_lr, critic_lr, lmbda, eps, gamma, device
)win_list = []for e in range(epochs):with tqdm(total=episode_per_epoch, desc='Epoch %d' % e) as pbar:for episode in range(episode_per_epoch):# Replay buffer for agent1buffer_agent1 = {'states': [],'actions': [],'next_states': [],'rewards': [],'dones': []}# Replay buffer for agent2buffer_agent2 = {'states': [],'actions': [],'next_states': [],'rewards': [],'dones': []}# 重置环境s = env.reset()terminal = Falsewhile not terminal:# 采取动作(不进行参数共享)# a1 = agent1.take_action(s[0])# a2 = agent2.take_action(s[1])# 采取动作(进行参数共享)a1 = agent.take_action(s[0])a2 = agent.take_action(s[1])next_s, r, done, info = env.step([a1, a2])buffer_agent1['states'].append(s[0])buffer_agent1['actions'].append(a1)buffer_agent1['next_states'].append(next_s[0])# 如果获胜,获得100的奖励,否则获得0.1惩罚buffer_agent1['rewards'].append(r[0] + 100 if info['win'] else r[0] - 0.1)buffer_agent1['dones'].append(False)buffer_agent2['states'].append(s[1])buffer_agent2['actions'].append(a2)buffer_agent2['next_states'].append(next_s[1])buffer_agent2['rewards'].append(r[1] + 100 if info['win'] else r[1] - 0.1)buffer_agent2['dones'].append(False)s = next_s # 转移到下一个状态terminal = all(done)# 更新策略(不进行参数共享)# agent1.update(buffer_agent1)# agent2.update(buffer_agent2)# 更新策略(进行参数共享)agent.update(buffer_agent1)agent.update(buffer_agent2)win_list.append(1 if info['win'] else 0)if (episode + 1) % 100 == 0:pbar.set_postfix({'episode': '%d' % (episode_per_epoch * e + episode + 1),'winner prob': '%.3f' % np.mean(win_list[-100:]),'win count': '%d' % win_list[-100:].count(1)})pbar.update(1)win_array = np.array(win_list)
# 每100条轨迹取一次平均
win_array = np.mean(win_array.reshape(-1, 100), axis=1)# 创建 episode_list,每组 100 个回合的累计回合数
episode_list = np.arange(1, len(win_array) + 1) * 100
plt.plot(episode_list, win_array)
plt.xlabel('Episodes')
plt.ylabel('win rate')
plt.title('IPPO on Combat(shared policy)')
plt.show()
(2)Combat环境补充
这里还需要说明的是,由于在奖励设置过程中采用了win
(获胜),但是Combat
环境中step函数并没有返还该值
buffer_agent1['rewards'].append(r[0] + 100 if info['win'] else r[0] - 0.1)...
win_array = np.array(win_list)
...
因此需要在step()
函数中加入对win
(获胜)的判断,与return返回值
# 判断是否获胜win = Falseif all(self._agent_dones):if sum([v for k, v in self.opp_health.items()]) == 0:win = Trueelif sum([v for k, v in self.agent_health.items()]) == 0:win = Falseelse:win = None # 平局# 将获胜信息添加到 info 中info = {'health': self.agent_health, 'win': win, 'opp_health': self.opp_health, 'step_count': self._step_count}return self.get_agent_obs(), rewards, self._agent_dones, info
(3)代码结果
左图为separated policy
下运行结果,右图为shared policy
下运行结果。
明显可以看出,separated policy
有着更好的效果,但是代价就是在训练过程中会占用更多的资源。
2.2.3 代码框架理解
从2.2.2节源码
不难看出,IPPO
算法其实就是在PPO
算法上改为了多agent的环境,其中policy net,value net的更新原理并没有改变,因此代码框架的理解查看PPO
算法原理即可。
参考链接:【动手学强化学习】part8-PPO(Proximal Policy Optimization)近端策略优化算法
三、疑问
- 暂无
四、总结
IPPO
算法相对于MAPPO
算法会占用更多的资源,如果环境较为简单,可以采用该算法。如果环境比较复杂,建议先采用MAPPO
算法进行训练。