Skip to content

Commit 5577b6a

Browse files
committed
WIP
1 parent b93752e commit 5577b6a

File tree

6 files changed

+91
-58
lines changed

6 files changed

+91
-58
lines changed

cmd/keeper/cmd/keeper.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -821,9 +821,10 @@ func (p *PostgresKeeper) resync(db, followedDB *cluster.DB, tryPgrewind bool) er
821821
// fallback to pg_basebackup
822822
if tryPgrewind && p.usePgrewind(db) {
823823
connParams := p.getSUConnParams(db, followedDB)
824-
log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID, "keeper", followedDB.Spec.KeeperUID)
825-
// TODO: Make the forceCheckpoint parameter use cluster specification
826-
if err := pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, true); err != nil {
824+
checkpointBeforePgrewind := db.Spec.CheckpointBeforePgrewind
825+
log.Infow("syncing using pg_rewind", "followedDB", followedDB.UID,
826+
"keeper", followedDB.Spec.KeeperUID, "forcingCheckpoint", checkpointBeforePgrewind)
827+
if err := pgm.SyncFromFollowedPGRewind(connParams, p.pgSUPassword, checkpointBeforePgrewind); err != nil {
827828
// log pg_rewind error and fall back to pg_basebackup
828829
log.Errorw("error syncing with pg_rewind", zap.Error(err))
829830
} else {
@@ -1266,19 +1267,18 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
12661267
tryPgrewind = false
12671268
}
12681269

1269-
// TODO(sgotti) pg_rewind considers databases on the same timeline
1270-
// as in sync and doesn't check if they diverged at different
1271-
// position in previous timelines.
1272-
// So check that the db as been synced or resync again with
1273-
// pg_rewind disabled. Will need to report this upstream.
1274-
1275-
// TODO(sgotti) The rewinded standby needs wal from the master
1276-
// starting from the common ancestor, if they aren't available the
1277-
// instance will keep waiting for them, now we assume that if the
1278-
// instance isn't ready after the start timeout, it's waiting for
1279-
// wals and we'll force a full resync.
1280-
// We have to find a better way to detect if a standby is waiting
1281-
// for unavailable wals.
1270+
// TODO(sgotti) pg_rewind considers databases on the same timeline as in sync and
1271+
// doesn't check if they diverged at different position in previous timelines. So
1272+
// check that the db has been synced or resync again with pg_rewind disabled. Will
1273+
// need to report this upstream.
1274+
1275+
// TODO(sgotti) The rewinded standby needs wal from the master starting from the
1276+
// common ancestor, if they aren't available the instance will keep waiting for
1277+
// them, now we assume that if the instance isn't ready after the start timeout,
1278+
// it's waiting for wals and we'll force a full resync.
1279+
//
1280+
// We have to find a better way to detect if a standby is waiting for unavailable
1281+
// wals.
12821282
if err = p.resync(db, followedDB, tryPgrewind); err != nil {
12831283
log.Errorw("failed to resync from followed instance", zap.Error(err))
12841284
return

cmd/sentinel/cmd/sentinel.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ func (s *Sentinel) setDBSpecFromClusterSpec(cd *cluster.ClusterData) {
377377
db.Spec.RequestTimeout = *clusterSpec.RequestTimeout
378378
db.Spec.MaxStandbys = *clusterSpec.MaxStandbys
379379
db.Spec.UsePgrewind = *clusterSpec.UsePgrewind
380+
db.Spec.CheckpointBeforePgrewind = *clusterSpec.CheckpointBeforePgrewind
380381
db.Spec.PGParameters = clusterSpec.PGParameters
381382
db.Spec.PGHBA = clusterSpec.PGHBA
382383
if db.Spec.FollowConfig != nil && db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {

internal/cluster/cluster.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ const (
6666
DefaultMaxSynchronousStandbys uint16 = 1
6767
DefaultAdditionalWalSenders = 5
6868
DefaultUsePgrewind = false
69+
DefaultCheckpointBeforePgrewind = false
6970
DefaultMergePGParameter = true
7071
DefaultRole ClusterRole = ClusterRoleMaster
7172
DefaultSUReplAccess SUReplAccessMode = SUReplAccessAll
@@ -261,6 +262,8 @@ type ClusterSpec struct {
261262
AdditionalMasterReplicationSlots []string `json:"additionalMasterReplicationSlots"`
262263
// Whether to use pg_rewind
263264
UsePgrewind *bool `json:"usePgrewind,omitempty"`
265+
// Whether to issue a CHECKPOINT before attempting a pg_rewind
266+
CheckpointBeforePgrewind *bool `json:"checkpointBeforePgrewind,omitempty"`
264267
// InitMode defines the cluster initialization mode. Current modes are: new, existing, pitr
265268
InitMode *ClusterInitMode `json:"initMode,omitempty"`
266269
// Whether to merge pgParameters of the initialized db cluster, useful
@@ -379,6 +382,9 @@ func (os *ClusterSpec) WithDefaults() *ClusterSpec {
379382
if s.UsePgrewind == nil {
380383
s.UsePgrewind = BoolP(DefaultUsePgrewind)
381384
}
385+
if s.CheckpointBeforePgrewind == nil {
386+
s.CheckpointBeforePgrewind = BoolP(DefaultCheckpointBeforePgrewind)
387+
}
382388
if s.MinSynchronousStandbys == nil {
383389
s.MinSynchronousStandbys = Uint16P(DefaultMinSynchronousStandbys)
384390
}
@@ -607,6 +613,8 @@ type DBSpec struct {
607613
SynchronousReplication bool `json:"synchronousReplication,omitempty"`
608614
// Whether to use pg_rewind
609615
UsePgrewind bool `json:"usePgrewind,omitempty"`
616+
// Whether to issue a CHECKPOINT before attempting a pg_rewind
617+
CheckpointBeforePgrewind bool `json:"checkpointBeforePgrewind,omitempty"`
610618
// AdditionalWalSenders defines the number of additional wal_senders in
611619
// addition to the ones internally defined by stolon
612620
AdditionalWalSenders uint16 `json:"additionalWalSenders"`

internal/postgresql/postgresql.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -786,9 +786,9 @@ func (p *Manager) SyncFromFollowedPGRewind(followedConnParams ConnParams, passwo
786786
followedConnParams.Set("options", "-c synchronous_commit=off")
787787
followedConnString := followedConnParams.ConnString()
788788

789-
// TODO: Follow up with tests. We need to issue a checkpoint on the primary prior to us
790-
// starting our recovery, as until the primary checkpoints the global/pg_control file
791-
// won't contain up-to-date information about what timeline the primary exists in.
789+
// We need to issue a checkpoint on the source before running pg_rewind: until the primary
790+
// checkpoints, the global/pg_control file won't contain up-to-date information about
791+
// what timeline the primary is on.
792792
//
793793
// Imagine everyone is on timeline 1, then we promote a node to timeline 2. Standbys
794794
// attempt to replicate from the newly promoted node but fail due to diverged timelines.

tests/integration/ha_test.go

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,21 @@ func TestInitWithMultipleKeepers(t *testing.T) {
124124
}
125125

126126
func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinels uint8, syncRepl bool, usePgrewind bool, primaryKeeper *TestKeeper) (testKeepers, testSentinels, *TestProxy, *TestStore) {
127-
var initialClusterSpec *cluster.ClusterSpec
127+
var initialClusterSpec = &cluster.ClusterSpec{
128+
SleepInterval: &cluster.Duration{Duration: 2 * time.Second},
129+
FailInterval: &cluster.Duration{Duration: 5 * time.Second},
130+
ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
131+
MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB
132+
SynchronousReplication: cluster.BoolP(syncRepl),
133+
PGParameters: defaultPGParameters,
134+
// If we want to pg_rewind then also checkpoint beforehand, otherwise we may fail due
135+
// to un-checkpointed timeline changes that make us silently fall back to basebackup.
136+
UsePgrewind: cluster.BoolP(usePgrewind),
137+
CheckpointBeforePgrewind: cluster.BoolP(usePgrewind),
138+
}
139+
128140
if primaryKeeper == nil {
129-
initialClusterSpec = &cluster.ClusterSpec{
130-
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModeNew),
131-
SleepInterval: &cluster.Duration{Duration: 2 * time.Second},
132-
FailInterval: &cluster.Duration{Duration: 5 * time.Second},
133-
ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
134-
MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB
135-
SynchronousReplication: cluster.BoolP(syncRepl),
136-
UsePgrewind: cluster.BoolP(usePgrewind),
137-
PGParameters: defaultPGParameters,
138-
}
141+
initialClusterSpec.InitMode = cluster.ClusterInitModeP(cluster.ClusterInitModeNew)
139142
} else {
140143
// if primaryKeeper is provided then we should create a standby cluster and do a
141144
// pitr recovery from the external primary database
@@ -147,22 +150,14 @@ func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinel
147150
pgpass.WriteString(fmt.Sprintf("%s:%s:*:%s:%s\n", primaryKeeper.pgListenAddress, primaryKeeper.pgPort, primaryKeeper.pgReplUsername, primaryKeeper.pgReplPassword))
148151
pgpass.Close()
149152

150-
initialClusterSpec = &cluster.ClusterSpec{
151-
InitMode: cluster.ClusterInitModeP(cluster.ClusterInitModePITR),
152-
Role: cluster.ClusterRoleP(cluster.ClusterRoleStandby),
153-
SleepInterval: &cluster.Duration{Duration: 2 * time.Second},
154-
FailInterval: &cluster.Duration{Duration: 5 * time.Second},
155-
ConvergenceTimeout: &cluster.Duration{Duration: 30 * time.Second},
156-
MaxStandbyLag: cluster.Uint32P(50 * 1024), // limit lag to 50kiB
157-
SynchronousReplication: cluster.BoolP(syncRepl),
158-
PGParameters: defaultPGParameters,
159-
PITRConfig: &cluster.PITRConfig{
160-
DataRestoreCommand: fmt.Sprintf("PGPASSFILE=%s pg_basebackup -D %%d -h %s -p %s -U %s", pgpass.Name(), primaryKeeper.pgListenAddress, primaryKeeper.pgPort, primaryKeeper.pgReplUsername),
161-
},
162-
StandbyConfig: &cluster.StandbyConfig{
163-
StandbySettings: &cluster.StandbySettings{
164-
PrimaryConninfo: fmt.Sprintf("sslmode=disable host=%s port=%s user=%s password=%s", primaryKeeper.pgListenAddress, primaryKeeper.pgPort, primaryKeeper.pgReplUsername, primaryKeeper.pgReplPassword),
165-
},
153+
initialClusterSpec.InitMode = cluster.ClusterInitModeP(cluster.ClusterInitModePITR)
154+
initialClusterSpec.Role = cluster.ClusterRoleP(cluster.ClusterRoleStandby)
155+
initialClusterSpec.PITRConfig = &cluster.PITRConfig{
156+
DataRestoreCommand: fmt.Sprintf("PGPASSFILE=%s pg_basebackup -D %%d -h %s -p %s -U %s", pgpass.Name(), primaryKeeper.pgListenAddress, primaryKeeper.pgPort, primaryKeeper.pgReplUsername),
157+
}
158+
initialClusterSpec.StandbyConfig = &cluster.StandbyConfig{
159+
StandbySettings: &cluster.StandbySettings{
160+
PrimaryConninfo: fmt.Sprintf("sslmode=disable host=%s port=%s user=%s password=%s", primaryKeeper.pgListenAddress, primaryKeeper.pgPort, primaryKeeper.pgReplUsername, primaryKeeper.pgReplPassword),
166161
},
167162
}
168163
}
@@ -1011,6 +1006,23 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
10111006
t.Fatalf("unexpected err: %v", err)
10121007
}
10131008

1009+
ctx, cancel := context.WithCancel(context.Background())
1010+
defer cancel()
1011+
go func(ctx context.Context) {
1012+
var cd string
1013+
for {
1014+
select {
1015+
case <-ctx.Done():
1016+
return
1017+
case <-time.After(100 * time.Millisecond):
1018+
if newCd := standbys[0].PGControldata(); newCd != cd {
1019+
t.Logf("standby[0] controldata changed: %s", newCd)
1020+
cd = newCd
1021+
}
1022+
}
1023+
}
1024+
}(ctx)
1025+
10141026
// Stop standby[0]
10151027
t.Logf("Stopping standby[0]: %s", standbys[0].uid)
10161028
standbys[0].Stop()
@@ -1112,19 +1124,19 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
11121124
if usePgrewind {
11131125
output := standbys[1].ReadOutput()
11141126
if !strings.Contains(output, "running pg_rewind") {
1115-
t.Error("expected to run pg_rewind but could not find it in logs")
1127+
t.Errorf("expected logs to contain evidence of running pg_rewind: %s", standbys[1].uid)
11161128
}
11171129

11181130
// This will occur whenever pg_rewind is run against a Postgres that has not
11191131
// checkpointed since its timeline previously forked. pg_rewind first grabs the
11201132
// pg_control file to check if timelines are diverged, and this file won't have been
11211133
// updated until a checkpoint takes place.
1122-
if !strings.Contains(output, "no rewind required") {
1123-
t.Error("keeper tried rewinding but rewind thought it was not required")
1134+
if strings.Contains(output, "no rewind required") {
1135+
t.Errorf("keeper tried rewinding but rewind thought it was not required: %s", standbys[1].uid)
11241136
}
11251137

11261138
if strings.Contains(output, "running pg_basebackup") {
1127-
t.Error("pg_rewind is enabled but we performed a pg_basebackup anyway")
1139+
t.Errorf("pg_rewind is enabled but we performed a pg_basebackup anyway: %s", standbys[1].uid)
11281140
}
11291141
}
11301142

tests/integration/utils.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"database/sql"
2222
"encoding/json"
2323
"fmt"
24-
"io"
2524
"io/ioutil"
2625
"net"
2726
"os"
@@ -54,7 +53,11 @@ const (
5453
)
5554

5655
var (
57-
defaultPGParameters = cluster.PGParameters{"log_destination": "stderr", "logging_collector": "false"}
56+
defaultPGParameters = cluster.PGParameters{
57+
"log_destination": "stderr",
58+
"logging_collector": "false",
59+
"log_checkpoints": "on",
60+
}
5861
)
5962

6063
var curPort = MinPort
@@ -430,12 +433,8 @@ func NewTestKeeper(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, p
430433
// ReadOutput returns the latest output from the keeper process: reading the output
431434
// consumes the output.
432435
func (tk *TestKeeper) ReadOutput() string {
433-
lines, err := tk.Process.output.ReadString('\n')
434-
if err != nil && err != io.EOF {
435-
tk.t.Fatalf("failed to read output from test keeper buffer: %v", err)
436-
}
437-
438-
return lines
436+
defer tk.Process.output.Reset()
437+
return tk.Process.output.String()
439438
}
440439

441440
func (tk *TestKeeper) PGDataVersion() (int, int, error) {
@@ -454,6 +453,19 @@ func (tk *TestKeeper) PGDataVersion() (int, int, error) {
454453
return pg.ParseVersion(version)
455454
}
456455

456+
// PGControldata calls pg_controldata for the Postgres database directory. This can be
457+
// useful in debugging database state, especially when comparing what was successfully
458+
// checkpointed.
459+
func (tk *TestKeeper) PGControldata() string {
460+
cmd := exec.Command("pg_controldata", filepath.Join(tk.dataDir, "postgres"))
461+
output, err := cmd.CombinedOutput()
462+
if err != nil {
463+
tk.t.Fatalf("failed to run pg_controldata: %v:\n%s", err, output)
464+
}
465+
466+
return string(output)
467+
}
468+
457469
func (tk *TestKeeper) GetPrimaryConninfo() (pg.ConnParams, error) {
458470
regex := regexp.MustCompile(`\s*primary_conninfo\s*=\s*'(.*)'$`)
459471

0 commit comments

Comments
 (0)