From 6277d9a8c9e9b43613b6d78e2af8aca1fde9593f Mon Sep 17 00:00:00 2001
From: dongmen <20351731+asddongmen@users.noreply.github.com>
Date: Wed, 29 May 2024 20:13:22 +0800
Subject: [PATCH] kvClient (ticdc): revert e5999e3 to remove useless metrics
 (#11184)

close pingcap/tiflow#11073
---
 cdc/kv/client.go                              | 79 +++----------------
 cdc/kv/client_bench_test.go                   |  4 +-
 cdc/kv/client_test.go                         | 51 ++++++------
 cdc/kv/metrics.go                             | 18 +----
 cdc/kv/region_worker.go                       | 38 ++-------
 cdc/processor/pipeline/puller.go              |  1 -
 cdc/processor/processor.go                    |  2 +-
 cdc/processor/sinkmanager/manager_test.go     |  2 +-
 .../sinkmanager/table_sink_worker_test.go     |  2 +-
 cdc/processor/sourcemanager/manager.go        | 22 +++---
 .../sourcemanager/puller/puller_wrapper.go    |  2 -
 cdc/puller/ddl_puller.go                      |  1 -
 cdc/puller/puller.go                          |  8 +-
 cdc/puller/puller_test.go                     |  3 +-
 14 files changed, 60 insertions(+), 173 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 0541923ed6c..33e64b8aeb8 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -18,7 +18,6 @@ import (
     "fmt"
     "io"
     "math/rand"
-    "strconv"
     "strings"
     "sync"
     "sync/atomic"
@@ -39,7 +38,6 @@ import (
     "github.com/pingcap/tiflow/pkg/retry"
     "github.com/pingcap/tiflow/pkg/txnutil"
     "github.com/pingcap/tiflow/pkg/version"
-    "github.com/prometheus/client_golang/prometheus"
     tidbkv "github.com/tikv/client-go/v2/kv"
     "github.com/tikv/client-go/v2/oracle"
     "github.com/tikv/client-go/v2/tikv"
@@ -75,8 +73,6 @@ const (
     resolveLockMinInterval = 10 * time.Second

     scanRegionsConcurrency = 1024
-
-    tableMonitorInterval = 2 * time.Second
 )

 // time interval to force kv client to terminate gRPC stream and reconnect
@@ -166,7 +162,6 @@ type CDCKVClient interface {
         ts uint64,
         lockResolver txnutil.LockResolver,
         eventCh chan<- model.RegionFeedEvent,
-        enableTableMonitor bool,
     ) error

     // RegionCount returns the number of captured regions.
@@ -310,9 +305,8 @@ func (c *CDCClient) EventFeed(
     ctx context.Context, span regionspan.ComparableSpan, ts uint64,
     lockResolver txnutil.LockResolver,
     eventCh chan<- model.RegionFeedEvent,
-    enableTableMonitor bool,
 ) error {
-    s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor)
+    s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
     return s.eventFeed(ctx)
 }

@@ -396,11 +390,6 @@ type eventFeedSession struct {
     rangeLock *regionspan.RegionRangeLock

-    enableTableMonitor bool
-    regionChSizeGauge  prometheus.Gauge
-    errChSizeGauge     prometheus.Gauge
-    rangeChSizeGauge   prometheus.Gauge
-
     // storeStreamsCache is used to cache the established gRPC streams to TiKV stores.
     // Note: The cache is not thread-safe, so it should be accessed in the same goroutine.
     // For now, it is only accessed in the `requestRegionToStore` goroutine.
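The hunks above drop the enableTableMonitor flag from the CDCKVClient.EventFeed contract. A minimal sketch of a post-revert caller, assuming the pingcap/tiflow packages touched by this patch are importable; the wrapper function name, channel capacity, and key range are illustrative assumptions, not part of the patch:

package kvfeedexample

import (
    "context"

    "github.com/pingcap/tiflow/cdc/kv"
    "github.com/pingcap/tiflow/cdc/model"
    "github.com/pingcap/tiflow/pkg/regionspan"
    "github.com/pingcap/tiflow/pkg/txnutil"
)

// startFeed is a hypothetical helper that mirrors how the tests in this patch
// drive EventFeed after the revert: five arguments, no enableTableMonitor flag.
func startFeed(
    ctx context.Context,
    cli kv.CDCKVClient,
    lockResolver txnutil.LockResolver,
    checkpointTs uint64,
) (<-chan model.RegionFeedEvent, <-chan error) {
    eventCh := make(chan model.RegionFeedEvent, 128) // buffer size is an assumption
    errCh := make(chan error, 1)
    go func() {
        // The span below is illustrative; real callers pass the table's span.
        errCh <- cli.EventFeed(ctx,
            regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
            checkpointTs, lockResolver, eventCh)
    }()
    return eventCh, errCh
}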
@@ -421,7 +410,6 @@ func newEventFeedSession(
     lockResolver txnutil.LockResolver,
     startTs uint64,
     eventCh chan<- model.RegionFeedEvent,
-    enableTableMonitor bool,
 ) *eventFeedSession {
     id := allocateRequestID()
     rangeLock := regionspan.NewRegionRangeLock(
@@ -429,23 +417,16 @@ func newEventFeedSession(
         client.changefeed.Namespace+"."+client.changefeed.ID)
     return &eventFeedSession{
-        client:             client,
-        startTs:            startTs,
-        changefeed:         client.changefeed,
-        tableID:            client.tableID,
-        tableName:          client.tableName,
-        storeStreamsCache:  make(map[string]*eventFeedStream),
-        totalSpan:          totalSpan,
-        eventCh:            eventCh,
-        rangeLock:          rangeLock,
-        lockResolver:       lockResolver,
-        enableTableMonitor: enableTableMonitor,
-        regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
-        errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
-        rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
+        client:            client,
+        startTs:           startTs,
+        changefeed:        client.changefeed,
+        tableID:           client.tableID,
+        tableName:         client.tableName,
+        storeStreamsCache: make(map[string]*eventFeedStream),
+        totalSpan:         totalSpan,
+        eventCh:           eventCh,
+        rangeLock:         rangeLock,
+        lockResolver:      lockResolver,
         resolvedTsPool: sync.Pool{
             New: func() any {
                 return &regionStatefulEvent{
@@ -486,7 +467,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
             case <-ctx.Done():
                 return ctx.Err()
             case task := <-s.requestRangeCh.Out():
-                s.rangeChSizeGauge.Dec()
                 // divideAndSendEventFeedToRegions could be blocked for some time,
                 // since it must wait for the region lock available. In order to
                 // consume region range request from `requestRangeCh` as soon as
@@ -508,7 +488,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
             case <-ctx.Done():
                 return ctx.Err()
             case errInfo := <-s.errCh.Out():
-                s.errChSizeGauge.Dec()
                 if err := s.handleError(ctx, errInfo); err != nil {
                     return err
                 }
@@ -518,7 +497,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
     })

     s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
-    s.rangeChSizeGauge.Inc()

     log.Info("event feed started",
         zap.String("namespace", s.changefeed.Namespace),
@@ -539,7 +517,6 @@ func (s *eventFeedSession) scheduleDivideRegionAndRequest(
     task := rangeRequestTask{span: span}
     select {
     case s.requestRangeCh.In() <- task:
-        s.rangeChSizeGauge.Inc()
     case <-ctx.Done():
     }
 }
@@ -553,7 +530,6 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
         sri.lockedRange = res.LockedRange
         select {
         case s.regionCh.In() <- sri:
-            s.regionChSizeGauge.Inc()
         case <-ctx.Done():
         }
     case regionspan.LockRangeStatusStale:
@@ -615,7 +591,6 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
         zap.Error(errorInfo.err))
     select {
     case s.errCh.In() <- errorInfo:
-        s.errChSizeGauge.Inc()
     case <-ctx.Done():
     }
 }
@@ -791,7 +766,6 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
         case <-ctx.Done():
             return errors.Trace(ctx.Err())
         case sri = <-s.regionCh.Out():
-            s.regionChSizeGauge.Dec()
         }

         // Send a resolved ts to event channel first, for two reasons:
@@ -1040,11 +1014,6 @@ func (s *eventFeedSession) receiveFromStream(
     metricSendEventBatchResolvedSize := batchResolvedEventSize.
         WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
-    metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
-        s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver")
-    metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
-        s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor")
-
     // always create a new region worker, because `receiveFromStream` is ensured
     // to call exactly once from outer code logic
     worker := newRegionWorker(parentCtx, stream, s)
@@ -1063,7 +1032,7 @@ func (s *eventFeedSession) receiveFromStream(
     eg, ctx := errgroup.WithContext(ctx)
     eg.Go(func() error {
-        err := handleExit(worker.run(s.enableTableMonitor))
+        err := handleExit(worker.run())
         if err != nil {
             log.Error("region worker exited with error",
                 zap.String("namespace", s.changefeed.Namespace),
@@ -1079,32 +1048,10 @@ func (s *eventFeedSession) receiveFromStream(
     })

     receiveEvents := func() error {
-        var receiveTime time.Duration
-        var processTime time.Duration
-        startToWork := time.Now()
-
         maxCommitTs := model.Ts(0)
         for {
-            startToReceive := time.Now()
             cevent, err := stream.client.Recv()
-            if s.enableTableMonitor {
-                receiveTime += time.Since(startToReceive)
-                if time.Since(startToWork) >= tableMonitorInterval {
-                    now := time.Now()
-                    // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker.
-                    busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100
-                    metricReceiveBusyRatio.Set(busyRatio)
-                    receiveTime = 0
-                    // Process busyRatio indicates the working time (dispatch to region worker) of the worker.
-                    busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
-                    metricProcessBusyRatio.Set(busyRatio)
-                    processTime = 0
-
-                    startToWork = now
-                }
-            }
-
             failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) {
                 if op.(string) == "error" {
                     _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
@@ -1163,7 +1110,6 @@ func (s *eventFeedSession) receiveFromStream(
                 return nil
             }

-            startToProcess := time.Now()
             size := cevent.Size()
             if size > warnRecvMsgSizeThreshold {
                 regionCount := 0
@@ -1208,7 +1154,6 @@ func (s *eventFeedSession) receiveFromStream(
                     tsStat.commitTs.Store(maxCommitTs)
                 }
             }
-            processTime += time.Since(startToProcess)
         }
     }
     eg.Go(func() error {
diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go
index c517e71afc9..4c00a77064c 100644
--- a/cdc/kv/client_bench_test.go
+++ b/cdc/kv/client_bench_test.go
@@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
     go func() {
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         if errors.Cause(err) != context.Canceled {
             b.Error(err)
         }
@@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) (
     go func() {
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         if errors.Cause(err) != context.Canceled {
             b.Error(err)
         }
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 664b1dfcec8..89b2d53d629 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -330,7 +330,7 @@ func TestConnectOfflineTiKV(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            1, lockResolver, eventCh, false)
+            1, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -428,7 +428,7 @@ func TestRecvLargeMessageSize(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            1, lockResolver, eventCh, false)
+            1, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -528,7 +528,7 @@ func TestHandleError(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("d")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -688,7 +688,7 @@ func TestCompatibilityWithSameConn(t *testing.T) {
         defer wg2.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.True(t, cerror.ErrVersionIncompatible.Equal(err))
     }()

@@ -756,7 +756,7 @@ func TestClusterIDMismatch(t *testing.T) {
         defer wg2.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.True(t, cerror.ErrClusterIDMismatch.Equal(err))
     }()

@@ -823,7 +823,7 @@ func testHandleFeedEvent(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1284,7 +1284,7 @@ func TestStreamSendWithError(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-            100, lockerResolver, eventCh, false)
+            100, lockerResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1402,7 +1402,7 @@ func testStreamRecvWithError(t *testing.T, failpointStr string) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1537,7 +1537,7 @@ func TestStreamRecvWithErrorAndResolvedGoBack(t *testing.T) {
         defer close(eventCh)
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1746,7 +1746,7 @@ func TestIncompatibleTiKV(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1823,7 +1823,7 @@ func TestNoPendingRegionError(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -1901,7 +1901,7 @@ func TestDropStaleRequest(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2015,7 +2015,7 @@ func TestResolveLock(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2123,7 +2123,7 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
         defer clientWg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, errUnreachable, err)
     }()

@@ -2250,7 +2250,7 @@ func testEventAfterFeedStop(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2437,7 +2437,7 @@ func TestOutOfRegionRangeEvent(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2655,7 +2655,7 @@ func TestResolveLockNoCandidate(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2751,7 +2751,7 @@ func TestFailRegionReentrant(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2834,7 +2834,7 @@ func TestClientV1UnlockRangeReentrant(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2902,7 +2902,7 @@ func testClientErrNoPendingRegion(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -2980,7 +2980,7 @@ func testKVClientForceReconnect(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -3131,7 +3131,7 @@ func TestConcurrentProcessRangeRequest(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -3248,7 +3248,7 @@ func TestEvTimeUpdate(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -3374,7 +3374,7 @@ func TestRegionWorkerExitWhenIsIdle(t *testing.T) {
         defer wg.Done()
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -3468,7 +3468,7 @@ func TestPrewriteNotMatchError(t *testing.T) {
         defer wg.Done()
         err = cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         require.Equal(t, context.Canceled, errors.Cause(err))
     }()

@@ -3546,6 +3546,5 @@ func createFakeEventFeedSession() *eventFeedSession {
         nil, /*lockResolver*/
         100, /*startTs*/
         nil, /*eventCh*/
-        false,
     )
 }
diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go
index 31142175274..4f063ed470d 100644
--- a/cdc/kv/metrics.go
+++ b/cdc/kv/metrics.go
@@ -71,7 +71,7 @@ var (
             Subsystem: "kvclient",
             Name:      "channel_size",
             Help:      "size of each channel in kv client",
-        }, []string{"namespace", "changefeed", "table", "type"})
+        }, []string{"channel"})
     clientRegionTokenSize = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: "ticdc",
@@ -110,20 +110,6 @@ var (
             Help:      "region events batch size",
             Buckets:   prometheus.ExponentialBuckets(1, 2, 20),
         })
-    workerBusyRatio = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: "ticdc",
-            Subsystem: "kvclient",
-            Name:      "region_worker_busy_ratio",
-            Help:      "Busy ratio (X ms in 1s) for region worker.",
-        }, []string{"namespace", "changefeed", "table", "store", "type"})
-    workerChannelSize = prometheus.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: "ticdc",
-            Subsystem: "kvclient",
-            Name:      "region_worker_channel_size",
-            Help:      "size of each channel in region worker",
-        }, []string{"namespace", "changefeed", "table", "store", "type"})
 )

 // InitMetrics registers all metrics in the kv package
@@ -140,8 +126,6 @@ func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(batchResolvedEventSize)
     registry.MustRegister(grpcPoolStreamGauge)
     registry.MustRegister(regionEventsBatchSize)
-    registry.MustRegister(workerBusyRatio)
-    registry.MustRegister(workerChannelSize)

     // Register client metrics to registry.
     registry.MustRegister(grpcMetrics)
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 150e82975eb..5ab91216855 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -18,7 +18,6 @@ import (
     "encoding/hex"
     "reflect"
     "runtime"
-    "strconv"
     "sync"
     "sync/atomic"
     "time"
@@ -75,9 +74,6 @@ type regionWorkerMetrics struct {
     metricSendEventResolvedCounter  prometheus.Counter
     metricSendEventCommitCounter    prometheus.Counter
     metricSendEventCommittedCounter prometheus.Counter
-
-    metricWorkerBusyRatio   prometheus.Gauge
-    metricWorkerChannelSize prometheus.Gauge
 }

 /*
@@ -117,7 +113,7 @@ type regionWorker struct {
     inputPendingEvents int32
 }

-func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, storeAddr string) *regionWorkerMetrics {
+func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics {
     metrics := &regionWorkerMetrics{}
     metrics.metricReceivedEventSize = eventSize.WithLabelValues("received")
     metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped")
@@ -140,11 +136,6 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID, tableID string, sto
     metrics.metricSendEventCommittedCounter = sendEventCounter.
WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID) - metrics.metricWorkerBusyRatio = workerBusyRatio.WithLabelValues( - changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "event-handler") - metrics.metricWorkerChannelSize = workerChannelSize.WithLabelValues( - changefeedID.Namespace, changefeedID.ID, tableID, storeAddr, "input") - return metrics } @@ -164,7 +155,7 @@ func newRegionWorker( rtsManager: newRegionTsManager(), rtsUpdateCh: make(chan *rtsUpdateEvent, 1024), concurrency: s.client.config.KVClient.WorkerConcurrent, - metrics: newRegionWorkerMetrics(s.changefeed, strconv.FormatInt(s.tableID, 10), stream.addr), + metrics: newRegionWorkerMetrics(s.changefeed), inputPendingEvents: 0, } } @@ -458,7 +449,7 @@ func (w *regionWorker) onHandleExit(err error) { } } -func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool) error { +func (w *regionWorker) eventHandler(ctx context.Context) error { exitFn := func() error { log.Info("region worker closed by error", zap.String("namespace", w.session.client.changefeed.Namespace), @@ -468,11 +459,6 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool return cerror.ErrRegionWorkerExit.GenWithStackByArgs() } - metricsTicker := time.NewTicker(tableMonitorInterval) - defer metricsTicker.Stop() - var processTime time.Duration - startToWork := time.Now() - highWatermarkMet := false for { select { @@ -480,17 +466,6 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool return errors.Trace(ctx.Err()) case err := <-w.errorCh: return errors.Trace(err) - case <-metricsTicker.C: - if enableTableMonitor { - w.metrics.metricWorkerChannelSize.Set(float64(len(w.inputCh))) - - now := time.Now() - // busyRatio indicates the actual working time of the worker. 
-                busyRatio := processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
-                w.metrics.metricWorkerBusyRatio.Set(busyRatio)
-                startToWork = now
-                processTime = 0
-            }
         case events, ok := <-w.inputCh:
             if !ok {
                 return exitFn()
@@ -507,8 +482,6 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool
             }

             regionEventsBatchSize.Observe(float64(len(events)))
-
-            start := time.Now()
             inputPending := atomic.LoadInt32(&w.inputPendingEvents)
             if highWatermarkMet {
                 highWatermarkMet = int(inputPending) >= regionWorkerLowWatermark
@@ -569,7 +542,6 @@ func (w *regionWorker) eventHandler(ctx context.Context, enableTableMonitor bool
                     w.session.resolvedTsPool.Put(ev)
                 }
             }
-            processTime += time.Since(start)
         }
     }
 }
@@ -626,7 +598,7 @@ func (w *regionWorker) cancelStream(delay time.Duration) {
     time.Sleep(delay)
 }

-func (w *regionWorker) run(enableTableMonitor bool) error {
+func (w *regionWorker) run() error {
     defer func() {
         for _, h := range w.handles {
             h.Unregister()
@@ -651,7 +623,7 @@ func (w *regionWorker) run(enableTableMonitor bool) error {
         return handleError(w.checkErrorReconnect(w.resolveLock(ctx)))
     })
     wg.Go(func() error {
-        return handleError(w.eventHandler(ctx, enableTableMonitor))
+        return handleError(w.eventHandler(ctx))
     })
     _ = handleError(w.collectWorkpoolError(ctx))
     _ = wg.Wait()
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 5e5b63d9e44..188b0167e97 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -85,7 +85,6 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
         n.tableID, n.tableName, filterLoop,
-        false,
     )
     n.wg.Go(func() error {
         ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 4ae710e7a7e..5aba460e0a3 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -852,7 +852,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
         return errors.Trace(err)
     }
     p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg,
-        sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, p.changefeed.Info.Config.EnableTableMonitor, pullerSafeModeAtStart)
+        sortEngine, p.errCh, p.changefeed.Info.Config.BDRMode, pullerSafeModeAtStart)
     p.sinkManager, err = sinkmanager.New(stdCtx, p.changefeedID,
         p.changefeed.Info, p.upstream, p.schemaStorage,
         p.redoDMLMgr, p.sourceManager,
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index bdd75798d06..12818eeeec8 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -57,7 +57,7 @@ func createManagerWithMemEngine(
 ) (*SinkManager, engine.SortEngine) {
     sortEngine := memory.New(context.Background())
     up := upstream.NewUpstream4Test(&mockPD{})
-    sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false, false, false)
+    sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan, false, false)
     manager, err := New(
         ctx, changefeedID, changefeedInfo, up,
         &entry.MockSchemaStorage{Resolved: math.MaxUint64},
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 33f7ffc8817..bf72629c302 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -39,7 +39,7 @@ func createWorker(
 ) (*sinkWorker, engine.SortEngine) {
     sortEngine := memory.New(context.Background())
     sm :=
         sourcemanager.New(changefeedID, upstream.NewUpstream4Test(&mockPD{}),
-            &entry.MockMountGroup{}, sortEngine, make(chan error, 1), false, false, false)
+            &entry.MockMountGroup{}, sortEngine, make(chan error, 1), false, false)
     // To avoid refund or release panics.
     quota := memquota.NewMemQuota(changefeedID, memQuota+1024*1024*1024, "")
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index 9916e4c68fa..9064da82cb6 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -57,8 +57,6 @@ type SourceManager struct {
     safeModeAtStart bool
     startTs         model.Ts
-
-    enableTableMonitor bool
 }

 // New creates a new source manager.
@@ -69,7 +67,6 @@ func New(
     engine engine.SortEngine,
     errChan chan error,
     bdrMode bool,
-    enableTableMonitor bool,
     safeModeAtStart bool,
 ) *SourceManager {
     startTs, err := getCurrentTs(context.Background(), up.PDClient)
@@ -80,15 +77,14 @@ func New(
         return nil
     }
     return &SourceManager{
-        changefeedID:       changefeedID,
-        up:                 up,
-        mg:                 mg,
-        engine:             engine,
-        errChan:            errChan,
-        bdrMode:            bdrMode,
-        enableTableMonitor: enableTableMonitor,
-        safeModeAtStart:    safeModeAtStart,
-        startTs:            startTs,
+        changefeedID:    changefeedID,
+        up:              up,
+        mg:              mg,
+        engine:          engine,
+        errChan:         errChan,
+        bdrMode:         bdrMode,
+        safeModeAtStart: safeModeAtStart,
+        startTs:         startTs,
     }
 }

@@ -117,7 +113,7 @@ func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID,
         return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
     }
     p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
-    p.Start(ctx, m.up, m.engine, m.errChan, m.enableTableMonitor)
+    p.Start(ctx, m.up, m.engine, m.errChan)
     m.pullers.Store(tableID, p)
 }

diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index 19d00845641..1fb7a2ac934 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -91,7 +91,6 @@ func (n *Wrapper) Start(
     up *upstream.Upstream,
     eventSortEngine engine.SortEngine,
     errChan chan<- error,
-    enableTableMonitor bool,
 ) {
     failpoint.Inject("ProcessorAddTableError", func() {
         errChan <- cerrors.New("processor add table injected error")
@@ -116,7 +115,6 @@ func (n *Wrapper) Start(
         n.tableID,
         n.tableName,
         n.bdrMode,
-        enableTableMonitor,
     )
     n.wg.Add(1)
     go func() {
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index d29b131acc8..ed7f505ca37 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -511,7 +511,6 @@ func NewDDLJobPuller(
             changefeed, -1, DDLPullerTableName,
             ddLPullerFilterLoop,
-            false,
         ),
         kvStorage: kvStorage,
         outputCh:  make(chan *model.DDLJobEntry, defaultPullerOutputChanSize),
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 930571dddb3..a89559f5445 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -80,8 +80,6 @@ type pullerImpl struct {
     lastForwardResolvedTs uint64
     // startResolvedTs is the resolvedTs when puller is initialized
     startResolvedTs uint64
-
-    enableTableMonitor bool
 }

 // New create a new Puller fetch event start from checkpointTs and put into buf.
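The sourcemanager and puller hunks above shrink sourcemanager.New to seven parameters. A sketch of the post-revert call site, restating the processor.go hunk with the argument roles spelled out; the comments are editorial and not part of the patch:

// Post-revert constructor call, as in cdc/processor/processor.go above.
// The EnableTableMonitor argument that used to sit between bdrMode and
// safeModeAtStart is gone.
p.sourceManager = sourcemanager.New(
    p.changefeedID,                   // changefeed identity
    p.upstream,                       // upstream PD/TiKV handles
    p.mg,                             // mounter group
    sortEngine,                       // sort engine receiving puller output
    p.errCh,                          // error channel back to the processor
    p.changefeed.Info.Config.BDRMode, // bdrMode
    pullerSafeModeAtStart,            // safeModeAtStart
)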
@@ -98,7 +96,6 @@ func New(ctx context.Context,
     tableID model.TableID,
     tableName string,
     filterLoop bool,
-    enableTableMonitor bool,
 ) Puller {
     tikvStorage, ok := kvStorage.(tikv.Storage)
     if !ok {
@@ -133,8 +130,7 @@ func New(ctx context.Context,
         tableName: tableName,
         cfg:       cfg,

-        startResolvedTs:    checkpointTs,
-        enableTableMonitor: enableTableMonitor,
+        startResolvedTs: checkpointTs,
     }
     return p
 }
@@ -152,7 +148,7 @@ func (p *pullerImpl) Run(ctx context.Context) error {
         span := span
         g.Go(func() error {
-            return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh, p.enableTableMonitor)
+            return p.kvCli.EventFeed(ctx, span, checkpointTs, lockResolver, eventCh)
         })
     }

diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go
index 5f888e89848..408a86ba903 100644
--- a/cdc/puller/puller_test.go
+++ b/cdc/puller/puller_test.go
@@ -79,7 +79,6 @@ func (mc *mockCDCKVClient) EventFeed(
     ts uint64,
     lockResolver txnutil.LockResolver,
     eventCh chan<- model.RegionFeedEvent,
-    enableTableMonitor bool,
 ) error {
     for {
         select {
@@ -132,7 +131,7 @@ func newPullerForTest(
     plr := New(
         ctx, pdCli, grpcPool, regionCache, store, pdutil.NewClock4Test(), checkpointTs,
         spans, config.GetDefaultServerConfig(),
-        model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false, false)
+        model.DefaultChangeFeedID("changefeed-id-test"), 0, "table-test", false)
     wg.Add(1)
     go func() {
         defer wg.Done()
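With the label change in cdc/kv/metrics.go above, clientChannelSize goes back to a single "channel" label. A minimal sketch of how a gauge vector of that shape is defined and updated with prometheus/client_golang; the patch itself only changes the definition and removes the per-table call sites, so the update helper below is an assumption, not code from the revert:

package kvmetricsexample

import "github.com/prometheus/client_golang/prometheus"

// Mirrors the reverted definition in cdc/kv/metrics.go: one gauge per channel
// kind, keyed by the single "channel" label instead of
// namespace/changefeed/table/type.
var clientChannelSize = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Namespace: "ticdc",
        Subsystem: "kvclient",
        Name:      "channel_size",
        Help:      "size of each channel in kv client",
    }, []string{"channel"})

// observeChannelLen is illustrative only; the patch does not add a call site
// like this.
func observeChannelLen(channel string, length int) {
    clientChannelSize.WithLabelValues(channel).Set(float64(length))
}

func init() {
    // Registration follows the registry.MustRegister pattern shown in
    // InitMetrics above, here against the default registerer.
    prometheus.MustRegister(clientChannelSize)
}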