feat: kafka pubsub and bindings support compression #3676

Open · wants to merge 1 commit into base: main
common/component/kafka/kafka.go (2 additions, 0 deletions)

@@ -154,6 +154,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
config.ChannelBufferSize = meta.channelBufferSize

+	config.Producer.Compression = meta.internalCompression

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
config.Metadata.RefreshFrequency = meta.ClientConnectionTopicMetadataRefreshInterval

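The single functional line above is all sarama needs to start compressing producer batches. A minimal sketch of the resulting producer configuration, assuming the github.com/IBM/sarama module used by components-contrib (note the PR sets only the codec, so sarama's Producer.CompressionLevel keeps the default it gets from NewConfig):

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Equivalent of what Init now does when metadata requests gzip:
	cfg.Producer.Compression = sarama.CompressionGZIP
	// Untouched by this PR, so sarama applies its per-codec default level.
	fmt.Println(cfg.Producer.CompressionLevel == sarama.CompressionLevelDefault) // true
}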
common/component/kafka/metadata.go (14 additions, 1 deletion)

@@ -38,7 +38,6 @@ const (
certificateAuthType = "certificate"
clientCert = "clientCert"
clientKey = "clientKey"
-	consumeRetryEnabled = "consumeRetryEnabled"

Review thread on this deletion:
  Member: Why remove this?
  Author: This is an unused constant, so I removed it.

consumeRetryInterval = "consumeRetryInterval"
authType = "authType"
passwordAuthType = "password"
@@ -50,6 +49,7 @@ const (
consumerFetchDefault = "consumerFetchDefault"
channelBufferSize = "channelBufferSize"
valueSchemaType = "valueSchemaType"
+	compression = "compression"

// Kafka client config default values.
// Refresh interval < keep alive time so that way connection can be kept alive indefinitely if desired.
@@ -102,6 +102,10 @@ type KafkaMetadata struct {
consumerFetchMin int32 `mapstructure:"-"`
consumerFetchDefault int32 `mapstructure:"-"`

+	// configs for kafka producer
+	Compression string `mapstructure:"Compression"`
+	internalCompression sarama.CompressionCodec `mapstructure:"-"`
+
// schema registry
SchemaRegistryURL string `mapstructure:"schemaRegistryURL"`
SchemaRegistryAPIKey string `mapstructure:"schemaRegistryAPIKey"`
@@ -149,6 +153,7 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
ConsumeRetryEnabled: k.DefaultConsumeRetryEnabled,
ConsumeRetryInterval: 100 * time.Millisecond,
internalVersion: sarama.V2_0_0_0, //nolint:nosnakecase
+	internalCompression: sarama.CompressionNone,
channelBufferSize: 256,
consumerFetchMin: 1,
consumerFetchDefault: 1024 * 1024,
@@ -294,6 +299,14 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
m.internalVersion = version
}

+	if m.Compression != "" {
+		compression, err := parseCompression(m.Compression)
+		if err != nil {
+			return nil, err
+		}
+		m.internalCompression = compression
+	}

if val, ok := meta[channelBufferSize]; ok && val != "" {
v, err := strconv.Atoi(val)
if err != nil {
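To show how a user would opt in end to end, here is a hypothetical metadata map in the shape getKafkaMetadata receives. Every value is illustrative (broker address, group, and auth mode are made up); "compression" is the key this PR introduces:

package main

import "fmt"

func main() {
	// Hypothetical component metadata; keys mirror the constants above.
	// "compression" is routed through parseCompression by the new code path.
	meta := map[string]string{
		"brokers":       "localhost:9092",
		"consumerGroup": "my-group",
		"authType":      "none",
		"compression":   "gzip", // becomes sarama.CompressionGZIP
	}
	fmt.Println(meta)
}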
common/component/kafka/metadata_test.go (14 additions, 0 deletions)

@@ -397,18 +397,21 @@ func TestMetadataProducerValues(t *testing.T) {
require.NoError(t, err)
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
+	require.Equal(t, sarama.CompressionNone, meta.internalCompression)
})

t.Run("setting producer values explicitly", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[clientConnectionTopicMetadataRefreshInterval] = "3m0s"
m[clientConnectionKeepAliveInterval] = "4m0s"
+	m[compression] = "gzip"

meta, err := k.getKafkaMetadata(m)
require.NoError(t, err)
require.Equal(t, 3*time.Minute, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, 4*time.Minute, meta.ClientConnectionKeepAliveInterval)
+	require.Equal(t, sarama.CompressionGZIP, meta.internalCompression)
})

t.Run("setting producer invalid values so defaults take over", func(t *testing.T) {
@@ -422,6 +425,17 @@ func TestMetadataProducerValues(t *testing.T) {
require.Equal(t, defaultClientConnectionTopicMetadataRefreshInterval, meta.ClientConnectionTopicMetadataRefreshInterval)
require.Equal(t, defaultClientConnectionKeepAliveInterval, meta.ClientConnectionKeepAliveInterval)
})

t.Run("setting producer invalid compression value", func(t *testing.T) {
k := getKafka()
m := getCompleteMetadata()
m[compression] = "invalid"

meta, err := k.getKafkaMetadata(m)
require.Error(t, err)
require.Nil(t, meta)
require.Equal(t, "kafka error: invalid compression: invalid", err.Error())
})
}

func TestMetadataChannelBufferSize(t *testing.T) {
common/component/kafka/utils.go (17 additions, 1 deletion)

@@ -54,6 +54,22 @@ func parseInitialOffset(value string) (initialOffset int64, err error) {
return initialOffset, err
}

+	// parseCompression parses a compression codec from the given string.
+	// An empty string yields the default codec, CompressionNone; any other
+	// value must name a supported codec: none, gzip, snappy, lz4, or zstd.
+	func parseCompression(value string) (compression sarama.CompressionCodec, err error) {
+		compression = sarama.CompressionNone // Default
+		if value != "" {
+			unmarshalErr := compression.UnmarshalText([]byte(value))
+			if unmarshalErr != nil {
+				return sarama.CompressionNone, fmt.Errorf("kafka error: invalid compression: %s", value)
+			}
+		}
+		return compression, nil
+	}

// isValidPEM validates the provided input has PEM formatted block.
func isValidPEM(val string) bool {
block, _ := pem.Decode([]byte(val))
@@ -64,7 +80,7 @@ func isValidPEM(val string) bool {
// TopicHandlerConfig is the map of topics and structs containing the handler and their config.
type TopicHandlerConfig map[string]SubscriptionHandlerConfig

-// // TopicList returns the list of topics
+// TopicList returns the list of topics
func (tbh TopicHandlerConfig) TopicList() []string {
topics := make([]string, len(tbh))
i := 0
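Because parseCompression delegates validation to the encoding.TextUnmarshaler implementation on sarama.CompressionCodec, the helper is easy to exercise in isolation. A standalone sketch, assuming the github.com/IBM/sarama import path and re-declaring the helper to mirror the one this diff adds:

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// Mirrors the helper added in utils.go above.
func parseCompression(value string) (sarama.CompressionCodec, error) {
	compression := sarama.CompressionNone // Default
	if value != "" {
		if err := compression.UnmarshalText([]byte(value)); err != nil {
			return sarama.CompressionNone, fmt.Errorf("kafka error: invalid compression: %s", value)
		}
	}
	return compression, nil
}

func main() {
	// "" falls back to none; "bogus" surfaces the kafka error seen in the tests.
	for _, v := range []string{"", "gzip", "snappy", "lz4", "zstd", "bogus"} {
		codec, err := parseCompression(v)
		fmt.Printf("%q -> codec=%v err=%v\n", v, codec, err)
	}
}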