diff --git a/cmd/sim/capped.go b/cmd/sim/capped.go
new file mode 100644
index 0000000..71b7f61
--- /dev/null
+++ b/cmd/sim/capped.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+    "errors"
+    "sync"
+)
+
+type Capped struct {
+    mu  sync.Mutex
+    cur int64
+    cap int64
+}
+
+func (s *Capped) Lock() error {
+    s.mu.Lock()
+    if s.cur >= s.cap {
+        s.mu.Unlock()
+        return errors.New("too many threads")
+    }
+
+    s.cur++
+    s.mu.Unlock()
+
+    return nil
+}
+
+func (s *Capped) Unlock() {
+    s.mu.Lock()
+    s.cur--
+    s.mu.Unlock()
+}
diff --git a/cmd/sim/lock.go b/cmd/sim/lock.go
index e6441f6..68c6ae7 100644
--- a/cmd/sim/lock.go
+++ b/cmd/sim/lock.go
@@ -12,23 +12,34 @@ import (
 type Locker interface {
     Acquire(ctx context.Context) error
     Release()
+    Limit() int64
+    Backoff()
 }

 type Semaphore struct {
-    mu  sync.Mutex
-    cur int64
-    cap int64
-    sem *semaphore.Weighted
+    mu    sync.Mutex
+    cur   int64
+    cap   int64
+    limit int64
+    sem   *semaphore.Weighted
 }

 func NewSemaphore(opts codel.Options) *Semaphore {
     s := semaphore.NewWeighted(int64(opts.MaxOutstanding))
     return &Semaphore{
-        cap: int64(opts.MaxPending) + int64(opts.MaxOutstanding),
-        sem: s,
+        cap:   int64(opts.MaxPending) + int64(opts.MaxOutstanding),
+        limit: int64(opts.MaxOutstanding),
+        sem:   s,
     }
 }

+func (s *Semaphore) Limit() int64 {
+    return s.limit
+}
+
+func (s *Semaphore) Backoff() {
+}
+
 func (s *Semaphore) Acquire(ctx context.Context) error {
     s.mu.Lock()
     // Drop if queue is full
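The two methods added to Locker are what the simulation later uses to read a lock's concurrency limit and to ask it to shrink. As a rough illustration of the call pattern only (not part of this patch), a caller written against the extended interface could look like the sketch below; the withLock helper is hypothetical, it assumes it lives in cmd/sim next to lock.go where Locker is defined, and treating a failing work function as overload is an assumed policy that the patch leaves to the caller.

// Illustrative only, not part of the patch: a caller written against the
// extended Locker interface, assuming it lives in cmd/sim next to lock.go.
// The helper name withLock is hypothetical.
package main

import (
    "context"
    "log"
    "time"
)

// withLock acquires the lock, runs work, and always releases on success;
// calling Backoff when work fails is an assumed policy, not something the
// interface dictates.
func withLock(l Locker, work func() error) error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    if err := l.Acquire(ctx); err != nil {
        return err // rejected: shed the request instead of queueing forever
    }
    defer l.Release()

    err := work()
    if err != nil {
        l.Backoff()
    }

    log.Printf("current limit: %d", l.Limit())
    return err
}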
diff --git a/cmd/sim/main.go b/cmd/sim/main.go
index 6798ba8..f561ea0 100644
--- a/cmd/sim/main.go
+++ b/cmd/sim/main.go
@@ -3,6 +3,7 @@ package main
 import (
     "context"
     "flag"
+    "fmt"
     "log"
     "math/rand"
     "sync"
@@ -13,112 +14,120 @@ import (
     "github.com/joshbohde/codel/stats"
 )

-func msToWait(perSec int) time.Duration {
+func msToWait(perSec int64) time.Duration {
     ms := rand.ExpFloat64() / (float64(perSec) / 1000)
     return time.Duration(ms * float64(time.Millisecond))
 }

-func emit(perSec int, timeToRun time.Duration, action func()) int64 {
+type Simulation struct {
+    Method       string
+    TimeToRun    time.Duration
+    InputPerSec  int64
+    OutputPerSec int64
+    Completed    uint64
+    Rejected     uint64
+    Started      int64
+    Stats        stats.Stats
+    Limit        int64
+    mu           sync.Mutex
+}
+
+func (sim *Simulation) Process() {
+    sim.mu.Lock()
+    time.Sleep(msToWait(sim.OutputPerSec))
+    sim.mu.Unlock()
+}
+
+func (sim *Simulation) String() string {
+    successPercentage := float64(sim.Completed) / float64(sim.Started)
+    rejectedPercentage := float64(sim.Rejected) / float64(sim.Started)
+
+    return fmt.Sprintf("method=%s duration=%s input=%d output=%d limit=%d throughput=%.2f completed=%.4f rejected=%.4f p50=%s p95=%s p99=%s ",
+        sim.Method, sim.TimeToRun,
+        sim.InputPerSec, sim.OutputPerSec, sim.Limit,
+        float64(sim.InputPerSec)*successPercentage, successPercentage, rejectedPercentage,
+        sim.Stats.Query(0.5), sim.Stats.Query(0.95), sim.Stats.Query(0.99))
+
+}
+
+// Model input & output as random processes with average throughput.
+func (sim *Simulation) Run(lock Locker) {
     start := time.Now()
     wg := sync.WaitGroup{}

-    started := int64(0)
     for {
-        time.Sleep(msToWait(perSec))
+        time.Sleep(msToWait(sim.InputPerSec))

-        if time.Now().Sub(start) > timeToRun {
+        if time.Since(start) > sim.TimeToRun {
             break
         }

-        started++
+        sim.Started++
         wg.Add(1)
         go func() {
-            action()
-            wg.Done()
-        }()
+            ctx, cancel := context.WithTimeout(context.Background(), time.Second)

-    }
-    return started
-}
-
-type fakeServer struct {
-    mu     sync.Mutex
-    perSec int
-}
+            timer := sim.Stats.Time()

-// Simulate a single threaded server
-func (s *fakeServer) Process() {
-    s.mu.Lock()
-    time.Sleep(msToWait(s.perSec))
-    s.mu.Unlock()
-}
-
-// Model input & output as random processes with average throughput.
-func Simulate(method string, lock Locker, inputPerSec, outputPerSec int, timeToRun time.Duration) {
-    stat := stats.New()
-    server := fakeServer{perSec: outputPerSec}
+            err := lock.Acquire(ctx)
+            cancel()

-    completed := uint64(0)
-    rejected := uint64(0)
+            if err == nil {
+                sim.Process()

-    started := emit(inputPerSec, timeToRun, func() {
-        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+                lock.Release()
+                timer.Mark()
+                atomic.AddUint64(&sim.Completed, 1)
+            } else {
+                atomic.AddUint64(&sim.Rejected, 1)
+            }

-        timer := stat.Time()
+            wg.Done()
+        }()

-        err := lock.Acquire(ctx)
-        cancel()
+    }

-        if err == nil {
-            server.Process()
-            lock.Release()
-            timer.Mark()
-            atomic.AddUint64(&completed, 1)
-        } else {
-            atomic.AddUint64(&rejected, 1)
-        }
-    })
+    sim.Limit = lock.Limit()
+}

-    actualCompleted := atomic.LoadUint64(&completed)
-    actualRejected := atomic.LoadUint64(&rejected)
+func Overloaded(runtime time.Duration, opts codel.Options) []*Simulation {
+    wg := sync.WaitGroup{}

-    successPercentage := float64(actualCompleted) / float64(started)
-    rejectedPercentage := float64(actualRejected) / float64(started)
+    runs := []*Simulation{}

-    log.Printf("method=%s duration=%s input=%d output=%d throughput=%.2f completed=%.4f rejected=%.4f p50=%s p95=%s p99=%s ",
-        method, timeToRun,
-        inputPerSec, outputPerSec, float64(inputPerSec)*successPercentage, successPercentage, rejectedPercentage,
-        stat.Query(0.5), stat.Query(0.95), stat.Query(0.99))
-}
+    run := func(in, out int64) {
+        wg.Add(2)

-func main() {
-    runtime := flag.Duration("simulation-time", 5*time.Second, "Time to run each simulation")
-    targetLatency := flag.Duration("target-latency", 5*time.Millisecond, "Time to run each simulation")
-    flag.Parse()
+        codelRun := Simulation{
+            Method:       "codel",
+            InputPerSec:  in,
+            OutputPerSec: out,
+            TimeToRun:    runtime,
+            Stats:        stats.New(),
+        }

-    wg := sync.WaitGroup{}
+        queueRun := Simulation{
+            Method:       "queue",
+            InputPerSec:  in,
+            OutputPerSec: out,
+            TimeToRun:    runtime,
+            Stats:        stats.New(),
+        }

-    opts := codel.Options{
-        MaxPending:     1000,
-        MaxOutstanding: 10,
-        TargetLatency:  *targetLatency,
-    }
+        runs = append(runs, &codelRun, &queueRun)

-    run := func(in, out int) {
-        wg.Add(2)
         go func() {
-            Simulate("codel", codel.New(opts), in, out, *runtime)
+            codelRun.Run(codel.New(opts))
             wg.Done()
         }()
         go func() {
-            Simulate("queue", NewSemaphore(opts), in, out, *runtime)
+            queueRun.Run(NewSemaphore(opts))
             wg.Done()
         }()
     }

-    run(1000, 2000)
     run(1000, 1000)
     run(1000, 900)
     run(1000, 750)
@@ -127,4 +136,25 @@ func main() {
     run(1000, 100)

     wg.Wait()
+    return runs
+}
+
+func main() {
+    runtime := flag.Duration("simulation-time", 5*time.Second, "Time to run each simulation")
+    targetLatency := flag.Duration("target-latency", 5*time.Millisecond, "Target latency for acquires")
+
+    flag.Parse()
+
+    opts := codel.Options{
+        MaxPending:         1000,
+        InitialOutstanding: 10,
+        MaxOutstanding:     10,
+        TargetLatency:      *targetLatency,
+    }
+
+    runs := Overloaded(*runtime, opts)
+    for _, r := range runs {
+        log.Printf("%s", r.String())
+    }
+
 }
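msToWait draws exponentially distributed gaps, so arrivals and service times approximate Poisson processes whose mean gap is 1000/perSec milliseconds. A standalone sanity check of that claim (independent of the patch, duplicating msToWait only for illustration):

// Standalone sanity check, not part of the patch: inter-arrival gaps built
// the same way as msToWait should average 1000/perSec milliseconds, i.e. a
// Poisson arrival process at the requested rate.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func msToWait(perSec int64) time.Duration {
    ms := rand.ExpFloat64() / (float64(perSec) / 1000)
    return time.Duration(ms * float64(time.Millisecond))
}

func main() {
    const perSec = 1000
    const n = 100000

    var total time.Duration
    for i := 0; i < n; i++ {
        total += msToWait(perSec)
    }

    // For perSec=1000 this prints a value close to 1ms.
    fmt.Println("mean gap:", total/time.Duration(n))
}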
simulation") + + flag.Parse() + + opts := codel.Options{ + MaxPending: 1000, + InitialOutstanding: 10, + MaxOutstanding: 10, + TargetLatency: targetLatency, + } + + runs := Overloaded(runtime, opts) + for _, r := range runs { + log.Printf("%s", r.String()) + } + } diff --git a/codel.go b/codel.go index 3292adb..e85560e 100644 --- a/codel.go +++ b/codel.go @@ -16,6 +16,7 @@ import ( "errors" "math" "sync" + "sync/atomic" "time" ) @@ -23,7 +24,8 @@ import ( var Dropped = errors.New("dropped") const ( - interval = 10 * time.Millisecond + interval = 10 * time.Millisecond + outstandingInterval = 1 * time.Second ) // rendezvouz is for returning context to the calling goroutine @@ -45,9 +47,10 @@ func (r rendezvouz) Signal() { // Options are options to configure a Lock. type Options struct { - MaxPending int // The maximum number of pending acquires - MaxOutstanding int // The maximum number of concurrent acquires - TargetLatency time.Duration // The target latency to wait for an acquire. Acquires that take longer than this can fail. + MaxPending int // The maximum number of pending acquires + InitialOutstanding int // The initial number of concurrent acquires + MaxOutstanding int // The maximum number of concurrent acquires + TargetLatency time.Duration // The target latency to wait for an acquire. Acquires that take longer than this can fail. } // Lock implements a FIFO lock with concurrency control, based upon the CoDel algorithm (https://queue.acm.org/detail.cfm?id=2209336). @@ -63,26 +66,40 @@ type Lock struct { waiters list.List maxPending int64 - outstanding int64 - maxOutstanding int64 + outstanding int64 + outstandingLimit int64 + maxOutstandingLimit int64 + nextOutstandingIncrease time.Time + nextOutstandingDecrease time.Time } func New(opts Options) *Lock { + initial := opts.InitialOutstanding + if initial == 0 { + initial = opts.MaxOutstanding + } + q := Lock{ - target: opts.TargetLatency, - maxOutstanding: int64(opts.MaxOutstanding), - maxPending: int64(opts.MaxPending), + target: opts.TargetLatency, + outstandingLimit: int64(initial), + maxOutstandingLimit: int64(opts.MaxOutstanding), + maxPending: int64(opts.MaxPending), } return &q } +// Limit atomically returns the current limit of the lock +func (l *Lock) Limit() int64 { + return atomic.LoadInt64(&l.outstandingLimit) +} + // Acquire a Lock with FIFO ordering, respecting the context. Returns an error it fails to acquire. func (l *Lock) Acquire(ctx context.Context) error { l.mu.Lock() // Fast path if we are unblocked. 
diff --git a/codel_test.go b/codel_test.go
index d83e610..b2489a4 100644
--- a/codel_test.go
+++ b/codel_test.go
@@ -93,6 +93,33 @@ func TestLockCanHaveMultiple(t *testing.T) {
     }
 }

+func TestBackoff(t *testing.T) {
+    tests := []struct{ Val, Expected int64 }{
+        {1, 1},
+        {2, 1},
+        {7, 4},
+        {10, 7},
+    }
+
+    for _, tc := range tests {
+        c := New(Options{
+            InitialOutstanding: 1,
+            MaxOutstanding:     int(tc.Val),
+        })
+
+        c.outstanding = tc.Val
+
+        c.Backoff()
+        actual := c.Limit()
+        if actual != tc.Expected {
+            t.Errorf(
+                "Backoff of %d = %d, expected %d",
+                tc.Val, actual, tc.Expected)
+        }
+
+    }
+}
+
 func BenchmarkLockUnblocked(b *testing.B) {
     c := New(Options{
         MaxPending: 1,
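For callers of the library itself, the new option and methods compose roughly as in the sketch below (not from the repository); the downstream call, the overload error, and the choice to call Backoff when the downstream reports overload are all illustrative assumptions.

// Rough usage sketch, not from the repository: an adaptive codel.Lock in
// front of a downstream call. errOverloaded and callDownstream are
// hypothetical stand-ins.
package main

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/joshbohde/codel"
)

var errOverloaded = errors.New("downstream overloaded") // hypothetical

func callDownstream(ctx context.Context) error {
    time.Sleep(2 * time.Millisecond) // stand-in for real work
    return nil
}

func main() {
    lock := codel.New(codel.Options{
        MaxPending:         1000,
        InitialOutstanding: 10,  // start small...
        MaxOutstanding:     100, // ...and let Release grow the limit while acquires queue
        TargetLatency:      5 * time.Millisecond,
    })

    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    if err := lock.Acquire(ctx); err != nil {
        log.Fatalf("shedding load: %v", err)
    }

    err := callDownstream(ctx)
    lock.Release()

    if errors.Is(err, errOverloaded) {
        // Assumed policy: shrink the concurrency limit on overload; Release
        // will grow it again, one step per interval, while acquires wait.
        lock.Backoff()
    }

    log.Printf("limit after one request: %d", lock.Limit())
}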
diff --git a/stats/stats.go b/stats/stats.go
index ba32ec2..c54525c 100644
--- a/stats/stats.go
+++ b/stats/stats.go
@@ -8,44 +8,42 @@ import (
 )

 type Stats struct {
-    stream *quantile.Stream
-    lock   *sync.Mutex
+    stream quantile.Stream
+    lock   sync.Mutex
 }

 type Timer struct {
     start time.Time
-    stats Stats
+    stats *Stats
 }

-func (t Timer) Mark() {
+func (t *Timer) Mark() {
     elapsed := time.Since(t.start)
     t.stats.Insert(float64(elapsed.Nanoseconds()))
 }

 func New() Stats {
     stream := quantile.NewTargeted(0.5, 0.95, 0.99)
-    lock := sync.Mutex{}
     return Stats{
-        stream: stream,
-        lock:   &lock,
+        stream: *stream,
     }
 }

-func (s Stats) Insert(val float64) {
+func (s *Stats) Insert(val float64) {
     s.lock.Lock()
     s.stream.Insert(val)
     s.lock.Unlock()
 }

-func (s Stats) Time() Timer {
+func (s *Stats) Time() Timer {
     return Timer{
         start: time.Now(),
         stats: s,
     }
 }

-func (s Stats) Query(quantile float64) time.Duration {
+func (s *Stats) Query(quantile float64) time.Duration {
     s.lock.Lock()
     val := s.stream.Query(quantile)
     s.lock.Unlock()
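With the receivers moved to *Stats and *Timer, a caller keeps one Stats value and lets the pointer receivers record into it; a small standalone sketch of the intended call pattern (not from the repository):

// Standalone sketch, not from the repository: the Stats/Timer call pattern
// after the pointer-receiver change.
package main

import (
    "fmt"
    "time"

    "github.com/joshbohde/codel/stats"
)

func main() {
    s := stats.New()

    for i := 0; i < 100; i++ {
        timer := s.Time() // captures &s via the *Stats receiver
        time.Sleep(time.Millisecond)
        timer.Mark() // records the elapsed time into s
    }

    fmt.Println("p50:", s.Query(0.5), "p95:", s.Query(0.95), "p99:", s.Query(0.99))
}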