diff --git a/ats/atsMachines/slurmProcessorScheduled.py b/ats/atsMachines/slurmProcessorScheduled.py index 465f5d5..7eceec5 100644 --- a/ats/atsMachines/slurmProcessorScheduled.py +++ b/ats/atsMachines/slurmProcessorScheduled.py @@ -31,6 +31,11 @@ class SlurmProcessorScheduled(lcMachines.LCMachineCore): + # Strings used to determine which node a user wants the test to run + # Used with same_node var + _cached_nodes = None # static/class variable + node_list = [] + lastMessageLine = 0 remainingCapacity_numNodesReported = -1 remainingCapacity_numProcsReported = -1 @@ -72,10 +77,64 @@ def init(self): super(SlurmProcessorScheduled, self).init() - # - # Slurm subtracts one node from the script itself which is running if an salloc is done beforehand. - # original coding looked for 'bin/ats' process, which does not work with all the wrappers that - # projects put around ats. We need another method to determine if the ats wrapper or binary + # Call get_physical_node to cache the hardware node listing before starting jobs + self.get_physical_node(0) + + def expand_nodelist(self, nodelist_field): + """ + Expand a Flux nodelist string like 'rzadams[1002,1005-1007]' into a list of node names. + Handles multiple comma-separated patterns. + """ + nodes = [] + # Regex to find patterns like prefix[range] or prefixNNNN + pattern = re.compile(r'([a-zA-Z0-9_-]+)(?:\[(.*?)\])?') + for match in pattern.finditer(nodelist_field): + prefix = match.group(1) + bracket = match.group(2) + if bracket: + for part in bracket.split(','): + part = part.strip() + if '-' in part: + start, end = map(int, part.split('-')) + nodes.extend([f"{prefix}{i}" for i in range(start, end + 1)]) + else: + nodes.append(f"{prefix}{part}") + else: + nodes.append(prefix) + return nodes + + def get_physical_node(self, rel_index): + """ + Given a relative node number, return the actual physical node within + the Slurm allocation. + """ + if self._cached_nodes is None: + nodelist_str = os.environ.get("SLURM_JOB_NODELIST") + if not nodelist_str: + raise RuntimeError( + "SLURM_JOB_NODELIST is not set. Are you running inside a Slurm allocation/job?" + ) + + # Option 1: if your expand_nodelist already handles Slurm-style nodelists, + # just call it directly: + self._cached_nodes = self.expand_nodelist(nodelist_str) + + # Option 2 (alternative): use scontrol show hostnames to expand for you: + # out = subprocess.check_output( + # f"scontrol show hostnames {nodelist_str}", shell=True + # ).decode().splitlines() + # self._cached_nodes = out + + # Optional logging + log(("Info: Physical Hardware Nodes: %s" % self._cached_nodes), echo=True) + # log(f"Info: Physical Hardware Nodes: {self._cached_nodes}", echo=True) + + nodes = self._cached_nodes + if rel_index < 0 or rel_index >= len(nodes): + raise IndexError(f"Relative index {rel_index} out of range (0-{len(nodes)-1})") + return nodes[rel_index] + + # will be taking up one of the cores. # # Let's try and do this with ENV VARS instead @@ -318,6 +377,21 @@ def calculateCommandList(self, test): if self.exclusive == True: srun_ex_or_sh = "--exclusive" + # If the user requests that a test be run on the same node then we store the identifying string + # and then spread the node identifiers across the nodes we have access to. This way users dont need + # to think about how many nodes there are or which one they want to run on. + srun_nodelist = '--comment="no_nodelist"' + same_node = test.options.get('same_node', None) + if same_node is not None: + if same_node not in self.node_list: + self.node_list.append(same_node) + rel_node = self.node_list.index(same_node) % self.numNodes + physical_node = self.get_physical_node(rel_node) + srun_nodelist = '--nodelist=%s' % physical_node + distribution="no_distribution" + + # print("SAD DEBUG NODELIST = %s" % (srun_nodelist)) + # 2021-07-14 SAD Old logic where we were using overlap for newer SchedMD slurm update # May not be needed now, but leave this in for a bit in case # we need to revert @@ -348,9 +422,12 @@ def calculateCommandList(self, test): # # If the distribution is unset, then set it to cyclic + # If same_node was processed, make a comment, do not add a distribution opion though # if distribution == 'unset': srun_distribution="--distribution=cyclic" + elif distribution=='no_distribution': + srun_distribution="--comment=no_distribution" else: srun_distribution="--distribution=%s" % distribution @@ -494,7 +571,7 @@ def calculateCommandList(self, test): return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList else: return ["srun", srun_mpi_type, "--label", "-J", test.jobname, - srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, + srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist, "--ntasks=%i" % np, extra_sargs \ ] + commandList @@ -527,11 +604,12 @@ def calculateCommandList(self, test): if SlurmProcessorScheduled.debugClass: print("SAD DEBUG SRUN800 ") + if self.salloc : return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList else: return ["srun", srun_mpi_type, "--label", "-J", test.jobname, - srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, + srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist, "--ntasks=%i" % np, extra_sargs, \ ] + commandList diff --git a/test/HelloSameNode/READ.ME b/test/HelloSameNode/READ.ME index 68fbad1..8514822 100644 --- a/test/HelloSameNode/READ.ME +++ b/test/HelloSameNode/READ.ME @@ -9,11 +9,7 @@ How to use: export PATH=/usr/gapps/ats/${SYS_TYPE}/7.0.${USER}/bin:$PATH -------------------------------------------------------------------------------- -Toss 4 CTS : flux under salloc -Toss 4 EAS : flux native scheduler - -atsflux can be used for either, the difference is how the job starts, but -both can use flux with same 'atsflux' command. +- FLUX -------------------------------------------------------------------------------- export -n MPICH_GPU_SUPPORT_ENABLED ./create_test_ats.py <- create the ats test file @@ -26,6 +22,11 @@ FAILED: 0 PASSED: 32 +-------------------------------------------------------------------------------- +- SLURM +-------------------------------------------------------------------------------- + ./create_test_ats.py <- create the ats test file + atslite1 test.ats <- test using slurm -------------------------------------------------------------------------------- -- end of READ.ME --