Skip to content

Commit

Permalink
Parse the pgcontrol file on boot to ensure compatibility with our int…
Browse files Browse the repository at this point in the history
…ernal defaults.
  • Loading branch information
davissp14 committed Feb 6, 2025
1 parent bb46120 commit 8b4fd9e
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 21 deletions.
4 changes: 2 additions & 2 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to initialize fly config: %s", err)
}

if err := n.PGConfig.initialize(store); err != nil {
if err := n.PGConfig.initialize(ctx, store); err != nil {
return fmt.Errorf("failed to initialize pg config: %s", err)
}

Expand Down Expand Up @@ -527,7 +527,7 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro
}

// Set restore configuration
if err := n.PGConfig.initialize(store); err != nil {
if err := n.PGConfig.initialize(ctx, store); err != nil {
return fmt.Errorf("failed to initialize pg config: %s", err)
}

Expand Down
27 changes: 24 additions & 3 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (c *PGConfig) Print(w io.Writer) error {
return e.Encode(cfg)
}

func (c *PGConfig) SetDefaults(store *state.Store) error {
func (c *PGConfig) SetDefaults(ctx context.Context, store *state.Store) error {
// The default wal_segment_size in mb
const walSegmentSize = 16

Expand Down Expand Up @@ -184,6 +184,27 @@ func (c *PGConfig) SetDefaults(store *state.Store) error {
return fmt.Errorf("failed to set recovery target config: %s", err)
}

// Override any default settings that may conflict with pg_control.
pgControlMap, err := pgControlSettings(ctx)
if err != nil {
return fmt.Errorf("failed to fetch pg_control settings: %s", err)
}

if pgControlMap != nil {

Check failure on line 193 in internal/flypg/pg.go

View workflow job for this annotation

GitHub Actions / Staticcheck

unnecessary nil check around range (S1031)
for k, v := range pgControlMap {
// Skip any settings that are not already specified by the internal config
if _, ok := c.internalConfig[k]; !ok {
continue
}

// Check for value discrepancies and log a warning if found.
if c.internalConfig[k] != v {
log.Printf("[WARN] Overriding internal config setting %s: %s -> %s", k, c.internalConfig[k], v)
c.internalConfig[k] = v
}
}
}

return nil
}

Expand Down Expand Up @@ -269,7 +290,7 @@ func (c *PGConfig) isInitialized() bool {

// initialize will ensure the required configuration files are stubbed and the parent
// postgresql.conf file includes them.
func (c *PGConfig) initialize(store *state.Store) error {
func (c *PGConfig) initialize(ctx context.Context, store *state.Store) error {
if err := c.setDefaultHBA(); err != nil {
return fmt.Errorf("failed updating pg_hba.conf: %s", err)
}
Expand All @@ -294,7 +315,7 @@ func (c *PGConfig) initialize(store *state.Store) error {
}
}

if err := c.SetDefaults(store); err != nil {
if err := c.SetDefaults(context.TODO(), store); err != nil {
return fmt.Errorf("failed to set pg defaults: %s", err)
}

Expand Down
62 changes: 62 additions & 0 deletions internal/flypg/pg_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package flypg

import (
"bufio"
"context"
"fmt"
"strings"

"log"

"github.com/fly-apps/postgres-flex/internal/utils"
)

const (
pathToPGControl = "/data/postgresql/global/pg_control"
)

func pgControlSettings(ctx context.Context) (map[string]string, error) {
// Short-circuit if the pg_control file doesn't exist.
if !utils.FileExists(pathToPGControl) {
log.Println("[WARN] pg_control file does not exist. Skipping evaluation.")
return nil, nil
}

result, err := utils.RunCmd(ctx, "root", "pg_controldata")
if err != nil {
return nil, fmt.Errorf("failed to run pg_controldata: %s", err)
}

return parsePGControlData(string(result))
}

func parsePGControlData(pgControlData string) (map[string]string, error) {
settings := make(map[string]string)

scanner := bufio.NewScanner(strings.NewReader(pgControlData))
for scanner.Scan() {
line := scanner.Text()

// Filter out lines that don't contain the word "setting".
if !strings.Contains(line, "setting:") {
continue
}

parts := strings.SplitN(line, "setting:", 2)
if len(parts) != 2 {
continue
}

key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])

settings[key] = value
}

// Check for any scanner errors.
if err := scanner.Err(); err != nil {
return nil, err
}

return settings, nil
}
60 changes: 60 additions & 0 deletions internal/flypg/pg_control_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package flypg

import (
"testing"
)

func TestParseSettingsFromFile(t *testing.T) {
// Sample input that includes some lines with "setting:" and some without.
input := `pg_control version number: 1300
Catalog version number: 202307071
Database system identifier: 7420479024646529412
Database cluster state: in archive recovery
pg_control last modified: Tue 04 Feb 2025 10:04:52 PM UTC
Latest checkpoint location: 2/40000060
Latest checkpoint's REDO location: 2/40000028
Latest checkpoint's REDO WAL file: 000000020000000200000040
Latest checkpoint's TimeLineID: 2
Latest checkpoint's PrevTimeLineID: 2
Latest checkpoint's full_page_writes: on
Latest checkpoint's NextXID: 0:34
wal_level setting: replica
wal_log_hints setting: on
max_connections setting: 500
max_worker_processes setting: 8
Some other line without the keyword
Blocks per segment of large relation: 131072
WAL block size: 8192
Bytes per WAL segment: 16777216
Maximum length of identifiers: 64
Maximum columns in an index: 32
Maximum size of a TOAST chunk: 1996
Size of a large-object chunk: 2048`

settings, err := parsePGControlData(input)
if err != nil {
t.Fatalf("parsePGControlData returned an error: %v", err)
}

// Define the expected key/value pairs.
expected := map[string]string{
"wal_level": "replica",
"wal_log_hints": "on",
"max_connections": "500",
"max_worker_processes": "8",
}

if len(settings) != len(expected) {
t.Errorf("expected %d settings, got %d", len(expected), len(settings))
}

// Verify that the expected key/value pairs are present in the settings map.
for key, want := range expected {
got, ok := settings[key]
if !ok {
t.Errorf("expected key %q not found in settings", key)
} else if got != want {
t.Errorf("for key %q, expected value %q, got %q", key, want, got)
}
}
}
35 changes: 19 additions & 16 deletions internal/flypg/pg_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package flypg

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -25,6 +26,8 @@ func TestPGConfigInitialization(t *testing.T) {
}
defer cleanup()

ctx := context.TODO()

pgConf := &PGConfig{
DataDir: pgTestDirectory,
Port: 5433,
Expand All @@ -41,7 +44,7 @@ func TestPGConfigInitialization(t *testing.T) {

t.Run("initialize", func(t *testing.T) {
store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}
})
Expand Down Expand Up @@ -98,7 +101,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("TIMESCALEDB_ENABLED", "true")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -129,7 +132,7 @@ func TestPGConfigInitialization(t *testing.T) {
}

t.Run("defaults", func(t *testing.T) {
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -160,7 +163,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -181,7 +184,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -202,7 +205,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -223,7 +226,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Fatal(err)
}

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -242,7 +245,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -257,7 +260,7 @@ func TestPGConfigInitialization(t *testing.T) {

t.Setenv("S3_ARCHIVE_CONFIG", "")

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -275,7 +278,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -293,7 +296,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetName=20240626T172443")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -311,7 +314,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?target=immediate")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -329,7 +332,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00Z&targetInclusive=false")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand All @@ -351,7 +354,7 @@ func TestPGConfigInitialization(t *testing.T) {
t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:[email protected]/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00&targetTimeline=2")
store, _ := state.NewStore()

if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(ctx, store); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -391,7 +394,7 @@ func TestPGUserConfigOverride(t *testing.T) {
}

store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(context.TODO(), store); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -542,7 +545,7 @@ func TestValidateCompatibility(t *testing.T) {
}

store, _ := state.NewStore()
if err := pgConf.initialize(store); err != nil {
if err := pgConf.initialize(context.TODO(), store); err != nil {
t.Fatal(err)
}
t.Run("SharedPreloadLibraries", func(t *testing.T) {
Expand Down

0 comments on commit 8b4fd9e

Please sign in to comment.