快捷方式

AsyncVLLM

class torchrl.modules.llm.AsyncVLLM(engine_args: AsyncEngineArgs, num_replicas: int = 1, actor_class=None, enable_prefix_caching: bool = False)[source]

一個管理多個非同步 vLLM 引擎 Actor 以進行分散式推理的服務。

這是 TorchRL 中非同步 vLLM 推理的主要入口點。它管理作為 Ray Actor 執行的多個 vLLM 引擎副本,提供負載均衡、權重更新以及統一的文字生成介面。

該服務自動處理 Ray Actor 的生命週期管理、透過 Placement Group 進行 GPU 分配,並提供與標準 vLLM API 相容的同步和非同步生成介面。

引數:
  • engine_args (AsyncEngineArgs) – vLLM 引擎的配置。

  • num_replicas (int, optional) – 要建立的引擎副本數量。預設為 1。

  • actor_class (optional) – 自定義 Ray Actor 類。預設為內部 Actor 實現。

  • enable_prefix_caching (bool, optional) –

    是否啟用字首快取。預設為 False。

    警告

    enable_prefix_caching 預設設定為 False,如果需要 prompt log probs,則建議使用此設定。如果不需要 prompt log probs,則將其設定為 True。有關更多詳細資訊,請參閱 此 issue

示例

>>> from torchrl.modules.llm.backends.vllm_async import AsyncVLLM
>>> from vllm import SamplingParams
>>>
>>> # Simple usage - single GPU, single replica
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B")
>>>
>>> # Advanced usage - multi-GPU tensor parallel with multiple replicas
>>> service = AsyncVLLM.from_pretrained(
...     "Qwen/Qwen2.5-7B",
...     num_devices=2,  # Use 2 GPUs for tensor parallelism
...     num_replicas=2,  # Create 2 replicas for higher throughput
...     max_model_len=4096
... )
>>>
>>> # Generate text
>>> sampling_params = SamplingParams(temperature=0.7, max_tokens=100)
>>> result = service.generate("Hello, world!", sampling_params)
>>> print(result.outputs[0].text)
>>>
>>> # Alternative: using AsyncEngineArgs directly for advanced configuration
>>> from vllm import AsyncEngineArgs
>>> engine_args = AsyncEngineArgs(
...     model="Qwen/Qwen2.5-3B",
...     tensor_parallel_size=2
... )
>>> service = AsyncVLLM.launch(engine_args, num_replicas=2)

注意

架構與設計

AsyncVLLM 服務實現了一個具有以下關鍵元件的分散式推理架構:

  1. Ray Actor 管理:每個副本都執行為一個獨立的 Ray Actor,擁有專用的 GPU 資源。該服務建立一個 Placement Group,以確保最佳的 GPU 分配,並在可能的情況下將張量並行工作節點共置於同一節點上。

  2. 負載均衡:預設情況下,生成請求透過隨機選擇在副本之間分發,或者可以使用 actor_index 引數定向到特定副本。

  3. 權重同步:該服務支援透過 NCCL 通訊組在所有副本之間進行權重更新,從而能夠與分散式訓練工作流整合。

  4. 資源管理:透過 Ray Placement Group 自動進行 GPU 分配和清理,並具有適當的關機程式以防止資源洩漏。

  5. API 相容性:提供與 vLLM 的同步 LLM.generate() 方法相同的介面,使其成為非同步工作負載的即插即用替換。

Ray 整合

該服務利用 Ray 的 Actor 模型進行分散式執行。每個副本都是一個獨立的 Ray Actor,可以在不同節點上排程。該服務處理 Actor 的生命週期、監控就緒狀態,並提供對所有副本的集中訪問。

效能考慮

  • 為提高重複提示的效能,預設啟用字首快取

  • 支援張量並行,適用於不適合單個 GPU 的大型模型

  • 多個副本允許併發處理不同的請求

  • 每個副本內部使用原生 vLLM 批處理以獲得最佳吞吐量

錯誤處理

該服務包括超時支援、優雅關機程式以及在失敗時盡力清理請求。Ray 的容錯機制為長時間執行的推理工作負載提供了額外的彈性。

collective_rpc(method: str, timeout: float | None = None, args: tuple = (), kwargs: dict | None = None) list[Any][source]

將 RPC 轉發給所有 Actor。

引數:
  • method (str) – 要呼叫的方法名稱。

  • timeout (float | None) – RPC 呼叫的超時時間。

  • args (tuple) – 要傳遞給方法的引數。

  • kwargs (dict | None) – 要傳遞給方法的關鍵字引數。

返回:

所有 RPC 呼叫的 Ray futures。

返回型別:

list[Any]

create_load_balancer(strategy: Literal['requests', 'kv-cache'] | Sequence[Literal['prefix-aware', 'requests', 'kv-cache', 'round-robin']] | None = None, **kwargs) LoadBalancer[source]

為此 AsyncVLLM 服務建立一個負載均衡器。

引數:
  • strategy – 負載均衡策略或策略序列(按回退順序)。預設值:[“prefix-aware”, “requests”] - 先嚐試快取感知路由,然後是負載均衡。單一策略:“requests”、“kv-cache” 策略序列:[“prefix-aware”、“requests”、“round-robin”]

  • **kwargs – 傳遞給 LoadBalancer 建構函式的其他引數。

返回:

已配置的負載均衡器例項。此例項儲存在 AsyncVLLM 例項中。

返回型別:

LoadBalancer

示例

