Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ type RunnerMetricReport struct {
PercentileLatenciesByURL map[string][][2]float64 `json:"percentileLatenciesByURL,omitempty"`
}

// MultiSpecRunnerMetricReport contains results for multiple specs with aggregated summary.
//
// It is the JSON document emitted by the runner when a load profile carries
// several specs: one RunnerMetricReport per executed spec, plus one combined
// report across all of them.
type MultiSpecRunnerMetricReport struct {
	// PerSpecResults contains individual results for each spec, in execution order.
	PerSpecResults []RunnerMetricReport `json:"perSpecResults,omitempty"`
	// Aggregated contains summed/aggregated result across all specs
	// (errors and latencies concatenated; byte/request totals and durations summed).
	Aggregated RunnerMetricReport `json:"aggregated"`
}

// TODO(weifu): build brand new struct for RunnerGroupsReport to include more
// information, like how many runner groups, service account and flow control.
type RunnerGroupsReport = RunnerMetricReport
155 changes: 125 additions & 30 deletions cmd/kperf/commands/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package runner
import (
"context"
"encoding/json"
"time"

"fmt"
"os"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/Azure/kperf/cmd/kperf/commands/utils"
"github.com/Azure/kperf/metrics"
"github.com/Azure/kperf/request"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

"github.com/urfave/cli"
Expand Down Expand Up @@ -103,29 +105,25 @@ var runCommand = cli.Command{
return err
}

// Runner only supports single spec
if len(profileCfg.Specs) > 1 {
return fmt.Errorf("runner only supports single spec, but got %d specs", len(profileCfg.Specs))
// Check for multi-spec CLI override conflict
if len(profileCfg.Specs) > 1 && hasCliOverrides(cliCtx) {
return fmt.Errorf("CLI flag overrides are not allowed when config has multiple specs")
}
pspec := profileCfg.Specs[0]

clientNum := pspec.Conns
// Use first spec for client configuration (all specs share same client pool)
firstSpec := profileCfg.Specs[0]
clientNum := firstSpec.Conns
restClis, err := request.NewClients(kubeCfgPath,
clientNum,
request.WithClientUserAgentOpt(cliCtx.String("user-agent")),
request.WithClientQPSOpt(pspec.Rate),
request.WithClientContentTypeOpt(pspec.ContentType),
request.WithClientDisableHTTP2Opt(pspec.DisableHTTP2),
request.WithClientQPSOpt(firstSpec.Rate),
request.WithClientContentTypeOpt(firstSpec.ContentType),
request.WithClientDisableHTTP2Opt(firstSpec.DisableHTTP2),
)
if err != nil {
return err
}

stats, err := request.Schedule(context.TODO(), &pspec, restClis)
if err != nil {
return err
}

var f *os.File = os.Stdout
outputFilePath := cliCtx.String("result")
if outputFilePath != "" {
Expand All @@ -147,7 +145,14 @@ var runCommand = cli.Command{
}

rawDataFlagIncluded := cliCtx.Bool("raw-data")
err = printResponseStats(f, rawDataFlagIncluded, stats)

// Execute all specs (handles both single and multiple specs uniformly)
perSpecResults, aggregated, err := executeSpecs(context.TODO(), profileCfg.Specs, restClis)
if err != nil {
return err
}

err = printMultiSpecResults(f, rawDataFlagIncluded, perSpecResults, aggregated)
if err != nil {
return fmt.Errorf("error while printing response stats: %w", err)
}
Expand Down Expand Up @@ -211,14 +216,111 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) {
return &profileCfg, nil
}

// printResponseStats prints types.RunnerMetricReport into underlying file.
func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error {
output := types.RunnerMetricReport{
Total: stats.Total,
ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors),
Duration: stats.Duration.String(),
TotalReceivedBytes: stats.TotalReceivedBytes,
// hasCliOverrides reports whether the user explicitly set any of the CLI
// flags that override values from the load-profile config file.
func hasCliOverrides(cliCtx *cli.Context) bool {
	for _, name := range []string{
		"rate", "conns", "client", "total", "duration",
		"content-type", "disable-http2", "max-retries",
	} {
		if cliCtx.IsSet(name) {
			return true
		}
	}
	return false
}

// executeSpecs runs every spec in order against the shared client pool and
// returns the per-spec results together with a single aggregated result.
// The aggregated result's Duration is the sum of all per-spec durations.
func executeSpecs(ctx context.Context, specs []types.LoadProfileSpec, restClis []rest.Interface) ([]*request.Result, *request.Result, error) {
	if len(specs) == 0 {
		return nil, nil, fmt.Errorf("no specs to execute")
	}

	var (
		results       = make([]*request.Result, 0, len(specs))
		totalDuration time.Duration
	)

	for i := range specs {
		klog.V(2).Infof("Executing spec %d/%d", i+1, len(specs))

		// Take a copy so Schedule never receives a pointer into the caller's slice.
		spec := specs[i]
		res, err := request.Schedule(ctx, &spec, restClis)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to execute spec %d: %w", i+1, err)
		}

		results = append(results, res)
		totalDuration += res.Duration
	}

	agg := aggregateResults(results)
	agg.Duration = totalDuration

	return results, agg, nil
}

// aggregateResults combines multiple per-spec results into a single result.
// Errors and per-URL latency samples are concatenated, while received bytes
// and request totals are summed. Duration is left at its zero value; the
// caller (executeSpecs) fills it in with the summed per-spec durations.
func aggregateResults(results []*request.Result) *request.Result {
	aggregated := &request.Result{
		ResponseStats: types.ResponseStats{
			Errors:         make([]types.ResponseError, 0),
			LatenciesByURL: make(map[string][]float64),
		},
	}

	for _, result := range results {
		// Concatenate raw errors; callers group them by type later.
		aggregated.Errors = append(aggregated.Errors, result.Errors...)

		// Appending to the map's zero value extends or creates the bucket in
		// one lookup — no existence check or pre-made empty slice is needed.
		for url, latencies := range result.LatenciesByURL {
			aggregated.LatenciesByURL[url] = append(aggregated.LatenciesByURL[url], latencies...)
		}

		// Sum byte and request counters.
		aggregated.TotalReceivedBytes += result.TotalReceivedBytes
		aggregated.Total += result.Total
	}

	return aggregated
}

// printMultiSpecResults serializes the per-spec and aggregated results as a
// types.MultiSpecRunnerMetricReport and writes it to f as indented JSON.
// When rawDataFlagIncluded is true, raw latencies and errors are embedded in
// each report.
func printMultiSpecResults(f *os.File, rawDataFlagIncluded bool, perSpecResults []*request.Result, aggregated *request.Result) error {
	// Convert each raw result into its report form, preserving order.
	specReports := make([]types.RunnerMetricReport, len(perSpecResults))
	for i, res := range perSpecResults {
		specReports[i] = buildRunnerMetricReport(res, rawDataFlagIncluded)
	}

	multiReport := types.MultiSpecRunnerMetricReport{
		PerSpecResults: specReports,
		Aggregated:     buildRunnerMetricReport(aggregated, rawDataFlagIncluded),
	}

	enc := json.NewEncoder(f)
	enc.SetIndent("", " ")
	if err := enc.Encode(multiReport); err != nil {
		return fmt.Errorf("failed to encode json: %w", err)
	}
	return nil
}

// buildRunnerMetricReport builds a RunnerMetricReport from request.Result.
func buildRunnerMetricReport(stats *request.Result, includeRawData bool) types.RunnerMetricReport {
output := types.RunnerMetricReport{
Total: stats.Total,
ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors),
Duration: stats.Duration.String(),
TotalReceivedBytes: stats.TotalReceivedBytes,
PercentileLatenciesByURL: map[string][][2]float64{},
}

Expand All @@ -236,17 +338,10 @@ func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Res
output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l)
}

