Skip to content

Commit

Permalink
Online DDL: dynamic cut-over threshold via `ALTER VITESS_MIGRATION ..…
Browse files Browse the repository at this point in the history
…. CUTOVER_THRESHOLD ...` command (#17126)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Nov 20, 2024
1 parent fb79106 commit c5d0ecc
Show file tree
Hide file tree
Showing 18 changed files with 9,469 additions and 9,227 deletions.
29 changes: 28 additions & 1 deletion go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func testScheduler(t *testing.T) {
testTableSequentialTimes(t, t1uuid, t2uuid)
})
t.Run("Postpone launch CREATE", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(createT1IfNotExistsStatement, ddlStrategy+" --postpone-launch", "vtgate", "", "", true)) // skip wait
t1uuid = testOnlineDDLStatement(t, createParams(createT1IfNotExistsStatement, ddlStrategy+" --postpone-launch --cut-over-threshold=14s", "vtgate", "", "", true)) // skip wait
time.Sleep(2 * time.Second)
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
Expand All @@ -501,6 +501,10 @@ func testScheduler(t *testing.T) {
for _, row := range rs.Named().Rows {
postponeLaunch := row.AsInt64("postpone_launch", 0)
assert.Equal(t, int64(0), postponeLaunch)

cutOverThresholdSeconds := row.AsInt64("cutover_threshold_seconds", 0)
// Threshold supplied in DDL strategy
assert.EqualValues(t, 14, cutOverThresholdSeconds)
}
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
Expand Down Expand Up @@ -580,6 +584,9 @@ func testScheduler(t *testing.T) {
assert.Equal(t, int64(1), postponeCompletion)
}
})
t.Run("set cut-over threshold", func(t *testing.T) {
onlineddl.CheckSetMigrationCutOverThreshold(t, &vtParams, shards, t1uuid, 17700*time.Millisecond, "")
})
t.Run("complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand All @@ -592,6 +599,11 @@ func testScheduler(t *testing.T) {
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(0), postponeCompletion)

cutOverThresholdSeconds := row.AsInt64("cutover_threshold_seconds", 0)
// Expect 17800*time.Millisecond to be truncated to 17 seconds
assert.EqualValues(t, 17, cutOverThresholdSeconds)

assert.False(t, row["shadow_analyzed_timestamp"].IsNull())
}
})
Expand Down Expand Up @@ -1062,6 +1074,10 @@ func testScheduler(t *testing.T) {
for _, row := range rs.Named().Rows {
retries := row.AsInt64("retries", 0)
assert.Greater(t, retries, int64(0))

cutOverThresholdSeconds := row.AsInt64("cutover_threshold_seconds", 0)
// No explicit cut-over threshold given. Expect the default 10s
assert.EqualValues(t, 10, cutOverThresholdSeconds)
}
})
})
Expand All @@ -1088,6 +1104,13 @@ func testScheduler(t *testing.T) {
executedUUID := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtctl", "", "", true)) // skip wait
require.Equal(t, uuid, executedUUID)

t.Run("set low cut-over threshold", func(t *testing.T) {
onlineddl.CheckSetMigrationCutOverThreshold(t, &vtParams, shards, uuid, 2*time.Second, "cut-over min value")
})
t.Run("set high cut-over threshold", func(t *testing.T) {
onlineddl.CheckSetMigrationCutOverThreshold(t, &vtParams, shards, uuid, 2000*time.Second, "cut-over max value")
})

// expect it to complete
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
Expand All @@ -1098,6 +1121,10 @@ func testScheduler(t *testing.T) {
for _, row := range rs.Named().Rows {
retries := row.AsInt64("retries", 0)
assert.Greater(t, retries, int64(0))

cutOverThresholdSeconds := row.AsInt64("cutover_threshold_seconds", 0)
// Teh default remains unchanged.
assert.EqualValues(t, 10, cutOverThresholdSeconds)
}
})
})
Expand Down
10 changes: 10 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards
}
}

// CheckSetMigrationCutOverThreshold sets the cut-over threshold for a given migration.
func CheckSetMigrationCutOverThreshold(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, threshold time.Duration, expectError string) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a cutover_threshold %a",
sqltypes.StringBindVariable(uuid),
sqltypes.StringBindVariable(threshold.String()),
)
require.NoError(t, err)
_ = VtgateExecQuery(t, vtParams, query, expectError)
}

// CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool {
ksName := shards[0].PrimaryTablet().VttabletProcess.Keyspace
Expand Down
2 changes: 2 additions & 0 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -116,6 +117,7 @@ type OnlineDDL struct {
Retries int64 `json:"retries,omitempty"`
ReadyToComplete int64 `json:"ready_to_complete,omitempty"`
WasReadyToComplete int64 `json:"was_ready_to_complete,omitempty"`
CutOverThreshold time.Duration `json:"cutover_threshold,omitempty"`
}

// ParseOnlineDDLStatement parses the given SQL into a statement and returns the action type of the DDL statement, or error
Expand Down
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/onlineddl/schema_migrations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ CREATE TABLE IF NOT EXISTS schema_migrations
`removed_foreign_key_names` text NOT NULL,
`last_cutover_attempt_timestamp` timestamp NULL DEFAULT NULL,
`force_cutover` tinyint unsigned NOT NULL DEFAULT '0',
`cutover_threshold_seconds` int unsigned NOT NULL DEFAULT '0',
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`migration_uuid`),
KEY `keyspace_shard_idx` (`keyspace`(64), `shard`(64)),
Expand Down
11 changes: 6 additions & 5 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,12 @@ type (

// AlterMigration represents a ALTER VITESS_MIGRATION statement
AlterMigration struct {
Type AlterMigrationType
UUID string
Expire string
Ratio *Literal
Shards string
Type AlterMigrationType
UUID string
Expire string
Ratio *Literal
Threshold string
Shards string
}

// AlterTable represents a ALTER TABLE statement.
Expand Down
1 change: 1 addition & 0 deletions go/vt/sqlparser/ast_equals.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions go/vt/sqlparser/ast_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,13 @@ func (node *AlterMigration) Format(buf *TrackedBuffer) {
alterType = "force_cutover"
case ForceCutOverAllMigrationType:
alterType = "force_cutover all"
case SetCutOverThresholdMigrationType:
alterType = "cutover_threshold"
}
buf.astPrintf(node, " %#s", alterType)
if node.Threshold != "" {
buf.astPrintf(node, " '%#s'", node.Threshold)
}
if node.Expire != "" {
buf.astPrintf(node, " expire '%#s'", node.Expire)
}
Expand Down
7 changes: 7 additions & 0 deletions go/vt/sqlparser/ast_format_fast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion go/vt/sqlparser/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/sqlparser/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ const (
UnthrottleAllMigrationType
ForceCutOverMigrationType
ForceCutOverAllMigrationType
SetCutOverThresholdMigrationType
)

// ColumnStorage constants
Expand Down
1 change: 1 addition & 0 deletions go/vt/sqlparser/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ var keywords = []keyword{
{"current_timestamp", CURRENT_TIMESTAMP},
{"current_user", CURRENT_USER},
{"cursor", UNUSED},
{"cutover_threshold", CUTOVER_THRESHOLD},
{"data", DATA},
{"database", DATABASE},
{"databases", DATABASES},
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,8 @@ var (
input: "alter vitess_migration force_cutover all",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' force_cutover",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' cutover_threshold '17s'",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' FORCE_CUTOVER",
output: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' force_cutover",
Expand Down
Loading

0 comments on commit c5d0ecc

Please sign in to comment.