From 22b597b2f6a3a5d95813848fd194ab662b68e867 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 12 Dec 2024 16:02:53 +0800 Subject: [PATCH 1/6] alway enable the sarama logger --- pkg/logutil/log.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 1f7afe61d79..138db763b9c 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -223,14 +223,11 @@ func initMySQLLogger() error { // initSaramaLogger hacks logger used in sarama lib func initSaramaLogger(level zapcore.Level) error { - // only available less than info level - if !zapcore.InfoLevel.Enabled(level) { - logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) - if err != nil { - return errors.Trace(err) - } - sarama.Logger = logger + logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) + if err != nil { + return errors.Trace(err) } + sarama.Logger = logger return nil } From 7711fa26613f08914f9d24184196b664733a90ec Mon Sep 17 00:00:00 2001 From: nhsmw Date: Sun, 29 Dec 2024 20:30:02 +0800 Subject: [PATCH 2/6] add more sarama log --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index 0a6f5dbfa92..12f150124bd 100644 --- a/go.mod +++ b/go.mod @@ -418,5 +418,7 @@ replace sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0 replace sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 +replace github.com/IBM/sarama => github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae + // tls10server=1 godebug tlsrsakex=1 From 730dc632208535ef909bde912852f5548a9e7f05 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Sun, 29 Dec 2024 20:32:43 +0800 Subject: [PATCH 3/6] . --- go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.sum b/go.sum index 4d4837cb879..fc0b96df5e1 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.39.1 h1:MvraqHKhogCOTXTlct/9C3K3+Uy2jBmFYb3/Sp6dVtY= cloud.google.com/go/storage v1.39.1/go.mod h1:xK6xZmxZmo+fyP7+DEF6FhNc24/JAe95OLyOHCXFH1o= +github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae h1:TZxjgE3hc0iM0GIxz1RywfrEheWz8CsSYAEXyo6EZV8= +github.com/3AceShowHand/sarama v0.0.0-20241204051647-318559e536ae/go.mod h1:xdpu7sd6OE1uxNdjYTSKUfY8FaKkJES9/+EyjSgiGQk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= From c8049d952ec08746d66087dfa887ee276445d655 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Mon, 30 Dec 2024 10:42:55 +0800 Subject: [PATCH 4/6] add sarama send msg log --- pkg/sink/kafka/factory.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index a8a8f05e00f..98dcdc7d43e 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -266,5 +266,6 @@ func (p *saramaAsyncProducer) AsyncSend(ctx context.Context, topic string, parti return errors.Trace(ctx.Err()) case p.producer.Input() <- msg: } + log.Info("async send msg", zap.Int32("partition", partition), zap.Any("commitTs", message.Ts)) return nil } From 477a705e8d97d07409ebd9eee50022fe4299a932 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Mon, 30 Dec 2024 14:31:42 +0800 Subject: [PATCH 5/6] add headers --- pkg/sink/codec/common/message.go | 17 ++++++++++++++++- pkg/sink/kafka/factory.go | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/sink/codec/common/message.go b/pkg/sink/codec/common/message.go index 6ee0d52b89d..b5dd3ce5670 100644 --- a/pkg/sink/codec/common/message.go +++ b/pkg/sink/codec/common/message.go @@ -16,8 +16,10 @@ package common import ( "encoding/binary" "encoding/json" + "strconv" "time" + "github.com/IBM/sarama" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/tikv/client-go/v2/oracle" @@ -50,7 +52,20 @@ type Message struct { // We didn't append any `Headers` when send the message, so ignore the calculations related to it. // If `ProducerMessage` Headers fields used, this method should also adjust. func (m *Message) Length() int { - return len(m.Key) + len(m.Value) + MaxRecordOverhead + headers := m.Headers() + var headerLen int + for _, header := range headers { + headerLen += len(header.Key) + len(header.Value) + 2*binary.MaxVarintLen32 + } + return headerLen + len(m.Key) + len(m.Value) + MaxRecordOverhead +} + +// Headers returns the headers of Kafka message +func (m *Message) Headers() []sarama.RecordHeader { + headers := []sarama.RecordHeader{ + {Key: []byte("commitTs"), Value: []byte(strconv.FormatUint(m.Ts, 10))}, + } + return headers } // PhysicalTime returns physical time part of Ts in time.Time diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 98dcdc7d43e..7f7a9051f4f 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -257,6 +257,7 @@ func (p *saramaAsyncProducer) AsyncSend(ctx context.Context, topic string, parti msg := &sarama.ProducerMessage{ Topic: topic, Partition: partition, + Headers: message.Headers(), Key: sarama.StringEncoder(message.Key), Value: sarama.ByteEncoder(message.Value), Metadata: message.Callback, From e092af911265c0a1d046bc75dd622657cc688ae4 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 31 Dec 2024 11:22:23 +0800 Subject: [PATCH 6/6] add table --- pkg/sink/codec/common/message.go | 1 + pkg/sink/kafka/factory.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/sink/codec/common/message.go b/pkg/sink/codec/common/message.go index b5dd3ce5670..9a71f519626 100644 --- a/pkg/sink/codec/common/message.go +++ b/pkg/sink/codec/common/message.go @@ -64,6 +64,7 @@ func (m *Message) Length() int { func (m *Message) Headers() []sarama.RecordHeader { headers := []sarama.RecordHeader{ {Key: []byte("commitTs"), Value: []byte(strconv.FormatUint(m.Ts, 10))}, + {Key: []byte("table"), Value: []byte(m.GetTable())}, } return headers } diff --git a/pkg/sink/kafka/factory.go b/pkg/sink/kafka/factory.go index 7f7a9051f4f..46c1c4ab643 100644 --- a/pkg/sink/kafka/factory.go +++ b/pkg/sink/kafka/factory.go @@ -267,6 +267,6 @@ func (p *saramaAsyncProducer) AsyncSend(ctx context.Context, topic string, parti return errors.Trace(ctx.Err()) case p.producer.Input() <- msg: } - log.Info("async send msg", zap.Int32("partition", partition), zap.Any("commitTs", message.Ts)) + log.Info("async send msg", zap.Int32("partition", partition), zap.Any("commitTs", message.Ts), zap.String("table", message.GetTable()), zap.String("shcema", message.GetSchema())) return nil }