Skip to content

Commit ee95b9b

Browse files
authored
executor: support new sql syntax show distribution jobs and distribute table t1 (#60169)
close #60063
1 parent 4e79288 commit ee95b9b

File tree

11 files changed

+609
-113
lines changed

11 files changed

+609
-113
lines changed

pkg/domain/infosync/region.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,31 @@ func GetRegionDistributionByKeyRange(ctx context.Context, startKey []byte, endKe
7878
return nil, err
7979
}
8080
if is.pdHTTPCli == nil {
81-
return nil, errs.ErrClientGetLeader.FastGenByArgs("pd client not found")
81+
return nil, errs.ErrClientGetLeader.FastGenByArgs("pd http cli is nil")
8282
}
8383
return is.pdHTTPCli.GetRegionDistributionByKeyRange(ctx, pd.NewKeyRange(startKey, endKey), engine)
8484
}
85+
86+
// GetSchedulerConfig is used to get the configuration of the specified scheduler from PD.
87+
func GetSchedulerConfig(ctx context.Context, schedulerName string) (any, error) {
88+
is, err := getGlobalInfoSyncer()
89+
if err != nil {
90+
return nil, err
91+
}
92+
if is.pdHTTPCli == nil {
93+
return nil, errs.ErrClientGetLeader.FastGenByArgs("pd http cli is nil")
94+
}
95+
return is.pdHTTPCli.GetSchedulerConfig(ctx, schedulerName)
96+
}
97+
98+
// CreateSchedulerConfigWithInput is used to create a scheduler with the specified input.
99+
func CreateSchedulerConfigWithInput(ctx context.Context, schedulerName string, input map[string]any) error {
100+
is, err := getGlobalInfoSyncer()
101+
if err != nil {
102+
return err
103+
}
104+
if is.pdHTTPCli == nil {
105+
return errs.ErrClientGetLeader.FastGenByArgs(schedulerName)
106+
}
107+
return is.pdHTTPCli.CreateSchedulerWithInput(ctx, schedulerName, input)
108+
}

pkg/executor/BUILD.bazel

