Skip to content

Commit

Permalink
add one more unit test, and some minor fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jan 3, 2025
1 parent d1050cb commit 5fd7504
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 55 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/batch_meta_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
return errors.Trace(err)
}

// UpdateTable global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
// AddTable global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
// the latest schema update.
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func readFilteredFullBackupTables(
ctx context.Context,
s storage.ExternalStorage,
tableFilter filter.Filter,
piTRTableFilter *utils.PiTRTableFilter,
piTRTableFilter *utils.PiTRTableTracker,
cipherInfo *backuppb.CipherInfo,
) (map[int64]*metautil.Table, error) {
metaData, err := s.ReadFile(ctx, metautil.MetaFile)
Expand Down Expand Up @@ -934,7 +934,7 @@ type GetIDMapConfig struct {
// optional
FullBackupStorage *FullBackupStorageConfig
CipherInfo *backuppb.CipherInfo
PiTRTableFilter *utils.PiTRTableFilter // generated table filter that contain all the table id that needs to restore
PiTRTableFilter *utils.PiTRTableTracker // generated table filter that contain all the table id that needs to restore
}

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/stream/table_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
return nil
}

func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableFilter) {
func (tm *TableMappingManager) FilterDBReplaceMap(filter *utils.PiTRTableTracker) {
// collect all IDs that should be kept
keepIDs := make(map[UpstreamID]struct{})

Expand Down
22 changes: 11 additions & 11 deletions br/pkg/stream/table_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestFilterDBReplaceMap(t *testing.T) {
tests := []struct {
name string
initial map[UpstreamID]*DBReplace
filter *utils.PiTRTableFilter
filter *utils.PiTRTableTracker
expected map[UpstreamID]*DBReplace
}{
{
Expand All @@ -378,8 +378,8 @@ func TestFilterDBReplaceMap(t *testing.T) {
},
},
},
filter: &utils.PiTRTableFilter{
DbIdToTable: map[int64]map[int64]struct{}{},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{},
},
expected: map[UpstreamID]*DBReplace{},
},
Expand All @@ -401,8 +401,8 @@ func TestFilterDBReplaceMap(t *testing.T) {
},
},
},
filter: &utils.PiTRTableFilter{
DbIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
},
},
Expand All @@ -429,8 +429,8 @@ func TestFilterDBReplaceMap(t *testing.T) {
},
},
},
filter: &utils.PiTRTableFilter{
DbIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
1: {
10: struct{}{},
12: struct{}{},
Expand Down Expand Up @@ -474,8 +474,8 @@ func TestFilterDBReplaceMap(t *testing.T) {
},
},
},
filter: &utils.PiTRTableFilter{
DbIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
},
},
Expand Down Expand Up @@ -523,8 +523,8 @@ func TestFilterDBReplaceMap(t *testing.T) {
},
},
},
filter: &utils.PiTRTableFilter{
DbIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
2: {
20: struct{}{},
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ type Config struct {
TableFilter filter.Filter `json:"-" toml:"-"`
// PiTRTableFilter generated from TableFilter during snapshot restore, it has all the db id and table id that needs
// to be restored
PiTRTableFilter *utils.PiTRTableFilter `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
PiTRTableFilter *utils.PiTRTableTracker `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
// Schemas is a database name set, to check whether the restore database has been backup
Schemas map[string]struct{}
// Tables is a table name set, to check whether the restore table has been backup
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1468,7 +1468,7 @@ func adjustTablesToRestoreAndCreateFilter(
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
for dbId, dbName := range newlyCreatedDBs {
if utils.MatchSchema(cfg.TableFilter, dbName) {
piTRTableFilter.UpdateDB(dbId)
piTRTableFilter.AddDB(dbId)
}
}

Expand Down Expand Up @@ -1505,7 +1505,7 @@ func adjustTablesToRestoreAndCreateFilter(
// put this db/table id into pitr filter as it matches with user's filter
// have to update filter here since table might be empty or not in snapshot so nothing will be returned .
// but we still need to capture this table id to restore during log restore.
piTRTableFilter.UpdateTable(end.DbID, tableID)
piTRTableFilter.AddTable(end.DbID, tableID)

// check if snapshot contains the original db/table
originalDB, exists := snapshotDBMap[start.DbID]
Expand Down Expand Up @@ -1564,7 +1564,7 @@ func adjustTablesToRestoreAndCreateFilter(

func UpdatePiTRFilter(cfg *RestoreConfig, tableMap map[int64]*metautil.Table) {
for _, table := range tableMap {
cfg.PiTRTableFilter.UpdateTable(table.DB.ID, table.Info.ID)
cfg.PiTRTableFilter.AddTable(table.DB.ID, table.Info.ID)
}
}

Expand Down
2 changes: 0 additions & 2 deletions br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ func TestFilterDDLJobs(t *testing.T) {
ddlJobs := task.FilterDDLJobs(allDDLJobs, tables)
for _, job := range ddlJobs {
t.Logf("get ddl job: %s", job.Query)
t.Logf("table name: %s", job.TableName)
t.Logf("dbid: %s", job.SchemaName)
}
require.Equal(t, 7, len(ddlJobs))
}
Expand Down
10 changes: 7 additions & 3 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/cdcutil"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -1301,9 +1302,12 @@ func RunStreamRestore(
if err != nil {
return errors.Trace(err)
}
// TODO: pitr filtered restore doesn't support restore system table yet, hacky way to override the sys filter here
// TODO: pitr filtered restore doesn't support restore system table yet
if cfg.ExplicitFilter {
// add some check
if cfg.TableFilter.MatchSchema(mysql.SystemDB) {
return errors.Annotatef(berrors.ErrInvalidArgument,
"PiTR doesn't support custom filter to include system db, consider to exclude system db")
}
}
metaInfoProcessor := logclient.NewMetaKVInfoProcessor(logClient)
// only doesn't need to build if id map has been saved during log restore
Expand All @@ -1318,7 +1322,7 @@ func RunStreamRestore(
return errors.Trace(err)
}
dbReplace := metaInfoProcessor.GetTableMappingManager().DBReplaceMap
stream.LogDBReplaceMap("scanning log meta kv before snapshot restore", dbReplace)
stream.LogDBReplaceMap("scanned log meta kv before snapshot restore", dbReplace)
}

// restore full snapshot.
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ go_test(
"backoff_test.go",
"db_test.go",
"error_handling_test.go",
"filter_test.go",
"json_test.go",
"key_test.go",
"main_test.go",
Expand Down
61 changes: 31 additions & 30 deletions br/pkg/utils/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,45 @@ import (
filter "github.com/pingcap/tidb/pkg/util/table-filter"
)

type PiTRTableFilter struct {
DbIdToTable map[int64]map[int64]struct{}
// PiTRTableTracker tracks all the DB and table ids that need to restore in a PiTR
type PiTRTableTracker struct {
DBIdToTable map[int64]map[int64]struct{}
}

func NewPiTRTableFilter() *PiTRTableFilter {
return &PiTRTableFilter{
DbIdToTable: make(map[int64]map[int64]struct{}),
func NewPiTRTableFilter() *PiTRTableTracker {
return &PiTRTableTracker{
DBIdToTable: make(map[int64]map[int64]struct{}),
}
}

// UpdateTable adds a table ID to the filter for the given database ID
func (f *PiTRTableFilter) UpdateTable(dbID, tableID int64) {
if f.DbIdToTable == nil {
f.DbIdToTable = make(map[int64]map[int64]struct{})
// AddTable adds a table ID to the filter for the given database ID
func (f *PiTRTableTracker) AddTable(dbID, tableID int64) {
if f.DBIdToTable == nil {
f.DBIdToTable = make(map[int64]map[int64]struct{})
}

if _, ok := f.DbIdToTable[dbID]; !ok {
f.DbIdToTable[dbID] = make(map[int64]struct{})
if _, ok := f.DBIdToTable[dbID]; !ok {
f.DBIdToTable[dbID] = make(map[int64]struct{})
}

f.DbIdToTable[dbID][tableID] = struct{}{}
f.DBIdToTable[dbID][tableID] = struct{}{}
}

// UpdateDB adds the database id
func (f *PiTRTableFilter) UpdateDB(dbID int64) {
if f.DbIdToTable == nil {
f.DbIdToTable = make(map[int64]map[int64]struct{})
// AddDB adds the database id
func (f *PiTRTableTracker) AddDB(dbID int64) {
if f.DBIdToTable == nil {
f.DBIdToTable = make(map[int64]map[int64]struct{})
}

if _, ok := f.DbIdToTable[dbID]; !ok {
f.DbIdToTable[dbID] = make(map[int64]struct{})
if _, ok := f.DBIdToTable[dbID]; !ok {
f.DBIdToTable[dbID] = make(map[int64]struct{})
}
}

// Remove removes a table ID from the filter for the given database ID.
// Returns true if the table was found and removed, false otherwise.
func (f *PiTRTableFilter) Remove(dbID, tableID int64) bool {
if tables, ok := f.DbIdToTable[dbID]; ok {
func (f *PiTRTableTracker) Remove(dbID, tableID int64) bool {
if tables, ok := f.DBIdToTable[dbID]; ok {
if _, exists := tables[tableID]; exists {
delete(tables, tableID)
return true
Expand All @@ -69,29 +70,29 @@ func (f *PiTRTableFilter) Remove(dbID, tableID int64) bool {
}

// ContainsTable checks if the given database ID and table ID combination exists in the filter
func (f *PiTRTableFilter) ContainsTable(dbID, tableID int64) bool {
if tables, ok := f.DbIdToTable[dbID]; ok {
func (f *PiTRTableTracker) ContainsTable(dbID, tableID int64) bool {
if tables, ok := f.DBIdToTable[dbID]; ok {
_, exists := tables[tableID]
return exists
}
return false
}

// ContainsDB checks if the given database ID exists in the filter
func (f *PiTRTableFilter) ContainsDB(dbID int64) bool {
_, ok := f.DbIdToTable[dbID]
func (f *PiTRTableTracker) ContainsDB(dbID int64) bool {
_, ok := f.DBIdToTable[dbID]
return ok
}

// String returns a string representation of the PiTRTableFilter for debugging
func (f *PiTRTableFilter) String() string {
if f == nil || f.DbIdToTable == nil {
return "PiTRTableFilter{nil}"
// String returns a string representation of the PiTRTableTracker for debugging
func (f *PiTRTableTracker) String() string {
if f == nil || f.DBIdToTable == nil {
return "PiTRTableTracker{nil}"
}

var result strings.Builder
result.WriteString("PiTRTableFilter{\n")
for dbID, tables := range f.DbIdToTable {
result.WriteString("PiTRTableTracker{\n")
for dbID, tables := range f.DBIdToTable {
result.WriteString(fmt.Sprintf(" DB[%d]: {", dbID))
tableIDs := make([]int64, 0, len(tables))
for tableID := range tables {
Expand Down
53 changes: 53 additions & 0 deletions br/pkg/utils/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package utils

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestPiTRTableTracker(t *testing.T) {
t.Run("test new tracker", func(t *testing.T) {
tracker := NewPiTRTableFilter()
require.NotNil(t, tracker)
require.NotNil(t, tracker.DBIdToTable)
require.Empty(t, tracker.DBIdToTable)
})

t.Run("test update and contains table", func(t *testing.T) {
tracker := NewPiTRTableFilter()

tracker.AddDB(1)
tracker.AddTable(1, 100)
tracker.AddDB(2)
require.True(t, tracker.ContainsDB(1))
require.True(t, tracker.ContainsDB(2))
require.True(t, tracker.ContainsTable(1, 100))
require.False(t, tracker.ContainsTable(1, 101))
require.False(t, tracker.ContainsTable(2, 100))

tracker.AddTable(1, 101)
tracker.AddTable(2, 200)
require.True(t, tracker.ContainsTable(1, 100))
require.True(t, tracker.ContainsTable(1, 101))
require.True(t, tracker.ContainsTable(2, 200))

tracker.AddTable(3, 300)
require.True(t, tracker.ContainsDB(3))
require.True(t, tracker.ContainsTable(3, 300))
})

t.Run("test remove table", func(t *testing.T) {
tracker := NewPiTRTableFilter()

tracker.AddTable(1, 100)
tracker.AddTable(1, 101)

require.True(t, tracker.Remove(1, 100))
require.False(t, tracker.ContainsTable(1, 100))
require.True(t, tracker.ContainsTable(1, 101))

require.False(t, tracker.Remove(1, 102))
require.False(t, tracker.Remove(2, 100))
})
}

0 comments on commit 5fd7504

Please sign in to comment.