"model" item_format support #2404
base: devel
Conversation
is_binary_format=False,
supports_schema_changes="True",
supports_compression=False,
# NOTE: we create a new model file for each sql row
I think it makes sense to have one model per file
indeed you can yield many models per table in a single run. isn't it an error in the code that generates models?
@rudolfix I don't quite understand what this convo is about 👀. If rephrased, is it the same as:
@rudolfix: One can yield many models (many insert statements) per table in a single run, so it's wrong to have one model (insert statement) per table
@sh-rp: No, it does make sense to have one model (insert statement) per table
@@ -129,7 +129,7 @@ class duckdb(Destination[DuckDbClientConfiguration, "DuckDbClient"]):
     def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps = DestinationCapabilitiesContext()
         caps.preferred_loader_file_format = "insert_values"
-        caps.supported_loader_file_formats = ["insert_values", "parquet", "jsonl"]
+        caps.supported_loader_file_formats = ["insert_values", "parquet", "jsonl", "model"]
For this first iteration we only support duckdb; the transformations can check the capabilities to figure out where pure SQL may be used.
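A hedged sketch of that capability check (the duckdb factory and the capabilities() call are assumed from dlt's public API, not taken from this PR):

from dlt.destinations import duckdb

caps = duckdb().capabilities()
# a transformation could fall back to regular data items when "model" is unsupported
can_use_sql_models = "model" in caps.supported_loader_file_formats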
@@ -68,11 +69,17 @@ class TResourceHints(TResourceHintsBase, total=False):

 class HintsMeta:
-    __slots__ = ("hints", "create_table_variant")
+    __slots__ = ("hints", "create_table_variant", "data_item_format")
I opted to add a slot on the default HintsMeta. I could also create a subclass similar to the file import, but to me this solution makes more sense, since it is kind of a hint but does not need to go into the schema.
I think you do not need to modify HintsMeta. Just create a container class in which you'll yield models, i.e.
class ModelStr(str):
pass
and yield it. Are you 100% sure we do not need any model properties to be stored in the file? Or do we plan to pass those via TableSchema? I.e., look here:
https://docs.getdbt.com/reference/model-configs (materialization)
https://docs.getdbt.com/docs/build/incremental-strategy (this is our write disposition + primary key) heh cool
I have done it this way now.
dlt/extract/utils.py (outdated)
@@ -60,14 +62,21 @@
 pandas = None

-def get_data_item_format(items: TDataItems) -> TDataItemFormat:
+def get_data_item_format(items: TDataItems, meta: Any = None) -> TDataItemFormat:
Each row that has a special item type needs to have the meta set, similar to how the ImportFileMeta works. We could also store it so it works more like the other hints, but I think this should be ok. It's mostly going to be used from the transformations anyway.
as mentioned, just check the item type
looks cool! but IMO can be simplified
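A minimal sketch of that simplification, assuming the ModelStr container class suggested above (the function body is illustrative, not the PR's code): the format is derived from the item type alone, so no extra meta argument is needed.

class ModelStr(str):
    """Marks a yielded string as a SQL model rather than plain data."""

def get_data_item_format(items) -> str:
    # detect models purely from the item type
    if isinstance(items, ModelStr):
        return "model"
    # the existing object/arrow detection would continue here
    return "object"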
""" | ||
sql_client = self._job_client.sql_client | ||
name = sql_client.make_qualified_table_name(self._load_table["name"]) | ||
return f"INSERT INTO {name} {select_statement};" |
INSERT is just one of the options. Other options are MERGE and DELETE .. INSERT, which (guess what) work like our merge jobs.
I think you need to take PreparedTableSchema here and generate INSERT/MERGE code depending on the write disposition (maybe not now, maybe in the future).
Also, I assume that model jobs for a table chain will be loaded as a single statement (preferably within a transaction).
We could probably replace our sql merge jobs with it.
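A rough sketch of that dispatch under the stated assumptions (function and parameter names are illustrative, and the MERGE branch is omitted since it needs key columns from the table schema):

def build_model_statement(write_disposition: str, qualified_name: str, select_statement: str) -> str:
    # pick the statement shape from the write disposition
    if write_disposition == "replace":
        # DELETE .. INSERT, roughly how a replace could be expressed as a model job
        return f"DELETE FROM {qualified_name}; INSERT INTO {qualified_name} {select_statement};"
    # append (and the current PR behavior): plain INSERT from the SELECT
    return f"INSERT INTO {qualified_name} {select_statement};"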
Right now these inserts are done on the dataset that the jobs are initially loaded to, so if you have a sql transformation with write_disposition merge, it will insert the rows into the staging table and then create a regular merge job, i.e. another sql job that gets executed at the end. At least I think it should do this; I still have to write tests. I'd like to keep this PR as simple as possible for now and then migrate to more efficient jobs later.
heh right! but I see a way to skip the staging dataset...
Yes, of course, but IMHO out of scope for now, because I am actually trying to build the transformations in time. Unless you insist, then I'll do it :)
I think it could be a cool job for @anuunchin to see whether we can make nice merge jobs here.
        path = self._get_data_item_path_template(load_id, schema_name, table_name)
-       writer = BufferedDataWriter(self.writer_spec, path)
+       writer = BufferedDataWriter(self.writer_spec, path, **kwargs)
why? aren't those signatures explicit?
OK, I get it now, but I think you could just take file_max_items from writer_spec in BufferedDataWriter. You do not need to pass any kwargs.
The problem is that if I set an explicit "None" as an argument, it will not take this value from the BufferedDataWriterConfiguration. It seems the config values in constructors are only set for arguments that do not have an explicit value set during instantiation, which makes sense to me. This is also true for a 'None' value, so I need to call BufferedDataWriter.__init__ either with this argument or without it, but never with None.
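A minimal sketch of that pattern (the helper is hypothetical; BufferedDataWriter and file_max_items come from the diff above): the argument is forwarded only when it was actually provided, so an explicit None never overrides the injected configuration value.

from typing import Any, Dict, Optional

def optional_writer_kwargs(file_max_items: Optional[int] = None) -> Dict[str, Any]:
    # omit the key entirely when unset so BufferedDataWriter falls back to
    # the value injected from BufferedDataWriterConfiguration
    kwargs: Dict[str, Any] = {}
    if file_max_items is not None:
        kwargs["file_max_items"] = file_max_items
    return kwargs

# writer = BufferedDataWriter(self.writer_spec, path, **optional_writer_kwargs(file_max_items))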
dlt/extract/resource.py (outdated)
@@ -81,7 +86,7 @@ def with_hints(
     Create `TResourceHints` with `make_hints`.
     Setting `table_name` will dispatch the `item` to a specified table, like `with_table_name`
     """
-    return DataItemWithMeta(HintsMeta(hints, create_table_variant), item)
+    return DataItemWithMeta(HintsMeta(hints or {}, create_table_variant, data_item_format), item)
why default {} value?
I removed it, it was a leftover.
@@ -279,6 +311,9 @@ def create_load_job(
         if SqlLoadJob.is_sql_job(file_path):
             # create sql load job
             return SqlLoadJob(file_path)
+        if ModelLoadJob.is_model_job(file_path):
hmmmm maybe we should register jobs via plugin as well?
"Are you 100% sure we do not need any model properties to be stored in the file? or we plan to pass those via TableSchema?" |
@@ -111,7 +111,7 @@ def _get_items_normalizer(
         if item_format == "file":
I think item_format_from_file_extension should return the model item format, not file, for models. Here this is not relevant, but IMO it may be in other places.
It does already, doesn't it? Or am I misunderstanding your comment?
if extension == "typed-jsonl":
return "object"
elif extension == "parquet":
return "arrow"
elif extension == "model":
return "model"
@@ -0,0 +1,170 @@
+# test the sql insert job loader, works only on duckdb for now
I am not quite sure what else to test here; basically we are just testing whether these sql statements are created properly. All the column types etc. should work since we are on the same destination, and if they don't, there is not much we can do about it really, since we are not touching the data.
Description
This PR introduces a new item format "model", which is a SQL SELECT statement whose result will be inserted into a given table. For now only the duckdb destination supports this. Further destinations will be available after @anuunchin finishes her research on this.
It is possible to mark an item with a HintsMeta to indicate the model item_type, which means the yielded string will be interpreted as a valid select statement for the targeted destination. During extraction each statement will be stored in its own job.
A resource emitting a model query, given an input dataset, could look like the sketch below. Columns need to be supplied so dlt can create/update the table:
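A hedged sketch only, reflecting this revision of the PR (the data_item_format argument to dlt.mark.with_hints comes from this branch and may have changed after review; table and column names are made up):

import dlt

@dlt.resource(name="copied_table")
def copied_table():
    # the yielded string must be a SELECT statement valid on the target destination
    select_statement = "SELECT id, value FROM source_dataset.source_table"
    yield dlt.mark.with_hints(
        select_statement,
        hints=dlt.mark.make_hints(
            columns={"id": {"data_type": "bigint"}, "value": {"data_type": "double"}}
        ),
        data_item_format="model",  # added in this PR: marks the item as a model
    )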