Skip to content

Commit 6dd703b

Browse files
committed
Refactor ld_transform with the ld_store idea.
1 parent 162ecbb commit 6dd703b

File tree

13 files changed

+909
-318
lines changed

13 files changed

+909
-318
lines changed

src/bin/pgcopydb/catalog.c

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -420,18 +420,18 @@ static char *targetDBcreateDDLs[] = {
420420
static char *replayDBcreateDDLs[] = {
421421
"create table output("
422422
" id integer primary key, "
423-
" action text, xid integer, lsn pg_lsn, timestamp text, "
423+
" action text, xid integer, lsn integer, timestamp text, "
424424
" message text)",
425425

426426
"create unique index o_a_lsn on output(action, lsn)",
427-
"create unique index o_a_xid on output(action, xid)",
427+
"create index o_a_xid on output(action, xid)",
428428

429429
"create table stmt(hash text primary key, sql text)",
430430
"create unique index stmt_hash on stmt(hash)",
431431

432432
"create table replay("
433433
" id integer primary key, "
434-
" action text, xid integer, lsn pg_lsn, timestamp text, "
434+
" action text, xid integer, lsn integer, endlsn integer, timestamp text, "
435435
" stmt_hash text references stmt(hash), stmt_args jsonb)",
436436

437437
"create index r_xid on replay(xid)",
@@ -7904,7 +7904,19 @@ catalog_sql_execute(SQLiteQuery *query)
79047904
if (rc != SQLITE_ROW)
79057905
{
79067906
log_error("Failed to step through statement: %s", query->sql);
7907-
log_error("[SQLite %d] %s", rc, sqlite3_errstr(rc));
7907+
7908+
int offset = sqlite3_error_offset(query->db);
7909+
7910+
if (offset != -1)
7911+
{
7912+
/* "Failed to step through statement: %s" is 34 chars of prefix */
7913+
log_error("%34s%*s^", " ", offset, " ");
7914+
}
7915+
7916+
log_error("[SQLite %d: %s]: %s",
7917+
rc,
7918+
sqlite3_errstr(rc),
7919+
sqlite3_errmsg(query->db));
79087920

79097921
(void) sqlite3_clear_bindings(query->ppStmt);
79107922
(void) sqlite3_finalize(query->ppStmt);

src/bin/pgcopydb/cli_sentinel.c

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ cli_sentinel_get(int argc, char **argv)
616616
char startpos[PG_LSN_MAXLENGTH] = { 0 };
617617
char endpos[PG_LSN_MAXLENGTH] = { 0 };
618618
char write_lsn[PG_LSN_MAXLENGTH] = { 0 };
619+
char transform_lsn[PG_LSN_MAXLENGTH] = { 0 };
619620
char flush_lsn[PG_LSN_MAXLENGTH] = { 0 };
620621
char replay_lsn[PG_LSN_MAXLENGTH] = { 0 };
621622

@@ -625,6 +626,8 @@ cli_sentinel_get(int argc, char **argv)
625626
LSN_FORMAT_ARGS(sentinel.endpos));
626627
sformat(write_lsn, PG_LSN_MAXLENGTH, "%X/%X",
627628
LSN_FORMAT_ARGS(sentinel.write_lsn));
629+
sformat(transform_lsn, PG_LSN_MAXLENGTH, "%X/%X",
630+
LSN_FORMAT_ARGS(sentinel.transform_lsn));
628631
sformat(flush_lsn, PG_LSN_MAXLENGTH, "%X/%X",
629632
LSN_FORMAT_ARGS(sentinel.flush_lsn));
630633
sformat(replay_lsn, PG_LSN_MAXLENGTH, "%X/%X",
@@ -634,6 +637,7 @@ cli_sentinel_get(int argc, char **argv)
634637
json_object_set_string(jsobj, "endpos", startpos);
635638
json_object_set_boolean(jsobj, "apply", sentinel.apply);
636639
json_object_set_string(jsobj, "write_lsn", write_lsn);
640+
json_object_set_string(jsobj, "transform_lsn", transform_lsn);
637641
json_object_set_string(jsobj, "flush_lsn", flush_lsn);
638642
json_object_set_string(jsobj, "replay_lsn", replay_lsn);
639643

@@ -645,17 +649,19 @@ cli_sentinel_get(int argc, char **argv)
645649
}
646650
else
647651
{
648-
fformat(stdout, "%-10s %X/%X\n", "startpos",
652+
fformat(stdout, "%-15s %X/%X\n", "startpos",
649653
LSN_FORMAT_ARGS(sentinel.startpos));
650-
fformat(stdout, "%-10s %X/%X\n", "endpos",
654+
fformat(stdout, "%-15s %X/%X\n", "endpos",
651655
LSN_FORMAT_ARGS(sentinel.endpos));
652-
fformat(stdout, "%-10s %s\n", "apply",
656+
fformat(stdout, "%-15s %s\n", "apply",
653657
sentinel.apply ? "enabled" : "disabled");
654-
fformat(stdout, "%-10s %X/%X\n", "write_lsn",
658+
fformat(stdout, "%-15s %X/%X\n", "write_lsn",
655659
LSN_FORMAT_ARGS(sentinel.write_lsn));
656-
fformat(stdout, "%-10s %X/%X\n", "flush_lsn",
660+
fformat(stdout, "%-15s %X/%X\n", "transform_lsn",
661+
LSN_FORMAT_ARGS(sentinel.transform_lsn));
662+
fformat(stdout, "%-15s %X/%X\n", "flush_lsn",
657663
LSN_FORMAT_ARGS(sentinel.flush_lsn));
658-
fformat(stdout, "%-10s %X/%X\n", "replay_lsn",
664+
fformat(stdout, "%-15s %X/%X\n", "replay_lsn",
659665
LSN_FORMAT_ARGS(sentinel.replay_lsn));
660666
}
661667
}

src/bin/pgcopydb/ld_apply.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ stream_apply_catchup(StreamSpecs *specs)
6565
{
6666
StreamApplyContext context = { 0 };
6767

68+
return true;
69+
6870
if (!stream_apply_setup(specs, &context))
6971
{
7072
log_error("Failed to setup for catchup, see above for details");

0 commit comments

Comments
 (0)