評價此頁

多程序包 - torch.multiprocessing#

建立日期:2016 年 12 月 23 日 | 最後更新日期:2025 年 6 月 8 日

torch.multiprocessing 是對原生 multiprocessing 模組的封裝。

它註冊了自定義的 reducers,這些 reducers 使用共享記憶體,在不同程序中提供對同一資料的共享檢視。一旦 tensor/storage 被移動到 shared_memory(參見 share_memory_()),就可以在不進行任何複製的情況下將其傳送到其他程序。

API 與原始模組 100% 相容 - 只需將 import multiprocessing 改為 import torch.multiprocessing,就可以讓所有透過佇列傳送或透過其他機制共享的 tensor 被移動到共享記憶體中。

由於 API 的相似性,我們不對該包的大部分內容進行文件記錄,並建議參考原始模組非常優秀的文件。

警告

如果主程序異常退出(例如,由於收到訊號),Python 的 multiprocessing 有時會無法清理其子程序。這是一個已知的注意事項,因此如果您在中斷直譯器後看到任何資源洩漏,這可能意味著您剛剛遇到了這種情況。

策略管理#

torch.multiprocessing.get_all_sharing_strategies()[源]#

返回當前系統支援的共享策略集合。

torch.multiprocessing.get_sharing_strategy()[源]#

返回當前 CPU tensor 的共享策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[源]#

設定 CPU tensor 的共享策略。

引數

new_strategy (str) – 所選策略的名稱。應為 get_all_sharing_strategies() 返回的值之一。

共享 CUDA tensor#

僅在 Python 3 中支援在程序之間共享 CUDA tensor,使用 spawnforkserver 啟動方法。

與 CPU tensor 不同,傳送程序需要一直保留原始 tensor,直到接收程序保留該 tensor 的副本。引用計數在後臺實現,但要求使用者遵循以下最佳實踐。

警告

如果消費者程序因致命訊號而異常死亡,則只要傳送程序仍在執行,共享 tensor 就可能被永久保留在記憶體中。

  1. 在消費者中儘快釋放記憶體。

## Good
x = queue.get()
# do somethings with x
del x
## Bad
x = queue.get()
# do somethings with x
# do everything else (producer have to keep x in memory)
  1. 保持生產者程序執行,直到所有消費者退出。這將防止生產者程序釋放仍被消費者使用的記憶體。

## producer
# send tensors, do something
event.wait()
## consumer
# receive tensors and use them
event.set()
  1. 不要傳遞接收到的 tensor。

# not going to work
x = queue.get()
queue_2.put(x)
# you need to create a process-local copy
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# putting and getting from the same queue in the same process will likely end up with segfault
queue.put(tensor)
x = queue.get()

共享策略#

本節簡要概述了不同共享策略的工作原理。請注意,這僅適用於 CPU tensor - CUDA tensor 將始終使用 CUDA API,因為這是它們可以共享的唯一方式。

檔案描述符 - file_descriptor#

注意

這是預設策略(macOS 和 OS X 除外,因為不支援)。

此策略將使用檔案描述符作為共享記憶體控制代碼。每當一個 storage 被移動到共享記憶體時,從 shm_open 獲取的檔案描述符會與物件一起快取,當它將被髮送到其他程序時,檔案描述符將被傳輸(例如,透過 UNIX 套接字)給它。接收方也會快取檔案描述符並 mmap 它,以獲得對 storage 資料的共享檢視。

請注意,如果共享的 tensor 很多,此策略將大部分時間保持大量開啟的檔案描述符。如果您的系統對開啟檔案描述符的數量有限制,並且您無法提高這些限制,則應使用 file_system 策略。

檔案系統 - file_system#

此策略將使用傳遞給 shm_open 的檔名來標識共享記憶體區域。這樣做的好處是不需要實現來快取從它獲取的檔案描述符,但同時容易導致共享記憶體洩漏。檔案建立後不能立即刪除,因為其他程序需要訪問它才能開啟它們的檢視。如果程序因致命錯誤而崩潰,或被終止且未呼叫 storage 解構函式,則檔案將保留在系統中。這非常嚴重,因為它們會一直佔用記憶體,直到系統重啟或手動釋放。

為了應對共享記憶體檔案洩漏的問題,torch.multiprocessing 會啟動一個名為 torch_shm_manager 的守護程序,它會與當前程序組隔離,並跟蹤所有共享記憶體分配。一旦所有連線到它的程序退出,它會等待片刻以確保沒有新的連線,然後遍歷該組分配的所有共享記憶體檔案。如果它發現其中任何一個仍然存在,它們將被解除分配。我們已經測試過這種方法,它在各種故障情況下都表現出了魯棒性。儘管如此,如果您的系統有足夠高的限制,並且 file_descriptor 是支援的策略,我們不建議切換到此策略。

生成子程序#

注意

適用於 Python >= 3.4。

這依賴於 Python multiprocessing 包中的 spawn 啟動方法。

透過建立 Process 例項並呼叫 join 來等待它們完成,可以生成指定數量的子程序來執行某個函式。這種方法在處理單個子程序時效果很好,但在處理多個程序時會帶來潛在的問題。

特別是,順序 joining 程序意味著它們將順序終止。如果它們不這樣做,並且第一個程序不終止,那麼程序的終止將不會被注意到。此外,沒有原生的錯誤傳播機制。

下面的 spawn 函式解決了這些問題,並處理了錯誤傳播、亂序終止,並且在檢測到其中一個程序出錯時會主動終止程序。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[源]#

生成 nprocs 個程序,這些程序以 args 執行 fn

如果其中一個程序以非零退出狀態退出,則剩餘的程序將被殺死,並丟擲一個異常,說明終止的原因。如果子程序中捕獲到異常,則該異常將被轉發,其回溯將包含在父程序中丟擲的異常中。

引數
  • fn (function) –

    函式作為生成程序的入口點被呼叫。此函式必須定義在模組的頂層,以便可以對其進行 pickling 和 spawning。這是 multiprocessing 提出的要求。

    該函式被呼叫為 fn(i, *args),其中 i 是程序索引,args 是透過的引數元組。

  • args (tuple) – 傳遞給 fn 的引數。

  • nprocs (int) – 要生成的程序數。

  • join (bool) – 對所有程序執行阻塞 join。

  • daemon (bool) – 生成的程序的守護程序標誌。如果設定為 True,將建立守護程序。

  • start_method (str) – (已棄用) 此方法將始終使用 spawn 作為啟動方法。要使用不同的啟動方法,請使用 start_processes()

返回

如果 joinTrue,則為 None;如果 joinFalse,則為 ProcessContext

class torch.multiprocessing.SpawnContext[源]#

當呼叫 spawn() 時(使用 join=False),返回此物件。

join(timeout=None, grace_period=None)[源]#

Join spawn context中的一個或多個程序。

嘗試 join 此 spawn context 中的一個或多個程序。如果其中一個程序以非零退出狀態退出,此函式將(可選地帶有一個寬限期)終止剩餘程序,並丟擲一個異常,說明第一個程序退出的原因。

如果所有程序都成功 join,則返回 True;如果還有更多程序需要 join,則返回 False

引數
  • timeout (float) – 在放棄等待之前,等待這麼長時間(以秒為單位)。

  • grace_period (float) – 當任何程序失敗時,等待這麼長時間(以秒為單位)讓其他程序優雅關閉,然後才終止它們。如果它們仍然不退出,則在殺死它們之前再等待一個寬限期。