• 文件 >
  • TorchRL 目標:編寫 DDPG 損失函式
快捷方式

TorchRL 目標:編寫 DDPG 損失

作者Vincent Moens

概述

TorchRL 將強化學習演算法的訓練分解為幾個部分,這些部分將在您的訓練指令碼中進行組裝:環境、資料收集與儲存、模型以及最終的損失函式。

TorchRL 損失(或“目標”)是狀態化的物件,包含可訓練引數(策略和價值模型)。本教程將引導您完成使用 TorchRL 從頭開始編寫損失的步驟。

為此,我們將重點關注 DDPG,這是一個相對容易編寫的演算法。深度確定性策略梯度 (DDPG) 是一種簡單的連續控制演算法。它包括學習一個引數化的價值函式來表示一個動作-觀察對,然後學習一個策略,該策略根據特定的觀察輸出最大化該價值函式的動作。

您將學到什麼

  • 如何編寫損失模組並自定義其價值估計器;

  • 如何在 TorchRL 中構建環境,包括轉換(例如,資料歸一化)和並行執行;

  • 如何設計策略和價值網路;

  • 如何從環境中高效收集資料並將其儲存在回放緩衝區中;

  • 如何在回放緩衝區中儲存軌跡(而不是轉換);

  • 如何評估您的模型。

先決條件

本教程假設您已完成PPO 教程,該教程概述了 TorchRL 的元件和依賴項,例如 tensordict.TensorDicttensordict.nn.TensorDictModules,儘管它應該足夠透明,無需深入理解這些類即可理解。

注意

我們的目標不是提供最先進的演算法實現,而是提供 TorchRL 損失實現以及在此演算法背景下使用的庫功能的總體概述。

匯入和設定

%%bash
pip3 install torchrl mujoco glfw
import torch
import tqdm

如果可用,我們將策略在 CUDA 上執行

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)
collector_device = torch.device("cpu")  # Change the device to ``cuda`` to use CUDA

TorchRL LossModule

TorchRL 提供了一系列可在訓練指令碼中使用的損失。目的是讓損失易於重用/互換,並且具有簡單的簽名。

TorchRL 損失的主要特點是

  • 它們是狀態化的物件:它們包含可訓練引數的副本,因此 loss_module.parameters() 會給出訓練演算法所需的一切。

  • 它們遵循 TensorDict 約定:torch.nn.Module.forward() 方法將接收一個 TensorDict 作為輸入,其中包含返回損失值所需的所有資訊。

  • 它們輸出一個 tensordict.TensorDict 例項,其中損失值寫入 "loss_<smth>",其中 smth 是描述損失的字串。TensorDict 中的其他鍵可能是訓練時記錄的有用指標。

    注意

    我們返回獨立損失的原因是讓使用者可以為不同的引數集使用不同的最佳化器。損失的求和可以透過以下方式簡單完成

    ..code - block::Python

    >>> loss_val = sum(loss for key, loss in loss_dict.items() if key.startswith("loss_"))
    

The __init__ method

所有損失的父類是 LossModule。與庫中的許多其他元件一樣,其 forward() 方法期望接收一個從經驗回放緩衝區中取樣的 tensordict.TensorDict 例項或任何類似的 data structure 作為輸入。使用此格式可以跨模態或在模型需要讀取多個條目等複雜場景中重用該模組。換句話說,它允許我們編寫一個對輸入資料型別不敏感的損失模組,並專注於執行損失函式的基本步驟,僅此而已。

為了使本教程儘可能具教學性,我們將獨立展示類的每個方法,並在稍後填充該類。

讓我們從 __init__() 方法開始。DDPG 旨在透過一個簡單的策略來解決控制任務:訓練一個策略來輸出最大化價值網路預測的動作。因此,我們的損失模組需要在其建構函式中接收兩個網路:一個 Actor(策略)和一個價值網路。我們期望兩者都是 TensorDict 相容的物件,例如 tensordict.nn.TensorDictModule。我們的損失函式需要計算目標值並擬合價值網路,以及生成一個動作並擬合策略,使其價值估計最大化。

LossModule.__init__() 方法的關鍵步驟是對 convert_to_functional() 的呼叫。此方法將從模組中提取引數並將其轉換為函式式模組。嚴格來說,這不是必需的,並且可以完美地在沒有它的情況下編寫所有損失。但是,我們鼓勵其使用,原因如下。