+3-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"ddl.go",
3030
"delete.go",
3131
"detach.go",
32+
"distribute.go",
3233
"distsql.go",
3334
"expand.go",
3435
"explain.go",
@@ -296,6 +297,7 @@ go_library(
296297
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
297298
"@com_github_tikv_client_go_v2//util",
298299
"@com_github_tikv_pd_client//:client",
300+
"@com_github_tikv_pd_client//errs",
299301
"@com_github_tikv_pd_client//http",
300302
"@com_github_twmb_murmur3//:murmur3",
301303
"@com_sourcegraph_sourcegraph_appdash//:appdash",
@@ -331,7 +333,7 @@ go_test(
331333
"delete_test.go",
332334
"detach_integration_test.go",
333335
"detach_test.go",
334-
"distributions_test.go",
336+
"distribute_table_test.go",
335337
"distsql_test.go",
336338
"executor_failpoint_test.go",
337339
"executor_pkg_test.go",

pkg/executor/builder.go

+17
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor {
305305
return b.buildSQLBindExec(v)
306306
case *plannercore.SplitRegion:
307307
return b.buildSplitRegion(v)
308+
case *plannercore.DistributeTable:
309+
return b.buildDistributeTable(v)
308310
case *plannercore.PhysicalIndexMergeReader:
309311
return b.buildIndexMergeReader(v)
310312
case *plannercore.SelectInto:
@@ -900,6 +902,7 @@ func (b *executorBuilder) buildShow(v *plannercore.PhysicalShow) exec.Executor {
900902
Extended: v.Extended,
901903
Extractor: v.Extractor,
902904
ImportJobID: v.ImportJobID,
905+
DistributionJobID: v.DistributionJobID,
903906
SQLOrDigest: v.SQLOrDigest,
904907
}
905908
if e.Tp == ast.ShowMasterStatus || e.Tp == ast.ShowBinlogStatus {
@@ -2680,6 +2683,20 @@ func buildHandleColsForSplit(sc *stmtctx.StatementContext, tbInfo *model.TableIn
26802683
return plannerutil.NewIntHandleCols(intCol)
26812684
}
26822685

2686+
func (b *executorBuilder) buildDistributeTable(v *plannercore.DistributeTable) exec.Executor {
2687+
base := exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())
2688+
base.SetInitCap(1)
2689+
base.SetMaxChunkSize(1)
2690+
return &DistributeTableExec{
2691+
BaseExecutor: base,
2692+
tableInfo: v.TableInfo,
2693+
partitionNames: v.PartitionNames,
2694+
rule: v.Rule,
2695+
engine: v.Engine,
2696+
is: b.is,
2697+
}
2698+
}
2699+
26832700
func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) exec.Executor {
26842701
base := exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())
26852702
base.SetInitCap(1)

pkg/executor/compiler.go

+5
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,11 @@ func getStmtDbLabel(stmtNode ast.StmtNode, resolveCtx *resolve.Context) map[stri
339339
dbLabel := x.Table.Schema.O
340340
dbLabelSet[dbLabel] = struct{}{}
341341
}
342+
case *ast.DistributeTableStmt:
343+
if x.Table != nil {
344+
dbLabel := x.Table.Schema.O
345+
dbLabelSet[dbLabel] = struct{}{}
346+
}
342347
case *ast.NonTransactionalDMLStmt:
343348
if x.ShardColumn != nil {
344349
dbLabel := x.ShardColumn.Schema.O

pkg/executor/distribute.go

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package executor
16+
17+
import (
18+
"context"
19+
"strings"
20+
21+
"github.com/pingcap/tidb/pkg/domain/infosync"
22+
"github.com/pingcap/tidb/pkg/executor/internal/exec"
23+
"github.com/pingcap/tidb/pkg/infoschema"
24+
"github.com/pingcap/tidb/pkg/meta/model"
25+
"github.com/pingcap/tidb/pkg/parser/ast"
26+
"github.com/pingcap/tidb/pkg/table/tables"
27+
"github.com/pingcap/tidb/pkg/tablecodec"
28+
"github.com/pingcap/tidb/pkg/util/chunk"
29+
"github.com/pingcap/tidb/pkg/util/codec"
30+
"github.com/tikv/pd/client/errs"
31+
pdhttp "github.com/tikv/pd/client/http"
32+
)
33+
34+
var schedulerName = "balance-range-scheduler"
35+
36+
// DistributeTableExec represents a distribute table executor.
37+
type DistributeTableExec struct {
38+
exec.BaseExecutor
39+
40+
tableInfo *model.TableInfo
41+
is infoschema.InfoSchema
42+
partitionNames []ast.CIStr
43+
rule ast.CIStr
44+
engine ast.CIStr
45+
46+
done bool
47+
keyRanges []*pdhttp.KeyRange
48+
}
49+
50+
// Open implements the Executor Open interface.
51+
func (e *DistributeTableExec) Open(context.Context) error {
52+
ranges, err := e.getKeyRanges()
53+
if err != nil {
54+
return err
55+
}
56+
e.keyRanges = ranges
57+
return nil
58+
}
59+
60+
// Next implements the Executor Next interface.
61+
func (e *DistributeTableExec) Next(ctx context.Context, chk *chunk.Chunk) error {
62+
chk.Reset()
63+
if e.done {
64+
return nil
65+
}
66+
e.done = true
67+
err := e.distributeTable(ctx)
68+
if err != nil {
69+
return err
70+
}
71+
config, err := infosync.GetSchedulerConfig(ctx, schedulerName)
72+
configs, ok := config.([]any)
73+
if !ok {
74+
return errs.ErrClientProtoUnmarshal.FastGenByArgs(config)
75+
}
76+
jobs := make([]map[string]any, 0, len(configs))
77+
for _, cfg := range configs {
78+
job, ok := cfg.(map[string]any)
79+
if !ok {
80+
return errs.ErrClientProtoUnmarshal.FastGenByArgs(cfg)
81+
}
82+
jobs = append(jobs, job)
83+
}
84+
if err != nil {
85+
return err
86+
}
87+
alias := e.getAlias()
88+
jobID := float64(-1)
89+
for _, job := range jobs {
90+
// PD will ensure all the alias of uncompleted job are different.
91+
// PD return err if the some job alredy exist in the scheduler.
92+
if job["alias"] == alias && job["engine"] == e.engine.String() && job["rule"] == e.rule.String() && job["status"] != "finished" {
93+
id := job["job-id"].(float64)
94+
if id > jobID {
95+
jobID = id
96+
}
97+
}
98+
}
99+
if jobID != -1 {
100+
chk.AppendUint64(0, uint64(jobID))
101+
}
102+
return nil
103+
}
104+
105+
func (e *DistributeTableExec) distributeTable(ctx context.Context) error {
106+
input := make(map[string]any)
107+
input["alias"] = e.getAlias()
108+
input["engine"] = e.engine.String()
109+
input["rule"] = e.rule.String()
110+
startKeys := make([]string, 0, len(e.keyRanges))
111+
endKeys := make([]string, 0, len(e.keyRanges))
112+
for _, r := range e.keyRanges {
113+
startKey, endKey := r.EscapeAsUTF8Str()
114+
startKeys = append(startKeys, startKey)
115+
endKeys = append(endKeys, endKey)
116+
}
117+
input["start-key"] = strings.Join(startKeys, ",")
118+
input["end-key"] = strings.Join(endKeys, ",")
119+
return infosync.CreateSchedulerConfigWithInput(ctx, schedulerName, input)
120+
}
121+
122+
func (e *DistributeTableExec) getAlias() string {
123+
partitionStr := ""
124+
if len(e.partitionNames) != 0 {
125+
partitionStr = "partition("
126+
for idx, partition := range e.partitionNames {
127+
partitionStr += partition.String()
128+
if idx != len(e.partitionNames)-1 {
129+
partitionStr += ","
130+
}
131+
}
132+
partitionStr += ")"
133+
}
134+
dbName := getSchemaName(e.is, e.tableInfo.DBID)
135+
return strings.Join([]string{dbName, e.tableInfo.Name.String(), partitionStr}, ".")
136+
}
137+
138+
func (e *DistributeTableExec) getKeyRanges() ([]*pdhttp.KeyRange, error) {
139+
physicalIDs := make([]int64, 0)
140+
pi := e.tableInfo.GetPartitionInfo()
141+
if pi == nil {
142+
physicalIDs = append(physicalIDs, e.tableInfo.ID)
143+
} else {
144+
for _, name := range e.partitionNames {
145+
pid, err := tables.FindPartitionByName(e.tableInfo, name.L)
146+
if err != nil {
147+
return nil, err
148+
}
149+
physicalIDs = append(physicalIDs, pid)
150+
}
151+
if len(physicalIDs) == 0 {
152+
for _, p := range pi.Definitions {
153+
physicalIDs = append(physicalIDs, p.ID)
154+
}
155+
}
156+
}
157+
158+
ranges := make([]*pdhttp.KeyRange, 0, len(physicalIDs))
159+
for _, pid := range physicalIDs {
160+
startKey := codec.EncodeBytes([]byte{}, tablecodec.GenTablePrefix(pid))
161+
endKey := codec.EncodeBytes([]byte{}, tablecodec.GenTablePrefix(pid+1))
162+
r := pdhttp.NewKeyRange(startKey, endKey)
163+
ranges = append(ranges, r)
164+
}
165+
return ranges, nil
166+
}

0 commit comments

Comments
 (0)