From ecc75326791d6c10935b2e6d62be6f753a0f8915 Mon Sep 17 00:00:00 2001 From: John Stupka Date: Tue, 3 Mar 2026 13:05:57 -0600 Subject: [PATCH 1/2] fix: use retry.RetryOnConflict to prevent retry storm on composition status updates When multiple controllers concurrently update composition status, conflict errors cause immediate requeues without backoff, leading to a retry storm that can overwhelm the API server (142+ retries/second observed). This fix uses k8s.io/client-go/util/retry.RetryOnConflict which: - Re-fetches the composition before each retry attempt - Applies exponential backoff between retries - Handles conflicts internally instead of propagating errors This prevents the thundering herd effect where conflict errors cascade through the controller-runtime work queue. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- internal/controllers/watch/kind.go | 50 +++++++++++++++++------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/internal/controllers/watch/kind.go b/internal/controllers/watch/kind.go index d7ce5152..4fc5d645 100644 --- a/internal/controllers/watch/kind.go +++ b/internal/controllers/watch/kind.go @@ -13,10 +13,10 @@ import ( "github.com/Azure/eno/internal/manager" "github.com/go-logr/logr" "golang.org/x/time/rate" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -249,33 +249,41 @@ func (k *KindWatchController) updateCompositions(ctx context.Context, logger log continue } + compKey := client.ObjectKeyFromObject(&comp) var modified bool - if isDeleted { - // Only remove InputRevisions for optional refs - // Required refs should trigger MissingInputs status instead - if isOptionalRef(synth, key) { - modified = removeInputRevision(&comp, key) + + err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // Re-fetch composition to get the latest version + if err := k.client.Get(ctx, compKey, &comp); err != nil { + return err } - // For required refs, do nothing - the composition controller will handle MissingInputs - } else { - revs := apiv1.NewInputRevisions(meta, key) - modified = setInputRevisions(&comp, revs) - } - if !modified { - continue - } + if isDeleted { + // Only remove InputRevisions for optional refs + // Required refs should trigger MissingInputs status instead + if isOptionalRef(synth, key) { + modified = removeInputRevision(&comp, key) + } + // For required refs, do nothing - the composition controller will handle MissingInputs + } else { + revs := apiv1.NewInputRevisions(meta, key) + modified = setInputRevisions(&comp, revs) + } - // TODO: Reduce risk of conflict errors here - err := k.client.Status().Update(ctx, &comp) - if errors.IsConflict(err) { - return false, fmt.Errorf("composition was modified during input reconciliation - will retry") - } + if !modified { + return nil + } + + return k.client.Status().Update(ctx, &comp) + }) if err != nil { return false, fmt.Errorf("updating input revisions: %w", err) } - logger.V(1).Info("noticed input resource change", "compositionName", comp.Name, "compositionNamespace", comp.Namespace, "ref", key) - return true, nil // wait for requeue + + if modified { + logger.V(1).Info("noticed input resource change", "compositionName", comp.Name, "compositionNamespace", comp.Namespace, "ref", key) + return true, nil // wait for requeue + } } return false, nil From 19fcb862437dc7b93ef9dc2e8c0b949629cc1282 Mon Sep 17 00:00:00 2001 From: John Stupka Date: Tue, 3 Mar 2026 13:41:54 -0600 Subject: [PATCH 2/2] update test cases to ensure they run and validate new test case for the concurrent status updates --- Makefile | 2 +- .../controllers/watch/integration_test.go | 103 ++++++++++++++++++ internal/controllers/watch/kind.go | 1 + internal/manager/manager_test.go | 7 +- internal/resource/resource_test.go | 5 +- 5 files changed, 114 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index b7c45df4..b6821bee 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ setup-testenv: .PHONY: test test: - go test -v $$(go list ./... | grep -v '/e2e') + UPSTREAM_KUBEBUILDER_ASSETS=$$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use -p path) go test -v $$(go list ./... | grep -v '/e2e') .PHONY: test-e2e test-e2e: diff --git a/internal/controllers/watch/integration_test.go b/internal/controllers/watch/integration_test.go index 04e256cb..22b3e07e 100644 --- a/internal/controllers/watch/integration_test.go +++ b/internal/controllers/watch/integration_test.go @@ -1,6 +1,7 @@ package watch import ( + "fmt" "testing" apiv1 "github.com/Azure/eno/api/v1" @@ -518,6 +519,108 @@ func TestRemoveInput(t *testing.T) { }) } +// TestConcurrentStatusUpdates verifies that the retry mechanism handles +// concurrent updates to composition status without causing a retry storm. +// This tests the fix for the issue where conflict errors would cause immediate +// requeues without backoff, leading to API server overload. +func TestConcurrentStatusUpdates(t *testing.T) { + mgr := testutil.NewManager(t) + require.NoError(t, NewController(mgr.Manager)) + mgr.Start(t) + + ctx := testutil.NewContext(t) + cli := mgr.GetClient() + + // Create a shared input that will be used by multiple compositions + input := &corev1.ConfigMap{} + input.Name = "shared-input" + input.Namespace = "default" + input.Data = map[string]string{"version": "1"} + require.NoError(t, cli.Create(ctx, input)) + + synth := &apiv1.Synthesizer{} + synth.Name = "test-synth" + synth.Spec.Refs = []apiv1.Ref{{ + Key: "foo", + Resource: apiv1.ResourceRef{ + Version: "v1", + Kind: "ConfigMap", + }, + }} + require.NoError(t, cli.Create(ctx, synth)) + + // Create multiple compositions that reference the same input + // This creates conditions for concurrent status updates when the input changes + numComps := 3 + comps := make([]*apiv1.Composition, numComps) + for i := 0; i < numComps; i++ { + comp := &apiv1.Composition{} + comp.Name = fmt.Sprintf("test-comp-%d", i) + comp.Namespace = "default" + comp.Spec.Synthesizer.Name = synth.Name + comp.Spec.Bindings = []apiv1.Binding{{ + Key: "foo", + Resource: apiv1.ResourceBinding{ + Name: input.Name, + Namespace: input.Namespace, + }, + }} + require.NoError(t, cli.Create(ctx, comp)) + comps[i] = comp + } + + // Wait for all compositions to have their initial status populated + for _, comp := range comps { + testutil.Eventually(t, func() bool { + cli.Get(ctx, client.ObjectKeyFromObject(comp), comp) + return len(comp.Status.InputRevisions) == 1 && comp.Status.InputRevisions[0].ResourceVersion != "" + }) + } + + // Record the initial resource versions + initialRVs := make([]string, numComps) + for i, comp := range comps { + cli.Get(ctx, client.ObjectKeyFromObject(comp), comp) + initialRVs[i] = comp.Status.InputRevisions[0].ResourceVersion + } + + // Update the shared input - this will trigger concurrent status updates + // to all compositions. The retry mechanism should handle conflicts gracefully. + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + cli.Get(ctx, client.ObjectKeyFromObject(input), input) + input.Data["version"] = "2" + return cli.Update(ctx, input) + }) + require.NoError(t, err) + + // Verify all compositions eventually get their status updated + // This confirms the retry mechanism works even with concurrent updates + for i, comp := range comps { + testutil.Eventually(t, func() bool { + cli.Get(ctx, client.ObjectKeyFromObject(comp), comp) + if len(comp.Status.InputRevisions) != 1 { + return false + } + rv := comp.Status.InputRevisions[0].ResourceVersion + return rv != "" && rv != initialRVs[i] + }) + } + + // Verify the final state is consistent - all compositions should have + // the same input revision (the updated one) + var expectedRV string + for i, comp := range comps { + cli.Get(ctx, client.ObjectKeyFromObject(comp), comp) + assert.Len(t, comp.Status.InputRevisions, 1, "composition %d should have exactly one input revision", i) + if expectedRV == "" { + expectedRV = comp.Status.InputRevisions[0].ResourceVersion + } else { + assert.Equal(t, expectedRV, comp.Status.InputRevisions[0].ResourceVersion, + "all compositions should reference the same input revision") + } + } +} + func TestOptionalInputCreatedLater(t *testing.T) { mgr := testutil.NewManager(t) require.NoError(t, NewController(mgr.Manager)) diff --git a/internal/controllers/watch/kind.go b/internal/controllers/watch/kind.go index 4fc5d645..3b385543 100644 --- a/internal/controllers/watch/kind.go +++ b/internal/controllers/watch/kind.go @@ -13,6 +13,7 @@ import ( "github.com/Azure/eno/internal/manager" "github.com/go-logr/logr" "golang.org/x/time/rate" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" diff --git a/internal/manager/manager_test.go b/internal/manager/manager_test.go index dc5cf4be..9ab5f377 100644 --- a/internal/manager/manager_test.go +++ b/internal/manager/manager_test.go @@ -2,6 +2,7 @@ package manager import ( "context" + "os" "path/filepath" "runtime" "testing" @@ -30,7 +31,8 @@ func TestManagerBasics(t *testing.T) { filepath.Join(root, "api", "v1", "config", "crd"), testCrdDir, }, - ErrorIfCRDPathMissing: true, + ErrorIfCRDPathMissing: true, + BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"), } t.Cleanup(func() { err := env.Stop() @@ -155,7 +157,8 @@ func TestReconcilerLimitedScope(t *testing.T) { filepath.Join(root, "api", "v1", "config", "crd"), testCrdDir, }, - ErrorIfCRDPathMissing: true, + ErrorIfCRDPathMissing: true, + BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"), } t.Cleanup(func() { err := env.Stop() diff --git a/internal/resource/resource_test.go b/internal/resource/resource_test.go index 3c8d8f8f..e559b06d 100644 --- a/internal/resource/resource_test.go +++ b/internal/resource/resource_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "os" "sort" "testing" "time" @@ -652,7 +653,9 @@ func TestSnapshotPatch(t *testing.T) { } func TestComparisons(t *testing.T) { - env := &envtest.Environment{} + env := &envtest.Environment{ + BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"), + } t.Cleanup(func() { err := env.Stop() if err != nil {