diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 08ff8d85c25ef..8ac9eeb0c3ae3 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -3278,7 +3278,7 @@ func (rc *Client) restoreMetaKvEntries(
 		failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
 			failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
 		})
-		if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
+		if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {
 			return 0, 0, errors.Trace(err)
 		}
 		// for failpoint, we need to flush the cache in rawKVClient every time
@@ -3867,3 +3867,16 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
 func (b *waitTiFlashBackoffer) Attempt() int {
 	return b.Attempts
 }
+
+// PutRawKvWithRetry writes key/value with originTs through client, retrying
+// failed puts under the policy from utils.NewRawClientBackoffStrategy. If the
+// put still fails, the returned error is annotated with the underlying cause.
+func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
+	err := utils.WithRetry(ctx, func() error {
+		return client.Put(ctx, key, value, originTs)
+	}, utils.NewRawClientBackoffStrategy())
+	if err != nil {
+		return errors.Annotate(err, "failed to put raw kv after retry")
+	}
+	return nil
+}
diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go
index bc82efd8203cd..a20b4a2a8b905 100644
--- a/br/pkg/restore/client_test.go
+++ b/br/pkg/restore/client_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	filter "github.com/pingcap/tidb/pkg/util/table-filter"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/rawkv"
 	pd "github.com/tikv/pd/client"
 	"google.golang.org/grpc/keepalive"
 )
@@ -2007,3 +2008,75 @@ func TestCheckNewCollationEnable(t *testing.T) {
 		require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
 	}
 }
+
+// mockRawKVClient counts BatchPut calls and fails the first errThreshold of
+// them, so tests can observe how many times a put is retried.
+type mockRawKVClient struct {
+	rawkv.Client
+	putCount     int
+	errThreshold int
+}
+
+// BatchPut records the call and returns an error until putCount exceeds
+// errThreshold.
+func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
+	m.putCount += 1
+	if m.errThreshold >= m.putCount {
+		return errors.New("rpcClient is idle")
+	}
+	return nil
+}
+
+// TestPutRawKvWithRetry checks how many puts reach the client, and whether an
+// error surfaces, for different numbers of leading failures.
+func TestPutRawKvWithRetry(t *testing.T) {
+	tests := []struct {
+		name         string
+		errThreshold int
+		cancelAfter  time.Duration
+		wantErr      string
+		wantPuts     int
+	}{
+		{
+			name:         "success on first try",
+			errThreshold: 0,
+			wantPuts:     1,
+		},
+		{
+			name:         "success on after failure",
+			errThreshold: 2,
+			wantPuts:     3,
+		},
+		{
+			name:         "fails all retries",
+			errThreshold: 5,
+			wantErr:      "failed to put raw kv after retry",
+			wantPuts:     5,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			mockRawClient := &mockRawKVClient{
+				errThreshold: tt.errThreshold,
+			}
+			client := restore.NewRawKVBatchClient(mockRawClient, 1)
+
+			ctx := context.Background()
+			if tt.cancelAfter > 0 {
+				var cancel context.CancelFunc
+				ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
+				defer cancel()
+			}
+
+			err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)
+
+			if tt.wantErr != "" {
+				require.ErrorContains(t, err, tt.wantErr)
+			} else {
+				require.NoError(t, err)
+			}
+			require.Equal(t, tt.wantPuts, mockRawClient.putCount)
+		})
+	}
+}
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index d1f82db6fe3ac..ee5d3284496c4 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -51,6 +51,10 @@ const (
 	ChecksumWaitInterval    = 1 * time.Second
 	ChecksumMaxWaitInterval = 30 * time.Second
 
+	rawClientMaxAttempts  = 5
+	rawClientDelayTime    = 500 * time.Millisecond
+	rawClientMaxDelayTime = 5 * time.Second
+
 	gRPC_Cancel = "the client connection is closing"
 )
 
@@ -280,3 +284,44 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
 func (bo *pdReqBackoffer) Attempt() int {
 	return bo.attempt
 }
+
+// RawClientBackoffStrategy is a Backoffer for retrying raw KV client calls.
+// Attempts is the number of attempts left, BaseBackoff is the wait handed out
+// by the next NextBackoff call, and MaxBackoff caps BaseBackoff as it doubles.
+type RawClientBackoffStrategy struct {
+	Attempts    int
+	BaseBackoff time.Duration
+	MaxBackoff  time.Duration
+}
+
+// NewRawClientBackoffStrategy returns a Backoffer that starts with
+// rawClientMaxAttempts attempts, waits rawClientDelayTime after the first
+// failure and doubles the wait up to rawClientMaxDelayTime.
+func NewRawClientBackoffStrategy() Backoffer {
+	return &RawClientBackoffStrategy{
+		Attempts:    rawClientMaxAttempts,
+		BaseBackoff: rawClientDelayTime,
+		MaxBackoff:  rawClientMaxDelayTime,
+	}
+}
+
+// NextBackoff consumes one attempt and returns how long to wait before the
+// next one: the current BaseBackoff, or 0 once no attempts remain. BaseBackoff
+// is then doubled, capped at MaxBackoff.
+func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
+	bo := b.BaseBackoff
+	b.Attempts--
+	if b.Attempts == 0 {
+		return 0
+	}
+	b.BaseBackoff *= 2
+	if b.BaseBackoff > b.MaxBackoff {
+		b.BaseBackoff = b.MaxBackoff
+	}
+	return bo
+}
+
+// Attempt returns the number of attempts left.
+func (b *RawClientBackoffStrategy) Attempt() int {
+	return b.Attempts
+}
diff --git a/build/BUILD.bazel b/build/BUILD.bazel
index a1a180de16dd1..36fa803488f61 100644
--- a/build/BUILD.bazel
+++ b/build/BUILD.bazel
@@ -1,9 +1,9 @@
-package(default_visibility = ["//visibility:public"])
-
-load("@io_bazel_rules_go//go:def.bzl", "go_library", "nogo")
 load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "nogo")
 load("//build/linter/staticcheck:def.bzl", "staticcheck_analyzers")
 
+package(default_visibility = ["//visibility:public"])
+
 bool_flag(
     name = "with_nogo_flag",
     build_setting_default = False,