快捷方式

torchrl.data 包

經驗回放緩衝區

經驗回放緩衝區是離策略強化學習演算法的核心部分。TorchRL 提供了幾種常用經驗回放緩衝區的有效實現。

ReplayBuffer(*[, storage, sampler, writer, ...])

一個通用、可組合的經驗回放緩衝區類。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

優先經驗回放緩衝區。

TensorDictReplayBuffer(*[, priority_key])

TensorDict 特定的 ReplayBuffer 類包裝器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

TensorDict 特定的 PrioritizedReplayBuffer 類包裝器。

RayReplayBuffer(*args, replay_buffer_cls, ...)

Replay Buffer 的 Ray 實現,可以遠端擴充套件和取樣。

RemoteTensorDictReplayBuffer(*args, **kwargs)

一個對遠端呼叫友好的 ReplayBuffer 類。

可組合的經驗回放緩衝區

我們也為使用者提供了組合經驗回放緩衝區的能力。我們為經驗回放緩衝區的用法提供了廣泛的解決方案,包括支援幾乎任何資料型別;記憶體、裝置或物理記憶體儲存;多種取樣策略;變換的使用等。

支援的資料型別和儲存選擇

理論上,經驗回放緩衝區支援任何資料型別,但我們無法保證每個元件都會支援任何資料型別。最基礎的經驗回放緩衝區實現由一個 ReplayBuffer 基類和一個 ListStorage 儲存組成。這非常低效,但它允許您儲存具有非張量資料的複雜資料結構。連續記憶體中的儲存包括 TensorStorageLazyTensorStorageLazyMemmapStorage。這些類將 TensorDict 資料作為一流公民支援,但也支援任何 PyTree 資料結構(例如,元組、列表、字典及其巢狀版本)。TensorStorage 儲存要求您在構造時提供儲存,而 TensorStorage(RAM、CUDA)和 LazyMemmapStorage(物理記憶體)將在第一次擴充套件後為您預先分配儲存。

以下是一些示例,首先從通用的 ListStorage 開始

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

寫入緩衝區的主要入口點是 add()extend()。還可以使用 __setitem__(),在這種情況下,資料會寫入指定位置,而不會更新緩衝區的長度或遊標。當從緩衝區取樣專案並在之後原地更新其值時,這會很有用。

使用 TensorStorage,我們告訴 RB 希望儲存是連續的,這效率最高,但也更受限制。

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下來,我們可以避免建立容器,讓儲存自動建立。這在使用 PyTrees 和 tensordicts 時非常有用!對於 PyTrees 和其他資料結構,add() 將傳遞給它的樣本視為單個型別的例項。extend() 另一方面,它將認為資料是可迭代的。對於張量、tensordicts 和列表(如下文所示),可迭代物件在根級別查詢。對於 PyTrees,我們假設樹中所有葉子(張量)的前導維度匹配。如果不匹配,extend 將引發異常。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

extend() 在處理值列表時可能具有模糊的簽名,這些值列表應解釋為 PyTree(在這種情況下,列表中的所有元素都將放入儲存中的 PyTree 的一個切片中)或逐個新增值的列表。為了解決這個問題,TorchRL 明確區分列表和元組:元組將被視為 PyTree,列表(在根級別)將被解釋為要逐個新增到緩衝區的值堆疊。

取樣和索引

經驗回放緩衝區可以被索引和取樣。索引和取樣將資料收集到儲存中的指定索引,然後透過一系列變換和 collate_fn 進行處理,這些變換和 collate_fn 可以傳遞給經驗回放緩衝區的 __init__ 函式。collate_fn 帶有預設值,在大多數情況下應符合使用者的期望,因此您通常不必擔心它。變換通常是 Transform 的例項,儘管普通函式也可以工作(後一種情況下,inv() 方法將被忽略,而在前一種情況下,它可以用於在資料傳遞給緩衝區之前進行預處理)。最後,可以透過在建構函式中將執行緒數透過 prefetch 關鍵字引數傳遞來使用多執行緒實現取樣。我們建議使用者在實際環境中對其進行基準測試,然後再採用此技術,因為不能保證它會在實踐中帶來更快的吞吐量,具體取決於使用它的機器和環境。

取樣時,batch_size 可以在構造時(例如,如果它在整個訓練過程中是恆定的)或在 sample() 方法中指定。

為了進一步完善取樣策略,我們建議您檢視我們的取樣器!

以下是一些如何從經驗回放緩衝區中獲取資料的示例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下元件

CompressedListStorage(max_size, *[, ...])

壓縮和解壓縮資料的儲存。

CompressedListStorageCheckpointer()

CompressedListStorage 的儲存檢查點。

FlatStorageCheckpointer([done_keys, reward_keys])

以緊湊形式儲存儲存,節省 TED 格式的空間。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以緊湊形式儲存儲存,節省 TED 格式的空間,並使用 H5 格式儲存資料。

