From 3c7ab97a44a7cf16f1c6e7fef203531a42d6c4ec Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 16 Apr 2025 17:11:17 -0700 Subject: [PATCH 01/40] init Signed-off-by: Ayush Kamat --- forch_interface.py | 167 ++++++++++++++++++ .../nextflow/executor/ExecutorFactory.groovy | 4 +- .../nextflow/forch/ForchExecutor.groovy | 31 ++++ .../nextflow/forch/ForchTaskHandler.groovy | 93 ++++++++++ .../nextflow/forch/ForchTaskMonitor.groovy | 28 +++ .../nextflow/k8s/model/PodSpecBuilder.groovy | 20 +-- .../nextflow/processor/TaskProcessor.groovy | 3 + .../groovy/nextflow/script/BindableDef.groovy | 3 +- .../nextflow/util/DispatcherClient.groovy | 18 +- .../src/main/nextflow/util/MemoryUnit.groovy | 2 +- 10 files changed, 348 insertions(+), 21 deletions(-) create mode 100755 forch_interface.py create mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy diff --git a/forch_interface.py b/forch_interface.py new file mode 100755 index 0000000000..b84fdeb1e3 --- /dev/null +++ b/forch_interface.py @@ -0,0 +1,167 @@ +#!/Users/ayush/Desktop/core/nucleus-workflows/.venv/bin/python + +import asyncio +import json +import sys +from dataclasses import dataclass +from typing import Literal + +from latch_config.config import PostgresConnectionConfig, read_config +from latch_data_validation.data_validation import untraced_validate +from latch_postgres.postgres import ( + LatchAsyncConnection, + get_pool, + get_with_conn_retry, + sqlq, +) + +config = read_config(PostgresConnectionConfig, "db_") + +pool = get_pool(config, "nextflow_forch_test", read_only=False) +with_conn_retry = get_with_conn_retry(pool, config) + + +@dataclass +class TaskInfo: + id: int + + +@dataclass +class CreateTaskInput: + display_name: str + container_image: str + container_entrypoint: list[str] + cpus: int + memory_bytes: int + gpu_type: str | None + gpus: int + + +async def create_task(payload: CreateTaskInput): + @with_conn_retry + async def db_work(conn: LatchAsyncConnection): + return await conn.query1( + TaskInfo, + sqlq( + """ + insert into + forch_pub.tasks( + display_name, + container_image, + container_entrypoint, + dedicated_cpuset_size, + dedicated_memory_bytes, + allow_internet_egress, + dedicated_gpu_type, + dedicated_gpu_count + ) + values + ( + %(display_name)s, + %(container_image)s, + %(container_entrypoint)s, + %(cpus)s, + %(memory_bytes)s, + true, + %(gpu_type)s, + %(gpus)s + ) + returning + id + """, + ), + display_name=payload.display_name, + container_image=payload.container_image, + container_entrypoint=payload.container_entrypoint, + cpus=payload.cpus, + memory_bytes=payload.memory_bytes, + gpu_type=payload.gpu_type, + gpus=payload.gpus, + ) + + return await db_work() + + +@dataclass +class TaskStatus: + status: Literal[ + "queued", + "initializing", + "running", + "succeeded", + "failed", + ] + + +async def get_task_status(task_id: int): + @with_conn_retry + async def db_work(conn: LatchAsyncConnection): + return await conn.query1( + TaskStatus, + sqlq( + """ + select + coalesce( + ( + select + ( + select + case + when te.type = 'node-assigned' then + 'submitted' + when te.type = 'container-created' then + 'running' + when te.type = 'container-exited' then + ( + select + case + when teced.exit_status = 0 then + 'succeeded' + else + 'failed' + end + ) + else + null + end + ) + from + forch_pub.task_events te + left join + forch_pub.task_event_container_exited_data teced + on teced.id = te.id + where + te.task_id = %(task_id)s + order by + time desc + limit 1 + ), + 'queued' + ) status + """, + ), + task_id=task_id, + ) + + return await db_work() + + +async def main(): + await pool.open() + args = sys.argv[1:] + if len(args) != 2: + print("Must provide a command") + sys.exit(1) + + if args[0] == "create": + payload = untraced_validate(json.loads(args[1]), CreateTaskInput) + res = await create_task(payload) + print(res.id) + elif args[0] == "status": + task_id = int(args[1]) + res = await get_task_status(task_id) + print(res.status) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy index 57a1535b6c..3bd3253422 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/ExecutorFactory.groovy @@ -22,6 +22,7 @@ import groovy.transform.PackageScope import groovy.util.logging.Slf4j import nextflow.Session import nextflow.executor.local.LocalExecutor +import nextflow.forch.ForchExecutor import nextflow.k8s.K8sExecutor import nextflow.script.BodyDef import nextflow.script.ProcessConfig @@ -61,7 +62,8 @@ class ExecutorFactory { 'nqsii': NqsiiExecutor, 'moab': MoabExecutor, 'oar': OarExecutor, - 'hq': HyperQueueExecutor + 'hq': HyperQueueExecutor, + 'forch': ForchExecutor, ] @PackageScope Map> executorsMap diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy new file mode 100644 index 0000000000..8c893a6e84 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -0,0 +1,31 @@ +package nextflow.forch + +import groovy.util.logging.Slf4j +import nextflow.executor.Executor +import nextflow.processor.TaskHandler +import nextflow.processor.TaskMonitor +import nextflow.processor.TaskPollingMonitor +import nextflow.processor.TaskRun +import nextflow.util.DispatcherClient +import nextflow.util.Duration + +@Slf4j +class ForchExecutor extends Executor { + + @Override + protected TaskMonitor createTaskMonitor() { + return TaskPollingMonitor.create(session, name, 100, Duration.of("15s")) + } + + @Override + protected void register() { + // todo(ayush): decouple dispatcher and executor + this.dispatcherClient = new DispatcherClient() + this.dispatcherClient.debug = true + } + + @Override + TaskHandler createTaskHandler(TaskRun task) { + return new ForchTaskHandler(task) + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy new file mode 100644 index 0000000000..d68b6988be --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -0,0 +1,93 @@ +package nextflow.forch + +import java.util.concurrent.TimeUnit + +import groovy.json.JsonBuilder +import groovy.util.logging.Slf4j +import nextflow.executor.BashWrapperBuilder +import nextflow.executor.res.AcceleratorResource +import nextflow.processor.TaskHandler +import nextflow.processor.TaskRun +import nextflow.processor.TaskStatus +import nextflow.script.ProcessConfig +import nextflow.util.Escape +import nextflow.util.MemoryUnit + +@Slf4j +class ForchTaskHandler extends TaskHandler { + + ProcessConfig processConfig + + Integer forchTaskId + + + ForchTaskHandler(TaskRun task) { + super(task) + + this.processConfig = task.processor.config + } + + private String getCurrentStatus() { + if (this.forchTaskId == null) return + + String command = "forch status ${forchTaskId}" + StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder(); + Process proc = command.execute() + + proc.consumeProcessOutput(stdout, stderr) + proc.waitFor(5, TimeUnit.SECONDS) + + return stdout.toString().trim() + } + + @Override + boolean checkIfRunning() { + return this.currentStatus == 'running' + } + + @Override + boolean checkIfCompleted() { + return this.currentStatus == 'succeeded' || this.currentStatus == 'failed' + } + + @Override + void kill() { + // noop + } + + @Override + void submit() { + JsonBuilder builder = new JsonBuilder() + + int cpus = task.config.getCpus() + MemoryUnit memory = task.config.getMemory() ?: MemoryUnit.of("2GiB") + + // todo(ayush): gpu support + // AcceleratorResource acc = task.config.getAccelerator() + + builder([ + "display_name": this.task.name, + "container_image": this.task.container, + "container_entrypoint": [ + "/bin/bash", + "-ue", + "${Escape.path(task.workDir)}/${TaskRun.CMD_RUN}" + ], + "cpus": cpus, + "memory_bytes": memory.bytes, + "gpu_type": null, + "gpus": 0, + ]) + + List command = ["forch", "create", builder.toString()] + StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder(); + Process proc = command.execute() + + proc.consumeProcessOutput(stdout, stderr) + proc.waitFor(5, TimeUnit.SECONDS) + + log.debug("${task.name} taskExecutionId: $stdout, err: $stderr") + + this.forchTaskId = Integer.parseInt(stdout.toString().trim()) + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy new file mode 100644 index 0000000000..35150dd9d9 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskMonitor.groovy @@ -0,0 +1,28 @@ +package nextflow.forch + +import groovy.util.logging.Slf4j +import nextflow.processor.TaskHandler +import nextflow.processor.TaskMonitor + +@Slf4j +class ForchTaskMonitor implements TaskMonitor { + @Override + void schedule(TaskHandler handler) { + + } + + @Override + boolean evict(TaskHandler handler) { + return false + } + + @Override + TaskMonitor start() { + return null + } + + @Override + void signal() { + // noop + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/model/PodSpecBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/model/PodSpecBuilder.groovy index a443e50470..615d8e2542 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/model/PodSpecBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/model/PodSpecBuilder.groovy @@ -671,7 +671,7 @@ class PodSpecBuilder { @PackageScope - void validateAccelerator(AcceleratorResource accelerator) { + static void validateAccelerator(AcceleratorResource accelerator) { // gpu-small: nvidia-t4 (1) // gpu-large: nvidia-a10g (1) // v100-x1: nvidia-v100 (1) @@ -687,15 +687,15 @@ class PodSpecBuilder { } throw new VerifyError("""\ -Invalid GPU configuration. Latch only allows the following combinations: - - accelerator 1, type: "nvidia-t4" - - accelerator 1, type: "nvidia-a10g" - - accelerator 1, type: "nvidia-v100" - - accelerator 4, type: "nvidia-v100" - - accelerator 8, type: "nvidia-v100" - -You provided ${accelerator.type}, ${accelerator.limit} - """) + Invalid GPU configuration. Latch only allows the following combinations: + - accelerator 1, type: "nvidia-t4" + - accelerator 1, type: "nvidia-a10g" + - accelerator 1, type: "nvidia-v100" + - accelerator 4, type: "nvidia-v100" + - accelerator 8, type: "nvidia-v100" + + You provided ${accelerator.type}, ${accelerator.limit} + """.stripIndent().trim()) } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 489601cc59..a937764056 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -17,6 +17,8 @@ package nextflow.processor import nextflow.file.http.LatchPath import java.nio.file.StandardCopyOption + +import nextflow.k8s.K8sExecutor import nextflow.trace.TraceRecord import static nextflow.processor.ErrorStrategy.* @@ -332,6 +334,7 @@ class TaskProcessor { this.numTasks = 0 this.dispatcherClient = new DispatcherClient() + this.dispatcherClient.debug = !(executor instanceof K8sExecutor) this.maxForks = config.maxForks && config.maxForks>0 ? config.maxForks as int : 0 this.forksCount = maxForks ? new LongAdder() : null diff --git a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy index e9b9fc502c..b91da88f48 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/BindableDef.groovy @@ -40,8 +40,9 @@ abstract class BindableDef extends ComponentDef { // use this instance an workflow template, therefore clone it final String prefix = ExecutionStack.workflow()?.name final fqName = prefix ? prefix+SCOPE_SEP+name : name + log.debug("bindable fqName: $fqName") if( this instanceof ProcessDef && !invocations.add(fqName) ) { - log.debug "Bindable invocations=$invocations" + log.debug "Bindable invocations=$invocations, ${this.toString()}" final msg = "Process '$name' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context" throw new DuplicateProcessInvocation(msg) } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 318c5f5460..308c6a3f6b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -10,14 +10,16 @@ class DispatcherClient { private GQLClient client = new GQLClient() + public boolean debug = System.getenv("LATCH_NF_DEBUG") == "true" + int createProcessNode(String processName) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return 1 } String executionToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") if (executionToken == null) - throw new RuntimeException("unable to get execution token") + throw new RuntimeException("unable to get execution token") Map res = client.execute(""" mutation CreateNode(\$executionToken: String!, \$name: String!) { @@ -39,7 +41,7 @@ class DispatcherClient { } void closeProcessNode(int nodeId, int numTasks) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return } @@ -65,7 +67,7 @@ class DispatcherClient { } void createProcessEdge(int from, int to) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return } @@ -91,7 +93,7 @@ class DispatcherClient { } int createProcessTask(int processNodeId, int index, String tag) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return 1 } @@ -150,7 +152,7 @@ class DispatcherClient { } int createTaskExecution(int taskId, int attemptIdx, String hash, String status = null) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return 1 } @@ -241,7 +243,7 @@ class DispatcherClient { } void updateTaskStatus(int taskExecutionId, String status) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return } @@ -267,7 +269,7 @@ class DispatcherClient { } Map getTaskStatus(int taskExecutionId) { - if (System.getenv("LATCH_NF_DEBUG") == "true") { + if (debug) { return null } diff --git a/modules/nf-commons/src/main/nextflow/util/MemoryUnit.groovy b/modules/nf-commons/src/main/nextflow/util/MemoryUnit.groovy index 26495a81f4..dede90ba00 100644 --- a/modules/nf-commons/src/main/nextflow/util/MemoryUnit.groovy +++ b/modules/nf-commons/src/main/nextflow/util/MemoryUnit.groovy @@ -33,7 +33,7 @@ class MemoryUnit implements Comparable, Serializable, Cloneable { final static public MemoryUnit ZERO = new MemoryUnit(0) - final static private Pattern FORMAT = ~/([0-9\.]+)\s*(\S)?B?/ + final static private Pattern FORMAT = ~/([0-9\.]+)\s*(\S)?i?B?/ final static public List UNITS = [ "B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB" ] From dcbcd287f1602b801530c341c79d9cda8bde477f Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Fri, 25 Apr 2025 17:02:16 -0700 Subject: [PATCH 02/40] dur Signed-off-by: Ayush Kamat --- forch_interface.py | 33 ++++++ .../executor/BashWrapperBuilder.groovy | 8 +- .../nextflow/forch/ForchExecutor.groovy | 17 ++- .../forch/ForchFileCopyStrategy.groovy | 104 ++++++++++++++++++ .../nextflow/forch/ForchTaskHandler.groovy | 61 ++++++++-- .../forch/ForchTaskWrapperBuilder.groovy | 29 +++++ 6 files changed, 235 insertions(+), 17 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchFileCopyStrategy.groovy create mode 100644 modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy diff --git a/forch_interface.py b/forch_interface.py index b84fdeb1e3..7693242580 100755 --- a/forch_interface.py +++ b/forch_interface.py @@ -146,6 +146,35 @@ async def db_work(conn: LatchAsyncConnection): return await db_work() +@dataclass +class TaskExitCode: + exit_status: int | None + + +async def get_task_exit_code(task_id: int): + @with_conn_retry + async def db_work(conn: LatchAsyncConnection): + return await conn.query1( + TaskExitCode, + sqlq( + """ + select + teced.exit_status + from + forch_pub.task_events te + inner join + forch_pub.task_event_container_exited_data teced + on teced.id = te.id + where + te.task_id = %(task_id)s + """, + ), + task_id=task_id, + ) + + return await db_work() + + async def main(): await pool.open() args = sys.argv[1:] @@ -161,6 +190,10 @@ async def main(): task_id = int(args[1]) res = await get_task_status(task_id) print(res.status) + elif args[0] == "exitcode": + task_id = int(args[1]) + res = await get_task_exit_code(task_id) + print(res.exit_status) if __name__ == "__main__": diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy index 196ad9cfa5..92c34659ac 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy @@ -444,14 +444,8 @@ class BashWrapperBuilder { int attempt=0 while( true ) { try { - // note(taras): always sync to disk to ensure that the file is visible to other clients - try( - FileOutputStream fos = new FileOutputStream(path.toFile()); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos)) - ) { + try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) { writer.write(data) - writer.flush() - fos.getFD().sync() } return path } diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index 8c893a6e84..6b915e2b06 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -1,7 +1,11 @@ package nextflow.forch +import java.nio.file.Path + import groovy.util.logging.Slf4j import nextflow.executor.Executor +import nextflow.extension.FilesEx +import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskMonitor import nextflow.processor.TaskPollingMonitor @@ -12,6 +16,8 @@ import nextflow.util.Duration @Slf4j class ForchExecutor extends Executor { + Path remoteBinDir = null + @Override protected TaskMonitor createTaskMonitor() { return TaskPollingMonitor.create(session, name, 100, Duration.of("15s")) @@ -22,10 +28,19 @@ class ForchExecutor extends Executor { // todo(ayush): decouple dispatcher and executor this.dispatcherClient = new DispatcherClient() this.dispatcherClient.debug = true + uploadBinDir() } @Override TaskHandler createTaskHandler(TaskRun task) { - return new ForchTaskHandler(task) + return new ForchTaskHandler(task, remoteBinDir) + } + + protected void uploadBinDir() { + if( session.binDir && !session.binDir.empty() ) { + def s3 = getTempDir() + log.info "Uploading local `bin` scripts folder to ${s3.toUriString()}/bin" + remoteBinDir = FilesEx.copyTo(session.binDir, s3) + } } } diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchFileCopyStrategy.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchFileCopyStrategy.groovy new file mode 100644 index 0000000000..5a3d4ef764 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchFileCopyStrategy.groovy @@ -0,0 +1,104 @@ +package nextflow.forch + +import java.nio.file.Path + +import nextflow.executor.BashFunLib +import nextflow.executor.SimpleFileCopyStrategy +import nextflow.util.Escape + +class ForchFileCopyStrategy extends SimpleFileCopyStrategy { + + @Override + String getBeforeStartScript() { + def lib = new BashFunLib().coreLib() + + return lib + "\n\n" + """\ + nxf_s5cmd_upload() { + local name=\$1 + local s3path=\$2 + if [[ "\$name" == - ]]; then + echo 's5cmd --no-verify-ssl pipe "\$s3path"' + s5cmd --no-verify-ssl pipe "\$s3path" + elif [[ -d "\$name" ]]; then + s5cmd --no-verify-ssl cp "\$name" "\$s3path/" + else + s5cmd --no-verify-ssl cp "\$name" "\$s3path/\$name" + fi + } + + nxf_s5cmd_download() { + local source=\$1 + local target=\$2 + local file_name=\$(basename \$1) + local is_dir=\$(s5cmd --no-verify-ssl ls \$source | grep -F "DIR \${file_name}/" -c) + if [[ \$is_dir == 1 ]]; then + s5cmd --no-verify-ssl cp "\$source/*" "\$target" + else + s5cmd --no-verify-ssl cp "\$source" "\$target" + fi + } + + + """.stripIndent() + } + + @Override + String getStageInputFilesScript(Map inputFiles) { + def result = 'downloads=(true)\n' + result += super.getStageInputFilesScript(inputFiles) + '\n' + result += 'nxf_parallel "${downloads[@]}"\n' + return result + } + + @Override + protected String stageInCommand(String source, String target, String mode) { + return "downloads+=(\"nxf_s5cmd_download s3:/${Escape.path(source)} ${Escape.path(target)}\")" + } + + @Override + String getUnstageOutputFilesScript(List outputFiles, Path targetDir) { + final patterns = normalizeGlobStarPaths(outputFiles) + + if( !patterns ) + return null + + final escape = new ArrayList(outputFiles.size()) + for( String it : patterns ) + escape.add( Escape.path(it) ) + + return """\ + uploads=() + IFS=\$'\\n' + for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do + uploads+=("nxf_s5cmd_upload '\$name' s3:/${Escape.path(targetDir)}") + done + unset IFS + nxf_parallel "\${uploads[@]}" + """.stripIndent(true) + } + + @Override + String touchFile(Path file) { + return "echo start | s5cmd --no-verify-ssl pipe s3:/${Escape.path(file)}" + } + + @Override + String fileStr( Path path ) { + Escape.path(path.getFileName()) + } + + @Override + String copyFile( String name, Path target ) { + "s5cmd --no-verify-ssl cp ${Escape.path(name)} s3:/${Escape.path(target)}" + } + + @Override + String exitFile(Path file) { + return "| s5cmd --no-verify-ssl pipe s3:/${Escape.path(file)} || true" + } + + @Override + String pipeInputFile(Path file) { + return " < ${Escape.path(file.getFileName())}" + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index d68b6988be..8230c70170 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -1,11 +1,15 @@ package nextflow.forch +import java.nio.file.Path import java.util.concurrent.TimeUnit import groovy.json.JsonBuilder import groovy.util.logging.Slf4j +import nextflow.exception.ProcessException +import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.BashWrapperBuilder import nextflow.executor.res.AcceleratorResource +import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.processor.TaskStatus @@ -20,17 +24,17 @@ class ForchTaskHandler extends TaskHandler { Integer forchTaskId + Path remoteBinDir = null - ForchTaskHandler(TaskRun task) { + + ForchTaskHandler(TaskRun task, Path remoteBinDir) { super(task) this.processConfig = task.processor.config + this.remoteBinDir = remoteBinDir } - private String getCurrentStatus() { - if (this.forchTaskId == null) return - - String command = "forch status ${forchTaskId}" + private String subprocess(String command) { StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder(); Process proc = command.execute() @@ -40,14 +44,34 @@ class ForchTaskHandler extends TaskHandler { return stdout.toString().trim() } + private String getCurrentStatus() { + if (this.forchTaskId == null) return + + return subprocess("forch status ${forchTaskId}") + } + @Override boolean checkIfRunning() { - return this.currentStatus == 'running' + def running = this.currentStatus == 'running' + if (running) + status = TaskStatus.RUNNING + return running } @Override boolean checkIfCompleted() { - return this.currentStatus == 'succeeded' || this.currentStatus == 'failed' + def cur = this.currentStatus + if (cur != "succeeded" && cur != "failed") return false + + // todo(ayush): single query + def exitStatus = subprocess("forch exitcode ${forchTaskId}") + task.exitStatus = Integer.parseInt(exitStatus) + + // todo(ayush): logs, retries + task.stdout = "" + task.stderr = "" + status = TaskStatus.COMPLETED + return true } @Override @@ -55,6 +79,11 @@ class ForchTaskHandler extends TaskHandler { // noop } + @Override + void prepareLauncher() { + new ForchTaskWrapperBuilder(this.task.toTaskBean()).build() + } + @Override void submit() { JsonBuilder builder = new JsonBuilder() @@ -65,13 +94,27 @@ class ForchTaskHandler extends TaskHandler { // todo(ayush): gpu support // AcceleratorResource acc = task.config.getAccelerator() + String cmd = """\ + trap "{ ret=\$?; s5cmd cp ${TaskRun.CMD_LOG} ${task.workDir.toUriString()}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; + s5cmd --no-verify-ssl cat ${task.workDir.toUriString()}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} + """.stripIndent().trim() + + if (remoteBinDir != null) { + cmd = """\ + s5cmd --no-verify-ssl cp s3:/${remoteBinDir}/* /nextflow-bin + chmod +x /nextflow-bin/* || true + export PATH=/nextflow-bin:\$PATH + + """ + cmd + } + builder([ "display_name": this.task.name, "container_image": this.task.container, "container_entrypoint": [ "/bin/bash", - "-ue", - "${Escape.path(task.workDir)}/${TaskRun.CMD_RUN}" + "-c", + cmd, ], "cpus": cpus, "memory_bytes": memory.bytes, diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy new file mode 100644 index 0000000000..b69f06492a --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy @@ -0,0 +1,29 @@ +package nextflow.forch + +import java.nio.file.Path + +import nextflow.executor.BashWrapperBuilder +import nextflow.processor.TaskBean +import nextflow.processor.TaskRun + +class ForchTaskWrapperBuilder extends BashWrapperBuilder { + // entirely lifted from AWS Batch Wrapper + ForchTaskWrapperBuilder(TaskBean bean) { + super(bean, new ForchFileCopyStrategy()) + // enable the copying of output file to the S3 work dir + if( scratch==null ) + scratch = true + + // include task script as an input to force its staging in the container work directory + bean.inputFiles[TaskRun.CMD_SCRIPT] = bean.workDir.resolve(TaskRun.CMD_SCRIPT) + // add the wrapper file when stats are enabled + // NOTE: this must match the logic that uses the run script in BashWrapperBuilder + if( isTraceRequired() ) { + bean.inputFiles[TaskRun.CMD_RUN] = bean.workDir.resolve(TaskRun.CMD_RUN) + } + // include task stdin file + if( bean.input != null ) { + bean.inputFiles[TaskRun.CMD_INFILE] = bean.workDir.resolve(TaskRun.CMD_INFILE) + } + } +} From 000c376f0882d34fae09ee6347161b5f4da9c1d5 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 22 May 2025 09:25:29 -0700 Subject: [PATCH 03/40] push forgotten changes Signed-off-by: Ayush Kamat --- .../groovy/nextflow/executor/LatchPathFactory.groovy | 2 +- .../groovy/nextflow/forch/ForchTaskHandler.groovy | 11 ++++++----- .../nextflow/forch/ForchTaskWrapperBuilder.groovy | 3 ++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/LatchPathFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/LatchPathFactory.groovy index 0c5de932b1..001cf99dc1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/LatchPathFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/LatchPathFactory.groovy @@ -41,7 +41,7 @@ class LatchPathFactory extends FileSystemPathFactory { @Override protected String getBashLib(Path target) { - if (target.scheme != "latch") { + if (target == null || target.scheme != "latch") { return null } diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 8230c70170..c1684f4b88 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -95,17 +95,18 @@ class ForchTaskHandler extends TaskHandler { // AcceleratorResource acc = task.config.getAccelerator() String cmd = """\ - trap "{ ret=\$?; s5cmd cp ${TaskRun.CMD_LOG} ${task.workDir.toUriString()}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; - s5cmd --no-verify-ssl cat ${task.workDir.toUriString()}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} + trap "{ ret=\$?; cp ${TaskRun.CMD_LOG} ${task.workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; + cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} """.stripIndent().trim() if (remoteBinDir != null) { cmd = """\ - s5cmd --no-verify-ssl cp s3:/${remoteBinDir}/* /nextflow-bin - chmod +x /nextflow-bin/* || true + mkdir -p /nextflow-bin + cp ${remoteBinDir}/* /nextflow-bin + chmod +x /nextflow-bin/* export PATH=/nextflow-bin:\$PATH - """ + cmd + """.stripIndent() + cmd } builder([ diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy index b69f06492a..64556c9b8a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskWrapperBuilder.groovy @@ -3,13 +3,14 @@ package nextflow.forch import java.nio.file.Path import nextflow.executor.BashWrapperBuilder +import nextflow.executor.SimpleFileCopyStrategy import nextflow.processor.TaskBean import nextflow.processor.TaskRun class ForchTaskWrapperBuilder extends BashWrapperBuilder { // entirely lifted from AWS Batch Wrapper ForchTaskWrapperBuilder(TaskBean bean) { - super(bean, new ForchFileCopyStrategy()) + super(bean, new SimpleFileCopyStrategy()) // enable the copying of output file to the S3 work dir if( scratch==null ) scratch = true From 763916f3084fa0c89d9e27c0363bc35f46d56166 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 29 May 2025 12:37:38 -0700 Subject: [PATCH 04/40] update forch cli Signed-off-by: Ayush Kamat --- forch_interface.py => forch | 77 +++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 25 deletions(-) rename forch_interface.py => forch (72%) diff --git a/forch_interface.py b/forch similarity index 72% rename from forch_interface.py rename to forch index 7693242580..5febb94573 100755 --- a/forch_interface.py +++ b/forch @@ -1,4 +1,4 @@ -#!/Users/ayush/Desktop/core/nucleus-workflows/.venv/bin/python +#!/opt/conda/envs/workflow/bin/python import asyncio import json @@ -15,10 +15,17 @@ sqlq, ) -config = read_config(PostgresConnectionConfig, "db_") -pool = get_pool(config, "nextflow_forch_test", read_only=False) -with_conn_retry = get_with_conn_retry(pool, config) +@dataclass +class AppConfig: + db: PostgresConnectionConfig + nfs_server_task_id: int + + +config = read_config(AppConfig) + +pool = get_pool(config.db, "nextflow_forch_test", read_only=False) +with_conn_retry = get_with_conn_retry(pool, config.db) @dataclass @@ -44,30 +51,49 @@ async def db_work(conn: LatchAsyncConnection): TaskInfo, sqlq( """ - insert into - forch_pub.tasks( - display_name, - container_image, - container_entrypoint, - dedicated_cpuset_size, - dedicated_memory_bytes, - allow_internet_egress, - dedicated_gpu_type, - dedicated_gpu_count + with + task as ( + insert into + forch_pub.tasks( + display_name, + container_image, + container_entrypoint, + dedicated_cpuset_size, + dedicated_memory_bytes, + allow_internet_egress, + dedicated_gpu_type, + dedicated_gpu_count + ) + values + ( + %(display_name)s, + %(container_image)s, + %(container_entrypoint)s, + %(cpus)s, + %(memory_bytes)s, + true, + %(gpu_type)s, + %(gpus)s + ) + returning + id ) - values - ( - %(display_name)s, - %(container_image)s, - %(container_entrypoint)s, - %(cpus)s, - %(memory_bytes)s, - true, - %(gpu_type)s, - %(gpus)s + insert into + forch_pub.nfs_shares( + client_task_id, + server_task_id, + container_mount_path, + host_mountpoint_subpath ) + select + id, + %(nfs_server_task_id)s, + '/nf-workdir'::bytea, + 'nfs/work'::bytea + from + task returning - id + client_task_id as id """, ), display_name=payload.display_name, @@ -77,6 +103,7 @@ async def db_work(conn: LatchAsyncConnection): memory_bytes=payload.memory_bytes, gpu_type=payload.gpu_type, gpus=payload.gpus, + nfs_server_task_id=config.nfs_server_task_id, ) return await db_work() From 00802f69c6543aeb8241e11c5857526654e72fba Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Mon, 9 Jun 2025 15:52:22 -0700 Subject: [PATCH 05/40] forch make gql queries via forch-client token --- .../nextflow/forch/ForchExecutor.groovy | 2 +- .../nextflow/forch/ForchTaskHandler.groovy | 57 ++------ .../nextflow/util/DispatcherClient.groovy | 129 ++++++++++++++++++ .../nextflow/file/http/LatchPathUtils.groovy | 4 + 4 files changed, 149 insertions(+), 43 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index 6b915e2b06..43b2833ee0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -33,7 +33,7 @@ class ForchExecutor extends Executor { @Override TaskHandler createTaskHandler(TaskRun task) { - return new ForchTaskHandler(task, remoteBinDir) + return new ForchTaskHandler(task, this.dispatcherClient, remoteBinDir) } protected void uploadBinDir() { diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index c1684f4b88..367c64f5b2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -1,20 +1,17 @@ package nextflow.forch +import nextflow.file.http.GQLClient +import nextflow.util.DispatcherClient + import java.nio.file.Path import java.util.concurrent.TimeUnit import groovy.json.JsonBuilder import groovy.util.logging.Slf4j -import nextflow.exception.ProcessException -import nextflow.exception.ProcessUnrecoverableException -import nextflow.executor.BashWrapperBuilder -import nextflow.executor.res.AcceleratorResource -import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.processor.TaskStatus import nextflow.script.ProcessConfig -import nextflow.util.Escape import nextflow.util.MemoryUnit @Slf4j @@ -26,28 +23,20 @@ class ForchTaskHandler extends TaskHandler { Path remoteBinDir = null + private DispatcherClient dispatcherClient - ForchTaskHandler(TaskRun task, Path remoteBinDir) { + ForchTaskHandler(TaskRun task, DispatcherClient client, Path remoteBinDir) { super(task) this.processConfig = task.processor.config this.remoteBinDir = remoteBinDir - } - - private String subprocess(String command) { - StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder(); - Process proc = command.execute() - - proc.consumeProcessOutput(stdout, stderr) - proc.waitFor(5, TimeUnit.SECONDS) - - return stdout.toString().trim() + this.dispatcherClient = client } private String getCurrentStatus() { if (this.forchTaskId == null) return - return subprocess("forch status ${forchTaskId}") + return this.dispatcherClient.forchGetTaskStatus(this.forchTaskId) } @Override @@ -64,8 +53,7 @@ class ForchTaskHandler extends TaskHandler { if (cur != "succeeded" && cur != "failed") return false // todo(ayush): single query - def exitStatus = subprocess("forch exitcode ${forchTaskId}") - task.exitStatus = Integer.parseInt(exitStatus) + task.exitStatus = this.dispatcherClient.forchGetExitCode(this.forchTaskId) // todo(ayush): logs, retries task.stdout = "" @@ -86,8 +74,6 @@ class ForchTaskHandler extends TaskHandler { @Override void submit() { - JsonBuilder builder = new JsonBuilder() - int cpus = task.config.getCpus() MemoryUnit memory = task.config.getMemory() ?: MemoryUnit.of("2GiB") @@ -109,29 +95,16 @@ class ForchTaskHandler extends TaskHandler { """.stripIndent() + cmd } - builder([ - "display_name": this.task.name, - "container_image": this.task.container, - "container_entrypoint": [ + this.forchTaskId = this.dispatcherClient.forchSubmitTask( + this.task.name, + this.task.container, + [ "/bin/bash", "-c", cmd, ], - "cpus": cpus, - "memory_bytes": memory.bytes, - "gpu_type": null, - "gpus": 0, - ]) - - List command = ["forch", "create", builder.toString()] - StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder(); - Process proc = command.execute() - - proc.consumeProcessOutput(stdout, stderr) - proc.waitFor(5, TimeUnit.SECONDS) - - log.debug("${task.name} taskExecutionId: $stdout, err: $stderr") - - this.forchTaskId = Integer.parseInt(stdout.toString().trim()) + cpus, + memory.bytes + ) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 308c6a3f6b..5d6154033d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -294,4 +294,133 @@ class DispatcherClient { return res } + + + int forchSubmitTask( + String displayName, + String image, + List entrypoint, + int cpus, + long memoryBytes + ) { + // todo(rahul): get resource/billing groups from env + Map res = client.execute(""" + mutation CreateForchTask( + \$displayName: String!, + \$containerImage: String!, + \$containerEntrypoint: [String]!, + \$cpus: Int!, + \$memoryBytes: BigInt!, + \$dedicatedGpuType: String, + \$dedicatedGpuCount: Int! + ) { + createTask( + input: { + task: { + displayName: \$displayName, + containerImage: \$containerImage, + containerEntrypoint: \$containerEntrypoint, + dedicatedCpusetSize: \$cpus, + dedicatedMemoryBytes: \$memoryBytes, + allowInternetEgress: true, + dedicatedGpuType: \$gpuType, + dedicatedGpuCount: \$gpus + } + } + ) { + task { + id + } + } + } + """, + [ + "displayName" : displayName, + "containerImage" : image, + "containerEntrypoint" : entrypoint, + "cpus" : cpus, + "memoryBytes" : memoryBytes, + "gpuType" : null, + "gpus" : 0, + ] + )["createTask"] as Map + + if (res == null) + throw new RuntimeException("failed to create forch task") + + return ((res.task as Map).id as String).toInteger() + } + + String forchGetTaskStatus(int forchTaskId) { + List res = client.execute(""" + query GetTaskStatus(\$taskId: BigInt!) { + taskEvents(condition: {taskId: \$taskId}, orderBy: TIME_DESC, first: 1) { + id + type + taskEventContainerExitedDatumById { + id + exitStatus + } + } + } + """, + [ + taskId: forchTaskId + ] + )["taskEvents"] as List + + if (res == null) + throw new RuntimeException("failed to get task events for ${forchTaskId}") + + if (res.size() == 0) + return "queued" + + // todo(rahul): might be a good idea to throw this logic into a vac function so that we can easily update + String eventType = res[0]["type"] + if (eventType == "node-assigned") + return "submitted" + if (eventType == "container-created") + return "running" + if (eventType == "container-exited") { + if ((res[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { + return "succeeded" + } else { + return "failed" + } + } + + return "queued" + } + + int forchGetExitCode(int forchTaskId) { + List res = client.execute(""" + query GetTaskExitCode(\$taskId: BigInt!) { + taskEvents( + condition: {taskId: \$taskId}, + filter: {taskEventContainerExitedDatumByIdExists: true}, + orderBy: TIME_DESC, + first: 1 + ) { + id + type + taskEventContainerExitedDatumById { + id + exitStatus + } + } + } + """, + [ + taskId: forchTaskId + ] + )["taskEvents"] as List + + if (res == null) + throw new RuntimeException("failed to get exit code for ${forchTaskId}") + + if (res.size() == 0) + return -1 + + return res[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int + } } diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index 3e89103a58..669ba2aa3c 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -8,6 +8,10 @@ class LatchPathUtils { static class UnauthenticatedException extends Exception {} static String getAuthHeader() { + def forchToken = System.getenv("FORCH_AUTH_TOKEN") + if (forchToken != null) + return "Forch-Auth-Token $forchToken" + def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") if (flyteToken != null) return "Latch-Execution-Token $flyteToken" From 7838be40533bf0dd732f3d4131be716bfbabf0be Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Mon, 9 Jun 2025 16:20:47 -0700 Subject: [PATCH 06/40] add resource and billing group ids --- .../nextflow/util/DispatcherClient.groovy | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 5d6154033d..c47db5338b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -303,7 +303,14 @@ class DispatcherClient { int cpus, long memoryBytes ) { - // todo(rahul): get resource/billing groups from env + String resourceGroup = System.getenv("FORCH_RESOURCE_GROUP_ID") + if (resourceGroup == null) + throw new RuntimeException("unable to get resource group") + + String billingGroup = System.getenv("FORCH_BILLING_GROUP_ID") + if (billingGroup == null) + throw new RuntimeException("unable to get billing group") + Map res = client.execute(""" mutation CreateForchTask( \$displayName: String!, @@ -312,7 +319,9 @@ class DispatcherClient { \$cpus: Int!, \$memoryBytes: BigInt!, \$dedicatedGpuType: String, - \$dedicatedGpuCount: Int! + \$dedicatedGpuCount: Int!, + \$groupId: BigInt!, + \$billedTo: BigInt! ) { createTask( input: { @@ -324,7 +333,9 @@ class DispatcherClient { dedicatedMemoryBytes: \$memoryBytes, allowInternetEgress: true, dedicatedGpuType: \$gpuType, - dedicatedGpuCount: \$gpus + dedicatedGpuCount: \$gpus, + groupId: \$groupId, + billedTo: \$billedTo } } ) { @@ -342,6 +353,8 @@ class DispatcherClient { "memoryBytes" : memoryBytes, "gpuType" : null, "gpus" : 0, + "groupId": resourceGroup.toInteger(), + "billedTo": billingGroup.toInteger() ] )["createTask"] as Map From b8eba806456dedf59fe8d0744dd085dda002e72d Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Tue, 10 Jun 2025 13:48:12 -0700 Subject: [PATCH 07/40] fixes --- .../nextflow/util/DispatcherClient.groovy | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index c47db5338b..cf35a7e07a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -318,8 +318,8 @@ class DispatcherClient { \$containerEntrypoint: [String]!, \$cpus: Int!, \$memoryBytes: BigInt!, - \$dedicatedGpuType: String, - \$dedicatedGpuCount: Int!, + \$gpuType: String, + \$gpus: Int!, \$groupId: BigInt!, \$billedTo: BigInt! ) { @@ -335,7 +335,8 @@ class DispatcherClient { dedicatedGpuType: \$gpuType, dedicatedGpuCount: \$gpus, groupId: \$groupId, - billedTo: \$billedTo + billedTo: \$billedTo, + debugTag: "rahul", } } ) { @@ -365,14 +366,16 @@ class DispatcherClient { } String forchGetTaskStatus(int forchTaskId) { - List res = client.execute(""" + Map res = client.execute(""" query GetTaskStatus(\$taskId: BigInt!) { taskEvents(condition: {taskId: \$taskId}, orderBy: TIME_DESC, first: 1) { - id - type - taskEventContainerExitedDatumById { + nodes { id - exitStatus + type + taskEventContainerExitedDatumById { + id + exitStatus + } } } } @@ -380,22 +383,23 @@ class DispatcherClient { [ taskId: forchTaskId ] - )["taskEvents"] as List + )["taskEvents"] as Map if (res == null) throw new RuntimeException("failed to get task events for ${forchTaskId}") - if (res.size() == 0) + List nodes = res["nodes"] as List + if (nodes == null || nodes.size() == 0) return "queued" // todo(rahul): might be a good idea to throw this logic into a vac function so that we can easily update - String eventType = res[0]["type"] + String eventType = nodes[0]["type"] if (eventType == "node-assigned") return "submitted" if (eventType == "container-created") return "running" if (eventType == "container-exited") { - if ((res[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { + if ((nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { return "succeeded" } else { return "failed" @@ -406,7 +410,7 @@ class DispatcherClient { } int forchGetExitCode(int forchTaskId) { - List res = client.execute(""" + Map res = client.execute(""" query GetTaskExitCode(\$taskId: BigInt!) { taskEvents( condition: {taskId: \$taskId}, @@ -414,11 +418,13 @@ class DispatcherClient { orderBy: TIME_DESC, first: 1 ) { - id - type - taskEventContainerExitedDatumById { + nodes { id - exitStatus + type + taskEventContainerExitedDatumById { + id + exitStatus + } } } } @@ -426,14 +432,15 @@ class DispatcherClient { [ taskId: forchTaskId ] - )["taskEvents"] as List + )["taskEvents"] as Map if (res == null) throw new RuntimeException("failed to get exit code for ${forchTaskId}") - if (res.size() == 0) + List nodes = res["nodes"] as List + if (nodes == null || nodes.size() == 0) return -1 - return res[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int + return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int } } From c6d109c9c4b1835e197d763d6a8345a512694e7f Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Tue, 10 Jun 2025 14:31:00 -0700 Subject: [PATCH 08/40] remove debug tag --- .../src/main/groovy/nextflow/util/DispatcherClient.groovy | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index cf35a7e07a..1fd6ae8ad6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -335,8 +335,7 @@ class DispatcherClient { dedicatedGpuType: \$gpuType, dedicatedGpuCount: \$gpus, groupId: \$groupId, - billedTo: \$billedTo, - debugTag: "rahul", + billedTo: \$billedTo } } ) { From c4062b5f7566d9814ae3eac6de0fd4429a1845d9 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 12 Jun 2025 13:28:28 -0700 Subject: [PATCH 09/40] finish Signed-off-by: Ayush Kamat --- .../nextflow/forch/ForchExecutor.groovy | 2 +- .../nextflow/forch/ForchTaskHandler.groovy | 26 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index 6b915e2b06..fa0d9c60c9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -33,7 +33,7 @@ class ForchExecutor extends Executor { @Override TaskHandler createTaskHandler(TaskRun task) { - return new ForchTaskHandler(task, remoteBinDir) + return new ForchTaskHandler(task, remoteBinDir, session) } protected void uploadBinDir() { diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index c1684f4b88..b83ccd8dfb 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit import groovy.json.JsonBuilder import groovy.util.logging.Slf4j +import nextflow.Session import nextflow.exception.ProcessException import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.BashWrapperBuilder @@ -26,12 +27,14 @@ class ForchTaskHandler extends TaskHandler { Path remoteBinDir = null + Session session - ForchTaskHandler(TaskRun task, Path remoteBinDir) { + ForchTaskHandler(TaskRun task, Path remoteBinDir, Session session) { super(task) this.processConfig = task.processor.config this.remoteBinDir = remoteBinDir + this.session = session } private String subprocess(String command) { @@ -94,8 +97,28 @@ class ForchTaskHandler extends TaskHandler { // todo(ayush): gpu support // AcceleratorResource acc = task.config.getAccelerator() + def serverIp = System.getenv("latch_internal_nfs_server_ip") + String cmd = """\ + if [[ "\$(command -v apt-get)" ]]; then + apt-get update + apt-get install -y nfs-common + elif [[ "\$(command -v yum)" ]]; then + yum install -y nfs-utils + elif [[ "\$(command -v dnf)" ]]; then + dnf install -y nfs-utils + fi + + mkdir --parents ${session.baseDir} + + until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} + do + echo "failed to mount nfs share: retrying..." + sleep 5 + done + trap "{ ret=\$?; cp ${TaskRun.CMD_LOG} ${task.workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; + cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} """.stripIndent().trim() @@ -105,7 +128,6 @@ class ForchTaskHandler extends TaskHandler { cp ${remoteBinDir}/* /nextflow-bin chmod +x /nextflow-bin/* export PATH=/nextflow-bin:\$PATH - """.stripIndent() + cmd } From 80aa48ec24d3e433aa8c8edefae9c05941566009 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 12 Jun 2025 13:29:40 -0700 Subject: [PATCH 10/40] create ForchClient and add forchExecutionid to process node creation --- .../nextflow/forch/ForchExecutor.groovy | 6 +- .../nextflow/forch/ForchTaskHandler.groovy | 17 +- .../nextflow/util/DispatcherClient.groovy | 206 ++++-------------- .../groovy/nextflow/util/ForchClient.groovy | 157 +++++++++++++ .../main/nextflow/file/http/GQLClient.groovy | 6 +- .../nextflow/file/http/LatchPathUtils.groovy | 28 +-- 6 files changed, 228 insertions(+), 192 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index 43b2833ee0..3cdfd0370f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -11,12 +11,14 @@ import nextflow.processor.TaskMonitor import nextflow.processor.TaskPollingMonitor import nextflow.processor.TaskRun import nextflow.util.DispatcherClient +import nextflow.util.ForchClient import nextflow.util.Duration @Slf4j class ForchExecutor extends Executor { Path remoteBinDir = null + private ForchClient forchClient @Override protected TaskMonitor createTaskMonitor() { @@ -27,13 +29,13 @@ class ForchExecutor extends Executor { protected void register() { // todo(ayush): decouple dispatcher and executor this.dispatcherClient = new DispatcherClient() - this.dispatcherClient.debug = true + this.forchClient = new ForchClient() uploadBinDir() } @Override TaskHandler createTaskHandler(TaskRun task) { - return new ForchTaskHandler(task, this.dispatcherClient, remoteBinDir) + return new ForchTaskHandler(task, this.forchClient, remoteBinDir) } protected void uploadBinDir() { diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 367c64f5b2..5176a7e17b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -1,12 +1,9 @@ package nextflow.forch -import nextflow.file.http.GQLClient -import nextflow.util.DispatcherClient +import nextflow.util.ForchClient import java.nio.file.Path -import java.util.concurrent.TimeUnit -import groovy.json.JsonBuilder import groovy.util.logging.Slf4j import nextflow.processor.TaskHandler import nextflow.processor.TaskRun @@ -23,20 +20,20 @@ class ForchTaskHandler extends TaskHandler { Path remoteBinDir = null - private DispatcherClient dispatcherClient + private ForchClient forchClient - ForchTaskHandler(TaskRun task, DispatcherClient client, Path remoteBinDir) { + ForchTaskHandler(TaskRun task, ForchClient client, Path remoteBinDir) { super(task) this.processConfig = task.processor.config this.remoteBinDir = remoteBinDir - this.dispatcherClient = client + this.forchClient = client } private String getCurrentStatus() { if (this.forchTaskId == null) return - return this.dispatcherClient.forchGetTaskStatus(this.forchTaskId) + return this.forchClient.getTaskStatus(this.forchTaskId) } @Override @@ -53,7 +50,7 @@ class ForchTaskHandler extends TaskHandler { if (cur != "succeeded" && cur != "failed") return false // todo(ayush): single query - task.exitStatus = this.dispatcherClient.forchGetExitCode(this.forchTaskId) + task.exitStatus = this.forchClient.getTaskExitCode(this.forchTaskId) // todo(ayush): logs, retries task.stdout = "" @@ -95,7 +92,7 @@ class ForchTaskHandler extends TaskHandler { """.stripIndent() + cmd } - this.forchTaskId = this.dispatcherClient.forchSubmitTask( + this.forchTaskId = this.forchClient.submitTask( this.task.name, this.task.container, [ diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 1fd6ae8ad6..00224d94dc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -18,26 +18,50 @@ class DispatcherClient { } String executionToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") - if (executionToken == null) - throw new RuntimeException("unable to get execution token") + if (executionToken != null) { + Map res = client.execute(""" + mutation CreateNode(\$executionToken: String!, \$name: String!) { + createNfProcessNodeByExecutionToken(input: {argExecutionToken: \$executionToken, argName: \$name}) { + nodeId + } + } + """, + [ + executionToken: executionToken, + name: processName, + ] + )["createNfProcessNodeByExecutionToken"] as Map - Map res = client.execute(""" - mutation CreateNode(\$executionToken: String!, \$name: String!) { - createNfProcessNodeByExecutionToken(input: {argExecutionToken: \$executionToken, argName: \$name}) { - nodeId + if (res == null) + throw new RuntimeException("failed to create remote process node for: processName=${processName}") + + return (res.nodeId as String).toInteger() + } + + String forchExecutionId = System.getenv("FORCH_EXECUTION_ID") + if (forchExecutionId != null) { + Map res = client.execute(""" + mutation CreateNode(\$forchExecutionId: BigInt!, \$name: String!) { + createNfProcessNode(input: {nfProcessNode: {forchExecutionId: \$forchExecutionId, name: \$name } }) { + nfProcessNode { + id + } + } } - } - """, - [ - executionToken: executionToken, - name: processName, - ] - )["createNfProcessNodeByExecutionToken"] as Map + """, + [ + forchExecutionId: forchExecutionId, + name: processName, + ] + )["createNfProcessNode"] as Map - if (res == null) - throw new RuntimeException("failed to create remote process node for: processName=${processName}") + if (res == null || res["nfProcessNode"] == null) + throw new RuntimeException("failed to create remote process node for: processName=${processName}") + + return (res["nfProcessNode"]["id"] as String).toInteger() + } - return (res.nodeId as String).toInteger() + throw new RuntimeException("failed to create process node: unable to get source execution") } void closeProcessNode(int nodeId, int numTasks) { @@ -46,7 +70,7 @@ class DispatcherClient { } client.execute(""" - mutation CreateTaskInfo(\$nodeId: BigInt!, \$numTasks: BigInt!) { + mutation UpdateTaskInfo(\$nodeId: BigInt!, \$numTasks: BigInt!) { updateNfProcessNode( input: { id: \$nodeId, @@ -294,152 +318,4 @@ class DispatcherClient { return res } - - - int forchSubmitTask( - String displayName, - String image, - List entrypoint, - int cpus, - long memoryBytes - ) { - String resourceGroup = System.getenv("FORCH_RESOURCE_GROUP_ID") - if (resourceGroup == null) - throw new RuntimeException("unable to get resource group") - - String billingGroup = System.getenv("FORCH_BILLING_GROUP_ID") - if (billingGroup == null) - throw new RuntimeException("unable to get billing group") - - Map res = client.execute(""" - mutation CreateForchTask( - \$displayName: String!, - \$containerImage: String!, - \$containerEntrypoint: [String]!, - \$cpus: Int!, - \$memoryBytes: BigInt!, - \$gpuType: String, - \$gpus: Int!, - \$groupId: BigInt!, - \$billedTo: BigInt! - ) { - createTask( - input: { - task: { - displayName: \$displayName, - containerImage: \$containerImage, - containerEntrypoint: \$containerEntrypoint, - dedicatedCpusetSize: \$cpus, - dedicatedMemoryBytes: \$memoryBytes, - allowInternetEgress: true, - dedicatedGpuType: \$gpuType, - dedicatedGpuCount: \$gpus, - groupId: \$groupId, - billedTo: \$billedTo - } - } - ) { - task { - id - } - } - } - """, - [ - "displayName" : displayName, - "containerImage" : image, - "containerEntrypoint" : entrypoint, - "cpus" : cpus, - "memoryBytes" : memoryBytes, - "gpuType" : null, - "gpus" : 0, - "groupId": resourceGroup.toInteger(), - "billedTo": billingGroup.toInteger() - ] - )["createTask"] as Map - - if (res == null) - throw new RuntimeException("failed to create forch task") - - return ((res.task as Map).id as String).toInteger() - } - - String forchGetTaskStatus(int forchTaskId) { - Map res = client.execute(""" - query GetTaskStatus(\$taskId: BigInt!) { - taskEvents(condition: {taskId: \$taskId}, orderBy: TIME_DESC, first: 1) { - nodes { - id - type - taskEventContainerExitedDatumById { - id - exitStatus - } - } - } - } - """, - [ - taskId: forchTaskId - ] - )["taskEvents"] as Map - - if (res == null) - throw new RuntimeException("failed to get task events for ${forchTaskId}") - - List nodes = res["nodes"] as List - if (nodes == null || nodes.size() == 0) - return "queued" - - // todo(rahul): might be a good idea to throw this logic into a vac function so that we can easily update - String eventType = nodes[0]["type"] - if (eventType == "node-assigned") - return "submitted" - if (eventType == "container-created") - return "running" - if (eventType == "container-exited") { - if ((nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { - return "succeeded" - } else { - return "failed" - } - } - - return "queued" - } - - int forchGetExitCode(int forchTaskId) { - Map res = client.execute(""" - query GetTaskExitCode(\$taskId: BigInt!) { - taskEvents( - condition: {taskId: \$taskId}, - filter: {taskEventContainerExitedDatumByIdExists: true}, - orderBy: TIME_DESC, - first: 1 - ) { - nodes { - id - type - taskEventContainerExitedDatumById { - id - exitStatus - } - } - } - } - """, - [ - taskId: forchTaskId - ] - )["taskEvents"] as Map - - if (res == null) - throw new RuntimeException("failed to get exit code for ${forchTaskId}") - - List nodes = res["nodes"] as List - if (nodes == null || nodes.size() == 0) - return -1 - - return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy new file mode 100644 index 0000000000..7cc5a9369b --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -0,0 +1,157 @@ +package nextflow.util + +import groovy.util.logging.Slf4j +import nextflow.file.http.GQLClient +import nextflow.file.http.GQLClient.GQLQueryException + +@Slf4j +class ForchClient { + private GQLClient client = new GQLClient(true) + + int submitTask( + String displayName, + String image, + List entrypoint, + int cpus, + long memoryBytes + ) { + String resourceGroup = System.getenv("FORCH_RESOURCE_GROUP_ID") + if (resourceGroup == null) + throw new RuntimeException("unable to get resource group") + + String billingGroup = System.getenv("FORCH_BILLING_GROUP_ID") + if (billingGroup == null) + throw new RuntimeException("unable to get billing group") + + Map res = client.execute(""" + mutation CreateForchTask( + \$displayName: String!, + \$containerImage: String!, + \$containerEntrypoint: [String]!, + \$cpus: Int!, + \$memoryBytes: BigInt!, + \$gpuType: String, + \$gpus: Int!, + \$groupId: BigInt!, + \$billedTo: BigInt! + ) { + createTask( + input: { + task: { + displayName: \$displayName, + containerImage: \$containerImage, + containerEntrypoint: \$containerEntrypoint, + dedicatedCpusetSize: \$cpus, + dedicatedMemoryBytes: \$memoryBytes, + allowInternetEgress: true, + dedicatedGpuType: \$gpuType, + dedicatedGpuCount: \$gpus, + groupId: \$groupId, + billedTo: \$billedTo + } + } + ) { + task { + id + } + } + } + """, + [ + "displayName" : displayName, + "containerImage" : image, + "containerEntrypoint" : entrypoint, + "cpus" : cpus, + "memoryBytes" : memoryBytes, + "gpuType" : null, + "gpus" : 0, + "groupId": resourceGroup.toInteger(), + "billedTo": billingGroup.toInteger() + ] + )["createTask"] as Map + + if (res == null) + throw new RuntimeException("failed to create forch task") + + return ((res.task as Map).id as String).toInteger() + } + + String getTaskStatus(int forchTaskId) { + Map res = client.execute(""" + query GetTaskStatus(\$taskId: BigInt!) { + taskEvents(condition: {taskId: \$taskId}, orderBy: TIME_DESC, first: 1) { + nodes { + id + type + taskEventContainerExitedDatumById { + id + exitStatus + } + } + } + } + """, + [ + taskId: forchTaskId + ] + )["taskEvents"] as Map + + if (res == null) + throw new RuntimeException("failed to get task events for ${forchTaskId}") + + List nodes = res["nodes"] as List + if (nodes == null || nodes.size() == 0) + return "queued" + + // todo(rahul): might be a good idea to throw this logic into a vac function so that we can easily update + String eventType = nodes[0]["type"] + if (eventType == "node-assigned") + return "submitted" + if (eventType == "container-created") + return "running" + if (eventType == "container-exited") { + if ((nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { + return "succeeded" + } else { + return "failed" + } + } + + return "queued" + } + + int getTaskExitCode(int forchTaskId) { + Map res = client.execute(""" + query GetTaskExitCode(\$taskId: BigInt!) { + taskEvents( + condition: {taskId: \$taskId}, + filter: {taskEventContainerExitedDatumByIdExists: true}, + orderBy: TIME_DESC, + first: 1 + ) { + nodes { + id + type + taskEventContainerExitedDatumById { + id + exitStatus + } + } + } + } + """, + [ + taskId: forchTaskId + ] + )["taskEvents"] as Map + + if (res == null) + throw new RuntimeException("failed to get exit code for ${forchTaskId}") + + List nodes = res["nodes"] as List + if (nodes == null || nodes.size() == 0) + return -1 + + return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int + } +} diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy index 5ea8767073..c4c3286ba8 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy @@ -11,6 +11,7 @@ import groovy.json.JsonSlurper class GQLClient { private String endpoint private HttpRetryClient client + private boolean useForchAuth class GQLQueryException extends Exception { GQLQueryException(String msg) { @@ -18,7 +19,8 @@ class GQLClient { } } - GQLClient() { + GQLClient(boolean useForchAuth = false) { + this.useForchAuth = useForchAuth endpoint = "https://vacuole.latch.bio/graphql" String domain = System.getenv("LATCH_SDK_DOMAIN") @@ -43,7 +45,7 @@ class GQLClient { .uri(URI.create(this.endpoint)) .timeout(Duration.ofSeconds(90)) .header("Content-Type", "application/json") - .header("Authorization", LatchPathUtils.getAuthHeader()) + .header("Authorization", LatchPathUtils.getAuthHeader(useForchAuth)) HttpRequest req = requestBuilder.POST(HttpRequest.BodyPublishers.ofString(builder.toString())).build() HttpResponse response = this.client.send(req) diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index 669ba2aa3c..b2f59d71a1 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -7,19 +7,21 @@ class LatchPathUtils { static class UnauthenticatedException extends Exception {} - static String getAuthHeader() { - def forchToken = System.getenv("FORCH_AUTH_TOKEN") - if (forchToken != null) - return "Forch-Auth-Token $forchToken" - - def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") - if (flyteToken != null) - return "Latch-Execution-Token $flyteToken" - - String home = System.getProperty("user.home") - File tokenFile = new File("$home/.latch/token") - if (tokenFile.exists()) - return "Latch-SDK-Token ${tokenFile.text.strip()}" + static String getAuthHeader(boolean useForchAuth = false) { + if (useForchAuth) { + def forchToken = System.getenv("FORCH_AUTH_TOKEN") + if (forchToken != null) + return "Forch-Auth-Token $forchToken" + } else { + def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") + if (flyteToken != null) + return "Latch-Execution-Token $flyteToken" + + String home = System.getProperty("user.home") + File tokenFile = new File("$home/.latch/token") + if (tokenFile.exists()) + return "Latch-SDK-Token ${tokenFile.text.strip()}" + } throw new UnauthenticatedException() } From 3717181f9a3f174f62200faec35165ac9df6b471 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Fri, 13 Jun 2025 15:55:16 -0700 Subject: [PATCH 11/40] fixes --- .../nextflow/processor/TaskProcessor.groovy | 1 - .../nextflow/util/DispatcherClient.groovy | 63 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index a937764056..40d7e4dad8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -334,7 +334,6 @@ class TaskProcessor { this.numTasks = 0 this.dispatcherClient = new DispatcherClient() - this.dispatcherClient.debug = !(executor instanceof K8sExecutor) this.maxForks = config.maxForks && config.maxForks>0 ? config.maxForks as int : 0 this.forksCount = maxForks ? new LongAdder() : null diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 00224d94dc..b601f4fbdd 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -180,6 +180,67 @@ class DispatcherClient { return 1 } + String forchExecutionId = System.getenv("FORCH_EXECUTION_ID") + if (forchExecutionId != null) { + try { + Map res = client.execute(""" + mutation CreateForchTaskExecutionInfo(\$taskId: BigInt!, \$attemptIdx: BigInt!, \$hash: String, \$status: TaskExecutionStatus!) { + createNfForchTaskExecutionInfo( + input: { + nfForchTaskExecutionInfo: { + taskId: \$taskId, + attemptIdx: \$attemptIdx, + hash: \$hash, + statusOverride: \$status + } + } + ) { + nfForchTaskExecutionInfo { + id + } + } + } + """, + [ + taskId: taskId, + attemptIdx: attemptIdx, + hash: hash, + status: status, + ] + )["createNfForchTaskExecutionInfo"] as Map + + if (res == null) + throw new RuntimeException("failed to create remote task execution for: taskId=${taskId} attempt=${attemptIdx} hash=${hash}") + + return ((res.nfForchTaskExecutionInfo as Map).id as String).toInteger() + } catch (GQLQueryException e) { + + // note(rahul): the gql client uses the HTTP Retry Client. As a result, it may retry a request after + // successfully committing the row to the DB (for example, if the connection fails) + if (!e.message.contains("duplicate key value violates unique constraint")) { + throw e + } + } + + Map res = client.execute(""" + query GetNfForchTaskExecutionInfo(\$taskId: BigInt!, \$attemptIdx: BigInt!) { + nfForchTaskExecutionInfoByTaskIdAndAttemptIdx(attemptIdx: \$attemptIdx, taskId: \$taskId) { + id + } + } + """, + [ + taskId: taskId, + attemptIdx: attemptIdx, + ] + )["nfForchTaskExecutionInfoByTaskIdAndAttemptIdx"] as Map + + if (res == null) + throw new RuntimeException("failed to get forch task execution id for: taskId=${taskId} attemptIdx=${attemptIdx}") + + return (res.id as String).toInteger() + } + try { Map res = client.execute(""" mutation CreateTaskExecutionInfo(\$taskId: BigInt!, \$attemptIdx: BigInt!, \$hash: String, \$status: TaskExecutionStatus!) { @@ -244,6 +305,8 @@ class DispatcherClient { } void submitPod(int taskExecutionId, Map pod) { + if (debug) return + client.execute(""" mutation UpdateTaskExecution(\$taskExecutionId: BigInt!, \$podSpec: String!) { updateNfTaskExecutionInfo( From f2632e53b7f003c67f3c1aa2f72b69c5f112b711 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Mon, 16 Jun 2025 15:43:42 -0700 Subject: [PATCH 12/40] working with gql and rls --- .../nextflow/forch/ForchTaskHandler.groovy | 2 + .../nextflow/util/DispatcherClient.groovy | 2 +- .../groovy/nextflow/util/ForchClient.groovy | 42 ++++++++++--------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index ceabc7dfa4..a72f41a889 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -81,6 +81,8 @@ class ForchTaskHandler extends TaskHandler { // AcceleratorResource acc = task.config.getAccelerator() def serverIp = System.getenv("latch_internal_nfs_server_ip") + if (serverIp == null) + throw new RuntimeException("failed to get server ip") String cmd = """\ if [[ "\$(command -v apt-get)" ]]; then diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index b601f4fbdd..56cb40dbc7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -205,7 +205,7 @@ class DispatcherClient { taskId: taskId, attemptIdx: attemptIdx, hash: hash, - status: status, + status: status == null ? 'UNDEFINED' : status, ] )["createNfForchTaskExecutionInfo"] as Map diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 7cc5a9369b..d133860806 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -23,6 +23,10 @@ class ForchClient { if (billingGroup == null) throw new RuntimeException("unable to get billing group") + String nfsServerTaskId = System.getenv("nfs_server_task_id") + if (nfsServerTaskId == null) + throw new RuntimeException("unable to get NFS server task id") + Map res = client.execute(""" mutation CreateForchTask( \$displayName: String!, @@ -33,27 +37,24 @@ class ForchClient { \$gpuType: String, \$gpus: Int!, \$groupId: BigInt!, - \$billedTo: BigInt! + \$billedTo: BigInt!, + \$nfsServerTaskId: BigInt! ) { - createTask( + nfCreateForchTask( input: { - task: { - displayName: \$displayName, - containerImage: \$containerImage, - containerEntrypoint: \$containerEntrypoint, - dedicatedCpusetSize: \$cpus, - dedicatedMemoryBytes: \$memoryBytes, - allowInternetEgress: true, - dedicatedGpuType: \$gpuType, - dedicatedGpuCount: \$gpus, - groupId: \$groupId, - billedTo: \$billedTo - } + argDisplayName: \$displayName, + argContainerImage: \$containerImage, + argContainerEntrypoint: \$containerEntrypoint, + argCpus: \$cpus, + argMemoryBytes: \$memoryBytes, + argGpuType: \$gpuType, + argGpus: \$gpus, + argGroupId: \$groupId, + argBilledTo: \$billedTo, + argNfsServerTaskId: \$nfsServerTaskId } ) { - task { - id - } + resTaskId } } """, @@ -66,14 +67,15 @@ class ForchClient { "gpuType" : null, "gpus" : 0, "groupId": resourceGroup.toInteger(), - "billedTo": billingGroup.toInteger() + "billedTo": billingGroup.toInteger(), + "nfsServerTaskId": nfsServerTaskId, ] - )["createTask"] as Map + )["nfCreateForchTask"] as Map if (res == null) throw new RuntimeException("failed to create forch task") - return ((res.task as Map).id as String).toInteger() + return (res.resTaskId as String).toInteger() } String getTaskStatus(int forchTaskId) { From 4832468ea9282dbfbfa344f8cd13f893e530febe Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Mon, 16 Jun 2025 16:25:23 -0700 Subject: [PATCH 13/40] update execution info with forch task id --- .../nextflow/forch/ForchExecutor.groovy | 2 +- .../nextflow/forch/ForchTaskHandler.groovy | 14 ++++++++-- .../nextflow/util/DispatcherClient.groovy | 26 +++++++++++++++++++ .../groovy/nextflow/util/ForchClient.groovy | 1 - 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index fa892d138d..a99bd86cfd 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -34,7 +34,7 @@ class ForchExecutor extends Executor { @Override TaskHandler createTaskHandler(TaskRun task) { - return new ForchTaskHandler(task, this.forchClient, remoteBinDir, session) + return new ForchTaskHandler(task, remoteBinDir, session, this.forchClient, this.dispatcherClient) } protected void uploadBinDir() { diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index a72f41a889..0cb93fecbc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -1,5 +1,6 @@ package nextflow.forch +import nextflow.util.DispatcherClient import nextflow.util.ForchClient import java.nio.file.Path @@ -21,14 +22,16 @@ class ForchTaskHandler extends TaskHandler { Integer forchTaskId Path remoteBinDir = null private ForchClient forchClient + private DispatcherClient dispatcherClient Session session - ForchTaskHandler(TaskRun task, ForchClient client, Path remoteBinDir, Session session) { + ForchTaskHandler(TaskRun task, Path remoteBinDir, Session session, ForchClient forchClient, DispatcherClient dispatcherClient) { super(task) this.processConfig = task.processor.config this.remoteBinDir = remoteBinDir - this.forchClient = client + this.forchClient = forchClient + this.dispatcherClient = dispatcherClient this.session = session } @@ -127,5 +130,12 @@ class ForchTaskHandler extends TaskHandler { cpus, memory.bytes ) + + // note(rahul): this is not crash safe, but the forch task will still be billed to the proper + // account. It just wont be associated with a task execution info + this.dispatcherClient.updateForchTaskId( + this.taskExecutionId, + this.forchTaskId + ) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 56cb40dbc7..96db477793 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -381,4 +381,30 @@ class DispatcherClient { return res } + + void updateForchTaskId(int taskExecutionId, int forchTaskId) { + if (debug) { + return + } + + client.execute(""" + mutation UpdateTaskExecution(\$taskExecutionId: BigInt!, \$forchTaskId: BigInt!) { + updateNfForchTaskExecutionInfo( + input: { + id: \$taskExecutionId, + patch: { + forchTaskId: \$forchTaskId + }, + } + ) { + clientMutationId + } + } + """, + [ + taskExecutionId: taskExecutionId, + forchTaskId: forchTaskId + ] + ) + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index d133860806..8485cb710c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -2,7 +2,6 @@ package nextflow.util import groovy.util.logging.Slf4j import nextflow.file.http.GQLClient -import nextflow.file.http.GQLClient.GQLQueryException @Slf4j class ForchClient { From c9cacf87aafe729aa66f843b262a61f49d52609e Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Wed, 18 Jun 2025 13:20:30 -0700 Subject: [PATCH 14/40] move task status logic to vac function --- .../nextflow/forch/ForchTaskHandler.groovy | 7 ++-- .../groovy/nextflow/processor/TaskRun.groovy | 3 +- .../nextflow/util/DispatcherClient.groovy | 4 +- .../groovy/nextflow/util/ForchClient.groovy | 39 +++---------------- .../nextflow/file/http/LatchPathUtils.groovy | 2 +- 5 files changed, 13 insertions(+), 42 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 0cb93fecbc..e943f2baaf 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -44,7 +44,7 @@ class ForchTaskHandler extends TaskHandler { @Override boolean checkIfRunning() { - def running = this.currentStatus == 'running' + def running = this.currentStatus == 'RUNNING' if (running) status = TaskStatus.RUNNING return running @@ -53,7 +53,7 @@ class ForchTaskHandler extends TaskHandler { @Override boolean checkIfCompleted() { def cur = this.currentStatus - if (cur != "succeeded" && cur != "failed") return false + if (cur != "SUCCEEDED" && cur != "FAILED") return false // todo(ayush): single query task.exitStatus = this.forchClient.getTaskExitCode(this.forchTaskId) @@ -131,8 +131,7 @@ class ForchTaskHandler extends TaskHandler { memory.bytes ) - // note(rahul): this is not crash safe, but the forch task will still be billed to the proper - // account. It just wont be associated with a task execution info + // todo(rahul): put this in a single transaction with submitTask this.dispatcherClient.updateForchTaskId( this.taskExecutionId, this.forchTaskId diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 62012d7a9d..f50286c514 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -582,8 +582,7 @@ class TaskRun implements Cloneable { static final public String CMD_SCRIPT = '.command.sh' static final public String CMD_INFILE = '.command.in' static final public String CMD_OUTFILE = '.command.out' - static final public String CMD_ERRFILE = '.command.err' - static final public String CMD_EXIT = '.exitcode' +F static final public String CMD_EXIT = '.exitcode' static final public String CMD_START = '.command.begin' static final public String CMD_RUN = '.command.run' static final public String CMD_STAGE = '.command.stage' diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 96db477793..6d50a6e38d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -38,7 +38,7 @@ class DispatcherClient { return (res.nodeId as String).toInteger() } - String forchExecutionId = System.getenv("FORCH_EXECUTION_ID") + String forchExecutionId = System.getenv("forch_execution_id") if (forchExecutionId != null) { Map res = client.execute(""" mutation CreateNode(\$forchExecutionId: BigInt!, \$name: String!) { @@ -180,7 +180,7 @@ class DispatcherClient { return 1 } - String forchExecutionId = System.getenv("FORCH_EXECUTION_ID") + String forchExecutionId = System.getenv("forch_execution_id") if (forchExecutionId != null) { try { Map res = client.execute(""" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 8485cb710c..7a3cada022 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -14,11 +14,11 @@ class ForchClient { int cpus, long memoryBytes ) { - String resourceGroup = System.getenv("FORCH_RESOURCE_GROUP_ID") + String resourceGroup = System.getenv("forch_resource_group_id") if (resourceGroup == null) throw new RuntimeException("unable to get resource group") - String billingGroup = System.getenv("FORCH_BILLING_GROUP_ID") + String billingGroup = System.getenv("forch_billing_group_id") if (billingGroup == null) throw new RuntimeException("unable to get billing group") @@ -80,45 +80,18 @@ class ForchClient { String getTaskStatus(int forchTaskId) { Map res = client.execute(""" query GetTaskStatus(\$taskId: BigInt!) { - taskEvents(condition: {taskId: \$taskId}, orderBy: TIME_DESC, first: 1) { - nodes { - id - type - taskEventContainerExitedDatumById { - id - exitStatus - } - } - } + nfForchTaskStatus(argTaskId: \$taskId) } """, [ taskId: forchTaskId ] - )["taskEvents"] as Map + ) as Map if (res == null) - throw new RuntimeException("failed to get task events for ${forchTaskId}") - - List nodes = res["nodes"] as List - if (nodes == null || nodes.size() == 0) - return "queued" - - // todo(rahul): might be a good idea to throw this logic into a vac function so that we can easily update - String eventType = nodes[0]["type"] - if (eventType == "node-assigned") - return "submitted" - if (eventType == "container-created") - return "running" - if (eventType == "container-exited") { - if ((nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int) == 0) { - return "succeeded" - } else { - return "failed" - } - } + throw new RuntimeException("failed to get task status for ${forchTaskId}") - return "queued" + return res["nfForchTaskStatus"] } int getTaskExitCode(int forchTaskId) { diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index b2f59d71a1..d6e90625f2 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -9,7 +9,7 @@ class LatchPathUtils { static String getAuthHeader(boolean useForchAuth = false) { if (useForchAuth) { - def forchToken = System.getenv("FORCH_AUTH_TOKEN") + def forchToken = System.getenv("forch_auth_token") if (forchToken != null) return "Forch-Auth-Token $forchToken" } else { From 00fa840e9b7a27d401c97ea4e9f7313a98ca65ef Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Wed, 18 Jun 2025 13:58:12 -0700 Subject: [PATCH 15/40] forchexecid -> execid --- .../main/groovy/nextflow/util/DispatcherClient.groovy | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 6d50a6e38d..e4c5347702 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -38,11 +38,11 @@ class DispatcherClient { return (res.nodeId as String).toInteger() } - String forchExecutionId = System.getenv("forch_execution_id") - if (forchExecutionId != null) { + String executionId = System.getenv("forch_execution_id") + if (executionId != null) { Map res = client.execute(""" - mutation CreateNode(\$forchExecutionId: BigInt!, \$name: String!) { - createNfProcessNode(input: {nfProcessNode: {forchExecutionId: \$forchExecutionId, name: \$name } }) { + mutation CreateNode(\$executionId: BigInt!, \$name: String!) { + createNfProcessNode(input: {nfProcessNode: {executionId: \$executionId, name: \$name } }) { nfProcessNode { id } @@ -50,7 +50,7 @@ class DispatcherClient { } """, [ - forchExecutionId: forchExecutionId, + executionId: executionId, name: processName, ] )["createNfProcessNode"] as Map From 703f417ec90596ab62b030c5d3c3bee726f179c3 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 19 Jun 2025 12:16:10 -0700 Subject: [PATCH 16/40] whoops --- .../nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index f50286c514..62012d7a9d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -582,7 +582,8 @@ class TaskRun implements Cloneable { static final public String CMD_SCRIPT = '.command.sh' static final public String CMD_INFILE = '.command.in' static final public String CMD_OUTFILE = '.command.out' -F static final public String CMD_EXIT = '.exitcode' + static final public String CMD_ERRFILE = '.command.err' + static final public String CMD_EXIT = '.exitcode' static final public String CMD_START = '.command.begin' static final public String CMD_RUN = '.command.run' static final public String CMD_STAGE = '.command.stage' From 5f79539cca6482a0e7c1291064e9832e78b212af Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 19 Jun 2025 12:45:04 -0700 Subject: [PATCH 17/40] fix everything; --- .../src/main/groovy/nextflow/util/DispatcherClient.groovy | 8 ++++---- .../src/main/groovy/nextflow/util/ForchClient.groovy | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index e4c5347702..8849adb893 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -184,14 +184,14 @@ class DispatcherClient { if (forchExecutionId != null) { try { Map res = client.execute(""" - mutation CreateForchTaskExecutionInfo(\$taskId: BigInt!, \$attemptIdx: BigInt!, \$hash: String, \$status: TaskExecutionStatus!) { + mutation CreateForchTaskExecutionInfo(\$taskId: BigInt!, \$attemptIdx: BigInt!, \$cached: Boolean!, \$hash: String) { createNfForchTaskExecutionInfo( input: { nfForchTaskExecutionInfo: { taskId: \$taskId, attemptIdx: \$attemptIdx, - hash: \$hash, - statusOverride: \$status + cached: \$cached, + hash: \$hash } } ) { @@ -204,8 +204,8 @@ class DispatcherClient { [ taskId: taskId, attemptIdx: attemptIdx, + cached: status == 'SKIPPED', hash: hash, - status: status == null ? 'UNDEFINED' : status, ] )["createNfForchTaskExecutionInfo"] as Map diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 7a3cada022..881b037699 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -80,7 +80,7 @@ class ForchClient { String getTaskStatus(int forchTaskId) { Map res = client.execute(""" query GetTaskStatus(\$taskId: BigInt!) { - nfForchTaskStatus(argTaskId: \$taskId) + taskStatus(argTaskId: \$taskId) } """, [ From f85acb841df06900e71a41590c732be7a0467b1f Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Thu, 19 Jun 2025 14:15:57 -0700 Subject: [PATCH 18/40] fix --- .../nextflow/src/main/groovy/nextflow/util/ForchClient.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 881b037699..f0b55e24da 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -91,7 +91,7 @@ class ForchClient { if (res == null) throw new RuntimeException("failed to get task status for ${forchTaskId}") - return res["nfForchTaskStatus"] + return res["taskStatus"] } int getTaskExitCode(int forchTaskId) { From 35d3061448d87c062ad716c1fb0cc315865dcb31 Mon Sep 17 00:00:00 2001 From: Rahul Desai Date: Fri, 20 Jun 2025 10:55:50 -0700 Subject: [PATCH 19/40] update task status function --- .../src/main/groovy/nextflow/util/ForchClient.groovy | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index f0b55e24da..b507c62043 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -80,7 +80,10 @@ class ForchClient { String getTaskStatus(int forchTaskId) { Map res = client.execute(""" query GetTaskStatus(\$taskId: BigInt!) { - taskStatus(argTaskId: \$taskId) + task(id: \$taskId) { + id + status + } } """, [ @@ -90,8 +93,9 @@ class ForchClient { if (res == null) throw new RuntimeException("failed to get task status for ${forchTaskId}") + + return res["task"]["status"] - return res["taskStatus"] } int getTaskExitCode(int forchTaskId) { From 8a1857405b741704b4b93849299a0d2b97bf6029 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Fri, 27 Jun 2025 14:54:29 -0700 Subject: [PATCH 20/40] add s5cmd Signed-off-by: Ayush Kamat --- Dockerfile | 11 +++++++++++ Justfile | 11 +++++++++++ 2 files changed, 22 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..3988a2b0ab --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +# syntax = docker/dockerfile:1.4.1 + +from alpine:3.22.0 + +run apk add \ + bash \ + s5cmd \ + openjdk21-jre-headless + +copy ./.nextflow /root/.nextflow +copy ./nextflow /usr/bin/nextflow diff --git a/Justfile b/Justfile index 12953c0b00..19c220259e 100644 --- a/Justfile +++ b/Justfile @@ -8,6 +8,17 @@ build-sync: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -ldflags '-extldflags "-static"' -o custom_fsync.bin custom_fsync/sync.go chmod +x custom_fsync +image_name := "812206152185.dkr.ecr.us-west-2.amazonaws.com/forch-nf-runtime" + +@dbnp: + cp -rf ~/.nextflow ./ + + export tag=$( Date: Wed, 2 Jul 2025 12:23:07 -0700 Subject: [PATCH 21/40] use forch auth in dispatcher client too Signed-off-by: Ayush Kamat --- Dockerfile | 5 +++- Justfile | 6 ++--- LATCH_VERSION | 2 +- .../groovy/nextflow/util/ForchClient.groovy | 2 +- .../main/nextflow/file/http/GQLClient.groovy | 6 ++--- .../nextflow/file/http/LatchPathUtils.groovy | 25 ++++++++----------- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3988a2b0ab..54d397dc0a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,8 +4,11 @@ from alpine:3.22.0 run apk add \ bash \ - s5cmd \ + curl \ openjdk21-jre-headless +run curl -sSL https://github.com/jqlang/jq/releases/download/jq-1.8.1/jq-linux-amd64 -o /bin/jq +run chmod +x /bin/jq + copy ./.nextflow /root/.nextflow copy ./nextflow /usr/bin/nextflow diff --git a/Justfile b/Justfile index 19c220259e..028a3b4bc1 100644 --- a/Justfile +++ b/Justfile @@ -12,10 +12,10 @@ image_name := "812206152185.dkr.ecr.us-west-2.amazonaws.com/forch-nf-runtime" @dbnp: cp -rf ~/.nextflow ./ + rm -rf .nextflow/plugins/* - export tag=$( response = this.client.send(req) diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index d6e90625f2..ab7337a32f 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -7,21 +7,16 @@ class LatchPathUtils { static class UnauthenticatedException extends Exception {} - static String getAuthHeader(boolean useForchAuth = false) { - if (useForchAuth) { - def forchToken = System.getenv("forch_auth_token") - if (forchToken != null) - return "Forch-Auth-Token $forchToken" - } else { - def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") - if (flyteToken != null) - return "Latch-Execution-Token $flyteToken" - - String home = System.getProperty("user.home") - File tokenFile = new File("$home/.latch/token") - if (tokenFile.exists()) - return "Latch-SDK-Token ${tokenFile.text.strip()}" - } + static String getAuthHeader() { + def forchToken = System.getenv("forch_auth_token") + if (forchToken != null) return "Forch-Auth-Token $forchToken" + + def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") + if (flyteToken != null) return "Latch-Execution-Token $flyteToken" + + String home = System.getProperty("user.home") + File tokenFile = new File("$home/.latch/token") + if (tokenFile.exists()) return "Latch-SDK-Token ${tokenFile.text.strip()}" throw new UnauthenticatedException() } From afb02f791e62a8a41d934596ad269e4c60f72300 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 2 Jul 2025 12:24:30 -0700 Subject: [PATCH 22/40] bump ver Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- forch | 227 -------------------------------------------------- 2 files changed, 1 insertion(+), 228 deletions(-) delete mode 100755 forch diff --git a/LATCH_VERSION b/LATCH_VERSION index 5e333f9cf2..2251dc3e51 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.8 +v3.0.9 diff --git a/forch b/forch deleted file mode 100755 index 5febb94573..0000000000 --- a/forch +++ /dev/null @@ -1,227 +0,0 @@ -#!/opt/conda/envs/workflow/bin/python - -import asyncio -import json -import sys -from dataclasses import dataclass -from typing import Literal - -from latch_config.config import PostgresConnectionConfig, read_config -from latch_data_validation.data_validation import untraced_validate -from latch_postgres.postgres import ( - LatchAsyncConnection, - get_pool, - get_with_conn_retry, - sqlq, -) - - -@dataclass -class AppConfig: - db: PostgresConnectionConfig - nfs_server_task_id: int - - -config = read_config(AppConfig) - -pool = get_pool(config.db, "nextflow_forch_test", read_only=False) -with_conn_retry = get_with_conn_retry(pool, config.db) - - -@dataclass -class TaskInfo: - id: int - - -@dataclass -class CreateTaskInput: - display_name: str - container_image: str - container_entrypoint: list[str] - cpus: int - memory_bytes: int - gpu_type: str | None - gpus: int - - -async def create_task(payload: CreateTaskInput): - @with_conn_retry - async def db_work(conn: LatchAsyncConnection): - return await conn.query1( - TaskInfo, - sqlq( - """ - with - task as ( - insert into - forch_pub.tasks( - display_name, - container_image, - container_entrypoint, - dedicated_cpuset_size, - dedicated_memory_bytes, - allow_internet_egress, - dedicated_gpu_type, - dedicated_gpu_count - ) - values - ( - %(display_name)s, - %(container_image)s, - %(container_entrypoint)s, - %(cpus)s, - %(memory_bytes)s, - true, - %(gpu_type)s, - %(gpus)s - ) - returning - id - ) - insert into - forch_pub.nfs_shares( - client_task_id, - server_task_id, - container_mount_path, - host_mountpoint_subpath - ) - select - id, - %(nfs_server_task_id)s, - '/nf-workdir'::bytea, - 'nfs/work'::bytea - from - task - returning - client_task_id as id - """, - ), - display_name=payload.display_name, - container_image=payload.container_image, - container_entrypoint=payload.container_entrypoint, - cpus=payload.cpus, - memory_bytes=payload.memory_bytes, - gpu_type=payload.gpu_type, - gpus=payload.gpus, - nfs_server_task_id=config.nfs_server_task_id, - ) - - return await db_work() - - -@dataclass -class TaskStatus: - status: Literal[ - "queued", - "initializing", - "running", - "succeeded", - "failed", - ] - - -async def get_task_status(task_id: int): - @with_conn_retry - async def db_work(conn: LatchAsyncConnection): - return await conn.query1( - TaskStatus, - sqlq( - """ - select - coalesce( - ( - select - ( - select - case - when te.type = 'node-assigned' then - 'submitted' - when te.type = 'container-created' then - 'running' - when te.type = 'container-exited' then - ( - select - case - when teced.exit_status = 0 then - 'succeeded' - else - 'failed' - end - ) - else - null - end - ) - from - forch_pub.task_events te - left join - forch_pub.task_event_container_exited_data teced - on teced.id = te.id - where - te.task_id = %(task_id)s - order by - time desc - limit 1 - ), - 'queued' - ) status - """, - ), - task_id=task_id, - ) - - return await db_work() - - -@dataclass -class TaskExitCode: - exit_status: int | None - - -async def get_task_exit_code(task_id: int): - @with_conn_retry - async def db_work(conn: LatchAsyncConnection): - return await conn.query1( - TaskExitCode, - sqlq( - """ - select - teced.exit_status - from - forch_pub.task_events te - inner join - forch_pub.task_event_container_exited_data teced - on teced.id = te.id - where - te.task_id = %(task_id)s - """, - ), - task_id=task_id, - ) - - return await db_work() - - -async def main(): - await pool.open() - args = sys.argv[1:] - if len(args) != 2: - print("Must provide a command") - sys.exit(1) - - if args[0] == "create": - payload = untraced_validate(json.loads(args[1]), CreateTaskInput) - res = await create_task(payload) - print(res.id) - elif args[0] == "status": - task_id = int(args[1]) - res = await get_task_status(task_id) - print(res.status) - elif args[0] == "exitcode": - task_id = int(args[1]) - res = await get_task_exit_code(task_id) - print(res.exit_status) - - -if __name__ == "__main__": - asyncio.run(main()) From af05397860811d62c5c618e42e9112729b18331d Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 2 Jul 2025 12:32:18 -0700 Subject: [PATCH 23/40] fucking retard Signed-off-by: Ayush Kamat --- .../groovy/nextflow/util/ForchClient.groovy | 2 +- .../main/nextflow/file/http/GQLClient.groovy | 6 +++-- .../nextflow/file/http/LatchPathUtils.groovy | 22 ++++++++++--------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 1cfa973ed8..b507c62043 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -5,7 +5,7 @@ import nextflow.file.http.GQLClient @Slf4j class ForchClient { - private GQLClient client = new GQLClient() + private GQLClient client = new GQLClient(true) int submitTask( String displayName, diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy index 5ea8767073..c4c3286ba8 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/GQLClient.groovy @@ -11,6 +11,7 @@ import groovy.json.JsonSlurper class GQLClient { private String endpoint private HttpRetryClient client + private boolean useForchAuth class GQLQueryException extends Exception { GQLQueryException(String msg) { @@ -18,7 +19,8 @@ class GQLClient { } } - GQLClient() { + GQLClient(boolean useForchAuth = false) { + this.useForchAuth = useForchAuth endpoint = "https://vacuole.latch.bio/graphql" String domain = System.getenv("LATCH_SDK_DOMAIN") @@ -43,7 +45,7 @@ class GQLClient { .uri(URI.create(this.endpoint)) .timeout(Duration.ofSeconds(90)) .header("Content-Type", "application/json") - .header("Authorization", LatchPathUtils.getAuthHeader()) + .header("Authorization", LatchPathUtils.getAuthHeader(useForchAuth)) HttpRequest req = requestBuilder.POST(HttpRequest.BodyPublishers.ofString(builder.toString())).build() HttpResponse response = this.client.send(req) diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index ab7337a32f..c081f25e19 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -7,16 +7,18 @@ class LatchPathUtils { static class UnauthenticatedException extends Exception {} - static String getAuthHeader() { - def forchToken = System.getenv("forch_auth_token") - if (forchToken != null) return "Forch-Auth-Token $forchToken" - - def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") - if (flyteToken != null) return "Latch-Execution-Token $flyteToken" - - String home = System.getProperty("user.home") - File tokenFile = new File("$home/.latch/token") - if (tokenFile.exists()) return "Latch-SDK-Token ${tokenFile.text.strip()}" + static String getAuthHeader(boolean useForchAuth = false) { + if (useForchAuth) { + def forchToken = System.getenv("forch_auth_token") + if (forchToken != null) return "Forch-Auth-Token $forchToken" + } else { + def flyteToken = System.getenv("FLYTE_INTERNAL_EXECUTION_ID") + if (flyteToken != null) return "Latch-Execution-Token $flyteToken" + + String home = System.getProperty("user.home") + File tokenFile = new File("$home/.latch/token") + if (tokenFile.exists()) return "Latch-SDK-Token ${tokenFile.text.strip()}" + } throw new UnauthenticatedException() } From 69d50ac5b3500329db297b460a30abcf8672c457 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 3 Jul 2025 10:28:28 -0700 Subject: [PATCH 24/40] make every task region local Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../src/main/groovy/nextflow/util/ForchClient.groovy | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 2251dc3e51..41c8218cd2 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.9 +v3.0.10 diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index b507c62043..2d7b2f6f2f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -26,6 +26,8 @@ class ForchClient { if (nfsServerTaskId == null) throw new RuntimeException("unable to get NFS server task id") + String region = System.getenv("host_region") ?: "us-west-2" + Map res = client.execute(""" mutation CreateForchTask( \$displayName: String!, @@ -38,6 +40,7 @@ class ForchClient { \$groupId: BigInt!, \$billedTo: BigInt!, \$nfsServerTaskId: BigInt! + \$argTargetRegion: String! ) { nfCreateForchTask( input: { @@ -51,6 +54,7 @@ class ForchClient { argGroupId: \$groupId, argBilledTo: \$billedTo, argNfsServerTaskId: \$nfsServerTaskId + argTargetRegion: \$targetRegion } ) { resTaskId @@ -68,6 +72,7 @@ class ForchClient { "groupId": resourceGroup.toInteger(), "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, + "argTargetRegion": region, ] )["nfCreateForchTask"] as Map From 3b5132dbaa540a21e55568d8715462b869dbd07b Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 9 Jul 2025 10:23:34 -0700 Subject: [PATCH 25/40] fixies Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../groovy/nextflow/forch/ForchTaskHandler.groovy | 11 +++++------ .../src/main/groovy/nextflow/util/ForchClient.groovy | 8 ++++---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 41c8218cd2..545b77280f 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.10 +v3.0.13 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index e943f2baaf..23447f691e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -89,19 +89,18 @@ class ForchTaskHandler extends TaskHandler { String cmd = """\ if [[ "\$(command -v apt-get)" ]]; then - apt-get update - apt-get install -y nfs-common + apt-get update 2>&1 > /dev/null + apt-get install -y nfs-common 2>&1 > /dev/null elif [[ "\$(command -v yum)" ]]; then - yum install -y nfs-utils + yum install -y nfs-utils 2>&1 > /dev/null elif [[ "\$(command -v dnf)" ]]; then - dnf install -y nfs-utils + dnf install -y nfs-utils 2>&1 > /dev/null fi mkdir --parents ${session.baseDir} - until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} + until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} 2>&1 > /dev/null do - echo "failed to mount nfs share: retrying..." sleep 5 done diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 2d7b2f6f2f..41d05034b4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -39,8 +39,8 @@ class ForchClient { \$gpus: Int!, \$groupId: BigInt!, \$billedTo: BigInt!, - \$nfsServerTaskId: BigInt! - \$argTargetRegion: String! + \$nfsServerTaskId: BigInt!, + \$targetRegion: String! ) { nfCreateForchTask( input: { @@ -53,7 +53,7 @@ class ForchClient { argGpus: \$gpus, argGroupId: \$groupId, argBilledTo: \$billedTo, - argNfsServerTaskId: \$nfsServerTaskId + argNfsServerTaskId: \$nfsServerTaskId, argTargetRegion: \$targetRegion } ) { @@ -72,7 +72,7 @@ class ForchClient { "groupId": resourceGroup.toInteger(), "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, - "argTargetRegion": region, + "targetRegion": region, ] )["nfCreateForchTask"] as Map From 5a35a65d3760a939c130c9e0b9f58381e1c7d4e7 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Jul 2025 09:54:14 -0700 Subject: [PATCH 26/40] only set status in shutdown Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../nextflow/forch/ForchExecutor.groovy | 13 +++++++ .../nextflow/util/DispatcherClient.groovy | 34 +++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 545b77280f..1328bfaa75 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.13 +v3.0.16 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index a99bd86cfd..b937e45522 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -29,6 +29,11 @@ class ForchExecutor extends Executor { // todo(ayush): decouple dispatcher and executor this.dispatcherClient = new DispatcherClient() this.forchClient = new ForchClient() + + this.session.addIgniter { + this.dispatcherClient.updateExecutionStatus("RUNNING") + } + uploadBinDir() } @@ -44,4 +49,12 @@ class ForchExecutor extends Executor { remoteBinDir = FilesEx.copyTo(session.binDir, s3) } } + + @Override + void shutdown() { + def status = this.session.isSuccess() ? "SUCCEEDED" : (this.session.isAborted() ? "ABORTED" : "FAILED") + this.dispatcherClient.updateExecutionStatus(status) + + super.shutdown() + } } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index 8849adb893..ec69199e0d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -12,6 +12,40 @@ class DispatcherClient { public boolean debug = System.getenv("LATCH_NF_DEBUG") == "true" + void updateExecutionStatus(String status) { + if (debug) { + return + } + + String executionId = System.getenv("forch_execution_id") + if (executionId == null) { + throw new RuntimeException("failed to update execution status: execution id not found") + } + + client.execute(""" + mutation UpdateExecutionStatus( + \$argExecutionId: BigInt! + \$argStatus: ExecutionStatus! + ) { + updateExecutionInfo( + input: { + id: \$argExecutionId, + patch: { + status: \$argStatus + } + } + ) { + clientMutationId + } + } + """, + [ + argExecutionId: executionId, + argStatus: status, + ] + ) + } + int createProcessNode(String processName) { if (debug) { return 1 From 2c45d3a824fd2b2235930701fba3dd3702863544 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Jul 2025 09:54:34 -0700 Subject: [PATCH 27/40] use vars instead of getters Signed-off-by: Ayush Kamat --- .../src/main/groovy/nextflow/forch/ForchExecutor.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index b937e45522..8feb9258db 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -52,7 +52,7 @@ class ForchExecutor extends Executor { @Override void shutdown() { - def status = this.session.isSuccess() ? "SUCCEEDED" : (this.session.isAborted() ? "ABORTED" : "FAILED") + def status = session.success ? "SUCCEEDED" : ((session.aborted || session.cancelled) ? "ABORTED" : "FAILED") this.dispatcherClient.updateExecutionStatus(status) super.shutdown() From cbf09af562d1f204cea3749e1bb6cb9e7cb5727b Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Jul 2025 09:58:43 -0700 Subject: [PATCH 28/40] bump ver Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 1328bfaa75..9e9040677d 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.16 +v3.0.17 From 8edb4cd1ea40e98318decc0b6a0e868ddcd4c52c Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Mon, 21 Jul 2025 11:04:49 -0700 Subject: [PATCH 29/40] add abortion Signed-off-by: Ayush Kamat --- .../groovy/nextflow/container/DockerBuilder.groovy | 2 ++ .../main/groovy/nextflow/forch/ForchExecutor.groovy | 4 ++++ .../groovy/nextflow/forch/ForchTaskHandler.groovy | 2 +- .../src/main/groovy/nextflow/util/ForchClient.groovy | 12 ++++++++++++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/container/DockerBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/container/DockerBuilder.groovy index 0a648ae418..d1d0517970 100644 --- a/modules/nextflow/src/main/groovy/nextflow/container/DockerBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/container/DockerBuilder.groovy @@ -182,6 +182,8 @@ class DockerBuilder extends ContainerBuilder { // return the run command as result runCommand = result.toString() + log.warn(runCommand) + // use an explicit 'docker rm' command since the --rm flag may fail. See https://groups.google.com/d/msg/docker-user/0Ayim0wv2Ls/tDC-tlAK03YJ if( remove && name ) { removeCommand = 'docker rm ' + name diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy index 8feb9258db..af8612cd16 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchExecutor.groovy @@ -55,6 +55,10 @@ class ForchExecutor extends Executor { def status = session.success ? "SUCCEEDED" : ((session.aborted || session.cancelled) ? "ABORTED" : "FAILED") this.dispatcherClient.updateExecutionStatus(status) + String nfsServerTaskId = System.getenv("nfs_server_task_id") + if (nfsServerTaskId != null) + this.forchClient.abortTasks([Integer.parseInt(nfsServerTaskId)]) + super.shutdown() } } diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 23447f691e..7a991b241e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -67,7 +67,7 @@ class ForchTaskHandler extends TaskHandler { @Override void kill() { - // noop + forchClient.abortTasks([forchTaskId]) } @Override diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 41d05034b4..12eaee3907 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -137,4 +137,16 @@ class ForchClient { return nodes[0]["taskEventContainerExitedDatumById"]["exitStatus"] as int } + + void abortTasks(List taskIds) { + client.execute(""" + mutation AbortTask(\$argTaskIds: [BigInt!]!) { + nfStopForchTasks(input: { argTaskIds: \$argTaskIds }) { + clientMutationId + } + } + """, + ["argTaskIds": taskIds] + ) + } } From bb10e471c40266a62995fda9717094c0e2cc25a6 Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Wed, 17 Sep 2025 13:46:27 -0700 Subject: [PATCH 30/40] remove nfs install --- .../nextflow/forch/ForchTaskHandler.groovy | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 7a991b241e..de018c5144 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -88,23 +88,14 @@ class ForchTaskHandler extends TaskHandler { throw new RuntimeException("failed to get server ip") String cmd = """\ - if [[ "\$(command -v apt-get)" ]]; then - apt-get update 2>&1 > /dev/null - apt-get install -y nfs-common 2>&1 > /dev/null - elif [[ "\$(command -v yum)" ]]; then - yum install -y nfs-utils 2>&1 > /dev/null - elif [[ "\$(command -v dnf)" ]]; then - dnf install -y nfs-utils 2>&1 > /dev/null - fi - mkdir --parents ${session.baseDir} - + until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} 2>&1 > /dev/null do - sleep 5 + sleep 5 done - - trap "{ ret=\$?; cp ${TaskRun.CMD_LOG} ${task.workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; + + trap "{ ret=\$?; cp ${TaskRun.CMD_LOG} ${task.workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} """.stripIndent().trim() From 83e1ea61087f63e39410dd7b50738af8045067ea Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Thu, 18 Sep 2025 15:37:04 -0700 Subject: [PATCH 31/40] docker build platform & bump version --- Justfile | 2 +- LATCH_VERSION | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Justfile b/Justfile index 028a3b4bc1..2ad570420c 100644 --- a/Justfile +++ b/Justfile @@ -14,7 +14,7 @@ image_name := "812206152185.dkr.ecr.us-west-2.amazonaws.com/forch-nf-runtime" cp -rf ~/.nextflow ./ rm -rf .nextflow/plugins/* - docker build -t {{image_name}}:$( Date: Tue, 14 Oct 2025 15:37:11 -0700 Subject: [PATCH 32/40] Rteqs/ldata provenance (#74) * ldata provenance stuff * fix: pass execution instead of token * fix * cleanup * bump latch version * throw error * debug * fix: comma * remove sleep * rename * fix --- LATCH_VERSION | 2 +- .../groovy/nextflow/util/DispatcherClient.groovy | 6 +++--- .../main/groovy/nextflow/util/ForchClient.groovy | 13 ++++++++++--- .../src/main/nextflow/file/http/LatchPath.groovy | 12 +++++++++--- .../main/nextflow/file/http/LatchPathUtils.groovy | 2 +- 5 files changed, 24 insertions(+), 11 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index bde2e21648..6bd9c39276 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.19 +v3.0.26 diff --git a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy index ec69199e0d..11651d9063 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/DispatcherClient.groovy @@ -28,10 +28,10 @@ class DispatcherClient { \$argStatus: ExecutionStatus! ) { updateExecutionInfo( - input: { + input: { id: \$argExecutionId, - patch: { - status: \$argStatus + patch: { + status: \$argStatus } } ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 12eaee3907..7eeb25b707 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -28,6 +28,10 @@ class ForchClient { String region = System.getenv("host_region") ?: "us-west-2" + String forchExecutionId = System.getenv("forch_execution_id") + if (forchExecutionId == null) + throw new RuntimeException("unable to get forch_execution_id") + Map res = client.execute(""" mutation CreateForchTask( \$displayName: String!, @@ -40,7 +44,8 @@ class ForchClient { \$groupId: BigInt!, \$billedTo: BigInt!, \$nfsServerTaskId: BigInt!, - \$targetRegion: String! + \$targetRegion: String!, + \$forchExecutionId: BigInt! ) { nfCreateForchTask( input: { @@ -54,7 +59,8 @@ class ForchClient { argGroupId: \$groupId, argBilledTo: \$billedTo, argNfsServerTaskId: \$nfsServerTaskId, - argTargetRegion: \$targetRegion + argTargetRegion: \$targetRegion, + argForchExecutionId: \$forchExecutionId, } ) { resTaskId @@ -73,6 +79,7 @@ class ForchClient { "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, "targetRegion": region, + "forchExecutionId": forchExecutionId, ] )["nfCreateForchTask"] as Map @@ -98,7 +105,7 @@ class ForchClient { if (res == null) throw new RuntimeException("failed to get task status for ${forchTaskId}") - + return res["task"]["status"] } diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPath.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPath.groovy index 1902185fd7..2fd570b146 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPath.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPath.groovy @@ -326,11 +326,14 @@ class LatchPath extends XPath { JsonBuilder builder = new JsonBuilder() builder(["path": this.toUriString(), "part_count": numParts, "content_type": mimeType]) + def latchToken = System.getenv("latch_execution_token") + def authHeader = latchToken != null ? "Latch-Execution-Token $latchToken" : LatchPathUtils.getAuthHeader() + def request = HttpRequest.newBuilder() .uri(URI.create("${host}/ldata/start-upload")) .timeout(Duration.ofSeconds(90)) .header("Content-Type", "application/json") - .header("Authorization", LatchPathUtils.getAuthHeader()) + .header("Authorization", authHeader) .POST(HttpRequest.BodyPublishers.ofString(builder.toString())) .build() @@ -424,7 +427,7 @@ class LatchPath extends XPath { .uri(URI.create("${host}/ldata/end-upload")) .timeout(Duration.ofSeconds(90)) .header("Content-Type", "application/json") - .header("Authorization", LatchPathUtils.getAuthHeader()) + .header("Authorization", authHeader) .POST(HttpRequest.BodyPublishers.ofString(endUploadBody)) .build() @@ -435,11 +438,14 @@ class LatchPath extends XPath { JsonBuilder builder = new JsonBuilder() builder(["path": this.toUriString()]) + def latchToken = System.getenv("latch_execution_token") + def authHeader = latchToken != null ? "Latch-Execution-Token $latchToken" : LatchPathUtils.getAuthHeader() + def request = HttpRequest.newBuilder() .uri(URI.create("${host}/ldata/get-signed-url")) .timeout(Duration.ofSeconds(90)) .header("Content-Type", "application/json") - .header("Authorization", LatchPathUtils.getAuthHeader()) + .header("Authorization", authHeader) .POST(HttpRequest.BodyPublishers.ofString(builder.toString())) .build() diff --git a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy index c081f25e19..5c64efe7a8 100644 --- a/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy +++ b/modules/nf-httpfs/src/main/nextflow/file/http/LatchPathUtils.groovy @@ -78,7 +78,7 @@ class LatchPathUtils { defaultAccount } } - } + } """)["accountInfoCurrent"] as Map if (accInfo == null) From 592fffe82be937ff9686063317ba50c9cf1290fd Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Wed, 15 Oct 2025 10:11:09 -0700 Subject: [PATCH 33/40] remove forch_execution_id from nf_create_forch_task --- LATCH_VERSION | 2 +- .../src/main/groovy/nextflow/util/ForchClient.groovy | 11 ++--------- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 6bd9c39276..c204e44ee6 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.26 +v3.0.27 diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 7eeb25b707..4635f4424f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -28,10 +28,6 @@ class ForchClient { String region = System.getenv("host_region") ?: "us-west-2" - String forchExecutionId = System.getenv("forch_execution_id") - if (forchExecutionId == null) - throw new RuntimeException("unable to get forch_execution_id") - Map res = client.execute(""" mutation CreateForchTask( \$displayName: String!, @@ -44,8 +40,7 @@ class ForchClient { \$groupId: BigInt!, \$billedTo: BigInt!, \$nfsServerTaskId: BigInt!, - \$targetRegion: String!, - \$forchExecutionId: BigInt! + \$targetRegion: String! ) { nfCreateForchTask( input: { @@ -59,8 +54,7 @@ class ForchClient { argGroupId: \$groupId, argBilledTo: \$billedTo, argNfsServerTaskId: \$nfsServerTaskId, - argTargetRegion: \$targetRegion, - argForchExecutionId: \$forchExecutionId, + argTargetRegion: \$targetRegion } ) { resTaskId @@ -79,7 +73,6 @@ class ForchClient { "billedTo": billingGroup.toInteger(), "nfsServerTaskId": nfsServerTaskId, "targetRegion": region, - "forchExecutionId": forchExecutionId, ] )["nfCreateForchTask"] as Map From 1dbce6a1aee48ff65848f68ae42d48a7a69130e6 Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Sat, 27 Dec 2025 20:08:05 -0800 Subject: [PATCH 34/40] fix: /bin directory permissions --- .../src/main/groovy/nextflow/forch/ForchTaskHandler.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index de018c5144..8a00c28b0f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -90,6 +90,8 @@ class ForchTaskHandler extends TaskHandler { String cmd = """\ mkdir --parents ${session.baseDir} + chown -R root:root /bin 2>&1 > /dev/null + until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} 2>&1 > /dev/null do sleep 5 From db763444b707548a58cf8edfe9cead3d518bce6f Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Sat, 27 Dec 2025 20:18:13 -0800 Subject: [PATCH 35/40] bump version --- LATCH_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index c204e44ee6..0251821e76 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.27 +v3.0.28 From b993d60ba43aaa2c2a06c0b83317d116e8176ce9 Mon Sep 17 00:00:00 2001 From: Ryan Teoh Date: Sat, 27 Dec 2025 21:33:06 -0800 Subject: [PATCH 36/40] fix: /bin is just a symlink to /usr/bin --- LATCH_VERSION | 2 +- .../src/main/groovy/nextflow/forch/ForchTaskHandler.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 0251821e76..c3e775ed5b 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.28 +v3.0.30 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 8a00c28b0f..9026bea6bd 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -90,7 +90,7 @@ class ForchTaskHandler extends TaskHandler { String cmd = """\ mkdir --parents ${session.baseDir} - chown -R root:root /bin 2>&1 > /dev/null + chown -R root:root /usr/bin/mount 2>&1 > /dev/null until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} 2>&1 > /dev/null do From 76f76a3910f4862d9cc2934d63f22024193ece64 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Sun, 1 Feb 2026 09:07:53 -0800 Subject: [PATCH 37/40] init Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../groovy/nextflow/forch/ForchTaskHandler.groovy | 14 ++++++++++---- .../main/groovy/nextflow/util/ForchClient.groovy | 6 +++++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index c3e775ed5b..45d4292900 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.30 +v3.0.32 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 9026bea6bd..5d6a8df7b2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -80,6 +80,13 @@ class ForchTaskHandler extends TaskHandler { int cpus = task.config.getCpus() MemoryUnit memory = task.config.getMemory() ?: MemoryUnit.of("2GiB") + final containerOpts = task.config.getContainerOptionsMap() + + MemoryUnit shm; + if (containerOpts != null && containerOpts.exists("shm-size")) { + shm = new MemoryUnit(containerOpts.getFirstValue("shm-size") as String) + } + // todo(ayush): gpu support // AcceleratorResource acc = task.config.getAccelerator() @@ -97,9 +104,7 @@ class ForchTaskHandler extends TaskHandler { sleep 5 done - trap "{ ret=\$?; cp ${TaskRun.CMD_LOG} ${task.workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }" EXIT; - - cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG} + cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1 """.stripIndent().trim() if (remoteBinDir != null) { @@ -120,7 +125,8 @@ class ForchTaskHandler extends TaskHandler { cmd, ], cpus, - memory.bytes + memory.bytes, + shm?.bytes ) // todo(rahul): put this in a single transaction with submitTask diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 4635f4424f..54d4afd03f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -12,7 +12,8 @@ class ForchClient { String image, List entrypoint, int cpus, - long memoryBytes + long memoryBytes, + long shmBytes // nullable ) { String resourceGroup = System.getenv("forch_resource_group_id") if (resourceGroup == null) @@ -35,6 +36,7 @@ class ForchClient { \$containerEntrypoint: [String]!, \$cpus: Int!, \$memoryBytes: BigInt!, + \$shmBytes: BigInt, \$gpuType: String, \$gpus: Int!, \$groupId: BigInt!, @@ -49,6 +51,7 @@ class ForchClient { argContainerEntrypoint: \$containerEntrypoint, argCpus: \$cpus, argMemoryBytes: \$memoryBytes, + argShmBytes: \$shmBytes, argGpuType: \$gpuType, argGpus: \$gpus, argGroupId: \$groupId, @@ -67,6 +70,7 @@ class ForchClient { "containerEntrypoint" : entrypoint, "cpus" : cpus, "memoryBytes" : memoryBytes, + "shmBytes": shmBytes, "gpuType" : null, "gpus" : 0, "groupId": resourceGroup.toInteger(), From a1ef374446714232336b210895f74d08a9776d55 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Tue, 3 Feb 2026 11:55:43 -0800 Subject: [PATCH 38/40] bump version Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 45d4292900..5aafa82c1b 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.32 +v3.0.33 From 2828e393ea365d4fd1a37f2b1c17afa94e0ac460 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Tue, 3 Feb 2026 12:06:41 -0800 Subject: [PATCH 39/40] upd ver Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 5aafa82c1b..3ecbef8bc6 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.33 +v3.0.34 From 34d5b8c0b9180181f9e0b6d01a023415d9e3665a Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 4 Feb 2026 08:28:20 -0800 Subject: [PATCH 40/40] working finally Signed-off-by: Ayush Kamat --- LATCH_VERSION | 2 +- .../nextflow/forch/ForchTaskHandler.groovy | 16 +++++++++------- .../main/groovy/nextflow/util/ForchClient.groovy | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/LATCH_VERSION b/LATCH_VERSION index 3ecbef8bc6..cd76e98ba3 100644 --- a/LATCH_VERSION +++ b/LATCH_VERSION @@ -1 +1 @@ -v3.0.34 +v3.0.36 diff --git a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy index 5d6a8df7b2..e212c19b03 100644 --- a/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/forch/ForchTaskHandler.groovy @@ -82,7 +82,7 @@ class ForchTaskHandler extends TaskHandler { final containerOpts = task.config.getContainerOptionsMap() - MemoryUnit shm; + MemoryUnit shm = null; if (containerOpts != null && containerOpts.exists("shm-size")) { shm = new MemoryUnit(containerOpts.getFirstValue("shm-size") as String) } @@ -116,17 +116,19 @@ class ForchTaskHandler extends TaskHandler { """.stripIndent() + cmd } + List entrypoint = [ + "/bin/bash", + "-c", + cmd, + ] + this.forchTaskId = this.forchClient.submitTask( this.task.name, this.task.container, - [ - "/bin/bash", - "-c", - cmd, - ], + entrypoint, cpus, memory.bytes, - shm?.bytes + shm?.bytes ?: 0 ) // todo(rahul): put this in a single transaction with submitTask diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy index 54d4afd03f..4f121d9c24 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ForchClient.groovy @@ -7,7 +7,7 @@ import nextflow.file.http.GQLClient class ForchClient { private GQLClient client = new GQLClient(true) - int submitTask( + public int submitTask( String displayName, String image, List entrypoint, @@ -70,7 +70,7 @@ class ForchClient { "containerEntrypoint" : entrypoint, "cpus" : cpus, "memoryBytes" : memoryBytes, - "shmBytes": shmBytes, + "shmBytes": shmBytes == 0 ? null : shmBytes, "gpuType" : null, "gpus" : 0, "groupId": resourceGroup.toInteger(),