Skip to content

Commit

Permalink
support pg_cron/pg_partition issue during migration (#234)
Browse files Browse the repository at this point in the history
* support pg_cron/pg_partition issue during migration

* trying a newer version of ubuntu
  • Loading branch information
bjeevan-ib authored May 10, 2024
1 parent 8d7ef54 commit 587a62c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 31 deletions.
5 changes: 4 additions & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pipeline {
stage("Setup") {
steps {
prepareBuild()
sh 'echo "deb http://apt.postgresql.org/pub/repos/apt $(sudo lsb_release -cs)-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list'
sh 'sudo wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -'
sh 'sudo apt-get update -y'
}
}
stage("Run tests") {
Expand All @@ -32,7 +35,7 @@ pipeline {
}
dir("$DIRECTORY") {
sh "sudo apt-get update"
sh "sudo apt-get -y install postgresql-client"
sh "sudo apt-get -y install postgresql-client-14"
sh "echo 'db-controller-name' > .id"
sh "make test"
sh "sudo apt-get -y remove postgresql-client"
Expand Down
10 changes: 5 additions & 5 deletions controllers/dbc_end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var _ = Describe("db-controller end to end testing", Label("integration"), Order
db1 = namespace + "-db-1"
db2 = namespace + "-db-2"
db3 = namespace + "-db-3"
rds1 = "box-3-" + db1 + "-1ec9b27c"
rds1 = "box-3-" + db1 + "-1d9fb876"
newdbcMasterSecretName = rds1 + "-master"
createNamespace()
})
Expand Down Expand Up @@ -226,7 +226,7 @@ func MigratePostgresToAuroraRDS() {
return "", err
}
return string(secret.Data["hostname"]), nil
}, time.Minute*20, interval_e2e).Should(ContainSubstring("box-3-" + db2 + "-bb1e7196"))
}, time.Minute*20, interval_e2e).Should(ContainSubstring("box-3-" + db2 + "-b8487b9c"))
}

func MigrateUseExistingToNewRDS() {
Expand Down Expand Up @@ -288,7 +288,7 @@ func UseExistingPostgresRDSTest() {
DSNName: "dsn",
EnableReplicationRole: &falseVal,
UseExistingSource: &trueVal,
DBVersion: "15.3",
DBVersion: "15.5",
DeletionPolicy: "delete",
SourceDataFrom: &persistancev1.SourceDataFrom{
Type: persistancev1.SourceDataType("database"),
Expand Down Expand Up @@ -393,7 +393,7 @@ func createPostgresRDSTest() {
DSNName: "dsn",
EnableReplicationRole: &falseVal,
UseExistingSource: &falseVal,
DBVersion: "15.3",
DBVersion: "15.5",
},
}
Expect(e2e_k8sClient.Create(ctx, dbClaim)).Should(Succeed())
Expand Down Expand Up @@ -437,7 +437,7 @@ func cleanupdb(db string) {
DSNName: "dsn",
EnableReplicationRole: &falseVal,
UseExistingSource: &falseVal,
DBVersion: "15.3",
DBVersion: "15.5",
DeletionPolicy: "delete",
},
}
Expand Down
67 changes: 48 additions & 19 deletions pkg/pgctl/pgctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,37 @@ func (s *create_publication_state) Execute() (State, error) {
FROM pg_catalog.pg_publication
WHERE pubname = $1)`

// dynamically creating the table list to be included in the publication
// this would avoid the issue related to partition extention tables. These schemas are getting
// changed significantly between versions and the publication would fail if we include them.
// Tried using schema name syntax but it does not in older version of postgres
// there are 13.x used by tide in EU prod
// After all RDSs are upgraded, we can remove this logic and include schema based publications

createPub := fmt.Sprintf(`
CREATE PUBLICATION %s
FOR ALL TABLES`, DefaultPubName)
DO $$
DECLARE
table_list TEXT := '';
BEGIN
SELECT INTO table_list string_agg(quote_ident(relname), ', ')
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = 'public' -- Only public schema
AND c.relkind = 'r' -- Only include ordinary tables
AND c.relpersistence = 'p' -- Only include persistent tables
AND NOT EXISTS ( -- Exclude foreign tables
SELECT 1 FROM pg_foreign_table ft WHERE ft.ftrelid = c.oid
)
AND NOT EXISTS ( -- Exclude materialized views
SELECT 1 FROM pg_matviews mv WHERE mv.matviewname = c.relname AND mv.schemaname = n.nspname
);
IF table_list != '' THEN
EXECUTE 'CREATE PUBLICATION %s FOR TABLE ' || table_list;
ELSE
EXECUTE 'CREATE PUBLICATION %s';
END IF;
END $$`, DefaultPubName, DefaultPubName)

