評價此頁

實驗性面向物件分散式 API#

建立日期:2025 年 7 月 9 日 | 最後更新日期:2025 年 7 月 30 日

這是 PyTorch Distributed 的一個實驗性新 API。它目前正在積極開發中,並且可能會發生更改或完全刪除。

此 API 旨在作為更靈活、面向物件的分散式 API 的試驗場。

class torch.distributed._dist2.ProcessGroup#

Bases: pybind11_object

ProcessGroup 是一個通訊基元,允許跨程序組進行集體操作。

這是一個基類,提供了所有 ProcessGroups 的介面。它不打算直接使用,而是由子類擴充套件。

class BackendType#

Bases: pybind11_object

用於程序組的後端型別。

成員

UNDEFINED

GLOO

NCCL

XCCL

UCC

MPI

CUSTOM

CUSTOM = <BackendType.CUSTOM: 6>#
GLOO = <BackendType.GLOO: 1>#
MPI = <BackendType.MPI: 4>#
NCCL = <BackendType.NCCL: 2>#
UCC = <BackendType.UCC: 3>#
UNDEFINED = <BackendType.UNDEFINED: 0>#
XCCL = <BackendType.XCCL: 5>#
property name#
property value#
CUSTOM = <BackendType.CUSTOM: 6>#
GLOO = <BackendType.GLOO: 1>#
MPI = <BackendType.MPI: 4>#
NCCL = <BackendType.NCCL: 2>#
UCC = <BackendType.UCC: 3>#
UNDEFINED = <BackendType.UNDEFINED: 0>#
XCCL = <BackendType.XCCL: 5>#
abort(self: torch._C._distributed_c10d.ProcessGroup) None#

如果後端支援,則中止所有操作和連線

allgather(*args, **kwargs)#

過載函式。

  1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0efcc70>) -> c10d::Work

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_gather()

  1. allgather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensor: torch.Tensor, timeout: datetime.timedelta | None = None) -> c10d::Work

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_gather()

allgather_coalesced(self: torch._C._distributed_c10d.ProcessGroup, output_lists: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_list: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0aa5eb0>) c10d::Work#

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_gather()

allgather_into_tensor_coalesced(self: torch._C._distributed_c10d.ProcessGroup, outputs: collections.abc.Sequence[torch.Tensor], inputs: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllgatherOptions = <torch._C._distributed_c10d.AllgatherOptions object at 0x7f5ce0efae70>) c10d::Work#

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_gather()

allreduce(*args, **kwargs)#

過載函式。

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceOptions = <torch._C._distributed_c10d.AllreduceOptions object at 0x7f5ce0a9c2f0>) -> c10d::Work

在程序組中的所有程序上對提供的張量執行 allreduce。

有關更多詳細資訊,請參閱 torch.distributed.all_reduce()

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work

在程序組中的所有程序上對提供的張量執行 allreduce。

有關更多詳細資訊,請參閱 torch.distributed.all_reduce()

  1. allreduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work

在程序組中的所有程序上對提供的張量執行 allreduce。

有關更多詳細資訊,請參閱 torch.distributed.all_reduce()

allreduce_coalesced(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllreduceCoalescedOptions = <torch._C._distributed_c10d.AllreduceCoalescedOptions object at 0x7f5ce0efb9f0>) c10d::Work#

在程序組中的所有程序上對提供的張量執行 allreduce。

有關更多詳細資訊,請參閱 torch.distributed.all_reduce()

alltoall(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.AllToAllOptions = <torch._C._distributed_c10d.AllToAllOptions object at 0x7f5ce0aa6a70>) c10d::Work#

在程序組中的所有程序之間 alltoall 輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_to_all()

alltoall_base(*args, **kwargs)#

過載函式。

  1. alltoall_base(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: torch.Tensor, output_split_sizes: collections.abc.Sequence[typing.SupportsInt], input_split_sizes: collections.abc.Sequence[typing.SupportsInt], opts: torch._C._distributed_c10d.AllToAllOptions = <torch._C._distributed_c10d.AllToAllOptions object at 0x7f5ce0f870b0>) -> c10d::Work

在程序組中的所有程序之間 alltoall 輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_to_all()

  1. alltoall_base(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: torch.Tensor, output_split_sizes: collections.abc.Sequence[typing.SupportsInt], input_split_sizes: collections.abc.Sequence[typing.SupportsInt], timeout: datetime.timedelta | None = None) -> c10d::Work

在程序組中的所有程序之間 alltoall 輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.all_to_all()

barrier(*args, **kwargs)#

過載函式。

  1. barrier(self: torch._C._distributed_c10d.ProcessGroup, opts: torch._C._distributed_c10d.BarrierOptions = <torch._C._distributed_c10d.BarrierOptions object at 0x7f5ce0aa6f30>) -> c10d::Work

阻塞,直到組中的所有程序都呼叫此方法,然後

一起退出呼叫。

有關更多詳細資訊,請參閱 torch.distributed.barrier()

  1. barrier(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta | None = None) -> c10d::Work

阻塞,直到組中的所有程序都呼叫此方法,然後

一起退出呼叫。

有關更多詳細資訊,請參閱 torch.distributed.barrier()

property bound_device_id#
boxed(self: torch._C._distributed_c10d.ProcessGroup) object#
broadcast(*args, **kwargs)#

過載函式。

  1. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.BroadcastOptions = <torch._C._distributed_c10d.BroadcastOptions object at 0x7f5ce0ef0bb0>) -> c10d::Work

將張量廣播到程序組中的所有程序。

有關更多詳細資訊,請參閱 torch.distributed.broadcast()

  1. broadcast(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work

將張量廣播到程序組中的所有程序。

有關更多詳細資訊,請參閱 torch.distributed.broadcast()

gather(*args, **kwargs)#

過載函式。

  1. gather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], input_tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.GatherOptions = <torch._C._distributed_c10d.GatherOptions object at 0x7f5ce12f27f0>) -> c10d::Work

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.gather()

  1. gather(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensor: torch.Tensor, root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work

從程序組中的所有程序收集輸入張量。

有關更多詳細資訊,請參閱 torch.distributed.gather()

get_group_store(self: torch._C._distributed_c10d.ProcessGroup) torch._C._distributed_c10d.Store#

獲取此程序組的儲存。

property group_desc#

獲取此程序組描述

property group_name#

(獲取此程序組名稱。它是叢集唯一的)

merge_remote_group(self: torch._C._distributed_c10d.ProcessGroup, store: torch._C._distributed_c10d.Store, size: SupportsInt, timeout: datetime.timedelta = datetime.timedelta(seconds=1800), group_name: str | None = None, group_desc: str | None = None) torch._C._distributed_c10d.ProcessGroup#
monitored_barrier(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta | None = None, wait_all_ranks: bool = False) None#
阻塞,直到組中的所有程序都呼叫此方法,然後

一起退出呼叫。

有關更多詳細資訊,請參閱 torch.distributed.monitored_barrier()

name(self: torch._C._distributed_c10d.ProcessGroup) str#

獲取此程序組的名稱。

rank(self: torch._C._distributed_c10d.ProcessGroup) int#

獲取此程序組的 rank。

recv(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], srcRank: SupportsInt, tag: SupportsInt) c10d::Work#

從指定 rank 接收張量。

有關更多詳細資訊,請參閱 torch.distributed.recv()

recv_anysource(self: torch._C._distributed_c10d.ProcessGroup, arg0: collections.abc.Sequence[torch.Tensor], arg1: SupportsInt) c10d::Work#

從任何源接收張量。

有關更多詳細資訊,請參閱 torch.distributed.recv()

reduce(*args, **kwargs)#

過載函式。

  1. reduce(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.ReduceOptions = <torch._C._distributed_c10d.ReduceOptions object at 0x7f5ce0eda330>) -> c10d::Work

在程序組中的所有程序上對提供的張量執行 reduce。

有關更多詳細資訊,請參閱 torch.distributed.reduce()

  1. reduce(self: torch._C._distributed_c10d.ProcessGroup, tensor: torch.Tensor, root: typing.SupportsInt, op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work

在程序組中的所有程序上對提供的張量執行 reduce。

有關更多詳細資訊,請參閱 torch.distributed.reduce()

reduce_scatter(*args, **kwargs)#

過載函式。

  1. reduce_scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], opts: torch._C._distributed_c10d.ReduceScatterOptions = <torch._C._distributed_c10d.ReduceScatterOptions object at 0x7f5ce0ef2e30>) -> c10d::Work

從程序組中的所有程序對輸入張量進行 reduce 並 scatter。

有關更多詳細資訊,請參閱 torch.distributed.reduce_scatter()

  1. reduce_scatter(self: torch._C._distributed_c10d.ProcessGroup, output: torch.Tensor, input: collections.abc.Sequence[torch.Tensor], op: torch._C._distributed_c10d.ReduceOp = <RedOpType.SUM: 0>, timeout: datetime.timedelta | None = None) -> c10d::Work

從程序組中的所有程序對輸入張量進行 reduce 並 scatter。

有關更多詳細資訊,請參閱 torch.distributed.reduce_scatter()

reduce_scatter_tensor_coalesced(self: torch._C._distributed_c10d.ProcessGroup, outputs: collections.abc.Sequence[torch.Tensor], inputs: collections.abc.Sequence[torch.Tensor], opts: torch._C._distributed_c10d.ReduceScatterOptions = <torch._C._distributed_c10d.ReduceScatterOptions object at 0x7f5ce0efe830>) c10d::Work#

從程序組中的所有程序對輸入張量進行 reduce 並 scatter。

有關更多詳細資訊,請參閱 torch.distributed.reduce_scatter()

scatter(*args, **kwargs)#

過載函式。

  1. scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensors: collections.abc.Sequence[torch.Tensor], input_tensors: collections.abc.Sequence[collections.abc.Sequence[torch.Tensor]], opts: torch._C._distributed_c10d.ScatterOptions = <torch._C._distributed_c10d.ScatterOptions object at 0x7f5ce0a9dff0>) -> c10d::Work

將輸入張量從程序組中的所有程序 scatter。

有關更多詳細資訊,請參閱 torch.distributed.scatter()

  1. scatter(self: torch._C._distributed_c10d.ProcessGroup, output_tensor: torch.Tensor, input_tensors: collections.abc.Sequence[torch.Tensor], root: typing.SupportsInt, timeout: datetime.timedelta | None = None) -> c10d::Work

將輸入張量從程序組中的所有程序 scatter。

有關更多詳細資訊,請參閱 torch.distributed.scatter()

send(self: torch._C._distributed_c10d.ProcessGroup, tensors: collections.abc.Sequence[torch.Tensor], dstRank: SupportsInt, tag: SupportsInt) c10d::Work#

將張量傳送到指定 rank。

有關更多詳細資訊,請參閱 torch.distributed.send()

set_timeout(self: torch._C._distributed_c10d.ProcessGroup, timeout: datetime.timedelta) None#

設定所有未來操作的預設超時時間。

shutdown(self: torch._C._distributed_c10d.ProcessGroup) None#

關閉程序組

size(self: torch._C._distributed_c10d.ProcessGroup) int#

獲取此程序組的大小。

split_group(self: torch._C._distributed_c10d.ProcessGroup, ranks: collections.abc.Sequence[typing.SupportsInt], timeout: datetime.timedelta | None = None, opts: c10d::Backend::Options | None = None, group_name: str | None = None, group_desc: str | None = None) torch._C._distributed_c10d.ProcessGroup#
static unbox(arg0: object) torch._C._distributed_c10d.ProcessGroup#
class torch.distributed._dist2.ProcessGroupFactory(*args, **kwargs)[source]#

Bases: Protocol

程序組工廠的協議。

torch.distributed._dist2.current_process_group()[source]#

獲取當前程序組。執行緒本地方法。

返回

當前程序組。

返回型別

ProcessGroup

torch.distributed._dist2.new_group(backend, timeout, device, **kwargs)[source]#

使用給定的後端和選項建立一個新的程序組。此組是獨立的,不會被全域性註冊,因此不能透過標準的 torch.distributed.* API 使用。

引數
  • backend (str) – 要用於程序組的後端。

  • timeout (timedelta) – 集體操作的超時時間。

  • device (Union[str, device]) – 要用於程序組的裝置。

  • **kwargs (object) – 所有剩餘的引數都會傳遞給後端建構函式。有關詳細資訊,請參閱特定後端的文件。

返回

一個新的程序組。

返回型別

ProcessGroup

torch.distributed._dist2.process_group(pg)[source]#

程序組的上下文管理器。執行緒本地方法。

引數

pg (ProcessGroup) – 要使用的程序組。

返回型別

Generator[None, None, None]

torch.distributed._dist2.register_backend(name, func)[source]#

註冊新的程序組後端。

引數