diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 6f6c84e..b201da6 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.13' - name: Install dependencies run: | diff --git a/.github/workflows/pre-commit.yaml b/.github/workflows/pre-commit.yaml index 63bfaa6..9ba2dd5 100644 --- a/.github/workflows/pre-commit.yaml +++ b/.github/workflows/pre-commit.yaml @@ -26,4 +26,4 @@ jobs: - name: Run pre-commit (no fix) run: | - pre-commit run --all-files --hook-stage manual --show-diff-on-failure --color always + pre-commit run --all-files --show-diff-on-failure --color always diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 2c28488..263f1df 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -26,10 +26,10 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install -e . + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi pip install flake8 pytest pip install torch --index-url https://download.pytorch.org/whl/cpu - if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -e . - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/src/keep_gpu/benchmark.py b/src/keep_gpu/benchmark.py index 5653bae..b9b2c94 100644 --- a/src/keep_gpu/benchmark.py +++ b/src/keep_gpu/benchmark.py @@ -51,7 +51,9 @@ def keep(rank, args): toc = time.time() logger.info( - f"benchmark {rank} matmul: time span: {(toc - tic) * 1000 / 5000:.2f}ms" + "benchmark %s matmul: time span: %.2fms", + rank, + (toc - tic) * 1000 / args.matmul_iterations, ) time.sleep(args.interval) diff --git a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py index 804f2de..38d65bc 100644 --- a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py +++ b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py @@ -51,7 +51,7 @@ def __init__( controller_cls( rank=i, interval=interval, - vram_to_keep=vram_to_keep, + vram_to_keep=self.vram_to_keep, busy_threshold=busy_threshold, ) for i in self.gpu_ids diff --git a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py index 4adc5ff..b40be05 100644 --- a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py @@ -32,7 +32,7 @@ def monitor(self): def keep(self): """ - Method to keep the specified amount of VRAM free. + Method to keep the specified amount of VRAM busy/occupied. Should be implemented by subclasses. """ raise NotImplementedError("Subclasses must implement this method.") @@ -46,7 +46,7 @@ def rest(self): async def _keep(self): """ - Asynchronous method to keep the specified amount of VRAM free. + Asynchronous method to keep the specified amount of VRAM busy/occupied. This is a placeholder for subclasses to implement their logic. """ raise NotImplementedError("Subclasses must implement this method.") diff --git a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py index baf9579..2ecc364 100644 --- a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py @@ -16,7 +16,7 @@ class CudaGPUController(BaseGPUController): """CudaGPUController Keep a single CUDA GPU busy by repeatedly running lightweight - matrix-multiplication workloads in a background thread. + elementwise workloads in a background thread. Typical usage: @@ -42,7 +42,8 @@ def __init__( *, rank: int, interval: float = 1.0, - matmul_iterations: int = 5000, + relu_iterations: int = 5000, + matmul_iterations: Optional[int] = None, vram_to_keep: str | int = "1000 MB", busy_threshold: int = 10, ): @@ -50,8 +51,11 @@ def __init__( Args: rank (int): Local CUDA device index to occupy. interval (float, optional): Sleep time (seconds) between workload - batches. Defaults to 0.5. - matmul_iterations (int, optional): Number of matmul ops per batch. + batches. Defaults to 1.0. + relu_iterations (int, optional): Number of in-place ReLU ops per + batch. + matmul_iterations (int, optional): Legacy alias for + `relu_iterations`. When set, it overrides `relu_iterations`. vram_to_keep (int or str, optional): Amount of VRAM to keep busy, e.g. `"1000 MB"`, `"20 GB"`, or an integer like `1000 * 1000`. This represents the total size of the matrix allocated to @@ -65,12 +69,17 @@ def __init__( self.rank = rank self.device = torch.device(f"cuda:{rank}") self.interval = interval - self.matmul_iterations = matmul_iterations + if matmul_iterations is not None: + relu_iterations = matmul_iterations + if relu_iterations <= 0: + raise ValueError("relu_iterations must be positive") + self.relu_iterations = relu_iterations self.busy_threshold = busy_threshold self.platform = ComputingPlatform.CUDA self._stop_evt: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None + self._num_elements: Optional[int] = None @staticmethod def parse_size(text: str) -> int: @@ -86,6 +95,10 @@ def keep(self) -> None: logger.warning("rank %s: keep thread already running", self.rank) return + self._num_elements = int(self.vram_to_keep) + if self._num_elements <= 0: + raise ValueError("vram_to_keep must be positive") + self._stop_evt = threading.Event() self._thread = threading.Thread( target=self._keep_loop, @@ -123,11 +136,17 @@ def __exit__(self, exc_type, exc, tb): def _keep_loop(self) -> None: """Internal: run workloads until stop event is set.""" torch.cuda.set_device(self.rank) + num_elements = self._num_elements if self._num_elements is not None else 0 + if num_elements <= 0: + logger.error( + "rank %s: invalid vram_to_keep=%s", self.rank, self.vram_to_keep + ) + return matrix = None while not self._stop_evt.is_set(): try: matrix = torch.rand( - self.vram_to_keep, + num_elements, device=self.device, dtype=torch.float32, requires_grad=False, @@ -149,7 +168,7 @@ def _keep_loop(self) -> None: gpu_utilization, ) else: - self._run_mat_batch(matrix) + self._run_relu_batch(matrix) time.sleep(self.interval) except RuntimeError as e: # Handle OOM by clearing cache; then sleep and continue @@ -165,11 +184,11 @@ def _keep_loop(self) -> None: # Workload implementation # ------------------------------------------------------------------ @torch.no_grad() - def _run_mat_batch(self, matrix: torch.Tensor) -> None: - """Run a batch of dummy matmuls to keep GPU busy.""" + def _run_relu_batch(self, matrix: torch.Tensor) -> None: + """Run a batch of in-place ReLU ops to keep GPU busy.""" tic = time.time() - for _ in range(self.matmul_iterations): + for _ in range(self.relu_iterations): torch.relu_(matrix) if self._stop_evt.is_set(): break @@ -177,9 +196,9 @@ def _run_mat_batch(self, matrix: torch.Tensor) -> None: toc = time.time() logger.debug( - "rank %s: mat ops batch done - avg %.2f ms", + "rank %s: relu ops batch done - avg %.2f ms", self.rank, - (toc - tic) * 1000 / self.matmul_iterations, + (toc - tic) * 1000 / max(1, self.relu_iterations), ) # ------------------------------------------------------------------ diff --git a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py index 2f65b80..735b6ab 100644 --- a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py +++ b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py @@ -24,14 +24,18 @@ def __init__( vram_to_keep: str | int = "1000 MB", busy_threshold: int = 10, iterations: int = 5000, + max_allocation_retries: Optional[int] = None, ): super().__init__(vram_to_keep=vram_to_keep, interval=interval) self.rank = rank self.device = torch.device(f"cuda:{rank}") self.busy_threshold = busy_threshold self.iterations = iterations + self.max_allocation_retries = max_allocation_retries self._stop_evt: Optional[threading.Event] = None self._thread: Optional[threading.Thread] = None + self._failure_exc: Optional[Exception] = None + self._num_elements: Optional[int] = None # Lazy rocm_smi import; keep handle for reuse try: @@ -46,6 +50,10 @@ def keep(self) -> None: if self._thread and self._thread.is_alive(): logger.warning("rank %s: keep thread already running", self.rank) return + self._failure_exc = None + self._num_elements = int(self.vram_to_keep) + if self._num_elements <= 0: + raise ValueError("vram_to_keep must be positive") if self._rocm_smi: try: self._rocm_smi.rsmi_init() @@ -62,12 +70,12 @@ def keep(self) -> None: logger.info("rank %s: ROCm keep thread started", self.rank) def release(self) -> None: - if not (self._thread and self._thread.is_alive()): + if self._thread and self._thread.is_alive(): + self._stop_evt.set() + self._thread.join() + torch.cuda.empty_cache() + else: logger.warning("rank %s: keep thread not running", self.rank) - return - self._stop_evt.set() - self._thread.join() - torch.cuda.empty_cache() if self._rocm_smi: try: self._rocm_smi.rsmi_shut_down() @@ -95,21 +103,45 @@ def _query_utilization(self) -> Optional[int]: def _keep_loop(self) -> None: torch.cuda.set_device(self.rank) tensor = None + attempts = 0 + num_elements = self._num_elements if self._num_elements is not None else 0 + if num_elements <= 0: + logger.error( + "rank %s: invalid vram_to_keep=%s", self.rank, self.vram_to_keep + ) + return while not self._stop_evt.is_set(): try: tensor = torch.rand( - self.vram_to_keep, + num_elements, device=self.device, dtype=torch.float32, requires_grad=False, ) break - except RuntimeError: - logger.exception("rank %s: failed to allocate tensor", self.rank) + except RuntimeError as exc: + attempts += 1 + logger.error( + "rank %s: failed to allocate tensor (attempt %d%s): %s", + self.rank, + attempts, + ( + f"/{self.max_allocation_retries}" + if self.max_allocation_retries is not None + else "" + ), + exc, + ) + if ( + self.max_allocation_retries is not None + and attempts >= self.max_allocation_retries + ): + self._failure_exc = RuntimeError( + f"rank {self.rank}: failed to allocate tensor after {attempts} attempts" + ) + logger.error("%s", self._failure_exc) + return time.sleep(self.interval) - if tensor is None: - logger.error("rank %s: failed to allocate tensor, exiting loop", self.rank) - raise RuntimeError("Failed to allocate tensor for ROCm GPU keeping") while not self._stop_evt.is_set(): try: @@ -127,6 +159,14 @@ def _keep_loop(self) -> None: logger.exception("rank %s: unexpected error", self.rank) time.sleep(self.interval) + def allocation_status(self) -> Optional[Exception]: + """ + Return allocation failure captured in the worker thread, if any. + + The reference assignment/read is thread-safe for CPython's GIL model. + """ + return self._failure_exc + @torch.no_grad() def _run_batch(self, tensor: torch.Tensor) -> None: tic = time.time() diff --git a/src/keep_gpu/utilities/humanized_input.py b/src/keep_gpu/utilities/humanized_input.py index 9432638..4936497 100644 --- a/src/keep_gpu/utilities/humanized_input.py +++ b/src/keep_gpu/utilities/humanized_input.py @@ -17,6 +17,13 @@ def parse_size(text: str) -> int: + """ + Parse human-readable memory strings into float32 element counts. + + The return value is the number of float32 elements needed to occupy the + requested memory size. When no unit is provided, the default unit is GB. + Supported units are the keys in `_UNITS`. + """ text = text.strip().replace(" ", "") m = re.fullmatch(r"([0-9]*\.?[0-9]+)([A-Za-z]*)", text) if not m: @@ -24,7 +31,9 @@ def parse_size(text: str) -> int: value, unit = m.groups() unit = unit or "GB" if len(unit) > 1: - unit = unit[:-1].upper() + unit[-1] + # Treat all-lowercase units as byte units ("gb" -> "GB", "gib" -> "GIB") + # while preserving explicit mixed-case bit forms ("Gb", "GIb"). + unit = unit.upper() if unit.islower() else unit[:-1].upper() + unit[-1] if unit not in _UNITS: raise ValueError(f"unknown unit: {unit}, should be one of {_UNITS.keys()}") return int(float(value) * _UNITS[unit] / 4) # float32 takes 4 bytes diff --git a/src/keep_gpu/utilities/logger.py b/src/keep_gpu/utilities/logger.py index a9bc0b1..2a5dc02 100644 --- a/src/keep_gpu/utilities/logger.py +++ b/src/keep_gpu/utilities/logger.py @@ -29,10 +29,13 @@ def _build_console_handler(level: int) -> logging.Handler: """Create a colored console handler with filename:lineno.""" handler = logging.StreamHandler(sys.stdout) handler.setLevel(level) - fmt = "%(log_color)s%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + color_fmt = "%(log_color)s%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + plain_fmt = ( + "%(asctime)s [%(levelname)s] %(name)s (%(filename)s:%(lineno)d): %(message)s" + ) if ColoredFormatter: formatter = ColoredFormatter( - fmt, + color_fmt, datefmt="%H:%M:%S", log_colors={ "DEBUG": "cyan", @@ -43,7 +46,7 @@ def _build_console_handler(level: int) -> logging.Handler: }, ) else: - formatter = logging.Formatter(fmt, "%H:%M:%S") + formatter = logging.Formatter(plain_fmt, "%H:%M:%S") handler.setFormatter(formatter) return handler @@ -53,7 +56,7 @@ def _build_file_handler( ) -> logging.Handler: """Create a file handler with filename:lineno.""" log_dir = Path(log_dir) - log_dir.mkdir(exist_ok=True) + log_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") file_path = log_dir / f"{name}_{timestamp}.log" diff --git a/tests/conftest.py b/tests/conftest.py index 33fd177..004d2ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,4 @@ import pytest -import torch def pytest_addoption(parser): @@ -23,4 +22,11 @@ def pytest_collection_modifyitems(config, items): @pytest.fixture def rocm_available(): - return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None)) + try: + import torch + except Exception: + return False + try: + return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None)) + except Exception: + return False diff --git a/tests/cuda_controller/context_manager.py b/tests/cuda_controller/context_manager.py deleted file mode 100644 index de21be7..0000000 --- a/tests/cuda_controller/context_manager.py +++ /dev/null @@ -1,24 +0,0 @@ -import time -import torch -import pytest -from keep_gpu.single_gpu_controller.cuda_gpu_controller import CudaGPUController - - -@pytest.mark.skipif( - not torch.cuda.is_available(), - reason="Only run CUDA tests when CUDA is available", -) -def test_cuda_controller_context_manager(): - ctrl = CudaGPUController( - rank=torch.cuda.device_count() - 1, interval=10, vram_to_keep="1GB" - ) - - with ctrl: - print("GPU kept busy for 10 seconds.") - time.sleep(10) - print("GPU released.") - print("Test completed successfully.") - - -if __name__ == "__main__": - test_cuda_controller_context_manager() diff --git a/tests/cuda_controller/test_2_32pow_elements.py b/tests/cuda_controller/test_2_32pow_elements.py index 7bcfe5a..a6dedd6 100644 --- a/tests/cuda_controller/test_2_32pow_elements.py +++ b/tests/cuda_controller/test_2_32pow_elements.py @@ -8,15 +8,20 @@ @pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available") def test_large_vram_allocation(): """Tests controller with a large VRAM allocation.""" - # Using a smaller allocation for general testing. The original 2**32 can be used on machines with sufficient VRAM. - # torch has some indexing issues on very large tensors - # e.g. tensors with more than 2**32-1 elements may cause issues - # just a test to see if it is real. - vram_elements = 2**32 # Allocates 16GiB to test large tensor handling + # Intentionally using full 2**32 float32 elements (~16 GiB) for large-tensor testing. + # Torch may expose indexing issues around this boundary on some systems. + vram_elements = 2**32 + required_bytes = vram_elements * 4 + free_bytes, _ = torch.cuda.mem_get_info(0) + if free_bytes < required_bytes: + pytest.skip( + f"Insufficient free VRAM for large test: need {required_bytes}, have {free_bytes}" + ) + controller = CudaGPUController( rank=0, interval=0.5, - matmul_iterations=100, + relu_iterations=100, vram_to_keep=vram_elements, busy_threshold=10, ) @@ -27,7 +32,3 @@ def test_large_vram_allocation(): assert controller._thread is not None and controller._thread.is_alive() finally: controller.release() - - -if __name__ == "__main__": - test_large_vram_allocation() diff --git a/tests/cuda_controller/test_context_manager.py b/tests/cuda_controller/test_context_manager.py new file mode 100644 index 0000000..854f9fd --- /dev/null +++ b/tests/cuda_controller/test_context_manager.py @@ -0,0 +1,28 @@ +import time +import pytest +import torch + +from keep_gpu.single_gpu_controller.cuda_gpu_controller import CudaGPUController + + +@pytest.mark.skipif( + not torch.cuda.is_available(), + reason="Only run CUDA tests when CUDA is available", +) +def test_cuda_controller_context_manager(): + ctrl = CudaGPUController( + rank=torch.cuda.device_count() - 1, + interval=0.05, + vram_to_keep="8MB", + relu_iterations=64, + ) + + torch.cuda.set_device(ctrl.rank) + before_reserved = torch.cuda.memory_reserved(ctrl.rank) + with ctrl: + time.sleep(0.3) + assert ctrl._thread and ctrl._thread.is_alive() + during_reserved = torch.cuda.memory_reserved(ctrl.rank) + assert during_reserved >= before_reserved + + assert not (ctrl._thread and ctrl._thread.is_alive()) diff --git a/tests/cuda_controller/test_keep_and_release.py b/tests/cuda_controller/test_keep_and_release.py index e23ff44..2e9970d 100644 --- a/tests/cuda_controller/test_keep_and_release.py +++ b/tests/cuda_controller/test_keep_and_release.py @@ -13,7 +13,7 @@ def test_cuda_controller_basic(): rank=0, interval=0.05, vram_to_keep="8MB", - matmul_iterations=64, + relu_iterations=64, ) ctrl.keep() time.sleep(0.2) @@ -32,7 +32,3 @@ def test_cuda_controller_basic(): assert ctrl._thread and ctrl._thread.is_alive() time.sleep(0.2) assert not (ctrl._thread and ctrl._thread.is_alive()) - - -if __name__ == "__main__": - test_cuda_controller_basic() diff --git a/tests/cuda_controller/test_throttle.py b/tests/cuda_controller/test_throttle.py index d767132..fec9a2c 100644 --- a/tests/cuda_controller/test_throttle.py +++ b/tests/cuda_controller/test_throttle.py @@ -18,7 +18,7 @@ def test_controller_respects_busy_threshold(monkeypatch): interval=0.05, vram_to_keep="4MB", busy_threshold=10, - matmul_iterations=8, + relu_iterations=8, ) def fake_utilization(_rank: int) -> int: @@ -28,7 +28,7 @@ def fake_run(matrix): calls["run"] += 1 monkeypatch.setattr(ctrl, "_monitor_utilization", fake_utilization) - monkeypatch.setattr(ctrl, "_run_mat_batch", fake_run) + monkeypatch.setattr(ctrl, "_run_relu_batch", fake_run) ctrl.keep() time.sleep(0.2) diff --git a/tests/global_controller/global_keep_test.py b/tests/global_controller/global_keep_test.py index 0d84c8f..0f680d3 100644 --- a/tests/global_controller/global_keep_test.py +++ b/tests/global_controller/global_keep_test.py @@ -23,7 +23,3 @@ def test_global_controller(): controller.release() for ctrl in controller.controllers: assert not (ctrl._thread and ctrl._thread.is_alive()) - - -if __name__ == "__main__": - test_global_controller()