評價此頁

分散式流水線並行簡介#

建立日期:2024 年 7 月 9 日 | 最後更新:2024 年 12 月 12 日 | 最後驗證:2024 年 11 月 5 日

作者Howard Huang

注意

editgithub 上檢視和編輯此教程。

本教程使用一個類 GPT 的 transformer 模型來演示如何使用 torch.distributed.pipelining API 實現分散式流水線並行。

您將學到什麼
  • 如何使用 torch.distributed.pipelining API

  • 如何將流水線並行應用於 transformer 模型

  • 如何對一組微批次利用不同的排程策略

先決條件

設定#

透過 torch.distributed.pipelining,我們將對模型的執行進行分割槽,並在微批次上排程計算。我們將使用一個簡化的 transformer 解碼器模型。該模型架構僅用於教學目的,包含多個 transformer 解碼器層,以便我們演示如何將模型分割成不同的塊。首先,讓我們定義模型。

import torch
import torch.nn as nn
from dataclasses import dataclass

@dataclass
class ModelArgs:
   dim: int = 512
   n_layers: int = 8
   n_heads: int = 8
   vocab_size: int = 10000

class Transformer(nn.Module):
   def __init__(self, model_args: ModelArgs):
      super().__init__()

      self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)

      # Using a ModuleDict lets us delete layers witout affecting names,
      # ensuring checkpoints will correctly save and load.
      self.layers = torch.nn.ModuleDict()
      for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)

      self.norm = nn.LayerNorm(model_args.dim)
      self.output = nn.Linear(model_args.dim, model_args.vocab_size)

   def forward(self, tokens: torch.Tensor):
      # Handling layers being 'None' at runtime enables easy pipeline splitting
      h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

      for layer in self.layers.values():
            h = layer(h, h)

      h = self.norm(h) if self.norm else h
      output = self.output(h).clone() if self.output else h
      return output

然後,我們需要在指令碼中匯入必要的庫並初始化分散式訓練過程。在這種情況下,我們定義了一些全域性變數以供指令碼稍後使用。

import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe

global rank, device, pp_group, stage_index, num_stages
def init_distributed():
   global rank, device, pp_group, stage_index, num_stages
   rank = int(os.environ["LOCAL_RANK"])
   world_size = int(os.environ["WORLD_SIZE"])
   device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
   dist.init_process_group()

   # This group can be a sub-group in the N-D parallel case
   pp_group = dist.new_group()
   stage_index = rank
   num_stages = world_size

通常在所有分散式程式中都會使用 rankworld_sizeinit_process_group() 程式碼。特定於流水線並行的全域性變數包括 pp_group,它是用於 send/recv 通訊的程序組;stage_index,在此示例中,每個 stage 只有一個 rank,因此索引等同於 rank;以及 num_stages,它等同於 world_size。

num_stages 用於設定流水線並行排程中使用的 stage 數量。例如,對於 num_stages=4,一個微批次需要經過 4 次前向傳播和 4 次後向傳播才能完成。框架需要 stage_index 來了解如何在 stage 之間進行通訊。例如,對於第一個 stage(stage_index=0),它將使用來自資料載入器的資料,而無需從任何前驅 peer 接收資料即可執行其計算。

步驟 1:分割 Transformer 模型#

有兩種不同的模型分割方法

第一種是手動模式,我們可以透過刪除模型的部分屬性來手動建立模型的兩個例項。在此示例中,對於兩個 stage(2 個 rank),模型被分成兩半。

def manual_model_split(model) -> PipelineStage:
   if stage_index == 0:
      # prepare the first stage model
      for i in range(4, 8):
            del model.layers[str(i)]
      model.norm = None
      model.output = None

   elif stage_index == 1:
      # prepare the second stage model
      for i in range(4):
            del model.layers[str(i)]
      model.tok_embeddings = None

   stage = PipelineStage(
      model,
      stage_index,
      num_stages,
      device,
   )
   return stage

