分散式 RPC 框架#
創建於:2019 年 11 月 14 日 | 最後更新於:2025 年 07 月 09 日
分散式 RPC 框架提供了一種透過一系列原始介面實現多機模型訓練的機制,這些介面允許遠端通訊,並提供了一個更高級別的 API,可以自動區分分佈在多臺機器上的模型。
警告
RPC 包中的 API 是穩定的,並且處於維護模式。
警告
CUDA 支援是一個Beta功能。RPC 包並非所有功能都與 CUDA 支援相容,因此不建議使用它們。這些不支援的功能包括:RRefs、JIT 相容性、分散式自動微分和分散式最佳化器以及效能分析。有關分散式訓練所有功能的簡要介紹,請參閱PyTorch 分散式概述 <https://pytorch.com.tw/tutorials/beginner/dist_overview.html>__。
注意
請參閱 PyTorch 分散式概述 <https://pytorch.com.tw/tutorials/beginner/dist_overview.html>__ 以獲取有關分散式訓練所有功能的簡要介紹。
基礎#
分散式 RPC 框架可以輕鬆地遠端執行函式,支援在不復制實際資料的情況下引用遠端物件,並提供 autograd 和 optimizer API,以便在 RPC 邊界之間透明地執行反向傳播和更新引數。這些功能可以分為四組 API。
遠端過程呼叫 (RPC) 支援在指定的遠端工作節點上使用給定引數執行函式,並獲取返回值或建立返回值的引用。有三個主要的 RPC API:
rpc_sync()(同步)、rpc_async()(非同步)和remote()(非同步並返回遠端返回值引用)。如果使用者程式碼在沒有返回值的情況下無法繼續執行,請使用同步 API。否則,請使用非同步 API 獲取一個 future,並在需要返回值時等待該 future。當需要遠端建立某物但永遠不需要將其獲取到呼叫方時,remote()API 非常有用。例如,假設一個驅動程序正在設定一個引數伺服器和一個訓練器。驅動程式可以在引數伺服器上建立一個嵌入表,然後將該表引用共享給訓練器,但驅動程式本身永遠不會在本地使用該嵌入表。在這種情況下,rpc_sync()和rpc_async()不再適用,因為它們始終意味著返回值將立即或在未來返回給呼叫方。遠端引用 (RRef) 用作本地或遠端物件的分散式共享指標。它可以與其他工作節點共享,並且引用計數將得到透明處理。每個 RRef 只有一個所有者,物件僅存在於該所有者處。持有 RRef 的非所有者工作節點可以透過顯式請求來獲取該物件的副本。當某個工作節點需要訪問某個資料物件,但該節點本身既不是建立者(remote() 的呼叫者)也不是該物件的所有者時,這非常有用。如下文將討論的分散式最佳化器就是此類用例之一。
分散式自動微分 (Distributed Autograd) 將所有參與前向傳播的工作節點上的本地自動微分引擎縫合在一起,並在反向傳播過程中自動訪問它們以計算梯度。如果前向傳播需要跨多臺機器進行,例如分散式模型並行訓練、引數伺服器訓練等,這尤其有幫助。有了此功能,使用者程式碼不再需要擔心如何跨 RPC 邊界傳送梯度以及應以何種順序啟動本地自動微分引擎,這在存在巢狀和相互依賴的 RPC 呼叫時可能會非常複雜。
分散式最佳化器 (Distributed Optimizer) 的建構函式接受一個
Optimizer()(例如SGD()、Adagrad()等)和一個引數 RRef 列表,在每個不同的 RRef 所有者上建立一個Optimizer()例項,並在執行step()時相應地更新引數。當您擁有分散式前向和後向傳播時,引數和梯度將分散在多個工作節點上,因此需要每個參與工作節點上的最佳化器。分散式最佳化器將所有這些本地最佳化器包裝在一個之中,並提供簡潔的建構函式和step()API。
RPC#
在使用 RPC 和分散式自動微分原語之前,必須進行初始化。要初始化 RPC 框架,我們需要使用 init_rpc(),它將初始化 RPC 框架、RRef 框架和分散式自動微分。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]#
初始化 RPC 原語,例如本地 RPC 代理和分散式自動微分,這會立即使當前程序準備好傳送和接收 RPC。
- 引數
name (str) – 此節點的全域性唯一名稱。(例如,
Trainer3、ParameterServer2、Master、Worker1)名稱只能包含數字、字母、下劃線、冒號和/或連字元,且長度必須小於 128 個字元。backend (BackendType, optional) – RPC 後端實現的型別。支援的值為
BackendType.TENSORPIPE(預設值)。有關更多資訊,請參閱 後。rank (int) – 此節點的全域性唯一 ID/rank。
world_size (int) – 組中的工作節點數量。
rpc_backend_options (RpcBackendOptions, optional) – 傳遞給 RpcAgent 建構函式的選項。它必須是
RpcBackendOptions的代理特定子類,幷包含代理特定的初始化配置。預設情況下,對於所有代理,它將預設超時設定為 60 秒,並使用使用init_method = "env://"初始化的底層程序組進行協調。這意味著需要正確設定環境變數MASTER_ADDR和MASTER_PORT。有關更多資訊,請參閱 後端,並查詢可用的選項。
以下 API 允許使用者遠端執行函式以及建立遠端資料物件的引用(RRefs)。在這些 API 中,當將 Tensor 作為引數或返回值傳遞時,目標工作節點將嘗試建立一個具有相同元資料(即形狀、步幅等)的 Tensor。我們故意禁止傳輸 CUDA 張量,因為如果源工作節點和目標工作節點上的裝置列表不匹配,可能會導致崩潰。在這種情況下,應用程式始終可以顯式地將輸入張量移至呼叫方的 CPU,並在必要時將它們移至被呼叫方的所需裝置。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]#
執行一個阻塞式 RPC 呼叫,在工作節點
to上執行函式func。RPC 訊息與 Python 程式碼的執行並行傳送和接收。此方法是執行緒安全的。- 引數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/rank/
WorkerInfo。func (Callable) – 一個可呼叫函式,例如 Python 可呼叫物件、內建運算子(例如
add())和註解的 TorchScript 函式。args (tuple) – 呼叫
func的引數元組。kwargs (dict) – 呼叫
func的關鍵字引數字典。timeout (float, optional) – 此 RPC 的超時時間(秒)。如果 RPC 在此時間內未完成,將引發指示超時的異常。值為 0 表示無限超時,即永遠不會引發超時錯誤。如果未提供,將使用初始化期間或使用
_set_rpc_timeout設定的預設值。
- 返回
返回執行
func並帶引數args和kwargs的結果。
- 示例:
請確保在兩個工作節點上都正確設定了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後,在兩個不同的程序中執行以下程式碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是一個使用 RPC 執行 TorchScript 函式的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]#
執行一個非阻塞 RPC 呼叫,在工作節點
to上執行函式func。RPC 訊息與 Python 程式碼的執行並行傳送和接收。此方法是執行緒安全的。此方法將立即返回一個可以等待的Future。- 引數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/rank/
WorkerInfo。func (Callable) – 一個可呼叫函式,例如 Python 可呼叫物件、內建運算子(例如
add())和註解的 TorchScript 函式。args (tuple) – 呼叫
func的引數元組。kwargs (dict) – 呼叫
func的關鍵字引數字典。timeout (float, optional) – 此 RPC 的超時時間(秒)。如果 RPC 在此時間內未完成,將引發指示超時的異常。值為 0 表示無限超時,即永遠不會引發超時錯誤。如果未提供,將使用初始化期間或使用
_set_rpc_timeout設定的預設值。
- 返回
返回一個可以等待的
Future物件。當完成時,可以在Future物件中檢索在args和kwargs上執行func的返回值。
警告
不支援使用 GPU 張量作為
func的引數或返回值,因為我們不支援透過網路傳輸 GPU 張量。在使用它們作為func的引數或返回值之前,您需要顯式地將 GPU 張量複製到 CPU。警告
rpc_asyncAPI 在傳送張量引數到線路上之前不會複製它們的儲存,這可能由不同的執行緒完成,具體取決於 RPC 後端型別。呼叫方應確保在返回的Future完成之前,這些張量的內容保持不變。- 示例:
請確保在兩個工作節點上都正確設定了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後,在兩個不同的程序中執行以下程式碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是一個使用 RPC 執行 TorchScript 函式的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]#
執行遠端呼叫,在工作節點
to上執行func,並立即返回結果值的RRef。工作節點to將是返回的RRef的所有者,而呼叫remote的工作節點是使用者。所有者管理其RRef的全域性引用計數,並且所有者RRef僅在其不再有任何全域性引用時才會被銷燬。- 引數
to (str 或 WorkerInfo 或 int) – 目標工作節點的名稱/rank/
WorkerInfo。func (Callable) – 一個可呼叫函式,例如 Python 可呼叫物件、內建運算子(例如
add())和註解的 TorchScript 函式。args (tuple) – 呼叫
func的引數元組。kwargs (dict) – 呼叫
func的關鍵字引數字典。timeout (float, optional) – 此遠端呼叫的超時時間(秒)。如果在該超時時間內
to節點上成功處理此RRef的建立,則下次嘗試使用 RRef(例如to_here())時,將引發指示此失敗的超時錯誤。值為 0 表示無限超時,即永遠不會引發超時錯誤。如果未提供,將使用初始化期間或使用_set_rpc_timeout設定的預設值。
- 返回
使用者
RRef例項到結果值。使用阻塞 APItorch.distributed.rpc.RRef.to_here()在本地檢索結果值。
警告
remoteAPI 在傳送張量引數到線路上之前不會複製它們的儲存,這可能由不同的執行緒完成,具體取決於 RPC 後端型別。呼叫方應確保在返回的 RRef 被所有者確認之前,這些張量的內容保持不變,這可以使用torch.distributed.rpc.RRef.confirmed_by_owner()API 進行檢查。警告
諸如
remoteAPI 的超時之類的錯誤是盡力而為處理的。這意味著當remote啟動的遠端呼叫失敗(例如,超時錯誤)時,我們會採取盡力而為的方法來處理錯誤。這意味著錯誤是非同步處理和設定在結果 RRef 上的。如果 RRef 在此處理之前尚未被應用程式使用(例如to_here或 fork 呼叫),那麼未來使用 RRef 將適當地引發錯誤。但是,使用者應用程式可能會在錯誤處理之前使用 RRef。在這種情況下,錯誤可能不會引發,因為它們尚未得到處理。示例
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]#
獲取給定工作節點名稱的
WorkerInfo。使用此WorkerInfo以避免每次呼叫時傳遞昂貴的字串。- 引數
worker_name (str) – 工作節點的字串名稱。如果為
None,則返回當前工作節點的 ID。(預設None)- 返回
給定
worker_name的WorkerInfo例項,或者如果worker_name為None,則為當前工作節點的WorkerInfo。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]#
執行 RPC 代理的關閉,然後銷燬 RPC 代理。這會阻止本地代理接受未完成的請求,並透過終止所有 RPC 執行緒來關閉 RPC 框架。如果
graceful=True,此方法將阻塞,直到所有本地和遠端 RPC 程序都到達此方法並等待所有未完成的工作完成。否則,如果graceful=False,則為本地關閉,並且不會等待其他 RPC 程序到達此方法。警告
對於
Future物件,由rpc_async()返回,不應在shutdown()之後呼叫future.wait()。- 引數
graceful (bool) – 是否執行優雅關閉。如果為 True,將 1) 等待直到沒有待處理的
UserRRefs系統訊息並刪除它們;2) 阻塞直到所有本地和遠端 RPC 程序都到達此方法並等待所有未完成的工作完成。
- 示例:
請確保在兩個工作節點上都正確設定了
MASTER_ADDR和MASTER_PORT。有關更多詳細資訊,請參閱init_process_group()API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然後,在兩個不同的程序中執行以下程式碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo#
封裝系統中工作節點資訊的結構。包含工作節點的名稱和 ID。此類不應直接例項化,而是可以透過
get_worker_info()獲取例項,並將結果傳遞給rpc_sync()、rpc_async()、remote()等函式,以避免每次呼叫時複製字串。- property id#
用於標識工作節點的全域性唯一 ID。
- property name#
工作節點的名稱。
RPC 包還提供了裝飾器,允許應用程式指定給定函式在被呼叫方應如何處理。
- torch.distributed.rpc.functions.async_execution(fn)[source]#
一個函式裝飾器,表示函式的返回值保證是
Future物件,並且該函式可以在 RPC 被呼叫方上非同步執行。更具體地說,被呼叫方提取被包裝函式返回的Future,並將後續處理步驟安裝為該Future的回撥。安裝的回撥將在Future完成後從中讀取值,並將該值作為 RPC 響應傳送。這意味著返回的Future僅存在於被呼叫方,並且永不透過 RPC 傳送。當被包裝函式(fn)的執行需要暫停和恢復時,此裝飾器非常有用,例如,當它包含rpc_async()或等待其他訊號時。注意
要啟用非同步執行,應用程式必須將此裝飾器返回的函式物件傳遞給 RPC API。如果 RPC 檢測到此裝飾器安裝的屬性,它將知道該函式返回
Future物件,並將相應地進行處理。但是,這並不意味著在定義函式時此裝飾器必須是最外層的。例如,當與@staticmethod或@classmethod結合使用時,@rpc.functions.async_execution需要成為內部裝飾器,以便目標函式被識別為靜態或類函式。此目標函式仍然可以非同步執行,因為在訪問時,靜態或類方法會保留@rpc.functions.async_execution安裝的屬性。- 示例:
返回的
Future物件可以來自rpc_async()、then()或Future建構函式。下面的示例展示了直接使用then()返回的Future。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
與 TorchScript 裝飾器結合使用時,此裝飾器必須是最外層的。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
與靜態方法或類方法結合使用時,此裝飾器必須是內部裝飾器。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此裝飾器也適用於 RRef 助手,即 .
torch.distributed.rpc.RRef.rpc_sync()、torch.distributed.rpc.RRef.rpc_async()和torch.distributed.rpc.RRef.remote()。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
後端#
RPC 模組可以利用不同的後端在節點之間執行通訊。可以在 init_rpc() 函式中指定要使用的後端,方法是傳遞 BackendType 列舉的特定值。無論使用哪個後端,其餘的 RPC API 都不會改變。每個後端還定義了自己的 RpcBackendOptions 類子類,其例項也可以傳遞給 init_rpc() 以配置後端行為。
- class torch.distributed.rpc.BackendType(value)#
可用後端列舉類。
PyTorch 提供了內建的
BackendType.TENSORPIPE後端。可以使用register_backend()函式註冊其他後端。
- class torch.distributed.rpc.RpcBackendOptions#
封裝傳遞給 RPC 後端的選項的抽象結構。此類的例項可以傳遞給
init_rpc(),以便使用特定配置初始化 RPC,例如 RPC 超時和要使用的init_method。- property init_method#
用於初始化程序組的 URL。預設為
env://
- property rpc_timeout#
一個浮點數,表示所有 RPC 的超時時間。如果 RPC 在此時間範圍內未完成,它將以指示超時的異常完成。
TensorPipe 後端#
TensorPipe 代理是預設的,它利用 TensorPipe 庫,該庫提供了本地點對點通訊原語,專門適用於機器學習,從根本上解決了 Gloo 的一些侷限性。與 Gloo 相比,它的優點是非同步的,允許大量傳輸同時發生,每個傳輸以自己的速度進行,而不會互相阻塞。它僅在需要時按需在節點對之間開啟管道,並且當一個節點失敗時,只有其相關的管道會被關閉,而所有其他管道將正常工作。此外,它能夠支援多種不同的傳輸(當然是 TCP,但也有共享記憶體、NVLink、InfiniBand 等),並可以自動檢測它們的可用性並協商為每個管道使用的最佳傳輸。
TensorPipe 後端帶有基於 TCP 的傳輸,就像 Gloo 一樣。它還能夠自動分塊和多路複用大張量到多個套接字和執行緒,以實現非常高的頻寬。代理將能夠自行選擇最佳傳輸,無需任何干預。
示例
import os
from torch.distributed import rpc
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
rpc.init_rpc(
"worker1",
rank=0,
world_size=2,
rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
num_worker_threads=8,
rpc_timeout=20 # 20 second timeout
)
)
# omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]#
TensorPipeAgent的後端選項,繼承自RpcBackendOptions。- 引數
num_worker_threads (int, optional) –
TensorPipeAgent用於執行請求的執行緒池中的執行緒數(預設值:16)。rpc_timeout (float, optional) – RPC 請求的預設超時時間(秒)(預設值:60 秒)。如果在該時間內 RPC 未完成,將引發相應的異常。呼叫者可以在
rpc_sync()和rpc_async()中為單個 RPC 覆蓋此超時時間(如有必要)。init_method (str, optional) – 用於初始化用於協調的分散式儲存的 URL。它可以接受
init_process_group()的相同引數的任何值(預設值:env://)。device_maps (Dict[str, Dict], optional) – 從此工作節點到被呼叫方的裝置放置對映。鍵是被呼叫方的工作節點名稱,值是字典(
Dictofint、str或torch.device),將此工作節點的裝置對映到被呼叫方工作節點的裝置。(預設值:None)devices (List[int, str, 或
torch.device], optional) – RPC 代理使用的所有本地 CUDA 裝置。預設情況下,它將初始化為來自其自身device_maps的所有本地裝置以及來自其對等方device_maps的相應裝置。在處理 CUDA RPC 請求時,代理將正確同步此List中所有裝置的 CUDA 流。
- property device_maps#
裝置對映位置。
- property devices#
本地代理使用的所有裝置。
- property init_method#
用於初始化程序組的 URL。預設為
env://
- property num_worker_threads#
TensorPipeAgent用於執行請求的執行緒池中的執行緒數。
- property rpc_timeout#
一個浮點數,表示所有 RPC 的超時時間。如果 RPC 在此時間範圍內未完成,它將以指示超時的異常完成。
- set_device_map(to, device_map)[source]#
設定每個 RPC 呼叫方和被呼叫方對之間的裝置對映。此函式可以多次呼叫以增量新增裝置放置配置。
- 引數
to (str) – 被呼叫方名稱。
device_map (Dict of int, str, or torch.device) – 從此工作節點到被呼叫方的裝置放置對映。此對映必須是可逆的。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
- set_devices(devices)[source]#
設定 TensorPipe RPC 代理使用的本地裝置。在處理 CUDA RPC 請求時,TensorPipe RPC 代理將正確同步此
List中所有裝置的 CUDA 流。- 引數
devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地裝置。
注意
RPC 框架不會自動重試任何 rpc_sync()、rpc_async() 和 remote() 呼叫。原因是 RPC 框架無法確定操作是否是冪等的,以及是否可以安全地重試。因此,應用程式有責任處理故障並根據需要重試。RPC 通訊基於 TCP,因此故障可能由網路故障或間歇性網路連線問題引起。在這種情況下,應用程式需要以合理的退避機制進行適當重試,以確保網路不會因激進的重試而過載。
RRef#
警告
使用 CUDA 張量時,目前不支援 RRefs。
RRef(遠端引用)是對遠端工作節點上某個型別 T(例如 Tensor)的值的引用。此控制代碼會使所有者處的被引用遠端值保持活動狀態,但並不意味著該值將來會被傳輸到本地工作節點。RRefs 可用於多機訓練,透過持有存在於其他工作節點上的 nn.Modules 的引用,並在訓練期間呼叫適當的函式來檢索或修改其引數。有關更多詳細資訊,請參閱 遠端引用協議。
- class torch.distributed.rpc.PyRRef(RRef)#
封裝遠端工作節點上某個型別的值的引用的類。此控制代碼將使被引用的遠端值在工作節點上保持活動狀態。
UserRRef將在以下情況下被刪除:1) 應用程式程式碼和本地 RRef 上下文中不再有對其的引用;或 2) 應用程式已呼叫了優雅關機。呼叫已刪除 RRef 上的方法會導致未定義行為。RRef 實現僅提供盡力錯誤檢測,並且應用程式不應在rpc.shutdown()之後使用UserRRefs。警告
RRefs 只能由 RPC 模組序列化和反序列化。在沒有 RPC 的情況下序列化和反序列化 RRefs(例如,Python pickle、torch
save()/load()、JITsave()/load()等)將導致錯誤。- 引數
value (object) – 要由此 RRef 包裝的值。
type_hint (Type, optional) – 應傳遞給
TorchScript編譯器作為value的型別提示的 Python 型別。
- 示例:
以下示例為簡潔起見省略了 RPC 初始化和關閉程式碼。有關這些詳細資訊,請參閱 RPC 文件。
使用 rpc.remote 建立 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
從本地物件建立 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
與其它工作節點共享 RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: SupportsInt = -1, retain_graph: bool = False) None#
使用 RRef 作為反向傳播的根節點執行反向傳播。如果提供了
dist_autograd_ctx_id,我們將使用提供的 ctx_id 從 RRef 的所有者開始執行分散式反向傳播。在這種情況下,應使用get_gradients()來檢索梯度。如果dist_autograd_ctx_id為None,則假定這是一個本地自動微分圖,我們只執行本地反向傳播。在本地情況下,呼叫此 API 的節點必須是 RRef 的所有者。RRef 的值預計是標量 Tensor。- 引數
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool#
返回此
RRef是否已由所有者確認。OwnerRRef始終返回 true,而UserRRef僅在所有者瞭解此UserRRef時才返回 true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool#
返回當前節點是否是此
RRef的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object#
如果當前節點是所有者,則返回本地值的引用。否則,丟擲異常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo#
返回擁有此
RRef的節點的工作節點資訊。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str#
返回擁有此
RRef的節點的工作節點名稱。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#
建立一個輔助代理,以便輕鬆啟動一個
remote,並使用 RRef 的所有者作為目標,在該物件上執行函式。更具體地說,rref.remote().func_name(*args, **kwargs)與以下內容相同>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 引數
timeout (float, optional) –
rref.remote()的超時時間。如果在超時時間內未能成功建立此RRef,則下次嘗試使用 RRef(例如to_here)時將引發超時。如果未提供,將使用預設的 RPC 超時。有關RRef的具體超時語義,請參閱rpc.remote()。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#
建立一個輔助代理,以便輕鬆啟動一個
rpc_async,並使用 RRef 的所有者作為目標,在該物件上執行函式。更具體地說,rref.rpc_async().func_name(*args, **kwargs)與以下內容相同>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 引數
timeout (float, optional) –
rref.rpc_async()的超時時間。如果在該時間範圍內呼叫未完成,將引發指示此情況的異常。如果未提供此引數,將使用預設的 RPC 超時。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#
建立一個輔助代理,以便輕鬆啟動一個
rpc_sync,並使用 RRef 的所有者作為目標,在該物件上執行函式。更具體地說,rref.rpc_sync().func_name(*args, **kwargs)與以下內容相同>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 引數
timeout (float, optional) –
rref.rpc_sync()的超時時間。如果在該時間範圍內呼叫未完成,將引發指示此情況的異常。如果未提供此引數,將使用預設的 RPC 超時。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: SupportsFloat = -1.0) object#
阻塞呼叫,將 RRef 的值從所有者複製到本地節點並返回。如果當前節點是所有者,則返回本地值的引用。
- 引數
timeout (float, optional) –
to_here的超時時間。如果在該時間範圍內呼叫未完成,將引發指示此情況的異常。如果未提供此引數,將使用預設的 RPC 超時(60 秒)。
RemoteModule#
警告
目前在使用 CUDA 張量時不支援 RemoteModule
RemoteModule 是在不同程序中遠端建立 nn.Module 的簡便方法。實際模組駐留在遠端主機上,但本地主機擁有該模組的控制代碼,並可以像普通 nn.Module 一樣呼叫該模組。呼叫會產生到遠端端的 RPC 呼叫,並且可以透過 RemoteModule 支援的附加 API 非同步執行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]#
只有在 RPC 初始化後才能建立 RemoteModule 例項。
它會在指定的遠端節點上建立一個使用者指定的模組。它的行為類似於普通的
nn.Module,不同之處在於forward方法在遠端節點上執行。它負責 autograd 記錄,以確保反向傳播將梯度傳播回相應的遠端模組。它根據
module_cls的forward方法的簽名,生成兩個方法:forward_async和forward。forward_async非同步執行並返回一個 Future。forward_async和forward的引數與module_cls返回的模組的forward方法相同。例如,如果
module_cls返回一個nn.Linear的例項,其forward方法簽名如下:def forward(input: Tensor) -> Tensor:,則生成的RemoteModule將具有 2 個方法,簽名如下:def forward(input: Tensor) -> Tensor:def forward_async(input: Tensor) -> Future[Tensor]:- 引數
remote_device (str) – 我們希望將此模組放置在目標工作程序上的裝置。格式應為“<workername>/<device>”,其中 device 欄位可以解析為 torch.device 型別。例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。此外,device 欄位是可選的,預設值為“cpu”。
module_cls (nn.Module) –
要遠端建立的模組的類。例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, optional) – 要傳遞給
module_cls的引數。kwargs (Dict, optional) – 要傳遞給
module_cls的關鍵字引數。
- 返回
一個遠端模組例項,它包裝了由使用者提供的
module_cls建立的Module,它具有一個阻塞的forward方法和一個非同步的forward_async方法,該方法返回遠端端使用者提供的模組上forward呼叫的 Future。
- 示例:
在兩個不同的程序中執行以下程式碼
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,可以在此 教程 中找到與 DistributedDataParallel (DDP) 結合使用的更實用的示例。
- remote_parameters(recurse=True)[source]#
返回指向遠端模組引數的
RRef列表。這通常可以與
DistributedOptimizer結合使用。- 引數
recurse (bool) – 如果為 True,則返回遠端模組及其所有子模組的引數。否則,僅返回直接屬於遠端模組的引數。
- 返回
指向遠端模組引數的
RRef列表(List[RRef[nn.Parameter]])。- 返回型別
list[torch.distributed.rpc.api.RRef[torch.nn.parameter.Parameter]]
分散式自動微分框架#
警告
目前在使用 CUDA 張量時不支援分散式自動微分
此模組提供了一個基於 RPC 的分散式自動微分框架,可用於模型並行訓練等應用。簡而言之,應用程式可以透過 RPC 傳送和接收梯度記錄張量。在正向傳播中,我們記錄梯度記錄張量何時透過 RPC 傳送,在反向傳播過程中,我們利用此資訊透過 RPC 執行分散式反向傳播。有關更多詳細資訊,請參閱 分散式自動微分設計。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None#
使用提供的根節點啟動分散式反向傳播。目前,它實現了 FAST 模式演算法,該演算法假定在不同工作節點之間同一分散式自動微分上下文中的所有 RPC 訊息都將作為自動微分圖的一部分在反向傳播過程中進行處理。
我們使用提供的根節點來發現自動微分圖並計算適當的依賴項。此方法將阻塞直到整個自動微分計算完成。
我們在每個節點上將梯度累積到適當的
torch.distributed.autograd.context中。要使用的自動微分上下文是根據呼叫torch.distributed.autograd.backward()時傳入的context_id查詢的。如果不存在與給定 ID 對應的有效自動微分上下文,則會引發錯誤。您可以使用get_gradients()API 來檢索累積的梯度。- 引數
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source]#
用於包裝正向和反向傳播的上下文物件,當使用分散式自動微分時。
with語句中生成的context_id對於在所有工作節點上唯一標識分散式反向傳播至關重要。每個工作節點都儲存與此context_id關聯的元資料,這對於正確執行分散式自動微分傳播至關重要。- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]#
檢索一個對映,該對映將張量與其在提供的上下文(對應於給定
context_id)中作為分散式自動微分反向傳播一部分累積的相應梯度關聯起來。- 引數
context_id (int) – 應從中檢索梯度的自動微分上下文 ID。
- 返回
一個對映,其中鍵是張量,值是該張量相關的梯度。
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
有關 RPC 自動微分的更多資訊
分散式最佳化器#
有關分散式最佳化器的文件,請參閱 torch.distributed.optim 頁面。
設計說明#
分散式自動微分設計說明涵蓋了基於 RPC 的分散式自動微分框架的設計,該框架對於模型並行訓練等應用非常有用。
RRef 設計說明涵蓋了框架用於引用遠端工作節點上的值的 RRef(遠端引用)協議的設計。
教程#
RPC 教程向用戶介紹了 RPC 框架,提供了使用 torch.distributed.rpc API 的幾個示例應用程式,並演示瞭如何使用 profiler 來分析基於 RPC 的工作負載。
將 Distributed DataParallel 與分散式 RPC 框架結合使用(也涵蓋 **RemoteModule**)