Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
3c7ab97
init
ayushkamat Apr 17, 2025
dcbcd28
dur
ayushkamat Apr 26, 2025
000c376
push forgotten changes
ayushkamat May 22, 2025
763916f
update forch cli
ayushkamat May 29, 2025
00802f6
forch make gql queries via forch-client token
rahuldesai1 Jun 9, 2025
7838be4
add resource and billing group ids
rahuldesai1 Jun 9, 2025
b8eba80
fixes
rahuldesai1 Jun 10, 2025
c6d109c
remove debug tag
rahuldesai1 Jun 10, 2025
c4062b5
finish
ayushkamat Jun 12, 2025
80aa48e
create ForchClient and add forchExecutionid to process node creation
rahuldesai1 Jun 12, 2025
052d8f4
Merge remote-tracking branch 'origin/ayush/forch' into rahuldesai1/fo…
rahuldesai1 Jun 12, 2025
3717181
fixes
rahuldesai1 Jun 13, 2025
f2632e5
working with gql and rls
rahuldesai1 Jun 16, 2025
4832468
update execution info with forch task id
rahuldesai1 Jun 16, 2025
c9cacf8
move task status logic to vac function
rahuldesai1 Jun 18, 2025
00fa840
forchexecid -> execid
rahuldesai1 Jun 18, 2025
703f417
whoops
rahuldesai1 Jun 19, 2025
5f79539
fix everything;
rahuldesai1 Jun 19, 2025
f85acb8
fix
rahuldesai1 Jun 19, 2025
324e3e1
Merge pull request #71 from latchbio/rahuldesai1/forch-gql
rahuldesai1 Jun 19, 2025
35d3061
update task status function
rahuldesai1 Jun 20, 2025
8a18574
add s5cmd
ayushkamat Jun 27, 2025
d953e0a
use forch auth in dispatcher client too
ayushkamat Jul 2, 2025
afb02f7
bump ver
ayushkamat Jul 2, 2025
af05397
fucking retard
ayushkamat Jul 2, 2025
69d50ac
make every task region local
ayushkamat Jul 3, 2025
3b5132d
fixies
ayushkamat Jul 9, 2025
5a35a65
only set status in shutdown
ayushkamat Jul 10, 2025
2c45d3a
use vars instead of getters
ayushkamat Jul 10, 2025
cbf09af
bump ver
ayushkamat Jul 10, 2025
8edb4cd
add abortion
ayushkamat Jul 21, 2025
bb10e47
remove nfs install
rteqs Sep 17, 2025
83e1ea6
docker build platform & bump version
rteqs Sep 18, 2025
5087298
Rteqs/ldata provenance (#74)
rteqs Oct 14, 2025
592fffe
remove forch_execution_id from nf_create_forch_task
rteqs Oct 15, 2025
1dbce6a
fix: /bin directory permissions
rteqs Dec 28, 2025
db76344
bump version
rteqs Dec 28, 2025
b993d60
fix: /bin is just a symlink to /usr/bin
rteqs Dec 28, 2025
76f76a3
init
ayushkamat Feb 1, 2026
a1ef374
bump version
ayushkamat Feb 3, 2026
2828e39
upd ver
ayushkamat Feb 3, 2026
34d5b8c
working finally
ayushkamat Feb 4, 2026
03630c4
Merge pull request #77 from latchbio/ayush/fixies
ayushkamat Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# syntax = docker/dockerfile:1.4.1

# Alpine-based image containing bash, curl, a headless Java 21 JRE, jq and the
# Nextflow launcher together with its `.nextflow` directory from the build context.
from alpine:3.22.0

# --no-cache fetches the package index on the fly so it is not stored in the layer.
run apk add --no-cache \
    bash \
    curl \
    openjdk21-jre-headless

# -f makes curl exit non-zero on an HTTP error instead of saving the error page
# as /bin/jq; download and chmod share one layer so a failed download fails the build.
run curl -fsSL https://github.com/jqlang/jq/releases/download/jq-1.8.1/jq-linux-amd64 -o /bin/jq \
    && chmod +x /bin/jq

copy ./.nextflow /root/.nextflow
copy ./nextflow /usr/bin/nextflow
11 changes: 11 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ build-sync:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -ldflags '-extldflags "-static"' -o custom_fsync.bin custom_fsync/sync.go
chmod +x custom_fsync

image_name := "812206152185.dkr.ecr.us-west-2.amazonaws.com/forch-nf-runtime"

# Build the `{{image_name}}` image for linux/amd64, tag it with the contents of
# the LATCH_VERSION file, and push it.
# A copy of ~/.nextflow (with its plugins removed) is staged in the current
# directory for the build and deleted again as the last step.
# NOTE(review): if the build or push step fails, the staged `.nextflow` copy is
# presumably left behind because the final `rm` is never reached — confirm.
@dbnp:
cp -rf ~/.nextflow ./
rm -rf .nextflow/plugins/*

docker build --platform linux/amd64 -t {{image_name}}:$(<LATCH_VERSION) .
docker push {{image_name}}:$(<LATCH_VERSION)

rm -rf .nextflow

build:
#!/usr/bin/env bash

Expand Down
2 changes: 1 addition & 1 deletion LATCH_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.0.4
v3.0.36
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class DockerBuilder extends ContainerBuilder<DockerBuilder> {
// return the run command as result
runCommand = result.toString()

log.warn(runCommand)

// use an explicit 'docker rm' command since the --rm flag may fail. See https://groups.google.com/d/msg/docker-user/0Ayim0wv2Ls/tDC-tlAK03YJ
if( remove && name ) {
removeCommand = 'docker rm ' + name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,8 @@ class BashWrapperBuilder {
int attempt=0
while( true ) {
try {
// note(taras): always sync to disk to ensure that the file is visible to other clients
try(
FileOutputStream fos = new FileOutputStream(path.toFile());
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos))
) {
try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) {
writer.write(data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might still want to flush + fsync?

writer.flush()
fos.getFD().sync()
}
return path
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.executor.local.LocalExecutor
import nextflow.forch.ForchExecutor
import nextflow.k8s.K8sExecutor
import nextflow.script.BodyDef
import nextflow.script.ProcessConfig
Expand Down Expand Up @@ -61,7 +62,8 @@ class ExecutorFactory {
'nqsii': NqsiiExecutor,
'moab': MoabExecutor,
'oar': OarExecutor,
'hq': HyperQueueExecutor
'hq': HyperQueueExecutor,
'forch': ForchExecutor,
]

@PackageScope Map<String, Class<? extends Executor>> executorsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class LatchPathFactory extends FileSystemPathFactory {

@Override
protected String getBashLib(Path target) {
if (target.scheme != "latch") {
if (target == null || target.scheme != "latch") {
return null
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package nextflow.forch

import java.nio.file.Path

import groovy.util.logging.Slf4j
import nextflow.executor.Executor
import nextflow.extension.FilesEx
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskRun
import nextflow.util.DispatcherClient
import nextflow.util.ForchClient
import nextflow.util.Duration

@Slf4j
class ForchExecutor extends Executor {

Path remoteBinDir = null
private ForchClient forchClient

@Override
protected TaskMonitor createTaskMonitor() {
return TaskPollingMonitor.create(session, name, 100, Duration.of("15s"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete the ForchTaskMonitor? Or is that going to be used later?
I don't know what the downsides of the TaskPollingMonitor are.

}

/**
 * Creates the dispatcher and Forch clients, registers a session igniter that
 * reports the execution as {@code RUNNING}, and uploads the pipeline
 * {@code bin} directory.
 */
@Override
protected void register() {
    // todo(ayush): decouple dispatcher and executor
    dispatcherClient = new DispatcherClient()
    forchClient = new ForchClient()

    // the status update is deferred until the session fires its igniters
    session.addIgniter({
        dispatcherClient.updateExecutionStatus("RUNNING")
    })

    uploadBinDir()
}

/**
 * Builds the handler for a single task, sharing this executor's clients,
 * session and uploaded {@code bin} directory with it.
 *
 * @param task the task to be executed
 * @return a new {@link ForchTaskHandler} for {@code task}
 */
@Override
TaskHandler createTaskHandler(TaskRun task) {
    final handler = new ForchTaskHandler(task, remoteBinDir, session, forchClient, dispatcherClient)
    return handler
}

/**
 * Copies the session's local {@code bin} directory into the executor temp
 * directory and remembers the resulting location in {@code remoteBinDir}.
 * Does nothing when the session has no {@code bin} directory or it is empty.
 */
protected void uploadBinDir() {
    // nothing to upload
    if( !session.binDir || session.binDir.empty() )
        return

    def s3 = getTempDir()
    log.info "Uploading local `bin` scripts folder to ${s3.toUriString()}/bin"
    remoteBinDir = FilesEx.copyTo(session.binDir, s3)
}

/**
 * Reports the final execution status to the dispatcher, aborts the task named
 * by the {@code nfs_server_task_id} environment variable (when it is set), and
 * then runs the parent executor shutdown.
 *
 * The reported status is {@code SUCCEEDED} when the session succeeded,
 * {@code ABORTED} when it was aborted or cancelled, and {@code FAILED} otherwise.
 *
 * Each step runs in the {@code finally} of the previous one, so a failure while
 * reporting the status (or a malformed task id) can neither leave the NFS
 * server task running nor skip the parent cleanup. The exception still
 * propagates to the caller afterwards.
 */
@Override
void shutdown() {
    try {
        def status = session.success ? "SUCCEEDED" : ((session.aborted || session.cancelled) ? "ABORTED" : "FAILED")
        this.dispatcherClient.updateExecutionStatus(status)
    }
    finally {
        try {
            String nfsServerTaskId = System.getenv("nfs_server_task_id")
            if (nfsServerTaskId != null)
                this.forchClient.abortTasks([Integer.parseInt(nfsServerTaskId)])
        }
        finally {
            super.shutdown()
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package nextflow.forch

import java.nio.file.Path

import nextflow.executor.BashFunLib
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.util.Escape

class ForchFileCopyStrategy extends SimpleFileCopyStrategy {

@Override
String getBeforeStartScript() {
def lib = new BashFunLib().coreLib()

return lib + "\n\n" + """\
nxf_s5cmd_upload() {
local name=\$1
local s3path=\$2
if [[ "\$name" == - ]]; then
echo 's5cmd --no-verify-ssl pipe "\$s3path"'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we doing no-verify-ssl btw?

s5cmd --no-verify-ssl pipe "\$s3path"
elif [[ -d "\$name" ]]; then
s5cmd --no-verify-ssl cp "\$name" "\$s3path/"
else
s5cmd --no-verify-ssl cp "\$name" "\$s3path/\$name"
fi
}

nxf_s5cmd_download() {
local source=\$1
local target=\$2
local file_name=\$(basename \$1)
local is_dir=\$(s5cmd --no-verify-ssl ls \$source | grep -F "DIR \${file_name}/" -c)
if [[ \$is_dir == 1 ]]; then
s5cmd --no-verify-ssl cp "\$source/*" "\$target"
else
s5cmd --no-verify-ssl cp "\$source" "\$target"
fi
}


""".stripIndent()
}

/**
 * Wraps the parent staging script so that downloads are collected in the
 * {@code downloads} bash array and then executed through {@code nxf_parallel}.
 *
 * @param inputFiles stage name to source path of every input file
 * @return the bash snippet that stages the input files
 */
@Override
String getStageInputFilesScript(Map<String, Path> inputFiles) {
    final script = new StringBuilder()
    // `true` is a no-op first entry — presumably so the array is never empty
    script << 'downloads=(true)\n'
    script << super.getStageInputFilesScript(inputFiles) << '\n'
    script << 'nxf_parallel "${downloads[@]}"\n'
    return script.toString()
}

/**
 * Emits the bash statement that queues one download in the {@code downloads}
 * array instead of running it immediately.
 *
 * @param source path of the file to download, appended to the {@code s3:/} prefix
 * @param target local path to download to
 * @param mode   staging mode; not used by this strategy
 * @return the bash array-append statement
 */
@Override
protected String stageInCommand(String source, String target, String mode) {
    String queued = "downloads+=(\"nxf_s5cmd_download s3:/${Escape.path(source)} ${Escape.path(target)}\")"
    return queued
}

/**
 * Builds the bash snippet that uploads the task outputs: every path matched by
 * the output patterns is queued as an {@code nxf_s5cmd_upload} command and the
 * queue is run through {@code nxf_parallel}.
 *
 * @param outputFiles output file names / glob patterns declared by the task
 * @param targetDir   directory the outputs are uploaded to
 * @return the upload script, or {@code null} when there is nothing to unstage
 */
@Override
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {
    final patterns = normalizeGlobStarPaths(outputFiles)
    if( !patterns )
        return null

    // shell-escape each pattern before it is spliced into the `ls` command
    final List<String> escape = patterns.collect { String pattern -> Escape.path(pattern) }

    return """\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("nxf_s5cmd_upload '\$name' s3:/${Escape.path(targetDir)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
""".stripIndent(true)
}

/**
 * @param file remote path of the marker file
 * @return the command that writes the text {@code start} to {@code file} through {@code s5cmd pipe}
 */
@Override
String touchFile(Path file) {
    String command = "echo start | s5cmd --no-verify-ssl pipe s3:/${Escape.path(file)}"
    return command
}

/**
 * @param path any file path
 * @return the shell-escaped last name element of {@code path}
 */
@Override
String fileStr( Path path ) {
    final Path baseName = path.getFileName()
    return Escape.path(baseName)
}

/**
 * @param name   local file to upload
 * @param target remote destination path
 * @return the {@code s5cmd cp} command that copies {@code name} to {@code target}
 */
@Override
String copyFile( String name, Path target ) {
    String command = "s5cmd --no-verify-ssl cp ${Escape.path(name)} s3:/${Escape.path(target)}"
    return command
}

/**
 * @param file remote path of the exit-status file
 * @return a pipeline suffix that streams stdin to {@code file} through
 *         {@code s5cmd pipe}; the trailing {@code || true} ignores a failed upload
 */
@Override
String exitFile(Path file) {
    String suffix = "| s5cmd --no-verify-ssl pipe s3:/${Escape.path(file)} || true"
    return suffix
}

@Override
String pipeInputFile(Path file) {
return " < ${Escape.path(file.getFileName())}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return " < ${Escape.path(file.getFileName())}"
return " < ${this.fileStr(file)}"

perhaps?

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package nextflow.forch

import nextflow.util.DispatcherClient
import nextflow.util.ForchClient

import java.nio.file.Path

import groovy.util.logging.Slf4j

import nextflow.Session

import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.processor.TaskStatus
import nextflow.script.ProcessConfig
import nextflow.util.MemoryUnit

/**
 * Task handler that submits a Nextflow task to Forch and tracks it by polling
 * the Forch task status through {@link ForchClient}.
 */
@Slf4j
class ForchTaskHandler extends TaskHandler {

    /** Configuration of the process that owns the task. */
    ProcessConfig processConfig

    /** Id returned by {@code ForchClient.submitTask}; {@code null} until {@link #submit()} has run. */
    Integer forchTaskId

    /** Location of the uploaded pipeline {@code bin} directory, or {@code null} when there is none. */
    Path remoteBinDir = null

    private ForchClient forchClient
    private DispatcherClient dispatcherClient

    /** Session the task belongs to; its {@code baseDir} is used as the NFS mount point. */
    Session session

    /**
     * @param task             the task to run
     * @param remoteBinDir     uploaded {@code bin} directory, may be {@code null}
     * @param session          the owning session
     * @param forchClient      client used to submit, query and abort Forch tasks
     * @param dispatcherClient client used to record the Forch task id of the task execution
     */
    ForchTaskHandler(TaskRun task, Path remoteBinDir, Session session, ForchClient forchClient, DispatcherClient dispatcherClient) {
        super(task)

        this.processConfig = task.processor.config
        this.remoteBinDir = remoteBinDir
        this.forchClient = forchClient
        this.dispatcherClient = dispatcherClient

        this.session = session
    }

    /**
     * @return the status string reported by Forch for this task, or {@code null}
     *         when the task has not been submitted yet
     */
    private String getCurrentStatus() {
        if (this.forchTaskId == null)
            return null

        return this.forchClient.getTaskStatus(this.forchTaskId)
    }

    /**
     * Detects the SUBMITTED to RUNNING transition.
     *
     * Returns {@code true} only on the poll that observes the transition, so the
     * monitor is told about the task start once rather than on every poll, and
     * no status query is issued once the task is already marked as running.
     *
     * @return {@code true} when the task has just been observed as {@code RUNNING}
     */
    @Override
    boolean checkIfRunning() {
        if (!isSubmitted())
            return false

        // NOTE(review): a task that finishes between two polls is never seen as
        // RUNNING here and goes straight to COMPLETED — confirm that is acceptable.
        def running = this.currentStatus == 'RUNNING'
        if (running)
            status = TaskStatus.RUNNING
        return running
    }

    /**
     * Marks the task as completed once Forch reports {@code SUCCEEDED} or
     * {@code FAILED}, recording the exit code fetched from Forch.
     *
     * @return {@code true} when the task reached one of those two statuses
     */
    @Override
    boolean checkIfCompleted() {
        // NOTE(review): any other terminal status Forch may report (e.g. an
        // externally aborted task) is not handled here — confirm against ForchClient.
        def cur = this.currentStatus
        if (cur != "SUCCEEDED" && cur != "FAILED") return false

        // todo(ayush): single query
        task.exitStatus = this.forchClient.getTaskExitCode(this.forchTaskId)

        // todo(ayush): logs, retries
        task.stdout = ""
        task.stderr = ""
        status = TaskStatus.COMPLETED
        return true
    }

    /**
     * Aborts the Forch task. Does nothing when the task was never submitted,
     * since there is no task id to abort.
     */
    @Override
    void kill() {
        if (forchTaskId == null)
            return

        forchClient.abortTasks([forchTaskId])
    }

    /** Writes the task wrapper scripts using {@link ForchTaskWrapperBuilder}. */
    @Override
    void prepareLauncher() {
        new ForchTaskWrapperBuilder(this.task.toTaskBean()).build()
    }

    /**
     * Submits the task to Forch, records the returned Forch task id with the
     * dispatcher, and marks the handler as {@code SUBMITTED}.
     *
     * The container entrypoint mounts the NFS export of the server named by the
     * {@code latch_internal_nfs_server_ip} environment variable at the session
     * base directory (retrying every 5 seconds until it succeeds) and then pipes
     * the task's run script into bash. When a remote {@code bin} directory
     * exists, its files are first copied to {@code /nextflow-bin} and put on
     * {@code PATH}.
     *
     * @throws RuntimeException when {@code latch_internal_nfs_server_ip} is not set
     */
    @Override
    void submit() {
        int cpus = task.config.getCpus()
        // memory defaults to 2GiB when the task does not request any
        MemoryUnit memory = task.config.getMemory() ?: MemoryUnit.of("2GiB")

        final containerOpts = task.config.getContainerOptionsMap()

        // shared memory size comes from the `shm-size` container option, when given
        MemoryUnit shm = null
        if (containerOpts != null && containerOpts.exists("shm-size")) {
            shm = new MemoryUnit(containerOpts.getFirstValue("shm-size") as String)
        }

        // todo(ayush): gpu support
        // AcceleratorResource acc = task.config.getAccelerator()

        def serverIp = System.getenv("latch_internal_nfs_server_ip")
        if (serverIp == null)
            throw new RuntimeException("failed to get server ip")

        // NOTE(review): `2>&1 > /dev/null` sends stderr to the original stdout and
        // discards only stdout; if the intent was to silence both streams it
        // should read `> /dev/null 2>&1` — confirm before changing.
        String cmd = """\
            mkdir --parents ${session.baseDir}

            chown -R root:root /usr/bin/mount 2>&1 > /dev/null

            until mount -t nfs4 [${serverIp}]:/ ${session.baseDir} 2>&1 > /dev/null
            do
            sleep 5
            done

            cat ${task.workDir}/${TaskRun.CMD_RUN} | bash 2>&1
        """.stripIndent().trim()

        if (remoteBinDir != null) {
            cmd = """\
                mkdir -p /nextflow-bin
                cp ${remoteBinDir}/* /nextflow-bin
                chmod +x /nextflow-bin/*
                export PATH=/nextflow-bin:\$PATH
            """.stripIndent() + cmd
        }

        List<String> entrypoint = [
            "/bin/bash",
            "-c",
            cmd,
        ]

        this.forchTaskId = this.forchClient.submitTask(
            this.task.name,
            this.task.container,
            entrypoint,
            cpus,
            memory.bytes,
            shm?.bytes ?: 0
        )

        // todo(rahul): put this in a single transaction with submitTask
        this.dispatcherClient.updateForchTaskId(
            this.taskExecutionId,
            this.forchTaskId
        )

        // record the submission so the submit time is tracked and checkIfRunning
        // can detect the SUBMITTED -> RUNNING transition
        status = TaskStatus.SUBMITTED
    }
}
Loading