Interactive Distributed Applications with Monarch
Author: Amir Afzali
Introduction
As deep learning models continue to grow in size and complexity, training them efficiently requires coordinating computation across many GPUs and nodes. In this tutorial, you will learn how to use Monarch's actor framework together with TorchTitan to easily set up and run large-scale distributed workflows on a SLURM-managed cluster. Monarch will let us drive a large cluster of machines (organized as a mesh) as if we were developing in a single-host, single-process environment.
What is Monarch?
Monarch is an actor framework designed to simplify the development of distributed applications. At its core, Monarch provides:
Actor-based programming model: encapsulate stateful computation in actors that can run on remote processes and machines
Process mesh abstraction: easily manage and coordinate distributed processes across a cluster, with support for scalable actor messaging
Fault tolerance: actors and processes form a tree; failures propagate up the tree, providing sensible default error behavior and enabling fine-grained failure recovery
Flexible resource management: supports a variety of cluster schedulers, including SLURM, Kubernetes, custom host management, and local processes
Integrated monitoring: streams logs from remote processes back to the client for easy debugging and aggregation
For more details, see the Monarch documentation.
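To make the actor model concrete, here is a minimal sketch of the pattern the rest of this tutorial builds on. The EchoActor class, its constructor argument, and its endpoint are hypothetical, illustrative names; spawning actors onto a process mesh and invoking their endpoints is covered in the steps below.
from monarch.actor import Actor, current_rank, endpoint

class EchoActor(Actor):
    """A hypothetical minimal actor; one instance runs in each process of a mesh."""

    def __init__(self, greeting: str) -> None:
        # Stateful setup runs once per actor instance
        self.greeting = greeting
        # current_rank() exposes this actor's position in the process mesh
        self.rank = current_rank().rank

    @endpoint
    async def echo(self, message: str) -> str:
        # @endpoint makes this method remotely callable from the client
        return f"[rank {self.rank}] {self.greeting}: {message}"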
Why Monarch?
TorchTitan is a PyTorch-native library for large-scale pretraining. While TorchTitan provides excellent distributed training primitives, launching and managing those jobs on a cluster can slow down iteration. Monarch addresses this by providing:
Simplified cluster interaction: reserve and manage compute resources with simple asynchronous Python calls instead of writing bash scripts
Interactive development: modify and re-run training code on an existing allocation without waiting for new resources
Unified workflow: switch seamlessly between local testing and cluster runs using the same code
Prerequisites
This tutorial depends on Titan's nightly build, so make sure your other Torch libraries also track nightly builds.
Monarch nightly installed: installation script
TorchTitan nightly installed: TorchTitan installation instructions
A valid Titan **model config file** and **tokenizer** in your working directory (for example, `debug_model.toml` from the TorchTitan configs).
Access to a SLURM cluster
Sufficient permissions to reserve nodes and launch jobs.
A CUDA environment configured for distributed GPU training.
Now, let's build it step by step!
Step 1: Reserve Machine Resources
First, we define a function to reserve a machine allocation programmatically.
Monarch highlight: instead of submitting SBATCH scripts, you can reserve and manage resources interactively from Python. The JobTrait design pattern allows interacting with custom schedulers (such as SLURM and Kubernetes) through a consistent API.
from monarch.job import SlurmJob, JobTrait
def create_slurm_job(
mesh_name: str,
num_nodes: int,
gpus_per_node: int,
time_limit: str = "06:00:00"
) -> SlurmJob:
"""
Args:
mesh_name: Name assigned to the primary mesh for this example.
A JobTrait can consist of multiple meshes, and
Monarch allows for re-attaching to ongoing jobs.
num_nodes: Number of nodes allocated per mesh
        gpus_per_node: Number of GPUs per node in the mesh
        time_limit: Maximum wall-clock time for the allocation (HH:MM:SS)
    Note: SlurmJob is just one instance of a Monarch scheduler interface.
    Consult the JobTrait documentation to find one that's right for your use case.
"""
default_job_name = "monarch_titan"
return SlurmJob(
meshes={mesh_name: num_nodes},
job_name=default_job_name,
time_limit=time_limit,
gpus_per_nodes=gpus_per_node,
# ... additional args can be passed here
)
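As a usage sketch (mirroring how Step 4 drives it), requesting a two-node, 16-GPU allocation might look like this; the mesh name "mesh0" is just the label we choose to assign:
# Reserve 2 nodes with 8 GPUs each under a mesh named "mesh0"
slurm_job = create_slurm_job(mesh_name="mesh0", num_nodes=2, gpus_per_node=8)

# The job state exposes the requested mesh by name; Step 4 uses it to
# spawn one process per GPU across the allocation
job_state = slurm_job.state()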
Step 2: Define the Trainer Actor
Now we create a Monarch actor that wraps TorchTitan's Trainer. This is the key abstraction that allows TorchTitan to run inside Monarch's distributed environment.
Monarch highlight: the actor pattern provides several benefits:
Remote execution: methods decorated with @endpoint can be invoked remotely
Lifecycle management: Monarch takes care of initialization, execution, and cleanup
Error handling: exceptions are propagated back to the client correctly, supporting progressive error handling
import torch
from monarch.actor import Actor, current_rank, endpoint
from monarch.utils import setup_env_for_distributed
from torchtitan.tools.logging import init_logger, logger
from torchtitan.train import Trainer
class TrainerActor(Actor):
"""
Monarch Actor wrapper for TorchTitan's Trainer.
This actor encapsulates a complete TorchTitan training process, handling
initialization, training loop execution, and cleanup. Each instance runs
on a single GPU in the distributed training job.
The actor's lifetime:
1. __init__: Initialize with job configuration
2. start_training:
Execute the training loop
Destroy process group and release resources
Attributes:
job_config: TorchTitan configuration for this trainer
uid: Unique identifier for logging (includes rank)
"""
def __init__(self, job_config: "JobConfig") -> None:
"""
Initialize the trainer actor.
Args:
job_config: TorchTitan JobConfig with training parameters
"""
self.job_config = job_config
# current_rank() provides access to this actor's rank in the process mesh
self.rank = current_rank().rank
        self.uid = f"[trainer_{self.rank}]"
@endpoint
async def ping_rank(self) -> None:
"""
A dummy logging function we will use for demonstration purposes.
"""
logger.info(f"{self.uid} Ping!")
@endpoint
async def start_training(self) -> None:
"""
Execute the TorchTitan training loop.
This remote endpoint:
1. Initializes TorchTitan's logger
2. Creates a Trainer instance with the job configuration
3. Runs the training loop
4. Handles cleanup and error conditions
The @endpoint decorator makes this method callable from the Monarch
client, even though it runs on a remote GPU node.
Raises:
Exception: Any exception from TorchTitan training is propagated
back to the client
"""
init_logger()
trainer: Trainer | None = None
try:
# Initialize TorchTitan trainer
trainer = Trainer(self.job_config)
logger.info(f"{self.uid} initialized successfully and starting training")
# Run the training loop
trainer.train()
except Exception as e:
logger.error(f"{self.uid} training failed: {e}")
if trainer:
trainer.close()
# Note: error is propagated back to the controller
raise e
else:
# Training completed successfully
trainer.close()
logger.info(f"{self.uid} training completed successfully")
finally:
# Clean up distributed process group
torch.distributed.destroy_process_group()
logger.info(f"{self.uid} trainer cleaned up")
Actor endpoints can be invoked in several modes. We will explore a concrete example in Step 4: Run the Training Workflow, but here is some pseudocode illustrating common usage:
try:
# where mesh0 is made of N nodes, each node having 8 GPUs
proc_mesh = mesh0.spawn_procs({"gpus": 8})
trainer_actors = proc_mesh.spawn("trainers", TrainerActor, ...)
# Call on all ranks
await trainer_actors.ping_rank.call()
# Call-and-forget on all ranks
trainer_actors.ping_rank.broadcast()
# Call on ONE random rank
await trainer_actors.ping_rank.choose()
# Call on the first 3 ranks of node 0
await trainer_actors.slice(hosts=0, gpus=slice(0, 3)).ping_rank.call()
except Exception as e:
# handle SupervisionEvents from remote actor failures
pass
Remote actor endpoints can also make use of native Python breakpoints, enabling interactive debugging sessions. For a complete deep dive into the Monarch debugger, refer to the documentation.
@endpoint
async def ping_debuggable_rank(self) -> None:
logger.info(f"{self.uid} Ping!")
if self.rank == 0:
breakpoint()
logger.info(f"{self.uid} Pong!")
Step 3: Define Training Parameters
Next, we define some common parameters for the training job and cluster resources. This configuration determines the scale of training (number of nodes and GPUs) as well as a few training hyperparameters.
from dataclasses import dataclass
@dataclass
class RunParams:
"""
Configuration for cluster resources and training parameters.
Attributes:
training_steps: Number of training iterations to run
model_config: Path to TorchTitan model configuration file
tokenizer: Path to tokenizer directory
dataset: Dataset to use for training (e.g., 'c4', 'c4_test')
num_nodes: Number of compute nodes to request
gpus_per_node: Number of GPUs per node
Adjust these values based on your model size and available resources.
"""
training_steps: int = 50
model_config: str = "debug_model.toml"
tokenizer: str = "tokenizer"
dataset: str = "c4"
num_nodes: int = 2
gpus_per_node: int = 8
TorchTitan uses a JobConfig object to control every aspect of training. Here we create a function that builds this configuration from RunParams.
import os
from torchtitan.config import ConfigManager, JobConfig
def make_job_config() -> JobConfig:
"""
Create a TorchTitan JobConfig from RunParams.
This function constructs the complete training configuration, including
parallelism settings, model architecture, and dataset paths
"""
# Calculate total parallelism based on cluster size
data_parallel_shard_degree = RunParams.num_nodes * RunParams.gpus_per_node
output_path = "./outputs"
    # Resolve paths relative to the current working directory
    script_dir = os.getcwd()
# Build argument list for TorchTitan's ConfigManager
# These override defaults from the model config file
default_args = [
"--job.config_file",
os.path.join(script_dir, RunParams.model_config),
"--model.tokenizer_path",
os.path.join(script_dir, RunParams.tokenizer),
"--parallelism.data_parallel_shard_degree",
str(data_parallel_shard_degree),
"--training.steps",
str(RunParams.training_steps),
"--training.dataset",
RunParams.dataset,
"--job.dump_folder",
output_path,
# continue to configure as needed
]
config_manager = ConfigManager()
job_config = config_manager.parse_args(default_args)
return job_config
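Before reserving any cluster resources, it can help to build and inspect the config locally. Below is a minimal sketch of such a smoke test, assuming TorchTitan's JobConfig mirrors the --training.* flags passed above (c4_test is the small test dataset mentioned in RunParams):
# Hypothetical local smoke test: shrink the run before touching the cluster
RunParams.dataset = "c4_test"     # tiny dataset for a quick sanity check
RunParams.training_steps = 5

job_config = make_job_config()
# Assumes JobConfig exposes the training section configured above
print(job_config.training.steps, job_config.training.dataset)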
Step 4: Run the Training Workflow
With all the components defined, we can now orchestrate the complete workflow. This is where Monarch's strengths stand out most.
Monarch highlights:
Interactive iteration: once a machine allocation is reserved, you can adjust your logic and relaunch actors without requesting new resources. SLURM's shared filesystem keeps framework and workspace changes in sync across worker nodes.
Transparent logging: all logs from remote workers are streamed back to your client in real time, making debugging feel like running locally
Workflow:
Reserve machines → create process mesh → configure logging → spawn actors → train → clean up
async def execute_training() -> None:
"""
Execute the complete distributed training workflow.
"""
job_config = make_job_config()
slurm_job = None
mesh_name = "mesh0"
try:
# 1. Create a SLURM job with N nodes
# This leverages Monarch to reserve a persistent machine allocation
slurm_job = create_slurm_job(mesh_name, RunParams.num_nodes, RunParams.gpus_per_node)
job_state = slurm_job.state()
# 2. Create a process mesh on the machine allocation
# This creates one process per GPU across all allocated nodes
logger.info("Creating process mesh...")
proc_mesh = job_state.mesh0.spawn_procs({"gpus": RunParams.gpus_per_node})
# 3. Configure remote logging behavior
# - stream_to_client: Forward all remote logs to your local console
# - aggregate_window_sec: Batch logs for efficiency
logger.info("Configuring logging...")
await proc_mesh.logging_option(
stream_to_client=True,
# aggregate_window_sec=None # Uncomment to disable log batching
)
# 4. Setup environment for torch.distributed
# This configures torch.distributed across all processes in the mesh
logger.info("Setting up distributed environment...")
await setup_env_for_distributed(proc_mesh)
# 5. Spawn TrainerActor on each GPU
# Each process in the mesh creates its own TrainerActor instance
logger.info("Spawning trainer actors...")
trainer = proc_mesh.spawn(
"trainer_actor", # Name for the actor group
TrainerActor, # Actor class to instantiate
job_config, # Arguments to __init__
)
# 6. Execute the training job across all actors
# The .call() method invokes start_training() on all actors in parallel
logger.info("Starting distributed training...")
await trainer.start_training.call()
logger.info("Training completed successfully!")
except Exception as e:
logger.error(f"Training workflow failed: {e}")
finally:
# Always clean up the machine allocation
if slurm_job:
await cleanup_job(slurm_job)
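Because the allocation persists until it is cleaned up, you can iterate interactively: edit the actor code or configuration, then spawn a fresh actor group on the same proc_mesh without requesting new nodes. A rough sketch of such a re-run follows (assuming you skip the automatic cleanup in the finally block while iterating, and that the distributed environment set up earlier is still valid):
# Hypothetical interactive iteration on the existing allocation
new_job_config = make_job_config()          # pick up any config changes
trainer_v2 = proc_mesh.spawn(
    "trainer_actor_v2",                     # a new actor group name
    TrainerActor,
    new_job_config,
)
await trainer_v2.start_training.call()      # rerun training on the same nodes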
Step 5: Clean Up Resources
After training completes (or whenever you are done experimenting), it is important to release cluster resources by terminating the SLURM job.
Monarch highlight: while you can keep an allocation alive across multiple training runs during development, always remember to release cluster resources when you are done.
async def cleanup_job(job: JobTrait) -> None:
"""
This function cancels the SLURM job, releasing all reserved nodes back
to the cluster for other users.
Args:
job: A JobTrait, like the one returned from create_slurm_job()
Note:
The job will also terminate automatically when the configured TTL
is exceeded, but explicit cleanup is recommended for long-running
notebooks or scripts.
"""
job.kill()
logger.info("Job terminated successfully")
Step 6: Run the Complete Pipeline
Finally, we wire everything together in a main function that launches the workflow.
import asyncio
if __name__ == "__main__":
"""
Run the complete workflow: reserve resources, train, and cleanup.
"""
logger.info("Starting Monarch + TorchTitan Distributed Training")
asyncio.run(execute_training())
logger.info("Workflow completed!")
Conclusion
Congratulations! In this tutorial, you learned how to combine Monarch's actor framework with TorchTitan for scalable distributed training.
Further Reading
Monarch also integrates with TorchFT to provide per-step fault tolerance for replicated workers. You can find a comprehensive proof of concept of this integration in the TorchFT repo.
For an interactive notebook covering topics similar to this tutorial, refer to this Monarch example.