評價此頁

Pipeline 並行#

創建於: 2025 年 6 月 16 日 | 最後更新於: 2025 年 8 月 13 日

注意

torch.distributed.pipelining 目前處於 alpha 狀態,正在開發中。API 可能會發生更改。它從 PiPPy 專案遷移而來。

為何選擇 Pipeline 並行?#

Pipeline 並行是深度學習中一種 **基礎** 的並行方式。它允許模型 **執行** 被分割,使得多個 **微批次 (micro-batches)** 可以併發執行模型的不同部分。Pipeline 並行可以是有效的技術,適用於

  • 大規模訓練

  • 頻寬受限的叢集

  • 大型模型推理

上述場景的共同點是,每個裝置的計算量無法掩蓋傳統並行方式的通訊開銷,例如 FSDP 的權重 all-gather。

什麼是 torch.distributed.pipelining#

雖然 Pipelining 在擴充套件性方面前景廣闊,但實現起來通常很困難,因為它除了需要分割模型權重之外,還需要 **分割模型的執行**。執行的分割通常需要對模型進行侵入式程式碼更改。另一個複雜性來自 **在分散式環境中排程微批次**,並考慮 **資料流依賴**。

名為 pipelining 的包提供了一個工具包,可以 **自動** 完成上述任務,從而能夠輕鬆地在 **通用** 模型上實現 Pipeline 並行。

它由兩部分組成:一個 **分割前端** 和一個 **分散式執行時**。分割前端會原封不動地接收你的模型程式碼,將其分割成“模型分割槽 (model partitions)”,並捕獲資料流關係。分散式執行時會在不同的裝置上並行執行 Pipeline 階段,處理微批次分割、排程、通訊和梯度傳播等事宜。

總而言之,pipelining 包提供了以下功能:

  • 基於簡單規範對模型程式碼進行分割。

  • 豐富支援 Pipeline 排程,包括 GPipe、1F1B、Interleaved 1F1B 和 Looped BFS,並提供編寫自定義排程的基礎設施。

  • 一流的跨主機 Pipeline 並行支援,因為 PP 通常在此場景下使用(在較慢的互連上)。

  • 與其他 PyTorch 並行技術(如資料並行 (DDP, FSDP) 或張量並行)的可組合性。 TorchTitan 專案演示了 Llama 模型上的“3D 並行”應用。

步驟 1:構建 PipelineStage#

在我們可以使用 PipelineSchedule 之前,我們需要建立 PipelineStage 物件,這些物件包裝了在該階段執行的模型部分。 PipelineStage 負責分配通訊緩衝區並建立傳送/接收操作與對等方通訊。它管理中間緩衝區,例如尚未被消耗的forward輸出,並提供一個實用程式來執行階段模型的反向傳播。

PipelineStage 需要知道階段模型的輸入和輸出形狀,以便正確分配通訊緩衝區。形狀必須是靜態的,例如在執行時,形狀不能從一步到另一步發生變化。如果執行時形狀與預期形狀不匹配,將引發 PipeliningShapeError 類。當與其他並行技術組合或應用混合精度時,必須考慮這些技術,以便 PipelineStage 知道執行時階段模組輸出的正確形狀(和資料型別)。

使用者可以直接構造 PipelineStage 例項,透過傳入一個代表應在該階段執行的模型部分的 nn.Module。這可能需要更改原始模型程式碼。請參見 選項 1:手動分割模型 中的示例。

或者,分割前端可以使用圖分割技術,自動將您的模型分割成一系列 nn.Module。此技術要求模型可使用 torch.Export 進行跟蹤。結果 nn.Module 與其他並行技術的組合是實驗性的,可能需要一些變通方法。如果使用者無法輕鬆更改模型程式碼,使用此前端可能更有吸引力。有關更多資訊,請參見 選項 2:自動分割模型

步驟 2:使用 PipelineSchedule 進行執行#

現在我們可以將 PipelineStage 附加到 pipeline 排程,並使用輸入資料執行排程。這是一個 GPipe 示例

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

請注意,上述程式碼需要為每個 worker 啟動,因此我們使用啟動服務來啟動多個程序。

torchrun --nproc_per_node=2 example.py

模型分割選項#

選項 1:手動分割模型#

