From 45bcdcacd871d380616b8fd66425fb87a97b747b Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Fri, 3 Jan 2025 16:53:29 -0500 Subject: [PATCH] address comments Signed-off-by: Wenqi Mou --- br/cmd/br/cmd.go | 2 +- br/cmd/br/restore.go | 4 +- .../log_client/batch_meta_processor.go | 4 +- br/pkg/restore/log_client/client.go | 12 +-- br/pkg/stream/rewrite_meta_rawkv.go | 1 - br/pkg/stream/rewrite_meta_rawkv_test.go | 4 +- br/pkg/stream/table_mapping.go | 95 ++++++------------- br/pkg/stream/table_mapping_test.go | 79 --------------- br/pkg/task/stream.go | 6 +- br/pkg/utils/schema.go | 4 +- br/tests/config/tikv.toml | 3 - 11 files changed, 42 insertions(+), 172 deletions(-) diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index fa81922f88fbc8..9c73c53177fac5 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -37,7 +37,7 @@ var ( tidbGlue = gluetidb.New() envLogToTermKey = "BR_LOG_TO_TERM" - filterOutSysAndMemKeepPrivilege = []string{ + filterOutSysAndMemKeepAuthAndBind = []string{ "*.*", fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), "!mysql.*", diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index cab5d756b4eb5e..5ea2a99333afb1 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -186,7 +186,7 @@ func newFullRestoreCommand() *cobra.Command { return runRestoreCommand(cmd, task.FullRestoreCmd) }, } - task.DefineFilterFlags(command, filterOutSysAndMemKeepPrivilege, false) + task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, false) task.DefineRestoreSnapshotFlags(command) return command } @@ -254,7 +254,7 @@ func newStreamRestoreCommand() *cobra.Command { return runRestoreCommand(command, task.PointRestoreCmd) }, } - task.DefineFilterFlags(command, filterOutSysAndMemKeepPrivilege, true) + task.DefineFilterFlags(command, filterOutSysAndMemKeepAuthAndBind, true) task.DefineStreamRestoreFlags(command) return command } diff --git a/br/pkg/restore/log_client/batch_meta_processor.go b/br/pkg/restore/log_client/batch_meta_processor.go 
index 10032b8f2b1c18..79397117cb7823 100644 --- a/br/pkg/restore/log_client/batch_meta_processor.go +++ b/br/pkg/restore/log_client/batch_meta_processor.go @@ -186,7 +186,7 @@ func (mp *MetaKVInfoProcessor) ProcessBatch( mp.tableHistoryManager.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O) // update the id map - if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(dbInfo); err != nil { + if err = mp.tableMappingManager.ProcessDBValueAndUpdateIdMapping(&dbInfo); err != nil { return nil, errors.Trace(err) } } else if !meta.IsDBkey(rawKey.Key) { @@ -210,7 +210,7 @@ func (mp *MetaKVInfoProcessor) ProcessBatch( mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID) // update the id map - if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, tableInfo); err != nil { + if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, &tableInfo); err != nil { return nil, errors.Trace(err) } } diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 5bec3718785683..99936b5002e65e 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -1025,15 +1025,9 @@ func (rc *LogClient) GetBaseIDMap( if err != nil { return nil, errors.Trace(err) } - existTiFlashTable := false - rc.dom.InfoSchema().ListTablesWithSpecialAttribute(func(tableInfo *model.TableInfo) bool { - if tableInfo.TiFlashReplica != nil && tableInfo.TiFlashReplica.Count > 0 { - existTiFlashTable = true - } - return false - }) - if existTiFlashTable { - return nil, errors.Errorf("exist table(s) have tiflash replica, please remove it before restore") + err := rc.validateNoTiFlashReplica() + if err != nil { + return nil, errors.Trace(err) } } diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index b2a3a3fb9e63ee..d67f2dabd538a9 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -33,7 +33,6 @@ import ( 
"go.uber.org/zap" ) -type RewriteStatus int type UpstreamID = int64 type DownstreamID = int64 diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go index de505b5c48dd3e..3df8a9fa6ae26e 100644 --- a/br/pkg/stream/rewrite_meta_rawkv_test.go +++ b/br/pkg/stream/rewrite_meta_rawkv_test.go @@ -412,7 +412,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { value, err := json.Marshal(&t1Copy) require.Nil(t, err) - err = tc.ProcessTableValueAndUpdateIdMapping(dbID1, *t1Copy) + err = tc.ProcessTableValueAndUpdateIdMapping(dbID1, t1Copy) require.Nil(t, err) sr := NewSchemasReplace( @@ -434,7 +434,7 @@ func TestRewriteTableInfoForExchangePartition(t *testing.T) { // rewrite no partition table value, err = json.Marshal(&t2Copy) require.Nil(t, err) - err = tc.ProcessTableValueAndUpdateIdMapping(dbID2, *t2Copy) + err = tc.ProcessTableValueAndUpdateIdMapping(dbID2, t2Copy) require.Nil(t, err) value, err = sr.rewriteTableInfo(value, dbID2) require.Nil(t, err) diff --git a/br/pkg/stream/table_mapping.go b/br/pkg/stream/table_mapping.go index 705c4dbf74985a..750d3ffdaee773 100644 --- a/br/pkg/stream/table_mapping.go +++ b/br/pkg/stream/table_mapping.go @@ -43,7 +43,10 @@ const InitialTempId int64 = 0 // the dummy ids, it builds the final state of the db replace map type TableMappingManager struct { DBReplaceMap map[UpstreamID]*DBReplace - globalIdMap map[UpstreamID]DownstreamID + + // used during scanning log to identify already seen id mapping. For example after exchange partition, the + // exchanged-in table already had an id mapping can be identified in the partition so don't allocate a new id. 
+ globalIdMap map[UpstreamID]DownstreamID // a counter for temporary IDs, need to get real global id // once full restore completes @@ -67,22 +70,13 @@ func (tm *TableMappingManager) FromDBReplaceMap(dbReplaceMap map[UpstreamID]*DBR if dbReplaceMap == nil { dbReplaceMap = make(map[UpstreamID]*DBReplace) } - globalTableIdMap := make(map[UpstreamID]DownstreamID) - for _, dr := range dbReplaceMap { - for tblID, tr := range dr.TableMap { - globalTableIdMap[tblID] = tr.TableID - for oldpID, newpID := range tr.PartitionMap { - globalTableIdMap[oldpID] = newpID - } - } - } - tm.globalIdMap = globalTableIdMap - tm.DBReplaceMap = dbReplaceMap + // no need to build globalIdMap here: DBReplaceMap is loaded from a saved checkpoint + tm.DBReplaceMap = dbReplaceMap return nil } -func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo model.DBInfo) error { +func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo *model.DBInfo) error { if dr, exist := tm.DBReplaceMap[dbInfo.ID]; !exist { newID := tm.generateTempID() tm.DBReplaceMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID) @@ -93,7 +87,7 @@ func (tm *TableMappingManager) ProcessDBValueAndUpdateIdMapping(dbInfo model.DBI return nil } -func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, tableInfo model.TableInfo) error { +func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, tableInfo *model.TableInfo) error { var ( exist bool dbReplace *DBReplace @@ -144,18 +138,6 @@ func (tm *TableMappingManager) ProcessTableValueAndUpdateIdMapping(dbID int64, t } func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace) { - // update globalIdMap - for upstreamID, dbReplace := range baseMap { - tm.globalIdMap[upstreamID] = dbReplace.DbID - - for tableUpID, tableReplace := range dbReplace.TableMap { - tm.globalIdMap[tableUpID] = tableReplace.TableID - for partUpID, partDownID := range tableReplace.PartitionMap { - 
tm.globalIdMap[partUpID] = partDownID - } - } - } - // merge baseMap to DBReplaceMap for upstreamID, baseDBReplace := range baseMap { if existingDBReplace, exists := tm.DBReplaceMap[upstreamID]; exists { @@ -179,7 +161,7 @@ func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBRepl } func (tm *TableMappingManager) IsEmpty() bool { - return len(tm.DBReplaceMap) == 0 && len(tm.globalIdMap) == 0 + return len(tm.DBReplaceMap) == 0 } func (tm *TableMappingManager) ReplaceTemporaryIDs( @@ -192,38 +174,43 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs( // find actually used temporary IDs usedTempIDs := make(map[DownstreamID]struct{}) + // Helper function to check and add temporary ID + addTempIDIfNeeded := func(id DownstreamID) error { + if id < 0 { + if _, exists := usedTempIDs[id]; exists { + return errors.Annotate(berrors.ErrRestoreInvalidRewrite, + fmt.Sprintf("found duplicate temporary ID: %d", id)) + } + usedTempIDs[id] = struct{}{} + } + return nil + } + // check DBReplaceMap for used temporary IDs // any value less than 0 is temporary ID for _, dr := range tm.DBReplaceMap { - if dr.DbID < 0 { - usedTempIDs[dr.DbID] = struct{}{} + if err := addTempIDIfNeeded(dr.DbID); err != nil { + return err } for _, tr := range dr.TableMap { - if tr.TableID < 0 { - usedTempIDs[tr.TableID] = struct{}{} + if err := addTempIDIfNeeded(tr.TableID); err != nil { + return err } for _, partID := range tr.PartitionMap { - if partID < 0 { - usedTempIDs[partID] = struct{}{} + if err := addTempIDIfNeeded(partID); err != nil { + return err } } } } - // check in globalIdMap as well just be safe - for _, downID := range tm.globalIdMap { - if downID < 0 { - usedTempIDs[downID] = struct{}{} - } - } - tempIDs := make([]DownstreamID, 0, len(usedTempIDs)) // convert to sorted slice for id := range usedTempIDs { tempIDs = append(tempIDs, id) } - // sort to -1, -2, -4 ... etc + // sort to -1, -2, -4, -8 ... 
etc sort.Slice(tempIDs, func(i, j int) bool { return tempIDs[i] > tempIDs[j] }) @@ -246,13 +233,6 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs( idMapping[tempID] = newIDs[i] } - // replace temp id in globalIdMap - for upID, downID := range tm.globalIdMap { - if newID, exists := idMapping[downID]; exists { - tm.globalIdMap[upID] = newID - } - } - // replace temp id in DBReplaceMap for _, dr := range tm.DBReplaceMap { if newID, exists := idMapping[dr.DbID]; exists { @@ -277,9 +257,6 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs( } func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker) { - // collect all IDs that should be kept - keepIDs := make(map[UpstreamID]struct{}) - // iterate through existing DBReplaceMap for dbID, dbReplace := range tm.DBReplaceMap { // remove entire database if not in filter @@ -288,27 +265,13 @@ func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker continue } - keepIDs[dbID] = struct{}{} - // filter tables in this database - for tableID, tableReplace := range dbReplace.TableMap { + for tableID := range dbReplace.TableMap { if !filter.ContainsTable(dbID, tableID) { delete(dbReplace.TableMap, tableID) - } else { - keepIDs[tableID] = struct{}{} - for partitionID := range tableReplace.PartitionMap { - keepIDs[partitionID] = struct{}{} - } } } } - - // remove any ID from globalIdMap that isn't in keepIDs - for id := range tm.globalIdMap { - if _, ok := keepIDs[id]; !ok { - delete(tm.globalIdMap, id) - } - } } // ToProto produces schemas id maps from up-stream to down-stream. 
diff --git a/br/pkg/stream/table_mapping_test.go b/br/pkg/stream/table_mapping_test.go index bf75efea2ed847..1188768dde5b63 100644 --- a/br/pkg/stream/table_mapping_test.go +++ b/br/pkg/stream/table_mapping_test.go @@ -557,61 +557,10 @@ func TestFilterDBReplaceMap(t *testing.T) { tm := NewTableMappingManager() tm.DBReplaceMap = tt.initial - // create a copy of globalIdMap before filtering - globalIdMap := make(map[UpstreamID]DownstreamID) - for dbID, dbReplace := range tt.initial { - globalIdMap[dbID] = dbReplace.DbID - for tblID, tblReplace := range dbReplace.TableMap { - globalIdMap[tblID] = tblReplace.TableID - for partID, partDownID := range tblReplace.PartitionMap { - globalIdMap[partID] = partDownID - } - } - } - tm.globalIdMap = globalIdMap - tm.FilterDBReplaceMap(tt.filter) // verify DBReplaceMap is as expected require.Equal(t, tt.expected, tm.DBReplaceMap) - - // verify globalIdMap is properly filtered as well - for dbID, dbReplace := range tt.expected { - require.Equal(t, dbReplace.DbID, tm.globalIdMap[dbID]) - for tblID, tblReplace := range dbReplace.TableMap { - require.Equal(t, tblReplace.TableID, tm.globalIdMap[tblID]) - for partID, partDownID := range tblReplace.PartitionMap { - require.Equal(t, partDownID, tm.globalIdMap[partID]) - } - } - } - - // verify that filtered IDs are removed from globalIdMap - for upID := range globalIdMap { - found := false - for dbID, dbReplace := range tt.expected { - if upID == dbID { - found = true - break - } - for tblID, tblReplace := range dbReplace.TableMap { - if upID == tblID { - found = true - break - } - for partID := range tblReplace.PartitionMap { - if upID == partID { - found = true - break - } - } - } - } - if !found { - _, exists := tm.globalIdMap[upID] - require.False(t, exists, "ID %d should have been removed from globalIdMap", upID) - } - } }) } } @@ -913,18 +862,6 @@ func TestReplaceTemporaryIDs(t *testing.T) { tm.DBReplaceMap = tt.initial tm.tempIDCounter = tt.tempCounter - globalIdMap := 
make(map[UpstreamID]DownstreamID) - for dbID, dbReplace := range tt.initial { - globalIdMap[dbID] = dbReplace.DbID - for tblID, tblReplace := range dbReplace.TableMap { - globalIdMap[tblID] = tblReplace.TableID - for partID, partDownID := range tblReplace.PartitionMap { - globalIdMap[partID] = partDownID - } - } - } - tm.globalIdMap = globalIdMap - err := tm.ReplaceTemporaryIDs(context.Background(), tt.genGlobalIDs) if tt.expectedErr != nil { @@ -936,22 +873,6 @@ func TestReplaceTemporaryIDs(t *testing.T) { require.NoError(t, err) require.Equal(t, tt.expected, tm.DBReplaceMap) require.Equal(t, InitialTempId, tm.tempIDCounter) - - // verify globalIdMap is properly updated as well - for dbID, dbReplace := range tt.expected { - require.Equal(t, dbReplace.DbID, tm.globalIdMap[dbID]) - for tblID, tblReplace := range dbReplace.TableMap { - require.Equal(t, tblReplace.TableID, tm.globalIdMap[tblID]) - for partID, partDownID := range tblReplace.PartitionMap { - require.Equal(t, partDownID, tm.globalIdMap[partID]) - } - } - } - - // verify no temporary IDs remain - for _, id := range tm.globalIdMap { - require.False(t, id < 0, "temporary ID %d still exists in globalIdMap", id) - } }) } } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 774580a85fa0aa..cd8fb4ee226ed3 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -2022,11 +2022,7 @@ func waitUntilSchemaReload(ctx context.Context, client *logclient.LogClient) err } func isCurrentIdMapSaved(checkpointTaskInfo *checkpoint.CheckpointTaskInfoForLogRestore) bool { - newTask := false - if checkpointTaskInfo != nil && checkpointTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersisted { - newTask = true - } - return newTask + return checkpointTaskInfo != nil && checkpointTaskInfo.Progress == checkpoint.InLogRestoreAndIdMapPersisted } func buildSchemaReplace( diff --git a/br/pkg/utils/schema.go b/br/pkg/utils/schema.go index ae5f98589ff43a..1d3c473928d6ce 100644 --- a/br/pkg/utils/schema.go +++ 
b/br/pkg/utils/schema.go @@ -38,9 +38,9 @@ func IsTemplateSysDB(dbname pmodel.CIStr) bool { // IsSysDB tests whether the database is system DB. // Currently, both `mysql` and `sys` are system DB. -func IsSysDB(dbLowerName string) bool { +func IsSysDB(dbName string) bool { // just in case - dbLowerName = strings.ToLower(dbLowerName) + dbLowerName := strings.ToLower(dbName) return dbLowerName == mysql.SystemDB || dbLowerName == mysql.SysDB } diff --git a/br/tests/config/tikv.toml b/br/tests/config/tikv.toml index 07eba85bc268d2..a469b389989e7a 100644 --- a/br/tests/config/tikv.toml +++ b/br/tests/config/tikv.toml @@ -36,6 +36,3 @@ path = "/tmp/backup_restore_test/master-key-file" [log-backup] max-flush-interval = "50s" -[gc] -ratio-threshold = -1.0 -