Elastic Agent#
創建於: May 04, 2021 | 最後更新於: Jun 07, 2025
Server#
Elastic Agent 是 torchelastic 的控制平面。
它是一個啟動和管理底層工作程序的程序。Agent 負責
與分散式 PyTorch 協同工作:工作程序以所有必要資訊啟動,以便成功且輕鬆地呼叫
torch.distributed.init_process_group()。容錯:監控工作程序,並在檢測到工作程序失敗或不健康時,終止所有工作程序並重新啟動所有程序。
彈性:響應成員資格變化並使用新成員重新啟動工作程序。
最簡單的 Agent 是按節點部署的,並與本地程序協同工作。更高階的 Agent 可以遠端啟動和管理工作程序。Agent 可以是完全去中心化的,基於它所管理的程序進行決策。也可以是協調的,與其他 Agent(管理同一作業中工作程序的 Agent)通訊以做出集體決策。
下圖是管理本地工作程序組的 Agent 的示意圖。
Concepts#
本節描述了與理解 agent 在 torchelastic 中的作用相關的高層類和概念。
- class torch.distributed.elastic.agent.server.ElasticAgent[source]#
負責管理一個或多個工作程序的 Agent 程序。
工作程序被假定為標準的分散式 PyTorch 指令碼。當工作程序由 Agent 建立時,Agent 提供必要的資訊,以便工作程序能夠正確地初始化一個 PyTorch 程序組。
確切的部署拓撲和 Agent 與工作程序的比例取決於 Agent 的具體實現和使用者的作業放置偏好。例如,要在 GPU 上執行具有 8 個訓練器的分散式訓練作業(每個 GPU 一個),可以
使用 8 個 x 單 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 1 個工作程序。
使用 4 個 x 雙 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 2 個工作程序。
使用 2 個 x 四 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 4 個工作程序。
使用 1 個 x 8 GPU 例項,每個例項放置一個 Agent,管理每個 Agent 的 8 個工作程序。
用法
group_result = agent.run() if group_result.is_failed(): # workers failed failure = group_result.failures[0] logger.exception("worker 0 failed with exit code : %s", failure.exit_code) else: return group_result.return_values[0] # return rank 0's results
- class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None, event_log_handler='null', numa_options=None)[source]#
關於特定型別工作程序的藍圖資訊。
對於給定的角色,必須只有一個 WorkerSpec。WorkerSpec 預計在所有節點(機器)上都是同質的,即每個節點執行相同數量的特定 Spec 的工作程序。
- 引數
role (str) – 使用者定義的 Spec 工作程序的角色
local_world_size (int) – 要執行的本地工作程序數量
args (tuple) – 傳遞給
entrypoint的引數rdzv_handler (RendezvousHandler) – 處理此組工作程序的 rdzv
max_restarts (int) – 工作程序的最大重試次數
monitor_interval (float) – 每
n秒監控一次工作程序的狀態master_port (Optional[int]) – rank 0 上 c10d 儲存的固定埠,如果未指定,則選擇一個隨機的可用埠
master_addr (Optional[str]) – rank 0 上 c10d 儲存的固定 master_addr,如果未指定,則選擇 Agent rank 0 的主機名
redirects – 將標準流重定向到檔案,透過傳遞對映選擇性地重定向特定本地 rank 的流
tee – 將指定標準流(或多個流)複製到控制檯 + 檔案,透過傳遞對映選擇性地為特定本地 rank 進行復制,該選項優先於
redirects設定。event_log_handler (str) – 事件日誌處理程式在 elastic/events/handlers.py 中註冊的名稱。
- class torch.distributed.elastic.agent.server.WorkerState(value)[source]#
WorkerGroup的狀態。WorkerGroup 中的工作程序作為一個整體改變狀態。如果 WorkerGroup 中的單個工作程序失敗,則整個組被視為失敗。
UNKNOWN - agent lost track of worker group state, unrecoverable INIT - worker group object created not yet started HEALTHY - workers running and healthy UNHEALTHY - workers running and unhealthy STOPPED - workers stopped (interrupted) by the agent SUCCEEDED - workers finished running (exit 0) FAILED - workers failed to successfully finish (exit !0)
工作程序組從初始
INIT狀態開始,然後進展到HEALTHY或UNHEALTHY狀態,最終達到終止狀態SUCCEEDED或FAILED。工作程序組可以被 Agent 中斷並暫時置於
STOPPED狀態。處於STOPPED狀態的工作程序將在不久的將來由 Agent 重新安排啟動。將工作程序置於STOPPED狀態的一些例子包括:工作程序組失敗 | 檢測到不健康
檢測到成員資格變化
當對工作程序組執行操作(啟動、停止、rdzv、重試等)失敗,並且導致操作部分應用於工作程序組時,狀態將為
UNKNOWN。通常這發生在 Agent 狀態變更事件期間未捕獲/未處理的異常。Agent 不期望恢復處於UNKNOWN狀態的工作程序組,最好是自行終止,讓作業管理器重新嘗試該節點。
- class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]#
一個工作程序例項。
將此與
WorkerSpec進行對比,後者代表工作程序的規範。一個Worker是從一個WorkerSpec建立的。Worker之於WorkerSpec就像物件之於類。工作程序的
id由ElasticAgent的特定實現來解釋。對於本地 Agent,它可能是工作程序的pid (int),對於遠端 Agent,它可能被編碼為host:port (string)。
Implementations#
以下是 torchelastic 提供的 Agent 實現。
- class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]#
一個
torchelastic.agent.server.ElasticAgent的實現,該實現處理主機本地工作程序。此 Agent 按主機部署,並配置為生成
n個工作程序。在使用 GPU 時,n對映到主機上可用的 GPU 數量。本地 Agent 不與其他主機上的本地 Agent 通訊,即使工作程序可能進行跨主機通訊。工作程序 ID 被解釋為本地程序。Agent 將所有工作程序作為一個單元啟動和停止。
傳遞給工作程序函式的工作程序函式和引數必須與 Python 的 multiprocessing 相容。要將 multiprocessing 資料結構傳遞給工作程序,您可以建立與指定
start_method相同的 multiprocessing 上下文中的資料結構,並將其作為函式引數傳遞。exit_barrier_timeout指定等待其他 Agent 完成的時間(以秒為單位)。這充當了處理工作程序不同時完成的情況的安全網,以防止 Agent 將過早完成的工作程序視為縮減事件。強烈建議使用者程式碼處理確保工作程序同步終止,而不是依賴 exit_barrier_timeout。如果 Agent 程序中定義了環境變數
TORCHELASTIC_ENABLE_FILE_TIMER且其值為 1,則可以在`LocalElasticAgent`中啟用基於命名管道的看門狗。另外,可以設定另一個環境變數`TORCHELASTIC_TIMER_FILE`,其中包含命名管道的唯一檔名。如果未設定環境變數`TORCHELASTIC_TIMER_FILE`,`LocalElasticAgent`將內部建立唯一檔名並將其設定為環境變數`TORCHELASTIC_TIMER_FILE`,並且此環境變數將被傳播到工作程序,以便它們可以連線到`LocalElasticAgent`使用的相同命名管道。日誌將寫入指定的日誌目錄。預設情況下,每行日誌將以
[${role_name}${local_rank}]:(例如[trainer0]: foobar)作為字首。可以透過將 模板字串 作為log_line_prefix_template引數傳遞來定製日誌字首。執行時會替換以下宏(識別符號):${role_name}, ${local_rank}, ${rank}。例如,要將每行日誌字首為全域性 rank 而非本地 rank,請設定log_line_prefix_template = "[${rank}]:。示例啟動函式
def trainer(args) -> str: return "do train" def main(): start_method="spawn" shared_queue= multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("trainer failed") else: print(f"rank 0 return value: {results.return_values[0]}") # prints -> rank 0 return value: do train
示例啟動二進位制檔案
def main(): spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer", args=("--trainer-args", "foobar"), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = agent.run() if not results.is_failed(): print("binary launches do not have return values")
Extending the Agent#
要擴充套件 Agent,可以直接實現 ElasticAgent,但我們建議您改用 SimpleElasticAgent,它提供了大部分腳手架,只留下幾個特定的抽象方法供您實現。
- class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]#
一個管理一個特定型別工作程序角色的
ElasticAgent。一個
ElasticAgent,它管理一個WorkerSpec的工作程序(WorkerGroup),例如一種特定型別的工作程序角色。- _assign_worker_ranks(store, group_rank, group_world_size, spec)[source]#
確定工作程序的正確 rank。
快速路徑:當所有工作程序都具有相同的角色和 world size 時。我們計算全域性 rank 為 group_rank * group_world_size + local_rank。並且 role_world_size 與 global_world_size 相同。此情況下不使用 TCP 儲存。僅當用戶設定環境變數 TORCH_ELASTIC_WORKER_IDENTICAL 為 1 時才啟用此選項。
時間複雜度:每個工作程序 O(1),整體 O(1)
慢速路徑:當工作程序具有不同的角色和 world size 時。我們使用以下演算法
每個 Agent 將其配置(group_rank、group_world_size、num_workers)寫入公共儲存。
rank 0 Agent 從儲存中讀取所有 role_info 並確定每個 Agent 的工作程序 rank。
確定全域性 rank:工作程序的全域性 rank 透過累加其前面的所有工作程序的 local_world_size 來計算。出於效率原因,每個工作程序被分配一個基礎全域性 rank,使其工作程序位於 [base_global_rank, base_global_rank + local_world_size) 範圍內。
確定角色 rank:使用第 3 點中的演算法確定角色 rank,但計算 rank 時是相對於角色名稱的。
rank 0 Agent 將分配的 rank 寫入儲存。
每個 Agent 從儲存中讀取分配的 rank。
時間複雜度:每個工作程序 O(1),rank0 O(n),整體 O(n)
- _exit_barrier()[source]#
定義一個屏障,保持 Agent 程序存活直到所有工作程序完成。
等待
exit_barrier_timeout秒,直到所有 Agent 完成其本地工作程序的執行(無論成功與否)。這充當了處理使用者指令碼終止時間不同的安全措施。
- _initialize_workers(worker_group)[source]#
為 worker_group 啟動一組新的工作程序。
本質上,這是一個 rendezvous,然後是一個
start_workers。呼叫者應首先呼叫_stop_workers()來停止正在執行的工作程序,然後再呼叫此方法。樂觀地將剛啟動的工作程序組的狀態設定為
HEALTHY,並將實際的狀態監控委託給_monitor_workers()方法。
- _rendezvous(worker_group)[source]#
為 worker spec 指定的工作程序執行 rendezvous。
為工作程序分配新的全域性 rank 和 world size。更新工作程序組的 rendezvous 儲存。
- abstract _shutdown(death_sig=Signals.SIGTERM)[source]#
清理 Agent 工作期間分配的任何資源。
- 引數
death_sig (Signals) – 傳送給子程序的訊號,預設為 SIGTERM
- class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]#
返回工作程序執行的結果。
Run 結果遵循“全有或全無”策略,即當且僅當此 Agent 管理的所有本地工作程序都成功完成時,執行才算成功。
如果結果成功(例如
is_failed() = False),則return_values欄位包含THIS Agent 管理的工作程序的輸出(返回值),按其全域性 rank 對映。也就是說result.return_values[0]是全域性 rank 0 的返回值。注意
return_values僅在工作程序入口點是函式時才有意義。指定為二進位制入口點的程序沒有規範的返回值,並且return_values欄位沒有意義,可能為空。如果
is_failed()返回True,則failures欄位包含失敗資訊,同樣按失敗工作程序的全域性 rank 對映。return_values和failures中的鍵是互斥的,也就是說,工作程序的最終狀態只能是成功或失敗之一。被 Agent 根據 Agent 的重啟策略有意終止的工作程序,在return_values或failures中均不表示。
Watchdog in the Agent#
如果 Agent 程序中定義了環境變數 TORCHELASTIC_ENABLE_FILE_TIMER 且其值為 1,則可以在 LocalElasticAgent 中啟用基於命名管道的看門狗。另外,可以設定另一個環境變數 TORCHELASTIC_TIMER_FILE,其中包含命名管道的唯一檔名。如果未設定環境變數 TORCHELASTIC_TIMER_FILE,LocalElasticAgent 將內部建立唯一檔名並將其設定為環境變數 TORCHELASTIC_TIMER_FILE,並且此環境變數將被傳播到工作程序,以便它們可以連線到 LocalElasticAgent 使用的相同命名管道。
Health Check Server#
如果 Agent 程序中定義了環境變數 TORCHELASTIC_HEALTH_CHECK_PORT,則可以在 LocalElasticAgent 中啟用健康檢查監控伺服器。為健康檢查伺服器提供了介面,可以透過啟動指定埠的 TCP/HTTP 伺服器進行擴充套件。此外,健康檢查伺服器將具有檢檢視門狗是否存活的回撥。
- class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]#
健康檢查監控伺服器的介面,可以透過在指定埠啟動 TCP/HTTP 伺服器來擴充套件。
- 引數