TorchRL 這樣做的原因是 RL 演算法通常使用不同的引數集執行相同的模型,稱為“可訓練”和“目標”引數。“可訓練”引數是最佳化器需要擬合的引數。“目標”引數通常是前者副本,具有一些時間滯後(絕對或透過移動平均稀釋)。這些目標引數用於計算與下一個觀察值相關聯的價值。使用一組不完全匹配當前配置的目標引數的一個優點是,它們提供了對正在計算的價值函式的一個悲觀界限。請注意下面的 create_target_params 關鍵字引數:此引數告訴 convert_to_functional() 方法在損失模組中建立一個目標引數集,用於目標值計算。如果將其設定為 False(例如,請參閱 Actor 網路),則 target_actor_network_params 屬性仍然可以訪問,但這隻會返回 Actor 引數的**分離**版本。

稍後,我們將看到如何在 TorchRL 中更新目標引數。

from tensordict.nn import TensorDictModule, TensorDictSequential


def _init(
    self,
    actor_network: TensorDictModule,
    value_network: TensorDictModule,
) -> None:
    super(type(self), self).__init__()

    self.convert_to_functional(
        actor_network,
        "actor_network",
        create_target_params=True,
    )
    self.convert_to_functional(
        value_network,
        "value_network",
        create_target_params=True,
        compare_against=list(actor_network.parameters()),
    )

    self.actor_in_keys = actor_network.in_keys

    # Since the value we'll be using is based on the actor and value network,
    # we put them together in a single actor-critic container.
    actor_critic = ActorCriticWrapper(actor_network, value_network)
    self.actor_critic = actor_critic
    self.loss_function = "l2"

價值估計器損失方法

在許多 RL 演算法中,價值網路(或 Q 值網路)基於經驗價值估計進行訓練。這可以是透過引導(TD(0),低方差,高偏差),意味著目標值僅使用下一個獎勵獲得,或者可以獲得蒙特卡羅估計(TD(1)),在這種情況下將使用整個即將到來的獎勵序列(高方差,低偏差)。中間估計器(TD(\(\lambda\)))也可以用來折衷偏差和方差。TorchRL 透過 ValueEstimators 列舉類,該類包含指向所有已實現價值估計器的指標,可以輕鬆使用其中一個或另一個估計器。讓我們在此處定義預設價值函式。我們將採用最簡單的版本(TD(0)),並稍後展示如何更改它。

from torchrl.objectives.utils import ValueEstimators

default_value_estimator = ValueEstimators.TD0

我們還需要根據使用者查詢向 DDPG 提供一些關於如何構建價值估計器的指示。根據提供的估計器,我們將構建相應的模組以在訓練時使用

from torchrl.objectives.utils import default_value_kwargs
from torchrl.objectives.value import TD0Estimator, TD1Estimator, TDLambdaEstimator


def make_value_estimator(self, value_type: ValueEstimators, **hyperparams):
    hp = dict(default_value_kwargs(value_type))
    if hasattr(self, "gamma"):
        hp["gamma"] = self.gamma
    hp.update(hyperparams)
    value_key = "state_action_value"
    if value_type == ValueEstimators.TD1:
        self._value_estimator = TD1Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.TD0:
        self._value_estimator = TD0Estimator(value_network=self.actor_critic, **hp)
    elif value_type == ValueEstimators.GAE:
        raise NotImplementedError(
            f"Value type {value_type} it not implemented for loss {type(self)}."
        )
    elif value_type == ValueEstimators.TDLambda:
        self._value_estimator = TDLambdaEstimator(value_network=self.actor_critic, **hp)
    else:
        raise NotImplementedError(f"Unknown value type {value_type}")
    self._value_estimator.set_keys(value=value_key)

可以呼叫 make_value_estimator 方法,但並非必須:如果未呼叫,LossModule 將使用其預設估計器查詢此方法。

Actor 損失方法

RL 演算法的核心是 Actor 的訓練損失。對於 DDPG,此函式非常簡單:我們只需要計算使用策略計算的動作的相關價值,並最佳化 Actor 的權重以最大化此價值。

在計算此值時,我們必須確保將值引數從圖中移除,否則 Actor 和價值損失將混合在一起。為此,可以使用 hold_out_params() 函式。