ImmutableDatasetWriter([compilable])

不可變資料集的阻塞寫入器。

LazyMemmapStorage(max_size, *[, ...])

用於張量和 tensordicts 的記憶體對映儲存。

LazyTensorStorage(max_size, *[, device, ...])

用於張量和 tensordicts 的預分配張量儲存。

ListStorage([max_size, compilable, device])

儲存在列表中的儲存。

LazyStackStorage([max_size, compilable, ...])

返回 LazyStackTensorDict 例項的 ListStorage。

ListStorageCheckpointer()

ListStoage 的儲存檢查點。

NestedStorageCheckpointer([done_keys, ...])

以緊湊形式儲存儲存,節省 TED 格式的空間,並使用記憶體對映的巢狀張量。

PrioritizedSampler(max_capacity, alpha, beta)

經驗回放緩衝區的優先採樣器。

PrioritizedSliceSampler(max_capacity, alpha, ...)

使用優先採樣,沿第一個維度對資料切片進行取樣,並給出開始和停止訊號。

RandomSampler()

用於可組合經驗回放緩衝區的均勻隨機取樣器。

RoundRobinWriter([compilable])

用於可組合經驗回放緩衝區的 RoundRobin Writer 類。

Sampler()

用於可組合經驗回放緩衝區的通用取樣器基類。

SamplerWithoutReplacement([drop_last, shuffle])

一個數據消耗取樣器,可確保同一樣本不會出現在連續的批次中。

SliceSampler(*[, num_slices, slice_len, ...])

沿第一個維度對資料切片進行取樣,並給出開始和停止訊號。

SliceSamplerWithoutReplacement(*[, ...])

使用無替換取樣,沿第一個維度對資料切片進行取樣,並給出開始和停止訊號。

Storage(max_size[, checkpointer, compilable])

Storage 是經驗回放緩衝區的容器。

StorageCheckpointerBase()

儲存檢查點器的公共基類。

StorageEnsembleCheckpointer()

集合儲存的檢查點器。

TensorDictMaxValueWriter([rank_key, reduction])

用於可組合經驗回放緩衝區的寫入器類,該類根據某個排名鍵保留最高元素。

TensorDictRoundRobinWriter([compilable])

用於可組合、基於 tensordict 的經驗回放緩衝區的 RoundRobin Writer 類。

TensorStorage(storage[, max_size, device, ...])

用於張量和 tensordicts 的儲存。

TensorStorageCheckpointer()

TensorStorages 的儲存檢查點。

Writer([compilable])

經驗回放緩衝區基類的寫入器類。