if rawDataFlagIncluded {
if includeRawData {
output.LatenciesByURL = stats.LatenciesByURL
output.Errors = stats.Errors
}

encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")

err := encoder.Encode(output)
if err != nil {
return fmt.Errorf("failed to encode json: %w", err)
}
return nil
return output
}
1 change: 1 addition & 0 deletions contrib/cmd/runkperf/commands/bench/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var Command = cli.Command{
benchNode10Job1Pod1kCase,
benchNode100Job10Pod10kCase,
benchReadUpdateCase,
benchTimeseriesReplayCase,
},
}

Expand Down
71 changes: 71 additions & 0 deletions contrib/cmd/runkperf/commands/bench/timeseries_replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package bench

import (
"context"

internaltypes "github.com/Azure/kperf/contrib/internal/types"
"github.com/Azure/kperf/contrib/utils"

"github.com/urfave/cli"
)

// benchTimeseriesReplayCase is the "timeseries_replay" bench subcommand: it
// replays a multi-spec load profile whose specs run sequentially to model a
// baseline → spike → recovery traffic pattern.
var benchTimeseriesReplayCase = cli.Command{
	Name: "timeseries_replay",
	Usage: `
The test suite demonstrates time-series replay functionality by executing multiple load specs sequentially.
It simulates a traffic pattern with three phases: baseline load, traffic spike, and recovery period.
This allows benchmarking API server performance under varying load conditions over time.
`,
	Flags: append(
		[]cli.Flag{
			cli.IntFlag{
				// NOTE(review): this flag does not appear to be read by
				// benchTimeseriesReplayRun — confirm it is consumed elsewhere
				// (e.g. by newLoadProfileFromEmbed) or remove it.
				Name:  "duration",
				Usage: "Duration for each phase in seconds (overrides default)",
			},
		},
		commonFlags...,
	),
	Action: func(cliCtx *cli.Context) error {
		// Wrap the run with report rendering and API-server core info
		// collection, like the other bench cases.
		_, err := renderBenchmarkReportInterceptor(
			addAPIServerCoresInfoInterceptor(benchTimeseriesReplayRun),
		)(cliCtx)
		return err
	},
}

