From 9491f5fc993c02e107b3383e28f6961e0aff990a Mon Sep 17 00:00:00 2001 From: Wang Siyuan Date: Sun, 15 Feb 2026 15:43:59 +0800 Subject: [PATCH 1/3] fix(multi): address full-repo coderabbit findings --- .github/workflows/docs.yml | 2 +- .github/workflows/pre-commit.yaml | 2 +- .github/workflows/python-app.yml | 4 +- src/keep_gpu/benchmark.py | 4 +- .../global_gpu_controller.py | 2 +- .../base_gpu_controller.py | 4 +- .../cuda_gpu_controller.py | 9 ++-- .../rocm_gpu_controller.py | 44 ++++++++++++++----- src/keep_gpu/utilities/humanized_input.py | 11 ++++- src/keep_gpu/utilities/logger.py | 11 +++-- tests/conftest.py | 10 ++++- tests/cuda_controller/context_manager.py | 24 ++++++---- .../cuda_controller/test_2_32pow_elements.py | 19 ++++---- .../cuda_controller/test_keep_and_release.py | 10 +++-- tests/global_controller/global_keep_test.py | 4 -- 15 files changed, 105 insertions(+), 55 deletions(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 6f6c84e..b201da6 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.13' - name: Install dependencies run: | diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml index 63bfaa6..9ba2dd5 100644 --- a/.github/workflows/pre-commit.yaml +++ b/.github/workflows/pre-commit.yaml @@ -26,4 +26,4 @@ jobs: - name: Run pre-commit (no fix) run: | - pre-commit run --all-files --hook-stage manual --show-diff-on-failure --color always + pre-commit run --all-files --show-diff-on-failure --color always diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 2c28488..263f1df 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -26,10 +26,10 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -e . + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi pip install flake8 pytest pip install torch --index-url https://download.pytorch.org/whl/cpu - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -e . - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/src/keep_gpu/benchmark.py b/src/keep_gpu/benchmark.py index 5653bae..b9b2c94 100644 --- a/src/keep_gpu/benchmark.py +++ b/src/keep_gpu/benchmark.py @@ -51,7 +51,9 @@ def keep(rank, args): toc = time.time() logger.info( - f"benchmark {rank} matmul: time span: {(toc - tic) * 1000 / 5000:.2f}ms" + "benchmark %s matmul: time span: %.2fms", + rank, + (toc - tic) * 1000 / args.matmul_iterations, ) time.sleep(args.interval) diff --git a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py index 804f2de..38d65bc 100644 --- a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py +++ b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py @@ -51,7 +51,7 @@ def __init__( controller_cls( rank=i, interval=interval, - vram_to_keep=vram_to_keep, + vram_to_keep=self.vram_to_keep, busy_threshold=busy_threshold, ) for i in self.gpu_ids diff --git a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py index 4adc5ff..b40be05 100644 --- a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py @@ -32,7 +32,7 @@ def monitor(self): def keep(self): """ - Method to keep the specified amount of VRAM free. + Method to keep the specified amount of VRAM busy/occupied. Should be implemented by subclasses. """ raise NotImplementedError("Subclasses must implement this method.") @@ -46,7 +46,7 @@ def rest(self): async def _keep(self): """ - Asynchronous method to keep the specified amount of VRAM free. + Asynchronous method to keep the specified amount of VRAM busy/occupied. This is a placeholder for subclasses to implement their logic. """ raise NotImplementedError("Subclasses must implement this method.") diff --git a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py index baf9579..b3ef7cc 100644 --- a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py @@ -50,7 +50,7 @@ def __init__( Args: rank (int): Local CUDA device index to occupy. interval (float, optional): Sleep time (seconds) between workload - batches. Defaults to 0.5. + batches. Defaults to 1.0. matmul_iterations (int, optional): Number of matmul ops per batch. vram_to_keep (int or str, optional): Amount of VRAM to keep busy, e.g. `"1000 MB"`, `"20 GB"`, or an integer like `1000 * 1000`. @@ -126,8 +126,11 @@ def _keep_loop(self) -> None: matrix = None while not self._stop_evt.is_set(): try: + num_elements = int(self.vram_to_keep) + if num_elements <= 0: + raise ValueError("vram_to_keep must be positive") matrix = torch.rand( - self.vram_to_keep, + num_elements, device=self.device, dtype=torch.float32, requires_grad=False, @@ -166,7 +169,7 @@ def _keep_loop(self) -> None: # ------------------------------------------------------------------ @torch.no_grad() def _run_mat_batch(self, matrix: torch.Tensor) -> None: - """Run a batch of dummy matmuls to keep GPU busy.""" + """Run a batch of in-place ReLU ops to keep GPU busy.""" tic = time.time() for _ in range(self.matmul_iterations): diff --git a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py index 2f65b80..1fbf75f 100644 --- a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py @@ -24,14 +24,17 @@ def __init__( vram_to_keep: str | int = "1000 MB", busy_threshold: int = 10, iterations: int = 5000, + max_allocation_retries: int = 3, ): super().__init__(vram_to_keep=vram_to_keep, interval=interval) self.rank = rank self.device = torch.device(f"cuda:{rank}") self.busy_threshold = busy_threshold self.iterations = iterations + self.max_allocation_retries = max_allocation_retries self._stop_evt: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None + self._failure_exc: Optional[Exception] = None # Lazy rocm_smi import; keep handle for reuse try: @@ -46,6 +49,7 @@ def keep(self) -> None: if self._thread and self._thread.is_alive(): logger.warning("rank %s: keep thread already running", self.rank) return + self._failure_exc = None if self._rocm_smi: try: self._rocm_smi.rsmi_init() @@ -62,12 +66,12 @@ def keep(self) -> None: logger.info("rank %s: ROCm keep thread started", self.rank) def release(self) -> None: - if not (self._thread and self._thread.is_alive()): + if self._thread and self._thread.is_alive(): + self._stop_evt.set() + self._thread.join() + torch.cuda.empty_cache() + else: logger.warning("rank %s: keep thread not running", self.rank) - return - self._stop_evt.set() - self._thread.join() - torch.cuda.empty_cache() if self._rocm_smi: try: self._rocm_smi.rsmi_shut_down() @@ -95,21 +99,35 @@ def _query_utilization(self) -> Optional[int]: def _keep_loop(self) -> None: torch.cuda.set_device(self.rank) tensor = None - while not self._stop_evt.is_set(): + attempts = 0 + while not self._stop_evt.is_set() and attempts < self.max_allocation_retries: try: + num_elements = int(self.vram_to_keep) + if num_elements <= 0: + raise ValueError("vram_to_keep must be positive") tensor = torch.rand( - self.vram_to_keep, + num_elements, device=self.device, dtype=torch.float32, requires_grad=False, ) break - except RuntimeError: - logger.exception("rank %s: failed to allocate tensor", self.rank) + except (RuntimeError, ValueError) as exc: + attempts += 1 + logger.error( + "rank %s: failed to allocate tensor (attempt %d/%d): %s", + self.rank, + attempts, + self.max_allocation_retries, + exc, + ) time.sleep(self.interval) if tensor is None: - logger.error("rank %s: failed to allocate tensor, exiting loop", self.rank) - raise RuntimeError("Failed to allocate tensor for ROCm GPU keeping") + self._failure_exc = RuntimeError( + f"rank {self.rank}: failed to allocate tensor after {attempts} attempts" + ) + logger.error("%s", self._failure_exc) + return while not self._stop_evt.is_set(): try: @@ -127,6 +145,10 @@ def _keep_loop(self) -> None: logger.exception("rank %s: unexpected error", self.rank) time.sleep(self.interval) + def allocation_status(self) -> Optional[Exception]: + """Return allocation failure exception captured in worker thread, if any.""" + return self._failure_exc + @torch.no_grad() def _run_batch(self, tensor: torch.Tensor) -> None: tic = time.time() diff --git a/src/keep_gpu/utilities/humanized_input.py b/src/keep_gpu/utilities/humanized_input.py index 9432638..4936497 100644 --- a/src/keep_gpu/utilities/humanized_input.py +++ b/src/keep_gpu/utilities/humanized_input.py @@ -17,6 +17,13 @@ def parse_size(text: str) -> int: + """ + Parse human-readable memory strings into float32 element counts. + + The return value is the number of float32 elements needed to occupy the + requested memory size. When no unit is provided, the default unit is GB. + Supported units are the keys in `_UNITS`. + """ text = text.strip().replace(" ", "") m = re.fullmatch(r"([0-9]*\.?[0-9]+)([A-Za-z]*)", text) if not m: @@ -24,7 +31,9 @@ def parse_size(text: str) -> int: value, unit = m.groups() unit = unit or "GB" if len(unit) > 1: - unit = unit[:-1].upper() + unit[-1] + # Treat all-lowercase units as byte units ("gb" -> "GB", "gib" -> "GIB") + # while preserving explicit mixed-case bit forms ("Gb", "GIb"). + unit = unit.upper() if unit.islower() else unit[:-1].upper() + unit[-1] if unit not in _UNITS: raise ValueError(f"unknown unit: {unit}, should be one of {_UNITS.keys()}") return int(float(value) * _UNITS[unit] / 4) # float32 takes 4 bytes diff --git a/src/keep_gpu/utilities/logger.py b/src/keep_gpu/utilities/logger.py index a9bc0b1..2a5dc02 100644 --- a/src/keep_gpu/utilities/logger.py +++ b/src/keep_gpu/utilities/logger.py @@ -29,10 +29,13 @@ def _build_console_handler(level: int) -> logging.Handler: """Create a colored console handler with filename:lineno.""" handler = logging.StreamHandler(sys.stdout) handler.setLevel(level) - fmt = "%(log_color)s%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + color_fmt = "%(log_color)s%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + plain_fmt = ( + "%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + ) if ColoredFormatter: formatter = ColoredFormatter( - fmt, + color_fmt, datefmt="%H:%M:%S", log_colors={ "DEBUG": "cyan", @@ -43,7 +46,7 @@ def _build_console_handler(level: int) -> logging.Handler: }, ) else: - formatter = logging.Formatter(fmt, "%H:%M:%S") + formatter = logging.Formatter(plain_fmt, "%H:%M:%S") handler.setFormatter(formatter) return handler @@ -53,7 +56,7 @@ def _build_file_handler( ) -> logging.Handler: """Create a file handler with filename:lineno.""" log_dir = Path(log_dir) - log_dir.mkdir(exist_ok=True) + log_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") file_path = log_dir / f"{name}_{timestamp}.log" diff --git a/tests/conftest.py b/tests/conftest.py index 33fd177..004d2ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,4 @@ import pytest -import torch def pytest_addoption(parser): @@ -23,4 +22,11 @@ def pytest_collection_modifyitems(config, items): @pytest.fixture def rocm_available(): - return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None)) + try: + import torch + except Exception: + return False + try: + return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None)) + except Exception: + return False diff --git a/tests/cuda_controller/context_manager.py b/tests/cuda_controller/context_manager.py index de21be7..e3bb1f0 100644 --- a/tests/cuda_controller/context_manager.py +++ b/tests/cuda_controller/context_manager.py @@ -1,6 +1,7 @@ import time -import torch import pytest +import torch + from keep_gpu.single_gpu_controller.cuda_gpu_controller import CudaGPUController @@ -10,15 +11,20 @@ ) def test_cuda_controller_context_manager(): ctrl = CudaGPUController( - rank=torch.cuda.device_count() - 1, interval=10, vram_to_keep="1GB" + rank=torch.cuda.device_count() - 1, + interval=0.05, + vram_to_keep="8MB", + matmul_iterations=64, ) + torch.cuda.set_device(ctrl.rank) + before_reserved = torch.cuda.memory_reserved(ctrl.rank) with ctrl: - print("GPU kept busy for 10 seconds.") - time.sleep(10) - print("GPU released.") - print("Test completed successfully.") - + time.sleep(0.3) + assert ctrl._thread and ctrl._thread.is_alive() + during_reserved = torch.cuda.memory_reserved(ctrl.rank) + assert during_reserved >= before_reserved -if __name__ == "__main__": - test_cuda_controller_context_manager() + if ctrl._thread: + ctrl._thread.join(timeout=2) + assert not (ctrl._thread and ctrl._thread.is_alive()) diff --git a/tests/cuda_controller/test_2_32pow_elements.py b/tests/cuda_controller/test_2_32pow_elements.py index 7bcfe5a..df3f247 100644 --- a/tests/cuda_controller/test_2_32pow_elements.py +++ b/tests/cuda_controller/test_2_32pow_elements.py @@ -8,11 +8,16 @@ @pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") def test_large_vram_allocation(): """Tests controller with a large VRAM allocation.""" - # Using a smaller allocation for general testing. The original 2**32 can be used on machines with sufficient VRAM. - # torch has some indexing issues on very large tensors - # e.g. tensors with more than 2**32-1 elements may cause issues - # just a test to see if it is real. - vram_elements = 2**32 # Allocates 16GiB to test large tensor handling + # Intentionally using full 2**32 float32 elements (~16 GiB) for large-tensor testing. + # Torch may expose indexing issues around this boundary on some systems. + vram_elements = 2**32 + required_bytes = vram_elements * 4 + free_bytes, _ = torch.cuda.mem_get_info(0) + if free_bytes < required_bytes: + pytest.skip( + f"Insufficient free VRAM for large test: need {required_bytes}, have {free_bytes}" + ) + controller = CudaGPUController( rank=0, interval=0.5, @@ -27,7 +32,3 @@ def test_large_vram_allocation(): assert controller._thread is not None and controller._thread.is_alive() finally: controller.release() - - -if __name__ == "__main__": - test_large_vram_allocation() diff --git a/tests/cuda_controller/test_keep_and_release.py b/tests/cuda_controller/test_keep_and_release.py index e23ff44..b549eee 100644 --- a/tests/cuda_controller/test_keep_and_release.py +++ b/tests/cuda_controller/test_keep_and_release.py @@ -19,20 +19,22 @@ def test_cuda_controller_basic(): time.sleep(0.2) assert ctrl._thread and ctrl._thread.is_alive() + assert ctrl._thread is not None ctrl.release() + ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) ctrl.keep() time.sleep(0.2) assert ctrl._thread and ctrl._thread.is_alive() + assert ctrl._thread is not None ctrl.release() + ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) with ctrl: assert ctrl._thread and ctrl._thread.is_alive() time.sleep(0.2) + assert ctrl._thread is not None + ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) - - -if __name__ == "__main__": - test_cuda_controller_basic() diff --git a/tests/global_controller/global_keep_test.py b/tests/global_controller/global_keep_test.py index 0d84c8f..0f680d3 100644 --- a/tests/global_controller/global_keep_test.py +++ b/tests/global_controller/global_keep_test.py @@ -23,7 +23,3 @@ def test_global_controller(): controller.release() for ctrl in controller.controllers: assert not (ctrl._thread and ctrl._thread.is_alive()) - - -if __name__ == "__main__": - test_global_controller() From cffc16fcc919780eb7ac788efa902a32850a41bb Mon Sep 17 00:00:00 2001 From: Wang Siyuan Date: Sun, 15 Feb 2026 20:52:38 +0800 Subject: [PATCH 2/3] fix(cuda,rocm,tests): address follow-up review comments --- .../cuda_gpu_controller.py | 38 +++++++++++++------ .../rocm_gpu_controller.py | 33 ++++++++++------ .../cuda_controller/test_2_32pow_elements.py | 2 +- ...ext_manager.py => test_context_manager.py} | 4 +- .../cuda_controller/test_keep_and_release.py | 8 +--- tests/cuda_controller/test_throttle.py | 4 +- 6 files changed, 53 insertions(+), 36 deletions(-) rename tests/cuda_controller/{context_manager.py => test_context_manager.py} (89%) diff --git a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py index b3ef7cc..0732151 100644 --- a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py @@ -16,7 +16,7 @@ class CudaGPUController(BaseGPUController): """CudaGPUController Keep a single CUDA GPU busy by repeatedly running lightweight - matrix-multiplication workloads in a background thread. + elementwise workloads in a background thread. Typical usage: @@ -42,7 +42,8 @@ def __init__( *, rank: int, interval: float = 1.0, - matmul_iterations: int = 5000, + relu_iterations: int = 5000, + matmul_iterations: Optional[int] = None, vram_to_keep: str | int = "1000 MB", busy_threshold: int = 10, ): @@ -51,7 +52,10 @@ def __init__( rank (int): Local CUDA device index to occupy. interval (float, optional): Sleep time (seconds) between workload batches. Defaults to 1.0. - matmul_iterations (int, optional): Number of matmul ops per batch. + relu_iterations (int, optional): Number of in-place ReLU ops per + batch. + matmul_iterations (int, optional): Legacy alias for + `relu_iterations`. When set, it overrides `relu_iterations`. vram_to_keep (int or str, optional): Amount of VRAM to keep busy, e.g. `"1000 MB"`, `"20 GB"`, or an integer like `1000 * 1000`. This represents the total size of the matrix allocated to @@ -65,12 +69,15 @@ def __init__( self.rank = rank self.device = torch.device(f"cuda:{rank}") self.interval = interval - self.matmul_iterations = matmul_iterations + if matmul_iterations is not None: + relu_iterations = matmul_iterations + self.relu_iterations = relu_iterations self.busy_threshold = busy_threshold self.platform = ComputingPlatform.CUDA self._stop_evt: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None + self._num_elements: Optional[int] = None @staticmethod def parse_size(text: str) -> int: @@ -86,6 +93,10 @@ def keep(self) -> None: logger.warning("rank %s: keep thread already running", self.rank) return + self._num_elements = int(self.vram_to_keep) + if self._num_elements <= 0: + raise ValueError("vram_to_keep must be positive") + self._stop_evt = threading.Event() self._thread = threading.Thread( target=self._keep_loop, @@ -123,12 +134,15 @@ def __exit__(self, exc_type, exc, tb): def _keep_loop(self) -> None: """Internal: run workloads until stop event is set.""" torch.cuda.set_device(self.rank) + num_elements = self._num_elements if self._num_elements is not None else 0 + if num_elements <= 0: + logger.error( + "rank %s: invalid vram_to_keep=%s", self.rank, self.vram_to_keep + ) + return matrix = None while not self._stop_evt.is_set(): try: - num_elements = int(self.vram_to_keep) - if num_elements <= 0: - raise ValueError("vram_to_keep must be positive") matrix = torch.rand( num_elements, device=self.device, @@ -152,7 +166,7 @@ def _keep_loop(self) -> None: gpu_utilization, ) else: - self._run_mat_batch(matrix) + self._run_relu_batch(matrix) time.sleep(self.interval) except RuntimeError as e: # Handle OOM by clearing cache; then sleep and continue @@ -168,11 +182,11 @@ def _keep_loop(self) -> None: # Workload implementation # ------------------------------------------------------------------ @torch.no_grad() - def _run_mat_batch(self, matrix: torch.Tensor) -> None: + def _run_relu_batch(self, matrix: torch.Tensor) -> None: """Run a batch of in-place ReLU ops to keep GPU busy.""" tic = time.time() - for _ in range(self.matmul_iterations): + for _ in range(self.relu_iterations): torch.relu_(matrix) if self._stop_evt.is_set(): break @@ -180,9 +194,9 @@ def _run_mat_batch(self, matrix: torch.Tensor) -> None: toc = time.time() logger.debug( - "rank %s: mat ops batch done - avg %.2f ms", + "rank %s: relu ops batch done - avg %.2f ms", self.rank, - (toc - tic) * 1000 / self.matmul_iterations, + (toc - tic) * 1000 / self.relu_iterations, ) # ------------------------------------------------------------------ diff --git a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py index 1fbf75f..7365741 100644 --- a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py @@ -24,7 +24,7 @@ def __init__( vram_to_keep: str | int = "1000 MB", busy_threshold: int = 10, iterations: int = 5000, - max_allocation_retries: int = 3, + max_allocation_retries: Optional[int] = None, ): super().__init__(vram_to_keep=vram_to_keep, interval=interval) self.rank = rank @@ -100,7 +100,7 @@ def _keep_loop(self) -> None: torch.cuda.set_device(self.rank) tensor = None attempts = 0 - while not self._stop_evt.is_set() and attempts < self.max_allocation_retries: + while not self._stop_evt.is_set(): try: num_elements = int(self.vram_to_keep) if num_elements <= 0: @@ -115,19 +115,26 @@ def _keep_loop(self) -> None: except (RuntimeError, ValueError) as exc: attempts += 1 logger.error( - "rank %s: failed to allocate tensor (attempt %d/%d): %s", + "rank %s: failed to allocate tensor (attempt %d%s): %s", self.rank, attempts, - self.max_allocation_retries, + ( + f"/{self.max_allocation_retries}" + if self.max_allocation_retries is not None + else "" + ), exc, ) + if ( + self.max_allocation_retries is not None + and attempts >= self.max_allocation_retries + ): + self._failure_exc = RuntimeError( + f"rank {self.rank}: failed to allocate tensor after {attempts} attempts" + ) + logger.error("%s", self._failure_exc) + return time.sleep(self.interval) - if tensor is None: - self._failure_exc = RuntimeError( - f"rank {self.rank}: failed to allocate tensor after {attempts} attempts" - ) - logger.error("%s", self._failure_exc) - return while not self._stop_evt.is_set(): try: @@ -146,7 +153,11 @@ def _keep_loop(self) -> None: time.sleep(self.interval) def allocation_status(self) -> Optional[Exception]: - """Return allocation failure exception captured in worker thread, if any.""" + """ + Return allocation failure captured in the worker thread, if any. + + The reference assignment/read is thread-safe for CPython's GIL model. + """ return self._failure_exc @torch.no_grad() diff --git a/tests/cuda_controller/test_2_32pow_elements.py b/tests/cuda_controller/test_2_32pow_elements.py index df3f247..a6dedd6 100644 --- a/tests/cuda_controller/test_2_32pow_elements.py +++ b/tests/cuda_controller/test_2_32pow_elements.py @@ -21,7 +21,7 @@ def test_large_vram_allocation(): controller = CudaGPUController( rank=0, interval=0.5, - matmul_iterations=100, + relu_iterations=100, vram_to_keep=vram_elements, busy_threshold=10, ) diff --git a/tests/cuda_controller/context_manager.py b/tests/cuda_controller/test_context_manager.py similarity index 89% rename from tests/cuda_controller/context_manager.py rename to tests/cuda_controller/test_context_manager.py index e3bb1f0..854f9fd 100644 --- a/tests/cuda_controller/context_manager.py +++ b/tests/cuda_controller/test_context_manager.py @@ -14,7 +14,7 @@ def test_cuda_controller_context_manager(): rank=torch.cuda.device_count() - 1, interval=0.05, vram_to_keep="8MB", - matmul_iterations=64, + relu_iterations=64, ) torch.cuda.set_device(ctrl.rank) @@ -25,6 +25,4 @@ def test_cuda_controller_context_manager(): during_reserved = torch.cuda.memory_reserved(ctrl.rank) assert during_reserved >= before_reserved - if ctrl._thread: - ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) diff --git a/tests/cuda_controller/test_keep_and_release.py b/tests/cuda_controller/test_keep_and_release.py index b549eee..2e9970d 100644 --- a/tests/cuda_controller/test_keep_and_release.py +++ b/tests/cuda_controller/test_keep_and_release.py @@ -13,28 +13,22 @@ def test_cuda_controller_basic(): rank=0, interval=0.05, vram_to_keep="8MB", - matmul_iterations=64, + relu_iterations=64, ) ctrl.keep() time.sleep(0.2) assert ctrl._thread and ctrl._thread.is_alive() - assert ctrl._thread is not None ctrl.release() - ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) ctrl.keep() time.sleep(0.2) assert ctrl._thread and ctrl._thread.is_alive() - assert ctrl._thread is not None ctrl.release() - ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) with ctrl: assert ctrl._thread and ctrl._thread.is_alive() time.sleep(0.2) - assert ctrl._thread is not None - ctrl._thread.join(timeout=2) assert not (ctrl._thread and ctrl._thread.is_alive()) diff --git a/tests/cuda_controller/test_throttle.py b/tests/cuda_controller/test_throttle.py index d767132..fec9a2c 100644 --- a/tests/cuda_controller/test_throttle.py +++ b/tests/cuda_controller/test_throttle.py @@ -18,7 +18,7 @@ def test_controller_respects_busy_threshold(monkeypatch): interval=0.05, vram_to_keep="4MB", busy_threshold=10, - matmul_iterations=8, + relu_iterations=8, ) def fake_utilization(_rank: int) -> int: @@ -28,7 +28,7 @@ def fake_run(matrix): calls["run"] += 1 monkeypatch.setattr(ctrl, "_monitor_utilization", fake_utilization) - monkeypatch.setattr(ctrl, "_run_mat_batch", fake_run) + monkeypatch.setattr(ctrl, "_run_relu_batch", fake_run) ctrl.keep() time.sleep(0.2) From da890d8e4c13db03bbd9cb48c9a6cb65e57b09cb Mon Sep 17 00:00:00 2001 From: Wang Siyuan Date: Sun, 15 Feb 2026 21:01:16 +0800 Subject: [PATCH 3/3] fix(cuda,rocm): validate iterations and stop retrying config errors --- .../single_gpu_controller/cuda_gpu_controller.py | 4 +++- .../single_gpu_controller/rocm_gpu_controller.py | 15 +++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py index 0732151..2ecc364 100644 --- a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py @@ -71,6 +71,8 @@ def __init__( self.interval = interval if matmul_iterations is not None: relu_iterations = matmul_iterations + if relu_iterations <= 0: + raise ValueError("relu_iterations must be positive") self.relu_iterations = relu_iterations self.busy_threshold = busy_threshold self.platform = ComputingPlatform.CUDA @@ -196,7 +198,7 @@ def _run_relu_batch(self, matrix: torch.Tensor) -> None: logger.debug( "rank %s: relu ops batch done - avg %.2f ms", self.rank, - (toc - tic) * 1000 / self.relu_iterations, + (toc - tic) * 1000 / max(1, self.relu_iterations), ) # ------------------------------------------------------------------ diff --git a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py index 7365741..735b6ab 100644 --- a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py @@ -35,6 +35,7 @@ def __init__( self._stop_evt: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None self._failure_exc: Optional[Exception] = None + self._num_elements: Optional[int] = None # Lazy rocm_smi import; keep handle for reuse try: @@ -50,6 +51,9 @@ def keep(self) -> None: logger.warning("rank %s: keep thread already running", self.rank) return self._failure_exc = None + self._num_elements = int(self.vram_to_keep) + if self._num_elements <= 0: + raise ValueError("vram_to_keep must be positive") if self._rocm_smi: try: self._rocm_smi.rsmi_init() @@ -100,11 +104,14 @@ def _keep_loop(self) -> None: torch.cuda.set_device(self.rank) tensor = None attempts = 0 + num_elements = self._num_elements if self._num_elements is not None else 0 + if num_elements <= 0: + logger.error( + "rank %s: invalid vram_to_keep=%s", self.rank, self.vram_to_keep + ) + return while not self._stop_evt.is_set(): try: - num_elements = int(self.vram_to_keep) - if num_elements <= 0: - raise ValueError("vram_to_keep must be positive") tensor = torch.rand( num_elements, device=self.device, @@ -112,7 +119,7 @@ def _keep_loop(self) -> None: requires_grad=False, ) break - except (RuntimeError, ValueError) as exc: + except RuntimeError as exc: attempts += 1 logger.error( "rank %s: failed to allocate tensor (attempt %d%s): %s",