Skip to content

Conversation

@darrell-d
Copy link
Collaborator

Looking for feedback on the approach to. cancelling a workflow using channels. Any threading traps I might run into on this environment?

Have a few blank functions that I need to fill in, and create endpoints in further PRs

main.go Outdated

csvWriter.Write([]string{"IntegrationID", "PID"})

var wg sync.WaitGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% on this, but I feel like it is unusual for this be be declared here and passed into processSQS() as a pointer. Can this just be moved inside the function so it doesn't have to be passed in?

Copy link
Collaborator Author

@darrell-d darrell-d Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was unsure about it as well, however I thought that since processSQS() is called continuously in a loop that it would lead to many waitgroups being made which at best is wasteful and at worst could lead to some threading locks / memory leaks /other unknowns

main.go Outdated

// Receive complete messages when nextflow task is done
// Then clean up input / output files
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think because of a gotcha in Go loop variables prior to version 1.22, this function should have a msg types.Message arg like the function above, and msg passed in. https://go.dev/doc/faq#closures_and_goroutines

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, didn't know about that. Thanks

@darrell-d darrell-d changed the title WIP: Cancel workflow Cancel workflow Jan 28, 2025
logger.Error(err.Error())
os.Exit(1)
}
return err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it would ever get here since each conditional does an os.Exit(1)

return baseDir
}

func readFile(baseDir, integrationID string) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to rename this to something more specific to pid files like readPidFile

return content, nil
}

time.Sleep(100 * time.Millisecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this for debugging? or is this something to wait indefinitely for the file to have a value?

Comment on lines +35 to +61
# compute node / workflow manager specific
subnet_ids = os.environ['SUBNET_IDS']
cluster_name = os.environ['CLUSTER_NAME']
security_group = os.environ['SECURITY_GROUP_ID']
base_dir = os.environ['BASE_DIR']
env = os.environ['ENVIRONMENT']

# App specific - params? - defaults on app creation, then overriden on run
# session token retrieval should be on the processor(s)
pennsieve_host = ""
pennsieve_host2 = ""
pennsieve_upload_bucket = "" # agent specific
pennsieve_agent_home = "/tmp" # agent specific

if env == "dev":
pennsieve_host = "https://api.pennsieve.net"
pennsieve_host2 = "https://api2.pennsieve.net"
pennsieve_upload_bucket = "pennsieve-dev-uploads-v2-use1"
else:
pennsieve_host = "https://api.pennsieve.io"
pennsieve_host2 = "https://api2.pennsieve.io"

container_name = ""
task_definition_name = ""



Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this all got added back by accident

}

func ProcessSQS(ctx context.Context, client SQSService, queueUrl string, logger *slog.Logger) (bool, error) {
var wg sync.WaitGroup
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what did the wait group end up being needed for?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants