From c24e964e668fbb54458e4304f6ced60281ed91ad Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 5 Feb 2026 10:56:12 -0800 Subject: [PATCH 1/4] propagate spot tags Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../src/main/groovy/nextflow/forch/ForchExecutor.groovy | 2 +- .../main/groovy/nextflow/forch/ForchTaskHandler.groovy | 3 ++- .../src/main/groovy/nextflow/processor/TaskConfig.groovy | 4 ++++ .../src/main/groovy/nextflow/util/ForchClient.groovy | 8 ++++++-- 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index cd76e98ba3..ca615aad2f 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.36 +v3.0.37 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index af8612cd16..d43691631c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -52,7 +52,7 @@ class ForchExecutor extends Executor { @Override void shutdown() { - def status = session.success ? "SUCCEEDED" : ((session.aborted || session.cancelled) ? "ABORTED" : "FAILED") + def status = session.success ? "SUCCEEDED" : "FAILED" this.dispatcherClient.updateExecutionStatus(status) String nfsServerTaskId = System.getenv("nfs_server_task_id") diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index e212c19b03..c0de883ab2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -128,7 +128,8 @@ class ForchTaskHandler extends TaskHandler { entrypoint, cpus, memory.bytes, - shm?.bytes ?: 0 + shm?.bytes ?: 0, + this.task.config.spot ? "spot" : "on-demand", ) // todo(rahul): put this in a single transaction with submitTask diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index cff070f73c..467fd9921a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -212,6 +212,10 @@ class TaskConfig extends LazyMap implements Cloneable { return get('stageOutMode') } + boolean getSpot() { + return toBool(get('spot')) + } + boolean getDebug() { // check both `debug` and `echo` for backward // compatibility until `echo` is not removed diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 4f121d9c24..157387812f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -13,7 +13,8 @@ class ForchClient { List entrypoint, int cpus, long memoryBytes, - long shmBytes // nullable + long shmBytes, // nullable + String capacityType ) { String resourceGroup = System.getenv("forch_resource_group_id") if (resourceGroup == null) @@ -39,10 +40,11 @@ class ForchClient { \$shmBytes: BigInt, \$gpuType: String, \$gpus: Int!, + \$capacityType: String! \$groupId: BigInt!, \$billedTo: BigInt!, \$nfsServerTaskId: BigInt!, - \$targetRegion: String! + \$targetRegion: String!, ) { nfCreateForchTask( input: { @@ -54,6 +56,7 @@ class ForchClient { argShmBytes: \$shmBytes, argGpuType: \$gpuType, argGpus: \$gpus, + argCapacityType: \$capacityType, argGroupId: \$groupId, argBilledTo: \$billedTo, argNfsServerTaskId: \$nfsServerTaskId, @@ -73,6 +76,7 @@ class ForchClient { "shmBytes": shmBytes == 0 ? null : shmBytes, "gpuType" : null, "gpus" : 0, + "capacityType": capacityType, "groupId": resourceGroup.toInteger(), "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, From 6d6a614b9e1d33afd8b2a0712f9c9bb89a5d4a63 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 5 Feb 2026 11:54:55 -0800 Subject: [PATCH 2/4] prettify query Signed-off-by: Ayush Kamat --- .../groovy/nextflow/util/ForchClient.groovy | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 157387812f..86b2edaa47 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -30,36 +30,37 @@ class ForchClient { String region = System.getenv("host_region") ?: "us-west-2" - Map res = client.execute(""" + Map res = client.execute( + """ mutation CreateForchTask( - \$displayName: String!, - \$containerImage: String!, - \$containerEntrypoint: [String]!, - \$cpus: Int!, - \$memoryBytes: BigInt!, - \$shmBytes: BigInt, - \$gpuType: String, - \$gpus: Int!, + \$displayName: String! + \$containerImage: String! + \$containerEntrypoint: [String]! + \$cpus: Int! + \$memoryBytes: BigInt! + \$shmBytes: BigInt + \$gpuType: String + \$gpus: Int! \$capacityType: String! - \$groupId: BigInt!, - \$billedTo: BigInt!, - \$nfsServerTaskId: BigInt!, - \$targetRegion: String!, + \$groupId: BigInt! + \$billedTo: BigInt! + \$nfsServerTaskId: BigInt! + \$targetRegion: String! ) { nfCreateForchTask( input: { - argDisplayName: \$displayName, - argContainerImage: \$containerImage, - argContainerEntrypoint: \$containerEntrypoint, - argCpus: \$cpus, - argMemoryBytes: \$memoryBytes, - argShmBytes: \$shmBytes, - argGpuType: \$gpuType, - argGpus: \$gpus, - argCapacityType: \$capacityType, - argGroupId: \$groupId, - argBilledTo: \$billedTo, - argNfsServerTaskId: \$nfsServerTaskId, + argDisplayName: \$displayName + argContainerImage: \$containerImage + argContainerEntrypoint: \$containerEntrypoint + argCpus: \$cpus + argMemoryBytes: \$memoryBytes + argShmBytes: \$shmBytes + argGpuType: \$gpuType + argGpus: \$gpus + argCapacityType: \$capacityType + argGroupId: \$groupId + argBilledTo: \$billedTo + argNfsServerTaskId: \$nfsServerTaskId argTargetRegion: \$targetRegion } ) { From f0d56f2d091bc025edf0e1093d8d350f39b71eed Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 5 Feb 2026 13:50:48 -0800 Subject: [PATCH 3/4] add fallback for preemption Signed-off-by: Ayush Kamat --- .../nextflow/forch/ForchTaskMonitor.groovy | 28 ------------------- .../groovy/nextflow/util/ForchClient.groovy | 9 +++++- 2 files changed, 8 insertions(+), 29 deletions(-) delete mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy deleted file mode 100644 index 35150dd9d9..0000000000 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy +++ /dev/null @@ -1,28 +0,0 @@ -package nextflow.forch - -import groovy.util.logging.Slf4j -import nextflow.processor.TaskHandler -import nextflow.processor.TaskMonitor - -@Slf4j -class ForchTaskMonitor implements TaskMonitor { - @Override - void schedule(TaskHandler handler) { - - } - - @Override - boolean evict(TaskHandler handler) { - return false - } - - @Override - TaskMonitor start() { - return null - } - - @Override - void signal() { - // noop - } -} diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 86b2edaa47..6e1cb16042 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -144,7 +144,14 @@ class ForchClient { if (nodes == null || nodes.size() == 0) return -1 - return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int + def teced = nodes[0]["taskEventContainerExitedDatumById"]; + + // note(ayush): the only time we have an exit event without an exit code is if the node it was running on was killed + // either manually or via spot preemption - in either case, treat that as a system kill and provide a 137 status + if (teced == null) + return 137 + + return teced["exitStatus"] as int } void abortTasks(List taskIds) { From 9f0fef18362cf3c0423c085086ad43e8ba0677c4 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 12 Feb 2026 15:58:40 -0800 Subject: [PATCH 4/4] save state Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../src/main/groovy/nextflow/util/ForchClient.groovy | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index ca615aad2f..a2d729d9aa 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.37 +v3.0.39 diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 6e1cb16042..cd8d2983e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -116,9 +116,8 @@ class ForchClient { Map res = client.execute(""" query GetTaskExitCode(\$taskId: BigInt!) { taskEvents( - condition: {taskId: \$taskId}, - filter: {taskEventContainerExitedDatumByIdExists: true}, - orderBy: TIME_DESC, + condition: { taskId: \$taskId, type: "container-exited" } + orderBy: TIME_DESC first: 1 ) { nodes {