評價此頁

Rendezvous#

創建於: 2021年5月4日 | 最後更新於: 2024年5月22日

在 Torch Distributed Elastic 的上下文中,我們使用“rendezvous”(會合/碰頭)一詞來指代一種結合了分散式同步原語和對等發現的特定功能。

Torch Distributed Elastic 使用它來收集訓練作業的參與者(即節點),以便它們都就參與者列表及其各自的角色達成一致,並就何時可以開始/恢復訓練做出一致的集體決策。

Torch Distributed Elastic rendezvous 提供了以下關鍵功能:

Barrier(屏障):

執行 rendezvous 的節點將全部阻塞,直到 rendezvous 被認為完成為止——當至少有 min 個總節點加入 rendezvous 屏障(對於同一作業)時,即完成。這也意味著屏障的大小不一定是固定的。

在達到 min 個節點後,還會有一個小的額外等待時間——這用於確保 rendezvous 不會“過快”完成(這可能會排除在同一時間嘗試加入的額外節點)。

如果屏障收集到 max 個節點,rendezvous 將立即完成。

還有一個總體超時機制,如果從未達到 min 個節點,rendezvous 將失敗——這旨在作為一種簡單的故障安全措施,以幫助釋放部分分配的作業資源,以防資源管理器出現問題,並且不應被視為可重試的。

Exclusivity(排他性):

簡單的分散式屏障是不夠的,因為我們還需要確保在任何給定時間(對於給定的作業)只存在一個節點組。換句話說,新節點(即遲到的節點)不應能夠為同一個作業形成並行獨立的 worker 組。

Torch Distributed Elastic rendezvous 確保如果一個節點組已經完成了 rendezvous(並因此可能已經在訓練),那麼試圖進行 rendezvous 的其他“遲到”節點將僅宣佈自己正在等待,並且必須等到(之前完成的)現有 rendezvous 首先被銷燬。

Consistency(一致性):

當 rendezvous 完成時,所有成員都將就作業成員身份及其中的每個人的角色達成一致。這個角色用一個整數表示,稱為 rank(秩),它介於 0 和 world_size 之間。

請注意,rank不穩定,因為同一個節點在下一次(重新)rendezvous 時可能會被分配不同的 rank。

Fault-tolerance(容錯性):

Torch Distributed Elastic rendezvous 被設計為能夠容忍 rendezvous 過程中的節點故障。如果在加入 rendezvous 和其完成之間發生程序崩潰(或失去網路連線等),則會自動使用剩餘的健康節點進行重新 rendezvous。

節點還可能在完成 rendezvous(或被其他節點觀察到已完成)之後發生故障——這種情況將由 Torch Distributed Elastic 的 train_loop 處理(它也將觸發重新 rendezvous)。

Shared key-value store(共享鍵值儲存):

當 rendezvous 完成時,會建立一個共享的鍵值儲存並返回。此儲存實現了 torch.distributed.Store API(參見 分散式通訊文件)。

此儲存僅由已完成 rendezvous 的成員共享。它旨在供 Torch Distributed Elastic 使用,以交換初始化作業控制和資料平面所需的資訊。

Waiting workers and rendezvous closing(等待的 worker 和 rendezvous 關閉):

Torch Distributed Elastic rendezvous handler 物件提供了附加功能,這些功能在技術上不屬於 rendezvous 過程。

  1. 查詢有多少 worker 遲到屏障,它們可以參與下一個 rendezvous。

  2. 設定 rendezvous 為關閉狀態,以通知所有節點不要參與下一個 rendezvous。

DynamicRendezvousHandler:

Torch Distributed Elastic 提供了 DynamicRendezvousHandler 類,該類實現了上述 rendezvous 機制。它是一種與後端無關的型別,在構造時需要指定一個 RendezvousBackend 例項。

Torch 分散式使用者可以實現自己的後端型別,也可以使用 PyTorch 提供的以下實現之一:

下面是描述 rendezvous 如何工作的狀態圖。

../_images/etcd_rdzv_diagram.png

Registry(登錄檔)#

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]#

儲存用於構造 RendezvousHandler 的引數。

引數
  • backend (str) – 用於處理 rendezvous 的後端名稱。

  • endpoint (str) – rendezvous 的端點,通常格式為 <hostname>[:<port>]。

  • run_id (str) – rendezvous 的 ID。

  • min_nodes (int) – 允許進入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許進入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本地節點的地址。

  • **kwargs – 指定後端的附加引數。

get(key, default=None)[source]#

如果 key 存在,則返回 key 的值,否則返回 default

返回型別

任何

get_as_bool(key, default=None)[source]#

key 的值作為 bool 返回。

返回型別

Optional[bool]

get_as_int(key, default=None)[source]#

