Multiprocessing#
創建於:2021年5月4日 | 最後更新於:2024年2月29日
用於啟動和管理 n 個工作子程序的庫,這些子程序可由函式或二進位制檔案指定。
對於函式,它使用 torch.multiprocessing(因此也包括 Python 的 multiprocessing)來建立/分叉工作程序。對於二進位制檔案,它使用 Python 的 subprocess.Popen 來建立工作程序。
用法 1:以函式形式啟動兩個訓練器
from torch.distributed.elastic.multiprocessing import Std, start_processes
def trainer(a, b, c):
pass # train
# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
name="trainer",
entrypoint=trainer,
args={0: (1, 2, 3), 1: (4, 5, 6)},
envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
log_dir="/tmp/foobar",
redirects=Std.ALL, # write all worker stdout/stderr to a log file
tee={0: Std.ERR}, # tee only local rank 0's stderr to console
)
# waits for all copies of trainer to finish
ctx.wait()
用法 2:以二進位制形式啟動 2 個 echo 工作程序
# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
name="echo"
entrypoint="echo",
log_dir="/tmp/foobar",
args={0: "hello", 1: "world"},
redirects={1: Std.OUT},
)
與 torch.multiprocessing 類似,函式 start_processes() 的返回值是一個程序上下文(api.PContext)。如果啟動的是函式,則返回 api.MultiprocessContext,如果啟動的是二進位制檔案,則返回 api.SubprocessContext。兩者都是父類 api.PContext 類的特定實現。
啟動多個工作程序#
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn', numa_options=None)[source]#
使用提供的選項啟動
n個entrypoint程序的副本。entrypoint可以是Callable(函式)或str(二進位制檔案)。副本數量由args和envs引數的條目數量決定,這些引數需要具有相同的鍵集。args和env引數是傳遞給入口點的引數和環境變數,它們由副本索引(本地秩)對映。必須包含所有本地秩。也就是說,鍵集應為{0,1,...,(nprocs-1)}。注意
當
entrypoint是二進位制檔案(str)時,args只能是字串。如果提供了任何其他型別,則會將其轉換為字串表示形式(例如str(arg1))。此外,二進位制檔案失敗僅在主函式被註釋為torch.distributed.elastic.multiprocessing.errors.record時才會寫入error.json錯誤檔案。對於函式啟動,這是預設行為,無需手動註釋@record註解。redirects和tee是位掩碼,指定要重定向到log_dir中的日誌檔案的標準流(stdout/stderr)。有效掩碼值定義在Std中。要僅重定向/分發特定本地秩的日誌,請將redirects作為字典傳遞,其中鍵是本地秩,用於指定該秩的重定向行為。任何缺失的本地秩將預設為Std.NONE。tee的作用類似於 Unix 的“tee”命令,它重定向 + 列印到控制檯。要避免工作程序的 stdout/stderr 列印到控制檯,請使用redirects引數。對於每個程序,
log_dir將包含:{local_rank}/error.json:如果程序失敗,則包含錯誤資訊的⽂件{local_rank}/stdout.log:如果redirect & STDOUT == STDOUT{local_rank}/stderr.log:如果redirect & STDERR == STDERR
注意
預期
log_dir存在、為空且為一個目錄。示例
log_dir = "/tmp/test" # ok; two copies of foo: foo("bar0"), foo("bar1") start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # invalid; envs missing for local rank 1 start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}}, log_dir=log_dir ) # ok; two copies of /usr/bin/touch: touch file1, touch file2 start_processes( name="trainer", entrypoint="/usr/bin/touch", args:{0:("file1",), 1:("file2",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # caution; arguments casted to string, runs: # echo "1" "2" "3" and echo "[1, 2, 3]" start_processes( name="trainer", entrypoint="/usr/bin/echo", args:{0:(1,2,3), 1:([1,2,3],), envs:{0:{}, 1:{}}, log_dir=log_dir )
- 引數
- 返回型別
程序上下文#
- class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source]#
透過不同機制啟動的一組程序的標準化操作的基類。
名稱
PContext的目的是為了區分torch.multiprocessing.ProcessContext。警告
stdout 和 stderr 應該始終是 tee_stdout 和 tee_stderr(分別)的超集,這是因為 tee 是透過重定向 + tail -f <stdout/stderr.log> 實現的。
- class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None, numa_options=None)[source]#
作為函式呼叫的工作程序的
PContext。
- class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, numa_options=None)[source]#
作為二進位制檔案呼叫的工作程序的
PContext。
- class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[source]#
透過
start_processes()啟動的程序完成執行的結果。由PContext返回。請注意以下幾點:
所有欄位都由本地秩對映
return_values- 僅針對函式(而不是二進位制檔案)填充。stdouts- stdout.log 的路徑(如果沒有重定向則為空字串)stderrs- stderr.log 的路徑(如果沒有重定向則為空字串)
- class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]#
預設的 LogsSpecs 實現
log_dir 如果不存在將被建立
為每次嘗試和每個秩生成巢狀資料夾。
- class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[source]#
對於每種日誌型別,都包含本地秩 ID 到⽂件路徑的對映。
- class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]#
為每個工作程序定義日誌處理和重定向。
- 引數
redirects (Union[Std, dict[int, torch.distributed.elastic.multiprocessing.api.Std]]) – 重定向到⽂件的流。傳遞單個
Std列舉以重定向所有工作程序,或傳遞按 local_rank 鍵控的字典以選擇性重定向。tee (Union[Std, dict[int, torch.distributed.elastic.multiprocessing.api.Std]]) – 要複製到 stdout/stderr 的流。傳遞單個
Std列舉以複製所有工作程序的流,或傳遞按 local_rank 鍵控的字典以選擇性複製。