Commit 8798c17

Merge pull request #1934 from dlt-hub/devel
master merge for 1.2.0 release
2 parents d2b6d05 + 2d07a43 commit 8798c17

File tree

111 files changed: +2733 -1888 lines changed


dlt/common/runtime/anon_tracker.py

Lines changed: 1 addition & 1 deletion
@@ -155,7 +155,7 @@ def _default_context_fields() -> TExecutionContext:
     global _TRACKER_CONTEXT
 
     if not _TRACKER_CONTEXT:
-        # Make sure to update the example in docs/docs/telemetry/telemetry.mdx
+        # Make sure to update the example in docs/reference/telemetry.md
         # if you change / add context
         _TRACKER_CONTEXT = get_execution_context()

dlt/common/schema/typing.py

Lines changed: 7 additions & 2 deletions
@@ -232,15 +232,20 @@ class TWriteDispositionDict(TypedDict):
     disposition: TWriteDisposition
 
 
-class TMergeDispositionDict(TWriteDispositionDict, total=False):
+class TMergeDispositionDict(TWriteDispositionDict):
     strategy: Optional[TLoaderMergeStrategy]
+
+
+class TScd2StrategyDict(TMergeDispositionDict, total=False):
     validity_column_names: Optional[List[str]]
     active_record_timestamp: Optional[TAnyDateTime]
     boundary_timestamp: Optional[TAnyDateTime]
     row_version_column_name: Optional[str]
 
 
-TWriteDispositionConfig = Union[TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict]
+TWriteDispositionConfig = Union[
+    TWriteDisposition, TWriteDispositionDict, TMergeDispositionDict, TScd2StrategyDict
+]
 
 
 class _TTableSchemaBase(TTableProcessingHints, total=False):
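
The scd2-specific hints now live in the new TScd2StrategyDict, so a plain merge disposition carries only the strategy. A minimal sketch of the two shapes (column names are illustrative, not taken from this commit); such a dict can be passed anywhere TWriteDispositionConfig is accepted, e.g. as a resource's write_disposition:

from dlt.common.schema.typing import TMergeDispositionDict, TScd2StrategyDict

# plain merge: only the disposition and strategy
merge_hint: TMergeDispositionDict = {"disposition": "merge", "strategy": "delete-insert"}

# scd2 merge: strategy plus the scd2-only options split out by this commit
scd2_hint: TScd2StrategyDict = {
    "disposition": "merge",
    "strategy": "scd2",
    "validity_column_names": ["valid_from", "valid_to"],  # illustrative names
    "row_version_column_name": "row_hash",  # illustrative name
}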

dlt/destinations/impl/athena/athena.py

Lines changed: 6 additions & 0 deletions
@@ -149,6 +149,12 @@ def gen_delete_temp_table_sql(
         sql.insert(0, f"""DROP TABLE IF EXISTS {temp_table_name.replace('"', '`')};""")
         return sql, temp_table_name
 
+    @classmethod
+    def gen_concat_sql(cls, columns: Sequence[str]) -> str:
+        # Athena requires explicit casting
+        columns = [f"CAST({c} AS VARCHAR)" for c in columns]
+        return f"CONCAT({', '.join(columns)})"
+
     @classmethod
     def requires_temp_table_for_delete(cls) -> bool:
         return True
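
The new gen_concat_sql override wraps every column in an explicit CAST, since (per the comment in the diff) Athena requires the CONCAT arguments to be cast explicitly. A quick sketch of the SQL it renders, with illustrative column names:

columns = ["customer_id", "order_id"]  # illustrative column names
cast_columns = [f"CAST({c} AS VARCHAR)" for c in columns]
print(f"CONCAT({', '.join(cast_columns)})")
# -> CONCAT(CAST(customer_id AS VARCHAR), CAST(order_id AS VARCHAR))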

dlt/destinations/impl/databricks/databricks.py

Lines changed: 12 additions & 4 deletions
@@ -33,6 +33,7 @@
 from dlt.destinations.job_impl import ReferenceFollowupJobRequest
 
 AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
+SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]
 
 
 class DatabricksLoadJob(RunnableLoadJob, HasFollowupJobs):
@@ -69,11 +70,12 @@ def run(self) -> None:
         bucket_url = urlparse(bucket_path)
         bucket_scheme = bucket_url.scheme
 
-        if bucket_scheme not in AZURE_BLOB_STORAGE_PROTOCOLS + ["s3"]:
+        if bucket_scheme not in SUPPORTED_BLOB_STORAGE_PROTOCOLS:
             raise LoadJobTerminalException(
                 self._file_path,
-                f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
-                " azure buckets are supported",
+                f"Databricks cannot load data from staging bucket {bucket_path}. Only s3, azure"
+                " and gcs buckets are supported. Please note that gcs buckets are supported"
+                " only via named credential",
             )
 
         if self._job_client.config.is_staging_external_location:
@@ -106,6 +108,12 @@ def run(self) -> None:
                 bucket_path = self.ensure_databricks_abfss_url(
                     bucket_path, staging_credentials.azure_storage_account_name
                 )
+            else:
+                raise LoadJobTerminalException(
+                    self._file_path,
+                    "You need to use Databricks named credential to use google storage."
+                    " Passing explicit Google credentials is not supported by Databricks.",
+                )
 
             if bucket_scheme in AZURE_BLOB_STORAGE_PROTOCOLS:
                 assert isinstance(
@@ -125,7 +133,7 @@ def run(self) -> None:
             raise LoadJobTerminalException(
                 self._file_path,
                 "Cannot load from local file. Databricks does not support loading from local files."
-                " Configure staging with an s3 or azure storage bucket.",
+                " Configure staging with an s3, azure or google storage bucket.",
             )
 
         # decide on source format, stage_file_path will either be a local file or a bucket path
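
Staging buckets on Google Cloud Storage are now accepted, but only when Databricks reaches them through a named credential; passing explicit Google credentials still terminates the job. A small sketch of the widened protocol check (the bucket URL is illustrative):

from urllib.parse import urlparse

AZURE_BLOB_STORAGE_PROTOCOLS = ["az", "abfss", "abfs"]
SUPPORTED_BLOB_STORAGE_PROTOCOLS = AZURE_BLOB_STORAGE_PROTOCOLS + ["s3", "gs", "gcs"]

bucket_path = "gs://staging-bucket/dlt-loads"  # illustrative gcs staging bucket
scheme = urlparse(bucket_path).scheme
print(scheme in SUPPORTED_BLOB_STORAGE_PROTOCOLS)  # True: "gs" now passes the check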

dlt/destinations/impl/sqlalchemy/db_api_client.py

Lines changed: 5 additions & 5 deletions
@@ -238,16 +238,16 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None:
         """Mimic multiple schemas in sqlite using ATTACH DATABASE to
         attach a new database file to the current connection.
         """
-        if dataset_name == "main":
-            # main always exists
-            return
         if self._sqlite_is_memory_db():
             new_db_fn = ":memory:"
         else:
             new_db_fn = self._sqlite_dataset_filename(dataset_name)
 
-        statement = "ATTACH DATABASE :fn AS :name"
-        self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
+        if dataset_name != "main":  # main is the current file, it is always attached
+            statement = "ATTACH DATABASE :fn AS :name"
+            self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
+        # WAL mode is applied to all currently attached databases
+        self.execute_sql("PRAGMA journal_mode=WAL")
         self._sqlite_attached_datasets.add(dataset_name)
 
     def _sqlite_drop_dataset(self, dataset_name: str) -> None:
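
Besides attaching the dataset file, dataset creation now switches the journal to WAL mode, and it does so for the implicit "main" dataset as well, which previously returned early. A plain sqlite3 sketch of the same statements (file and dataset names are illustrative):

import sqlite3

conn = sqlite3.connect("pipeline.db")  # illustrative main database file
# "main" is the connected file itself; only other datasets need ATTACH DATABASE
conn.execute("ATTACH DATABASE 'pipeline__other.db' AS other_dataset")
# the pragma without a schema prefix applies to all currently attached databases
conn.execute("PRAGMA journal_mode=WAL")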

dlt/destinations/impl/sqlalchemy/factory.py

Lines changed: 12 additions & 0 deletions
@@ -1,5 +1,6 @@
 import typing as t
 
+from dlt.common import pendulum
 from dlt.common.destination import Destination, DestinationCapabilitiesContext
 from dlt.common.destination.capabilities import DataTypeMapper
 from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
@@ -9,6 +10,7 @@
     SqlalchemyCredentials,
     SqlalchemyClientConfiguration,
 )
+from dlt.common.data_writers.escape import format_datetime_literal
 
 SqlalchemyTypeMapper: t.Type[DataTypeMapper]
 
@@ -24,6 +26,13 @@
     from sqlalchemy.engine import Engine
 
 
+def _format_mysql_datetime_literal(
+    v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
+) -> str:
+    # Format without timezone to prevent tz conversion in SELECT
+    return format_datetime_literal(v, precision, no_tz=True)
+
+
 class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]):
     spec = SqlalchemyClientConfiguration
 
@@ -50,6 +59,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
         caps.supports_multiple_statements = False
         caps.type_mapper = SqlalchemyTypeMapper
         caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
+        caps.supported_merge_strategies = ["delete-insert", "scd2"]
 
         return caps
 
@@ -67,6 +77,8 @@ def adjust_capabilities(
         caps.max_identifier_length = dialect.max_identifier_length
         caps.max_column_identifier_length = dialect.max_identifier_length
         caps.supports_native_boolean = dialect.supports_native_boolean
+        if dialect.name == "mysql":
+            caps.format_datetime_literal = _format_mysql_datetime_literal
 
         return caps
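
MySQL's DATETIME type stores no timezone, so the dialect-specific formatter registered here forces no_tz=True to keep the rendered literal free of an offset and avoid an unwanted conversion when it is read back in a SELECT. A short sketch of exercising the helper (the timestamp value is illustrative):

from dlt.common import pendulum
from dlt.common.data_writers.escape import format_datetime_literal

ts = pendulum.datetime(2024, 10, 1, 12, 30, 0, tz="UTC")  # illustrative timestamp
# same call the mysql-specific wrapper makes: the offset is dropped from the literal
print(format_datetime_literal(ts, precision=6, no_tz=True))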

dlt/destinations/impl/sqlalchemy/load_jobs.py

Lines changed: 9 additions & 0 deletions
@@ -13,6 +13,7 @@
 from dlt.destinations.sql_jobs import SqlFollowupJob, SqlJobParams
 
 from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient
+from dlt.destinations.impl.sqlalchemy.merge_job import SqlalchemyMergeFollowupJob
 
 if TYPE_CHECKING:
     from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient
@@ -134,3 +135,11 @@ def generate_sql(
             statements.append(stmt)
 
         return statements
+
+
+__all__ = [
+    "SqlalchemyJsonLInsertJob",
+    "SqlalchemyParquetInsertJob",
+    "SqlalchemyStagingCopyJob",
+    "SqlalchemyMergeFollowupJob",
+]
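
The __all__ list re-exports the job classes, including the merge follow-up job that now lives in the separate merge_job module, so imports that go through load_jobs keep resolving. A usage sketch (not from the commit):

# resolves even though the class is defined in merge_job
from dlt.destinations.impl.sqlalchemy.load_jobs import SqlalchemyMergeFollowupJob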
