forked from content-services/content-sources-backend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.go
36 lines (31 loc) · 884 Bytes
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package event
import (
"fmt"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/rs/zerolog/log"
)
// getHeaderString renders the given Kafka headers as a single string of the
// form "{key: value, key: value}". Header values are converted from bytes to
// a string as-is. An empty or nil slice yields "{}".
func getHeaderString(headers []kafka.Header) string {
	pairs := make([]string, 0, len(headers))
	for _, h := range headers {
		pairs = append(pairs, fmt.Sprintf("%s: %s", h.Key, string(h.Value)))
	}
	return "{" + strings.Join(pairs, ", ") + "}"
}
// logEventMessageInfo writes an info-level log entry for msg with the fields
// "Topic", "Key" and "Headers", using text as the log message.
//
// It is a no-op when msg is nil or text is empty. When the message carries no
// topic (nil topic pointer) the "Topic" field is logged as an empty string.
func logEventMessageInfo(msg *kafka.Message, text string) {
	if msg == nil || text == "" {
		return
	}

	// Topic is a pointer; dereferencing it unconditionally would panic on a
	// message without a topic, which a logging helper must never do.
	var topic string
	if msg.TopicPartition.Topic != nil {
		topic = *msg.TopicPartition.Topic
	}

	log.Info().
		Str("Topic", topic).
		Str("Key", string(msg.Key)).
		Str("Headers", getHeaderString(msg.Headers)).
		Msg(text)
}
// logEventMessageError writes an error-level log entry describing a failure
// while processing msg. The entry includes the message headers, the payload
// converted to a string, and the text of err.
//
// Nothing is logged when either msg or err is nil.
func logEventMessageError(msg *kafka.Message, err error) {
	if msg != nil && err != nil {
		payload := string(msg.Value)
		reason := err.Error()
		log.Error().Msgf("error processing event message: headers=%v; payload=%v: %s", msg.Headers, payload, reason)
	}
}