Source code for src.utils.auto_gpu

import os
import time
import torch
import logging
import subprocess

# Public API of this module: only the AutoGPU class is exported by `import *`.
__all__ = ["AutoGPU"]

# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)


class AutoGPU:
    """
    Automatic GPU memory manager used to select a GPU with sufficient free memory.

    Attributes:
        gpu_list (list): Physical GPU identifiers, in the order CUDA exposes
            them; ``gpu_list[i]`` is the physical GPU behind ``cuda:i``.
        free_memory (dict): Maps the CUDA ordinal ``i`` to the free memory
            (in MB) last reported by ``nvidia-smi`` for ``gpu_list[i]``.
    """

    def __init__(self):
        """
        Initialize AutoGPU and get the currently visible CUDA device list.

        If ``CUDA_VISIBLE_DEVICES`` is set and non-empty, it defines the device
        list; otherwise every device reported by ``torch.cuda.device_count()``
        is used.
        """
        visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if visible_devices:
            self.gpu_list = self._parse_visible_devices(visible_devices)
        else:
            self.gpu_list = list(range(torch.cuda.device_count()))
        # NOTE(review): nvidia-smi enumerates GPUs in PCI bus order; the
        # identifiers below match CUDA's numbering only if CUDA uses the same
        # order (e.g. CUDA_DEVICE_ORDER=PCI_BUS_ID) -- confirm for the target
        # machines.
        # cuda:i -> free memory of the physical GPU gpu_list[i]
        self.free_memory = {
            i: self.query_free_memory(gpu_id)
            for i, gpu_id in enumerate(self.gpu_list)
        }

    @staticmethod
    def _parse_visible_devices(spec):
        """
        [Internal method] Parse a ``CUDA_VISIBLE_DEVICES`` value.

        Parsing stops at the first empty or negative entry, because CUDA
        ignores everything from the first invalid entry onwards; this keeps
        position ``i`` of the result aligned with ``cuda:i`` and tolerates
        trailing commas. Non-integer entries (e.g. GPU UUIDs) are kept as
        stripped strings instead of raising.

        Args:
            spec (str): Raw comma-separated value of the environment variable.

        Returns:
            list: Integer indices and/or identifier strings.
        """
        gpu_ids = []
        for token in spec.split(","):
            token = token.strip()
            if not token:
                break
            try:
                index = int(token)
            except ValueError:
                # Not an index (presumably a UUID); handed to nvidia-smi as-is.
                gpu_ids.append(token)
                continue
            if index < 0:
                break
            gpu_ids.append(index)
        return gpu_ids

    @staticmethod
    def allocate_gpu(device, memory_MB: int, block_MB: int = None):
        """
        [Internal method] Allocate placeholder memory on the target device.

        This is used to verify that memory is truly available by actually
        allocating it, or to proactively reserve GPU memory.

        Args:
            device (str or torch.device): Target device.
            memory_MB (int): Amount of memory to allocate in MB.
            block_MB (int, optional): Block size. If None, allocate in one shot.

        Returns:
            torch.Tensor or List[torch.Tensor]: References to the allocated tensors.

        Raises:
            ValueError: If ``memory_MB`` is negative or ``block_MB`` is not positive.
        """
        if memory_MB < 0:
            raise ValueError(f"memory_MB must be non-negative, got {memory_MB}")
        # A float32 tensor of shape (n, 1024, 256) occupies n * 1024 * 256 * 4
        # bytes, i.e. exactly n MiB.
        if block_MB is None:
            return torch.zeros(memory_MB, 1024, 256, dtype=torch.float32, device=device)
        if block_MB <= 0:
            raise ValueError(f"block_MB must be positive, got {block_MB}")
        full_blocks, remainder = divmod(memory_MB, block_MB)
        blocks = [block_MB] * full_blocks
        if remainder:
            blocks.append(remainder)
        return [
            torch.zeros(block, 1024, 256, dtype=torch.float32, device=device)
            for block in blocks
        ]

    def choice_gpu(self, memory_MB, interval=600, force=True):
        """
        Select a GPU with enough free memory.

        This method not only queries `nvidia-smi`, but also tries to allocate
        memory to verify actual availability. If all GPUs are busy and
        `force=True`, it blocks and waits.

        Args:
            memory_MB (int): Minimum memory required by the task in MB.
            interval (int, optional): Polling interval in seconds. Default is 600.
            force (bool, optional): Whether to wait until a GPU becomes available.
                If False and no GPU is available, returns "cpu". Default is True.

        Returns:
            str: Selected device string, such as "cuda:0" or "cpu".
        """
        waiting = False
        while True:
            for i, free_memory in self.free_memory.items():
                if free_memory < memory_MB:
                    continue
                gpu_id = self.gpu_list[i]
                device = f"cuda:{i}"
                allocation = None
                try:
                    free_before = self.query_free_memory(gpu_id)
                    allocation = self.allocate_gpu(
                        device=device, memory_MB=memory_MB, block_MB=512
                    )
                    free_after = self.query_free_memory(gpu_id)
                    del allocation
                    # Keep the probe allocation from affecting
                    # torch.cuda.max_memory_allocated.
                    torch.cuda.reset_peak_memory_stats(device)
                except Exception as exc:
                    # Best effort: any failure (typically out-of-memory) just
                    # means this device is unusable right now; try the next.
                    _logger.debug(
                        "SubProcess[%s]: probing %s failed: %s",
                        os.getpid(), device, exc,
                    )
                    # Drop any surviving reference before emptying the cache,
                    # otherwise the placeholder memory cannot be released.
                    allocation = None
                    torch.cuda.empty_cache()
                    continue
                # The standard logging.Logger has no `note` method; use it only
                # if the project has installed one, otherwise fall back to
                # `info` so that a successful pick after waiting is not lost.
                log = getattr(_logger, "note", _logger.info) if waiting else _logger.info
                log(
                    f"SubProcess[{os.getpid()}]: Choose GPU{gpu_id} ({device}) "
                    f"with {memory_MB}MB ({free_before}MB -> {free_after}MB)"
                )
                return device

            # No device could satisfy the request in this pass.
            if not force:
                _logger.warning(f"SubProcess[{os.getpid()}]: No available GPU!")
                return "cpu"
            if not waiting:
                # Announce the wait once, not on every polling round.
                _logger.warning(f"SubProcess[{os.getpid()}]: Waiting GPU...")
                waiting = True
            time.sleep(interval)
            self.update_free_memory()

    def query_free_memory(self, gpu_id):
        """
        Query the free memory of one physical GPU through ``nvidia-smi``.

        Args:
            gpu_id (int or str): Identifier passed to ``nvidia-smi -i``.

        Returns:
            int: Free memory in MB, or 0 if the query fails (a warning is logged).
        """
        cmd = [
            "nvidia-smi",
            "--query-gpu=memory.free",
            "--format=csv,noheader,nounits",
            "-i",
            str(gpu_id),
        ]
        try:
            output = subprocess.check_output(cmd).decode()
            return int(output.strip().split("\n")[0])
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            # Covers a missing binary, a non-zero exit status and output that
            # is not an integer.
            _logger.warning(f"Query CUDA (GPU{gpu_id}) Memory Failed! {e}")
            return 0

    def update_free_memory(self):
        """Refresh ``self.free_memory`` for every GPU in ``self.gpu_list``."""
        for i, gpu_id in enumerate(self.gpu_list):
            self.free_memory[i] = self.query_free_memory(gpu_id)