分散式流水線並行簡介#
建立日期:2024 年 7 月 9 日 | 最後更新:2024 年 12 月 12 日 | 最後驗證:2024 年 11 月 5 日
作者:Howard Huang
注意
在 github 上檢視和編輯此教程。
本教程使用一個類 GPT 的 transformer 模型來演示如何使用 torch.distributed.pipelining API 實現分散式流水線並行。
如何使用
torch.distributed.pipeliningAPI如何將流水線並行應用於 transformer 模型
如何對一組微批次利用不同的排程策略
熟悉 PyTorch 中的基本分散式訓練
設定#
透過 torch.distributed.pipelining,我們將對模型的執行進行分割槽,並在微批次上排程計算。我們將使用一個簡化的 transformer 解碼器模型。該模型架構僅用於教學目的,包含多個 transformer 解碼器層,以便我們演示如何將模型分割成不同的塊。首先,讓我們定義模型。
import torch
import torch.nn as nn
from dataclasses import dataclass
@dataclass
class ModelArgs:
dim: int = 512
n_layers: int = 8
n_heads: int = 8
vocab_size: int = 10000
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim)
# Using a ModuleDict lets us delete layers witout affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads)
self.norm = nn.LayerNorm(model_args.dim)
self.output = nn.Linear(model_args.dim, model_args.vocab_size)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, h)
h = self.norm(h) if self.norm else h
output = self.output(h).clone() if self.output else h
return output
然後,我們需要在指令碼中匯入必要的庫並初始化分散式訓練過程。在這種情況下,我們定義了一些全域性變數以供指令碼稍後使用。
import os
import torch.distributed as dist
from torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipe
global rank, device, pp_group, stage_index, num_stages
def init_distributed():
global rank, device, pp_group, stage_index, num_stages
rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu")
dist.init_process_group()
# This group can be a sub-group in the N-D parallel case
pp_group = dist.new_group()
stage_index = rank
num_stages = world_size
通常在所有分散式程式中都會使用 rank、world_size 和 init_process_group() 程式碼。特定於流水線並行的全域性變數包括 pp_group,它是用於 send/recv 通訊的程序組;stage_index,在此示例中,每個 stage 只有一個 rank,因此索引等同於 rank;以及 num_stages,它等同於 world_size。
num_stages 用於設定流水線並行排程中使用的 stage 數量。例如,對於 num_stages=4,一個微批次需要經過 4 次前向傳播和 4 次後向傳播才能完成。框架需要 stage_index 來了解如何在 stage 之間進行通訊。例如,對於第一個 stage(stage_index=0),它將使用來自資料載入器的資料,而無需從任何前驅 peer 接收資料即可執行其計算。
步驟 1:分割 Transformer 模型#
有兩種不同的模型分割方法
第一種是手動模式,我們可以透過刪除模型的部分屬性來手動建立模型的兩個例項。在此示例中,對於兩個 stage(2 個 rank),模型被分成兩半。
def manual_model_split(model) -> PipelineStage:
if stage_index == 0:
# prepare the first stage model
for i in range(4, 8):
del model.layers[str(i)]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
for i in range(4):
del model.layers[str(i)]
model.tok_embeddings = None
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
return stage
我們可以看到第一個 stage 沒有 layer norm 或輸出層,只包含前四個 transformer 塊。第二個 stage 沒有輸入嵌入層,但包含輸出層和最後的四個 transformer 塊。然後該函式返回當前 rank 的 PipelineStage。
第二種方法是基於跟蹤器的模式,它根據 split_spec 引數自動分割模型。使用流水線規範,我們可以指示 torch.distributed.pipelining 在哪裡分割模型。在下面的程式碼塊中,我們在第 4 個 transformer 解碼器層之前進行分割,這與上面描述的手動分割相呼應。同樣,在完成分割後,可以透過呼叫 build_stage 來檢索 PipelineStage。
步驟 2:定義主執行#
在主函式中,我們將建立一個特定的流水線排程,stage 應遵循該排程。torch.distributed.pipelining 支援多種排程策略,包括單 stage-per-rank 排程 GPipe 和 1F1B,以及多 stage-per-rank 排程,如 Interleaved1F1B 和 LoopedBFS。
if __name__ == "__main__":
init_distributed()
num_microbatches = 4
model_args = ModelArgs()
model = Transformer(model_args)
# Dummy data
x = torch.ones(32, 500, dtype=torch.long)
y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long)
example_input_microbatch = x.chunk(num_microbatches)[0]
# Option 1: Manual model splitting
stage = manual_model_split(model)
# Option 2: Tracer model splitting
# stage = tracer_model_split(model, example_input_microbatch)
model.to(device)
x = x.to(device)
y = y.to(device)
def tokenwise_loss_fn(outputs, targets):
loss_fn = nn.CrossEntropyLoss()
outputs = outputs.reshape(-1, model_args.vocab_size)
targets = targets.reshape(-1)
return loss_fn(outputs, targets)
schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn)
if rank == 0:
schedule.step(x)
elif rank == 1:
losses = []
output = schedule.step(target=y, losses=losses)
print(f"losses: {losses}")
dist.destroy_process_group()
在上面的示例中,我們使用了手動方法來分割模型,但可以取消註釋該程式碼以嘗試基於跟蹤器的模型分割函式。在我們的排程中,我們需要傳入微批次的數量以及用於評估目標的損失函式。
.step() 函式處理整個小批次,並根據之前傳入的 n_microbatches 自動將其分割成微批次。然後根據排程類對微批次進行操作。在上面的示例中,我們使用的是 GPipe,它遵循簡單的全前向傳播然後全後向傳播的排程。從 rank 1 返回的輸出將與模型在單個 GPU 上執行整個批次時相同。類似地,我們可以傳入一個 losses 容器來儲存每個微批次對應的損失。
步驟 3:啟動分散式程序#
最後,我們準備執行指令碼。我們將使用 torchrun 來建立一個單主機、2 程序的任務。我們的指令碼已經以 rank 0 執行流水線 stage 0 所需的邏輯,rank 1 執行流水線 stage 1 所需的邏輯的方式編寫。
torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py
結論#
在本教程中,我們學習瞭如何使用 PyTorch 的 torch.distributed.pipelining API 來實現分散式流水線並行。我們探索了環境的設定、 transformer 模型的定義以及為分散式訓練對其進行的分割。我們討論了兩種模型分割方法:手動和基於跟蹤器,並演示瞭如何在不同 stage 上排程微批次的計算。最後,我們介紹了流水線排程的執行以及使用 torchrun 啟動分散式程序。
附加資源#
我們已成功將 torch.distributed.pipelining 整合到 torchtitan 儲存庫中。TorchTitan 是一個乾淨、最小的程式碼庫,用於使用原生 PyTorch 進行大規模 LLM 訓練。有關生產就緒的流水線並行用法以及與其他分散式技術的組合,請參閱 TorchTitan 的 3D 並行端到端示例。