From 24460babec0fda7260ee7f79a7bd66000e4cec08 Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Mon, 15 Dec 2025 14:39:48 +1100 Subject: [PATCH 1/7] feat: add ModeConfig interface with backward compatibility Add ModeConfig interface to enable self-configuring execution modes and provide backward compatibility for legacy LoadProfile format. ## Changes ### New files: - api/types/mode_config.go: ModeConfig interface and helpers - api/types/weighted_random_config.go: WeightedRandomConfig implementation - api/types/timeseries_config.go: TimeSeriesConfig implementation - Split test files for better organization ### Modified files: - api/types/load_traffic.go: - Add ExecutionMode type - Update LoadProfileSpec with Mode and ModeConfig fields - Add UnmarshalYAML/UnmarshalJSON for backward compatibility - Fix typo: RequestList.Selector (was "seletor") - Update Validate() method ## Features **ModeConfig Interface:** - Self-configuring modes declare their own overridable fields - Automatic CLI override extraction via BuildOverridesFromCLI() - Mode-specific validation and client configuration - Clean separation between mode logic and CLI tools **Backward Compatibility:** - Legacy format (rate/total/requests fields) auto-migrates to weighted-random mode - Both YAML and JSON unmarshaling supported - Existing load profiles continue to work without changes ## Testing All tests pass including: - Polymorphic deserialization tests - Backward compatibility tests for legacy format - CLI override extraction tests - Mode-specific configuration tests --- api/types/load_traffic.go | 216 ++++++++++-- api/types/load_traffic_test.go | 213 +++--------- api/types/mode_config.go | 87 +++++ api/types/mode_config_test.go | 173 ++++++++++ api/types/timeseries_config.go | 97 ++++++ api/types/timeseries_config_test.go | 151 +++++++++ api/types/weighted_random_config.go | 96 ++++++ api/types/weighted_random_config_test.go | 400 +++++++++++++++++++++++ 8 files changed, 1236 insertions(+), 197 deletions(-) create mode 100644 api/types/mode_config.go create mode 100644 api/types/mode_config_test.go create mode 100644 api/types/timeseries_config.go create mode 100644 api/types/timeseries_config_test.go create mode 100644 api/types/weighted_random_config.go create mode 100644 api/types/weighted_random_config_test.go diff --git a/api/types/load_traffic.go b/api/types/load_traffic.go index d5d9a3b5..e84d892c 100644 --- a/api/types/load_traffic.go +++ b/api/types/load_traffic.go @@ -9,6 +9,7 @@ import ( "strings" apitypes "k8s.io/apimachinery/pkg/types" + "gopkg.in/yaml.v2" ) // ContentType represents the format of response. @@ -31,6 +32,27 @@ func (ct ContentType) Validate() error { } } +// ExecutionMode represents the execution strategy for generating requests. +type ExecutionMode string + +const ( + // ModeWeightedRandom generates requests randomly based on weighted distribution. + ModeWeightedRandom ExecutionMode = "weighted-random" + // ModeTimeSeries replays requests from time-bucketed audit logs. + ModeTimeSeries ExecutionMode = "time-series" +) + +// Validate returns error if ExecutionMode is not supported. +func (em ExecutionMode) Validate() error { + switch em { + case ModeWeightedRandom, ModeTimeSeries: + return nil + default: + return fmt.Errorf("unsupported execution mode: %s", em) + } +} + + // LoadProfile defines how to create load traffic from one host to kube-apiserver. type LoadProfile struct { // Version defines the version of this object. @@ -41,14 +63,8 @@ type LoadProfile struct { Spec LoadProfileSpec `json:"spec" yaml:"spec"` } -// LoadProfileSpec defines the load traffic for traget resource. +// LoadProfileSpec defines the load traffic for target resource. type LoadProfileSpec struct { - // Rate defines the maximum requests per second (zero is no limit). - Rate float64 `json:"rate" yaml:"rate"` - // Total defines the total number of requests. - Total int `json:"total" yaml:"total"` - // Duration defines the running time in seconds. - Duration int `json:"duration" yaml:"duration"` // Conns defines total number of long connections used for traffic. Conns int `json:"conns" yaml:"conns"` // Client defines total number of HTTP clients. @@ -61,9 +77,12 @@ type LoadProfileSpec struct { // retrying upon receiving "Retry-After" headers and 429 status-code // in the response (<= 0 means no retry). MaxRetries int `json:"maxRetries" yaml:"maxRetries"` - // Requests defines the different kinds of requests with weights. - // The executor should randomly pick by weight. - Requests []*WeightedRequest `json:"requests" yaml:"requests"` + + // Mode defines the execution strategy (weighted-random, time-series, etc.). + Mode ExecutionMode `json:"mode" yaml:"mode"` + // ModeConfig contains mode-specific configuration. + // This is automatically deserialized to the correct type based on Mode. + ModeConfig ModeConfig `json:"modeConfig" yaml:"modeConfig"` } // KubeGroupVersionResource identifies the resource URI. @@ -120,7 +139,7 @@ type RequestList struct { // Limit defines the page size. Limit int `json:"limit" yaml:"limit"` // Selector defines how to identify a set of objects. - Selector string `json:"seletor" yaml:"seletor"` + Selector string `json:"selector" yaml:"selector"` // FieldSelector defines how to identify a set of objects with field selector. FieldSelector string `json:"fieldSelector" yaml:"fieldSelector"` } @@ -201,34 +220,181 @@ func (lp LoadProfile) Validate() error { return lp.Spec.Validate() } -// Validate verifies fields of LoadProfileSpec. -func (spec LoadProfileSpec) Validate() error { - if spec.Conns <= 0 { - return fmt.Errorf("conns requires > 0: %v", spec.Conns) +// UnmarshalYAML implements custom YAML unmarshaling for LoadProfileSpec. +// It automatically deserializes ModeConfig to the correct concrete type based on Mode. +// It also provides backward compatibility for legacy format (without mode field). +func (spec *LoadProfileSpec) UnmarshalYAML(unmarshal func(interface{}) error) error { + // Create a temporary struct that has all fields explicitly (no embedding) + type tempSpec struct { + Conns int `yaml:"conns"` + Client int `yaml:"client"` + ContentType ContentType `yaml:"contentType"` + DisableHTTP2 bool `yaml:"disableHTTP2"` + MaxRetries int `yaml:"maxRetries"` + Mode ExecutionMode `yaml:"mode"` + ModeConfig map[string]interface{} `yaml:"modeConfig"` + + // Legacy fields (for backward compatibility) + Rate float64 `yaml:"rate"` + Total int `yaml:"total"` + Duration int `yaml:"duration"` + Requests []*WeightedRequest `yaml:"requests"` + } + + temp := &tempSpec{} + if err := unmarshal(temp); err != nil { + return err } - if spec.Rate < 0 { - return fmt.Errorf("rate requires >= 0: %v", spec.Rate) + // Copy common fields + spec.Conns = temp.Conns + spec.Client = temp.Client + spec.ContentType = temp.ContentType + spec.DisableHTTP2 = temp.DisableHTTP2 + spec.MaxRetries = temp.MaxRetries + + // Check if this is legacy format (no mode specified but has requests) + if temp.Mode == "" && len(temp.Requests) > 0 { + // Auto-migrate legacy format to weighted-random mode + spec.Mode = ModeWeightedRandom + spec.ModeConfig = &WeightedRandomConfig{ + Rate: temp.Rate, + Total: temp.Total, + Duration: temp.Duration, + Requests: temp.Requests, + } + return nil } - if spec.Total <= 0 && spec.Duration <= 0 { - return fmt.Errorf("total requires > 0: %v or duration > 0s: %v", spec.Total, spec.Duration) + // New format: mode is specified + spec.Mode = temp.Mode + + // Now unmarshal ModeConfig based on Mode + if temp.ModeConfig != nil { + var config ModeConfig + switch temp.Mode { + case ModeWeightedRandom: + config = &WeightedRandomConfig{} + case ModeTimeSeries: + config = &TimeSeriesConfig{} + default: + return fmt.Errorf("unknown mode: %s", temp.Mode) + } + + // Convert map to YAML bytes and unmarshal into typed struct + data, err := yaml.Marshal(temp.ModeConfig) + if err != nil { + return fmt.Errorf("failed to marshal modeConfig: %w", err) + } + if err := yaml.Unmarshal(data, config); err != nil { + return fmt.Errorf("failed to unmarshal modeConfig for mode %s: %w", temp.Mode, err) + } + spec.ModeConfig = config + } + + return nil +} + +// UnmarshalJSON implements custom JSON unmarshaling for LoadProfileSpec. +// It automatically deserializes ModeConfig to the correct concrete type based on Mode. +// It also provides backward compatibility for legacy format (without mode field). +func (spec *LoadProfileSpec) UnmarshalJSON(data []byte) error { + // Create a temporary struct that has all fields explicitly (no embedding) + type tempSpec struct { + Conns int `json:"conns"` + Client int `json:"client"` + ContentType ContentType `json:"contentType"` + DisableHTTP2 bool `json:"disableHTTP2"` + MaxRetries int `json:"maxRetries"` + Mode ExecutionMode `json:"mode"` + ModeConfig map[string]interface{} `json:"modeConfig"` + + // Legacy fields (for backward compatibility) + Rate float64 `json:"rate"` + Total int `json:"total"` + Duration int `json:"duration"` + Requests []*WeightedRequest `json:"requests"` + } + + temp := &tempSpec{} + if err := json.Unmarshal(data, temp); err != nil { + return err + } + + // Copy common fields + spec.Conns = temp.Conns + spec.Client = temp.Client + spec.ContentType = temp.ContentType + spec.DisableHTTP2 = temp.DisableHTTP2 + spec.MaxRetries = temp.MaxRetries + + // Check if this is legacy format (no mode specified but has requests) + if temp.Mode == "" && len(temp.Requests) > 0 { + // Auto-migrate legacy format to weighted-random mode + spec.Mode = ModeWeightedRandom + spec.ModeConfig = &WeightedRandomConfig{ + Rate: temp.Rate, + Total: temp.Total, + Duration: temp.Duration, + Requests: temp.Requests, + } + return nil + } + + // New format: mode is specified + spec.Mode = temp.Mode + + // Now unmarshal ModeConfig based on Mode + if temp.ModeConfig != nil { + var config ModeConfig + switch temp.Mode { + case ModeWeightedRandom: + config = &WeightedRandomConfig{} + case ModeTimeSeries: + config = &TimeSeriesConfig{} + default: + return fmt.Errorf("unknown mode: %s", temp.Mode) + } + + // Convert map to JSON bytes and unmarshal into typed struct + data, err := json.Marshal(temp.ModeConfig) + if err != nil { + return fmt.Errorf("failed to marshal modeConfig: %w", err) + } + if err := json.Unmarshal(data, config); err != nil { + return fmt.Errorf("failed to unmarshal modeConfig for mode %s: %w", temp.Mode, err) + } + spec.ModeConfig = config + } + + return nil +} + + +// Validate verifies fields of LoadProfileSpec. +func (spec *LoadProfileSpec) Validate() error { + + // Validate common fields + if spec.Conns <= 0 { + return fmt.Errorf("conns requires > 0: %v", spec.Conns) } if spec.Client <= 0 { return fmt.Errorf("client requires > 0: %v", spec.Client) } - err := spec.ContentType.Validate() - if err != nil { + if err := spec.ContentType.Validate(); err != nil { return err } - for idx, req := range spec.Requests { - if err := req.Validate(); err != nil { - return fmt.Errorf("idx: %v request: %v", idx, err) - } + if err := spec.Mode.Validate(); err != nil { + return err } + + if spec.ModeConfig == nil { + return fmt.Errorf("modeConfig is required") + } + return nil } diff --git a/api/types/load_traffic_test.go b/api/types/load_traffic_test.go index ab64b255..66c40376 100644 --- a/api/types/load_traffic_test.go +++ b/api/types/load_traffic_test.go @@ -7,210 +7,79 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" ) -func TestLoadProfileUnmarshalFromYAML(t *testing.T) { - in := ` -version: 1 -description: test -spec: - rate: 100 - total: 10000 - conns: 2 - client: 1 - contentType: json - requests: - - staleGet: - group: core - version: v1 - resource: pods - namespace: default - name: x1 - shares: 100 - - quorumGet: - group: core - version: v1 - resource: configmaps - namespace: default - name: x2 - shares: 150 - - staleList: - group: core - version: v1 - resource: pods - namespace: default - seletor: app=x2 - fieldSelector: spec.nodeName=x - shares: 200 - - quorumList: - group: core - version: v1 - resource: configmaps - namespace: default - limit: 10000 - seletor: app=x3 - shares: 400 - - put: - group: core - version: v1 - resource: configmaps - namespace: kperf - name: kperf- - keySpaceSize: 1000 - valueSize: 1024 - shares: 1000 - - getPodLog: - namespace: default - name: hello - container: main - tailLines: 1000 - limitBytes: 1024 - shares: 10 - - watchList: - group: core - version: v1 - resource: pods - namespace: default - seletor: app=x2 - fieldSelector: spec.nodeName=x - shares: 250 -` - - target := LoadProfile{} - require.NoError(t, yaml.Unmarshal([]byte(in), &target)) - assert.Equal(t, 1, target.Version) - assert.Equal(t, "test", target.Description) - assert.Equal(t, float64(100), target.Spec.Rate) - assert.Equal(t, 10000, target.Spec.Total) - assert.Equal(t, 2, target.Spec.Conns) - assert.Len(t, target.Spec.Requests, 7) - - assert.Equal(t, 100, target.Spec.Requests[0].Shares) - assert.NotNil(t, target.Spec.Requests[0].StaleGet) - assert.Equal(t, "pods", target.Spec.Requests[0].StaleGet.Resource) - assert.Equal(t, "v1", target.Spec.Requests[0].StaleGet.Version) - assert.Equal(t, "core", target.Spec.Requests[0].StaleGet.Group) - assert.Equal(t, "default", target.Spec.Requests[0].StaleGet.Namespace) - assert.Equal(t, "x1", target.Spec.Requests[0].StaleGet.Name) - - assert.NotNil(t, target.Spec.Requests[1].QuorumGet) - assert.Equal(t, 150, target.Spec.Requests[1].Shares) - - assert.Equal(t, 200, target.Spec.Requests[2].Shares) - assert.NotNil(t, target.Spec.Requests[2].StaleList) - assert.Equal(t, "pods", target.Spec.Requests[2].StaleList.Resource) - assert.Equal(t, "v1", target.Spec.Requests[2].StaleList.Version) - assert.Equal(t, "core", target.Spec.Requests[2].StaleList.Group) - assert.Equal(t, "default", target.Spec.Requests[2].StaleList.Namespace) - assert.Equal(t, 0, target.Spec.Requests[2].StaleList.Limit) - assert.Equal(t, "app=x2", target.Spec.Requests[2].StaleList.Selector) - assert.Equal(t, "spec.nodeName=x", target.Spec.Requests[2].StaleList.FieldSelector) - - assert.NotNil(t, target.Spec.Requests[3].QuorumList) - assert.Equal(t, 400, target.Spec.Requests[3].Shares) - - assert.Equal(t, 1000, target.Spec.Requests[4].Shares) - assert.NotNil(t, target.Spec.Requests[4].Put) - assert.Equal(t, "configmaps", target.Spec.Requests[4].Put.Resource) - assert.Equal(t, "v1", target.Spec.Requests[4].Put.Version) - assert.Equal(t, "core", target.Spec.Requests[4].Put.Group) - assert.Equal(t, "kperf", target.Spec.Requests[4].Put.Namespace) - assert.Equal(t, "kperf-", target.Spec.Requests[4].Put.Name) - assert.Equal(t, 1000, target.Spec.Requests[4].Put.KeySpaceSize) - assert.Equal(t, 1024, target.Spec.Requests[4].Put.ValueSize) - - assert.Equal(t, 10, target.Spec.Requests[5].Shares) - assert.NotNil(t, target.Spec.Requests[5].GetPodLog) - assert.Equal(t, "default", target.Spec.Requests[5].GetPodLog.Namespace) - assert.Equal(t, "hello", target.Spec.Requests[5].GetPodLog.Name) - assert.Equal(t, "main", target.Spec.Requests[5].GetPodLog.Container) - assert.Equal(t, int64(1000), *target.Spec.Requests[5].GetPodLog.TailLines) - assert.Equal(t, int64(1024), *target.Spec.Requests[5].GetPodLog.LimitBytes) - - assert.Equal(t, 250, target.Spec.Requests[6].Shares) - assert.NotNil(t, target.Spec.Requests[6].WatchList) - - assert.NoError(t, target.Validate()) -} - func TestWeightedRequest(t *testing.T) { - for _, r := range []struct { - name string - req *WeightedRequest - hasErr bool + tests := map[string]struct { + req WeightedRequest + err bool }{ - { - name: "shares < 0", - req: &WeightedRequest{Shares: -1}, - hasErr: true, + "shares < 0": { + req: WeightedRequest{ + Shares: -1, + }, + err: true, }, - { - name: "no request setting", - req: &WeightedRequest{Shares: 10}, - hasErr: true, + "no request setting": { + req: WeightedRequest{ + Shares: 100, + }, + err: true, }, - { - name: "empty version", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ + "empty version": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ KubeGroupVersionResource: KubeGroupVersionResource{ Resource: "pods", }, }, }, - hasErr: true, + err: true, }, - { - name: "empty resource", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ + "empty resource": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", Version: "v1", }, }, }, - hasErr: true, + err: true, }, - { - name: "wrong limit", - req: &WeightedRequest{ - Shares: 10, + "wrong limit": { + req: WeightedRequest{ + Shares: 100, StaleList: &RequestList{ KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", - Version: "v1", Resource: "pods", + Version: "v1", }, Limit: -1, }, }, - hasErr: true, + err: true, }, - { - name: "no error", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ + "no error": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", - Version: "v1", Resource: "pods", + Version: "v1", }, - Namespace: "default", - Name: "testing", + Limit: 0, }, }, + err: false, }, - } { - r := r - t.Run(r.name, func(t *testing.T) { - err := r.req.Validate() - if r.hasErr { + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := tc.req.Validate() + if tc.err { assert.Error(t, err) } else { assert.NoError(t, err) diff --git a/api/types/mode_config.go b/api/types/mode_config.go new file mode 100644 index 00000000..f84d59ce --- /dev/null +++ b/api/types/mode_config.go @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +// ModeConfig is a discriminated union for mode-specific configuration. +// It automatically deserializes to the correct concrete type based on the Mode field. +type ModeConfig interface { + isModeConfig() + // ApplyOverrides applies string key-value overrides to the config. + // This allows each mode to define its own override logic without + // coupling CLI/util code to specific config fields. + // Returns error if override key is invalid for this mode. + ApplyOverrides(overrides map[string]interface{}) error + // GetOverridableFields returns metadata about fields that can be overridden via CLI. + // This allows CLI tools to automatically extract and apply overrides without + // hardcoding field names and types. + GetOverridableFields() []OverridableField + // Validate performs mode-specific validation and normalization. + // This includes checking for conflicting settings and applying defaults. + // defaultOverrides provides default values from CLI (e.g., default total). + Validate(defaultOverrides map[string]interface{}) error + // ConfigureClientOptions returns mode-specific client configuration. + // This allows each mode to customize REST client behavior (e.g., QPS limiting). + ConfigureClientOptions() ClientOptions +} + +// ClientOptions contains mode-specific REST client configuration +type ClientOptions struct { + // QPS is the queries per second limit (0 means no limit) + QPS float64 +} + +// OverridableField describes a config field that can be overridden via CLI flags. +type OverridableField struct { + // Name is the field name (e.g., "rate", "total", "interval") + Name string + // Type describes the field type for CLI parsing + Type FieldType + // Description is help text for CLI flags + Description string +} + +// FieldType indicates the type of a field for CLI flag parsing +type FieldType string + +const ( + FieldTypeFloat64 FieldType = "float64" + FieldTypeInt FieldType = "int" + FieldTypeString FieldType = "string" + FieldTypeBool FieldType = "bool" +) + +// CLIContext is an interface for CLI flag access (wraps urfave/cli.Context) +// This allows the types package to extract overrides without depending on urfave/cli +type CLIContext interface { + IsSet(name string) bool + Float64(name string) float64 + Int(name string) int + String(name string) string + Bool(name string) bool +} + +// BuildOverridesFromCLI automatically builds an override map from CLI flags +// based on the mode config's declared overridable fields. +func BuildOverridesFromCLI(config ModeConfig, cliCtx CLIContext) map[string]interface{} { + overrides := make(map[string]interface{}) + + for _, field := range config.GetOverridableFields() { + if !cliCtx.IsSet(field.Name) { + continue + } + + switch field.Type { + case FieldTypeFloat64: + overrides[field.Name] = cliCtx.Float64(field.Name) + case FieldTypeInt: + overrides[field.Name] = cliCtx.Int(field.Name) + case FieldTypeString: + overrides[field.Name] = cliCtx.String(field.Name) + case FieldTypeBool: + overrides[field.Name] = cliCtx.Bool(field.Name) + } + } + + return overrides +} diff --git a/api/types/mode_config_test.go b/api/types/mode_config_test.go new file mode 100644 index 00000000..4fbd33fe --- /dev/null +++ b/api/types/mode_config_test.go @@ -0,0 +1,173 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestBuildOverridesFromCLI(t *testing.T) { + tests := map[string]struct { + config ModeConfig + cliValues map[string]interface{} + expectedResult map[string]interface{} + }{ + "weighted-random with rate and total": { + config: &WeightedRandomConfig{}, + cliValues: map[string]interface{}{ + "rate": float64(250), + "total": 5000, + }, + expectedResult: map[string]interface{}{ + "rate": float64(250), + "total": 5000, + }, + }, + "weighted-random with duration only": { + config: &WeightedRandomConfig{}, + cliValues: map[string]interface{}{ + "duration": 120, + }, + expectedResult: map[string]interface{}{ + "duration": 120, + }, + }, + "time-series with interval": { + config: &TimeSeriesConfig{}, + cliValues: map[string]interface{}{ + "interval": "500ms", + }, + expectedResult: map[string]interface{}{ + "interval": "500ms", + }, + }, + "no overrides set": { + config: &WeightedRandomConfig{}, + cliValues: map[string]interface{}{}, + expectedResult: map[string]interface{}{}, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + mockCLI := &mockCLIContext{values: tc.cliValues} + result := BuildOverridesFromCLI(tc.config, mockCLI) + assert.Equal(t, tc.expectedResult, result) + }) + } +} + +func TestPolymorphicDeserialization(t *testing.T) { + tests := map[string]struct { + yaml string + expectedMode ExecutionMode + validateFunc func(*testing.T, ModeConfig) + }{ + "weighted-random mode": { + yaml: ` +version: 1 +spec: + mode: weighted-random + conns: 10 + client: 5 + contentType: json + modeConfig: + rate: 150 + total: 2000 + requests: + - shares: 100 + staleGet: + version: v1 + resource: pods + namespace: default + name: test-pod +`, + expectedMode: ModeWeightedRandom, + validateFunc: func(t *testing.T, mc ModeConfig) { + wrConfig, ok := mc.(*WeightedRandomConfig) + require.True(t, ok) + assert.Equal(t, float64(150), wrConfig.Rate) + assert.Equal(t, 2000, wrConfig.Total) + assert.Len(t, wrConfig.Requests, 1) + }, + }, + "time-series mode": { + yaml: ` +version: 1 +spec: + mode: time-series + conns: 10 + client: 5 + contentType: json + modeConfig: + interval: "2s" + buckets: + - startTime: 0.0 + requests: + - method: GET + version: v1 + resource: pods + namespace: default +`, + expectedMode: ModeTimeSeries, + validateFunc: func(t *testing.T, mc ModeConfig) { + tsConfig, ok := mc.(*TimeSeriesConfig) + require.True(t, ok) + assert.Equal(t, "2s", tsConfig.Interval) + assert.Len(t, tsConfig.Buckets, 1) + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + var profile LoadProfile + err := yaml.Unmarshal([]byte(tc.yaml), &profile) + require.NoError(t, err) + assert.Equal(t, tc.expectedMode, profile.Spec.Mode) + tc.validateFunc(t, profile.Spec.ModeConfig) + }) + } +} + +type mockCLIContext struct { + values map[string]interface{} +} + +func (m *mockCLIContext) IsSet(name string) bool { + _, exists := m.values[name] + return exists +} + +func (m *mockCLIContext) Float64(name string) float64 { + if v, ok := m.values[name].(float64); ok { + return v + } + return 0 +} + +func (m *mockCLIContext) Int(name string) int { + if v, ok := m.values[name].(int); ok { + return v + } + return 0 +} + +func (m *mockCLIContext) String(name string) string { + if v, ok := m.values[name].(string); ok { + return v + } + return "" +} + +func (m *mockCLIContext) Bool(name string) bool { + if v, ok := m.values[name].(bool); ok { + return v + } + return false +} diff --git a/api/types/timeseries_config.go b/api/types/timeseries_config.go new file mode 100644 index 00000000..5c356fc0 --- /dev/null +++ b/api/types/timeseries_config.go @@ -0,0 +1,97 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import "fmt" + +// TimeSeriesConfig defines configuration for time-series execution mode. +type TimeSeriesConfig struct { + // Interval defines the time bucket size (e.g., "1s", "60s"). + Interval string `json:"interval" yaml:"interval" mapstructure:"interval"` + // Buckets contains the time-bucketed requests. + Buckets []RequestBucket `json:"buckets" yaml:"buckets" mapstructure:"buckets"` +} + +// RequestBucket represents requests for one time slot. +type RequestBucket struct { + // StartTime is the relative time in seconds from benchmark start. + StartTime float64 `json:"startTime" yaml:"startTime" mapstructure:"startTime"` + // Requests are the exact requests to execute in this bucket. + Requests []ExactRequest `json:"requests" yaml:"requests" mapstructure:"requests"` +} + +// ExactRequest represents a single exact API request. +type ExactRequest struct { + // Method is the HTTP method (GET, POST, PUT, PATCH, DELETE, LIST). + Method string `json:"method" yaml:"method" mapstructure:"method"` + // Group is the API group. + Group string `json:"group,omitempty" yaml:"group,omitempty" mapstructure:"group"` + // Version is the API version. + Version string `json:"version" yaml:"version" mapstructure:"version"` + // Resource is the resource type. + Resource string `json:"resource" yaml:"resource" mapstructure:"resource"` + // Namespace is the object's namespace. + Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty" mapstructure:"namespace"` + // Name is the object's name. + Name string `json:"name,omitempty" yaml:"name,omitempty" mapstructure:"name"` + // Body is the request body for POST/PUT/PATCH. + Body string `json:"body,omitempty" yaml:"body,omitempty" mapstructure:"body"` + // PatchType is the patch type for PATCH requests. + PatchType string `json:"patchType,omitempty" yaml:"patchType,omitempty" mapstructure:"patchType"` + // LabelSelector for LIST requests. + LabelSelector string `json:"labelSelector,omitempty" yaml:"labelSelector,omitempty" mapstructure:"labelSelector"` + // FieldSelector for LIST requests. + FieldSelector string `json:"fieldSelector,omitempty" yaml:"fieldSelector,omitempty" mapstructure:"fieldSelector"` + // Limit for LIST requests. + Limit int `json:"limit,omitempty" yaml:"limit,omitempty" mapstructure:"limit"` + // ResourceVersion for consistency. + ResourceVersion string `json:"resourceVersion,omitempty" yaml:"resourceVersion,omitempty" mapstructure:"resourceVersion"` +} + +// Ensure TimeSeriesConfig implements ModeConfig +func (*TimeSeriesConfig) isModeConfig() {} + +// GetOverridableFields implements ModeConfig for TimeSeriesConfig +func (c *TimeSeriesConfig) GetOverridableFields() []OverridableField { + return []OverridableField{ + { + Name: "interval", + Type: FieldTypeString, + Description: "Time bucket interval (e.g., '1s', '100ms')", + }, + } +} + +// ApplyOverrides implements ModeConfig for TimeSeriesConfig +func (c *TimeSeriesConfig) ApplyOverrides(overrides map[string]interface{}) error { + for key, value := range overrides { + switch key { + case "interval": + if v, ok := value.(string); ok { + c.Interval = v + } else { + return fmt.Errorf("interval must be string, got %T", value) + } + default: + return fmt.Errorf("unknown override key for time-series mode: %s", key) + } + } + return nil +} + +// Validate implements ModeConfig for TimeSeriesConfig +func (c *TimeSeriesConfig) Validate(defaultOverrides map[string]interface{}) error { + // Time-series mode doesn't have conflicting settings or defaults + // Could add validation for interval format, bucket ordering, etc. + return nil +} + +// ConfigureClientOptions implements ModeConfig for TimeSeriesConfig +func (c *TimeSeriesConfig) ConfigureClientOptions() ClientOptions { + // Time-series mode doesn't use client-side rate limiting + // (rate is controlled by bucket timing) + return ClientOptions{ + QPS: 0, // No limit + } +} diff --git a/api/types/timeseries_config_test.go b/api/types/timeseries_config_test.go new file mode 100644 index 00000000..36e89449 --- /dev/null +++ b/api/types/timeseries_config_test.go @@ -0,0 +1,151 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestTimeSeriesConfigGetOverridableFields(t *testing.T) { + config := &TimeSeriesConfig{} + fields := config.GetOverridableFields() + + assert.Len(t, fields, 1) + assert.Equal(t, "interval", fields[0].Name) + assert.Equal(t, FieldTypeString, fields[0].Type) + assert.Contains(t, fields[0].Description, "Time bucket") +} + +func TestTimeSeriesConfigApplyOverrides(t *testing.T) { + tests := map[string]struct { + initial TimeSeriesConfig + overrides map[string]interface{} + expected TimeSeriesConfig + err bool + }{ + "interval override": { + initial: TimeSeriesConfig{Interval: "1s"}, + overrides: map[string]interface{}{ + "interval": "100ms", + }, + expected: TimeSeriesConfig{Interval: "100ms"}, + err: false, + }, + "invalid interval type": { + initial: TimeSeriesConfig{Interval: "1s"}, + overrides: map[string]interface{}{ + "interval": 123, + }, + expected: TimeSeriesConfig{Interval: "1s"}, + err: true, + }, + "unknown key": { + initial: TimeSeriesConfig{Interval: "1s"}, + overrides: map[string]interface{}{ + "unknown": "value", + }, + expected: TimeSeriesConfig{Interval: "1s"}, + err: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + config := tc.initial + err := config.ApplyOverrides(tc.overrides) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expected.Interval, config.Interval) + } + }) + } +} + +func TestTimeSeriesConfigValidate(t *testing.T) { + config := &TimeSeriesConfig{Interval: "1s"} + err := config.Validate(nil) + assert.NoError(t, err) +} + +func TestTimeSeriesConfigConfigureClientOptions(t *testing.T) { + config := &TimeSeriesConfig{} + opts := config.ConfigureClientOptions() + assert.Equal(t, float64(0), opts.QPS, "time-series should not use client-side rate limiting") +} + +func TestLoadProfileTimeSeriesUnmarshalFromYAML(t *testing.T) { + in := ` +version: 1 +description: time-series test +spec: + conns: 5 + client: 10 + contentType: json + mode: time-series + modeConfig: + interval: "1s" + buckets: + - startTime: 0.0 + requests: + - method: GET + version: v1 + resource: pods + namespace: default + name: pod-1 + - method: LIST + version: v1 + resource: configmaps + namespace: kube-system + limit: 100 + - startTime: 1.0 + requests: + - method: POST + version: v1 + resource: configmaps + namespace: default + name: cm-1 + body: '{"data": {"key": "value"}}' +` + + target := LoadProfile{} + require.NoError(t, yaml.Unmarshal([]byte(in), &target)) + assert.Equal(t, 1, target.Version) + assert.Equal(t, "time-series test", target.Description) + assert.Equal(t, 5, target.Spec.Conns) + assert.Equal(t, ModeTimeSeries, target.Spec.Mode) + + tsConfig, ok := target.Spec.ModeConfig.(*TimeSeriesConfig) + require.True(t, ok, "ModeConfig should be *TimeSeriesConfig") + require.NotNil(t, tsConfig) + + assert.Equal(t, "1s", tsConfig.Interval) + assert.Len(t, tsConfig.Buckets, 2) + + assert.Equal(t, 0.0, tsConfig.Buckets[0].StartTime) + assert.Len(t, tsConfig.Buckets[0].Requests, 2) + assert.Equal(t, "GET", tsConfig.Buckets[0].Requests[0].Method) + assert.Equal(t, "pods", tsConfig.Buckets[0].Requests[0].Resource) + assert.Equal(t, "default", tsConfig.Buckets[0].Requests[0].Namespace) + assert.Equal(t, "pod-1", tsConfig.Buckets[0].Requests[0].Name) + + assert.Equal(t, "LIST", tsConfig.Buckets[0].Requests[1].Method) + assert.Equal(t, "configmaps", tsConfig.Buckets[0].Requests[1].Resource) + assert.Equal(t, "kube-system", tsConfig.Buckets[0].Requests[1].Namespace) + assert.Equal(t, 100, tsConfig.Buckets[0].Requests[1].Limit) + + assert.Equal(t, 1.0, tsConfig.Buckets[1].StartTime) + assert.Len(t, tsConfig.Buckets[1].Requests, 1) + assert.Equal(t, "POST", tsConfig.Buckets[1].Requests[0].Method) + assert.Equal(t, "configmaps", tsConfig.Buckets[1].Requests[0].Resource) + assert.Equal(t, "cm-1", tsConfig.Buckets[1].Requests[0].Name) + assert.Equal(t, `{"data": {"key": "value"}}`, tsConfig.Buckets[1].Requests[0].Body) + + assert.NoError(t, target.Validate()) +} diff --git a/api/types/weighted_random_config.go b/api/types/weighted_random_config.go new file mode 100644 index 00000000..deaa7af4 --- /dev/null +++ b/api/types/weighted_random_config.go @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import "fmt" + +// WeightedRandomConfig defines configuration for weighted-random execution mode. +type WeightedRandomConfig struct { + // Rate defines the maximum requests per second (zero is no limit). + Rate float64 `json:"rate" yaml:"rate" mapstructure:"rate"` + // Total defines the total number of requests. + Total int `json:"total" yaml:"total" mapstructure:"total"` + // Duration defines the running time in seconds. + Duration int `json:"duration" yaml:"duration" mapstructure:"duration"` + // Requests defines the different kinds of requests with weights. + Requests []*WeightedRequest `json:"requests" yaml:"requests" mapstructure:"requests"` +} + +// Ensure WeightedRandomConfig implements ModeConfig +func (*WeightedRandomConfig) isModeConfig() {} + +// GetOverridableFields implements ModeConfig for WeightedRandomConfig +func (c *WeightedRandomConfig) GetOverridableFields() []OverridableField { + return []OverridableField{ + { + Name: "rate", + Type: FieldTypeFloat64, + Description: "Maximum requests per second (0 means no limit)", + }, + { + Name: "total", + Type: FieldTypeInt, + Description: "Total number of requests to execute", + }, + { + Name: "duration", + Type: FieldTypeInt, + Description: "Duration in seconds (ignored if total is set)", + }, + } +} + +// ApplyOverrides implements ModeConfig for WeightedRandomConfig +func (c *WeightedRandomConfig) ApplyOverrides(overrides map[string]interface{}) error { + for key, value := range overrides { + switch key { + case "rate": + if v, ok := value.(float64); ok { + c.Rate = v + } else { + return fmt.Errorf("rate must be float64, got %T", value) + } + case "total": + if v, ok := value.(int); ok { + c.Total = v + } else { + return fmt.Errorf("total must be int, got %T", value) + } + case "duration": + if v, ok := value.(int); ok { + c.Duration = v + } else { + return fmt.Errorf("duration must be int, got %T", value) + } + default: + return fmt.Errorf("unknown override key for weighted-random mode: %s", key) + } + } + return nil +} + +// Validate implements ModeConfig for WeightedRandomConfig +func (c *WeightedRandomConfig) Validate(defaultOverrides map[string]interface{}) error { + // Check for conflicting Total and Duration settings + if c.Total > 0 && c.Duration > 0 { + // Both set - Duration is ignored + c.Duration = 0 + } + + // Apply defaults if both are zero + if c.Total == 0 && c.Duration == 0 { + if defaultTotal, ok := defaultOverrides["total"].(int); ok { + c.Total = defaultTotal + } + } + + return nil +} + +// ConfigureClientOptions implements ModeConfig for WeightedRandomConfig +func (c *WeightedRandomConfig) ConfigureClientOptions() ClientOptions { + return ClientOptions{ + QPS: c.Rate, + } +} diff --git a/api/types/weighted_random_config_test.go b/api/types/weighted_random_config_test.go new file mode 100644 index 00000000..1408f64a --- /dev/null +++ b/api/types/weighted_random_config_test.go @@ -0,0 +1,400 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestWeightedRandomConfigGetOverridableFields(t *testing.T) { + config := &WeightedRandomConfig{} + fields := config.GetOverridableFields() + + assert.Len(t, fields, 3) + + fieldMap := make(map[string]OverridableField) + for _, f := range fields { + fieldMap[f.Name] = f + } + + assert.Equal(t, FieldTypeFloat64, fieldMap["rate"].Type) + assert.Contains(t, fieldMap["rate"].Description, "requests per second") + + assert.Equal(t, FieldTypeInt, fieldMap["total"].Type) + assert.Contains(t, fieldMap["total"].Description, "Total number") + + assert.Equal(t, FieldTypeInt, fieldMap["duration"].Type) + assert.Contains(t, fieldMap["duration"].Description, "Duration") +} + +func TestWeightedRandomConfigApplyOverrides(t *testing.T) { + tests := map[string]struct { + initial WeightedRandomConfig + overrides map[string]interface{} + expected WeightedRandomConfig + err bool + }{ + "rate override": { + initial: WeightedRandomConfig{Rate: 100, Total: 1000}, + overrides: map[string]interface{}{ + "rate": float64(200), + }, + expected: WeightedRandomConfig{Rate: 200, Total: 1000}, + err: false, + }, + "total override": { + initial: WeightedRandomConfig{Rate: 100, Total: 1000}, + overrides: map[string]interface{}{ + "total": 2000, + }, + expected: WeightedRandomConfig{Rate: 100, Total: 2000}, + err: false, + }, + "duration override": { + initial: WeightedRandomConfig{Rate: 100, Duration: 60}, + overrides: map[string]interface{}{ + "duration": 120, + }, + expected: WeightedRandomConfig{Rate: 100, Duration: 120}, + err: false, + }, + "multiple overrides": { + initial: WeightedRandomConfig{Rate: 100, Total: 1000, Duration: 60}, + overrides: map[string]interface{}{ + "rate": float64(300), + "total": 3000, + "duration": 180, + }, + expected: WeightedRandomConfig{Rate: 300, Total: 3000, Duration: 180}, + err: false, + }, + "invalid rate type": { + initial: WeightedRandomConfig{Rate: 100}, + overrides: map[string]interface{}{ + "rate": "invalid", + }, + expected: WeightedRandomConfig{Rate: 100}, + err: true, + }, + "invalid total type": { + initial: WeightedRandomConfig{Total: 1000}, + overrides: map[string]interface{}{ + "total": "invalid", + }, + expected: WeightedRandomConfig{Total: 1000}, + err: true, + }, + "unknown key": { + initial: WeightedRandomConfig{Rate: 100}, + overrides: map[string]interface{}{ + "unknown": 123, + }, + expected: WeightedRandomConfig{Rate: 100}, + err: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + config := tc.initial + err := config.ApplyOverrides(tc.overrides) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expected.Rate, config.Rate) + assert.Equal(t, tc.expected.Total, config.Total) + assert.Equal(t, tc.expected.Duration, config.Duration) + } + }) + } +} + +func TestWeightedRandomConfigValidate(t *testing.T) { + tests := map[string]struct { + config WeightedRandomConfig + defaultOverrides map[string]interface{} + expectedTotal int + expectedDuration int + err bool + }{ + "total and duration set - duration ignored": { + config: WeightedRandomConfig{Total: 1000, Duration: 60}, + defaultOverrides: nil, + expectedTotal: 1000, + expectedDuration: 0, + err: false, + }, + "only total set": { + config: WeightedRandomConfig{Total: 1000}, + defaultOverrides: nil, + expectedTotal: 1000, + expectedDuration: 0, + err: false, + }, + "only duration set": { + config: WeightedRandomConfig{Duration: 60}, + defaultOverrides: nil, + expectedTotal: 0, + expectedDuration: 60, + err: false, + }, + "neither set - default total applied": { + config: WeightedRandomConfig{}, + defaultOverrides: map[string]interface{}{"total": 500}, + expectedTotal: 500, + expectedDuration: 0, + err: false, + }, + "neither set - no default": { + config: WeightedRandomConfig{}, + defaultOverrides: nil, + expectedTotal: 0, + expectedDuration: 0, + err: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + config := tc.config + err := config.Validate(tc.defaultOverrides) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectedTotal, config.Total) + assert.Equal(t, tc.expectedDuration, config.Duration) + } + }) + } +} + +func TestWeightedRandomConfigConfigureClientOptions(t *testing.T) { + tests := map[string]struct { + config WeightedRandomConfig + expectedQPS float64 + }{ + "rate set": { + config: WeightedRandomConfig{Rate: 100}, + expectedQPS: 100, + }, + "rate zero": { + config: WeightedRandomConfig{Rate: 0}, + expectedQPS: 0, + }, + "high rate": { + config: WeightedRandomConfig{Rate: 10000}, + expectedQPS: 10000, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + opts := tc.config.ConfigureClientOptions() + assert.Equal(t, tc.expectedQPS, opts.QPS) + }) + } +} + +func TestLoadProfileWeightedRandomUnmarshalFromYAML(t *testing.T) { + in := ` +version: 1 +description: test +spec: + conns: 2 + client: 1 + contentType: json + mode: weighted-random + modeConfig: + rate: 100 + total: 10000 + requests: + - staleGet: + group: core + version: v1 + resource: pods + namespace: default + name: x1 + shares: 100 + - quorumGet: + group: core + version: v1 + resource: configmaps + namespace: default + name: x2 + shares: 150 + - staleList: + group: core + version: v1 + resource: pods + namespace: default + selector: app=x2 + fieldSelector: spec.nodeName=x + shares: 200 + - quorumList: + group: core + version: v1 + resource: configmaps + namespace: default + limit: 10000 + selector: app=x3 + shares: 400 + - put: + group: core + version: v1 + resource: configmaps + namespace: kperf + name: kperf- + keySpaceSize: 1000 + valueSize: 1024 + shares: 1000 + - getPodLog: + namespace: default + name: hello + container: main + tailLines: 1000 + limitBytes: 1024 + shares: 10 + - watchList: + group: core + version: v1 + resource: pods + namespace: default + selector: app=x7 + fieldSelector: spec.nodeName=x + shares: 25 +` + + target := LoadProfile{} + require.NoError(t, yaml.Unmarshal([]byte(in), &target)) + assert.Equal(t, 1, target.Version) + assert.Equal(t, "test", target.Description) + assert.Equal(t, 2, target.Spec.Conns) + assert.Equal(t, ModeWeightedRandom, target.Spec.Mode) + + wrConfig, ok := target.Spec.ModeConfig.(*WeightedRandomConfig) + require.True(t, ok, "ModeConfig should be *WeightedRandomConfig") + require.NotNil(t, wrConfig) + + assert.Equal(t, float64(100), wrConfig.Rate) + assert.Equal(t, 10000, wrConfig.Total) + assert.Len(t, wrConfig.Requests, 7) + + assert.Equal(t, 100, wrConfig.Requests[0].Shares) + assert.NotNil(t, wrConfig.Requests[0].StaleGet) + assert.Equal(t, "pods", wrConfig.Requests[0].StaleGet.Resource) + assert.Equal(t, "v1", wrConfig.Requests[0].StaleGet.Version) + assert.Equal(t, "core", wrConfig.Requests[0].StaleGet.Group) + assert.Equal(t, "default", wrConfig.Requests[0].StaleGet.Namespace) + assert.Equal(t, "x1", wrConfig.Requests[0].StaleGet.Name) + + assert.NotNil(t, wrConfig.Requests[1].QuorumGet) + assert.Equal(t, 150, wrConfig.Requests[1].Shares) + + assert.Equal(t, 200, wrConfig.Requests[2].Shares) + assert.NotNil(t, wrConfig.Requests[2].StaleList) + assert.Equal(t, "pods", wrConfig.Requests[2].StaleList.Resource) + assert.Equal(t, "v1", wrConfig.Requests[2].StaleList.Version) + assert.Equal(t, "core", wrConfig.Requests[2].StaleList.Group) + assert.Equal(t, "default", wrConfig.Requests[2].StaleList.Namespace) + assert.Equal(t, 0, wrConfig.Requests[2].StaleList.Limit) + assert.Equal(t, "app=x2", wrConfig.Requests[2].StaleList.Selector) + assert.Equal(t, "spec.nodeName=x", wrConfig.Requests[2].StaleList.FieldSelector) + + assert.NotNil(t, wrConfig.Requests[3].QuorumList) + assert.Equal(t, 400, wrConfig.Requests[3].Shares) + + assert.Equal(t, 1000, wrConfig.Requests[4].Shares) + assert.NotNil(t, wrConfig.Requests[4].Put) + assert.Equal(t, "configmaps", wrConfig.Requests[4].Put.Resource) + assert.Equal(t, "v1", wrConfig.Requests[4].Put.Version) + assert.Equal(t, "core", wrConfig.Requests[4].Put.Group) + assert.Equal(t, "kperf", wrConfig.Requests[4].Put.Namespace) + assert.Equal(t, "kperf-", wrConfig.Requests[4].Put.Name) + assert.Equal(t, 1000, wrConfig.Requests[4].Put.KeySpaceSize) + assert.Equal(t, 1024, wrConfig.Requests[4].Put.ValueSize) + + assert.Equal(t, 10, wrConfig.Requests[5].Shares) + assert.NotNil(t, wrConfig.Requests[5].GetPodLog) + assert.Equal(t, "default", wrConfig.Requests[5].GetPodLog.Namespace) + assert.Equal(t, "hello", wrConfig.Requests[5].GetPodLog.Name) + assert.Equal(t, "main", wrConfig.Requests[5].GetPodLog.Container) + assert.Equal(t, int64(1000), *wrConfig.Requests[5].GetPodLog.TailLines) + assert.Equal(t, int64(1024), *wrConfig.Requests[5].GetPodLog.LimitBytes) + + assert.Equal(t, 25, wrConfig.Requests[6].Shares) + assert.NotNil(t, wrConfig.Requests[6].WatchList) + + assert.NoError(t, target.Validate()) +} + +func TestLoadProfileWeightedRandomUnmarshalFromYAML_LegacyFormat(t *testing.T) { + // Test backward compatibility with legacy format (no mode field) + in := ` +version: 1 +description: legacy format test +spec: + rate: 50 + total: 5000 + duration: 120 + conns: 4 + client: 2 + contentType: json + requests: + - staleGet: + group: core + version: v1 + resource: pods + namespace: default + name: test-pod + shares: 50 + - quorumList: + group: core + version: v1 + resource: configmaps + namespace: default + limit: 100 + shares: 100 +` + + target := LoadProfile{} + require.NoError(t, yaml.Unmarshal([]byte(in), &target)) + + assert.Equal(t, 1, target.Version) + assert.Equal(t, "legacy format test", target.Description) + assert.Equal(t, 4, target.Spec.Conns) + assert.Equal(t, 2, target.Spec.Client) + + // Should auto-migrate to weighted-random mode + assert.Equal(t, ModeWeightedRandom, target.Spec.Mode) + + wrConfig, ok := target.Spec.ModeConfig.(*WeightedRandomConfig) + require.True(t, ok, "ModeConfig should be *WeightedRandomConfig for legacy format") + require.NotNil(t, wrConfig) + + // Verify legacy fields are migrated + assert.Equal(t, float64(50), wrConfig.Rate) + assert.Equal(t, 5000, wrConfig.Total) + assert.Equal(t, 120, wrConfig.Duration) + assert.Len(t, wrConfig.Requests, 2) + + assert.Equal(t, 50, wrConfig.Requests[0].Shares) + assert.NotNil(t, wrConfig.Requests[0].StaleGet) + assert.Equal(t, "pods", wrConfig.Requests[0].StaleGet.Resource) + assert.Equal(t, "v1", wrConfig.Requests[0].StaleGet.Version) + assert.Equal(t, "default", wrConfig.Requests[0].StaleGet.Namespace) + assert.Equal(t, "test-pod", wrConfig.Requests[0].StaleGet.Name) + + assert.Equal(t, 100, wrConfig.Requests[1].Shares) + assert.NotNil(t, wrConfig.Requests[1].QuorumList) + assert.Equal(t, "configmaps", wrConfig.Requests[1].QuorumList.Resource) + assert.Equal(t, 100, wrConfig.Requests[1].QuorumList.Limit) + + assert.NoError(t, target.Validate()) +} From a360381baae5f291dc2227e956ec09bd614ec89f Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Mon, 15 Dec 2025 14:44:58 +1100 Subject: [PATCH 2/7] feat: implement executor pattern and request builder adapters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement executor pattern for mode-specific execution strategies and adapter pattern for request builders, enabling clean separation and extensibility. ## Changes ### New Executor Pattern (request/executor/) - **executor.go**: Executor interface and metadata types - **factory.go**: Registry-based factory for creating executors - **weighted_random.go**: WeightedRandomExecutor with rate limiting - **timeseries.go**: TimeSeriesExecutor for audit log replay ### Request Builder Adapters (request/builders.go) - CreateRequestBuilderFromWeighted(): Converts WeightedRequest → RESTRequestBuilder - CreateRequestBuilderFromExact(): Converts ExactRequest → RESTRequestBuilder - Centralized factory registration functions - No unnecessary type conversions ### Refactored Scheduler (request/schedule.go) - Uses executor pattern instead of hardcoded mode logic - Mode-agnostic scheduling - Executors manage their own rate limiting ### Simplified Random Module (request/random.go) - Removed WeightedRandomRequests() function - Kept individual request builder functions - Cleaner separation of concerns ## Benefits - Each executor is self-contained and testable - Easy to add new execution modes - No coupling between scheduler and mode-specific logic - Clean adapter pattern for different request types ## Depends On - PR1: mode-config-interface (ModeConfig interface must exist first) --- request/builders.go | 117 +++++++++++++++++++ request/executor/executor.go | 103 +++++++++++++++++ request/executor/factory.go | 72 ++++++++++++ request/executor/timeseries.go | 158 +++++++++++++++++++++++++ request/executor/weighted_random.go | 172 ++++++++++++++++++++++++++++ request/random.go | 121 +------------------ request/requester.go | 10 +- request/schedule.go | 91 +++++++++------ 8 files changed, 684 insertions(+), 160 deletions(-) create mode 100644 request/builders.go create mode 100644 request/executor/executor.go create mode 100644 request/executor/factory.go create mode 100644 request/executor/timeseries.go create mode 100644 request/executor/weighted_random.go diff --git a/request/builders.go b/request/builders.go new file mode 100644 index 00000000..579ed1e6 --- /dev/null +++ b/request/builders.go @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package request + +import ( + "fmt" + + "github.com/Azure/kperf/api/types" + "github.com/Azure/kperf/request/executor" +) + +func init() { + // Register the request builder factories with the executor package + executor.SetRequestBuilderFactory(CreateRequestBuilder) + executor.SetExactRequestBuilderFactory(CreateRequestBuilderFromExact) +} + +// CreateRequestBuilder creates a RESTRequestBuilder from a WeightedRequest. +// This function is used by weighted-random mode executors. +func CreateRequestBuilder(r *types.WeightedRequest, maxRetries int) (executor.RESTRequestBuilder, error) { + var builder executor.RESTRequestBuilder + switch { + case r.StaleList != nil: + builder = newRequestListBuilder(r.StaleList, "0", maxRetries) + case r.QuorumList != nil: + builder = newRequestListBuilder(r.QuorumList, "", maxRetries) + case r.WatchList != nil: + builder = newRequestWatchListBuilder(r.WatchList, maxRetries) + case r.StaleGet != nil: + builder = newRequestGetBuilder(r.StaleGet, "0", maxRetries) + case r.QuorumGet != nil: + builder = newRequestGetBuilder(r.QuorumGet, "", maxRetries) + case r.GetPodLog != nil: + builder = newRequestGetPodLogBuilder(r.GetPodLog, maxRetries) + case r.Patch != nil: + builder = newRequestPatchBuilder(r.Patch, "", maxRetries) + case r.PostDel != nil: + builder = newRequestPostDelBuilder(r.PostDel, "", maxRetries) + default: + return nil, fmt.Errorf("unsupported request type") + } + return builder, nil +} + +// CreateRequestBuilderFromExact creates a RESTRequestBuilder from an ExactRequest. +// This function is used by time-series and other exact-replay mode executors. +func CreateRequestBuilderFromExact(req *types.ExactRequest, maxRetries int) (executor.RESTRequestBuilder, error) { + resourceVersion := req.ResourceVersion + + switch req.Method { + case "GET": + return newRequestGetBuilder(&types.RequestGet{ + KubeGroupVersionResource: types.KubeGroupVersionResource{ + Group: req.Group, + Version: req.Version, + Resource: req.Resource, + }, + Namespace: req.Namespace, + Name: req.Name, + }, resourceVersion, maxRetries), nil + + case "LIST": + return newRequestListBuilder(&types.RequestList{ + KubeGroupVersionResource: types.KubeGroupVersionResource{ + Group: req.Group, + Version: req.Version, + Resource: req.Resource, + }, + Namespace: req.Namespace, + Limit: req.Limit, + Selector: req.LabelSelector, + FieldSelector: req.FieldSelector, + }, resourceVersion, maxRetries), nil + + case "PATCH": + patchType, ok := types.GetPatchType(req.PatchType) + if !ok { + return nil, fmt.Errorf("invalid patch type: %s", req.PatchType) + } + return newRequestPatchBuilder(&types.RequestPatch{ + KubeGroupVersionResource: types.KubeGroupVersionResource{ + Group: req.Group, + Version: req.Version, + Resource: req.Resource, + }, + Namespace: req.Namespace, + Name: req.Name, + Body: req.Body, + PatchType: string(patchType), + }, resourceVersion, maxRetries), nil + + case "POST": + return newRequestPostDelBuilder(&types.RequestPostDel{ + KubeGroupVersionResource: types.KubeGroupVersionResource{ + Group: req.Group, + Version: req.Version, + Resource: req.Resource, + }, + Namespace: req.Namespace, + }, resourceVersion, maxRetries), nil + + case "DELETE": + return newRequestPostDelBuilder(&types.RequestPostDel{ + KubeGroupVersionResource: types.KubeGroupVersionResource{ + Group: req.Group, + Version: req.Version, + Resource: req.Resource, + }, + Namespace: req.Namespace, + DeleteRatio: 1.0, + }, resourceVersion, maxRetries), nil + + default: + return nil, fmt.Errorf("unsupported method: %s", req.Method) + } +} diff --git a/request/executor/executor.go b/request/executor/executor.go new file mode 100644 index 00000000..1217839a --- /dev/null +++ b/request/executor/executor.go @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package executor + +import ( + "context" + "net/url" + "time" + + "github.com/Azure/kperf/api/types" + "k8s.io/client-go/rest" +) + +// RESTRequestBuilder builds REST requests for the Kubernetes API. +// This interface is used by executors to produce requests that workers will execute. +type RESTRequestBuilder interface { + Build(cli rest.Interface) Requester +} + +// Requester represents a request that can be executed. +type Requester interface { + Method() string + URL() *url.URL + MaskedURL() *url.URL + Timeout(time.Duration) + Do(context.Context) (bytes int64, err error) +} + +// Executor generates requests according to a specific execution mode. +// This interface abstracts different request generation strategies, +// allowing the scheduler to be mode-agnostic. +type Executor interface { + // Chan returns a channel that produces RESTRequestBuilders. + // The scheduler's worker pool consumes from this channel. + Chan() <-chan RESTRequestBuilder + + // Run starts the executor and begins producing requests. + // The executor should respect ctx cancellation. + // Returns error if execution fails (except context.Canceled). + Run(ctx context.Context) error + + // Stop gracefully stops the executor and closes the channel. + // Should be idempotent. + Stop() + + // Metadata returns information about this executor. + // Used for logging and metrics. + Metadata() ExecutorMetadata + + // GetRateLimiter returns a rate limiter if this mode requires rate limiting at the worker level. + // Returns nil if no rate limiting is needed (e.g., time-series mode handles timing internally). + GetRateLimiter() RateLimiter + + // GetExecutionContext returns a context that includes mode-specific timeouts (e.g., duration). + // The returned context is derived from the base context and should be used for execution. + GetExecutionContext(baseCtx context.Context) (context.Context, context.CancelFunc) +} + +// RateLimiter is an interface for rate limiting. +// This allows executors to provide custom rate limiting strategies. +type RateLimiter interface { + // Wait blocks until the limiter permits an event to happen. + Wait(ctx context.Context) error +} + +// ExecutorMetadata contains information about an executor's expected behavior. +type ExecutorMetadata struct { + // ExpectedTotal is the total number of requests expected (0 if unbounded). + ExpectedTotal int + + // ExpectedDuration is the expected duration of execution (0 if unbounded). + ExpectedDuration time.Duration + + // Custom contains mode-specific metadata. + // This allows modes to provide additional information without changing the interface. + // Examples: + // - Weighted-random: {"rate": 100, "request_types": 5} + // - Time-series: {"bucket_count": 1800, "interval": "1s"} + // - Poisson: {"lambda": 50, "distribution": "poisson"} + Custom map[string]interface{} +} + +// requestBuilderFactory is a function type for creating request builders from WeightedRequest. +type requestBuilderFactory func(*types.WeightedRequest, int) (RESTRequestBuilder, error) + +// exactRequestBuilderFactory is a function type for creating request builders from ExactRequest. +type exactRequestBuilderFactory func(*types.ExactRequest, int) (RESTRequestBuilder, error) + +var createRequestBuilderFunc requestBuilderFactory +var createExactRequestBuilderFunc exactRequestBuilderFactory + +// SetRequestBuilderFactory sets the factory function for creating request builders from WeightedRequest. +// This is called by the request package during initialization to avoid import cycles. +func SetRequestBuilderFactory(factory requestBuilderFactory) { + createRequestBuilderFunc = factory +} + +// SetExactRequestBuilderFactory sets the factory function for creating request builders from ExactRequest. +// This is called by the request package during initialization to avoid import cycles. +func SetExactRequestBuilderFactory(factory exactRequestBuilderFactory) { + createExactRequestBuilderFunc = factory +} diff --git a/request/executor/factory.go b/request/executor/factory.go new file mode 100644 index 00000000..6f5d67f1 --- /dev/null +++ b/request/executor/factory.go @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package executor + +import ( + "fmt" + + "github.com/Azure/kperf/api/types" +) + +// ExecutorConstructor creates an executor from a LoadProfileSpec. +type ExecutorConstructor func(spec *types.LoadProfileSpec) (Executor, error) + +// ExecutorFactory creates executors for different execution modes. +type ExecutorFactory struct { + constructors map[string]ExecutorConstructor +} + +var defaultFactory = NewExecutorFactory() + +// NewExecutorFactory creates a new factory with built-in modes registered. +func NewExecutorFactory() *ExecutorFactory { + f := &ExecutorFactory{ + constructors: make(map[string]ExecutorConstructor), + } + + f.Register(string(types.ModeWeightedRandom), NewWeightedRandomExecutor) + f.Register(string(types.ModeTimeSeries), NewTimeSeriesExecutor) + + return f +} + +// Register registers a new mode constructor. +func (f *ExecutorFactory) Register(mode string, constructor ExecutorConstructor) { + f.constructors[mode] = constructor +} + +// RegisterMode registers a mode constructor using the ExecutionMode type. +func (f *ExecutorFactory) RegisterMode(mode types.ExecutionMode, constructor ExecutorConstructor) { + f.Register(string(mode), constructor) +} + +// Create creates an executor for the given mode. +func (f *ExecutorFactory) Create(spec *types.LoadProfileSpec) (Executor, error) { + modeStr := string(spec.Mode) + constructor, ok := f.constructors[modeStr] + if !ok { + return nil, fmt.Errorf("unknown executor mode: %s (available modes: %v)", + spec.Mode, f.AvailableModes()) + } + return constructor(spec) +} + +// AvailableModes returns a list of registered mode names. +func (f *ExecutorFactory) AvailableModes() []string { + modes := make([]string, 0, len(f.constructors)) + for mode := range f.constructors { + modes = append(modes, mode) + } + return modes +} + +// CreateExecutor is a global convenience function that uses the default factory. +func CreateExecutor(spec *types.LoadProfileSpec) (Executor, error) { + return defaultFactory.Create(spec) +} + +// RegisterMode allows external packages to register custom executors. +func RegisterMode(mode types.ExecutionMode, constructor ExecutorConstructor) { + defaultFactory.RegisterMode(mode, constructor) +} diff --git a/request/executor/timeseries.go b/request/executor/timeseries.go new file mode 100644 index 00000000..4f426618 --- /dev/null +++ b/request/executor/timeseries.go @@ -0,0 +1,158 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package executor + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/Azure/kperf/api/types" +) + +// TimeSeriesExecutor implements Executor for time-series replay mode. +// It dispatches requests according to recorded timestamps from audit logs. +type TimeSeriesExecutor struct { + config *types.TimeSeriesConfig + spec *types.LoadProfileSpec + interval time.Duration + buckets []types.RequestBucket + reqBuilderCh chan RESTRequestBuilder + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + once sync.Once +} + +// NewTimeSeriesExecutor creates a new time series executor from spec. +func NewTimeSeriesExecutor(spec *types.LoadProfileSpec) (Executor, error) { + if spec.Mode != types.ModeTimeSeries { + return nil, fmt.Errorf("expected mode %s, got %s", types.ModeTimeSeries, spec.Mode) + } + + if spec.ModeConfig == nil { + return nil, fmt.Errorf("modeConfig is required") + } + + // Type assert to TimeSeriesConfig + config, ok := spec.ModeConfig.(*types.TimeSeriesConfig) + if !ok { + return nil, fmt.Errorf("invalid config type for time-series mode") + } + + interval, err := time.ParseDuration(config.Interval) + if err != nil { + return nil, fmt.Errorf("invalid interval: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + return &TimeSeriesExecutor{ + config: config, + spec: spec, + interval: interval, + buckets: config.Buckets, + reqBuilderCh: make(chan RESTRequestBuilder), + ctx: ctx, + cancel: cancel, + }, nil +} + +// Chan returns the channel that produces request builders. +func (e *TimeSeriesExecutor) Chan() <-chan RESTRequestBuilder { + return e.reqBuilderCh +} + +// Run starts the executor and begins replaying requests. +func (e *TimeSeriesExecutor) Run(ctx context.Context) error { + e.wg.Add(1) + defer e.wg.Done() + + startTime := time.Now() + + for _, bucket := range e.buckets { + targetTime := startTime.Add(time.Duration(bucket.StartTime * float64(time.Second))) + + // Wait until target time + select { + case <-time.After(time.Until(targetTime)): + case <-ctx.Done(): + return ctx.Err() + case <-e.ctx.Done(): + return e.ctx.Err() + } + + // Dispatch requests in this bucket + for _, req := range bucket.Requests { + builder := e.createBuilderForExactRequest(&req) + if builder == nil { + continue + } + select { + case e.reqBuilderCh <- builder: + case <-ctx.Done(): + return ctx.Err() + case <-e.ctx.Done(): + return e.ctx.Err() + } + } + } + + return nil +} + +// Stop gracefully stops the executor. +func (e *TimeSeriesExecutor) Stop() { + e.once.Do(func() { + e.cancel() + e.wg.Wait() + close(e.reqBuilderCh) + }) +} + +// Metadata returns executor metadata. +func (e *TimeSeriesExecutor) Metadata() ExecutorMetadata { + totalRequests := 0 + for _, bucket := range e.buckets { + totalRequests += len(bucket.Requests) + } + + maxDuration := 0.0 + if len(e.buckets) > 0 { + maxDuration = e.buckets[len(e.buckets)-1].StartTime + } + + return ExecutorMetadata{ + ExpectedTotal: totalRequests, + ExpectedDuration: time.Duration(maxDuration * float64(time.Second)), + Custom: map[string]interface{}{ + "mode": string(types.ModeTimeSeries), + "bucket_count": len(e.buckets), + "interval": e.interval.String(), + }, + } +} + +// createBuilderForExactRequest creates a request builder from an ExactRequest. +func (e *TimeSeriesExecutor) createBuilderForExactRequest(req *types.ExactRequest) RESTRequestBuilder { + if createExactRequestBuilderFunc == nil { + return nil + } + + builder, err := createExactRequestBuilderFunc(req, e.spec.MaxRetries) + if err != nil { + return nil + } + return builder +} + +// GetRateLimiter returns nil because time-series mode handles timing internally. +func (e *TimeSeriesExecutor) GetRateLimiter() RateLimiter { + return nil +} + +// GetExecutionContext returns a simple cancellable context (no duration timeout). +func (e *TimeSeriesExecutor) GetExecutionContext(baseCtx context.Context) (context.Context, context.CancelFunc) { + return context.WithCancel(baseCtx) +} diff --git a/request/executor/weighted_random.go b/request/executor/weighted_random.go new file mode 100644 index 00000000..59889a3a --- /dev/null +++ b/request/executor/weighted_random.go @@ -0,0 +1,172 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package executor + +import ( + "context" + "crypto/rand" + "fmt" + "math" + "math/big" + "sync" + "time" + + "github.com/Azure/kperf/api/types" + "golang.org/x/time/rate" +) + +// WeightedRandomExecutor implements Executor for weighted-random mode. +// It generates requests randomly based on weighted distribution. +type WeightedRandomExecutor struct { + config *types.WeightedRandomConfig + spec *types.LoadProfileSpec + limiter *rate.Limiter + reqBuilderCh chan RESTRequestBuilder + shares []int + reqBuilders []RESTRequestBuilder + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + once sync.Once +} + +// NewWeightedRandomExecutor creates a new weighted random executor from spec. +func NewWeightedRandomExecutor(spec *types.LoadProfileSpec) (Executor, error) { + if spec.Mode != types.ModeWeightedRandom { + return nil, fmt.Errorf("expected mode %s, got %s", types.ModeWeightedRandom, spec.Mode) + } + + if spec.ModeConfig == nil { + return nil, fmt.Errorf("modeConfig is required") + } + + // Type assert to WeightedRandomConfig + config, ok := spec.ModeConfig.(*types.WeightedRandomConfig) + if !ok { + return nil, fmt.Errorf("invalid config type for weighted-random mode") + } + + // Build request builders + shares := make([]int, 0, len(config.Requests)) + reqBuilders := make([]RESTRequestBuilder, 0, len(config.Requests)) + for _, r := range config.Requests { + shares = append(shares, r.Shares) + if createRequestBuilderFunc == nil { + return nil, fmt.Errorf("request builder factory not initialized") + } + builder, err := createRequestBuilderFunc(r, spec.MaxRetries) + if err != nil { + return nil, fmt.Errorf("failed to create request builder: %v", err) + } + reqBuilders = append(reqBuilders, builder) + } + + // Create rate limiter + qps := config.Rate + if qps == 0 { + qps = float64(math.MaxInt32) + } + limiter := rate.NewLimiter(rate.Limit(qps), 1) + + ctx, cancel := context.WithCancel(context.Background()) + return &WeightedRandomExecutor{ + config: config, + spec: spec, + limiter: limiter, + reqBuilderCh: make(chan RESTRequestBuilder), + shares: shares, + reqBuilders: reqBuilders, + ctx: ctx, + cancel: cancel, + }, nil +} + +// Chan returns the channel that produces request builders. +func (e *WeightedRandomExecutor) Chan() <-chan RESTRequestBuilder { + return e.reqBuilderCh +} + +// Run starts the executor and begins generating requests. +func (e *WeightedRandomExecutor) Run(ctx context.Context) error { + e.wg.Add(1) + defer e.wg.Done() + + total := e.config.Total + sum := 0 + + for { + if total > 0 && sum >= total { + break + } + + builder := e.randomPick() + select { + case e.reqBuilderCh <- builder: + sum++ + case <-e.ctx.Done(): + return e.ctx.Err() + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} + +// Stop gracefully stops the executor. +func (e *WeightedRandomExecutor) Stop() { + e.once.Do(func() { + e.cancel() + e.wg.Wait() + close(e.reqBuilderCh) + }) +} + +// Metadata returns executor metadata. +func (e *WeightedRandomExecutor) Metadata() ExecutorMetadata { + return ExecutorMetadata{ + ExpectedTotal: e.config.Total, + ExpectedDuration: time.Duration(e.config.Duration) * time.Second, + Custom: map[string]interface{}{ + "mode": string(types.ModeWeightedRandom), + "rate": e.config.Rate, + "request_types": len(e.config.Requests), + }, + } +} + +// randomPick randomly selects a request builder based on weights. +func (e *WeightedRandomExecutor) randomPick() RESTRequestBuilder { + sum := 0 + for _, s := range e.shares { + sum += s + } + + rndInt, err := rand.Int(rand.Reader, big.NewInt(int64(sum))) + if err != nil { + panic(err) + } + + rnd := rndInt.Int64() + for i := range e.shares { + s := int64(e.shares[i]) + if rnd < s { + return e.reqBuilders[i] + } + rnd -= s + } + panic("unreachable") +} + +// GetRateLimiter returns the rate limiter for worker-level rate limiting. +func (e *WeightedRandomExecutor) GetRateLimiter() RateLimiter { + return e.limiter +} + +// GetExecutionContext returns a context with duration timeout if configured. +func (e *WeightedRandomExecutor) GetExecutionContext(baseCtx context.Context) (context.Context, context.CancelFunc) { + if e.config.Duration > 0 { + return context.WithTimeout(baseCtx, time.Duration(e.config.Duration)*time.Second) + } + return context.WithCancel(baseCtx) +} diff --git a/request/random.go b/request/random.go index 32012e04..aaad8614 100644 --- a/request/random.go +++ b/request/random.go @@ -8,12 +8,12 @@ import ( "crypto/rand" "fmt" "math/big" - "sync" "sync/atomic" "time" "github.com/Azure/kperf/api/types" "github.com/Azure/kperf/contrib/utils" + "github.com/Azure/kperf/request/executor" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,125 +23,8 @@ import ( "k8s.io/client-go/rest" ) -// WeightedRandomRequests is used to generate requests based on LoadProfileSpec. -type WeightedRandomRequests struct { - once sync.Once - wg sync.WaitGroup - ctx context.Context - cancel context.CancelFunc - reqBuilderCh chan RESTRequestBuilder - - shares []int - reqBuilders []RESTRequestBuilder -} - -// NewWeightedRandomRequests creates new instance of WeightedRandomRequests. -func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequests, error) { - if err := spec.Validate(); err != nil { - return nil, fmt.Errorf("invalid load profile spec: %v", err) - } - - shares := make([]int, 0, len(spec.Requests)) - reqBuilders := make([]RESTRequestBuilder, 0, len(spec.Requests)) - for _, r := range spec.Requests { - shares = append(shares, r.Shares) - - var builder RESTRequestBuilder - switch { - case r.StaleList != nil: - builder = newRequestListBuilder(r.StaleList, "0", spec.MaxRetries) - case r.QuorumList != nil: - builder = newRequestListBuilder(r.QuorumList, "", spec.MaxRetries) - case r.WatchList != nil: - builder = newRequestWatchListBuilder(r.WatchList, spec.MaxRetries) - case r.StaleGet != nil: - builder = newRequestGetBuilder(r.StaleGet, "0", spec.MaxRetries) - case r.QuorumGet != nil: - builder = newRequestGetBuilder(r.QuorumGet, "", spec.MaxRetries) - case r.GetPodLog != nil: - builder = newRequestGetPodLogBuilder(r.GetPodLog, spec.MaxRetries) - case r.Patch != nil: - builder = newRequestPatchBuilder(r.Patch, "", spec.MaxRetries) - case r.PostDel != nil: - builder = newRequestPostDelBuilder(r.PostDel, "", spec.MaxRetries) - default: - return nil, fmt.Errorf("not implement for PUT yet") - } - reqBuilders = append(reqBuilders, builder) - } - - ctx, cancel := context.WithCancel(context.Background()) - return &WeightedRandomRequests{ - ctx: ctx, - cancel: cancel, - reqBuilderCh: make(chan RESTRequestBuilder), - shares: shares, - reqBuilders: reqBuilders, - }, nil -} - -// Run starts to random pick request. -func (r *WeightedRandomRequests) Run(ctx context.Context, total int) { - defer r.wg.Done() - r.wg.Add(1) - - sum := 0 - for { - if total > 0 && sum >= total { - break - } - builder := r.randomPick() - select { - case r.reqBuilderCh <- builder: - sum++ - case <-r.ctx.Done(): - return - case <-ctx.Done(): - return - } - } -} - -// Chan returns channel to get random request. -func (r *WeightedRandomRequests) Chan() chan RESTRequestBuilder { - return r.reqBuilderCh -} - -func (r *WeightedRandomRequests) randomPick() RESTRequestBuilder { - sum := 0 - for _, s := range r.shares { - sum += s - } - - rndInt, err := rand.Int(rand.Reader, big.NewInt(int64(sum))) - if err != nil { - panic(err) - } - - rnd := rndInt.Int64() - for i := range r.shares { - s := int64(r.shares[i]) - if rnd < s { - return r.reqBuilders[i] - } - rnd -= s - } - panic("unreachable") -} - -// Stop stops request generator. -func (r *WeightedRandomRequests) Stop() { - r.once.Do(func() { - r.cancel() - r.wg.Wait() - close(r.reqBuilderCh) - }) -} - // RESTRequestBuilder is used to build rest.Request. -type RESTRequestBuilder interface { - Build(cli rest.Interface) Requester -} +type RESTRequestBuilder = executor.RESTRequestBuilder type requestGetBuilder struct { version schema.GroupVersion diff --git a/request/requester.go b/request/requester.go index 9737b42d..dfe154ef 100644 --- a/request/requester.go +++ b/request/requester.go @@ -11,6 +11,7 @@ import ( "time" _ "unsafe" // unsafe to use internal function from client-go + "github.com/Azure/kperf/request/executor" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" @@ -18,13 +19,8 @@ import ( "k8s.io/utils/clock" ) -type Requester interface { - Method() string - URL() *url.URL - MaskedURL() *url.URL - Timeout(time.Duration) - Do(context.Context) (bytes int64, err error) -} +// Requester is a type alias for executor.Requester. +type Requester = executor.Requester type BaseRequester struct { method string diff --git a/request/schedule.go b/request/schedule.go index b6a09706..b6f3c461 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -6,15 +6,15 @@ package request import ( "context" "errors" - "math" + "fmt" "sync" "time" "github.com/Azure/kperf/api/types" "github.com/Azure/kperf/metrics" + "github.com/Azure/kperf/request/executor" "golang.org/x/net/http2" - "golang.org/x/time/rate" "k8s.io/client-go/rest" "k8s.io/klog/v2" ) @@ -30,47 +30,60 @@ type Result struct { Total int } -// Schedule files requests to apiserver based on LoadProfileSpec. +// Schedule executes requests to apiserver based on LoadProfileSpec using the executor pattern. func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*Result, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - rndReqs, err := NewWeightedRandomRequests(spec) + // Create executor for the specified mode + exec, err := executor.CreateExecutor(spec) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create executor: %v", err) } + defer exec.Stop() - qps := spec.Rate - if qps == 0 { - qps = float64(math.MaxInt32) - } - limiter := rate.NewLimiter(rate.Limit(qps), 1) + // Get metadata for logging + metadata := exec.Metadata() + + // Get execution context with mode-specific timeouts + execCtx, execCancel := exec.GetExecutionContext(ctx) + defer execCancel() + // Get rate limiter (nil if mode doesn't need it) + limiter := exec.GetRateLimiter() + + // Worker pool - start workers BEFORE executor to avoid unbuffered channel deadlock clients := spec.Client if clients == 0 { clients = spec.Conns } - reqBuilderCh := rndReqs.Chan() + respMetric := metrics.NewResponseMetric() var wg sync.WaitGroup - respMetric := metrics.NewResponseMetric() + reqBuilderCh := exec.Chan() for i := 0; i < clients; i++ { - // reuse connection if clients > conns cli := restCli[i%len(restCli)] wg.Add(1) - go func(cli rest.Interface) { + go func(workerID int, cli rest.Interface) { defer wg.Done() - for builder := range reqBuilderCh { - req := builder.Build(cli) + klog.V(5).Infof("Worker %d started, waiting for requests", workerID) + requestCount := 0 - if err := limiter.Wait(ctx); err != nil { - klog.V(5).Infof("Rate limiter wait failed: %v", err) - cancel() - return + for builder := range reqBuilderCh { + // Apply rate limiting (if configured) + if limiter != nil { + if err := limiter.Wait(ctx); err != nil { + klog.V(5).Infof("Worker %d: Rate limiter wait failed: %v", workerID, err) + return + } } + requestCount++ + klog.V(8).Infof("Worker %d received request #%d", workerID, requestCount) + req := builder.Build(cli) + klog.V(5).Infof("Request URL: %s", req.URL()) req.Timeout(defaultTimeout) @@ -111,30 +124,40 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I respMetric.ObserveLatency(req.Method(), req.MaskedURL().String(), latency) }() } - }(cli) + + klog.V(5).Infof("Worker %d finished: processed %d requests", workerID, requestCount) + }(i, cli) } - klog.V(2).InfoS("Setting", + // Extract rate from metadata for logging (mode-specific) + rate, _ := metadata.Custom["rate"].(float64) + + klog.V(2).InfoS("Schedule started", + "mode", spec.Mode, "clients", clients, "connections", len(restCli), - "rate", qps, - "total", spec.Total, - "duration", spec.Duration, + "rate", rate, + "expectedTotal", metadata.ExpectedTotal, + "expectedDuration", metadata.ExpectedDuration, "http2", !spec.DisableHTTP2, "content-type", spec.ContentType, ) start := time.Now() - if spec.Duration > 0 { - // If duration is set, we will run for duration. - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(spec.Duration)*time.Second) - defer cancel() - } - rndReqs.Run(ctx, spec.Total) + // Start executor AFTER workers are ready to receive + go func() { + if err := exec.Run(execCtx); err != nil && err != context.Canceled { + klog.Errorf("Executor error: %v", err) + } + // Signal completion (success or failure) + cancel() + }() + + // Wait for completion + <-ctx.Done() - rndReqs.Stop() + exec.Stop() wg.Wait() totalDuration := time.Since(start) @@ -142,7 +165,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I return &Result{ ResponseStats: responseStats, Duration: totalDuration, - Total: spec.Total, + Total: metadata.ExpectedTotal, }, nil } From ddf7481723413d4ed888c17577f6fc11ade8722b Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Mon, 15 Dec 2025 14:47:27 +1100 Subject: [PATCH 3/7] refactor: make CLI tools mode-agnostic Update CLI tools to be mode-agnostic by using BuildOverridesFromCLI() instead of hardcoded field access. ## Changes ### cmd/kperf/commands/runner/runner.go - Remove hardcoded rate/total/duration flag handling - Use BuildOverridesFromCLI() to auto-discover overridable fields - Apply overrides via ModeConfig.ApplyOverrides() ### contrib/cmd/runkperf/commands/bench/utils.go - Use BuildOverridesFromCLI() for automatic override extraction - No longer needs to know about specific mode field names ### contrib/cmd/runkperf/commands/warmup/command.go - Updated to use mode-agnostic override handling ## Benefits - CLI tools don't depend on specific mode types - Adding new modes doesn't require CLI changes - Each mode declares its own CLI-overridable fields - Cleaner, more maintainable code ## Depends On - PR1: mode-config-interface (BuildOverridesFromCLI helper) - PR2: executor-pattern-virtualization (executor pattern must exist) --- cmd/kperf/commands/runner/runner.go | 42 +++++++------- contrib/cmd/runkperf/commands/bench/utils.go | 58 ++++++++++--------- .../cmd/runkperf/commands/warmup/command.go | 10 +++- 3 files changed, 60 insertions(+), 50 deletions(-) diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index ecb56311..3a227163 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -15,7 +15,6 @@ import ( "github.com/Azure/kperf/cmd/kperf/commands/utils" "github.com/Azure/kperf/metrics" "github.com/Azure/kperf/request" - "k8s.io/klog/v2" "github.com/urfave/cli" "gopkg.in/yaml.v2" @@ -104,10 +103,14 @@ var runCommand = cli.Command{ } clientNum := profileCfg.Spec.Conns + + // Get mode-specific client options + clientOpts := profileCfg.Spec.ModeConfig.ConfigureClientOptions() + restClis, err := request.NewClients(kubeCfgPath, clientNum, request.WithClientUserAgentOpt(cliCtx.String("user-agent")), - request.WithClientQPSOpt(profileCfg.Spec.Rate), + request.WithClientQPSOpt(clientOpts.QPS), request.WithClientContentTypeOpt(profileCfg.Spec.ContentType), request.WithClientDisableHTTP2Opt(profileCfg.Spec.DisableHTTP2), ) @@ -165,30 +168,13 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { return nil, fmt.Errorf("failed to unmarshal %s from yaml format: %w", cfgPath, err) } - // override value by flags - if v := "rate"; cliCtx.IsSet(v) { - profileCfg.Spec.Rate = cliCtx.Float64(v) - } + // Apply CLI overrides to common fields if v := "conns"; cliCtx.IsSet(v) || profileCfg.Spec.Conns == 0 { profileCfg.Spec.Conns = cliCtx.Int(v) } if v := "client"; cliCtx.IsSet(v) || profileCfg.Spec.Client == 0 { profileCfg.Spec.Client = cliCtx.Int(v) } - if v := "total"; cliCtx.IsSet(v) { - profileCfg.Spec.Total = cliCtx.Int(v) - } - if v := "duration"; cliCtx.IsSet(v) { - profileCfg.Spec.Duration = cliCtx.Int(v) - } - if profileCfg.Spec.Total > 0 && profileCfg.Spec.Duration > 0 { - klog.Warningf("both total:%v and duration:%v are set, duration will be ignored\n", profileCfg.Spec.Total, profileCfg.Spec.Duration) - profileCfg.Spec.Duration = 0 - } - if profileCfg.Spec.Total == 0 && profileCfg.Spec.Duration == 0 { - // Use default total value - profileCfg.Spec.Total = cliCtx.Int("total") - } if v := "content-type"; cliCtx.IsSet(v) || profileCfg.Spec.ContentType == "" { profileCfg.Spec.ContentType = types.ContentType(cliCtx.String(v)) } @@ -199,6 +185,22 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { profileCfg.Spec.MaxRetries = cliCtx.Int(v) } + // Apply mode-specific CLI flag overrides + modeOverrides := types.BuildOverridesFromCLI(profileCfg.Spec.ModeConfig, cliCtx) + if len(modeOverrides) > 0 { + if err := profileCfg.Spec.ModeConfig.ApplyOverrides(modeOverrides); err != nil { + return nil, fmt.Errorf("failed to apply config overrides: %w", err) + } + } + + // Mode-specific validation with defaults + defaultOverrides := map[string]interface{}{ + "total": cliCtx.Int("total"), + } + if err := profileCfg.Spec.ModeConfig.Validate(defaultOverrides); err != nil { + return nil, fmt.Errorf("config validation failed: %w", err) + } + if err := profileCfg.Validate(); err != nil { return nil, err } diff --git a/contrib/cmd/runkperf/commands/bench/utils.go b/contrib/cmd/runkperf/commands/bench/utils.go index 3995db55..112865b7 100644 --- a/contrib/cmd/runkperf/commands/bench/utils.go +++ b/contrib/cmd/runkperf/commands/bench/utils.go @@ -150,10 +150,14 @@ func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _s if reqs < 0 { return fmt.Errorf("invalid total-requests value: %v", reqs) } - reqsTime := cliCtx.Int("duration") - if !cliCtx.IsSet("total") && reqsTime > 0 { - reqs = 0 - spec.Profile.Spec.Duration = reqsTime + // Apply CLI overrides automatically + if spec != nil && spec.Profile.Spec.ModeConfig != nil { + overrides := types.BuildOverridesFromCLI(spec.Profile.Spec.ModeConfig, cliCtx) + if len(overrides) > 0 { + if err := spec.Profile.Spec.ModeConfig.ApplyOverrides(overrides); err != nil { + return fmt.Errorf("failed to apply config overrides: %w", err) + } + } } rgAffinity := cliCtx.GlobalString("rg-affinity") @@ -161,10 +165,6 @@ func newLoadProfileFromEmbed(cliCtx *cli.Context, name string) (_name string, _s if err != nil { return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) } - - if reqs != 0 { - spec.Profile.Spec.Total = reqs - } spec.NodeAffinity = affinityLabels spec.Profile.Spec.ContentType = types.ContentType(cliCtx.String("content-type")) data, _ := yaml.Marshal(spec) @@ -202,27 +202,29 @@ func tweakReadUpdateProfile(cliCtx *cli.Context, spec *types.RunnerGroupSpec) er configmapTotal := cliCtx.Int("read-update-configmap-total") if namePattern != "" || ratio != 0 || namespace != "" || configmapTotal > 0 { - for _, r := range spec.Profile.Spec.Requests { - if r.Patch != nil { - if namePattern != "" { - r.Patch.Name = fmt.Sprintf("runkperf-cm-%s", namePattern) - } - if ratio != 0 { - r.Shares = 100 - int(ratio*100) - } - if namespace != "" { - r.Patch.Namespace = namespace - } - if configmapTotal > 0 { - r.Patch.KeySpaceSize = configmapTotal - } - } - if r.StaleList != nil { - if ratio != 0 { - r.Shares = int(ratio * 100) + if wrConfig, ok := spec.Profile.Spec.ModeConfig.(*types.WeightedRandomConfig); ok && wrConfig != nil { + for _, r := range wrConfig.Requests { + if r.Patch != nil { + if namePattern != "" { + r.Patch.Name = fmt.Sprintf("runkperf-cm-%s", namePattern) + } + if ratio != 0 { + r.Shares = 100 - int(ratio*100) + } + if namespace != "" { + r.Patch.Namespace = namespace + } + if configmapTotal > 0 { + r.Patch.KeySpaceSize = configmapTotal + } } - if namespace != "" { - r.StaleList.Namespace = namespace + if r.StaleList != nil { + if ratio != 0 { + r.Shares = int(ratio * 100) + } + if namespace != "" { + r.StaleList.Namespace = namespace + } } } } diff --git a/contrib/cmd/runkperf/commands/warmup/command.go b/contrib/cmd/runkperf/commands/warmup/command.go index e022e91f..85ea0fe2 100644 --- a/contrib/cmd/runkperf/commands/warmup/command.go +++ b/contrib/cmd/runkperf/commands/warmup/command.go @@ -102,8 +102,14 @@ var Command = cli.Command{ return fmt.Errorf("failed to parse %s affinity: %w", rgAffinity, err) } - spec.Profile.Spec.Total = reqs - spec.Profile.Spec.Rate = rate + // Build overrides from the function parameters + overrides := map[string]interface{}{ + "total": reqs, + "rate": rate, + } + if err := spec.Profile.Spec.ModeConfig.ApplyOverrides(overrides); err != nil { + return fmt.Errorf("failed to apply config overrides: %w", err) + } spec.NodeAffinity = affinityLabels data, _ := yaml.Marshal(spec) From c60c4e4174e30cf4eb4c6556cd56c1dd77662010 Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Tue, 16 Dec 2025 09:27:35 +1100 Subject: [PATCH 4/7] refactor: remove redundant interval field from TimeSeriesConfig The interval field in TimeSeriesConfig is redundant since timing is already specified via StartTime in each RequestBucket. Removing it simplifies the configuration. ## Changes - Remove Interval field from TimeSeriesConfig struct - Update GetOverridableFields() to return empty array (no CLI overrides) - Update ApplyOverrides() to reject any override attempts - Update all tests to remove interval references - Update comment examples to remove interval mentions ## Benefits - Simpler configuration - Less redundant data - Timing is determined by bucket StartTime values --- api/types/mode_config.go | 2 +- api/types/mode_config_test.go | 11 -------- api/types/timeseries_config.go | 28 +++++--------------- api/types/timeseries_config_test.go | 40 ++++++++--------------------- 4 files changed, 19 insertions(+), 62 deletions(-) diff --git a/api/types/mode_config.go b/api/types/mode_config.go index f84d59ce..7b5245ab 100644 --- a/api/types/mode_config.go +++ b/api/types/mode_config.go @@ -33,7 +33,7 @@ type ClientOptions struct { // OverridableField describes a config field that can be overridden via CLI flags. type OverridableField struct { - // Name is the field name (e.g., "rate", "total", "interval") + // Name is the field name (e.g., "rate", "total", "duration") Name string // Type describes the field type for CLI parsing Type FieldType diff --git a/api/types/mode_config_test.go b/api/types/mode_config_test.go index 4fbd33fe..e0874637 100644 --- a/api/types/mode_config_test.go +++ b/api/types/mode_config_test.go @@ -37,15 +37,6 @@ func TestBuildOverridesFromCLI(t *testing.T) { "duration": 120, }, }, - "time-series with interval": { - config: &TimeSeriesConfig{}, - cliValues: map[string]interface{}{ - "interval": "500ms", - }, - expectedResult: map[string]interface{}{ - "interval": "500ms", - }, - }, "no overrides set": { config: &WeightedRandomConfig{}, cliValues: map[string]interface{}{}, @@ -105,7 +96,6 @@ spec: client: 5 contentType: json modeConfig: - interval: "2s" buckets: - startTime: 0.0 requests: @@ -118,7 +108,6 @@ spec: validateFunc: func(t *testing.T, mc ModeConfig) { tsConfig, ok := mc.(*TimeSeriesConfig) require.True(t, ok) - assert.Equal(t, "2s", tsConfig.Interval) assert.Len(t, tsConfig.Buckets, 1) }, }, diff --git a/api/types/timeseries_config.go b/api/types/timeseries_config.go index 5c356fc0..7111df04 100644 --- a/api/types/timeseries_config.go +++ b/api/types/timeseries_config.go @@ -7,8 +7,6 @@ import "fmt" // TimeSeriesConfig defines configuration for time-series execution mode. type TimeSeriesConfig struct { - // Interval defines the time bucket size (e.g., "1s", "60s"). - Interval string `json:"interval" yaml:"interval" mapstructure:"interval"` // Buckets contains the time-bucketed requests. Buckets []RequestBucket `json:"buckets" yaml:"buckets" mapstructure:"buckets"` } @@ -54,28 +52,16 @@ func (*TimeSeriesConfig) isModeConfig() {} // GetOverridableFields implements ModeConfig for TimeSeriesConfig func (c *TimeSeriesConfig) GetOverridableFields() []OverridableField { - return []OverridableField{ - { - Name: "interval", - Type: FieldTypeString, - Description: "Time bucket interval (e.g., '1s', '100ms')", - }, - } + // Time-series mode has no CLI-overridable fields + // Bucket timing is defined in the load profile itself + return []OverridableField{} } // ApplyOverrides implements ModeConfig for TimeSeriesConfig func (c *TimeSeriesConfig) ApplyOverrides(overrides map[string]interface{}) error { - for key, value := range overrides { - switch key { - case "interval": - if v, ok := value.(string); ok { - c.Interval = v - } else { - return fmt.Errorf("interval must be string, got %T", value) - } - default: - return fmt.Errorf("unknown override key for time-series mode: %s", key) - } + // Time-series mode has no overridable fields + if len(overrides) > 0 { + return fmt.Errorf("time-series mode does not support CLI overrides") } return nil } @@ -83,7 +69,7 @@ func (c *TimeSeriesConfig) ApplyOverrides(overrides map[string]interface{}) erro // Validate implements ModeConfig for TimeSeriesConfig func (c *TimeSeriesConfig) Validate(defaultOverrides map[string]interface{}) error { // Time-series mode doesn't have conflicting settings or defaults - // Could add validation for interval format, bucket ordering, etc. + // Could add validation for bucket ordering, etc. return nil } diff --git a/api/types/timeseries_config_test.go b/api/types/timeseries_config_test.go index 36e89449..26e86c95 100644 --- a/api/types/timeseries_config_test.go +++ b/api/types/timeseries_config_test.go @@ -15,42 +15,27 @@ func TestTimeSeriesConfigGetOverridableFields(t *testing.T) { config := &TimeSeriesConfig{} fields := config.GetOverridableFields() - assert.Len(t, fields, 1) - assert.Equal(t, "interval", fields[0].Name) - assert.Equal(t, FieldTypeString, fields[0].Type) - assert.Contains(t, fields[0].Description, "Time bucket") + // Time-series mode has no CLI-overridable fields + assert.Len(t, fields, 0) } func TestTimeSeriesConfigApplyOverrides(t *testing.T) { tests := map[string]struct { initial TimeSeriesConfig overrides map[string]interface{} - expected TimeSeriesConfig err bool }{ - "interval override": { - initial: TimeSeriesConfig{Interval: "1s"}, - overrides: map[string]interface{}{ - "interval": "100ms", - }, - expected: TimeSeriesConfig{Interval: "100ms"}, - err: false, - }, - "invalid interval type": { - initial: TimeSeriesConfig{Interval: "1s"}, - overrides: map[string]interface{}{ - "interval": 123, - }, - expected: TimeSeriesConfig{Interval: "1s"}, - err: true, + "no overrides": { + initial: TimeSeriesConfig{}, + overrides: map[string]interface{}{}, + err: false, }, - "unknown key": { - initial: TimeSeriesConfig{Interval: "1s"}, + "any override fails": { + initial: TimeSeriesConfig{}, overrides: map[string]interface{}{ - "unknown": "value", + "interval": "100ms", }, - expected: TimeSeriesConfig{Interval: "1s"}, - err: true, + err: true, }, } @@ -62,14 +47,13 @@ func TestTimeSeriesConfigApplyOverrides(t *testing.T) { assert.Error(t, err) } else { assert.NoError(t, err) - assert.Equal(t, tc.expected.Interval, config.Interval) } }) } } func TestTimeSeriesConfigValidate(t *testing.T) { - config := &TimeSeriesConfig{Interval: "1s"} + config := &TimeSeriesConfig{} err := config.Validate(nil) assert.NoError(t, err) } @@ -90,7 +74,6 @@ spec: contentType: json mode: time-series modeConfig: - interval: "1s" buckets: - startTime: 0.0 requests: @@ -125,7 +108,6 @@ spec: require.True(t, ok, "ModeConfig should be *TimeSeriesConfig") require.NotNil(t, tsConfig) - assert.Equal(t, "1s", tsConfig.Interval) assert.Len(t, tsConfig.Buckets, 2) assert.Equal(t, 0.0, tsConfig.Buckets[0].StartTime) From a3652883830ff3c82884a61af263c2314d6e4b28 Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Tue, 16 Dec 2025 09:43:56 +1100 Subject: [PATCH 5/7] fix: remove interval field from TimeSeriesExecutor - Update TimeSeriesExecutor to work without interval field - Timing is handled by bucket StartTime values - Ensure PR builds independently --- request/executor/timeseries.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/request/executor/timeseries.go b/request/executor/timeseries.go index 4f426618..6d542b5e 100644 --- a/request/executor/timeseries.go +++ b/request/executor/timeseries.go @@ -17,7 +17,6 @@ import ( type TimeSeriesExecutor struct { config *types.TimeSeriesConfig spec *types.LoadProfileSpec - interval time.Duration buckets []types.RequestBucket reqBuilderCh chan RESTRequestBuilder ctx context.Context @@ -42,16 +41,10 @@ func NewTimeSeriesExecutor(spec *types.LoadProfileSpec) (Executor, error) { return nil, fmt.Errorf("invalid config type for time-series mode") } - interval, err := time.ParseDuration(config.Interval) - if err != nil { - return nil, fmt.Errorf("invalid interval: %v", err) - } - ctx, cancel := context.WithCancel(context.Background()) return &TimeSeriesExecutor{ config: config, spec: spec, - interval: interval, buckets: config.Buckets, reqBuilderCh: make(chan RESTRequestBuilder), ctx: ctx, @@ -129,7 +122,6 @@ func (e *TimeSeriesExecutor) Metadata() ExecutorMetadata { Custom: map[string]interface{}{ "mode": string(types.ModeTimeSeries), "bucket_count": len(e.buckets), - "interval": e.interval.String(), }, } } From c5adae0376b19c5901874494392db12c004459fc Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Tue, 16 Dec 2025 11:51:28 +1100 Subject: [PATCH 6/7] chore: remove load_traffic_test.go (split into separate mode config tests) --- api/types/load_traffic_test.go | 220 --------------------------------- 1 file changed, 220 deletions(-) delete mode 100644 api/types/load_traffic_test.go diff --git a/api/types/load_traffic_test.go b/api/types/load_traffic_test.go deleted file mode 100644 index 32178e7d..00000000 --- a/api/types/load_traffic_test.go +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -package types - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" -) - -func TestLoadProfileUnmarshalFromYAML(t *testing.T) { - in := ` -version: 1 -description: test -spec: - rate: 100 - total: 10000 - conns: 2 - client: 1 - contentType: json - requests: - - staleGet: - group: core - version: v1 - resource: pods - namespace: default - name: x1 - shares: 100 - - quorumGet: - group: core - version: v1 - resource: configmaps - namespace: default - name: x2 - shares: 150 - - staleList: - group: core - version: v1 - resource: pods - namespace: default - selector: app=x2 - fieldSelector: spec.nodeName=x - shares: 200 - - quorumList: - group: core - version: v1 - resource: configmaps - namespace: default - limit: 10000 - selector: app=x3 - shares: 400 - - put: - group: core - version: v1 - resource: configmaps - namespace: kperf - name: kperf- - keySpaceSize: 1000 - valueSize: 1024 - shares: 1000 - - getPodLog: - namespace: default - name: hello - container: main - tailLines: 1000 - limitBytes: 1024 - shares: 10 - - watchList: - group: core - version: v1 - resource: pods - namespace: default - selector: app=x2 - fieldSelector: spec.nodeName=x - shares: 250 -` - - target := LoadProfile{} - require.NoError(t, yaml.Unmarshal([]byte(in), &target)) - assert.Equal(t, 1, target.Version) - assert.Equal(t, "test", target.Description) - assert.Equal(t, float64(100), target.Spec.Rate) - assert.Equal(t, 10000, target.Spec.Total) - assert.Equal(t, 2, target.Spec.Conns) - assert.Len(t, target.Spec.Requests, 7) - - assert.Equal(t, 100, target.Spec.Requests[0].Shares) - assert.NotNil(t, target.Spec.Requests[0].StaleGet) - assert.Equal(t, "pods", target.Spec.Requests[0].StaleGet.Resource) - assert.Equal(t, "v1", target.Spec.Requests[0].StaleGet.Version) - assert.Equal(t, "core", target.Spec.Requests[0].StaleGet.Group) - assert.Equal(t, "default", target.Spec.Requests[0].StaleGet.Namespace) - assert.Equal(t, "x1", target.Spec.Requests[0].StaleGet.Name) - - assert.NotNil(t, target.Spec.Requests[1].QuorumGet) - assert.Equal(t, 150, target.Spec.Requests[1].Shares) - - assert.Equal(t, 200, target.Spec.Requests[2].Shares) - assert.NotNil(t, target.Spec.Requests[2].StaleList) - assert.Equal(t, "pods", target.Spec.Requests[2].StaleList.Resource) - assert.Equal(t, "v1", target.Spec.Requests[2].StaleList.Version) - assert.Equal(t, "core", target.Spec.Requests[2].StaleList.Group) - assert.Equal(t, "default", target.Spec.Requests[2].StaleList.Namespace) - assert.Equal(t, 0, target.Spec.Requests[2].StaleList.Limit) - assert.Equal(t, "app=x2", target.Spec.Requests[2].StaleList.Selector) - assert.Equal(t, "spec.nodeName=x", target.Spec.Requests[2].StaleList.FieldSelector) - - assert.NotNil(t, target.Spec.Requests[3].QuorumList) - assert.Equal(t, 400, target.Spec.Requests[3].Shares) - - assert.Equal(t, 1000, target.Spec.Requests[4].Shares) - assert.NotNil(t, target.Spec.Requests[4].Put) - assert.Equal(t, "configmaps", target.Spec.Requests[4].Put.Resource) - assert.Equal(t, "v1", target.Spec.Requests[4].Put.Version) - assert.Equal(t, "core", target.Spec.Requests[4].Put.Group) - assert.Equal(t, "kperf", target.Spec.Requests[4].Put.Namespace) - assert.Equal(t, "kperf-", target.Spec.Requests[4].Put.Name) - assert.Equal(t, 1000, target.Spec.Requests[4].Put.KeySpaceSize) - assert.Equal(t, 1024, target.Spec.Requests[4].Put.ValueSize) - - assert.Equal(t, 10, target.Spec.Requests[5].Shares) - assert.NotNil(t, target.Spec.Requests[5].GetPodLog) - assert.Equal(t, "default", target.Spec.Requests[5].GetPodLog.Namespace) - assert.Equal(t, "hello", target.Spec.Requests[5].GetPodLog.Name) - assert.Equal(t, "main", target.Spec.Requests[5].GetPodLog.Container) - assert.Equal(t, int64(1000), *target.Spec.Requests[5].GetPodLog.TailLines) - assert.Equal(t, int64(1024), *target.Spec.Requests[5].GetPodLog.LimitBytes) - - assert.Equal(t, 250, target.Spec.Requests[6].Shares) - assert.NotNil(t, target.Spec.Requests[6].WatchList) - - assert.NoError(t, target.Validate()) -} - -func TestWeightedRequest(t *testing.T) { - for _, r := range []struct { - name string - req *WeightedRequest - hasErr bool - }{ - { - name: "shares < 0", - req: &WeightedRequest{Shares: -1}, - hasErr: true, - }, - { - name: "no request setting", - req: &WeightedRequest{Shares: 10}, - hasErr: true, - }, - { - name: "empty version", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ - KubeGroupVersionResource: KubeGroupVersionResource{ - Resource: "pods", - }, - }, - }, - hasErr: true, - }, - { - name: "empty resource", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ - KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", - Version: "v1", - }, - }, - }, - hasErr: true, - }, - { - name: "wrong limit", - req: &WeightedRequest{ - Shares: 10, - StaleList: &RequestList{ - KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", - Version: "v1", - Resource: "pods", - }, - Limit: -1, - }, - }, - hasErr: true, - }, - { - name: "no error", - req: &WeightedRequest{ - Shares: 10, - StaleGet: &RequestGet{ - KubeGroupVersionResource: KubeGroupVersionResource{ - Group: "core", - Version: "v1", - Resource: "pods", - }, - Namespace: "default", - Name: "testing", - }, - }, - }, - } { - r := r - t.Run(r.name, func(t *testing.T) { - err := r.req.Validate() - if r.hasErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} From 23c9a15baf345c7efa53d99889c0a407403eb6c9 Mon Sep 17 00:00:00 2001 From: JasonXuDeveloper Date: Tue, 16 Dec 2025 11:55:27 +1100 Subject: [PATCH 7/7] chore: restore TestWeightedRequest (was removed accidentally during merge) --- api/types/load_traffic_test.go | 89 ++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 api/types/load_traffic_test.go diff --git a/api/types/load_traffic_test.go b/api/types/load_traffic_test.go new file mode 100644 index 00000000..66c40376 --- /dev/null +++ b/api/types/load_traffic_test.go @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWeightedRequest(t *testing.T) { + tests := map[string]struct { + req WeightedRequest + err bool + }{ + "shares < 0": { + req: WeightedRequest{ + Shares: -1, + }, + err: true, + }, + "no request setting": { + req: WeightedRequest{ + Shares: 100, + }, + err: true, + }, + "empty version": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Resource: "pods", + }, + }, + }, + err: true, + }, + "empty resource": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Version: "v1", + }, + }, + }, + err: true, + }, + "wrong limit": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Resource: "pods", + Version: "v1", + }, + Limit: -1, + }, + }, + err: true, + }, + "no error": { + req: WeightedRequest{ + Shares: 100, + StaleList: &RequestList{ + KubeGroupVersionResource: KubeGroupVersionResource{ + Resource: "pods", + Version: "v1", + }, + Limit: 0, + }, + }, + err: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := tc.req.Validate() + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}