評價此頁

分散式資料並行#

創建於:2020年1月15日 | 最後更新於:2024年1月25日

警告

torch.nn.parallel.DistributedDataParallel 的實現會隨著時間推移而不斷演進。本文件基於 v1.4 版本時的狀態編寫。

torch.nn.parallel.DistributedDataParallel (DDP) 可透明地執行分散式資料並行訓練。本頁面將介紹其工作原理並揭示實現細節。

示例#

讓我們從一個簡單的 torch.nn.parallel.DistributedDataParallel 示例開始。該示例使用 torch.nn.Linear 作為本地模型,用 DDP 包裝它,然後對 DDP 模型執行一次前向傳播、一次後向傳播和一次最佳化器步驟。之後,本地模型上的引數將得到更新,並且不同程序上的所有模型應完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 可與 TorchDynamo 配合使用。當與 TorchDynamo 一起使用時,請在編譯模型之前應用 DDP 模型包裝器,以便 torchdynamo 可以基於 DDP 桶大小應用 DDPOptimizer(圖中斷最佳化)。(更多資訊請參閱 TorchDynamo DDPOptimizer。)

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

內部設計#

本節將透過深入探討單次迭代中每個步驟的細節,揭示 torch.nn.parallel.DistributedDataParallel 引擎蓋下的工作原理。

  • 前提條件:DDP 依賴 c10d ProcessGroup 進行通訊。因此,應用程式必須在構建 DDP 之前建立 ProcessGroup 例項。

  • 構造:DDP 建構函式接收本地模組的引用,並將 rank 為 0 的程序的 state_dict() 廣播到組內的所有其他程序,以確保所有模型副本從完全相同的狀態開始。然後,每個 DDP 程序建立一個本地 Reducer,該 Reducer 稍後將在後向傳播期間負責梯度同步。為了提高通訊效率,Reducer 將引數梯度組織成桶(bucket),並一次減少一個桶。桶大小可以透過在 DDP 建構函式中設定 bucket_cap_mb 引數來配置。引數梯度到桶的對映是在構造時確定的,基於桶大小限制和引數大小。模型引數根據給定模型 Model.parameters() 的(大致)反向順序分配到桶中。使用反向順序的原因是 DDP 期望梯度在後向傳播期間以大約該順序準備好。下圖展示了一個示例。請注意,grad0grad1 位於 bucket1 中,而另外兩個梯度位於 bucket0 中。當然,這種假設不一定總是成立,當這種情況發生時,可能會影響 DDP 後向傳播的速度,因為 Reducer 無法在最早可能的時間開始通訊。除了分桶之外,Reducer 在構造時還會註冊 autograd 鉤子,每個引數一個鉤子。這些鉤子將在後向傳播期間,當梯度準備好時被觸發。

  • 前向傳播:DDP 接收輸入並將其傳遞給本地模型,然後分析本地模型的輸出,如果 find_unused_parameters 設定為 True。此模式允許在模型子圖上執行後向傳播,DDP 透過從模型輸出遍歷 autograd 圖並標記所有未使用的引數為可歸約狀態來找出哪些引數參與後向傳播。在後向傳播期間,Reducer 只會等待未準備好的引數,但它仍然會歸約所有桶。將引數梯度標記為已準備好並不能幫助 DDP 跳過桶(目前是這樣),但它可以防止 DDP 在後向傳播期間永遠等待缺失的梯度。請注意,遍歷 autograd 圖會引入額外的開銷,因此應用程式應僅在必要時將 find_unused_parameters 設定為 True

  • 後向傳播backward() 函式直接在損失 Tensor 上呼叫,DDP 無法控制這一點,DDP 使用在構造時註冊的 autograd 鉤子來觸發梯度同步。當一個梯度準備好後,它對應的 DDP 鉤子會觸發,DDP 隨後會將該引數梯度標記為可歸約狀態。當一個桶中的所有梯度都準備好後,Reducer 將為該桶啟動一個非同步 allreduce 操作,以計算所有程序上的梯度均值。當所有桶都準備好後,Reducer 將阻塞等待所有 allreduce 操作完成。完成後,平均梯度將被寫入所有引數的 param.grad 欄位。因此,在後向傳播之後,不同 DDP 程序上相同相應引數的 grad 欄位應該相同。

  • 最佳化器步驟:從最佳化器的角度來看,它正在最佳化一個本地模型。由於所有 DDP 程序上的模型副本都從相同的狀態開始,並且在每次迭代中具有相同的平均梯度,因此它們可以保持同步。