我們可以看到第一個 stage 沒有 layer norm 或輸出層,只包含前四個 transformer 塊。第二個 stage 沒有輸入嵌入層,但包含輸出層和最後的四個 transformer 塊。然後該函式返回當前 rank 的 PipelineStage

第二種方法是基於跟蹤器的模式,它根據 split_spec 引數自動分割模型。使用流水線規範,我們可以指示 torch.distributed.pipelining 在哪裡分割模型。在下面的程式碼塊中,我們在第 4 個 transformer 解碼器層之前進行分割,這與上面描述的手動分割相呼應。同樣,在完成分割後,可以透過呼叫 build_stage 來檢索 PipelineStage

步驟 2:定義主執行#

在主函式中,我們將建立一個特定的流水線排程,stage 應遵循該排程。torch.distributed.pipelining 支援多種排程策略,包括單 stage-per-rank 排程 GPipe1F1B,以及多 stage-per-rank 排程,如 Interleaved1F1BLoopedBFS

if __name__ == "__main__":
   init_distributed()
   num_microbatches = 4
   model_args = ModelArgs()
   model = Transformer(model_args)

   # Dummy data
   x = torch.ones(32, 500, dtype=torch.long)
   y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
   example_input_microbatch = x.chunk(num_microbatches)[0]

   # Option 1: Manual model splitting
   stage = manual_model_split(model)

   # Option 2: Tracer model splitting
   # stage = tracer_model_split(model, example_input_microbatch)

   model.to(device)
   x = x.to(device)
   y = y.to(device)

   def tokenwise_loss_fn(outputs, targets):
      loss_fn = nn.CrossEntropyLoss()
      outputs = outputs.reshape(-1, model_args.vocab_size)
      targets = targets.reshape(-1)
      return loss_fn(outputs, targets)

   schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)

   if rank == 0:
      schedule.step(x)
   elif rank == 1:
      losses = []
      output = schedule.step(target=y, losses=losses)
      print(f"losses: {losses}")
   dist.destroy_process_group()

在上面的示例中,我們使用了手動方法來分割模型,但可以取消註釋該程式碼以嘗試基於跟蹤器的模型分割函式。在我們的排程中,我們需要傳入微批次的數量以及用於評估目標的損失函式。

.step() 函式處理整個小批次,並根據之前傳入的 n_microbatches 自動將其分割成微批次。然後根據排程類對微批次進行操作。在上面的示例中,我們使用的是 GPipe,它遵循簡單的全前向傳播然後全後向傳播的排程。從 rank 1 返回的輸出將與模型在單個 GPU 上執行整個批次時相同。類似地,我們可以傳入一個 losses 容器來儲存每個微批次對應的損失。

步驟 3:啟動分散式程序#

最後,我們準備執行指令碼。我們將使用 torchrun 來建立一個單主機、2 程序的任務。我們的指令碼已經以 rank 0 執行流水線 stage 0 所需的邏輯,rank 1 執行流水線 stage 1 所需的邏輯的方式編寫。

torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py

結論#

在本教程中,我們學習瞭如何使用 PyTorch 的 torch.distributed.pipelining API 來實現分散式流水線並行。我們探索了環境的設定、 transformer 模型的定義以及為分散式訓練對其進行的分割。我們討論了兩種模型分割方法:手動和基於跟蹤器,並演示瞭如何在不同 stage 上排程微批次的計算。最後,我們介紹了流水線排程的執行以及使用 torchrun 啟動分散式程序。

附加資源#

我們已成功將 torch.distributed.pipelining 整合到 torchtitan 儲存庫中。TorchTitan 是一個乾淨、最小的程式碼庫,用於使用原生 PyTorch 進行大規模 LLM 訓練。有關生產就緒的流水線並行用法以及與其他分散式技術的組合,請參閱 TorchTitan 的 3D 並行端到端示例