評價此頁

Elastic Agent#

創建於: May 04, 2021 | 最後更新於: Jun 07, 2025

Server#

Elastic Agent 是 torchelastic 的控制平面。

它是一個啟動和管理底層工作程序的程序。Agent 負責

  1. 與分散式 PyTorch 協同工作:工作程序以所有必要資訊啟動,以便成功且輕鬆地呼叫 torch.distributed.init_process_group()

  2. 容錯:監控工作程序,並在檢測到工作程序失敗或不健康時,終止所有工作程序並重新啟動所有程序。

  3. 彈性:響應成員資格變化並使用新成員重新啟動工作程序。

最簡單的 Agent 是按節點部署的,並與本地程序協同工作。更高階的 Agent 可以遠端啟動和管理工作程序。Agent 可以是完全去中心化的,基於它所管理的程序進行決策。也可以是協調的,與其他 Agent(管理同一作業中工作程序的 Agent)通訊以做出集體決策。

下圖是管理本地工作程序組的 Agent 的示意圖。

../_images/agent_diagram.jpg

Concepts#

本節描述了與理解 agent 在 torchelastic 中的作用相關的高層類和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[source]#

負責管理一個或多個工作程序的 Agent 程序。

工作程序被假定為標準的分散式 PyTorch 指令碼。當工作程序由 Agent 建立時,Agent 提供必要的資訊,以便工作程序能夠正確地初始化一個 PyTorch 程序組。

確切的部署拓撲和 Agent 與工作程序的比例取決於 Agent 的具體實現和使用者的作業放置偏好。例如,要在 GPU 上執行具有 8 個訓練器的分散式訓練作業(每個 GPU 一個),可以

  1. 使用 8 個 x 單 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 1 個工作程序。

  2. 使用 4 個 x 雙 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 2 個工作程序。

  3. 使用 2 個 x 四 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 4 個工作程序。

  4. 使用 1 個 x 8 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 8 個工作程序。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[source]#

返回給定 roleWorkerGroup

請注意,WorkerGroup 是一個可變物件,因此在多執行緒/多程序環境中,它可能會改變狀態。鼓勵(但不要求)實現者返回一個防禦性的只讀副本。

返回型別

WorkerGroup

abstract run(role='default')[source]#

執行 Agent。

支援在出現故障時最多重試 max_restarts 次工作程序組。

返回

執行結果,包含每個工作程序(按其全域性 rank 對映)的返回值或失敗詳情。

引發

Exception - any other failures NOT related to worker process

返回型別

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None, event_log_handler='null', numa_options=None)[source]#

關於特定型別工作程序的藍圖資訊。

對於給定的角色,必須只有一個 WorkerSpec。WorkerSpec 預計在所有節點(機器)上都是同質的,即每個節點執行相同數量的特定 Spec 的工作程序。

引數
  • role (str) – 使用者定義的 Spec 工作程序的角色

  • local_world_size (int) – 要執行的本地工作程序數量

  • fn (Optional[Callable]) – (已棄用,請使用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – 工作程序函式或命令

  • args (tuple) – 傳遞給 entrypoint 的引數

  • rdzv_handler (RendezvousHandler) – 處理此組工作程序的 rdzv

  • max_restarts (int) – 工作程序的最大重試次數

  • monitor_interval (float) – 每 n 秒監控一次工作程序的狀態

  • master_port (Optional[int]) – rank 0 上 c10d 儲存的固定埠,如果未指定,則選擇一個隨機的可用埠

  • master_addr (Optional[str]) – rank 0 上 c10d 儲存的固定 master_addr,如果未指定,則選擇 Agent rank 0 的主機名

  • redirects – 將標準流重定向到檔案,透過傳遞對映選擇性地重定向特定本地 rank 的流

  • tee – 將指定標準流(或多個流)複製到控制檯 + 檔案,透過傳遞對映選擇性地為特定本地 rank 進行復制,該選項優先於 redirects 設定。

  • event_log_handler (str) – 事件日誌處理程式在 elastic/events/handlers.py 中註冊的名稱。

get_entrypoint_name()[source]#

獲取入口點名稱。

如果入口點是函式(例如 Callable),則返回其 __qualname__;如果入口點是二進位制檔案(例如 str),則返回二進位制檔名。

class torch.distributed.elastic.agent.server.WorkerState(value)[source]#

WorkerGroup 的狀態。

WorkerGroup 中的工作程序作為一個整體改變狀態。如果 WorkerGroup 中的單個工作程序失敗,則整個組被視為失敗。

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

工作程序組從初始 INIT 狀態開始,然後進展到 HEALTHYUNHEALTHY 狀態,最終達到終止狀態 SUCCEEDEDFAILED

工作程序組可以被 Agent 中斷並暫時置於 STOPPED 狀態。處於 STOPPED 狀態的工作程序將在不久的將來由 Agent 重新安排啟動。將工作程序置於 STOPPED 狀態的一些例子包括:

  1. 工作程序組失敗 | 檢測到不健康

  2. 檢測到成員資格變化

當對工作程序組執行操作(啟動、停止、rdzv、重試等)失敗,並且導致操作部分應用於工作程序組時,狀態將為 UNKNOWN。通常這發生在 Agent 狀態變更事件期間未捕獲/未處理的異常。Agent 不期望恢復處於 UNKNOWN 狀態的工作程序組,最好是自行終止,讓作業管理器重新嘗試該節點。

static is_running(state)[source]#

返回工作程序的狀態。

返回

如果工作程序狀態代表仍在執行的工作程序(例如,程序存在但並非一定健康),則返回 True。

返回型別

布林值

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]#

