From f90d81a39e2d1f8e8aaf7b072647213ee50046a9 Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Fri, 23 Jan 2026 10:25:12 +1100 Subject: [PATCH 1/2] feat: add multi-spec execution with result aggregation and timeseries replay example Add support for executing multiple specs sequentially within a single kperf runner invocation, enabling time-series replay scenarios. Changes: - Add MultiSpecRunnerMetricReport type for multi-spec results - Implement executeSpecs() for sequential spec execution - Implement aggregateResults() to combine per-spec results - Add printMultiSpecResults() for multi-spec output format - Add hasCliOverrides() to detect and prevent CLI overrides with multi-spec - Update buildRunnerGroupSummary() to handle MultiSpecRunnerMetricReport format - Add timeseries_replay example with 3 phases: baseline (10 QPS, 30s), spike (100 QPS, 20s), recovery (25 QPS, 30s) - Add benchTimeseriesReplayCase command to demonstrate time-series replay This enables realistic workload patterns that vary over time, useful for stress testing, capacity planning, and validating system behavior under changing load conditions. Signed-off-by: JasonXuDeveloper --- api/types/metric.go | 8 + cmd/kperf/commands/runner/runner.go | 147 ++++++++++++++---- contrib/cmd/runkperf/commands/bench/root.go | 1 + .../commands/bench/timeseries_replay.go | 71 +++++++++ .../loadprofile/timeseries_replay.yaml | 61 ++++++++ runner/utils.go | 22 ++- 6 files changed, 278 insertions(+), 32 deletions(-) create mode 100644 contrib/cmd/runkperf/commands/bench/timeseries_replay.go create mode 100644 contrib/internal/manifests/loadprofile/timeseries_replay.yaml diff --git a/api/types/metric.go b/api/types/metric.go index ba26fe74..1d26f3d9 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -68,6 +68,14 @@ type RunnerMetricReport struct { PercentileLatenciesByURL map[string][][2]float64 `json:"percentileLatenciesByURL,omitempty"` } +// MultiSpecRunnerMetricReport contains results for multiple specs with aggregated summary. +type MultiSpecRunnerMetricReport struct { + // PerSpecResults contains individual results for each spec. + PerSpecResults []RunnerMetricReport `json:"perSpecResults,omitempty"` + // Aggregated contains summed/aggregated result across all specs. + Aggregated RunnerMetricReport `json:"aggregated"` +} + // TODO(weifu): build brand new struct for RunnerGroupsReport to include more // information, like how many runner groups, service account and flow control. type RunnerGroupsReport = RunnerMetricReport diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index cc06d7ff..6e830962 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -6,6 +6,7 @@ package runner import ( "context" "encoding/json" + "time" "fmt" "os" @@ -15,6 +16,7 @@ import ( "github.com/Azure/kperf/cmd/kperf/commands/utils" "github.com/Azure/kperf/metrics" "github.com/Azure/kperf/request" + "k8s.io/client-go/rest" "k8s.io/klog/v2" "github.com/urfave/cli" @@ -103,29 +105,25 @@ var runCommand = cli.Command{ return err } - // Runner only supports single spec - if len(profileCfg.Specs) > 1 { - return fmt.Errorf("runner only supports single spec, but got %d specs", len(profileCfg.Specs)) + // Check for multi-spec CLI override conflict + if len(profileCfg.Specs) > 1 && hasCliOverrides(cliCtx) { + return fmt.Errorf("CLI flag overrides are not allowed when config has multiple specs") } - pspec := profileCfg.Specs[0] - clientNum := pspec.Conns + // Use first spec for client configuration (all specs share same client pool) + firstSpec := profileCfg.Specs[0] + clientNum := firstSpec.Conns restClis, err := request.NewClients(kubeCfgPath, clientNum, request.WithClientUserAgentOpt(cliCtx.String("user-agent")), - request.WithClientQPSOpt(pspec.Rate), - request.WithClientContentTypeOpt(pspec.ContentType), - request.WithClientDisableHTTP2Opt(pspec.DisableHTTP2), + request.WithClientQPSOpt(firstSpec.Rate), + request.WithClientContentTypeOpt(firstSpec.ContentType), + request.WithClientDisableHTTP2Opt(firstSpec.DisableHTTP2), ) if err != nil { return err } - stats, err := request.Schedule(context.TODO(), &pspec, restClis) - if err != nil { - return err - } - var f *os.File = os.Stdout outputFilePath := cliCtx.String("result") if outputFilePath != "" { @@ -147,7 +145,14 @@ var runCommand = cli.Command{ } rawDataFlagIncluded := cliCtx.Bool("raw-data") - err = printResponseStats(f, rawDataFlagIncluded, stats) + + // Execute all specs (handles both single and multiple specs uniformly) + perSpecResults, aggregated, err := executeSpecs(context.TODO(), profileCfg.Specs, restClis) + if err != nil { + return err + } + + err = printMultiSpecResults(f, rawDataFlagIncluded, perSpecResults, aggregated) if err != nil { return fmt.Errorf("error while printing response stats: %w", err) } @@ -211,14 +216,111 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { return &profileCfg, nil } -// printResponseStats prints types.RunnerMetricReport into underlying file. -func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error { +// hasCliOverrides checks if any CLI override flags are set. +func hasCliOverrides(cliCtx *cli.Context) bool { + overrideFlags := []string{"rate", "conns", "client", "total", "duration", + "content-type", "disable-http2", "max-retries"} + for _, flag := range overrideFlags { + if cliCtx.IsSet(flag) { + return true + } + } + return false +} + +// executeSpecs runs all specs sequentially and returns per-spec + aggregated results. +func executeSpecs(ctx context.Context, specs []types.LoadProfileSpec, restClis []rest.Interface) ([]*request.Result, *request.Result, error) { + if len(specs) == 0 { + return nil, nil, fmt.Errorf("no specs to execute") + } + + results := make([]*request.Result, 0, len(specs)) + totalDuration := time.Duration(0) + + for i, spec := range specs { + klog.V(2).Infof("Executing spec %d/%d", i+1, len(specs)) + + result, err := request.Schedule(ctx, &spec, restClis) + if err != nil { + return nil, nil, fmt.Errorf("failed to execute spec %d: %w", i+1, err) + } + + results = append(results, result) + totalDuration += result.Duration + } + + aggregated := aggregateResults(results) + aggregated.Duration = totalDuration + + return results, aggregated, nil +} + +// aggregateResults combines multiple results into single aggregated result. +func aggregateResults(results []*request.Result) *request.Result { + aggregated := &request.Result{ + ResponseStats: types.ResponseStats{ + Errors: make([]types.ResponseError, 0), + LatenciesByURL: make(map[string][]float64), + TotalReceivedBytes: 0, + }, + Total: 0, + } + + for _, result := range results { + // Aggregate errors + aggregated.Errors = append(aggregated.Errors, result.Errors...) + + // Aggregate latencies by URL + for url, latencies := range result.LatenciesByURL { + if _, exists := aggregated.LatenciesByURL[url]; !exists { + aggregated.LatenciesByURL[url] = make([]float64, 0) + } + aggregated.LatenciesByURL[url] = append(aggregated.LatenciesByURL[url], latencies...) + } + + // Sum bytes and requests + aggregated.TotalReceivedBytes += result.TotalReceivedBytes + aggregated.Total += result.Total + } + + return aggregated +} + +// printMultiSpecResults prints results for multiple specs with aggregated summary. +func printMultiSpecResults(f *os.File, rawDataFlagIncluded bool, perSpecResults []*request.Result, aggregated *request.Result) error { + // Build per-spec reports + perSpecReports := make([]types.RunnerMetricReport, 0, len(perSpecResults)) + for _, result := range perSpecResults { + report := buildRunnerMetricReport(result, rawDataFlagIncluded) + perSpecReports = append(perSpecReports, report) + } + + // Build aggregated report + aggregatedReport := buildRunnerMetricReport(aggregated, rawDataFlagIncluded) + + // Create multi-spec report + multiReport := types.MultiSpecRunnerMetricReport{ + PerSpecResults: perSpecReports, + Aggregated: aggregatedReport, + } + + encoder := json.NewEncoder(f) + encoder.SetIndent("", " ") + + err := encoder.Encode(multiReport) + if err != nil { + return fmt.Errorf("failed to encode json: %w", err) + } + return nil +} + +// buildRunnerMetricReport builds a RunnerMetricReport from request.Result. +func buildRunnerMetricReport(stats *request.Result, includeRawData bool) types.RunnerMetricReport { output := types.RunnerMetricReport{ Total: stats.Total, ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), Duration: stats.Duration.String(), TotalReceivedBytes: stats.TotalReceivedBytes, - PercentileLatenciesByURL: map[string][][2]float64{}, } @@ -236,17 +338,10 @@ func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Res output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l) } - if rawDataFlagIncluded { + if includeRawData { output.LatenciesByURL = stats.LatenciesByURL output.Errors = stats.Errors } - encoder := json.NewEncoder(f) - encoder.SetIndent("", " ") - - err := encoder.Encode(output) - if err != nil { - return fmt.Errorf("failed to encode json: %w", err) - } - return nil + return output } diff --git a/contrib/cmd/runkperf/commands/bench/root.go b/contrib/cmd/runkperf/commands/bench/root.go index 3d019dba..2f52c9ea 100644 --- a/contrib/cmd/runkperf/commands/bench/root.go +++ b/contrib/cmd/runkperf/commands/bench/root.go @@ -63,6 +63,7 @@ var Command = cli.Command{ benchNode10Job1Pod1kCase, benchNode100Job10Pod10kCase, benchReadUpdateCase, + benchTimeseriesReplayCase, }, } diff --git a/contrib/cmd/runkperf/commands/bench/timeseries_replay.go b/contrib/cmd/runkperf/commands/bench/timeseries_replay.go new file mode 100644 index 00000000..119baadd --- /dev/null +++ b/contrib/cmd/runkperf/commands/bench/timeseries_replay.go @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package bench + +import ( + "context" + + internaltypes "github.com/Azure/kperf/contrib/internal/types" + "github.com/Azure/kperf/contrib/utils" + + "github.com/urfave/cli" +) + +var benchTimeseriesReplayCase = cli.Command{ + Name: "timeseries_replay", + Usage: ` +The test suite demonstrates time-series replay functionality by executing multiple load specs sequentially. +It simulates a traffic pattern with three phases: baseline load, traffic spike, and recovery period. +This allows benchmarking API server performance under varying load conditions over time. +`, + Flags: append( + []cli.Flag{ + cli.IntFlag{ + Name: "duration", + Usage: "Duration for each phase in seconds (overrides default)", + }, + }, + commonFlags..., + ), + Action: func(cliCtx *cli.Context) error { + _, err := renderBenchmarkReportInterceptor( + addAPIServerCoresInfoInterceptor(benchTimeseriesReplayRun), + )(cliCtx) + return err + }, +} + +// benchTimeseriesReplayRun executes the timeseries replay benchmark. +func benchTimeseriesReplayRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) { + ctx := context.Background() + + // Load the load profile + rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx, + "loadprofile/timeseries_replay.yaml") + + if err != nil { + return nil, err + } + defer func() { _ = rgCfgFileDone() }() + + // Deploy the runner group + rgResult, derr := utils.DeployRunnerGroup(ctx, + cliCtx.GlobalString("kubeconfig"), + cliCtx.GlobalString("runner-image"), + rgCfgFile, + cliCtx.GlobalString("runner-flowcontrol"), + cliCtx.GlobalString("rg-affinity"), + ) + + if derr != nil { + return nil, derr + } + + return &internaltypes.BenchmarkReport{ + Description: "Time-series replay: Baseline (10 QPS, 30s) → Spike (100 QPS, 20s) → Recovery (25 QPS, 30s)", + LoadSpec: *rgSpec, + Result: *rgResult, + Info: map[string]interface{}{}, + }, nil +} diff --git a/contrib/internal/manifests/loadprofile/timeseries_replay.yaml b/contrib/internal/manifests/loadprofile/timeseries_replay.yaml new file mode 100644 index 00000000..7b8dcc14 --- /dev/null +++ b/contrib/internal/manifests/loadprofile/timeseries_replay.yaml @@ -0,0 +1,61 @@ +count: 10 +loadProfile: + version: 1 + description: "time-series replay - simulating traffic spike pattern" + specs: + # Phase 1: Baseline load (30 seconds) + - rate: 10 + duration: 30 + conns: 5 + client: 10 + contentType: json + disableHTTP2: false + maxRetries: 0 + requests: + - staleList: + version: v1 + resource: pods + shares: 70 + - quorumList: + version: v1 + resource: pods + limit: 100 + shares: 30 + + # Phase 2: Traffic spike (20 seconds) + - rate: 100 + duration: 20 + conns: 20 + client: 50 + contentType: json + disableHTTP2: false + maxRetries: 0 + requests: + - staleList: + version: v1 + resource: pods + shares: 60 + - quorumList: + version: v1 + resource: pods + limit: 100 + shares: 40 + + # Phase 3: Recovery period (30 seconds) + - rate: 25 + duration: 30 + conns: 10 + client: 20 + contentType: json + disableHTTP2: false + maxRetries: 0 + requests: + - staleList: + version: v1 + resource: pods + shares: 80 + - quorumList: + version: v1 + resource: pods + limit: 100 + shares: 20 diff --git a/runner/utils.go b/runner/utils.go index 7ea6ff82..1c8e3a8f 100644 --- a/runner/utils.go +++ b/runner/utils.go @@ -83,12 +83,22 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type continue } - report := types.RunnerMetricReport{} - - err = json.Unmarshal(data, &report) - if err != nil { - klog.V(2).ErrorS(err, "failed to unmarshal", "runner", pod.Name) - continue + var report types.RunnerMetricReport + + // Try to unmarshal as MultiSpecRunnerMetricReport first + multiReport := types.MultiSpecRunnerMetricReport{} + err = json.Unmarshal(data, &multiReport) + if err == nil && len(multiReport.PerSpecResults) > 0 { + // Multi-spec format - use aggregated field + report = multiReport.Aggregated + } else { + // Single-spec format or unmarshal error - try as RunnerMetricReport + report = types.RunnerMetricReport{} + err = json.Unmarshal(data, &report) + if err != nil { + klog.V(2).ErrorS(err, "failed to unmarshal", "runner", pod.Name) + continue + } } // update totalReceivedBytes From e05c0d3818cd0dc8ac5c80584f7fdd1a4a840c3a Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Fri, 23 Jan 2026 11:41:25 +1100 Subject: [PATCH 2/2] fix: apply gofmt formatting to runner.go Signed-off-by: JasonXuDeveloper --- cmd/kperf/commands/runner/runner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 6e830962..6b19dd15 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -317,10 +317,10 @@ func printMultiSpecResults(f *os.File, rawDataFlagIncluded bool, perSpecResults // buildRunnerMetricReport builds a RunnerMetricReport from request.Result. func buildRunnerMetricReport(stats *request.Result, includeRawData bool) types.RunnerMetricReport { output := types.RunnerMetricReport{ - Total: stats.Total, - ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), - Duration: stats.Duration.String(), - TotalReceivedBytes: stats.TotalReceivedBytes, + Total: stats.Total, + ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), + Duration: stats.Duration.String(), + TotalReceivedBytes: stats.TotalReceivedBytes, PercentileLatenciesByURL: map[string][][2]float64{}, }