def _loss_actor(
    self,
    tensordict,
) -> torch.Tensor:
    td_copy = tensordict.select(*self.actor_in_keys)
    # Get an action from the actor network: since we made it functional, we need to pass the params
    with self.actor_network_params.to_module(self.actor_network):
        td_copy = self.actor_network(td_copy)
    # get the value associated with that action
    with self.value_network_params.detach().to_module(self.value_network):
        td_copy = self.value_network(td_copy)
    return -td_copy.get("state_action_value")

價值損失方法

現在我們需要最佳化我們的價值網路引數。為此,我們將依賴我們類的價值估計器

from torchrl.objectives.utils import distance_loss


def _loss_value(
    self,
    tensordict,
):
    td_copy = tensordict.clone()

    # V(s, a)
    with self.value_network_params.to_module(self.value_network):
        self.value_network(td_copy)
    pred_val = td_copy.get("state_action_value").squeeze(-1)

    # we manually reconstruct the parameters of the actor-critic, where the first
    # set of parameters belongs to the actor and the second to the value function.
    target_params = TensorDict(
        {
            "module": {
                "0": self.target_actor_network_params,
                "1": self.target_value_network_params,
            }
        },
        batch_size=self.target_actor_network_params.batch_size,
        device=self.target_actor_network_params.device,
    )
    with target_params.to_module(self.actor_critic):
        target_value = self.value_estimator.value_estimate(tensordict).squeeze(-1)

    # Computes the value loss: L2, L1 or smooth L1 depending on `self.loss_function`
    loss_value = distance_loss(pred_val, target_value, loss_function=self.loss_function)
    td_error = (pred_val - target_value).pow(2)

    return loss_value, td_error, pred_val, target_value

將內容放入 forward 呼叫中

唯一缺失的部分是 forward 方法,它將連線價值和 Actor 損失,收整合本值並將它們寫入提供給使用者的 TensorDict

from tensordict import TensorDict, TensorDictBase


def _forward(self, input_tensordict: TensorDictBase) -> TensorDict:
    loss_value, td_error, pred_val, target_value = self.loss_value(
        input_tensordict,
    )
    td_error = td_error.detach()
    td_error = td_error.unsqueeze(input_tensordict.ndimension())
    if input_tensordict.device is not None:
        td_error = td_error.to(input_tensordict.device)
    input_tensordict.set(
        "td_error",
        td_error,
        inplace=True,
    )
    loss_actor = self.loss_actor(input_tensordict)
    return TensorDict(
        source={
            "loss_actor": loss_actor.mean(),
            "loss_value": loss_value.mean(),
            "pred_value": pred_val.mean().detach(),
            "target_value": target_value.mean().detach(),
            "pred_value_max": pred_val.max().detach(),
            "target_value_max": target_value.max().detach(),
        },
        batch_size=[],
    )


from torchrl.objectives import LossModule


class DDPGLoss(LossModule):
    default_value_estimator = default_value_estimator
    make_value_estimator = make_value_estimator

    __init__ = _init
    forward = _forward
    loss_value = _loss_value
    loss_actor = _loss_actor

現在我們有了損失,我們可以使用它來訓練一個策略來解決一個控制任務。

環境

在大多數演算法中,首先需要處理的是環境的構建,因為它會影響訓練指令碼的其餘部分。

在此示例中,我們將使用"cheetah"任務。目標是讓半獵豹儘可能快地奔跑。

在 TorchRL 中,可以透過依賴 dm_controlgym 來建立此類任務。

env = GymEnv("HalfCheetah-v4")

env = DMControlEnv("cheetah", "run")

預設情況下,這些環境停用渲染。從狀態訓練通常比從影像訓練更容易。為了簡單起見,我們只關注從狀態學習。要將畫素傳遞給由 env.step() 收集的 tensordicts,只需將 from_pixels=True 引數傳遞給建構函式。

env = GymEnv("HalfCheetah-v4", from_pixels=True, pixels_only=True)

我們編寫了一個 make_env() 輔助函式,它將使用上面考慮的兩個後端之一(dm-controlgym)建立環境。

from torchrl.envs.libs.dm_control import DMControlEnv
from torchrl.envs.libs.gym import GymEnv

env_library = None
env_name = None


def make_env(from_pixels=False):
    """Create a base ``env``."""
    global env_library
    global env_name

    if backend == "dm_control":
        env_name = "cheetah"
        env_task = "run"
        env_args = (env_name, env_task)
        env_library = DMControlEnv
    elif backend == "gym":
        env_name = "HalfCheetah-v4"
        env_args = (env_name,)
        env_library = GymEnv
    else:
        raise NotImplementedError

    env_kwargs = {
        "device": device,
        "from_pixels": from_pixels,
        "pixels_only": from_pixels,
        "frame_skip": 2,
    }
    env = env_library(*env_args, **env_kwargs)
    return env

