diff --git a/doc/workloads/index.rst b/doc/workloads/index.rst
index b8d23ddad..af6dccb7f 100644
--- a/doc/workloads/index.rst
+++ b/doc/workloads/index.rst
@@ -29,6 +29,7 @@ Available Workloads
     ":doc:`slurm_container`", "✅", "❌", "❌", "❌"
     "Triton Inference", "✅", "❌", "❌", "❌"
     ":doc:`ucc`", "✅", "❌", "❌", "❌"
+    ":doc:`vllm`", "✅", "❌", "❌", "❌"
 
 .. toctree::
    :hidden:
diff --git a/doc/workloads/vllm.rst b/doc/workloads/vllm.rst
new file mode 100644
index 000000000..65ec9d1f6
--- /dev/null
+++ b/doc/workloads/vllm.rst
@@ -0,0 +1,154 @@
+vLLM
+====
+
+This workload (``test_template_name`` is ``vllm``) allows users to execute vLLM benchmarks within the CloudAI framework.
+
+vLLM is a high-throughput and memory-efficient inference engine for LLMs. This workload supports both aggregated and disaggregated prefill/decode modes.
+
+Usage Examples
+--------------
+
+Test + Scenario example
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: toml
+    :caption: test.toml (test definition)
+
+    name = "vllm_test"
+    description = "Example vLLM test"
+    test_template_name = "vllm"
+
+    [cmd_args]
+    docker_image_url = "nvcr.io#nvidia/ai-dynamo/vllm-runtime:0.7.0"
+    model = "Qwen/Qwen3-0.6B"
+
+    [bench_cmd_args]
+    random_input_len = 16
+    random_output_len = 128
+    max_concurrency = 16
+    num_prompts = 30
+
+
+.. code-block:: toml
+    :caption: scenario.toml (scenario with one test)
+
+    name = "vllm-benchmark"
+
+    [[Tests]]
+    id = "vllm.1"
+    num_nodes = 1
+    time_limit = "00:10:00"
+    test_name = "vllm_test"
+
+Test-in-Scenario example
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: toml
+    :caption: scenario.toml (no separate test toml is needed)
+
+    name = "vllm-benchmark"
+
+    [[Tests]]
+    id = "vllm.1"
+    num_nodes = 1
+    time_limit = "00:10:00"
+
+    name = "vllm_test"
+    description = "Example vLLM test"
+    test_template_name = "vllm"
+
+    [Tests.cmd_args]
+    docker_image_url = "nvcr.io#nvidia/ai-dynamo/vllm-runtime:0.7.0"
+    model = "Qwen/Qwen3-0.6B"
+
+    [Tests.bench_cmd_args]
+    random_input_len = 16
+    random_output_len = 128
+    max_concurrency = 16
+    num_prompts = 30
+
+
+Control number of GPUs
+----------------------
+
+The number of GPUs can be controlled using the options below, listed from lowest to highest priority:
+
+1. ``gpus_per_node`` system property (scalar value)
+2. ``CUDA_VISIBLE_DEVICES`` environment variable (comma-separated list of GPU IDs)
+3. ``gpu_ids`` command argument for the ``prefill`` and ``decode`` configurations (comma-separated list of GPU IDs)
+
+
+Control disaggregation
+----------------------
+
+By default, vLLM runs as a single process without disaggregation. To enable disaggregation, set the ``prefill`` configuration:
+
+.. code-block:: toml
+    :caption: test.toml (disaggregated prefill/decode)
+
+    [cmd_args]
+    docker_image_url = "nvcr.io#nvidia/ai-dynamo/vllm-runtime:0.7.0"
+    model = "Qwen/Qwen3-0.6B"
+
+    [cmd_args.prefill]
+
+    [extra_env_vars]
+    CUDA_VISIBLE_DEVICES = "0,1,2,3"
+
+The config above automatically splits the GPUs listed in ``CUDA_VISIBLE_DEVICES`` into two halves: the first half is used for prefill and the second half for decode.
+
+For more control, the GPU IDs can be specified explicitly in the ``prefill`` and ``decode`` configurations:
+
+.. code-block:: toml
+    :caption: test.toml (disaggregated prefill/decode, explicit GPU IDs)
+
+    [cmd_args.prefill]
+    gpu_ids = "0,1"
+
+    [cmd_args.decode]
+    gpu_ids = "2,3"
+
+In this case, ``CUDA_VISIBLE_DEVICES`` is ignored and only the GPUs specified in ``gpu_ids`` are used.
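+
+Putting these pieces together, a complete ``test.toml`` for the explicit GPU layout above could look like the following; the test name is illustrative, and the image, model, and benchmark values simply reuse the earlier examples:
+
+.. code-block:: toml
+    :caption: test.toml (complete disaggregated example)
+
+    name = "vllm_disagg_test"
+    description = "Example vLLM disaggregated test"
+    test_template_name = "vllm"
+
+    [cmd_args]
+    docker_image_url = "nvcr.io#nvidia/ai-dynamo/vllm-runtime:0.7.0"
+    model = "Qwen/Qwen3-0.6B"
+
+    [cmd_args.prefill]
+    gpu_ids = "0,1"
+
+    [cmd_args.decode]
+    gpu_ids = "2,3"
+
+    [bench_cmd_args]
+    random_input_len = 16
+    random_output_len = 128
+    max_concurrency = 16
+    num_prompts = 30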
+
+
+Control ``proxy_script``
+------------------------
+
+``proxy_script`` points to the script that proxies requests from the client to the prefill and decode instances. It is ignored in non-disaggregated mode. The default value is listed in the API documentation below.
+
+It can be overridden, for example to use the latest version of the script from the vLLM repository:
+
+.. code-block:: toml
+    :caption: test_scenario.toml (override proxy_script)
+
+    [[Tests.git_repos]]
+    url = "https://github.com/vllm-project/vllm.git"
+    commit = "main"
+    mount_as = "/vllm_repo"
+
+    [Tests.cmd_args]
+    docker_image_url = "vllm/vllm-openai:v0.14.0-cu130"
+    proxy_script = "/vllm_repo/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py"
+
+In this case, the vLLM repository is cloned locally, mounted into the container as ``/vllm_repo``, and the proxy script is taken from it for the test.
+
+
+API Documentation
+-----------------
+
+Command Arguments
+~~~~~~~~~~~~~~~~~
+
+.. autoclass:: cloudai.workloads.vllm.vllm.VllmCmdArgs
+    :members:
+    :show-inheritance:
+
+Benchmark Command Arguments
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. autoclass:: cloudai.workloads.vllm.vllm.VllmBenchCmdArgs
+    :members:
+    :show-inheritance:
+
+Test Definition
+~~~~~~~~~~~~~~~
+
+.. autoclass:: cloudai.workloads.vllm.vllm.VllmTestDefinition
+    :members:
+    :show-inheritance:
diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py
index f9be227e6..749d7da24 100644
--- a/src/cloudai/registration.py
+++ b/src/cloudai/registration.py
@@ -1,5 +1,5 @@
 # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
-# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -159,6 +159,11 @@ def register_all():
         UCCTestReportGenerationStrategy,
         UCCTestSlurmCommandGenStrategy,
     )
+    from cloudai.workloads.vllm import (
+        VLLMBenchReportGenerationStrategy,
+        VllmSlurmCommandGenStrategy,
+        VllmTestDefinition,
+    )
 
     Registry().add_runner("slurm", SlurmRunner)
     Registry().add_runner("kubernetes", KubernetesRunner)
@@ -216,6 +221,7 @@ def register_all():
     Registry().add_command_gen_strategy(SlurmSystem, BashCmdTestDefinition, BashCmdCommandGenStrategy)
     Registry().add_command_gen_strategy(SlurmSystem, NIXLKVBenchTestDefinition, NIXLKVBenchSlurmCommandGenStrategy)
     Registry().add_command_gen_strategy(SlurmSystem, OSUBenchTestDefinition, OSUBenchSlurmCommandGenStrategy)
+    Registry().add_command_gen_strategy(SlurmSystem, VllmTestDefinition, VllmSlurmCommandGenStrategy)
 
     Registry().add_installer("slurm", SlurmInstaller)
     Registry().add_installer("standalone", StandaloneInstaller)
@@ -251,6 +257,7 @@ def register_all():
     Registry().add_test_definition("NIXLKVBench", NIXLKVBenchTestDefinition)
     Registry().add_test_definition("Aiconfigurator", AiconfiguratorTestDefinition)
     Registry().add_test_definition("OSUBench", OSUBenchTestDefinition)
+    Registry().add_test_definition("vllm", VllmTestDefinition)
 
     Registry().add_agent("grid_search", GridSearchAgent)
 
@@ -271,6 +278,7 @@ def register_all():
     Registry().add_report(AIDynamoTestDefinition, AIDynamoReportGenerationStrategy)
     Registry().add_report(AiconfiguratorTestDefinition, AiconfiguratorReportGenerationStrategy)
     Registry().add_report(NixlPerftestTestDefinition, NIXLKVBenchDummyReport)
+    Registry().add_report(VllmTestDefinition, VLLMBenchReportGenerationStrategy)
 
     Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True))
Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) diff --git a/src/cloudai/workloads/vllm/__init__.py b/src/cloudai/workloads/vllm/__init__.py new file mode 100644 index 000000000..e0ec73351 --- /dev/null +++ b/src/cloudai/workloads/vllm/__init__.py @@ -0,0 +1,29 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .report_generation_strategy import VLLMBenchReportGenerationStrategy +from .slurm_command_gen_strategy import VllmSlurmCommandGenStrategy +from .vllm import VLLM_BENCH_LOG_FILE, VllmArgs, VllmBenchCmdArgs, VllmCmdArgs, VllmTestDefinition + +__all__ = [ + "VLLM_BENCH_LOG_FILE", + "VLLMBenchReportGenerationStrategy", + "VllmArgs", + "VllmBenchCmdArgs", + "VllmCmdArgs", + "VllmSlurmCommandGenStrategy", + "VllmTestDefinition", +] diff --git a/src/cloudai/workloads/vllm/report_generation_strategy.py b/src/cloudai/workloads/vllm/report_generation_strategy.py new file mode 100644 index 000000000..7ade3e325 --- /dev/null +++ b/src/cloudai/workloads/vllm/report_generation_strategy.py @@ -0,0 +1,91 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import logging +from functools import cache +from pathlib import Path + +from pydantic import BaseModel, ConfigDict +from rich.console import Console +from rich.table import Table + +from cloudai.core import ReportGenerationStrategy + +from .vllm import VLLM_BENCH_JSON_FILE + + +class VLLMBenchReport(BaseModel): + """Report for vLLM benchmark results.""" + + model_config = ConfigDict(extra="ignore") + + num_prompts: int + completed: int + mean_ttft_ms: float + median_ttft_ms: float + p99_ttft_ms: float + mean_tpot_ms: float + median_tpot_ms: float + p99_tpot_ms: float + + +@cache +def parse_vllm_bench_output(res_file: Path) -> VLLMBenchReport | None: + """Parse the vLLM benchmark output file and return a VLLMBenchReport object.""" + if not res_file.is_file(): + return None + + try: + with res_file.open("r") as f: + data = json.load(f) + return VLLMBenchReport.model_validate(data) + except Exception as e: + logging.debug(f"Error parsing vLLM benchmark output: {e}") + return None + + +class VLLMBenchReportGenerationStrategy(ReportGenerationStrategy): + """Generate a report for vLLM benchmark results.""" + + def can_handle_directory(self) -> bool: + return parse_vllm_bench_output(self.test_run.output_path / VLLM_BENCH_JSON_FILE) is not None + + def generate_report(self) -> None: + results = parse_vllm_bench_output(self.test_run.output_path / VLLM_BENCH_JSON_FILE) + if results is None: + return + + console = Console() + table = Table(title=f"vLLM Benchmark Results ({self.test_run.output_path})", title_justify="left") + table.add_column("Successful prompts", justify="right") + table.add_column("TTFT Mean, ms", justify="right") + table.add_column("TTFT Median, ms", justify="right") + table.add_column("TTFT P99, ms", justify="right") + table.add_column("TPOT Mean, ms", justify="right") + table.add_column("TPOT Median, ms", justify="right") + table.add_column("TPOT P99, ms", justify="right") + table.add_row( + f"{results.completed / results.num_prompts * 100:.2f}% ({results.completed} of {results.num_prompts})", + f"{results.mean_ttft_ms:.4f}", + f"{results.median_ttft_ms:.4f}", + f"{results.p99_ttft_ms:.4f}", + f"{results.mean_tpot_ms:.4f}", + f"{results.median_tpot_ms:.4f}", + f"{results.p99_tpot_ms:.4f}", + ) + + console.print(table) diff --git a/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py b/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py new file mode 100644 index 000000000..9b5f0522a --- /dev/null +++ b/src/cloudai/workloads/vllm/slurm_command_gen_strategy.py @@ -0,0 +1,268 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pathlib import Path +from typing import cast + +from cloudai.systems.slurm import SlurmCommandGenStrategy + +from .vllm import VLLM_BENCH_JSON_FILE, VLLM_BENCH_LOG_FILE, VLLM_SERVE_LOG_FILE, VllmCmdArgs, VllmTestDefinition + + +class VllmSlurmCommandGenStrategy(SlurmCommandGenStrategy): + """Command generation strategy for vLLM on Slurm systems.""" + + def _container_mounts(self) -> list[str]: + return [f"{self.system.hf_home_path.absolute()}:/root/.cache/huggingface"] + + def image_path(self) -> str | None: + return str(self.tdef.docker_image.installed_path) + + @property + def tdef(self) -> VllmTestDefinition: + return cast(VllmTestDefinition, self.test_run.test) + + @property + def gpu_ids(self) -> list[int]: + cuda_devices = str(self.test_run.test.extra_env_vars.get("CUDA_VISIBLE_DEVICES", "")) + if (self.tdef.cmd_args.prefill and self.tdef.cmd_args.prefill.gpu_ids) and self.tdef.cmd_args.decode.gpu_ids: + cuda_devices = f"{self.tdef.cmd_args.prefill.gpu_ids},{self.tdef.cmd_args.decode.gpu_ids}" + if cuda_devices: + return [int(gpu_id) for gpu_id in cuda_devices.split(",")] + return list(range(self.system.gpus_per_node or 1)) + + @property + def prefill_gpu_ids(self) -> list[int]: + if self.tdef.cmd_args.prefill and self.tdef.cmd_args.prefill.gpu_ids: + return [int(gpu_id) for gpu_id in str(self.tdef.cmd_args.prefill.gpu_ids).split(",")] + mid = len(self.gpu_ids) // 2 + return self.gpu_ids[:mid] + + @property + def decode_gpu_ids(self) -> list[int]: + if self.tdef.cmd_args.decode.gpu_ids: + return [int(gpu_id) for gpu_id in str(self.tdef.cmd_args.decode.gpu_ids).split(",")] + mid = len(self.gpu_ids) // 2 + return self.gpu_ids[mid:] + + def get_vllm_serve_commands(self) -> list[list[str]]: + tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) + cmd_args: VllmCmdArgs = tdef.cmd_args + + base_cmd = ["vllm", "serve", cmd_args.model] + if not tdef.cmd_args.prefill: + return [[*base_cmd, *tdef.cmd_args.decode.serve_args, "--port", str(cmd_args.port)]] + + prefill_port = cmd_args.port + 100 + decode_port = cmd_args.port + 200 + + prefill_extra_args = tdef.cmd_args.prefill.serve_args if tdef.cmd_args.prefill else [] + prefill_cmd = [ + *base_cmd, + "--port", + str(prefill_port), + "--kv-transfer-config", + '\'{"kv_connector":"NixlConnector","kv_role":"kv_producer"}\'', + *prefill_extra_args, + ] + decode_cmd = [ + *base_cmd, + "--port", + str(decode_port), + "--kv-transfer-config", + '\'{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}\'', + *tdef.cmd_args.decode.serve_args, + ] + return [prefill_cmd, decode_cmd] + + def get_proxy_command(self) -> list[str]: + tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) + cmd_args: VllmCmdArgs = tdef.cmd_args + prefill_port = cmd_args.port + 100 + decode_port = cmd_args.port + 200 + return [ + "python3", + cmd_args.proxy_script, + "--port", + str(cmd_args.port), + "--prefiller-hosts", + "0.0.0.0", + "--prefiller-ports", + str(prefill_port), + "--decoder-hosts", + "0.0.0.0", + "--decoder-ports", + str(decode_port), + ] + + def get_vllm_bench_command(self) -> list[str]: + tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) + cmd_args: VllmCmdArgs = tdef.cmd_args + bench_args = tdef.bench_cmd_args + extra_args = tdef.bench_cmd_args.model_extra or {} + extras = ["--" + k.replace("_", "-") + " " + str(v) for k, v in extra_args.items()] + return [ + "vllm", + "bench", + "serve", + f"--model {cmd_args.model}", + f"--base-url http://0.0.0.0:{cmd_args.port}", + f"--random-input-len 
{bench_args.random_input_len}", + f"--random-output-len {bench_args.random_output_len}", + f"--max-concurrency {bench_args.max_concurrency}", + f"--num-prompts {bench_args.num_prompts}", + f"--result-dir {self.test_run.output_path.absolute()}", + f"--result-filename {VLLM_BENCH_JSON_FILE}", + "--save-result", + *extras, + ] + + def generate_wait_for_health_function(self) -> str: + tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) + cmd_args: VllmCmdArgs = tdef.cmd_args + + return f"""\ +wait_for_health() {{ + local endpoint="$1" + local timeout={cmd_args.vllm_serve_wait_seconds} + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +}}""" + + def _gen_srun_command(self) -> str: + tdef: VllmTestDefinition = cast(VllmTestDefinition, self.test_run.test) + cmd_args: VllmCmdArgs = tdef.cmd_args + output_path = self.test_run.output_path.absolute() + + srun_prefix = " ".join(self.gen_srun_prefix()) + serve_commands = self.get_vllm_serve_commands() + bench_cmd = " ".join(self.get_vllm_bench_command()) + health_func = self.generate_wait_for_health_function() + + if len(serve_commands) == 1: + return self._gen_aggregated_script( + srun_prefix, serve_commands[0], bench_cmd, health_func, cmd_args, output_path + ) + else: + return self._gen_disaggregated_script( + srun_prefix, serve_commands, bench_cmd, health_func, cmd_args, output_path + ) + + def _gen_aggregated_script( + self, + srun_prefix: str, + serve_cmd: list[str], + bench_cmd: str, + health_func: str, + cmd_args: VllmCmdArgs, + output_path: Path, + ) -> str: + return f"""\ +cleanup() {{ + echo "Cleaning up PIDs: VLLM_PID=$VLLM_PID" + [ -n "$VLLM_PID" ] && kill -9 $VLLM_PID 2>/dev/null +}} +trap cleanup EXIT + +{health_func} + +echo "Starting vLLM instances..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_SERVE_LOG_FILE} \\ + {" ".join(serve_cmd)} & +VLLM_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${{NODE}}:{cmd_args.port}/health" || exit 1 + +echo "Running benchmark..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ + {bench_cmd}""" + + def _gen_disaggregated_script( + self, + srun_prefix: str, + serve_commands: list[list[str]], + bench_cmd: str, + health_func: str, + cmd_args: VllmCmdArgs, + output_path: Path, + ) -> str: + prefill_cmd, decode_cmd = serve_commands + proxy_cmd = self.get_proxy_command() + prefill_port = cmd_args.port + 100 + decode_port = cmd_args.port + 200 + prefill_gpus = ",".join(str(g) for g in self.prefill_gpu_ids) + decode_gpus = ",".join(str(g) for g in self.decode_gpu_ids) + + return f"""\ +cleanup() {{ + echo "Cleaning up PIDs: PREFILL_PID=$PREFILL_PID DECODE_PID=$DECODE_PID PROXY_PID=$PROXY_PID" + [ -n "$PREFILL_PID" ] && kill -9 $PREFILL_PID 2>/dev/null + [ -n "$DECODE_PID" ] && kill -9 $DECODE_PID 2>/dev/null + [ -n "$PROXY_PID" ] && kill -9 $PROXY_PID 2>/dev/null +}} +trap cleanup EXIT + +{health_func} + +PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) +DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + {len(self.gpu_ids)})) + +echo "Starting vLLM instances..." 
+export CUDA_VISIBLE_DEVICES="{prefill_gpus}" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$PREFILL_NIXL_PORT +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-prefill.log \\ + {" ".join(prefill_cmd)} & +PREFILL_PID=$! + +export CUDA_VISIBLE_DEVICES="{decode_gpus}" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$DECODE_NIXL_PORT +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-decode.log \\ + {" ".join(decode_cmd)} & +DECODE_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${{NODE}}:{prefill_port}/health" || exit 1 +wait_for_health "http://${{NODE}}:{decode_port}/health" || exit 1 + +echo "Starting proxy..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-proxy.log \\ + {" ".join(proxy_cmd)} & +PROXY_PID=$! + +echo "Running benchmark..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ + {bench_cmd}""" diff --git a/src/cloudai/workloads/vllm/vllm.py b/src/cloudai/workloads/vllm/vllm.py new file mode 100644 index 000000000..2c184c0a3 --- /dev/null +++ b/src/cloudai/workloads/vllm/vllm.py @@ -0,0 +1,128 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging + +from pydantic import ConfigDict, Field + +from cloudai.core import DockerImage, GitRepo, HFModel, Installable, JobStatusResult, TestRun +from cloudai.models.workload import CmdArgs, TestDefinition + +VLLM_SERVE_LOG_FILE = "vllm-serve.log" +VLLM_BENCH_LOG_FILE = "vllm-bench.log" +VLLM_BENCH_JSON_FILE = "vllm-bench.json" + + +class VllmArgs(CmdArgs): + """Base command arguments for vLLM instances.""" + + gpu_ids: str | list[str] | None = Field( + default=None, + description="Comma-separated GPU IDs. If not set, will use all available GPUs.", + ) + + @property + def serve_args(self) -> list[str]: + """Convert cmd_args_dict to command-line arguments list for vllm serve.""" + args = [] + for k, v in self.model_dump(exclude={"gpu_ids"}).items(): + opt = f"--{k.replace('_', '-')}" + if v == "": + args.append(opt) + else: + args.extend([opt, str(v)]) + return args + + +class VllmCmdArgs(CmdArgs): + """vLLM serve command arguments.""" + + model_config = ConfigDict(extra="forbid") # arbitrary fields are allowed per decode/prefill, not here + + docker_image_url: str + port: int = 8000 + vllm_serve_wait_seconds: int = 300 + proxy_script: str = "/opt/vllm/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" + + model: str = "Qwen/Qwen3-0.6B" + prefill: VllmArgs | None = Field( + default=None, + description="Prefill instance arguments. 
If not set, a single instance without disaggregation will be used.", + ) + decode: VllmArgs = Field(default_factory=VllmArgs, description="Decode instance arguments.") + + +class VllmBenchCmdArgs(CmdArgs): + """vLLM bench serve command arguments.""" + + random_input_len: int = 16 + random_output_len: int = 128 + max_concurrency: int = 16 + num_prompts: int = 30 + + +class VllmTestDefinition(TestDefinition): + """Test object for vLLM.""" + + cmd_args: VllmCmdArgs + bench_cmd_args: VllmBenchCmdArgs = VllmBenchCmdArgs() + proxy_script_repo: GitRepo | None = None + + _docker_image: DockerImage | None = None + _hf_model: HFModel | None = None + + @property + def docker_image(self) -> DockerImage: + if not self._docker_image: + self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) + return self._docker_image + + @property + def hf_model(self) -> HFModel: + if not self._hf_model: + self._hf_model = HFModel(model_name=self.cmd_args.model) + return self._hf_model + + @property + def installables(self) -> list[Installable]: + installables = [*self.git_repos, self.docker_image, self.hf_model] + if self.proxy_script_repo: + installables.append(self.proxy_script_repo) + return installables + + def was_run_successful(self, tr: TestRun) -> JobStatusResult: + log_path = tr.output_path / VLLM_BENCH_LOG_FILE + if not log_path.is_file(): + return JobStatusResult(is_successful=False, error_message=f"vLLM bench log not found in {tr.output_path}.") + + has_results_marker = False + with log_path.open("r") as f: + for line in f: + if "============ Serving Benchmark Result ============" in line: + has_results_marker = True + continue + if has_results_marker and "Successful requests:" in line: + try: + num_successful_requests = int(line.split()[2]) + if num_successful_requests > 0: + return JobStatusResult(is_successful=True) + except Exception as e: + logging.debug(f"Error parsing number of successful requests: {e}") + + return JobStatusResult( + is_successful=False, error_message=f"vLLM bench log does not contain benchmark result in {tr.output_path}." + ) diff --git a/tests/job_status_retrieval_strategy/test_vllm_job_status_retrieval_strategy.py b/tests/job_status_retrieval_strategy/test_vllm_job_status_retrieval_strategy.py new file mode 100644 index 000000000..ec7678199 --- /dev/null +++ b/tests/job_status_retrieval_strategy/test_vllm_job_status_retrieval_strategy.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from cloudai.core import TestRun +from cloudai.workloads.vllm import VLLM_BENCH_LOG_FILE, VllmCmdArgs, VllmTestDefinition + + +class TestVllmSuccessCheck: + def setup_method(self) -> None: + self.vllm_tdef = VllmTestDefinition( + name="vllm", + description="vLLM benchmark", + test_template_name="Vllm", + cmd_args=VllmCmdArgs(docker_image_url="nvcr.io/nvidia/vllm:latest"), + ) + + def test_no_bench_log_file(self, base_tr: TestRun) -> None: + result = self.vllm_tdef.was_run_successful(base_tr) + assert not result.is_successful + assert result.error_message == f"vLLM bench log not found in {base_tr.output_path}." + + def test_successful_job(self, base_tr: TestRun) -> None: + base_tr.output_path.mkdir(parents=True, exist_ok=True) + log_file = base_tr.output_path / VLLM_BENCH_LOG_FILE + log_content = """ +============ Serving Benchmark Result ============ +Successful requests: 1 +""" + log_file.write_text(log_content) + result = self.vllm_tdef.was_run_successful(base_tr) + assert result.is_successful + assert result.error_message == "" + + def test_failed_job_no_result(self, base_tr: TestRun) -> None: + base_tr.output_path.mkdir(parents=True, exist_ok=True) + log_file = base_tr.output_path / VLLM_BENCH_LOG_FILE + log_content = "Starting benchmark...\nsome line\n" + log_file.write_text(log_content) + result = self.vllm_tdef.was_run_successful(base_tr) + assert not result.is_successful + assert result.error_message == f"vLLM bench log does not contain benchmark result in {base_tr.output_path}." + + def test_empty_log_file(self, base_tr: TestRun) -> None: + base_tr.output_path.mkdir(parents=True, exist_ok=True) + log_file = base_tr.output_path / VLLM_BENCH_LOG_FILE + log_file.touch() + result = self.vllm_tdef.was_run_successful(base_tr) + assert not result.is_successful + assert result.error_message == f"vLLM bench log does not contain benchmark result in {base_tr.output_path}." + + def test_no_successful_requests(self, base_tr: TestRun) -> None: + base_tr.output_path.mkdir(parents=True, exist_ok=True) + log_file = base_tr.output_path / VLLM_BENCH_LOG_FILE + log_content = """ +============ Serving Benchmark Result ============ +Successful requests: 0 +""" + log_file.write_text(log_content) + result = self.vllm_tdef.was_run_successful(base_tr) + assert not result.is_successful + assert result.error_message == f"vLLM bench log does not contain benchmark result in {base_tr.output_path}." diff --git a/tests/ref_data/vllm-disagg.sbatch b/tests/ref_data/vllm-disagg.sbatch new file mode 100644 index 000000000..78d2cf613 --- /dev/null +++ b/tests/ref_data/vllm-disagg.sbatch @@ -0,0 +1,76 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 1 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CUDA_VISIBLE_DEVICES=0,1,2,3 +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." 
+ +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --ntasks=1 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +cleanup() { + echo "Cleaning up PIDs: PREFILL_PID=$PREFILL_PID DECODE_PID=$DECODE_PID PROXY_PID=$PROXY_PID" + [ -n "$PREFILL_PID" ] && kill -9 $PREFILL_PID 2>/dev/null + [ -n "$DECODE_PID" ] && kill -9 $DECODE_PID 2>/dev/null + [ -n "$PROXY_PID" ] && kill -9 $PROXY_PID 2>/dev/null +} +trap cleanup EXIT + +wait_for_health() { + local endpoint="$1" + local timeout=300 + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +} + +PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) +DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + 4)) + +echo "Starting vLLM instances..." +export CUDA_VISIBLE_DEVICES="0,1" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$PREFILL_NIXL_PORT +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-prefill.log \ + vllm serve Qwen/Qwen3-0.6B --port 8100 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_producer"}' & +PREFILL_PID=$! + +export CUDA_VISIBLE_DEVICES="2,3" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$DECODE_NIXL_PORT +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-decode.log \ + vllm serve Qwen/Qwen3-0.6B --port 8200 --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}' & +DECODE_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${NODE}:8100/health" || exit 1 +wait_for_health "http://${NODE}:8200/health" || exit 1 + +echo "Starting proxy..." +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-proxy.log \ + python3 /opt/vllm/tests/v1/kv_connector/nixl_integration/toy_proxy_server.py --port 8000 --prefiller-hosts 0.0.0.0 --prefiller-ports 8100 --decoder-hosts 0.0.0.0 --decoder-ports 8200 & +PROXY_PID=$! + +echo "Running benchmark..." 
+srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-bench.log \ + vllm bench serve --model Qwen/Qwen3-0.6B --base-url http://0.0.0.0:8000 --random-input-len 16 --random-output-len 128 --max-concurrency 16 --num-prompts 30 --result-dir __OUTPUT_DIR__/output --result-filename vllm-bench.json --save-result diff --git a/tests/ref_data/vllm.sbatch b/tests/ref_data/vllm.sbatch new file mode 100644 index 000000000..9d0c99be1 --- /dev/null +++ b/tests/ref_data/vllm.sbatch @@ -0,0 +1,54 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 1 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CUDA_VISIBLE_DEVICES=0 +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --ntasks=1 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +cleanup() { + echo "Cleaning up PIDs: VLLM_PID=$VLLM_PID" + [ -n "$VLLM_PID" ] && kill -9 $VLLM_PID 2>/dev/null +} +trap cleanup EXIT + +wait_for_health() { + local endpoint="$1" + local timeout=300 + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +} + +echo "Starting vLLM instances..." +srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-serve.log \ + vllm serve Qwen/Qwen3-0.6B --port 8000 & +VLLM_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${NODE}:8000/health" || exit 1 + +echo "Running benchmark..." 
+srun --export=ALL --mpi=pmix -N1 --container-image=nvcr.io/nvidia/vllm:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output,__INSTALL_DIR__/huggingface:/root/.cache/huggingface --overlap --ntasks-per-node=1 --ntasks=1 \ + --output=__OUTPUT_DIR__/output/vllm-bench.log \ + vllm bench serve --model Qwen/Qwen3-0.6B --base-url http://0.0.0.0:8000 --random-input-len 16 --random-output-len 128 --max-concurrency 16 --num-prompts 30 --result-dir __OUTPUT_DIR__/output --result-filename vllm-bench.json --save-result diff --git a/tests/slurm_command_gen_strategy/test_vllm_slurm_command_gen_strategy.py b/tests/slurm_command_gen_strategy/test_vllm_slurm_command_gen_strategy.py new file mode 100644 index 000000000..7909b3d67 --- /dev/null +++ b/tests/slurm_command_gen_strategy/test_vllm_slurm_command_gen_strategy.py @@ -0,0 +1,417 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import cast + +import pytest + +from cloudai.core import TestRun +from cloudai.systems.slurm import SlurmSystem +from cloudai.workloads.vllm import ( + VllmArgs, + VllmBenchCmdArgs, + VllmCmdArgs, + VllmSlurmCommandGenStrategy, + VllmTestDefinition, +) +from cloudai.workloads.vllm.vllm import VLLM_BENCH_JSON_FILE, VLLM_BENCH_LOG_FILE, VLLM_SERVE_LOG_FILE + + +@pytest.fixture +def vllm() -> VllmTestDefinition: + return VllmTestDefinition( + name="vllm_test", + description="vLLM benchmark test", + test_template_name="Vllm", + cmd_args=VllmCmdArgs(docker_image_url="nvcr.io/nvidia/vllm:latest", model="Qwen/Qwen3-0.6B", port=8000), + extra_env_vars={"CUDA_VISIBLE_DEVICES": "0"}, + ) + + +@pytest.fixture +def vllm_tr(vllm: VllmTestDefinition, tmp_path: Path) -> TestRun: + return TestRun(test=vllm, num_nodes=1, nodes=[], output_path=tmp_path, name="vllm-job") + + +@pytest.fixture +def vllm_cmd_gen_strategy(vllm_tr: TestRun, slurm_system: SlurmSystem) -> VllmSlurmCommandGenStrategy: + return VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + + +@pytest.fixture +def vllm_disagg_tr(vllm: VllmTestDefinition, tmp_path: Path) -> TestRun: + """TestRun for disaggregated mode with 4 GPUs.""" + vllm.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + vllm.cmd_args.prefill = VllmArgs() + return TestRun(test=vllm, num_nodes=1, nodes=[], output_path=tmp_path, name="vllm-disagg-job") + + +class TestGpuDetection: + """Tests for GPU detection logic.""" + + @pytest.mark.parametrize("cuda_visible_devices", ["0", "0,1,2,3", "0,1,2,3,4,5,6,7"]) + def test_gpu_ids_from_cuda_visible_devices_single( + self, cuda_visible_devices: str, vllm_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + vllm_tr.test.extra_env_vars = {"CUDA_VISIBLE_DEVICES": cuda_visible_devices} + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + assert strategy.gpu_ids == 
[int(gpu_id) for gpu_id in cuda_visible_devices.split(",")] + + @pytest.mark.parametrize("gpus_per_node", [None, 1, 8]) + def test_gpu_ids_fallback_to_system( + self, gpus_per_node: int | None, vllm_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + vllm_tr.test.extra_env_vars = {} + slurm_system.gpus_per_node = gpus_per_node + + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + + assert strategy.gpu_ids == list(range(gpus_per_node or 1)) + + def test_gpu_ids_use_prefill_and_decode_gpu_ids(self, vllm_tr: TestRun, slurm_system: SlurmSystem) -> None: + slurm_system.gpus_per_node = 4 + vllm_tr.test.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + vllm_tr.test.cmd_args.prefill = VllmArgs(gpu_ids="4") + vllm_tr.test.cmd_args.decode.gpu_ids = "5" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + assert strategy.gpu_ids == [4, 5] + assert strategy.prefill_gpu_ids == [4] + assert strategy.decode_gpu_ids == [5] + + def test_prefill_nodes_set(self, vllm_tr: TestRun, slurm_system: SlurmSystem) -> None: + slurm_system.gpus_per_node = 4 + vllm_tr.test.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + vllm_tr.test.cmd_args.prefill = VllmArgs(gpu_ids="0,3") + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + assert strategy.prefill_gpu_ids == [0, 3] + + def test_decode_nodes_set(self, vllm_tr: TestRun, slurm_system: SlurmSystem) -> None: + slurm_system.gpus_per_node = 4 + vllm_tr.test.extra_env_vars = {"CUDA_VISIBLE_DEVICES": "0,1,2,3"} + vllm_tr.test.cmd_args.decode.gpu_ids = "1,2" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + assert strategy.decode_gpu_ids == [1, 2] + + +class TestServeExtraArgs: + """Tests for serve_args property.""" + + def test_serve_args_empty_by_default(self) -> None: + assert VllmArgs().serve_args == [] + + def test_empty_string_value_means_flag(self) -> None: + assert VllmArgs.model_validate({"some_flag": "", "some_arg": "value", "zero_value": 0}).serve_args == [ + "--some-flag", + "--some-arg", + "value", + "--zero-value", + "0", + ] + + def test_decode_serve_args_with_custom_fields(self) -> None: + tdef = VllmTestDefinition( + name="vllm", + description="test", + test_template_name="Vllm", + cmd_args=VllmCmdArgs.model_validate( + { + "docker_image_url": "image:latest", + "decode": {"tensor_parallel_size": 4, "max_model_len": 8192, "some_long_arg": "value"}, + } + ), + ) + assert tdef.cmd_args.decode.serve_args == [ + "--tensor-parallel-size", + "4", + "--max-model-len", + "8192", + "--some-long-arg", + "value", + ] + + def test_prefill_serve_args_with_custom_fields(self) -> None: + tdef = VllmTestDefinition( + name="vllm", + description="test", + test_template_name="Vllm", + cmd_args=VllmCmdArgs.model_validate( + { + "docker_image_url": "image:latest", + "prefill": {"tensor_parallel_size": 4, "max_model_len": 8192, "some_long_arg": "value"}, + } + ), + ) + assert tdef.cmd_args.prefill is not None + assert tdef.cmd_args.prefill.serve_args == [ + "--tensor-parallel-size", + "4", + "--max-model-len", + "8192", + "--some-long-arg", + "value", + ] + + +def test_container_mounts(vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: + assert vllm_cmd_gen_strategy._container_mounts() == [ + f"{vllm_cmd_gen_strategy.system.hf_home_path.absolute()}:/root/.cache/huggingface" + ] + + +class TestVllmAggregatedMode: + """Tests for vLLM non-disaggregated mode with 1 GPU.""" + + def test_get_vllm_serve_commands_single_gpu(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: + cmd_args = 
vllm_cmd_gen_strategy.test_run.test.cmd_args + + commands = vllm_cmd_gen_strategy.get_vllm_serve_commands() + + assert len(commands) == 1 + assert commands[0] == ["vllm", "serve", cmd_args.model, "--port", str(cmd_args.port)] + + def test_generate_wait_for_health_function(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: + cmd_args = vllm_cmd_gen_strategy.test_run.test.cmd_args + + func = vllm_cmd_gen_strategy.generate_wait_for_health_function() + + expected = f"""\ +wait_for_health() {{ + local endpoint="$1" + local timeout={cmd_args.vllm_serve_wait_seconds} + local interval=5 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if curl -sf "$endpoint" > /dev/null 2>&1; then + echo "Health check passed: $endpoint" + return 0 + fi + sleep "$interval" + done + + echo "Timeout waiting for: $endpoint" + return 1 +}}""" + + assert func == expected + + def test_get_vllm_bench_command(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: + tdef = cast(VllmTestDefinition, vllm_cmd_gen_strategy.test_run.test) + cmd_args = tdef.cmd_args + bench_args = tdef.bench_cmd_args + + command = vllm_cmd_gen_strategy.get_vllm_bench_command() + + expected = [ + "vllm", + "bench", + "serve", + f"--model {cmd_args.model}", + f"--base-url http://0.0.0.0:{cmd_args.port}", + f"--random-input-len {bench_args.random_input_len}", + f"--random-output-len {bench_args.random_output_len}", + f"--max-concurrency {bench_args.max_concurrency}", + f"--num-prompts {bench_args.num_prompts}", + f"--result-dir {vllm_cmd_gen_strategy.test_run.output_path.absolute()}", + f"--result-filename {VLLM_BENCH_JSON_FILE}", + "--save-result", + ] + assert command == expected + + def test_get_vllm_bench_command_with_extra_args( + self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem + ) -> None: + vllm.bench_cmd_args = VllmBenchCmdArgs.model_validate({"extra1": 1, "extra-2": 2, "extra_3": 3}) + vllm_tr.test = vllm + vllm_cmd_gen_strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr) + + cmd = vllm_cmd_gen_strategy.get_vllm_bench_command() + + assert "--extra1 1" in cmd + assert "--extra-2 2" in cmd + assert "--extra-3 3" in cmd + + def test_gen_srun_command_full_flow(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None: + tdef = vllm_cmd_gen_strategy.test_run.test + cmd_args = tdef.cmd_args + output_path = vllm_cmd_gen_strategy.test_run.output_path.absolute() + srun_prefix = " ".join(vllm_cmd_gen_strategy.gen_srun_prefix()) + serve_cmd = " ".join(vllm_cmd_gen_strategy.get_vllm_serve_commands()[0]) + bench_cmd = " ".join(vllm_cmd_gen_strategy.get_vllm_bench_command()) + health_func = vllm_cmd_gen_strategy.generate_wait_for_health_function() + + srun_command = vllm_cmd_gen_strategy._gen_srun_command() + + expected = f"""\ +cleanup() {{ + echo "Cleaning up PIDs: VLLM_PID=$VLLM_PID" + [ -n "$VLLM_PID" ] && kill -9 $VLLM_PID 2>/dev/null +}} +trap cleanup EXIT + +{health_func} + +echo "Starting vLLM instances..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_SERVE_LOG_FILE} \\ + {serve_cmd} & +VLLM_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${{NODE}}:{cmd_args.port}/health" || exit 1 + +echo "Running benchmark..." 
+{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ + {bench_cmd}""" + + assert srun_command == expected + + +class TestVllmDisaggregatedMode: + """Tests for vLLM disaggregated mode with multiple GPUs.""" + + def test_prefill_gpu_ids(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + """Prefill gets first half of GPUs.""" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + assert strategy.prefill_gpu_ids == [0, 1] + + def test_decode_gpu_ids(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + """Decode gets second half of GPUs.""" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + assert strategy.decode_gpu_ids == [2, 3] + + def test_get_vllm_serve_commands_returns_two(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + """Disagg mode returns prefill and decode commands.""" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + cmd_args = vllm_disagg_tr.test.cmd_args + + commands = strategy.get_vllm_serve_commands() + + assert len(commands) == 2 + prefill_cmd, decode_cmd = commands + + assert prefill_cmd == [ + "vllm", + "serve", + cmd_args.model, + "--port", + str(cmd_args.port + 100), + "--kv-transfer-config", + '\'{"kv_connector":"NixlConnector","kv_role":"kv_producer"}\'', + ] + assert decode_cmd == [ + "vllm", + "serve", + cmd_args.model, + "--port", + str(cmd_args.port + 200), + "--kv-transfer-config", + '\'{"kv_connector":"NixlConnector","kv_role":"kv_consumer"}\'', + ] + + def test_get_proxy_command(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + """Proxy command routes to prefill and decode ports.""" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + cmd_args = vllm_disagg_tr.test.cmd_args + + command = strategy.get_proxy_command() + + assert command == [ + "python3", + cmd_args.proxy_script, + "--port", + str(cmd_args.port), + "--prefiller-hosts", + "0.0.0.0", + "--prefiller-ports", + str(cmd_args.port + 100), + "--decoder-hosts", + "0.0.0.0", + "--decoder-ports", + str(cmd_args.port + 200), + ] + + def test_gen_srun_command_disagg_flow(self, vllm_disagg_tr: TestRun, slurm_system: SlurmSystem) -> None: + """Disagg mode starts prefill, decode, and proxy, waits for health checks.""" + strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_disagg_tr) + cmd_args = vllm_disagg_tr.test.cmd_args + output_path = vllm_disagg_tr.output_path.absolute() + srun_prefix = " ".join(strategy.gen_srun_prefix()) + prefill_cmd, decode_cmd = strategy.get_vllm_serve_commands() + proxy_cmd = strategy.get_proxy_command() + bench_cmd = " ".join(strategy.get_vllm_bench_command()) + health_func = strategy.generate_wait_for_health_function() + prefill_gpus = ",".join(str(g) for g in strategy.prefill_gpu_ids) + decode_gpus = ",".join(str(g) for g in strategy.decode_gpu_ids) + + srun_command = strategy._gen_srun_command() + + expected = f"""\ +cleanup() {{ + echo "Cleaning up PIDs: PREFILL_PID=$PREFILL_PID DECODE_PID=$DECODE_PID PROXY_PID=$PROXY_PID" + [ -n "$PREFILL_PID" ] && kill -9 $PREFILL_PID 2>/dev/null + [ -n "$DECODE_PID" ] && kill -9 $DECODE_PID 2>/dev/null + [ -n "$PROXY_PID" ] && kill -9 $PROXY_PID 2>/dev/null +}} +trap cleanup EXIT + +{health_func} + +PORT_OFFSET=$((SLURM_JOB_ID % 1000)) +PREFILL_NIXL_PORT=$((5557 + PORT_OFFSET)) +DECODE_NIXL_PORT=$((5557 + PORT_OFFSET + {len(strategy.gpu_ids)})) + +echo "Starting vLLM instances..." 
+export CUDA_VISIBLE_DEVICES="{prefill_gpus}" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$PREFILL_NIXL_PORT +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-prefill.log \\ + {" ".join(prefill_cmd)} & +PREFILL_PID=$! + +export CUDA_VISIBLE_DEVICES="{decode_gpus}" +export VLLM_NIXL_SIDE_CHANNEL_PORT=$DECODE_NIXL_PORT +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-decode.log \\ + {" ".join(decode_cmd)} & +DECODE_PID=$! + +NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +echo "Waiting for vLLM on $NODE to be ready..." +wait_for_health "http://${{NODE}}:{cmd_args.port + 100}/health" || exit 1 +wait_for_health "http://${{NODE}}:{cmd_args.port + 200}/health" || exit 1 + +echo "Starting proxy..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/vllm-proxy.log \\ + {" ".join(proxy_cmd)} & +PROXY_PID=$! + +echo "Running benchmark..." +{srun_prefix} --overlap --ntasks-per-node=1 --ntasks=1 \\ + --output={output_path}/{VLLM_BENCH_LOG_FILE} \\ + {bench_cmd}""" + + assert srun_command == expected + + +def test_sweep_detection(vllm: VllmTestDefinition) -> None: + assert vllm.is_dse_job is False + vllm.cmd_args.decode.gpu_ids = ["1"] + assert vllm.is_dse_job is True diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index df3bbc06b..48af3d09b 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -73,6 +73,7 @@ TritonInferenceTestDefinition, ) from cloudai.workloads.ucc_test import UCCCmdArgs, UCCTestDefinition +from cloudai.workloads.vllm import VllmArgs, VllmCmdArgs, VllmTestDefinition SLURM_TEST_SCENARIOS = [ {"path": Path("conf/common/test_scenario/sleep.toml"), "expected_dirs_number": 4, "log_file": "sleep_debug.log"}, @@ -262,6 +263,8 @@ def build_special_test_run( "nixl-kvbench", "deepep-benchmark", "osu-bench", + "vllm", + "vllm-disagg", ] ) def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) -> Tuple[TestRun, str, Optional[str]]: @@ -492,6 +495,37 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), ), ), + "vllm": lambda: create_test_run( + partial_tr, + "vllm", + VllmTestDefinition( + name="vllm", + description="vLLM benchmark", + test_template_name="Vllm", + cmd_args=VllmCmdArgs( + docker_image_url="nvcr.io/nvidia/vllm:latest", + model="Qwen/Qwen3-0.6B", + port=8000, + ), + extra_env_vars={"CUDA_VISIBLE_DEVICES": "0"}, + ), + ), + "vllm-disagg": lambda: create_test_run( + partial_tr, + "vllm-disagg", + VllmTestDefinition( + name="vllm-disagg", + description="vLLM disaggregated benchmark", + test_template_name="Vllm", + cmd_args=VllmCmdArgs( + docker_image_url="nvcr.io/nvidia/vllm:latest", + model="Qwen/Qwen3-0.6B", + port=8000, + prefill=VllmArgs(), + ), + extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1,2,3"}, + ), + ), } if request.param.startswith(("gpt-", "grok-", "nemo-run-", "nemo-launcher")): diff --git a/tests/test_init.py b/tests/test_init.py index 78b717d8f..9a5da9db7 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -97,6 +97,7 @@ UCCTestGradingStrategy, UCCTestSlurmCommandGenStrategy, ) +from cloudai.workloads.vllm import VllmSlurmCommandGenStrategy, VllmTestDefinition def test_systems(): @@ -144,6 +145,7 @@ def test_runners(): (SlurmSystem, NixlPerftestTestDefinition): NixlPerftestSlurmCommandGenStrategy, (SlurmSystem, NIXLKVBenchTestDefinition): NIXLKVBenchSlurmCommandGenStrategy, (SlurmSystem, OSUBenchTestDefinition): OSUBenchSlurmCommandGenStrategy, + (SlurmSystem, VllmTestDefinition): VllmSlurmCommandGenStrategy, } JSON_GEN_STRATEGIES = { (KubernetesSystem, NCCLTestDefinition): NcclTestKubernetesJsonGenStrategy, @@ -217,7 +219,7 @@ def test_installers(): def test_definitions(): test_defs = Registry().test_definitions_map - assert len(test_defs) == 22 + assert len(test_defs) == 23 for tdef in [ ("UCCTest", UCCTestDefinition), ("DDLBTest", DDLBTestDefinition), @@ -241,6 +243,7 @@ def test_definitions(): ("NIXLKVBench", NIXLKVBenchTestDefinition), ("Aiconfigurator", AiconfiguratorTestDefinition), ("OSUBench", OSUBenchTestDefinition), + ("vllm", VllmTestDefinition), ]: assert test_defs[tdef[0]] == tdef[1] diff --git a/tests/test_test_scenario.py b/tests/test_test_scenario.py index c2af1373b..e55fda0d5 100644 --- a/tests/test_test_scenario.py +++ b/tests/test_test_scenario.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -472,7 +472,7 @@ def test_default(self): assert len(reporters) == 0 def test_default_reporters_size(self): - assert len(Registry().reports_map) == 16 + assert len(Registry().reports_map) == 17 @pytest.mark.parametrize( "tdef,expected_reporters",