key 的值作為 int 返回。

返回型別

Optional[int]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]#

代表 RendezvousHandler 後端的登錄檔。

Handler(處理程式)#

class torch.distributed.elastic.rendezvous.RendezvousHandler[source]#

主要 rendezvous 介面。

注意

分散式 PyTorch 使用者通常不需要實現自己的 RendezvousHandler。已提供基於 C10d Store 的實現,並且推薦給大多數使用者使用。

abstract get_backend()[source]#

返回 rendezvous 後端的名稱。

返回型別

str

abstract get_run_id()[source]#

返回 rendezvous 的 run id。

run id 是一個使用者定義的 ID,用於唯一標識分散式應用程式的一個例項。它通常對映到作業 ID,並用於允許節點加入正確的分散式應用程式。

返回型別

str

abstract is_closed()[source]#

檢查 rendezvous 是否已關閉。

關閉的 rendezvous 意味著同一作業中所有未來的重新 rendezvous 嘗試都將失敗。

is_closed()set_closed() 具有最終傳播的語義,不應用於同步。其目的是,如果至少有一個節點決定作業已完成,它將關閉 rendezvous,其他節點也將很快觀察到這一點並停止執行。

返回型別

布林值

abstract next_rendezvous()[source]#

rendezvous 屏障的主入口點。

阻塞直到 rendezvous 完成並且當前程序包含在形成的 worker 組中,或者發生超時,或者 rendezvous 被標記為已關閉。

返回

返回 RendezvousInfo 例項。

引發
返回型別

RendezvousInfo

abstract num_nodes_waiting()[source]#

返回遲到 rendezvous 屏障的節點數量,這些節點未包含在當前 worker 組中。

呼叫者應定期呼叫此方法以檢查是否有新節點正在等待加入作業,如果有,則透過呼叫 next_rendezvous()(重新 rendezvous)來接納它們。

返回型別

int

abstract set_closed()[source]#

將 rendezvous 標記為已關閉。

abstract shutdown()[source]#

關閉為 rendezvous 開啟的所有資源。

示例

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
返回型別

布林值

property use_agent_store: bool#

指示透過 next_rendezvous() 返回的 store 引用可以與使用者應用程式共享,並在應用程式生命週期內可用。

Rendezvous handler impl 將 store 詳細資訊作為 RendezvousStoreInfo 的例項共享。應用程式通常使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢 store。

Dataclasses(資料類)#

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]#

儲存 rendezvous 的資訊。

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[source]#

可用於引導 trainer 分散式通訊的 store 地址和埠。

static build(rank, store)[source]#

工廠方法,在 rank0 主機上查詢未使用的埠,並與所有 ranks 共享地址/埠資訊。

如果 master_addr/master_port 已知(當共享現有 TCP store 伺服器時有用),請使用建構函式。

引數
  • rank (int) – 當前節點的 rank。

  • store (Store) – 用於 rendezvous 的 store。

  • local_addr (Optional[str]) – 當前節點的地址,如果未提供,將從主機名解析。

  • server_port (Optional[int]) – TCPStore 伺服器的埠,當 TCPStore 被共享時。

返回型別

RendezvousStoreInfo

Exceptions(異常)#

class torch.distributed.elastic.rendezvous.api.RendezvousError[source]#

代表 rendezvous 錯誤的基類。

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[source]#

在 rendezvous 關閉時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]#

在 rendezvous 未按時完成時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source]#

在與 rendezvous 後端的連線失敗時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[source]#

在 rendezvous 狀態損壞時引發。

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[source]#

當節點未包含在 rendezvous 中並優雅退出時引發。

此異常是一種退出堆疊的機制,但不表示失敗。

Implementations(實現)#

Dynamic Rendezvous(動態 Rendezvous)#

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]#

根據指定引數建立一個新的 DynamicRendezvousHandler

引數
  • store (Store) – 作為 rendezvous 的一部分返回的 C10d store。

  • backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端。

返回型別

DynamicRendezvousHandler

引數

描述

join_timeout

rendezvous 預計完成的總時間(秒)。預設為 600 秒。

last_call_timeout

在達到最小節點數後,在完成 rendezvous 之前的額外等待時間(秒)。預設為 30 秒。

close_timeout

在呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預計關閉的時間(秒)。預設為 30 秒。

heartbeat

保持連線的心跳(heartbeat)預計完成的時間(秒)。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]#

代表一個用於在一組節點之間設定 rendezvous 的處理程式。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source]#

建立一個新的 DynamicRendezvousHandler

