diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index a05e611707..8de163f296 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -16,6 +16,7 @@ package kafka import ( "errors" "fmt" + "net/url" "strconv" "sync" "time" @@ -128,7 +129,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession, messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string, ) error { consumer.k.logger.Debugf("Processing Kafka bulk message: %s", topic) - messageValues := make([]KafkaBulkMessageEntry, (len(messages))) + messageValues := make([]KafkaBulkMessageEntry, len(messages)) for i, message := range messages { if message != nil { @@ -205,14 +206,14 @@ func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string { if message != nil { metadata := make(map[string]string, len(message.Headers)+5) if message.Key != nil { - metadata[keyMetadataKey] = string(message.Key) + metadata[keyMetadataKey] = url.QueryEscape(string(message.Key)) } metadata[offsetMetadataKey] = strconv.FormatInt(message.Offset, 10) metadata[topicMetadataKey] = message.Topic metadata[timestampMetadataKey] = strconv.FormatInt(message.Timestamp.UnixMilli(), 10) metadata[partitionMetadataKey] = strconv.FormatInt(int64(message.Partition), 10) for _, header := range message.Headers { - metadata[string(header.Key)] = string(header.Value) + metadata[string(header.Key)] = url.QueryEscape(string(header.Value)) } return metadata } diff --git a/common/component/kafka/metadata_test.go b/common/component/kafka/metadata_test.go index 3006f5f3fd..6e25d309ad 100644 --- a/common/component/kafka/metadata_test.go +++ b/common/component/kafka/metadata_test.go @@ -15,6 +15,7 @@ package kafka import ( "fmt" + "net/url" "strconv" "testing" "time" @@ -556,4 +557,31 @@ func TestGetEventMetadata(t *testing.T) { act := GetEventMetadata(nil) require.Nil(t, act) }) + + t.Run("key with invalid value escaped", func(t *testing.T) { + keyValue := "key1\xFF" + escapedKeyValue := url.QueryEscape(keyValue) + + m := sarama.ConsumerMessage{ + Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic", + } + act := GetEventMetadata(&m) + require.Equal(t, escapedKeyValue, act[keyMetadataKey]) + }) + + t.Run("header with invalid value escaped", func(t *testing.T) { + headerKey := "key1" + headerValue := "value1\xFF" + escapedHeaderValue := url.QueryEscape(headerValue) + + headers := []*sarama.RecordHeader{ + {Key: []byte(headerKey), Value: []byte(headerValue)}, + } + m := sarama.ConsumerMessage{ + Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic", + } + act := GetEventMetadata(&m) + require.Len(t, act, 6) + require.Equal(t, escapedHeaderValue, act[headerKey]) + }) }