Skip to content

Commit

Permalink
Applyschema uses ExecuteMultiFetchAsDba (#17078)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Nov 25, 2024
1 parent 145ff6b commit 0439d89
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
6 changes: 4 additions & 2 deletions go/vt/schemamanager/local_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func (controller *LocalController) writeToLogDir(ctx context.Context, result *Ex
rowsReturned := uint64(0)
rowsAffected := uint64(0)
for _, queryResult := range result.SuccessShards {
rowsReturned += uint64(len(queryResult.Result.Rows))
rowsAffected += queryResult.Result.RowsAffected
for _, result := range queryResult.Results {
rowsReturned += uint64(len(result.Rows))
rowsAffected += result.RowsAffected
}
}
logFile.WriteString(fmt.Sprintf("-- Rows returned: %d\n", rowsReturned))
logFile.WriteString(fmt.Sprintf("-- Rows affected: %d\n", rowsAffected))
Expand Down
4 changes: 2 additions & 2 deletions go/vt/schemamanager/local_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func TestLocalControllerSchemaChange(t *testing.T) {
result := &ExecuteResult{
Sqls: []string{"create table test_table (id int)"},
SuccessShards: []ShardResult{{
Shard: "0",
Result: &querypb.QueryResult{},
Shard: "0",
Results: []*querypb.QueryResult{{}},
}},
}
logPath := path.Join(controller.logDir, controller.sqlFilename)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/schemamanager/schemamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type ShardWithError struct {

// ShardResult contains sql execute information on a particular shard
type ShardResult struct {
Shard string
Result *querypb.QueryResult
Shard string
Results []*querypb.QueryResult
// Position is a replication position that is guaranteed to be after the
// schema change was applied. It can be used to wait for replicas to receive
// the schema change via replication.
Expand Down
7 changes: 7 additions & 0 deletions go/vt/schemamanager/schemamanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ func (client *fakeTabletManagerClient) ExecuteFetchAsDba(ctx context.Context, ta
return client.TabletManagerClient.ExecuteFetchAsDba(ctx, tablet, usePool, req)
}

func (client *fakeTabletManagerClient) ExecuteMultiFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) {
if client.EnableExecuteFetchAsDbaError {
return nil, fmt.Errorf("ExecuteMultiFetchAsDba occur an unknown error")
}
return client.TabletManagerClient.ExecuteMultiFetchAsDba(ctx, tablet, usePool, req)
}

// newFakeTopo returns a topo with:
// - a keyspace named 'test_keyspace'.
// - 3 shards named '1', '2', '3'.
Expand Down
14 changes: 8 additions & 6 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,15 @@ func (exec *TabletExecutor) executeOneTablet(
errChan chan ShardWithError,
successChan chan ShardResult) {

var result *querypb.QueryResult
var results []*querypb.QueryResult
var err error
if viaQueryService {
result, err = exec.tmc.ExecuteQuery(ctx, tablet, &tabletmanagerdatapb.ExecuteQueryRequest{
result, reserr := exec.tmc.ExecuteQuery(ctx, tablet, &tabletmanagerdatapb.ExecuteQueryRequest{
Query: []byte(sql),
MaxRows: 10,
})
results = []*querypb.QueryResult{result}
err = reserr
} else {
if exec.ddlStrategySetting != nil && exec.ddlStrategySetting.IsAllowZeroInDateFlag() {
// --allow-zero-in-date Applies to DDLs
Expand All @@ -588,14 +590,14 @@ func (exec *TabletExecutor) executeOneTablet(
return
}
}
request := &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{
Query: []byte(sql),
request := &tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest{
Sql: []byte(sql),
MaxRows: 10,
}
if exec.ddlStrategySetting != nil && exec.ddlStrategySetting.IsAllowForeignKeysFlag() {
request.DisableForeignKeyChecks = true
}
result, err = exec.tmc.ExecuteFetchAsDba(ctx, tablet, false, request)
results, err = exec.tmc.ExecuteMultiFetchAsDba(ctx, tablet, false, request)

}
if err != nil {
Expand All @@ -614,7 +616,7 @@ func (exec *TabletExecutor) executeOneTablet(
}
successChan <- ShardResult{
Shard: tablet.Shard,
Result: result,
Results: results,
Position: pos,
}
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ func (s *VtctldServer) ApplySchema(ctx context.Context, req *vtctldatapb.ApplySc
}

for _, shard := range execResult.SuccessShards {
resp.RowsAffectedByShard[shard.Shard] = shard.Result.RowsAffected
for _, result := range shard.Results {
resp.RowsAffectedByShard[shard.Shard] += result.RowsAffected
}
}

return resp, err
Expand Down

0 comments on commit 0439d89

Please sign in to comment.