
sql_table() query_adapter_callback function with custom extraction query fails with KeyError during pipeline.extract() (pyarrow backend) #2374

Open
acaruso7 opened this issue Mar 5, 2025 · 11 comments
Labels: bug (Something isn't working) · question (Further information is requested)

Comments

@acaruso7

acaruso7 commented Mar 5, 2025

dlt version

1.5.0

Describe the problem

https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1740596179916109

I am using the sql_table() source with the pyarrow backend to extract data from a MySQL db with a custom extraction query. The custom extraction query is passed to query_adapter_callback as a string and selects a subset of columns from the source table, as well as some additional derived columns (example below)

Upon calling pipeline.extract(), a dlt.extract.exceptions.ResourceExtractionError is thrown. The stack trace indicates a KeyError for one of the table columns that is not present in the custom extraction query's result set

More details with a full reproducible example and stack trace below

Note: I am following along with the example in the documentation here

Expected behavior

query_adapter_callback with a custom extraction query should extract only the columns defined in the query into the pyarrow table. A KeyError should not occur

Steps to reproduce

Full reproducible example using public mysql db and duckdb destination:

import functools

import dlt
from dlt.sources.sql_database import sql_table

import sqlalchemy as sa
from sqlalchemy import create_engine, engine


mysql_engine = create_engine(
    engine.URL.create(
        drivername="mysql+mysqldb",
        host="mysql-rfam-public.ebi.ac.uk",
        port=4497,
        database="Rfam",
        username="rfamro",
        password="",
    )
)

pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
)


def query_adapter_callback(
    query, table, incremental=None, engine=None, custom_extract_query_override=None
) -> sa.sql.elements.TextClause:
    if custom_extract_query_override:
        t_query = sa.text(custom_extract_query_override)
    else:
        t_query = sa.text(query)

    return t_query


custom_extract_query_override = """
select
      author_id
    , name
    , UPPER(name) as my_custom_column
    -- other columns deliberately excluded
from author;
"""

table_source = sql_table(
    credentials=mysql_engine,
    table="author",
    backend="pyarrow",
    query_adapter_callback=functools.partial(
        query_adapter_callback,
        custom_extract_query_override=custom_extract_query_override,
    ),
)

pipeline.extract(table_source)

Full stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 277, in _get_source_item
    pipe_item = next(gen)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 301, in table_rows
    yield from loader.load_rows(backend_kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 178, in load_rows
    yield from self._load_rows(query, backend_kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/helpers.py", line 200, in _load_rows
    yield row_tuples_to_arrow(
  File "/usr/local/lib/python3.10/site-packages/dlt/common/configuration/inject.py", line 247, in _wrap
    return f(*bound_args.args, **bound_args.kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/sources/sql_database/arrow_helpers.py", line 22, in row_tuples_to_arrow
    return _row_tuples_to_arrow(
  File "/usr/local/lib/python3.10/site-packages/dlt/common/libs/pyarrow.py", line 615, in row_tuples_to_arrow
    columnar_known_types = {
  File "/usr/local/lib/python3.10/site-packages/dlt/common/libs/pyarrow.py", line 616, in <dictcomp>
    col["name"]: columnar[col["name"]]
KeyError: 'initials'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 471, in extract
    self._extract_source(
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 1239, in _extract_source
    load_id = extract.extract(
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/extract.py", line 421, in extract
    self._extract_single_source(
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/extract.py", line 344, in _extract_single_source
    for pipe_item in pipes:
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 162, in __next__
    pipe_item = self._get_source_item()
  File "/usr/local/lib/python3.10/site-packages/dlt/extract/pipe_iterator.py", line 307, in _get_source_item
    raise ResourceExtractionError(pipe.name, gen, str(ex), "generator") from ex
dlt.extract.exceptions.ResourceExtractionError: In processing pipe author: extraction of resource author in generator table_rows caused an exception: 'initials'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/opt/prefect/flows/subflows/repro_dlt_query_adapter_issue.py", line 57, in <module>
    pipeline.extract(table_source)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 226, in _wrap
    step_info = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 180, in _wrap
    rv = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 166, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 275, in _wrap
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/dlt/pipeline/pipeline.py", line 492, in extract
    raise PipelineStepFailed(
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage extract when processing package 1741133041.594346 with exception:

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe author: extraction of resource author in generator table_rows caused an exception: 'initials'

I dug into the internals of the arrow_helpers.row_tuples_to_arrow function a bit and noticed some odd behavior


The value of the columns variable is

{'author_id': {'name': 'author_id', 'nullable': False, 'data_type': 'bigint'}, 'name': {'name': 'name', 'nullable': False, 'data_type': 'text'}, 'last_name': {'name': 'last_name', 'nullable': True, 'data_type': 'text'}, 'initials': {'name': 'initials', 'nullable': True, 'data_type': 'text'}, 'orcid': {'name': 'orcid', 'nullable': True, 'data_type': 'text'}, 'synonyms': {'name': 'synonyms', 'nullable': True, 'data_type': 'text'}, 'my_custom_column': {'name': 'my_custom_column'}}

which is not what I would expect. This output contains all columns from the source table, as well as my additional derived column, my_custom_column

The value of len(rows[0]) is 3, which is what I would expect - 3 columns in the result set, which aligns with my custom extraction query

The value of columnar.keys() is dict_keys(['author_id', 'name', 'last_name']) - not what I would expect. last_name is not included in the result set of my custom query

It seems something may be going wrong with the operation here, where columns are zipped with rows

columnar = {
    col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(pivoted_rows)))
}

This may lead to a KeyError later
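
For illustration, a minimal standalone sketch (not dlt code - names are simplified, and columns is a plain list here rather than dlt's dict of column schemas) of how that zip silently mispairs the reflected column names with the narrower result set of the custom query:

import numpy as np

# 7 reflected/derived column names vs. only 3 values per row from the custom query
columns = ["author_id", "name", "last_name", "initials", "orcid", "synonyms", "my_custom_column"]
rows = [(1, "A. Bateman", "A. BATEMAN"), (2, "S. Eddy", "S. EDDY")]

pivoted_rows = np.asarray(rows, dtype="object").T  # shape (3, 2): one slice per result column

# zip() stops at the shorter iterable, so only the first three names receive data,
# and 'last_name' silently gets the values that actually belong to 'my_custom_column'
columnar = {
    col: dat.ravel()
    for col, dat in zip(columns, np.vsplit(pivoted_rows, len(pivoted_rows)))
}
print(columnar.keys())  # dict_keys(['author_id', 'name', 'last_name'])
# columnar["initials"] would then raise KeyError: 'initials' - the error seen during extract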

Operating system

Linux

Runtime environment

Docker, Docker Compose

Python version

3.10

dlt data source

sql_table() with pyarrow backend

dlt destination

No response

Other deployment details

No response

Additional information

Separately from the issue described here, I wonder whether I am going about this the right way. Is this the preferred approach for extracting custom column sets (including derived columns) from source tables?

An additional requirement I should mention for my own use case is that the custom extraction query must be passed to the query_adapter_callback as a raw SQL string - it cannot be built using native sqlalchemy syntax, as I am re-using the same query elsewhere

@acaruso7 acaruso7 changed the title sql_table() query_adapter_callback function with custom extraction query fails with KeyError (pyarrow backend) sql_table() query_adapter_callback function with custom extraction query fails with KeyError during pipeline.extract() (pyarrow backend) Mar 5, 2025
@rudolfix rudolfix added the bug Something isn't working label Mar 9, 2025
@rudolfix rudolfix self-assigned this Mar 9, 2025
@rudolfix rudolfix added the question Further information is requested label Mar 9, 2025
@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Mar 9, 2025
@rudolfix
Collaborator

rudolfix commented Mar 9, 2025

@acaruso7 are you able to check the newest version of dlt? we added a handler for your case:

yield row_tuples_to_arrow(
    partition,
    columns=_add_missing_columns(self.columns, columns),
    tz=backend_kwargs.get("tz", "UTC"),
)

_add_missing_columns will add the missing columns according to your cursor, then we'll try to derive the data types from the data (using pyarrow)

the correct way of implementing this is to use table_adapter and add data types explicitly. for example:

def _add_change_tracking_columns(table: Table) -> None:
    required_columns = [
        ("_dlt_sys_change_version", sa.BigInteger, {"nullable": True}),
        ("_dlt_deleted", sa.Text, {"default": None, "nullable": True}),
    ]

    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(
                sa.Column(col_name, col_type, **col_kwargs)  # type: ignore[arg-type]
            )

This defines two additional columns, which are computed columns added by the query adapter
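
For anyone following along, a minimal sketch of how such an adapter could be wired in, assuming the function above and the engine from the repro (table_adapter_callback is the sql_table() parameter that receives it):

import sqlalchemy as sa
from sqlalchemy import Table

from dlt.sources.sql_database import sql_table


def _add_change_tracking_columns(table: Table) -> None:
    # declare the columns the query adapter computes, with explicit SQLAlchemy
    # types, so dlt derives the correct hints instead of guessing from the data
    required_columns = [
        ("_dlt_sys_change_version", sa.BigInteger, {"nullable": True}),
        ("_dlt_deleted", sa.Text, {"default": None, "nullable": True}),
    ]
    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(sa.Column(col_name, col_type, **col_kwargs))


table_source = sql_table(
    credentials=mysql_engine,  # engine from the repro above
    table="author",
    backend="pyarrow",
    table_adapter_callback=_add_change_tracking_columns,
    # plus query_adapter_callback=... as in the repro, if a custom query is still used
)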

@acaruso7
Author

acaruso7 commented Mar 10, 2025

Hi @rudolfix, using version 1.8.0 resolved my issue for the example posted above (it extracts the data using the custom query with no issue)

However, when I try the same pattern in my actual codebase (postgres db source), it's failing with the following error

<class 'dlt.extract.exceptions.ResourceExtractionError'>
In processing pipe notifications: extraction of resource notifications in future <Future at 0xffff6913b9a0 state=finished raised PyToArrowConversionException> caused an exception: Conversion to arrow failed for field `created_at` with dlt hint `data_type=timestamp` and `inferred_arrow_type=timestamp[us, tz=UTC]` This data type seems currently unsupported by dlt. Please open a GitHub issue

This error is surprising - surely timestamp types are supported?

The created_at column in my Postgres source table is a timestamptz type. I'm not sure if the error is related to the fact that I'm using a custom extraction query with query adapter; maybe something is going wrong with table reflection

But I can tell you that I'm extracting tables with timestamptz columns in many other pipelines with no issue (not using query adapter). In my table_adapter_callback function, I tried both ("created_at", sqltypes.DateTime, {"nullable": False}) and ("created_at", sqltypes.Time, {"nullable": False})

If this is a separate issue, and unrelated to the query adapter, you can go ahead and close this issue

This seems to be related to the changes made in #2295

@zilto
Collaborator

zilto commented Mar 10, 2025

I see how the exception message can be confusing. It should say:

Data for field created_at with hint timestamp cannot be converted to inferred arrow type timestamp[us, tz=UTC]

In short, I believe the callback adapter coerces dlt hints, making pyarrow unable to convert the data to the specified type. As reported, the timestamptz type causes no issues outside the callbacks. Without dlt hints, pyarrow has more flexibility to determine the right type.

@acaruso7 If you can share the MySQL version and the native data type on the table, that could help the repro.

Details

Code path:

  • SQLAlchemy type is set by the user via query_adapter_callback
  • dlt hints (e.g., timestamp) are derived from the SQLAlchemy types
  • arrow type (e.g., timestamp[us, tz=UTC]) is inferred from the dlt hints, not the data
  • dlt tries to convert the data (type unknown) to meet the dlt hint and arrow type

For example, specifying a binary column as a SQLAlchemy timestamp type would trigger the same exception. The pyarrow code has no clue about SQLAlchemy, so we should try to catch these exceptions upstream in the sql_database() source and provide better debugging information. We could extend the PyToArrowConversionException exception.
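
To make the last step concrete, a small standalone pyarrow sketch (not dlt code) of the failure mode: the arrow type comes from the dlt hint rather than from the data, so data that does not actually match the hint fails to convert:

from datetime import datetime, timezone

import pyarrow as pa

hinted_type = pa.timestamp("us", tz="UTC")  # what dlt infers from the `timestamp` hint

# data that matches the hint converts fine
ok = pa.array([datetime(2025, 3, 10, tzinfo=timezone.utc)], type=hinted_type)

# data that does not match the hint (e.g. a column mistyped by an adapter) blows up
try:
    pa.array(["not-a-timestamp"], type=hinted_type)
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    print(f"conversion failed: {exc}")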

@acaruso7
Author

I can't seem to reproduce this using the public mysql-rfam-public.ebi.ac.uk MySQL db

Is there an equivalent for Postgres? I tried https://rnacentral.org/help/public-database but I'm running into various permission issues when setting up my sql_table() source

@acaruso7
Author

Is there a way I can force dlt to infer types from the data directly, rather than from the dlt hints?

@acaruso7
Author

acaruso7 commented Mar 11, 2025

I don't really understand what's happening here, but for the time being I've gotten past this pyarrow type conversion error by just using the sqlalchemy backend.

However, I've now noticed that the sql_table() source doesn't seem to be respecting my custom column set from the query adapter callback

In the example above, I expect to only extract author_id, name, and my_custom_column, but when I run print(table_source.columns.keys())

I get back

dict_keys(['author_id', 'name', 'last_name', 'initials', 'orcid', 'synonyms'])

which is all columns defined in the author table. Should it not be limited to just the columns in my select clause, or am I misunderstanding how the sql_table() source works?

EDIT: I see my issue - I need to use table_adapter_callback to define the column set explicitly for the sql_table() source, since by default it just reflects the source db table's schema; in my case I'm using a custom query adapter, which drops some columns and adds other computed columns

I have this all working end to end for my use case with sqlalchemy backend at this point. I would like to still understand more deeply what's happening with pyarrow type conversion when you guys get to it, thanks!
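
For anyone landing here later, a hedged sketch of the pairing described in the EDIT above, reusing mysql_engine, query_adapter_callback, and custom_extract_query_override from the repro; the adapter only appends the computed column here - trimming the unselected reflected columns is left out:

import functools

import sqlalchemy as sa
from sqlalchemy import Table

from dlt.sources.sql_database import sql_table


def author_table_adapter(table: Table) -> None:
    # the custom query computes my_custom_column, which table reflection cannot
    # see, so declare it explicitly with a SQLAlchemy type
    if "my_custom_column" not in table.c:
        table.append_column(sa.Column("my_custom_column", sa.Text, nullable=True))


table_source = sql_table(
    credentials=mysql_engine,  # engine from the repro above
    table="author",
    backend="sqlalchemy",  # the backend this workaround settled on
    table_adapter_callback=author_table_adapter,
    query_adapter_callback=functools.partial(
        query_adapter_callback,
        custom_extract_query_override=custom_extract_query_override,
    ),
)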

@zilto
Collaborator

zilto commented Mar 12, 2025

@acaruso7 Thanks for following up. I'm happy you have it working with sqlalchemy, but it's definitely something we want to fix because pyarrow should be much more performant.

@zilto
Collaborator

zilto commented Mar 14, 2025

The issue seems specific to MySQL. I had the same error for a new test I wrote

@acaruso7
Author

@zilto I actually only had this error with a Postgres source. I couldn't reproduce it with MySQL

@rudolfix
Collaborator

@acaruso7 sqlalchemy lets dlt handle the type conversion, and it is pretty aggressive when converting types. we definitely support datetimes for the arrow backend. a few questions:

  1. is this created_at column a computed one or coming from a join?
  2. are you sure you select the timestamptz type, not something converted to a string?

Is there a way I can force dlt to infer types from the data directly, rather than from the dlt hints?

just remove it from table_adapter_callback - if the column is not known, we'll infer the type from the data. I hope we are not going in circles here :) for sure we need to write a few more tests

@zilto it would be good to improve the exception message as you suggested. maybe we could also inspect the first python object in the data being converted and display it

@acaruso7
Author

is this created_at column a computed one or coming from a join?

No, this is a raw column that exists on the base table and I am selecting it as-is in the query adapter (no additional transforms etc)

are you sure you select timestamptz type, not something converted to string?

Yes, I double checked my information schema and it is timestamp with time zone
