Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: separate the restore client #53197

Merged
merged 11 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
shard_count = 8,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb",
"//br/pkg/gluetidb/mock",
"//br/pkg/metautil",
"//br/pkg/mock",
"//br/pkg/pdutil",
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/gluetidb"
gluemock "github.com/pingcap/tidb/br/pkg/gluetidb/mock"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
Expand All @@ -35,7 +35,7 @@ type testBackup struct {

mockPDClient pd.Client
mockCluster *testutils.MockCluster
mockGlue *gluetidb.MockGlue
mockGlue *gluemock.MockGlue
backupClient *backup.Client

cluster *mock.Cluster
Expand All @@ -46,7 +46,7 @@ func createBackupSuite(t *testing.T) *testBackup {
tikvClient, mockCluster, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
s := new(testBackup)
s.mockGlue = &gluetidb.MockGlue{}
s.mockGlue = &gluemock.MockGlue{}
s.mockPDClient = pdClient
s.mockCluster = mockCluster
s.ctx, s.cancel = context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/conn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ go_test(
"//br/pkg/config",
"//br/pkg/conn/util",
"//br/pkg/pdutil",
"//br/pkg/utils",
"//br/pkg/utiltest",
"//pkg/testkit/testsetup",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
Expand Down
57 changes: 26 additions & 31 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package conn
package conn_test

import (
"context"
Expand All @@ -15,9 +15,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
kvconfig "github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utiltest"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -61,11 +62,9 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) {
},
}

fpdc := utils.FakePDClient{
Stores: stores,
}
fpdc := utiltest.NewFakePDClient(stores, false, nil)

_, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
_, err = conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
errs := multierr.Errors(err)
require.Equal(t, 1, len(errs))
Expand Down Expand Up @@ -109,11 +108,9 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) {
},
}

fpdc := utils.FakePDClient{
Stores: stores,
}
fpdc := utiltest.NewFakePDClient(stores, false, nil)

_, err = GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
_, err = conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
errs := multierr.Errors(err)
require.Equal(t, 1, len(errs))
Expand Down Expand Up @@ -167,16 +164,14 @@ func TestCheckStoresAlive(t *testing.T) {
},
}

fpdc := utils.FakePDClient{
Stores: stores,
}
fpdc := utiltest.NewFakePDClient(stores, false, nil)

kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
kvStores, err := conn.GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
require.Len(t, kvStores, 2)
require.Equal(t, stores[2:], kvStores)

err = checkStoresAlive(ctx, fpdc, util.SkipTiFlash)
err = conn.CheckStoresAlive(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
}

Expand Down Expand Up @@ -256,7 +251,7 @@ func TestGetAllTiKVStores(t *testing.T) {
}