要直接構造 PipelineStage,使用者需要提供一個 nn.Module 例項,該例項擁有相關的 nn.Parametersnn.Buffers,並定義一個 forward() 方法來執行與該階段相關的操作。例如,Torchtitan 中 Transformer 類的精簡版本顯示了一種構建易於分割模型的模式。

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers without affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以這種方式定義的模型可以透過以下方式輕鬆地按階段配置:首先初始化整個模型(使用 meta-device 避免 OOM 錯誤),刪除該階段不需要的層,然後建立一個包裝模型的 PipelineStage。例如:

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
    )

當與其他資料或模型並行技術組合時,如果模型塊的輸出形狀/資料型別會受到影響,可能還需要 output_args

選項 2:自動分割模型#

如果您擁有完整的模型,並且不想花時間將其修改為一系列“模型分割槽”,那麼 pipeline API 將為您提供幫助。這是一個簡短的示例:

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果我們列印模型,可以看到多個層級結構,這使得手動分割變得困難。

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

讓我們看看 pipeline API 是如何工作的。

from torch.distributed.pipelining import pipeline, SplitPoint

# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])

pipe = pipeline(
    module=mod,
    mb_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

pipeline API 根據 split_spec 來分割您的模型,其中 SplitPoint.BEGINNING 表示在 forward 函式中某個子模組執行 **之前** 新增一個分割點,同理,SplitPoint.END 表示在 **之後** 新增。

如果我們 print(pipe),我們可以看到:

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

“模型分割槽”由子模組 (submod_0, submod_1) 表示,每個子模組都使用原始模型操作、權重和層級結構重建。此外,一個“根級別”的 forward 函式被重建,以捕獲這些分割槽之間的資料流。這些資料流稍後將由 pipeline 執行時以分散式方式重放。

Pipe 物件提供了一個檢索“模型分割槽”的方法。

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

返回的 stage_mod 是一個 nn.Module,您可以使用它來建立最佳化器、儲存或載入檢查點,或應用其他並行技術。

Pipe 還允許您在給定 ProcessGroup 的裝置上建立分散式階段執行時。

stage = pipe.build_stage(stage_idx, device, group)

或者,如果您想在對 stage_mod 進行一些修改後稍後構建階段執行時,可以使用函式式版本的 build_stage API。例如:

from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel

dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)

注意

pipeline 前端使用一個跟蹤器 (torch.export) 將您的模型捕獲到單個圖中。如果您的模型不是完全可捕獲的,您可以使用下面的手動前端。

Hugging Face 示例#

在該包最初建立的 PiPPy 倉庫中,我們保留了基於未修改的 Hugging Face 模型的示例。請參見 examples/huggingface 目錄。

示例包括:

技術深入分析#

pipeline API 如何分割模型?#

首先,pipeline API 透過跟蹤模型將其轉換為有向無環圖 (DAG)。它使用 torch.export — 一個 PyTorch 2 的全圖捕獲工具 — 來跟蹤模型。

然後,它將階段所需的 **操作和引數** 組合到一個重建的子模組中:submod_0, submod_1, ...

Module.children() 等傳統的子模組訪問方法不同,pipeline API 不僅切割模型的模組結構,還切割模型的 **forward** 函式。

這是必需的,因為像 Module.children() 這樣的模組結構僅在 Module.__init__() 期間捕獲資訊,而不會捕獲關於 Module.forward() 的任何資訊。換句話說,Module.children() 缺乏關於以下對 Pipelining 至關重要的方面的資訊:

  • forward 中子模組的執行順序

  • 子模組之間的啟用流

  • 子模組之間是否存在任何函式式運算子(例如,reluadd 操作不會被 Module.children() 捕獲)。

pipeline API 相反,它確保了 forward 的行為得到了真正保留。它還捕獲了分割槽之間的啟用流,幫助分散式執行時在無需人工干預的情況下進行正確的傳送/接收呼叫。

pipeline API 的另一個靈活性是,分割點可以在模型層級結構內的任意級別。在分割分割槽中,與該分割槽相關的原始模型層級結構將免費重建。因此,指向子模組或引數的完全限定名稱 (FQNs) 仍然有效,並且依賴於 FQNs 的服務(如 FSDP、TP 或 checkpointing)幾乎無需更改程式碼即可與您的分割槽模組一起執行。

實現您自己的排程#

您可以透過擴充套件以下兩個類之一來實現自己的 pipeline 排程:

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle 適用於 **每個 rank 只分配一個** 階段的排程。 PipelineScheduleMulti 適用於每個 rank 分配多個階段的排程。

例如,ScheduleGPipeSchedule1F1BPipelineScheduleSingle 的子類。而 ScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubblePipelineScheduleMulti 的子類。

日誌記錄#

您可以使用 torch._logging 中的 TORCH_LOGS 環境變數啟用額外的日誌記錄。

  • TORCH_LOGS=+pp 將顯示 logging.DEBUG 訊息及其以上所有級別。

  • TORCH_LOGS=pp 將顯示 logging.INFO 訊息及其以上。

  • TORCH_LOGS=-pp 將顯示 logging.WARNING 訊息及其以上。

API 參考#

模型分割 API#

以下 API 集將您的模型轉換為 pipeline 表示。

class torch.distributed.pipelining.SplitPoint(value)[source]#

列舉,表示在子模組執行中可以發生分割的點。:ivar BEGINNING: 表示在 forward 函式中某個子模組執行 **之前** 新增一個分割點。:ivar END: 表示在 forward 函式中某個子模組執行 **之後** 新增一個分割點。

torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source]#

根據規範分割模組。

更多詳情請參見 Pipe

引數
返回型別

Pipe 的 pipeline 表示。

class torch.distributed.pipelining.Pipe(split_gm, num_stages, has_loss_and_backward, loss_spec)[source]#
torch.distributed.pipelining.pipe_split()[source]#

pipe_split 是一個特殊運算子,用於標記模組中階段之間的邊界。它用於將模組分割成階段。如果註釋的模組是立即執行的,它將是一個無操作。

示例

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

上面的例子將被分割成兩個階段。

微批次工具#

class torch.distributed.pipelining.microbatch.TensorChunkSpec(split_dim)[source]#

用於指定輸入分塊的類。

torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[source]#

給定一系列 args 和 kwargs,根據它們各自的分塊規範將它們分割成多個塊。

引數
返回

分片 args 和 kwargs 的列表:分片 kwargs 的列表。

返回型別

args_split

torch.distributed.pipelining.microbatch.merge_chunks(chunks, chunk_spec)[source]#

給定一個塊列表,根據分塊規範將它們合併成一個值。

引數
  • chunks (list[Any]) – 塊的列表。

  • chunk_spec – 塊的分塊規範。

返回

合併後的值。

返回型別

value

Pipeline 階段#

class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[source]#

表示 Pipeline 並行設定中 pipeline 階段的類。

PipelineStage 假設模型的順序分割,即模型被分割成塊,一個塊的輸出饋入下一個塊的輸入,沒有跳過連線。

PipelineStage 透過按線性順序將 stage0 的輸出傳播到 stage1 等,自動執行執行時形狀/資料型別推斷。要繞過形狀推斷,請將 input_argsoutput_args 傳遞給每個 PipelineStage 例項。

引數
  • submodule (nn.Module) – 由此階段包裝的 PyTorch 模組。

  • stage_index (int) – 此階段的 ID。

  • num_stages (int) – 階段總數。

  • device (torch.device) – 此階段所在裝置。

  • input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸入引數。

  • output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模組的輸出引數。

  • group (dist.ProcessGroup, optional) – 分散式訓練的程序組。如果為 None,則使用預設組。

  • dw_builder (Optional[Callable[[], Callable[..., None]]) – 如果提供,dw_builder 將構建一個新的 dw_runner 函式,該函式將用於 F, I, W(正向、輸入、權重)零氣泡排程的 W 操作(輸入權重)。

torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source]#

給定一個要由此階段包裝的 stage_module 和 pipeline 資訊,建立一個 pipeline 階段。

引數
  • stage_module (torch.nn.Module) – 要由此階段包裝的模組。

  • stage_index (int) – 此階段在 pipeline 中的索引。

  • pipe_info (PipeInfo) – 關於 pipeline 的資訊,可以透過 pipe.info() 檢索。

  • device (torch.device) – 此階段要使用的裝置。

  • group (Optional[dist.ProcessGroup]) – 此階段要使用的程序組。

返回

一個可以與 PipelineSchedules 一起執行的 pipeline 階段。

