Conversation
gijswijs
left a comment
I did a pass of this and made a bunch of comments. There seems to be a lot of LLM fluff (although I could be mistaken) that I don't like.
2e346d0 to 3511604
Pushed up a new version. PTAL.
gijswijs
left a comment
Did a second pass. Mainly nits (line length! 😄) and some clarifications from my side about the use of generics in the tests.
Suffice it to say that I really like this. I already used it in #10219, so I would love to see this merged.
@yyforyongyu Could you have a go at this as well?
}

// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid.
type intQueueMachine struct {
No, I meant
type intQueueMachine struct {
tb rapid.TB
capacity int
queue *BackpressureQueue[int]
modelQueue []int
dropPredicate DropPredicate[int]
itemGenerator *rapid.Generator[int]
}
and take it from there.
}

// intQueueMachine is a concrete wrapper for queueMachine[int] for rapid.
type intQueueMachine struct {
This is what Copilot came up with. GitHub doesn't allow me to upload .diff files, hence the .txt extension. no-generics.txt
In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel that consults a predicate to decide whether we should preemptively drop a message. We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we start to drop messages randomly once we get past the min threshold, ramping up to the max threshold, at which point we always drop the message.
3511604 to 3998b98
@gijswijs: review reminder
gijswijs
left a comment
Some minor issues still, mostly validation and better comments. I don't think release notes are needed here, so maybe add the no-changelog label to the PR to skip that CI check.
// In between the thresholds, linearly scale the drop
// probability.
denominator := float64(maxThreshold - minThreshold)
p := float64(queueLen-minThreshold) / denominator
This smells like a possible division by zero. It isn't, though, since you wouldn't get past the two if-statements above if maxThreshold == minThreshold.
That said, it's arguably a logical error to call RandomEarlyDrop with equal thresholds, so you might want to add validation to catch that as a programming mistake.
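For illustration (numbers picked arbitrarily): with minThreshold=10, maxThreshold=50 and queueLen=30, p = (30 - 10) / (50 - 10) = 0.5, so roughly half of the incoming items get dropped at that occupancy. With minThreshold == maxThreshold the denominator would be 0, which is exactly the case the two if-statements above happen to shield against.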
// NewBackpressureQueue creates a new BackpressureQueue with the given capacity
// and drop predicate.
func NewBackpressureQueue[T any](capacity int,
Consider adding validation of capacity? It needs to be bigger than 0, right? Likewise check for a nil predicate.
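A minimal sketch of what that could look like, assuming the second parameter is named predicate, the struct fields are ch and predicate, and the constructor is changed to return an error (all of that is guessed from the snippets in this review; the fmt and errors imports are assumed too):

func NewBackpressureQueue[T any](capacity int,
	predicate DropPredicate[T]) (*BackpressureQueue[T], error) {

	// A zero capacity would make the backing channel unbuffered, turning
	// every Enqueue into a blocking rendezvous, and a negative capacity
	// panics in make, so reject both up front.
	if capacity <= 0 {
		return nil, fmt.Errorf("capacity must be > 0, got %d",
			capacity)
	}

	// A nil predicate would only blow up on the first Enqueue, so fail
	// fast here instead.
	if predicate == nil {
		return nil, errors.New("drop predicate must not be nil")
	}

	return &BackpressureQueue[T]{
		ch:        make(chan T, capacity),
		predicate: predicate,
	}, nil
}

Whether to return an error or simply panic on misuse is up to the author; the checks themselves are the point here.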
// If the predicate decides not to drop, attempt to enqueue the item.
select {
case q.ch <- item:
The queueLen snapshot at line 47 can become stale before the actual send here. It's OK because RED is inherently probabilistic and approximate, but maybe add a comment documenting this?
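Something along these lines would do (wording only a suggestion):

// Note that queueLen is a snapshot taken before this send, so another
// goroutine may have enqueued or dequeued in the meantime. The staleness
// is benign: RED only needs an approximate queue length to pick a drop
// probability.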
select {
case item := <-q.ch:
	return fn.Ok(item)
If the channel is closed, item := <-q.ch receives the zero value of type T without any way to distinguish it from a legitimately sent zero value. The function would return fn.Ok(zeroValue), which is indistinguishable from a real dequeue.
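A sketch of the comma-ok form that would make the closed case explicit (ErrQueueClosed is a hypothetical sentinel, and fn.Err is assumed to be the error constructor for fn.Result):

case item, ok := <-q.ch:
	if !ok {
		// The channel was closed; report that instead of handing
		// back a zero-valued T.
		return fn.Err[T](ErrQueueClosed)
	}

	return fn.Ok(item)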
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy,
// and gives early back-pressure signals to senders.
func RandomEarlyDrop[T any](minThreshold, maxThreshold int,
	opts ...REDOption) DropPredicate[T] {
Add validation to check that minThreshold >= 0 and maxThreshold > minThreshold. If you input minThreshold=-5 and maxThreshold=-20, stuff gets crazy! 😄
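A sketch of guards at the top of RandomEarlyDrop (panicking here only because misuse is a programming error; an error return would work just as well, and the fmt import is assumed):

if minThreshold < 0 {
	panic(fmt.Sprintf("RED minThreshold must be non-negative, got %d",
		minThreshold))
}
if maxThreshold <= minThreshold {
	panic(fmt.Sprintf("RED maxThreshold (%d) must be greater than "+
		"minThreshold (%d)", maxThreshold, minThreshold))
}

The second check also rules out the equal-threshold division-by-zero case flagged earlier.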
// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped
// due to the DropPredicate.
var ErrQueueFullAndDropped = errors.New("queue full and item dropped")
Consider renaming it to ErrItemDropped or something. Depending on the DropPredicate, it's not a given that the queue is full. You just lost the lottery, that's all.
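For example (name and doc wording only a suggestion):

// ErrItemDropped is returned by Enqueue when the DropPredicate decides to
// drop the item, whether or not the queue is actually full.
var ErrItemDropped = errors.New("item dropped by drop predicate")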
| "github.com/lightningnetwork/lnd/fn/v2" | ||
| ) | ||
|
|
||
| // DropPredicate decides whether to drop an item when the queue is full. |
Not necessarily only when the queue is full. It can decide to drop the item when there's still room in the queue.
var ErrQueueFullAndDropped = errors.New("queue full and item dropped")

// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
// drop behavior. When full, it uses the DropPredicate to perform early drops
Again, "When full" is misleading.
In this commit, we add a new type of queue: the back pressure queue. This is a bounded queue based on a simple channel that consults a predicate to decide whether we should preemptively drop a message.
We then provide a sample predicate for this use case, based on random early dropping. Given a min and max threshold, we start to drop messages randomly once we get past the min threshold, ramping up to the max threshold, at which point we always drop the message.
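A rough sketch of that ramp as a standalone helper (purely illustrative, not the code in this PR; the name dropProbability is made up):

// dropProbability maps the current queue length onto [0, 1]: never drop
// below the min threshold, always drop at or above the max threshold, and
// ramp up linearly in between.
func dropProbability(queueLen, minThreshold, maxThreshold int) float64 {
	switch {
	case queueLen < minThreshold:
		return 0

	case queueLen >= maxThreshold:
		return 1

	default:
		return float64(queueLen-minThreshold) /
			float64(maxThreshold-minThreshold)
	}
}

An item is then dropped whenever a uniform draw from [0, 1) falls below that probability, e.g. rand.Float64() < dropProbability(len(q.ch), minThreshold, maxThreshold).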