diff --git a/pkg/databaseclaim/databaseclaim.go b/pkg/databaseclaim/databaseclaim.go index 313d7a08..2662872b 100644 --- a/pkg/databaseclaim/databaseclaim.go +++ b/pkg/databaseclaim/databaseclaim.go @@ -414,7 +414,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req //when using an existing db, this is the first status, then it moves to M_MigrateExistingToNewDB and falls into the condition below if operationMode == M_UseExistingDB { logr.Info("existing db reconcile started") - err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim) + err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim, operationMode) if err != nil { return r.manageError(ctx, dbClaim, err) } @@ -438,7 +438,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req logr.Info("existing db was not reconciled, calling reconcileUseExistingDB before reconcileUseExistingDB") - err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim) + err := r.reconcileUseExistingDB(ctx, reqInfo, dbClaim, operationMode) if err != nil { return r.manageError(ctx, dbClaim, err) } @@ -494,7 +494,7 @@ func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, req // reconcileUseExistingDB reconciles the existing db // bool indicates that object status should be updated -func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, reqInfo *requestInfo, dbClaim *v1.DatabaseClaim) error { +func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, reqInfo *requestInfo, dbClaim *v1.DatabaseClaim, operationalMode ModeEnum) error { logr := log.FromContext(ctx).WithValues("databaseclaim", dbClaim.Namespace+"/"+dbClaim.Name) activeDB := dbClaim.Status.ActiveDB @@ -542,7 +542,7 @@ func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, re dbName := existingDBConnInfo.DatabaseName updateDBStatus(&dbClaim.Status.NewDB, dbName) - err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbName, dbClaim.Spec.Username) + err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbName, dbClaim.Spec.Username, operationalMode) if err != nil { return err } @@ -639,7 +639,7 @@ func (r *DatabaseClaimReconciler) reconcileNewDB(ctx context.Context, reqInfo *r return ctrl.Result{}, err } - err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbClaim.Spec.DatabaseName, dbClaim.Spec.Username) + err = r.manageUserAndExtensions(reqInfo, logr, dbClient, &dbClaim.Status.NewDB, dbClaim.Spec.DatabaseName, dbClaim.Spec.Username, operationalMode) if err != nil { return ctrl.Result{}, err } @@ -1150,7 +1150,7 @@ func (r *DatabaseClaimReconciler) createDatabaseAndExtensions(ctx context.Contex return nil } -func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo, logger logr.Logger, dbClient dbclient.Clienter, status *v1.Status, dbName string, baseUsername string) error { +func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo, logger logr.Logger, dbClient dbclient.Clienter, status *v1.Status, dbName string, baseUsername string, operationalMode ModeEnum) error { if status == nil { return fmt.Errorf("status is nil") @@ -1165,7 +1165,7 @@ func (r *DatabaseClaimReconciler) manageUserAndExtensions(reqInfo *requestInfo, if err != nil { return err } - if roleCreated { + if roleCreated && operationalMode != M_MigrateExistingToNewDB && operationalMode != M_MigrationInProgress { // take care of special extensions related to the user err = dbClient.CreateSpecialExtensions(dbName, baseUsername) if err != nil { diff --git a/pkg/pgctl/pgctl.go b/pkg/pgctl/pgctl.go index 777b87fd..9fdc9d99 100644 --- a/pkg/pgctl/pgctl.go +++ b/pkg/pgctl/pgctl.go @@ -298,6 +298,8 @@ var revokeSuperUserAccess = func(DBAdmin *sql.DB, role string) error { return err } +// Execute runs the schema copy process, including granting and revoking superuser access, +// dumping the schema, and restoring it to the target database. func (s *copy_schema_state) Execute() (State, error) { log := s.config.Log.WithValues("state", s.String()) log.Info("started") @@ -313,7 +315,6 @@ func (s *copy_schema_state) Execute() (State, error) { if targetDBAdmin, err = getDB(s.config.TargetDBAdminDsn, nil); err != nil { log.Error(err, "connection test failed for targetDBAdmin") - return nil, err } defer closeDB(log, targetDBAdmin) @@ -322,18 +323,18 @@ func (s *copy_schema_state) Execute() (State, error) { if err != nil { return nil, err } + rolename := url.User.Username() - log.Info("granting super user acesss role to \"" + rolename + "\"") + log.Info("granting super user access", "role", rolename) err = grantSuperUserAccess(targetDBAdmin, rolename) if err != nil { - log.Error(err, "could not grant super user acesss role to \""+rolename+"\"") + log.Error(err, "failed to grant superuser access", "role", rolename) metrics.UsersUpdatedErrors.WithLabelValues("grant error").Inc() return nil, err } dump := NewDump(s.config.SourceDBAdminDsn) - dump.SetupFormat("p") dump.SetPath(s.config.ExportFilePath) dump.EnableVerbose() @@ -347,11 +348,12 @@ func (s *copy_schema_state) Execute() (State, error) { }) dumpExec := dump.Exec(ExecOptions{StreamPrint: true}) - log.Info("executed dump with", "full command", dumpExec.FullCommand) + log.Info("executed dump", "full command", dumpExec.FullCommand) if dumpExec.Error != nil { return nil, dumpExec.Error.Err } + if err = dump.modifyPgDumpInfo(); err != nil { log.Error(err, "failed to comment create policy") return nil, err @@ -359,17 +361,27 @@ func (s *copy_schema_state) Execute() (State, error) { restore := NewRestore(s.config.TargetDBUserDsn) restore.EnableVerbose() - restore.Path = s.config.ExportFilePath + restore.SetPath(s.config.ExportFilePath) + restoreExec := restore.Exec(dumpExec.FileName, ExecOptions{StreamPrint: true}) + if restoreExec.Error != nil { + log.Error(restoreExec.Error.Err, "restore failed") - log.Info("restored with", "full command", restoreExec.FullCommand) + // Attempt to drop schemas after restore failure. + dropResult := restore.DropSchemas() + if dropResult.Error != nil { + log.Error(dropResult.Error.Err, "failed to drop schemas") + return nil, dropResult.Error.Err + } - if restoreExec.Error != nil { return nil, restoreExec.Error.Err } + + log.Info("executed restore", "full command", restoreExec.FullCommand) + err = revokeSuperUserAccess(targetDBAdmin, rolename) if err != nil { - log.Error(err, "could not revoke super user acesss role from"+rolename) + log.Error(err, "failed to revoke superuser access", "role", rolename) metrics.UsersUpdatedErrors.WithLabelValues("revoke error").Inc() return nil, err } @@ -378,8 +390,8 @@ func (s *copy_schema_state) Execute() (State, error) { return &create_subscription_state{ config: s.config, }, nil - } + func (s *copy_schema_state) Id() StateEnum { return S_CopySchema } diff --git a/pkg/pgctl/pgrestore.go b/pkg/pgctl/pgrestore.go index 2f6d9385..c75cc4bc 100644 --- a/pkg/pgctl/pgrestore.go +++ b/pkg/pgctl/pgrestore.go @@ -19,28 +19,58 @@ type Restore struct { Schemas []string } +// NewRestore creates a new Restore instance with the provided configuration. func NewRestore(DsnUri string) *Restore { - return &Restore{Options: PGDRestoreOpts, DsnUri: DsnUri, Schemas: []string{"public"}} + return &Restore{ + Options: PGDRestoreOpts, + DsnUri: DsnUri, + Schemas: []string{"public"}, + } } +// Exec runs the pg_restore command with the provided filename and options. func (x *Restore) Exec(filename string, opts ExecOptions) Result { - result := Result{} - options := []string{x.DsnUri, "-vON_ERROR_STOP=ON", - fmt.Sprintf("--file=%s%s", x.Path, filename)} + options := []string{ + x.DsnUri, + "-vON_ERROR_STOP=ON", + fmt.Sprintf("--file=%s%s", x.Path, filename), + } options = append(options, x.restoreOptions()...) - result.FullCommand = strings.Join(options, " ") + + result := Result{ + FullCommand: strings.Join(options, " "), + } + cmd := exec.Command(PSQL, options...) - //cmd.Env = append(os.Environ(), x.EnvPassword) - stderrIn, _ := cmd.StderrPipe() + // Pipe to capture error output. + stderrIn, err := cmd.StderrPipe() + if err != nil { + result.Error = &ResultError{Err: err} + return result + } + go func() { result.Output = streamExecOutput(stderrIn, opts) }() - cmd.Start() - err := cmd.Wait() - if exitError, ok := err.(*exec.ExitError); ok { - result.Error = &ResultError{Err: err, ExitCode: exitError.ExitCode(), CmdOutput: result.Output} + + err = cmd.Start() + if err != nil { + result.Error = &ResultError{Err: err, CmdOutput: result.Output} + return result + } + + err = cmd.Wait() + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + result.Error = &ResultError{Err: exitError, ExitCode: exitError.ExitCode(), CmdOutput: result.Output} + return result + } + + result.Error = &ResultError{Err: err, CmdOutput: result.Output} + return result } + return result } @@ -76,3 +106,47 @@ func (x *Restore) SetOptions(o []string) { func (x *Restore) GetOptions() []string { return x.Options } + +// DropSchemas drops all schemas except the system ones. +func (x *Restore) DropSchemas() Result { + dropSchemaSQL := ` + DO $$ DECLARE + r RECORD; + BEGIN + FOR r IN (SELECT nspname FROM pg_namespace WHERE nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND nspname !~ '^pg_temp_') LOOP + EXECUTE 'DROP SCHEMA IF EXISTS ' || quote_ident(r.nspname) || ' CASCADE'; + END LOOP; + END $$; + ` + + result := Result{} + cmd := exec.Command(PSQL, x.DsnUri, "-c", dropSchemaSQL) + + // Pipe to capture error output. + stderrIn, err := cmd.StderrPipe() + if err != nil { + result.Error = &ResultError{Err: err} + return result + } + + go func() { + result.Output = streamExecOutput(stderrIn, ExecOptions{}) + }() + + if err := cmd.Start(); err != nil { + result.Error = &ResultError{Err: err, CmdOutput: result.Output} + return result + } + + if err := cmd.Wait(); err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + result.Error = &ResultError{Err: exitError, ExitCode: exitError.ExitCode(), CmdOutput: result.Output} + return result + } + + result.Error = &ResultError{Err: err, CmdOutput: result.Output} + return result + } + + return result +}