Rendezvous#
創建於: 2021年5月4日 | 最後更新於: 2024年5月22日
在 Torch Distributed Elastic 的上下文中,我們使用“rendezvous”(會合/碰頭)一詞來指代一種結合了分散式同步原語和對等發現的特定功能。
Torch Distributed Elastic 使用它來收集訓練作業的參與者(即節點),以便它們都就參與者列表及其各自的角色達成一致,並就何時可以開始/恢復訓練做出一致的集體決策。
Torch Distributed Elastic rendezvous 提供了以下關鍵功能:
Barrier(屏障):
執行 rendezvous 的節點將全部阻塞,直到 rendezvous 被認為完成為止——當至少有 min 個總節點加入 rendezvous 屏障(對於同一作業)時,即完成。這也意味著屏障的大小不一定是固定的。
在達到 min 個節點後,還會有一個小的額外等待時間——這用於確保 rendezvous 不會“過快”完成(這可能會排除在同一時間嘗試加入的額外節點)。
如果屏障收集到 max 個節點,rendezvous 將立即完成。
還有一個總體超時機制,如果從未達到 min 個節點,rendezvous 將失敗——這旨在作為一種簡單的故障安全措施,以幫助釋放部分分配的作業資源,以防資源管理器出現問題,並且不應被視為可重試的。
Exclusivity(排他性):
簡單的分散式屏障是不夠的,因為我們還需要確保在任何給定時間(對於給定的作業)只存在一個節點組。換句話說,新節點(即遲到的節點)不應能夠為同一個作業形成並行獨立的 worker 組。
Torch Distributed Elastic rendezvous 確保如果一個節點組已經完成了 rendezvous(並因此可能已經在訓練),那麼試圖進行 rendezvous 的其他“遲到”節點將僅宣佈自己正在等待,並且必須等到(之前完成的)現有 rendezvous 首先被銷燬。
Consistency(一致性):
當 rendezvous 完成時,所有成員都將就作業成員身份及其中的每個人的角色達成一致。這個角色用一個整數表示,稱為 rank(秩),它介於 0 和 world_size 之間。
請注意,rank不穩定,因為同一個節點在下一次(重新)rendezvous 時可能會被分配不同的 rank。
Fault-tolerance(容錯性):
Torch Distributed Elastic rendezvous 被設計為能夠容忍 rendezvous 過程中的節點故障。如果在加入 rendezvous 和其完成之間發生程序崩潰(或失去網路連線等),則會自動使用剩餘的健康節點進行重新 rendezvous。
節點還可能在完成 rendezvous(或被其他節點觀察到已完成)之後發生故障——這種情況將由 Torch Distributed Elastic 的 train_loop 處理(它也將觸發重新 rendezvous)。
Shared key-value store(共享鍵值儲存):
當 rendezvous 完成時,會建立一個共享的鍵值儲存並返回。此儲存實現了 torch.distributed.Store API(參見 分散式通訊文件)。
此儲存僅由已完成 rendezvous 的成員共享。它旨在供 Torch Distributed Elastic 使用,以交換初始化作業控制和資料平面所需的資訊。
Waiting workers and rendezvous closing(等待的 worker 和 rendezvous 關閉):
Torch Distributed Elastic rendezvous handler 物件提供了附加功能,這些功能在技術上不屬於 rendezvous 過程。
查詢有多少 worker 遲到屏障,它們可以參與下一個 rendezvous。
設定 rendezvous 為關閉狀態,以通知所有節點不要參與下一個 rendezvous。
DynamicRendezvousHandler:
Torch Distributed Elastic 提供了 DynamicRendezvousHandler 類,該類實現了上述 rendezvous 機制。它是一種與後端無關的型別,在構造時需要指定一個 RendezvousBackend 例項。
Torch 分散式使用者可以實現自己的後端型別,也可以使用 PyTorch 提供的以下實現之一:
C10dRendezvousBackend:使用 C10d 儲存(預設為TCPStore)作為 rendezvous 後端。使用 C10d 儲存的主要優點是它不需要第三方依賴項(如 etcd)即可建立 rendezvous。EtcdRendezvousBackend:取代了舊版的EtcdRendezvousHandler類。將EtcdRendezvousBackend例項傳遞給DynamicRendezvousHandler在功能上等同於例項化一個EtcdRendezvousHandler。store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
下面是描述 rendezvous 如何工作的狀態圖。
Registry(登錄檔)#
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]#
儲存用於構造
RendezvousHandler的引數。- 引數
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]#
代表
RendezvousHandler後端的登錄檔。
Handler(處理程式)#
- class torch.distributed.elastic.rendezvous.RendezvousHandler[source]#
主要 rendezvous 介面。
注意
分散式 PyTorch 使用者通常不需要實現自己的
RendezvousHandler。已提供基於 C10d Store 的實現,並且推薦給大多數使用者使用。- abstract get_run_id()[source]#
返回 rendezvous 的 run id。
run id 是一個使用者定義的 ID,用於唯一標識分散式應用程式的一個例項。它通常對映到作業 ID,並用於允許節點加入正確的分散式應用程式。
- 返回型別
- abstract is_closed()[source]#
檢查 rendezvous 是否已關閉。
關閉的 rendezvous 意味著同一作業中所有未來的重新 rendezvous 嘗試都將失敗。
is_closed()和set_closed()具有最終傳播的語義,不應用於同步。其目的是,如果至少有一個節點決定作業已完成,它將關閉 rendezvous,其他節點也將很快觀察到這一點並停止執行。- 返回型別
- abstract next_rendezvous()[source]#
rendezvous 屏障的主入口點。
阻塞直到 rendezvous 完成並且當前程序包含在形成的 worker 組中,或者發生超時,或者 rendezvous 被標記為已關閉。
- 返回
返回
RendezvousInfo例項。- 引發
RendezvousClosedError – rendezvous 已關閉。
RendezvousConnectionError – 與 rendezvous 後端的連線失敗。
RendezvousStateError – rendezvous 狀態損壞。
RendezvousTimeoutError – rendezvous 未按時完成。
- 返回型別
- abstract num_nodes_waiting()[source]#
返回遲到 rendezvous 屏障的節點數量,這些節點未包含在當前 worker 組中。
呼叫者應定期呼叫此方法以檢查是否有新節點正在等待加入作業,如果有,則透過呼叫
next_rendezvous()(重新 rendezvous)來接納它們。- 返回型別
- abstract shutdown()[source]#
關閉為 rendezvous 開啟的所有資源。
示例
rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown()
- 返回型別
- property use_agent_store: bool#
指示透過
next_rendezvous()返回的 store 引用可以與使用者應用程式共享,並在應用程式生命週期內可用。Rendezvous handler impl 將 store 詳細資訊作為
RendezvousStoreInfo的例項共享。應用程式通常使用 MASTER_ADDR/MASTER_PORT 環境變數來查詢 store。
Dataclasses(資料類)#
- class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]#
儲存 rendezvous 的資訊。
Exceptions(異常)#
- class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]#
在 rendezvous 未按時完成時引發。
Implementations(實現)#
Dynamic Rendezvous(動態 Rendezvous)#
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]#
根據指定引數建立一個新的
DynamicRendezvousHandler。- 引數
store (Store) – 作為 rendezvous 的一部分返回的 C10d store。
backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端。
- 返回型別
引數
描述
join_timeout
rendezvous 預計完成的總時間(秒)。預設為 600 秒。
last_call_timeout
在達到最小節點數後,在完成 rendezvous 之前的額外等待時間(秒)。預設為 30 秒。
close_timeout
在呼叫
RendezvousHandler.set_closed()或RendezvousHandler.shutdown()後,rendezvous 預計關閉的時間(秒)。預設為 30 秒。heartbeat
保持連線的心跳(heartbeat)預計完成的時間(秒)。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]#
代表一個用於在一組節點之間設定 rendezvous 的處理程式。
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)[source]#
建立一個新的
DynamicRendezvousHandler。- 引數
run_id (str) – rendezvous 的 run id。
store (Store) – 作為 rendezvous 的一部分返回的 C10d store。
backend (RendezvousBackend) – 用於儲存 rendezvous 狀態的後端。
min_nodes (int) – 允許進入 rendezvous 的最小節點數。
max_nodes (int) – 允許進入 rendezvous 的最大節點數。
timeout (Optional[RendezvousTimeout]) – rendezvous 的超時配置。
keep_alive_interval (int) – 節點在傳送心跳以保持其在 rendezvous 中存活之前等待的時間。
keep_alive_max_attempt (int) – 在節點被視為死亡之前,失敗的心跳嘗試次數。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]#
代表一個儲存 rendezvous 狀態的後端。
- abstract get_state()[source]#
獲取 rendezvous 狀態。
- 返回
編碼的 rendezvous 狀態及其 fencing token 的元組,如果沒有在後端找到狀態則為
None。- 引發
RendezvousConnectionError – 與後端的連線失敗。
RendezvousStateError – rendezvous 狀態損壞。
- 返回型別
- abstract set_state(state, token=None)[source]#
設定 rendezvous 狀態。
新的 rendezvous 狀態是條件性設定的。
如果指定的
token與後端中儲存的 fencing token 匹配,則狀態將被更新。新的狀態將與它的 fencing token 一起返回給呼叫者。如果指定的
token與後端中儲存的 fencing token 不匹配,則狀態不會被更新;相反,現有狀態將與它的 fencing token 一起返回給呼叫者。如果指定的
token為None,則僅當後端中不存在現有狀態時,才會設定新狀態。新狀態或現有狀態及其 fencing token 將返回給呼叫者。
- 引數
state (bytes) – 編碼的 rendezvous 狀態。
token (Optional[Any]) – 一個可選的 fencing token,該 token 是透過之前呼叫
get_state()或set_state()獲取的。
- 返回
序列化的 rendezvous 狀態、其 fencing token 以及一個指示設定嘗試是否成功的布林值的元組。
- 引發
RendezvousConnectionError – 與後端的連線失敗。
RendezvousStateError – rendezvous 狀態損壞。
- 返回型別
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]#
儲存 rendezvous 的超時配置。
C10d Backend(C10d 後端)#
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]#
根據指定引數建立一個新的
C10dRendezvousBackend。引數
描述
store_type
C10d store 的型別。當前支援的型別是“tcp”和“file”,分別對應
torch.distributed.TCPStore和torch.distributed.FileStore。預設為“tcp”。read_timeout
store 操作的讀取超時時間(秒)。預設為 60 秒。
注意,這僅適用於
torch.distributed.TCPStore。它與torch.distributed.FileStore無關,後者不接受超時作為引數。is_host
一個布林值,指示此後端例項是否將託管 C10d store。如果未指定,將透過將此機器的主機名或 IP 地址與指定的 rendezvous 端點進行匹配來推斷。預設為
None。注意,此配置選項僅適用於
torch.distributed.TCPStore。在正常情況下,您可以安全地跳過它;唯一需要的時候是當其值無法正確確定時(例如,rendezvous 端點的主機名是 CNAME,或者不匹配機器的 FQDN)。
- class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source]#
代表一個基於 C10d 的 rendezvous 後端。
- 引數
store (Store) – 用於與 C10d store 通訊的
torch.distributed.Store例項。run_id (str) – rendezvous 的 run id。
Etcd Backend(Etcd 後端)#
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]#
根據指定引數建立一個新的
EtcdRendezvousBackend。引數
描述
read_timeout
etcd 操作的讀取超時時間(秒)。預設為 60 秒。
protocol
用於與 etcd 通訊的協議。有效值為“http”和“https”。預設為“http”。
ssl_cert
用於 HTTPS 的 SSL 客戶端證書路徑。預設為
None。ssl_cert_key
用於 HTTPS 的 SSL 客戶端證書私鑰路徑。預設為
None。ca_cert
根 SSL 證書頒發機構的路徑。預設為
None。
Etcd Rendezvous (Legacy)(Etcd Rendezvous (遺留版))#
警告
DynamicRendezvousHandler 類取代了 EtcdRendezvousHandler 類,並且是大多數使用者的推薦選擇。EtcdRendezvousHandler 處於維護模式,未來將被棄用。
- class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source]#
實現了由
torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous支援的torch.distributed.elastic.rendezvous.RendezvousHandler介面。EtcdRendezvousHandler使用 URL 來配置要使用的 rendezvous 型別,並將實現特定的配置傳遞給 rendezvous 模組。基本的 etcd rendezvous 配置 URL 如下所示:etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- example -- etcd://:2379/1234?min_workers=1&max_workers=3
上述 URL 的解釋如下:
使用與
etcdscheme 註冊的 rendezvous handler。要使用的
etcd端點是localhost:2379。job_id == 1234用作 etcd 中的字首(這允許在共享公共 etcd 伺服器的情況下,只要job_ids保證是唯一的,就可以支援多個作業)。請注意,作業 ID 可以是任何字串(例如,不需要是數字),只要它是唯一的。min_workers=1和max_workers=3指定了成員資格大小的範圍——只要叢集大小大於或等於min_workers,Torch Distributed Elastic 就會開始執行作業,並允許最多max_workers個節點加入叢集。
以下是可傳遞給 etcd rendezvous 的引數的完整列表:
引數
描述
min_workers
rendezvous 有效的最小 worker 數量。
max_workers
要接納的最大 worker 數量。
timeout
next_rendezvous 預計成功執行的總超時時間(預設為 600 秒)。
last_call_timeout
在達到最小 worker 數量後的額外等待時間(“最後呼叫”(defaults to 30s))。
etcd_prefix
etcd 中的路徑字首(從 etcd 根目錄開始),所有 etcd 節點都將在其中建立(預設為
/torchelastic/p2p)。
Etcd Store(Etcd 儲存)#
當 etcd 用作 rendezvous 後端時,EtcdStore 是 next_rendezvous() 返回的 C10d Store 例項型別。
- class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source]#
透過搭便車 rendezvous etcd 例項來實現 c10 Store 介面。
這是
EtcdRendezvous返回的 store 物件。
Etcd Server(Etcd 伺服器)#
EtcdServer 是一個方便類,可以輕鬆地在子程序中啟動和停止 etcd 伺服器。這對於測試或單節點(多 worker)部署非常有用,因為手動設定一個 sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。
警告
對於生產和多節點部署,請考慮正確部署高可用的 etcd 伺服器,因為它是您分散式作業的單點故障。
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]#
注意
已在 etcd 伺服器 v3.4.3 上進行測試。
在隨機空閒埠上啟動和停止本地獨立 etcd 伺服器。對於單節點、多 worker 啟動或測試非常有用,其中 sidecar etcd 伺服器比單獨設定 etcd 伺服器更方便。
此類會註冊一個終止處理程式來在退出時關閉 etcd 子程序。此終止處理程式不是呼叫
stop()方法的替代品。以下回退機制用於查詢 etcd 二進位制檔案:
使用環境變數 TORCHELASTIC_ETCD_BINARY_PATH。
如果存在,則使用
<this file root>/bin/etcd。使用
PATH中的etcd。
用法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # use client server.stop()
- 引數
etcd_binary_path – etcd 伺服器二進位制檔案的路徑(參見上方回退路徑)。