使用 C++ 擴充套件自定義程序組後端#
建立日期:2022年02月01日 | 最後更新:2024年11月14日 | 最後驗證:2024年11月05日
作者:Howard Huang, Feng Tian, Shen Li, Min Si
注意
在 github 上檢視和編輯此教程。
先決條件
本教程演示如何實現自定義的 Backend 並使用 C++ 擴充套件將其整合到 PyTorch 分散式包中。當您需要針對特定硬體的專用軟體堆疊,或者想試驗新的集體通訊演算法時,這將非常有用。
基礎知識#
PyTorch 集體通訊為許多廣泛採用的分散式訓練特性提供支援,包括 DistributedDataParallel 和 ZeroRedundancyOptimizer。為了使相同的集體通訊 API 能夠與不同的通訊後端配合使用,分散式包將集體通訊操作抽象為一個 Backend 類。然後,不同的後端可以實現為 Backend 的子類,並使用首選的第三方庫。PyTorch 分散式自帶三個預設後端:ProcessGroupNCCL、ProcessGroupGloo 和 ProcessGroupMPI。然而,除了這三個後端之外,還有其他通訊庫(例如 UCC、OneCCL)、不同型別的硬體(例如 TPU、Trainum)以及新興的通訊演算法(例如 Herring、Reduction Server)。因此,分散式包暴露了擴充套件 API,允許自定義集體通訊後端。
以下 4 個步驟展示瞭如何實現一個簡單的 Backend 後端並在 Python 應用程式程式碼中使用它。請注意,本教程側重於演示擴充套件 API,而不是開發一個功能齊全的通訊後端。因此,dummy 後端僅覆蓋了部分 API(all_reduce 和 all_gather),並將張量的值簡單設定為 0。
步驟 1:實現 Backend 的子類#
第一步是實現一個 Backend 子類,重寫目標集體通訊 API 並執行自定義通訊演算法。擴充套件還需要實現一個 Work 子類,它作為通訊結果的未來,並允許在應用程式程式碼中進行非同步執行。如果擴充套件使用第三方庫,它可以包含標頭檔案並從 BackendDummy 子類呼叫庫 API。下面的兩個程式碼片段展示了 dummy.h 和 dummy.cpp 的實現。有關完整實現,請參閱 dummy collectives 倉庫。
// file name: dummy.hpp
#include <torch/python.h>
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>
#include <pybind11/chrono.h>
namespace c10d {
class BackendDummy : public Backend {
public:
BackendDummy(int rank, int size);
c10::intrusive_ptr<Work> allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) override;
c10::intrusive_ptr<Work> allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts = AllreduceOptions()) override;
// The collective communication APIs without a custom implementation
// will error out if invoked by application code.
};
class WorkDummy : public Work {
public:
WorkDummy(
OpType opType,
c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
: Work(
-1, // rank, only used by recvAnySource, irrelevant in this demo
opType),
future_(std::move(future)) {}
bool isCompleted() override;
bool isSuccess() const override;
bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;
private:
c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d
// file name: dummy.cpp
#include "dummy.hpp"
namespace c10d {
// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& /* unused */) {
for (auto& outputTensorVec : outputTensors) {
for (auto& outputTensor : outputTensorVec) {
outputTensor.zero_();
}
}
auto future = c10::make_intrusive<c10::ivalue::Future>(
c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
future->markCompleted(c10::IValue(outputTensors));
return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}
// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
for (auto& tensor : tensors) {
tensor.zero_();
}
auto future = c10::make_intrusive<c10::ivalue::Future>(
c10::ListType::create(c10::TensorType::get()));
future->markCompleted(c10::IValue(tensors));
return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}
} // namespace c10d
步驟 2:暴露擴充套件的 Python API#
後端建構函式是從 Python 端呼叫的,因此擴充套件也需要將建構函式 API 暴露給 Python。這可以透過新增以下方法來完成。在此示例中,store 和 timeout 引數在 BackendDummy 例項化方法中被忽略,因為在此簡單的實現中未使用它們。然而,實際的擴充套件應考慮使用 store 來執行握手並支援 timeout 引數。
// file name: dummy.hpp
class BackendDummy : public Backend {
...
<Step 1 code>
...
static c10::intrusive_ptr<Backend> createBackendDummy(
const c10::intrusive_ptr<::c10d::Store>& store,
int rank,
int size,
const std::chrono::duration<float>& timeout);
static void BackendDummyConstructor() __attribute__((constructor)) {
py::object module = py::module::import("torch.distributed");
py::object register_backend =
module.attr("Backend").attr("register_backend");
// torch.distributed.Backend.register_backend will add `dummy` as a
// new valid backend.
register_backend("dummy", py::cpp_function(createBackendDummy));
}
}
// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
const c10::intrusive_ptr<::c10d::Store>& /* unused */,
int rank,
int size,
const std::chrono::duration<float>& /* unused */) {
return c10::make_intrusive<BackendDummy>(rank, size);
}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}
步驟 3:構建自定義擴充套件#
現在,擴充套件原始檔已準備就緒。我們可以使用 C++ 擴充套件來構建它。為此,建立一個 setup.py 檔案來準備路徑和命令。然後呼叫 python setup.py develop 來安裝擴充套件。
如果擴充套件依賴於第三方庫,您還可以將 libraries_dirs 和 libraries 指定給 C++ 擴充套件 API。請參閱 torch ucc 專案作為實際示例。
# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension
sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]
if torch.cuda.is_available():
module = cpp_extension.CUDAExtension(
name = "dummy_collectives",
sources = sources,
include_dirs = include_dirs,
)
else:
module = cpp_extension.CppExtension(
name = "dummy_collectives",
sources = sources,
include_dirs = include_dirs,
)
setup(
name = "Dummy-Collectives",
version = "0.0.1",
ext_modules = [module],
cmdclass={'build_ext': cpp_extension.BuildExtension}
)
步驟 4:在應用程式中使用擴充套件#
安裝完成後,您可以在呼叫 init_process_group 時方便地使用 dummy 後端,就像它是一個內建後端一樣。
我們可以透過更改 init_process_group 的 backend 引數來指定基於後端的排程。我們可以透過將 cpu:gloo,cuda:dummy 指定為 backend 引數,將 CPU 張量的集體操作排程到 gloo 後端,並將 CUDA 張量的集體操作排程到 dummy 後端。
要將所有張量傳送到 dummy 後端,我們可以簡單地將 dummy 指定為 backend 引數。
import os
import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives
import torch.distributed as dist
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)
# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")
# this goes through dummy
if torch.cuda.is_available():
y = x.cuda()
dist.all_reduce(y)
print(f"cuda allreduce: {y}")
try:
dist.broadcast(y, 0)
except RuntimeError:
print("got RuntimeError when calling broadcast")