Skip to content

Commit

Permalink
PTEUDO-1543: Handle schema drops on restore failure, avoid special ex…
Browse files Browse the repository at this point in the history
…tensions (#319)
  • Loading branch information
leandrorichardtoledo authored Sep 27, 2024
1 parent 547a6c4 commit 6d4741f
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 28 deletions.
14 changes: 7 additions & 7 deletions pkg/databaseclaim/databaseclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req
//when using an existing db, this is the first status, then it moves to M_MigrateExistingToNewDB and falls into the condition below
if operationMode == M_UseExistingDB {
logr.Info("existing db reconcile started")
err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim)
err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim, operationMode)
if err != nil {
return r.manageError(ctx, dbClaim, err)
}
Expand All @@ -438,7 +438,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req

logr.Info("existing db was not reconciled, calling reconcileUseExistingDB before reconcileUseExistingDB")

err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim)
err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim, operationMode)
if err != nil {
return r.manageError(ctx, dbClaim, err)
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req

// reconcileUseExistingDB reconciles the existing db
// bool indicates that object status should be updated
func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, reqInfo *requestInfo, dbClaim *v1.DatabaseClaim) error {
func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, reqInfo *requestInfo, dbClaim *v1.DatabaseClaim, operationalMode ModeEnum) error {
logr := log.FromContext(ctx).WithValues("databaseclaim", dbClaim.Namespace+"/"+dbClaim.Name)

activeDB := dbClaim.Status.ActiveDB
Expand Down Expand Up @@ -542,7 +542,7 @@ func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, re
dbName := existingDBConnInfo.DatabaseName
updateDBStatus(&dbClaim.Status.NewDB, dbName)

err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbName, dbClaim.Spec.Username)
err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbName, dbClaim.Spec.Username, operationalMode)
if err != nil {
return err
}
Expand Down Expand Up @@ -639,7 +639,7 @@ func (r *DatabaseClaimReconciler) reconcileNewDB(ctx context.Context, reqInfo *r
return ctrl.Result{}, err

}
err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbClaim.Spec.DatabaseName, dbClaim.Spec.Username)
err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbClaim.Spec.DatabaseName, dbClaim.Spec.Username, operationalMode)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -1150,7 +1150,7 @@ func (r *DatabaseClaimReconciler) createDatabaseAndExtensions(ctx context.Contex
return nil
}