>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B", num_replicas=3)
>>> # Use smart defaults (prefix-aware -> requests)
>>> lb = service.create_load_balancer()
>>> selected_actor_index = lb.select_actor(prompt="Hello world")
>>> # Simple single strategy
>>> lb = service.create_load_balancer("requests")
>>> selected_actor_index = lb.select_actor()
>>> # Custom strategy hierarchy
>>> lb = service.create_load_balancer(
...     ["prefix-aware", "kv-cache", "round-robin"],
...     prefix_length=16,
...     overload_threshold=2.0
... )
>>> selected_actor_index = lb.select_actor(prompt="Hello world")
classmethod from_pretrained(model_name: str, num_devices: int | None = None, num_replicas: int = 1, verbose: bool = True, compile: bool = True, **kwargs) AsyncVLLM[source]

從預訓練模型建立 AsyncVLLM 例項。

這是一個方便的方法,它將模型載入和服務的啟動合併為一個呼叫,類似於其他機器學習庫的工作方式。

引數:
  • model_name (str) – 要傳遞給 vLLM 的模型名稱。

  • num_devices (int, optional) – 要使用的裝置數量,每個副本。

  • num_replicas (int) – 要建立的引擎副本數量。

  • verbose (bool, optional) – 是否啟用詳細日誌記錄和吞吐量統計資訊。預設為 True。

  • compile (bool, optional) – 是否啟用模型編譯以獲得更好的效能。預設為 True。

  • **kwargs – 傳遞給 AsyncEngineArgs 的其他引數。

返回:

已啟動的非同步 vLLM 服務。

返回型別:

AsyncVLLM

示例

>>> # Simple usage with defaults
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B")
>>>
>>> # Multi-GPU tensor parallel with multiple replicas
>>> service = AsyncVLLM.from_pretrained(
...     "Qwen/Qwen2.5-7B",
...     num_devices=2,
...     num_replicas=2,
...     max_model_len=4096
... )
>>>
>>> # Generate text
>>> from vllm import SamplingParams
>>> result = service.generate("Hello, world!", SamplingParams(max_tokens=50))
generate(prompts: Any = None, sampling_params: SamplingParams | None = None, *, prompt_token_ids: list[int] | list[list[int]] | None = None, use_tqdm: bool = True, lora_request: Any = None, prompt_adapter_request: Any = None, guided_options_request: Any = None, timeout_seconds: float | None = None, actor_index: int | None = None) RequestOutput | list[RequestOutput][source]

使用 vLLM.LLM.generate 介面的 Actor 生成文字。

此方法提供與 vLLM.LLM.generate 相同的介面,以實現同步和非同步引擎之間的無縫相容性。它可以用於在多個執行緒/Actor 中生成文字。如果未提供 actor_index,將使用負載均衡器來選擇 Actor。

generate 是一個阻塞方法,因此它將等待生成完成。

引數:
  • prompts (String, TokensPrompt, or list of these) – 用於生成的輸入提示。

  • sampling_params (SamplingParams) – 用於控制生成行為的 SamplingParams 物件。

  • prompt_token_ids (list[int] | list[list[int]]) – 提示的替代選項 - 用於生成的 token ID。

  • use_tqdm (bool) – 是否顯示進度條(在非同步引擎中不使用)。

  • lora_request (Any) – 用於基於介面卡的生成的 LoRA 請求。

  • prompt_adapter_request (Any) – 提示介面卡請求。

  • guided_options_request (Any) – 引導解碼選項。

  • timeout_seconds (float | None) – 生成的超時時間(秒)。

  • actor_index (int | None) – 要使用的特定 Actor(如果為 None,則隨機選擇)。

返回:

來自 vLLM 的生成輸出。

返回型別:

RequestOutput | list[RequestOutput]

get_cache_usage(actor_index: int | None = None) float | list[float][source]

獲取一個或所有 Actor 的 KV 快取使用情況。

引數:

actor_index (int | None) – 特定 Actor 的索引,如果為 None,則表示所有 Actor。

返回:

指定 Actor 的快取使用率,

如果 actor_index 為 None,則表示所有 Actor 的使用率列表。

返回型別:

float | list[float]

get_master_address() str[source]

獲取用於權重同步的主地址。

get_master_port() int[source]

獲取用於權重同步的主埠。

get_model_metadata() dict[str, tuple[torch.dtype, torch.Size]][source]

獲取模型引數元資料。

注意:這需要模型已載入。目前,我們返回一個空字典,並期望元資料在權重更新期間從外部提供。

get_num_unfinished_requests(actor_index: int | None = None) int | list[int][source]

獲取一個或所有 Actor 的未完成請求數量。

引數:

actor_index (int | None) – 特定 Actor 的索引,如果為 None,則表示所有 Actor。

返回:

指定 Actor 的未完成請求數量,

如果 actor_index 為 None,則表示所有 Actor 的計數列表。

返回型別:

int | list[int]

get_random_actor_index() int[source]

獲取一個隨機 Actor 索引。

get_tp_size() int[source]

獲取張量並行大小。

init_weight_update_group() None[source]

初始化權重更新通訊組(RLvLLMEngine 介面)。

classmethod launch(engine_args: AsyncEngineArgs, num_replicas: int = 1) AsyncVLLM[source]

啟動一個新的 AsyncVLLMEngineService。

引數:
  • engine_args (AsyncEngineArgs) – 建立 AsyncLLMEngine 例項的引數。

  • num_replicas (int) – 要建立的 Actor 副本數量。

返回:

已啟動的服務。

返回型別:

AsyncVLLMEngineService

shutdown()[source]

關閉所有 Actor 並清理資源。

update_weights(weights: Iterator[tuple[str, torch.Tensor]]) None[source]

使用 NCCL 廣播在所有副本中更新模型權重。

引數:

weights – 產生 (parameter_name, tensor) 元組的迭代器

文件

訪問全面的 PyTorch 開發者文件

檢視文件

教程

為初學者和高階開發者提供深入的教程

檢視教程

資源

查詢開發資源並讓您的問題得到解答

檢視資源