diff --git a/go/os2/file.go b/go/os2/file.go
new file mode 100644
index 00000000000..7c284ed1fc1
--- /dev/null
+++ b/go/os2/file.go
@@ -0,0 +1,50 @@
+/*
+Copyright 2025 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package os2
+
+import (
+ "io/fs"
+ "os"
+)
+
+const (
+ // PermFile is a FileMode for regular files without world permission bits.
+ PermFile fs.FileMode = 0660
+ // PermDirectory is a FileMode for directories without world permission bits.
+ PermDirectory fs.FileMode = 0770
+)
+
+// Create is identical to os.Create except that it uses 0660 permission
+// rather than 0666, to exclude the world read/write bits.
+func Create(name string) (*os.File, error) {
+ return os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, PermFile)
+}
+
+// WriteFile is identical to os.WriteFile except that permission 0660 is used.
+func WriteFile(name string, data []byte) error {
+ return os.WriteFile(name, data, PermFile)
+}
+
+// Mkdir is identical to os.Mkdir except that permission 0770 is used.
+func Mkdir(path string) error {
+ return os.Mkdir(path, PermDirectory)
+}
+
+// MkdirAll is identical to os.MkdirAll except that permission 0770 is used.
+func MkdirAll(path string) error {
+ return os.MkdirAll(path, PermDirectory)
+}
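
The os2 helpers above are drop-in replacements for their os counterparts that differ only in the permission bits they pass. A minimal usage sketch (the paths are hypothetical) of how callers create directories and files without world permission bits:

    package main

    import (
        "log"
        "path/filepath"

        "vitess.io/vitess/go/os2"
    )

    func main() {
        dir := filepath.Join("/tmp", "vt_0000000001", "backups")
        // 0770 instead of the os.ModePerm (0777) used previously.
        if err := os2.MkdirAll(dir); err != nil {
            log.Fatal(err)
        }
        // 0660 instead of the 0666 used by os.Create / os.WriteFile.
        if err := os2.WriteFile(filepath.Join(dir, "restore_in_progress"), nil); err != nil {
            log.Fatal(err)
        }
    }
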
diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go
index ecc3f827c7b..c75ad6c12df 100644
--- a/go/pools/smartconnpool/pool.go
+++ b/go/pools/smartconnpool/pool.go
@@ -156,7 +156,6 @@ type ConnPool[C Connection] struct {
// The pool must be ConnPool.Open before it can start giving out connections
func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
- pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
@@ -195,8 +194,14 @@ func (pool *ConnPool[C]) open() {
// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
- pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
- pool.wait.expire(false)
+ pool.runWorker(pool.close, 100*time.Millisecond, func(_ time.Time) bool {
+ maybeStarving := pool.wait.expire(false)
+
+ // Do not allow connections to starve: if there are waiters in the queue
+ // and connections in the stack, it means we could be starving them.
+ // Try getting a connection out and handing it over directly.
+ for n := 0; n < maybeStarving && pool.tryReturnAnyConn(); n++ {
+ }
return true
})
@@ -395,16 +400,34 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}
}
- if !pool.wait.tryReturnConn(conn) {
- connSetting := conn.Conn.Setting()
- if connSetting == nil {
- pool.clean.Push(conn)
- } else {
- stack := connSetting.bucket & stackMask
- pool.settings[stack].Push(conn)
- pool.freshSettingsStack.Store(int64(stack))
+ pool.tryReturnConn(conn)
+}
+
+func (pool *ConnPool[C]) tryReturnConn(conn *Pooled[C]) bool {
+ if pool.wait.tryReturnConn(conn) {
+ return true
+ }
+ connSetting := conn.Conn.Setting()
+ if connSetting == nil {
+ pool.clean.Push(conn)
+ } else {
+ stack := connSetting.bucket & stackMask
+ pool.settings[stack].Push(conn)
+ pool.freshSettingsStack.Store(int64(stack))
+ }
+ return false
+}
+
+func (pool *ConnPool[C]) tryReturnAnyConn() bool {
+ if conn, ok := pool.clean.Pop(); ok {
+ return pool.tryReturnConn(conn)
+ }
+ for u := 0; u <= stackMask; u++ {
+ if conn, ok := pool.settings[u].Pop(); ok {
+ return pool.tryReturnConn(conn)
}
}
+ return false
}
func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
@@ -443,14 +466,9 @@ func (pool *ConnPool[C]) connNew(ctx context.Context) (*Pooled[C], error) {
}
func (pool *ConnPool[C]) getFromSettingsStack(setting *Setting) *Pooled[C] {
- fresh := pool.freshSettingsStack.Load()
- if fresh < 0 {
- return nil
- }
-
var start uint32
if setting == nil {
- start = uint32(fresh)
+ start = uint32(pool.freshSettingsStack.Load())
} else {
start = setting.bucket
}
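
The new 100ms worker above pairs the number of waiters that expired without ever holding a connection (age == 0) with idle connections popped from the clean and settings stacks. A rough, self-contained sketch of that direct hand-off idea, using channels as stand-ins for the pool's actual stacks and waitlist (not the pool's real types):

    // Simplified stand-in illustrating "hand an idle connection straight to a
    // waiter, otherwise park it again". Waiter channels are assumed buffered.
    type conn struct{ id int }

    type handoffPool struct {
        idle    chan *conn      // stand-in for the clean/settings stacks
        waiters chan chan *conn // stand-in for the waitlist
    }

    func (p *handoffPool) tryReturnAnyConn() bool {
        select {
        case c := <-p.idle: // an idle connection is available
            select {
            case w := <-p.waiters:
                w <- c // direct hand-off to a waiter
                return true
            default:
                p.idle <- c // nobody is waiting: park it again
                return false
            }
        default:
            return false // nothing idle to hand out
        }
    }
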
diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go
index 701327005ad..a399bdfb3a4 100644
--- a/go/pools/smartconnpool/pool_test.go
+++ b/go/pools/smartconnpool/pool_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -36,6 +37,7 @@ var (
type TestState struct {
lastID, open, close, reset atomic.Int64
waits []time.Time
+ mu sync.Mutex
chaos struct {
delayConnect time.Duration
@@ -45,6 +47,8 @@ type TestState struct {
}
func (ts *TestState) LogWait(start time.Time) {
+ ts.mu.Lock()
+ defer ts.mu.Unlock()
ts.waits = append(ts.waits, start)
}
@@ -1080,3 +1084,68 @@ func TestApplySettingsFailure(t *testing.T) {
p.put(r)
}
}
+
+func TestGetSpike(t *testing.T) {
+ var state TestState
+
+ ctx := context.Background()
+ p := NewPool(&Config[*TestConn]{
+ Capacity: 5,
+ IdleTimeout: time.Second,
+ LogWait: state.LogWait,
+ }).Open(newConnector(&state), nil)
+
+ var resources [10]*Pooled[*TestConn]
+
+ // Ensure we have a pool with 5 available resources
+ for i := 0; i < 5; i++ {
+ r, err := p.Get(ctx, nil)
+
+ require.NoError(t, err)
+ resources[i] = r
+ assert.EqualValues(t, 5-i-1, p.Available())
+ assert.Zero(t, p.Metrics.WaitCount())
+ assert.Zero(t, len(state.waits))
+ assert.Zero(t, p.Metrics.WaitTime())
+ assert.EqualValues(t, i+1, state.lastID.Load())
+ assert.EqualValues(t, i+1, state.open.Load())
+ }
+
+ for i := 0; i < 5; i++ {
+ p.put(resources[i])
+ }
+
+ assert.EqualValues(t, 5, p.Available())
+ assert.EqualValues(t, 5, p.Active())
+ assert.EqualValues(t, 0, p.InUse())
+
+ for i := 0; i < 2000; i++ {
+ wg := sync.WaitGroup{}
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ errs := make(chan error, 80)
+
+ for j := 0; j < 80; j++ {
+ wg.Add(1)
+
+ go func() {
+ defer wg.Done()
+ r, err := p.Get(ctx, nil)
+ defer p.put(r)
+
+ if err != nil {
+ errs <- err
+ }
+ }()
+ }
+ wg.Wait()
+
+ if len(errs) > 0 {
+ t.Errorf("Error getting connection: %v", <-errs)
+ }
+
+ close(errs)
+ }
+}
diff --git a/go/pools/smartconnpool/waitlist.go b/go/pools/smartconnpool/waitlist.go
index f16215f4b14..ef1eb1fe997 100644
--- a/go/pools/smartconnpool/waitlist.go
+++ b/go/pools/smartconnpool/waitlist.go
@@ -76,7 +76,7 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting) (*Pool
// expire removes and wakes any expired waiter in the waitlist.
// if force is true, it'll wake and remove all the waiters.
-func (wl *waitlist[C]) expire(force bool) {
+func (wl *waitlist[C]) expire(force bool) (maybeStarving int) {
if wl.list.Len() == 0 {
return
}
@@ -91,6 +91,9 @@ func (wl *waitlist[C]) expire(force bool) {
expired = append(expired, e)
continue
}
+ if e.Value.age == 0 {
+ maybeStarving++
+ }
}
// remove the expired waiters from the waitlist after traversing it
for _, e := range expired {
@@ -102,6 +105,7 @@ func (wl *waitlist[C]) expire(force bool) {
for _, e := range expired {
e.Value.sema.notify(false)
}
+ return
}
// tryReturnConn tries handing over a connection to one of the waiters in the pool.
diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index 04a67c5bf4d..f8f875164ad 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -408,6 +408,18 @@ func TestBackup(t *testing.T, setupType int, streamMode string, stripes int, cDe
return vterrors.Errorf(vtrpc.Code_UNKNOWN, "test failure: %s", test.name)
}
}
+
+ t.Run("check for files created with global permissions", func(t *testing.T) {
+ t.Logf("Confirming that none of the MySQL data directories that we've created have files with global permissions")
+ for _, ks := range localCluster.Keyspaces {
+ for _, shard := range ks.Shards {
+ for _, tablet := range shard.Vttablets {
+ tablet.VttabletProcess.ConfirmDataDirHasNoGlobalPerms(t)
+ }
+ }
+ }
+ })
+
return nil
}
diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go
index f5b19094195..3d0783b1dac 100644
--- a/go/test/endtoend/cluster/vttablet_process.go
+++ b/go/test/endtoend/cluster/vttablet_process.go
@@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
+ "io/fs"
"net/http"
"os"
"os/exec"
@@ -35,6 +36,8 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/require"
+
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@@ -677,6 +680,67 @@ func (vttablet *VttabletProcess) IsShutdown() bool {
return vttablet.proc == nil
}
+// ConfirmDataDirHasNoGlobalPerms confirms that no files in the tablet's data directory
+// have any global/world/other permissions enabled.
+func (vttablet *VttabletProcess) ConfirmDataDirHasNoGlobalPerms(t *testing.T) {
+ datadir := vttablet.Directory
+ if _, err := os.Stat(datadir); errors.Is(err, os.ErrNotExist) {
+ t.Logf("Data directory %s no longer exists, skipping permissions check", datadir)
+ return
+ }
+
+ var allowedFiles = []string{
+ // These are intentionally created with the world/other read bit set by mysqld itself
+ // during the --initialize[-insecure] step.
+ // See: https://dev.mysql.com/doc/mysql-security-excerpt/en/creating-ssl-rsa-files-using-mysql.html
+ // "On Unix and Unix-like systems, the file access mode is 644 for certificate files
+ // (that is, world readable) and 600 for key files (that is, accessible only by the
+ // account that runs the server)."
+ path.Join("data", "ca.pem"),
+ path.Join("data", "client-cert.pem"),
+ path.Join("data", "public_key.pem"),
+ path.Join("data", "server-cert.pem"),
+ // The domain socket must have global perms for anyone to use it.
+ "mysql.sock",
+ // These files are created by xtrabackup.
+ path.Join("tmp", "xtrabackup_checkpoints"),
+ path.Join("tmp", "xtrabackup_info"),
+ // These are 5.7 specific xtrabackup files.
+ path.Join("data", "xtrabackup_binlog_pos_innodb"),
+ path.Join("data", "xtrabackup_master_key_id"),
+ path.Join("data", "mysql_upgrade_info"),
+ }
+
+ var matches []string
+ fsys := os.DirFS(datadir)
+ err := fs.WalkDir(fsys, ".", func(p string, d fs.DirEntry, _ error) error {
+ // first check if the file should be skipped
+ for _, name := range allowedFiles {
+ if strings.HasSuffix(p, name) {
+ return nil
+ }
+ }
+
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+
+ // check whether any global/world permission bit is set in the file mode
+ if info.Mode()&0007 != 0 {
+ matches = append(matches, fmt.Sprintf(
+ "%s (%s)",
+ path.Join(datadir, p),
+ info.Mode(),
+ ))
+ }
+ return nil
+ })
+
+ require.NoError(t, err, "Error walking directory")
+ require.Empty(t, matches, "Found files with global permissions: %s\n", strings.Join(matches, "\n"))
+}
+
// VttabletProcessInstance returns a VttabletProcess handle for vttablet process
// configured with the given Config.
// The process must be manually started by calling setup()
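
ConfirmDataDirHasNoGlobalPerms flags a file whenever the lowest octal digit of its mode is non-zero. A small standalone sketch of that 0o007 mask, independent of the cluster helpers (the stat'ed path is only an example):

    package main

    import (
        "fmt"
        "io/fs"
        "os"
    )

    // worldAccessible reports whether any other/world permission bit is set,
    // i.e. the same info.Mode()&0007 != 0 check used above.
    func worldAccessible(mode fs.FileMode) bool {
        return mode&0o007 != 0
    }

    func main() {
        fmt.Println(worldAccessible(0o660)) // false: owner/group only
        fmt.Println(worldAccessible(0o644)) // true: world-readable
        if info, err := os.Stat("/etc/hosts"); err == nil { // example path
            fmt.Println(info.Mode(), worldAccessible(info.Mode()))
        }
    }
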
diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go
index 9eb72e9019a..33449462503 100644
--- a/go/test/endtoend/vreplication/cluster_test.go
+++ b/go/test/endtoend/vreplication/cluster_test.go
@@ -887,7 +887,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
tablets := make(map[string]*cluster.VttabletProcess)
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
- if tablet.Vttablet.GetTabletStatus() == "SERVING" {
+ if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
tablets[tablet.Name] = tablet.Vttablet
}
diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go
index e7d0b714834..30b4a554b50 100644
--- a/go/test/endtoend/vreplication/vdiff_helper_test.go
+++ b/go/test/endtoend/vreplication/vdiff_helper_test.go
@@ -337,9 +337,7 @@ func getVDiffInfo(json string) *vdiffInfo {
}
func encodeString(in string) string {
- var buf strings.Builder
- sqltypes.NewVarChar(in).EncodeSQL(&buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
// generateMoreCustomers creates additional test data for better tests
diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index 4e50ea12af3..7e3f93b6b20 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -762,11 +762,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchReads(t, workflowType, cellNames, ksWorkflow, false)
assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)
+ switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
+
+ testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
+ workflow, workflowType)
+
var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
- switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
+ // Now let's confirm that it works as expected without an error.
switchWrites(t, workflowType, ksWorkflow, false)
checkThatVDiffFails(t, targetKs, workflow)
@@ -998,6 +1003,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))
tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
+ var sourceTablets, targetTablets []*cluster.VttabletProcess
// Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
for _, tablet := range tablets {
@@ -1010,9 +1016,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
targetShards = "," + targetShards + ","
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
+ targetTablets = append(targetTablets, tab)
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
+ sourceTablets = append(sourceTablets, tab)
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
@@ -1026,6 +1034,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
if dryRunResultSwitchWrites != nil {
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
}
+ if tableName == "customer" {
+ testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
+ }
+ // Now let's confirm that it works as expected without an error.
reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
for tabletName, count := range counts {
@@ -1534,6 +1546,140 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}
+// testSwitchWritesErrorHandling confirms that switching writes works as expected
+// in the face of vreplication lag (canSwitch() precheck) and when canceling the
+// switch due to replication failing to catch up in time.
+// The workflow MUST be migrating the customer table from the source to the
+// target keyspace AND the workflow must currently have reads switched but not
+// writes.
+func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
+ t.Run("validate switch writes error handling", func(t *testing.T) {
+ vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+ defer vtgateConn.Close()
+ require.NotZero(t, len(sourceTablets), "no source tablets provided")
+ require.NotZero(t, len(targetTablets), "no target tablets provided")
+ sourceKs := sourceTablets[0].Keyspace
+ targetKs := targetTablets[0].Keyspace
+ ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
+ var err error
+ sourceConns := make([]*mysql.Conn, len(sourceTablets))
+ for i, tablet := range sourceTablets {
+ sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+ require.NoError(t, err)
+ defer sourceConns[i].Close()
+ }
+ targetConns := make([]*mysql.Conn, len(targetTablets))
+ for i, tablet := range targetTablets {
+ targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+ require.NoError(t, err)
+ defer targetConns[i].Close()
+ }
+ startingTestRowID := 10000000
+ numTestRows := 100
+ addTestRows := func() {
+ for i := 0; i < numTestRows; i++ {
+ execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
+ startingTestRowID+i))
+ }
+ }
+ deleteTestRows := func() {
+ execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
+ }
+ addIndex := func() {
+ for _, targetConn := range targetConns {
+ execQuery(t, targetConn, "set session sql_mode=''")
+ execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
+ }
+ }
+ dropIndex := func() {
+ for _, targetConn := range targetConns {
+ execQuery(t, targetConn, "alter table customer drop index name_idx")
+ }
+ }
+ lockTargetTable := func() {
+ for _, targetConn := range targetConns {
+ execQuery(t, targetConn, "lock table customer read")
+ }
+ }
+ unlockTargetTable := func() {
+ for _, targetConn := range targetConns {
+ execQuery(t, targetConn, "unlock tables")
+ }
+ }
+ cleanupTestData := func() {
+ dropIndex()
+ deleteTestRows()
+ }
+ restartWorkflow := func() {
+ err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
+ require.NoError(t, err, "failed to start workflow: %v", err)
+ }
+ waitForTargetToCatchup := func() {
+ waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+ waitForNoWorkflowLag(t, vc, targetKs, workflow)
+ }
+
+ // First let's test that the prechecks work as expected. We ALTER
+ // the table on the target shards to add a unique index on the name
+ // field.
+ addIndex()
+ // Then we replicate some test rows across the target shards by
+ // inserting them in the source keyspace.
+ addTestRows()
+ // Now the workflow should go into the error state and the lag should
+ // start to climb. So we sleep for three times the max lag duration
+ // that we will set for the SwitchTraffic call.
+ lagDuration := 3 * time.Second
+ time.Sleep(lagDuration * 3)
+ out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+ "SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
+ // It should fail in the canSwitch() precheck.
+ require.Error(t, err)
+ require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
+ workflow, lagDuration.String()), out)
+ require.NotContains(t, out, "cancel migration failed")
+ // Confirm that queries still work fine.
+ execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+ cleanupTestData()
+ // We have to restart the workflow again as the duplicate key error
+ // is a permanent/terminal one.
+ restartWorkflow()
+ waitForTargetToCatchup()
+
+ // Now let's test that the cancel works by setting the command timeout
+ // to a fraction (6s) of the default max repl lag duration (30s). First
+ // we lock the customer table on the target tablets so that we cannot
+ // apply the INSERTs and catch up.
+ lockTargetTable()
+ addTestRows()
+ timeout := lagDuration * 2 // 6s
+ // Use the default max-replication-lag-allowed value of 30s.
+ // We run the command in a goroutine so that we can unblock things
+ // after the timeout is reached -- as the vplayer query is blocking
+ // on the table lock in the MySQL layer.
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+ "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+ }()
+ time.Sleep(timeout)
+ // Now we can unblock things and let it continue.
+ unlockTargetTable()
+ wg.Wait()
+ // It should fail due to the command context timeout and we should
+ // successfully cancel.
+ require.Error(t, err)
+ require.Contains(t, out, "failed to sync up replication between the source and target")
+ require.NotContains(t, out, "cancel migration failed")
+ // Confirm that queries still work fine.
+ execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+ deleteTestRows()
+ waitForTargetToCatchup()
+ })
+}
+
// restartWorkflow confirms that a workflow can be successfully
// stopped and started.
func restartWorkflow(t *testing.T, ksWorkflow string) {
diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go
index f651f3bb25c..35f50269a29 100644
--- a/go/vt/binlog/binlogplayer/binlog_player.go
+++ b/go/vt/binlog/binlogplayer/binlog_player.go
@@ -527,7 +527,7 @@ func (blp *BinlogPlayer) setVReplicationState(state binlogdatapb.VReplicationWor
})
}
blp.blplStats.State.Store(state.String())
- query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state.String(), encodeString(MessageTruncate(message)), blp.uid)
+ query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(MessageTruncate(message)), blp.uid)
if _, err := blp.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
@@ -608,9 +608,9 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+
- "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
+ "values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v)",
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag,
- timeUpdated, binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(dbName), workflowType, workflowSubType, deferSecondaryKeys)
+ timeUpdated, encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(dbName), workflowType, workflowSubType, deferSecondaryKeys)
}
// CreateVReplicationState returns a statement to create a stopped vreplication.
@@ -618,9 +618,9 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+
- "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)",
+ "values (%v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d)",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled,
- throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state.String(), encodeString(dbName),
+ throttler.ReplicationLagModuleDisabled, time.Now().Unix(), encodeString(state.String()), encodeString(dbName),
workflowType, workflowSubType)
}
@@ -663,15 +663,15 @@ func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentTh
// StartVReplicationUntil returns a statement to start the replication with a stop position.
func StartVReplicationUntil(uid int32, pos string) string {
return fmt.Sprintf(
- "update _vt.vreplication set state='%v', stop_pos=%v where id=%v",
- binlogdatapb.VReplicationWorkflowState_Running.String(), encodeString(pos), uid)
+ "update _vt.vreplication set state=%v, stop_pos=%v where id=%v",
+ encodeString(binlogdatapb.VReplicationWorkflowState_Running.String()), encodeString(pos), uid)
}
// StopVReplication returns a statement to stop the replication.
func StopVReplication(uid int32, message string) string {
return fmt.Sprintf(
- "update _vt.vreplication set state='%v', message=%v where id=%v",
- binlogdatapb.VReplicationWorkflowState_Stopped.String(), encodeString(MessageTruncate(message)), uid)
+ "update _vt.vreplication set state=%v, message=%v where id=%v",
+ encodeString(binlogdatapb.VReplicationWorkflowState_Stopped.String()), encodeString(MessageTruncate(message)), uid)
}
// DeleteVReplication returns a statement to delete the replication.
@@ -686,9 +686,7 @@ func MessageTruncate(msg string) string {
}
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
// ReadVReplicationPos returns a statement to query the gtid for a
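
The queries above now run every literal through encodeString (now simply sqltypes.EncodeStringSQL) instead of pasting values between hand-written quotes, so states and messages containing quotes cannot break the statement. A short sketch of the difference:

    package main

    import (
        "fmt"

        "vitess.io/vitess/go/sqltypes"
    )

    func main() {
        msg := "error: can't find 'pos'" // a message containing quotes
        // Hand-written quoting produces broken SQL once the value contains a quote:
        fmt.Printf("update _vt.vreplication set message='%s' where id=1\n", msg)
        // EncodeStringSQL escapes the value and supplies the surrounding quotes itself:
        fmt.Printf("update _vt.vreplication set message=%s where id=1\n",
            sqltypes.EncodeStringSQL(msg))
    }
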
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 7aaba085b14..39b6a0e1ad3 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -91,6 +91,9 @@ var (
// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond
+ // Size of channel buffer for each subscriber
+ broadcastChannelBufferSize = 2048
+
// HealthCheckCacheTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Cache` title to create the
// HTML code required to render the cache of the HealthCheck.
HealthCheckCacheTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Cache")
@@ -624,7 +627,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
defer hc.subMu.Unlock()
- c := make(chan *TabletHealth, 2)
+ c := make(chan *TabletHealth, broadcastChannelBufferSize)
hc.subscribers[c] = struct{}{}
return c
}
@@ -643,6 +646,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
select {
case c <- th:
default:
+ // If the channel is full, we drop the message.
+ log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
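
The broadcast still uses a non-blocking send, so a subscriber that falls more than broadcastChannelBufferSize updates behind can still lose messages; the larger buffer and the warning only make that far less likely, and visible when it happens. A generic sketch of the pattern, independent of the HealthCheck types:

    // Fan-out to buffered subscriber channels without ever blocking the
    // health-check loop: a full buffer means the update is dropped (and,
    // in the real code, logged).
    func broadcast[T any](subscribers map[chan T]struct{}, update T, onDrop func(T)) {
        for ch := range subscribers {
            select {
            case ch <- update:
            default:
                onDrop(update) // subscriber too slow; its buffer is full
            }
        }
    }
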
diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go
index 70f3654f242..ba79fd56c61 100644
--- a/go/vt/discovery/healthcheck_test.go
+++ b/go/vt/discovery/healthcheck_test.go
@@ -1460,6 +1460,50 @@ func TestDebugURLFormatting(t *testing.T) {
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
}
+// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
+// Added in response to https://github.com/vitessio/vitess/issues/17629.
+func TestConcurrentUpdates(t *testing.T) {
+ ctx := utils.LeakCheckContext(t)
+ var mu sync.Mutex
+ // reset error counters
+ hcErrorCounters.ResetAll()
+ ts := memorytopo.NewServer(ctx, "cell")
+ defer ts.Close()
+ hc := createTestHc(ctx, ts)
+ // close healthcheck
+ defer hc.Close()
+
+ // Subscribe to the healthcheck and make the receiver keep track
+ // of the updates received.
+ ch := hc.Subscribe()
+ totalCount := 0
+ go func() {
+ for range ch {
+ mu.Lock()
+ totalCount++
+ mu.Unlock()
+ // Simulate a somewhat slow consumer.
+ time.Sleep(100 * time.Millisecond)
+ }
+ }()
+
+ // Run multiple updates really quickly
+ // one after the other.
+ totalUpdates := 10
+ for i := 0; i < totalUpdates; i++ {
+ hc.broadcast(&TabletHealth{})
+ }
+ // Unsubscribe from the healthcheck
+ // and verify we process all the updates eventually.
+ hc.Unsubscribe(ch)
+ defer close(ch)
+ require.Eventuallyf(t, func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return totalUpdates == totalCount
+ }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
+}
+
func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
connMapMu.Lock()
defer connMapMu.Unlock()
diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go
index 915b5e6894f..6e5c8f314ed 100644
--- a/go/vt/mysqlctl/backupengine.go
+++ b/go/vt/mysqlctl/backupengine.go
@@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
+ "vitess.io/vitess/go/os2"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
@@ -654,7 +655,7 @@ func createStateFile(cnf *Mycnf) error {
// rename func to openStateFile
// change to return a *File
fname := filepath.Join(cnf.TabletDir(), RestoreState)
- fd, err := os.Create(fname)
+ fd, err := os2.Create(fname)
if err != nil {
return fmt.Errorf("unable to create file: %v", err)
}
diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go
index af6b8fdbd85..68806824faf 100644
--- a/go/vt/mysqlctl/builtinbackupengine.go
+++ b/go/vt/mysqlctl/builtinbackupengine.go
@@ -39,6 +39,7 @@ import (
"vitess.io/vitess/go/ioutil"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
+ "vitess.io/vitess/go/os2"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
@@ -199,10 +200,10 @@ func (fe *FileEntry) open(cnf *Mycnf, readOnly bool) (*os.File, error) {
}
} else {
dir := path.Dir(name)
- if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+ if err := os2.MkdirAll(dir); err != nil {
return nil, vterrors.Wrapf(err, "cannot create destination directory %v", dir)
}
- if fd, err = os.Create(name); err != nil {
+ if fd, err = os2.Create(name); err != nil {
return nil, vterrors.Wrapf(err, "cannot create destination file %v", name)
}
}
diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go
index 99148d9169b..9c3e8165e2c 100644
--- a/go/vt/mysqlctl/filebackupstorage/file.go
+++ b/go/vt/mysqlctl/filebackupstorage/file.go
@@ -28,6 +28,7 @@ import (
"github.com/spf13/pflag"
"vitess.io/vitess/go/ioutil"
+ "vitess.io/vitess/go/os2"
"vitess.io/vitess/go/vt/concurrency"
stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
@@ -110,7 +111,7 @@ func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string, files
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
p := path.Join(FileBackupStorageRoot, fbh.dir, fbh.name, filename)
- f, err := os.Create(p)
+ f, err := os2.Create(p)
if err != nil {
return nil, err
}
@@ -186,13 +187,13 @@ func (fbs *FileBackupStorage) ListBackups(ctx context.Context, dir string) ([]ba
func (fbs *FileBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
// Make sure the directory exists.
p := path.Join(FileBackupStorageRoot, dir)
- if err := os.MkdirAll(p, os.ModePerm); err != nil {
+ if err := os2.MkdirAll(p); err != nil {
return nil, err
}
// Create the subdirectory for this named backup.
p = path.Join(p, name)
- if err := os.Mkdir(p, os.ModePerm); err != nil {
+ if err := os2.Mkdir(p); err != nil {
return nil, err
}
diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go
index f19cc681df7..54b0b036326 100644
--- a/go/vt/mysqlctl/mysqld.go
+++ b/go/vt/mysqlctl/mysqld.go
@@ -46,6 +46,7 @@ import (
"vitess.io/vitess/config"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
+ "vitess.io/vitess/go/os2"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
@@ -838,7 +839,7 @@ func (mysqld *Mysqld) initConfig(cnf *Mycnf, outFile string) error {
return err
}
- return os.WriteFile(outFile, []byte(configData), 0o664)
+ return os2.WriteFile(outFile, []byte(configData))
}
func (mysqld *Mysqld) getMycnfTemplate() string {
@@ -982,7 +983,7 @@ func (mysqld *Mysqld) ReinitConfig(ctx context.Context, cnf *Mycnf) error {
func (mysqld *Mysqld) createDirs(cnf *Mycnf) error {
tabletDir := cnf.TabletDir()
log.Infof("creating directory %s", tabletDir)
- if err := os.MkdirAll(tabletDir, os.ModePerm); err != nil {
+ if err := os2.MkdirAll(tabletDir); err != nil {
return err
}
for _, dir := range TopLevelDirs() {
@@ -992,7 +993,7 @@ func (mysqld *Mysqld) createDirs(cnf *Mycnf) error {
}
for _, dir := range cnf.directoryList() {
log.Infof("creating directory %s", dir)
- if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+ if err := os2.MkdirAll(dir); err != nil {
return err
}
// FIXME(msolomon) validate permissions?
@@ -1016,14 +1017,14 @@ func (mysqld *Mysqld) createTopDir(cnf *Mycnf, dir string) error {
if os.IsNotExist(err) {
topdir := path.Join(tabletDir, dir)
log.Infof("creating directory %s", topdir)
- return os.MkdirAll(topdir, os.ModePerm)
+ return os2.MkdirAll(topdir)
}
return err
}
linkto := path.Join(target, vtname)
source := path.Join(tabletDir, dir)
log.Infof("creating directory %s", linkto)
- err = os.MkdirAll(linkto, os.ModePerm)
+ err = os2.MkdirAll(linkto)
if err != nil {
return err
}
diff --git a/go/vt/servenv/version.go b/go/vt/servenv/version.go
index f7e361379f4..ee9b312d5b7 100644
--- a/go/vt/servenv/version.go
+++ b/go/vt/servenv/version.go
@@ -19,4 +19,4 @@ package servenv
// DO NOT EDIT
// THIS FILE IS AUTO-GENERATED DURING NEW RELEASES BY THE VITESS-RELEASER
-const versionName = "19.0.9"
+const versionName = "19.0.10-SNAPSHOT"
diff --git a/go/vt/vtctl/vdiff_env_test.go b/go/vt/vtctl/vdiff_env_test.go
index b5324e7bd6e..044bf613c8a 100644
--- a/go/vt/vtctl/vdiff_env_test.go
+++ b/go/vt/vtctl/vdiff_env_test.go
@@ -128,7 +128,7 @@ func newTestVDiffEnv(t testing.TB, ctx context.Context, sourceShards, targetShar
// But this is one statement per stream.
env.tmc.setVRResults(
primary.tablet,
- fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos='%s', message='synchronizing for vdiff' where id=%d", vdiffSourceGtid, j+1),
+ fmt.Sprintf("update _vt.vreplication set state='Running', stop_pos=%s, message='synchronizing for vdiff' where id=%d", sqltypes.EncodeStringSQL(vdiffSourceGtid), j+1),
&sqltypes.Result{},
)
}
diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go
index 8d09185db1e..994672a5563 100644
--- a/go/vt/vtctl/workflow/server.go
+++ b/go/vt/vtctl/workflow/server.go
@@ -688,11 +688,10 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace
- timeUpdated := time.Unix(timeUpdatedSeconds, 0)
- vreplicationLag := time.Since(timeUpdated)
-
// MaxVReplicationLag represents the time since we last processed any event
// in the workflow.
+ timeUpdated := time.Unix(timeUpdatedSeconds, 0)
+ vreplicationLag := time.Since(timeUpdated)
if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok {
if vreplicationLag.Seconds() > currentMaxLag {
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
@@ -701,32 +700,18 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds()
}
- // MaxVReplicationTransactionLag estimates the actual statement processing lag
- // between the source and the target. If we are still processing source events it
- // is the difference b/w current time and the timestamp of the last event. If
- // heartbeats are more recent than the last event, then the lag is the time since
- // the last heartbeat as there can be an actual event immediately after the
- // heartbeat, but which has not yet been processed on the target.
- // We don't allow switching during the copy phase, so in that case we just return
- // a large lag. All timestamps are in seconds since epoch.
+ // MaxVReplicationTransactionLag estimates the max statement processing lag
+ // between the source and the target across all of the workflow streams.
if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok {
maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0
}
- lastTransactionTime := transactionTimeSeconds
- lastHeartbeatTime := timeHeartbeat
- if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() {
- maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64
- } else {
- if lastTransactionTime == 0 /* no new events after copy */ ||
- lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ {
-
- lastTransactionTime = lastHeartbeatTime
- }
- now := time.Now().Unix() /* seconds since epoch */
- transactionReplicationLag := float64(now - lastTransactionTime)
- if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
- maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
- }
+ heartbeatTimestamp := &vttimepb.Time{
+ Seconds: timeHeartbeat,
+ }
+ transactionReplicationLag := getVReplicationTrxLag(stream.TransactionTimestamp, stream.TimeUpdated, heartbeatTimestamp,
+ binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[stream.State]))
+ if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] {
+ maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag
}
return nil
@@ -3244,8 +3229,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return handleError("failed to migrate the workflow streams", err)
}
if cancel {
- sw.cancelMigration(ctx, sm)
- return 0, sw.logs(), nil
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
+ return 0, sw.logs(), err
}
ts.Logger().Infof("Stopping streams")
@@ -3256,13 +3243,17 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
- sw.cancelMigration(ctx, sm)
- return handleError("failed to stop the workflow streams", err)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
+ return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}
@@ -3272,7 +3263,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
// the tablet's deny list check and the first mysqld side table lock.
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err)
}
// No need to UNLOCK the tables as the connection was closed once the locks were acquired
@@ -3283,25 +3276,33 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError("failed to sync up replication between the source and target", err)
}
ts.Logger().Infof("Migrating streams")
if err := sw.migrateStreams(ctx, sm); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError("failed to migrate the workflow streams", err)
}
ts.Logger().Infof("Resetting sequences")
if err := sw.resetSequences(ctx); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError("failed to reset the sequences", err)
}
ts.Logger().Infof("Creating reverse streams")
if err := sw.createReverseVReplication(ctx); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError("failed to create the reverse vreplication streams", err)
}
@@ -3314,7 +3315,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
- sw.cancelMigration(ctx, sm)
+ if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
+ err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr)
+ }
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
@@ -3367,15 +3370,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat
if err != nil {
return "", err
}
+ if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs {
+ return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil
+ }
for _, stream := range wf.ShardStreams {
for _, st := range stream.GetStreams() {
if st.Message == Frozen {
return cannotSwitchFrozen, nil
}
- // If no new events have been replicated after the copy phase then it will be 0.
- if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs {
- return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil
- }
switch st.State {
case binlogdatapb.VReplicationWorkflowState_Copying.String():
return cannotSwitchCopyIncomplete, nil
@@ -3901,3 +3903,41 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea
}
return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate)
}
+
+// getVReplicationTrxLag estimates the actual statement processing lag between the
+// source and the target. If we are still processing source events it is the
+// difference between current time and the timestamp of the last event. If
+// heartbeats are more recent than the last event, then the lag is the time since
+// the last heartbeat as there can be an actual event immediately after the
+// heartbeat, but which has not yet been processed on the target. We don't allow
+// switching during the copy phase, so in that case we just return a large lag.
+// All timestamps are in seconds since epoch.
+func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 {
+ if state == binlogdatapb.VReplicationWorkflowState_Copying {
+ return math.MaxInt64
+ }
+ if trxTs == nil {
+ trxTs = &vttimepb.Time{}
+ }
+ lastTransactionTime := trxTs.Seconds
+ if updatedTs == nil {
+ updatedTs = &vttimepb.Time{}
+ }
+ lastUpdatedTime := updatedTs.Seconds
+ if heartbeatTs == nil {
+ heartbeatTs = &vttimepb.Time{}
+ }
+ lastHeartbeatTime := heartbeatTs.Seconds
+ // We do NOT update the heartbeat timestamp when we are regularly updating the
+ // position as we replicate transactions (GTIDs).
+ // When we DO record a heartbeat, we set the updated time to the same value.
+ // When recording that we are throttled, we update the updated time but NOT
+ // the heartbeat time.
+ if lastTransactionTime == 0 /* No replicated events after copy */ ||
+ (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */
+ lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) {
+ lastTransactionTime = lastUpdatedTime
+ }
+ now := time.Now().Unix() // Seconds since epoch
+ return float64(now - lastTransactionTime)
+}
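
A worked example of the rule implemented above, with made-up timestamps: if the last replicated transaction was at t=100 but a heartbeat was recorded at t=160 (so time_updated equals the heartbeat time and both are newer than the transaction), the stream is treated as caught up through t=160 and the lag at t=165 is 5s rather than 65s. During the copy phase the function short-circuits to math.MaxInt64 so canSwitch() always refuses.

    // Hypothetical numbers mirroring getVReplicationTrxLag's decision rule.
    lastTransactionTime := int64(100) // transaction_timestamp
    lastUpdatedTime := int64(160)     // time_updated
    lastHeartbeatTime := int64(160)   // time_heartbeat
    now := int64(165)

    if lastTransactionTime == 0 ||
        (lastUpdatedTime == lastHeartbeatTime && lastUpdatedTime > lastTransactionTime) {
        lastTransactionTime = lastUpdatedTime
    }
    lag := float64(now - lastTransactionTime) // 5, not 65
    _ = lag
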
diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go
index 7d225f6dd9f..1a7ffc71f24 100644
--- a/go/vt/vtctl/workflow/stream_migrator.go
+++ b/go/vt/vtctl/workflow/stream_migrator.go
@@ -158,12 +158,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream {
}
// CancelStreamMigrations cancels the stream migrations.
-func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
+func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error {
if sm.streams == nil {
- return
+ return nil
}
+ errs := &concurrency.AllErrorRecorder{}
- _ = sm.deleteTargetStreams(ctx)
+ if err := sm.deleteTargetStreams(ctx); err != nil {
+ errs.RecordError(fmt.Errorf("could not delete target streams: %v", err))
+ }
// Restart the source streams, but leave the Reshard workflow's reverse
// variant stopped.
@@ -176,8 +179,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) {
return err
})
if err != nil {
+ errs.RecordError(fmt.Errorf("could not restart source streams: %v", err))
sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err)
}
+ if errs.HasErrors() {
+ return errs.AggrError(vterrors.Aggregate)
+ }
+ return nil
}
// MigrateStreams migrates N streams
diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go
index d7690458439..d0b924016d0 100644
--- a/go/vt/vtctl/workflow/switcher.go
+++ b/go/vt/vtctl/workflow/switcher.go
@@ -110,8 +110,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin
return sm.StopStreams(ctx)
}
-func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
- r.ts.cancelMigration(ctx, sm)
+func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
+ return r.ts.cancelMigration(ctx, sm)
}
func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go
index d1a3ecdaa1d..14075f60dee 100644
--- a/go/vt/vtctl/workflow/switcher_dry_run.go
+++ b/go/vt/vtctl/workflow/switcher_dry_run.go
@@ -217,8 +217,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) (
return nil, nil
}
-func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
dr.drLog.Log("Cancel migration as requested")
+ return nil
}
func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) {
diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go
index 9af9ff49f2f..b9b8b6f6126 100644
--- a/go/vt/vtctl/workflow/switcher_interface.go
+++ b/go/vt/vtctl/workflow/switcher_interface.go
@@ -25,7 +25,7 @@ import (
type iswitcher interface {
lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error)
- cancelMigration(ctx context.Context, sm *StreamMigrator)
+ cancelMigration(ctx context.Context, sm *StreamMigrator) error
stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error)
stopSourceWrites(ctx context.Context) error
waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 72fed638d2b..a295993583f 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -795,8 +795,8 @@ func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string,
}
if ts.optCells != "" || ts.optTabletTypes != "" {
- query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s' where workflow = '%s' and db_name = '%s'",
- ts.optCells, ts.optTabletTypes, ts.ReverseWorkflowName(), dbname)
+ query := fmt.Sprintf("update _vt.vreplication set cell = %s, tablet_types = %s where workflow = %s and db_name = %s",
+ sqltypes.EncodeStringSQL(ts.optCells), sqltypes.EncodeStringSQL(ts.optTabletTypes), sqltypes.EncodeStringSQL(ts.ReverseWorkflowName()), sqltypes.EncodeStringSQL(dbname))
return query
}
return ""
@@ -877,8 +877,8 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
// For non-reference tables we return an error if there's no primary
// vindex as it's not clear what to do.
if len(vtable.ColumnVindexes) > 0 && len(vtable.ColumnVindexes[0].Columns) > 0 {
- inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
- ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, key.KeyRangeString(source.GetShard().KeyRange))
+ inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s.%s', %s)", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]),
+ ts.SourceKeyspaceName(), vtable.ColumnVindexes[0].Name, encodeString(key.KeyRangeString(source.GetShard().KeyRange)))
} else {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary vindex found for the %s table in the %s keyspace",
vtable.Name.String(), ts.SourceKeyspaceName())
@@ -999,8 +999,9 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
// cancelMigration attempts to revert all changes made during the migration so that we can get back to the
// state when traffic switching (or reversing) was initiated.
-func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
+func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error {
var err error
+ cancelErrs := &concurrency.AllErrorRecorder{}
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
@@ -1009,9 +1010,13 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
}
// We create a new context while canceling the migration, so that we are independent of the original
- // context being cancelled prior to or during the cancel operation.
- cmTimeout := 60 * time.Second
- cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
+ // context being canceled prior to or during the cancel operation itself.
+ // First we create a copy of the parent context so that we keep the locks it carries,
+ // but one that cannot be canceled by the parent context.
+ wcCtx := context.WithoutCancel(ctx)
+ // Now we create a child context from that which has a timeout.
+ cmTimeout := 2 * time.Minute
+ cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout)
defer cmCancel()
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
@@ -1020,10 +1025,14 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
+ cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}
- sm.CancelStreamMigrations(cmCtx)
+ if err := sm.CancelStreamMigrations(cmCtx); err != nil {
+ cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
+ ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+ }
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
@@ -1032,13 +1041,19 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
return err
})
if err != nil {
+ cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}
- err = ts.deleteReverseVReplication(cmCtx)
- if err != nil {
- ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err)
+ if err := ts.deleteReverseVReplication(cmCtx); err != nil {
+ cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
+ ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}
+
+ if cancelErrs.HasErrors() {
+ return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
+ }
+ return nil
}
func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
@@ -1046,7 +1061,7 @@ func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error {
// re-invoked after a freeze, it will skip all the previous steps
err := ts.ForAllTargets(func(target *MigrationTarget) error {
ts.Logger().Infof("Marking target streams frozen for workflow %s db_name %s", ts.WorkflowName(), target.GetPrimary().DbName())
- query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen,
+ query := fmt.Sprintf("update _vt.vreplication set message = %s where db_name=%s and workflow=%s", encodeString(Frozen),
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
return err
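
The cleanup context built in cancelMigration above follows the standard Go 1.21 pattern: context.WithoutCancel keeps the parent's values (and therefore the lock information carried in it) while detaching from its cancellation, and a fresh timeout bounds the cleanup itself. A minimal sketch with a hypothetical helper:

    import (
        "context"
        "time"
    )

    // cleanupAfterFailure is a hypothetical helper showing the same construction.
    func cleanupAfterFailure(parent context.Context, cleanup func(context.Context) error) error {
        // Keep the parent's values but ignore its cancellation/deadline.
        ctx := context.WithoutCancel(parent)
        // Give the cleanup its own, independent deadline.
        ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
        defer cancel()
        return cleanup(ctx)
    }
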
diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go
index 80b981026d8..2ecdf8318f2 100644
--- a/go/vt/vtctl/workflow/utils.go
+++ b/go/vt/vtctl/workflow/utils.go
@@ -17,7 +17,6 @@ limitations under the License.
package workflow
import (
- "bytes"
"context"
"fmt"
"hash/fnv"
@@ -578,9 +577,7 @@ func ReverseWorkflowName(workflow string) string {
// this public, but it doesn't belong in package workflow. Maybe package sqltypes,
// or maybe package sqlescape?
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
func getRenameFileName(tableName string) string {
diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go
index f7b46521b68..61fcdc05861 100644
--- a/go/vt/vtgate/schema/tracker.go
+++ b/go/vt/vtgate/schema/tracker.go
@@ -293,7 +293,7 @@ func (t *Tracker) updatedTableSchema(th *discovery.TabletHealth) bool {
func (t *Tracker) updateTables(keyspace string, res map[string]string) {
for tableName, tableDef := range res {
- stmt, err := t.parser.Parse(tableDef)
+ stmt, err := t.parser.ParseStrictDDL(tableDef)
if err != nil {
log.Warningf("error parsing table definition for %s: %v", tableName, err)
continue
@@ -475,7 +475,7 @@ func (vm *viewMap) set(ks, tbl, sql string) {
m = make(map[tableNameStr]sqlparser.SelectStatement)
vm.m[ks] = m
}
- stmt, err := vm.parser.Parse(sql)
+ stmt, err := vm.parser.ParseStrictDDL(sql)
if err != nil {
log.Warningf("ignoring view '%s', parsing error in view definition: '%s'", tbl, sql)
return
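
Switching from Parse to ParseStrictDDL means any definition the parser cannot fully handle is rejected with an error, so the tracker logs a warning and skips the table or view instead of recording a partially parsed statement. A small sketch of the branch that now gets taken (sqlparser.NewTestParser() is assumed here purely for illustration):

    parser := sqlparser.NewTestParser() // assumption: the test-only constructor
    ddl := "create table t5(name varchar(50) primary key with broken syntax)"
    if _, err := parser.ParseStrictDDL(ddl); err != nil {
        // updateTables / viewMap.set take this branch: warn and leave t5 untracked.
        fmt.Printf("skipping t5: %v\n", err)
    }
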
diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go
index 1ee1aee6a0f..e8ebd0e5a5c 100644
--- a/go/vt/vtgate/schema/tracker_test.go
+++ b/go/vt/vtgate/schema/tracker_test.go
@@ -180,6 +180,8 @@ func TestTableTracking(t *testing.T) {
"t3": "create table t3(id datetime primary key)",
}, {
"t4": "create table t4(name varchar(50) primary key)",
+ }, {
+ "t5": "create table t5(name varchar(50) primary key with broken syntax)",
}}
testcases := []testCases{{
@@ -212,6 +214,15 @@ func TestTableTracking(t *testing.T) {
"t3": {{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_DATETIME, CollationName: "binary", Size: 0, Nullable: true}},
"t4": {{Name: sqlparser.NewIdentifierCI("name"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: true}},
},
+ }, {
+ testName: "new broken table",
+ updTbl: []string{"t5"},
+ expTbl: map[string][]vindexes.Column{
+ "t1": {{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_INT64, CollationName: "binary", Nullable: true}, {Name: sqlparser.NewIdentifierCI("name"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: true}, {Name: sqlparser.NewIdentifierCI("email"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: false, Default: &sqlparser.Literal{Val: "a@b.com"}}},
+ "T1": {{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: true}, {Name: sqlparser.NewIdentifierCI("name"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: true}},
+ "t3": {{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_DATETIME, CollationName: "binary", Size: 0, Nullable: true}},
+ "t4": {{Name: sqlparser.NewIdentifierCI("name"), Type: querypb.Type_VARCHAR, Size: 50, Nullable: true}},
+ },
}}
testTracker(t, schemaDefResult, testcases)
@@ -231,6 +242,8 @@ func TestViewsTracking(t *testing.T) {
"t3": "create view t3 as select 1 from tbl3",
}, {
"t4": "create view t4 as select 1 from tbl4",
+ }, {
+ "t4": "create view t5 as select 1 from tbl4 with broken syntax",
}}
testcases := []testCases{{
@@ -259,6 +272,14 @@ func TestViewsTracking(t *testing.T) {
"V1": "select 1, 2 from tbl2",
"t3": "select 1 from tbl3",
"t4": "select 1 from tbl4"},
+ }, {
+ testName: "new broken t5",
+ updView: []string{"t5"},
+ expView: map[string]string{
+ "t1": "select 1 from tbl1",
+ "V1": "select 1, 2 from tbl2",
+ "t3": "select 1 from tbl3",
+ "t4": "select 1 from tbl4"},
}}
testTracker(t, schemaDefResult, testcases)
diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go
index 312273e0c84..8fff196da9f 100644
--- a/go/vt/vttablet/endtoend/vstreamer_test.go
+++ b/go/vt/vttablet/endtoend/vstreamer_test.go
@@ -17,7 +17,6 @@ limitations under the License.
package endtoend
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -461,9 +460,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
}
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
func validateSchemaInserted(client *framework.QueryClient, ddl string) bool {
diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go
index 5dd1811affd..1db88a5978c 100644
--- a/go/vt/vttablet/onlineddl/executor.go
+++ b/go/vt/vttablet/onlineddl/executor.go
@@ -1642,8 +1642,8 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
{
// temporary hack. todo: this should be done when inserting any _vt.vreplication record across all workflow types
- query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = '%s'",
- binlogdatapb.VReplicationWorkflowType_OnlineDDL, v.workflow)
+ query := fmt.Sprintf("update _vt.vreplication set workflow_type = %d where workflow = %s",
+ binlogdatapb.VReplicationWorkflowType_OnlineDDL, sqltypes.EncodeStringSQL(v.workflow))
if _, err := e.vreplicationExec(ctx, tablet.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", tablet.Tablet, query)
}
diff --git a/go/vt/vttablet/tabletmanager/vdiff/utils.go b/go/vt/vttablet/tabletmanager/vdiff/utils.go
index 07e070976a9..b9469df8179 100644
--- a/go/vt/vttablet/tabletmanager/vdiff/utils.go
+++ b/go/vt/vttablet/tabletmanager/vdiff/utils.go
@@ -19,7 +19,6 @@ package vdiff
import (
"context"
"fmt"
- "strings"
"vitess.io/vitess/go/vt/vtgate/evalengine"
@@ -58,9 +57,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare
// Utility functions
func encodeString(in string) string {
- var buf strings.Builder
- sqltypes.NewVarChar(in).EncodeSQL(&buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
func pkColsToGroupByParams(pkCols []int, collationEnv *collations.Environment) []*engine.GroupByParams {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
index da1753a8444..38e3038ede6 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go
@@ -50,7 +50,7 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str
// AddRow adds a row to the insert statement.
func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) {
- fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
+ fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, %v, %v, %d, %d, %v)",
ig.prefix,
encodeString(workflow),
encodeString(bls.String()),
@@ -60,7 +60,7 @@ func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSourc
encodeString(cell),
encodeString(tabletTypes),
ig.now,
- ig.state,
+ encodeString(ig.state),
encodeString(ig.dbname),
workflowType,
workflowSubType,
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
index 11633d95f33..f9f0cc44443 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
@@ -482,7 +482,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
})
}
vr.stats.State.Store(state.String())
- query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
+ query := fmt.Sprintf("update _vt.vreplication set state=%v, message=%v where id=%v", encodeString(state.String()), encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
@@ -498,9 +498,7 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
}
func encodeString(in string) string {
- var buf strings.Builder
- sqltypes.NewVarChar(in).EncodeSQL(&buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
func (vr *vreplicator) getSettingFKCheck() error {
diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go
index bce5e4b33d6..f2734a36773 100644
--- a/go/vt/vttablet/tabletserver/schema/tracker.go
+++ b/go/vt/vttablet/tabletserver/schema/tracker.go
@@ -17,12 +17,12 @@ limitations under the License.
package schema
import (
- "bytes"
"context"
"fmt"
"sync"
"time"
+ "vitess.io/vitess/go/bytes2"
"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/schema"
@@ -230,10 +230,15 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
}
defer conn.Recycle()
+ // We serialize a blob here; encodeString is for strings only
+ // and should not be used for binary data.
+ blobVal := sqltypes.MakeTrusted(sqltypes.VarBinary, blob)
+ buf := bytes2.Buffer{}
+ blobVal.EncodeSQLBytes2(&buf)
query := sqlparser.BuildParsedQuery("insert into %s.schema_version "+
"(pos, ddl, schemax, time_updated) "+
"values (%s, %s, %s, %d)", sidecar.GetIdentifier(), encodeString(gtid),
- encodeString(ddl), encodeString(string(blob)), timestamp).Query
+ encodeString(ddl), buf.String(), timestamp).Query
_, err = conn.Conn.Exec(ctx, query, 1, false)
if err != nil {
return err
@@ -242,9 +247,7 @@ func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string,
}
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
// MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact
diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
index 9c63f8a499c..47a1c117719 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -17,7 +17,6 @@ limitations under the License.
package vstreamer
import (
- "bytes"
"context"
"fmt"
"io"
@@ -856,9 +855,7 @@ type extColInfo struct {
}
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) {
diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go
index a5f7d6ae0bf..3f548e19402 100644
--- a/go/vt/wrangler/keyspace.go
+++ b/go/vt/wrangler/keyspace.go
@@ -17,7 +17,6 @@ limitations under the License.
package wrangler
import (
- "bytes"
"context"
"fmt"
"sync"
@@ -124,7 +123,5 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha
}
func encodeString(in string) string {
- buf := bytes.NewBuffer(nil)
- sqltypes.NewVarChar(in).EncodeSQL(buf)
- return buf.String()
+ return sqltypes.EncodeStringSQL(in)
}
diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go
index 70dee1261fc..65998053e9b 100644
--- a/go/vt/wrangler/vdiff.go
+++ b/go/vt/wrangler/vdiff.go
@@ -537,7 +537,7 @@ func findPKs(env *vtenv.Environment, table *tabletmanagerdatapb.TableDefinition,
// column in the table definition leveraging MySQL's collation inheritance
// rules.
func getColumnCollations(venv *vtenv.Environment, table *tabletmanagerdatapb.TableDefinition) (map[string]collations.ID, error) {
- createstmt, err := venv.Parser().Parse(table.Schema)
+ createstmt, err := venv.Parser().ParseStrictDDL(table.Schema)
if err != nil {
return nil, err
}
diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go
index 1b0071ebed7..96392c3cc0a 100644
--- a/go/vt/wrangler/vdiff_test.go
+++ b/go/vt/wrangler/vdiff_test.go
@@ -1277,6 +1277,13 @@ func TestGetColumnCollations(t *testing.T) {
"name": collationEnv.LookupByName("utf16_icelandic_ci"),
},
},
+ {
+ name: "invalid schema",
+ table: &tabletmanagerdatapb.TableDefinition{
+ Schema: "create table t1 (c1 varchar(10), size set('small', 'medium', 'large'), primary key(c1) with syntax error)",
+ },
+ wantErr: true,
+ },
}
env := vtenv.NewTestEnv()
for _, tt := range tests {
diff --git a/java/client/pom.xml b/java/client/pom.xml
index 44b86b15849..cfb1615a7ba 100644
--- a/java/client/pom.xml
+++ b/java/client/pom.xml
@@ -5,7 +5,7 @@
    <groupId>io.vitess</groupId>
    <artifactId>vitess-parent</artifactId>
-    <version>19.0.9</version>
+    <version>19.0.10-SNAPSHOT</version>
  <artifactId>vitess-client</artifactId>
diff --git a/java/example/pom.xml b/java/example/pom.xml
index caf4a642cd7..7c8d991d5a0 100644
--- a/java/example/pom.xml
+++ b/java/example/pom.xml
@@ -5,7 +5,7 @@
    <groupId>io.vitess</groupId>
    <artifactId>vitess-parent</artifactId>
-    <version>19.0.9</version>
+    <version>19.0.10-SNAPSHOT</version>
  <artifactId>vitess-example</artifactId>
diff --git a/java/grpc-client/pom.xml b/java/grpc-client/pom.xml
index 0fbfe73a268..e17bb074909 100644
--- a/java/grpc-client/pom.xml
+++ b/java/grpc-client/pom.xml
@@ -5,7 +5,7 @@
    <groupId>io.vitess</groupId>
    <artifactId>vitess-parent</artifactId>
-    <version>19.0.9</version>
+    <version>19.0.10-SNAPSHOT</version>
  <artifactId>vitess-grpc-client</artifactId>
diff --git a/java/jdbc/pom.xml b/java/jdbc/pom.xml
index 4aaae20c428..d07374083d1 100644
--- a/java/jdbc/pom.xml
+++ b/java/jdbc/pom.xml
@@ -5,7 +5,7 @@
    <groupId>io.vitess</groupId>
    <artifactId>vitess-parent</artifactId>
-    <version>19.0.9</version>
+    <version>19.0.10-SNAPSHOT</version>
  <artifactId>vitess-jdbc</artifactId>
diff --git a/java/pom.xml b/java/pom.xml
index d76a806be9a..87c2b110dc0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -11,7 +11,7 @@
  <groupId>io.vitess</groupId>
  <artifactId>vitess-parent</artifactId>
-  <version>19.0.9</version>
+  <version>19.0.10-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>Vitess Java Client libraries [Parent]</name>