Skip to content

Commit

Permalink
feat(connector): MSK connection by AWS IAM (#16625)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: Runji Wang <[email protected]>
  • Loading branch information
yuhao-su and wangrunji0408 committed May 9, 2024
1 parent f78eb21 commit 164e4a3
Show file tree
Hide file tree
Showing 13 changed files with 403 additions and 129 deletions.
24 changes: 22 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ axum = "=0.7.4" # TODO: 0.7.5+ does not work with current toolchain
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2.9"
hytra = "0.1"
rdkafka = { package = "madsim-rdkafka", version = "0.3.4", features = [
rdkafka = { package = "madsim-rdkafka", version = "0.4.1", features = [
"cmake-build",
] }
hashbrown = { version = "0.14", features = ["ahash", "inline-more", "nightly"] }
Expand Down
2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ auto_impl = "1"
await-tree = { workspace = true }
aws-config = { workspace = true }
aws-credential-types = { workspace = true }
aws-msk-iam-sasl-signer = "1.0.0"
aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
Expand All @@ -43,6 +44,7 @@ aws-types = { workspace = true }
base64 = "0.22"
byteorder = "1"
bytes = { version = "1", features = ["serde"] }
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
Expand Down
25 changes: 22 additions & 3 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use crate::source::nats::source::NatsOffset;
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

const AWS_MSK_IAM_AUTH: &str = "AWS_MSK_IAM";

#[derive(Debug, Clone, Deserialize)]
pub struct AwsPrivateLinkItem {
pub az_id: Option<String>,
Expand All @@ -65,9 +67,9 @@ pub struct AwsAuthProps {
pub access_key: Option<String>,
pub secret_key: Option<String>,
pub session_token: Option<String>,
/// IAM role
pub arn: Option<String>,
/// This field was added for kinesis. Not sure if it's useful for other connectors.
/// Please ignore it in the documentation for now.
/// external ID in IAM role trust policy
pub external_id: Option<String>,
pub profile: Option<String>,
}
Expand Down Expand Up @@ -181,7 +183,7 @@ pub struct KafkaCommon {
#[serde(rename = "properties.ssl.key.password")]
ssl_key_password: Option<String>,

/// SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM and GSSAPI.
/// SASL mechanism if SASL is enabled. Currently supports PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM.
#[serde(rename = "properties.sasl.mechanism")]
sasl_mechanism: Option<String>,

Expand Down Expand Up @@ -286,6 +288,13 @@ impl RdKafkaPropertiesCommon {

impl KafkaCommon {
pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
config.set("security.protocol", "SASL_SSL");
config.set("sasl.mechanism", "OAUTHBEARER");
return;
}

// Security protocol
if let Some(security_protocol) = self.security_protocol.as_ref() {
config.set("security.protocol", security_protocol);
Expand Down Expand Up @@ -356,6 +365,16 @@ impl KafkaCommon {
// Currently, we only support unsecured OAUTH.
config.set("enable.sasl.oauthbearer.unsecure.jwt", "true");
}

pub(crate) fn is_aws_msk_iam(&self) -> bool {
if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref()
&& sasl_mechanism == AWS_MSK_IAM_AUTH
{
true
} else {
false
}
}
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand Down
31 changes: 21 additions & 10 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@ use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
use crate::connector_common::{
AwsAuthProps, KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon,
};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
use crate::sink::writer::{
AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink,
};
use crate::sink::{DummySinkCommitCoordinator, Result, SinkWriterParam};
use crate::source::kafka::{KafkaProperties, KafkaSplitEnumerator, PrivateLinkProducerContext};
use crate::source::kafka::{
KafkaContextCommon, KafkaProperties, KafkaSplitEnumerator, RwProducerContext,
};
use crate::source::{SourceEnumeratorContext, SplitEnumerator};
use crate::{
deserialize_duration_from_string, deserialize_u32_from_string, dispatch_sink_formatter_impl,
Expand Down Expand Up @@ -244,6 +248,9 @@ pub struct KafkaConfig {

#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,

#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,
}

impl KafkaConfig {
Expand Down Expand Up @@ -274,6 +281,7 @@ impl From<KafkaConfig> for KafkaProperties {
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
aws_auth_props: val.aws_auth_props,
unknown_fields: Default::default(),
}
}
Expand Down Expand Up @@ -393,7 +401,7 @@ const KAFKA_WRITER_MAX_QUEUE_SIZE_RATIO: f32 = 1.2;
const KAFKA_WRITER_MAX_QUEUE_SIZE: usize = 100000;

struct KafkaPayloadWriter<'a> {
inner: &'a FutureProducer<PrivateLinkProducerContext>,
inner: &'a FutureProducer<RwProducerContext>,
add_future: DeliveryFutureManagerAddFuture<'a, KafkaSinkDeliveryFuture>,
config: &'a KafkaConfig,
}
Expand All @@ -402,13 +410,13 @@ pub type KafkaSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> +

pub struct KafkaSinkWriter {
formatter: SinkFormatterImpl,
inner: FutureProducer<PrivateLinkProducerContext>,
inner: FutureProducer<RwProducerContext>,
config: KafkaConfig,
}

impl KafkaSinkWriter {
async fn new(config: KafkaConfig, formatter: SinkFormatterImpl) -> Result<Self> {
let inner: FutureProducer<PrivateLinkProducerContext> = {
let inner: FutureProducer<RwProducerContext> = {
let mut c = ClientConfig::new();

// KafkaConfig configuration
Expand All @@ -419,13 +427,16 @@ impl KafkaSinkWriter {
c.set("bootstrap.servers", &config.common.brokers);

// Create the producer context, will be used to create the producer
let producer_ctx = PrivateLinkProducerContext::new(
config.privatelink_common.broker_rewrite_map.clone(),
// fixme: enable kafka native metrics for sink
let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone();
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
)?;

config.aws_auth_props.clone(),
config.common.is_aws_msk_iam(),
)
.await?;
let producer_ctx = RwProducerContext::new(ctx_common);
// Generate the producer
c.create_with_context(producer_ctx).await?
};
Expand Down

0 comments on commit 164e4a3

Please sign in to comment.