
sql_database() & sql_table() source functions not working with DuckDB/MotherDuck #2421

Open
thenaturalist opened this issue Mar 19, 2025 · 2 comments

thenaturalist commented Mar 19, 2025

dlt version

1.8.0

Describe the problem

I want to load data from a DuckDB/MotherDuck source using either the sql_database or sql_table source function.

Both fail because dlt does not resolve DuckDB's catalog conventions when qualifying table and column names, so the generated SQL references tables the engine cannot find.

I am using https://github.com/Mause/duckdb_engine version 0.15.0.
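
For context, the failure can be reproduced in plain DuckDB, independent of dlt (a minimal local illustration; all names are placeholders):

import duckdb

# Minimal illustration of DuckDB's catalog scoping (placeholder names):
# an unqualified table name is resolved against the current catalog only.
con = duckdb.connect()                    # current catalog: "memory"
con.sql("ATTACH ':memory:' AS other_db")  # a second catalog
con.sql("CREATE TABLE other_db.main.foobar AS SELECT 1 AS id")

# Unqualified: raises duckdb.CatalogException, e.g.
#   Catalog Error: Table with name foobar does not exist!
#   Did you mean "other_db.main.foobar"?
# con.sql("SELECT * FROM foobar")

# Fully qualified: resolves regardless of the current catalog.
con.sql("SELECT * FROM other_db.main.foobar").show()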

Expected behavior

dlt generates SQL that identifies the specified table according to DuckDB's catalog conventions, and the pipeline completes without error.

Steps to reproduce

  1. Create a DuckDB instance with a synthetic test table as described here: https://duckdb.org/docs/stable/guides/snippets/create_synthetic_data.html (a minimal sketch follows after the pipeline code below)

  2. Create a pipeline which uses DuckDB as a source like so:

conn_string = f"duckdb:///md:{db_name}{access_token_suffix}"

credentials = create_engine(
    conn_string,
    connect_args={
        'read_only': True,
    }
)

pipeline = dlt.pipeline(
    pipeline_name=pipeline_name,
    destination='postgres',
    dataset_name=dataset_name,
    progress="log",
)

source1 = sql_database(
    credentials,
    schema=schema_name,  # with or without, same effect
    table_names=['foobar'],  # or schema_name.foobar or db_name.schema_name.foobar
    reflection_level="full",
)

source2 = sql_database(
    credentials,
    schema=schema_name,  # with or without, same effect
    reflection_level="full",
).with_resources('foobar')

source3 = sql_table(
    credentials,
    schema="info",
    table="foobar",
)

load_info1 = pipeline.run(source1, write_disposition=write_disposition)
print(load_info1)

load_info2 = pipeline.run(source2, write_disposition=write_disposition)
print(load_info2)

load_info3 = pipeline.run(source3, write_disposition=write_disposition)
print(load_info3)
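
For step 1, a minimal sketch of the synthetic test table (an approximation, not the exact recipe from the linked guide; column names mirror the SELECT in the error output below):

import duckdb

# Approximate stand-in for step 1: a small table whose columns match
# the SELECT shown in the error output below.
con = duckdb.connect("my_source.duckdb")  # or f"md:{db_name}" for MotherDuck
con.sql("""
    CREATE OR REPLACE TABLE foobar AS
    SELECT
        i AS id,
        now() - to_days(i::INTEGER) AS created_at,
        'bar_' || i::VARCHAR AS bar
    FROM generate_series(1, 1000) AS t(i)
""")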

The errors you can expect to see:

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1742387100.2242408 with exception:

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe foo: extraction of resource foo in generator table_rows caused an exception: (duckdb.duckdb.CatalogException) Catalog Error: Table with name foo does not exist!
Did you mean "db_name.schema_name.foo"?

LINE 2: FROM foo
             ^
[SQL: SELECT foo.id, foo.created_at, foo.bar
FROM foo]
(Background on this error at: https://sqlalche.me/e/20/f405)

When fully qualifying the table name passed to these functions, e.g. as db_name.schema_name.table_name, the error returned is:

sqlalchemy.exc.InvalidRequestError: Could not reflect: requested table(s) not available in Engine(duckdb:///foobar): db_name.schema_name.table_name
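
The second error suggests SQLAlchemy treats the dotted string as a literal table name during reflection; the schema (and, for DuckDB, the catalog) is expected as a separate argument. A hedged sketch of that idea, assuming duckdb_engine accepts a catalog-qualified schema (not verified here):

source = sql_table(
    credentials,
    schema=f"{db_name}.{schema_name}",  # hypothetical: catalog-qualified schema
    table="foobar",  # bare table name; qualification moved into `schema`
)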

Operating system

macOS

Runtime environment

Local

Python version

3.12

dlt data source

sql_database

dlt destination

Postgres

Other deployment details

No response

Additional information

No response

thenaturalist commented Mar 19, 2025

In the interim, this is quickly rectified by adding a query adapter callback:

import sqlalchemy as sa
from sqlalchemy.sql.elements import TextClause

def query_adapter_callback(query, table) -> TextClause:
    # Bypass dlt's generated SELECT and query the fully qualified table name.
    return sa.text(f"SELECT * FROM {table.fullname}")
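
The callback is then wired in via the query_adapter_callback argument that sql_database() and sql_table() accept, for example:

source = sql_database(
    credentials,
    table_names=['foobar'],
    reflection_level="full",
    query_adapter_callback=query_adapter_callback,
)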


sh-rp commented Mar 21, 2025

Hey @thenaturalist, I just did a quick test here, and for duckdb this definitely works:

import dlt

from dlt.sources.sql_database import sql_database


SOURCE_DB_FILE_NAME = "./db.duckdb"
DESTINATION_DB_FILE_NAME = "./db_out.duckdb"

if __name__ == "__main__":

    # create example table
    pipeline = dlt.pipeline(
        pipeline_name="source_pipe",
        dataset_name="source_dataset",
        destination=dlt.destinations.duckdb(SOURCE_DB_FILE_NAME),
    )
    pipeline.run([1, 2, 3], table_name="source_table")

    # read example table
    table_source = sql_database(
        "duckdb:///" + SOURCE_DB_FILE_NAME,
        schema="source_dataset",
        reflection_level="full",
    ).with_resources('source_table')

    dest_pipeline = dlt.pipeline(
        pipeline_name="dest_pipe",
        dataset_name="dest_dataset",
        destination=dlt.destinations.duckdb(DESTINATION_DB_FILE_NAME),
    )
    dest_pipeline.run(table_source)

    print(dest_pipeline.dataset().source_table.df())

Maybe this snippet can help with your code.
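
For the MotherDuck side, the same snippet should only need a different connection string (a hedged sketch; db_name is a placeholder and a valid MOTHERDUCK_TOKEN is assumed to be set in the environment):

table_source = sql_database(
    "duckdb:///md:db_name",  # placeholder MotherDuck database
    schema="main",
    reflection_level="full",
).with_resources("source_table")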

sh-rp self-assigned this Mar 21, 2025