Skip to content

forch draft#70

Draft
ayushkamat wants to merge 43 commits intolatch/3.x.xfrom
ayush/forch
Draft

forch draft#70
ayushkamat wants to merge 43 commits intolatch/3.x.xfrom
ayush/forch

Conversation

@ayushkamat
Copy link

No description provided.

Signed-off-by: Ayush Kamat <ayush@latch.bio>

@Override
protected TaskMonitor createTaskMonitor() {
return TaskPollingMonitor.create(session, name, 100, Duration.of("15s"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del the ForchTaskMonitor? or is that going to be used later?
idk what the downsides of the TaskPollingMonitor are


String command = "forch status ${forchTaskId}"
StringBuilder stdout = new StringBuilder(), stderr = new StringBuilder();
Process proc = command.execute()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this API is crazy wtf Groovy


@Override
void kill() {
// noop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi you can insert a container-killed event (if it's running) or set cancelled on the task (if it's not running) to kill it

see pods-stop in nucleus

gpus: int


async def create_task(payload: CreateTaskInput):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u can just wrap this function in with_conn_retry without a helper, since it doesn't do anything else

]


async def get_task_status(task_id: int):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with @with_conn_retry directly

coalesce(
(
select
(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indentation is looking sus

'submitted'
when te.type = 'container-created' then
'running'
when te.type = 'container-exited' then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably also want to consider the forch_pub.tasks.cancelled_at field


async def main():
await pool.open()
args = sys.argv[1:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use argparse, it's probably easier to dela with than sys.argv

form argparse import ArgumentParser

argp = ArgumentParser("forch")
cmds = argp.add_subparsers()

argp_create = cmds.add_parser("create")
argp_create.add_argument("spec")

argp_status = cmds.add_parser("status")
argp_status.add_argument("task_id")

args = argp.parse_args()

Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
if( scratch==null )
scratch = true

// include task script as an input to force its staging in the container work directory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea what any of this means

forch Outdated

config = read_config(AppConfig)

pool = get_pool(config.db, "nextflow_forch_test", read_only=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename the app

forch Outdated
(
select
(
select
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need a subquery for case

forch Outdated
'running'
when te.type = 'container-exited' then
(
select
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for a subquery

BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos))
) {
try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) {
writer.write(data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you might still want to flush + fsync?

local name=\$1
local s3path=\$2
if [[ "\$name" == - ]]; then
echo 's5cmd --no-verify-ssl pipe "\$s3path"'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we doing no-verify-ssl btw?


@Override
String pipeInputFile(Path file) {
return " < ${Escape.path(file.getFileName())}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return " < ${Escape.path(file.getFileName())}"
return " < ${this.fileStr(file)}"

perhaps?

ayushkamat and others added 15 commits July 10, 2025 09:54
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
* ldata provenance stuff

* fix: pass execution instead of token

* fix

* cleanup

* bump latch version

* throw error

* debug

* fix: comma

* remove sleep

* rename

* fix
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants