Interactive Distributed Applications with Monarch

Author: Amir Afzali

Introduction

As deep learning models grow in size and complexity, training them efficiently requires coordinating computation across many GPUs and nodes. In this tutorial, you will learn how to use Monarch's actor framework together with TorchTitan to easily set up and run large-scale distributed workflows on a SLURM-managed cluster. Monarch lets us drive a large cluster of machines (organized into a mesh) as if we were developing on a single host in a single process.

What is Monarch?

Monarch is an actor framework designed to simplify the development of distributed applications. At its core, Monarch provides:

  • Actor-based programming model: encapsulate stateful computation in actors that can run on remote processes and machines

  • Process mesh abstraction: easily manage and coordinate distributed processes across a cluster, with scalable actor messaging

  • Fault tolerance: actors and processes form a tree; failures propagate up the tree, providing sensible default error behavior while enabling fine-grained failure recovery

  • Flexible resource management: supports multiple cluster schedulers, including SLURM, Kubernetes, custom host management, and local processes

  • Integrated monitoring: streams logs from remote processes back to the client for easy debugging and aggregation

For more details, see the Monarch documentation.

Why use Monarch?

TorchTitan is a PyTorch-native library for large-scale pretraining. While TorchTitan provides excellent distributed training primitives, launching and managing those jobs on a cluster can slow down iteration. Monarch addresses this with:

  1. Simplified cluster interaction: reserve and manage compute resources with simple async Python calls instead of writing bash scripts

  2. Interactive development: modify and re-run training code on an existing allocation without waiting for new resources

  3. Unified workflow: switch seamlessly between local testing and cluster runs using the same code

Prerequisites

This tutorial relies on Titan's nightly build, so make sure your other Torch libraries also track nightly builds.

  1. Monarch nightly installed: installation script

  2. TorchTitan nightly installed: TorchTitan installation instructions

  3. A valid **Titan model config file** and **tokenizer** in your working directory (e.g., `debug_model.toml` from the TorchTitan configs).

  4. SLURM cluster access

    • Sufficient permissions to reserve nodes and launch jobs.

    • A CUDA environment configured for distributed GPU training.

Now, let's build it step by step!

Step 1: Reserve Machine Resources

First, we define a function that reserves a machine allocation programmatically.

Monarch highlight: instead of submitting SBATCH scripts, you can reserve and manage resources interactively from Python. The JobTrait design pattern provides a consistent API for interacting with schedulers such as SLURM and Kubernetes, as well as custom ones.

from monarch.job import SlurmJob, JobTrait


def create_slurm_job(
    mesh_name: str,
    num_nodes: int,
    gpus_per_node: int,
    time_limit: str = "06:00:00"
) -> SlurmJob:
    """
    Args:
        mesh_name: Name assigned to the primary mesh for this example.
                   A JobTrait can consist of multiple meshes, and
                   Monarch allows for re-attaching to ongoing jobs.
        num_nodes: Number of nodes allocated per mesh
        gpus_per_node: Number of GPUs per node in the mesh

        Note: SlurmJob is just one instance of a Monarch scheduler interface.
              Consult the JobTrait documentation to find one that's right for your use case.
    """
    default_job_name = "monarch_titan"
    return SlurmJob(
        meshes={mesh_name: num_nodes},
        job_name=default_job_name,
        time_limit=time_limit,
        gpus_per_nodes=gpus_per_node,
        # ... additional args can be passed here
    )
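
To see how this is used, here is a minimal usage sketch (the resource counts are placeholders; the full workflow, including state() and process-mesh creation, appears in Step 4):

# Reserve a 2-node allocation with 8 GPUs per node (illustrative values)
slurm_job = create_slurm_job(mesh_name="mesh0", num_nodes=2, gpus_per_node=8)

# state() resolves the allocation; the mesh is exposed under the name we
# registered in `meshes` ("mesh0" here)
job_state = slurm_job.state()
proc_mesh = job_state.mesh0.spawn_procs({"gpus": 8})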

Step 2: Define the Trainer Actor

Now we create a Monarch actor that wraps TorchTitan's Trainer. This is the key abstraction that lets TorchTitan run inside Monarch's distributed environment.

Monarch highlight: the actor pattern provides several benefits:

  1. Remote execution: methods decorated with @endpoint can be invoked remotely

  2. Lifecycle management: Monarch handles initialization, execution, and cleanup

  3. Error handling: exceptions are properly propagated back to the client, enabling graceful error handling

import torch
from monarch.actor import Actor, current_rank, endpoint
from monarch.utils import setup_env_for_distributed
from torchtitan.tools.logging import init_logger, logger
from torchtitan.train import Trainer


class TrainerActor(Actor):
    """
    Monarch Actor wrapper for TorchTitan's Trainer.

    This actor encapsulates a complete TorchTitan training process, handling
    initialization, training loop execution, and cleanup. Each instance runs
    on a single GPU in the distributed training job.

    The actor's lifetime:
        1. __init__: Initialize with job configuration
        2. start_training:
           Execute the training loop
           Destroy process group and release resources

    Attributes:
        job_config: TorchTitan configuration for this trainer
        uid: Unique identifier for logging (includes rank)
    """

    def __init__(self, job_config: "JobConfig") -> None:
        """
        Initialize the trainer actor.

        Args:
            job_config: TorchTitan JobConfig with training parameters
        """
        self.job_config = job_config

        # current_rank() provides access to this actor's rank in the process mesh
        self.rank = current_rank().rank
        self.uid = f"[trainer_{self.rank}]"

    @endpoint
    async def ping_rank(self) -> None:
        """
        A dummy logging function we will use for demonstration purposes.
        """
        logger.info(f"{self.uid} Ping!")

    @endpoint
    async def start_training(self) -> None:
        """
        Execute the TorchTitan training loop.

        This remote endpoint:
        1. Initializes TorchTitan's logger
        2. Creates a Trainer instance with the job configuration
        3. Runs the training loop
        4. Handles cleanup and error conditions

        The @endpoint decorator makes this method callable from the Monarch
        client, even though it runs on a remote GPU node.

        Raises:
            Exception: Any exception from TorchTitan training is propagated
                      back to the client
        """
        init_logger()
        trainer: Trainer | None = None
        try:
            # Initialize TorchTitan trainer
            trainer = Trainer(self.job_config)
            logger.info(f"{self.uid} initialized successfully and starting training")

            # Run the training loop
            trainer.train()

        except Exception as e:
            logger.error(f"{self.uid} training failed: {e}")
            if trainer:
                trainer.close()
            # Note: error is propagated back to the controller
            raise e

        else:
            # Training completed successfully
            trainer.close()
            logger.info(f"{self.uid} training completed successfully")

        finally:
            # Clean up distributed process group
            torch.distributed.destroy_process_group()
            logger.info(f"{self.uid} trainer cleaned up")

Actor endpoints can be invoked in several modes. We will explore a concrete example in Step 4: Run the Training Workflow, but here is some pseudocode for common usage:

try:
    # where mesh0 is made of N nodes, each node having 8 GPUs
    proc_mesh = mesh0.spawn_procs({"gpus": 8})
    trainer_actors = proc_mesh.spawn("trainers", TrainerActor, ...)

    # Call on all ranks
    await trainer_actors.ping_rank.call()

    # Call-and-forget on all ranks
    trainer_actors.ping_rank.broadcast()

    # Call on ONE random rank
    await trainer_actors.ping_rank.choose()

    # Call on the first 3 ranks of node 0
    await trainer_actors.slice(hosts=0, gpus=slice(0, 3)).ping_rank.call()

except Exception as e:
    # handle SupervisionEvents from remote actor failures
    pass

Remote actor endpoints can also take advantage of native Python breakpoints, enabling interactive debugging sessions. For a complete deep dive into the Monarch debugger, refer to the documentation.

    @endpoint
    async def ping_debuggable_rank(self) -> None:
        logger.info(f"{self.uid} Ping!")
        if self.rank == 0:
            breakpoint()
        logger.info(f"{self.uid} Pong!")

Step 3: Define Training Parameters

Next, we define some common parameters for the training job and cluster resources. This configuration determines the scale of training (number of nodes and GPUs) along with a few training hyperparameters.

from dataclasses import dataclass


@dataclass
class RunParams:
    """
    Configuration for cluster resources and training parameters.

    Attributes:
        training_steps: Number of training iterations to run
        model_config: Path to TorchTitan model configuration file
        tokenizer: Path to tokenizer directory
        dataset: Dataset to use for training (e.g., 'c4', 'c4_test')
        num_nodes: Number of compute nodes to request
        gpus_per_node: Number of GPUs per node

    Adjust these values based on your model size and available resources.
    """

    training_steps: int = 50
    model_config: str = "debug_model.toml"
    tokenizer: str = "tokenizer"
    dataset: str = "c4"
    num_nodes: int = 2
    gpus_per_node: int = 8

TorchTitan uses a JobConfig object to control every aspect of training. Here we create a function that builds this configuration from RunParams.

import os
from torchtitan.config import ConfigManager, JobConfig


def make_job_config() -> JobConfig:
    """
    Create a TorchTitan JobConfig from RunParams.

    This function constructs the complete training configuration, including
    parallelism settings, model architecture, and dataset paths
    """
    # Calculate total parallelism based on cluster size
    data_parallel_shard_degree = RunParams.num_nodes * RunParams.gpus_per_node
    output_path = "./outputs"
    # Construct paths relative to script directory
    script_dir = os.getcwd()

    # Build argument list for TorchTitan's ConfigManager
    # These override defaults from the model config file
    default_args = [
        "--job.config_file",
        os.path.join(script_dir, RunParams.model_config),
        "--model.tokenizer_path",
        os.path.join(script_dir, RunParams.tokenizer),
        "--parallelism.data_parallel_shard_degree",
        str(data_parallel_shard_degree),
        "--training.steps",
        str(RunParams.training_steps),
        "--training.dataset",
        RunParams.dataset,
        "--job.dump_folder",
        output_path,
        # continue to configure as needed
    ]
    config_manager = ConfigManager()
    job_config = config_manager.parse_args(default_args)
    return job_config
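
As a quick sanity check, you can build the config locally and inspect the derived values; with the default RunParams above (2 nodes × 8 GPUs), the data-parallel shard degree works out to 16. The attribute paths below mirror the dotted CLI flags and are an assumption about how TorchTitan's JobConfig is laid out:

job_config = make_job_config()

# Dotted flags map to nested attributes, e.g. --training.steps -> job_config.training.steps
print(job_config.training.steps)                          # 50
print(job_config.parallelism.data_parallel_shard_degree)  # 2 * 8 = 16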

Step 4: Run the Training Workflow

With all the components defined, we can now orchestrate the complete workflow. This is where Monarch's strengths stand out most.

Monarch highlights:

  1. Interactive iteration: once a machine allocation is reserved, you can adjust your logic and relaunch actors without requesting new resources. SLURM's shared filesystem ensures that framework/workspace changes are synced across worker nodes.

  2. Transparent logging: all logs from remote workers are streamed back to your client in real time, making debugging feel like running locally

The workflow:

Reserve machines → create a process mesh → configure logging → spawn actors → train → clean up

async def execute_training() -> None:
    """
    Execute the complete distributed training workflow.
    """
    job_config = make_job_config()
    slurm_job = None
    mesh_name = "mesh0"
    try:
        # 1. Create a SLURM job with N nodes
        #    This leverages Monarch to reserve a persistent machine allocation
        slurm_job = create_slurm_job(mesh_name, RunParams.num_nodes, RunParams.gpus_per_node)
        job_state = slurm_job.state()

        # 2. Create a process mesh on the machine allocation
        #    This creates one process per GPU across all allocated nodes
        logger.info("Creating process mesh...")
        proc_mesh = job_state.mesh0.spawn_procs({"gpus": RunParams.gpus_per_node})

        # 3. Configure remote logging behavior
        #    - stream_to_client: Forward all remote logs to your local console
        #    - aggregate_window_sec: Batch logs for efficiency
        logger.info("Configuring logging...")
        await proc_mesh.logging_option(
            stream_to_client=True,
            # aggregate_window_sec=None  # Uncomment to disable log batching
        )

        # 4. Setup environment for torch.distributed
        #    This configures torch.distributed across all processes in the mesh
        logger.info("Setting up distributed environment...")
        await setup_env_for_distributed(proc_mesh)

        # 5. Spawn TrainerActor on each GPU
        #    Each process in the mesh creates its own TrainerActor instance
        logger.info("Spawning trainer actors...")
        trainer = proc_mesh.spawn(
            "trainer_actor",  # Name for the actor group
            TrainerActor,  # Actor class to instantiate
            job_config,  # Arguments to __init__
        )

        # 6. Execute the training job across all actors
        #    The .call() method invokes start_training() on all actors in parallel
        logger.info("Starting distributed training...")
        await trainer.start_training.call()

        logger.info("Training completed successfully!")

    except Exception as e:
        logger.error(f"Training workflow failed: {e}")

    finally:
        # Always clean up the machine allocation
        if slurm_job:
            await cleanup_job(slurm_job)
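
Because the allocation stays alive until cleanup_job is called, you can iterate interactively: edit your actor code, spawn a fresh actor group on the same process mesh, and call its endpoints again without going back to the SLURM queue. A rough sketch using only the calls shown above (the actor-group name is illustrative):

# Spawn a new actor group on the existing allocation after editing your code
trainer_v2 = proc_mesh.spawn("trainer_actor_v2", TrainerActor, job_config)
await trainer_v2.ping_rank.call()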

Step 5: Clean Up Resources

After training completes (or when you are done experimenting), it is important to release cluster resources by terminating the SLURM job.

Monarch highlight: while you can keep the allocation alive across multiple training runs during development, always remember to release cluster resources when you are done.

async def cleanup_job(job: JobTrait) -> None:
    """
    This function cancels the SLURM job, releasing all reserved nodes back
    to the cluster for other users.

    Args:
        job: A JobTrait, like the one returned from create_slurm_job()

    Note:
        The job will also terminate automatically when the configured TTL
        is exceeded, but explicit cleanup is recommended for long-running
        notebooks or scripts.
    """
    job.kill()
    logger.info("Job terminated successfully")

Step 6: Run the Complete Pipeline

Finally, we wire everything together in a main entry point that launches the workflow.

import asyncio


if __name__ == "__main__":
    """
    Run the complete workflow: reserve resources, train, and cleanup.
    """
    logger.info("Starting Monarch + TorchTitan Distributed Training")

    asyncio.run(execute_training())

    logger.info("Workflow completed!")

Conclusion

Congratulations! In this tutorial, you learned how to combine Monarch's actor framework with TorchTitan for scalable distributed training.

Further reading

  • Monarch also integrates with TorchFT to provide per-step fault tolerance for replicated workers. You can find a comprehensive proof of concept of this integration in the TorchFT repo

  • For an interactive notebook covering topics similar to this tutorial, refer to this Monarch example