Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion deploy/helm/kafka-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ spec:
authentication: []
authorization:
opa: null
metadataManager: ZooKeeper
tls:
internalSecretClass: tls
serverSecretClass: tls
Expand Down Expand Up @@ -793,6 +794,12 @@ spec:
- configMapName
type: object
type: object
metadataManager:
default: ZooKeeper
enum:
- ZooKeeper
- KRaft
type: string
tls:
default:
internalSecretClass: tls
Expand Down Expand Up @@ -836,7 +843,7 @@ spec:
Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery)
here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/)
to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.
This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped.
This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped.
Please use the 'controller' role instead.
nullable: true
type: string
Expand Down
87 changes: 87 additions & 0 deletions examples/kraft-migration/01-setup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
---
apiVersion: v1
kind: Namespace
metadata:
labels:
stackable.tech/vendor: Stackable
name: kraft-migration
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperCluster
metadata:
name: simple-zk
namespace: kraft-migration
spec:
image:
productVersion: 3.9.4
pullPolicy: IfNotPresent
servers:
roleGroups:
default:
replicas: 1
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: simple-kafka-znode
namespace: kraft-migration
spec:
clusterRef:
name: simple-zk
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: kafka-internal-tls
spec:
backend:
autoTls:
ca:
secret:
name: secret-provisioner-kafka-internal-tls-ca
namespace: kraft-migration
autoGenerate: true
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
name: kafka-client-auth-tls
spec:
provider:
tls:
clientCertSecretClass: kafka-client-auth-secret
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: kafka-client-auth-secret
spec:
backend:
autoTls:
ca:
secret:
name: secret-provisioner-tls-kafka-client-ca
namespace: kraft-migration
autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
namespace: kraft-migration
spec:
image:
productVersion: 3.9.1
pullPolicy: IfNotPresent
clusterConfig:
metadataManager: ZooKeeper
authentication:
- authenticationClass: kafka-client-auth-tls
tls:
internalSecretClass: kafka-internal-tls
serverSecretClass: tls
zookeeperConfigMapName: simple-kafka-znode
brokers:
roleGroups:
default:
replicas: 3
33 changes: 33 additions & 0 deletions examples/kraft-migration/02-start-controllers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
namespace: kraft-migration
spec:
image:
productVersion: 3.9.1
pullPolicy: IfNotPresent
clusterConfig:
metadataManager: ZooKeeper
authentication:
- authenticationClass: kafka-client-auth-tls
tls:
internalSecretClass: kafka-internal-tls
serverSecretClass: tls
zookeeperConfigMapName: simple-kafka-znode
brokers:
envOverrides:
KAFKA_CLUSTER_ID: "saiZFmAuSX-QyMfMhwLk9g"
roleGroups:
default:
replicas: 3
controllers:
roleGroups:
default:
replicas: 3
envOverrides:
KAFKA_CLUSTER_ID: "saiZFmAuSX-QyMfMhwLk9g"
configOverrides:
controller.properties:
zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper.
47 changes: 47 additions & 0 deletions examples/kraft-migration/03-migrate-metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
namespace: kraft-migration
spec:
image:
productVersion: 3.9.1
pullPolicy: IfNotPresent
clusterConfig:
metadataManager: ZooKeeper
authentication:
- authenticationClass: kafka-client-auth-tls
tls:
internalSecretClass: kafka-internal-tls
serverSecretClass: tls
zookeeperConfigMapName: simple-kafka-znode
brokers:
envOverrides:
KAFKA_CLUSTER_ID: "saiZFmAuSX-QyMfMhwLk9g"
roleGroups:
default:
replicas: 3
configOverrides:
broker.properties:
inter.broker.protocol.version: "3.9" # - Latest value known to Kafka 3.9.1
zookeeper.metadata.migration.enable: "true" # - Enable migration mode so the broker can participate in metadata migration.
controller.listener.names: "CONTROLLER"
controller.quorum.voters: "2110489703@simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489704@simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093,2110489705@simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9093"

