分散式通訊包 - torch.distributed#
建立日期:2017年7月12日 | 最後更新日期:2025年9月4日
注意
有關分散式訓練所有功能的簡要介紹,請參閱PyTorch 分散式概述。
後端#
torch.distributed 支援四種內建後端,每種後端都有不同的功能。下表顯示了每個後端在 CPU 或 GPU 上可用的功能。對於 NCCL,GPU 指的是 CUDA GPU,而對於 XCCL 指的是 XPU GPU。
只有當用於構建 PyTorch 的實現支援時,MPI 才支援 CUDA。
後端 |
|
|
|
|
||||
|---|---|---|---|---|---|---|---|---|
裝置 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
send |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
recv |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
broadcast |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
all_gather |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
gather |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
scatter |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
reduce_scatter |
✓ |
✓ |
✘ |
✘ |
✘ |
✓ |
✘ |
✓ |
all_to_all |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
barrier |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
✘ |
✓ |
PyTorch 自帶的後端#
PyTorch 分散式包支援 Linux(穩定)、MacOS(穩定)和 Windows(原型)。預設情況下,對於 Linux,Gloo 和 NCCL 後端在 PyTorch 分散式中構建幷包含(NCCL 僅在透過 CUDA 構建時包含)。MPI 是一個可選後端,只有當您從原始碼構建 PyTorch 時才能包含(例如,在已安裝 MPI 的主機上構建 PyTorch)。
注意
從 PyTorch v1.8 開始,Windows 支援所有集體通訊後端,但 NCCL 除外。如果 init_method 引數 init_process_group() 指向一個檔案,則該檔案必須遵循以下模式
本地檔案系統,
init_method="file:///d:/tmp/some_file"共享檔案系統,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
與 Linux 平臺相同,您可以設定環境變數 MASTER_ADDR 和 MASTER_PORT 來啟用 TcpStore。
選擇哪個後端?#
過去,我們經常被問到:“我應該使用哪個後端?”。
經驗法則
使用 NCCL 後端進行 CUDA **GPU** 的分散式訓練。
使用 XCCL 後端進行 XPU **GPU** 的分散式訓練。
使用 Gloo 後端進行 **CPU** 的分散式訓練。
配備 InfiniBand 互連的 GPU 主機
使用 NCCL,因為它是目前唯一支援 InfiniBand 和 GPUDirect 的後端。
配備乙太網互連的 GPU 主機
使用 NCCL,因為它目前提供了最佳的分散式 GPU 訓練效能,尤其是在多程序單節點或多節點分散式訓練方面。如果您在使用 NCCL 時遇到任何問題,請將 Gloo 作為備選方案。(請注意,Gloo 目前在 GPU 上的速度比 NCCL 慢。)
配備 InfiniBand 互連的 CPU 主機
如果您的 InfiniBand 啟用了 IP over IB,請使用 Gloo,否則,請使用 MPI。我們計劃在未來的版本中為 Gloo 新增 InfiniBand 支援。
配備乙太網互連的 CPU 主機
使用 Gloo,除非您有特別的原因要使用 MPI。
常用環境變數#
選擇要使用的網路介面#
預設情況下,NCCL 和 Gloo 後端都會嘗試找到要使用的正確網路介面。如果自動檢測到的介面不正確,您可以使用以下環境變數覆蓋它(適用於相應的後端)
NCCL_SOCKET_IFNAME,例如
export NCCL_SOCKET_IFNAME=eth0GLOO_SOCKET_IFNAME,例如
export GLOO_SOCKET_IFNAME=eth0
如果您使用的是 Gloo 後端,可以透過用逗號分隔來指定多個介面,例如:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。後端將透過這些介面以輪循方式分派操作。在此變數中,所有程序指定相同數量的介面是至關重要的。
其他 NCCL 環境變數#
除錯 - 如果 NCCL 失敗,您可以設定 NCCL_DEBUG=INFO 來列印明確的警告訊息以及基本的 NCCL 初始化資訊。
您還可以使用 NCCL_DEBUG_SUBSYS 來獲取 NCCL 特定方面的更多詳細資訊。例如,NCCL_DEBUG_SUBSYS=COLL 將列印集體呼叫的日誌,這在除錯掛起時可能很有用,尤其是那些由集體型別或訊息大小不匹配引起的情況。如果拓撲檢測失敗,設定 NCCL_DEBUG_SUBSYS=GRAPH 來檢查詳細的檢測結果並儲存作為參考,以便在需要 NCCL 團隊的進一步幫助時使用,這將很有幫助。
效能調優 - NCCL 基於其拓撲檢測執行自動調優,以節省使用者的調優工作。在某些基於套接字的系統上,使用者可能仍然嘗試調優 NCCL_SOCKET_NTHREADS 和 NCCL_NSOCKS_PERTHREAD 來提高套接字網路頻寬。這兩個環境變數已由 NCCL 為某些雲提供商(如 AWS 或 GCP)預調優。
有關 NCCL 環境變數的完整列表,請參閱NVIDIA NCCL 官方文件。
您可以使用 torch.distributed.ProcessGroupNCCL.NCCLConfig 和 torch.distributed.ProcessGroupNCCL.Options 進一步調優 NCCL 通訊器。在直譯器中使用 help(例如 help(torch.distributed.ProcessGroupNCCL.NCCLConfig))瞭解更多資訊。
基礎#
torch.distributed 包為在多臺機器上執行的多個計算節點上的多程序並行性提供了 PyTorch 支援和通訊原語。類 torch.nn.parallel.DistributedDataParallel() 在此功能的基礎上構建,透過包裝任何 PyTorch 模型來提供同步分散式訓練。這與 多程序包 - torch.multiprocessing 和 torch.nn.DataParallel() 提供的並行性型別不同,因為它支援多臺網路連線的機器,並且使用者必須為每個程序顯式啟動訓練指令碼的獨立副本。
在單機同步情況下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel() 包裝器可能仍具有優於其他資料並行方法(包括 torch.nn.DataParallel())的優勢。
每個程序都維護自己的最佳化器,並在每次迭代中執行完整的最佳化步驟。雖然這可能看起來是冗餘的,因為梯度已經被收集並跨程序平均,因此對每個程序都相同,但這意味著不需要引數廣播步驟,從而減少了在節點之間傳輸張量的時間。
每個程序包含一個獨立的 Python 直譯器,消除了來自驅動單個 Python 程序的多個執行執行緒、模型副本或 GPU 的額外直譯器開銷和“GIL 爭用”。這對於大量使用 Python 執行時(包括具有迴圈層或許多小元件的模型)的模型尤其重要。
初始化#
在呼叫任何其他方法之前,需要使用 torch.distributed.init_process_group() 或 torch.distributed.device_mesh.init_device_mesh() 函式初始化包。兩者都會阻塞直到所有程序加入。
警告
初始化不是執行緒安全的。程序組建立應從單個執行緒執行,以防止跨 rank 的不一致“UUID”分配,並防止初始化期間的爭用導致掛起。
- torch.distributed.is_available()[source]#
如果分散式包可用,則返回
True。否則,
torch.distributed不會公開任何其他 API。目前,torch.distributed在 Linux、MacOS 和 Windows 上可用。在從原始碼構建 PyTorch 時,請設定USE_DISTRIBUTED=1來啟用它。目前,Linux 和 Windows 的預設值為USE_DISTRIBUTED=1,MacOS 的預設值為USE_DISTRIBUTED=0。- 返回型別
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]#
初始化預設的分散式程序組。
這將初始化分散式包。
- 初始化程序組主要有兩種方法:
顯式指定
store、rank和world_size。指定
init_method(URL 字串),它指示在哪裡/如何發現對等節點。可以選擇指定rank和world_size,或者將所有必需的引數編碼到 URL 中並省略它們。
如果兩者都未指定,則假定
init_method為“env://”。- 引數
backend (str 或 Backend, 可選) – 要使用的後端。根據構建時配置,有效值包括
mpi、gloo、nccl、ucc、xccl或第三方外掛註冊的後端。自 2.6 版本起,如果未提供backend,c10d 將使用為 device_id 關鍵字引數(如果提供)指示的裝置型別註冊的後端。當前已知的預設註冊是:cuda的nccl,cpu的gloo,xpu的xccl。如果未提供backend或device_id,c10d 將檢測執行時機器上的加速器並使用為該檢測到的加速器(或cpu)註冊的後端。此欄位可以作為小寫字串(例如,"gloo")給出,也可以透過Backend屬性(例如Backend.GLOO)訪問。如果使用nccl後端的每臺機器上的多個程序,則每個程序必須能夠獨佔訪問它使用的每個 GPU,因為程序之間共享 GPU 可能導致死鎖或 NCCL 使用無效。ucc後端是實驗性的。可以透過get_default_backend_for_device()查詢裝置的預設後端。init_method (str, 可選) – 指定如何初始化程序組的 URL。如果未指定
init_method或store,則預設為“env://”。與store互斥。world_size (int, 可選) – 參與作業的程序數。如果指定了
store,則必需。rank (int, 可選) – 當前程序的 rank(應為介於 0 和
world_size-1 之間的數字)。如果指定了store,則必需。store (Store, 可選) – 所有工作節點都可以訪問的鍵/值儲存,用於交換連線/地址資訊。與
init_method互斥。timeout (timedelta, 可選) – 對程序組執行的操作的超時時間。NCCL 的預設值為 10 分鐘,其他後端的預設值為 30 分鐘。這是集體操作將被非同步中止且程序將崩潰的時間。這樣做是因為 CUDA 執行是非同步的,並且繼續執行使用者程式碼不再安全,因為失敗的非同步 NCCL 操作可能導致後續 CUDA 操作在損壞的資料上執行。當設定 TORCH_NCCL_BLOCKING_WAIT 時,程序將阻塞並等待此超時。
group_name (str, 可選, 已棄用) – 組名。此引數將被忽略
pg_options (ProcessGroupOptions, 可選) – 程序組選項,指定在構造特定程序組時需要傳遞哪些其他選項。目前,我們支援的唯一選項是
nccl後端的ProcessGroupNCCL.Options,可以指定is_high_priority_stream,以便 nccl 後端可以在存在計算核心等待時選擇高優先順序 CUDA 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tdevice_id (torch.device | int, 可選) – 此程序將工作的單個特定裝置,允許進行後端特定最佳化。目前這有兩個影響,僅在 NCCL 下:通訊器將立即形成(呼叫
ncclCommInit*而不是正常的延遲呼叫),並且子組將使用ncclCommSplit(如果可能)來避免不必要的組建立開銷。如果您想提早知道 NCCL 初始化錯誤,也可以使用此欄位。如果提供了 int,API 會假設編譯時加速器型別將被使用。
注意
要啟用
backend == Backend.MPI,PyTorch 需要在支援 MPI 的系統上從原始碼構建。注意
對多個後端的支援是實驗性的。目前,當未指定後端時,將建立
gloo和nccl後端。對於 CPU 張量的集體操作,將使用gloo後端,而對於 CUDA 張量的集體操作,將使用nccl後端。可以透過傳遞格式為“<device_type>:<backend_name>,<device_type>:<backend_name>”的字串來指定自定義後端,例如“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]#
根據 device_type、mesh_shape 和 mesh_dim_names 引數初始化一個 DeviceMesh。
這會建立一個具有 n 維陣列佈局的 DeviceMesh,其中 n 是 mesh_shape 的長度。如果提供了 mesh_dim_names,則每個維度都將標記為 mesh_dim_names[i]。
注意
init_device_mesh 遵循 SPMD 程式設計模型,這意味著相同的 PyTorch Python 程式將在群集中的所有程序/rank 上執行。請確保 mesh_shape(描述裝置佈局的 nD 陣列的維度)在所有 rank 上都相同。不一致的 mesh_shape 可能導致掛起。
注意
如果沒有找到程序組,init_device_mesh 將在後臺初始化分散式通訊所需的程序組。
- 引數
device_type (str) – Mesh 的裝置型別。當前支援:“cpu”、“cuda/cuda-like”、“xpu”。不允許傳入帶有 GPU 索引的裝置型別,例如“cuda:0”。
mesh_shape (Tuple[int]) – 定義描述裝置佈局的多維陣列的維度的元組。
mesh_dim_names (Tuple[str], 可選) – 要分配給描述裝置佈局的多維陣列每個維度的 mesh 維度名稱的元組。其長度必須與 mesh_shape 的長度匹配。 mesh_dim_names 中的每個字串都必須是唯一的。
backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可選) – 對將為每個 mesh 維度建立的某些或所有 ProcessGroups 的覆蓋。每個鍵可以是維度的索引或其名稱(如果提供了 mesh_dim_names)。每個值可以是包含後端名稱及其選項的元組,或者僅包含這兩個元件中的一個(在這種情況下,另一個將被設定為其預設值)。
- 返回
一個
DeviceMesh物件,表示裝置佈局。- 返回型別
示例
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[source]#
檢查此程序是否由
torch.distributed.elastic(又名 torchelastic)啟動。是否存在
TORCHELASTIC_RUN_ID環境變數被用作確定當前程序是否由 torchelastic 啟動的代理。這是一個合理的代理,因為TORCHELASTIC_RUN_ID對映到 rendezvous id,它始終是一個非空值,用於對等發現目的。- 返回型別
- torch.distributed.get_default_backend_for_device(device)[source]#
獲取給定裝置的預設後端。
- 引數
device (Union[str, torch.device]) – 要獲取預設後端的裝置。
- 返回
給定裝置的預設後端(小寫字串)。
- 返回型別
目前支援三種初始化方法:
TCP 初始化#
有兩種使用 TCP 進行初始化的方法,這兩種方法都需要一個所有程序都能訪問的網路地址和一個期望的 world_size。第一種方法需要指定屬於 rank 0 程序的地址。此初始化方法要求所有程序都手動指定 rank。
請注意,在最新的分散式包中不再支援組播地址。group_name 也已棄用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
環境變數初始化#
此方法將從環境變數讀取配置,允許完全自定義資訊獲取方式。需要設定的變數是:
MASTER_PORT- 必需;必須是 rank 0 機器上的一個空閒埠MASTER_ADDR- 必需(rank 0 除外);rank 0 節點的地址WORLD_SIZE- 必需;可以在此處設定,或在 init 函式呼叫中設定RANK- 必需;可以在此處設定,或在 init 函式呼叫中設定
rank 為 0 的機器將用於建立所有連線。
這是預設方法,意味著不必指定 init_method(或可以為 env://)。
改進初始化時間#
TORCH_GLOO_LAZY_INIT- 按需建立連線,而不是使用完整的網格,這可以大大縮短非 all2all 操作的初始化時間。
初始化後#
執行 torch.distributed.init_process_group() 後,可以使用以下函式。要檢查程序組是否已初始化,請使用 torch.distributed.is_initialized()。
- class torch.distributed.Backend(name)[source]#
後端的列舉類。
可用後端:GLOO、NCCL、UCC、MPI、XCCL 和其他已註冊的後端。
此類的值是小寫字串,例如
"gloo"。它們可以作為屬性訪問,例如Backend.NCCL。此類可以直接呼叫以解析字串,例如
Backend(backend_str)將檢查backend_str是否有效,然後返回解析後的小寫字串。它也接受大寫字串,例如Backend("GLOO")返回"gloo"。注意
條目
Backend.UNDEFINED存在,但僅用作某些欄位的初始值。使用者不應直接使用它,也不應假定它的存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source]#
使用給定的名稱和例項化函式註冊新後端。
此類方法由第三方
ProcessGroup擴充套件來註冊新後端。- 引數
name (str) –
ProcessGroup擴充套件的後端名稱。它應與init_process_group()中的名稱匹配。func (function) – 例項化後端的函式控制代碼。該函式應在後端擴充套件中實現,並接受四個引數,包括
store、rank、world_size和timeout。extended_api (bool, 可選) – 後端是否支援擴充套件引數結構。預設值:
False。如果設定為True,後端將獲得c10d::DistributedBackendOptions的例項,以及後端實現定義的程序組選項物件。device (str 或 list of str, 可選) – 此後端支援的裝置型別,例如“cpu”、“cuda”等。如果為 None,則假定為“cpu”和“cuda”。
注意
此第三方後端支援是實驗性的,可能會發生變化。
- torch.distributed.get_backend(group=None)[source]#
返回給定程序組的後端。
- 引數
group (ProcessGroup, 可選) – 要操作的程序組。預設是通用主程序組。如果指定了另一個特定組,則呼叫程序必須是
group的一部分。- 返回
給定程序組的後端(小寫字串)。
- 返回型別
- torch.distributed.get_rank(group=None)[source]#
返回當前程序在提供的
group中的 rank,否則返回預設值。Rank 是分配給分散式程序組內每個程序的唯一識別符號。它們始終是從 0 到
world_size的連續整數。- 引數
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
- 返回
程序組的 rank -1,如果不在組內
- 返回型別
- torch.distributed.get_world_size(group=None)[source]#
返回當前程序組中的程序數。
- 引數
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
- 返回
程序組的世界大小 -1,如果不在組內
- 返回型別
關閉#
在退出時透過呼叫 destroy_process_group() 來清理資源非常重要。
遵循的最簡單的模式是在通訊不再需要時,通常在 main() 結束時,透過呼叫 destroy_process_group()(group 引數的預設值為 None)來銷燬每個程序組和後端。此呼叫應由每個訓練器程序呼叫一次,而不是由外部程序啟動器級別呼叫。
如果在超時時間內,不是所有 rank 都呼叫了 destroy_process_group(),尤其是在應用程式中有多個程序組(例如,用於 N 維並行性)時,可能會在退出時發生掛起。這是因為 ProcessGroupNCCL 的解構函式呼叫 ncclCommAbort,必須集體呼叫它,但如果由 Python 的 GC 呼叫,ProcessGroupNCCL 解構函式的呼叫順序是不確定的。呼叫 destroy_process_group() 有助於確保 ncclCommAbort 以一致的順序在 rank 之間呼叫,並避免在 ProcessGroupNCCL 的解構函式期間呼叫 ncclCommAbort。
重新初始化#
destroy_process_group 也可用於銷燬單個程序組。一種用例可能是容錯訓練,其中程序組可能在執行時被銷燬然後重新初始化。在這種情況下,至關重要的是在呼叫 destroy 之後,並在隨後初始化之前,使用除 torch.distributed 原始函式之外的某種方法來同步訓練器程序。由於實現這種同步的難度,此行為目前不受支援/未經測試,並被視為已知問題。如果此用例阻礙了您,請提交一個 github 問題或 RFC。
組#
預設情況下,集體操作在預設組(也稱為 world)上執行,並要求所有程序進入分散式函式呼叫。然而,某些工作負載可能受益於更精細的通訊。這就是分散式組發揮作用的地方。可以使用 new_group() 函式建立新的組,其中包含任意數量的所有程序的子集。它返回一個不透明的組控制代碼,可以作為 group 引數傳遞給所有集體操作(集體操作是用於以某些眾所周知的程式設計模式交換資訊的分散式函式)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]#
建立一個新的分散式組。
此函式要求主組中的所有程序(即參與分散式作業的所有程序)都進入此函式,即使它們不屬於該組。此外,組應該在所有程序中以相同的順序建立。
警告
安全的併發使用:當使用具有
NCCL後端的多個程序組時,使用者必須確保跨 rank 的集體操作具有全域性一致的執行順序。如果程序內的多個執行緒發出集體操作,則需要顯式同步以確保一致的排序。
當使用 torch.distributed 通訊 API 的非同步變體時,將返回一個工作物件,並且通訊核心將被入隊到單獨的 CUDA 流中,從而允許通訊和計算的重疊。一旦在一個程序組上發出了一個或多個非同步操作,就必須在使用另一個程序組之前透過呼叫 work.wait() 將它們與另一個 CUDA 流同步。
有關更多詳細資訊,請參閱併發使用多個 NCCL 通訊器 <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently>。
- 引數
ranks (list[int]) – 組員的 rank 列表。如果為
None,則設定為所有 rank。預設值為None。timeout (timedelta, 可選) – 請參閱 init_process_group 獲取詳細資訊和預設值。
backend (str 或 Backend, 可選) – 要使用的後端。根據構建時配置,有效值為
gloo和nccl。預設使用與全域性組相同的後端。此欄位應作為小寫字串(例如"gloo")給出,也可以透過Backend屬性(例如Backend.GLOO)訪問。如果傳入None,則將使用與預設程序組對應的後端。預設值為None。pg_options (ProcessGroupOptions, 可選) – 程序組選項,指定在構造特定程序組時需要傳遞的額外選項。即對於
nccl後端,可以指定is_high_priority_stream,以便程序組可以選取高優先順序 CUDA 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional): 在程序組建立結束時執行組本地屏障。這有所不同,因為非成員 rank 不需要呼叫 API 並且不加入屏障。group_desc (str, 可選) – 用於描述程序組的字串。
device_id (torch.device, 可選) – 要“繫結”此程序的單個特定裝置。如果提供了此欄位,new_group 呼叫將嘗試立即為該裝置初始化通訊後端。
- 返回
分散式組的控制代碼,可以傳遞給集體呼叫或 GroupMember.NON_GROUP_MEMBER,如果 rank 不屬於
ranks。
注意:use_local_synchronization 不能與 MPI 一起使用。
注意:雖然對於大型叢集和小型程序組,use_local_synchronization=True 的速度可能快得多,但仍需謹慎,因為它會改變叢集行為,因為非成員 rank 不會加入組屏障(barrier())。
注意:當每個 rank 建立多個重疊的程序組時,use_local_synchronization=True 可能導致死鎖。為避免這種情況,請確保所有 rank 都遵循相同的全域性建立順序。
- torch.distributed.get_group_rank(group, global_rank)[source]#
將全域性 rank 轉換為組 rank。
global_rank必須是group的一部分,否則會引發 RuntimeError。- 引數
group (ProcessGroup) – 要查詢相對 rank 的 ProcessGroup。
global_rank (int) – 要查詢的全域性 rank。
- 返回
global_rank相對於group的組 rank。- 返回型別
注意:在預設程序組上呼叫此函式會返回身份。
- torch.distributed.get_global_rank(group, group_rank)[source]#
將組 rank 轉換為全域性 rank。
group_rank必須是 group 的一部分,否則會引發 RuntimeError。- 引數
group (ProcessGroup) – 要從中查詢全域性 rank 的 ProcessGroup。
group_rank (int) – 要查詢的組 rank。
- 返回
group_rank相對於group的全域性 rank。- 返回型別
注意:在預設程序組上呼叫此函式會返回身份。
- torch.distributed.get_process_group_ranks(group)[source]#
獲取與
group關聯的所有 rank。- 引數
group (Optional[ProcessGroup]) – 要從中獲取所有 rank 的 ProcessGroup。如果為 None,則使用預設程序組。
- 返回
按組 rank 排序的全域性 rank 列表。
- 返回型別
DeviceMesh#
DeviceMesh 是一個更高級別的抽象,用於管理程序組(或 NCCL 通訊器)。它允許使用者輕鬆建立節點間和節點內程序組,而無需擔心如何為不同的子程序組正確設定 rank,並且它有助於輕鬆管理那些分散式程序組。init_device_mesh() 函式可用於建立新的 DeviceMesh,其 mesh 形狀描述了裝置拓撲。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]#
DeviceMesh 表示一個裝置網格,其中裝置的佈局可以表示為 n 維陣列,n 維陣列的每個值是預設程序組 rank 的全域性 ID。
DeviceMesh 可用於設定群集中的 N 維裝置連線,並管理 N 維並行性的 ProcessGroups。通訊可以在 DeviceMesh 的每個維度上單獨進行。DeviceMesh 尊重使用者已選擇的裝置(即,如果使用者在 DeviceMesh 初始化之前呼叫 torch.cuda.set_device),並且如果使用者之前未設定裝置,則會為當前程序選擇/設定裝置。請注意,手動裝置選擇應在 DeviceMesh 初始化之前發生。
當與 DTensor API 一起使用時,DeviceMesh 也可以用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 程式設計模型,這意味著相同的 PyTorch Python 程式將在群集中的所有程序/rank 上執行。因此,使用者需要確保 mesh 陣列(描述裝置佈局)在所有 rank 上都相同。不一致的 mesh 將導致靜默掛起。
- 引數
device_type (str) – Mesh 的裝置型別。當前支援:“cpu”、“cuda/cuda-like”。
mesh (ndarray) – 一個多維陣列或一個整數張量,描述裝置的佈局,其中 ID 是預設程序組的全域性 ID。
- 返回
一個
DeviceMesh物件,表示裝置佈局。- 返回型別
以下程式以 SPMD 方式在每個程序/rank 上執行。在此示例中,我們有 2 個主機,每個主機有 4 個 GPU。沿 mesh 的第一個維度進行歸約將跨列(0, 4)、... 和(3, 7)進行歸約,沿 mesh 的第二個維度進行歸約將跨行(0, 1, 2, 3)和(4, 5, 6, 7)進行歸約。
示例
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
- static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]#
使用
device_type從現有的ProcessGroup或一系列現有ProcessGroup構建一個DeviceMesh。構造的裝置網格具有與傳入組數相等的維度數。例如,如果傳入單個程序組,則生成的 DeviceMesh 是一個 1D 網格。如果傳入 2 個程序組的列表,則生成的 DeviceMesh 是一個 2D 網格。
如果傳入多個組,則必須提供
mesh和mesh_dim_names引數。傳入程序組的順序決定了網格的拓撲結構。例如,第一個程序組將是 DeviceMesh 的第 0 個維度。傳入的 mesh 張量必須具有與傳入程序組數量相同的維度數,並且 mesh 張量中的維度順序必須與傳入程序組中的順序匹配。- 引數
group (ProcessGroup 或 list[ProcessGroup]) – 現有的 ProcessGroup 或一組現有的 ProcessGroup。
device_type (str) – Mesh 的裝置型別。當前支援:“cpu”、“cuda/cuda-like”。不允許傳入帶有 GPU 索引的裝置型別,例如“cuda:0”。
mesh (torch.Tensor 或 ArrayLike, 可選) – 一個多維陣列或一個整數張量,用於描述裝置的佈局,其中 ID 是預設程序組的全域性 ID。預設為 None。
mesh_dim_names (tuple[str], 可選) – 一個描述裝置佈局的多維陣列的每個維度要分配的網格維度名稱的元組。其長度必須與 mesh_shape 的長度匹配。mesh_dim_names 中的每個字串都必須是唯一的。預設為 None。
- 返回
一個
DeviceMesh物件,表示裝置佈局。- 返回型別
- get_group(mesh_dim=None)[原始碼]#
返回由 mesh_dim 指定的單個 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是 1 維的,則返回網格中的唯一 ProcessGroup。
- 引數
mesh_dim (str/python:int, 可選) – 它可以是網格維度的名稱或索引
None. (網格維度的。預設為) –
- 返回
一個
ProcessGroup物件。- 返回型別
- get_local_rank(mesh_dim=None)[原始碼]#
返回 DeviceMesh 的給定 mesh_dim 的本地 rank。
- 引數
mesh_dim (str/python:int, 可選) – 它可以是網格維度的名稱或索引
None. (網格維度的。預設為) –
- 返回
一個表示本地 rank 的整數。
- 返回型別
以下程式以 SPMD 方式在每個程序/rank 上執行。在此示例中,我們有 2 個主機,每個主機有 4 個 GPU。在 rank 0、1、2、3 上呼叫 mesh_2d.get_local_rank(mesh_dim=0) 將返回 0。在 rank 4、5、6、7 上呼叫 mesh_2d.get_local_rank(mesh_dim=0) 將返回 1。在 rank 0、4 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 0。在 rank 1、5 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 1。在 rank 2、6 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 2。在 rank 3、7 上呼叫 mesh_2d.get_local_rank(mesh_dim=1) 將返回 3。
示例
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
點對點通訊#
- torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[原始碼]#
同步傳送一個張量。
警告
tag引數不支援 NCCL 後端。- 引數
tensor (Tensor) – 要傳送的張量。
dst (int) – 全域性程序組中的目標 rank(無論
group引數如何)。目標 rank 不得與當前程序的 rank 相同。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
tag (int, 可選) – 用於匹配發送和遠端接收的標籤
group_dst (int, 可選) –
group中的目標 rank。指定dst和group_dst時,兩者不能同時有效。
- torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[原始碼]#
同步接收一個張量。
警告
tag引數不支援 NCCL 後端。
isend() 和 irecv() 在使用時會返回分散式請求物件。通常,此物件的型別未指定,因為它們不應手動建立,但它們保證支援兩個方法
is_completed()- 如果操作已完成,則返回 Truewait()- 將阻塞程序直到操作完成。is_completed()保證在返回後返回 True。
- torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[原始碼]#
非同步傳送一個張量。
警告
在請求完成之前修改
tensor會導致未定義行為。警告
tag引數不支援 NCCL 後端。與阻塞式的 send 不同,isend 允許 src == dst rank,即傳送給自己。
- torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[原始碼]#
非同步接收一個張量。
警告
tag引數不支援 NCCL 後端。與阻塞式的 recv 不同,irecv 允許 src == dst rank,即從自己接收。
- torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[原始碼]#
同步傳送
object_list中的可序列化物件。與
send()類似,但可以傳遞 Python 物件。請注意,object_list中的所有物件都必須是可序列化的才能傳送。- 引數
object_list (List[Any]) – 要傳送的輸入物件列表。每個物件都必須是可序列化的。接收方必須提供相同大小的列表。
dst (int) – 將
object_list傳送到的目標 rank。目標 rank 基於全域性程序組(無論group引數如何)group (Optional[ProcessGroup]) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。device (
torch.device, 可選) – 如果不為 None,則物件將被序列化並轉換為張量,然後傳送到device。預設為None。group_dst (int, 可選) –
group中的目標 rank。必須指定dst或group_dst中的一個,但不能同時指定兩者。use_batch (bool, 可選) – 如果為 True,則使用批次 p2p 操作而不是常規的 send 操作。這避免了初始化 2-rank 通訊器,並使用現有的整個組通訊器。有關用法和假設,請參閱 batch_isend_irecv。預設為
False。
- 返回
無.
注意
對於基於 NCCL 的程序組,物件內部的張量表示必須在通訊發生之前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()提供,使用者有責任透過torch.cuda.set_device()確保每個 rank 都有獨立的 GPU。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
send_object_list()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
send_object_list()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用send()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[原始碼]#
同步接收
object_list中的可序列化物件。與
recv()類似,但可以接收 Python 物件。- 引數
object_list (List[Any]) – 要接收的物件列表。必須提供與要傳送的列表大小相同的列表。
src (int, 可選) – 從哪個源 rank 接收
object_list。源 rank 基於全域性程序組(無論group引數如何)。如果設定為 None,則從任何 rank 接收。預設為None。group (Optional[ProcessGroup]) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。device (
torch.device, 可選) – 如果不為 None,則在此裝置上接收。預設為None。group_src (int, 可選) –
group中的目標 rank。指定src和group_src時,兩者不能同時有效。use_batch (bool, 可選) – 如果為 True,則使用批次 p2p 操作而不是常規的 send 操作。這避免了初始化 2-rank 通訊器,並使用現有的整個組通訊器。有關用法和假設,請參閱 batch_isend_irecv。預設為
False。
- 返回
傳送者 rank。如果 rank 不在組中,則為 -1。如果 rank 在組中,則
object_list將包含來自srcrank 傳送的物件。
注意
對於基於 NCCL 的程序組,物件內部的張量表示必須在通訊發生之前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()提供,使用者有責任透過torch.cuda.set_device()確保每個 rank 都有獨立的 GPU。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
recv_object_list()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
recv_object_list()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用recv()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[原始碼]#
非同步傳送或接收一批張量並返回請求列表。
處理
p2p_op_list中的每個操作,並返回相應的請求。目前支援 NCCL、Gloo 和 UCC 後端。- 引數
p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 點對點操作列表(每個運算子的型別為
torch.distributed.P2POp)。列表中 isend/irecv 的順序很重要,需要與遠端端的相應 isend/irecv 匹配。- 返回
透過呼叫 op_list 中的相應操作返回的分散式請求物件列表。
- 返回型別
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
注意,當此 API 與 NCCL PG 後端一起使用時,使用者必須使用 torch.cuda.set_device 設定當前 GPU 裝置,否則會導致意外的掛起問題。
此外,如果此 API 是
group中傳遞給dist.P2POp的第一個集合通訊呼叫,則group的所有 rank 都必須參與此 API 呼叫;否則,行為未定義。如果此 API 呼叫不是group中的第一個集合通訊呼叫,則允許僅涉及group子集 rank 的批次 P2P 操作。
- class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[原始碼]#
用於
batch_isend_irecv的點對點操作的構建類。此類用於構建 P2P 操作的型別、通訊緩衝區、對等 rank、程序組和標籤。此類例項將傳遞給
batch_isend_irecv以進行點對點通訊。- 引數
op (Callable) – 用於將資料傳送到對等程序或從對等程序接收資料的函式。
op的型別為torch.distributed.isend或torch.distributed.irecv。tensor (Tensor) – 要傳送或接收的張量。
peer (int, 可選) – 目標或源 rank。
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
tag (int, 可選) – 用於匹配發送和接收的標籤。
group_peer (int, 可選) – 目標或源 rank。
同步和非同步集合通訊操作#
每個集合通訊操作函式都支援以下兩種操作,具體取決於傳遞給集合通訊的 async_op 標誌的設定
同步操作 - 預設模式,當 async_op 設定為 False 時。函式返回時,保證集合通訊操作已執行。對於 CUDA 操作,不保證 CUDA 操作已完成,因為 CUDA 操作是非同步的。對於 CPU 集合通訊,使用集合通訊結果的任何後續函式呼叫都將按預期執行。對於 CUDA 集合通訊,在同一 CUDA 流上利用結果的函式呼叫將按預期執行。使用者必須在不同流下執行時處理同步。有關 CUDA 語義(如流同步)的詳細資訊,請參閱 CUDA Semantics。請參閱下面的指令碼,瞭解 CPU 和 CUDA 操作的語義差異示例。
非同步操作 - 當 async_op 設定為 True 時。集合通訊操作函式返回一個分散式請求物件。通常,您不需要手動建立它,並且它保證支援兩個方法
is_completed()- 對於 CPU 集合通訊,如果已完成,則返回True。對於 CUDA 操作,如果操作已成功排隊到 CUDA 流,並且在不進行進一步同步的情況下可以在預設流上使用該操作,則返回True。wait()- 對於 CPU 集合通訊,將阻塞程序直到操作完成。對於 CUDA 集合通訊,將阻塞當前活動的 CUDA 流直到操作完成(但不會阻塞 CPU)。get_future()- 返回torch._C.Future物件。支援 NCCL,也支援 GLOO 和 MPI 的大多數操作,但點對點操作除外。注意:隨著我們不斷採用 Futures 併合並 API,get_future()呼叫可能會變得多餘。
示例
以下程式碼可以作為使用分散式集合通訊時 CUDA 操作語義的參考。它顯示了在使用不同 CUDA 流上的集合通訊輸出時顯式同步的必要性
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集合通訊函式#
- torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[原始碼]#
將張量廣播到整個組。
tensor在參與集合通訊的所有程序中必須具有相同數量的元素。- 引數
tensor (Tensor) – 如果
src是當前程序的 rank,則為要傳送的資料,否則為用於儲存接收資料的張量。src (int) – 全域性程序組中的源 rank(無論
group引數如何)。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
group_src (int) –
group中的源 rank。必須指定group_src或src中的一個,但不能同時指定兩者。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
- torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[原始碼]#
將
object_list中的可序列化物件廣播到整個組。與
broadcast()類似,但可以傳遞 Python 物件。請注意,object_list中的所有物件都必須是可序列化的才能廣播。- 引數
object_list (List[Any]) – 要廣播的輸入物件列表。每個物件都必須是可序列化的。只有
srcrank 上的物件才會被廣播,但每個 rank 都必須提供相同大小的列表。src (int) – 從哪個源 rank 廣播
object_list。源 rank 基於全域性程序組(無論group引數如何)group (Optional[ProcessGroup]) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。device (
torch.device, 可選) – 如果不為 None,則物件將被序列化並轉換為張量,然後廣播到device。預設為None。group_src (int) –
group中的源 rank。不得同時指定group_src和src中的一個。
- 返回
None。如果 rank 在組中,則object_list將包含從srcrank 廣播的物件。
注意
對於基於 NCCL 的程序組,物件內部的張量表示必須在通訊發生之前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()提供,使用者有責任透過torch.cuda.set_device()確保每個 rank 都有獨立的 GPU。注意
請注意,此 API 與
broadcast()集合通訊略有不同,因為它不提供async_op控制代碼,因此將是一個阻塞呼叫。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
broadcast_object_list()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
broadcast_object_list()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用broadcast()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[原始碼]#
以所有程序獲得最終結果的方式對張量資料進行規約。
呼叫後
tensor在所有程序中都將逐位相同。支援複雜張量。
- 引數
tensor (Tensor) – 集合通訊的輸入和輸出。函式原地操作。
op (optional) –
torch.distributed.ReduceOp列舉中的一個值。指定用於逐元素規約的操作。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
示例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[原始碼]#
對所有機器上的張量資料進行規約。
只有 rank 為
dst的程序才會收到最終結果。- 引數
tensor (Tensor) – 集合通訊的輸入和輸出。函式原地操作。
dst (int) – 全域性程序組中的目標 rank(無論
group引數如何)op (optional) –
torch.distributed.ReduceOp列舉中的一個值。指定用於逐元素規約的操作。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
group_dst (int) –
group中的目標 rank。必須指定group_dst或dst中的一個,但不能同時指定兩者。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[原始碼]#
將來自整個組的張量收集到一個列表中。
支援複雜和不等大的張量。
- 引數
tensor_list (list[Tensor]) – 輸出列表。應包含正確大小的張量,用於集合通訊的輸出。支援不等大的張量。
tensor (Tensor) – 要從當前程序廣播的張量。
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
示例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_list = [ ... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[原始碼]#
從所有 rank 收集張量並放入一個輸出張量中。
此函式要求所有程序上的張量大小相同。
- 引數
output_tensor (Tensor) – 用於容納所有 rank 張量元素的輸出張量。它必須大小正確,具有以下形式之一:(i)沿主維度連線所有輸入張量;“連線”的定義請參閱
torch.cat();(ii)沿主維度堆疊所有輸入張量;“堆疊”的定義請參閱torch.stack()。下面的示例可以更好地解釋支援的輸出形式。input_tensor (Tensor) – 從當前 rank 收集的張量。與
all_gatherAPI 不同,此 API 中的輸入張量在所有 rank 上必須具有相同的大小。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
- torch.distributed.all_gather_object(object_list, obj, group=None)[原始碼]#
將可序列化物件從整個組收集到一個列表中。
與
all_gather()類似,但可以傳遞 Python 物件。請注意,物件必須是可序列化的才能被收集。- 引數
object_list (list[Any]) – 輸出列表。應根據此集合通訊的組的大小正確設定大小,並將包含輸出。
obj (Any) – 要從當前程序廣播的可序列化 Python 物件。
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。
- 返回
None。如果呼叫 rank 屬於此組,則集合通訊的輸出將被填充到輸入的object_list中。如果呼叫 rank 不屬於此組,則傳遞的object_list將保持不變。
注意
請注意,此 API 與
all_gather()集合通訊略有不同,因為它不提供async_op控制代碼,因此將是一個阻塞呼叫。注意
對於基於 NCCL 的程序組,物件內部的張量表示必須在通訊發生之前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()提供,使用者有責任透過torch.cuda.set_device()確保每個 rank 都有獨立的 GPU。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
all_gather_object()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
all_gather_object()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用all_gather()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[原始碼]#
將張量列表收集到一個程序中。
此函式要求所有程序上的張量大小相同。
- 引數
tensor (Tensor) – 輸入張量。
gather_list (list[Tensor], 可選) – 用於收集資料的正確大小的張量列表(預設為 None,必須在目標 rank 上指定)
dst (int, 可選) – 全域性程序組中的目標 rank(無論
group引數如何)。(如果dst和group_dst都為 None,則預設為全域性 rank 0)group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
group_dst (int, 可選) –
group中的目標 rank。指定dst和group_dst時,兩者不能同時有效
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
注意
注意,gather_list 中的所有張量必須具有相同的大小。
- 示例:
>>> # We have 2 process groups, 2 ranks. >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.ones(tensor_size, device=device) + rank >>> if dist.get_rank() == 0: >>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)] >>> else: >>> gather_list = None >>> dist.gather(tensor, gather_list, dst=0) >>> # Rank 0 gets gathered data. >>> gather_list [tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0 None # Rank 1
- torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[原始碼]#
將整個組中的可序列化物件收集到一個程序中。
與
gather()類似,但可以傳遞 Python 物件。請注意,物件必須是可序列化的才能被收集。- 引數
obj (Any) – 輸入物件。必須是可序列化的。
object_gather_list (list[Any]) – 輸出列表。在
dstrank 上,它的大小應與此集合通訊的組大小相同,並將包含輸出。在非 dst rank 上必須為None。(預設為None)dst (int, 可選) – 全域性程序組中的目標 rank(無論
group引數如何)。(如果dst和group_dst都為 None,則預設為全域性 rank 0)group (Optional[ProcessGroup]) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。group_dst (int, 可選) –
group中的目標 rank。指定dst和group_dst時,兩者不能同時有效
- 返回
None。在dstrank 上,object_gather_list將包含集合通訊的輸出。
注意
請注意,此 API 與 gather 集合通訊略有不同,因為它不提供 async_op 控制代碼,因此將是一個阻塞呼叫。
注意
對於基於 NCCL 的程序組,物件內部的張量表示必須在通訊發生之前移動到 GPU 裝置。在這種情況下,使用的裝置由
torch.cuda.current_device()提供,使用者有責任透過torch.cuda.set_device()確保每個 rank 都有獨立的 GPU。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
gather_object()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
gather_object()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用gather()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[原始碼]#
將張量列表散佈到組中的所有程序。
每個程序將接收一個張量,並將其資料儲存在
tensor引數中。支援複雜張量。
- 引數
tensor (Tensor) – 輸出張量。
scatter_list (list[Tensor]) – 要散佈的張量列表(預設為 None,必須在源 rank 上指定)
src (int) – 從哪個源 rank 散佈
scatter_list。源 rank 基於全域性程序組(無論group引數如何)。(如果src和group_src都為 None,則預設為全域性 rank 0)group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
group_src (int, 可選) –
group中的源 rank。不得同時指定src和group_src。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
注意
注意,scatter_list 中的所有張量必須具有相同的大小。
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> output_tensor = torch.zeros(tensor_size, device=device) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> t_ones = torch.ones(tensor_size, device=device) >>> t_fives = torch.ones(tensor_size, device=device) * 5 >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. >>> output_tensor tensor([1., 1.], device='cuda:0') # Rank 0 tensor([5., 5.], device='cuda:1') # Rank 1
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[原始碼]#
將
scatter_object_input_list中的可序列化物件散佈到整個組。與
scatter()類似,但可以傳遞 Python 物件。在每個 rank 上,散佈的物件將儲存在scatter_object_output_list的第一個元素中。請注意,scatter_object_input_list中的所有物件都必須是可序列化的才能散佈。- 引數
scatter_object_output_list (List[Any]) – 非空列表,其第一個元素將儲存散佈到此 rank 的物件。
scatter_object_input_list (List[Any], 可選) – 要散佈的輸入物件列表。每個物件都必須是可序列化的。只有
srcrank 上的物件才會被散佈,對於非 src rank,此引數可以為None。src (int) – 從哪個源 rank 散佈
scatter_object_input_list。源 rank 基於全域性程序組(無論group引數如何)。(如果src和group_src都為 None,則預設為全域性 rank 0)group (Optional[ProcessGroup]) – 要操作的程序組。如果為 None,則使用預設程序組。預設為
None。group_src (int, 可選) –
group中的源 rank。不得同時指定src和group_src。
- 返回
None。如果 rank 在組中,則scatter_object_output_list的第一個元素將被設定為該 rank 的散佈物件。
注意
請注意,此 API 與 scatter 集合通訊略有不同,因為它不提供
async_op控制代碼,因此將是一個阻塞呼叫。警告
物件集體操作存在一些嚴重的效能和可擴充套件性限制。詳情請參閱 物件集體操作。
警告
scatter_object_list()隱式使用了pickle模組,該模組已知是不安全的。可以構造惡意 pickle 資料,在反序列化時執行任意程式碼。僅使用可信資料呼叫此函式。警告
使用 GPU 張量呼叫
scatter_object_list()支援不佳且效率低下,因為它會產生 GPU -> CPU 傳輸,因為張量會被 pickle。請考慮使用scatter()。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[原始碼]#
規約,然後將張量列表散佈到組中的所有程序。
- 引數
output (Tensor) – 輸出張量。
op (optional) –
torch.distributed.ReduceOp列舉中的一個值。指定用於逐元素規約的操作。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[原始碼]#
規約,然後將張量散佈到組中的所有 rank。
- 引數
output (Tensor) – 輸出張量。它在所有 rank 上都應具有相同的大小。
input (Tensor) – 要規約和散佈的輸入張量。其大小應為輸出張量大小乘以世界大小。輸入張量可以具有以下形狀之一:(i)沿主維度連線輸出張量,或(ii)沿主維度堆疊輸出張量。有關“連線”的定義,請參閱
torch.cat()。有關“堆疊”的定義,請參閱torch.stack()。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[原始碼]#
分割輸入張量,然後將分割列表散佈到組中的所有程序。
稍後,將從組中所有程序接收到的張量連線起來,並作為單個輸出張量返回。
支援複雜張量。
- 引數
output (Tensor) – 收集的連線輸出張量。
input (Tensor) – 要散佈的輸入張量。
output_split_sizes – (list[Int], 可選): 如果指定為 None 或空,則 dim 0 的輸出分割大小。
output張量的 dim 0 必須能被world_size整除。input_split_sizes – (list[Int], 可選): 如果指定為 None 或空,則 dim 0 的輸入分割大小。
input張量的 dim 0 必須能被world_size整除。group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None。
警告
all_to_all_single 是實驗性的,可能會發生更改。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[原始碼]#
將輸入張量列表散佈到組中的所有程序,並在輸出列表中返回收集的張量列表。
支援複雜張量。
- 引數
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None。
警告
all_to_all 是實驗性的,可能會發生更改。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[原始碼]#
同步所有程序。
此集合通訊會阻塞程序,直到整個組進入此函式,如果 async_op 為 False,或者如果呼叫 wait() 上的非同步工作控制代碼。
- 引數
group (ProcessGroup, 可選) – 要操作的程序組。如果為 None,則使用預設程序組。
async_op (bool, 可選) – 此操作是否應為非同步操作
device_ids ([int], 可選) – 裝置/GPU ID 列表。只期望一個 ID。
- 返回
非同步工作控制代碼,如果 async_op 設定為 True。如果不是非同步操作或不在組中,則為 None
注意
ProcessGroupNCCL 現在會阻塞 CPU 執行緒直到 barrier 集合通訊完成。
注意
ProcessGroupNCCL 將 barrier 實現為 1 元素張量的 all_reduce。必須為分配此張量選擇一個裝置。裝置選擇透過按以下順序檢查來完成:(1)如果 barrier 的 device_ids 引數不為 None,則為第一個裝置;(2)傳遞給 init_process_group 的裝置(如果不是 None);(3)已與此程序組首次使用的裝置(如果已執行另一個帶有張量輸入的集合通訊);(4)全域性 rank 對本地裝置計數取模指示的裝置索引。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[原始碼]#
類似於
torch.distributed.barrier同步程序,但考慮了可配置的超時。它能夠報告在規定超時時間內未透過此 barrier 的 rank。具體來說,對於非零 rank,將阻塞直到從 rank 0 處理傳送/接收。Rank 0 將阻塞直到處理完所有其他 rank 的傳送/接收,並將報告未及時響應的 rank 的失敗。請注意,如果某個 rank 未達到 monitored_barrier(例如由於掛起),則所有其他 rank 在 monitored_barrier 中都會失敗。
此集合通訊會阻塞組中的所有程序/rank,直到整個組成功退出函式,這對於除錯和同步非常有用。但是,它可能會影響效能,並且僅應用於除錯或需要完全主機端同步點的場景。出於除錯目的,可以在應用程式的集合通訊呼叫之前插入此 barrier,以檢查是否有任何 rank 不同步。
注意
請注意,此集合通訊僅支援 GLOO 後端。
- 引數
group (ProcessGroup, 可選) – 要操作的程序組。如果為
None,則使用預設程序組。timeout (datetime.timedelta, 可選) – monitored_barrier 的超時時間。如果為
None,則使用預設程序組超時。wait_all_ranks (bool, optional) – 是否收集所有失敗的 rank。預設情況下,此引數為
False,此時 rank 0 上的monitored_barrier會在遇到第一個失敗的 rank 時立即丟擲異常,以便快速失敗。透過將wait_all_ranks=True,monitored_barrier將收集所有失敗的 rank,並丟擲一個包含所有失敗 rank 資訊中的錯誤。
- 返回
無.
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work#
一個 Work 物件表示 PyTorch 分散式包中掛起的非同步操作的控制代碼。它由非阻塞集合操作返回,例如 dist.all_reduce(tensor, async_op=True)。
- block_current_stream(self: torch._C._distributed_c10d.Work) None#
阻塞當前活動的 GPU 流以完成操作。對於基於 GPU 的集合操作,這等同於同步。對於 CPU 發起的集合操作(例如使用 Gloo),這將阻塞 CUDA 流直到操作完成。
此方法在所有情況下都會立即返回。
要檢查操作是否成功,您應該非同步檢查 Work 物件的結果。
- exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr#
- get_future(self: torch._C._distributed_c10d.Work) torch.Future#
- 返回
一個與
Work完成相關的torch.futures.Future物件。例如,可以透過fut = process_group.allreduce(tensors).get_future()檢索 Future 物件。
- 示例:
下面是一個簡單的 allreduce DDP 通訊鉤子的示例,它使用
get_futureAPI 來檢索與allreduce完成相關的 Future。>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future >>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD >>> tensor = bucket.buffer().div_(group_to_use.size()) >>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future() >>> ddp_model.register_comm_hook(state=None, hook=allreduce)
警告
get_futureAPI 支援 NCCL,部分支援 GLOO 和 MPI 後端(不支援點對點操作,如 send/recv),並返回一個torch.futures.Future。在上面的示例中,
allreduce操作將在 GPU 上使用 NCCL 後端完成,fut.wait()將在同步適當的 NCCL 流與 PyTorch 的當前裝置流後返回,以確保我們可以進行非同步 CUDA 執行,而無需等待 GPU 上的整個操作完成。請注意,CUDAFuture不支援TORCH_NCCL_BLOCKING_WAIT標誌或 NCCL 的barrier()。此外,如果fut.then()添加了一個回撥函式,它將等待直到WorkNCCL的 NCCL 流與ProcessGroupNCCL的專用回撥流同步,並在回撥流上執行回撥後內聯呼叫該回調。fut.then()將返回另一個CUDAFuture,該 Future 儲存回撥的返回值和一個記錄了回撥流的CUDAEvent。對於 CPU 操作,
fut.done()在操作完成且 value() 張量準備就緒時返回 true。對於 GPU 操作,
fut.done()僅在操作已入隊時返回 true。對於混合 CPU-GPU 操作(例如使用 GLOO 傳送 GPU 張量),
fut.done()在張量到達相應節點時返回 true,但不一定在相應 GPU 上同步(與 GPU 操作類似)。
- get_future_result(self: torch._C._distributed_c10d.Work) torch.Future#
- 返回
一個
torch.futures.Future物件,其型別為 int,對映到 WorkResult 的列舉型別。例如,可以透過fut = process_group.allreduce(tensor).get_future_result()檢索 Future 物件。
- 示例:
使用者可以使用
fut.wait()阻塞等待工作完成,並透過fut.value()獲取 WorkResult。此外,使用者還可以使用fut.then(call_back_func)註冊一個回撥函式,以便在工作完成時呼叫,而無需阻塞當前執行緒。
警告
get_future_resultAPI 支援 NCCL。
- result(self: torch._C._distributed_c10d.Work) list[torch.Tensor]#
- wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool#
- 返回
true/false。
- 示例:
- try
work.wait(timeout)
- except
# some handling
警告
在正常情況下,使用者無需設定超時。呼叫 wait() 等同於呼叫 synchronize():讓當前流阻塞直到 NCCL 操作完成。但是,如果設定了超時,它將阻塞 CPU 執行緒直到 NCCL 操作完成或超時。如果超時,將丟擲異常。
- class torch.distributed.ReduceOp#
可用歸約操作的列舉類:
SUM,PRODUCT,MIN,MAX,BAND,BOR,BXOR, 和PREMUL_SUM。BAND,BOR, 和BXOR歸約在與NCCL後端一起使用時不可用。AVG在求和跨 rank 之前將值除以 world size。AVG僅適用於NCCL後端,並且僅適用於 NCCL 版本 2.10 或更高版本。PREMUL_SUM在歸約之前將輸入乘以給定的標量。PREMUL_SUM僅適用於NCCL後端,並且僅適用於 NCCL 版本 2.11 或更高版本。使用者應使用torch.distributed._make_nccl_premul_sum。此外,
MAX,MIN和PRODUCT不支援複數張量。此類的值可以透過屬性訪問,例如
ReduceOp.SUM。它們用於指定歸約集合操作的策略,例如reduce()。此類不支援
__members__屬性。
分散式鍵值儲存#
分散式包帶有一個分散式的鍵值儲存,可用於在組中的程序之間共享資訊,以及在 torch.distributed.init_process_group() 中初始化分散式包(透過顯式建立儲存作為指定 init_method 的替代方法)。有 3 種鍵值儲存可供選擇:TCPStore, FileStore, 和 HashStore。
- class torch.distributed.Store#
所有儲存實現的基類,例如 PyTorch 分散式提供的 3 種實現:(
TCPStore,FileStore, 和HashStore)。- add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: SupportsInt) int#
第一次為給定的
key呼叫 add 會在儲存中建立一個與key關聯的計數器,並初始化為amount。對於同一個key的後續 add 呼叫會將計數器增加指定的amount。使用已透過set()在儲存中設定的鍵呼叫add()將導致異常。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#
將鍵值對追加到儲存中,基於提供的
key和value。如果key在儲存中不存在,它將被建立。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # Should return "potato" >>> store.get("first_key")
- check(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) bool#
呼叫以檢查給定的
keys列表是否在儲存中有值。此呼叫在正常情況下會立即返回,但仍可能遇到一些邊緣的死鎖情況,例如,在 TCPStore 被銷燬後呼叫 check。呼叫check()並傳入一個鍵列表,用於檢查它們是否存在於儲存中。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # Should return 7 >>> store.check(["first_key"])
- clone(self: torch._C._distributed_c10d.Store) torch._C._distributed_c10d.Store#
克隆儲存並返回一個指向相同底層儲存的新物件。返回的儲存可以與原始物件併發使用。這旨在為從多個執行緒安全地使用儲存提供一種方式,每個執行緒克隆一個儲存。
- compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes#
根據提供的
key和expected_value與desired_value之間的比較,將鍵值對插入儲存。僅當key的expected_value已存在於儲存中,或者expected_value是空字串時,才會設定desired_value。- 引數
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool#
從儲存中刪除與
key關聯的鍵值對。如果鍵成功刪除,則返回 true,如果未刪除,則返回 false。- 引數
key (str) – 要從儲存中刪除的鍵
- 返回
True 如果
key已刪除,否則為 False。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- get(self: torch._C._distributed_c10d.Store, arg0: str) bytes#
檢索儲存中與給定
key關聯的值。如果key不存在於儲存中,該函式將等待timeout(在初始化儲存時定義)後再丟擲異常。- 引數
key (str) – 函式將返回與此鍵關聯的值。
- 返回
如果
key存在於儲存中,則返回與key關聯的值。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- multi_get(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) list[bytes]#
檢索
keys中的所有值。如果keys中的任何鍵在儲存中不存在,該函式將等待timeout。- 引數
keys (List[str]) – 要從儲存中檢索的鍵。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # Should return [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"])
- multi_set(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: collections.abc.Sequence[str]) None#
根據提供的
keys和values將鍵值對列表插入儲存。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # Should return b"po" >>> store.get("first_key")
- num_keys(self: torch._C._distributed_c10d.Store) int#
返回儲存中已設定的鍵的數量。請注意,此數字通常比透過
set()和add()新增的鍵的數量多一個,因為會有一個鍵用於協調使用該儲存的所有 worker。警告
與
TCPStore一起使用時,num_keys返回寫入底層檔案的鍵的數量。如果儲存被銷燬,並且使用相同檔案的另一個儲存被建立,則原始鍵將被保留。- 返回
儲存中存在的鍵的數量。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- queue_len(self: torch._C._distributed_c10d.Store, arg0: str) int#
返回指定佇列的長度。
如果佇列不存在,則返回 0。
有關更多詳細資訊,請參閱 queue_push。
- 引數
key (str) – 獲取長度的佇列的鍵。
- queue_pop(self: torch._C._distributed_c10d.Store, key: str, block: bool = True) bytes#
從指定佇列中彈出值,或在佇列為空時等待直到超時。
有關更多詳細資訊,請參閱 queue_push。
如果 block 為 False,當佇列為空時將引發 dist.QueueEmptyError。
- queue_push(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#
將值推送到指定佇列。
將相同的鍵用於佇列和 set/get 操作可能會導致意外行為。
wait/check 操作支援佇列。
佇列上的 wait 將只喚醒一個等待的 worker 而不是全部。
- set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None#
根據提供的
key和value將鍵值對插入儲存。如果key已存在於儲存中,則會用新提供的value覆蓋舊值。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None#
設定儲存的預設超時時間。此超時時間用於初始化以及
wait()和get()方法。- 引數
timeout (timedelta) – 要在儲存中設定的超時時間。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
- property timeout#
獲取儲存的超時時間。
- wait(*args, **kwargs)#
過載函式。
wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) -> None
等待
keys中的每個鍵被新增到儲存中。如果在timeout(在儲存初始化期間設定)之前並非所有鍵都已設定,則wait將丟擲異常。- 引數
keys (list) – 在其中等待直到它們在儲存中設定的鍵列表。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: datetime.timedelta) -> None
等待
keys中的每個鍵被新增到儲存中,並在timeout時間內未設定鍵時丟擲異常。- 引數
keys (list) – 在其中等待直到它們在儲存中設定的鍵列表。
timeout (timedelta) – 在丟擲異常之前等待鍵新增的時間。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- class torch.distributed.TCPStore#
一個基於 TCP 的分散式鍵值儲存實現。伺服器儲存持有資料,而客戶端儲存可以透過 TCP 連線到伺服器儲存並執行操作,例如
set()來插入鍵值對,get()來檢索鍵值對等。因為客戶端儲存(們)將等待伺服器建立連線,所以總應該有一個伺服器儲存被初始化。- 引數
host_name (str) – 伺服器儲存應執行的主機名或 IP 地址。
port (int) – 伺服器儲存應監聽傳入請求的埠。
world_size (int, optional) – 儲存使用者的總數(客戶端數 + 1 個伺服器)。預設為 None(None 表示非固定數量的儲存使用者)。
is_master (bool, optional) – 初始化伺服器儲存時為 True,客戶端儲存時為 False。預設為 False。
timeout (timedelta, optional) – 儲存在初始化期間以及
get()和wait()方法中使用的超時時間。預設為 timedelta(seconds=300)wait_for_workers (bool, optional) – 是否等待所有 worker 連線到伺服器儲存。這僅在 world_size 是固定值時適用。預設為 True。
multi_tenant (bool, optional) – 如果為 True,則當前程序中所有具有相同主機/埠的
TCPStore例項將使用相同的底層TCPServer。預設為 False。master_listen_fd (int, optional) – 如果指定,底層
TCPServer將在該檔案描述符上監聽,該檔案描述符必須是已繫結到port的套接字。為了繫結一個臨時埠,我們建議將埠設定為 0 並讀取.port。預設為 None(表示伺服器建立一個新套接字並嘗試將其繫結到port)。use_libuv (bool, optional) – 如果為 True,則為
TCPServer後端使用 libuv。預設為 True。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- __init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: SupportsInt, world_size: SupportsInt | None = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: SupportsInt | None = None, use_libuv: bool = True) None#
建立一個新的 TCPStore。
- property host#
獲取儲存監聽請求的主機名。
- property libuvBackend#
如果正在使用 libuv 後端,則返回 True。
- property port#
獲取儲存監聽請求的埠號。
- class torch.distributed.HashStore#
基於底層雜湊表的執行緒安全儲存實現。此儲存可以在同一程序內使用(例如,由其他執行緒),但不能跨程序使用。
- 示例:
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- __init__(self: torch._C._distributed_c10d.HashStore) None#
建立一個新的 HashStore。
- class torch.distributed.FileStore#
使用檔案儲存底層鍵值對的儲存實現。
- 示例:
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- __init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: SupportsInt = -1) None#
建立一個新的 FileStore。
- property path#
獲取 FileStore 用於儲存鍵值對的檔案的路徑。
- class torch.distributed.PrefixStore#
圍繞 3 種鍵值儲存(
TCPStore,FileStore, 和HashStore)之一的包裝器,它在將每個鍵插入儲存之前新增一個字首。- 引數
prefix (str) – 在插入儲存之前新增到每個鍵的字首字串。
store (torch.distributed.store) – 構成底層鍵值儲存的儲存物件。
- __init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None#
建立一個新的 PrefixStore。
- property underlying_store#
獲取 PrefixStore 包裝的底層儲存物件。
剖析集合通訊#
請注意,您可以使用 torch.profiler(推薦,僅在 1.8.1 後可用)或 torch.autograd.profiler 來剖析此處提到的集合通訊和點對點通訊 API。所有現成的後端(gloo, nccl, mpi)均受支援,並且集合通訊的使用將在剖析輸出/跟蹤中如預期般呈現。剖析程式碼與任何常規的 torch 操作相同。
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
有關剖析器功能的完整概述,請參閱 剖析器文件。
多 GPU 集合函式#
警告
多 GPU 函式(代表每個 CPU 執行緒有多個 GPU)已棄用。截至目前,PyTorch Distributed 首選的程式設計模型是每個執行緒一個裝置,正如本文件中的 API 所證明的那樣。如果您是後端開發人員並希望支援每個執行緒有多個裝置,請聯絡 PyTorch Distributed 的維護者。
物件集合#
警告
物件集合具有一系列嚴重的限制。請進一步閱讀以確定它們是否對您的用例安全。
物件集合是一系列類似集合的操作,它們作用於任意 Python 物件,只要它們可以被 pickle。實現了各種集合模式(例如 broadcast, all_gather, …),但它們大致遵循以下模式:
將輸入物件轉換為 pickle(原始位元組),然後將其放入位元組張量中
將此位元組張量的大小通訊給對等方(第一個集合操作)
分配適當大小的張量以執行實際的集合操作
通訊物件資料(第二個集合操作)
將原始資料轉換回 Python(unpickle)
物件集合有時具有令人驚訝的效能或記憶體特性,導致執行時長或 OOM,因此應謹慎使用。以下是一些常見問題。
不對稱的 pickle/unpickle 時間 - Pickling 物件可能很慢,具體取決於物件的數量、型別和大小。當集合操作是 fan-in(例如 gather_object)時,接收 rank 必須 unpickle N 倍於傳送 rank pickle 的物件的數量,這可能導致其他 rank 在下一次集合操作時超時。
低效的張量通訊 - 張量應透過常規集合 API 傳送,而不是物件集合 API。可以透過物件集合 API 傳送張量,但它們將被序列化和反序列化(對於非 CPU 張量,包括 CPU 同步和從裝置到主機的複製),並且在幾乎所有情況下,除了除錯或故障排除程式碼之外,都值得重構程式碼以使用非物件集合。
意外的張量裝置 - 如果您仍然想透過物件集合傳送張量,還有另一個特定於 CUDA(以及可能的其他加速器)張量的方面。如果您 pickle 了一個當前位於 cuda:3 上的張量,然後 unpickle 它,您將獲得另一個位於 cuda:3 上的張量,**無論您在哪一個程序上,或者哪個 CUDA 裝置是該程序的“預設”裝置**。使用常規的張量集合 API,“輸出張量”始終位於相同的本地裝置上,這通常是您所期望的。
unpickling 一個張量將隱式啟用一個 CUDA 上下文,如果這是程序第一次使用 GPU,這可能會浪費大量 GPU 記憶體。可以透過在將張量作為物件集合的輸入之前將其移動到 CPU 來避免此問題。
第三方後端#
除了內建的 GLOO/MPI/NCCL 後端之外,PyTorch 分散式還透過執行時註冊機制支援第三方後端。關於如何透過 C++ 擴充套件開發第三方後端的參考,請參閱 教程 - 自定義 C++ 和 CUDA 擴充套件 和 test/cpp_extensions/cpp_c10d_extension.cpp。第三方後端的效能取決於其自身的實現。
新的後端繼承自 c10d::ProcessGroup,並在匯入時透過 torch.distributed.Backend.register_backend() 註冊後端名稱和例項化介面。
當手動匯入此後端並使用相應的後端名稱呼叫 torch.distributed.init_process_group() 時,torch.distributed 包將在新後端上執行。
警告
第三方後端的支援是實驗性的,可能會發生變化。
啟動實用程式#
torch.distributed 包還在 torch.distributed.launch 中提供了一個啟動實用程式。這個輔助實用程式可用於在節點上啟動多個程序進行分散式訓練。
模組 torch.distributed.launch。
torch.distributed.launch 是一個模組,用於在每個訓練節點上啟動多個分散式訓練程序。
警告
該模組將被棄用,推薦使用 torchrun。
該實用程式可用於單節點分散式訓練,其中每個節點將啟動一個或多個程序。該實用程式可用於 CPU 訓練或 GPU 訓練。如果用於 GPU 訓練,每個分散式程序將操作於單個 GPU。這可以顯著提高單節點訓練效能。它還可以用於多節點分散式訓練,透過在每個節點上啟動多個程序來顯著提高多節點分散式訓練效能。這對於支援直接 GPU 的多 Infiniband 介面的系統尤其有益,因為所有介面都可以用於聚合通訊頻寬。
在單節點分散式訓練或多節點分散式訓練這兩種情況下,該實用程式都會根據 --nproc-per-node 引數啟動每個節點上的程序數。如果用於 GPU 訓練,此數量必須小於或等於當前系統的 GPU 數量(nproc_per_node),並且每個程序將操作於從 *GPU 0 到 GPU (nproc_per_node - 1)* 的單個 GPU。
如何使用此模組
單節點多程序分散式訓練
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多節點多程序分散式訓練:(例如,兩個節點)
節點 1:*(IP:192.168.1.1,並有一個空閒埠:1234)*
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
節點 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要檢視此模組提供的可選引數
python -m torch.distributed.launch --help
重要通知
1. 該實用程式和多程序分散式(單節點或多節點)GPU 訓練目前僅在使用 NCCL 分散式後端時能達到最佳效能。因此, NCCL 後端是 GPU 訓練推薦使用的後端。
2. 在您的訓練程式中,必須解析由本模組提供的命令列引數:--local-rank=LOCAL_PROCESS_RANK。如果您的訓練程式使用 GPU,您應該確保您的程式碼僅在 LOCAL_PROCESS_RANK 的 GPU 裝置上執行。這可以透過以下方式實現:
解析 local_rank 引數
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
使用以下任一方式將您的裝置設定為 local rank:
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
版本 2.0.0 中已更改:啟動器會將 --local-rank=<rank> 引數傳遞給您的指令碼。從 PyTorch 2.0.0 開始,推薦使用帶連字元的 --local-rank 而非之前使用的帶下劃線的 --local_rank。
為了向後相容,使用者可能需要在其引數解析程式碼中同時處理這兩種情況。這意味著在引數解析器中包含 "--local-rank" 和 "--local_rank"。如果只提供了 "--local_rank",啟動器將觸發一個錯誤:“error: unrecognized arguments: –local-rank=<rank>”。對於僅支援 PyTorch 2.0.0+ 的訓練程式碼,包含 "--local-rank" 應該就足夠了。
3. 在您的訓練程式中,您應該在開始時呼叫以下函式來啟動分散式後端。強烈推薦使用 init_method=env://。其他初始化方法(例如 tcp://)也可能有效,但 env:// 是該模組官方支援的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的訓練程式中,您可以使用常規的分散式函式,或者使用 torch.nn.parallel.DistributedDataParallel() 模組。如果您的訓練程式使用 GPU 進行訓練,並且您想使用 torch.nn.parallel.DistributedDataParallel() 模組,配置方法如下。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
請確保 device_ids 引數設定為您的程式碼將要操作的唯一 GPU 裝置 ID。這通常是 local rank。換句話說,device_ids 需要是 [args.local_rank],並且 output_device 需要是 args.local_rank 才能使用此實用程式。
5. 另一種透過環境變數 LOCAL_RANK 將 local_rank 傳遞給子程序的方法。當您使用 --use-env=True 啟動指令碼時,此功能將被啟用。您必須調整上面的子程序示例,將 args.local_rank 替換為 os.environ['LOCAL_RANK'];當您指定此標誌時,啟動器將不會傳遞 --local-rank。
警告
local_rank **不**是全域性唯一的:它只在同一臺機器上的程序之間是唯一的。因此,不要用它來決定是否應該,例如,寫入網路檔案系統。有關可能出錯的示例,請參閱 pytorch/pytorch#12042。
啟動實用程式#
Multiprocessing package - torch.multiprocessing 包還提供了 torch.multiprocessing.spawn() 中的 spawn 函式。此輔助函式可用於啟動多個程序。它透過傳入您想要執行的函式來工作,並啟動 N 個程序來執行它。這也可以用於多程序分散式訓練。
有關如何使用它的參考,請參閱 PyTorch 示例 - ImageNet 實現
請注意,此函式需要 Python 3.4 或更高版本。
除錯 torch.distributed 應用程式#
由於難以理解的掛起、崩潰或跨 rank 的不一致行為,除錯分散式應用程式可能具有挑戰性。torch.distributed 提供了一套工具來幫助使用者自行除錯訓練應用程式。
Python 斷點#
在分散式環境中,使用 Python 的偵錯程式非常方便,但由於它不能開箱即用,許多人根本不使用它。PyTorch 提供了一個圍繞 pdb 的自定義包裝器,可以簡化此過程。
torch.distributed.breakpoint 使此過程變得容易。在內部,它透過兩種方式自定義了 pdb 的斷點行為,但否則與正常的 pdb 行為一致。
僅在一個 rank(使用者指定)上附加偵錯程式。
透過使用
torch.distributed.barrier()來確保所有其他 rank 停止,一旦被除錯的 rank 發出continue命令,就會釋放。將 stdin 從子程序重定向,以便它連線到您的終端。
要使用它,只需在每個 rank 上發出 torch.distributed.breakpoint(rank),並在每種情況下使用相同的 rank 值。
監控屏障#
從 v1.10 開始,torch.distributed.monitored_barrier() 作為 torch.distributed.barrier() 的替代方案而存在,當出現崩潰時(即並非所有 rank 都在提供的超時時間內呼叫 torch.distributed.monitored_barrier()),它會提供有關哪個 rank 可能存在問題的有用資訊。 torch.distributed.monitored_barrier() 使用 send/recv 通訊原語實現了主機端屏障,其過程類似於確認,允許 rank 0 報告哪個(些)rank 未及時確認屏障。例如,考慮以下函式,其中 rank 1 未呼叫 torch.distributed.monitored_barrier()(在實際情況中,這可能是由於應用程式 bug 或先前集體操作的掛起)。
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
以下錯誤訊息將在 rank 0 上生成,允許使用者確定哪個(些)rank 可能存在問題並進一步調查。
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG#
透過 TORCH_CPP_LOG_LEVEL=INFO,可以使用環境變數 TORCH_DISTRIBUTED_DEBUG 來觸發額外的有用的日誌記錄和集體同步檢查,以確保所有 rank 都得到適當的同步。TORCH_DISTRIBUTED_DEBUG 可以設定為 OFF(預設)、INFO 或 DETAIL,具體取決於所需的除錯級別。請注意,最詳細的選項 DETAIL 可能會影響應用程式效能,因此應僅在除錯問題時使用。
將 TORCH_DISTRIBUTED_DEBUG=INFO 設定為在初始化使用 torch.nn.parallel.DistributedDataParallel() 訓練的模型時產生額外的除錯日誌,而 TORCH_DISTRIBUTED_DEBUG=DETAIL 將額外記錄選定迭代次數的執行時效能統計資訊。這些執行時統計資訊包括前向傳播時間、後向傳播時間、梯度通訊時間等資料。例如,給定以下應用程式:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
初始化時會生成以下日誌:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
執行時會生成以下日誌(當設定 TORCH_DISTRIBUTED_DEBUG=DETAIL 時):
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO 增強了 torch.nn.parallel.DistributedDataParallel() 中由於模型中存在未使用的引數而導致的崩潰日誌。目前,如果在初始化 torch.nn.parallel.DistributedDataParallel() 時傳遞了 find_unused_parameters=True,則可能存在前向傳播中未使用的引數,並且從 v1.10 開始,模型的所有輸出都必須用於損失計算,因為 torch.nn.parallel.DistributedDataParallel() 不支援後向傳播中的未使用引數。這些約束對於大型模型尤其具有挑戰性,因此當發生崩潰時,torch.nn.parallel.DistributedDataParallel() 將會記錄所有未使用的引數的完整限定名稱。例如,在上述應用程式中,如果我們修改 loss 以便計算為 loss = output[1],那麼 TwoLinLayerNet.a 在後向傳播中將不會接收到梯度,因此會導致 DDP 失敗。在崩潰時,使用者會收到有關未使用引數的資訊,而對於大型模型來說,這些資訊可能很難手動找到。
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
將 TORCH_DISTRIBUTED_DEBUG=DETAIL 設定將觸發對使用者直接或間接發出的每個集體呼叫(例如 DDP allreduce)的額外一致性和同步檢查。這是透過建立一個包裝器程序組來實現的,該程序組包裝由 torch.distributed.init_process_group() 和 torch.distributed.new_group() API 返回的所有程序組。因此,這些 API 將返回一個包裝器程序組,該程序組可以用作常規程序組,但在將集體操作分派到底層程序組之前會執行一致性檢查。目前,這些檢查包括一個 torch.distributed.monitored_barrier(),它確保所有 rank 完成其掛起的集體呼叫,並報告卡住的 rank。接下來,透過確保所有集體函式匹配並以一致的張量形狀呼叫來檢查集體本身的一致性。如果不是這種情況,當應用程式崩潰時,會包含一個詳細的錯誤報告,而不是掛起或無資訊的錯誤訊息。例如,考慮以下函式,該函式在 torch.distributed.all_reduce() 中具有不匹配的輸入形狀:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL 後端,此類應用程式可能會導致掛起,而在非平凡場景中,這可能難以追溯根本原因。如果使用者啟用 TORCH_DISTRIBUTED_DEBUG=DETAIL 並重新執行應用程式,以下錯誤訊息將揭示根本原因:
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
有關執行時除錯級別的精細控制,還可以使用函式 torch.distributed.set_debug_level()、torch.distributed.set_debug_level_from_env() 和 torch.distributed.get_debug_level()。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以與 TORCH_SHOW_CPP_STACKTRACES=1 結合使用,在檢測到集體操作不同步時記錄整個呼叫堆疊。這些集體操作不同步檢查將適用於所有使用 c10d 集體呼叫,並由使用 torch.distributed.init_process_group() 和 torch.distributed.new_group() API 建立的程序組支援的應用程式。
日誌記錄#
除了透過 torch.distributed.monitored_barrier() 和 TORCH_DISTRIBUTED_DEBUG 提供的顯式除錯支援外,torch.distributed 的底層 C++ 庫還會輸出不同級別的日誌訊息。這些訊息有助於理解分散式訓練作業的執行狀態,並排查網路連線失敗等問題。下表顯示瞭如何透過 TORCH_CPP_LOG_LEVEL 和 TORCH_DISTRIBUTED_DEBUG 環境變數的組合來調整日誌級別。
|
|
生效日誌級別 |
|---|---|---|
|
ignored |
錯誤 |
|
ignored |
警告 |
|
ignored |
Info |
|
|
Debug |
|
|
Trace (a.k.a. All) |
分散式元件會引發派生自 RuntimeError 的自定義異常型別。
torch.distributed.DistError: 這是所有分散式異常的基類。torch.distributed.DistBackendError: 當發生後端特定的錯誤時會丟擲此異常。例如,如果使用NCCL後端,而使用者嘗試使用NCCL庫無法訪問的 GPU。torch.distributed.DistNetworkError: 當網路庫遇到錯誤時(例如:連線被遠端主機重置)會丟擲此異常。torch.distributed.DistStoreError: 當 Store 遇到錯誤時(例如:TCPStore 超時)會丟擲此異常。
- class torch.distributed.DistError#
分散式庫中發生錯誤時引發的異常
- class torch.distributed.DistBackendError#
分散式中發生後端錯誤時引發的異常
- class torch.distributed.DistNetworkError#
分散式中發生網路錯誤時引發的異常
- class torch.distributed.DistStoreError#
分散式儲存中發生錯誤時引發的異常
如果您正在進行單節點訓練,則可以方便地對指令碼進行互動式斷點。我們提供了一種方便地對單個 rank 進行斷點的方法。
- torch.distributed.breakpoint(rank=0, skip=0, timeout_s=3600)[source]#
設定一個斷點,但僅在一個 rank 上。所有其他 rank 將等待您完成斷點後再繼續。