ddp_grad_sync.png

注意

DDP 要求所有程序上的 Reducer 例項以完全相同的順序呼叫 allreduce,這是透過始終按桶索引順序而不是實際的桶就緒順序執行 allreduce 來實現的。程序間 allreduce 順序不匹配可能導致錯誤結果或 DDP 後向傳播掛起。

實現#

以下是指向 DDP 實現元件的連結。堆疊圖顯示了程式碼的結構。

ProcessGroup#

  • ProcessGroup.hpp:包含所有程序組實現。 c10d 庫開箱即用提供了 3 種實現,即 ProcessGroupGlooProcessGroupNCCLProcessGroupMPIDistributedDataParallel 在初始化期間使用 ProcessGroup::broadcast() 將模型狀態從 rank 為 0 的程序傳送到其他程序,並在後向傳播期間使用 ProcessGroup::allreduce() 對梯度進行求和。

  • Store.hpp:協助程序組例項相互查詢的集合服務。

DistributedDataParallel#

  • distributed.py:DDP 的 Python 入口點。它實現了 nn.parallel.DistributedDataParallel 模組的初始化步驟和 forward 函式,這些函式呼叫 C++ 庫。其 _sync_param 函式在 DDP 程序處理多個裝置時執行程序內參數同步,並且還將模型緩衝區從 rank 為 0 的程序廣播到所有其他程序。程序間引數同步發生在 Reducer.cpp 中。

  • comm.h:實現了合併廣播輔助函式,該函式在初始化期間用於廣播模型狀態,並在前向傳播之前同步模型緩衝區。

  • reducer.h:提供了後向傳播中梯度同步的核心實現。它有三個入口點函式:

    • Reducer:建構函式在 distributed.py 中呼叫,該函式將 Reducer::autograd_hook() 註冊到梯度累加器。

    • autograd_hook() 函式將在梯度準備就緒時由 autograd 引擎呼叫。

    • prepare_for_backward() 在 DDP 前向傳播結束時在 distributed.py 中呼叫。當 DDP 建構函式中設定 find_unused_parametersTrue 時,它會遍歷 autograd 圖以查詢未使用的引數。

ddp_code.png

TorchDynamo DDPOptimizer#

DDP 的效能優勢來自於在後向傳播期間將 allreduce 集合操作與計算重疊。當與 TorchDynamo 一起用於編譯整個前向和後向圖時,AotAutograd 會阻止這種重疊,因為 allreduce 操作是在整個最佳化後的後向計算完成後,由 autograd 鉤子 _之後_ 啟動的。

TorchDynamo 的 DDPOptimizer 透過在後向傳播期間的 DDP allreduce 桶的邏輯邊界處中斷前向圖來提供幫助。注意:目標是在後向傳播期間中斷圖,最簡單的實現是中斷前向圖,然後在每個部分上呼叫 AotAutograd 和編譯。這使得 DDP 的 allreduce 鉤子可以在後向傳播的各個部分之間觸發,並排程通訊與計算重疊。

有關更深入的解釋和實驗結果,請參閱 這篇博文,或在 torch/_dynamo/optimizations/distributed.py 閱讀文件和程式碼。

要除錯 DDPOptimizer,請設定 TORCH_LOGS=’ddp_graphs’ 以獲取完整的圖轉儲。要獲取不帶圖的日誌,請將 ‘dynamo’、‘distributed’ 或 ‘dist_ddp’ 新增到 TORCH_LOGS(用於關於桶邊界的基本資訊)。要停用 DDPOptimizer,請設定 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 在沒有 DDPOptimizer 的情況下仍能正常工作,但效能會下降。