評價此頁

簡介 || 什麼是 DDP || 單節點多 GPU 訓練 || 容錯性 || 多節點訓練 || minGPT 訓練

使用 DDP 進行多 GPU 訓練#

創建於:2022年9月27日 | 最後更新:2024年11月3日 | 最後驗證:未驗證

作者: Suraj Subramanian

您將學到什麼
  • 如何透過 DDP 將單 GPU 訓練指令碼遷移到多 GPU

  • 設定分散式程序組

  • 在分散式設定中儲存和載入模型

GitHub 上檢視本教程使用的程式碼

先決條件
  • DDP 工作原理 高層概述

  • 一臺具有多個 GPU 的機器(本教程使用 AWS p3.8xlarge 例項)

  • 已安裝帶 CUDA 的 PyTorch 請在此處安裝

請觀看下方影片或在 YouTube 上觀看。

上一篇教程 中,我們對 DDP 的工作原理進行了高層概述;現在我們將學習如何在程式碼中使用 DDP。在本教程中,我們將從一個單 GPU 訓練指令碼開始,並將其遷移到單節點上的 4 個 GPU 上執行。在此過程中,我們將討論分散式訓練中的重要概念,並在程式碼中實現它們。

注意

如果您的模型包含任何 BatchNorm 層,則需要將其轉換為 SyncBatchNorm,以在副本之間同步 BatchNorm 層的執行統計資料。

使用輔助函式 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 將模型中的所有 BatchNorm 層轉換為 SyncBatchNorm

single_gpu.pymultigpu.py 的差異

這些是您通常需要對單 GPU 訓練指令碼進行的更改,以啟用 DDP。

匯入#

  • torch.multiprocessing 是 PyTorch 對 Python 原生多程序的封裝。

  • 分散式程序組包含所有可以相互通訊和同步的程序。

import torch
import torch.nn.functional as F
from utils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

構建程序組#

  • 首先,在初始化程序組之前,呼叫 set_device,它會為每個程序設定預設 GPU。這對於防止在 GPU:0 上掛起或過度使用記憶體非常重要。

  • 程序組可以透過 TCP(預設)或共享檔案系統進行初始化。詳細瞭解 程序組初始化

  • init_process_group 初始化分散式程序組。

  • 詳細瞭解 選擇 DDP 後端

def ddp_setup(rank: int, world_size: int):
   """
   Args:
       rank: Unique identifier of each process
      world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"
   os.environ["MASTER_PORT"] = "12355"
   torch.cuda.set_device(rank)
   init_process_group(backend="nccl", rank=rank, world_size=world_size)

構建 DDP 模型#

self.model = DDP(model, device_ids=[gpu_id])

分發輸入資料#

  • DistributedSampler 將輸入資料分割到所有分散式程序中。

  • DataLoader 結合了資料集和

    取樣器,並提供了一個可迭代的物件來遍歷給定的資料集。

  • 每個程序將接收一個包含 32 個樣本的輸入批次;有效批次大小為 32 * nprocs,在使用 4 個 GPU 時為 128。

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,  # We don't shuffle
    sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
)
  • 在每個 epoch 開始時,必須在 DistributedSampler 上呼叫 set_epoch() 方法,以使多 epoch 的 shuffle 功能正常工作。否則,每個 epoch 將使用相同的順序。

def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # call this additional line at every epoch
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

儲存模型檢查點#

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
  self._save_checkpoint(epoch)

警告

集體通訊函式 是在所有分散式程序上執行的函式,它們用於將某些狀態或值收集到特定程序。集體通訊需要所有 rank 執行集體程式碼。在此示例中,_save_checkpoint 不應包含任何集體通訊函式,因為它僅在 rank:0 程序上執行。如果您需要進行任何集體通訊,應在 if self.gpu_id == 0 檢查之前進行。

執行分散式訓練作業#

  • 包含新的引數 rank(替換 device)和 world_size

  • rank 是在呼叫 mp.spawn 時由 DDP 自動分配的。

  • world_size 是訓練作業中的程序總數。對於 GPU 訓練,這對應於使用的 GPU 數量,每個程序處理一個專用的 GPU。

- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+  ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
-  trainer = Trainer(model, train_data, optimizer, device, save_every)
+  trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
+  destroy_process_group()

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
-  device = 0      # shorthand for cuda:0
-  main(device, total_epochs, save_every)
+  world_size = torch.cuda.device_count()
+  mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

程式碼如下所示:

進一步閱讀#