Pipeline 並行#
創建於: 2025 年 6 月 16 日 | 最後更新於: 2025 年 8 月 13 日
注意
torch.distributed.pipelining 目前處於 alpha 狀態,正在開發中。API 可能會發生更改。它從 PiPPy 專案遷移而來。
為何選擇 Pipeline 並行?#
Pipeline 並行是深度學習中一種 **基礎** 的並行方式。它允許模型 **執行** 被分割,使得多個 **微批次 (micro-batches)** 可以併發執行模型的不同部分。Pipeline 並行可以是有效的技術,適用於
大規模訓練
頻寬受限的叢集
大型模型推理
上述場景的共同點是,每個裝置的計算量無法掩蓋傳統並行方式的通訊開銷,例如 FSDP 的權重 all-gather。
什麼是 torch.distributed.pipelining?#
雖然 Pipelining 在擴充套件性方面前景廣闊,但實現起來通常很困難,因為它除了需要分割模型權重之外,還需要 **分割模型的執行**。執行的分割通常需要對模型進行侵入式程式碼更改。另一個複雜性來自 **在分散式環境中排程微批次**,並考慮 **資料流依賴**。
名為 pipelining 的包提供了一個工具包,可以 **自動** 完成上述任務,從而能夠輕鬆地在 **通用** 模型上實現 Pipeline 並行。
它由兩部分組成:一個 **分割前端** 和一個 **分散式執行時**。分割前端會原封不動地接收你的模型程式碼,將其分割成“模型分割槽 (model partitions)”,並捕獲資料流關係。分散式執行時會在不同的裝置上並行執行 Pipeline 階段,處理微批次分割、排程、通訊和梯度傳播等事宜。
總而言之,pipelining 包提供了以下功能:
基於簡單規範對模型程式碼進行分割。
豐富支援 Pipeline 排程,包括 GPipe、1F1B、Interleaved 1F1B 和 Looped BFS,並提供編寫自定義排程的基礎設施。
一流的跨主機 Pipeline 並行支援,因為 PP 通常在此場景下使用(在較慢的互連上)。
與其他 PyTorch 並行技術(如資料並行 (DDP, FSDP) 或張量並行)的可組合性。 TorchTitan 專案演示了 Llama 模型上的“3D 並行”應用。
步驟 1:構建 PipelineStage#
在我們可以使用 PipelineSchedule 之前,我們需要建立 PipelineStage 物件,這些物件包裝了在該階段執行的模型部分。 PipelineStage 負責分配通訊緩衝區並建立傳送/接收操作與對等方通訊。它管理中間緩衝區,例如尚未被消耗的forward輸出,並提供一個實用程式來執行階段模型的反向傳播。
PipelineStage 需要知道階段模型的輸入和輸出形狀,以便正確分配通訊緩衝區。形狀必須是靜態的,例如在執行時,形狀不能從一步到另一步發生變化。如果執行時形狀與預期形狀不匹配,將引發 PipeliningShapeError 類。當與其他並行技術組合或應用混合精度時,必須考慮這些技術,以便 PipelineStage 知道執行時階段模組輸出的正確形狀(和資料型別)。
使用者可以直接構造 PipelineStage 例項,透過傳入一個代表應在該階段執行的模型部分的 nn.Module。這可能需要更改原始模型程式碼。請參見 選項 1:手動分割模型 中的示例。
或者,分割前端可以使用圖分割技術,自動將您的模型分割成一系列 nn.Module。此技術要求模型可使用 torch.Export 進行跟蹤。結果 nn.Module 與其他並行技術的組合是實驗性的,可能需要一些變通方法。如果使用者無法輕鬆更改模型程式碼,使用此前端可能更有吸引力。有關更多資訊,請參見 選項 2:自動分割模型。
步驟 2:使用 PipelineSchedule 進行執行#
現在我們可以將 PipelineStage 附加到 pipeline 排程,並使用輸入資料執行排程。這是一個 GPipe 示例
from torch.distributed.pipelining import ScheduleGPipe
# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)
# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)
# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
schedule.step(x)
else:
output = schedule.step()
請注意,上述程式碼需要為每個 worker 啟動,因此我們使用啟動服務來啟動多個程序。
torchrun --nproc_per_node=2 example.py
模型分割選項#
選項 1:手動分割模型#
要直接構造 PipelineStage,使用者需要提供一個 nn.Module 例項,該例項擁有相關的 nn.Parameters 和 nn.Buffers,並定義一個 forward() 方法來執行與該階段相關的操作。例如,Torchtitan 中 Transformer 類的精簡版本顯示了一種構建易於分割模型的模式。
class Transformer(nn.Module):
def __init__(self, model_args: ModelArgs):
super().__init__()
self.tok_embeddings = nn.Embedding(...)
# Using a ModuleDict lets us delete layers without affecting names,
# ensuring checkpoints will correctly save and load.
self.layers = torch.nn.ModuleDict()
for layer_id in range(model_args.n_layers):
self.layers[str(layer_id)] = TransformerBlock(...)
self.output = nn.Linear(...)
def forward(self, tokens: torch.Tensor):
# Handling layers being 'None' at runtime enables easy pipeline splitting
h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens
for layer in self.layers.values():
h = layer(h, self.freqs_cis)
h = self.norm(h) if self.norm else h
output = self.output(h).float() if self.output else h
return output
以這種方式定義的模型可以透過以下方式輕鬆地按階段配置:首先初始化整個模型(使用 meta-device 避免 OOM 錯誤),刪除該階段不需要的層,然後建立一個包裝模型的 PipelineStage。例如:
with torch.device("meta"):
assert num_stages == 2, "This is a simple 2-stage example"
# we construct the entire model, then delete the parts we do not need for this stage
# in practice, this can be done using a helper function that automatically divides up layers across stages.
model = Transformer()
if stage_index == 0:
# prepare the first stage model
del model.layers["1"]
model.norm = None
model.output = None
elif stage_index == 1:
# prepare the second stage model
model.tok_embeddings = None
del model.layers["0"]
from torch.distributed.pipelining import PipelineStage
stage = PipelineStage(
model,
stage_index,
num_stages,
device,
)
當與其他資料或模型並行技術組合時,如果模型塊的輸出形狀/資料型別會受到影響,可能還需要 output_args。
選項 2:自動分割模型#
如果您擁有完整的模型,並且不想花時間將其修改為一系列“模型分割槽”,那麼 pipeline API 將為您提供幫助。這是一個簡短的示例:
class Model(torch.nn.Module):
def __init__(self) -> None:
super().__init__()
self.emb = torch.nn.Embedding(10, 3)
self.layers = torch.nn.ModuleList(
Layer() for _ in range(2)
)
self.lm = LMHead()
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.emb(x)
for layer in self.layers:
x = layer(x)
x = self.lm(x)
return x
如果我們列印模型,可以看到多個層級結構,這使得手動分割變得困難。
Model(
(emb): Embedding(10, 3)
(layers): ModuleList(
(0-1): 2 x Layer(
(lin): Linear(in_features=3, out_features=3, bias=True)
)
)
(lm): LMHead(
(proj): Linear(in_features=3, out_features=3, bias=True)
)
)
讓我們看看 pipeline API 是如何工作的。
from torch.distributed.pipelining import pipeline, SplitPoint
# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])
pipe = pipeline(
module=mod,
mb_args=(x,),
split_spec={
"layers.1": SplitPoint.BEGINNING,
}
)
pipeline API 根據 split_spec 來分割您的模型,其中 SplitPoint.BEGINNING 表示在 forward 函式中某個子模組執行 **之前** 新增一個分割點,同理,SplitPoint.END 表示在 **之後** 新增。
如果我們 print(pipe),我們可以看到:
GraphModule(
(submod_0): GraphModule(
(emb): InterpreterModule()
(layers): Module(
(0): InterpreterModule(
(lin): InterpreterModule()
)
)
)
(submod_1): GraphModule(
(layers): Module(
(1): InterpreterModule(
(lin): InterpreterModule()
)
)
(lm): InterpreterModule(
(proj): InterpreterModule()
)
)
)
def forward(self, x):
submod_0 = self.submod_0(x); x = None
submod_1 = self.submod_1(submod_0); submod_0 = None
return (submod_1,)
“模型分割槽”由子模組 (submod_0, submod_1) 表示,每個子模組都使用原始模型操作、權重和層級結構重建。此外,一個“根級別”的 forward 函式被重建,以捕獲這些分割槽之間的資料流。這些資料流稍後將由 pipeline 執行時以分散式方式重放。
Pipe 物件提供了一個檢索“模型分割槽”的方法。
stage_mod : nn.Module = pipe.get_stage_module(stage_idx)
返回的 stage_mod 是一個 nn.Module,您可以使用它來建立最佳化器、儲存或載入檢查點,或應用其他並行技術。
Pipe 還允許您在給定 ProcessGroup 的裝置上建立分散式階段執行時。
stage = pipe.build_stage(stage_idx, device, group)
或者,如果您想在對 stage_mod 進行一些修改後稍後構建階段執行時,可以使用函式式版本的 build_stage API。例如:
from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel
dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)
注意
pipeline 前端使用一個跟蹤器 (torch.export) 將您的模型捕獲到單個圖中。如果您的模型不是完全可捕獲的,您可以使用下面的手動前端。
Hugging Face 示例#
在該包最初建立的 PiPPy 倉庫中,我們保留了基於未修改的 Hugging Face 模型的示例。請參見 examples/huggingface 目錄。
示例包括:
技術深入分析#
pipeline API 如何分割模型?#
首先,pipeline API 透過跟蹤模型將其轉換為有向無環圖 (DAG)。它使用 torch.export — 一個 PyTorch 2 的全圖捕獲工具 — 來跟蹤模型。
然後,它將階段所需的 **操作和引數** 組合到一個重建的子模組中:submod_0, submod_1, ...
與 Module.children() 等傳統的子模組訪問方法不同,pipeline API 不僅切割模型的模組結構,還切割模型的 **forward** 函式。
這是必需的,因為像 Module.children() 這樣的模組結構僅在 Module.__init__() 期間捕獲資訊,而不會捕獲關於 Module.forward() 的任何資訊。換句話說,Module.children() 缺乏關於以下對 Pipelining 至關重要的方面的資訊:
forward中子模組的執行順序子模組之間的啟用流
子模組之間是否存在任何函式式運算子(例如,
relu或add操作不會被Module.children()捕獲)。
pipeline API 相反,它確保了 forward 的行為得到了真正保留。它還捕獲了分割槽之間的啟用流,幫助分散式執行時在無需人工干預的情況下進行正確的傳送/接收呼叫。
pipeline API 的另一個靈活性是,分割點可以在模型層級結構內的任意級別。在分割分割槽中,與該分割槽相關的原始模型層級結構將免費重建。因此,指向子模組或引數的完全限定名稱 (FQNs) 仍然有效,並且依賴於 FQNs 的服務(如 FSDP、TP 或 checkpointing)幾乎無需更改程式碼即可與您的分割槽模組一起執行。
實現您自己的排程#
您可以透過擴充套件以下兩個類之一來實現自己的 pipeline 排程:
PipelineScheduleSinglePipelineScheduleMulti
PipelineScheduleSingle 適用於 **每個 rank 只分配一個** 階段的排程。 PipelineScheduleMulti 適用於每個 rank 分配多個階段的排程。
例如,ScheduleGPipe 和 Schedule1F1B 是 PipelineScheduleSingle 的子類。而 ScheduleInterleaved1F1B、ScheduleLoopedBFS、ScheduleInterleavedZeroBubble 和 ScheduleZBVZeroBubble 是 PipelineScheduleMulti 的子類。
日誌記錄#
您可以使用 torch._logging 中的 TORCH_LOGS 環境變數啟用額外的日誌記錄。
TORCH_LOGS=+pp將顯示logging.DEBUG訊息及其以上所有級別。TORCH_LOGS=pp將顯示logging.INFO訊息及其以上。TORCH_LOGS=-pp將顯示logging.WARNING訊息及其以上。
API 參考#
模型分割 API#
以下 API 集將您的模型轉換為 pipeline 表示。
- class torch.distributed.pipelining.SplitPoint(value)[source]#
列舉,表示在子模組執行中可以發生分割的點。:ivar BEGINNING: 表示在 forward 函式中某個子模組執行 **之前** 新增一個分割點。:ivar END: 表示在 forward 函式中某個子模組執行 **之後** 新增一個分割點。
- torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source]#
根據規範分割模組。
更多詳情請參見 Pipe。
- 引數
module (Module) – 要分割的模組。
mb_kwargs (Optional[dict[str, Any]]) – 示例關鍵字輸入,以微批次形式。(預設為:None)
split_spec (Optional[dict[str, torch.distributed.pipelining._IR.SplitPoint]]) – 使用子模組名稱作為分割標記的字典。(預設為:None)
split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用於分割模組的策略。(預設為:None)
- 返回型別
類 Pipe 的 pipeline 表示。
微批次工具#
- torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[source]#
給定一系列 args 和 kwargs,根據它們各自的分塊規範將它們分割成多個塊。
- 引數
chunks (int) – 要將 args 和 kwargs 分割成的塊數。
args_chunk_spec (Optional[tuple[torch.distributed.pipelining.microbatch.TensorChunkSpec, ...]]) – args 的分塊規範,形狀與 args 相同。
kwargs_chunk_spec (Optional[dict[str, torch.distributed.pipelining.microbatch.TensorChunkSpec]]) – kwargs 的分塊規範,形狀與 kwargs 相同。
- 返回
分片 args 和 kwargs 的列表:分片 kwargs 的列表。
- 返回型別
args_split
Pipeline 階段#
- class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[source]#
表示 Pipeline 並行設定中 pipeline 階段的類。
PipelineStage 假設模型的順序分割,即模型被分割成塊,一個塊的輸出饋入下一個塊的輸入,沒有跳過連線。
PipelineStage 透過按線性順序將 stage0 的輸出傳播到 stage1 等,自動執行執行時形狀/資料型別推斷。要繞過形狀推斷,請將 input_args 和 output_args 傳遞給每個 PipelineStage 例項。
- 引數
submodule (nn.Module) – 由此階段包裝的 PyTorch 模組。
stage_index (int) – 此階段的 ID。
num_stages (int) – 階段總數。
device (torch.device) – 此階段所在裝置。
input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入引數。
output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出引數。
group (dist.ProcessGroup, optional) – 分散式訓練的程序組。如果為 None,則使用預設組。
dw_builder (Optional[Callable[[], Callable[..., None]]) – 如果提供,dw_builder 將構建一個新的 dw_runner 函式,該函式將用於 F, I, W(正向、輸入、權重)零氣泡排程的 W 操作(輸入權重)。
- torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source]#
給定一個要由此階段包裝的 stage_module 和 pipeline 資訊,建立一個 pipeline 階段。
- 引數
stage_module (torch.nn.Module) – 要由此階段包裝的模組。
stage_index (int) – 此階段在 pipeline 中的索引。
pipe_info (PipeInfo) – 關於 pipeline 的資訊,可以透過 pipe.info() 檢索。
device (torch.device) – 此階段要使用的裝置。
group (Optional[dist.ProcessGroup]) – 此階段要使用的程序組。
- 返回
一個可以與 PipelineSchedules 一起執行的 pipeline 階段。
- 返回型別
_PipelineStage
Pipeline 排程#
- class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
GPipe 排程。將以填充-排空 (fill-drain) 的方式遍歷所有微批次。
- class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
1F1B 排程。在穩定狀態下,將對微批次執行一次前向和一次後向。
- class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
交錯 1F1B 排程。請參閱 https://arxiv.org/pdf/2104.04473 以獲取詳細資訊。在穩定狀態下,將對微批次執行一次前向和一次後向,並支援每個 rank 多個階段。當微批次準備好處理多個本地階段時,Interleaved 1F1B 會優先處理較早的微批次(也稱為“深度優先”)。
此排程與原始論文非常相似。它的不同之處在於放寬了 num_microbatch % pp_size == 0 的要求。使用 flex_pp 排程,我們將得到 num_rounds = max(1, n_microbatches // pp_group_size),並且只要 n_microbatches % num_rounds == 0 即可工作。例如,支援:
pp_group_size = 4, n_microbatches = 10。我們將得到 num_rounds = 2,且 n_microbatches % 2 == 0。
pp_group_size = 4, n_microbatches = 3。我們將得到 num_rounds = 1,且 n_microbatches % 1 == 0。
- class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None, scale_grads=True)[source]#
廣度優先 Pipeline 並行。請參閱 https://arxiv.org/abs/2211.05953 以獲取詳細資訊。與 Interleaved 1F1B 類似,Looped BFS 支援每個 rank 多個階段。不同之處在於,當微批次準備好處理多個本地階段時,Looped BFS 會優先處理較早的階段,一次性執行所有可用的微批次。
- class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
交錯零氣泡排程。請參閱 https://arxiv.org/pdf/2401.10241 以獲取詳細資訊。在穩定狀態下,將對微批次的輸入執行一次前向和一次後向,並支援每個 rank 多個階段。使用反向傳播權重來填充 pipeline 氣泡。
特別地,這實現了論文中的 ZB1P 排程。
- class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
零氣泡排程(ZBV 變體)。請參閱 https://arxiv.org/pdf/2401.10241 第 6 節以獲取詳細資訊。
此排程要求每個 rank 必須有兩個階段。
此排程在穩定狀態下將對微批次的輸入執行一次前向和一次後向,並支援每個 rank 多個階段。使用反向傳播權重來填充 pipeline 氣泡。
這個 ZB-V 排程只有在 time forward == time backward input == time backward weights 時才具有“零氣泡”屬性。實際上,對於真實的模型,這不太可能發生,因此可以改用貪婪排程器來實現不等/不平衡的時間。
- class torch.distributed.pipelining.schedules.ScheduleDualPipeV(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
DualPipeV 排程。基於 DeepSeek 在 https://arxiv.org/pdf/2412.19437 中提出的 DualPipe 排程的更高效的排程變體。
基於 deepseek-ai/DualPipe 的開原始碼。
- class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#
單階段排程的基類。實現了 step 方法。派生類應實現 _step_microbatches。
根據 scale_grads 引數(預設為 True),梯度會按 num_microbatches 進行縮放。此設定應與您的 loss_fn 的配置匹配,loss_fn 可以平均損失(scale_grads=True)或求和損失(scale_grads=False)。
- class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, use_full_backward=None, scale_grads=True)[source]#
多階段排程的基類。實現了 step 方法。
根據 scale_grads 引數(預設為 True),梯度會按 num_microbatches 進行縮放。此設定應與您的 loss_fn 的配置匹配,loss_fn 可以平均損失(scale_grads=True)或求和損失(scale_grads=False)。