err = sourceDBAdmin.QueryRow(q, DefaultPubName).Scan(&exists)
if err != nil {
Expand Down Expand Up @@ -492,14 +520,14 @@ func (s *cut_over_readiness_check_state) Execute() (State, error) {
AND slot_name like '%s_%%'
AND temporary = 't'
)`, DefaultPubName)
relExistsQuery := fmt.Sprintf(`
SELECT EXISTS
(
SELECT 1
FROM pg_subscription s, pg_subscription_rel sr
WHERE s.oid = sr.srsubid
AND s.subname = '%s'
)`, DefaultSubName)
// relExistsQuery := fmt.Sprintf(`
// SELECT EXISTS
// (
// SELECT 1
// FROM pg_subscription s, pg_subscription_rel sr
// WHERE s.oid = sr.srsubid
// AND s.subname = '%s'
// )`, DefaultSubName)

subQuery := fmt.Sprintf(`
SELECT count(srrelid)
Expand All @@ -518,15 +546,16 @@ func (s *cut_over_readiness_check_state) Execute() (State, error) {
return retry(s.config), nil
}

err = targetDBAdmin.QueryRow(relExistsQuery).Scan(&exists)
if err != nil {
log.Error(err, "could not query for subscription rel for records present")
return nil, err
}
if !exists {
log.Info("target yet to start receiving data - retry check in a few seconds")
return retry(s.config), nil
}
// 15.5 seems to behave differently than previous versions regarding the subscription rel
// err = targetDBAdmin.QueryRow(relExistsQuery).Scan(&exists)
// if err != nil {
// log.Error(err, "could not query for subscription rel for records present")
// return nil, err
// }
// if !exists {
// log.Info("target yet to start receiving data - retry check in a few seconds")
// return retry(s.config), nil
// }

err = targetDBAdmin.QueryRow(subQuery).Scan(&count)
if err != nil {
Expand Down
25 changes: 19 additions & 6 deletions pkg/pgctl/pgctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var (
TargetDBUserDsn string
ExportFilePath = "/tmp/"
repository = "postgres"
sourceVersion = "10"
targetVersion = "12.3"
sourceVersion = "13.14"
targetVersion = "15.6"
sourcePort = "5435"
targetPort = "5436"
testDBNetwork = "testDBNetwork"
Expand Down Expand Up @@ -79,14 +79,27 @@ func realTestMain(m *testing.M) int {
}

//validate that no other network is lingering around from a prev test
networks, err := pool.NetworksByName(testDBNetwork)
//networks, err := pool.NetworksByName(testDBNetwork)
networks, err := pool.Client.ListNetworks()
if err != nil {
fmt.Println(err)
return 1
}
if len(networks) != 0 {
fmt.Printf("Expected 0 but got %v networks\n", len(networks))
return 1
networkExists := false
netID := ""
for _, network := range networks {
if network.Name == testDBNetwork {
networkExists = true
netID = network.ID
break
}
}
if networkExists {
err = pool.Client.RemoveNetwork(netID)
if err != nil {
fmt.Println(err)
return 1
}
}

network, err := pool.CreateNetwork(testDBNetwork)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pgctl/pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,13 @@ func (x *Dump) modifyPgDumpInfo() error {
return fmt.Errorf("error running sed command add if not exists to partman schema creation: %w", err)
}

// create pg_cron without specifying the schema
replacePgCronCmd := exec.Command("sed", "-i", "s/CREATE EXTENSION IF NOT EXISTS pg_cron WITH SCHEMA public;/CREATE EXTENSION IF NOT EXISTS pg_cron;/", filePath)
replacePgCronCmd.Stderr = os.Stderr
replacePgCronCmd.Stdout = os.Stdout
if err := replacePgCronCmd.Run(); err != nil {
return fmt.Errorf("error running sed command to create pg_cron without specifying the schema: %w", err)
}

return nil
}

0 comments on commit 587a62c

Please sign in to comment.