Skip to content

Commit

Permalink
VReplication: Support reversing read-only traffic in vtctldclient (vi…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattlord authored and mhamza15 committed Mar 3, 2025
1 parent a0d4776 commit d2a7411
Show file tree
Hide file tree
Showing 10 changed files with 1,044 additions and 101 deletions.
26 changes: 26 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,32 @@ create table nopk (name varchar(128), age int unsigned);
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
}]
}
`

materializeCustomerNameSpec = `
{
"workflow": "customer_name",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_name",
"source_expression": "select cid, name from customer",
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
}]
}
`

materializeCustomerTypeSpec = `
{
"workflow": "enterprise_customer",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "enterprise_customer",
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
}]
}
`

merchantOrdersVSchema = `
Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,11 @@ func executeOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletPro

// assertQueryExecutesOnTablet runs query through conn against ksName and
// requires that the number of executions matching matchQuery observed on
// tablet went up by exactly one.
//
// The current routing rules are fetched up front so that they can be included
// in the failure output alongside the before/after bodies returned by
// executeOnTablet.
func assertQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
	t.Helper()
	rr, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules")
	require.NoError(t, err)
	count0, body0, count1, body1 := executeOnTablet(t, conn, tablet, ksName, query, matchQuery)
	// A single fatal check: the older assert.Equalf on the same condition only
	// produced a second, less detailed failure report for the same problem.
	require.Equalf(t, count0+1, count1, "query %q did not execute on destination %s (%s-%d);\ntried to match %q\nbefore:\n%s\n\nafter:\n%s\n\nrouting rules:\n%s\n\n",
		query, ksName, tablet.Cell, tablet.TabletUID, matchQuery, body0, body1, rr)
}

func assertQueryDoesNotExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *cluster.VttabletProcess, ksName string, query string, matchQuery string) {
Expand Down
9 changes: 8 additions & 1 deletion go/test/endtoend/vreplication/movetables_buffering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ import (
)

func TestMoveTablesBuffering(t *testing.T) {
defaultRdonly = 1
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultRdonly = 0
defaultReplicas = 0
vc = setupMinimalCluster(t)
defer vc.TearDown()

Expand Down
128 changes: 97 additions & 31 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"net"
"slices"
"strconv"
"strings"
"testing"
Expand All @@ -31,9 +32,11 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -163,9 +166,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
args = append(args, "--tablet-types", tabletTypes)
}
args = append(args, "--action_timeout=10m") // At this point something is up so fail the test
if debugMode {
t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " "))
}
t.Logf("Executing workflow command: vtctldclient %s", strings.Join(args, " "))
output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
lastOutput = output
if err != nil {
Expand Down Expand Up @@ -326,27 +327,45 @@ func tstWorkflowCancel(t *testing.T) error {
return tstWorkflowAction(t, workflowActionCancel, "", "")
}

func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.VttabletProcess) {
if tabletTypes == "" {
tabletTypes = "replica,rdonly"
func validateReadsRoute(t *testing.T, tabletType string, tablet *cluster.VttabletProcess) {
if tablet == nil {
return
}
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select * from customer"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery)
}
}
// We do NOT want to target a shard as that goes around the routing rules and
// defeats the purpose here. We are using a query w/o a WHERE clause so for
// sharded keyspaces it should hit all shards as a SCATTER query. So all we
// care about is the keyspace and tablet type.
destination := fmt.Sprintf("%s@%s", tablet.Keyspace, strings.ToLower(tabletType))
readQuery := "select cid from customer limit 50"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}

func validateReadsRouteToSource(t *testing.T, tabletTypes string) {
validateReadsRoute(t, tabletTypes, sourceReplicaTab)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, sourceReplicaTab)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), sourceReplicaTab)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, sourceRdonlyTab)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), sourceRdonlyTab)
}
}

func validateReadsRouteToTarget(t *testing.T, tabletTypes string) {
validateReadsRoute(t, tabletTypes, targetReplicaTab1)
tt, err := topoproto.ParseTabletTypes(tabletTypes)
require.NoError(t, err)
if slices.Contains(tt, topodatapb.TabletType_REPLICA) {
require.NotNil(t, targetReplicaTab1)
validateReadsRoute(t, topodatapb.TabletType_REPLICA.String(), targetReplicaTab1)
}
if slices.Contains(tt, topodatapb.TabletType_RDONLY) {
require.NotNil(t, targetRdonlyTab1)
validateReadsRoute(t, topodatapb.TabletType_RDONLY.String(), targetRdonlyTab1)
}
}

