Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cmd/sim/capped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"errors"
"sync"
)

type Capped struct {
mu sync.Mutex
cur int64
cap int64
}

func (s *Capped) Lock() error {
s.mu.Lock()
if s.cur >= s.cap {
s.mu.Unlock()
return errors.New("too many threads")
}

s.cur++
s.mu.Unlock()

return nil
}

func (s *Capped) Unlock() {
s.mu.Lock()
s.cur--
s.mu.Unlock()
}
23 changes: 17 additions & 6 deletions cmd/sim/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,34 @@ import (
type Locker interface {
Acquire(ctx context.Context) error
Release()
Limit() int64
Backoff()
}

type Semaphore struct {
mu sync.Mutex
cur int64
cap int64
sem *semaphore.Weighted
mu sync.Mutex
cur int64
cap int64
limit int64
sem *semaphore.Weighted
}

func NewSemaphore(opts codel.Options) *Semaphore {
s := semaphore.NewWeighted(int64(opts.MaxOutstanding))
return &Semaphore{
cap: int64(opts.MaxPending) + int64(opts.MaxOutstanding),
sem: s,
cap: int64(opts.MaxPending) + int64(opts.MaxOutstanding),
limit: int64(opts.MaxOutstanding),
sem: s,
}
}

func (s *Semaphore) Limit() int64 {
return s.limit
}

func (s *Semaphore) Backoff() {
}

func (s *Semaphore) Acquire(ctx context.Context) error {
s.mu.Lock()
// Drop if queue is full
Expand Down
166 changes: 98 additions & 68 deletions cmd/sim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"sync"
Expand All @@ -13,112 +14,120 @@ import (
"github.com/joshbohde/codel/stats"
)

func msToWait(perSec int) time.Duration {
func msToWait(perSec int64) time.Duration {
ms := rand.ExpFloat64() / (float64(perSec) / 1000)
return time.Duration(ms * float64(time.Millisecond))
}

func emit(perSec int, timeToRun time.Duration, action func()) int64 {
type Simulation struct {
Method string
TimeToRun time.Duration
InputPerSec int64
OutputPerSec int64
Completed uint64
Rejected uint64
Started int64
Stats stats.Stats
Limit int64
mu sync.Mutex
}

func (sim *Simulation) Process() {
sim.mu.Lock()
time.Sleep(msToWait(sim.OutputPerSec))
sim.mu.Unlock()
}

func (sim *Simulation) String() string {
successPercentage := float64(sim.Completed) / float64(sim.Started)
rejectedPercentage := float64(sim.Rejected) / float64(sim.Started)

return fmt.Sprintf("method=%s duration=%s input=%d output=%d limit=%d throughput=%.2f completed=%.4f rejected=%.4f p50=%s p95=%s p99=%s ",
sim.Method, sim.TimeToRun,
sim.InputPerSec, sim.OutputPerSec, sim.Limit,
float64(sim.InputPerSec)*successPercentage, successPercentage, rejectedPercentage,
sim.Stats.Query(0.5), sim.Stats.Query(0.95), sim.Stats.Query(0.99))

}

// Model input & output as random processes with average throughput.
func (sim *Simulation) Run(lock Locker) {
start := time.Now()

wg := sync.WaitGroup{}
started := int64(0)

for {
time.Sleep(msToWait(perSec))
time.Sleep(msToWait(sim.InputPerSec))

if time.Now().Sub(start) > timeToRun {
if time.Since(start) > sim.TimeToRun {
break
}

started++
sim.Started++
wg.Add(1)

go func() {
action()
wg.Done()
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

}
return started
}

type fakeServer struct {
mu sync.Mutex
perSec int
}
timer := sim.Stats.Time()

// Simulate a single threaded server
func (s *fakeServer) Process() {
s.mu.Lock()
time.Sleep(msToWait(s.perSec))
s.mu.Unlock()
}

// Model input & output as random processes with average throughput.
func Simulate(method string, lock Locker, inputPerSec, outputPerSec int, timeToRun time.Duration) {
stat := stats.New()
server := fakeServer{perSec: outputPerSec}
err := lock.Acquire(ctx)
cancel()

completed := uint64(0)
rejected := uint64(0)
if err == nil {
sim.Process()

started := emit(inputPerSec, timeToRun, func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
lock.Release()
timer.Mark()
atomic.AddUint64(&sim.Completed, 1)
} else {
atomic.AddUint64(&sim.Rejected, 1)
}

timer := stat.Time()
wg.Done()
}()

err := lock.Acquire(ctx)
cancel()
}

if err == nil {
server.Process()
lock.Release()
timer.Mark()
atomic.AddUint64(&completed, 1)
} else {
atomic.AddUint64(&rejected, 1)
}
})
sim.Limit = lock.Limit()
}

actualCompleted := atomic.LoadUint64(&completed)
actualRejected := atomic.LoadUint64(&rejected)
func Overloaded(runtime time.Duration, opts codel.Options) []*Simulation {
wg := sync.WaitGroup{}

successPercentage := float64(actualCompleted) / float64(started)
rejectedPercentage := float64(actualRejected) / float64(started)
runs := []*Simulation{}

log.Printf("method=%s duration=%s input=%d output=%d throughput=%.2f completed=%.4f rejected=%.4f p50=%s p95=%s p99=%s ",
method, timeToRun,
inputPerSec, outputPerSec, float64(inputPerSec)*successPercentage, successPercentage, rejectedPercentage,
stat.Query(0.5), stat.Query(0.95), stat.Query(0.99))
}
run := func(in, out int64) {
wg.Add(2)

func main() {
runtime := flag.Duration("simulation-time", 5*time.Second, "Time to run each simulation")
targetLatency := flag.Duration("target-latency", 5*time.Millisecond, "Time to run each simulation")
flag.Parse()
codelRun := Simulation{
Method: "codel",
InputPerSec: in,
OutputPerSec: out,
TimeToRun: runtime,
Stats: stats.New(),
}

wg := sync.WaitGroup{}
queueRun := Simulation{
Method: "queue",
InputPerSec: in,
OutputPerSec: out,
TimeToRun: runtime,
Stats: stats.New(),
}

opts := codel.Options{
MaxPending: 1000,
MaxOutstanding: 10,
TargetLatency: *targetLatency,
}
runs = append(runs, &codelRun, &queueRun)

run := func(in, out int) {
wg.Add(2)
go func() {
Simulate("codel", codel.New(opts), in, out, *runtime)
codelRun.Run(codel.New(opts))
wg.Done()
}()
go func() {
Simulate("queue", NewSemaphore(opts), in, out, *runtime)
queueRun.Run(NewSemaphore(opts))
wg.Done()
}()
}

run(1000, 2000)
run(1000, 1000)
run(1000, 900)
run(1000, 750)
Expand All @@ -127,4 +136,25 @@ func main() {
run(1000, 100)

wg.Wait()
return runs
}

func main() {
runtime := *flag.Duration("simulation-time", 5*time.Second, "Time to run each simulation")
targetLatency := *flag.Duration("target-latency", 5*time.Millisecond, "Time to run each simulation")

flag.Parse()

opts := codel.Options{
MaxPending: 1000,
InitialOutstanding: 10,
MaxOutstanding: 10,
TargetLatency: targetLatency,
}

runs := Overloaded(runtime, opts)
for _, r := range runs {
log.Printf("%s", r.String())
}

}
Loading