diff --git a/docs/quick-start/cassandra-cluster.yaml b/docs/quick-start/cassandra-cluster.yaml index 003ba76e0..4da109b20 100644 --- a/docs/quick-start/cassandra-cluster.yaml +++ b/docs/quick-start/cassandra-cluster.yaml @@ -7,6 +7,7 @@ spec: nodePools: - name: "ringnodes" replicas: 3 + seeds: 2 datacenter: "demo-datacenter" rack: "demo-rack" persistence: diff --git a/hack/e2e.sh b/hack/e2e.sh index 831d041b5..2bde2b6db 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -222,6 +222,15 @@ if [[ "test_elasticsearchcluster" = "${TEST_PREFIX}"* ]]; then kube_delete_namespace_and_wait "${ES_TEST_NS}" fi +function apply_cassandracluster() { + kubectl apply \ + --namespace "${namespace}" \ + --filename \ + <(envsubst \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION:$CASS_SEEDS' \ + < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") +} + function test_cassandracluster() { echo "Testing CassandraCluster" local namespace="${1}" @@ -230,6 +239,7 @@ function test_cassandracluster() { export CASS_REPLICAS=1 export CASS_CQL_PORT=9042 export CASS_VERSION="3.11.1" + export CASS_SEEDS=1 kube_create_namespace_with_quota "${namespace}" @@ -239,12 +249,7 @@ function test_cassandracluster() { fail_test "Failed to get cassandraclusters" fi - if ! kubectl apply \ - --namespace "${namespace}" \ - --filename \ - <(envsubst \ - '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \ - < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + if ! apply_cassandracluster then fail_test "Failed to create cassandracluster" fi @@ -338,12 +343,10 @@ function test_cassandracluster() { # Increment the replica count export CASS_REPLICAS=2 - kubectl apply \ - --namespace "${namespace}" \ - --filename \ - <(envsubst \ - '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \ - < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") + if ! apply_cassandracluster + then + fail_test "Failed to apply cassandracluster" + fi if ! retry TIMEOUT=300 stdout_equals 2 kubectl \ --namespace "${namespace}" \ @@ -357,7 +360,7 @@ function test_cassandracluster() { # TODO: A better test would be to query the endpoints and check that only # the `-0` pods are included. E.g. # kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seeds -o "jsonpath={.subsets[*].addresses[*].hostname}" - if ! stdout_equals "cass-${CASS_NAME}-ringnodes-0" \ + if ! retry stdout_equals "cass-${CASS_NAME}-ringnodes-0" \ kubectl get pods --namespace "${namespace}" \ --selector=navigator.jetstack.io/cassandra-seed=true \ --output 'jsonpath={.items[*].metadata.name}' @@ -392,6 +395,21 @@ function test_cassandracluster() { then fail_test "Cassandra liveness probe failed to restart dead node" fi + + export CASS_REPLICAS=2 + export CASS_SEEDS=2 + if ! apply_cassandracluster + then + fail_test "Failed to apply cassandracluster" + fi + + if ! retry stdout_equals "cass-${CASS_NAME}-ringnodes-0 cass-${CASS_NAME}-ringnodes-1" \ + kubectl get pods --namespace "${namespace}" \ + --selector=navigator.jetstack.io/cassandra-seed=true \ + --output 'jsonpath={.items[*].metadata.name}' + then + fail_test "Second cassandra node not marked as seed" + fi } if [[ "test_cassandracluster" = "${TEST_PREFIX}"* ]]; then diff --git a/hack/testdata/cass-cluster-test.template.yaml b/hack/testdata/cass-cluster-test.template.yaml index 5859a0406..5c0c77ef8 100644 --- a/hack/testdata/cass-cluster-test.template.yaml +++ b/hack/testdata/cass-cluster-test.template.yaml @@ -9,6 +9,7 @@ spec: replicas: ${CASS_REPLICAS} datacenter: "${CASS_NAME}-datacenter" rack: "{CASS_NAME}-rack" + seeds: ${CASS_SEEDS} persistence: enabled: true size: "5Gi" diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index 9f63052e7..f3f09160a 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -43,6 +43,7 @@ type CassandraClusterNodePool struct { Datacenter string Resources v1.ResourceRequirements SchedulerName string + Seeds *int32 } type CassandraClusterStatus struct { diff --git a/pkg/apis/navigator/v1alpha1/defaults.go b/pkg/apis/navigator/v1alpha1/defaults.go index 3bf61b535..2c51e694d 100644 --- a/pkg/apis/navigator/v1alpha1/defaults.go +++ b/pkg/apis/navigator/v1alpha1/defaults.go @@ -2,6 +2,8 @@ package v1alpha1 import ( "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/pkg/util/ptr" ) const ( @@ -20,4 +22,9 @@ func SetDefaults_CassandraClusterNodePool(np *CassandraClusterNodePool) { if np.Rack == "" { np.Rack = np.Name } + + // default to 1 seed if not specified + if np.Seeds == nil { + np.Seeds = ptr.Int32(1) + } } diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index d09825d02..8cbd1e148 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -74,6 +74,11 @@ type CassandraClusterNodePool struct { // If not specified, the pod will be dispatched by default scheduler. // +optional SchedulerName string `json:"schedulerName,omitempty"` + + // Seeds specifies the number of seed nodes to allocate in this nodepool. By + // default, 1 is selected. + // +optional + Seeds *int32 `json:"seeds,omitempty"` } type CassandraClusterStatus struct { diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index a290cd478..79ecc34b9 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -173,6 +173,7 @@ func autoConvert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraCluster out.Datacenter = in.Datacenter out.Resources = in.Resources out.SchedulerName = in.SchedulerName + out.Seeds = (*int32)(unsafe.Pointer(in.Seeds)) return nil } @@ -192,6 +193,7 @@ func autoConvert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraCluster out.Datacenter = in.Datacenter out.Resources = in.Resources out.SchedulerName = in.SchedulerName + out.Seeds = (*int32)(unsafe.Pointer(in.Seeds)) return nil } diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go index 0f15c581b..cc829c1bb 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.deepcopy.go @@ -100,6 +100,15 @@ func (in *CassandraClusterNodePool) DeepCopyInto(out *CassandraClusterNodePool) } } in.Resources.DeepCopyInto(&out.Resources) + if in.Seeds != nil { + in, out := &in.Seeds, &out.Seeds + if *in == nil { + *out = nil + } else { + *out = new(int32) + **out = **in + } + } return } diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index 559fc4cbe..9ba9987fe 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -1,6 +1,7 @@ package validation import ( + "fmt" "reflect" apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" @@ -13,7 +14,27 @@ import ( func ValidateCassandraClusterNodePool(np *navigator.CassandraClusterNodePool, fldPath *field.Path) field.ErrorList { // TODO: call k8s.io/kubernetes/pkg/apis/core/validation.ValidateResourceRequirements on np.Resources // this will require vendoring kubernetes/kubernetes. - return field.ErrorList{} + + allErrs := field.ErrorList{} + + if np.Seeds != nil { + if *np.Seeds > np.Replicas { + allErrs = append(allErrs, + field.Invalid( + fldPath.Child("seeds"), + np.Seeds, + fmt.Sprintf("number of seeds cannot be greater than number of replicas (%d)", np.Replicas)), + ) + } + + if *np.Seeds < 1 { + allErrs = append(allErrs, + field.Invalid(fldPath.Child("seeds"), np.Seeds, "number of seeds must be greater than or equal to 1"), + ) + } + } + + return allErrs } func ValidateCassandraCluster(c *navigator.CassandraCluster) field.ErrorList { diff --git a/pkg/apis/navigator/zz_generated.deepcopy.go b/pkg/apis/navigator/zz_generated.deepcopy.go index e187c402e..0b1e2f102 100644 --- a/pkg/apis/navigator/zz_generated.deepcopy.go +++ b/pkg/apis/navigator/zz_generated.deepcopy.go @@ -100,6 +100,15 @@ func (in *CassandraClusterNodePool) DeepCopyInto(out *CassandraClusterNodePool) } } in.Resources.DeepCopyInto(&out.Resources) + if in.Seeds != nil { + in, out := &in.Seeds, &out.Seeds + if *in == nil { + *out = nil + } else { + *out = new(int32) + **out = **in + } + } return } diff --git a/pkg/controllers/cassandra/seedlabeller/control.go b/pkg/controllers/cassandra/seedlabeller/control.go index 32ac9a5bf..091f11afa 100644 --- a/pkg/controllers/cassandra/seedlabeller/control.go +++ b/pkg/controllers/cassandra/seedlabeller/control.go @@ -44,36 +44,60 @@ func NewControl( func (c *defaultSeedLabeller) labelSeedNodes( cluster *v1alpha1.CassandraCluster, + np *v1alpha1.CassandraClusterNodePool, set *appsv1beta1.StatefulSet, ) error { - // TODO: make number of seed nodes configurable - pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0)) - if err != nil { - glog.Warningf("Couldn't get stateful set pod: %v", err) - return nil - } - labels := pod.Labels - value := labels[service.SeedLabelKey] - if value == service.SeedLabelValue { - return nil - } - if labels == nil { - labels = map[string]string{} + for i := int32(0); i < np.Replicas; i++ { + pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, i)) + if err != nil { + glog.Warningf("Couldn't get stateful set pod: %v", err) + return nil + } + + // label first n as seeds + isSeed := i < *np.Seeds + + desiredLabel := "" + if isSeed { + desiredLabel = service.SeedLabelValue + } + + labels := pod.Labels + value := labels[service.SeedLabelKey] + if value == desiredLabel { + continue + } + if labels == nil { + labels = map[string]string{} + } + + if isSeed { + labels[service.SeedLabelKey] = desiredLabel + } else { + delete(labels, service.SeedLabelKey) + } + + podCopy := pod.DeepCopy() + podCopy.SetLabels(labels) + _, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy) + if err != nil { + return err + } } - labels[service.SeedLabelKey] = service.SeedLabelValue - podCopy := pod.DeepCopy() - podCopy.SetLabels(labels) - _, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy) - return err + return nil } func (c *defaultSeedLabeller) Sync(cluster *v1alpha1.CassandraCluster) error { - sets, err := util.StatefulSetsForCluster(cluster, c.statefulSetLister) - if err != nil { - return err - } - for _, s := range sets { - err = c.labelSeedNodes(cluster, s) + for _, np := range cluster.Spec.NodePools { + setName := util.NodePoolResourceName(cluster, &np) + + set, err := c.statefulSetLister.StatefulSets(cluster.Namespace).Get(setName) + if err != nil { + glog.Warningf("Couldn't get stateful set: %v", err) + return nil + } + + err = c.labelSeedNodes(cluster, &np, set) if err != nil { return err } diff --git a/pkg/controllers/cassandra/seedlabeller/control_test.go b/pkg/controllers/cassandra/seedlabeller/control_test.go index fad694c16..c842b9f94 100644 --- a/pkg/controllers/cassandra/seedlabeller/control_test.go +++ b/pkg/controllers/cassandra/seedlabeller/control_test.go @@ -14,9 +14,10 @@ import ( "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" "github.com/jetstack/navigator/pkg/controllers/cassandra/service" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" + "github.com/jetstack/navigator/pkg/util" ) -func CheckSeedLabel(podName, podNamespace string, t *testing.T, state *controllers.State) { +func CheckSeedLabel(podName, seedLabelValue string, podNamespace string, t *testing.T, state *controllers.State) { p, err := state.Clientset. CoreV1(). Pods(podNamespace). @@ -24,7 +25,7 @@ func CheckSeedLabel(podName, podNamespace string, t *testing.T, state *controlle if err != nil { t.Fatal(err) } - if p.Labels[service.SeedLabelKey] != service.SeedLabelValue { + if p.Labels[service.SeedLabelKey] != seedLabelValue { t.Errorf("unexpected seed label: %s", p.Labels) } } @@ -44,6 +45,23 @@ func TestSeedLabellerSync(t *testing.T) { pod0ValueIncorrect := pod0LabelMissing.DeepCopy() pod0ValueIncorrect.Labels[service.SeedLabelKey] = "blah" + clusterOneSeed := cluster.DeepCopy() + clusterOneSeed.Spec.NodePools[0].Seeds = util.Int64Ptr(1) + + pod1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cass-cassandra-1-RingNodes-1", + Namespace: cluster.Namespace, + }, + } + + pod2 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cass-cassandra-1-RingNodes-2", + Namespace: cluster.Namespace, + }, + } + type testT struct { kubeObjects []runtime.Object navObjects []runtime.Object @@ -63,7 +81,7 @@ func TestSeedLabellerSync(t *testing.T) { navObjects: []runtime.Object{cluster}, cluster: cluster, assertions: func(t *testing.T, state *controllers.State) { - CheckSeedLabel(pod0.Name, pod0.Namespace, t, state) + CheckSeedLabel(pod0.Name, seedprovider.SeedLabelValue, pod0.Namespace, t, state) }, }, "add label if key missing": { @@ -71,7 +89,7 @@ func TestSeedLabellerSync(t *testing.T) { navObjects: []runtime.Object{cluster}, cluster: cluster, assertions: func(t *testing.T, state *controllers.State) { - CheckSeedLabel(pod0.Name, pod0.Namespace, t, state) + CheckSeedLabel(pod0.Name, seedprovider.SeedLabelValue, pod0.Namespace, t, state) }, }, "fix label if value incorrect": { @@ -79,7 +97,31 @@ func TestSeedLabellerSync(t *testing.T) { navObjects: []runtime.Object{cluster}, cluster: cluster, assertions: func(t *testing.T, state *controllers.State) { - CheckSeedLabel(pod0.Name, pod0.Namespace, t, state) + CheckSeedLabel(pod0.Name, seedprovider.SeedLabelValue, pod0.Namespace, t, state) + }, + }, + "add multiple seeds": { + kubeObjects: []runtime.Object{ss0, pod0, pod1, pod2}, + navObjects: []runtime.Object{cluster}, + cluster: cluster, + assertions: func(t *testing.T, state *controllers.State) { + CheckSeedLabel(pod1.Name, seedprovider.SeedLabelValue, pod1.Namespace, t, state) + }, + }, + "don't add too many seeds": { + kubeObjects: []runtime.Object{ss0, pod0, pod1, pod2}, + navObjects: []runtime.Object{cluster}, + cluster: cluster, + assertions: func(t *testing.T, state *controllers.State) { + CheckSeedLabel(pod2.Name, "", pod2.Namespace, t, state) + }, + }, + "delete label if seed number decreased": { + kubeObjects: []runtime.Object{ss0, pod0, pod1, pod2}, + navObjects: []runtime.Object{cluster}, + cluster: clusterOneSeed, + assertions: func(t *testing.T, state *controllers.State) { + CheckSeedLabel(pod1.Name, "", pod1.Namespace, t, state) }, }, } diff --git a/pkg/controllers/cassandra/testing/testing.go b/pkg/controllers/cassandra/testing/testing.go index 91e2c4be3..c592dad90 100644 --- a/pkg/controllers/cassandra/testing/testing.go +++ b/pkg/controllers/cassandra/testing/testing.go @@ -16,6 +16,7 @@ import ( "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" "github.com/jetstack/navigator/pkg/controllers/cassandra/service" "github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount" + "github.com/jetstack/navigator/pkg/util" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +38,7 @@ func ClusterForTest() *v1alpha1.CassandraCluster { v1alpha1.CassandraClusterNodePool{ Name: "RingNodes", Replicas: 3, + Seeds: util.Int64Ptr(2), }, }, },