Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,7 @@ WORKDIR /
COPY --from=build-stage /output/bin/kperf /kperf
COPY --from=build-stage /output/bin/runkperf /runkperf
COPY scripts/run_runner.sh /run_runner.sh
COPY scripts/run_replay.sh /run_replay.sh

# Make scripts executable
RUN chmod +x /run_runner.sh /run_replay.sh
55 changes: 24 additions & 31 deletions replay/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ type timeBucket struct {
}

// workerMetrics holds per-worker statistics.
// For normal workers, each goroutine has its own instance (no synchronization needed).
// For WATCH metrics (shared across goroutines), atomic operations ensure correctness.
// Each normal worker goroutine has its own instance (no synchronization needed).
// WATCH metrics use fire-and-forget (no concurrent writes to requestsRun/requestsFailed).
type workerMetrics struct {
respMetric metrics.ResponseMetric
requestsRun int32
requestsFailed int32
requestsRun int
requestsFailed int
}

// groupIntoTimeBuckets groups requests by time buckets to reduce timer overhead.
Expand Down Expand Up @@ -228,7 +228,10 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult,
}

// Start worker pool
wg := r.startWorkers(ctx, workers)
wg, err := r.startWorkers(ctx, workers)
if err != nil {
return nil, err
}

startTime := time.Now()

Expand Down Expand Up @@ -304,25 +307,12 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult,
clis, err := request.NewClients(r.kubeconfigPath, 1, r.clientOpts...)
if err != nil {
klog.Errorf("Runner %d: failed to create overflow WATCH connection: %v", r.index, err)
atomic.AddInt32(&watchMetrics.requestsRun, 1)
atomic.AddInt32(&watchMetrics.requestsFailed, 1)
return
}
watchCli = clis[0]
}

// Check if context was cancelled during connection setup
select {
case <-watchCtx.Done():
return
default:
}

err := r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric)
atomic.AddInt32(&watchMetrics.requestsRun, 1)
if err != nil {
atomic.AddInt32(&watchMetrics.requestsFailed, 1)
}
_ = r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric)
}(req)
}

Expand All @@ -336,8 +326,8 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult,
// Aggregate results from all workers (including WATCH metrics)
allMetrics := append(workers, watchMetrics)

var totalRun int32
var totalFailed int32
var totalRun int
var totalFailed int
aggregatedStats := types.ResponseStats{
Errors: make([]types.ResponseError, 0),
LatenciesByURL: make(map[string][]float64),
Expand Down Expand Up @@ -365,15 +355,19 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult,
Total: len(r.requests),
Duration: time.Since(startTime),
ResponseStats: aggregatedStats,
RequestsRun: int(totalRun),
RequestsFailed: int(totalFailed),
RequestsRun: totalRun,
RequestsFailed: totalFailed,
}, nil
}

// startWorkers creates the worker pool that processes requests from the channel.
func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sync.WaitGroup {
func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) (*sync.WaitGroup, error) {
var wg sync.WaitGroup

if len(r.restClis) == 0 {
return nil, fmt.Errorf("runner %d: no REST clients configured", r.index)
}

for i := 0; i < r.workerCount; i++ {
wg.Add(1)

Expand All @@ -395,16 +389,16 @@ func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sy
// Execute request with worker's dedicated connection (pointer, no copy)
err := r.executeRequestWithClient(ctx, req, restCli, wm.respMetric)

// Track metrics (per-worker, no contention for normal workers)
atomic.AddInt32(&wm.requestsRun, 1)
// Track metrics (per-worker, no contention)
wm.requestsRun++
if err != nil {
atomic.AddInt32(&wm.requestsFailed, 1)
wm.requestsFailed++
}
}
}(i, cli, workers[i])
}

return &wg
return &wg, nil
}

// executeRequestWithClient executes a single replay request with a specific client.
Expand Down Expand Up @@ -437,10 +431,9 @@ func (r *Runner) executeRequestWithClient(ctx context.Context, req *types.Replay
respMetric.ObserveReceivedBytes(bytes)

if err != nil {
// Check if error is due to context cancellation (expected for WATCH when replay ends)
// Context cancellation is expected when replay duration expires.
// In-flight requests of any verb get cancelled — treat as success.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Context cancelled - treat as successful completion for WATCH operations
// This ensures cancelled WATCHes are counted in the total
respMetric.ObserveLatency(requester.Method(), requester.MaskedURL().String(), reportLatency)
klog.V(5).Infof("Request cancelled (expected): %s %s", req.Verb, req.APIPath)
return nil
Expand Down
22 changes: 19 additions & 3 deletions replay/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP
)
}

// Enforce a hard deadline at the profile duration so the replay doesn't overrun.
// Add a small grace period for the last batch of in-flight requests.
profileDuration := time.Duration(profile.Duration()) * time.Millisecond
replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second)
defer cancelReplay()

// Synchronize start time across all runners
startTime := time.Now()

Expand All @@ -133,7 +139,7 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP
wg.Add(1)
go func(idx int) {
defer wg.Done()
result, err := runners[idx].Run(ctx, startTime)
result, err := runners[idx].Run(replayCtx, startTime)
results[idx] = result
runnerErrors[idx] = err
}(i)
Expand Down Expand Up @@ -180,6 +186,11 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t
workersPerRunner = profile.Spec.ConnsPerRunner
}

// Validate runner index
if runnerIndex < 0 || runnerIndex >= profile.Spec.RunnerCount {
return nil, fmt.Errorf("runner index %d is out of range [0, %d)", runnerIndex, profile.Spec.RunnerCount)
}

// Partition requests for this runner
requests := PartitionRequests(profile.Requests, profile.Spec.RunnerCount, runnerIndex)

Expand Down Expand Up @@ -208,10 +219,15 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t
clientOpts,
)

// Enforce a hard deadline at the profile duration so the replay doesn't overrun.
profileDuration := time.Duration(profile.Duration()) * time.Millisecond
replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second)
defer cancelReplay()

// Use current time as start (each pod will have slightly different start times)
replayStart := time.Now()

return runner.Run(ctx, replayStart)
return runner.Run(replayCtx, replayStart)
}

// aggregateResults combines results from all runners.
Expand Down Expand Up @@ -293,7 +309,7 @@ func validateAndWarnConfig(profile *types.ReplayProfile, runnerRequests [][]type
recommendedWorkers := int(qps/qpsPerWorkerEstimate) + qpsPerWorkerEstimate
if workers < recommendedWorkers {
klog.Warningf("Runner %d: ClientsPerRunner (%d) may be insufficient for QPS (%.0f). "+
"Recommend at least %d workers (3-4x connections).",
"Recommend at least %d workers.",
i, workers, qps, recommendedWorkers)
}

Expand Down
Loading