Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MirrorMaker2Connectors configuration setup #11150

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.strimzi.api.kafka.model.common.CertSecretSource;
import io.strimzi.api.kafka.model.common.ClientTls;
import io.strimzi.api.kafka.model.common.GenericSecretSource;
import io.strimzi.api.kafka.model.common.PasswordSecretSource;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls;
import io.strimzi.api.kafka.model.connect.KafkaConnectSpec;
import io.strimzi.api.kafka.model.connect.KafkaConnectSpecBuilder;
Expand Down Expand Up @@ -42,18 +38,11 @@ public class KafkaMirrorMaker2Cluster extends KafkaConnectCluster {

// Kafka MirrorMaker 2 connector configuration keys (EnvVariables)
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_CLUSTERS = "KAFKA_MIRRORMAKER_2_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_CLUSTERS = "KAFKA_MIRRORMAKER_2_TLS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_TRUSTED_CERTS_CLUSTERS = "KAFKA_MIRRORMAKER_2_TRUSTED_CERTS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_AUTH_CLUSTERS = "KAFKA_MIRRORMAKER_2_TLS_AUTH_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_AUTH_CERTS_CLUSTERS = "KAFKA_MIRRORMAKER_2_TLS_AUTH_CERTS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_AUTH_KEYS_CLUSTERS = "KAFKA_MIRRORMAKER_2_TLS_AUTH_KEYS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_SASL_PASSWORD_FILES_CLUSTERS = "KAFKA_MIRRORMAKER_2_SASL_PASSWORD_FILES_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_TRUSTED_CERTS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_TRUSTED_CERTS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_SECRETS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_SECRETS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_ACCESS_TOKENS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_OAUTH_ACCESS_TOKENS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_REFRESH_TOKENS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_REFRESH_TOKENS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_PASSWORDS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_PASSWORDS_CLUSTERS";
protected static final String ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_ASSERTIONS_CLUSTERS = "KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_ASSERTIONS_CLUSTERS";
protected static final String CO_ENV_VAR_CUSTOM_MIRROR_MAKER2_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_MIRROR_MAKER2_LABELS";

protected static final String MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT = "/opt/kafka/mm2-oauth/";
Expand Down Expand Up @@ -216,16 +205,9 @@ protected List<EnvVar> getEnvVars() {

final StringBuilder clusterAliases = new StringBuilder();
final StringBuilder clustersTrustedCerts = new StringBuilder();
boolean hasClusterWithTls = false;
final StringBuilder clustersTlsAuthCerts = new StringBuilder();
final StringBuilder clustersTlsAuthKeys = new StringBuilder();
final StringBuilder clustersSaslPasswordFiles = new StringBuilder();
final StringBuilder clustersOauthClientSecrets = new StringBuilder();
final StringBuilder clustersOauthAccessTokens = new StringBuilder();
final StringBuilder clustersOauthRefreshTokens = new StringBuilder();
final StringBuilder clustersOauthPasswords = new StringBuilder();
final StringBuilder clustersOauthTrustedCerts = new StringBuilder();
final StringBuilder clustersOauthClientAssertions = new StringBuilder();

for (KafkaMirrorMaker2ClusterSpec mirrorMaker2Cluster : clusters) {
String clusterAlias = mirrorMaker2Cluster.getAlias();
Expand All @@ -235,10 +217,6 @@ protected List<EnvVar> getEnvVars() {
}
clusterAliases.append(clusterAlias);

if (mirrorMaker2Cluster.getTls() != null) {
hasClusterWithTls = true;
}

getClusterTrustedCerts(clustersTrustedCerts, mirrorMaker2Cluster, clusterAlias);

KafkaClientAuthentication authentication = mirrorMaker2Cluster.getAuthentication();
Expand All @@ -248,30 +226,16 @@ protected List<EnvVar> getEnvVars() {
appendCluster(clustersTlsAuthCerts, clusterAlias, () -> tlsAuth.getCertificateAndKey().getSecretName() + "/" + tlsAuth.getCertificateAndKey().getCertificate());
appendCluster(clustersTlsAuthKeys, clusterAlias, () -> tlsAuth.getCertificateAndKey().getSecretName() + "/" + tlsAuth.getCertificateAndKey().getKey());
}
} else if (authentication instanceof KafkaClientAuthenticationPlain passwordAuth) {
appendClusterPasswordSecretSource(clustersSaslPasswordFiles, clusterAlias, passwordAuth.getPasswordSecret());
} else if (authentication instanceof KafkaClientAuthenticationScram passwordAuth) {
appendClusterPasswordSecretSource(clustersSaslPasswordFiles, clusterAlias, passwordAuth.getPasswordSecret());
} else if (authentication instanceof KafkaClientAuthenticationOAuth oauth) {
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
appendClusterOAuthTrustedCerts(clustersOauthTrustedCerts, clusterAlias, oauth.getTlsTrustedCertificates());
}

appendClusterOAuthSecretSource(clustersOauthClientSecrets, clusterAlias, oauth.getClientSecret());
appendClusterOAuthSecretSource(clustersOauthAccessTokens, clusterAlias, oauth.getAccessToken());
appendClusterOAuthSecretSource(clustersOauthRefreshTokens, clusterAlias, oauth.getRefreshToken());
appendClusterPasswordSecretSource(clustersOauthPasswords, clusterAlias, oauth.getPasswordSecret());
appendClusterOAuthSecretSource(clustersOauthClientAssertions, clusterAlias, oauth.getClientAssertion());
}
}
}

varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_CLUSTERS, clusterAliases.toString()));

if (hasClusterWithTls) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_CLUSTERS, "true"));
}

if (clustersTrustedCerts.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_TRUSTED_CERTS_CLUSTERS, clustersTrustedCerts.toString()));
}
Expand All @@ -282,34 +246,10 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_TLS_AUTH_KEYS_CLUSTERS, clustersTlsAuthKeys.toString()));
}

if (clustersSaslPasswordFiles.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_SASL_PASSWORD_FILES_CLUSTERS, clustersSaslPasswordFiles.toString()));
}

if (clustersOauthTrustedCerts.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_TRUSTED_CERTS_CLUSTERS, clustersOauthTrustedCerts.toString()));
}

if (clustersOauthClientSecrets.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_SECRETS_CLUSTERS, clustersOauthClientSecrets.toString()));
}

if (clustersOauthAccessTokens.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_ACCESS_TOKENS_CLUSTERS, clustersOauthAccessTokens.toString()));
}

if (clustersOauthRefreshTokens.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_REFRESH_TOKENS_CLUSTERS, clustersOauthRefreshTokens.toString()));
}

if (clustersOauthPasswords.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_PASSWORDS_CLUSTERS, clustersOauthPasswords.toString()));
}

if (clustersOauthClientAssertions.length() > 0) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_MIRRORMAKER_2_OAUTH_CLIENT_ASSERTIONS_CLUSTERS, clustersOauthClientAssertions.toString()));
}

JvmOptionUtils.jvmSystemProperties(varList, jvmOptions);

return varList;
Expand All @@ -331,18 +271,6 @@ private void getClusterTrustedCerts(final StringBuilder clustersTrustedCerts, Ka
}
}

private void appendClusterPasswordSecretSource(final StringBuilder clusters, String clusterAlias, PasswordSecretSource passwordSecretSource) {
if (passwordSecretSource != null) {
appendCluster(clusters, clusterAlias, () -> passwordSecretSource.getSecretName() + "/" + passwordSecretSource.getPassword());
}
}

private void appendClusterOAuthSecretSource(final StringBuilder clusters, String clusterAlias, GenericSecretSource secretSource) {
if (secretSource != null) {
appendCluster(clusters, clusterAlias, () -> secretSource.getSecretName() + "/" + secretSource.getKey());
}
}

