libuv TCPStore 後端簡介#
建立日期:2024 年 7 月 22 日 | 最後更新:2024 年 7 月 24 日 | 最後驗證:2024 年 11 月 5 日
作者: Xilun Wu
注意
在 github 上檢視和編輯此教程。
什麼是新的 TCPStore 後端
將新的 libuv 後端與舊的後端進行比較
如何啟用使用舊的後端
PyTorch 2.4 或更高版本
閱讀 TCPStore API。
簡介#
最近,我們推出了一個使用 libuv(一個第三方非同步 I/O 庫)的新 TCPStore 伺服器後端。這個新的伺服器後端旨在解決大規模分散式訓練作業(例如,超過 1024 個 rank 的作業)中的可擴充套件性和健壯性挑戰。我們進行了一系列基準測試,將 libuv 後端與舊的後端進行比較,實驗結果表明,在儲存初始化時間和儲存 I/O 操作的效能方面都有顯著的改進,並且保持了相當的效能。
因此,libuv 後端已在 PyTorch 2.4 中設定為預設的 TCPStore 伺服器後端。此更改預計將提高分散式訓練作業的效能和可擴充套件性。
此更改引入了對儲存初始化的一個小不相容性。對於希望繼續使用舊後端的使用者,本教程將提供有關如何指定使用以前的 TCPStore 伺服器後端的指導。
效能基準測試#
為了更好地展示我們新的 libuv TCPStore 後端的優勢,我們在從 1024 (1K) 到 98304 (96K) rank 的廣泛作業規模上設定了基準測試。我們首先使用下面的程式碼片段測量了 TCPStore 的初始化時間
import logging
import os
from time import perf_counter
import torch
import torch.distributed as dist
logger: logging.Logger = logging.getLogger(__name__)
# Env var are preset when launching the benchmark
env_rank = os.environ.get("RANK", 0)
env_world_size = os.environ.get("WORLD_SIZE", 1)
env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
env_master_port = os.environ.get("MASTER_PORT", "23456")
start = perf_counter()
tcp_store = dist.TCPStore(
env_master_addr,
int(env_master_port),
world_size=int(env_world_size),
is_master=(int(env_rank) == 0),
)
end = perf_counter()
time_elapsed = end - start
logger.info(
f"Complete TCPStore init with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
)
由於 TCPStore 伺服器執行緒的執行將在所有客戶端成功連線之前被阻塞,我們將 rank 0 上測量的時間作為總的 TCPStore 初始化執行時。實驗結果在下圖中標示
圖 1. 顯示了一些重要證據,表明 libuv 後端優於舊的後端
libuv 後端的 TCPStore 初始化總是比舊的後端更快,尤其是在超大規模下
舊的後端在 96K 規模下(例如,超過 30 分鐘)的伺服器-客戶端連線時會超時,而 libuv 後端在 100 秒內完成了初始化。
我們進行的第二個基準測試是測量 TCPStore store_based_barrier 操作的執行時
import logging
import os
import time
from datetime import timedelta
from time import perf_counter
import torch
import torch.distributed as dist
DistStoreError = torch._C._DistStoreError
logger: logging.Logger = logging.getLogger(__name__)
# since dist._store_based_barrier is a private function and cannot be directly called, we need to write a function which does the same
def store_based_barrier(
rank,
store,
group_name,
rendezvous_count,
timeout=dist.constants.default_pg_timeout,
logging_interval=timedelta(seconds=10),
):
store_key = f"store_based_barrier_key:{group_name}"
store.add(store_key, 1)
world_size = rendezvous_count
worker_count = store.add(store_key, 0)
last_worker_key = f"{store_key}:last_worker"
if worker_count == world_size:
store.set(last_worker_key, "1")
start = time.time()
while True:
try:
# This will throw an exception after the logging_interval in which we print out
# the status of the group or time out officially, throwing runtime error
store.wait([last_worker_key], logging_interval)
break
except RuntimeError as e:
worker_count = store.add(store_key, 0)
# Print status periodically to keep track.
logger.info(
"Waiting in store based barrier to initialize process group for "
"rank: %s, key: %s (world_size=%s, num_workers_joined=%s, timeout=%s)"
"error: %s",
rank,
store_key,
world_size,
worker_count,
timeout,
e,
)
if timedelta(seconds=(time.time() - start)) > timeout:
raise DistStoreError(
"Timed out initializing process group in store based barrier on "
"rank {}, for key: {} (world_size={}, num_workers_joined={}, timeout={})".format(
rank, store_key, world_size, worker_count, timeout
)
)
logger.info(
"Rank %s: Completed store-based barrier for key:%s with %s nodes.",
rank,
store_key,
world_size,
)
# Env var are preset when launching the benchmark
env_rank = os.environ.get("RANK", 0)
env_world_size = os.environ.get("WORLD_SIZE", 1)
env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
env_master_port = os.environ.get("MASTER_PORT", "23456")
tcp_store = dist.TCPStore(
env_master_addr,
int(env_master_port),
world_size=int(env_world_size),
is_master=(int(env_rank) == 0),
)
# sync workers
store_based_barrier(int(env_rank), tcp_store, "tcpstore_test", int(env_world_size))
number_runs = 10
start = perf_counter()
for _ in range(number_runs):
store_based_barrier(
int(env_rank), tcp_store, "tcpstore_test", int(env_world_size)
)
end = perf_counter()
time_elapsed = end - start
logger.info(
f"Complete {number_runs} TCPStore barrier runs with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
)
我們透過將 rank 0 上測得的執行時除以 number_runs 來計算平均值,並在下圖中標示
圖 2. 顯示 libuv 後端的 I/O 效能與舊的後端相當
libuv 後端的效能在整個 rank 數量範圍內都具有可比性
隨著 rank 數量的增長,libuv 後端的執行時比舊的後端更穩定
影響#
使用者可能需要注意的一個不相容性是,在使用 libuv 後端時,TCPStore 目前不支援使用 listen_fd 進行初始化。如果使用者想繼續使用此初始化方法,使用者只需傳遞 use_libuv=False 即可保留舊的 TCPStore 後端。
import socket
import torch
import torch.distributed as dist
listen_sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listen_sock.bind(("localhost", 0))
addr, port, *_ = listen_sock.getsockname()
listen_fd = listen_sock.detach()
tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd) # expect NotImplementedError
tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd, use_libuv=False) # OK. Use legacy backend
退出路徑 1:在 TCPStore 初始化中傳遞 use_libuv=False#
如上程式碼片段所示,如果使用者呼叫 TCPStore init 方法建立儲存,只需傳遞 use_libuv=False 即可使用舊的 TCPStore 後端。此覆蓋具有最高優先順序,高於決定 TCPStore 伺服器應選擇哪個後端的其他方法。
退出路徑 2:在 ProcessGroup 初始化中的 init_method 中新增 use_libuv=0#
如果使用者未在初始化時顯式傳遞 TCPStore,ProcessGroup 會建立一個 TCPStore。使用者可以在初始化 ProcessGroup 時,在 init_method 中新增查詢選項 use_libuv=0。此方法的優先順序低於退出路徑 1。
import torch
import torch.distributed as dist
addr = "localhost"
port = 23456
dist.init_process_group(
backend="cpu:gloo,cuda:nccl",
rank=0,
world_size=1,
init_method=f"tcp://{addr}:{port}?use_libuv=0",
)
dist.destroy_process_group()
退出路徑 3:將環境變數 USE_LIBUV 設定為 0#
當 ProcessGroup 建立 TCPStore 時,它還會檢查環境變數 USE_LIBUV 來確定使用哪個 TCPStore 後端。使用者可以將環境變數 "USE_LIBUV" 設定為 "0" 來指定使用舊的 TCPStore 後端。此方法的優先順序低於退出路徑 2,例如,如果使用者將環境變數 USE_LIBUV 設定為 "1",並且還在 init_method 中傳遞了 use_libuv=0,那麼將選擇舊的儲存後端。
import os
import torch
import torch.distributed as dist
addr = "localhost"
port = 23456
os.environ["USE_LIBUV"] = "0"
dist.init_process_group(
backend="cpu:gloo,cuda:nccl",
rank=0,
world_size=1,
init_method=f"tcp://{addr}:{port}",
)
dist.destroy_process_group()
結論#
在 PyTorch 2.4 中,我們將新的 libuv TCPStore 後端設定為預設。儘管新的後端與從 listen_fd 進行初始化存在不相容性,但在大規模儲存初始化方面顯示出顯著的效能提升,並且在中/大型規模的儲存 I/O 方面具有可比的效能,這為分散式訓練的控制平面帶來了巨大好處。本教程解釋了我們的動機,介紹了效能基準測試,提醒使用者潛在的影響,並介紹了三種保留使用舊後端的方法。從長遠來看,我們的目標是最終棄用舊的後端。