注意
跳轉到末尾 下載完整的示例程式碼。
強化學習(DQN)教程#
創建於:2017年3月24日 | 最後更新:2025年6月16日 | 最後驗證:2024年11月05日
本教程將展示如何使用 PyTorch 在 Gymnasium 的 CartPole-v1 任務上訓練一個深度 Q-learning (DQN) 智慧體。
你可能還會發現閱讀原始的 深度 Q-learning (DQN) 論文很有幫助
任務
智慧體需要在兩個動作之間做出選擇——向左或向右移動推車——以便連線到推車上的杆保持直立。你可以在 Gymnasium 的網站上找到關於該環境和其他更具挑戰性環境的更多資訊:Gymnasium 的網站。
CartPole#
當智慧體觀察到環境的當前狀態並選擇一個動作時,環境會轉換到一個新狀態,並返回一個獎勵,該獎勵指示動作的結果。在此任務中,每個增量時間步的獎勵為 +1,如果杆傾斜過度或推車移出中心超過 2.4 個單位,環境將終止。這意味著表現更好的場景將執行更長時間,累積更高的回報。
CartPole 任務的設計使得智慧體的輸入是 4 個表示環境狀態(位置、速度等)的實值。我們不經過任何縮放就採用這 4 個輸入,並將它們透過一個小型的全連線網路,該網路有兩個輸出,分別對應兩個動作。該網路經過訓練,可以根據輸入狀態預測每個動作的預期值。然後選擇具有最高預期值的動作。
包
首先,讓我們匯入所需的包。首先,我們需要 gymnasium 來處理環境,可以使用 pip 安裝。這是原始 OpenAI Gym 專案的一個分支,自 Gym v0.19 起由同一團隊維護。如果你在 Google Colab 中執行此程式碼,請執行
%%bash
pip3 install gymnasium[classic_control]
我們還將使用 PyTorch 的以下模組:
神經網路 (
torch.nn)最佳化 (
torch.optim)自動微分 (
torch.autograd)
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
env = gym.make("CartPole-v1")
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# if GPU is to be used
device = torch.device(
"cuda" if torch.cuda.is_available() else
"mps" if torch.backends.mps.is_available() else
"cpu"
)
# To ensure reproducibility during training, you can fix the random seeds
# by uncommenting the lines below. This makes the results consistent across
# runs, which is helpful for debugging or comparing different approaches.
#
# That said, allowing randomness can be beneficial in practice, as it lets
# the model explore different training trajectories.
# seed = 42
# random.seed(seed)
# torch.manual_seed(seed)
# env.reset(seed=seed)
# env.action_space.seed(seed)
# env.observation_space.seed(seed)
# if torch.cuda.is_available():
# torch.cuda.manual_seed(seed)
Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.llms.tw/introduction/migration_guide/ for additional information.
經驗回放#
我們將使用經驗回放記憶體來訓練我們的 DQN。它儲存智慧體觀察到的轉換,允許我們稍後重用這些資料。透過隨機取樣,構建批次的轉換是去相關的。研究表明,這極大地穩定並改善了 DQN 的訓練過程。
為此,我們需要兩個類:
Transition- 一個命名元組,表示環境中的單個轉換。它本質上是將(狀態,動作)對對映到它們的(下一個狀態,獎勵)結果,其中狀態是螢幕差異影像,如下文所述。ReplayMemory- 一個具有固定大小的迴圈緩衝區,用於儲存最近觀察到的轉換。它還實現了一個.sample()方法,用於為訓練選擇隨機批次的轉換。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
"""Save a transition"""
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
現在,讓我們定義我們的模型。但首先,讓我們快速回顧一下 DQN 是什麼。
DQN 演算法#
我們的環境是確定性的,因此此處介紹的所有方程也以確定性的方式表述,以簡化說明。在強化學習文獻中,它們還包含對環境中隨機轉換的期望。
我們的目標是訓練一個策略,該策略試圖最大化折扣累積獎勵 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也被稱為回報。折扣因子 \(\gamma\) 必須是介於 \(0\) 和 \(1\) 之間的常數,以確保和收斂。較低的 \(\gamma\) 值使得未來不確定的獎勵對我們的智慧體的重要性不如它相當確定地能夠獲得的近期獎勵。它還鼓勵智慧體比在未來時間上遙遠的等效獎勵更早地獲得獎勵。
Q-learning 的主要思想是,如果我們有一個函式 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告訴我們在給定狀態下采取某個動作後,我們的回報將是多少,那麼我們就可以很容易地構建一個最大化我們獎勵的策略。
然而,我們並非全知世界,因此我們無法訪問 \(Q^*\)。但是,由於神經網路是萬能函式逼近器,我們可以簡單地建立一個神經網路並訓練它來逼近 \(Q^*\)。
對於我們的訓練更新規則,我們將使用一個事實:任何 \(Q\) 函式對於某個策略都服從貝爾曼方程:
等式兩邊的差值稱為時序差分誤差,\(\delta\):
為了最小化這個誤差,我們將使用 Huber 損失。當誤差很小時,Huber 損失類似於均方誤差,而當誤差很大時,它類似於平均絕對誤差——這使得它在 Q 估計非常嘈雜時對異常值更具魯棒性。我們對從經驗回放中取樣的轉換批次 \(B\) 計算此損失:
Q 網路#
我們的模型將是一個前饋神經網路,它接收當前螢幕幀和前一幀螢幕幀之間的差值作為輸入。它有兩個輸出,分別表示 \(Q(s, \mathrm{left})\) 和 \(Q(s, \mathrm{right})\)(其中 \(s\) 是輸入到網路的輸入)。實際上,網路試圖預測給定當前輸入時採取每個動作的預期回報。
class DQN(nn.Module):
def __init__(self, n_observations, n_actions):
super(DQN, self).__init__()
self.layer1 = nn.Linear(n_observations, 128)
self.layer2 = nn.Linear(128, 128)
self.layer3 = nn.Linear(128, n_actions)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
return self.layer3(x)
訓練#
超引數和實用工具#
此單元格例項化我們的模型及其最佳化器,並定義了一些實用工具。
select_action- 將根據 epsilon 貪婪策略選擇一個動作。簡單來說,我們會偶爾使用我們的模型來選擇動作,而有時我們會隨機均勻取樣一個動作。選擇隨機動作的機率將從EPS_START開始,並呈指數衰減到EPS_END。EPS_DECAY控制衰減速率。plot_durations- 一個輔助函式,用於繪製回合的持續時間,以及過去 100 回合的平均值(官方評估中使用的度量)。圖表將顯示在包含主訓練迴圈的單元格下方,並在每個回合後更新。
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.01
EPS_DECAY = 2500
TAU = 0.005
LR = 3e-4
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return the largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1).indices.view(1, 1)
else:
return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations(show_result=False):
plt.figure(1)
durations_t = torch.tensor(episode_durations, dtype=torch.float)
if show_result:
plt.title('Result')
else:
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
if not show_result:
display.display(plt.gcf())
display.clear_output(wait=True)
else:
display.display(plt.gcf())
訓練迴圈#
最後,這是訓練我們模型的程式碼。
在這裡,你可以找到一個 optimize_model 函式,它執行單步最佳化。它首先採樣一個批次,將所有張量連線成一個張量,計算 \(Q(s_t, a_t)\) 和 \(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),並將它們組合成我們的損失。根據定義,如果 \(s\) 是終止狀態,我們將 \(V(s) = 0\)。我們還使用目標網路來計算 \(V(s_{t+1})\) 以增加穩定性。目標網路在每一步都使用由超引數 TAU 控制的軟更新進行更新,TAU 之前已定義。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
# detailed explanation). This converts batch-array of Transitions
# to Transition of batch-arrays.
batch = Transition(*zip(*transitions))
# Compute a mask of non-final states and concatenate the batch elements
# (a final state would've been the one after which simulation ended)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# Compute Q(s_t, a) - the model computes Q(s_t), then we select the
# columns of actions taken. These are the actions which would've been taken
# for each batch state according to policy_net
state_action_values = policy_net(state_batch).gather(1, action_batch)
# Compute V(s_{t+1}) for all next states.
# Expected values of actions for non_final_next_states are computed based
# on the "older" target_net; selecting their best reward with max(1).values
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
with torch.no_grad():
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
# Compute the expected Q values
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Compute Huber loss
criterion = nn.SmoothL1Loss()
loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
# Optimize the model
optimizer.zero_grad()
loss.backward()
# In-place gradient clipping
torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
optimizer.step()
下面是主訓練迴圈。開始時,我們重置環境並獲取初始 state 張量。然後,我們取樣一個動作,執行它,觀察下一個狀態和獎勵(始終為 1),並對我們的模型進行一次最佳化。當回合結束時(我們的模型失敗),我們重新啟動迴圈。
下面,如果可用 GPU,num_episodes 設定為 600;否則,安排 50 個回合,以免訓練時間過長。然而,50 個回合不足以在 CartPole 上觀察到良好效能。你應該會看到模型在 600 個訓練回合內持續達到 500 步。訓練 RL 智慧體可能是一個不確定的過程,因此如果未觀察到收斂,重新開始訓練可以產生更好的結果。
if torch.cuda.is_available() or torch.backends.mps.is_available():
num_episodes = 600
else:
num_episodes = 50
for i_episode in range(num_episodes):
# Initialize the environment and get its state
state, info = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
for t in count():
action = select_action(state)
observation, reward, terminated, truncated, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
done = terminated or truncated
if terminated:
next_state = None
else:
next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
# Store the transition in memory
memory.push(state, action, next_state, reward)
# Move to the next state
state = next_state
# Perform one step of the optimization (on the policy network)
optimize_model()
# Soft update of the target network's weights
# θ′ ← τ θ + (1 −τ )θ′
target_net_state_dict = target_net.state_dict()
policy_net_state_dict = policy_net.state_dict()
for key in policy_net_state_dict:
target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
target_net.load_state_dict(target_net_state_dict)
if done:
episode_durations.append(t + 1)
plot_durations()
break
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
Complete
這是說明總體結果資料流的圖表。
動作是隨機選擇的,或者基於策略選擇的,從 gym 環境中獲取下一步的樣本。我們將結果記錄在經驗回放中,並在每次迭代時進行最佳化。最佳化從經驗回放中隨機抽取一個批次來訓練新策略。用於計算預期 Q 值的“舊”目標網路也用於最佳化。其權重在每一步都會進行軟更新。
指令碼總執行時間: (6 分鐘 48.617 秒)