引數
  • run_id (str) – rendezvous 的 run id。

  • store (Store) – 作為 rendezvous 的一部分返回的 C10d store。

  • backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端。

  • min_nodes (int) – 允許進入 rendezvous 的最小節點數。

  • max_nodes (int) – 允許進入 rendezvous 的最大節點數。

  • local_addr (Optional[str]) – 本地節點的地址。

  • timeout (Optional[RendezvousTimeout]) – rendezvous 的超時配置。

  • keep_alive_interval (int) – 節點在傳送心跳以保持其在 rendezvous 中存活之前等待的時間。

  • keep_alive_max_attempt (int) – 在節點被視為死亡之前,失敗的心跳嘗試次數。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]#

代表一個儲存 rendezvous 狀態的後端。

abstract get_state()[source]#

獲取 rendezvous 狀態。

返回

編碼的 rendezvous 狀態及其 fencing token 的元組,如果沒有在後端找到狀態則為 None

引發
返回型別

Optional[tuple[bytes, Any]]

abstract property name: str#

獲取後端的名稱。

abstract set_state(state, token=None)[source]#

設定 rendezvous 狀態。

新的 rendezvous 狀態是條件性設定的。

  • 如果指定的 token 與後端中儲存的 fencing token 匹配,則狀態將被更新。新的狀態將與它的 fencing token 一起返回給呼叫者。

  • 如果指定的 token 與後端中儲存的 fencing token 不匹配,則狀態不會被更新;相反,現有狀態將與它的 fencing token 一起返回給呼叫者。

  • 如果指定的 tokenNone,則僅當後端中不存在現有狀態時,才會設定新狀態。新狀態或現有狀態及其 fencing token 將返回給呼叫者。

引數
  • state (bytes) – 編碼的 rendezvous 狀態。

  • token (Optional[Any]) – 一個可選的 fencing token,該 token 是透過之前呼叫 get_state()set_state() 獲取的。

返回

序列化的 rendezvous 狀態、其 fencing token 以及一個指示設定嘗試是否成功的布林值的元組。

引發
返回型別

Optional[tuple[bytes, Any, bool]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]#

儲存 rendezvous 的超時配置。

引數
  • join (Optional[timedelta]) – rendezvous 預計完成的時間。

  • last_call (Optional[timedelta]) – rendezvous 達到所需參與者最小數量後,在完成 rendezvous 之前的額外等待時間。

  • close (Optional[timedelta]) – 呼叫 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 後,rendezvous 預計關閉的時間。

  • heartbeat (Optional[timedelta]) – 心跳(heartbeat)預計完成的時間。

property close: timedelta#

獲取關閉超時時間。

property heartbeat: timedelta#

獲取心跳(heartbeat)超時時間。

property join: timedelta#

獲取加入(join)超時時間。

property last_call: timedelta#

獲取最後呼叫(last call)超時時間。

C10d Backend(C10d 後端)#

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]#

根據指定引數建立一個新的 C10dRendezvousBackend

引數

描述

store_type

C10d store 的型別。當前支援的型別是“tcp”和“file”,分別對應 torch.distributed.TCPStoretorch.distributed.FileStore。預設為“tcp”。

read_timeout

store 操作的讀取超時時間(秒)。預設為 60 秒。

注意,這僅適用於 torch.distributed.TCPStore。它與 torch.distributed.FileStore 無關,後者不接受超時作為引數。

is_host

一個布林值,指示此後端例項是否將託管 C10d store。如果未指定,將透過將此機器的主機名或 IP 地址與指定的 rendezvous 端點進行匹配來推斷。預設為 None

注意,此配置選項僅適用於 torch.distributed.TCPStore。在正常情況下,您可以安全地跳過它;唯一需要的時候是當其值無法正確確定時(例如,rendezvous 端點的主機名是 CNAME,或者不匹配機器的 FQDN)。

返回型別

tuple[torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source]#

代表一個基於 C10d 的 rendezvous 後端。

引數
get_state()[source]#

參見基類。

返回型別

Optional[tuple[bytes, Any]]

property name: str#

參見基類。

set_state(state, token=None)[source]#

參見基類。

返回型別

Optional[tuple[bytes, Any, bool]]

Etcd Backend(Etcd 後端)#

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]#

根據指定引數建立一個新的 EtcdRendezvousBackend

引數

描述

read_timeout

etcd 操作的讀取超時時間(秒)。預設為 60 秒。

protocol

用於與 etcd 通訊的協議。有效值為“http”和“https”。預設為“http”。

ssl_cert

用於 HTTPS 的 SSL 客戶端證書路徑。預設為 None

ssl_cert_key

用於 HTTPS 的 SSL 客戶端證書私鑰路徑。預設為 None

ca_cert

根 SSL 證書頒發機構的路徑。預設為 None

返回型別

