"model" item_format support #2404

Draft · wants to merge 5 commits into base: devel

Conversation

Collaborator
@sh-rp sh-rp commented Mar 13, 2025

Description

This PR introduces a new item format "model": a SQL SELECT statement whose result will be inserted into a given table. For now only the duckdb destination supports this; further destinations will follow once @anuunchin finishes her research on this.

An item can be marked with a HintsMeta to indicate the "model" item format, which means the yielded string will be interpreted as a valid SELECT statement for the targeted destination. During extraction, each statement is stored in its own job.

A resource emitting a model query would look like this, given an input dataset. Columns need to be supplied so dlt can create / update the table:

@dlt.resource()
def copied_table() -> Any:
    # `dataset` is the input dataset mentioned above
    query = dataset["example_table"].limit(5).query()
    yield dlt.mark.with_hints(
        query, hints=make_hints(columns={...}), data_item_format="model"
    )
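
For context, a run of this resource against the duckdb destination (the only one supporting "model" jobs in this PR) could look like the sketch below; the pipeline and dataset names are made up for illustration:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="copy_tables", destination="duckdb", dataset_name="copies"
)
info = pipeline.run(copied_table())
print(info)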

netlify bot commented Mar 13, 2025

Deploy Preview for dlt-hub-docs canceled.
Latest commit c7a9adf · deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/67d7f2c7a45bc7000893c322

@sh-rp sh-rp force-pushed the feat/2366-sql-jobs-2 branch 2 times, most recently from 33dbe36 to f2e10c5 Compare March 13, 2025 12:46
is_binary_format=False,
supports_schema_changes="True",
supports_compression=False,
# NOTE: we create a new model file for each sql row
Collaborator Author

I think it makes sense to have one model per file

Collaborator

indeed you can yield many models per table in a single run. isn't it an error in the code that generates models?

Contributor

@rudolfix I don't quite understand what this convo is about 👀. If rephrased, is it the same as:

@rudolfix: One can yield many models (many insert statements) per table in a single run, so it's wrong to have one model (insert statement) per table
@sh-rp: No, it does make sense to have one model (insert statement) per table

@@ -129,7 +129,7 @@ class duckdb(Destination[DuckDbClientConfiguration, "DuckDbClient"]):
def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = "insert_values"
caps.supported_loader_file_formats = ["insert_values", "parquet", "jsonl"]
caps.supported_loader_file_formats = ["insert_values", "parquet", "jsonl", "model"]
Collaborator Author

For this first iteration we only support duckdb; the transformations can check the capabilities to figure out where pure SQL may be used.
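
A minimal sketch of such a capabilities check (the pipeline variable is assumed to exist; only supported_loader_file_formats comes from this diff):

caps = pipeline.destination.capabilities()
if "model" in caps.supported_loader_file_formats:
    # the destination can execute pure SQL model jobs
    ...
else:
    # fall back to extracting and loading the data itself
    ...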

@@ -68,11 +69,17 @@ class TResourceHints(TResourceHintsBase, total=False):


class HintsMeta:
__slots__ = ("hints", "create_table_variant")
__slots__ = ("hints", "create_table_variant", "data_item_format")
Collaborator Author
@sh-rp sh-rp Mar 13, 2025

I opted to add a slot on the default HintsMeta, I could also create a subclass similar to the file import, but for me this solution somehow makes more sense, since it is kind of a hint, but does not need to go into the schema.

Collaborator

I think you do not need to modify HintsMeta. Just create a container class in which you'll yield models, i.e.

class ModelStr(str):
    pass

and yield it. Are you 100% sure we do not need any model properties to be stored in the file? Or do we plan to pass those via TableSchema? I.e. look here:
https://docs.getdbt.com/reference/model-configs (materialization)
https://docs.getdbt.com/docs/build/incremental-strategy (this is our write disposition + primary key) heh cool
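
A rough sketch of this suggestion (ModelStr and the isinstance check are illustrative, not existing dlt API):

class ModelStr(str):
    """A plain string subclass that marks the value as a SQL model."""


def get_data_item_format(items: TDataItems) -> TDataItemFormat:
    # the model format is detected from the item's type alone, no meta needed
    if isinstance(items, ModelStr):
        return "model"
    ...  # existing object / arrow detection continues here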

Collaborator Author

I have done it this way now.

@sh-rp sh-rp force-pushed the feat/2366-sql-jobs-2 branch from f2e10c5 to 1daad9b Compare March 13, 2025 13:54
@sh-rp sh-rp force-pushed the feat/2366-sql-jobs-2 branch from 5d59485 to 83cc002 Compare March 13, 2025 14:47
@@ -60,14 +62,21 @@
pandas = None


def get_data_item_format(items: TDataItems) -> TDataItemFormat:
def get_data_item_format(items: TDataItems, meta: Any = None) -> TDataItemFormat:
Collaborator Author

Each row that has a special item type needs to have the meta set, similar to how the ImportFileMeta works. We could also store it so it works more like the other hints, but I think this should be ok. It's mostly going to be used from the transformations anyway.

Collaborator

as mentioned, just check the item type

@sh-rp sh-rp changed the title [tmp] model itemformat support "model" item_format support Mar 13, 2025
Collaborator
@rudolfix rudolfix left a comment

looks cool! but IMO can be simplified

"""
sql_client = self._job_client.sql_client
name = sql_client.make_qualified_table_name(self._load_table["name"])
return f"INSERT INTO {name} {select_statement};"
Collaborator

INSERT is just one of the options. Other options are MERGE and DELETE .. INSERT, which (guess what) work like our merge jobs.

I think you need to take PreparedTableSchema here and generate INSERT/MERGE code depending on write disposition (maybe not now, maybe in the future).

Also I assume that model jobs for a table chain will be loaded as a single statement (preferably within a transaction).

We could probably replace our sql merge jobs with it.
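
A hedged sketch of what such dispatching could look like (function and parameter names are illustrative, not the actual job implementation):

def render_model_sql(
    qualified_name: str, select_statement: str, write_disposition: str, primary_key: str
) -> str:
    if write_disposition in ("append", "replace"):
        return f"INSERT INTO {qualified_name} {select_statement};"
    if write_disposition == "merge":
        # delete rows that will be replaced, then insert the fresh ones
        return (
            f"DELETE FROM {qualified_name} WHERE {primary_key} IN "
            f"(SELECT {primary_key} FROM ({select_statement}) AS src);\n"
            f"INSERT INTO {qualified_name} {select_statement};"
        )
    raise ValueError(f"unsupported write disposition: {write_disposition}")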

Collaborator Author

Right now these inserts are done on the dataset that the jobs are initially loaded to. So if you have a sql transformation with write_disposition merge, it will insert the rows into the staging table and then create a regular merge job, i.e. another sql job that gets executed at the end. At least I think it should do this; I still have to write tests. I'd like to keep this PR as simple as possible for now and then migrate to more efficient jobs later.

Collaborator

heh right! but I see a way to skip the staging dataset...

Collaborator Author

Yes of course, but IMHO out of scope for now, because I am actually trying to get the transformations built in time. Unless you insist, then I'll do it :)

Collaborator Author

I think it could be a cool job for @anuunchin to see whether we can make nice merge jobs here.

path = self._get_data_item_path_template(load_id, schema_name, table_name)
writer = BufferedDataWriter(self.writer_spec, path)
writer = BufferedDataWriter(self.writer_spec, path, **kwargs)
Collaborator

why? aren't those signatures explicit?

Collaborator

OK I get it now, but I think you could just take file_max_items from writer_spec in BufferedDataWriter. You do not need to pass any kwargs.

Collaborator Author

The problem is that if I set an explicit None as an argument, it will not take this value from the BufferedDataWriterConfiguration. It seems the config values in constructors are only injected for arguments that do not have an explicit value set during instantiation, which makes sense to me. This is also true for a None value, so I need to call BufferedDataWriter.__init__ either with this argument or without it, but never with None.
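
A minimal sketch of the pattern being described (only file_max_items is shown; the surrounding names follow the diff above):

writer_kwargs = {}
if file_max_items is not None:
    # only forward the argument when it has a value; omitting it lets the injected
    # BufferedDataWriterConfiguration supply the default, which an explicit None would override
    writer_kwargs["file_max_items"] = file_max_items
writer = BufferedDataWriter(self.writer_spec, path, **writer_kwargs)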

@@ -81,7 +86,7 @@ def with_hints(
Create `TResourceHints` with `make_hints`.
Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name`
"""
return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)
return DataItemWithMeta(HintsMeta(hints or {}, create_table_variant, data_item_format), item)
Collaborator

why default {} value?

Collaborator Author

I removed it, it was a leftover.

@@ -279,6 +311,9 @@ def create_load_job(
if SqlLoadJob.is_sql_job(file_path):
# create sql load job
return SqlLoadJob(file_path)
if ModelLoadJob.is_model_job(file_path):
Collaborator

hmmmm maybe we should register jobs via plugin as well?

Collaborator Author
@sh-rp sh-rp commented Mar 14, 2025

"Are you 100% sure we do not need any model properties to be stored in the file? or we plan to pass those via TableSchema?"
I'm not. Conceptually it would be cool if the model files only had the instructions of where to take the data from and the rest were in the schema, so it is like all the other job files. I was planning to add views in the transformations; I would've added a new table format for this here, but in your review you said we don't need them, so now this is not here. For merging and upserting we already have the right hints, which are in the schema, so I don't think anything else is required. But I'm not 100% sure at this point :)

@@ -111,7 +111,7 @@ def _get_items_normalizer(
if item_format == "file":
Collaborator

I think item_format_from_file_extension should return the model item format, not file, for models. Here this is not relevant, but IMO it may be in other places.

Collaborator Author
@sh-rp sh-rp Mar 17, 2025

It does already, doesn't it? Or am I misunderstanding your comment?

        if extension == "typed-jsonl":
            return "object"
        elif extension == "parquet":
            return "arrow"
        elif extension == "model":
            return "model"

@sh-rp sh-rp requested a review from rudolfix March 17, 2025 14:29
@@ -0,0 +1,170 @@
# test the sql insert job loader, works only on duckdb for now
Collaborator Author

I am not quite sure what else to test here. Basically we are just testing whether these sql statements are created properly; all the column types etc. should work since we are on the same destination, and if they don't, there is not much we can do about it really, since we are not touching the data.
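
A rough example of the kind of end-to-end check described here (names are made up; it assumes the copied_table resource from the PR description, which selects exactly 5 rows):

def test_copied_table_row_count() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="model_test", destination="duckdb", dataset_name="copies"
    )
    pipeline.run(copied_table())
    # the generated INSERT .. SELECT should have materialized exactly 5 rows
    with pipeline.sql_client() as client:
        rows = client.execute_sql("SELECT count(*) FROM copied_table")
    assert rows[0][0] == 5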