for _, testCase := range testCases {
pdClient := utils.FakePDClient{Stores: testCase.stores}
pdClient := utiltest.NewFakePDClient(testCase.stores, false, nil)
stores, err := util.GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
if len(testCase.expectedError) != 0 {
require.Error(t, err)
Expand All @@ -275,7 +270,7 @@ func TestGetConnOnCanceledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr := &conn.Mgr{PdController: &pdutil.PdController{}}

_, err := mgr.GetBackupClient(ctx, 42)
require.Error(t, err)
Expand Down Expand Up @@ -309,9 +304,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
},
content: []string{""},
// no tikv detected in this case
importNumGoroutines: DefaultImportNumGoroutines,
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
importNumGoroutines: conn.DefaultImportNumGoroutines,
regionSplitSize: conn.DefaultMergeRegionSizeBytes,
regionSplitKeys: conn.DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
Expand Down Expand Up @@ -342,9 +337,9 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
"",
},
// no tikv detected in this case
importNumGoroutines: DefaultImportNumGoroutines,
regionSplitSize: DefaultMergeRegionSizeBytes,
regionSplitKeys: DefaultMergeRegionKeyCount,
importNumGoroutines: conn.DefaultImportNumGoroutines,
regionSplitSize: conn.DefaultMergeRegionSizeBytes,
regionSplitKeys: conn.DefaultMergeRegionKeyCount,
},
{
stores: []*metapb.Store{
Expand Down Expand Up @@ -426,7 +421,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
pctx := context.Background()
for _, ca := range cases {
ctx, cancel := context.WithCancel(pctx)
pdCli := utils.FakePDClient{Stores: ca.stores}
pdCli := utiltest.NewFakePDClient(ca.stores, false, nil)
require.Equal(t, len(ca.content), len(ca.stores))
count := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -448,12 +443,12 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) {
}

httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr := &conn.Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
kvConfigs := &kvconfig.KVConfig{
ImportGoroutines: kvconfig.ConfigTerm[uint]{Value: DefaultImportNumGoroutines, Modified: false},
MergeRegionSize: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionSizeBytes, Modified: false},
MergeRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: DefaultMergeRegionKeyCount, Modified: false},
ImportGoroutines: kvconfig.ConfigTerm[uint]{Value: conn.DefaultImportNumGoroutines, Modified: false},
MergeRegionSize: kvconfig.ConfigTerm[uint64]{Value: conn.DefaultMergeRegionSizeBytes, Modified: false},
MergeRegionKeyCount: kvconfig.ConfigTerm[uint64]{Value: conn.DefaultMergeRegionKeyCount, Modified: false},
}
mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli)
require.EqualValues(t, ca.regionSplitSize, kvConfigs.MergeRegionSize.Value)
Expand Down Expand Up @@ -591,7 +586,7 @@ func TestIsLogBackupEnabled(t *testing.T) {
pctx := context.Background()
for _, ca := range cases {
ctx, cancel := context.WithCancel(pctx)
pdCli := utils.FakePDClient{Stores: ca.stores}
pdCli := utiltest.NewFakePDClient(ca.stores, false, nil)
require.Equal(t, len(ca.content), len(ca.stores))
count := 0
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -613,7 +608,7 @@ func TestIsLogBackupEnabled(t *testing.T) {
}

httpCli := mockServer.Client()
mgr := &Mgr{PdController: &pdutil.PdController{}}
mgr := &conn.Mgr{PdController: &pdutil.PdController{}}
mgr.PdController.SetPDClient(pdCli)
enable, err := mgr.IsLogBackupEnabled(ctx, httpCli)
if ca.err {
Expand Down Expand Up @@ -655,7 +650,7 @@ func TestHandleTiKVAddress(t *testing.T) {
},
}
for _, ca := range cases {
addr, err := handleTiKVAddress(ca.store, ca.httpPrefix)
addr, err := conn.HandleTiKVAddress(ca.store, ca.httpPrefix)
require.Nil(t, err)
require.Equal(t, ca.result, addr.String())
}
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/conn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"go.uber.org/goleak"
)

var (
CheckStoresAlive = checkStoresAlive
HandleTiKVAddress = handleTiKVAddress
)

func TestMain(m *testing.M) {
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
Expand Down
132 changes: 0 additions & 132 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,135 +235,3 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}

// mockSession is used for test.
type mockSession struct {
se sessiontypes.Session
globalVars map[string]string
}

// GetSessionCtx implements glue.Glue
func (s *mockSession) GetSessionCtx() sessionctx.Context {
return s.se
}

// Execute implements glue.Session.
func (s *mockSession) Execute(ctx context.Context, sql string) error {
return s.ExecuteInternal(ctx, sql)
}

func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...any) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
rs, err := s.se.ExecuteInternal(ctx, sql, args...)
if err != nil {
return err
}
// Some of SQLs (like ADMIN RECOVER INDEX) may lazily take effect
// when we are polling the result set.
// At least call `next` once for triggering theirs side effect.
// (Maybe we'd better drain all returned rows?)
if rs != nil {
//nolint: errcheck
defer rs.Close()
c := rs.NewChunk(nil)
if err := rs.Next(ctx, c); err != nil {
return nil
}
}
return nil
}

// CreateDatabase implements glue.Session.
func (*mockSession) CreateDatabase(_ context.Context, _ *model.DBInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreatePlacementPolicy implements glue.Session.
func (*mockSession) CreatePlacementPolicy(_ context.Context, _ *model.PolicyInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTables implements glue.BatchCreateTableSession.
func (*mockSession) CreateTables(_ context.Context, _ map[string][]*model.TableInfo,
_ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (*mockSession) CreateTable(_ context.Context, _ model.CIStr,
_ *model.TableInfo, _ ...ddl.CreateTableWithInfoConfigurier) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// Close implements glue.Session.
func (s *mockSession) Close() {
s.se.Close()
}

// GetGlobalVariables implements glue.Session.
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
if ret, ok := s.globalVars[name]; ok {
return ret, nil
}
return "True", nil
}

// MockGlue only used for test
type MockGlue struct {
se sessiontypes.Session
GlobalVars map[string]string
}

func (m *MockGlue) SetSession(se sessiontypes.Session) {
m.se = se
}

// GetDomain implements glue.Glue.
func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, nil
}

// CreateSession implements glue.Glue.
func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) {
glueSession := &mockSession{
se: m.se,
globalVars: m.GlobalVars,
}
return glueSession, nil
}

// Open implements glue.Glue.
func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return nil, nil
}

// OwnsStorage implements glue.Glue.
func (*MockGlue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue.
func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return nil
}

// Record implements glue.Glue.
func (*MockGlue) Record(name string, value uint64) {
}

// GetVersion implements glue.Glue.
func (*MockGlue) GetVersion() string {
return "mock glue"
}

// UseOneShotSession implements glue.Glue.
func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
glueSession := &mockSession{
se: m.se,
}
return fn(glueSession)
}
18 changes: 18 additions & 0 deletions br/pkg/gluetidb/mock/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "mock",
srcs = ["mock.go"],
importpath = "github.com/pingcap/tidb/br/pkg/gluetidb/mock",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/glue",
"//pkg/ddl",
"//pkg/domain",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/session/types",
"//pkg/sessionctx",
"@com_github_tikv_pd_client//:client",
],
)