// benchTimeseriesReplayRun executes the timeseries replay benchmark: it
// materializes the embedded multi-spec load profile, deploys a runner group
// to drive it, and wraps the outcome in a BenchmarkReport.
func benchTimeseriesReplayRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, error) {
	ctx := context.Background()

	// Render the embedded load profile into a temp config file.
	rgCfgFile, rgSpec, rgCfgFileDone, err := newLoadProfileFromEmbed(cliCtx,
		"loadprofile/timeseries_replay.yaml")
	if err != nil {
		return nil, err
	}
	defer func() { _ = rgCfgFileDone() }()

	// Deploy the runner group and wait for its result.
	rgResult, err := utils.DeployRunnerGroup(ctx,
		cliCtx.GlobalString("kubeconfig"),
		cliCtx.GlobalString("runner-image"),
		rgCfgFile,
		cliCtx.GlobalString("runner-flowcontrol"),
		cliCtx.GlobalString("rg-affinity"),
	)
	if err != nil {
		return nil, err
	}

	return &internaltypes.BenchmarkReport{
		Description: "Time-series replay: Baseline (10 QPS, 30s) → Spike (100 QPS, 20s) → Recovery (25 QPS, 30s)",
		LoadSpec:    *rgSpec,
		Result:      *rgResult,
		Info:        map[string]interface{}{},
	}, nil
}
61 changes: 61 additions & 0 deletions contrib/internal/manifests/loadprofile/timeseries_replay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Load profile for the timeseries_replay benchmark.
# Three specs are executed sequentially by the runner, replaying a
# baseline → spike → recovery traffic pattern over time.
count: 10
loadProfile:
  version: 1
  description: "time-series replay - simulating traffic spike pattern"
  specs:
    # Phase 1: Baseline load (30 seconds) — light steady traffic.
    - rate: 10
      duration: 30
      conns: 5
      client: 10
      contentType: json
      disableHTTP2: false
      maxRetries: 0
      requests:
        - staleList:
            version: v1
            resource: pods
          shares: 70
        - quorumList:
            version: v1
            resource: pods
            limit: 100
          shares: 30

    # Phase 2: Traffic spike (20 seconds) — 10x request rate with more
    # connections/clients and a heavier quorum-read mix.
    - rate: 100
      duration: 20
      conns: 20
      client: 50
      contentType: json
      disableHTTP2: false
      maxRetries: 0
      requests:
        - staleList:
            version: v1
            resource: pods
          shares: 60
        - quorumList:
            version: v1
            resource: pods
            limit: 100
          shares: 40

    # Phase 3: Recovery period (30 seconds) — elevated but cooling-down load.
    - rate: 25
      duration: 30
      conns: 10
      client: 20
      contentType: json
      disableHTTP2: false
      maxRetries: 0
      requests:
        - staleList:
            version: v1
            resource: pods
          shares: 80
        - quorumList:
            version: v1
            resource: pods
            limit: 100
          shares: 20
22 changes: 16 additions & 6 deletions runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,22 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type
continue
}

report := types.RunnerMetricReport{}

err = json.Unmarshal(data, &report)
if err != nil {
klog.V(2).ErrorS(err, "failed to unmarshal", "runner", pod.Name)
continue
var report types.RunnerMetricReport

// Try to unmarshal as MultiSpecRunnerMetricReport first
multiReport := types.MultiSpecRunnerMetricReport{}
err = json.Unmarshal(data, &multiReport)
if err == nil && len(multiReport.PerSpecResults) > 0 {
// Multi-spec format - use aggregated field
report = multiReport.Aggregated
} else {
// Single-spec format or unmarshal error - try as RunnerMetricReport
report = types.RunnerMetricReport{}
err = json.Unmarshal(data, &report)
if err != nil {
klog.V(2).ErrorS(err, "failed to unmarshal", "runner", pod.Name)
continue
}
}

// update totalReceivedBytes
Expand Down
Loading