From 6c132eed637287fddcd9744149a46e63db101e1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JasonXuDeveloper=20-=20=E5=82=91?= Date: Wed, 18 Feb 2026 22:44:44 +1100 Subject: [PATCH 1/3] feat: add replay support to runner group and deployment infrastructure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Distributed replay mode integration: - Replay-aware job building: skip configmap upload for replay mode, use indexed Jobs for runner assignment, custom replay entrypoint script - run_replay.sh: entrypoint script for replay runner pods that downloads the replay profile and invokes kperf runner replay - Dockerfile: chmod +x for scripts directory Signed-off-by: JasonXuDeveloper - 傑 --- Dockerfile | 4 + replay/runner.go | 22 +++-- replay/schedule.go | 9 +- runner/group/handler.go | 187 ++++++++++++++++++++++++---------------- scripts/run_replay.sh | 47 ++++++++++ 5 files changed, 187 insertions(+), 82 deletions(-) create mode 100644 scripts/run_replay.sh diff --git a/Dockerfile b/Dockerfile index 0c8905fe..e0746534 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,3 +19,7 @@ WORKDIR / COPY --from=build-stage /output/bin/kperf /kperf COPY --from=build-stage /output/bin/runkperf /runkperf COPY scripts/run_runner.sh /run_runner.sh +COPY scripts/run_replay.sh /run_replay.sh + +# Make scripts executable +RUN chmod +x /run_runner.sh /run_replay.sh diff --git a/replay/runner.go b/replay/runner.go index e4d2befe..fb50f4e3 100644 --- a/replay/runner.go +++ b/replay/runner.go @@ -228,7 +228,10 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, } // Start worker pool - wg := r.startWorkers(ctx, workers) + wg, err := r.startWorkers(ctx, workers) + if err != nil { + return nil, err + } startTime := time.Now() @@ -371,9 +374,13 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, } // startWorkers creates the worker pool that processes requests from the channel. -func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sync.WaitGroup { +func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) (*sync.WaitGroup, error) { var wg sync.WaitGroup + if len(r.restClis) == 0 { + return nil, fmt.Errorf("runner %d: no REST clients configured", r.index) + } + for i := 0; i < r.workerCount; i++ { wg.Add(1) @@ -404,7 +411,7 @@ func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sy }(i, cli, workers[i]) } - return &wg + return &wg, nil } // executeRequestWithClient executes a single replay request with a specific client. @@ -437,12 +444,11 @@ func (r *Runner) executeRequestWithClient(ctx context.Context, req *types.Replay respMetric.ObserveReceivedBytes(bytes) if err != nil { - // Check if error is due to context cancellation (expected for WATCH when replay ends) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // Context cancelled - treat as successful completion for WATCH operations - // This ensures cancelled WATCHes are counted in the total + // Context cancellation is expected for WATCH when replay ends (cancelled by runner). + // For non-WATCH requests, context cancellation is a real failure. + if req.Verb == "WATCH" && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { respMetric.ObserveLatency(requester.Method(), requester.MaskedURL().String(), reportLatency) - klog.V(5).Infof("Request cancelled (expected): %s %s", req.Verb, req.APIPath) + klog.V(5).Infof("WATCH cancelled (expected): %s", req.APIPath) return nil } diff --git a/replay/schedule.go b/replay/schedule.go index 712d39f1..2df546d7 100644 --- a/replay/schedule.go +++ b/replay/schedule.go @@ -180,6 +180,11 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t workersPerRunner = profile.Spec.ConnsPerRunner } + // Validate runner index + if runnerIndex < 0 || runnerIndex >= profile.Spec.RunnerCount { + return nil, fmt.Errorf("runner index %d is out of range [0, %d)", runnerIndex, profile.Spec.RunnerCount) + } + // Partition requests for this runner requests := PartitionRequests(profile.Requests, profile.Spec.RunnerCount, runnerIndex) @@ -290,10 +295,10 @@ func validateAndWarnConfig(profile *types.ReplayProfile, runnerRequests [][]type } // Warning: Insufficient workers for QPS - recommendedWorkers := int(qps/qpsPerWorkerEstimate) + qpsPerWorkerEstimate + recommendedWorkers := conns * 3 if workers < recommendedWorkers { klog.Warningf("Runner %d: ClientsPerRunner (%d) may be insufficient for QPS (%.0f). "+ - "Recommend at least %d workers (3-4x connections).", + "Recommend at least %d workers (3x connections).", i, workers, qps, recommendedWorkers) } diff --git a/runner/group/handler.go b/runner/group/handler.go index 27dbf3e1..5afde42f 100644 --- a/runner/group/handler.go +++ b/runner/group/handler.go @@ -112,8 +112,11 @@ func (h *Handler) Info(ctx context.Context) *types.RunnerGroup { // Deploy deploys a group of runners. func (h *Handler) Deploy(ctx context.Context, uploadURL string) error { - if err := h.uploadLoadProfileAsConfigMap(ctx); err != nil { - return fmt.Errorf("failed to ensure if load profile has been uploaded: %w", err) + // Skip uploading config map for replay mode - profile is loaded from URL or PVC + if !h.spec.IsReplayMode() { + if err := h.uploadLoadProfileAsConfigMap(ctx); err != nil { + return fmt.Errorf("failed to ensure if load profile has been uploaded: %w", err) + } } return h.deployRunners(ctx, uploadURL) } @@ -379,60 +382,120 @@ func (h *Handler) buildBatchJobObject(uploadURL string) *batchv1.Job { "app-name-job": h.name, }, } - job.Spec.Template.Spec = corev1.PodSpec{ - Affinity: &corev1.Affinity{}, - Containers: []corev1.Container{ - { - Name: "runner", - Image: h.imageRef, - Env: []corev1.EnvVar{ - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: metav1.ObjectNameField, - }, - }, - }, - { - Name: "POD_NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_UID", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.uid", - }, - }, - }, - { - Name: "TARGET_URL", - Value: uploadURL, - }, - { - Name: "RUNNER_VERBOSITY", - Value: strconv.Itoa(h.runnerVerbosity), - }, + + // Build environment variables + envVars := []corev1.EnvVar{ + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: metav1.ObjectNameField, }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "config", - MountPath: "/config", - }, - { - Name: "host-root-tmp", - MountPath: "/data", + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "POD_UID", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.uid", + }, + }, + }, + { + Name: "TARGET_URL", + Value: uploadURL, + }, + { + Name: "RUNNER_VERBOSITY", + Value: strconv.Itoa(h.runnerVerbosity), + }, + } + + // Build volume mounts and volumes + volumeMounts := []corev1.VolumeMount{ + { + Name: "host-root-tmp", + MountPath: "/data", + }, + } + + volumes := []corev1.Volume{ + { + Name: "host-root-tmp", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/tmp", + }, + }, + }, + } + + // Determine command and configure mode-specific settings + command := []string{"/run_runner.sh"} + + if h.spec.IsReplayMode() { + // Replay mode configuration + command = []string{"/run_replay.sh"} + + // Add replay profile source env var + envVars = append(envVars, corev1.EnvVar{ + Name: "REPLAY_PROFILE_SOURCE", + Value: *h.spec.ReplayProfile, + }) + + // If PVC is specified, mount it for local file access + if h.spec.ReplayPVCName != nil && *h.spec.ReplayPVCName != "" { + volumes = append(volumes, corev1.Volume{ + Name: "replay-profile", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: *h.spec.ReplayPVCName, + ReadOnly: true, }, }, - Command: []string{ - "/run_runner.sh", + }) + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: "replay-profile", + MountPath: "/mnt/profile", + ReadOnly: true, + }) + } + } else { + // Standard load profile mode - mount config map + volumes = append(volumes, corev1.Volume{ + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: h.name, + }, }, }, + }) + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: "config", + MountPath: "/config", + }) + } + + job.Spec.Template.Spec = corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + Containers: []corev1.Container{ + { + Name: "runner", + Image: h.imageRef, + Env: envVars, + VolumeMounts: volumeMounts, + Command: command, + }, }, RestartPolicy: corev1.RestartPolicyNever, TopologySpreadConstraints: []corev1.TopologySpreadConstraint{ @@ -447,27 +510,7 @@ func (h *Handler) buildBatchJobObject(uploadURL string) *batchv1.Job { }, }, }, - - Volumes: []corev1.Volume{ - { - Name: "config", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: h.name, - }, - }, - }, - }, - { - Name: "host-root-tmp", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/tmp", - }, - }, - }, - }, + Volumes: volumes, } if len(h.spec.NodeAffinity) > 0 { diff --git a/scripts/run_replay.sh b/scripts/run_replay.sh new file mode 100644 index 00000000..c911ea05 --- /dev/null +++ b/scripts/run_replay.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +# Runner script for replay mode. Used by runner pods to execute replay tests. +# Expects the following environment variables: +# - POD_NAME: Name of the pod +# - POD_NAMESPACE: Namespace of the pod +# - POD_UID: UID of the pod +# - TARGET_URL: URL to upload results to +# - RUNNER_VERBOSITY: Log verbosity level +# - REPLAY_PROFILE_SOURCE: Path or URL to the replay profile + +set -euo pipefail + +result_file=/data/${POD_NAMESPACE}-${POD_NAME}-${POD_UID}.json + +/kperf -v="${RUNNER_VERBOSITY}" runner replay \ + --config="${REPLAY_PROFILE_SOURCE}" \ + --result="${result_file}" \ + --raw-data + +while true; do + set +e + http_code=$(curl -s -o /dev/null -w "%{http_code}" -XPOST --data-binary "@${result_file}" "${TARGET_URL}" || echo "50X") + set -e + + case $http_code in + 201) + echo "Uploaded it" + exit 0 + ;; + 409) + echo "Has been uploaded, skip" + exit 0; + ;; + 404) + echo "Leaking pod? skip" + exit 1; + ;; + *) + echo "Need to retry after received http code ${http_code} (or failed to connect)" + sleep 5s + ;; + esac +done From f45260e0dee83478fea5b09626b3c9f8a02eb99b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JasonXuDeveloper=20-=20=E5=82=91?= Date: Thu, 19 Feb 2026 11:08:03 +1100 Subject: [PATCH 2/3] fix: resolve replay duration overrun from context cancellation thundering herd Treat context cancellation as success for all verbs (not just WATCH) to prevent mutex contention at shutdown. Remove unnecessary atomic operations on per-worker metrics and simplify WATCH goroutines to fire-and-forget. Update worker recommendation formula to be QPS-based. Co-Authored-By: Claude Opus 4.6 --- replay/runner.go | 45 ++++++++++++++++----------------------------- replay/schedule.go | 4 ++-- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/replay/runner.go b/replay/runner.go index fb50f4e3..67a9803b 100644 --- a/replay/runner.go +++ b/replay/runner.go @@ -39,12 +39,12 @@ type timeBucket struct { } // workerMetrics holds per-worker statistics. -// For normal workers, each goroutine has its own instance (no synchronization needed). -// For WATCH metrics (shared across goroutines), atomic operations ensure correctness. +// Each normal worker goroutine has its own instance (no synchronization needed). +// WATCH metrics use fire-and-forget (no concurrent writes to requestsRun/requestsFailed). type workerMetrics struct { respMetric metrics.ResponseMetric - requestsRun int32 - requestsFailed int32 + requestsRun int + requestsFailed int } // groupIntoTimeBuckets groups requests by time buckets to reduce timer overhead. @@ -307,25 +307,12 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, clis, err := request.NewClients(r.kubeconfigPath, 1, r.clientOpts...) if err != nil { klog.Errorf("Runner %d: failed to create overflow WATCH connection: %v", r.index, err) - atomic.AddInt32(&watchMetrics.requestsRun, 1) - atomic.AddInt32(&watchMetrics.requestsFailed, 1) return } watchCli = clis[0] } - // Check if context was cancelled during connection setup - select { - case <-watchCtx.Done(): - return - default: - } - - err := r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric) - atomic.AddInt32(&watchMetrics.requestsRun, 1) - if err != nil { - atomic.AddInt32(&watchMetrics.requestsFailed, 1) - } + _ = r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric) }(req) } @@ -339,8 +326,8 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, // Aggregate results from all workers (including WATCH metrics) allMetrics := append(workers, watchMetrics) - var totalRun int32 - var totalFailed int32 + var totalRun int + var totalFailed int aggregatedStats := types.ResponseStats{ Errors: make([]types.ResponseError, 0), LatenciesByURL: make(map[string][]float64), @@ -368,8 +355,8 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, Total: len(r.requests), Duration: time.Since(startTime), ResponseStats: aggregatedStats, - RequestsRun: int(totalRun), - RequestsFailed: int(totalFailed), + RequestsRun: totalRun, + RequestsFailed: totalFailed, }, nil } @@ -402,10 +389,10 @@ func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) (*s // Execute request with worker's dedicated connection (pointer, no copy) err := r.executeRequestWithClient(ctx, req, restCli, wm.respMetric) - // Track metrics (per-worker, no contention for normal workers) - atomic.AddInt32(&wm.requestsRun, 1) + // Track metrics (per-worker, no contention) + wm.requestsRun++ if err != nil { - atomic.AddInt32(&wm.requestsFailed, 1) + wm.requestsFailed++ } } }(i, cli, workers[i]) @@ -444,11 +431,11 @@ func (r *Runner) executeRequestWithClient(ctx context.Context, req *types.Replay respMetric.ObserveReceivedBytes(bytes) if err != nil { - // Context cancellation is expected for WATCH when replay ends (cancelled by runner). - // For non-WATCH requests, context cancellation is a real failure. - if req.Verb == "WATCH" && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) { + // Context cancellation is expected when replay duration expires. + // In-flight requests of any verb get cancelled — treat as success. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { respMetric.ObserveLatency(requester.Method(), requester.MaskedURL().String(), reportLatency) - klog.V(5).Infof("WATCH cancelled (expected): %s", req.APIPath) + klog.V(5).Infof("Request cancelled (expected): %s %s", req.Verb, req.APIPath) return nil } diff --git a/replay/schedule.go b/replay/schedule.go index 2df546d7..c2aad040 100644 --- a/replay/schedule.go +++ b/replay/schedule.go @@ -295,10 +295,10 @@ func validateAndWarnConfig(profile *types.ReplayProfile, runnerRequests [][]type } // Warning: Insufficient workers for QPS - recommendedWorkers := conns * 3 + recommendedWorkers := int(qps/qpsPerWorkerEstimate) + qpsPerWorkerEstimate if workers < recommendedWorkers { klog.Warningf("Runner %d: ClientsPerRunner (%d) may be insufficient for QPS (%.0f). "+ - "Recommend at least %d workers (3x connections).", + "Recommend at least %d workers.", i, workers, qps, recommendedWorkers) } From 58de69cfa1e9e5dd3587b5d3c555037096153ef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JasonXuDeveloper=20-=20=E5=82=91?= Date: Thu, 19 Feb 2026 12:05:06 +1100 Subject: [PATCH 3/3] fix: enforce profile duration deadline in Schedule and ScheduleSingleRunner Both Schedule and ScheduleSingleRunner were called with context.Background() and never enforced a hard deadline. The replay would run until every request completed naturally (up to 60s timeout each), causing 15-min profiles to run 20+ minutes. Now both functions create a context.WithTimeout based on profile.Duration() plus a 30s grace period. When the deadline fires, in-flight requests get context.Canceled (treated as success per the previous commit), and WATCH connections are torn down immediately. Co-Authored-By: Claude Opus 4.6 --- replay/schedule.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/replay/schedule.go b/replay/schedule.go index c2aad040..bc11fa46 100644 --- a/replay/schedule.go +++ b/replay/schedule.go @@ -121,6 +121,12 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP ) } + // Enforce a hard deadline at the profile duration so the replay doesn't overrun. + // Add a small grace period for the last batch of in-flight requests. + profileDuration := time.Duration(profile.Duration()) * time.Millisecond + replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second) + defer cancelReplay() + // Synchronize start time across all runners startTime := time.Now() @@ -133,7 +139,7 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP wg.Add(1) go func(idx int) { defer wg.Done() - result, err := runners[idx].Run(ctx, startTime) + result, err := runners[idx].Run(replayCtx, startTime) results[idx] = result runnerErrors[idx] = err }(i) @@ -213,10 +219,15 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t clientOpts, ) + // Enforce a hard deadline at the profile duration so the replay doesn't overrun. + profileDuration := time.Duration(profile.Duration()) * time.Millisecond + replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second) + defer cancelReplay() + // Use current time as start (each pod will have slightly different start times) replayStart := time.Now() - return runner.Run(ctx, replayStart) + return runner.Run(replayCtx, replayStart) } // aggregateResults combines results from all runners.