torch.futures#
建立日期:2025 年 6 月 12 日 | 最後更新日期:2025 年 6 月 12 日
該包提供了一個 Future 型別,它封裝了非同步執行,以及一組簡化 Future 物件操作的實用函式。目前,Future 型別主要由 Distributed RPC Framework 使用。
- class torch.futures.Future(*, devices=None)#
對
torch._C.Future的封裝,它封裝了可呼叫物件的非同步執行,例如rpc_async()。它還公開了一組 API,用於添加回調函式和設定結果。警告
GPU 支援是一項 beta 功能,可能會發生變化。
- add_done_callback(callback)[source]#
將給定的回撥函式附加到此
Future,當Future完成時將執行該回調。可以向同一個Future新增多個回撥,但不能保證其執行順序。回撥函式必須接受一個引數,即此Future的引用。回撥函式可以使用value()方法獲取值。請注意,如果此Future已經完成,則將內聯執行給定的回撥。我們建議使用
then()方法,因為它提供了一種在回撥完成後進行同步的方法。add_done_callback如果回撥不返回任何內容,可能會更高效。但then()和add_done_callback在底層使用相同的回撥註冊 API。關於 GPU 張量,此方法與
then()的行為相同。- 引數
callback (
Future) – 一個Callable,它接受一個引數,即此Future的引用。
注意
請注意,如果回撥函式丟擲異常(無論是由於原始 Future 以異常完成並呼叫
fut.wait(),還是由於回撥中的其他程式碼),則必須仔細處理錯誤。例如,如果此回撥後來完成了其他 Future,那麼這些 Future 不會被標記為錯誤完成,使用者負責獨立處理這些 Future 的完成/等待。示例
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[source]#
如果此
Future已完成,則返回True。如果Future具有結果或異常,則為完成。如果值包含位於 GPU 上的張量,則
Future.done()將返回True,即使填充這些張量的非同步核心尚未在裝置上完成執行,因為此時結果已可用,前提是執行適當的同步(參見wait())。- 返回型別
- set_exception(result)[source]#
為此
Future設定一個異常,這將使此Future標記為錯誤完成並觸發所有已附加的回撥。請注意,當呼叫此Future上的 wait()/value() 時,此處設定的異常將內聯丟擲。- 引數
result (BaseException) – 此
Future的異常。
示例
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source]#
為此
Future設定結果,這將使此Future標記為完成並觸發所有已附加的回撥。請注意,Future不能被標記為完成兩次。如果結果包含位於 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,也可以呼叫此方法,前提是該方法被呼叫時,那些核心所在的流被設定為當前流。簡單來說,只要在啟動這些核心後立即呼叫此方法,而不更改流,就可以安全地進行,無需任何額外的同步。此方法將在所有相關的當前流上記錄事件,並使用它們來確保此
Future的所有使用者得到正確的排程。- 引數
result (object) – 此
Future的結果物件。
示例
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source]#
將給定的回撥函式附加到此
Future,當Future完成時將執行該回調。可以向同一個Future新增多個回撥,但不能保證其執行順序(要強制執行特定順序,請考慮鏈式呼叫:fut.then(cb1).then(cb2))。回撥函式必須接受一個引數,即此Future的引用。回撥函式可以使用value()方法獲取值。請注意,如果此Future已經完成,則將立即內聯執行給定的回撥。如果
Future的值包含位於 GPU 上的張量,那麼在填充這些張量的非同步核心尚未在裝置上完成執行時,可能會呼叫該回調。但是,將使用一些專用的當前流(從全域性池獲取)來呼叫該回調,這些流將與那些核心同步。因此,回撥在這些張量上執行的任何操作都將在核心完成後在裝置上排程。換句話說,只要回撥不切換流,它就可以安全地操作結果,而無需任何額外的同步。這與wait()的非阻塞行為類似。同樣,如果回撥返回一個包含位於 GPU 上的張量的值,即使生成這些張量的核心仍在裝置上執行,也可以這樣做,只要回撥在其執行期間沒有更改流。如果要更改流,必須小心地將它們與原始流(即回撥被呼叫時是當前流的那些流)重新同步。
- 引數
callback (
Callable) – 一個Callable,它將此Future作為唯一引數。- 返回
一個新
Future物件,它持有callback的返回值,並在給定callback完成時被標記為完成。- 返回型別
Future[S]
注意
請注意,如果回撥函式丟擲異常(無論是由於原始 Future 以異常完成並呼叫
fut.wait(),還是由於回撥中的其他程式碼),那麼由then返回的 Future 將被恰當地標記為遇到錯誤。但是,如果此回撥後來完成了其他 Future,那麼這些 Future 不會被標記為錯誤完成,使用者負責獨立處理這些 Future 的完成/等待。示例
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- torch.futures.collect_all(futures)[source]#
將提供的
Future物件收集到一個單獨的合併Future中,當所有子 Future 都完成後,該 Future 將被標記為完成。- 引數
- 返回
返回一個
Future物件,該物件表示傳遞的 Future 列表。- 返回型別
- 示例:
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1