tuple[torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[source]#

代表一個基於 etcd 的 rendezvous 後端。

引數
  • client (Client) – 用於與 etcd 通訊的 etcd.Client 例項。

  • run_id (str) – rendezvous 的 run id。

  • key_prefix (Optional[str]) – 在 etcd 中儲存 rendezvous 狀態的路徑。

  • ttl (Optional[int]) – rendezvous 狀態的 TTL。如果未指定,則預設為兩小時。

get_state()[source]#

參見基類。

返回型別

Optional[tuple[bytes, Any]]

property name: str#

參見基類。

set_state(state, token=None)[source]#

參見基類。

返回型別

Optional[tuple[bytes, Any, bool]]

Etcd Rendezvous (Legacy)(Etcd Rendezvous (遺留版))#

警告

DynamicRendezvousHandler 類取代了 EtcdRendezvousHandler 類,並且是大多數使用者的推薦選擇。EtcdRendezvousHandler 處於維護模式,未來將被棄用。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source]#

實現了由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支援的 torch.distributed.elastic.rendezvous.RendezvousHandler 介面。EtcdRendezvousHandler 使用 URL 來配置要使用的 rendezvous 型別,並將實現特定的配置傳遞給 rendezvous 模組。基本的 etcd rendezvous 配置 URL 如下所示:

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://:2379/1234?min_workers=1&max_workers=3

上述 URL 的解釋如下:

  1. 使用與 etcd scheme 註冊的 rendezvous handler。

  2. 要使用的 etcd 端點是 localhost:2379

  3. job_id == 1234 用作 etcd 中的字首(這允許在共享公共 etcd 伺服器的情況下,只要 job_ids 保證是唯一的,就可以支援多個作業)。請注意,作業 ID 可以是任何字串(例如,不需要是數字),只要它是唯一的。

  4. min_workers=1max_workers=3 指定了成員資格大小的範圍——只要叢集大小大於或等於 min_workers,Torch Distributed Elastic 就會開始執行作業,並允許最多 max_workers 個節點加入叢集。

以下是可傳遞給 etcd rendezvous 的引數的完整列表:

引數

描述

min_workers

rendezvous 有效的最小 worker 數量。

max_workers

要接納的最大 worker 數量。

timeout

next_rendezvous 預計成功執行的總超時時間(預設為 600 秒)。

last_call_timeout

在達到最小 worker 數量後的額外等待時間(“最後呼叫”(defaults to 30s))。

etcd_prefix

etcd 中的路徑字首(從 etcd 根目錄開始),所有 etcd 節點都將在其中建立(預設為 /torchelastic/p2p)。

Etcd Store(Etcd 儲存)#

當 etcd 用作 rendezvous 後端時,EtcdStorenext_rendezvous() 返回的 C10d Store 例項型別。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source]#

透過搭便車 rendezvous etcd 例項來實現 c10 Store 介面。

這是 EtcdRendezvous 返回的 store 物件。

add(key, num)[source]#

原子性地將值增加一個整數量。

整數以 10 進位制字串形式表示。如果 key 不存在,則假定預設值為 0

返回

新(已增加)的值。

返回型別

int

check(keys)[source]#

檢查所有 key 是否立即存在(無需等待)。

返回型別

布林值

get(key)[source]#

透過 key 獲取值,可能執行阻塞等待。

如果 key 未立即存在,則將阻塞等待最多 timeout 時間,或直到 key 被髮布。

返回

(bytes)

引發

LookupError - 如果在超時後 key 仍然未釋出

返回型別

位元組

set(key, value)[source]#

將 key/value 對寫入 EtcdStore

key 和 value 都可以是 Python strbytes

wait(keys, override_timeout=None)[source]#

等待直到所有 key 都被髮布,或者直到超時。

引發

LookupError - 如果發生超時

Etcd Server(Etcd 伺服器)#

EtcdServer 是一個方便類,可以輕鬆地在子程序中啟動和停止 etcd 伺服器。這對於測試或單節點(多 worker)部署非常有用,因為手動設定一個 sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。

警告

對於生產和多節點部署,請考慮正確部署高可用的 etcd 伺服器,因為它是您分散式作業的單點故障。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]#

注意

已在 etcd 伺服器 v3.4.3 上進行測試。

在隨機空閒埠上啟動和停止本地獨立 etcd 伺服器。對於單節點、多 worker 啟動或測試非常有用,其中 sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。

此類會註冊一個終止處理程式來在退出時關閉 etcd 子程序。此終止處理程式不是呼叫 stop() 方法的替代品。

以下回退機制用於查詢 etcd 二進位制檔案:

  1. 使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH。

  2. 如果存在,則使用 <this file root>/bin/etcd

  3. 使用 PATH 中的 etcd

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
引數

etcd_binary_path – etcd 伺服器二進位制檔案的路徑(參見上方回退路徑)。