評價此頁

使用分散式 RPC 框架實現引數伺服器#

建立日期:2020 年 4 月 6 日 | 最後更新:2024 年 5 月 7 日 | 最後驗證:未驗證

作者Rohan Varma

注意

編輯github 中檢視和編輯此教程。

先決條件

本教程將透過一個簡單的示例,介紹如何使用 PyTorch 的 分散式 RPC 框架 實現引數伺服器。引數伺服器框架是一種範例,其中一組伺服器儲存引數(例如大型嵌入表),然後多個訓練器查詢引數伺服器以檢索最新的引數。這些訓練器可以本地執行訓練迴圈,並偶爾與引數伺服器同步以獲取最新引數。有關引數伺服器方法的更多閱讀,請參閱 這篇論文

使用分散式 RPC 框架,我們將構建一個示例,其中多個訓練器使用 RPC 與同一個引數伺服器通訊,並使用 RRef 訪問遠端引數伺服器例項上的狀態。每個訓練器將透過跨多個節點縫合自動微分圖(使用分散式自動微分)來分散式地啟動其專用的反向傳播。

注意:本教程涵蓋了分散式 RPC 框架的使用,該框架可用於將模型分割到多臺機器上,或實現引數伺服器訓練策略,其中網路訓練器從託管在另一臺機器上的引數中獲取引數。如果您正在尋找將模型複製到許多 GPU 上,請參閱 分散式資料並行教程。還有另一個 RPC 教程 涵蓋了強化學習和 RNN 的用例。

讓我們從熟悉的部分開始:匯入所需的模組並定義一個將在 MNIST 資料集上訓練的簡單 ConvNet。下面的網路在很大程度上改編自 pytorch/examples 倉庫 中定義的網路。

import argparse
import os
import time
from threading import Lock

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms

# --------- MNIST Network to train, from pytorch/examples -----

class Net(nn.Module):
    def __init__(self, num_gpus=0):
        super(Net, self).__init__()
        print(f"Using {num_gpus} GPUs to train")
        self.num_gpus = num_gpus
        device = torch.device(
            "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
        print(f"Putting first 2 convs on {str(device)}")
        # Put conv layers on the first cuda device, or CPU if no cuda device
        self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
        self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
        # Put rest of the network on the 2nd cuda device, if there is one
        if "cuda" in str(device) and num_gpus > 1:
            device = torch.device("cuda:1")

        print(f"Putting rest of layers on {str(device)}")
        self.dropout1 = nn.Dropout2d(0.25).to(device)
        self.dropout2 = nn.Dropout2d(0.5).to(device)
        self.fc1 = nn.Linear(9216, 128).to(device)
        self.fc2 = nn.Linear(128, 10).to(device)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)

        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        # Move tensor to next device if necessary
        next_device = next(self.fc1.parameters()).device
        x = x.to(next_device)

        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

接下來,讓我們定義一些對我們指令碼的其餘部分有用的輔助函式。以下程式碼使用 rpc_syncRRef 來定義一個函式,該函式會在位於遠端節點上的物件上呼叫給定方法。在下面,我們對遠端物件的控制代碼由 rref 引數提供,並且我們在其所有節點上執行它:rref.owner()。在呼叫節點上,我們透過使用 rpc_sync 同步執行此命令,這意味著我們將阻塞直到收到響應。

# --------- Helper Methods --------------------

# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods. method could be any matching function, including
# class methods.
def call_method(method, rref, *args, **kwargs):
    return method(rref.local_value(), *args, **kwargs)

# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef and passes along the given argument.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.

def remote_method(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)

現在,我們準備定義我們的引數伺服器。我們將繼承 nn.Module 並儲存上面定義的網路的控制代碼。我們還將儲存一個輸入裝置,這是在我們呼叫模型之前將輸入傳輸到的裝置。

# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        model = Net(num_gpus=num_gpus)
        self.model = model
        self.input_device = torch.device(
            "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")

接下來,我們將定義我們的前向傳播。請注意,無論模型輸出的裝置如何,我們都會將輸出移動到 CPU,因為分散式 RPC 框架目前僅支援透過 RPC 傳送 CPU 張量。我們已故意停用透過 RPC 傳送 CUDA 張量,因為呼叫者/被呼叫者可能具有不同的裝置(CPU/GPU),但未來版本可能會支援此功能。

class ParameterServer(nn.Module):
...
    def forward(self, inp):
        inp = inp.to(self.input_device)
        out = self.model(inp)
        # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
        # Tensors must be moved in and out of GPU memory due to this.
        out = out.to("cpu")
        return out