變換 (Transforms)

現在我們有了一個基礎環境,我們可能希望修改其表示以使其更適合策略。在 TorchRL 中,轉換會附加到專門的 torchr.envs.TransformedEnv 類中的基礎環境中。

  • 在 DDPG 中,通常使用一些啟發式值對獎勵進行重縮放。在此示例中,我們將獎勵乘以 5。

  • 如果我們使用 dm_control,那麼構建一個介面來連線處理雙精度數字的模擬器和我們指令碼(可能使用單精度數字)也很重要。此轉換是雙向的:呼叫 env.step() 時,我們的動作需要表示為雙精度,輸出需要轉換為單精度。 DoubleToFloat 轉換正好執行此操作:in_keys 列表指的是需要從雙精度轉換為單精度的鍵,而 in_keys_inv 指的是在傳遞給環境之前需要轉換為雙精度的鍵。

  • 我們使用 CatTensors 轉換將狀態鍵連線在一起。

  • 最後,我們也保留了狀態歸一化的可能性:我們將在稍後負責計算歸一化常數。

from torchrl.envs import (
    CatTensors,
    DoubleToFloat,
    EnvCreator,
    InitTracker,
    ObservationNorm,
    ParallelEnv,
    RewardScaling,
    StepCounter,
    TransformedEnv,
)


def make_transformed_env(
    env,
):
    """Apply transforms to the ``env`` (such as reward scaling and state normalization)."""

    env = TransformedEnv(env)

    # we append transforms one by one, although we might as well create the
    # transformed environment using the `env = TransformedEnv(base_env, transforms)`
    # syntax.
    env.append_transform(RewardScaling(loc=0.0, scale=reward_scaling))

    # We concatenate all states into a single "observation_vector"
    # even if there is a single tensor, it'll be renamed in "observation_vector".
    # This facilitates the downstream operations as we know the name of the
    # output tensor.
    # In some environments (not half-cheetah), there may be more than one
    # observation vector: in this case this code snippet will concatenate them
    # all.
    selected_keys = list(env.observation_spec.keys())
    out_key = "observation_vector"
    env.append_transform(CatTensors(in_keys=selected_keys, out_key=out_key))

    # we normalize the states, but for now let's just instantiate a stateless
    # version of the transform
    env.append_transform(ObservationNorm(in_keys=[out_key], standard_normal=True))

    env.append_transform(DoubleToFloat())

    env.append_transform(StepCounter(max_frames_per_traj))

    # We need a marker for the start of trajectories for our Ornstein-Uhlenbeck (OU)
    # exploration:
    env.append_transform(InitTracker())

    return env

並行執行

以下輔助函式允許我們並行執行環境。並行執行環境可以顯著加快收集吞吐量。使用轉換後的環境時,我們需要選擇是為每個環境單獨執行轉換,還是集中資料並批次處理。兩種方法都易於編碼

env = ParallelEnv(
    lambda: TransformedEnv(GymEnv("HalfCheetah-v4"), transforms),
    num_workers=4
)
env = TransformedEnv(
    ParallelEnv(lambda: GymEnv("HalfCheetah-v4"), num_workers=4),
    transforms
)

為了利用 PyTorch 的向量化能力,我們採用第一種方法

def parallel_env_constructor(
    env_per_collector,
    transform_state_dict,
):
    if env_per_collector == 1:

        def make_t_env():
            env = make_transformed_env(make_env())
            env.transform[2].init_stats(3)
            env.transform[2].loc.copy_(transform_state_dict["loc"])
            env.transform[2].scale.copy_(transform_state_dict["scale"])
            return env

        env_creator = EnvCreator(make_t_env)
        return env_creator

    parallel_env = ParallelEnv(
        num_workers=env_per_collector,
        create_env_fn=EnvCreator(lambda: make_env()),
        create_env_kwargs=None,
        pin_memory=False,
    )
    env = make_transformed_env(parallel_env)
    # we call `init_stats` for a limited number of steps, just to instantiate
    # the lazy buffers.
    env.transform[2].init_stats(3, cat_dim=1, reduce_dim=[0, 1])
    env.transform[2].load_state_dict(transform_state_dict)
    return env


# The backend can be ``gym`` or ``dm_control``
backend = "gym"