儲存選擇對經驗回放緩衝區的取樣延遲有很大影響,尤其是在資料量較大的分散式強化學習設定中。LazyMemmapStorage 在分散式設定中具有共享儲存,強烈推薦使用,因為它具有較低的 MemoryMappedTensors 序列化成本,並且能夠指定檔案儲存位置以提高節點故障恢復能力。透過在 ListStorage 上的粗略基準測試(請參閱 https://github.com/pytorch/rl/tree/main/benchmarks/storage)發現,以下是平均取樣延遲的改進。

儲存型別

加速

ListStorage

1x

LazyTensorStorage

1.83x

LazyMemmapStorage

3.44x

記憶體效率的壓縮儲存

對於記憶體使用量或記憶體頻寬是主要考慮因素的應用——尤其是在儲存或傳輸影像、音訊或文字等大型感官觀測時——CompressedListStorage 透過壓縮實現了顯著的記憶體節省。

主要特點

  • 記憶體效率:透過壓縮實現顯著的記憶體節省。

  • 資料完整性:透過無失真壓縮保持完整的資料保真度。

  • 靈活壓縮:預設使用 zstd 壓縮,支援自定義壓縮演算法。

  • TensorDict 支援:與 TensorDict 結構無縫整合。

  • 檢查點:完全支援壓縮資料的儲存和載入。

  • 批次 GPU 壓縮/解壓縮:可直接從 VRAM 實現高效的經驗回放緩衝區取樣。

CompressedListStorage 在儲存時壓縮資料,在檢索時解壓縮,對於 Atari 影像實現了 95x–122x 的壓縮比,同時保持完整的資料保真度。我們在 Atari Learning Environment (ALE) 中,使用隨機策略在 Pong 遊戲中進行了為期一個回合的取樣,並在每個壓縮級別看到了這些結果。

zstd 的壓縮級別

1

3

8

12

22

ALE Pong 中的壓縮比

95x

99x

106x

111x

122x

使用示例

>>> import torch
>>> from torchrl.data import ReplayBuffer, CompressedListStorage
>>> from tensordict import TensorDict
>>>
>>> # Create a compressed storage for image data
>>> storage = CompressedListStorage(max_size=1000, compression_level=3)
>>> rb = ReplayBuffer(storage=storage, batch_size=32)
>>>
>>> # Add image data
>>> images = torch.randn(100, 3, 84, 84)  # Atari-like frames
>>> data = TensorDict({"obs": images}, batch_size=[100])
>>> rb.extend(data)
>>>
>>> # Sample data (automatically decompressed)
>>> sample = rb.sample(32)
>>> print(sample["obs"].shape)  # torch.Size([32, 3, 84, 84])

壓縮級別可以從 1(快速,壓縮少)調整到 22(慢速,壓縮多),對於大多數用例,級別 3 是一個不錯的預設值。

對於自定義壓縮演算法

>>> def my_compress(tensor):
...     return tensor.to(torch.uint8)  # Simple example
>>>
>>> def my_decompress(compressed_tensor, metadata):
...     return compressed_tensor.to(metadata["dtype"])
>>>
>>> storage = CompressedListStorage(
...     max_size=1000,
...     compression_fn=my_compress,
...     decompression_fn=my_decompress
... )

注意

CompressedListStorage 在 Python 版本至少為 3.14 時使用 zstd,否則預設為 zlib。

注意

批次 GPU 壓縮依賴於 nvidia.nvcomp,請參閱示例程式碼 examples/replay-buffers/compressed_replay_buffer.py

跨程序共享經驗回放緩衝區

只要其元件可共享,經驗回放緩衝區就可以在程序之間共享。此功能允許多個程序協同收集資料並填充共享的經驗回放緩衝區,而不是將資料集中在主程序上,這可能會產生一些資料傳輸開銷。

可共享的儲存包括 LazyMemmapStorage 或任何 TensorStorage 的子類,只要它們被例項化並且其內容被儲存為記憶體對映張量。有狀態的寫入器(如 TensorDictRoundRobinWriter)目前不可共享,有狀態的取樣器(如 PrioritizedSampler)也是如此。

共享的經驗回放緩衝區可以在任何有權訪問它的程序上讀取和擴充套件,如下例所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

儲存軌跡

將軌跡儲存在經驗回放緩衝區中並不困難。需要注意的一點是,經驗回放緩衝區的預設大小是儲存的前導維度的大小:換句話說,建立一個大小為 100 萬的儲存的經驗回放緩衝區並不意味著儲存 100 萬幀,而是儲存 100 萬條軌跡。但是,如果軌跡(或回合/滾動)在儲存之前被展平,容量仍將是 100 萬步。

有一種方法可以繞過這個問題,透過告訴儲存在儲存資料時應考慮的維度數量。這可以透過 ndim 關鍵字引數來實現,該引數被所有連續儲存(如 TensorStorage 等)接受。當將多維儲存傳遞給緩衝區時,緩衝區將自動將最後一個維度視為 TorchRL 中的傳統“時間”維度。可以透過 ReplayBuffer 中的 dim_extend 關鍵字引數來覆蓋此行為。這是儲存透過 ParallelEnv 或其序列對應項獲得的軌跡的推薦方法,如下文所示。

取樣軌跡時,為了多樣化學習或提高取樣效率,可能需要取樣子軌跡。TorchRL 提供了兩種不同的方法來實現這一點

  • SliceSampler 允許沿 TensorStorage 的前導維度按順序儲存的給定數量的軌跡切片進行取樣。這是 TorchRL 中取樣子軌跡的推薦方法,__尤其__是在使用離線資料集(以該約定儲存)時。此策略需要在擴充套件經驗回放緩衝區之前展平軌跡,並在取樣後對其進行重塑。SliceSampler 類文件中提供了關於此儲存和取樣策略的詳細資訊。請注意,SliceSampler 與多維儲存相容。以下示例展示瞭如何在展平 tensordict 和不展平 tensordict 的情況下使用此功能。在第一種場景中,我們從單個環境中收集資料。在這種情況下,我們滿意於一個儲存,該儲存將傳入的資料沿第一個維度連線起來,因為收集計劃不會引入中斷。

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果一批中執行的環境不止一個,我們仍然可以透過呼叫 data.reshape(-1) 將資料儲存在同一個緩衝區中,該函式將 [B, T] 大小展平為 [B * T],但這意味著,例如,第一個環境的軌跡會被批次中的其他環境的軌跡交錯,這是 SliceSampler 無法處理的場景。為了解決這個問題,我們建議在儲存建構函式中使用 ndim 引數。

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 軌跡也可以獨立儲存,前導維度中的每個元素指向不同的軌跡。這要求軌跡具有一致的形狀(或被填充)。我們提供了一個名為 RandomCropTensorDict 的自定義 Transform 類,允許在緩衝區中取樣子軌跡。請注意,與基於 SliceSampler 的策略不同,這裡不需要“episode”或“done”鍵指向開始和停止訊號。以下是如何使用此類的一個示例。

檢查點經驗回放緩衝區

經驗回放緩衝區的每個元件都可能是有狀態的,因此需要一種專門的方法來對其進行序列化。我們的經驗回放緩衝區提供了兩個獨立的 API 來將它們的狀態儲存在磁碟上:dumps()loads() 將使用記憶體對映張量和 JSON 檔案儲存除變換之外的每個元件的資料(儲存、寫入器、取樣器),而元資料使用 JSON 檔案儲存。

此方法適用於除 ListStorage 之外的所有類,因為其內容無法預測(因此不符合 tensordict 庫中找到的記憶體對映資料結構)。

此 API 保證,已儲存然後重新載入的緩衝區將處於完全相同的狀態,無論我們檢視其取樣器(例如,優先順序樹)、其寫入器(例如,最大寫入器堆)還是其儲存的狀態。

在底層,對 dumps() 的簡單呼叫將在每個元件的特定資料夾中呼叫公共 dumps 方法(不包括變換,因為我們通常不假設它們可以使用記憶體對映張量進行序列化)。

然而,將資料儲存為 TED 格式 可能會消耗比所需更多的記憶體。如果連續軌跡儲存在緩衝區中,我們可以透過儲存根目錄下的所有觀測值以及“next”子 tensordict 觀測值的最後一個元素來避免儲存重複的觀測值,這可以將儲存消耗量最多減少一半。為此,提供了三個檢查點類:FlatStorageCheckpointer 將丟棄重複的觀測值以壓縮 TED 格式。載入時,此類將以正確的格式重寫觀測值。如果緩衝區儲存在磁碟上,此檢查點器執行的操作不需要額外的 RAM。NestedStorageCheckpointer 將使用巢狀張量儲存軌跡,使資料表示更清晰(第一個維度上的每個元素代表一個不同的軌跡)。最後,H5StorageCheckpointer 將緩衝區儲存在 H5DB 格式中,使使用者能夠壓縮資料並節省更多空間。

警告

檢查點器對經驗回放緩衝區做出了一些限制性假設。首先,假設 done 狀態準確地表示了軌跡的結束(最後一個寫入的軌跡除外,其寫入器遊標指示了截斷訊號的位置)。對於 MARL 用途,應注意只允許“done”狀態具有與根 tensordict 相同數量的元素:如果“done”狀態包含儲存批次大小中未表示的額外元素,這些檢查點器將失敗。例如,形狀為 torch.Size([3, 4, 5]) 的“done”狀態在形狀為 torch.Size([3, 4]) 的儲存中是不允許的。

以下是如何在實踐中使用 H5DB 檢查點器的具體示例

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

當無法使用 dumps() 儲存資料時,另一種方法是使用 state_dict(),它返回一個可以由 torch.save() 儲存並由 torch.load() 載入,然後呼叫 load_state_dict() 的資料結構。此方法的缺點是它難以儲存大型資料結構,而這在使用經驗回放緩衝區時很常見。

TorchRL Episode Data Format (TED)

在 TorchRL 中,順序資料始終以一種特定格式呈現,稱為 TorchRL Episode Data Format (TED)。此格式對於 TorchRL 各元件的無縫整合和功能至關重要。

某些元件,例如經驗回放緩衝區,對資料格式不太敏感。但是,其他元件,尤其是環境,嚴重依賴它才能順利執行。

因此,理解 TED 及其用途,以及如何與之互動至關重要。本指南將清楚地解釋 TED、其使用原因以及如何有效地使用它。

TED 的基本原理

在強化學習(RL)領域,格式化順序資料可能是一項複雜的任務。作為實踐者,我們經常遇到在重置時(儘管不總是)提供資料,有時在軌跡的最後一步提供或丟棄資料的情況。

這種可變性意味著我們可以在資料集中觀察到不同長度的資料,並且並不總是清楚如何匹配該資料集各種元素之間的時間步長。考慮以下歧義資料集結構

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍一看,資訊和觀測似乎是同時提供的(在重置時各一個 + 在每個步呼叫時各一個),這與動作的元素數量少一個相對應。但是,如果資訊少一個元素,我們必須假設它在重置時被省略,或者在軌跡的最後一步未提供或未記錄。如果沒有正確的資料結構文件,就無法確定哪個資訊對應哪個時間步長。

更復雜的是,某些資料集提供不一致的資料格式,其中 observationsinfos 在回滾的開始或結束時缺失,並且這種行為通常未被記錄。TED 的主要目標是透過提供清晰一致的資料表示來消除這些歧義。

TED 的結構

TED 基於 RL 上下文中馬爾可夫決策過程 (MDP) 的規範定義。在每一步,一個觀測值會條件化一個動作,該動作會產生(1)一個新的觀測值,(2)一個任務完成的指示符(終止、截斷、完成),以及(3)一個獎勵訊號。

某些元素可能缺失(例如,在模仿學習上下文中,獎勵是可選的),或者可能透過狀態或資訊容器傳遞其他資訊。在某些情況下,在呼叫 step 時需要其他資訊才能獲取觀測值(例如,在無狀態環境模擬器中)。此外,在某些場景下,“動作”(或任何其他資料)不能表示為單個張量,需要以不同的方式組織。例如,在多代理 RL 設定中,動作、觀測值、獎勵和完成訊號可能是複合的。

TED 能夠以一種統一、明確的格式處理所有這些場景。我們透過在執行動作時設定一個限制來區分時間步 tt+1 發生的事情。換句話說,在呼叫 env.step 之前存在的所有內容都屬於 t,之後的所有內容都屬於 t+1

一般規則是,屬於時間步 t 的所有內容都儲存在 tensordict 的根目錄中,而屬於 t+1 的所有內容都儲存在 tensordict 的 "next" 條目中。示例如下

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在回滾過程中(使用 EnvBaseSyncDataCollector),當代理重置其步數計數時(t <- t+1),"next" tensordict 的內容將被移到根目錄。您可以在此處閱讀有關環境 API 的更多資訊:這裡

在大多數情況下,根目錄中沒有 True 值的 "done" 狀態,因為任何完成狀態都將觸發(部分)重置,這將把 "done" 變為 False。但是,這僅在自動執行重置時才成立。在某些情況下,部分重置不會觸發重置,因此我們會保留這些資料,例如,這些資料應該比觀測值佔用更少的記憶體。

此格式消除了關於觀測值與其動作、資訊或完成狀態匹配的任何歧義。

關於 TED 中單例維度的說明

在 TorchRL 中,標準做法是 done 狀態(包括終止和截斷)和獎勵應具有一個可以擴充套件以匹配觀測值、狀態和動作形狀的維度,而無需重複(即,獎勵的維度必須與觀測值和/或動作,或它們的嵌入相同)。

本質上,此格式是可接受的(儘管不是嚴格強制的)

>>> print(rollout[t])
... TensorDict(
...     fields={
...         action: Tensor(n_action),
...         done: Tensor(1),  # The done state has a rightmost singleton dimension
...         next: TensorDict(
...             fields={
...                 done: Tensor(1),
...                 observation: Tensor(n_obs),
...                 reward: Tensor(1),  # The reward has a rightmost singleton dimension
...                 terminated: Tensor(1),
...                 truncated: Tensor(1),
...             batch_size=torch.Size([]),
...             device=cpu,
...             is_shared=False),
...         observation: Tensor(n_obs),  # the observation at reset
...         terminated: Tensor(1),  # the terminated at reset
...         truncated: Tensor(1),  # the truncated at reset
...     batch_size=torch.Size([]),
...     device=cpu,
...     is_shared=False)

這樣做的基本原理是確保對觀測值和/或動作的操作結果(例如,值估計)具有與獎勵和 done 狀態相同的維度數。這種一致性允許後續操作順利進行。

>>> state_value = f(observation)
>>> next_state_value = state_value + reward

如果沒有獎勵末尾的這個單例維度,廣播規則(僅在張量可以從左側擴充套件時工作)將嘗試從左側擴充套件獎勵。這可能會導致失敗(最壞情況)或引入錯誤(更壞情況)。

展平 TED 以減少記憶體消耗

TED 將觀測值複製兩次到記憶體中,這可能會影響在該格式在實踐中的可行性。由於它主要用於表示的便利性,因此可以以扁平的方式儲存資料,但在訓練期間將其表示為 TED。

這在序列化經驗回放緩衝區時特別有用:例如,TED2Flat 類確保 TED 格式的資料結構在寫入磁碟之前被展平,而 Flat2TED 載入鉤會在反序列化期間取消展平此結構。

Tensordict 的維度

在回滾過程中,所有收集的 tensordicts 都將沿著位於末尾的新維度堆疊。收集器和環境都會將此維度標記為 "time" 名稱。示例如下

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

這確保了時間維度在資料結構中清晰標記並易於識別。

特殊情況和腳註

多代理資料表示

多代理資料格式文件可在 MARL 環境 API 部分訪問。

基於記憶體的策略(RNN 和 Transformer)

在上面提供的示例中,只有 env.step(data) 生成了需要在下一步讀取的資料。但是,在某些情況下,策略還會輸出需要在下一步中使用的資訊。這通常是基於 RNN 的策略的情況,它輸出一個動作以及一個需要在下一步中使用的遞迴狀態。為了適應這一點,我們建議使用者調整其 RNN 策略,將此資料寫入 tensordict 的 "next" 條目下。這確保了此內容將在下一步移到根目錄。有關更多資訊,請參閱 GRUModuleLSTMModule

多步

收集器允許使用者在讀取資料時跳過步數,累積未來 n 步的獎勵。此技術在 DQN 類演算法(如 Rainbow)中很受歡迎。MultiStep 類對來自收集器的資料批次執行此資料轉換。在這些情況下,像這樣的檢查會失敗,因為下一個觀測值會偏移 n 步。

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

記憶體需求如何?

天真地實現,這種資料格式消耗的記憶體大約是扁平表示的兩倍。在某些記憶體密集型設定中(例如,在 AtariDQNExperienceReplay 資料集中),我們只將 T+1 觀測值儲存在磁碟上,並在獲取時線上執行格式化。在其他情況下,我們認為 2 倍的記憶體成本是獲得更清晰表示所需的小代價。然而,為離線資料集推廣延遲表示無疑將是一個有益的功能,我們歡迎在這方面做出貢獻!

資料集

TorchRL 提供離線 RL 資料集的包裝器。這些資料以 ReplayBuffer 例項的形式提供,這意味著它們可以根據需要使用變換、取樣器和儲存進行自定義。例如,可以使用 SelectTransformExcludeTransform 將條目篩選進出資料集。

預設情況下,資料集儲存為記憶體對映張量,允許它們幾乎沒有記憶體佔用即可快速取樣。

以下是一個示例

注意

安裝依賴項是使用者的責任。對於 D4RL,需要克隆 儲存庫,因為最新的輪子未在 PyPI 上釋出。對於 OpenML,需要 scikit-learnpandas

轉換資料集

在許多情況下,原始資料不會按原樣使用。自然的方法是將 Transform 例項傳遞給資料集建構函式並即時修改樣本。這樣做會有效,但會產生額外的執行時開銷。如果變換可以(至少部分)預先應用於資料集,則可以節省大量的磁碟空間和取樣時產生的一些開銷。為此,可以使用 preprocess() 方法。此方法將在資料集的每個樣本上執行一個樣本預處理管道,並用其轉換版本替換現有資料集。

轉換後,重新建立相同的資料集將產生另一個具有相同轉換儲存的物件(除非使用了 download="force"

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

離線資料集的父類。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN Experience 回放類。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的經驗回放類。

GenDGRLExperienceReplay(dataset_id[, ...])

Gen-DGRL Experience Replay 資料集。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari Experience 回放資料集。

OpenMLExperienceReplay(name, batch_size[, ...])

OpenML 資料的經驗回放。

OpenXExperienceReplay(dataset_id[, ...])

Open X-Embodiment 資料集經驗回放。

RobosetExperienceReplay(dataset_id, ...[, ...])

Roboset 經驗回放資料集。

VD4RLExperienceReplay(dataset_id, batch_size, *)

V-D4RL 經驗回放資料集。

組合資料集

在離線 RL 中,同時處理多個數據集是常態。此外,TorchRL 通常具有細粒度的資料集命名法,其中每個任務單獨表示,而其他庫將這些資料集以更緊湊的方式表示。為了允許使用者組合多個數據集,我們提出了一個 ReplayBufferEnsemble 基元,它允許使用者一次從多個數據集中取樣。

如果各個資料集格式不同,可以使用 Transform 例項。在以下示例中,我們建立了兩個具有語義上相同但名稱不同的條目的虛擬資料集(("some", "key")"another_key"),並展示瞭如何重新命名它們以匹配名稱。我們還調整了影像大小,以便在取樣期間可以堆疊它們。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

經驗回放緩衝區的集合。

SamplerEnsemble(*samplers[, p, ...])

取樣器的集合。

StorageEnsemble(*storages[, transforms])

儲存的集合。

WriterEnsemble(*writers)

寫入器的集合。

TensorSpec

TensorSpec 父類及其子類定義了 TorchRL 中狀態、觀測值、動作、獎勵和完成狀態的基本屬性,例如它們的形狀、裝置、資料型別和域。

重要的是,您的環境規範要與它傳送和接收的輸入和輸出匹配,因為 ParallelEnv 將根據這些規範建立緩衝區以與子程序通訊。請參閱 torchrl.envs.utils.check_env_specs() 方法以進行健全性檢查。

如果需要,可以使用 make_composite_from_td() 函式從資料中自動生成規範。

規範分為兩大類:數值和分類。

數值 TensorSpec 子類。

數值

Bounded

Unbounded

有界離散

有界連續

UnboundedDiscrete

UnboundedContinuous

每當建立 Bounded 例項時,其域(由其 dtype 隱式定義或由 “domain” 關鍵字引數顯式定義)將決定例項化的類是 BoundedContinuous 還是 BoundedDiscrete 型別。對於 Unbounded 類也是如此。有關更多資訊,請參閱這些類。

分類 TensorSpec 子類。

Categorical

OneHot

MultiOneHot

Categorical

MultiCategorical

二元

gymnasium 不同,TorchRL 沒有任意規範列表的概念。如果要組合多個規範,TorchRL 假定資料將以字典形式(更具體地說,以 TensorDict 或相關格式)呈現。在這種情況下,對應的 TensorSpec 類是 Composite 規範。

儘管如此,規範可以使用 stack() 堆疊在一起:如果它們相同,它們的形狀將相應擴充套件。否則,將透過 Stacked 類建立延遲堆疊。

同樣,TensorSpecs 具有與 TensorTensorDict 一些共同的行為:它們可以像常規 Tensor 例項一樣進行重塑、索引、擠壓、解擠壓、移動到另一個裝置(to)或解綁(unbind)。

維度為 -1 的規範被認為是“動態”的,負維度表示相應資料形狀不一致。當被最佳化器或環境(例如,批處理環境,如 ParallelEnv)看到時,這些負形狀告訴 TorchRL 避免使用緩衝區,因為張量形狀是不可預測的。

TensorSpec(shape, space, device, dtype, ...)

張量元資料容器的父類。

Binary([n, shape, device, dtype])

二進位制離散張量規範。

Bounded(*args, **kwargs)

有界張量規範。

Categorical(n[, shape, device, dtype, mask])

離散張量規範。

Composite(*args, **kwargs)

TensorSpecs 的組合。

MultiCategorical(nvec[, shape, device, ...])

離散張量規範的連線。

MultiOneHot(nvec[, shape, device, dtype, ...])

單熱編碼離散張量規範的連線。

NonTensor([shape, device, dtype, ...])

非張量資料的規範。

OneHot(n[, shape, device, dtype, ...])

一維單熱編碼離散張量規範。

Stacked(*specs, dim)

張量規範堆疊的延遲表示。

StackedComposite(*args, **kwargs)

複合規範堆疊的延遲表示。

Unbounded(*args, **kwargs)

無界張量規範。

UnboundedContinuous(*args, **kwargs)

具有連續空間的 torchrl.data.Unbounded 的專用版本。

UnboundedDiscrete(*args, **kwargs)

具有離散空間的 torchrl.data.Unbounded 的專用版本。

以下類已棄用,僅指向上述類

BinaryDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.Binary 版本。

BoundedTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.Bounded 版本。

CompositeSpec(*args, **kwargs)

已棄用的 torchrl.data.Composite 版本。

DiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.Categorical 版本。

LazyStackedCompositeSpec(*args, **kwargs)

已棄用的 torchrl.data.StackedComposite 版本。

LazyStackedTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.Stacked 版本。

MultiDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.MultiCategorical 版本。

MultiOneHotDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.MultiOneHot 版本。

NonTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.NonTensor 版本。

OneHotDiscreteTensorSpec(*args, **kwargs)

已棄用的 torchrl.data.OneHot 版本。

UnboundedContinuousTensorSpec(*args, **kwargs)

連續空間的 torchrl.data.Unbounded 的已棄用版本。

UnboundedDiscreteTensorSpec(*args, **kwargs)

離散空間的 torchrl.data.Unbounded 的已棄用版本。

樹和森林

TorchRL 提供了一組類和函式,可用於有效地表示樹和森林,這對於蒙特卡洛樹搜尋 (MCTS) 演算法尤其有用。

TensorDictMap

核心而言,MCTS API 依賴於 TensorDictMap,它充當一個儲存,其索引可以是任何數值物件。在傳統的儲存(例如 TensorStorage)中,只允許使用整數索引。

>>> storage = TensorStorage(...)
>>> data = storage[3]

TensorDictMap 允許我們在儲存中進行更高階的查詢。典型的例子是當我們有一個包含一組 MDP 的儲存,並且我們想透過其初始觀測和動作對來重建軌跡。用張量的話來說,這可以用以下虛擬碼編寫:

>>> next_state = storage[observation, action]

(如果此對有多個後續狀態,則可以返回 next_states 的堆疊)。這個 API 是合理的,但會受到限制:允許由多個張量組成的觀測或動作可能難以實現。相反,我們提供一個包含這些值的 tensordict,並讓儲存知道要檢視哪些 in_keys 來查詢下一個狀態。

>>> td = TensorDict(observation=observation, action=action)
>>> next_td = storage[td]

當然,此類還允許我們使用新資料擴充套件儲存。

>>> storage[td] = next_state

這很方便,因為它允許我們表示複雜的 rollout 結構,其中在給定節點(即給定觀測)下采取不同的動作。已觀察到的所有 (observation, action) 對都可能 dẫn us to a (set of) rollout that we can use further。

MCTSForest

從初始觀測構建樹就變成了一個高效組織資料的問題。 MCTSForest 的核心有兩個儲存:第一個儲存將觀測與過去在資料集中遇到的動作的雜湊值和索引連結起來。

>>> data = TensorDict(observation=observation)
>>> metadata = forest.node_map[data]
>>> index = metadata["_index"]

其中 forest 是一個 MCTSForest 例項。然後,第二個儲存會跟蹤與觀測相關的動作和結果。

>>> next_data = forest.data_map[index]

通常,next_data 條目可以具有任何形狀,但它通常會匹配 index 的形狀(因為每個索引對應一個動作)。一旦獲得 next_data,就可以將其與 data 組合以形成一組節點,並且可以為每個節點擴充套件樹。下圖展示了這是如何完成的。

../_images/collector-copy.png

MCTSForest 物件構建 Tree。流程圖表示一棵樹如何從初始觀測 o 構建。 get_tree 方法將輸入資料結構(根節點)傳遞給 node_map TensorDictMap 例項,該例項返回一組雜湊值和索引。然後,使用這些索引來查詢與根節點關聯的相應的動作、下一個觀測、獎勵等元組。從每個元組建立一個頂點(如果需要緊湊表示,則可能帶有更長的 rollout)。然後,頂點堆疊用於進一步構建樹,這些頂點堆疊在一起構成了根處的樹分支。此過程會重複進行,直到達到給定的深度或樹無法再擴充套件為止。

BinaryToDecimal(num_bits, device, dtype[, ...])

一個將二進位制編碼的張量轉換為十進位制的模組。

HashToInt()

將雜湊值轉換為可用於索引連續儲存的整數。

MCTSForest(*[, data_map, node_map, ...])

MCTS 樹的集合。

QueryModule(*args, **kwargs)

一個用於為儲存生成相容索引的模組。

RandomProjectionHash(*[, n_components, ...])

一個結合了隨機投影和 SipHash 的模組,以獲得低維張量,便於透過 SipHash 進行嵌入。

SipHash([as_tensor])

一個用於計算給定張量的 SipHash 值的模組。

TensorDictMap(*args, **kwargs)

TensorDict 的 Map 儲存。

TensorMap()

實現不同儲存的抽象。

Tree([count, wins, index, hash, node_id, ...])

大型語言模型和基於人類反饋的強化學習 (RLHF)

警告

這些 API 已棄用,將在未來移除。請改用 torchrl.data.llm 模組。有關更多資訊,請參閱完整的 LLM 文件

資料在 LLM 後期訓練(例如 GRPO 或基於人類反饋的強化學習 (RLHF))中至關重要。鑑於這些技術通常用於語言領域,而該領域在庫的其他 RL 子領域中很少涉及,因此我們提供專門的工具來促進與 datasets 等外部庫的互動。這些工具包括用於標記化資料、將其格式化為適合 TorchRL 模組的格式以及最佳化儲存以實現高效取樣。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

Prompt 資料集的標記化配方。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

用於執行因果語言模型 rollout 的類。

TensorDictTokenizer(tokenizer, max_length[, ...])

用於對文字示例應用標記器的程序函式工廠。

TokenizedDatasetLoader(split, max_length, ...)

載入標記化的資料集,並快取其記憶體對映副本。

create_infinite_iterator(iterator)

無限迭代器。

get_dataloader(batch_size, block_size, ...)

建立資料集並從中返回 dataloader。

ConstantKLController(*[, kl_coef, model])

恆定 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

自 Ziegler 等人 "Fine-Tuning Language Models from Human Preferences" 論文所述的自適應 KL 控制器。

Utils

DensifyReward(*args, **kwargs)

一個用於將 done 狀態下的獎勵重新分配給軌跡其餘部分的實用程式。

Flat2TED([done_key, shift_key, is_full_key, ...])

一個儲存載入鉤子,用於將扁平化的 TED 資料反序列化為 TED 格式。

H5Combine()

將永續性 tensordict 中的軌跡合併為儲存在檔案系統中的單個 standing tensordict。

H5Split([done_key, shift_key, is_full_key, ...])

將使用 TED2Nested 準備的資料集分割為 TensorDict,其中每個軌跡都儲存為對其父巢狀張量的檢視。

MultiStep(gamma, n_steps)

多步獎勵轉換。

Nested2TED([done_key, shift_key, ...])

將巢狀的 tensordict(其中每一行是一個軌跡)轉換為 TED 格式。

TED2Flat([done_key, shift_key, is_full_key, ...])

一個儲存儲存鉤子,用於將 TED 資料序列化為緊湊格式。

TED2Nested(*args, **kwargs)

將 TED 格式的資料集轉換為填充了巢狀張量的 tensordict,其中每一行是一個軌跡。

check_no_exclusive_keys(spec[, recurse])

給定一個 TensorSpec,如果不存在獨佔鍵,則返回 true。

consolidate_spec(spec[, ...])

給定一個 TensorSpec,透過新增 0 形狀的 spec 來刪除獨佔鍵。

contains_lazy_spec(spec)

如果 spec 包含延遲堆疊的 spec,則返回 true。

MultiStepTransform(n_steps, gamma, *[, ...])

ReplayBuffers 的多步轉換。

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

為初學者和高階開發者提供深入的教程

檢視教程

資源

查詢開發資源並讓您的問題得到解答

檢視資源