返回型別

_PipelineStage

Pipeline 排程#

class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

GPipe 排程。將以填充-排空 (fill-drain) 的方式遍歷所有微批次。

class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

1F1B 排程。在穩定狀態下,將對微批次執行一次前向和一次後向。

class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

交錯 1F1B 排程。請參閱 https://arxiv.org/pdf/2104.04473 以獲取詳細資訊。在穩定狀態下,將對微批次執行一次前向和一次後向,並支援每個 rank 多個階段。當微批次準備好處理多個本地階段時,Interleaved 1F1B 會優先處理較早的微批次(也稱為“深度優先”)。

此排程與原始論文非常相似。它的不同之處在於放寬了 num_microbatch % pp_size == 0 的要求。使用 flex_pp 排程,我們將得到 num_rounds = max(1, n_microbatches // pp_group_size),並且只要 n_microbatches % num_rounds == 0 即可工作。例如,支援:

  1. pp_group_size = 4, n_microbatches = 10。我們將得到 num_rounds = 2,且 n_microbatches % 2 == 0。

  2. pp_group_size = 4, n_microbatches = 3。我們將得到 num_rounds = 1,且 n_microbatches % 1 == 0。

class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None, scale_grads=True)[source]#

廣度優先 Pipeline 並行。請參閱 https://arxiv.org/abs/2211.05953 以獲取詳細資訊。與 Interleaved 1F1B 類似,Looped BFS 支援每個 rank 多個階段。不同之處在於,當微批次準備好處理多個本地階段時,Looped BFS 會優先處理較早的階段,一次性執行所有可用的微批次。

class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

交錯零氣泡排程。請參閱 https://arxiv.org/pdf/2401.10241 以獲取詳細資訊。在穩定狀態下,將對微批次的輸入執行一次前向和一次後向,並支援每個 rank 多個階段。使用反向傳播權重來填充 pipeline 氣泡。

特別地,這實現了論文中的 ZB1P 排程。

class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

零氣泡排程(ZBV 變體)。請參閱 https://arxiv.org/pdf/2401.10241 第 6 節以獲取詳細資訊。

此排程要求每個 rank 必須有兩個階段。

此排程在穩定狀態下將對微批次的輸入執行一次前向和一次後向,並支援每個 rank 多個階段。使用反向傳播權重來填充 pipeline 氣泡。

這個 ZB-V 排程只有在 time forward == time backward input == time backward weights 時才具有“零氣泡”屬性。實際上,對於真實的模型,這不太可能發生,因此可以改用貪婪排程器來實現不等/不平衡的時間。

class torch.distributed.pipelining.schedules.ScheduleDualPipeV(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

DualPipeV 排程。基於 DeepSeek 在 https://arxiv.org/pdf/2412.19437 中提出的 DualPipe 排程的更高效的排程變體。

基於 deepseek-ai/DualPipe 的開原始碼。

class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source]#

單階段排程的基類。實現了 step 方法。派生類應實現 _step_microbatches

根據 scale_grads 引數(預設為 True),梯度會按 num_microbatches 進行縮放。此設定應與您的 loss_fn 的配置匹配,loss_fn 可以平均損失(scale_grads=True)或求和損失(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source]#

使用 **整批** 輸入執行 pipeline 排程的一個迭代。會自動將輸入分塊,並根據排程實現遍歷微批次。

args:模型的 positional 引數(與非 pipeline 情況相同)。kwargs:模型的關鍵字引數(與非 pipeline 情況相同)。target:損失函式的 target。losses:一個列表,用於儲存每個微批次的損失。

class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, use_full_backward=None, scale_grads=True)[source]#

多階段排程的基類。實現了 step 方法。

根據 scale_grads 引數(預設為 True),梯度會按 num_microbatches 進行縮放。此設定應與您的 loss_fn 的配置匹配,loss_fn 可以平均損失(scale_grads=True)或求和損失(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source]#

使用 **整批** 輸入執行 pipeline 排程的一個迭代。會自動將輸入分塊,並根據排程實現遍歷微批次。

args:模型的 positional 引數(與非 pipeline 情況相同)。kwargs:模型的關鍵字引數(與非 pipeline 情況相同)。target:損失函式的 target。losses:一個列表,用於儲存每個微批次的損失。