注意

frame_skip 使用一個動作將多個步驟批處理在一起。如果 > 1,則需要調整其他幀計數(例如,frames_per_batch、total_frames)以獲得跨實驗一致的總收集幀數。這很重要,因為提高 frame_skip 但保持總幀數不變可能看起來像作弊:所有因素都相同,具有 frame_skip 為 2 的 10M 元素資料集和具有 frame_skip 為 1 的資料集實際上具有 2:1 的環境互動比例!簡而言之,在處理 frame skipping 時,應謹慎考慮訓練指令碼的幀計數,因為這可能導致訓練策略之間的比較存在偏差。

縮放獎勵有助於我們控制訊號幅度,以實現更有效的學習。

reward_scaling = 5.0

我們還定義了軌跡何時將被截斷。對於 cheetah 任務,一千步(如果 frame_skip = 2,則為 500 步)是一個不錯的數字。

max_frames_per_traj = 500

觀察值歸一化

為了計算歸一化統計資料,我們執行任意數量的隨機步驟,並在收集的觀察值中計算均值和標準差。為此可以使用 ObservationNorm.init_stats() 方法。為了獲得彙總統計資料,我們建立一個虛擬環境並執行它給定的步數,在給定的步數上收集資料並計算其彙總統計資料。

def get_env_stats():
    """Gets the stats of an environment."""
    proof_env = make_transformed_env(make_env())
    t = proof_env.transform[2]
    t.init_stats(init_env_steps)
    transform_state_dict = t.state_dict()
    proof_env.close()
    return transform_state_dict

歸一化統計資料

用於使用 ObservationNorm 進行統計資料計算的隨機步數

init_env_steps = 5000

transform_state_dict = get_env_stats()

每個資料收集器中的環境數量

env_per_collector = 4

我們將之前計算的統計資料傳遞給我們的環境的輸出進行歸一化

parallel_env = parallel_env_constructor(
    env_per_collector=env_per_collector,
    transform_state_dict=transform_state_dict,
)


from torchrl.data import Composite

構建模型

現在我們轉向模型的設定。正如我們所見,DDPG 需要一個價值網路,該網路經過訓練以估計狀態-動作對的價值,以及一個引數化 Actor,它學習如何選擇最大化該價值的動作。

回想一下,構建 TorchRL 模組需要兩個步驟:

  • 編寫將用作網路的torch.nn.Module

  • 將網路包裝在 tensordict.nn.TensorDictModule 中,其中透過指定輸入和輸出鍵來處理資料流。

在更復雜的場景中,還可以使用 tensordict.nn.TensorDictSequential

Q 值網路被包裝在 ValueOperator 中,它會自動將 out_keys 設定為"state_action_value"(對於 q 值網路)和"state_value"(對於其他價值網路)。

TorchRL 提供了 DDPG 網路內建版本,如原始論文所示。這些可以在 DdpgMlpActorDdpgMlpQNet 中找到。

由於我們使用惰性模組,因此在能夠將策略從裝置移動到裝置並執行其他操作之前,有必要具體化惰性模組。因此,使用少量資料執行模組是一個好習慣。為此,我們從環境規範生成偽資料。

from torchrl.modules import (
    ActorCriticWrapper,
    DdpgMlpActor,
    DdpgMlpQNet,
    OrnsteinUhlenbeckProcessModule,
    ProbabilisticActor,
    TanhDelta,
    ValueOperator,
)


def make_ddpg_actor(
    transform_state_dict,
    device="cpu",
):
    proof_environment = make_transformed_env(make_env())
    proof_environment.transform[2].init_stats(3)
    proof_environment.transform[2].load_state_dict(transform_state_dict)

    out_features = proof_environment.action_spec.shape[-1]

    actor_net = DdpgMlpActor(
        action_dim=out_features,
    )

    in_keys = ["observation_vector"]
    out_keys = ["param"]

    actor = TensorDictModule(
        actor_net,
        in_keys=in_keys,
        out_keys=out_keys,
    )

    actor = ProbabilisticActor(
        actor,
        distribution_class=TanhDelta,
        in_keys=["param"],
        spec=Composite(action=proof_environment.action_spec),
    ).to(device)

    q_net = DdpgMlpQNet()

    in_keys = in_keys + ["action"]
    qnet = ValueOperator(
        in_keys=in_keys,
        module=q_net,
    ).to(device)

    # initialize lazy modules
    qnet(actor(proof_environment.reset().to(device)))
    return actor, qnet