接下來,我們將定義一些用於訓練和驗證目的的雜項函式。第一個函式 get_dist_gradients 將接收一個分散式自動微分上下文 ID,並呼叫 dist_autograd.get_gradients API 來檢索由分散式自動微分計算的梯度。更多資訊可以在 分散式自動微分文件 中找到。請注意,我們還會遍歷生成的字典並將每個張量轉換為 CPU 張量,因為該框架目前僅支援透過 RPC 傳送張量。接下來,get_param_rrefs 將遍歷我們的模型引數並將它們包裝為(本地)RRef。此方法將透過 RPC 由訓練器節點呼叫,並將返回要最佳化的引數列表。這是 分散式最佳化器 的輸入所必需的,該最佳化器需要所有要最佳化的引數作為 RRef 的列表。

# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
    grads = dist_autograd.get_gradients(cid)
    # This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
    # Tensors must be moved in and out of GPU memory due to this.
    cpu_grads = {}
    for k, v in grads.items():
        k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
        cpu_grads[k_cpu] = v_cpu
    return cpu_grads

# Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes paramters remotely.
def get_param_rrefs(self):
    param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
    return param_rrefs

最後,我們將建立初始化引數伺服器的方法。請注意,引數伺服器將在所有程序中只有一個例項,並且所有訓練器將與同一個引數伺服器通訊並更新儲存的同一模型。正如在 run_parameter_server 中所見,伺服器本身不執行任何獨立操作;它等待來自訓練器的請求(這些訓練器尚未定義),並透過執行請求的函式來響應它們。

# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()


def get_parameter_server(num_gpus=0):
    """
    Returns a singleton parameter server to all trainer processes
    """
    global param_server
    # Ensure that we get only one handle to the ParameterServer.
    with global_lock:
        if not param_server:
            # construct it once
            param_server = ParameterServer(num_gpus=num_gpus)
        return param_server

def run_parameter_server(rank, world_size):
    # The parameter server just acts as a host for the model and responds to
    # requests from trainers.
    # rpc.shutdown() will wait for all workers to complete by default, which
    # in this case means that the parameter server will wait for all trainers
    # to complete, and then exit.
    print("PS master initializing RPC")
    rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
    print("RPC initialized! Running parameter server...")
    rpc.shutdown()
    print("RPC shutdown on parameter server.")

請注意,上面,rpc.shutdown() 不會立即關閉引數伺服器。相反,它將等待所有工作程式(在此情況下為訓練器)也呼叫 rpc.shutdown()。這使我們能夠保證引數伺服器在所有訓練器(尚未定義)完成其訓練過程之前不會離線。

接下來,我們將定義我們的 TrainerNet 類。這也將繼承 nn.Module,並且我們的 __init__ 方法將使用 rpc.remote API 來獲取我們引數伺服器的 RRef(遠端引用)。請注意,我們在這裡並沒有將引數伺服器複製到我們的本地程序,而是可以認為 self.param_server_rref 是一個分散式共享指標,指向位於單獨程序中的引數伺服器。

# --------- Trainers --------------------

# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
    def __init__(self, num_gpus=0):
        super().__init__()
        self.num_gpus = num_gpus
        self.param_server_rref = rpc.remote(
            "parameter_server", get_parameter_server, args=(num_gpus,))

接下來,我們將定義一個名為 get_global_param_rrefs 的方法。為了說明此方法的必要性,值得閱讀有關 分散式最佳化器 的文件,特別是 API 簽名。最佳化器必須傳遞一個 RRef 列表,該列表對應於要最佳化的遠端引數,因此我們在這裡獲取必需的 RRef。由於給定 TrainerNet 互動的唯一遠端工作程式是 ParameterServer,我們只需在 ParameterServer 上呼叫 remote_method。我們使用我們在 ParameterServer 類中定義的方法 get_param_rrefs。此方法將返回需要最佳化的引數的 RRef 列表。請注意,在這種情況下,我們的 TrainerNet 沒有定義自己的引數;如果它有,我們也需要將每個引數包裝在 RRef 中,並將其包含到我們對 DistributedOptimizer 的輸入中。

class TrainerNet(nn.Module):
...
    def get_global_param_rrefs(self):
        remote_params = remote_method(
            ParameterServer.get_param_rrefs,
            self.param_server_rref)
        return remote_params