func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo, logger logr.Logger, dbClient dbclient.Clienter, status *v1.Status, dbName string, baseUsername string) error {
func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo, logger logr.Logger, dbClient dbclient.Clienter, status *v1.Status, dbName string, baseUsername string, operationalMode ModeEnum) error {

if status == nil {
return fmt.Errorf("status is nil")
Expand All @@ -1165,7 +1165,7 @@ func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo,
if err != nil {
return err
}
if roleCreated {
if roleCreated && operationalMode != M_MigrateExistingToNewDB && operationalMode != M_MigrationInProgress {
// take care of special extensions related to the user
err = dbClient.CreateSpecialExtensions(dbName, baseUsername)
if err != nil {
Expand Down
32 changes: 22 additions & 10 deletions pkg/pgctl/pgctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ var revokeSuperUserAccess = func(DBAdmin *sql.DB, role string) error {
return err
}

// Execute runs the schema copy process, including granting and revoking superuser access,
// dumping the schema, and restoring it to the target database.
func (s *copy_schema_state) Execute() (State, error) {
log := s.config.Log.WithValues("state", s.String())
log.Info("started")
Expand All @@ -313,7 +315,6 @@ func (s *copy_schema_state) Execute() (State, error) {

if targetDBAdmin, err = getDB(s.config.TargetDBAdminDsn, nil); err != nil {
log.Error(err, "connection test failed for targetDBAdmin")

return nil, err
}
defer closeDB(log, targetDBAdmin)
Expand All @@ -322,18 +323,18 @@ func (s *copy_schema_state) Execute() (State, error) {
if err != nil {
return nil, err
}

rolename := url.User.Username()
log.Info("granting super user acesss role to \"" + rolename + "\"")
log.Info("granting super user access", "role", rolename)

err = grantSuperUserAccess(targetDBAdmin, rolename)
if err != nil {
log.Error(err, "could not grant super user acesss role to \""+rolename+"\"")
log.Error(err, "failed to grant superuser access", "role", rolename)
metrics.UsersUpdatedErrors.WithLabelValues("grant error").Inc()
return nil, err
}

dump := NewDump(s.config.SourceDBAdminDsn)

dump.SetupFormat("p")
dump.SetPath(s.config.ExportFilePath)
dump.EnableVerbose()
Expand All @@ -347,29 +348,40 @@ func (s *copy_schema_state) Execute() (State, error) {
})

dumpExec := dump.Exec(ExecOptions{StreamPrint: true})
log.Info("executed dump with", "full command", dumpExec.FullCommand)
log.Info("executed dump", "full command", dumpExec.FullCommand)

if dumpExec.Error != nil {
return nil, dumpExec.Error.Err
}

if err = dump.modifyPgDumpInfo(); err != nil {
log.Error(err, "failed to comment create policy")
return nil, err
}

restore := NewRestore(s.config.TargetDBUserDsn)
restore.EnableVerbose()
restore.Path = s.config.ExportFilePath
restore.SetPath(s.config.ExportFilePath)

restoreExec := restore.Exec(dumpExec.FileName, ExecOptions{StreamPrint: true})
if restoreExec.Error != nil {
log.Error(restoreExec.Error.Err, "restore failed")

log.Info("restored with", "full command", restoreExec.FullCommand)
// Attempt to drop schemas after restore failure.
dropResult := restore.DropSchemas()
if dropResult.Error != nil {
log.Error(dropResult.Error.Err, "failed to drop schemas")
return nil, dropResult.Error.Err
}

if restoreExec.Error != nil {
return nil, restoreExec.Error.Err
}

log.Info("executed restore", "full command", restoreExec.FullCommand)

err = revokeSuperUserAccess(targetDBAdmin, rolename)
if err != nil {
log.Error(err, "could not revoke super user acesss role from"+rolename)
log.Error(err, "failed to revoke superuser access", "role", rolename)
metrics.UsersUpdatedErrors.WithLabelValues("revoke error").Inc()
return nil, err
}
Expand All @@ -378,8 +390,8 @@ func (s *copy_schema_state) Execute() (State, error) {
return &create_subscription_state{
config: s.config,
}, nil

}

func (s *copy_schema_state) Id() StateEnum {
return S_CopySchema
}
Expand Down
96 changes: 85 additions & 11 deletions pkg/pgctl/pgrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,58 @@ type Restore struct {
Schemas []string
}

// NewRestore creates a new Restore instance with the provided configuration.
func NewRestore(DsnUri string) *Restore {
return &Restore{Options: PGDRestoreOpts, DsnUri: DsnUri, Schemas: []string{"public"}}
return &Restore{
Options: PGDRestoreOpts,
DsnUri: DsnUri,
Schemas: []string{"public"},
}
}

// Exec runs the pg_restore command with the provided filename and options.
func (x *Restore) Exec(filename string, opts ExecOptions) Result {
result := Result{}
options := []string{x.DsnUri, "-vON_ERROR_STOP=ON",
fmt.Sprintf("--file=%s%s", x.Path, filename)}
options := []string{
x.DsnUri,
"-vON_ERROR_STOP=ON",
fmt.Sprintf("--file=%s%s", x.Path, filename),
}
options = append(options, x.restoreOptions()...)
result.FullCommand = strings.Join(options, " ")

result := Result{
FullCommand: strings.Join(options, " "),
}

cmd := exec.Command(PSQL, options...)

//cmd.Env = append(os.Environ(), x.EnvPassword)
stderrIn, _ := cmd.StderrPipe()
// Pipe to capture error output.
stderrIn, err := cmd.StderrPipe()
if err != nil {
result.Error = &ResultError{Err: err}
return result
}

go func() {
result.Output = streamExecOutput(stderrIn, opts)
}()
cmd.Start()
err := cmd.Wait()
if exitError, ok := err.(*exec.ExitError); ok {
result.Error = &ResultError{Err: err, ExitCode: exitError.ExitCode(), CmdOutput: result.Output}

err = cmd.Start()
if err != nil {
result.Error = &ResultError{Err: err, CmdOutput: result.Output}
return result
}

err = cmd.Wait()
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
result.Error = &ResultError{Err: exitError, ExitCode: exitError.ExitCode(), CmdOutput: result.Output}
return result
}

result.Error = &ResultError{Err: err, CmdOutput: result.Output}
return result
}

return result
}

Expand Down Expand Up @@ -76,3 +106,47 @@ func (x *Restore) SetOptions(o []string) {
func (x *Restore) GetOptions() []string {
return x.Options
}

// DropSchemas drops all schemas except the system ones.
func (x *Restore) DropSchemas() Result {
dropSchemaSQL := `
DO $$ DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT nspname FROM pg_namespace WHERE nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND nspname !~ '^pg_temp_') LOOP
EXECUTE 'DROP SCHEMA IF EXISTS ' || quote_ident(r.nspname) || ' CASCADE';
END LOOP;
END $$;
`

result := Result{}
cmd := exec.Command(PSQL, x.DsnUri, "-c", dropSchemaSQL)

// Pipe to capture error output.
stderrIn, err := cmd.StderrPipe()
if err != nil {
result.Error = &ResultError{Err: err}
return result
}

go func() {
result.Output = streamExecOutput(stderrIn, ExecOptions{})
}()

if err := cmd.Start(); err != nil {
result.Error = &ResultError{Err: err, CmdOutput: result.Output}
return result
}

if err := cmd.Wait(); err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
result.Error = &ResultError{Err: exitError, ExitCode: exitError.ExitCode(), CmdOutput: result.Output}
return result
}

result.Error = &ResultError{Err: err, CmdOutput: result.Output}
return result
}

return result
}

0 comments on commit 6d4741f

Please sign in to comment.