ThreadPool concurrency refactoring (#2220) #3
MitchLewis930 wants to merge 1 commit into pr_053_before
Conversation
- Wait for threads to enter waiting loop on ThreadPool startup
- Simplify #spawn_thread inner threadpool loop
- Refactor TestThreadPool to make tests faster and more stable

Co-authored-by: Nate Berkopec <nate.berkopec@gmail.com>
📝 Walkthrough
ThreadPool concurrency handling is refactored.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 warnings
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@lib/puma/thread_pool.rb`:
- Around line 105-112: When a worker handles a trim (inside the
mutex.synchronize block that checks `@trim_requested`) it exits without signaling
`@not_full`, which can leave threads blocked in wait_until_not_full or
MutexPool#<<; modify the trim branch so that before decrementing `@spawned` /
deleting from `@workers` and calling Thread.exit it signals (or broadcasts)
`@not_full` (e.g., `@not_full.signal` or `@not_full.broadcast`) while still holding
the mutex so any waiters are woken and can re-check capacity; ensure the change
is made in the same block that manipulates `@trim_requested`, `@spawned` and
`@workers` to keep state consistent.
In `@test/test_thread_pool.rb`:
- Around line 26-33: MutexPool#<< currently does a single `@not_full.wait(@mutex)`,
which can return early or hang if workers exit during forced trims; change the
wait to a loop that checks a clear completion condition (e.g. until queued items
are processed or queue size drops below capacity) and use `@not_full.wait(@mutex, timeout)`
inside the loop to avoid a permanent hang. Update the block inside with_mutex
around `work.each {|arg| super arg}` to set/inspect a condition (such as
`@queue.empty?` or a processed count) and loop while that condition is not
satisfied, also accounting for worker shutdown/trim flags so the loop breaks if
workers have exited. Ensure you still yield if block_given? and that the loop
re-checks the completion predicate after each timed wait or signal.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
History.md, lib/puma/thread_pool.rb, test/test_thread_pool.rb
🧰 Additional context used
🧬 Code graph analysis (1)
test/test_thread_pool.rb (1)
lib/puma/thread_pool.rb (5)
work (149-163), with_mutex (142-146), trim (215-223), auto_trim! (269-272), shutdown (281-323)
🔇 Additional comments (5)
lib/puma/thread_pool.rb (2)
56-60: Startup wait after spawning looks solid. Waiting on `@not_full` after each initial spawn helps ensure the worker enters its wait loop before `initialize` returns.
77-79: Consistent locking via `with_mutex` improves auditability. Nice consolidation of synchronization in backlog/enqueue/wait/trim/reap/shutdown.
Also applies to: 148-163, 194-209, 215-223, 227-239, 281-292
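The startup wait noted above (blocking on `@not_full` until each spawned worker has parked in its wait loop) can be sketched as follows. This is a hypothetical `MiniPool`, not Puma's actual class; the `@not_empty` condition variable is an assumed stand-in for however the real pool parks its workers.

```ruby
# Hypothetical MiniPool illustrating the startup handshake: initialize
# blocks on @not_full after each spawn, and the worker signals @not_full
# only once it is parked inside its waiting loop.
class MiniPool
  attr_reader :waiting

  def initialize(size)
    @mutex     = Mutex.new
    @not_full  = ConditionVariable.new
    @not_empty = ConditionVariable.new  # assumed condvar used to park workers
    @waiting   = 0

    @mutex.synchronize do
      size.times do
        spawn_worker
        @not_full.wait(@mutex)  # returns only after this worker has parked
      end
    end
  end

  private

  def spawn_worker
    Thread.new do
      @mutex.synchronize do
        loop do
          @waiting += 1
          @not_full.signal          # handshake: "I'm in my waiting loop"
          @not_empty.wait(@mutex)   # park; releases the mutex while waiting
          @waiting -= 1
        end
      end
    end
  end
end

pool = MiniPool.new(3)
puts pool.waiting  # => 3 : every worker parked before initialize returned
```

The key point is that the spawner holds the mutex across `spawn_worker` and `@not_full.wait`, so the worker cannot signal before the spawner is waiting and the handshake cannot be lost.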
test/test_thread_pool.rb (2)
16-19: MutexPool helper additions look good. The `mutex_pool` factory and helper methods make test coordination clearer. Also applies to: 21-25, 35-43
46-241: Test updates align with the new synchronization model. The updated assertions and new startup waiting test track the refactor well.
History.md (1)
32-40: History entries align with the refactor/bugfix scope.
```diff
 mutex.synchronize do
   while todo.empty?
     if @trim_requested > 0
       @trim_requested -= 1
-      continue = false
-      not_full.signal
-      break
+      @spawned -= 1
+      @workers.delete th
+      Thread.exit
     end

     if @shutdown
       continue = false
       break
     end
```
Signal @not_full when a worker exits on trim.
A thread that exits before incrementing @waiting never signals @not_full. If a waiter is blocked in wait_until_not_full (or tests using MutexPool#<<), it can remain blocked indefinitely. Consider signaling (or broadcasting) before Thread.exit.
🔧 Proposed fix

```diff
 if @trim_requested > 0
   @trim_requested -= 1
   @spawned -= 1
   @workers.delete th
+  not_full.signal
   Thread.exit
 end
```
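As a generic illustration of the fix above (not Puma's code): a thread that exits must update shared state and signal under the same mutex hold, so a parked waiter re-checks its predicate instead of blocking forever.

```ruby
mutex    = Mutex.new
not_full = ConditionVariable.new
spawned  = 1

# A waiter parked until capacity frees up, like wait_until_not_full.
waiter = Thread.new do
  mutex.synchronize do
    not_full.wait(mutex) while spawned > 0  # predicate loop, not a bare wait
  end
  :woken
end

sleep 0.1  # demo-only: give the waiter time to park

# A worker exiting on trim: update state AND signal under one mutex hold.
mutex.synchronize do
  spawned -= 1
  not_full.signal  # without this, the waiter above could block forever
end

puts waiter.value  # => woken
```

Because the waiter checks its predicate under the mutex, this is correct even if the signal arrives before the waiter parks; the predicate loop is what makes the sleep non-critical.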
```ruby
def <<(work, &block)
  work = [work] unless work.is_a?(Array)
  with_mutex do
    work.each {|arg| super arg}
    yield if block_given?
    @not_full.wait(@mutex)
  end
end
```
MutexPool#<< can return early or hang for batches/forced trims.
It waits for a single @not_full signal, which doesn’t guarantee all queued items are done and can block if workers exit on forced trims. Consider waiting in a loop on a completion condition (optionally with a timed wait).
🔧 Proposed fix

```diff
 def <<(work, &block)
   work = [work] unless work.is_a?(Array)
   with_mutex do
     work.each {|arg| super arg}
     yield if block_given?
-    @not_full.wait(@mutex)
+    @not_full.wait(@mutex) until @todo.empty? && @waiting == @spawned
   end
 end
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
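A generic sketch of the recommended pattern (illustrative only; `pending` and `done` are hypothetical names standing in for the pool's queue state): loop on a completion predicate with a timed wait, so the caller neither returns early on the first signal nor hangs forever if workers die.

```ruby
mutex   = Mutex.new
done    = ConditionVariable.new
pending = 3

worker = Thread.new do
  3.times do
    sleep 0.05              # simulate one unit of work
    mutex.synchronize do
      pending -= 1
      done.signal           # one signal per completed item
    end
  end
end

# A single done.wait(mutex) would return after the FIRST signal with
# pending still at 2. Looping on the predicate with a timeout fixes both
# the early return and the permanent-hang case.
deadline = Time.now + 5
mutex.synchronize do
  until pending.zero?
    break if Time.now >= deadline  # bail out instead of hanging forever
    done.wait(mutex, 0.1)          # timed wait; re-check predicate on wake
  end
end
worker.join

puts pending  # => 0
```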
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@test/test_thread_pool.rb`:
- Around line 21-43: MutexPool#<< currently waits once on `@not_full`, which can
return when only one worker becomes idle; change it to wait in a loop until both
the work queue is empty and all worker threads are idle. Inside the with_mutex
block, replace the single `@not_full.wait(@mutex)` with a loop that repeatedly
calls `@not_full.wait(@mutex)` while `!(@queue.empty? && @pool.all? { |th| ... })`,
i.e. keep waiting until `@queue.empty?` and every thread in `@pool` is idle,
using whatever thread-idle check Puma::ThreadPool uses in this codebase. Ensure
you reference `MutexPool#<<`, `@not_full`, `@mutex`, `@queue` and `@pool` when
making the change.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
History.md, lib/puma/thread_pool.rb, test/test_thread_pool.rb
🧰 Additional context used
🧬 Code graph analysis (1)
test/test_thread_pool.rb (1)
lib/puma/thread_pool.rb (5)
work (149-163), with_mutex (142-146), trim (215-223), auto_trim! (269-272), shutdown (281-323)
🔇 Additional comments (25)
History.md (2)
32-32: Bugfix entry matches the PR scope.
40-40: Refactor entry looks accurate for the ThreadPool changes.
lib/puma/thread_pool.rb (10)
56-60: Startup wait makes initial sizing deterministic.
78-78: Backlog read is now properly synchronized.
107-112: Trim exit path now keeps counters and worker list consistent.
120-120: Queue shift is safely protected by the mutex.
142-146: with_mutex helper is a clean re-entrancy fix.
150-162: Enqueue path is now consistently synchronized.
195-208: wait_until_not_full is safely wrapped with the mutex.
211-222: Trim synchronization looks solid.
228-239: Reap now runs under the mutex as expected.
282-292: Shutdown now captures and signals under lock cleanly.
test/test_thread_pool.rb (13)
16-19: Helper for mutex-backed pools is clear and reusable.
48-55: Updated spawn test aligns with mutex_pool behavior.
57-63: Thread naming test is a nice coverage addition.
88-93: Trim test updated correctly for mutex_pool.
99-104: Trim-minimum behavior test update looks good.
113-119: Force-trim test adapts cleanly to new helper.
125-131: No-waiting-threads trim test update is sound.
138-149: Auto-trim timing test update looks good.
158-165: Cleanliness test update is consistent with batching.
173-176: Reap-only-dead-threads setup is updated correctly.
199-207: Auto-reap test updates align with the new pool helper. Also applies to: 213-216
223-236: Force-shutdown test update looks solid.
239-242: Startup-waiting test is a good regression guard.
```ruby
# Wraps ThreadPool work in mutex for better concurrency control.
class MutexPool < Puma::ThreadPool
  # Wait until the added work is completed before returning.
  # Array argument is treated as a batch of work items to be added.
  # Block will run after work is added but before it is executed on a worker thread.
  def <<(work, &block)
    work = [work] unless work.is_a?(Array)
    with_mutex do
      work.each {|arg| super arg}
      yield if block_given?
      @not_full.wait(@mutex)
    end
  end

  def signal
    @not_full.signal
  end

  # If +wait+ is true, wait until the trim request is completed before returning.
  def trim(force=false, wait: true)
    super(force)
    Thread.pass until @trim_requested == 0 if wait
  end
```
🧩 Analysis chain
🏁 Scripts executed against Signal65/puma-CodeRabbit to verify the pool's condition-variable usage:

```shell
# Locate the ThreadPool class and implementation
find . -type f -name "*.rb" | xargs grep -l "class ThreadPool"
find . -type f -name "thread_pool.rb" | head -20
# Read the ThreadPool implementation
cat -n ./lib/puma/thread_pool.rb | head -200
cat -n ./lib/puma/thread_pool.rb | sed -n '200,350p'
# See how MutexPool is used and what behavior the tests expect
cat -n ./test/test_thread_pool.rb
# Verify the with_mutex helper
sed -n '142,146p' ./lib/puma/thread_pool.rb
# Check where @not_full is signaled
rg '@not_full\.signal' ./lib/puma/thread_pool.rb -n
rg '@not_full' ./lib/puma/thread_pool.rb -n
# Trace the signal path through spawn_thread
sed -n '102,122p' ./lib/puma/thread_pool.rb
sed -n '89,138p' ./lib/puma/thread_pool.rb
# Confirm the attr_reader exposing @waiting and @spawned
sed -n '66p' ./lib/puma/thread_pool.rb
```
MutexPool#<< can return before all work finishes.
Line 31 waits for a single @not_full signal; with multiple workers, the first idle thread can signal while other work is still running. That makes the "wait until added work is completed" promise unreliable and can introduce test flakiness when batching. Wait in a loop until all workers are idle and the queue is empty.
🔧 Suggested fix

```diff
 def <<(work, &block)
   work = [work] unless work.is_a?(Array)
   with_mutex do
     work.each {|arg| super arg}
     yield if block_given?
-    @not_full.wait(@mutex)
+    @not_full.wait(@mutex) until @todo.empty? && @waiting == @spawned
   end
 end
```

📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
PR_053
Summary by CodeRabbit