現在,我們準備定義我們的 forward 方法,該方法將呼叫(同步)RPC 來執行在 ParameterServer 上定義的網路的前向傳播。請注意,我們將 self.param_server_rref(這是我們 ParameterServer 的遠端控制代碼)傳遞到我們的 RPC 呼叫中。此呼叫將向執行我們 ParameterServer 的節點發送 RPC,呼叫 forward 傳播,並返回與模型輸出對應的 Tensor

class TrainerNet(nn.Module):
...
    def forward(self, x):
        model_output = remote_method(
            ParameterServer.forward, self.param_server_rref, x)
        return model_output

在我們的訓練器完全定義之後,現在是時候編寫我們的神經網路訓練迴圈了,該迴圈將建立我們的網路和最佳化器,透過網路執行一些輸入並計算損失。訓練迴圈看起來很像本地訓練程式,但由於我們的網路分佈在多臺機器上,因此進行了一些修改。

下面,我們初始化我們的 TrainerNet 並構建一個 DistributedOptimizer。請注意,如上所述,我們必須傳入所有要最佳化的全域性(參與分散式訓練的所有節點之間)引數。此外,我們傳入本地最佳化器,在本例中是 SGD。請注意,我們可以像建立本地最佳化器一樣配置底層最佳化演算法 - optimizer.SGD 的所有引數都將被正確轉發。例如,我們傳入一個自定義的學習率,該學習率將用作所有本地最佳化器的學習率。

def run_training_loop(rank, num_gpus, train_loader, test_loader):
    # Runs the typical nueral network forward + backward + optimizer step, but
    # in a distributed fashion.
    net = TrainerNet(num_gpus=num_gpus)
    # Build DistributedOptimizer.
    param_rrefs = net.get_global_param_rrefs()
    opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)

接下來,我們定義我們的主訓練迴圈。我們遍歷 PyTorch 的 DataLoader 提供的可迭代物件。在編寫典型的 forward/backward/optimizer 迴圈之前,我們首先將邏輯包裝在 分散式自動微分上下文 中。請注意,這對於記錄模型前向傳播中呼叫的 RPC 是必需的,以便構建一個包含所有參與分散式工作程式的反向傳播的適當圖。分散式自動微分上下文返回一個 context_id,它作為特定迭代的梯度累積和最佳化的識別符號。

與呼叫典型的 loss.backward()(該呼叫將在本地工作程式上啟動反向傳播)不同,我們呼叫 dist_autograd.backward() 並傳入我們的 context_id 和 loss,後者是我們希望反向傳播開始的根。此外,我們將此 context_id 傳入我們的最佳化器呼叫中,該呼叫是必需的,以便能夠查詢由此特定反向傳播在所有節點上計算的相應梯度。

def run_training_loop(rank, num_gpus, train_loader, test_loader):
...
    for i, (data, target) in enumerate(train_loader):
        with dist_autograd.context() as cid:
            model_output = net(data)
            target = target.to(model_output.device)
            loss = F.nll_loss(model_output, target)
            if i % 5 == 0:
                print(f"Rank {rank} training batch {i} loss {loss.item()}")
            dist_autograd.backward(cid, [loss])
            # Ensure that dist autograd ran successfully and gradients were
            # returned.
            assert remote_method(
                ParameterServer.get_dist_gradients,
                net.param_server_rref,
                cid) != {}
            opt.step(cid)

     print("Training complete!")
     print("Getting accuracy....")
     get_accuracy(test_loader, net)

以下程式碼只是在訓練完成後計算我們模型的準確率,這與傳統的本地模型非常相似。但是,請注意,我們上面傳遞到此函式的 netTrainerNet 的例項,因此前向傳播以透明的方式呼叫 RPC。