# listener.security.protocol.map: CONTROLLER:SSL,... - Already defined by the operator
# zookeeper.connect=<zk_connection_string> (should already be present) - The ZooKeeper connection string. This property should already be configured.
# controller.quorum.voters=<voter_string> (same as controllers) - Specify the same controller quorum voters string as configured in phase 2.
# controller.listener.names=CONTROLLER - Define the listener name for the controller.
# Add CONTROLLER to listener.security.protocol.map (for example, ...CONTROLLER:PLAINTEXT) - Add the CONTROLLER listener to the security protocol map with the appropriate security protocol.
# confluent.cluster.link.metadata.topic.enable=true - This property is used by Cluster Linking during the migration.

controllers:
roleGroups:
default:
replicas: 3
envOverrides:
KAFKA_CLUSTER_ID: "saiZFmAuSX-QyMfMhwLk9g"
configOverrides:
controller.properties:
zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper.
14 changes: 7 additions & 7 deletions rust/operator-binary/src/config/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::{
/// Returns the commands to start the main Kafka container
pub fn broker_kafka_container_commands(
kafka: &v1alpha1::KafkaCluster,
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
kafka_security: &KafkaTlsSecurity,
product_version: &str,
Expand All @@ -42,17 +41,16 @@ pub fn broker_kafka_container_commands(
true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"),
false => "".to_string(),
},
broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version),
broker_start_command = broker_start_command(kafka, controller_descriptors, product_version),
}
}

