評價此頁

結合使用 Distributed DataParallel 和 Distributed RPC Framework#

建立日期:2020 年 7 月 28 日 | 最後更新:2023 年 6 月 6 日 | 最後驗證:未驗證

作者Pritam DamaniaYi Wang

注意

編輯github 上檢視和編輯本教程。

本教程使用一個簡單的示例來演示如何結合使用 DistributedDataParallel (DDP) 和 Distributed RPC framework,以結合分散式資料並行和分散式模型並行來訓練一個簡單的模型。示例的原始碼可以在 這裡 找到。

之前的教程 分散式資料並行入門分散式 RPC 框架入門 分別介紹瞭如何進行分散式資料並行和分散式模型並行訓練。然而,存在幾種您可能希望結合這兩種技術的訓練正規化。例如:

  1. 如果我們有一個具有稀疏部分(大型嵌入表)和稠密部分(FC 層)的模型,我們可能希望將嵌入表放在引數伺服器上,並使用 DistributedDataParallel 將 FC 層複製到多個訓練器上。 Distributed RPC framework 可用於在引數伺服器上執行嵌入查詢。

  2. 啟用 PipeDream 論文中描述的混合並行。我們可以使用 Distributed RPC framework 將模型階段管道化到多個工作節點上,並在需要時使用 DistributedDataParallel 複製每個階段。


在本教程中,我們將介紹上面提到的第一種情況。我們的設定中總共有 4 個工作節點,如下所示:

  1. 1 個 Master,負責在引數伺服器上建立嵌入表 (nn.EmbeddingBag)。Master 還負責驅動兩個訓練器上的訓練迴圈。

  2. 1 個 Parameter Server,它基本上將嵌入表儲存在記憶體中,並響應來自 Master 和 Trainers 的 RPC。

  3. 2 個 Trainers,它們儲存一個 FC 層 (nn.Linear),該層使用 DistributedDataParallel 在它們之間進行復制。Trainers 還負責執行前向傳播、反向傳播和最佳化器步驟。


整個訓練過程執行如下:

  1. Master 建立一個 RemoteModule,該模組在 Parameter Server 上儲存嵌入表。

  2. 然後,Master 啟動訓練器上的訓練迴圈,並將遠端模組傳遞給訓練器。

  3. Trainers 建立一個名為 HybridModel 的模型,該模型首先使用 Master 提供的遠端模組執行嵌入查詢,然後執行 DDP 包裝的 FC 層。

  4. Trainer 執行模型的正向傳播,並使用損失透過 Distributed Autograd 執行反向傳播。

  5. 在反向傳播過程中,首先計算 FC 層的梯度,並透過 DDP 中的 allreduce 同步到所有 Trainer。

  6. 接下來,Distributed Autograd 將梯度傳播到引數伺服器,並在那裡更新嵌入表的梯度。

  7. 最後,使用 Distributed Optimizer 來更新所有引數。

注意

如果您正在結合使用 DDP 和 RPC,在反向傳播時應始終使用 Distributed Autograd

現在,讓我們詳細瞭解每個部分。首先,在進行任何訓練之前,我們需要設定好所有工作節點。我們建立 4 個程序,其中 rank 0 和 1 是我們的 Trainers,rank 2 是 Master,rank 3 是 Parameter Server。

我們使用 TCP init_method 在所有 4 個工作節點上初始化 RPC 框架。RPC 初始化完成後,Master 使用 RemoteModule 在 Parameter Server 上建立一個包含 EmbeddingBag 層的遠端模組。然後,Master 迴圈遍歷每個 Trainer,並透過使用 rpc_async 呼叫每個 Trainer 上的 _run_trainer 來啟動訓練迴圈。最後,Master 在退出前等待所有訓練完成。

Trainers 首先使用 init_process_group 為 DDP 初始化一個 ProcessGroup,world_size=2(表示兩個 Trainer)。接下來,它們使用 TCP init_method 初始化 RPC 框架。請注意,RPC 初始化和 ProcessGroup 初始化使用的埠是不同的。這是為了避免兩個框架初始化之間的埠衝突。初始化完成後,Trainers 只需等待來自 Master 的 _run_trainer RPC。

Parameter Server 只初始化 RPC 框架,並等待來自 Trainers 和 Master 的 RPC。

def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在討論 Trainer 的細節之前,讓我們先介紹一下 Trainer 使用的 HybridModel。如下所述,HybridModel 使用一個在引數伺服器上儲存嵌入表的遠端模組 (remote_emb_module) 和用於 DDP 的 device 進行初始化。模型的初始化將一個 nn.Linear 層包裝在 DDP 中,以在所有 Trainer 之間複製和同步該層。

模型的正向傳播方法非常直接。它使用 RemoteModule 的 forward 方法在引數伺服器上執行嵌入查詢,並將其輸出傳遞給 FC 層。

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下來,我們看看 Trainer 上的設定。Trainer 首先使用一個在引數伺服器上儲存嵌入表的遠端模組及其自己的 rank 來建立上述 HybridModel

現在,我們需要檢索一個 RRefs 列表,其中包含我們希望使用 DistributedOptimizer 最佳化的所有引數。要從引數伺服器檢索嵌入表的引數,我們可以呼叫 RemoteModule 的 remote_parameters,它會遍歷嵌入表的所有引數並返回一個 RRefs 列表。Trainer 透過 RPC 呼叫引數伺服器上的此方法,以接收所需的引數的 RRefs 列表。由於 DistributedOptimizer 始終接受一個 RRefs 列表作為需要最佳化的引數,因此我們還需要為 FC 層的本地引數建立 RRefs。這是透過遍歷 model.fc.parameters(),為每個引數建立一個 RRef,並將其追加到從 remote_parameters() 返回的列表中來完成的。請注意,我們不能使用 model.parameters(),因為這會遞迴呼叫 model.remote_emb_module.parameters(),而這不受 RemoteModule 支援。

最後,我們使用所有 RRefs 建立 DistributedOptimizer,並定義一個 CrossEntropyLoss 函式。

def _run_trainer(remote_emb_module, rank):
    r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

現在我們可以介紹在每個 Trainer 上執行的主訓練迴圈了。get_next_batch 只是一個用於生成隨機輸入和目標以供訓練的輔助函式。我們對多個 epoch 執行訓練迴圈,對於每個 batch:

  1. 為 Distributed Autograd 設定一個 Distributed Autograd Context

  2. 執行模型的正向傳播並檢索其輸出。

  3. 使用損失函式根據我們的輸出和目標計算損失。

  4. 使用 Distributed Autograd 來執行分散式反向傳播,以損失為依據。

  5. 最後,執行一個 Distributed Optimizer 步驟來最佳化所有引數。

    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

整個示例的原始碼可以在 這裡 找到。