一個工作程序例項。

將此與 WorkerSpec 進行對比,後者代表工作程序的規範。一個 Worker 是從一個 WorkerSpec 建立的。Worker 之於 WorkerSpec 就像物件之於類。

工作程序的 idElasticAgent 的特定實現來解釋。對於本地 Agent,它可能是工作程序的 pid (int),對於遠端 Agent,它可能被編碼為 host:port (string)

引數
  • id (Any) – 唯一標識一個工作程序(由 Agent 解釋)

  • local_rank (int) – 工作程序的本地 rank

  • global_rank (int) – 工作程序的全域性 rank

  • role_rank (int) – 具有相同角色的所有工作程序中的工作程序 rank

  • world_size (int) – 工作程序總數(全域性)

  • role_world_size (int) – 具有相同角色的工作程序數量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source]#

一組 Worker 例項。

該類定義了由 ElasticAgent 管理的、針對給定 WorkerSpec 的一組 Worker 例項。工作程序組是否包含跨例項的工作程序取決於 Agent 的實現。

Implementations#

以下是 torchelastic 提供的 Agent 實現。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]#

一個 torchelastic.agent.server.ElasticAgent 的實現,該實現處理主機本地工作程序。

此 Agent 按主機部署,並配置為生成 n 個工作程序。在使用 GPU 時,n 對映到主機上可用的 GPU 數量。

本地 Agent 不與其他主機上的本地 Agent 通訊,即使工作程序可能進行跨主機通訊。工作程序 ID 被解釋為本地程序。Agent 將所有工作程序作為一個單元啟動和停止。

傳遞給工作程序函式的工作程序函式和引數必須與 Python 的 multiprocessing 相容。要將 multiprocessing 資料結構傳遞給工作程序,您可以建立與指定 start_method 相同的 multiprocessing 上下文中的資料結構,並將其作為函式引數傳遞。

exit_barrier_timeout 指定等待其他 Agent 完成的時間(以秒為單位)。這充當了處理工作程序不同時完成的情況的安全網,以防止 Agent 將過早完成的工作程序視為縮減事件。強烈建議使用者程式碼處理確保工作程序同步終止,而不是依賴 exit_barrier_timeout。

如果 Agent 程序中定義了環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 且其值為 1,則可以在 `LocalElasticAgent` 中啟用基於命名管道的看門狗。另外,可以設定另一個環境變數 `TORCHELASTIC_TIMER_FILE`,其中包含命名管道的唯一檔名。如果未設定環境變數 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 將內部建立唯一檔名並將其設定為環境變數 `TORCHELASTIC_TIMER_FILE`,並且此環境變數將被傳播到工作程序,以便它們可以連線到 `LocalElasticAgent` 使用的相同命名管道。

日誌將寫入指定的日誌目錄。預設情況下,每行日誌將以 [${role_name}${local_rank}]:(例如 [trainer0]: foobar)作為字首。可以透過將 模板字串 作為 log_line_prefix_template 引數傳遞來定製日誌字首。執行時會替換以下宏(識別符號):${role_name}, ${local_rank}, ${rank}。例如,要將每行日誌字首為全域性 rank 而非本地 rank,請設定 log_line_prefix_template = "[${rank}]:

示例啟動函式

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

示例啟動二進位制檔案

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

Extending the Agent#

要擴充套件 Agent,可以直接實現 ElasticAgent,但我們建議您改用 SimpleElasticAgent,它提供了大部分腳手架,只留下幾個特定的抽象方法供您實現。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]#

一個管理一個特定型別工作程序角色的 ElasticAgent

一個 ElasticAgent,它管理一個 WorkerSpec 的工作程序(WorkerGroup),例如一種特定型別的工作程序角色。

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source]#

確定工作程序的正確 rank。

快速路徑:當所有工作程序都具有相同的角色和 world size 時。我們計算全域性 rank 為 group_rank * group_world_size + local_rank。並且 role_world_sizeglobal_world_size 相同。此情況下不使用 TCP 儲存。僅當用戶設定環境變數 TORCH_ELASTIC_WORKER_IDENTICAL 為 1 時才啟用此選項。

時間複雜度:每個工作程序 O(1),整體 O(1)

慢速路徑:當工作程序具有不同的角色和 world size 時。我們使用以下演算法

  1. 每個 Agent 將其配置(group_rank、group_world_size、num_workers)寫入公共儲存。

  2. rank 0 Agent 從儲存中讀取所有 role_info 並確定每個 Agent 的工作程序 rank。

  3. 確定全域性 rank:工作程序的全域性 rank 透過累加其前面的所有工作程序的 local_world_size 來計算。出於效率原因,每個工作程序被分配一個基礎全域性 rank,使其工作程序位於 [base_global_rank, base_global_rank + local_world_size) 範圍內。

  4. 確定角色 rank:使用第 3 點中的演算法確定角色 rank,但計算 rank 時是相對於角色名稱的。

  5. rank 0 Agent 將分配的 rank 寫入儲存。

  6. 每個 Agent 從儲存中讀取分配的 rank。

