評價此頁

遠端引用協議#

創建於:2019年11月20日 | 最後更新於:2025年04月27日

本文件描述了遠端引用協議的設計細節,並回顧了不同場景下的訊息流。請確保您熟悉 分散式 RPC 框架 後再繼續閱讀。

背景#

RRef 是 Remote REFerence 的縮寫。它是一個指向本地或遠端工作器上物件的引用,並在後臺透明地處理引用計數。從概念上講,它可以被視為一個分散式共享指標。應用程式可以透過呼叫 remote() 來建立 RRef。每個 RRef 由 remote() 呼叫的被呼叫方工作器(即所有者)擁有,並且可以被多個使用者使用。所有者儲存真實資料並跟蹤全域性引用計數。每個 RRef 都可以透過一個全域性唯一的 RRefId 來標識,該 ID 在 remote() 呼叫發起方處建立時分配。

在所有者工作器上,只有一個 OwnerRRef 例項,其中包含真實資料,而在使用者工作器上,可以根據需要建立任意數量的 UserRRef,並且 UserRRef 不持有資料。在所有者上的所有使用都將使用全域性唯一的 RRefId 來檢索唯一的 OwnerRRef 例項。當 UserRRef 用作 rpc_sync()rpc_async()remote() 呼叫中的引數或返回值時,將會建立一個 UserRRef,並且所有者將收到通知以更新引用計數。當全域性上沒有任何 UserRRef 例項並且所有者上也沒有對 OwnerRRef 的引用時,OwnerRRef 及其資料將被刪除。

假設#

RRef 協議的設計基於以下假設。

  • 短暫的網路故障:RRef 設計透過重試訊息來處理短暫的網路故障。它無法處理節點崩潰或永久性網路分割槽。當發生這些事件時,應用程式應關閉所有工作器,回滾到之前的檢查點,然後恢復訓練。

  • 非冪等的 UDF:我們假設提供給 rpc_sync()rpc_async()remote() 的使用者函式 (UDF) 不是冪等的,因此不能重試。但是,內部 RRef 控制訊息是冪等的,並在訊息失敗時重試。

  • 訊息亂序投遞:我們不假設任何節點對之間的訊息投遞順序,因為傳送方和接收方都使用了多個執行緒。訊息的處理順序沒有保證。

RRef 生命週期#

該協議的目標是在適當的時候刪除 OwnerRRef。刪除 OwnerRRef 的合適時機是當沒有活動的 UserRRef 例項,並且使用者程式碼也不持有對 OwnerRRef 的引用時。棘手的部分是如何確定是否存在任何活動的 UserRRef 例項。

設計推理#

使用者可以在三種情況下獲得 UserRRef

  1. 從所有者那裡收到 UserRRef

  2. 從另一個使用者那裡收到 UserRRef

  3. 建立一個由另一個工作器擁有的新 UserRRef

情況 1 最簡單,所有者將 RRef 傳遞給使用者,其中所有者呼叫 rpc_sync()rpc_async()remote() 並將其 RRef 作為引數。在這種情況下,使用者端將建立一個新的 UserRRef。由於所有者是呼叫者,它可以輕鬆更新其在 OwnerRRef 上的本地引用計數。

唯一的要求是任何 UserRRef 在銷燬時必須通知所有者。因此,我們需要第一個保證:

G1. 當任何 UserRRef 被刪除時,所有者將收到通知。

由於訊息可能延遲或亂序到達,我們需要另一個保證,以確保刪除訊息不會過早處理。如果 A 向 B 傳送一條涉及 RRef 的訊息,我們將 A 上的 RRef 稱為(父 RRef),將 B 上的 RRef 稱為(子 RRef)。

G2. 父 RRef 在被所有者確認子 RRef 之前不會被刪除。

在情況 2 和 3 中,所有者可能只對 RRef 分叉圖有部分或完全不知情。例如,RRef 可能在一個使用者上建立,在所有者收到任何 RPC 呼叫之前,建立者使用者可能已經與其他使用者共享了 RRef,並且這些使用者可能進一步共享 RRef。一個不變的規則是,任何 RRef 的分叉圖始終是一棵樹,因為分叉 RRef 總是會在被呼叫方(除非被呼叫方是所有者)上建立一個新的 UserRRef 例項,因此每個 RRef 都有一個唯一的父級。

所有者對樹中任何 UserRRef 的檢視有三個階段:

1) unknown -> 2) known -> 3) deleted.

所有者對整個樹的檢視不斷變化。當所有者認為沒有活動的 UserRRef 例項時,它會刪除其 OwnerRRef 例項,即當 OwnerRRef 被刪除時,所有 UserRRef 例項可能已經被實際刪除,也可能是未知的。危險的情況是某些分叉未知而其他分叉已被刪除。

G2 預設保證了任何父 UserRRef 在所有者知道其所有子 UserRRef 例項之前不會被刪除。但是,子 UserRRef 在所有者知道其父 UserRRef 之前可能已被刪除。

