diff --git a/Dockerfile b/Dockerfile index 0c8905fe..e0746534 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,3 +19,7 @@ WORKDIR / COPY --from=build-stage /output/bin/kperf /kperf COPY --from=build-stage /output/bin/runkperf /runkperf COPY scripts/run_runner.sh /run_runner.sh +COPY scripts/run_replay.sh /run_replay.sh + +# Make scripts executable +RUN chmod +x /run_runner.sh /run_replay.sh diff --git a/replay/runner.go b/replay/runner.go index e4d2befe..67a9803b 100644 --- a/replay/runner.go +++ b/replay/runner.go @@ -39,12 +39,12 @@ type timeBucket struct { } // workerMetrics holds per-worker statistics. -// For normal workers, each goroutine has its own instance (no synchronization needed). -// For WATCH metrics (shared across goroutines), atomic operations ensure correctness. +// Each normal worker goroutine has its own instance (no synchronization needed). +// WATCH metrics use fire-and-forget (no concurrent writes to requestsRun/requestsFailed). type workerMetrics struct { respMetric metrics.ResponseMetric - requestsRun int32 - requestsFailed int32 + requestsRun int + requestsFailed int } // groupIntoTimeBuckets groups requests by time buckets to reduce timer overhead. @@ -228,7 +228,10 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, } // Start worker pool - wg := r.startWorkers(ctx, workers) + wg, err := r.startWorkers(ctx, workers) + if err != nil { + return nil, err + } startTime := time.Now() @@ -304,25 +307,12 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, clis, err := request.NewClients(r.kubeconfigPath, 1, r.clientOpts...) 
if err != nil { klog.Errorf("Runner %d: failed to create overflow WATCH connection: %v", r.index, err) - atomic.AddInt32(&watchMetrics.requestsRun, 1) - atomic.AddInt32(&watchMetrics.requestsFailed, 1) return } watchCli = clis[0] } - // Check if context was cancelled during connection setup - select { - case <-watchCtx.Done(): - return - default: - } - - err := r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric) - atomic.AddInt32(&watchMetrics.requestsRun, 1) - if err != nil { - atomic.AddInt32(&watchMetrics.requestsFailed, 1) - } + _ = r.executeRequestWithClient(watchCtx, req, watchCli, watchMetrics.respMetric) }(req) } @@ -336,8 +326,8 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, // Aggregate results from all workers (including WATCH metrics) allMetrics := append(workers, watchMetrics) - var totalRun int32 - var totalFailed int32 + var totalRun int + var totalFailed int aggregatedStats := types.ResponseStats{ Errors: make([]types.ResponseError, 0), LatenciesByURL: make(map[string][]float64), @@ -365,15 +355,19 @@ func (r *Runner) Run(ctx context.Context, replayStart time.Time) (*RunnerResult, Total: len(r.requests), Duration: time.Since(startTime), ResponseStats: aggregatedStats, - RequestsRun: int(totalRun), - RequestsFailed: int(totalFailed), + RequestsRun: totalRun, + RequestsFailed: totalFailed, }, nil } // startWorkers creates the worker pool that processes requests from the channel. 
-func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sync.WaitGroup { +func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) (*sync.WaitGroup, error) { var wg sync.WaitGroup + if len(r.restClis) == 0 { + return nil, fmt.Errorf("runner %d: no REST clients configured", r.index) + } + for i := 0; i < r.workerCount; i++ { wg.Add(1) @@ -395,16 +389,16 @@ func (r *Runner) startWorkers(ctx context.Context, workers []*workerMetrics) *sy // Execute request with worker's dedicated connection (pointer, no copy) err := r.executeRequestWithClient(ctx, req, restCli, wm.respMetric) - // Track metrics (per-worker, no contention for normal workers) - atomic.AddInt32(&wm.requestsRun, 1) + // Track metrics (per-worker, no contention) + wm.requestsRun++ if err != nil { - atomic.AddInt32(&wm.requestsFailed, 1) + wm.requestsFailed++ } } }(i, cli, workers[i]) } - return &wg + return &wg, nil } // executeRequestWithClient executes a single replay request with a specific client. @@ -437,10 +431,9 @@ func (r *Runner) executeRequestWithClient(ctx context.Context, req *types.Replay respMetric.ObserveReceivedBytes(bytes) if err != nil { - // Check if error is due to context cancellation (expected for WATCH when replay ends) + // Context cancellation is expected when replay duration expires. + // In-flight requests of any verb get cancelled — treat as success. 
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - // Context cancelled - treat as successful completion for WATCH operations - // This ensures cancelled WATCHes are counted in the total respMetric.ObserveLatency(requester.Method(), requester.MaskedURL().String(), reportLatency) klog.V(5).Infof("Request cancelled (expected): %s %s", req.Verb, req.APIPath) return nil diff --git a/replay/schedule.go b/replay/schedule.go index 712d39f1..bc11fa46 100644 --- a/replay/schedule.go +++ b/replay/schedule.go @@ -121,6 +121,12 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP ) } + // Enforce a hard deadline at the profile duration so the replay doesn't overrun. + // Add a small grace period for the last batch of in-flight requests. + profileDuration := time.Duration(profile.Duration()) * time.Millisecond + replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second) + defer cancelReplay() + // Synchronize start time across all runners startTime := time.Now() @@ -133,7 +139,7 @@ func Schedule(ctx context.Context, kubeconfigPath string, profile *types.ReplayP wg.Add(1) go func(idx int) { defer wg.Done() - result, err := runners[idx].Run(ctx, startTime) + result, err := runners[idx].Run(replayCtx, startTime) results[idx] = result runnerErrors[idx] = err }(i) @@ -180,6 +186,11 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t workersPerRunner = profile.Spec.ConnsPerRunner } + // Validate runner index + if runnerIndex < 0 || runnerIndex >= profile.Spec.RunnerCount { + return nil, fmt.Errorf("runner index %d is out of range [0, %d)", runnerIndex, profile.Spec.RunnerCount) + } + // Partition requests for this runner requests := PartitionRequests(profile.Requests, profile.Spec.RunnerCount, runnerIndex) @@ -208,10 +219,15 @@ func ScheduleSingleRunner(ctx context.Context, kubeconfigPath string, profile *t clientOpts, ) + // Enforce a hard deadline at the profile 
duration so the replay doesn't overrun. + profileDuration := time.Duration(profile.Duration()) * time.Millisecond + replayCtx, cancelReplay := context.WithTimeout(ctx, profileDuration+30*time.Second) + defer cancelReplay() + // Use current time as start (each pod will have slightly different start times) replayStart := time.Now() - return runner.Run(ctx, replayStart) + return runner.Run(replayCtx, replayStart) } // aggregateResults combines results from all runners. @@ -293,7 +309,7 @@ func validateAndWarnConfig(profile *types.ReplayProfile, runnerRequests [][]type recommendedWorkers := int(qps/qpsPerWorkerEstimate) + qpsPerWorkerEstimate if workers < recommendedWorkers { klog.Warningf("Runner %d: ClientsPerRunner (%d) may be insufficient for QPS (%.0f). "+ - "Recommend at least %d workers (3-4x connections).", + "Recommend at least %d workers.", i, workers, qps, recommendedWorkers) } diff --git a/runner/group/handler.go b/runner/group/handler.go index 27dbf3e1..5afde42f 100644 --- a/runner/group/handler.go +++ b/runner/group/handler.go @@ -112,8 +112,11 @@ func (h *Handler) Info(ctx context.Context) *types.RunnerGroup { // Deploy deploys a group of runners. 
func (h *Handler) Deploy(ctx context.Context, uploadURL string) error { - if err := h.uploadLoadProfileAsConfigMap(ctx); err != nil { - return fmt.Errorf("failed to ensure if load profile has been uploaded: %w", err) + // Skip uploading config map for replay mode - profile is loaded from URL or PVC + if !h.spec.IsReplayMode() { + if err := h.uploadLoadProfileAsConfigMap(ctx); err != nil { + return fmt.Errorf("failed to ensure if load profile has been uploaded: %w", err) + } } return h.deployRunners(ctx, uploadURL) } @@ -379,60 +382,120 @@ func (h *Handler) buildBatchJobObject(uploadURL string) *batchv1.Job { "app-name-job": h.name, }, } - job.Spec.Template.Spec = corev1.PodSpec{ - Affinity: &corev1.Affinity{}, - Containers: []corev1.Container{ - { - Name: "runner", - Image: h.imageRef, - Env: []corev1.EnvVar{ - { - Name: "POD_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: metav1.ObjectNameField, - }, - }, - }, - { - Name: "POD_NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, - }, - }, - { - Name: "POD_UID", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.uid", - }, - }, - }, - { - Name: "TARGET_URL", - Value: uploadURL, - }, - { - Name: "RUNNER_VERBOSITY", - Value: strconv.Itoa(h.runnerVerbosity), - }, + + // Build environment variables + envVars := []corev1.EnvVar{ + { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: metav1.ObjectNameField, }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "config", - MountPath: "/config", - }, - { - Name: "host-root-tmp", - MountPath: "/data", + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "POD_UID", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: 
&corev1.ObjectFieldSelector{ + FieldPath: "metadata.uid", + }, + }, + }, + { + Name: "TARGET_URL", + Value: uploadURL, + }, + { + Name: "RUNNER_VERBOSITY", + Value: strconv.Itoa(h.runnerVerbosity), + }, + } + + // Build volume mounts and volumes + volumeMounts := []corev1.VolumeMount{ + { + Name: "host-root-tmp", + MountPath: "/data", + }, + } + + volumes := []corev1.Volume{ + { + Name: "host-root-tmp", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/tmp", + }, + }, + }, + } + + // Determine command and configure mode-specific settings + command := []string{"/run_runner.sh"} + + if h.spec.IsReplayMode() { + // Replay mode configuration + command = []string{"/run_replay.sh"} + + // Add replay profile source env var + envVars = append(envVars, corev1.EnvVar{ + Name: "REPLAY_PROFILE_SOURCE", + Value: *h.spec.ReplayProfile, + }) + + // If PVC is specified, mount it for local file access + if h.spec.ReplayPVCName != nil && *h.spec.ReplayPVCName != "" { + volumes = append(volumes, corev1.Volume{ + Name: "replay-profile", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: *h.spec.ReplayPVCName, + ReadOnly: true, }, }, - Command: []string{ - "/run_runner.sh", + }) + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: "replay-profile", + MountPath: "/mnt/profile", + ReadOnly: true, + }) + } + } else { + // Standard load profile mode - mount config map + volumes = append(volumes, corev1.Volume{ + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: h.name, + }, }, }, + }) + volumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: "config", + MountPath: "/config", + }) + } + + job.Spec.Template.Spec = corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + Containers: []corev1.Container{ + { + Name: "runner", + Image: h.imageRef, + Env: envVars, + 
VolumeMounts: volumeMounts, + Command: command, + }, }, RestartPolicy: corev1.RestartPolicyNever, TopologySpreadConstraints: []corev1.TopologySpreadConstraint{ @@ -447,27 +510,7 @@ func (h *Handler) buildBatchJobObject(uploadURL string) *batchv1.Job { }, }, }, - - Volumes: []corev1.Volume{ - { - Name: "config", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: h.name, - }, - }, - }, - }, - { - Name: "host-root-tmp", - VolumeSource: corev1.VolumeSource{ - HostPath: &corev1.HostPathVolumeSource{ - Path: "/tmp", - }, - }, - }, - }, + Volumes: volumes, } if len(h.spec.NodeAffinity) > 0 {
diff --git a/scripts/run_replay.sh b/scripts/run_replay.sh
new file mode 100644
index 00000000..c911ea05
--- /dev/null
+++ b/scripts/run_replay.sh
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+#
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+#
+# Runner script for replay mode. Used by runner pods to execute replay tests.
+# Expects the following environment variables:
+#   - POD_NAME: Name of the pod
+#   - POD_NAMESPACE: Namespace of the pod
+#   - POD_UID: UID of the pod
+#   - TARGET_URL: URL to upload results to
+#   - RUNNER_VERBOSITY: Log verbosity level
+#   - REPLAY_PROFILE_SOURCE: Path or URL to the replay profile
+
+set -euo pipefail
+
+# Fail fast with a clear message when a required variable is unset or empty.
+# An empty TARGET_URL would otherwise make the upload loop below retry forever.
+: "${POD_NAME:?POD_NAME is required}"
+: "${POD_NAMESPACE:?POD_NAMESPACE is required}"
+: "${POD_UID:?POD_UID is required}"
+: "${TARGET_URL:?TARGET_URL is required}"
+: "${RUNNER_VERBOSITY:?RUNNER_VERBOSITY is required}"
+: "${REPLAY_PROFILE_SOURCE:?REPLAY_PROFILE_SOURCE is required}"
+
+result_file="/data/${POD_NAMESPACE}-${POD_NAME}-${POD_UID}.json"
+
+# Run the replay; with `set -e` a non-zero exit aborts before any upload.
+/kperf -v="${RUNNER_VERBOSITY}" runner replay \
+  --config="${REPLAY_PROFILE_SOURCE}" \
+  --result="${result_file}" \
+  --raw-data
+
+# Upload the result, retrying every 5 seconds until the server gives a
+# definitive answer (201, 409 or 404).
+while true; do
+  # curl prints "000" for the status when it cannot complete the request; the
+  # `|| true` keeps `set -e` from aborting so that case is retried below.
+  http_code=$(curl -s -o /dev/null -w "%{http_code}" -XPOST --data-binary "@${result_file}" "${TARGET_URL}" || true)
+
+  case "${http_code}" in
+    201)
+      echo "Uploaded it"
+      exit 0
+      ;;
+    409)
+      echo "Has been uploaded, skip"
+      exit 0
+      ;;
+    404)
+      echo "Leaking pod? skip"
+      exit 1
+      ;;
+    *)
+      echo "Need to retry after received http code ${http_code} (or failed to connect)"
+      sleep 5
+      ;;
+  esac
+done