actor, qnet = make_ddpg_actor(
    transform_state_dict=transform_state_dict,
    device=device,
)

探索

策略被傳遞到 OrnsteinUhlenbeckProcessModule 探索模組,如原始論文所述。讓我們定義 OU 噪聲達到最小值之前的幀數。

annealing_frames = 1_000_000

actor_model_explore = TensorDictSequential(
    actor,
    OrnsteinUhlenbeckProcessModule(
        spec=actor.spec.clone(),
        annealing_num_steps=annealing_frames,
    ).to(device),
)
if device == torch.device("cpu"):
    actor_model_explore.share_memory()

資料收集器 (Data collector)

TorchRL 提供了專門的類來幫助您透過在環境中執行策略來收集資料。這些“資料收集器”會迭代地計算給定時間要執行的動作,然後執行環境中的一個步驟,並在需要時重置環境。資料收集器的設計目的是讓開發者能夠嚴格控制每批資料的幀數、收集的(非同步)性質以及分配給資料收集器的資源(例如 GPU、工作執行緒數等)。

在這裡,我們將使用 SyncDataCollector,這是一個簡單的單程序資料收集器。TorchRL 提供其他收集器,例如 MultiaSyncDataCollector,它以非同步方式執行滾動,(例如,資料將在策略最佳化時收集,從而將訓練和資料收集解耦)。

需要指定的引數是:

  • 環境工廠或環境,

  • 策略,

  • 在收集器被視為為空之前,總幀數。

  • 每條軌跡的最大幀數(對於非終止環境,如 dm_control 環境很有用)。

    注意

    傳遞給收集器的 max_frames_per_traj 引數將導致在用於推理的環境中註冊一個新的 StepCounter 轉換。我們可以在此指令碼中手動實現相同的結果。

還應傳遞:

  • 每個收集批次中的幀數,

  • 獨立於策略執行的隨機步數,

  • 用於策略執行的裝置。

  • 用於在資料傳遞給主程序之前儲存資料的裝置。

訓練期間使用的總幀數應約為 100 萬。

total_frames = 10_000  # 1_000_000

在外部迴圈的每次迭代中,收集器返回的總幀數等於子軌跡的長度乘以在每個收集器中並行執行的環境數量。

換句話說,我們期望收集器的批次形狀為 [env_per_collector, traj_len],其中 traj_len=frames_per_batch/env_per_collector

traj_len = 200
frames_per_batch = env_per_collector * traj_len
init_random_frames = 5000
num_collectors = 2

from torchrl.collectors import SyncDataCollector
from torchrl.envs import ExplorationType

collector = SyncDataCollector(
    parallel_env,
    policy=actor_model_explore,
    total_frames=total_frames,
    frames_per_batch=frames_per_batch,
    init_random_frames=init_random_frames,
    reset_at_each_iter=False,
    split_trajs=False,
    device=collector_device,
    exploration_type=ExplorationType.RANDOM,
)

評估器:構建您的記錄物件

由於訓練資料是透過某種探索策略獲得的,因此需要以確定性模式評估我們演算法的真實效能。我們使用一個專用類 LogValidationReward 來做到這一點,該類以給定的頻率在環境中執行策略並返回從這些模擬中獲得的統計資料。

以下輔助函式構建此物件。

from torchrl.trainers import LogValidationReward


def make_recorder(actor_model_explore, transform_state_dict, record_interval):
    base_env = make_env()
    environment = make_transformed_env(base_env)
    environment.transform[2].init_stats(
        3
    )  # must be instantiated to load the state dict
    environment.transform[2].load_state_dict(transform_state_dict)

    recorder_obj = LogValidationReward(
        record_frames=1000,
        policy_exploration=actor_model_explore,
        environment=environment,
        exploration_type=ExplorationType.DETERMINISTIC,
        record_interval=record_interval,
    )
    return recorder_obj

我們將每收集 10 個批次記錄一次效能。

record_interval = 10

recorder = make_recorder(
    actor_model_explore, transform_state_dict, record_interval=record_interval
)

from torchrl.data.replay_buffers import (
    LazyMemmapStorage,
    PrioritizedSampler,
    RandomSampler,
    TensorDictReplayBuffer,
)

回放緩衝區 (Replay buffer)

回放緩衝區有兩種型別:優先(其中使用某些錯誤訊號來賦予某些項比其他項更高的取樣機率)和常規的迴圈經驗回放。

