Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Expand sql table resource config #2396

Open
wants to merge 4 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion dlt/sources/sql_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ def sql_table(
resolve_foreign_keys: bool = False,
engine_adapter_callback: Callable[[Engine], Engine] = None,
write_disposition: TWriteDispositionConfig = "append",
primary_key: List[str] = None,
merge_key: List[str] = None,
) -> DltResource:
"""
A dlt resource which loads data from an SQL database table using SQLAlchemy.
Expand Down Expand Up @@ -195,10 +197,16 @@ def sql_table(
engine_adapter_callback (Callable[[Engine], Engine]): Callback to configure or modify an Engine instance that will be used to open a connection ie. to
set transaction isolation level.
write_disposition (TWriteDispositionConfig): write disposition of the table resource, defaults to `append`.
primary_key (List[str]): A list of column names that comprise a primary key. Typically used with "merge" write disposition to deduplicate loaded data.
merge_key (List[str]): A list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges ie. to
keep a single record for a given day.

Returns:
DltResource: The dlt resource for loading data from the SQL database table.
"""
# In case we get None from the config, we want to default to "append"
write_disposition = write_disposition if write_disposition else "append"

_detect_precision_hints_deprecated(detect_precision_hints)

if detect_precision_hints:
Expand Down Expand Up @@ -232,8 +240,11 @@ def sql_table(
else:
hints = {}

if primary_key:
hints["primary_key"] = primary_key

return decorators.resource(
table_rows, name=table, write_disposition=write_disposition, **hints
table_rows, name=table, write_disposition=write_disposition, merge_key=merge_key, **hints
)(
engine,
table_obj if table_obj is not None else table, # Pass table name if reflection deferred
Expand Down
4 changes: 4 additions & 0 deletions dlt/sources/sql_database/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema import TTableSchemaColumns
from dlt.common.schema.typing import TWriteDispositionDict
from dlt.common.typing import TDataItem, TSortOrder
from dlt.common.jsonpath import extract_simple_field_name

Expand Down Expand Up @@ -408,3 +409,6 @@ class SqlTableResourceConfiguration(BaseConfiguration):
defer_table_reflect: Optional[bool] = False
reflection_level: Optional[ReflectionLevel] = "full"
included_columns: Optional[List[str]] = None
write_disposition: Optional[TWriteDispositionDict] = None
primary_key: Optional[List[str]] = None
merge_key: Optional[List[str]] = None