注意
轉到底部 下載完整的示例程式碼。
TorchRL 目標:編寫 DDPG 損失#
建立時間:2023 年 8 月 14 日 | 最後更新:2025 年 3 月 20 日 | 最後驗證:未驗證
概述#
TorchRL 將強化學習演算法的訓練分解為幾個部分,這些部分將在您的訓練指令碼中進行組裝:環境、資料收集和儲存、模型以及最終的損失函式。
TorchRL 損失(或“目標”)是包含可訓練引數(策略和價值模型)的狀態化物件。本教程將指導您完成使用 TorchRL 從頭開始編寫損失的步驟。
為此,我們將重點關注 DDPG,這是一個相對容易編寫的演算法。深度確定性策略梯度 (DDPG) 是一個簡單的連續控制演算法。它包括學習一個引數化價值函式,用於表示動作-觀測對的價值,然後學習一個策略,該策略在給定特定觀測的情況下輸出最大化該價值函式的動作。
您將學到什麼
如何編寫損失模組並自定義其價值估計器;
如何在 TorchRL 中構建環境,包括轉換(例如,資料歸一化)和並行執行;
如何設計策略和價值網路;
如何從環境中高效收集資料並將其儲存在回放緩衝區中;
如何在回放緩衝區中儲存軌跡(而不是單個轉換);
如何評估您的模型。
先決條件#
本教程假設您已完成 PPO 教程,該教程概述了 TorchRL 的元件和依賴項,例如 tensordict.TensorDict 和 tensordict.nn.TensorDictModules,儘管它應該足夠透明,無需深入瞭解這些類即可理解。
注意
我們的目標不是提供 SOTA 演算法的實現,而是提供 TorchRL 損失實現以及在演算法中使用到的庫功能的高階說明。
匯入和設定#
%%bash pip3 install torchrl mujoco glfw
import torch
import tqdm
如果可用,我們將策略在 CUDA 上執行
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
collector_device = torch.device("cpu") # Change the device to ``cuda`` to use CUDA
TorchRL LossModule#
TorchRL 提供了一系列可在訓練指令碼中使用的損失。目的是讓損失易於重用/互換,並且具有簡單的簽名。
TorchRL 損失的主要特點是
它們是狀態化物件:它們包含可訓練引數的副本,因此
loss_module.parameters()返回訓練演算法所需的一切。它們遵循
TensorDict約定:torch.nn.Module.forward()方法將接收一個 TensorDict 作為輸入,其中包含返回損失值的所有必要資訊。>>> data = replay_buffer.sample() >>> loss_dict = loss_module(data)
它們輸出一個
tensordict.TensorDict例項,其中損失值以"loss_<smth>"的形式寫入,其中smth是描述損失的字串。TensorDict中的其他鍵可能是有用的指標,可以在訓練時記錄。注意
我們返回獨立損失的原因是讓使用者可以為不同的引數集使用不同的最佳化器。損失的求和可以透過以下方式簡單完成
>>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
__init__ 方法#
所有損失的父類是 LossModule。與庫中的許多其他元件一樣,它的 forward() 方法期望的輸入是來自經驗回放緩衝區或其他類似資料結構的 tensordict.TensorDict 例項。使用這種格式可以跨模態或在模型需要讀取多個條目的複雜場景中重用模組。換句話說,它允許我們編寫一個對輸入資料型別不敏感的損失模組,並專注於執行損失函式的各個基本步驟。
為了使教程儘可能具有啟發性,我們將獨立展示類的每個方法,並在稍後填充類。
讓我們從 __init__() 方法開始。DDPG 旨在透過一個簡單的策略解決控制任務:訓練一個策略來輸出最大化價值網路預測的動作。因此,我們的損失模組需要在其建構函式中接收兩個網路:一個 Actor 和一個價值網路。我們期望兩者都是 TensorDict 相容的物件,例如 tensordict.nn.TensorDictModule。我們的損失函式需要計算一個目標值並擬合價值網路,生成一個動作並擬合策略,以便最大化其價值估計。
LossModule.__init__() 方法的關鍵步驟是呼叫 convert_to_functional()。此方法將從模組中提取引數並將其轉換為函式式模組。嚴格來說,這並非必需,並且可以完全不使用它編寫所有損失。但是,我們鼓勵使用它,原因如下。
TorchRL 這樣做是因為 RL 演算法經常使用不同的引數集執行相同的模型,稱為“可訓練”和“目標”引數。“可訓練”引數是最佳化器需要擬合的引數。“目標”引數通常是前者的副本,帶有時間滯後(絕對的或透過移動平均稀釋的)。這些目標引數用於計算與下一個觀測值相關聯的值。使用一組不完全匹配當前配置的目標引數的一個優點是,它們為正在計算的價值函式提供了悲觀界限。請注意下面的 create_target_params 關鍵字引數:此引數告訴 convert_to_functional() 方法在損失模組中建立一個目標引數集,用於目標值計算。如果將其設定為 False(例如,參見 Actor 網路),則 target_actor_network_params 屬性仍然可訪問,但這隻會返回 Actor 引數的 **分離** 版本。
稍後,我們將看到如何在 TorchRL 中更新目標引數。
from tensordict.nn import TensorDictModule, TensorDictSequential
def _init(
self,
actor_network: TensorDictModule,
value_network: TensorDictModule,
) -> None:
super(type(self), self).__init__()
self.convert_to_functional(
actor_network,
"actor_network",
create_target_params=True,
)
self.convert_to_functional(
value_network,
"value_network",
create_target_params=True,
compare_against=list(actor_network.parameters()),
)
self.actor_in_keys = actor_network.in_keys
# Since the value we'll be using is based on the actor and value network,
# we put them together in a single actor-critic container.
actor_critic = ActorCriticWrapper(actor_network, value_network)
self.actor_critic = actor_critic
self.loss_function = "l2"
價值估計器損失方法#
在許多 RL 演算法中,價值網路(或 Q 值網路)是根據經驗價值估計進行訓練的。它可以是自舉(TD(0),低方差,高偏差),這意味著目標值僅使用下一個獎勵獲得,或者可以獲得蒙特卡洛估計(TD(1)),在這種情況下,將使用所有即將到來的獎勵序列(高方差,低偏差)。中間估計器(TD(\(\lambda\)))也可以用於在偏差和方差之間進行折衷。TorchRL 透過 ValueEstimators 列舉類,其中包含指向所有已實現價值估計器的指標,使使用一個或另一個估計器變得容易。讓我們定義這裡的預設價值函式。我們將採用最簡單的版本(TD(0)),並稍後展示如何更改它。
from torchrl.objectives.utils import ValueEstimators
default_value_estimator = ValueEstimators.TD0
我們還需要根據使用者查詢,向 DDPG 提供如何構建價值估計器的說明。根據提供的估計器,我們將構建相應的模組以在訓練時使用
from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator
def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
hp = dict(default_value_kwargs(value_type))
if hasattr(self, "gamma"):
hp["gamma"] = self.gamma
hp.update(hyperparams)
value_key = "state_action_value"
if value_type == ValueEstimators.TD1:
self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.TD0:
self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
elif value_type == ValueEstimators.GAE:
raise NotImplementedError(
f"Value type {value_type} it not implemented for loss {type(self)}."
)
elif value_type == ValueEstimators.TDLambda:
self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
else:
raise NotImplementedError(f"Unknown value type {value_type}")
self._value_estimator.set_keys(value=value_key)
可以呼叫 make_value_estimator 方法,但不是必需的:如果未呼叫,LossModule 將使用其預設估計器查詢此方法。
Actor 損失方法#
RL 演算法的核心是 Actor 的訓練損失。在 DDPG 中,此函式非常簡單:我們只需要計算使用策略計算的動作的價值,並最佳化 Actor 的權重以最大化該價值。
計算此價值時,必須確保從計算圖中取出價值引數,否則 Actor 和價值損失將混合在一起。為此,可以使用 hold_out_params() 函式。
def _loss_actor(
self,
tensordict,
) -> torch.Tensor:
td_copy = tensordict.select(*self.actor_in_keys)
# Get an action from the actor network: since we made it functional, we need to pass the params
with self.actor_network_params.to_module(self.actor_network):
td_copy = self.actor_network(td_copy)
# get the value associated with that action
with self.value_network_params.detach().to_module(self.value_network):
td_copy = self.value_network(td_copy)
return -td_copy.get("state_action_value")
價值損失方法#
我們現在需要最佳化價值網路引數。為此,我們將依賴於類的價值估計器
from torchrl.objectives.utils import distance_loss
def _loss_value(
self,
tensordict,
):
td_copy = tensordict.clone()
# V(s, a)
with self.value_network_params.to_module(self.value_network):
self.value_network(td_copy)
pred_val = td_copy.get("state_action_value").squeeze(-1)
# we manually reconstruct the parameters of the actor-critic, where the first
# set of parameters belongs to the actor and the second to the value function.
target_params = TensorDict(
{
"module": {
"0": self.target_actor_network_params,
"1": self.target_value_network_params,
}
},
batch_size=self.target_actor_network_params.batch_size,
device=self.target_actor_network_params.device,
)
with target_params.to_module(self.actor_critic):
target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)
# Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
td_error = (pred_val - target_value).pow(2)
return loss_value, td_error, pred_val, target_value
將它們放在 forward 呼叫中#
唯一缺少的部分是 forward 方法,它將連線價值損失和 Actor 損失,收整合本值,並將它們寫入提供給使用者的 TensorDict。
from tensordict import TensorDict, TensorDictBase
def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
loss_value, td_error, pred_val, target_value = self.loss_value(
input_tensordict,
)
td_error = td_error.detach()
td_error = td_error.unsqueeze(input_tensordict.ndimension())
if input_tensordict.device is not None:
td_error = td_error.to(input_tensordict.device)
input_tensordict.set(
"td_error",
td_error,
inplace=True,
)
loss_actor = self.loss_actor(input_tensordict)
return TensorDict(
source={
"loss_actor": loss_actor.mean(),
"loss_value": loss_value.mean(),
"pred_value": pred_val.mean().detach(),
"target_value": target_value.mean().detach(),
"pred_value_max": pred_val.max().detach(),
"target_value_max": target_value.max().detach(),
},
batch_size=[],
)
from torchrl.objectives import LossModule
class DDPGLoss(LossModule):
default_value_estimator = default_value_estimator
make_value_estimator = make_value_estimator
__init__ = _init
forward = _forward
loss_value = _loss_value
loss_actor = _loss_actor
現在我們有了損失,我們可以使用它來訓練策略來解決控制任務。
環境#
在大多數演算法中,首先需要處理的是環境的構建,因為它會影響訓練指令碼的其餘部分。
在本例中,我們將使用 "cheetah" 任務。目標是讓半獵豹儘可能快地奔跑。
在 TorchRL 中,可以透過依賴 dm_control 或 gym 來建立此類任務
env = GymEnv("HalfCheetah-v4")
或
env = DMControlEnv("cheetah", "run")
預設情況下,這些環境停用渲染。從狀態訓練通常比從影像訓練更容易。為簡單起見,我們只關注從狀態學習。要將畫素傳遞給 tensordicts,這些 tensordicts 由 env.step() 收集,只需將 from_pixels=True 引數傳遞給建構函式
env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)
我們編寫一個 make_env() 輔助函式,該函式將建立一個環境,該環境使用上述兩種後端之一(dm-control 或 gym)。
from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv
env_library = None
env_name = None
def make_env(from_pixels=False):
"""Create a base ``env``."""
global env_library
global env_name
if backend == "dm_control":
env_name = "cheetah"
env_task = "run"
env_args = (env_name, env_task)
env_library = DMControlEnv
elif backend == "gym":
env_name = "HalfCheetah-v4"
env_args = (env_name,)
env_library = GymEnv
else:
raise NotImplementedError
env_kwargs = {
"device": device,
"from_pixels": from_pixels,
"pixels_only": from_pixels,
"frame_skip": 2,
}
env = env_library(*env_args, **env_kwargs)
return env
轉換#
現在我們有了一個基本環境,我們可能希望修改其表示形式,使其更適合策略。在 TorchRL 中,轉換會附加到特殊類 torchr.envs.TransformedEnv 的基本環境中。
在 DDPG 中,使用啟發式值對獎勵進行重縮放是很常見的。在此示例中,我們將獎勵乘以 5。
如果我們使用的是
dm_control,那麼建立模擬器(使用雙精度數)和我們的指令碼(可能使用單精度數)之間的介面也很重要。此轉換是雙向的:呼叫env.step()時,我們的動作需要表示為雙精度,輸出需要轉換為單精度。DoubleToFloat轉換正是這樣做的:in_keys列表指的是需要從雙精度轉換為單精度的鍵,而in_keys_inv指的是在傳遞給環境之前需要轉換為雙精度的鍵。我們使用
CatTensors轉換將狀態鍵連線在一起。最後,我們還保留了歸一化狀態的可能性:我們將負責稍後計算歸一化常數。
from torchrl.envs import (
CatTensors,
DoubleToFloat,
EnvCreator,
InitTracker,
ObservationNorm,
ParallelEnv,
RewardScaling,
StepCounter,
TransformedEnv,
)
def make_transformed_env(
env,
):
"""Apply transforms to the ``env`` (such as reward scaling and state normalization)."""
env = TransformedEnv(env)
# we append transforms one by one, although we might as well create the
# transformed environment using the `env = TransformedEnv(base_env, transforms)`
# syntax.
env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))
# We concatenate all states into a single "observation_vector"
# even if there is a single tensor, it'll be renamed in "observation_vector".
# This facilitates the downstream operations as we know the name of the
# output tensor.
# In some environments (not half-cheetah), there may be more than one
# observation vector: in this case this code snippet will concatenate them
# all.
selected_keys = list(env.observation_spec.keys())
out_key = "observation_vector"
env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))
# we normalize the states, but for now let's just instantiate a stateless
# version of the transform
env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))
env.append_transform(DoubleToFloat())
env.append_transform(StepCounter(max_frames_per_traj))
# We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
# exploration:
env.append_transform(InitTracker())
return env
並行執行#
以下輔助函式允許我們在並行環境中執行。並行執行環境可以顯著加快收集吞吐量。使用轉換後的環境時,我們需要選擇是單獨為每個環境執行轉換,還是集中資料並批次轉換。這兩種方法都易於編碼
env = ParallelEnv(
lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
num_workers=4
)
env = TransformedEnv(
ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
transforms
)
為了利用 PyTorch 的向量化能力,我們採用第一種方法
def parallel_env_constructor(
env_per_collector,
transform_state_dict,
):
if env_per_collector == 1:
def make_t_env():
env = make_transformed_env(make_env())
env.transform[2].init_stats(3)
env.transform[2].loc.copy_(transform_state_dict["loc"])
env.transform[2].scale.copy_(transform_state_dict["scale"])
return env
env_creator = EnvCreator(make_t_env)
return env_creator
parallel_env = ParallelEnv(
num_workers=env_per_collector,
create_env_fn=EnvCreator(lambda: make_env()),
create_env_kwargs=None,
pin_memory=False,
)
env = make_transformed_env(parallel_env)
# we call `init_stats` for a limited number of steps, just to instantiate
# the lazy buffers.
env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
env.transform[2].load_state_dict(transform_state_dict)
return env
# The backend can be ``gym`` or ``dm_control``
backend = "gym"
注意
frame_skip 將多個步長與單個動作一起批次處理。如果 > 1,則需要調整其他幀計數(例如,frames_per_batch、total_frames)以在實驗中保持一致的總幀數。這很重要,因為提高 frame_skip 但保持總幀數不變可能顯得不公平:所有其他條件相同,使用 frame_skip 為 2 收集的資料集和使用 frame_skip 為 1 收集的資料集實際上具有 2:1 的環境互動比例!簡而言之,在處理 frame skipping 時,應謹慎對待訓練指令碼的幀計數,因為這可能導致訓練策略之間進行有偏倚的比較。
縮放獎勵有助於我們控制訊號幅度,以實現更有效的學習。
reward_scaling = 5.0
我們還定義了軌跡何時將被截斷。一千步(如果 frame_skip = 2,則為 500 步)是 cheetah 任務的好選擇
max_frames_per_traj = 500
觀測值歸一化#
為了計算歸一化統計資料,我們執行任意數量的隨機步驟,並在環境中計算收集到的觀測值的均值和標準差。ObservationNorm.init_stats() 方法可用於此目的。為了獲得彙總統計資料,我們建立了一個虛擬環境並運行了給定的步數,收集了給定步數的資料並計算了其彙總統計資料。
def get_env_stats():
"""Gets the stats of an environment."""
proof_env = make_transformed_env(make_env())
t = proof_env.transform[2]
t.init_stats(init_env_steps)
transform_state_dict = t.state_dict()
proof_env.close()
return transform_state_dict
歸一化統計資料#
用於透過 ObservationNorm 進行統計計算的隨機步數
init_env_steps = 5000
transform_state_dict = get_env_stats()
Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.llms.tw/introduction/migration_guide/ for additional information.
每個資料收集器中的環境數量
env_per_collector = 4
我們將之前計算的統計資料傳遞給我們的環境輸出進行歸一化
parallel_env = parallel_env_constructor(
env_per_collector=env_per_collector,
transform_state_dict=transform_state_dict,
)
from torchrl.data import CompositeSpec
構建模型#
現在我們轉向模型的設定。如前所述,DDPG 需要一個價值網路,該網路用於估計狀態-動作對的價值,以及一個引數化 Actor,它學習如何選擇最大化該價值的動作。
回想一下,構建 TorchRL 模組需要兩個步驟
編寫將用作網路的
torch.nn.Module,將網路包裝在
tensordict.nn.TensorDictModule中,其中透過指定輸入和輸出鍵來處理資料流。
在更復雜的場景中,也可以使用 tensordict.nn.TensorDictSequential。
Q 值網路被包裝在 ValueOperator 中,該類自動將 out_keys 設定為 "state_action_value"(對於 q 值網路)和 "state_value"(對於其他價值網路)。
TorchRL 提供了原始論文中介紹的 DDPG 網路的內建版本。這些可以在 DdpgMlpActor 和 DdpgMlpQNet 中找到。
由於我們使用懶惰模組,因此在將策略移動到裝置之間以及執行其他操作之前,有必要具體化懶惰模組。因此,最好使用少量資料執行模組。為此,我們生成了來自環境規範的偽資料。
from torchrl.modules import (
ActorCriticWrapper,
DdpgMlpActor,
DdpgMlpQNet,
OrnsteinUhlenbeckProcessModule,
ProbabilisticActor,
TanhDelta,
ValueOperator,
)
def make_ddpg_actor(
transform_state_dict,
device="cpu",
):
proof_environment = make_transformed_env(make_env())
proof_environment.transform[2].init_stats(3)
proof_environment.transform[2].load_state_dict(transform_state_dict)
out_features = proof_environment.action_spec.shape[-1]
actor_net = DdpgMlpActor(
action_dim=out_features,
)
in_keys = ["observation_vector"]
out_keys = ["param"]
actor = TensorDictModule(
actor_net,
in_keys=in_keys,
out_keys=out_keys,
)
actor = ProbabilisticActor(
actor,
distribution_class=TanhDelta,
in_keys=["param"],
spec=CompositeSpec(action=proof_environment.action_spec),
).to(device)
q_net = DdpgMlpQNet()
in_keys = in_keys + ["action"]
qnet = ValueOperator(
in_keys=in_keys,
module=q_net,
).to(device)
# initialize lazy modules
qnet(actor(proof_environment.reset().to(device)))
return actor, qnet
actor, qnet = make_ddpg_actor(
transform_state_dict=transform_state_dict,
device=device,
)
/usr/local/lib/python3.10/dist-packages/torchrl/data/tensor_specs.py:6911: DeprecationWarning:
The CompositeSpec has been deprecated and will be removed in v0.8. Please use Composite instead.
探索#
如原始論文所示,策略被放入 OrnsteinUhlenbeckProcessModule 探索模組。讓我們定義 OU 噪聲達到最小值之前的幀數
annealing_frames = 1_000_000
actor_model_explore = TensorDictSequential(
actor,
OrnsteinUhlenbeckProcessModule(
spec=actor.spec.clone(),
annealing_num_steps=annealing_frames,
).to(device),
)
if device == torch.device("cpu"):
actor_model_explore.share_memory()
資料收集器#
TorchRL 提供了專門的類來幫助您透過在環境中執行策略來收集資料。這些“資料收集器”會迭代地計算在給定時間要執行的動作,然後執行環境中的一個步長,並在需要時重置它。資料收集器的設計旨在幫助開發人員嚴格控制每批資料的幀數、收集的(非同步)性質以及分配給資料收集的資源(例如 GPU、工作執行緒數量等)。
在這裡,我們將使用 SyncDataCollector,這是一個簡單的單程序資料收集器。TorchRL 還提供其他收集器,例如 MultiaSyncDataCollector,它以非同步方式執行滾動(例如,在最佳化策略時收集資料,從而將訓練和資料收集解耦)。
要指定的引數是
一個環境工廠或一個環境;
策略;
在收集器被認為為空之前的總幀數;
每個軌跡的最大幀數(對於非終止環境,如
dm_control環境,這很有用)。注意
傳遞給收集器的
max_frames_per_traj將導致在用於推理的環境中註冊一個新的StepCounter轉換。我們可以在指令碼中手動實現相同的結果。
還應傳遞
每個收集批次的幀數;
獨立於策略執行的隨機步數;
用於策略執行的裝置;
用於在將資料傳遞給主程序之前儲存資料的裝置。
我們將在訓練期間使用的總幀數應約為 1M。
total_frames = 10_000 # 1_000_000
外層迴圈的每次迭代中收集器返回的幀數等於每個子軌跡的長度乘以每個收集器中並行執行的環境數量。
換句話說,我們期望來自收集器的批次形狀為 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector
traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2
from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType
collector = SyncDataCollector(
parallel_env,
policy=actor_model_explore,
total_frames=total_frames,
frames_per_batch=frames_per_batch,
init_random_frames=init_random_frames,
reset_at_each_iter=False,
split_trajs=False,
device=collector_device,
exploration_type=ExplorationType.RANDOM,
)
評估器:構建記錄器物件#
由於訓練資料是透過某種探索策略獲得的,因此需要以確定性模式評估我們演算法的真實效能。我們使用一個專用類 Recorder 來實現這一點,該類以給定的頻率在環境中執行策略,並返回從這些模擬中獲得的一些統計資料。
以下輔助函式構建此物件
from torchrl.trainers import Recorder
def make_recorder(actor_model_explore, transform_state_dict, record_interval):
base_env = make_env()
environment = make_transformed_env(base_env)
environment.transform[2].init_stats(
3
) # must be instantiated to load the state dict
environment.transform[2].load_state_dict(transform_state_dict)
recorder_obj = Recorder(
record_frames=1000,
policy_exploration=actor_model_explore,
environment=environment,
exploration_type=ExplorationType.DETERMINISTIC,
record_interval=record_interval,
)
return recorder_obj
我們將每收集 10 個批次記錄一次效能
record_interval = 10
recorder = make_recorder(
actor_model_explore, transform_state_dict, record_interval=record_interval
)
from torchrl.data.replay_buffers import (
LazyMemmapStorage,
PrioritizedSampler,
RandomSampler,
TensorDictReplayBuffer,
)
回放緩衝區#
回放緩衝區有兩種型別:優先(使用一些誤差訊號,使某些項的取樣機率高於其他項)和常規的迴圈經驗回放。
TorchRL 回放緩衝區是可組合的:可以選擇儲存、取樣和寫入策略。也可以使用記憶體對映陣列將張量儲存在物理記憶體中。以下函式負責使用所需的超引數建立回放緩衝區
from torchrl.envs import RandomCropTensorDict
def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
if prb:
sampler = PrioritizedSampler(
max_capacity=buffer_size,
alpha=0.7,
beta=0.5,
)
else:
sampler = RandomSampler()
replay_buffer = TensorDictReplayBuffer(
storage=LazyMemmapStorage(
buffer_size,
scratch_dir=buffer_scratch_dir,
),
batch_size=batch_size,
sampler=sampler,
pin_memory=False,
prefetch=prefetch,
transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
)
return replay_buffer
我們將回放緩衝區儲存在磁碟上的臨時目錄中
import tempfile
tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name
回放緩衝區儲存和批次大小#
TorchRL 回放緩衝區沿第一個維度計算元素數量。由於我們將把軌跡饋送到緩衝區,因此我們需要透過除以資料收集器生成的子軌跡的長度來調整緩衝區大小。關於批次大小,我們的取樣策略將包括對長度為 traj_len=200 的軌跡進行取樣,然後選擇長度為 random_crop_len=25 的子軌跡,在這些子軌跡上計算損失。此策略平衡了儲存具有特定長度的完整軌跡的選擇以及向損失提供足夠異質性樣本的需求。下圖顯示了從一個收集器到回放緩衝區的 8 幀資料流,該收集器並行執行 2 個環境,然後對其進行取樣,最後從中提取長度為 2 個時間步的子軌跡。
讓我們從緩衝區中儲存的幀數開始
def ceil_div(x, y):
return -x // (-y)
buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)
優先回放緩衝區預設停用
prb = False
我們還需要定義每個收集批次將進行多少次更新。這稱為更新與資料比率或 UTD 比率
update_to_data = 64
我們將使用長度為 25 的軌跡來饋送損失
random_crop_len = 25
在原始論文中,作者每收集一幀資料執行一次批次大小為 64 的更新。在這裡,我們複製相同的比率,但在每次批次收集時進行多次更新。我們調整批次大小以實現相同的每幀更新比率
batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)
replay_buffer = make_replay_buffer(
buffer_size=buffer_size,
batch_size=batch_size,
random_crop_len=random_crop_len,
prefetch=3,
prb=prb,
)
損失模組構建#
我們使用剛剛建立的 Actor 和 qnet 來構建損失模組。因為有目標引數需要更新,所以我們 _必須_ 建立一個目標網路更新器。
讓我們使用 TD(lambda) 估計器!
loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda, device=device)
注意
離軌策略通常指示使用 TD(0) 估計器。在這裡,我們使用 TD(\(\lambda\)) 估計器,這會引入一些偏差,因為在特定狀態之後遵循的軌跡是用過時的策略收集的。此技巧,就像在資料收集期間可以使用多步技巧一樣,是“技巧”的替代版本,儘管它們在返回值估計中引入了一些偏差,但它們通常在實踐中效果很好。
目標網路更新器#
目標網路是離軌 RL 演算法的關鍵部分。透過 HardUpdate 和 SoftUpdate 類,可以輕鬆更新目標網路引數。它們使用損失模組作為引數進行構建,並透過在訓練迴圈的適當位置呼叫 updater.step() 來實現更新。
from torchrl.objectives.utils import SoftUpdate
target_net_updater = SoftUpdate(loss_module, eps=1 - tau)
最佳化器#
最後,我們將使用 Adam 最佳化器來最佳化策略和價值網路
from torch import optim
optimizer_actor = optim.Adam(
loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch
訓練策略的時候到了#
現在我們已經構建了所有需要的模組,訓練迴圈就相當直接了。
rewards = []
rewards_eval = []
# Main loop
collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):
# update weights of the inference policy
collector.update_policy_weights_()
if r0 is None:
r0 = tensordict["next", "reward"].mean().item()
pbar.update(tensordict.numel())
# extend the replay buffer with the new data
current_frames = tensordict.numel()
collected_frames += current_frames
replay_buffer.extend(tensordict.cpu())
# optimization steps
if collected_frames >= init_random_frames:
for _ in range(update_to_data):
# sample from replay buffer
sampled_tensordict = replay_buffer.sample().to(device)
# Compute loss
loss_dict = loss_module(sampled_tensordict)
# optimize
loss_dict["loss_actor"].backward()
gn1 = torch.nn.utils.clip_grad_norm_(
loss_module.actor_network_params.values(True, True), 10.0
)
optimizer_actor.step()
optimizer_actor.zero_grad()
loss_dict["loss_value"].backward()
gn2 = torch.nn.utils.clip_grad_norm_(
loss_module.value_network_params.values(True, True), 10.0
)
optimizer_value.step()
optimizer_value.zero_grad()
gn = (gn1**2 + gn2**2) ** 0.5
# update priority
if prb:
replay_buffer.update_tensordict_priority(sampled_tensordict)
# update target network
target_net_updater.step()
rewards.append(
(
i,
tensordict["next", "reward"].mean().item(),
)
)
td_record = recorder(None)
if td_record is not None:
rewards_eval.append((i, td_record["r_evaluation"].item()))
if len(rewards_eval) and collected_frames >= init_random_frames:
target_value = loss_dict["target_value"].item()
loss_value = loss_dict["loss_value"].item()
loss_actor = loss_dict["loss_actor"].item()
rn = sampled_tensordict["next", "reward"].mean().item()
rs = sampled_tensordict["next", "reward"].std().item()
pbar.set_description(
f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
f"reward normalized={rn :4.2f}/{rs :4.2f}, "
f"grad norm={gn: 4.2f}, "
f"loss_value={loss_value: 4.2f}, "
f"loss_actor={loss_actor: 4.2f}, "
f"target value: {target_value: 4.2f}"
)
# update the exploration strategy
actor_model_explore[1].step(current_frames)
collector.shutdown()
del collector
0%| | 0/10000 [00:00<?, ?it/s]
8%|▊ | 800/10000 [00:00<00:06, 1355.90it/s]
16%|█▌ | 1600/10000 [00:02<00:15, 526.86it/s]
24%|██▍ | 2400/10000 [00:03<00:09, 773.78it/s]
32%|███▏ | 3200/10000 [00:03<00:06, 994.86it/s]
40%|████ | 4000/10000 [00:04<00:05, 1177.97it/s]
48%|████▊ | 4800/10000 [00:04<00:03, 1329.23it/s]
56%|█████▌ | 5600/10000 [00:05<00:03, 1446.60it/s]
reward: -2.18 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.29/6.36, grad norm= 129.21, loss_value= 444.52, loss_actor= 13.29, target value: -13.62: 56%|█████▌ | 5600/10000 [00:06<00:03, 1446.60it/s]
reward: -2.18 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.29/6.36, grad norm= 129.21, loss_value= 444.52, loss_actor= 13.29, target value: -13.62: 64%|██████▍ | 6400/10000 [00:07<00:05, 695.30it/s]
reward: -1.34 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.00/6.06, grad norm= 290.06, loss_value= 397.83, loss_actor= 14.71, target value: -19.55: 64%|██████▍ | 6400/10000 [00:09<00:05, 695.30it/s]
reward: -1.34 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.00/6.06, grad norm= 290.06, loss_value= 397.83, loss_actor= 14.71, target value: -19.55: 72%|███████▏ | 7200/10000 [00:09<00:05, 518.12it/s]
reward: -4.97 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.03/5.49, grad norm= 171.41, loss_value= 315.45, loss_actor= 19.29, target value: -19.99: 72%|███████▏ | 7200/10000 [00:11<00:05, 518.12it/s]
reward: -4.97 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-3.03/5.49, grad norm= 171.41, loss_value= 315.45, loss_actor= 19.29, target value: -19.99: 80%|████████ | 8000/10000 [00:12<00:04, 442.37it/s]
reward: -3.94 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.23/5.10, grad norm= 61.54, loss_value= 238.42, loss_actor= 13.63, target value: -14.66: 80%|████████ | 8000/10000 [00:14<00:04, 442.37it/s]
reward: -3.94 (r0 = -2.02), reward eval: reward: 0.00, reward normalized=-2.23/5.10, grad norm= 61.54, loss_value= 238.42, loss_actor= 13.63, target value: -14.66: 88%|████████▊ | 8800/10000 [00:14<00:02, 402.09it/s]
reward: -3.27 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.25/5.09, grad norm= 84.03, loss_value= 191.63, loss_actor= 13.87, target value: -15.35: 88%|████████▊ | 8800/10000 [00:18<00:02, 402.09it/s]
reward: -3.27 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.25/5.09, grad norm= 84.03, loss_value= 191.63, loss_actor= 13.87, target value: -15.35: 96%|█████████▌| 9600/10000 [00:18<00:01, 304.56it/s]
reward: -0.69 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.73/4.87, grad norm= 94.23, loss_value= 202.54, loss_actor= 15.48, target value: -19.00: 96%|█████████▌| 9600/10000 [00:20<00:01, 304.56it/s]
reward: -0.69 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.73/4.87, grad norm= 94.23, loss_value= 202.54, loss_actor= 15.48, target value: -19.00: : 10400it [00:21, 304.02it/s]
reward: -4.88 (r0 = -2.02), reward eval: reward: -1.73, reward normalized=-2.77/4.17, grad norm= 75.76, loss_value= 172.23, loss_actor= 20.02, target value: -19.61: : 10400it [00:23, 304.02it/s]
實驗結果#
我們繪製了訓練期間平均獎勵的簡單圖。我們可以看到我們的策略學會了很好地解決任務。
注意
如上所述,為了獲得更合理的結果,請使用更大的 total_frames 值,例如 1M。
from matplotlib import pyplot as plt
plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()

結論#
在本教程中,我們透過 DDPG 的具體示例學習瞭如何在 TorchRL 中編寫損失模組。
要點總結
如何使用
LossModule類來編寫新的損失元件;如何使用(或不使用)目標網路,以及如何更新其引數;
如何建立與損失模組關聯的最佳化器。
下一步#
為了在此損失模組上進行進一步迭代,我們可以考慮
使用 @dispatch(參見 [Feature] Distpatch IQL loss module)。
允許靈活的 TensorDict 鍵。
指令碼總執行時間: (0 分 29.811 秒)