TorchRL 回放緩衝區是可組合的:可以選擇儲存、取樣和寫入策略。還可以使用記憶體對映陣列將張量儲存在物理記憶體中。以下函式負責使用所需的超引數建立回放緩衝區。

from torchrl.envs import RandomCropTensorDict


def make_replay_buffer(buffer_size, batch_size, random_crop_len, prefetch=3, prb=False):
    if prb:
        sampler = PrioritizedSampler(
            max_capacity=buffer_size,
            alpha=0.7,
            beta=0.5,
        )
    else:
        sampler = RandomSampler()
    replay_buffer = TensorDictReplayBuffer(
        storage=LazyMemmapStorage(
            buffer_size,
            scratch_dir=buffer_scratch_dir,
        ),
        batch_size=batch_size,
        sampler=sampler,
        pin_memory=False,
        prefetch=prefetch,
        transform=RandomCropTensorDict(random_crop_len, sample_dim=1),
    )
    return replay_buffer

我們將把回放緩衝區儲存在磁碟上的臨時目錄中。

import tempfile

tmpdir = tempfile.TemporaryDirectory()
buffer_scratch_dir = tmpdir.name

回放緩衝區儲存和批處理大小

TorchRL 回放緩衝區沿第一個維度計數元素。由於我們將把軌跡饋送到緩衝區,我們需要透過除以資料收集器生成的子軌跡長度來調整緩衝區大小。關於批處理大小,我們的取樣策略將包括對長度為 traj_len=200 的軌跡進行取樣,然後再選擇長度為 random_crop_len=25 的子軌跡,在此之上將計算損失。此策略平衡了儲存特定長度的整個軌跡的選擇與為我們的損失提供足夠異質性樣本的需求。下圖顯示了資料流:從一個收集器,該收集器在每個批次中獲取 8 幀,在其中 2 個環境並行執行,將它們饋送到包含 1000 條軌跡的回放緩衝區,並從中取樣長度為 2 個時間步的子軌跡。

Storing trajectories in the replay buffer

讓我們從緩衝區中儲存的幀數開始。

def ceil_div(x, y):
    return -x // (-y)


buffer_size = 1_000_000
buffer_size = ceil_div(buffer_size, traj_len)

優先回放緩衝區預設停用。

prb = False

我們還需要定義每個收集的批次將執行多少次更新。這被稱為更新到資料比率或 UTD 比率。

update_to_data = 64

我們將使用長度為 25 的軌跡饋送損失。

random_crop_len = 25

在原始論文中,作者對收集的每一幀進行 64 個元素的批處理進行一次更新。在這裡,我們複製相同的比例,但在每次批處理收集時進行多次更新。我們調整批處理大小以實現相同的每幀更新比率。

batch_size = ceil_div(64 * frames_per_batch, update_to_data * random_crop_len)

replay_buffer = make_replay_buffer(
    buffer_size=buffer_size,
    batch_size=batch_size,
    random_crop_len=random_crop_len,
    prefetch=3,
    prb=prb,
)

損失模組構建

我們使用剛剛建立的 Actor 和 qnet 來構建我們的損失模組。由於我們需要更新目標引數,因此我們**必須**建立一個目標網路更新器。

gamma = 0.99
lmbda = 0.9
tau = 0.001  # Decay factor for the target network

loss_module = DDPGLoss(actor, qnet)

讓我們使用 TD(lambda) 估計器!

loss_module.make_value_estimator(ValueEstimators.TDLambda, gamma=gamma, lmbda=lmbda)

注意

非策略通常指定 TD(0) 估計器。在這裡,我們使用 TD(\(\lambda\)) 估計器,它會引入一些偏差,因為在某個狀態之後跟隨的軌跡是用過時的策略收集的。這個技巧,就像資料收集期間可以使用多步技巧一樣,是“技巧”的替代版本,這些技巧通常在實踐中效果很好,儘管它們會在回報估計中引入一些偏差。

目標網路更新器

目標網路是非策略 RL 演算法的關鍵部分。透過 HardUpdateSoftUpdate 類,可以輕鬆更新目標網路引數。它們以損失模組作為引數構建,並透過在訓練迴圈的適當位置呼叫updater.step() 來實現更新。

from torchrl.objectives.utils import SoftUpdate

target_net_updater = SoftUpdate(loss_module, eps=1 - tau)

最佳化器

