
Commit 7c04493

initial
Signed-off-by: Wenqi Mou <[email protected]>
1 parent 49c3eba commit 7c04493


45 files changed: +1414 / -608 lines

br/pkg/checkpoint/checkpoint_test.go

Lines changed: 3 additions & 3 deletions
@@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
 	exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
 	require.False(t, exists)
 	err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
-		Progress: checkpoint.InLogRestoreAndIdMapPersist,
+		Progress: checkpoint.InLogRestoreAndIdMapPersisted,
 	})
 	require.NoError(t, err)
 	progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
 	require.NoError(t, err)
-	require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
+	require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)
 
 	taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
 	require.NoError(t, err)
@@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
 	require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
 	require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
 	require.Equal(t, true, taskInfo.HasSnapshotMetadata)
-	require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
+	require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)
 
 	exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
 	require.False(t, exists)

br/pkg/checkpoint/log_restore.go

Lines changed: 9 additions & 10 deletions
@@ -205,22 +205,22 @@ func ExistsLogRestoreCheckpointMetadata(
 		TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
 }
 
-// A progress type for snapshot + log restore.
+// RestoreProgress is a progress type for snapshot + log restore.
 //
-// Before the id-maps is persist into external storage, the snapshot restore and
-// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
+// Before the id-maps is persisted into external storage, the snapshot restore and
+// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
 // it can retry from snapshot restore.
 //
-// After the id-maps is persist into external storage, there are some meta-kvs has
-// been restored into the cluster, such as `rename ddl`. Where would be a situation:
+// After the id-maps is persisted into external storage, there are some meta-kvs has
+// been restored into the cluster, such as `rename ddl`. A situation could be:
 //
 // the first execution:
 //
 //	table A created in snapshot restore is renamed to table B in log restore
 //	table A (id 80) --------------> table B (id 80)
 //	( snapshot restore )            ( log restore )
 //
-// the second execution if don't skip snasphot restore:
+// the second execution if don't skip snapshot restore:
 //
 //	table A is created again in snapshot restore, because there is no table named A
 //	table A (id 81) --------------> [not in id-maps, so ignored]
@@ -232,8 +232,8 @@ type RestoreProgress int
 
 const (
 	InSnapshotRestore RestoreProgress = iota
-	// Only when the id-maps is persist, status turns into it.
-	InLogRestoreAndIdMapPersist
+	// Only when the id-maps is persisted, status turns into it.
+	InLogRestoreAndIdMapPersisted
 )
 
 type CheckpointProgress struct {
@@ -265,8 +265,7 @@ func ExistsCheckpointProgress(
 		TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
 }
 
-// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
-// restore task executed for this cluster.
+// CheckpointTaskInfoForLogRestore is tied to a specific cluster. It represents the last restore task executed this cluster.
 type CheckpointTaskInfoForLogRestore struct {
 	Metadata            *CheckpointMetadataForLogRestore
 	HasSnapshotMetadata bool
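
For orientation, here is a minimal, self-contained sketch of how a resumed restore task can branch on this progress value; the type and helper below only mirror the RestoreProgress states introduced in this file and are not the actual br/pkg/checkpoint API.

package main

import "fmt"

// RestoreProgress mirrors the two checkpoint states described above.
type RestoreProgress int

const (
	InSnapshotRestore RestoreProgress = iota
	// Set only after the id-maps have been persisted to external storage.
	InLogRestoreAndIdMapPersisted
)

// shouldSkipSnapshotRestore reports whether a resumed task must skip the
// snapshot phase: once the id-maps are persisted, re-running snapshot restore
// would create tables with new ids that the persisted id-maps no longer cover.
func shouldSkipSnapshotRestore(p RestoreProgress) bool {
	return p == InLogRestoreAndIdMapPersisted
}

func main() {
	for _, p := range []RestoreProgress{InSnapshotRestore, InLogRestoreAndIdMapPersisted} {
		fmt.Printf("progress=%d skipSnapshotRestore=%v\n", p, shouldSkipSnapshotRestore(p))
	}
}

With a guard like this, a task that crashes after persisting the id-maps resumes directly in the log-restore phase instead of repeating snapshot restore.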

br/pkg/restore/import_mode_switcher.go

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@ func (switcher *ImportModeSwitcher) switchToImportMode(
 	}()
 }
 
-// RestorePreWork executes some prepare work before restore.
+// RestorePreWork switches to import mode and removes pd schedulers if needed
 // TODO make this function returns a restore post work.
 func RestorePreWork(
 	ctx context.Context,
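
The TODO above refers to the usual pre-work/post-work pairing; a minimal standalone sketch of that pattern (hypothetical names, not the real BR signatures) looks like this:

package main

import "fmt"

// preWork performs setup (e.g. switching to import mode, pausing schedulers)
// and returns a postWork closure that undoes it, so callers can simply defer it.
func preWork() (postWork func(), err error) {
	fmt.Println("switch cluster to import mode, pause schedulers")
	return func() {
		fmt.Println("restore normal mode, resume schedulers")
	}, nil
}

func main() {
	undo, err := preWork()
	if err != nil {
		panic(err)
	}
	defer undo()
	fmt.Println("run restore")
}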

br/pkg/restore/log_client/BUILD.bazel

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "log_client",
     srcs = [
+        "batch_file_processor.go",
         "client.go",
         "import.go",
         "import_retry.go",
@@ -33,6 +34,7 @@ go_library(
         "//br/pkg/stream",
         "//br/pkg/summary",
         "//br/pkg/utils",
+        "//br/pkg/utils/consts",
         "//br/pkg/utils/iter",
         "//br/pkg/version",
         "//pkg/ddl/util",
br/pkg/restore/log_client/batch_file_processor.go

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logclient
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/pingcap/errors"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/stream"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/br/pkg/utils/consts"
+	"github.com/pingcap/tidb/pkg/meta"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"go.uber.org/zap"
+)
+
+// BatchFileProcessor defines how to process a batch of files
+type BatchFileProcessor interface {
+	// process a batch of files and with a filterTS and return what's not processed for next iteration
+	processBatch(
+		ctx context.Context,
+		files []*backuppb.DataFileInfo,
+		entries []*KvEntryWithTS,
+		filterTS uint64,
+		cf string,
+	) ([]*KvEntryWithTS, error)
+}
+
+// RestoreProcessor implements BatchFileProcessor for restoring files
+type RestoreProcessor struct {
+	client         *LogClient
+	schemasReplace *stream.SchemasReplace
+	updateStats    func(kvCount uint64, size uint64)
+	progressInc    func()
+}
+
+func (rp *RestoreProcessor) processBatch(
+	ctx context.Context,
+	files []*backuppb.DataFileInfo,
+	entries []*KvEntryWithTS,
+	filterTS uint64,
+	cf string,
+) ([]*KvEntryWithTS, error) {
+	return rp.client.RestoreBatchMetaKVFiles(
+		ctx, files, rp.schemasReplace, entries,
+		filterTS, rp.updateStats, rp.progressInc, cf,
+	)
+}
+
+// DDLCollector implements BatchFileProcessor for collecting DDL information
+// 1. It collects table renaming information. The table rename operation will not change the table id, and the process
+// will drop the original table and create a new one with the same table id, so in DDL history there will be two events
+// that corresponds to the same table id.
+//
+// add more logic in future if needed
+type DDLCollector struct {
+	client          *LogClient
+	tableRenameInfo *stream.LogBackupTableHistory
+}
+
+func (dc *DDLCollector) processBatch(
+	ctx context.Context,
+	files []*backuppb.DataFileInfo,
+	entries []*KvEntryWithTS,
+	filterTS uint64,
+	cf string,
+) ([]*KvEntryWithTS, error) {
+	// doesn't need to parse writeCF as it contains value like "p\XXXX\XXX" which is meaningless.
+	// DefaultCF value should contain everything we want for DDL operation
+	if cf == consts.WriteCF {
+		return nil, nil
+	}
+
+	curSortedEntries, filteredEntries, err := dc.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	// process entries to collect table IDs
+	for _, entry := range curSortedEntries {
+		value := entry.E.Value
+
+		if utils.IsMetaDBKey(entry.E.Key) {
+			rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+
+			// collect db id -> name mapping during log backup, it will contain information about newly created db
+			if meta.IsDBkey(rawKey.Field) {
+				var dbInfo model.DBInfo
+				if err := json.Unmarshal(value, &dbInfo); err != nil {
+					return nil, errors.Trace(err)
+				}
+				dc.tableRenameInfo.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
+			} else if !meta.IsDBkey(rawKey.Key) {
+				// also see RewriteMetaKvEntry
+				continue
+			}
+
+			// collect table history indexed by table id, same id may have different table names in history
+			if meta.IsTableKey(rawKey.Field) {
+				var tableInfo model.TableInfo
+				if err := json.Unmarshal(value, &tableInfo); err != nil {
+					return nil, errors.Trace(err)
+				}
+				// cannot use dbib in the parsed table info cuz it might not set so default to 0
+				dbID, err := meta.ParseDBKey(rawKey.Key)
+				if err != nil {
+					return nil, errors.Trace(err)
+				}
+
+				log.Info("######################################## adding table info", zap.Int64("tableid", tableInfo.ID), zap.String("table name", tableInfo.Name.O), zap.Int64("db id", dbID))
+				dc.tableRenameInfo.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
+			}
+		}
+	}
+	return filteredEntries, nil
+}
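
To make the processBatch contract concrete (consume entries up to filterTS, hand the remainder back for the next batch), here is a small standalone sketch with simplified stand-in types; it is illustrative only and does not use the real logclient types.

package main

import (
	"context"
	"fmt"
)

// entry stands in for KvEntryWithTS: a kv entry tagged with its commit ts.
type entry struct{ ts uint64 }

// batchProcessor mirrors the shape of BatchFileProcessor.processBatch:
// it consumes entries up to filterTS and returns the rest for the next round.
type batchProcessor interface {
	processBatch(ctx context.Context, entries []*entry, filterTS uint64) ([]*entry, error)
}

type printer struct{}

func (printer) processBatch(_ context.Context, entries []*entry, filterTS uint64) ([]*entry, error) {
	var leftover []*entry
	for _, e := range entries {
		if e.ts > filterTS {
			leftover = append(leftover, e) // defer to a later batch
			continue
		}
		fmt.Println("process entry with ts", e.ts)
	}
	return leftover, nil
}

func main() {
	var p batchProcessor = printer{}
	pending := []*entry{{ts: 5}, {ts: 15}, {ts: 25}}
	for _, filterTS := range []uint64{10, 20, 30} {
		var err error
		if pending, err = p.processBatch(context.Background(), pending, filterTS); err != nil {
			panic(err)
		}
	}
}

The caller drives the processor in rounds with an increasing filterTS, which is the same carryover pattern RestoreProcessor and DDLCollector rely on.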
