diff --git a/LATCH_VERSION b/LATCH_VERSION index cd76e98ba3..a2d729d9aa 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.36 +v3.0.39 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index af8612cd16..d43691631c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -52,7 +52,7 @@ class ForchExecutor extends Executor { @Override void shutdown() { - def status = session.success ? "SUCCEEDED" : ((session.aborted || session.cancelled) ? "ABORTED" : "FAILED") + def status = session.success ? "SUCCEEDED" : "FAILED" this.dispatcherClient.updateExecutionStatus(status) String nfsServerTaskId = System.getenv("nfs_server_task_id") diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index e212c19b03..c0de883ab2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -128,7 +128,8 @@ class ForchTaskHandler extends TaskHandler { entrypoint, cpus, memory.bytes, - shm?.bytes ?: 0 + shm?.bytes ?: 0, + this.task.config.spot ? "spot" : "on-demand", ) // todo(rahul): put this in a single transaction with submitTask diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy deleted file mode 100644 index 35150dd9d9..0000000000 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy +++ /dev/null @@ -1,28 +0,0 @@ -package nextflow.forch - -import groovy.util.logging.Slf4j -import nextflow.processor.TaskHandler -import nextflow.processor.TaskMonitor - -@Slf4j -class ForchTaskMonitor implements TaskMonitor { - @Override - void schedule(TaskHandler handler) { - - } - - @Override - boolean evict(TaskHandler handler) { - return false - } - - @Override - TaskMonitor start() { - return null - } - - @Override - void signal() { - // noop - } -} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index cff070f73c..467fd9921a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -212,6 +212,10 @@ class TaskConfig extends LazyMap implements Cloneable { return get('stageOutMode') } + boolean getSpot() { + return toBool(get('spot')) + } + boolean getDebug() { // check both `debug` and `echo` for backward // compatibility until `echo` is not removed diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 4f121d9c24..cd8d2983e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -13,7 +13,8 @@ class ForchClient { List entrypoint, int cpus, long memoryBytes, - long shmBytes // nullable + long shmBytes, // nullable + String capacityType ) { String resourceGroup = System.getenv("forch_resource_group_id") if (resourceGroup == null) @@ -29,34 +30,37 @@ class ForchClient { String region = System.getenv("host_region") ?: "us-west-2" - Map res = client.execute(""" + Map res = client.execute( + """ mutation CreateForchTask( - \$displayName: String!, - \$containerImage: String!, - \$containerEntrypoint: [String]!, - \$cpus: Int!, - \$memoryBytes: BigInt!, - \$shmBytes: BigInt, - \$gpuType: String, - \$gpus: Int!, - \$groupId: BigInt!, - \$billedTo: BigInt!, - \$nfsServerTaskId: BigInt!, + \$displayName: String! + \$containerImage: String! + \$containerEntrypoint: [String]! + \$cpus: Int! + \$memoryBytes: BigInt! + \$shmBytes: BigInt + \$gpuType: String + \$gpus: Int! + \$capacityType: String! + \$groupId: BigInt! + \$billedTo: BigInt! + \$nfsServerTaskId: BigInt! \$targetRegion: String! ) { nfCreateForchTask( input: { - argDisplayName: \$displayName, - argContainerImage: \$containerImage, - argContainerEntrypoint: \$containerEntrypoint, - argCpus: \$cpus, - argMemoryBytes: \$memoryBytes, - argShmBytes: \$shmBytes, - argGpuType: \$gpuType, - argGpus: \$gpus, - argGroupId: \$groupId, - argBilledTo: \$billedTo, - argNfsServerTaskId: \$nfsServerTaskId, + argDisplayName: \$displayName + argContainerImage: \$containerImage + argContainerEntrypoint: \$containerEntrypoint + argCpus: \$cpus + argMemoryBytes: \$memoryBytes + argShmBytes: \$shmBytes + argGpuType: \$gpuType + argGpus: \$gpus + argCapacityType: \$capacityType + argGroupId: \$groupId + argBilledTo: \$billedTo + argNfsServerTaskId: \$nfsServerTaskId argTargetRegion: \$targetRegion } ) { @@ -73,6 +77,7 @@ class ForchClient { "shmBytes": shmBytes == 0 ? null : shmBytes, "gpuType" : null, "gpus" : 0, + "capacityType": capacityType, "groupId": resourceGroup.toInteger(), "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, @@ -111,9 +116,8 @@ class ForchClient { Map res = client.execute(""" query GetTaskExitCode(\$taskId: BigInt!) { taskEvents( - condition: {taskId: \$taskId}, - filter: {taskEventContainerExitedDatumByIdExists: true}, - orderBy: TIME_DESC, + condition: { taskId: \$taskId, type: "container-exited" } + orderBy: TIME_DESC first: 1 ) { nodes { @@ -139,7 +143,14 @@ class ForchClient { if (nodes == null || nodes.size() == 0) return -1 - return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int + def teced = nodes[0]["taskEventContainerExitedDatumById"]; + + // note(ayush): the only time we have an exit event without an exit code is if the node it was running on was killed + // either manually or via spot preemption - in either case, treat that as a system kill and provide a 137 status + if (teced == null) + return 137 + + return teced["exitStatus"] as int } void abortTasks(List taskIds) {