def get_accuracy(test_loader, model):
    model.eval()
    correct_sum = 0
    # Use GPU to evaluate if possible
    device = torch.device("cuda:0" if model.num_gpus > 0
        and torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for i, (data, target) in enumerate(test_loader):
            out = model(data, -1)
            pred = out.argmax(dim=1, keepdim=True)
            pred, target = pred.to(device), target.to(device)
            correct = pred.eq(target.view_as(pred)).sum().item()
            correct_sum += correct

    print(f"Accuracy {correct_sum / len(test_loader.dataset)}")

接下來,類似於我們定義 run_parameter_server 作為負責初始化 RPC 的 ParameterServer 的主迴圈,讓我們為訓練器定義一個類似的主迴圈。區別在於我們的訓練器必須執行我們上面定義的訓練迴圈。

# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
    print(f"Worker rank {rank} initializing RPC")
    rpc.init_rpc(
        name=f"trainer_{rank}",
        rank=rank,
        world_size=world_size)

    print(f"Worker {rank} done initializing RPC")

    run_training_loop(rank, num_gpus, train_loader, test_loader)
    rpc.shutdown()

請注意,與 run_parameter_server 類似,rpc.shutdown() 預設將等待所有工作程式(訓練器和引數伺服器)在當前節點退出之前呼叫 rpc.shutdown()。這確保了節點能夠正常終止,並且沒有節點在一個節點期望它線上時離線。

我們現在已經完成了針對訓練器和引數伺服器的程式碼,剩下的就是新增啟動訓練器和引數伺服器的程式碼。首先,我們必須獲取適用於我們的引數伺服器和訓練器的各種引數。world_size 對應於將參與訓練的總節點數,是所有訓練器和引數伺服器的總和。我們還必須為每個單獨的程序傳入一個唯一的 rank,從 0(我們將在此執行單個引數伺服器)到 world_size - 1master_addrmaster_port 是可以用來標識 rank 0 程序正在執行的位置的引數,並且將由各個節點用於互相發現。要在此本地測試此示例,只需為所有例項傳入 localhost 和相同的 master_port。請注意,出於演示目的,此示例僅支援 0-2 個 GPU,儘管該模式可以擴充套件以利用更多 GPU。

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Parameter-Server RPC based training")
    parser.add_argument(
        "--world_size",
        type=int,
        default=4,
        help="""Total number of participating processes. Should be the sum of
        master node and all training nodes.""")
    parser.add_argument(
        "--rank",
        type=int,
        default=None,
        help="Global rank of this process. Pass in 0 for master.")
    parser.add_argument(
        "--num_gpus",
        type=int,
        default=0,
        help="""Number of GPUs to use for training, Currently supports between 0
         and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
    parser.add_argument(
        "--master_addr",
        type=str,
        default="localhost",
        help="""Address of master, will default to localhost if not provided.
        Master must be able to accept network traffic on the address + port.""")
    parser.add_argument(
        "--master_port",
        type=str,
        default="29500",
        help="""Port that master is listening on, will default to 29500 if not
        provided. Master must be able to accept network traffic on the host and port.""")

    args = parser.parse_args()
    assert args.rank is not None, "must provide rank argument."
    assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ["MASTER_PORT"] = args.master_port

現在,我們將建立一個對應於引數伺服器或訓練器的程序,具體取決於我們的命令列引數。如果傳入的 rank 為 0,我們將建立一個 ParameterServer,否則建立一個 TrainerNet。請注意,我們正在使用 torch.multiprocessing 來啟動對應於我們要執行的函式的子程序,並在主執行緒中使用 p.join() 等待此程序完成。在初始化我們的訓練器的情況下,我們還使用 PyTorch 的 資料載入器 來為 MNIST 資料集指定訓練和測試資料載入器。

processes = []
world_size = args.world_size
if args.rank == 0:
    p = mp.Process(target=run_parameter_server, args=(0, world_size))
    p.start()
    processes.append(p)
else:
    # Get data to train on
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=32, shuffle=True,)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(
            '../data',
            train=False,
            transform=transforms.Compose([
                    transforms.ToTensor(),
                    transforms.Normalize((0.1307,), (0.3081,))
                        ])),
        batch_size=32,
        shuffle=True,
    )
    # start training worker on this node
    p = mp.Process(
        target=run_worker,
        args=(
            args.rank,
            world_size, args.num_gpus,
            train_loader,
            test_loader))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

要本地執行該示例,請在單獨的終端視窗中執行以下命令,每個工作伺服器和您希望啟動的每個工作程式:python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK。例如,對於 world_size 為 2 的主節點,命令將是 python rpc_parameter_server.py --world_size=2 --rank=0。然後可以在單獨的視窗中使用命令 python rpc_parameter_server.py --world_size=2 --rank=1 啟動訓練器,這將開始使用一個伺服器和一個訓練器進行訓練。請注意,本教程假定訓練在 0 到 2 個 GPU 之間進行,並且可以透過將 --num_gpus=N 傳遞到訓練指令碼來配置此引數。

您可以傳遞命令列引數 --master_addr=ADDRESS--master_port=PORT 來指示主工作程式正在監聽的地址和埠,例如,測試訓練器和主節點執行在不同機器上的功能。