Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions queue/back_pressure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package queue

import (
"context"
"errors"
"math/rand"

"github.com/lightningnetwork/lnd/fn/v2"
)

// DropPredicate decides whether to drop an item when the queue is full.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily when the queue is full. It can decide on dropping the item when there's still room in the queue.

// It receives the current queue length and the item, and returns true to drop,
// false to enqueue.
type DropPredicate[T any] func(queueLen int, item T) bool

// ErrQueueFullAndDropped is returned by Enqueue when the item is dropped
// due to the DropPredicate.
var ErrQueueFullAndDropped = errors.New("queue full and item dropped")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming ErrItemDropped or something. Depending on the DropPredicate it's not a given that the queue is full. You just lost the lottery, that's all.


// BackpressureQueue is a generic, fixed-capacity queue with predicate-based
// drop behavior. When full, it uses the DropPredicate to perform early drops
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, "When full" is misleading.

// (e.g., RED-style).
type BackpressureQueue[T any] struct {
ch chan T
dropPredicate DropPredicate[T]
}

// NewBackpressureQueue creates a new BackpressureQueue with the given capacity
// and drop predicate.
func NewBackpressureQueue[T any](capacity int,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding validation of capacity? It needs to be bigger than 0, right? Likewise check for a nil predicate.

predicate DropPredicate[T]) *BackpressureQueue[T] {

return &BackpressureQueue[T]{
ch: make(chan T, capacity),
dropPredicate: predicate,
}
}

// Enqueue attempts to add an item to the queue, respecting context
// cancellation. Returns ErrQueueFullAndDropped if dropped, or context error if
// ctx is done before enqueue. Otherwise, `nil` is returned on success.
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context,
item T) error {

// First, consult the drop predicate based on the current queue length.
// If the predicate decides to drop the item, return an error.
if q.dropPredicate(len(q.ch), item) {
return ErrQueueFullAndDropped
}

// If the predicate decides not to drop, attempt to enqueue the item.
select {
case q.ch <- item:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queueLen snapshot at line 47 can become stale before the actual send here. It's ok because RED is inherently probabilistic and approximate but maybe add a comment documenting this?

return nil

default:
// Channel is full, and the predicate decided not to drop. We
// must block until space is available or context is cancelled.
select {
case q.ch <- item:
return nil

case <-ctx.Done():
return ctx.Err()
}
}
}

// Dequeue retrieves the next item from the queue, blocking until available or
// context done. Returns the item or an error if ctx is done before an item is
// available.
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T] {
select {

case item := <-q.ch:
return fn.Ok(item)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the channel is closed, item := <-q.ch receives the zero value of type T without any way to distinguish it from a legitimately sent zero value. The function would return fn.Ok(zeroValue), which is indistinguishable from a real dequeue.


case <-ctx.Done():
return fn.Err[T](ctx.Err())
}
}

// redConfig holds configuration for RandomEarlyDrop.
type redConfig struct {
randSrc func() float64
}

// REDOption is a functional option for configuring RandomEarlyDrop.
type REDOption func(*redConfig)

// WithRandSource provides a custom random number source (a function that
// returns a float64 between 0.0 and 1.0).
func WithRandSource(src func() float64) REDOption {
return func(cfg *redConfig) {
cfg.randSrc = src
}
}

// RandomEarlyDrop returns a DropPredicate that implements Random Early
// Detection (RED), inspired by TCP-RED queue management.
//
// RED prevents sudden buffer overflows by proactively dropping packets before
// the queue is full. It establishes two thresholds:
//
// 1. minThreshold: queue length below which no drops occur.
// 2. maxThreshold: queue length at or above which all items are dropped.
//
// Between these points, the drop probability p increases linearly:
//
// p = (queueLen - minThreshold) / (maxThreshold - minThreshold)
//
// For example, with minThreshold=15 and maxThreshold=35:
// - At queueLen=15, p=0.0 (0% drop chance)
// - At queueLen=25, p=0.5 (50% drop chance)
// - At queueLen=35, p=1.0 (100% drop chance)
//
// This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy,
// and gives early back-pressure signals to senders.
func RandomEarlyDrop[T any](minThreshold, maxThreshold int,
opts ...REDOption) DropPredicate[T] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add validation to check that minThreshold >= 0 and maxThreshold > minThreshold. If you would input minThreshold=-5 and maxThreshold=-20stuff gets crazy! 😄


cfg := redConfig{
randSrc: rand.Float64,
}

for _, opt := range opts {
opt(&cfg)
}
if cfg.randSrc == nil {
panic("randSrc cannot be nil")
}

return func(queueLen int, _ T) bool {
// If the queue is below the minimum threshold, then we never
// drop.
if queueLen < minThreshold {
return false
}

// If the queue is at or above the maximum threshold, then we
// always drop.
if queueLen >= maxThreshold {
return true
}

// In between the thresholds, linearly scale the drop
// probability.
denominator := float64(maxThreshold - minThreshold)
p := float64(queueLen-minThreshold) / denominator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This smells like a possible division by zero. It isn't tho, since you wouldn't get passed the above two if-statements if maxThreshold == minThreshold.

That said, it's arguably a logical error to call RandomEarlyDrop with equal thresholds, so you might want to add validation to catch that as a programming mistake.


return cfg.randSrc() < p
}
}
Loading
Loading