From 8362b376881e681355187122709a10fd4f07d4ab Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:07:30 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #11869 Signed-off-by: ti-chi-bot --- cdc/entry/schema/snapshot.go | 3 +++ cdc/entry/schema_storage.go | 20 +++++--------------- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 71486e1442a..1b736f63ce6 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -109,6 +109,7 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } +<<<<<<< HEAD // GetSchemaVersion returns the schema version of the meta. func GetSchemaVersion(meta *timeta.Meta) (int64, error) { // After we get the schema version at startTs, if the diff corresponding to that version does not exist, @@ -127,6 +128,8 @@ func GetSchemaVersion(meta *timeta.Meta) (int64, error) { return version, nil } +======= +>>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) // NewSnapshotFromMeta creates a schema snapshot from meta. func NewSnapshotFromMeta( id model.ChangeFeedID, diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 75a46392278..e37d7afe5e7 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -72,9 +72,8 @@ type schemaStorage struct { snaps []*schema.Snapshot snapsMu sync.RWMutex - gcTs uint64 - resolvedTs uint64 - schemaVersion int64 + gcTs uint64 + resolvedTs uint64 filter filter.Filter @@ -91,9 +90,8 @@ func NewSchemaStorage( role util.Role, filter filter.Filter, ) (SchemaStorage, error) { var ( - snap *schema.Snapshot - version int64 - err error + snap *schema.Snapshot + err error ) // storage may be nil in some unit test cases. if storage == nil { @@ -104,7 +102,6 @@ func NewSchemaStorage( if err != nil { return nil, errors.Trace(err) } - version, err = schema.GetSchemaVersion(meta) if err != nil { return nil, errors.Trace(err) } @@ -115,7 +112,6 @@ func NewSchemaStorage( forceReplicate: forceReplicate, filter: filter, id: id, - schemaVersion: version, role: role, }, nil } @@ -193,7 +189,6 @@ func (s *schemaStorage) GetLastSnapshot() *schema.Snapshot { // HandleDDLJob creates a new snapshot in storage and handles the ddl job func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { if s.skipJob(job) { - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil } @@ -202,16 +197,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] - // We use schemaVersion to check if an already-executed DDL job is processed for a second time. - // Unexecuted DDL jobs should have largest schemaVersions. - if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { + if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), zap.String("DDL", job.Query), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.Int64("schemaVersion", s.schemaVersion), zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), zap.String("role", s.role.String())) return nil @@ -233,7 +225,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { return errors.Trace(err) } s.snaps = append(s.snaps, snap) - s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) log.Info("schemaStorage: update snapshot by the DDL job", zap.String("namespace", s.id.Namespace), @@ -242,7 +233,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { zap.String("table", job.TableName), zap.String("query", job.Query), zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - zap.Uint64("schemaVersion", uint64(s.schemaVersion)), zap.String("role", s.role.String())) return nil } From 219361f5e46c7442e99c4852f81e5ef68a903682 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Tue, 24 Dec 2024 18:30:24 +0800 Subject: [PATCH 2/2] snapshot.go: resolve conflicts --- cdc/entry/schema/snapshot.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 1b736f63ce6..21d32ac11b6 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -109,27 +109,6 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } -<<<<<<< HEAD -// GetSchemaVersion returns the schema version of the meta. -func GetSchemaVersion(meta *timeta.Meta) (int64, error) { - // After we get the schema version at startTs, if the diff corresponding to that version does not exist, - // it means that the job is not committed yet, so we should subtract one from the version, i.e., version--. - version, err := meta.GetSchemaVersion() - if err != nil { - return 0, errors.Trace(err) - } - diff, err := meta.GetSchemaDiff(version) - if err != nil { - return 0, errors.Trace(err) - } - if diff == nil { - version-- - } - return version, nil -} - -======= ->>>>>>> c5b8800f8e (schemaStorage(ticdc): remove `schemaVersion` in `schemaStorage` (#11869)) // NewSnapshotFromMeta creates a schema snapshot from meta. func NewSnapshotFromMeta( id model.ChangeFeedID,