br: update global checkpoint ts for safepoint #59210

Open · wants to merge 5 commits into base: master
52 changes: 33 additions & 19 deletions br/pkg/streamhelper/advancer.go
@@ -60,9 +60,10 @@ type CheckpointAdvancer struct {

// The concurrency accessed task:
// both by the task listener and ticking.
task *backuppb.StreamBackupTaskInfo
taskRange []kv.KeyRange
taskMu sync.Mutex
task *backuppb.StreamBackupTaskInfo
taskRange []kv.KeyRange
taskServiceID string
taskMu sync.Mutex

// the read-only config.
// once tick begin, this should not be changed for now.
@@ -263,6 +264,10 @@ func tsoAfter(ts uint64, n time.Duration) uint64 {
return oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(n))
}

func (c *CheckpointAdvancer) safepointTTLSeconds() int64 {
return int64(max(c.cfg.GetCheckPointLagLimit(), logBackupSafePointTTL).Seconds())
}

func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
c.checkpointsMu.Lock()
defer c.checkpointsMu.Unlock()
@@ -418,6 +423,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountInc()
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.taskServiceID = logBackupServiceID(c.task.Name, c.task.StartTs)
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
@@ -429,7 +435,8 @@
}
log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs))
c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs)
p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs-1)
// It's OK to update the safepoint with the log backup task's start-ts when getting the global checkpoint for the task fails.
p, err := c.env.UpdateServiceGCSafePoint(ctx, c.taskServiceID, c.safepointTTLSeconds(), globalCheckpointTs-1)
if err != nil {
log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err))
}
@@ -440,6 +447,7 @@
c.task = nil
c.isPaused.Store(false)
c.taskRange = nil
c.taskServiceID = ""
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of the test cases won't equip the advancer with a subscriber.
if c.subscriber != nil {
@@ -449,7 +457,7 @@
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
if err := c.env.UnblockGC(ctx); err != nil {
if err := c.env.UnblockGC(ctx, logBackupServiceID(e.Name, e.Info.StartTs)); err != nil {
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
@@ -559,14 +567,10 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context, globalTs uint64) (bool, error) {
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
@@ -590,10 +594,12 @@
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.Min())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
newGlobalCheckpointTs, err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS)
if err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
isLagged, err := c.isCheckpointLagged(ctx)
// The TiDB advancer may fail to collect a new checkpoint ts, so use the global checkpoint ts from PD instead.
isLagged, err := c.isCheckpointLagged(ctx, newGlobalCheckpointTs)
if err != nil {
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
@@ -605,19 +611,27 @@
}
return errors.Annotate(errors.Errorf("check point lagged too large"), "check point lagged too large")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
// As above, use the global checkpoint ts from PD rather than the locally collected one.
safeGlobalCheckpointTs := newGlobalCheckpointTs - 1
p, err := c.env.UpdateServiceGCSafePoint(ctx, c.taskServiceID, c.safepointTTLSeconds(), safeGlobalCheckpointTs)
if err != nil {
return errors.Annotatef(err,
"failed to update service GC safe point, current checkpoint is %d, target checkpoint is %d",
c.lastCheckpoint.safeTS(), p)
safeGlobalCheckpointTs, p)
}
if p <= c.lastCheckpoint.safeTS() {
if p <= safeGlobalCheckpointTs {
log.Info("updated log backup GC safe point.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
zap.Uint64("checkpoint", p), zap.Uint64("target", safeGlobalCheckpointTs))
}
if p > c.lastCheckpoint.safeTS() {
log.Warn("update log backup GC safe point failed: stale.",
zap.Uint64("checkpoint", p), zap.Uint64("target", c.lastCheckpoint.safeTS()))
if p > safeGlobalCheckpointTs {
log.Error("update log backup GC safe point failed: stale.",
zap.Uint64("checkpoint", p), zap.Uint64("target", safeGlobalCheckpointTs))
err := c.env.PauseTask(ctx, c.task.Name)
if err != nil {
return errors.Annotate(err, "failed to pause task")
}
return errors.Errorf("log backup GC safe point(%d) is stale, current minimal safe point is %d",
safeGlobalCheckpointTs, p)
}
return nil
}
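For intuition, here is a minimal, self-contained sketch of the TTL rule introduced by `safepointTTLSeconds` (the `config` type and its values are hypothetical stand-ins for the advancer's read-only config, not the actual wiring): the service safepoint TTL is the larger of the configured checkpoint lag limit and the 24-hour default.

```go
package main

import (
	"fmt"
	"time"
)

const logBackupSafePointTTL = 24 * time.Hour

// config is a hypothetical stand-in for the advancer's read-only config.
type config struct{ checkPointLagLimit time.Duration }

func (c config) GetCheckPointLagLimit() time.Duration { return c.checkPointLagLimit }

// safepointTTLSeconds mirrors the new helper: the TTL is the larger of the
// configured checkpoint lag limit and the 24h default, in seconds, so the
// safepoint outlives any period in which the checkpoint may lawfully lag.
func safepointTTLSeconds(c config) int64 {
	ttl := c.GetCheckPointLagLimit()
	if logBackupSafePointTTL > ttl {
		ttl = logBackupSafePointTTL
	}
	return int64(ttl.Seconds())
}

func main() {
	fmt.Println(safepointTTLSeconds(config{8 * time.Hour}))  // 86400: the default wins
	fmt.Println(safepointTTLSeconds(config{48 * time.Hour})) // 172800: the lag limit wins
}
```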
14 changes: 9 additions & 5 deletions br/pkg/streamhelper/advancer_cliext.go
@@ -273,25 +273,29 @@ func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName st
return binary.BigEndian.Uint64(value), nil
}

func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
func (t AdvancerExt) UploadV3GlobalCheckpointForTask(
ctx context.Context,
taskName string,
checkpoint uint64,
) (uint64, error) {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
oldValue, err := t.GetGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
return 0, err
}

if checkpoint < oldValue {
log.Warn("skipping upload global checkpoint", zap.String("category", "log backup advancer"),
zap.Uint64("old", oldValue), zap.Uint64("new", checkpoint))
return nil
return oldValue, nil
}

_, err = t.KV.Put(ctx, key, value)
if err != nil {
return err
return 0, err
}
return nil
return checkpoint, nil
}

func (t AdvancerExt) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error {
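The upload is now monotonic and reports the effective value: a checkpoint smaller than the stored one is skipped, and the caller receives whatever ends up stored. A runnable sketch of just that contract, with an in-memory map standing in for the etcd-backed meta store:

```go
package main

import (
	"fmt"
	"sync"
)

// checkpointStore is an in-memory stand-in for the etcd-backed meta store,
// illustrating the return-value contract of UploadV3GlobalCheckpointForTask.
type checkpointStore struct {
	mu  sync.Mutex
	cps map[string]uint64
}

// Upload stores checkpoint unless it would move the value backwards,
// and always returns whatever ends up stored.
func (s *checkpointStore) Upload(task string, checkpoint uint64) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if old := s.cps[task]; checkpoint < old {
		return old // stale upload: keep and report the newer stored value
	}
	s.cps[task] = checkpoint
	return checkpoint
}

func main() {
	s := &checkpointStore{cps: map[string]uint64{}}
	fmt.Println(s.Upload("task1", 5))  // 5
	fmt.Println(s.Upload("task1", 18)) // 18
	fmt.Println(s.Upload("task1", 16)) // 18: regression is skipped
}
```

The `5, 18, 16` sequence mirrors the cases exercised in `integration_test.go` below.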
45 changes: 23 additions & 22 deletions br/pkg/streamhelper/advancer_env.go
@@ -4,10 +4,10 @@ package streamhelper

import (
"context"
"fmt"
"math"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
@@ -23,10 +23,25 @@
)

const (
logBackupServiceID = "log-backup-coordinator"
logBackupSafePointTTL = 24 * time.Hour
logBackupServiceIDPrefix = "log-backup-coordinator"
logBackupSafePointTTL = 24 * time.Hour
)

// logBackupServiceID returns a service safepoint ID unique to the given task.
// Consider the following sequence of steps:
// 1. start a log backup task named `task1`.
// 2. stop the log backup task `task1`.
// 3. start another log backup task named `task1`.
//
// A newly launched TiDB node (log backup advancer node) replays these steps from PD as:
// 1. set a safepoint A.
// 2. delete the safepoint A.
// 3. set a safepoint B.
//
// The advancer must ensure that A and B are different, which embedding the
// task's start-ts in the ID guarantees.
func logBackupServiceID(taskName string, startTs uint64) string {
return fmt.Sprintf("%s-%s-%d", logBackupServiceIDPrefix, taskName, startTs)
}

// Env is the interface required by the advancer.
type Env interface {
// The region scanner provides the region information.
@@ -45,24 +60,9 @@ type PDRegionScanner struct {
pd.Client
}

// Updates the service GC safe point for the cluster.
// Returns the minimal service GC safe point across all services.
// If the arguments is `0`, this would remove the service safe point.
func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
minimalSafePoint, err := c.UpdateServiceGCSafePoint(
ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
if err != nil {
return 0, errors.Annotate(err, "failed to block gc until")
}
if minimalSafePoint > at {
return 0, errors.Errorf("minimal safe point %d is greater than the target %d", minimalSafePoint, at)
}
return at, nil
}

func (c PDRegionScanner) UnblockGC(ctx context.Context) error {
func (c PDRegionScanner) UnblockGC(ctx context.Context, serviceID string) error {
// set ttl to 0, means remove the safe point.
_, err := c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, 0, math.MaxUint64)
_, err := c.UpdateServiceGCSafePoint(ctx, serviceID, 0, math.MaxUint64)
return err
}

@@ -172,8 +172,9 @@ type LogBackupService interface {
type StreamMeta interface {
// Begin begins listen the task event change.
Begin(ctx context.Context, ch chan<- TaskEvent) error
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store
// and returns the latest global checkpoint.
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) (uint64, error)
// GetGlobalCheckpointForTask gets the global checkpoint from the meta store.
GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error)
// ClearV3GlobalCheckpointForTask clears the global checkpoint from the meta store.
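The heart of the fix is that the service safepoint ID is now unique per task incarnation instead of the shared `log-backup-coordinator` constant. A tiny runnable sketch (the start-ts values are made up for illustration):

```go
package main

import "fmt"

const logBackupServiceIDPrefix = "log-backup-coordinator"

// Mirrors logBackupServiceID: the task's start-ts is part of the ID, so a
// stopped task and a restarted task with the same name map to different
// service safepoints, and deleting one cannot clobber the other.
func logBackupServiceID(taskName string, startTs uint64) string {
	return fmt.Sprintf("%s-%s-%d", logBackupServiceIDPrefix, taskName, startTs)
}

func main() {
	// Made-up TSOs for illustration.
	fmt.Println(logBackupServiceID("task1", 446650000000000001))
	fmt.Println(logBackupServiceID("task1", 446650000000099999)) // restart: distinct ID
}
```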
11 changes: 5 additions & 6 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -271,7 +271,7 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
// Updates the service GC safe point for the cluster.
// Returns the latest service GC safe point.
// If the TTL is `0`, this removes the service safe point.
func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
func (f *fakeCluster) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, at uint64) (uint64, error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.serviceGCSafePoint > at {
@@ -281,7 +281,7 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro
return at, nil
}

func (f *fakeCluster) UnblockGC(ctx context.Context) error {
func (f *fakeCluster) UnblockGC(ctx context.Context, serviceID string) error {
f.mu.Lock()
defer f.mu.Unlock()
f.serviceGCSafePointDeleted = true
@@ -659,7 +659,6 @@ type testEnv struct {
resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)

mu sync.Mutex
pd.Client
}

func newTestEnv(c *fakeCluster, t *testing.T) *testEnv {
@@ -689,7 +688,7 @@ func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) e
return nil
}

func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, checkpoint uint64) error {
func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, checkpoint uint64) (uint64, error) {
t.mu.Lock()
defer t.mu.Unlock()

@@ -699,10 +698,10 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
return 0, errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
return checkpoint, nil
}

func (t *testEnv) mockPDConnectionError() {
18 changes: 14 additions & 4 deletions br/pkg/streamhelper/integration_test.go
@@ -359,15 +359,23 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
task := "simple"
req := require.New(t)

req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
newTs, err := metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5)
req.NoError(err)
req.EqualValues(5, newTs)
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(5, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))

newTs, err = metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18)
req.NoError(err)
req.EqualValues(18, newTs)
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))

newTs, err = metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16)
req.NoError(err)
req.EqualValues(18, newTs)
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
@@ -399,7 +407,9 @@ func testStoptask(t *testing.T, metaCli streamhelper.AdvancerExt) {
req.EqualValues(taskInfo.PBInfo.Name, t2.Info.Name)

// upload global checkpoint
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, taskName, 100))
newTs, err := metaCli.UploadV3GlobalCheckpointForTask(ctx, taskName, 100)
req.NoError(err)
req.EqualValues(100, newTs)
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(100, ts)
11 changes: 4 additions & 7 deletions br/pkg/streamhelper/regioniter.go
@@ -36,15 +36,12 @@ type TiKVClusterMeta interface {
// Stores returns the store metadata from the cluster.
Stores(ctx context.Context) ([]Store, error)

// Updates the service GC safe point for the cluster.
// Returns the latest service GC safe point.
// If the arguments is `0`, this would remove the service safe point.
// NOTE: once we support multi tasks, perhaps we need to allow the caller to provide a namespace.
// For now, all tasks (exactly one task in fact) use the same checkpoint.
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)
// UpdateServiceGCSafePoint updates the safepoint for specific service and
// returns the minimum safepoint across all services.
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)

// UnblockGC used to remove the service GC safe point in PD.
UnblockGC(ctx context.Context) error
UnblockGC(ctx context.Context, serviceID string) error

FetchCurrentTS(ctx context.Context) (uint64, error)
}
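For reference, a toy model of the contract behind the renamed methods (deliberately simplified, not PD's real implementation): `UpdateServiceGCSafePoint` returns the minimum safepoint across services, so a returned value greater than the requested one signals a stale request, and `UnblockGC` removes a safepoint by updating it with a TTL of 0.

```go
package main

import (
	"context"
	"fmt"
)

// inMemMeta is a toy stand-in for the safepoint half of TiKVClusterMeta.
// Semantics are simplified: one cluster GC safepoint plus a per-service map.
type inMemMeta struct {
	gcSafePoint uint64            // the cluster-wide GC safepoint
	safepoints  map[string]uint64 // per-service safepoints
}

func (m *inMemMeta) UpdateServiceGCSafePoint(
	_ context.Context, serviceID string, ttl int64, safePoint uint64,
) (uint64, error) {
	if ttl <= 0 {
		// A TTL of 0 removes the service safepoint; UnblockGC relies on this.
		delete(m.safepoints, serviceID)
		return m.gcSafePoint, nil
	}
	if safePoint < m.gcSafePoint {
		// Stale request: GC already advanced past the target. Nothing is
		// stored, and the returned minimum exceeds the requested safepoint,
		// the condition importantTick now treats as fatal (pause the task).
		return m.gcSafePoint, nil
	}
	m.safepoints[serviceID] = safePoint
	min := safePoint
	for _, sp := range m.safepoints {
		if sp < min {
			min = sp
		}
	}
	return min, nil
}

func (m *inMemMeta) UnblockGC(ctx context.Context, serviceID string) error {
	_, err := m.UpdateServiceGCSafePoint(ctx, serviceID, 0, 0)
	return err
}

func main() {
	ctx := context.Background()
	id := "log-backup-coordinator-task1-446650000000000001"
	m := &inMemMeta{gcSafePoint: 50, safepoints: map[string]uint64{}}

	p, _ := m.UpdateServiceGCSafePoint(ctx, id, 86400, 100)
	fmt.Println(p <= 100) // true: accepted, GC stays blocked

	// Pretend the safepoint's TTL expired and GC advanced past the target.
	delete(m.safepoints, id)
	m.gcSafePoint = 200

	p, _ = m.UpdateServiceGCSafePoint(ctx, id, 86400, 120)
	fmt.Println(p > 120) // true: stale, the advancer must pause the task

	fmt.Println(m.UnblockGC(ctx, id)) // <nil>
}
```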
4 changes: 2 additions & 2 deletions br/pkg/streamhelper/regioniter_test.go
@@ -79,11 +79,11 @@ func (c constantRegions) Stores(ctx context.Context) ([]streamhelper.Store, erro
// Updates the service GC safe point for the cluster.
// Returns the latest service GC safe point.
// If the TTL is `0`, this removes the service safe point.
func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
func (c constantRegions) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
}

func (c constantRegions) UnblockGC(ctx context.Context) error {
func (c constantRegions) UnblockGC(ctx context.Context, serviceID string) error {
return status.Error(codes.Unimplemented, "Unsupported operation")
}