func validateWritesRouteToSource(t *testing.T) {
Expand Down Expand Up @@ -396,6 +415,13 @@ func getCurrentStatus(t *testing.T) string {
// but CI currently fails on creating multiple clusters even after the previous ones are torn down

func TestBasicV2Workflows(t *testing.T) {
ogReplicas := defaultReplicas
ogRdOnly := defaultRdonly
defer func() {
defaultReplicas = ogReplicas
defaultRdonly = ogRdOnly
}()
defaultReplicas = 1
defaultRdonly = 1
extraVTTabletArgs = []string{
parallelInsertWorkers,
Expand Down Expand Up @@ -633,6 +659,12 @@ func testPartialSwitches(t *testing.T) {
tstWorkflowSwitchReads(t, "", "")
checkStates(t, nextState, nextState) // idempotency

tstWorkflowReverseReads(t, "replica,rdonly", "")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)

tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)

tstWorkflowSwitchWrites(t)
currentState = nextState
nextState = wrangler.WorkflowStateAllSwitched
Expand Down Expand Up @@ -669,12 +701,12 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

// this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces
Expand All @@ -685,42 +717,45 @@ func testRestOfWorkflow(t *testing.T) {
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReads(t, "", "")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

tstWorkflowReverseWrites(t)
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchWrites(t)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateWritesSwitched)
validateReadsRouteToSource(t, "replica")
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToTarget(t)

waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseWrites(t)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateWritesSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReads(t, "", "")
validateReadsRouteToTarget(t, "replica")
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateReadsSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowReverseReads(t, "", "")
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

tstWorkflowSwitchReadsAndWrites(t)
validateReadsRouteToTarget(t, "replica")
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
checkStates(t, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched)
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)
waitForLowLag(t, keyspace, "wf1_reverse")
tstWorkflowReverseReadsAndWrites(t)
validateReadsRoute(t, "rdonly", sourceRdonlyTab)
validateReadsRouteToSource(t, "replica")
checkStates(t, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched)
validateReadsRouteToSource(t, "replica,rdonly")
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
Expand All @@ -731,8 +766,7 @@ func testRestOfWorkflow(t *testing.T) {
// fully switch and complete
waitForLowLag(t, "customer", "wf1")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
validateReadsRouteToTarget(t, "replica,rdonly")
validateWritesRouteToTarget(t)

err = tstWorkflowComplete(t)
Expand Down Expand Up @@ -787,7 +821,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {

zone1 := vc.Cells["zone1"]

vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
vc.AddKeyspace(t, []*Cell{zone1}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)

verifyClusterHealth(t, vc)
insertInitialData(t)
Expand All @@ -800,7 +834,7 @@ func setupMinimalCluster(t *testing.T) *VitessCluster {
func setupMinimalCustomerKeyspace(t *testing.T) map[string]*cluster.VttabletProcess {
tablets := make(map[string]*cluster.VttabletProcess)
if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells["zone1"]}, "customer", "-80,80-",
customerVSchema, customerSchema, 0, 0, 200, nil); err != nil {
customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200, nil); err != nil {
t.Fatal(err)
}
defaultCell := vc.Cells[vc.CellNames[0]]
Expand Down Expand Up @@ -936,6 +970,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) {
targetTab2 = custKs.Shards["80-c0"].Tablets["zone1-600"].Vttablet
targetTab1 = custKs.Shards["40-80"].Tablets["zone1-500"].Vttablet
targetReplicaTab1 = custKs.Shards["-40"].Tablets["zone1-401"].Vttablet
targetRdonlyTab1 = custKs.Shards["-40"].Tablets["zone1-402"].Vttablet

sourceTab = custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
sourceReplicaTab = custKs.Shards["-80"].Tablets["zone1-201"].Vttablet
Expand All @@ -947,3 +982,34 @@ func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) {
"--sql", sql, keyspace)
require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err))
}

// validateTableRoutingRule requires that the routing rule for table in
// fromKeyspace, if one exists for the given tablet type, routes to toKeyspace.
//
// tabletType is trimmed and lower-cased. An empty or "primary" type selects
// the unqualified "fromKeyspace.table" rule; any other type selects the
// "fromKeyspace.table@type" rule.
//
// The absence of any such rule is treated as success, because the routing
// rules may legitimately be empty when a workflow is created with
// --no-routing-rules. The check only fails when the rule exists and none of
// its occurrences routes to toKeyspace.
func validateTableRoutingRule(t *testing.T, table, tabletType, fromKeyspace, toKeyspace string) {
	tabletType = strings.ToLower(strings.TrimSpace(tabletType))
	rr := getRoutingRules(t)

	// The rule name we are looking for does not depend on the rule being
	// inspected, so build it once up front.
	fromRule := fmt.Sprintf("%s.%s", fromKeyspace, table)
	if tabletType != "" && tabletType != "primary" {
		fromRule = fmt.Sprintf("%s@%s", fromRule, tabletType)
	}

	// Start out as matched so that missing rules pass (see above); flipped to
	// false as soon as the rule is seen, until its routed keyspace is confirmed.
	matched := true
	for _, r := range rr.GetRules() {
		if r.FromTable != fromRule {
			continue
		}
		matched = false
		require.NotEmpty(t, r.ToTables)
		// The ToTables value is of the form "routedKeyspace.table".
		routedKeyspace, routedTable, ok := strings.Cut(r.ToTables[0], ".")
		require.True(t, ok)
		require.Equal(t, table, routedTable)
		if routedKeyspace == toKeyspace {
			// The rule, table and keyspace all match, so the search is done.
			matched = true
			break
		}
	}
	// Failure here means the rule was found but pointed somewhere else, so say
	// that rather than claiming the rule is missing.
	require.Truef(t, matched, "routing rule %s does not route to keyspace %s", fromRule, toKeyspace)
}
Loading

0 comments on commit d2a7411

Please sign in to comment.