From f746c48810a42b3e7ef4ea1a760463ef96dedffc Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Mon, 9 Dec 2024 09:08:12 +0000 Subject: [PATCH] fix flaky test on mysqlshell backup engine (#17037) Signed-off-by: Renan Rangel --- go/vt/mysqlctl/mysqlshellbackupengine.go | 53 +++++++++---------- go/vt/mysqlctl/mysqlshellbackupengine_test.go | 18 +++---- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/go/vt/mysqlctl/mysqlshellbackupengine.go b/go/vt/mysqlctl/mysqlshellbackupengine.go index b7405ce7eaa..681e62d10cd 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine.go @@ -56,8 +56,6 @@ var ( // disable redo logging and double write buffer mysqlShellSpeedUpRestore = false - mysqlShellBackupBinaryName = "mysqlsh" - // use when checking if we need to create the directory on the local filesystem or not. knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"} @@ -87,7 +85,9 @@ type MySQLShellBackupManifest struct { } func init() { - BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{} + BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{ + binaryName: "mysqlsh", + } for _, cmd := range []string{"vtcombo", "vttablet", "vtbackup", "vttestserver", "vtctldclient"} { servenv.OnParseFor(cmd, registerMysqlShellBackupEngineFlags) @@ -106,6 +106,7 @@ func registerMysqlShellBackupEngineFlags(fs *pflag.FlagSet) { // MySQLShellBackupEngine encapsulates the logic to implement the restoration // of a mysql-shell based backup. type MySQLShellBackupEngine struct { + binaryName string } const ( @@ -164,41 +165,37 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back return BackupUnusable, vterrors.Wrap(err, "failed to fetch position") } - cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...) + cmd := exec.CommandContext(ctx, be.binaryName, args...) params.Logger.Infof("running %s", cmd.String()) - cmdOut, err := cmd.StdoutPipe() - if err != nil { - return BackupUnusable, vterrors.Wrap(err, "cannot create stdout pipe") - } - cmdOriginalErr, err := cmd.StderrPipe() - if err != nil { - return BackupUnusable, vterrors.Wrap(err, "cannot create stderr pipe") - } - if err := cmd.Start(); err != nil { - return BackupUnusable, vterrors.Wrap(err, "can't start mysqlshell") - } + stdoutReader, stdoutWriter := io.Pipe() + stderrReader, stderrWriter := io.Pipe() + lockWaiterReader, lockWaiterWriter := io.Pipe() - pipeReader, pipeWriter := io.Pipe() - cmdErr := io.TeeReader(cmdOriginalErr, pipeWriter) + cmd.Stdout = stdoutWriter + cmd.Stderr = stderrWriter + combinedErr := io.TeeReader(stderrReader, lockWaiterWriter) cmdWg := &sync.WaitGroup{} cmdWg.Add(3) - go releaseReadLock(ctx, pipeReader, params, cmdWg, lockAcquired) - go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done) - go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done) + go releaseReadLock(ctx, lockWaiterReader, params, cmdWg, lockAcquired) + go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", stdoutReader, params.Logger, cmdWg.Done) + go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", combinedErr, params.Logger, cmdWg.Done) - // Get exit status. - if err := cmd.Wait(); err != nil { - pipeWriter.Close() // make sure we close the writer so the goroutines above will complete. - return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") - } + // we run the command, wait for it to complete and close all pipes so the goroutines can complete on their own. + // after that we can process if an error has happened or not. + err = cmd.Run() - // close the pipeWriter and wait for the goroutines to have read all the logs - pipeWriter.Close() + stdoutWriter.Close() + stderrWriter.Close() + lockWaiterWriter.Close() cmdWg.Wait() + if err != nil { + return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") + } + // open the MANIFEST params.Logger.Infof("Writing backup MANIFEST") mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) @@ -371,7 +368,7 @@ func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params Res if err := cmd.Wait(); err != nil { return nil, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed") } - params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName) + params.Logger.Infof("%s completed successfully", be.binaryName) // disable local_infile now that the restore is done. err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=0") diff --git a/go/vt/mysqlctl/mysqlshellbackupengine_test.go b/go/vt/mysqlctl/mysqlshellbackupengine_test.go index 67f27b5382e..80491ec2afc 100644 --- a/go/vt/mysqlctl/mysqlshellbackupengine_test.go +++ b/go/vt/mysqlctl/mysqlshellbackupengine_test.go @@ -325,13 +325,10 @@ func generateTestFile(t *testing.T, name, contents string) { // during ExecuteBackup(), even if the backup didn't succeed. func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { originalLocation := mysqlShellBackupLocation - originalBinary := mysqlShellBackupBinaryName mysqlShellBackupLocation = "logical" - mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh") - defer func() { // restore the original values. + defer func() { // restore the original value. mysqlShellBackupLocation = originalLocation - mysqlShellBackupBinaryName = originalBinary }() logger := logutil.NewMemoryLogger() @@ -340,7 +337,6 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { mysql := NewFakeMysqlDaemon(fakedb) defer mysql.Close() - be := &MySQLShellBackupEngine{} params := BackupParams{ TabletAlias: "test", Logger: logger, @@ -351,6 +347,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { } t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) { + be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")} logger.Clear() manifestBuffer := ioutil.NewBytesBufferWriter() bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{ @@ -359,7 +356,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { } // this simulates mysql shell completing without any issues. - generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage)) + generateTestFile(t, be.binaryName, fmt.Sprintf( + "#!/bin/bash\n>&2 echo %s; echo \"backup completed\"; sleep 0.01", mysqlShellLockMessage)) bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) require.NoError(t, err) @@ -380,7 +378,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { "failed to release the global lock after mysqlsh") }) - t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) { + t.Run("lock released if we don't see mysqlsh release it", func(t *testing.T) { + be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")} mysql.GlobalReadLock = false // clear lock status. logger.Clear() manifestBuffer := ioutil.NewBytesBufferWriter() @@ -390,7 +389,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { } // this simulates mysqlshell completing, but we don't see the message that is released its lock. - generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0") + generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 0") bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) require.NoError(t, err) @@ -407,6 +406,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { }) t.Run("lock released when backup fails", func(t *testing.T) { + be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")} mysql.GlobalReadLock = false // clear lock status. logger.Clear() manifestBuffer := ioutil.NewBytesBufferWriter() @@ -416,7 +416,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) { } // this simulates the backup process failing. - generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1") + generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 1") bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name()) require.NoError(t, err)