Skip to content

Commit

Permalink
Added code for support 'SASL_PLAINTEXT' security protocol for kafka.
Browse files Browse the repository at this point in the history
  • Loading branch information
melonaerial committed Feb 25, 2025
1 parent 7faf897 commit ae4d0ff
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 12 deletions.
5 changes: 4 additions & 1 deletion kafka/include/userver/kafka/impl/configuration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ struct CommonConfiguration final {

struct SecurityConfiguration final {
struct Plaintext {};
struct SaslPlaintext {
std::string security_mechanism;
};
struct SaslSsl {
std::string security_mechanism;
std::string ssl_ca_location;
};

using SecurityProtocol = std::variant<Plaintext, SaslSsl>;
using SecurityProtocol = std::variant<Plaintext, SaslPlaintext, SaslSsl>;
SecurityProtocol security_protocol{};
};

Expand Down
39 changes: 30 additions & 9 deletions kafka/src/kafka/impl/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ void VerifyComponentNamePrefix(const std::string& component_name, const std::str
// producer's component should start with kafka-producer, consumer's - with
// kafka-consumer
if (component_name.rfind(expected_prefix) != 0) {
throw std::runtime_error{
fmt::format("Component '{}' doesn't start with '{}'", component_name, expected_prefix)};
throw std::runtime_error{fmt::format("Component '{}' doesn't start with '{}'", component_name, expected_prefix)
};
}
}

Expand Down Expand Up @@ -105,8 +105,13 @@ CommonConfiguration Parse(const yaml_config::YamlConfig& config, formats::parse:

SecurityConfiguration Parse(const yaml_config::YamlConfig& config, formats::parse::To<SecurityConfiguration>) {
static constexpr std::string_view kPlainTextProtocol{"PLAINTEXT"};
static constexpr std::string_view kSaslPlainTextProtocol{"SASL_PLAINTEXT"};
static constexpr std::string_view kSaslSSLProtocol{"SASL_SSL"};
static constexpr std::array kSupportedSecurityProtocols{kPlainTextProtocol, kSaslSSLProtocol};
static constexpr std::array kSupportedSecurityProtocols{
kPlainTextProtocol,
kSaslSSLProtocol,
kSaslPlainTextProtocol,
};
static constexpr std::array kSupportedSaslSecurityMechanisms{"PLAIN", "SCRAM-SHA-512"};

SecurityConfiguration security{};
Expand All @@ -117,15 +122,23 @@ SecurityConfiguration Parse(const yaml_config::YamlConfig& config, formats::pars
}
if (protocol == kPlainTextProtocol) {
security.security_protocol.emplace<SecurityConfiguration::Plaintext>();
} else if (protocol == kSaslSSLProtocol) {
const auto mechanism = config["sasl_mechanisms"].As<std::string>();
if (!IsSupportedOption(kSupportedSaslSecurityMechanisms, mechanism)) {
ThrowUnsupportedOption("SASL security mechanism", mechanism, kSupportedSaslSecurityMechanisms);
}
return security;
}

const auto mechanism = config["sasl_mechanisms"].As<std::string>();
if (!IsSupportedOption(kSupportedSaslSecurityMechanisms, mechanism)) {
ThrowUnsupportedOption("SASL security mechanism", mechanism, kSupportedSaslSecurityMechanisms);
}

if (protocol == kSaslPlainTextProtocol) {
security.security_protocol.emplace<SecurityConfiguration::SaslPlaintext>(SecurityConfiguration::SaslPlaintext{
/*security_mechanism=*/mechanism,
});
} else if (protocol == kSaslSSLProtocol) {
security.security_protocol.emplace<SecurityConfiguration::SaslSsl>(SecurityConfiguration::SaslSsl{
/*security_mechanism=*/mechanism,
/*ssl_ca_location=*/config["ssl_ca_location"].As<std::string>()});
/*ssl_ca_location=*/config["ssl_ca_location"].As<std::string>(),
});
}

return security;
Expand Down Expand Up @@ -229,6 +242,14 @@ void Configuration::SetSecurity(const SecurityConfiguration& security, const Sec
utils::Visit(
security.security_protocol,
[](const SecurityConfiguration::Plaintext&) { LOG_INFO() << "Using PLAINTEXT security protocol"; },
[this, &secrets](const SecurityConfiguration::SaslPlaintext& sasl_ssl) {
LOG_INFO() << "Using SASL_PLAINTEXT security protocol";

SetOption("security.protocol", "SASL_PLAINTEXT");
SetOption("sasl.mechanism", sasl_ssl.security_mechanism);
SetOption("sasl.username", secrets.username);
SetOption("sasl.password", secrets.password);
},
[this, &secrets](const SecurityConfiguration::SaslSsl& sasl_ssl) {
LOG_INFO() << "Using SASL_SSL security protocol";

Expand Down
47 changes: 45 additions & 2 deletions kafka/tests/configuration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ UTEST_F(ConfigurationTest, ProducerSecure) {
kafka::impl::ProducerConfiguration producer_configuration{};
producer_configuration.security.security_protocol = kafka::impl::SecurityConfiguration::SaslSsl{
/*security_mechanism=*/"SCRAM-SHA-512",
/*ssl_ca_location=*/"probe"};
/*ssl_ca_location=*/"probe",
};

kafka::impl::Secret secrets;
secrets.username = kafka::impl::Secret::SecretType{"username"};
Expand All @@ -150,11 +151,33 @@ UTEST_F(ConfigurationTest, ProducerSecure) {
EXPECT_EQ(configuration->GetOption("ssl.ca.location"), "probe");
}

UTEST_F(ConfigurationTest, ProducerSecurePlaintext) {
kafka::impl::ProducerConfiguration producer_configuration{};
producer_configuration.security.security_protocol = kafka::impl::SecurityConfiguration::SaslPlaintext{
/*security_mechanism=*/"SCRAM-SHA-512",
};

kafka::impl::Secret secrets;
secrets.username = kafka::impl::Secret::SecretType{"username"};
secrets.password = kafka::impl::Secret::SecretType{"password"};

std::optional<kafka::impl::Configuration> configuration;
UEXPECT_NO_THROW(configuration.emplace(MakeProducerConfiguration("kafka-producer", producer_configuration, secrets))
);

EXPECT_EQ(configuration->GetOption("security.protocol"), "sasl_plaintext");
EXPECT_EQ(configuration->GetOption("sasl.mechanism"), "SCRAM-SHA-512");
EXPECT_EQ(configuration->GetOption("sasl.username"), "username");
EXPECT_EQ(configuration->GetOption("sasl.password"), "password");
EXPECT_EQ(configuration->GetOption("ssl.ca.location"), "probe");
}

UTEST_F(ConfigurationTest, ConsumerSecure) {
kafka::impl::ConsumerConfiguration consumer_configuration{};
consumer_configuration.security.security_protocol = kafka::impl::SecurityConfiguration::SaslSsl{
/*security_mechanism=*/"SCRAM-SHA-512",
/*ssl_ca_location=*/"/etc/ssl/cert.ca"};
/*ssl_ca_location=*/"/etc/ssl/cert.ca",
};

kafka::impl::Secret secrets;
secrets.username = kafka::impl::Secret::SecretType{"username"};
Expand All @@ -171,6 +194,26 @@ UTEST_F(ConfigurationTest, ConsumerSecure) {
EXPECT_EQ(configuration->GetOption("ssl.ca.location"), "/etc/ssl/cert.ca");
}

UTEST_F(ConfigurationTest, ConsumerSecurePlaintext) {
kafka::impl::ConsumerConfiguration consumer_configuration{};
consumer_configuration.security.security_protocol = kafka::impl::SecurityConfiguration::SaslPlaintext{
/*security_mechanism=*/"SCRAM-SHA-512",
};

kafka::impl::Secret secrets;
secrets.username = kafka::impl::Secret::SecretType{"username"};
secrets.password = kafka::impl::Secret::SecretType{"password"};

std::optional<kafka::impl::Configuration> configuration;
UEXPECT_NO_THROW(configuration.emplace(MakeConsumerConfiguration("kafka-consumer", consumer_configuration, secrets))
);

EXPECT_EQ(configuration->GetOption("security.protocol"), "sasl_plaintext");
EXPECT_EQ(configuration->GetOption("sasl.mechanism"), "SCRAM-SHA-512");
EXPECT_EQ(configuration->GetOption("sasl.username"), "username");
EXPECT_EQ(configuration->GetOption("sasl.password"), "password");
}

UTEST_F(ConfigurationTest, IncorrectComponentName) {
UEXPECT_THROW(MakeProducerConfiguration("producer"), std::runtime_error);
UEXPECT_THROW(MakeConsumerConfiguration("consumer"), std::runtime_error);
Expand Down

0 comments on commit ae4d0ff

Please sign in to comment.