From 326f4fb92878db80be62993c8af1dc0006055edd Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 23 Jan 2025 12:21:01 -0500 Subject: [PATCH] update table mapping Signed-off-by: Wenqi Mou --- br/pkg/stream/table_mapping.go | 182 ++++++++++++++++++++++++--------- 1 file changed, 133 insertions(+), 49 deletions(-) diff --git a/br/pkg/stream/table_mapping.go b/br/pkg/stream/table_mapping.go index ff44f1fdc7b35..b0dde8a268029 100644 --- a/br/pkg/stream/table_mapping.go +++ b/br/pkg/stream/table_mapping.go @@ -17,11 +17,9 @@ package stream import ( "context" "encoding/json" - "fmt" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" @@ -60,6 +58,9 @@ func NewTableMappingManager( } // ParseMetaKvAndUpdateIdMapping collect table information +// the keys and values that are selected to parse here follows the implementation in rewrite_meta_rawkv. Maybe +// parsing a subset of these keys/values would suffice, but to make it safe we decide to parse exactly same as +// in rewrite_meta_rawkv. func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error { if !IsMetaDBKey(e.Key) { return nil @@ -70,92 +71,175 @@ func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf str return errors.Trace(err) } - value, err := extractValue(e, cf) - if err != nil { - return errors.Trace(err) - } - // sanity check - if value == nil { - log.Warn("entry suggests having short value but is nil") - return nil - } - if meta.IsDBkey(rawKey.Field) { - return tc.parseDBValueAndUpdateIdMapping(value) + // parse db key + err := tc.parseDBKeyAndUpdateIdMapping(rawKey.Field) + if err != nil { + return errors.Trace(err) + } + + // parse value and update if exists + value, err := extractValue(e, cf) + if err != nil { + return errors.Trace(err) + } + if value != nil { + return tc.parseDBValueAndUpdateIdMapping(value) + } } else if !meta.IsDBkey(rawKey.Key) { return nil } if meta.IsTableKey(rawKey.Field) { - dbID, err := ParseDBIDFromTableKey(e.Key) + dbID, err := meta.ParseDBKey(rawKey.Key) + if err != nil { + return errors.Trace(err) + } + + // parse table key and update + err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseTableKey) + if err != nil { + return errors.Trace(err) + } + + // parse value and update if exists + value, err := extractValue(e, cf) + if err != nil { + return errors.Trace(err) + } + if value != nil { + return tc.parseTableValueAndUpdateIdMapping(dbID, value) + } + } else if meta.IsAutoIncrementIDKey(rawKey.Field) { + // parse auto increment key and update + err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoIncrementIDKey) + if err != nil { + return errors.Trace(err) + } + } else if meta.IsAutoTableIDKey(rawKey.Field) { + // parse auto table key and update + err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoTableIDKey) + if err != nil { + return errors.Trace(err) + } + } else if meta.IsSequenceKey(rawKey.Field) { + // parse sequence key and update + err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseSequenceKey) + if err != nil { + return errors.Trace(err) + } + } else if meta.IsAutoRandomTableIDKey(rawKey.Field) { + // parse sequence key and update + err = tc.parseTableIdAndUpdateIdMapping(rawKey.Key, rawKey.Field, meta.ParseAutoRandomTableIDKey) if err != nil { return errors.Trace(err) } - return tc.parseTableValueAndUpdateIdMapping(dbID, value) } + return nil } +func (tc *TableMappingManager) parseDBKeyAndUpdateIdMapping(field []byte) error { + dbID, err := meta.ParseDBKey(field) + if err != nil { + return errors.Trace(err) + } + + _, err = tc.getOrCreateDBReplace(dbID) + return errors.Trace(err) +} + func (tc *TableMappingManager) parseDBValueAndUpdateIdMapping(value []byte) error { dbInfo := new(model.DBInfo) if err := json.Unmarshal(value, dbInfo); err != nil { return errors.Trace(err) } - if dr, exist := tc.DbReplaceMap[dbInfo.ID]; !exist { - newID, err := tc.genGlobalIdFn(context.Background()) - if err != nil { - return errors.Trace(err) - } - tc.DbReplaceMap[dbInfo.ID] = NewDBReplace(dbInfo.Name.O, newID) - tc.globalIdMap[dbInfo.ID] = newID - } else { - dr.Name = dbInfo.Name.O + dbReplace, err := tc.getOrCreateDBReplace(dbInfo.ID) + if err != nil { + return errors.Trace(err) } + dbReplace.Name = dbInfo.Name.O return nil } -func (tc *TableMappingManager) parseTableValueAndUpdateIdMapping(dbID int64, value []byte) error { - var ( - tableInfo model.TableInfo - err error - exist bool - dbReplace *DBReplace - tableReplace *TableReplace - ) - - if err := json.Unmarshal(value, &tableInfo); err != nil { - return errors.Trace(err) - } - - // construct or find the id map. - dbReplace, exist = tc.DbReplaceMap[dbID] +// getOrCreateDBReplace gets an existing DBReplace or creates a new one if not found +func (tc *TableMappingManager) getOrCreateDBReplace(dbID int64) (*DBReplace, error) { + dbReplace, exist := tc.DbReplaceMap[dbID] if !exist { newID, err := tc.genGlobalIdFn(context.Background()) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } tc.globalIdMap[dbID] = newID dbReplace = NewDBReplace("", newID) tc.DbReplaceMap[dbID] = dbReplace } + return dbReplace, nil +} - tableReplace, exist = dbReplace.TableMap[tableInfo.ID] +// getOrCreateTableReplace gets an existing TableReplace or creates a new one if not found +func (tc *TableMappingManager) getOrCreateTableReplace(dbReplace *DBReplace, tableID int64) (*TableReplace, error) { + tableReplace, exist := dbReplace.TableMap[tableID] if !exist { - newID, exist := tc.globalIdMap[tableInfo.ID] + newID, exist := tc.globalIdMap[tableID] if !exist { + var err error newID, err = tc.genGlobalIdFn(context.Background()) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } - tc.globalIdMap[tableInfo.ID] = newID + tc.globalIdMap[tableID] = newID } + tableReplace = NewTableReplace("", newID) + dbReplace.TableMap[tableID] = tableReplace + } + return tableReplace, nil +} + +func (tc *TableMappingManager) parseTableIdAndUpdateIdMapping( + key []byte, + field []byte, + parseField func([]byte) (tableID int64, err error)) error { + + dbID, err := meta.ParseDBKey(key) + if err != nil { + return errors.Trace(err) + } + + tableID, err := parseField(field) + if err != nil { + return errors.Trace(err) + } + + dbReplace, err := tc.getOrCreateDBReplace(dbID) + if err != nil { + return errors.Trace(err) + } + + _, err = tc.getOrCreateTableReplace(dbReplace, tableID) + if err != nil { + return errors.Trace(err) + } + return nil +} - tableReplace = NewTableReplace(tableInfo.Name.O, newID) - dbReplace.TableMap[tableInfo.ID] = tableReplace - } else { - tableReplace.Name = tableInfo.Name.O +func (tc *TableMappingManager) parseTableValueAndUpdateIdMapping(dbID int64, value []byte) error { + var tableInfo model.TableInfo + if err := json.Unmarshal(value, &tableInfo); err != nil { + return errors.Trace(err) + } + + dbReplace, err := tc.getOrCreateDBReplace(dbID) + if err != nil { + return errors.Trace(err) + } + + tableReplace, err := tc.getOrCreateTableReplace(dbReplace, tableInfo.ID) + if err != nil { + return errors.Trace(err) } + tableReplace.Name = tableInfo.Name.O // update table ID and partition ID. tableInfo.ID = tableReplace.TableID @@ -252,6 +336,6 @@ func extractValue(e *kv.Entry, cf string) ([]byte, error) { } return nil, nil default: - panic(fmt.Sprintf("not support cf:%s", cf)) + return nil, errors.Errorf("unsupported column family: %s", cf) } }