
Commit eadf844

{WIP} Use SQLite as the file format for CDC streaming.
1 parent 5c50c3b commit eadf844

18 files changed: +865 -938 lines changed

src/bin/pgcopydb/catalog.c

Lines changed: 74 additions & 1 deletion
@@ -183,7 +183,17 @@ static char *sourceDBcreateDDLs[] = {
 	"create table sentinel("
 	" id integer primary key check (id = 1), "
 	" startpos pg_lsn, endpos pg_lsn, apply bool, "
-	" write_lsn pg_lsn, flush_lsn pg_lsn, replay_lsn pg_lsn)"
+	" write_lsn pg_lsn, flush_lsn pg_lsn, "
+	" transform_lsn pg_lsn, "
+	" replay_lsn pg_lsn)",
+
+	"create table cdc_files("
+	" id integer primary key, filename text unique, timeline integer, "
+	" startpos pg_lsn, endpos pg_lsn, "
+	" start_time_epoch integer, done_time_epoch integer)",
+
+	"create table timeline_history("
+	" tli integer primary key, startpos pg_lsn, endpos pg_lsn)"
 };
 
 
@@ -407,6 +417,28 @@ static char *targetDBcreateDDLs[] = {
 };
 
 
+static char *replayDBcreateDDLs[] = {
+	"create table output("
+	" id integer primary key, "
+	" action text, xid integer, lsn pg_lsn, timestamp text, "
+	" message text)",
+
+	"create unique index o_a_lsn on output(action, lsn)",
+	"create unique index o_a_xid on output(action, xid)",
+
+	"create table stmt(hash text primary key, sql text)",
+	"create unique index stmt_hash on stmt(hash)",
+
+	"create table replay("
+	" id integer primary key, "
+	" action text, xid integer, lsn pg_lsn, timestamp text, "
+	" stmt_hash text references stmt(hash), stmt_args jsonb)",
+
+	"create index r_xid on replay(xid)",
+	"create index r_lsn on replay(lsn)",
+};
+
+
 static char *sourceDBdropDDLs[] = {
 	"drop table if exists setup",
 	"drop table if exists section",
@@ -473,6 +505,12 @@ static char *targetDBdropDDLs[] = {
 };
 
 
+static char *replayDBdropDDLs[] = {
+	"drop table if exists output",
+	"drop table if exists replay"
+};
+
+
 /*
  * catalog_init_from_specs initializes our internal catalog database file from
  * a specification.
@@ -945,6 +983,13 @@ catalog_create_schema(DatabaseCatalog *catalog)
 			break;
 		}
 
+		case DATABASE_CATALOG_TYPE_REPLAY:
+		{
+			createDDLs = replayDBcreateDDLs;
+			count = sizeof(replayDBcreateDDLs) / sizeof(replayDBcreateDDLs[0]);
+			break;
+		}
+
 		default:
 		{
 			log_error("BUG: called catalog_init for unknown type %d",
@@ -1005,6 +1050,13 @@ catalog_drop_schema(DatabaseCatalog *catalog)
 			break;
 		}
 
+		case DATABASE_CATALOG_TYPE_REPLAY:
+		{
+			dropDDLs = replayDBdropDDLs;
+			count = sizeof(replayDBdropDDLs) / sizeof(replayDBdropDDLs[0]);
+			break;
+		}
+
 		default:
 		{
 			log_error("BUG: called catalog_drop_schema for unknown type %d",
@@ -7996,6 +8048,27 @@ catalog_bind_parameters(sqlite3 *db,
 
 		switch (p->type)
 		{
+			case BIND_PARAMETER_TYPE_NULL:
+			{
+				int rc = sqlite3_bind_null(ppStmt, n);
+
+				if (rc != SQLITE_OK)
+				{
+					log_error("[SQLite %d] Failed to bind \"%s\" to NULL: %s",
+							  rc,
+							  p->name,
+							  sqlite3_errstr(rc));
+					return false;
+				}
+
+				if (logSQL)
+				{
+					appendPQExpBuffer(debugParameters, "%s", "null");
+				}
+
+				break;
+			}
+
 			case BIND_PARAMETER_TYPE_INT:
 			{
 				int rc = sqlite3_bind_int(ppStmt, n, p->intVal);
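Note (illustration only, not part of this commit): the new replayDBcreateDDLs schema splits the transformed CDC stream into a de-duplicated statement table (stmt, keyed by hash) and a replay table that references it and keeps the per-row arguments as JSON, while output keeps the raw logical decoding messages. A hedged sketch of what rows in that schema could look like, written in the same string-array style; the array name, hash, LSN, and argument values are made up:

static char *replayDBexampleDMLs[] = {
	/* store the statement shape once, keyed by its hash */
	"insert or ignore into stmt(hash, sql) "
	" values('9f2d4c', 'INSERT INTO public.t (id, name) VALUES ($1, $2)')",

	/* one replay row per decoded change, referencing the shape by hash */
	"insert into replay(action, xid, lsn, timestamp, stmt_hash, stmt_args) "
	" values('I', 731, '0/24E3B58', '2024-01-01 00:00:00+00', "
	"        '9f2d4c', '[42, \"hello\"]')"
};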

src/bin/pgcopydb/catalog.h

Lines changed: 1 addition & 0 deletions
@@ -628,6 +628,7 @@ bool catalog_count_summary_done_fetch(SQLiteQuery *query);
 typedef enum
 {
 	BIND_PARAMETER_TYPE_UNKNOWN = 0,
+	BIND_PARAMETER_TYPE_NULL,
 	BIND_PARAMETER_TYPE_INT,
 	BIND_PARAMETER_TYPE_INT64,
 	BIND_PARAMETER_TYPE_TEXT
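Note (standalone sketch, not from this commit): the new BIND_PARAMETER_TYPE_NULL maps directly onto sqlite3_bind_null(), as the catalog.c hunk above shows. A minimal self-contained example of that SQLite C API call, binding an unknown done_time_epoch for a simplified cdc_files row; the table layout and values are illustrative, compile with -lsqlite3:

#include <stdio.h>
#include <sqlite3.h>

int main(void)
{
	sqlite3 *db;
	sqlite3_stmt *stmt;

	if (sqlite3_open(":memory:", &db) != SQLITE_OK)
	{
		return 1;
	}

	sqlite3_exec(db,
				 "create table cdc_files(filename text, done_time_epoch integer)",
				 NULL, NULL, NULL);

	sqlite3_prepare_v2(db,
					   "insert into cdc_files(filename, done_time_epoch) "
					   "values($1, $2)",
					   -1, &stmt, NULL);

	sqlite3_bind_text(stmt, 1, "example.json", -1, SQLITE_STATIC);

	/* the file is not done yet: bind SQL NULL, as the new case branch does */
	int rc = sqlite3_bind_null(stmt, 2);

	if (rc != SQLITE_OK)
	{
		fprintf(stderr, "bind failed: %s\n", sqlite3_errstr(rc));
		return 1;
	}

	sqlite3_step(stmt);
	sqlite3_finalize(stmt);
	sqlite3_close(db);

	return 0;
}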

src/bin/pgcopydb/cli_clone_follow.c

Lines changed: 2 additions & 0 deletions
@@ -208,6 +208,7 @@ clone_and_follow(CopyDataSpec *copySpecs)
 						   copyDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs->catalogs.source),
+						   &(copySpecs->catalogs.replay),
 						   copyDBoptions.stdIn,
 						   copyDBoptions.stdOut,
 						   logSQL))
@@ -386,6 +387,7 @@ cli_follow(int argc, char **argv)
 						   copyDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   copyDBoptions.stdIn,
 						   copyDBoptions.stdOut,
 						   logSQL))

src/bin/pgcopydb/cli_snapshot.c

Lines changed: 1 addition & 0 deletions
@@ -323,6 +323,7 @@ cli_create_snapshot(int argc, char **argv)
 						   createSNoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   createSNoptions.stdIn,
 						   createSNoptions.stdOut,
 						   logSQL))

src/bin/pgcopydb/cli_stream.c

Lines changed: 7 additions & 23 deletions
@@ -582,6 +582,7 @@ cli_stream_setup(int argc, char **argv)
 						   streamDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   streamDBoptions.stdIn,
 						   streamDBoptions.stdOut,
 						   logSQL))
@@ -709,6 +710,7 @@ cli_stream_catchup(int argc, char **argv)
 						   streamDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   streamDBoptions.stdIn,
 						   streamDBoptions.stdOut,
 						   logSQL))
@@ -792,6 +794,7 @@ cli_stream_replay(int argc, char **argv)
 						   streamDBoptions.endpos,
 						   STREAM_MODE_REPLAY,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   true, /* stdin */
 						   true, /* stdout */
 						   logSQL))
@@ -800,17 +803,6 @@
 		exit(EXIT_CODE_INTERNAL_ERROR);
 	}
 
-	/*
-	 * Remove the possibly still existing stream context files from
-	 * previous round of operations (--resume, etc). We want to make sure
-	 * that the catchup process reads the files created on this connection.
-	 */
-	if (!stream_cleanup_context(&specs))
-	{
-		/* errors have already been logged */
-		exit(EXIT_CODE_INTERNAL_ERROR);
-	}
-
 	/*
 	 * Before starting the receive, transform, and apply sub-processes, we need
 	 * to set the sentinel endpos to the command line --endpos option, when
@@ -917,6 +909,7 @@ cli_stream_transform(int argc, char **argv)
 						   streamDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   streamDBoptions.stdIn,
 						   streamDBoptions.stdOut,
 						   logSQL))
@@ -1079,6 +1072,7 @@ cli_stream_apply(int argc, char **argv)
 						   streamDBoptions.endpos,
 						   STREAM_MODE_CATCHUP,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   true, /* streamDBoptions.stdIn */
 						   false, /* streamDBoptions.stdOut */
 						   logSQL))
@@ -1102,6 +1096,7 @@
 
 	if (!stream_apply_init_context(&context,
 								   &(copySpecs.catalogs.source),
+								   &(copySpecs.catalogs.replay),
 								   &(copySpecs.cfPaths.cdc),
 								   &(streamDBoptions.connStrings),
 								   streamDBoptions.origin,
@@ -1190,6 +1185,7 @@ stream_start_in_mode(LogicalStreamMode mode)
 						   streamDBoptions.endpos,
 						   mode,
 						   &(copySpecs.catalogs.source),
+						   &(copySpecs.catalogs.replay),
 						   streamDBoptions.stdIn,
 						   streamDBoptions.stdOut,
 						   logSQL))
@@ -1214,18 +1210,6 @@
 
 		case STREAM_MODE_PREFETCH:
 		{
-			/*
-			 * Remove the possibly still existing stream context files from
-			 * previous round of operations (--resume, etc). We want to make
-			 * sure that the catchup process reads the files created on this
-			 * connection.
-			 */
-			if (!stream_cleanup_context(&specs))
-			{
-				/* errors have already been logged */
-				exit(EXIT_CODE_INTERNAL_ERROR);
-			}
-
 			if (!followDB(&copySpecs, &specs))
 			{
 				/* errors have already been logged */

src/bin/pgcopydb/copydb.c

Lines changed: 4 additions & 0 deletions
@@ -563,17 +563,21 @@ copydb_init_specs(CopyDataSpec *specs,
 	DatabaseCatalog *source = &(specs->catalogs.source);
 	DatabaseCatalog *filter = &(specs->catalogs.filter);
 	DatabaseCatalog *target = &(specs->catalogs.target);
+	DatabaseCatalog *replay = &(specs->catalogs.replay);
 
 	/* init the catalog type */
 	source->type = DATABASE_CATALOG_TYPE_SOURCE;
 	filter->type = DATABASE_CATALOG_TYPE_FILTER;
 	target->type = DATABASE_CATALOG_TYPE_TARGET;
+	replay->type = DATABASE_CATALOG_TYPE_REPLAY;
 
 	/* pick the dbfile from the specs */
 	strlcpy(source->dbfile, specs->cfPaths.sdbfile, sizeof(source->dbfile));
 	strlcpy(filter->dbfile, specs->cfPaths.fdbfile, sizeof(filter->dbfile));
 	strlcpy(target->dbfile, specs->cfPaths.tdbfile, sizeof(target->dbfile));
 
+	/* skip replay->dbfile which is rotated */
+
 	if (specs->section == DATA_SECTION_ALL ||
 		specs->section == DATA_SECTION_TABLE_DATA)
 	{
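Note (purely illustrative, not part of this commit): replay->dbfile is left unset here because, unlike the source, filter, and target catalogs, the replay database is meant to rotate. One plausible way to derive a per-rotation file name from the timeline and the starting LSN, mirroring the WAL-derived names of the .json/.sql files it replaces, is sketched below; the helper name and the naming format are hypothetical:

#include <stdio.h>
#include <stdint.h>

/* hypothetical helper: derive a per-rotation SQLite file name from the
 * timeline and the LSN at which the file starts */
static void
replay_catalog_dbfile(char *dbfile, size_t size,
					  const char *cdcdir, uint32_t tli, uint64_t startpos)
{
	snprintf(dbfile, size, "%s/%08X-%08X%08X.db",
			 cdcdir, tli,
			 (uint32_t) (startpos >> 32),
			 (uint32_t) startpos);
}

int main(void)
{
	char dbfile[1024];

	replay_catalog_dbfile(dbfile, sizeof(dbfile), "/var/lib/pgcopydb/cdc",
						  1, (1ULL << 32) | 0x24E3B58);

	/* prints the derived path for timeline 1, LSN 1/24E3B58 */
	printf("%s\n", dbfile);

	return 0;
}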

src/bin/pgcopydb/copydb.h

Lines changed: 8 additions & 2 deletions
@@ -175,15 +175,16 @@ typedef struct ExtensionReqs
 
 /*
  * pgcopydb sentinel is a table that's created on the source catalog and allows
- * communicating elements from the outside, and in between the receive and
- * apply processes.
+ * communicating elements from the outside, and in between the receive,
+ * transform and apply processes.
  */
 typedef struct CopyDBSentinel
 {
 	bool apply;
 	uint64_t startpos;
 	uint64_t endpos;
 	uint64_t write_lsn;
+	uint64_t transform_lsn;
 	uint64_t flush_lsn;
 	uint64_t replay_lsn;
 } CopyDBSentinel;
@@ -491,6 +492,7 @@ bool sentinel_update_write_flush_lsn(DatabaseCatalog *catalog,
 									 uint64_t write_lsn,
 									 uint64_t flush_lsn);
 
+bool sentinel_update_transform_lsn(DatabaseCatalog *catalog, uint64_t transform_lsn);
 bool sentinel_update_replay_lsn(DatabaseCatalog *catalog, uint64_t replay_lsn);
 
 bool sentinel_get(DatabaseCatalog *catalog, CopyDBSentinel *sentinel);
@@ -501,6 +503,10 @@ bool sentinel_sync_recv(DatabaseCatalog *catalog,
 						uint64_t flush_lsn,
 						CopyDBSentinel *sentinel);
 
+bool sentinel_sync_transform(DatabaseCatalog *catalog,
+							 uint64_t transform_lsn,
+							 CopyDBSentinel *sentinel);
+
 bool sentinel_sync_apply(DatabaseCatalog *catalog,
 						 uint64_t replay_lsn,
 						 CopyDBSentinel *sentinel);
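Note (hedged sketch, not from this commit): sentinel_update_transform_lsn presumably mirrors the existing sentinel_update_replay_lsn and writes the new sentinel.transform_lsn column added in catalog.c above, with the LSN rendered in the usual Postgres %X/%X text form. For illustration only; the statement text is a guess based on that DDL:

#include <stdio.h>
#include <stdint.h>

/* the kind of statement and LSN formatting such an update would use */
static const char *updateTransformLSN =
	"update sentinel set transform_lsn = $1 where id = 1";

int main(void)
{
	uint64_t transform_lsn = (1ULL << 32) | 0x24E3B58;   /* "1/24E3B58" */
	char lsn[64];

	/* Postgres textual LSN format: high 32 bits / low 32 bits */
	snprintf(lsn, sizeof(lsn), "%X/%X",
			 (uint32_t) (transform_lsn >> 32),
			 (uint32_t) transform_lsn);

	printf("%s -- with $1 = '%s'\n", updateTransformLSN, lsn);

	return 0;
}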

src/bin/pgcopydb/follow.c

Lines changed: 0 additions & 37 deletions
@@ -251,18 +251,6 @@ follow_get_sentinel(StreamSpecs *specs, CopyDBSentinel *sentinel, bool verbose)
 bool
 follow_main_loop(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
 {
-	/*
-	 * Remove the possibly still existing stream context files from
-	 * previous round of operations (--resume, etc). We want to make
-	 * sure that the catchup process reads the files created on this
-	 * connection.
-	 */
-	if (!stream_cleanup_context(streamSpecs))
-	{
-		/* errors have already been logged */
-		return false;
-	}
-
 	DatabaseCatalog *sourceDB = &(copySpecs->catalogs.source);
 
 	if (!catalog_open(sourceDB))
@@ -417,18 +405,6 @@ follow_prepare_mode_switch(StreamSpecs *streamSpecs,
 	{
 		log_info("Catching-up from existing on-disk files");
 
-		if (streamSpecs->system.timeline == 0)
-		{
-			if (!stream_read_context(&(streamSpecs->paths),
-									 &(streamSpecs->system),
-									 &(streamSpecs->WalSegSz)))
-			{
-				log_error("Failed to read the streaming context information "
-						  "from the source database, see above for details");
-				return false;
-			}
-		}
-
 		/*
 		 * If the previous mode was catch-up, then before proceeding, we might need
 		 * to empty the transform queue where the STOP message was sent.
@@ -518,19 +494,6 @@ followDB(CopyDataSpec *copySpecs, StreamSpecs *streamSpecs)
 		return false;
 	}
 
-	/*
-	 * Before starting sub-processes, clean-up intermediate files from previous
-	 * round. Here that's the stream context with WAL segment size and timeline
-	 * history, which are fetched from the source server to compute WAL file
-	 * names. The current timeline can only change at a server restart or a
-	 * failover, both with trigger a reconnect.
-	 */
-	if (!stream_cleanup_context(streamSpecs))
-	{
-		/* errors have already been logged */
-		return false;
-	}
-
 	/*
 	 * Before starting sub-processes, make sure to close our SQLite catalogs.
 	 * We open the SQLite catalogs again before returning from this function