考慮以下示例,其中 OwnerRRef 分叉到 A,然後 A 分叉到 Y,Y 分叉到 Z。

OwnerRRef -> A -> Y -> Z

如果 Z 的所有訊息,包括刪除訊息,都在 Y 的訊息之前被所有者處理。所有者將在知道 Y 存在之前就得知 Z 被刪除。儘管如此,這並不會導致任何問題。因為,至少 Y 的一個祖先(A)會保持活動狀態,它將阻止所有者刪除 OwnerRRef。更具體地說,如果所有者不知道 Y,則由於 G2,A 不會被刪除,並且所有者知道 A,因為它就是 A 的父級。

如果 RRef 是在使用者上建立的,情況會稍微複雜一些。

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 對 UserRRef 呼叫 to_here(),那麼在 Z 被刪除時,所有者至少知道 A,因為否則 to_here() 將不會完成。如果 Z 沒有呼叫 to_here(),則所有者可能在收到來自 A 和 Y 的任何訊息之前就收到了 Z 的所有訊息。在這種情況下,由於 OwnerRRef 的真實資料尚未建立,因此也沒有什麼可以刪除的。這與 Z 完全不存在的情況相同。因此,仍然是可以的。

實現#

G1 透過在 UserRRef 解構函式中傳送刪除訊息來實現。為了提供 G2,父 UserRRef 在被分叉時被放入一個上下文中,並由新的 ForkId 索引。父 UserRRef 僅在收到子節點的確認訊息 (ACK) 後才從上下文中移除,而子節點僅在得到所有者的確認後才會傳送 ACK。

協議場景#

現在讓我們在四種場景中討論上述設計如何轉化為協議。

使用者將 RRef 作為返回值與所有者共享#

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在這種情況下,UserRRef 在使用者工作器 A 上建立,然後與遠端訊息一起傳遞給所有者工作器 B,然後 B 建立 OwnerRRefremote() 方法立即返回,這意味著 UserRRef 可以在所有者知曉之前被分叉/使用。

在所有者端,當收到 remote() 呼叫時,它將建立 OwnerRRef,並返回一個 ACK 來確認 {100, 1}(RRefId, ForkId)。只有在收到此 ACK 後,A 才能刪除其 UserRRef。這涉及 G1G2G1 是顯而易見的。對於 G2OwnerRRefUserRRef 的子級,並且 UserRRef 在收到所有者的 ACK 之前不會被刪除。

user_to_owner_ret.png

上圖顯示了訊息流,其中實線箭頭包含使用者函式,虛線箭頭是內建訊息。請注意,從 A 到 B 的前兩條訊息(remote()to_here())可能以任何順序到達 B,但最終的刪除訊息僅在以下情況傳送:

  • B 確認 UserRRef {100, 1} (G2),並且

  • Python GC 同意刪除本地 UserRRef 例項。這發生在 RRef 不再處於作用域並且可以被垃圾回收時。

使用者將 RRef 作為引數與所有者共享#

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在這種情況下,在 A 上建立 UserRRef 後,A 在後續的 RPC 呼叫中將其作為引數傳遞給 B。A 將保持 UserRRef {100, 1} 存活,直到收到 B 的確認(G2,而不是 RPC 呼叫的返回值)。這是必要的,因為 A 不應在所有先前訊息都已收到之前傳送刪除訊息,否則,OwnerRRef 可能會在被使用之前被刪除,因為我們不保證訊息投遞順序。這是透過建立一個 RRef 的子 ForkId,並將其儲存在一個對映中,直到收到所有者確認子 ForkId。下圖顯示了訊息流。

user_to_owner_arg.png

請注意,UserRRef 可以在 func 完成之前或甚至開始之前在 B 上被刪除。但是這沒關係,因為當 B 傳送子 ForkId 的 ACK 時,它已經獲取了 OwnerRRef 例項,這將防止它過早被刪除。

所有者與使用者共享 RRef#

從所有者到使用者是最簡單的情況,所有者可以在本地更新引用計數,並且不需要任何額外的控制訊息來通知其他人。關於 G2,情況與父級立即收到所有者的 ACK 相同,因為父級就是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上圖顯示了訊息流。請注意,當 OwnerRRef 在 rpc_async 呼叫後退出作用域時,它不會被刪除,因為內部有一個對映來保持其存活,如果存在任何已知的分叉,在這種情況下是 UserRRef {100, 1}。(G2

使用者之間共享 RRef#

這是最複雜的情況,其中呼叫方使用者(父 UserRRef)、被呼叫方使用者(子 UserRRef)以及所有者都需要參與。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

當 C 從 A 接收到子 UserRRef 時,它會向所有者 B 傳送一個分叉請求。稍後,當 B 確認 C 上的 UserRRef 時,C 將並行執行兩個操作:1) 向 A 傳送子 ACK,和 2) 執行使用者提供的函式。在此期間,父級 (A) 將保持其 UserRRef {100, 1} 存活以實現 G2