AsyncVLLM¶
- class torchrl.modules.llm.AsyncVLLM(engine_args: AsyncEngineArgs, num_replicas: int = 1, actor_class=None, enable_prefix_caching: bool = False)[source]¶
一個管理多個非同步 vLLM 引擎 Actor 以進行分散式推理的服務。
這是 TorchRL 中非同步 vLLM 推理的主要入口點。它管理作為 Ray Actor 執行的多個 vLLM 引擎副本,提供負載均衡、權重更新以及統一的文字生成介面。
該服務自動處理 Ray Actor 的生命週期管理、透過 Placement Group 進行 GPU 分配,並提供與標準 vLLM API 相容的同步和非同步生成介面。
- 引數:
engine_args (AsyncEngineArgs) – vLLM 引擎的配置。
num_replicas (int, optional) – 要建立的引擎副本數量。預設為 1。
actor_class (optional) – 自定義 Ray Actor 類。預設為內部 Actor 實現。
enable_prefix_caching (bool, optional) –
是否啟用字首快取。預設為 False。
警告
enable_prefix_caching 預設設定為 False,如果需要 prompt log probs,則建議使用此設定。如果不需要 prompt log probs,則將其設定為 True。有關更多詳細資訊,請參閱 此 issue。
示例
>>> from torchrl.modules.llm.backends.vllm_async import AsyncVLLM >>> from vllm import SamplingParams >>> >>> # Simple usage - single GPU, single replica >>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B") >>> >>> # Advanced usage - multi-GPU tensor parallel with multiple replicas >>> service = AsyncVLLM.from_pretrained( ... "Qwen/Qwen2.5-7B", ... num_devices=2, # Use 2 GPUs for tensor parallelism ... num_replicas=2, # Create 2 replicas for higher throughput ... max_model_len=4096 ... ) >>> >>> # Generate text >>> sampling_params = SamplingParams(temperature=0.7, max_tokens=100) >>> result = service.generate("Hello, world!", sampling_params) >>> print(result.outputs[0].text) >>> >>> # Alternative: using AsyncEngineArgs directly for advanced configuration >>> from vllm import AsyncEngineArgs >>> engine_args = AsyncEngineArgs( ... model="Qwen/Qwen2.5-3B", ... tensor_parallel_size=2 ... ) >>> service = AsyncVLLM.launch(engine_args, num_replicas=2)
注意
架構與設計
AsyncVLLM 服務實現了一個具有以下關鍵元件的分散式推理架構:
Ray Actor 管理:每個副本都執行為一個獨立的 Ray Actor,擁有專用的 GPU 資源。該服務建立一個 Placement Group,以確保最佳的 GPU 分配,並在可能的情況下將張量並行工作節點共置於同一節點上。
負載均衡:預設情況下,生成請求透過隨機選擇在副本之間分發,或者可以使用 actor_index 引數定向到特定副本。
權重同步:該服務支援透過 NCCL 通訊組在所有副本之間進行權重更新,從而能夠與分散式訓練工作流整合。
資源管理:透過 Ray Placement Group 自動進行 GPU 分配和清理,並具有適當的關機程式以防止資源洩漏。
API 相容性:提供與 vLLM 的同步 LLM.generate() 方法相同的介面,使其成為非同步工作負載的即插即用替換。
Ray 整合
該服務利用 Ray 的 Actor 模型進行分散式執行。每個副本都是一個獨立的 Ray Actor,可以在不同節點上排程。該服務處理 Actor 的生命週期、監控就緒狀態,並提供對所有副本的集中訪問。
效能考慮
為提高重複提示的效能,預設啟用字首快取
支援張量並行,適用於不適合單個 GPU 的大型模型
多個副本允許併發處理不同的請求
每個副本內部使用原生 vLLM 批處理以獲得最佳吞吐量
錯誤處理
該服務包括超時支援、優雅關機程式以及在失敗時盡力清理請求。Ray 的容錯機制為長時間執行的推理工作負載提供了額外的彈性。
- collective_rpc(method: str, timeout: float | None = None, args: tuple = (), kwargs: dict | None = None) list[Any][source]¶
將 RPC 轉發給所有 Actor。
- 引數:
method (str) – 要呼叫的方法名稱。
timeout (float | None) – RPC 呼叫的超時時間。
args (tuple) – 要傳遞給方法的引數。
kwargs (dict | None) – 要傳遞給方法的關鍵字引數。
- 返回:
所有 RPC 呼叫的 Ray futures。
- 返回型別:
list[Any]
- create_load_balancer(strategy: Literal['requests', 'kv-cache'] | Sequence[Literal['prefix-aware', 'requests', 'kv-cache', 'round-robin']] | None = None, **kwargs) LoadBalancer[source]¶
為此 AsyncVLLM 服務建立一個負載均衡器。
- 引數:
strategy – 負載均衡策略或策略序列(按回退順序)。預設值:[“prefix-aware”, “requests”] - 先嚐試快取感知路由,然後是負載均衡。單一策略:“requests”、“kv-cache” 策略序列:[“prefix-aware”、“requests”、“round-robin”]
**kwargs – 傳遞給 LoadBalancer 建構函式的其他引數。
- 返回:
已配置的負載均衡器例項。此例項儲存在 AsyncVLLM 例項中。
- 返回型別:
LoadBalancer
示例
>>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B", num_replicas=3)
>>> # Use smart defaults (prefix-aware -> requests) >>> lb = service.create_load_balancer() >>> selected_actor_index = lb.select_actor(prompt="Hello world")
>>> # Simple single strategy >>> lb = service.create_load_balancer("requests") >>> selected_actor_index = lb.select_actor()
>>> # Custom strategy hierarchy >>> lb = service.create_load_balancer( ... ["prefix-aware", "kv-cache", "round-robin"], ... prefix_length=16, ... overload_threshold=2.0 ... ) >>> selected_actor_index = lb.select_actor(prompt="Hello world")
- classmethod from_pretrained(model_name: str, num_devices: int | None = None, num_replicas: int = 1, verbose: bool = True, compile: bool = True, **kwargs) AsyncVLLM[source]¶
從預訓練模型建立 AsyncVLLM 例項。
這是一個方便的方法,它將模型載入和服務的啟動合併為一個呼叫,類似於其他機器學習庫的工作方式。
- 引數:
model_name (str) – 要傳遞給 vLLM 的模型名稱。
num_devices (int, optional) – 要使用的裝置數量,每個副本。
num_replicas (int) – 要建立的引擎副本數量。
verbose (bool, optional) – 是否啟用詳細日誌記錄和吞吐量統計資訊。預設為 True。
compile (bool, optional) – 是否啟用模型編譯以獲得更好的效能。預設為 True。
**kwargs – 傳遞給 AsyncEngineArgs 的其他引數。
- 返回:
已啟動的非同步 vLLM 服務。
- 返回型別:
示例
>>> # Simple usage with defaults >>> service = AsyncVLLM.from_pretrained("Qwen/Qwen2.5-3B") >>> >>> # Multi-GPU tensor parallel with multiple replicas >>> service = AsyncVLLM.from_pretrained( ... "Qwen/Qwen2.5-7B", ... num_devices=2, ... num_replicas=2, ... max_model_len=4096 ... ) >>> >>> # Generate text >>> from vllm import SamplingParams >>> result = service.generate("Hello, world!", SamplingParams(max_tokens=50))
- generate(prompts: Any = None, sampling_params: SamplingParams | None = None, *, prompt_token_ids: list[int] | list[list[int]] | None = None, use_tqdm: bool = True, lora_request: Any = None, prompt_adapter_request: Any = None, guided_options_request: Any = None, timeout_seconds: float | None = None, actor_index: int | None = None) RequestOutput | list[RequestOutput][source]¶
使用 vLLM.LLM.generate 介面的 Actor 生成文字。
此方法提供與 vLLM.LLM.generate 相同的介面,以實現同步和非同步引擎之間的無縫相容性。它可以用於在多個執行緒/Actor 中生成文字。如果未提供 actor_index,將使用負載均衡器來選擇 Actor。
generate 是一個阻塞方法,因此它將等待生成完成。
- 引數:
prompts (String, TokensPrompt, or list of these) – 用於生成的輸入提示。
sampling_params (SamplingParams) – 用於控制生成行為的 SamplingParams 物件。
prompt_token_ids (list[int] | list[list[int]]) – 提示的替代選項 - 用於生成的 token ID。
use_tqdm (bool) – 是否顯示進度條(在非同步引擎中不使用)。
lora_request (Any) – 用於基於介面卡的生成的 LoRA 請求。
prompt_adapter_request (Any) – 提示介面卡請求。
guided_options_request (Any) – 引導解碼選項。
timeout_seconds (float | None) – 生成的超時時間(秒)。
actor_index (int | None) – 要使用的特定 Actor(如果為 None,則隨機選擇)。
- 返回:
來自 vLLM 的生成輸出。
- 返回型別:
RequestOutput | list[RequestOutput]
- get_cache_usage(actor_index: int | None = None) float | list[float][source]¶
獲取一個或所有 Actor 的 KV 快取使用情況。
- 引數:
actor_index (int | None) – 特定 Actor 的索引,如果為 None,則表示所有 Actor。
- 返回:
- 指定 Actor 的快取使用率,
如果 actor_index 為 None,則表示所有 Actor 的使用率列表。
- 返回型別:
float | list[float]
- get_model_metadata() dict[str, tuple[torch.dtype, torch.Size]][source]¶
獲取模型引數元資料。
注意:這需要模型已載入。目前,我們返回一個空字典,並期望元資料在權重更新期間從外部提供。
- get_num_unfinished_requests(actor_index: int | None = None) int | list[int][source]¶
獲取一個或所有 Actor 的未完成請求數量。
- 引數:
actor_index (int | None) – 特定 Actor 的索引,如果為 None,則表示所有 Actor。
- 返回:
- 指定 Actor 的未完成請求數量,
如果 actor_index 為 None,則表示所有 Actor 的計數列表。
- 返回型別:
int | list[int]
- classmethod launch(engine_args: AsyncEngineArgs, num_replicas: int = 1) AsyncVLLM[source]¶
啟動一個新的 AsyncVLLMEngineService。
- 引數:
engine_args (AsyncEngineArgs) – 建立 AsyncLLMEngine 例項的引數。
num_replicas (int) – 要建立的 Actor 副本數量。
- 返回:
已啟動的服務。
- 返回型別:
AsyncVLLMEngineService
- update_weights(weights: Iterator[tuple[str, torch.Tensor]]) None[source]¶
使用 NCCL 廣播在所有副本中更新模型權重。
- 引數:
weights – 產生 (parameter_name, tensor) 元組的迭代器