Skip to content

Commit

Permalink
chore(zero-cache): add more logging to debug watermark rejections (#3653
Browse files Browse the repository at this point in the history
)
  • Loading branch information
darkgnotic authored Jan 30, 2025
1 parent c952b11 commit 5a9242b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ class PostgresChangeSource implements ChangeSource {
async startStream(clientWatermark: string): Promise<ChangeStream> {
const db = pgClient(this.#lc, this.#upstreamUri);
const slot = replicationSlot(this.#shardID);
const clientStart = oneAfter(clientWatermark);

try {
await this.#stopExistingReplicationSlotSubscriber(db, slot);
Expand All @@ -207,7 +206,7 @@ class PostgresChangeSource implements ChangeSource {
// Unlike the postgres.js client, the pg client does not have an option to
// only use SSL if the server supports it. We achieve it manually by
// trying SSL first, and then falling back to connecting without SSL.
return await this.#startStream(slot, clientStart, config, useSSL);
return await this.#startStream(slot, clientWatermark, config, useSSL);
} catch (e) {
if (e instanceof SSLUnsupportedError) {
this.#lc.info?.('retrying upstream connection without SSL');
Expand Down Expand Up @@ -237,7 +236,7 @@ class PostgresChangeSource implements ChangeSource {

async #startStream(
slot: string,
clientStart: string,
clientWatermark: string,
shardConfig: InternalShardConfig,
useSSL: boolean,
): Promise<ChangeStream> {
Expand Down Expand Up @@ -312,6 +311,7 @@ class PostgresChangeSource implements ChangeSource {

acker = new Acker(service);

const clientStart = oneAfter(clientWatermark);
service
.subscribe(
new PgoutputPlugin({
Expand All @@ -325,7 +325,11 @@ class PostgresChangeSource implements ChangeSource {
.then(() => changes.cancel(), handleError);

await started;
this.#lc.info?.(`started replication stream@${slot}`);

const {replicaVersion} = this.#replicationConfig;
this.#lc.info?.(
`started replication stream@${slot} from ${clientWatermark} (replicaVersion: ${replicaVersion})`,
);

return {
changes,
Expand Down
7 changes: 5 additions & 2 deletions packages/zero-cache/src/services/change-streamer/storer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export class Storer implements Service {
count++;
} else {
this.#lc.warn?.(
`rejecting subscriber at watermark ${sub.watermark}`,
`rejecting subscriber at watermark ${sub.watermark} (earliest watermark: ${entry.watermark})`,
);
sub.close(
ErrorType.WatermarkTooOld,
Expand All @@ -249,7 +249,10 @@ export class Storer implements Service {
} ms)`,
);
} else {
this.#lc.warn?.(`rejecting subscriber at watermark ${sub.watermark}`);
const lastStoredWatermark = await this.getLastStoredWatermark();
this.#lc.warn?.(
`rejecting subscriber at watermark ${sub.watermark} (latest watermark: ${lastStoredWatermark})`,
);
sub.close(
ErrorType.WatermarkNotFound,
`cannot catch up from requested watermark ${sub.watermark}`,
Expand Down

0 comments on commit 5a9242b

Please sign in to comment.