Note
Go to the end to download the full example code.
Train a Mario-playing RL Agent
Created On: Dec 17, 2020 | Last Updated: Feb 05, 2024 | Last Verified: Not Verified
Authors: Yuansong Feng, Suraj Subramanian, Howard Wang, Steven Guo.
This tutorial walks you through the fundamentals of Deep Reinforcement Learning. At the end, you will implement an AI-powered Mario (using Double Deep Q-Networks) that can play the game by itself.
Although no prior knowledge of RL is necessary for this tutorial, you can familiarize yourself with these RL concepts and keep this cheatsheet handy as your companion. The full code is available here.
%%bash
pip install gym-super-mario-bros==7.4.0
pip install tensordict==0.3.0
pip install torchrl==0.3.0
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os
# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
/usr/local/lib/python3.10/dist-packages/torchrl/__init__.py:43: UserWarning:
failed to set start method to spawn, and current start method for mp is fork.
RL Definitions
Environment: The world that an agent interacts with and learns from.
Action \(a\): How the agent responds to the environment. The set of all possible actions is called the *action-space*.
State \(s\): The current characteristic of the environment. The set of all possible states the environment can be in is called the *state-space*.
Reward \(r\): Reward is the key feedback from the environment to the agent. It is what drives the agent to learn and to change its future actions. An aggregation of rewards over multiple time steps is called the **return**.
Optimal Action-Value function \(Q^*(s,a)\): Gives the expected return if you start in state \(s\), take an arbitrary action \(a\), and then at every future time step take the action that maximizes the return. \(Q\) can be said to stand for the "quality" of the action in a state. We try to approximate this function.
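As background (a standard identity from RL, stated here only for reference), \(Q^*\) satisfies the Bellman optimality equation, which is what the learning procedure below implicitly approximates:
\[Q^*(s,a) = \mathbb{E}\big[\, r + \gamma \max_{a'} Q^*(s', a') \,\big]\]
where \(\gamma\) is the discount factor (set as self.gamma = 0.9 later in this tutorial).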
Environment
Initialize Environment
In Mario, the environment consists of tubes, mushrooms, and other components.
When Mario makes an action, the environment responds with the changed (next) state, the reward, and other info.
# Initialize Super Mario environment (in v0.26 change render mode to 'human' to see results on the screen)
if gym.__version__ < '0.26':
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True)
else:
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='rgb', apply_api_compatibility=True)
# Limit the action-space to
# 0. walk right
# 1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])
env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")
/usr/local/lib/python3.10/dist-packages/gym/envs/registration.py:555: UserWarning:
WARN: The environment SuperMarioBros-1-1-v0 is out of date. You should consider upgrading to version `v3`.
/usr/local/lib/python3.10/dist-packages/gym/envs/registration.py:627: UserWarning:
WARN: The environment creator metadata doesn't include `render_modes`, contains: ['render.modes', 'video.frames_per_second']
/usr/local/lib/python3.10/dist-packages/gym/utils/passive_env_checker.py:233: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
(240, 256, 3),
0.0,
False,
{'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}
Preprocess Environment
Environment data is returned to the agent in next_state. As you saw above, each state is represented by a [3, 240, 256] size array. Often that is more information than our agent needs; for instance, Mario's actions do not depend on the color of the pipes or the sky!
We use **Wrappers** to preprocess environment data before sending it to the agent.
GrayScaleObservation is a common wrapper to transform an RGB image to grayscale; doing so reduces the size of the state representation without losing useful information. Now the size of each state: [1, 240, 256].
ResizeObservation downsamples each observation into a square image. New size: [1, 84, 84].
SkipFrame is a custom wrapper that inherits from gym.Wrapper and implements the step() function. Because consecutive frames don't vary much, we can skip n intermediate frames without losing much information. The n-th frame aggregates the rewards accumulated over each skipped frame.
FrameStack is a wrapper that allows us to squash consecutive frames of the environment into a single observation point to feed to our learning model. This way, we can identify whether Mario was landing or jumping based on the direction of his movement in the previous several frames.
class SkipFrame(gym.Wrapper):
def __init__(self, env, skip):
"""Return only every `skip`-th frame"""
super().__init__(env)
self._skip = skip
def step(self, action):
"""Repeat action, and sum reward"""
total_reward = 0.0
for i in range(self._skip):
# Accumulate reward and repeat the same action
obs, reward, done, trunk, info = self.env.step(action)
total_reward += reward
if done:
break
return obs, total_reward, done, trunk, info
class GrayScaleObservation(gym.ObservationWrapper):
def __init__(self, env):
super().__init__(env)
obs_shape = self.observation_space.shape[:2]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def permute_orientation(self, observation):
# permute [H, W, C] array to [C, H, W] tensor
observation = np.transpose(observation, (2, 0, 1))
observation = torch.tensor(observation.copy(), dtype=torch.float)
return observation
def observation(self, observation):
observation = self.permute_orientation(observation)
transform = T.Grayscale()
observation = transform(observation)
return observation
class ResizeObservation(gym.ObservationWrapper):
def __init__(self, env, shape):
super().__init__(env)
if isinstance(shape, int):
self.shape = (shape, shape)
else:
self.shape = tuple(shape)
obs_shape = self.shape + self.observation_space.shape[2:]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def observation(self, observation):
transforms = T.Compose(
[T.Resize(self.shape, antialias=True), T.Normalize(0, 255)]
)
observation = transforms(observation).squeeze(0)
return observation
# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
if gym.__version__ < '0.26':
env = FrameStack(env, num_stack=4, new_step_api=True)
else:
env = FrameStack(env, num_stack=4)
After applying the above wrappers to the environment, the final wrapped state consists of 4 consecutive gray-scaled frames stacked together, as shown above in the image on the left. Each time Mario makes an action, the environment responds with a state of this structure. The structure is represented by a 3-D array of size [4, 84, 84].
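As a quick sanity check (a minimal sketch, not part of the original tutorial code, assuming the wrapped env defined above), you can reset the environment and confirm the stacked observation shape:
# Optional sanity check (sketch): confirm the wrapped observation is [4, 84, 84]
obs = env.reset()
obs = obs[0] if isinstance(obs, tuple) else obs  # newer gym APIs return (obs, info)
print(np.array(obs).shape)  # expected: (4, 84, 84)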
Agent
We create a class Mario to represent our agent in the game. Mario should be able to:
**Act** according to the optimal action policy based on the current state (of the environment).
**Remember** experiences. Experience = (current state, current action, reward, next state). Mario *caches* and later *recalls* his experiences to update his action policy.
**Learn** a better action policy over time.
class Mario:
    def __init__(self):
pass
def act(self, state):
"""Given a state, choose an epsilon-greedy action"""
pass
def cache(self, experience):
"""Add the experience to memory"""
pass
def recall(self):
"""Sample experiences from memory"""
pass
def learn(self):
"""Update online action value (Q) function with a batch of experiences"""
pass
In the following sections, we will populate Mario's parameters and define his functions.
Act
For any given state, an agent can choose to do the most optimal action (**exploit**) or a random action (**explore**).
Mario randomly explores with a chance of self.exploration_rate; when he chooses to exploit, he relies on MarioNet (implemented in the Learn section) to provide the most optimal action.
class Mario:
def __init__(self, state_dim, action_dim, save_dir):
self.state_dim = state_dim
self.action_dim = action_dim
self.save_dir = save_dir
self.device = "cuda" if torch.cuda.is_available() else "cpu"
# Mario's DNN to predict the most optimal action - we implement this in the Learn section
self.net = MarioNet(self.state_dim, self.action_dim).float()
self.net = self.net.to(device=self.device)
self.exploration_rate = 1
self.exploration_rate_decay = 0.99999975
self.exploration_rate_min = 0.1
self.curr_step = 0
self.save_every = 5e5 # no. of experiences between saving Mario Net
def act(self, state):
"""
Given a state, choose an epsilon-greedy action and update value of step.
Inputs:
state(``LazyFrame``): A single observation of the current state, dimension is (state_dim)
Outputs:
``action_idx`` (``int``): An integer representing which action Mario will perform
"""
# EXPLORE
if np.random.rand() < self.exploration_rate:
action_idx = np.random.randint(self.action_dim)
# EXPLOIT
else:
state = state[0].__array__() if isinstance(state, tuple) else state.__array__()
state = torch.tensor(state, device=self.device).unsqueeze(0)
action_values = self.net(state, model="online")
action_idx = torch.argmax(action_values, axis=1).item()
# decrease exploration_rate
self.exploration_rate *= self.exploration_rate_decay
self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
# increment step
self.curr_step += 1
return action_idx
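To get a feel for this schedule (an illustrative back-of-the-envelope calculation, not part of the tutorial code), you can estimate how many calls to act() it takes for exploration_rate to anneal from 1 down to exploration_rate_min:
import math
# With exploration_rate *= 0.99999975 per step and a floor of 0.1,
# epsilon reaches the floor after roughly log(0.1) / log(0.99999975) steps.
decay, eps_min = 0.99999975, 0.1
steps_to_min = math.log(eps_min) / math.log(decay)
print(f"~{steps_to_min:,.0f} steps")  # roughly 9.2 million steps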
Cache and Recall
These two functions serve as Mario's "memory" process.
cache(): Each time Mario performs an action, he stores the *experience* to his memory. His experience includes the current *state*, the *action* performed, the *reward* from the action, the *next state*, and whether the game is *done*.
recall(): Mario randomly samples a batch of experiences from his memory, and uses that to learn the game.
class Mario(Mario): # subclassing for continuity
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.memory = TensorDictReplayBuffer(storage=LazyMemmapStorage(100000, device=torch.device("cpu")))
self.batch_size = 32
def cache(self, state, next_state, action, reward, done):
"""
Store the experience to self.memory (replay buffer)
Inputs:
state (``LazyFrame``),
next_state (``LazyFrame``),
action (``int``),
reward (``float``),
done(``bool``))
"""
def first_if_tuple(x):
return x[0] if isinstance(x, tuple) else x
state = first_if_tuple(state).__array__()
next_state = first_if_tuple(next_state).__array__()
state = torch.tensor(state)
next_state = torch.tensor(next_state)
action = torch.tensor([action])
reward = torch.tensor([reward])
done = torch.tensor([done])
# self.memory.append((state, next_state, action, reward, done,))
self.memory.add(TensorDict({"state": state, "next_state": next_state, "action": action, "reward": reward, "done": done}, batch_size=[]))
def recall(self):
"""
Retrieve a batch of experiences from memory
"""
batch = self.memory.sample(self.batch_size).to(self.device)
state, next_state, action, reward, done = (batch.get(key) for key in ("state", "next_state", "action", "reward", "done"))
return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()
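If torchrl's replay buffer is new to you, here is a minimal standalone sketch (toy shapes, independent of the Mario classes; it reuses the torch, TensorDict, TensorDictReplayBuffer, and LazyMemmapStorage imports from the top of this tutorial) of how TensorDict entries are stored and sampled:
# Toy replay-buffer sketch, separate from the Mario agent above
toy_buffer = TensorDictReplayBuffer(storage=LazyMemmapStorage(1000))
for i in range(100):
    toy_buffer.add(TensorDict({"state": torch.rand(4, 84, 84), "reward": torch.tensor([float(i)])}, batch_size=[]))
toy_batch = toy_buffer.sample(32)  # a TensorDict with batch_size [32]
print(toy_batch["state"].shape)    # torch.Size([32, 4, 84, 84])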
Learn
Mario uses the DDQN algorithm under the hood. DDQN uses two ConvNets - \(Q_{online}\) and \(Q_{target}\) - that independently approximate the optimal action-value function.
In our implementation, \(Q_{online}\) and \(Q_{target}\) are two copies of the same CNN architecture (built by __build_cnn below). \(\theta_{target}\) (the parameters of \(Q_{target}\)) is frozen to prevent updates by backpropagation. Instead, it is periodically synced with \(\theta_{online}\) (more on this later).
Neural Network
class MarioNet(nn.Module):
"""mini CNN structure
input -> (conv2d + relu) x 3 -> flatten -> (dense + relu) x 2 -> output
"""
def __init__(self, input_dim, output_dim):
super().__init__()
c, h, w = input_dim
if h != 84:
raise ValueError(f"Expecting input height: 84, got: {h}")
if w != 84:
raise ValueError(f"Expecting input width: 84, got: {w}")
self.online = self.__build_cnn(c, output_dim)
self.target = self.__build_cnn(c, output_dim)
self.target.load_state_dict(self.online.state_dict())
# Q_target parameters are frozen.
for p in self.target.parameters():
p.requires_grad = False
def forward(self, input, model):
if model == "online":
return self.online(input)
elif model == "target":
return self.target(input)
def __build_cnn(self, c, output_dim):
return nn.Sequential(
nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
nn.ReLU(),
nn.Flatten(),
nn.Linear(3136, 512),
nn.ReLU(),
nn.Linear(512, output_dim),
)
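As a quick check of the architecture (a sketch, assuming the MarioNet class defined above), a dummy batch of stacked frames should map to one Q-value per action:
# Shape check (sketch): 8 stacked-frame observations -> 8 rows of Q-values
check_net = MarioNet(input_dim=(4, 84, 84), output_dim=2).float()
dummy = torch.rand(8, 4, 84, 84)
print(check_net(dummy, model="online").shape)  # torch.Size([8, 2])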
TD Estimate & TD Target
Two values are involved in learning:
TD Estimate - the predicted optimal \(Q^*\) for a given state \(s\).
TD Target - the aggregation of the current reward and the estimated \(Q^*\) in the next state \(s'\).
Because we don't know what the next action \(a'\) will be, we use the action \(a'\) that maximizes \(Q_{online}\) in the next state \(s'\).
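In symbols (matching the td_estimate() and td_target() methods below; the code additionally zeroes out the bootstrap term when done is true):
\[TD_e = Q_{online}(s, a)\]
\[a' = \operatorname{argmax}_{a} Q_{online}(s', a)\]
\[TD_t = r + \gamma\, Q_{target}(s', a')\]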
Notice that we use the @torch.no_grad() decorator on td_target() to disable gradient calculations here (because we don't need to backpropagate through \(\theta_{target}\)).
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.gamma = 0.9
def td_estimate(self, state, action):
current_Q = self.net(state, model="online")[
np.arange(0, self.batch_size), action
] # Q_online(s,a)
return current_Q
@torch.no_grad()
def td_target(self, reward, next_state, done):
next_state_Q = self.net(next_state, model="online")
best_action = torch.argmax(next_state_Q, axis=1)
next_Q = self.net(next_state, model="target")[
np.arange(0, self.batch_size), best_action
]
return (reward + (1 - done.float()) * self.gamma * next_Q).float()
Update the model
As Mario samples inputs from his replay buffer, we compute \(TD_t\) and \(TD_e\) and backpropagate this loss down \(Q_{online}\) to update its parameters \(\theta_{online}\) (\(\alpha\) is the learning rate lr passed to the optimizer).
\(\theta_{target}\) is not updated through backpropagation. Instead, we periodically copy \(\theta_{online}\) to \(\theta_{target}\).
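Schematically (as implemented by update_Q_online() and sync_Q_target() below, with the Smooth L1 loss between \(TD_e\) and \(TD_t\) as the objective):
\[\theta_{online} \leftarrow \theta_{online} + \alpha \nabla\big(TD_e - TD_t\big)\]
\[\theta_{target} \leftarrow \theta_{online}\]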
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
self.loss_fn = torch.nn.SmoothL1Loss()
def update_Q_online(self, td_estimate, td_target):
loss = self.loss_fn(td_estimate, td_target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def sync_Q_target(self):
self.net.target.load_state_dict(self.net.online.state_dict())
Save checkpoint
class Mario(Mario):
def save(self):
save_path = (
self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
)
torch.save(
dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
save_path,
)
print(f"MarioNet saved to {save_path} at step {self.curr_step}")
Putting it all together
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.burnin = 1e4 # min. experiences before training
self.learn_every = 3 # no. of experiences between updates to Q_online
self.sync_every = 1e4 # no. of experiences between Q_target & Q_online sync
def learn(self):
if self.curr_step % self.sync_every == 0:
self.sync_Q_target()
if self.curr_step % self.save_every == 0:
self.save()
if self.curr_step < self.burnin:
return None, None
if self.curr_step % self.learn_every != 0:
return None, None
# Sample from memory
state, next_state, action, reward, done = self.recall()
# Get TD Estimate
td_est = self.td_estimate(state, action)
# Get TD Target
td_tgt = self.td_target(reward, next_state, done)
# Backpropagate loss through Q_online
loss = self.update_Q_online(td_est, td_tgt)
return (td_est.mean().item(), loss)
Logging
import numpy as np
import time, datetime
import matplotlib.pyplot as plt
class MetricLogger:
def __init__(self, save_dir):
self.save_log = save_dir / "log"
with open(self.save_log, "w") as f:
f.write(
f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
f"{'TimeDelta':>15}{'Time':>20}\n"
)
self.ep_rewards_plot = save_dir / "reward_plot.jpg"
self.ep_lengths_plot = save_dir / "length_plot.jpg"
self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
self.ep_avg_qs_plot = save_dir / "q_plot.jpg"
# History metrics
self.ep_rewards = []
self.ep_lengths = []
self.ep_avg_losses = []
self.ep_avg_qs = []
# Moving averages, added for every call to record()
self.moving_avg_ep_rewards = []
self.moving_avg_ep_lengths = []
self.moving_avg_ep_avg_losses = []
self.moving_avg_ep_avg_qs = []
# Current episode metric
self.init_episode()
# Timing
self.record_time = time.time()
def log_step(self, reward, loss, q):
self.curr_ep_reward += reward
self.curr_ep_length += 1
if loss:
self.curr_ep_loss += loss
self.curr_ep_q += q
self.curr_ep_loss_length += 1
def log_episode(self):
"Mark end of episode"
self.ep_rewards.append(self.curr_ep_reward)
self.ep_lengths.append(self.curr_ep_length)
if self.curr_ep_loss_length == 0:
ep_avg_loss = 0
ep_avg_q = 0
else:
ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
self.ep_avg_losses.append(ep_avg_loss)
self.ep_avg_qs.append(ep_avg_q)
self.init_episode()
def init_episode(self):
self.curr_ep_reward = 0.0
self.curr_ep_length = 0
self.curr_ep_loss = 0.0
self.curr_ep_q = 0.0
self.curr_ep_loss_length = 0
def record(self, episode, epsilon, step):
mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
self.moving_avg_ep_rewards.append(mean_ep_reward)
self.moving_avg_ep_lengths.append(mean_ep_length)
self.moving_avg_ep_avg_losses.append(mean_ep_loss)
self.moving_avg_ep_avg_qs.append(mean_ep_q)
last_record_time = self.record_time
self.record_time = time.time()
time_since_last_record = np.round(self.record_time - last_record_time, 3)
print(
f"Episode {episode} - "
f"Step {step} - "
f"Epsilon {epsilon} - "
f"Mean Reward {mean_ep_reward} - "
f"Mean Length {mean_ep_length} - "
f"Mean Loss {mean_ep_loss} - "
f"Mean Q Value {mean_ep_q} - "
f"Time Delta {time_since_last_record} - "
f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
)
with open(self.save_log, "a") as f:
f.write(
f"{episode:8d}{step:8d}{epsilon:10.3f}"
f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
f"{time_since_last_record:15.3f}"
f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
)
for metric in ["ep_lengths", "ep_avg_losses", "ep_avg_qs", "ep_rewards"]:
plt.clf()
plt.plot(getattr(self, f"moving_avg_{metric}"), label=f"moving_avg_{metric}")
plt.legend()
plt.savefig(getattr(self, f"{metric}_plot"))
Let's play!
In this example we run the training loop for 40 episodes, but for Mario to truly learn the ways of his world, we suggest running the loop for at least 40,000 episodes!
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()
save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)
logger = MetricLogger(save_dir)
episodes = 40
for e in range(episodes):
state = env.reset()
# Play the game!
while True:
# Run agent on the state
action = mario.act(state)
# Agent performs action
next_state, reward, done, trunc, info = env.step(action)
# Remember
mario.cache(state, next_state, action, reward, done)
# Learn
q, loss = mario.learn()
# Logging
logger.log_step(reward, loss, q)
# Update state
state = next_state
# Check if end of game
if done or info["flag_get"]:
break
logger.log_episode()
if (e % 20 == 0) or (e == episodes - 1):
logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)

Using CUDA: True
Episode 0 - Step 298 - Epsilon 0.9999255027657337 - Mean Reward 784.0 - Mean Length 298.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 2.485 - Time 2025-10-15T19:20:04
Episode 20 - Step 5462 - Epsilon 0.9986354317002756 - Mean Reward 692.857 - Mean Length 260.095 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 43.663 - Time 2025-10-15T19:20:48
Episode 39 - Step 9038 - Epsilon 0.9977430504665065 - Mean Reward 659.85 - Mean Length 225.95 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 30.074 - Time 2025-10-15T19:21:18
Conclusion
In this tutorial, we saw how we can use PyTorch to train a game-playing AI. You can use the same methods to train an AI to play any of the games in OpenAI Gym. We hope you enjoyed this tutorial; feel free to reach out to us on our GitHub!
Total running time of the script: (1 minutes 17.340 seconds)