fn broker_start_command(
kafka: &v1alpha1::KafkaCluster,
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
product_version: &str,
) -> String {
if kafka.is_controller_configured() {
if kafka.is_kraft_mode() {
formatdoc! {"
POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))
Expand All @@ -63,7 +61,7 @@ fn broker_start_command(
cp {config_dir}/jaas.properties /tmp/jaas.properties
config-utils template /tmp/jaas.properties

bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-server-start.sh /tmp/{properties_file} &
",
config_dir = STACKABLE_CONFIG_DIR,
Expand All @@ -72,6 +70,9 @@ fn broker_start_command(
}
} else {
formatdoc! {"
POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$')
export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET))

cp {config_dir}/{properties_file} /tmp/{properties_file}
config-utils template /tmp/{properties_file}

Expand Down Expand Up @@ -128,7 +129,6 @@ wait_for_termination()
"#;

pub fn controller_kafka_container_command(
cluster_id: &str,
controller_descriptors: Vec<KafkaPodDescriptor>,
product_version: &str,
) -> String {
Expand All @@ -145,7 +145,7 @@ pub fn controller_kafka_container_command(

config-utils template /tmp/{properties_file}

bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command}
bin/kafka-server-start.sh /tmp/{properties_file} &

wait_for_termination $!
Expand Down
17 changes: 10 additions & 7 deletions rust/operator-binary/src/crd/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,17 @@ impl KafkaListenerConfig {
.join(",")
}

/// Returns the `listener.security.protocol.map` for the Kafka `broker.properties` config.
pub fn listener_security_protocol_map_for_listener(
&self,
listener_name: &KafkaListenerName,
) -> Option<String> {
/// Returns the `listener.security.protocol.map` for the Kraft controller.
/// This map must include the internal broker listener too.
pub fn listener_security_protocol_map_for_controller(&self) -> String {
self.listener_security_protocol_map
.get(listener_name)
.map(|protocol| format!("{listener_name}:{protocol}"))
.iter()
.filter(|(name, _)| {
*name == &KafkaListenerName::Internal || *name == &KafkaListenerName::Controller
})
.map(|(name, protocol)| format!("{name}:{protocol}"))
.collect::<Vec<String>>()
.join(",")
}
}

Expand Down
69 changes: 47 additions & 22 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use stackable_operator::{
utils::cluster_info::KubernetesClusterInfo,
versioned::versioned,
};
use strum::{Display, EnumIter, EnumString};

use crate::{
config::node_id_hasher::node_id_hash32_offset,
Expand Down Expand Up @@ -158,9 +159,12 @@ pub mod versioned {
/// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery)
/// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/)
/// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.
/// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped.
/// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped.
/// Please use the 'controller' role instead.
pub zookeeper_config_map_name: Option<String>,

#[serde(default = "default_metadata_manager")]
pub metadata_manager: MetadataManager,
}
}

Expand All @@ -172,6 +176,7 @@ impl Default for v1alpha1::KafkaClusterConfig {
tls: tls::default_kafka_tls(),
vector_aggregator_config_map_name: None,
zookeeper_config_map_name: None,
metadata_manager: default_metadata_manager(),
}
}
}
Expand All @@ -186,30 +191,26 @@ impl HasStatusCondition for v1alpha1::KafkaCluster {
}

impl v1alpha1::KafkaCluster {
/// Supporting Kraft alongside Zookeeper requires a couple of CRD checks
/// - If Kafka 4 and higher is used, no zookeeper config map ref has to be provided
/// - Configuring the controller role means no zookeeper config map ref has to be provided
pub fn check_kraft_vs_zookeeper(&self, product_version: &str) -> Result<(), Error> {
if product_version.starts_with("4.") && self.spec.controllers.is_none() {
return Err(Error::Kafka4RequiresKraft);
}

if self.spec.controllers.is_some()
&& self.spec.cluster_config.zookeeper_config_map_name.is_some()
{
return Err(Error::KraftAndZookeeperConfigured);
}

Ok(())
}

pub fn is_controller_configured(&self) -> bool {
self.spec.controllers.is_some()
pub fn is_kraft_mode(&self) -> bool {
self.spec.cluster_config.metadata_manager == MetadataManager::KRaft
}

// The cluster-id for Kafka
    /// The Kafka cluster id when running in KRaft mode.
///
    /// In ZooKeeper mode the cluster id is a UUID generated by Kafka itself and users typically
/// do not need to deal with it.
///
    /// When in KRaft mode, the cluster id is passed on as the environment variable `KAFKA_CLUSTER_ID`.
///
/// When migrating to Kraft mode, users *must* set this variable via `envOverrides` to the value
/// found in the `cluster/id` ZooKeeper node or in the `meta.properties` file.
///
/// For freshly installed clusters, users do not need to deal with the cluster id.
pub fn cluster_id(&self) -> Option<&str> {
self.metadata.name.as_deref()
match self.spec.cluster_config.metadata_manager {
MetadataManager::KRaft => self.metadata.name.as_deref(),
_ => None,
}
}

/// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this
Expand Down Expand Up @@ -407,6 +408,30 @@ pub struct KafkaClusterStatus {
pub conditions: Vec<ClusterCondition>,
}

// Which system manages the Kafka cluster metadata. ZooKeeper is supported up
// to Kafka 3.9.x; KRaft uses Kafka's own controller quorum instead.
//
// NOTE: variant doc text is deliberately kept out of `///` comments so the
// schemars-generated CRD schema stays unchanged.
#[derive(
    Clone,
    Debug,
    Deserialize,
    Display,
    EnumIter,
    Eq,
    Hash,
    JsonSchema,
    PartialEq,
    Serialize,
    EnumString,
)]
pub enum MetadataManager {
    // Serialized by strum as "zookeeper" (e.g. for config/property rendering);
    // serde/schemars still use the variant name "ZooKeeper" in the CRD.
    #[strum(serialize = "zookeeper")]
    ZooKeeper,
    // Serialized by strum as "kraft"; serde/schemars use "KRaft" in the CRD.
    #[strum(serialize = "kraft")]
    KRaft,
}

// Serde default for `KafkaClusterConfig::metadata_manager`: defaulting to
// ZooKeeper keeps pre-existing KafkaCluster resources backward compatible.
fn default_metadata_manager() -> MetadataManager {
    MetadataManager::ZooKeeper
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading