torchrun (彈性啟動)#
創建於:2021年5月4日 | 最後更新於:2021年8月26日
模組 torch.distributed.run。
torch.distributed.run 是一個模組,用於在每個訓練節點上啟動多個分散式訓練程序。
torchrun 是一個 Python 命令列指令碼,指向 entry_points 配置中宣告的主要模組 torch.distributed.run 在 setup.py 中。它等同於呼叫 python -m torch.distributed.run。
torchrun 可用於單節點分散式訓練,其中將在每個節點上啟動一個或多個程序。它可用於 CPU 訓練或 GPU 訓練。如果用於 GPU 訓練,每個分散式程序將操作於單個 GPU。這可以實現單節點訓練效能的顯著提升。torchrun 也可用於多節點分散式訓練,透過在每個節點上啟動多個程序來提高多節點分散式訓練的效能。這對於具有直接 GPU 支援的多個 Infiniband 介面的系統尤其有利,因為所有介面都可以用於聚合通訊頻寬。
在單節點分散式訓練或多節點分散式訓練這兩種情況下,torchrun 都將啟動每個節點上指定的程序數(--nproc-per-node)。如果用於 GPU 訓練,這個數字需要小於或等於當前系統的 GPU 數量(nproc_per_node),並且每個程序將操作於從 *GPU 0 到 GPU (nproc_per_node - 1)* 的單個 GPU。
版本 2.0.0 中已更改:torchrun 會將 --local-rank=<rank> 引數傳遞給您的指令碼。從 PyTorch 2.0.0 開始,推薦使用帶連字元的 --local-rank 而不是之前使用的帶下劃線的 --local_rank。
為了向後相容,使用者可能需要在其引數解析程式碼中同時處理這兩種情況。這意味著在引數解析器中包含 "--local-rank" 和 "--local_rank"。如果只提供 "--local_rank",torchrun 將觸發錯誤:“error: unrecognized arguments: –local-rank=<rank>”。對於僅支援 PyTorch 2.0.0+ 的訓練程式碼,包含 "--local-rank" 應該足夠了。
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
用法#
單節點多工作程序#
torchrun
--standalone
--nnodes=1
--nproc-per-node=$NUM_TRAINERS
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
注意
--nproc-per-node 可以是 "gpu"(每個 GPU 啟動一個程序),"cpu"(每個 CPU 啟動一個程序),"auto"(如果 CUDA 可用,則等同於 "gpu",否則等同於 "cpu"),或者一個指定程序數的整數。更多詳細資訊請參閱 torch.distributed.run.determine_local_world_size。
堆疊式單節點多工作程序#
要在同一主機上執行多個單節點多工作程序例項(獨立作業),我們需要確保每個例項(作業)設定了不同的埠,以避免埠衝突(或者更糟的是,兩個作業合併為一個作業)。要做到這一點,您必須使用 --rdzv-backend=c10d 執行,並透過設定 --rdzv-endpoint=localhost:$PORT_k 來指定一個不同的埠。對於 --nodes=1,讓 torchrun 自動選擇一個免費埠通常比手動為每次執行分配不同埠更方便。
torchrun
--rdzv-backend=c10d
--rdzv-endpoint=localhost:0
--nnodes=1
--nproc-per-node=$NUM_TRAINERS
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
容錯(固定大小的工作程序數,無彈性,可容忍 3 次故障)#
torchrun
--nnodes=$NUM_NODES
--nproc-per-node=$NUM_TRAINERS
--max-restarts=3
--rdzv-id=$JOB_ID
--rdzv-backend=c10d
--rdzv-endpoint=$HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
HOST_NODE_ADDR,形式為 <host>[:<port>](例如,node1.example.com:29400),指定 C10d 集合後端應例項化和託管的節點和埠。它可以是訓練叢集中的任何節點,但最好選擇一個具有高頻寬的節點。
注意
如果未指定埠號,HOST_NODE_ADDR 預設為 29400。
彈性(min=1,max=4,可容忍最多 3 次成員變更或故障)#
torchrun
--nnodes=1:4
--nproc-per-node=$NUM_TRAINERS
--max-restarts=3
--rdzv-id=$JOB_ID
--rdzv-backend=c10d
--rdzv-endpoint=$HOST_NODE_ADDR
YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)
HOST_NODE_ADDR,形式為 <host>[:<port>](例如,node1.example.com:29400),指定 C10d 集合後端應例項化和託管的節點和埠。它可以是訓練叢集中的任何節點,但最好選擇一個具有高頻寬的節點。
注意
如果未指定埠號,HOST_NODE_ADDR 預設為 29400。
關於集合後端的說明#
對於多節點訓練,您需要指定
--rdzv-id:一個唯一的作業 ID(所有參與作業的節點共享)--rdzv-backend:torch.distributed.elastic.rendezvous.RendezvousHandler的一個實現--rdzv-endpoint:集合後端執行的端點;通常形式為host:port。
目前,開箱即支援 c10d(推薦)、etcd-v2 和 etcd(舊版)集合後端。要使用 etcd-v2 或 etcd,請設定一個啟用了 v2 API 的 etcd 伺服器(例如,--enable-v2)。
警告
etcd-v2 和 etcd 集合使用 etcd API v2。您必須在 etcd 伺服器上啟用 v2 API。我們的測試使用了 etcd v3.4.3。
警告
對於基於 etcd 的集合,我們建議使用 etcd-v2 而不是 etcd,後者功能上等效,但使用了經過修訂的實現。etcd 處於維護模式,將在未來版本中移除。
定義#
Node- 一個物理例項或容器;對映到作業管理器工作的單元。Worker- 在分散式訓練的上下文中,指一個工作程序。WorkerGroup- 執行相同功能的(例如,訓練器)工作程序的集合。LocalWorkerGroup- 同一節點上執行的工作程序組的子集。RANK- 工作程序在工作程序組內的秩。WORLD_SIZE- 工作程序組中工作程序的總數。LOCAL_RANK- 工作程序在本地工作程序組內的秩。LOCAL_WORLD_SIZE- 本地工作程序組的大小。rdzv_id- 一個使用者定義的 ID,用於唯一標識作業的工作程序組。每個節點使用此 ID 作為特定工作程序組的成員。
rdzv_backend- 集合後端(例如,c10d)。這通常是一個強一致性的鍵值儲存。rdzv_endpoint- 集合後端端點;通常形式為<host>:<port>。
一個 Node 執行 LOCAL_WORLD_SIZE 個工作程序,它們構成一個 LocalWorkerGroup。作業中所有節點上的 LocalWorkerGroups 的聯合構成 WorkerGroup。
環境變數#
以下環境變數在您的指令碼中可用
LOCAL_RANK- 本地秩。RANK- 全域性秩。GROUP_RANK- 工作程序組的秩。一個介於 0 和max_nnodes之間的數字。當每個節點執行一個工作程序組時,這就是節點的秩。ROLE_RANK- 在所有具有相同角色的工作程序中的秩。工作程序的角色在WorkerSpec中指定。LOCAL_WORLD_SIZE- 本地全域性大小(例如,本地執行的工作程序數);等於torchrun上指定的--nproc-per-node。WORLD_SIZE- 全域性大小(作業中工作程序的總數)。ROLE_WORLD_SIZE- 使用WorkerSpec中指定的相同角色啟動的工作程序總數。MASTER_ADDR- 執行秩為 0 的工作程序的主機的完全限定域名 (FQDN);用於初始化 Torch 分散式後端。MASTER_PORT-MASTER_ADDR上可用於託管 C10d TCP 儲存的埠。TORCHELASTIC_RESTART_COUNT- 到目前為止工作程序組已重啟的次數。TORCHELASTIC_MAX_RESTARTS- 配置的最大重啟次數。TORCHELASTIC_RUN_ID- 等同於集合run_id(例如,唯一的作業 ID)。PYTHON_EXEC- 系統可執行檔案覆蓋。如果提供了該變數,Python 使用者指令碼將使用PYTHON_EXEC的值作為可執行檔案。預設情況下使用 sys.executable。
部署#
(C10d 後端不需要)啟動集合後端伺服器並獲取端點(作為
--rdzv-endpoint傳遞給torchrun)單節點多工作程序:在主機上啟動
torchrun以啟動代理程序,該程序建立並監視本地工作程序組。多節點多工作程序:在所有參與訓練的節點上使用相同的引數啟動
torchrun。
當使用作業/叢集管理器時,多節點作業的入口命令應為 torchrun。
故障模式#
工作程序故障:對於具有
n個工作程序的訓練作業,如果k<=n個工作程序發生故障,所有工作程序將被停止並最多重啟max_restarts次。代理故障:代理故障會導致本地工作程序組故障。由作業管理器決定是終止整個作業(同組語義)還是嘗試替換節點。代理支援這兩種行為。
節點故障:與代理故障相同。
成員變更#
節點離開(縮減規模):代理將收到節點離開的通知,所有現有工作程序將被停止,形成一個新的
WorkerGroup,並且所有工作程序將以新的RANK和WORLD_SIZE啟動。節點加入(擴容):新節點將被加入作業,所有現有工作程序將被停止,形成一個新的
WorkerGroup,並且所有工作程序將以新的RANK和WORLD_SIZE啟動。
重要提示#
此實用程式和多程序分散式(單節點或多節點)GPU 訓練目前僅在使用 NCCL 分散式後端時能達到最佳效能。因此,NCCL 後端是 GPU 訓練推薦使用的後端。
用於初始化 Torch 程序組的環境變數由本模組提供,您無需手動傳遞
RANK。要在訓練指令碼中初始化程序組,只需執行
>>> import torch.distributed as dist
>>> dist.init_process_group(backend="gloo|nccl")
在您的訓練程式中,您可以選擇使用常規的分散式函式,或者使用
torch.nn.parallel.DistributedDataParallel()模組。如果您的訓練程式使用 GPU 進行訓練,並且您想使用torch.nn.parallel.DistributedDataParallel()模組,配置方法如下。
local_rank = int(os.environ["LOCAL_RANK"])
model = torch.nn.parallel.DistributedDataParallel(
model, device_ids=[local_rank], output_device=local_rank
)
請確保 device_ids 引數設定為您的程式碼將要操作的唯一 GPU 裝置 ID。這通常是本地秩。換句話說,為了使用此實用程式,device_ids 需要是 [int(os.environ("LOCAL_RANK"))],而 output_device 需要是 int(os.environ("LOCAL_RANK"))。
在發生故障或成員變更時,所有存活的工作程序都會立即被終止。請務必檢查點您的進度。檢查點的頻率應取決於您的作業對丟失工作的容忍度。
本模組僅支援同構
LOCAL_WORLD_SIZE。也就是說,假設所有節點執行相同數量的本地工作程序(按角色)。RANK**不穩定**。在重啟之間,節點上的本地工作程序可能會被分配與之前不同的秩範圍。**切勿** 硬編碼關於秩的穩定性或RANK與LOCAL_RANK之間任何相關性的假設。使用彈性(
min_size!=max_size)時,**請勿** 硬編碼對WORLD_SIZE的假設,因為隨著節點允許離開和加入,全域性大小可能會發生變化。建議您的指令碼具有以下結構
def main():
load_checkpoint(checkpoint_path)
initialize()
train()
def train():
for batch in iter(dataset):
train_step(batch)
if should_checkpoint:
save_checkpoint(checkpoint_path)
(推薦)在工作程序出錯時,此工具將彙總錯誤詳情(例如,時間、秩、主機、PID、回溯等)。在每個節點上,第一個錯誤(按時間戳)會作為“根本原因”錯誤進行報告。要將回溯包含在此錯誤摘要列印輸出中,您必須像下面的示例一樣裝飾您的主入口函式。如果未裝飾,摘要將不包含異常的回溯,而只包含退出程式碼。有關 torchelastic 錯誤處理的詳細資訊,請參閱:https://pytorch.com.tw/docs/stable/elastic/errors.html
from torch.distributed.elastic.multiprocessing.errors import record
@record
def main():
# do train
pass
if __name__ == "__main__":
main()