最後,我們將使用 Adam 最佳化器來更新策略和價值網路。

from torch import optim

optimizer_actor = optim.Adam(
    loss_module.actor_network_params.values(True, True), lr=1e-4, weight_decay=0.0
)
optimizer_value = optim.Adam(
    loss_module.value_network_params.values(True, True), lr=1e-3, weight_decay=1e-2
)
total_collection_steps = total_frames // frames_per_batch

訓練策略的時間

現在我們已經構建了所有需要的模組,訓練迴圈非常直接。

rewards = []
rewards_eval = []

# Main loop

collected_frames = 0
pbar = tqdm.tqdm(total=total_frames)
r0 = None
for i, tensordict in enumerate(collector):

    # update weights of the inference policy
    collector.update_policy_weights_()

    if r0 is None:
        r0 = tensordict["next", "reward"].mean().item()
    pbar.update(tensordict.numel())

    # extend the replay buffer with the new data
    current_frames = tensordict.numel()
    collected_frames += current_frames
    replay_buffer.extend(tensordict.cpu())

    # optimization steps
    if collected_frames >= init_random_frames:
        for _ in range(update_to_data):
            # sample from replay buffer
            sampled_tensordict = replay_buffer.sample().to(device)

            # Compute loss
            loss_dict = loss_module(sampled_tensordict)

            # optimize
            loss_dict["loss_actor"].backward()
            gn1 = torch.nn.utils.clip_grad_norm_(
                loss_module.actor_network_params.values(True, True), 10.0
            )
            optimizer_actor.step()
            optimizer_actor.zero_grad()

            loss_dict["loss_value"].backward()
            gn2 = torch.nn.utils.clip_grad_norm_(
                loss_module.value_network_params.values(True, True), 10.0
            )
            optimizer_value.step()
            optimizer_value.zero_grad()

            gn = (gn1**2 + gn2**2) ** 0.5

            # update priority
            if prb:
                replay_buffer.update_tensordict_priority(sampled_tensordict)
            # update target network
            target_net_updater.step()

    rewards.append(
        (
            i,
            tensordict["next", "reward"].mean().item(),
        )
    )
    td_record = recorder(None)
    if td_record is not None:
        rewards_eval.append((i, td_record["r_evaluation"].item()))
    if len(rewards_eval) and collected_frames >= init_random_frames:
        target_value = loss_dict["target_value"].item()
        loss_value = loss_dict["loss_value"].item()
        loss_actor = loss_dict["loss_actor"].item()
        rn = sampled_tensordict["next", "reward"].mean().item()
        rs = sampled_tensordict["next", "reward"].std().item()
        pbar.set_description(
            f"reward: {rewards[-1][1]: 4.2f} (r0 = {r0: 4.2f}), "
            f"reward eval: reward: {rewards_eval[-1][1]: 4.2f}, "
            f"reward normalized={rn :4.2f}/{rs :4.2f}, "
            f"grad norm={gn: 4.2f}, "
            f"loss_value={loss_value: 4.2f}, "
            f"loss_actor={loss_actor: 4.2f}, "
            f"target value: {target_value: 4.2f}"
        )

    # update the exploration strategy
    actor_model_explore[1].step(current_frames)

collector.shutdown()
del collector

try:
    parallel_env.close()
    del parallel_env
except Exception:
    pass

實驗結果

我們對訓練期間的平均獎勵進行了一個簡單的繪圖。我們可以觀察到我們的策略在解決任務方面表現得相當好。

注意

如上所述,為了獲得更合理的結果,請使用更大的 total_frames 值,例如 100 萬。

from matplotlib import pyplot as plt

plt.figure()
plt.plot(*zip(*rewards), label="training")
plt.plot(*zip(*rewards_eval), label="eval")
plt.legend()
plt.xlabel("iter")
plt.ylabel("reward")
plt.tight_layout()

結論

在本教程中,我們以 DDPG 為具體示例,學習瞭如何在 TorchRL 中編寫損失模組。

關鍵要點是:

  • 如何使用 LossModule 類來編寫新的損失元件;

  • 如何使用(或不使用)目標網路,以及如何更新其引數;

  • 如何建立與損失模組關聯的最佳化器。

下一步

為了進一步迭代此損失模組,我們可以考慮:

由 Sphinx-Gallery 生成的畫廊

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

為初學者和高階開發者提供深入的教程

檢視教程

資源

查詢開發資源並讓您的問題得到解答

檢視資源