From 06a0eb7b5ceba079b921069ed9c8156f784e3492 Mon Sep 17 00:00:00 2001
From: Shawn Dawson
Date: Thu, 15 Jan 2026 08:38:28 -0800
Subject: [PATCH] same_node processing for slurm.

However, slurm does not honor this option in all cases.  test23 and test24
in the HelloSameNode test directory tickle this bug.  I have tried various
slurm options to get slurm to comply, but there are scenarios where it will
ignore the requested node and simply run the job wherever slurm thinks there
are free nodes.  Thus, at this time, same_node processing under slurm is not
reliable.
---
 ats/atsMachines/slurmProcessorScheduled.py | 90 ++++++++++++++++++++--
 test/HelloSameNode/READ.ME                 | 11 +--
 2 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/ats/atsMachines/slurmProcessorScheduled.py b/ats/atsMachines/slurmProcessorScheduled.py
index 465f5d5..7eceec5 100644
--- a/ats/atsMachines/slurmProcessorScheduled.py
+++ b/ats/atsMachines/slurmProcessorScheduled.py
@@ -31,6 +31,11 @@ class SlurmProcessorScheduled(lcMachines.LCMachineCore):
 
+    # Strings used to determine which node a user wants a test to run on.
+    # Used with the same_node option.
+    _cached_nodes = None  # static/class variable
+    node_list = []
+
     lastMessageLine = 0
     remainingCapacity_numNodesReported = -1
     remainingCapacity_numProcsReported = -1
@@ -72,10 +77,64 @@ def init(self):
 
         super(SlurmProcessorScheduled, self).init()
 
-        #
-        # Slurm subtracts one node from the script itself which is running if an salloc is done beforehand.
-        # original coding looked for 'bin/ats' process, which does not work with all the wrappers that
-        # projects put around ats.  We need another method to determine if the ats wrapper or binary
+        # Call get_physical_node to cache the hardware node listing before starting jobs.
+        self.get_physical_node(0)
+
+    def expand_nodelist(self, nodelist_field):
+        """
+        Expand a Slurm nodelist string like 'rzadams[1002,1005-1007]' into a
+        list of node names.  Handles multiple comma-separated patterns.
+        """
+        nodes = []
+        # Regex to find patterns like prefix[range] or prefixNNNN
+        pattern = re.compile(r'([a-zA-Z0-9_-]+)(?:\[(.*?)\])?')
+        for match in pattern.finditer(nodelist_field):
+            prefix = match.group(1)
+            bracket = match.group(2)
+            if bracket:
+                for part in bracket.split(','):
+                    part = part.strip()
+                    if '-' in part:
+                        start, end = part.split('-')
+                        width = len(start)  # preserve zero padding, e.g. 'node[001-004]'
+                        nodes.extend([f"{prefix}{str(i).zfill(width)}"
+                                      for i in range(int(start), int(end) + 1)])
+                    else:
+                        nodes.append(f"{prefix}{part}")
+            else:
+                nodes.append(prefix)
+        return nodes
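+
+    # Example of the expansion performed above (illustration only, not executed):
+    #   expand_nodelist('rzadams[1002,1005-1007]')
+    #   --> ['rzadams1002', 'rzadams1005', 'rzadams1006', 'rzadams1007']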
+
+    def get_physical_node(self, rel_index):
+        """
+        Given a relative node number, return the actual physical node within
+        the Slurm allocation.
+        """
+        if self._cached_nodes is None:
+            nodelist_str = os.environ.get("SLURM_JOB_NODELIST")
+            if not nodelist_str:
+                raise RuntimeError(
+                    "SLURM_JOB_NODELIST is not set. Are you running inside a Slurm allocation/job?"
+                )
+
+            # Option 1: expand the nodelist with the helper above.
+            self._cached_nodes = self.expand_nodelist(nodelist_str)
+
+            # Option 2 (alternative): let scontrol expand it for us:
+            # out = subprocess.check_output(
+            #     f"scontrol show hostnames {nodelist_str}", shell=True
+            # ).decode().splitlines()
+            # self._cached_nodes = out
+
+            # Optional logging
+            log("Info: Physical Hardware Nodes: %s" % self._cached_nodes, echo=True)
+
+        nodes = self._cached_nodes
+        if rel_index < 0 or rel_index >= len(nodes):
+            raise IndexError(f"Relative index {rel_index} out of range (0-{len(nodes) - 1})")
+        return nodes[rel_index]
+
     # will be taking up one of the cores.
     #
     # Let's try and do this with ENV VARS instead
@@ -318,6 +377,21 @@ def calculateCommandList(self, test):
 
         if self.exclusive == True:
             srun_ex_or_sh = "--exclusive"
 
+        # If the user requests that tests run on the same node, store the identifying
+        # string and spread the distinct identifiers across the nodes we have access to.
+        # This way users don't need to think about how many nodes there are or which
+        # one they want to run on.
+        srun_nodelist = '--comment="no_nodelist"'
+        same_node = test.options.get('same_node', None)
+        if same_node is not None:
+            if same_node not in self.node_list:
+                self.node_list.append(same_node)
+            rel_node = self.node_list.index(same_node) % self.numNodes
+            physical_node = self.get_physical_node(rel_node)
+            srun_nodelist = '--nodelist=%s' % physical_node
+            distribution = "no_distribution"
+
+        # print("SAD DEBUG NODELIST = %s" % (srun_nodelist))
+
         # 2021-07-14 SAD Old logic where we were using overlap for newer SchedMD slurm update
         # May not be needed now, but leave this in for a bit in case
         # we need to revert
@@ -348,9 +422,12 @@ def calculateCommandList(self, test):
 
         #
         # If the distribution is unset, then set it to cyclic.
+        # If same_node was processed, emit a --comment and do not add a
+        # distribution option.
         #
         if distribution == 'unset':
             srun_distribution="--distribution=cyclic"
+        elif distribution == 'no_distribution':
+            srun_distribution="--comment=no_distribution"
         else:
             srun_distribution="--distribution=%s" % distribution
@@ -494,7 +571,7 @@ def calculateCommandList(self, test):
             return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList
         else:
             return ["srun", srun_mpi_type, "--label", "-J", test.jobname,
-                    srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task,
+                    srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist,
                     "--ntasks=%i" % np, extra_sargs \
                     ] + commandList
@@ -527,11 +604,12 @@ def calculateCommandList(self, test):
 
         if SlurmProcessorScheduled.debugClass:
             print("SAD DEBUG SRUN800 ")
 
+
         if self.salloc :
             return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList
         else:
             return ["srun", srun_mpi_type, "--label", "-J", test.jobname,
-                    srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task,
+                    srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist,
                     "--ntasks=%i" % np, extra_sargs, \
                     ] + commandList
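Note for reviewers: a minimal standalone sketch of the same_node mapping added
in calculateCommandList above. The node names are invented, and pick_node is a
hypothetical stand-in for the node_list / get_physical_node pair in the patch.

node_list = []                                         # distinct same_node keys, first-seen order
nodes = ["rzadams1002", "rzadams1005", "rzadams1006"]  # e.g. an expanded SLURM_JOB_NODELIST
num_nodes = len(nodes)

def pick_node(same_node_key):
    # Tests sharing a key always map to the same slot; distinct keys
    # spread round-robin across the allocation via the modulo.
    if same_node_key not in node_list:
        node_list.append(same_node_key)
    return nodes[node_list.index(same_node_key) % num_nodes]

assert pick_node("pair1") == pick_node("pair1")  # same key -> same physical node
assert pick_node("pair2") != pick_node("pair1")  # distinct keys spread across nodes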
diff --git a/test/HelloSameNode/READ.ME b/test/HelloSameNode/READ.ME
index 68fbad1..8514822 100644
--- a/test/HelloSameNode/READ.ME
+++ b/test/HelloSameNode/READ.ME
@@ -9,11 +9,7 @@ How to use:
 export PATH=/usr/gapps/ats/${SYS_TYPE}/7.0.${USER}/bin:$PATH
 
 --------------------------------------------------------------------------------
-Toss 4 CTS : flux under salloc
-Toss 4 EAS : flux native scheduler
-
-atsflux can be used for either, the difference is how the job starts, but
-both can use flux with same 'atsflux' command.
+- FLUX
 --------------------------------------------------------------------------------
 export -n MPICH_GPU_SUPPORT_ENABLED
 ./create_test_ats.py   <- create the ats test file
@@ -26,6 +22,11 @@
 FAILED:   0
 PASSED:  32
 
+--------------------------------------------------------------------------------
+- SLURM
+--------------------------------------------------------------------------------
+  ./create_test_ats.py   <- create the ats test file
+  atslite1 test.ats      <- test using slurm
 --------------------------------------------------------------------------------
 
-- end of READ.ME --
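Usage sketch (hypothetical): entries in the generated test.ats are expected to
look roughly like the following, where the same_node key 'groupA' is an
arbitrary string and tests sharing it should land on the same physical node,
subject to the slurm caveat described in the commit message:

t1 = test(executable='./hello_world', np=2, same_node='groupA')
t2 = test(executable='./hello_world', np=4, same_node='groupA')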