sql_table() query_adapter_callback function with custom extraction query fails with KeyError during pipeline.extract() (pyarrow backend) #2374

Comments
@acaruso7 are you able to check the newest version of:

```python
yield row_tuples_to_arrow(
    partition,
    columns=_add_missing_columns(self.columns, columns),
    tz=backend_kwargs.get("tz", "UTC"),
)
```

The correct way of implementing this is to use:

```python
def _add_change_tracking_columns(table: Table) -> None:
    required_columns = [
        ("_dlt_sys_change_version", sa.BigInteger, {"nullable": True}),
        ("_dlt_deleted", sa.Text, {"default": None, "nullable": True}),
    ]

    for col_name, col_type, col_kwargs in required_columns:
        if col_name not in table.c:
            table.append_column(
                sa.Column(col_name, col_type, **col_kwargs)  # type: ignore[arg-type]
            )
```

which defines two additional columns that are computed columns added by the query adapter.
Hi @rudolfix, I tried with that version. However, when I run the same pattern in my actual codebase (Postgres db source), it fails with the following error:
This error is surprising - surely timestamp types are supported? I can tell you that I'm extracting tables that include timestamp columns. If this is a separate issue, unrelated to the query adapter, you can go ahead and close this issue. This seems to be related to the changes made in #2295.
I see how the exception message can be confusing. It should say:
In short, I believe that the callback adapter coerces dlt hints, making pyarrow unable to convert the data to the specified type, as reported. @acaruso7, if you can share the MySQL version and the native data type on the table, that could help the repro.

Code path:
For example, if the user specifies a binary column to be a SQLAlchemy timestamp type, that would trigger the same exception. The pyarrow code has no clue about SQLAlchemy, so we should try to catch these exceptions further upstream.
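A minimal standalone sketch of that failure mode (plain pyarrow, not dlt code; the values are made up): data that arrives as raw bytes cannot be coerced to a hinted timestamp type, and pyarrow raises at conversion time.

```python
import pyarrow as pa

# Values returned for a binary/BLOB column, while the hint declares a timestamp.
values = [b"\x00\x01\x02", b"\x03\x04\x05"]

try:
    pa.array(values, type=pa.timestamp("us", tz="UTC"))
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    # pyarrow cannot convert bytes to a timestamp and raises here.
    print(f"conversion failed: {exc}")
```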
I can't seem to reproduce this using the public MySQL database. Is there an equivalent for Postgres? I tried https://rnacentral.org/help/public-database but I'm running into various permission issues when setting it up.
Is there a way I can force dlt to infer types from the data directly, rather than from the dlt hints?
I don't really understand what's happening here, but for the time being I've gotten past this pyarrow type conversion error by just using the …
EDIT: I see my issue - I need to use …. I have this all working end to end for my use case with …
@acaruso7 Thanks for following up. I'm happy you have it working with …
The issue seems specific to MySQL. I had the same error for a new test I wrote.
@zilto I actually only had this error with a Postgres source. I couldn't reproduce it with MySQL.
@acaruso7 just remove it from …

@zilto it would be good to improve the exception message as you suggested. Maybe we could also inspect the first Python object in the data being converted and display it.
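A rough sketch of what that improved message could look like (an assumed helper, not dlt's actual code): catch the arrow conversion error per column and surface the first Python value and its type.

```python
import pyarrow as pa


def coerce_column(name: str, values: list, arrow_type: pa.DataType) -> pa.Array:
    """Convert one column's Python values to an arrow array, with a clearer error."""
    try:
        return pa.array(values, type=arrow_type)
    except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
        first = values[0] if values else None
        raise ValueError(
            f"Column '{name}' could not be converted to arrow type {arrow_type}; "
            f"first value is {first!r} of Python type {type(first).__name__}"
        ) from exc
```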
No, this is a raw column that exists on the base table, and I am selecting it as-is in the query adapter (no additional transforms, etc.).
Yes, I double-checked my information schema and it is …
dlt version
1.5.0
Describe the problem
https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1740596179916109
I am using the `sql_table()` source with the `pyarrow` backend to extract data from a MySQL db with a custom extraction query. The custom extraction query is passed to `query_adapter_callback` as a string, and selects a subset of columns from the source table, as well as some additional derived columns (example below).

Upon calling `pipeline.extract()`, a `dlt.extract.exceptions.ResourceExtractionError` is thrown. The stack trace indicates a `KeyError` for one of the table columns which is not defined in the custom extraction query's result set.

More details with a full reproducible example and stack trace below.
Note: I am following along with the example in the documentation here
Expected behavior

`query_adapter_callback` with a custom extraction query should extract only the columns defined in the query into the pyarrow table. A `KeyError` should not occur.

Steps to reproduce
Full reproducible example using public mysql db and duckdb destination:
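A minimal sketch of this kind of setup (not the reporter's exact script), assuming the `query_adapter_callback(query, table)` signature from the dlt docs; the connection string, table, and column names below are placeholders:

```python
import dlt
import sqlalchemy as sa
from dlt.sources.sql_database import sql_table


def query_adapter_callback(query, table):
    # Placeholder raw SQL: a subset of the table's columns plus one derived column.
    return sa.text(
        f"SELECT author_id, name, 'fixed' AS my_custom_column FROM {table.fullname}"
    )


pipeline = dlt.pipeline(
    pipeline_name="sql_table_repro",
    destination="duckdb",
    dataset_name="repro_data",
)

author = sql_table(
    credentials="mysql+pymysql://user:password@host:3306/db",  # placeholder DSN
    table="author",
    backend="pyarrow",
    query_adapter_callback=query_adapter_callback,
)

# The KeyError described in this issue is raised from this call.
pipeline.extract(author)
```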
Full stack trace:
I dug into the internals of the `arrow_helpers.row_tuples_to_arrow` function a bit and noticed some odd behavior.

dlt/dlt/common/libs/pyarrow.py, line 575 in e8c5e9b

The value of the `columns` variable is not what I would expect: it contains all columns from the source table, as well as my additional derived column, `my_custom_column`.

The value of `len(rows[0])` is `3`, which is what I would expect - 3 columns in the results, which aligns with my custom extraction query.

The value of `columnar.keys()` is `dict_keys(['author_id', 'name', 'last_name'])` - not what I would expect. `last_name` is not included in the result set of my custom query.

It seems something may be going wrong with the operation here, where `columns` are zipped with `rows`:

dlt/dlt/common/libs/pyarrow.py, lines 595 to 597 in e8c5e9b

This may lead to a `KeyError` later.
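As a standalone illustration of that misalignment (plain Python, not dlt's actual code): zipping a longer list of column hints with a shorter row tuple silently pairs the first names with the values and drops the rest, so the derived column never appears in `columnar` and a later lookup raises `KeyError`.

```python
# Column hints cover every column of the base table plus the derived column...
columns = ["author_id", "name", "last_name", "initials", "my_custom_column"]
# ...but the custom query returned only three values per row.
row = ("A0001", "Alice", "derived-value")

columnar = dict(zip(columns, row))
print(columnar)
# {'author_id': 'A0001', 'name': 'Alice', 'last_name': 'derived-value'}
# 'last_name' is wrongly paired with the derived value, and
# columnar["my_custom_column"] raises KeyError.
```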
Operating system
Linux
Runtime environment
Docker, Docker Compose
Python version
3.10
dlt data source
`sql_table()` with `pyarrow` backend

dlt destination
No response
Other deployment details
No response
Additional information
Separately from the issue described here, I wonder whether I am going about this the right way. Is this the preferred approach for extracting custom column sets (including derived columns) from source tables?
An additional requirement I should mention for my own use case is that the custom extraction query must be passed to the `query_adapter_callback` as a raw SQL string; it cannot be built using native SQLAlchemy syntax, as I am re-using the same query elsewhere.