
Conversation

@PaulM5406 (Contributor)

Hey @TkTech,

I worked on an implementation to enforce global concurrency by job and partition before tackling rate limiting, as it is more useful for my team and looks easier.

I tried designing a SQL query in fetch_jobs that respects global concurrency while staying performant. It is not perfect, though: there is an edge case where a lot of jobs subject to a concurrency constraint could block a queue, but it gives us ground for discussion.

I liked your idea of faking it using temporary queues too. Maybe we could use that for rate limiting? I have the intuition that we can solve the concurrency problem in SQL and stay performant.

chancy/app.py (Outdated)

    job = job.job
    if job.concurrency_key:
        cursor.execute(
            self._push_concurrency_config_sql(),
@TkTech (Owner) commented Aug 15, 2025

I've got some concerns about doing this per-job - if users are doing this the "right" way they may be batching tens of thousands of jobs at a time, which is going to issue an extra query per job with a concurrency_key. IMO we should be gathering up all the unique concurrency keys and issuing 1 bulk upsert.

Alternatively, and which might be better, we should require the user to call a function to push a concurrency rule before they take effect - a concurrency_key with no rule set would just be ignored when fetching jobs. This way we can remove this upsert here entirely and never have to process individual jobs as they get inserted.
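
Something along those lines, as an illustration only (the table, column, and `concurrency_limit` attribute names below are placeholders, not this PR's actual schema):

    def push_concurrency_configs(cursor, jobs):
        # Gather the unique concurrency keys from the whole batch so a single
        # upsert is issued instead of one query per job. Table, column and
        # attribute names are placeholders, not this PR's actual schema.
        configs = {
            job.concurrency_key: job.concurrency_limit
            for job in jobs
            if job.concurrency_key
        }
        if not configs:
            return

        cursor.execute(
            """
            INSERT INTO concurrency_configs (key, max_concurrency)
            SELECT t.key, t.max_concurrency
              FROM unnest(%(keys)s::text[], %(limits)s::int[])
                   AS t(key, max_concurrency)
            ON CONFLICT (key)
                DO UPDATE SET max_concurrency = EXCLUDED.max_concurrency
            """,
            {"keys": list(configs), "limits": list(configs.values())},
        )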

@PaulM5406 (Contributor, Author) commented Aug 16, 2025

You're right about the bulk insert; that would be easy to do. Should we insert the jobs as one bulk insert as well?

I went with this implementation because I found the API simpler for the user, but maybe you're right that pushing a concurrency rule ahead of time would be clearer.

@PaulM5406 (Contributor, Author) commented Aug 16, 2025

The thing is, having a table with concurrency keys as rows that can be locked for the duration of a worker's job fetch looks like the easiest way to implement concurrency and avoid race conditions. Job push is the first moment we know the full concurrency key when using a partition id.
We could also use a trigger if we do not support methods as concurrency keys. If we use a bulk insert for jobs, the trigger would be more performant as it avoids a round trip.
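
For illustration only, locking the concurrency-key rows during a fetch could look roughly like this (placeholder table and column names, not the actual implementation):

    def lock_concurrency_keys(cursor, keys):
        # Lock the concurrency-key rows for the rest of the fetch transaction
        # so that two workers cannot both hand out jobs past the same limit.
        # Sorting keeps lock acquisition order deterministic across workers,
        # which avoids deadlocks. Placeholder table/column names.
        cursor.execute(
            """
            SELECT key
              FROM concurrency_configs
             WHERE key = ANY(%(keys)s)
             ORDER BY key
               FOR UPDATE
            """,
            {"keys": sorted(set(keys))},
        )
        return [row[0] for row in cursor.fetchall()]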

@PaulM5406 (Contributor, Author)

I have implemented deduplication of concurrency keys and a bulk insert. Performance should be fine even when inserting a high number of jobs.

Concurrency
-----------------------

Control the number of jobs that can run simultaneously using concurrency:
@TkTech (Owner)

We should probably reword this a bit to make it clear immediately that this impacts global concurrency. I see you've got the note down below but it's the core reason to use this. Something like:

"Control the number of jobs with the same concurrency key that can run simultaneously across all workers and queues using with_concurrency():"?

    await cursor.execute(
        sql.SQL(
            """
            CREATE TABLE {concurrency_configs} (
@TkTech (Owner)

I'm worried about high cardinality in the concurrency_key (like <func_name>_<user ID>); it might quickly result in a lot of rows. But that gotcha might just need to be documented to be A-OK.

@PaulM5406 (Contributor, Author) commented Aug 16, 2025

To keep the table from growing indefinitely, we could have the pruner plugin clean it on a regular basis.
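
For example, the pruner could drop rows whose key is no longer referenced by any pending or running job (illustrative names only, not the actual pruner query):

    def prune_concurrency_configs(cursor):
        # Remove concurrency rows whose key no longer has any job that could
        # still need it. Table, column and state names are illustrative, not
        # this PR's actual schema.
        cursor.execute(
            """
            DELETE FROM concurrency_configs cc
             WHERE NOT EXISTS (
                   SELECT 1
                     FROM jobs j
                    WHERE j.concurrency_key = cc.key
                      AND j.state IN ('pending', 'running')
             )
            """
        )
        return cursor.rowcount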

@PaulM5406 (Contributor, Author)

I have extended the pruner plugin to prune the new concurrency_configs table.

@PaulM5406 (Contributor, Author) commented Aug 28, 2025

Update: work is still ongoing. Need to find some time.

PaulM5406 force-pushed the implement-concurrency branch from 9e04bf0 to 92fa930 on December 29, 2025 at 16:24
@PaulM5406 (Contributor, Author) commented Dec 29, 2025

Hey @TkTech, I have found the time to continue this work after thinking about it extensively.

Tradeoffs I made:

  • simplicity of the API for adding concurrency limits
    It couples the definition of concurrency limits with job push. I admit this is not ideal, but it simplifies the API and easily supports concurrency limits on a partition key.

  • simplicity of the implementation
    I looked at how other products implement global concurrency.

hatchet: uses a scheduler that runs in the hatchet service. The Chancy leader could take on this responsibility. I chose not to go in this direction to keep things simple, and I am not sure adding such a responsibility to a leader that is also a worker is a good idea...

temporal, pgqueuer: not implemented; it is a feature request.

kafka: if I may dare a parallel, the equivalent implementation in Chancy would be close to creating dynamic, ephemeral queues in workers for every distinct concurrency key. I don't think that is a solution that would scale, because of the high cardinality of concurrency keys when using a partition key, for example... each queue polling for its jobs in a separate while True loop.

Neither of these two alternatives looked easy or fit Chancy's current architecture, so I decided to resolve concurrency limits when fetching jobs (a rough sketch of what such a fetch query could look like follows the list below). It has downsides for sure:

  • adding overhead to the fetch query. From what I could observe, the overhead looks OK for the use cases I tested.
  • the case of many jobs sharing the same concurrency key with a low concurrency limit could block fetching other jobs, because we need to scan for more jobs but it is hard to know how many more in advance... I think for this use case, creating a new queue is better. We could add that to the documentation? We could also implement an adaptive way of determining how many jobs to fetch?
  • hard enforcement of concurrency limits across all workers
    This implies locking on a concurrency key, which could create contention.
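
To give an idea of the shape of it, here is a simplified sketch of such a fetch query (placeholder table and column names; this is not the actual query in the PR):

    # Simplified sketch of a fetch query that respects per-key concurrency
    # limits. Placeholder schema; the real query in this PR differs, and two
    # concurrent fetches can still overshoot a limit unless the concurrency
    # rows are locked as well.
    FETCH_JOBS_SQL = """
        WITH running AS (
            SELECT concurrency_key, COUNT(*) AS n
              FROM jobs
             WHERE state = 'running'
               AND concurrency_key IS NOT NULL
             GROUP BY concurrency_key
        ),
        candidates AS (
            SELECT j.id,
                   ROW_NUMBER() OVER (
                       PARTITION BY j.concurrency_key
                       ORDER BY j.priority DESC, j.id
                   ) AS rank_in_key,
                   COALESCE(cc.max_concurrency, 2147483647)
                       - COALESCE(r.n, 0) AS free_slots
              FROM jobs j
              LEFT JOIN concurrency_configs cc ON cc.key = j.concurrency_key
              LEFT JOIN running r ON r.concurrency_key = j.concurrency_key
             WHERE j.state = 'pending'
               AND j.queue = %(queue)s
        )
        SELECT j.id
          FROM jobs j
          JOIN candidates c ON c.id = j.id
         WHERE c.rank_in_key <= c.free_slots
         ORDER BY j.priority DESC, j.id
         LIMIT %(batch_size)s
           FOR UPDATE OF j SKIP LOCKED
    """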

Please tell me what you think about this work and whether you could see it in Chancy in the future.

Thanks again for all the work on chancy!

EDIT:
steady-queue implements global concurrency in another way: check concurrency limits at enqueue time and flag tasks as either ready to be picked up by a worker or blocked. When a task with concurrency control finishes (success or error), switch one blocked task back to ready.
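
A minimal sketch of that ready/blocked approach, assuming a jobs table with a state column (illustrative names only; a real implementation would also need to serialize concurrent pushes on the same key):

    def classify_on_enqueue(cursor, job_id, key, limit):
        # On push: the job becomes ready only if fewer than `limit` jobs with
        # the same concurrency key are already ready or running; otherwise it
        # is blocked. Illustrative table/column/state names.
        cursor.execute(
            """
            UPDATE jobs
               SET state = CASE
                       WHEN (SELECT COUNT(*)
                               FROM jobs
                              WHERE concurrency_key = %(key)s
                                AND state IN ('ready', 'running')) < %(limit)s
                       THEN 'ready'
                       ELSE 'blocked'
                   END
             WHERE id = %(job_id)s
            """,
            {"key": key, "limit": limit, "job_id": job_id},
        )


    def release_on_finish(cursor, key):
        # On completion (success or error): promote the oldest blocked job
        # with the same concurrency key back to ready.
        cursor.execute(
            """
            UPDATE jobs
               SET state = 'ready'
             WHERE id = (
                   SELECT id
                     FROM jobs
                    WHERE concurrency_key = %(key)s
                      AND state = 'blocked'
                    ORDER BY id
                    LIMIT 1
                      FOR UPDATE SKIP LOCKED
             )
            """,
            {"key": key},
        )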
