評價此頁

torch.futures#

建立日期:2025 年 6 月 12 日 | 最後更新日期:2025 年 6 月 12 日

該包提供了一個 Future 型別,它封裝了非同步執行,以及一組簡化 Future 物件操作的實用函式。目前,Future 型別主要由 Distributed RPC Framework 使用。

class torch.futures.Future(*, devices=None)#

torch._C.Future 的封裝,它封裝了可呼叫物件的非同步執行,例如 rpc_async()。它還公開了一組 API,用於添加回調函式和設定結果。

警告

GPU 支援是一項 beta 功能,可能會發生變化。

add_done_callback(callback)[source]#

將給定的回撥函式附加到此 Future,當 Future 完成時將執行該回調。可以向同一個 Future 新增多個回撥,但不能保證其執行順序。回撥函式必須接受一個引數,即此 Future 的引用。回撥函式可以使用 value() 方法獲取值。請注意,如果此 Future 已經完成,則將內聯執行給定的回撥。

我們建議使用 then() 方法,因為它提供了一種在回撥完成後進行同步的方法。add_done_callback 如果回撥不返回任何內容,可能會更高效。但 then()add_done_callback 在底層使用相同的回撥註冊 API。

關於 GPU 張量,此方法與 then() 的行為相同。

引數

callback (Future) – 一個 Callable,它接受一個引數,即此 Future 的引用。

注意

請注意,如果回撥函式丟擲異常(無論是由於原始 Future 以異常完成並呼叫 fut.wait(),還是由於回撥中的其他程式碼),則必須仔細處理錯誤。例如,如果此回撥後來完成了其他 Future,那麼這些 Future 不會被標記為錯誤完成,使用者負責獨立處理這些 Future 的完成/等待。

示例

>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[source]#

如果此 Future 已完成,則返回 True。如果 Future 具有結果或異常,則為完成。

如果值包含位於 GPU 上的張量,則 Future.done() 將返回 True,即使填充這些張量的非同步核心尚未在裝置上完成執行,因為此時結果已可用,前提是執行適當的同步(參見 wait())。

返回型別

布林值

set_exception(result)[source]#

為此 Future 設定一個異常,這將使此 Future 標記為錯誤完成並觸發所有已附加的回撥。請注意,當呼叫此 Future 上的 wait()/value() 時,此處設定的異常將內聯丟擲。

引數

result (BaseException) – 此 Future 的異常。

示例

>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[source]#

為此 Future 設定結果,這將使此 Future 標記為完成並觸發所有已附加的回撥。請注意,Future 不能被標記為完成兩次。

如果結果包含位於 GPU 上的張量,即使填充這些張量的非同步核心尚未在裝置上完成執行,也可以呼叫此方法,前提是該方法被呼叫時,那些核心所在的流被設定為當前流。簡單來說,只要在啟動這些核心後立即呼叫此方法,而不更改流,就可以安全地進行,無需任何額外的同步。此方法將在所有相關的當前流上記錄事件,並使用它們來確保此 Future 的所有使用者得到正確的排程。

引數

result (object) – 此 Future 的結果物件。

示例

>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[source]#

將給定的回撥函式附加到此 Future,當 Future 完成時將執行該回調。可以向同一個 Future 新增多個回撥,但不能保證其執行順序(要強制執行特定順序,請考慮鏈式呼叫:fut.then(cb1).then(cb2))。回撥函式必須接受一個引數,即此 Future 的引用。回撥函式可以使用 value() 方法獲取值。請注意,如果此 Future 已經完成,則將立即內聯執行給定的回撥。

如果 Future 的值包含位於 GPU 上的張量,那麼在填充這些張量的非同步核心尚未在裝置上完成執行時,可能會呼叫該回調。但是,將使用一些專用的當前流(從全域性池獲取)來呼叫該回調,這些流將與那些核心同步。因此,回撥在這些張量上執行的任何操作都將在核心完成後在裝置上排程。換句話說,只要回撥不切換流,它就可以安全地操作結果,而無需任何額外的同步。這與 wait() 的非阻塞行為類似。

同樣,如果回撥返回一個包含位於 GPU 上的張量的值,即使生成這些張量的核心仍在裝置上執行,也可以這樣做,只要回撥在其執行期間沒有更改流。如果要更改流,必須小心地將它們與原始流(即回撥被呼叫時是當前流的那些流)重新同步。

引數

callback (Callable) – 一個 Callable,它將此 Future 作為唯一引數。

返回

一個新 Future 物件,它持有 callback 的返回值,並在給定 callback 完成時被標記為完成。

返回型別

Future[S]

注意

請注意,如果回撥函式丟擲異常(無論是由於原始 Future 以異常完成並呼叫 fut.wait(),還是由於回撥中的其他程式碼),那麼由 then 返回的 Future 將被恰當地標記為遇到錯誤。但是,如果此回撥後來完成了其他 Future,那麼這些 Future 不會被標記為錯誤完成,使用者負責獨立處理這些 Future 的完成/等待。

示例

>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[source]#

獲取已完成 Future 的值。

此方法只能在呼叫 wait() 完成後,或在傳遞給 then() 的回撥函式內部呼叫。在其他情況下,此 Future 可能尚未持有值,呼叫 value() 可能會失敗。

如果值包含位於 GPU 上的張量,那麼此方法將*不*執行任何額外的同步。這應該事先單獨透過呼叫 wait() 來完成(回撥函式內部除外,對於回撥函式,then() 已經處理了)。

返回

Future 持有的值。如果建立該值的方法(回撥或 RPC)丟擲了錯誤,那麼此 value() 方法也將丟擲錯誤。

返回型別

T

wait()[source]#

阻塞直到此 Future 的值準備就緒。

如果值包含位於 GPU 上的張量,那麼將與可能正在非同步填充這些張量的核心(在裝置上執行)進行額外的同步。這種同步是非阻塞的,這意味著 wait() 會在當前流中插入必要的指令,以確保在當前流上排隊的後續操作在非同步核心之後正確排程,但完成這些操作後,wait() 將返回,即使那些核心仍在執行。只要不更改流,訪問和使用這些值就不需要進一步的同步。

返回

Future 持有的值。如果建立該值的方法(回撥或 RPC)丟擲了錯誤,那麼此 wait 方法也將丟擲錯誤。

返回型別

T

torch.futures.collect_all(futures)[source]#

將提供的 Future 物件收集到一個單獨的合併 Future 中,當所有子 Future 都完成後,該 Future 將被標記為完成。

引數

futures (list) – 一個 Future 物件列表。

返回

返回一個 Future 物件,該物件表示傳遞的 Future 列表。

返回型別

Future[list[torch.jit.Future]]

示例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[source]#

等待所有提供的 Future 完成,並返回已完成值的列表。如果任何 Future 遇到錯誤,該方法將提前退出並報告錯誤,而不會等待其他 Future 完成。

引數

futures (list) – 一個 Future 物件列表。

返回

已完成 Future 結果的列表。如果對任何 Future 呼叫 wait 時丟擲錯誤,則此方法將丟擲錯誤。

返回型別

列表