時間複雜度:每個工作程序 O(1),rank0 O(n),整體 O(n)

返回型別

list[torch.distributed.elastic.agent.server.api.Worker]

_exit_barrier()[source]#

定義一個屏障,保持 Agent 程序存活直到所有工作程序完成。

等待 exit_barrier_timeout 秒,直到所有 Agent 完成其本地工作程序的執行(無論成功與否)。這充當了處理使用者指令碼終止時間不同的安全措施。

_initialize_workers(worker_group)[source]#

為 worker_group 啟動一組新的工作程序。

本質上,這是一個 rendezvous,然後是一個 start_workers。呼叫者應首先呼叫 _stop_workers() 來停止正在執行的工作程序,然後再呼叫此方法。

樂觀地將剛啟動的工作程序組的狀態設定為 HEALTHY,並將實際的狀態監控委託給 _monitor_workers() 方法。

abstract _monitor_workers(worker_group)[source]#

檢查 worker_group 的工作程序。

此函式還返回工作程序組的新狀態。

返回型別

RunResult

_rendezvous(worker_group)[source]#

為 worker spec 指定的工作程序執行 rendezvous。

為工作程序分配新的全域性 rank 和 world size。更新工作程序組的 rendezvous 儲存。

_restart_workers(worker_group)[source]#

重新啟動(停止、rendezvous、啟動)組中所有本地工作程序。

abstract _shutdown(death_sig=Signals.SIGTERM)[source]#

清理 Agent 工作期間分配的任何資源。

引數

death_sig (Signals) – 傳送給子程序的訊號,預設為 SIGTERM

abstract _start_workers(worker_group)[source]#

根據 worker group 的 worker spec 啟動 worker_group.spec.local_world_size 個工作程序。

返回一個從 local_rank 到工作程序 id 的對映。

返回型別

dict[int, Any]

abstract _stop_workers(worker_group)[source]#

停止給定工作程序組中的所有工作程序。

實現者必須處理 WorkerState 定義的所有狀態的工作程序。也就是說,它必須優雅地處理停止不存在的工作程序、不健康(卡住)的工作程序等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]#

返回工作程序執行的結果。

Run 結果遵循“全有或全無”策略,即當且僅當此 Agent 管理的所有本地工作程序都成功完成時,執行才算成功。

如果結果成功(例如 is_failed() = False),則 return_values 欄位包含THIS Agent 管理的工作程序的輸出(返回值),按其全域性 rank 對映。也就是說 result.return_values[0] 是全域性 rank 0 的返回值。

注意

return_values 僅在工作程序入口點是函式時才有意義。指定為二進位制入口點的程序沒有規範的返回值,並且 return_values 欄位沒有意義,可能為空。

如果 is_failed() 返回 True,則 failures 欄位包含失敗資訊,同樣按失敗工作程序的全域性 rank 對映。

return_valuesfailures 中的鍵是互斥的,也就是說,工作程序的最終狀態只能是成功或失敗之一。被 Agent 根據 Agent 的重啟策略有意終止的工作程序,在 return_valuesfailures 中均不表示。

Watchdog in the Agent#

如果 Agent 程序中定義了環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 且其值為 1,則可以在 LocalElasticAgent 中啟用基於命名管道的看門狗。另外,可以設定另一個環境變數 TORCHELASTIC_TIMER_FILE,其中包含命名管道的唯一檔名。如果未設定環境變數 TORCHELASTIC_TIMER_FILELocalElasticAgent 將內部建立唯一檔名並將其設定為環境變數 TORCHELASTIC_TIMER_FILE,並且此環境變數將被傳播到工作程序,以便它們可以連線到 LocalElasticAgent 使用的相同命名管道。

Health Check Server#

如果 Agent 程序中定義了環境變數 TORCHELASTIC_HEALTH_CHECK_PORT,則可以在 LocalElasticAgent 中啟用健康檢查監控伺服器。為健康檢查伺服器提供了介面,可以透過啟動指定埠的 TCP/HTTP 伺服器進行擴充套件。此外,健康檢查伺服器將具有檢檢視門狗是否存活的回撥。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]#

健康檢查監控伺服器的介面,可以透過在指定埠啟動 TCP/HTTP 伺服器來擴充套件。

引數
  • alive_callback (Callable[[], int]) – Callable[[], int],Agent 的最後活動時間回撥

  • port (int) – int,啟動 TCP/HTTP 伺服器的埠號

  • timeout (int) – int,用於確定 Agent 是否存活/死亡的超時秒數

start()[source]#

不支援 Pytorch 的功能,不啟動任何健康檢查伺服器

stop()[source]#

停止健康檢查伺服器的函式

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source]#

建立健康檢查伺服器物件

返回型別

HealthCheckServer