分散式資料並行入門#
創建於: 2019年4月23日 | 最後更新: 2025年9月23日 | 最後驗證: 2024年11月5日
作者:Shen Li
編輯者: Joe Zhu, Chirag Pandya
注意
在 github 檢視和編輯此教程。
先決條件
DistributedDataParallel (DDP) 是 PyTorch 中一個強大的模組,允許您跨多個機器並行化模型,非常適合大規模深度學習應用。要使用 DDP,您需要啟動多個程序,並在每個程序中建立一個 DDP 例項。
但它是如何工作的呢?DDP 使用 torch.distributed 包中的集體通訊來跨所有程序同步梯度和緩衝區。這意味著每個程序都有自己的模型副本,但它們會協同工作,就像模型在一個機器上一樣進行訓練。
為了實現這一點,DDP 為模型中的每個引數註冊一個 autograd hook。當執行反向傳播時,此 hook 會觸發跨所有程序的梯度同步。這確保每個程序都擁有相同的梯度,然後使用這些梯度來更新模型。
有關 DDP 工作原理和如何有效使用它的更多資訊,請務必檢視 DDP 設計說明。使用 DDP,您可以比以往任何時候都更快、更有效地訓練您的模型!
推薦使用 DDP 的方式是為每個模型副本啟動一個程序。模型副本可以跨越多個裝置。DDP 程序可以位於同一臺機器上,也可以跨多臺機器。請注意,GPU 裝置不能跨 DDP 程序共享(即一個 GPU 對應一個 DDP 程序)。
在本教程中,我們將從一個基本的 DDP 用例開始,然後展示更高階的用例,包括模型檢查點和 DDP 與模型並行結合使用。
注意
本教程中的程式碼在 8-GPU 伺服器上執行,但可以輕鬆推廣到其他環境。
DataParallel 與 DistributedDataParallel 的比較#
在我們深入研究之前,讓我們澄清一下,儘管 DistributedDataParallel 增加了複雜性,為什麼您會考慮使用它而不是 DataParallel。
首先,
DataParallel是單程序、多執行緒的,但它只能在一臺機器上執行。相比之下,DistributedDataParallel是多程序的,支援單機和多機訓練。由於執行緒間的 GIL 爭用、每個迭代複製的模型以及輸入散射和輸出收集引入的額外開銷,即使在一臺機器上,DataParallel通常也比DistributedDataParallel慢。回顧一下 之前的教程,如果您的模型太大而無法放入單個 GPU,您必須使用 **模型並行** 將其拆分到多個 GPU 上。
DistributedDataParallel可以與 **模型並行** 一起使用,而DataParallel目前則不能。當 DDP 與模型並行結合使用時,每個 DDP 程序將使用模型並行,而所有程序將 collectively 使用資料並行。
基本用例#
要建立一個 DDP 模組,您必須首先正確設定程序組。更多詳細資訊可以在 使用 PyTorch 編寫分散式應用程式 中找到。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# We want to be able to train our model on an `accelerator <https://pytorch.com.tw/docs/stable/torch.html#accelerators>`__
# such as CUDA, MPS, MTIA, or XPU.
acc = torch.accelerator.current_accelerator()
backend = torch.distributed.get_default_backend_for_device(acc)
# initialize the process group
dist.init_process_group(backend, rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
現在,讓我們建立一個玩具模組,用 DDP 包裝它,並向其提供一些虛擬輸入資料。請注意,由於 DDP 在 DDP 建構函式中將模型狀態從 rank 0 程序廣播到所有其他程序,因此您無需擔心不同的 DDP 程序從不同的初始模型引數值開始。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running basic DDP example on rank {rank}.")
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
如您所見,DDP 封裝了低階的分散式通訊細節,並提供了清晰的 API,就像一個本地模型一樣。梯度同步通訊在反向傳播過程中發生,並與反向計算重疊。當 backward() 返回時,param.grad 已經包含了同步的梯度張量。對於基本用例,DDP 只需增加幾行程式碼即可設定程序組。當將 DDP 應用於更高階的用例時,一些注意事項需要謹慎。
處理速度不均#
在 DDP 中,建構函式、前向傳播和後向傳播是分散式同步點。不同的程序應啟動相同數量的同步,並以相同的順序到達這些同步點,並在大致相同的時間進入每個同步點。否則,快速的程序可能會提前到達並因等待滯後者而超時。因此,使用者負責平衡程序之間的工作負載分配。有時,由於網路延遲、資源爭用或不可預測的工作負載峰值等原因,處理速度不均是不可避免的。為避免這種情況下的超時,請確保在呼叫 init_process_group 時傳遞一個足夠大的 timeout 值。
儲存和載入檢查點#
通常使用 torch.save 和 torch.load 在訓練過程中為模型建立檢查點並從檢查點恢復。有關更多詳細資訊,請參閱 儲存和載入模型。使用 DDP 時,一個最佳化是僅在一個程序中儲存模型,然後將其載入到所有程序中,從而減少寫入開銷。這是可行的,因為所有程序都從相同的引數開始,並且在後向傳播中同步梯度,因此最佳化器應該持續將引數設定為相同的值。如果您使用此最佳化(即在一個程序中儲存,在所有程序中恢復),請確保沒有程序在儲存完成之前開始載入。此外,載入模組時,您需要提供適當的 map_location 引數,以防止程序訪問其他程序的裝置。如果缺少 map_location,torch.load 將首先將模組載入到 CPU,然後將每個引數複製到其儲存的位置,這將導致同一臺機器上的所有程序使用同一組裝置。有關更高階的故障恢復和彈性支援,請參閱 TorchElastic。
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# We want to be able to train our model on an `accelerator <https://pytorch.com.tw/docs/stable/torch.html#accelerators>`__
# such as CUDA, MPS, MTIA, or XPU.
acc = torch.accelerator.current_accelerator()
# configure map_location properly
map_location = {f'{acc}:0': f'{acc}:{rank}'}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print(f"Finished running DDP checkpoint example on rank {rank}.")
DDP 與模型並行結合#
DDP 也適用於多 GPU 模型。包裝多 GPU 模型的 DDP 在訓練具有海量資料的大型模型時尤其有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
將多 GPU 模型傳遞給 DDP 時,絕對不能設定 device_ids 和 output_device。輸入和輸出資料將由應用程式或模型的 forward() 方法放置在適當的裝置上。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running DDP with model parallel example on rank {rank}.")
if __name__ == "__main__":
n_gpus = torch.accelerator.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP#
我們可以利用 PyTorch Elastic 來簡化 DDP 程式碼並更輕鬆地初始化作業。我們仍然使用 Toymodel 示例並建立一個名為 elastic_ddp.py 的檔案。
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
torch.accelerator.set_device_index(int(os.environ["LOCAL_RANK"]))
acc = torch.accelerator.current_accelerator()
backend = torch.distributed.get_default_backend_for_device(acc)
dist.init_process_group(backend)
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.accelerator.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
print(f"Finished running basic DDP example on rank {rank}.")
if __name__ == "__main__":
demo_basic()
然後,可以在所有節點上執行 torch elastic/torchrun 命令來初始化上面建立的 DDP 作業。
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
在上面的示例中,我們在兩個主機上執行 DDP 指令碼,並且每個主機執行 8 個程序。也就是說,我們在 16 個 GPU 上執行此作業。請注意,$MASTER_ADDR 在所有節點上必須相同。
torchrun 將啟動 8 個程序,並在每個程序上呼叫 elastic_ddp.py,但在節點上呼叫。然而,使用者還需要應用叢集管理工具(如 slurm)來實際在 2 個節點上執行此命令。
例如,在啟用了 SLURM 的叢集上,我們可以編寫一個指令碼來執行上述命令並將 MASTER_ADDR 設定為
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然後我們可以使用 SLURM 命令執行此指令碼: srun --nodes=2 ./torchrun_script.sh。
這只是一個示例;您可以選擇自己的叢集排程工具來啟動 torchrun 作業。
有關 Elastic run 的更多資訊,請參閱 快速入門文件。