90 changes: 84 additions & 6 deletions ats/atsMachines/slurmProcessorScheduled.py
@@ -31,6 +31,11 @@

class SlurmProcessorScheduled(lcMachines.LCMachineCore):

    # Strings used to determine which node a user wants the test to run on.
    # Used with the same_node var.
    _cached_nodes = None  # static/class variable
    node_list = []

    lastMessageLine = 0
    remainingCapacity_numNodesReported = -1
    remainingCapacity_numProcsReported = -1
@@ -72,10 +77,64 @@ def init(self):

        super(SlurmProcessorScheduled, self).init()

        #
        # Slurm subtracts one node for the script itself that is running if an
        # salloc is done beforehand. The original code looked for a 'bin/ats'
        # process, which does not work with all the wrappers that projects put
        # around ats. We need another method to determine if the ats wrapper or binary

        # Call get_physical_node() to cache the hardware node listing before starting jobs.
        self.get_physical_node(0)

    def expand_nodelist(self, nodelist_field):
        """
        Expand a Slurm nodelist string like 'rzadams[1002,1005-1007]' into a
        list of node names. Handles multiple comma-separated patterns.
        """
        nodes = []
        # Regex to find patterns like prefix[range] or prefixNNNN
        pattern = re.compile(r'([a-zA-Z0-9_-]+)(?:\[(.*?)\])?')
        for match in pattern.finditer(nodelist_field):
            prefix = match.group(1)
            bracket = match.group(2)
            if bracket:
                for part in bracket.split(','):
                    part = part.strip()
                    if '-' in part:
                        start_str, end_str = part.split('-')
                        # Preserve zero-padding, e.g. 'node[001-003]' -> node001..node003
                        width = len(start_str)
                        start, end = int(start_str), int(end_str)
                        nodes.extend([f"{prefix}{i:0{width}d}" for i in range(start, end + 1)])
                    else:
                        nodes.append(f"{prefix}{part}")
            else:
                nodes.append(prefix)
        return nodes
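
    # A quick illustration of the expansion above (node names are hypothetical):
    #
    #   self.expand_nodelist("rzadams[1002,1005-1007]")
    #   -> ['rzadams1002', 'rzadams1005', 'rzadams1006', 'rzadams1007']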

    def get_physical_node(self, rel_index):
        """
        Given a relative node number, return the actual physical node within
        the Slurm allocation.
        """
        if self._cached_nodes is None:
            nodelist_str = os.environ.get("SLURM_JOB_NODELIST")
            if not nodelist_str:
                raise RuntimeError(
                    "SLURM_JOB_NODELIST is not set. Are you running inside a Slurm allocation/job?"
                )

            # Option 1: expand_nodelist handles Slurm-style nodelists, so call
            # it directly. Assign on the class, not the instance, so the cache
            # is shared by all instances.
            SlurmProcessorScheduled._cached_nodes = self.expand_nodelist(nodelist_str)

            # Option 2 (alternative): use scontrol show hostnames to expand for you:
            # out = subprocess.check_output(
            #     f"scontrol show hostnames {nodelist_str}", shell=True
            # ).decode().splitlines()
            # SlurmProcessorScheduled._cached_nodes = out

            # Optional logging
            log("Info: Physical Hardware Nodes: %s" % self._cached_nodes, echo=True)

        nodes = self._cached_nodes
        if rel_index < 0 or rel_index >= len(nodes):
            raise IndexError(f"Relative index {rel_index} out of range (0-{len(nodes)-1})")
        return nodes[rel_index]
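
    # Usage sketch (hypothetical 4-node allocation): the first call expands and
    # caches SLURM_JOB_NODELIST; later calls reuse the cached list.
    #
    #   self.get_physical_node(0)   # -> 'rzadams1002'
    #   self.get_physical_node(3)   # -> 'rzadams1007'
    #   self.get_physical_node(4)   # raises IndexError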


        # will be taking up one of the cores.
        #
        # Let's try and do this with ENV VARS instead
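
        # Illustrative sketch only (not the exact logic that follows): standard
        # Slurm environment variables can tell us whether we are inside an
        # allocation without inspecting process names.
        #
        #   in_allocation = "SLURM_JOB_ID" in os.environ
        #   num_nodes = int(os.environ.get("SLURM_NNODES", "1"))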
@@ -318,6 +377,21 @@ def calculateCommandList(self, test):
        if self.exclusive == True:
            srun_ex_or_sh = "--exclusive"

        # If the user requests that a test be run on the same node, then we store
        # the identifying string and spread the node identifiers across the nodes
        # we have access to. This way users don't need to think about how many
        # nodes there are or which one they want to run on.
        srun_nodelist = '--comment="no_nodelist"'
        same_node = test.options.get('same_node', None)
        if same_node is not None:
            if same_node not in self.node_list:
                self.node_list.append(same_node)
            rel_node = self.node_list.index(same_node) % self.numNodes
            physical_node = self.get_physical_node(rel_node)
            srun_nodelist = '--nodelist=%s' % physical_node
            distribution = "no_distribution"

# print("SAD DEBUG NODELIST = %s" % (srun_nodelist))

        # 2021-07-14 SAD Old logic where we were using overlap for the newer
        # SchedMD slurm update. May not be needed now, but leave this in for a
        # bit in case we need to revert.
@@ -348,9 +422,12 @@ def calculateCommandList(self, test):

        #
        # If the distribution is unset, then set it to cyclic.
        # If same_node was processed, emit a comment; do not add a distribution option though.
        #
        if distribution == 'unset':
            srun_distribution = "--distribution=cyclic"
        elif distribution == 'no_distribution':
            srun_distribution = "--comment=no_distribution"
        else:
            srun_distribution = "--distribution=%s" % distribution

@@ -494,7 +571,7 @@ def calculateCommandList(self, test):
return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList
else:
return ["srun", srun_mpi_type, "--label", "-J", test.jobname,
srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task,
srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist,
"--ntasks=%i" % np, extra_sargs \
] + commandList

@@ -527,11 +604,12 @@ def calculateCommandList(self, test):
        if SlurmProcessorScheduled.debugClass:
            print("SAD DEBUG SRUN800 ")

        if self.salloc:
            return ["salloc", srun_partition, srun_ex_or_sh, srun_nodes, extra_sargs] + commandList
        else:
            return ["srun", srun_mpi_type, "--label", "-J", test.jobname,
                    srun_partition, srun_ex_or_sh, srun_unbuffered, srun_mpibind, srun_distribution, srun_nodes, srun_cpus_per_task, srun_nodelist,
                    "--ntasks=%i" % np, extra_sargs,
                    ] + commandList

11 changes: 6 additions & 5 deletions test/HelloSameNode/READ.ME
@@ -9,11 +9,7 @@ How to use:
export PATH=/usr/gapps/ats/${SYS_TYPE}/7.0.${USER}/bin:$PATH

--------------------------------------------------------------------------------
Toss 4 CTS : flux under salloc
Toss 4 EAS : flux native scheduler

atsflux can be used for either; the difference is how the job starts, but
both can use flux with the same 'atsflux' command.
- FLUX
--------------------------------------------------------------------------------
export -n MPICH_GPU_SUPPORT_ENABLED
./create_test_ats.py <- create the ats test file
@@ -26,6 +22,11 @@ FAILED: 0
PASSED: 32


--------------------------------------------------------------------------------
- SLURM
--------------------------------------------------------------------------------
./create_test_ats.py <- create the ats test file
atslite1 test.ats <- test using slurm
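
A minimal sketch of a test line using the new option (the exact contents
produced by create_test_ats.py, and the 'same_node' values, are assumptions
for illustration; any tests sharing a same_node string land on the same
physical node):

  t1 = test(executable='./hello', np=2, same_node='setA')
  t2 = test(executable='./hello', np=2, same_node='setA')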

--------------------------------------------------------------------------------
-- end of READ.ME --