diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index 6ac71cf51..ce75346a4 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb/pkg/meta/model" "go.uber.org/zap" ) @@ -150,6 +151,11 @@ type Range struct { Where string `json:"where"` Args []interface{} `json:"args"` + // IndexHint is the index found in chunk splitting, it's only used for test. + IndexHint string `json:"index-hint"` + // IndexColumns is the columns used to split chunks, and it's used to find index hint in checksum query. + IndexColumns []*model.ColumnInfo `json:"-"` + columnOffset map[string]int } @@ -386,6 +392,8 @@ func (c *Range) Update(column, lower, upper string, updateLower, updateUpper boo func (c *Range) Copy() *Range { newChunk := NewChunkRange() + newChunk.IndexHint = c.IndexHint + newChunk.IndexColumns = c.IndexColumns for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, @@ -401,6 +409,8 @@ func (c *Range) Copy() *Range { func (c *Range) Clone() *Range { newChunk := NewChunkRange() + newChunk.IndexHint = c.IndexHint + newChunk.IndexColumns = c.IndexColumns for _, bound := range c.Bounds { newChunk.addBound(&Bound{ Column: bound.Column, diff --git a/sync_diff_inspector/chunk/chunk_test.go b/sync_diff_inspector/chunk/chunk_test.go index b5d62dd94..16a4518c3 100644 --- a/sync_diff_inspector/chunk/chunk_test.go +++ b/sync_diff_inspector/chunk/chunk_test.go @@ -122,7 +122,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c) <= (2,4,6)") // upper @@ -157,7 +157,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (a,b,c) <= (2,4,6)") // lower @@ -199,7 +199,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c)") // none @@ -232,7 +232,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"2","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: Full") // same & lower & upper @@ -274,7 +274,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"5","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":true},{"column":"c","lower":"5","upper":"5","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c) <= (1,4,5)") // same & upper @@ -309,7 +309,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"2","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"2","upper":"2","has-lower":false,"has-upper":true},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":true},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (a,b,c) <= (2,4,6)") // same & lower @@ -351,7 +351,7 @@ func TestChunkToString(t *testing.T) { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":true,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":true,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,5) < (a,b,c)") // same & none @@ -384,7 +384,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":false,"has-upper":false},{"column":"b","lower":"3","upper":"4","has-lower":false,"has-upper":false},{"column":"c","lower":"5","upper":"6","has-lower":false,"has-upper":false}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: Full") // all equal @@ -417,7 +417,7 @@ func TestChunkToString(t *testing.T) { for i, arg := range args { require.Equal(t, arg, expectArgs[i]) } - require.Equal(t, chunk.String(), 
`{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"3","has-lower":true,"has-upper":true},{"column":"c","lower":"6","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null}`) + require.Equal(t, chunk.String(), `{"index":null,"type":0,"bounds":[{"column":"a","lower":"1","upper":"1","has-lower":true,"has-upper":true},{"column":"b","lower":"3","upper":"3","has-lower":true,"has-upper":true},{"column":"c","lower":"6","upper":"6","has-lower":true,"has-upper":true}],"is-first":false,"is-last":false,"where":"","args":null,"index-hint":""}`) require.Equal(t, chunk.ToMeta(), "range in sequence: (1,3,6) < (a,b,c) <= (1,3,6)") } diff --git a/sync_diff_inspector/config/config.go b/sync_diff_inspector/config/config.go index 7fd99cca6..f04c09b4f 100644 --- a/sync_diff_inspector/config/config.go +++ b/sync_diff_inspector/config/config.go @@ -374,6 +374,8 @@ type Config struct { CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"` // experimental feature: only check table data without table struct CheckDataOnly bool `toml:"check-data-only" json:"-"` + // the mode of hint + HintMode string `toml:"hint-mode" json:"hint-mode"` // skip validation for tables that don't exist upstream or downstream SkipNonExistingTable bool `toml:"skip-non-existing-table" json:"-"` // DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261" diff --git a/sync_diff_inspector/config/config_test.go b/sync_diff_inspector/config/config_test.go index 7c12c260b..1145a0c6d 100644 --- a/sync_diff_inspector/config/config_test.go +++ b/sync_diff_inspector/config/config_test.go @@ -50,7 +50,7 @@ func TestParseConfig(t *testing.T) { // we might not use the same config to run this test. e.g. 
MYSQL_PORT can be 4000 require.JSONEq(t, cfg.String(), - "{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 
20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") + 
"{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"hint-mode\":\"\",\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 
20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") hash, err := cfg.Task.ComputeConfigHash() require.NoError(t, err) require.Equal(t, hash, "c080f9894ec24aadb4aaec1109cd1951454f09a1233f2034bc3b06e0903cb289") diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index b46885d0c..9f9af65eb 100644 
--- a/sync_diff_inspector/diff.go
+++ b/sync_diff_inspector/diff.go
@@ -135,6 +135,14 @@ func (df *Diff) init(ctx context.Context, cfg *config.Config) (err error) {
 	setTiDBCfg()
 
 	df.downstream, df.upstream, err = source.NewSources(ctx, cfg)
 	if err != nil {
 		return errors.Trace(err)
+	}
+	// Hint modes are applied only once NewSources has succeeded; the sources
+	// must not be used when it fails, and an invalid hint-mode is reported.
+	if err = df.downstream.SetHintMode(cfg.HintMode); err != nil {
+		return errors.Trace(err)
+	}
+	if err = df.upstream.SetHintMode(cfg.HintMode); err != nil {
+		return errors.Trace(err)
 	}
diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go
index afeb056c4..cece39fab 100644
--- a/sync_diff_inspector/source/mysql_shard.go
+++ b/sync_diff_inspector/source/mysql_shard.go
@@ -64,6 +64,11 @@ type MySQLSources struct {
 	sourceTablesMap map[string][]*common.TableShardSource
 }
 
+// SetHintMode does nothing for MySQL source
+func (*MySQLSources) SetHintMode(string) error {
+	return nil
+}
+
 func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSource, table *common.TableDiff) []*common.TableShardSource {
 	if sourceTablesMap == nil {
 		log.Fatal("unreachable, source tables map shouldn't be nil.")
@@ -103,7 +108,17 @@ func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter.
 
 	for _, ms := range matchSources {
 		go func(ms *common.TableShardSource) {
-			count, checksum, err := utils.GetCountAndMd5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
+			conn, err := ms.DBConn.Conn(ctx)
+			if err != nil {
+				infoCh <- &ChecksumInfo{
+					Err: err,
+				}
+				return
+			}
+			defer conn.Close()
+			count, checksum, err := utils.GetCountAndMd5Checksum(
+				ctx, conn, ms.OriginSchema, ms.OriginTable, table.Info,
+				chunk.Where, "", chunk.Args)
 			infoCh <- &ChecksumInfo{
 				Checksum: checksum,
 				Count:    count,
diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go
index c0c827105..8ef4d0eb9 100644
--- a/sync_diff_inspector/source/source.go
+++ b/sync_diff_inspector/source/source.go
@@ -72,6 +72,9 @@ type TableAnalyzer interface {
 }
 
 type Source interface {
+	// SetHintMode sets the hint mode from its textual configuration value.
+	SetHintMode(string) error
+
 	// GetTableAnalyzer pick the proper analyzer for different source.
 	// the implement of this function is different in mysql/tidb.
 	GetTableAnalyzer() TableAnalyzer
diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go
index 4504b27c2..c7a3dcbe4 100644
--- a/sync_diff_inspector/source/tidb.go
+++ b/sync_diff_inspector/source/tidb.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/coreos/go-semver/semver"
@@ -82,10 +83,37 @@ func (s *TiDBRowsIterator) Next() (map[string]*dbutil.ColumnData, error) {
 	return nil, nil
 }
 
+type hintMode int
+
+const (
+	// hintNone does nothing
+	hintNone hintMode = iota
+	// hintSQL indicates using SQL hints to force index scan
+	hintSQL
+	// hintSessionVar indicates using session variable to force index scan
+	hintSessionVar
+)
+
+// String implements the fmt.Stringer interface.
+func (m hintMode) String() string {
+	switch m {
+	case hintNone:
+		return "None"
+	case hintSQL:
+		return "HintSQL"
+	case hintSessionVar:
+		return "HintSessionVar"
+	default:
+		// Never panic while formatting: String is called from logging paths.
+		return fmt.Sprintf("Unknown(%d)", m)
+	}
+}
+
 type TiDBSource struct {
 	tableDiffs     []*common.TableDiff
 	sourceTableMap map[string]*common.TableSource
 	snapshot       string
+	mode           hintMode
 	// bucketSpliterPool is the shared pool to produce chunks using bucket
 	bucketSpliterPool *utils.WorkerPool
 	dbConn            *sql.DB
@@ -93,6 +121,25 @@ type TiDBSource struct {
 	version *semver.Version
 }
 
+// SetHintMode parses the string value to the hintMode.
+// Accepted values (case-insensitive) are "" or "none", "sql" and "sessionvar";
+// any other value leaves the current mode untouched and returns an error.
+func (s *TiDBSource) SetHintMode(ss string) error {
+	switch strings.ToLower(ss) {
+	case "", "none":
+		s.mode = hintNone
+	case "sql":
+		s.mode = hintSQL
+	case "sessionvar":
+		s.mode = hintSessionVar
+	default:
+		return errors.Errorf("invalid hint mode '%s', please choose valid option between ['', 'none', 'sql', 'sessionvar']", ss)
+	}
+
+	log.Info("get hint mode", zap.String("hint mode", s.mode.String()))
+	return nil
+}
+
 func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer {
 	return &TiDBTableAnalyzer{
 		s.dbConn,
@@ -126,7 +173,50 @@ func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.Ra
 	chunk := tableRange.GetChunk()
 
 	matchSource := getMatchSource(s.sourceTableMap, table)
-	count, checksum, err := utils.GetCountAndMd5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
+
+	conn, err := s.dbConn.Conn(ctx)
+	if err != nil {
+		return &ChecksumInfo{
+			Err: err,
+		}
+	}
+	defer conn.Close()
+
+	indexHint := ""
+	if s.mode == hintSQL && len(chunk.IndexColumns) > 0 {
+		// Since the index name is extracted from one data source,
+		// while another data source may have an index with same columns but a different index name,
+		// we use the index columns to get the actual index name here.
+		// For example:
+		//   Upstream: idx1(c1, c2)
+		//   Downstream: idx2(c1, c2)
+		// The lookup is best-effort: when it fails the checksum query runs without a hint.
+		tableInfos, err := s.GetSourceStructInfo(ctx, tableRange.GetTableIndex())
+		if err != nil {
+			log.Warn("get table info for index hint failed, run checksum without hint", zap.Error(err))
+		} else if len(tableInfos) > 0 {
+			for _, index := range dbutil.FindAllIndex(tableInfos[0]) {
+				if utils.IsSameIndex(index, chunk.IndexColumns) {
+					indexHint = fmt.Sprintf("/*+ USE_INDEX(`%s`.`%s`, `%s`) */",
+						matchSource.OriginSchema,
+						matchSource.OriginTable,
+						index.Name.L,
+					)
+					break
+				}
+			}
+		}
+	} else if s.mode == hintSessionVar {
+		// Best-effort: the variable only steers the planner, so a failure to set it
+		// is logged and the checksum still runs.
+		if _, err := conn.ExecContext(ctx, "set session tidb_opt_prefer_range_scan = 1"); err != nil {
+			log.Warn("set tidb_opt_prefer_range_scan failed, run checksum without it", zap.Error(err))
+		}
+	}
+
+	count, checksum, err := utils.GetCountAndMd5Checksum(
+		ctx, conn, matchSource.OriginSchema, matchSource.OriginTable, table.Info,
+		chunk.Where, indexHint, chunk.Args)
 
 	cost := time.Since(beginTime)
 	return &ChecksumInfo{
diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go
index 9d886494c..6cc61eb06 100644
--- a/sync_diff_inspector/splitter/bucket.go
+++ b/sync_diff_inspector/splitter/bucket.go
@@ -48,6 +48,7 @@ type BucketIterator struct {
 	errCh      chan error
 	cancel     context.CancelFunc
 	indexID    int64
+	indexName  string
 	progressID string
 
 	dbConn *sql.DB
@@ -192,6 +193,7 @@ NEXTINDEX:
 		s.buckets = bucket
 		s.indexColumns = indexColumns
 		s.indexID = index.ID
+		s.indexName = index.Name.L
 		break
 	}
 
@@ -278,6 +280,8 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf
 			leftCnt := c.Index.ChunkCnt - c.Index.ChunkIndex - 1
 			if leftCnt > 0 {
 				chunkRange := chunk.NewChunkRange()
+				chunkRange.IndexHint = s.indexName
+				chunkRange.IndexColumns = s.indexColumns
 
 				for i, column := range s.indexColumns {
 					chunkRange.Update(column.Name.O, "", nextUpperValues[i], false, true)
@@ -310,6 +314,9 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf
 		}
 
 		chunkRange := chunk.NewChunkRange()
+		chunkRange.IndexHint = s.indexName
+		chunkRange.IndexColumns = s.indexColumns
+
 		for j, column := range s.indexColumns {
 			var lowerValue, upperValue string
 			if len(lowerValues)
> 0 { @@ -347,6 +354,9 @@ func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInf // merge the rest keys into one chunk chunkRange := chunk.NewChunkRange() + chunkRange.IndexHint = s.indexName + chunkRange.IndexColumns = s.indexColumns + if len(lowerValues) > 0 { for j, column := range s.indexColumns { chunkRange.Update(column.Name.O, lowerValues[j], "", true, false) diff --git a/sync_diff_inspector/splitter/random.go b/sync_diff_inspector/splitter/random.go index 33fe7272b..1fe580ce3 100644 --- a/sync_diff_inspector/splitter/random.go +++ b/sync_diff_inspector/splitter/random.go @@ -61,6 +61,43 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab } chunkRange := chunk.NewChunkRange() + + iFields := &indexFields{cols: fields, tableInfo: table.Info} + var indices = dbutil.FindAllIndex(table.Info) +NEXTINDEX: + for _, index := range indices { + if index == nil { + continue + } + if startRange != nil && startRange.IndexID != index.ID { + continue + } + + indexColumns := utils.GetColumnsFromIndex(index, table.Info) + + if len(indexColumns) < len(index.Columns) { + // some column in index is ignored. + continue + } + + if !iFields.MatchesIndex(index) { + // We are enforcing user configured "index-fields" settings. + continue + } + + // skip the index that has expression column + for _, col := range indexColumns { + if col.Hidden { + continue NEXTINDEX + } + } + + // Found the index, use it as index hint. 
+		chunkRange.IndexHint = index.Name.O
+		chunkRange.IndexColumns = indexColumns
+		break
+	}
+
 	beginIndex := 0
 	bucketChunkCnt := 0
 	chunkCnt := 0
@@ -117,7 +154,7 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab
 		bucketChunkCnt = chunkCnt
 	}
 
-	chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation)
+	chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, iFields.cols, table.Range, table.Collation)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go
index 18577c60c..2f94ef293 100644
--- a/sync_diff_inspector/splitter/splitter_test.go
+++ b/sync_diff_inspector/splitter/splitter_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"sort"
 	"strconv"
+	"strings"
 	"testing"
 
 	sqlmock "github.com/DATA-DOG/go-sqlmock"
@@ -934,3 +935,138 @@ func TestChunkSize(t *testing.T) {
 	require.NoError(t, err)
 
 }
+
+// TestBucketSpliterHint checks that the first chunk returned by the bucket
+// iterator records the expected index name in IndexHint for each table
+// definition. Names are compared case-insensitively.
+func TestBucketSpliterHint(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	require.NoError(t, err)
+	ctx := context.Background()
+
+	testCases := []struct {
+		tableSQL     string
+		indexCount   int
+		expectedHint string
+	}{
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))",
+			0,
+			"PRIMARY",
+		},
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`))",
+			0,
+			"i1",
+		},
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))",
+			1,
+			"i2",
+		},
+	}
+
+	for _, tc := range testCases {
+		tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New())
+		require.NoError(t, err)
+
+		tableDiff := &common.TableDiff{
+			Schema: "test",
+			Table:  "test",
+			Info:   tableInfo,
+		}
+
+		createFakeResultForBucketIterator(mock, tc.indexCount)
+
+		iter, err := NewBucketIteratorWithCheckpoint(ctx, "", tableDiff, db, nil, utils.NewWorkerPool(1, "bucketIter"))
+		require.NoError(t, err)
+		chunk, err := iter.Next()
+		require.NoError(t, err)
+		require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint))
+	}
+}
+
+// TestRandomSpliterHint checks that the first chunk returned by the random
+// iterator records the expected index name in IndexHint, both without and
+// with a table Range condition. A table without any index expects an empty
+// hint. Names are compared case-insensitively.
+func TestRandomSpliterHint(t *testing.T) {
+	db, mock, err := sqlmock.New()
+	require.NoError(t, err)
+	ctx := context.Background()
+
+	testCases := []struct {
+		tableSQL     string
+		expectedHint string
+	}{
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, primary key(`a`, `b`), unique key i1(`c`))",
+			"PRIMARY",
+		},
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, unique key i1(`c`), key i2(`b`))",
+			"i1",
+		},
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int, key i2(`b`))",
+			"i2",
+		},
+		{
+			"create table `test`.`test`(`a` int, `b` int, `c` int)",
+			"",
+		},
+	}
+
+	for _, tc := range testCases {
+		tableInfo, err := dbutil.GetTableInfoBySQL(tc.tableSQL, parser.New())
+		require.NoError(t, err)
+
+		for _, tableRange := range []string{"", "c > 100"} {
+			tableDiff := &common.TableDiff{
+				Schema: "test",
+				Table:  "test",
+				Info:   tableInfo,
+				Range:  tableRange,
+			}
+
+			mock.ExpectQuery("SELECT COUNT*").WillReturnRows(sqlmock.NewRows([]string{"CNT"}).AddRow("320"))
+
+			iter, err := NewRandomIteratorWithCheckpoint(ctx, "", tableDiff, db, nil)
+			require.NoError(t, err)
+			chunk, err := iter.Next()
+			require.NoError(t, err)
+			require.Equal(t, strings.ToLower(tc.expectedHint), strings.ToLower(chunk.IndexHint))
+		}
+	}
+}
+
+// createFakeResultForBucketIterator queues the mock expectations used by
+// TestBucketSpliterHint: one SHOW STATS_BUCKETS query that returns five
+// buckets for each of the index names PRIMARY, i1, i2, i3 and i4, followed by
+// indexCount "SELECT COUNT(DISTINCT ..." queries that each return 5.
+// NOTE(review): rows are added with Bucket_id (i+1)*64, whereas the sample
+// table below lists 0..4 — confirm which one is intended.
+func createFakeResultForBucketIterator(mock sqlmock.Sqlmock, indexCount int) {
+	/*
+		+---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+
+		| Db_name | Table_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound | Upper_Bound |
+		+---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+
+		| test    | test       | PRIMARY     |        1 |         0 |    64 |       1 | (0, 0)      | (63, 11)    |
+		| test    | test       | PRIMARY     |        1 |         1 |   128 |       1 | (64, 12)    | (127, 23)   |
+		| test    | test       | PRIMARY     |        1 |         2 |   192 |       1 | (128, 24)   | (191, 35)   |
+		| test    | test       | PRIMARY     |        1 |         3 |   256 |       1 | (192, 36)   | (255, 47)   |
+		| test    | test       | PRIMARY     |        1 |         4 |   320 |       1 | (256, 48)   | (319, 59)   |
+		+---------+------------+-------------+----------+-----------+-------+---------+-------------+-------------+
+	*/
+	statsRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"})
+	for _, indexName := range []string{"PRIMARY", "i1", "i2", "i3", "i4"} {
+		for i := 0; i < 5; i++ {
+			statsRows.AddRow("test", "test", indexName, 1, (i+1)*64, (i+1)*64, 1, fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1))
+		}
+	}
+	mock.ExpectQuery("SHOW STATS_BUCKETS").WillReturnRows(statsRows)
+
+	for i := 0; i < indexCount; i++ {
+		mock.ExpectQuery("SELECT COUNT\\(DISTINCT *").WillReturnRows(sqlmock.NewRows([]string{"SEL"}).AddRow("5"))
+	}
+}
diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go
index 382bfe02d..ca9a22459 100644
--- a/sync_diff_inspector/utils/utils.go
+++ b/sync_diff_inspector/utils/utils.go
@@ -766,7 +766,15 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
 }
 
 // GetCountAndMd5Checksum returns checksum code and count of some data by given condition
-func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, error) {
+func GetCountAndMd5Checksum(
+	ctx context.Context,
+	conn *sql.Conn,
+	schemaName, tableName string,
+	tbInfo *model.TableInfo,
+	limitRange string,
+	indexHint string,
+	args []interface{},
+) (int64, uint64, error) {
 	/*
 		calculate MD5 checksum and count
 		example: mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED) ^
CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`; @@ -796,19 +804,30 @@ func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableNa columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", - strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + query := fmt.Sprintf("SELECT %s COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", + indexHint, + strings.Join(columnNames, ", "), + strings.Join(columnIsNull, ", "), + strings.Join(columnNames, ", "), + strings.Join(columnIsNull, ", "), + dbutil.TableName(schemaName, tableName), + limitRange, + ) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) var count sql.NullInt64 var checksum uint64 - err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) + err := conn.QueryRowContext(ctx, query, args...).Scan(&count, &checksum) if err != nil { - log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err)) + log.Warn("execute checksum query fail", + zap.String("query", query), + zap.Reflect("args", args), + zap.Error(err), + ) return -1, 0, errors.Trace(err) } if !count.Valid { - // if don't have any data, the checksum will be `NULL` + // If there are no data, the checksum will be `NULL` log.Warn("get empty count", zap.String("sql", 
query), zap.Reflect("args", args)) return 0, 0, nil } @@ -1047,3 +1066,15 @@ func IsBinaryColumn(col *model.ColumnInfo) bool { // varbinary or binary return (col.GetType() == mysql.TypeVarchar || col.GetType() == mysql.TypeString) && mysql.HasBinaryFlag(col.GetFlag()) } + +func IsSameIndex(index *model.IndexInfo, columns []*model.ColumnInfo) bool { + if len(index.Columns) != len(columns) { + return false + } + for i, col := range index.Columns { + if col.Name.L != columns[i].Name.L { + return false + } + } + return true +} diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index 2131fa22c..cea44165c 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -261,9 +261,9 @@ func TestGetCountAndMd5Checksum(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() - conn, mock, err := sqlmock.New() + db, mock, err := sqlmock.New() require.NoError(t, err) - defer conn.Close() + defer db.Close() createTableSQL := "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime, primary key(`a`, `b`), key(`c`, `d`))" tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New()) @@ -271,7 +271,9 @@ func TestGetCountAndMd5Checksum(t *testing.T) { mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) - count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"}) + conn, err := db.Conn(ctx) + require.NoError(t, err) + count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", "", []interface{}{"123", "234"}) require.NoError(t, err) require.Equal(t, count, int64(123)) require.Equal(t, checksum, uint64(0x1c8))