private void appendClusterOAuthTrustedCerts(final StringBuilder clusters, String clusterAlias, List<CertSecretSource> trustedCerts) {
appendCluster(clusters, clusterAlias, () -> CertUtils.trustedCertsEnvVar(trustedCerts));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.strimzi.operator.cluster.model.KafkaMirrorMaker2Cluster.MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT;
import static io.strimzi.operator.cluster.model.KafkaMirrorMaker2Cluster.MIRRORMAKER_2_PASSWORD_VOLUME_MOUNT;

/**
* Kafka Mirror Maker 2 Connectors model
*/
Expand All @@ -49,10 +52,13 @@ public class KafkaMirrorMaker2Connectors {
private static final String TRUSTSTORE_SUFFIX = ".truststore.p12";
private static final String KEYSTORE_SUFFIX = ".keystore.p12";
private static final String CONNECT_CONFIG_FILE = "/tmp/strimzi-connect.properties";
private static final String CONNECTORS_CONFIG_FILE = "/tmp/strimzi-mirrormaker2-connector.properties";
private static final String SOURCE_CONNECTOR_SUFFIX = ".MirrorSourceConnector";
private static final String CHECKPOINT_CONNECTOR_SUFFIX = ".MirrorCheckpointConnector";
private static final String HEARTBEAT_CONNECTOR_SUFFIX = ".MirrorHeartbeatConnector";
protected static final String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:MIRRORMAKER_2_CERTS_STORE_PASSWORD}";

private static final String PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR = "${strimzidir:%s%s/%s:%s}";

private static final Map<String, Function<KafkaMirrorMaker2MirrorSpec, KafkaMirrorMaker2ConnectorSpec>> CONNECTORS = Map.of(
SOURCE_CONNECTOR_SUFFIX, KafkaMirrorMaker2MirrorSpec::getSourceConnector,
CHECKPOINT_CONNECTOR_SUFFIX, KafkaMirrorMaker2MirrorSpec::getCheckpointConnector,
Expand Down Expand Up @@ -238,21 +244,23 @@ public List<KafkaConnector> generateConnectorDefinitions() {
if (cluster.getAuthentication() instanceof KafkaClientAuthenticationTls) {
config.put(configPrefix + SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
config.put(configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, STORE_LOCATION_ROOT + cluster.getAlias() + KEYSTORE_SUFFIX);
config.put(configPrefix + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG + "}");
config.put(configPrefix + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
} else if (cluster.getAuthentication() instanceof KafkaClientAuthenticationPlain plainAuthentication) {
securityProtocol = cluster.getTls() != null ? "SASL_SSL" : "SASL_PLAINTEXT";
config.put(configPrefix + SaslConfigs.SASL_MECHANISM, "PLAIN");
String passwordFilePath = String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_PASSWORD_VOLUME_MOUNT, cluster.getAlias(), plainAuthentication.getPasswordSecret().getSecretName(), plainAuthentication.getPasswordSecret().getPassword());
config.put(configPrefix + SaslConfigs.SASL_JAAS_CONFIG,
AuthenticationUtils.jaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule",
Map.of("username", plainAuthentication.getUsername(),
"password", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".sasl.password}")));
"password", passwordFilePath)));
} else if (cluster.getAuthentication() instanceof KafkaClientAuthenticationScram scramAuthentication) {
securityProtocol = cluster.getTls() != null ? "SASL_SSL" : "SASL_PLAINTEXT";
config.put(configPrefix + SaslConfigs.SASL_MECHANISM, scramAuthentication instanceof KafkaClientAuthenticationScramSha256 ? "SCRAM-SHA-256" : "SCRAM-SHA-512");
String passwordFilePath = String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_PASSWORD_VOLUME_MOUNT, cluster.getAlias(), scramAuthentication.getPasswordSecret().getSecretName(), scramAuthentication.getPasswordSecret().getPassword());
config.put(configPrefix + SaslConfigs.SASL_JAAS_CONFIG,
AuthenticationUtils.jaasConfig("org.apache.kafka.common.security.scram.ScramLoginModule",
Map.of("username", scramAuthentication.getUsername(),
"password", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".sasl.password}")));
"password", passwordFilePath)));
} else if (cluster.getAuthentication() instanceof KafkaClientAuthenticationOAuth oauthAuthentication) {
securityProtocol = cluster.getTls() != null ? "SASL_SSL" : "SASL_PLAINTEXT";
config.put(configPrefix + SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
Expand All @@ -274,28 +282,28 @@ private static String oauthJaasConfig(KafkaMirrorMaker2ClusterSpec cluster, Kafk
Map<String, String> jaasOptions = cluster.getAuthentication() instanceof KafkaClientAuthenticationOAuth ? AuthenticationUtils.oauthJaasOptions((KafkaClientAuthenticationOAuth) cluster.getAuthentication()) : new LinkedHashMap<>();

if (oauth.getClientSecret() != null) {
jaasOptions.put("oauth.client.secret", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".oauth.client.secret}");
jaasOptions.put("oauth.client.secret", String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT, cluster.getAlias(), oauth.getClientSecret().getSecretName(), oauth.getClientSecret().getKey()));
}

if (oauth.getAccessToken() != null) {
jaasOptions.put("oauth.access.token", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".oauth.access.token}");
jaasOptions.put("oauth.access.token", String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT, cluster.getAlias(), oauth.getAccessToken().getSecretName(), oauth.getAccessToken().getKey()));
}

if (oauth.getRefreshToken() != null) {
jaasOptions.put("oauth.refresh.token", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".oauth.refresh.token}");
jaasOptions.put("oauth.refresh.token", String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT, cluster.getAlias(), oauth.getRefreshToken().getSecretName(), oauth.getRefreshToken().getKey()));
}

