support postgres xmin replication #2376

Open
waterworthd-cim opened this issue Mar 5, 2025 · 7 comments
Assignees
Labels
documentation (Improvements or additions to documentation), question (Further information is requested)

Comments

@waterworthd-cim

Feature description

We have a number of postgres sources that cannot be configured for CDC, and many of their tables are large and don't have a suitable incremental/cursor column (i.e. last_changed).

I'm currently using Airbyte for most of our streams because it supports CDC, xmin and a user-supplied cursor column. It would be nice if the postgres source supported all three as well (if I understand correctly, it only supports CDC; you have to use sql_database for a user-supplied cursor, and xmin isn't supported).

I think I can use query_adapter_callback to implement xmin (I know this can wrap but it's a work-around until I can get last_changed implemented on our source tables) but it would be nice if the postgres source supported all three (or I've misread the documentation, in which case maybe it's not as clear as it could be?)
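For readers unfamiliar with the wrap caveat mentioned above: xmin holds a 32-bit transaction ID, while PostgreSQL's txid_current() reports a 64-bit value with an epoch counter in the high bits. The sketch below is plain Python arithmetic, not dlt or Postgres code, and the helper names are made up for illustration. It shows why a cursor saved just before the counter wraps stops matching rows written just after.

```python
XID_BITS = 32
XID_MODULUS = 1 << XID_BITS  # xmin is a 32-bit transaction ID


def xmin_of(txid64: int) -> int:
    """Low 32 bits of an epoch-qualified transaction ID (what xmin stores)."""
    return txid64 % XID_MODULUS


def epoch_of(txid64: int) -> int:
    """High bits of an epoch-qualified transaction ID (number of wraps)."""
    return txid64 >> XID_BITS


# One transaction shortly before the counter wraps, one shortly after.
before_wrap = XID_MODULUS - 10
after_wrap = XID_MODULUS + 5

saved_cursor = xmin_of(before_wrap)  # large value near 2**32
new_row_xmin = xmin_of(after_wrap)   # small value again after the wrap

# A plain "xmin >= saved_cursor" filter no longer selects the newer row.
picked_up = new_row_xmin >= saved_cursor
```

Here picked_up is False even though the second transaction is newer, so an xmin cursor needs either a full reload or an epoch check when a wrap happens.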

Are you a dlt user?

Yes, I run dlt in production.

Use case

I'm currently using a mix of Airbyte and dlt, as neither covers all my use cases.

Proposed solution

No response

Related issues

No response

@rudolfix rudolfix self-assigned this Mar 9, 2025
@rudolfix rudolfix added the question Further information is requested label Mar 9, 2025
@rudolfix
Collaborator

rudolfix commented Mar 9, 2025

@waterworthd-cim please take a look at https://github.com/dlt-hub/verified-sources where we have postgres replication source.

@david-waterworth

Correct me if I'm wrong but that source requires use of cdc, not xmin according to the documentation? I'm replicating from a Postgres replica, not a primary so CDC isn't available.

@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Mar 10, 2025
@rudolfix
Collaborator

@david-waterworth OK so there's something I'm not aware of :) could you point me to some docs that explain what xmin replication is? we have a regular cdc and one that is right now in PR that replicates from WAL (for old postgres versions): dlt-hub/verified-sources#589

if there's a trick Airbyte does then it would be good to know.

@rudolfix
Collaborator

OK, I've found it. It seems you just need to add xmin to the list of selected columns: use query_adapter_callback to do that and define incremental loading on it. See our docs for examples.

@david-waterworth

david-waterworth commented Mar 10, 2025

Yeah I've done that with the sql_table / psycopg2 / sqlalchemy (although I couldn't just use query_adapter_callback - I also had to use type_adapter_callback or dlt would error). Are you saying that this is also supported with the postgres source?

For anyone else, this is more or less what I did

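import sqlalchemy as sa
from sqlalchemy import sqltypes


# Note: despite the name, this receives the reflected Table, which matches
# dlt's table_adapter_callback hook.
def type_adapter_callback(table) -> None:
    # xmin is a system column and is not reflected, so add it explicitly.
    required_columns = [("xmin", sqltypes.BigInteger, {"nullable": True})]
    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(sa.Column(col_name, col_type, **col_kwargs))


def query_adapter_callback(query, table, incremental=None, _engine=None) -> sa.TextClause:
    # Only rewrite the query once an incremental start value exists.
    if incremental and incremental.start_value is not None:
        return sa.text(
            f"SELECT {table.fullname}.*,"
            f" xmin::text::bigint as xmin FROM {table.fullname}"
            " WHERE"
            f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"
        )

    return query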
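A minimal sketch of how callbacks like these might be wired into sql_table, assuming a dlt version that provides dlt.sources.sql_database and the query_adapter_callback parameter. The function name build_xmin_resource and its arguments are hypothetical. The table-taking function goes in as table_adapter_callback.

```python
from typing import Any, Callable


def build_xmin_resource(
    connection_url: str,
    table_name: str,
    table_adapter: Callable[[Any], None],
    query_adapter: Callable[..., Any],
):
    """Return a dlt resource that loads table_name incrementally on xmin."""
    # Imported lazily so this sketch can be imported without dlt installed.
    import dlt
    from dlt.sources.sql_database import sql_table

    return sql_table(
        credentials=connection_url,
        table=table_name,
        # No initial_value is set: on the first run start_value is None, so
        # the query adapter should fall back to the unmodified query.
        incremental=dlt.sources.incremental("xmin"),
        table_adapter_callback=table_adapter,
        query_adapter_callback=query_adapter,
    )
```

The returned resource could then be passed to pipeline.run(...) with write_disposition="merge" and a primary key, so rows re-read by the >= filter are deduplicated. The key column name depends on your table.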

@rudolfix
Collaborator

if by postgres source you mean pg_replication then no, xmin is just a regular query. your code is cool! we'll add it to our docs.
btw.

f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"

aren't you getting duplicates in your data? I think you are re-taking the last row with >=?

do you still have the exception you got? dlt should be able to generate types automatically for the missing columns

@rudolfix rudolfix added the documentation Improvements or additions to documentation label Mar 16, 2025
@waterworthd-cim
Author

@rudolfix the >= was deliberate, although possibly not required. I based it on an example I found somewhere, and there was a discussion around > vs >= and whether it was better to risk duplicates (which get fixed by merge at an additional cost) or gaps. I don't think gaps are very likely with xmin, but if you're using a low-resolution timestamp column I feel like you could have multiple rows on the source with the same timestamp that were not written at the exact same instant, and dlt could run in between these updates and so only get some of them. So I figured better safe than sorry.
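To make the trade-off concrete, here is a small self-contained sketch using sqlite3 from the standard library as a stand-in source. The table, the column names and the extract helper are made up for illustration, and the integer cur column plays the role of xmin or a low-resolution timestamp. It is not dlt code. It only shows what each comparison operator selects when a new row shares the saved cursor value.

```python
import sqlite3

src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, val TEXT, cur INTEGER)")


def extract(op: str, start: int) -> list:
    """Select rows whose cursor value is > or >= the saved start value."""
    if op not in (">", ">="):
        raise ValueError(op)
    sql = f"SELECT id, val, cur FROM items WHERE cur {op} ? ORDER BY id"
    return src.execute(sql, (start,)).fetchall()


# Run 1: two rows exist; the highest cursor value seen becomes the saved state.
src.executemany("INSERT INTO items VALUES (?, ?, ?)", [(1, "a", 100), (2, "b", 101)])
run1 = extract(">=", 0)
saved = max(row[2] for row in run1)

# A third row lands afterwards with the same cursor value as the saved state.
src.execute("INSERT INTO items VALUES (3, 'c', 101)")

exclusive = extract(">", saved)   # skips row 3: a gap
inclusive = extract(">=", saved)  # re-reads row 2 and picks up row 3

# Appending keeps the re-read row twice; merging on the primary key does not.
appended = run1 + inclusive
merged = {row[0]: row for row in run1 + inclusive}
```

With >, the late row is never loaded. With >=, row 2 is read twice, and a merge keyed on id collapses it back to one copy.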

I don't have the exception, but I can probably recreate it, so if I manage to I'll post it here. It worked fine when I used connectorx (based on this example - https://dlthub.com/blog/dlt-arrow-loading). When I migrated to sql_table (specifically doing a cast in the SQL in the query_adapter_callback) it didn't work. I assumed it was using the underlying table and not the SQL query to generate the schema, but that was a guess that I never verified. My actual code is less general than the above, e.g. when I had to convert NUMERIC to FLOAT I used

def query_adapter_callback(query, table, incremental=None, _engine=None) -> sa.TextClause:
    # Only rewrite the query once an incremental start value exists.
    if incremental and incremental.start_value is not None:
        return sa.text(
            f"SELECT data::FLOAT AS data,"
            f" xmin::text::bigint as xmin FROM {table.fullname}"
            " WHERE"
            f" {incremental.cursor_path}::text::bigint >= ({incremental.start_value}::int8)"
        )

    return query

and then added

def type_adapter_callback(sql_type):
    # Report NUMERIC columns as FLOAT so the schema matches the cast in the query.
    if isinstance(sql_type, sa.NUMERIC):
        return sa.FLOAT
    return sql_type

Projects
Status: In Progress
Development

No branches or pull requests

3 participants