評價此頁

超時計時器#

創建於:2021 年 5 月 4 日 | 最後更新於:2024 年 4 月 25 日

超時計時器在與代理程式相同的程序中設定,並從您的指令碼中使用,以處理卡住的工作程序。當您進入一個有潛在卡住風險的程式碼塊時,您可以獲取一個超時計時器,該計時器指示計時器伺服器在程序未能在自行設定的到期截止時間之前釋放計時器時終止該程序。

用法

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work

在上面的示例中,如果 trainer_func 的完成時間超過 60 秒,則工作程序將被終止,代理程式將重試工作程序組。

客戶端方法#

torch.distributed.elastic.timer.configure(timer_client)[source]#

配置計時器客戶端。必須在使用 expires 之前呼叫。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source]#

獲取一個倒計時計時器,該計時器將在 after 秒後過期,除非它包裝的程式碼塊在規定時間內完成。當計時器過期時,該工作程序將有資格被回收。 “回收”的確切含義取決於客戶端實現。在大多數情況下,回收意味著終止工作程序。請注意,不保證工作程序會在 time.now() + after 時刻被回收,而是工作程序 “有資格” 被回收,並且客戶端通訊的 TimerServer 將最終決定何時以及如何回收具有過期計時器的工作程序。

用法

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

伺服器/客戶端實現#

以下是 torchelastic 提供的計時器伺服器和客戶端對。

注意

計時器伺服器和客戶端必須始終成對實現和使用,因為伺服器和客戶端之間存在訊息傳遞協議。

以下是一對基於 multiprocess.Queue 實現的計時器伺服器和客戶端。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source]#

LocalTimerClient 一起工作的伺服器。客戶端預計是執行此伺服器的父程序的子程序。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地工作程序(在同一主機上的程序中執行)的計時器。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source]#

LocalTimerServer 對應的客戶端。此客戶端旨在在執行 LocalTimerServer 的同一主機上使用,並使用 pid 唯一標識工作程序。這在在一個具有多個 GPU 裝置的機器上為每個 GPU 產生一個(訓練器)子程序的情況下特別有用。

以下是基於命名管道實現的另一對計時器伺服器和客戶端。

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source]#

FileTimerClient 一起工作的伺服器。客戶端預計與執行此伺服器的程序在同一臺主機上執行。作業中的每個主機都應在本地啟動自己的計時器伺服器,每個伺服器例項管理本地工作程序(在同一主機上的程序中執行)的計時器。

引數
  • file_path (str) – str,FIFO 特殊檔案的路徑,將建立該檔案。

  • max_interval (float) – float,每個看門狗迴圈的最大間隔(秒)。

  • daemon (bool) – bool,是否以守護程序模式執行看門狗執行緒。守護程序執行緒不會阻止程序停止。

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],一個用於以 JSON 格式記錄事件的可選回撥。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source]#

FileTimerServer 對應的客戶端。此客戶端旨在在執行 FileTimerServer 的同一主機上使用,並使用 pid 唯一標識工作程序。此客戶端使用命名管道將計時器請求傳送到 FileTimerServer。此客戶端是生產者,而 FileTimerServer 是消費者。多個客戶端可以與同一個 FileTimerServer 一起工作。

引數
  • file_path (str) – str,FIFO 特殊檔案的路徑。 FileTimerServer 必須透過呼叫 os.mkfifo() 來建立它。

  • signal – signal,用於終止程序的訊號。使用負數或零訊號將不會終止程序。

編寫自定義計時器伺服器/客戶端#

要編寫自己的計時器伺服器和客戶端,請為伺服器擴充套件 torch.distributed.elastic.timer.TimerServer,為客戶端擴充套件 torch.distributed.elastic.timer.TimerClientTimerRequest 物件用於在伺服器和客戶端之間傳遞訊息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source]#

表示倒計時計時器獲取和釋放的資料物件,用於 TimerClientTimerServer 之間。負數的 expiration_time 應解釋為“釋放”請求。

注意

worker_id 的型別是特定於實現的。它是 TimerServer 和 TimerClient 實現用來唯一標識工作程序的任何內容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source]#

監控活動計時器並及時過期它們的實體。此伺服器負責回收具有過期計時器的工作程序。

abstract clear_timers(worker_ids)[source]#

清除給定 worker_ids 的所有計時器。

abstract get_expired_timers(deadline)[source]#

返回每個 worker_id 的所有過期計時器。過期計時器是 expiration_time 小於或等於提供的截止時間的計時器。

返回型別

dict[str, list[torch.distributed.elastic.timer.api.TimerRequest]]

abstract register_timers(timer_requests)[source]#

處理傳入的計時器請求,並將它們註冊到伺服器。計時器請求可以是獲取計時器或釋放計時器的請求。具有負 expiration_time 的計時器請求應解釋為釋放計時器請求。

class torch.distributed.elastic.timer.TimerClient[source]#

透過與 TimerServer 通訊來獲取和釋放倒計時計時器的客戶端庫。

abstract acquire(scope_id, expiration_time)[source]#

為持有此客戶端物件的工作程序獲取一個計時器,給定 scope_id 和 expiration_time。通常會將計時器註冊到 TimerServer。

abstract release(scope_id)[source]#

釋放此客戶端代表的工作程序的 scope_id 的計時器。呼叫此方法後,範圍內的倒計時計時器不再生效。

除錯資訊日誌記錄#

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source]#