diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index f80a1cef6a4ef..cf0f5d5c5f963 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -69,7 +69,7 @@ go_test(
     embed = [":backup"],
     flaky = True,
     race = "on",
-    shard_count = 8,
+    shard_count = 9,
     deps = [
         "//br/pkg/conn",
         "//br/pkg/gluetidb",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 7bf187ffce6fb..3e9574e2dad51 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -12,6 +12,7 @@ import (
 	"io"
 	"math/rand"
 	"os"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -735,8 +736,37 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	// determine whether the jobs need to be append into `allJobs`
+	appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
+		appendJobs := make([]*model.Job, 0, len(jobs))
+		for _, job := range jobs {
+			if skipUnsupportedDDLJob(job) {
+				continue
+			}
+			if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
+				// early exits to stop unnecessary scan
+				return appendJobs, true
+			}
+
+			if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
+				(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
+				if job.BinlogInfo.DBInfo != nil {
+					// ignore all placement policy info during incremental backup for now.
+					job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
+				}
+				if job.BinlogInfo.TableInfo != nil {
+					// ignore all placement policy info during incremental backup for now.
+					job.BinlogInfo.TableInfo.ClearPlacement()
+				}
+				appendJobs = append(appendJobs, job)
+			}
+		}
+		return appendJobs, false
+	}
+
 	newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
-	allJobs := make([]*model.Job, 0)
+	var allJobs []*model.Job
 	err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
 		allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx())
 		if err != nil {
@@ -749,41 +779,49 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
 		return errors.Trace(err)
 	}
 
-	historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
+	// filter out the jobs
+	allJobs, _ = appendJobsFn(allJobs)
+
+	historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
-	allJobs = append(allJobs, historyJobs...)
 
-	count := 0
+	count := len(allJobs)
+
+	cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
+	for {
+		cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if len(cacheJobs) == 0 {
+			// no more jobs
+			break
+		}
+		jobs, finished := appendJobsFn(cacheJobs)
+		count += len(jobs)
+		allJobs = append(allJobs, jobs...)
+		if finished {
+			// no more jobs between [LastTS, ts]
+			break
+		}
+	}
+	log.Debug("get complete jobs", zap.Int("jobs", count))
+	// sort by job id with ascend order
+	sort.Slice(allJobs, func(i, j int) bool {
+		return allJobs[i].ID < allJobs[j].ID
+	})
 	for _, job := range allJobs {
-		if skipUnsupportedDDLJob(job) {
-			continue
+		jobBytes, err := json.Marshal(job)
+		if err != nil {
+			return errors.Trace(err)
 		}
-
-		if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
-			(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
-			if job.BinlogInfo.DBInfo != nil {
-				// ignore all placement policy info during incremental backup for now.
-				job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
-			}
-			if job.BinlogInfo.TableInfo != nil {
-				// ignore all placement policy info during incremental backup for now.
-				job.BinlogInfo.TableInfo.ClearPlacement()
-			}
-			jobBytes, err := json.Marshal(job)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			err = metaWriter.Send(jobBytes, metautil.AppendDDL)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			count++
+		err = metaWriter.Send(jobBytes, metautil.AppendDDL)
+		if err != nil {
+			return errors.Trace(err)
 		}
 	}
-	log.Debug("get completed jobs", zap.Int("jobs", count))
 	return nil
 }
 
diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go
index eeaea5cec762f..5a57069bbde94 100644
--- a/br/pkg/backup/client_test.go
+++ b/br/pkg/backup/client_test.go
@@ -166,6 +166,65 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
 	}
 }
 
+func TestGetHistoryDDLJobs(t *testing.T) {
+	s := createBackupSuite(t)
+
+	tk := testkit.NewTestKit(t, s.cluster.Storage)
+	lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error get last ts: %s", err)
+	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
+	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
+	lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error get last ts: %s", err)
+	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
+	tk.MustExec("DROP TABLE test_db.test_table1;")
+	tk.MustExec("DROP DATABASE test_db;")
+	tk.MustExec("CREATE DATABASE test_db;")
+	tk.MustExec("USE test_db;")
+	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
+	tk.MustExec("RENAME TABLE test_table1 to test_table;")
+	tk.MustExec("RENAME TABLE test_table to test_table2;")
+	tk.MustExec("RENAME TABLE test_table2 to test_table;")
+	lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error get last ts: %s", err)
+	tk.MustExec("TRUNCATE TABLE test_table;")
+	ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error get last ts: %s", err)
+
+	checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
+		cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
+		metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
+		ctx := context.Background()
+		metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
+		s.mockGlue.SetSession(tk.Session())
+		err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
+		require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
+		err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
+		require.NoError(t, err, "Flush failed", err)
+		err = metaWriter.FlushBackupMeta(ctx)
+		require.NoError(t, err, "Finally flush backup meta failed", err)
+
+		metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
+		require.NoError(t, err)
+		mockMeta := &backuppb.BackupMeta{}
+		err = proto.Unmarshal(metaBytes, mockMeta)
+		require.NoError(t, err)
+		// check the schema version
+		metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
+		allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
+		require.NoError(t, err)
+		var allDDLJobs []*model.Job
+		err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
+		require.NoError(t, err)
+		require.Len(t, allDDLJobs, jobsCount)
+	}
+
+	checkFn(lastTS1, ts, 11)
+	checkFn(lastTS2, ts, 9)
+	checkFn(lastTS1, lastTS2, 2)
+	checkFn(lastTS3, ts, 1)
+}
+
 func TestSkipUnsupportedDDLJob(t *testing.T) {
 	s := createBackupSuite(t)