Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ setup-testenv:

.PHONY: test
test:
go test -v $$(go list ./... | grep -v '/e2e')
UPSTREAM_KUBEBUILDER_ASSETS=$$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use -p path) go test -v $$(go list ./... | grep -v '/e2e')

.PHONY: test-e2e
test-e2e:
Expand Down
103 changes: 103 additions & 0 deletions internal/controllers/watch/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watch

import (
"fmt"
"testing"

apiv1 "github.com/Azure/eno/api/v1"
Expand Down Expand Up @@ -518,6 +519,108 @@ func TestRemoveInput(t *testing.T) {
})
}

// TestConcurrentStatusUpdates verifies that the retry mechanism handles
// concurrent updates to composition status without causing a retry storm.
// This tests the fix for the issue where conflict errors would cause immediate
// requeues without backoff, leading to API server overload.
func TestConcurrentStatusUpdates(t *testing.T) {
mgr := testutil.NewManager(t)
require.NoError(t, NewController(mgr.Manager))
mgr.Start(t)

ctx := testutil.NewContext(t)
cli := mgr.GetClient()

// Create a shared input that will be used by multiple compositions
input := &corev1.ConfigMap{}
input.Name = "shared-input"
input.Namespace = "default"
input.Data = map[string]string{"version": "1"}
require.NoError(t, cli.Create(ctx, input))

synth := &apiv1.Synthesizer{}
synth.Name = "test-synth"
synth.Spec.Refs = []apiv1.Ref{{
Key: "foo",
Resource: apiv1.ResourceRef{
Version: "v1",
Kind: "ConfigMap",
},
}}
require.NoError(t, cli.Create(ctx, synth))

// Create multiple compositions that reference the same input
// This creates conditions for concurrent status updates when the input changes
numComps := 3
comps := make([]*apiv1.Composition, numComps)
for i := 0; i < numComps; i++ {
comp := &apiv1.Composition{}
comp.Name = fmt.Sprintf("test-comp-%d", i)
comp.Namespace = "default"
comp.Spec.Synthesizer.Name = synth.Name
comp.Spec.Bindings = []apiv1.Binding{{
Key: "foo",
Resource: apiv1.ResourceBinding{
Name: input.Name,
Namespace: input.Namespace,
},
}}
require.NoError(t, cli.Create(ctx, comp))
comps[i] = comp
}

// Wait for all compositions to have their initial status populated
for _, comp := range comps {
testutil.Eventually(t, func() bool {
cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)
return len(comp.Status.InputRevisions) == 1 && comp.Status.InputRevisions[0].ResourceVersion != ""
})
}

// Record the initial resource versions
initialRVs := make([]string, numComps)
for i, comp := range comps {
cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)
initialRVs[i] = comp.Status.InputRevisions[0].ResourceVersion
}

// Update the shared input - this will trigger concurrent status updates
// to all compositions. The retry mechanism should handle conflicts gracefully.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
cli.Get(ctx, client.ObjectKeyFromObject(input), input)
input.Data["version"] = "2"
return cli.Update(ctx, input)
})
require.NoError(t, err)

// Verify all compositions eventually get their status updated
// This confirms the retry mechanism works even with concurrent updates
for i, comp := range comps {
testutil.Eventually(t, func() bool {
cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)
if len(comp.Status.InputRevisions) != 1 {
return false
}
rv := comp.Status.InputRevisions[0].ResourceVersion
return rv != "" && rv != initialRVs[i]
})
}

// Verify the final state is consistent - all compositions should have
// the same input revision (the updated one)
var expectedRV string
for i, comp := range comps {
cli.Get(ctx, client.ObjectKeyFromObject(comp), comp)
assert.Len(t, comp.Status.InputRevisions, 1, "composition %d should have exactly one input revision", i)
if expectedRV == "" {
expectedRV = comp.Status.InputRevisions[0].ResourceVersion
} else {
assert.Equal(t, expectedRV, comp.Status.InputRevisions[0].ResourceVersion,
"all compositions should reference the same input revision")
}
}
}

func TestOptionalInputCreatedLater(t *testing.T) {
mgr := testutil.NewManager(t)
require.NoError(t, NewController(mgr.Manager))
Expand Down
49 changes: 29 additions & 20 deletions internal/controllers/watch/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -249,33 +250,41 @@ func (k *KindWatchController) updateCompositions(ctx context.Context, logger log
continue
}

compKey := client.ObjectKeyFromObject(&comp)
var modified bool
if isDeleted {
// Only remove InputRevisions for optional refs
// Required refs should trigger MissingInputs status instead
if isOptionalRef(synth, key) {
modified = removeInputRevision(&comp, key)

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Re-fetch composition to get the latest version
if err := k.client.Get(ctx, compKey, &comp); err != nil {
return err
}
// For required refs, do nothing - the composition controller will handle MissingInputs
} else {
revs := apiv1.NewInputRevisions(meta, key)
modified = setInputRevisions(&comp, revs)
}

if !modified {
continue
}
if isDeleted {
// Only remove InputRevisions for optional refs
// Required refs should trigger MissingInputs status instead
if isOptionalRef(synth, key) {
modified = removeInputRevision(&comp, key)
}
// For required refs, do nothing - the composition controller will handle MissingInputs
} else {
revs := apiv1.NewInputRevisions(meta, key)
modified = setInputRevisions(&comp, revs)
}

// TODO: Reduce risk of conflict errors here
err := k.client.Status().Update(ctx, &comp)
if errors.IsConflict(err) {
return false, fmt.Errorf("composition was modified during input reconciliation - will retry")
}
if !modified {
return nil
}

return k.client.Status().Update(ctx, &comp)
})
if err != nil {
return false, fmt.Errorf("updating input revisions: %w", err)
}
logger.V(1).Info("noticed input resource change", "compositionName", comp.Name, "compositionNamespace", comp.Namespace, "ref", key)
return true, nil // wait for requeue

if modified {
logger.V(1).Info("noticed input resource change", "compositionName", comp.Name, "compositionNamespace", comp.Namespace, "ref", key)
return true, nil // wait for requeue
}
}

return false, nil
Expand Down
7 changes: 5 additions & 2 deletions internal/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manager

import (
"context"
"os"
"path/filepath"
"runtime"
"testing"
Expand Down Expand Up @@ -30,7 +31,8 @@ func TestManagerBasics(t *testing.T) {
filepath.Join(root, "api", "v1", "config", "crd"),
testCrdDir,
},
ErrorIfCRDPathMissing: true,
ErrorIfCRDPathMissing: true,
BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"),
}
t.Cleanup(func() {
err := env.Stop()
Expand Down Expand Up @@ -155,7 +157,8 @@ func TestReconcilerLimitedScope(t *testing.T) {
filepath.Join(root, "api", "v1", "config", "crd"),
testCrdDir,
},
ErrorIfCRDPathMissing: true,
ErrorIfCRDPathMissing: true,
BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"),
}
t.Cleanup(func() {
err := env.Stop()
Expand Down
5 changes: 4 additions & 1 deletion internal/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -652,7 +653,9 @@ func TestSnapshotPatch(t *testing.T) {
}

func TestComparisons(t *testing.T) {
env := &envtest.Environment{}
env := &envtest.Environment{
BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"),
}
t.Cleanup(func() {
err := env.Stop()
if err != nil {
Expand Down