if (oauth.getPasswordSecret() != null) {
jaasOptions.put("oauth.password.grant.password", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".oauth.password.grant.password}");
jaasOptions.put("oauth.password.grant.password", String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT, cluster.getAlias(), oauth.getPasswordSecret().getSecretName(), oauth.getPasswordSecret().getPassword()));
}

if (oauth.getClientAssertion() != null) {
jaasOptions.put("oauth.client.assertion", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + cluster.getAlias() + ".oauth.client.assertion}");
jaasOptions.put("oauth.client.assertion", String.format(PLACEHOLDER_MIRRORMAKER2_CONNECTOR_CONFIGS_TEMPLATE_CONFIG_PROVIDER_DIR, MIRRORMAKER_2_OAUTH_SECRETS_BASE_VOLUME_MOUNT, cluster.getAlias(), oauth.getClientAssertion().getSecretName(), oauth.getClientAssertion().getKey()));
}

if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
jaasOptions.put("oauth.ssl.truststore.location", "/tmp/kafka/clusters/" + cluster.getAlias() + "-oauth.truststore.p12");
jaasOptions.put("oauth.ssl.truststore.password", "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":oauth.ssl.truststore.password}");
jaasOptions.put("oauth.ssl.truststore.password", PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
jaasOptions.put("oauth.ssl.truststore.type", "PKCS12");
}

Expand All @@ -318,7 +326,7 @@ private static String addTLSConfigToMirrorMaker2ConnectorConfig(Map<String, Obje
if (cluster.getTls().getTrustedCertificates() != null && !cluster.getTls().getTrustedCertificates().isEmpty()) {
config.put(configPrefix + SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
config.put(configPrefix + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, STORE_LOCATION_ROOT + cluster.getAlias() + TRUSTSTORE_SUFFIX);
config.put(configPrefix + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "${strimzifile:" + CONNECTORS_CONFIG_FILE + ":" + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "}");
config.put(configPrefix + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
}

return "SSL";
Expand Down
Loading
Loading