
Commit ec8d2bc

implements missing tests
1 parent 90d9d98 commit ec8d2bc

File tree

10 files changed (+413 / -166 lines)

dlt/common/schema/utils.py

Lines changed: 2 additions & 3 deletions

@@ -763,9 +763,8 @@ def get_inherited_table_hint(
     tables: TSchemaTables, table_name: str, table_hint_name: str, allow_none: bool = False
 ) -> Any:
     table = tables.get(table_name, {})
-    hint = table.get(table_hint_name)
-    if hint:
-        return hint
+    if table_hint_name in table:
+        return table[table_hint_name]  # type: ignore[literal-required]
 
     if is_nested_table(table):
         return get_inherited_table_hint(tables, table.get("parent"), table_hint_name, allow_none)

dlt/destinations/utils.py

Lines changed: 0 additions & 1 deletion

@@ -120,7 +120,6 @@ def verify_schema_merge_disposition(
                     " merge keys defined."
                     " dlt will fall back to `append` for this table."
                 )
-                raise RuntimeError()
             elif merge_strategy == "upsert":
                 if not has_column_with_prop(table, "primary_key"):
                     exception_log.append(

dlt/extract/extractors.py

Lines changed: 3 additions & 3 deletions

@@ -232,13 +232,13 @@ def _compute_tables(
     ) -> List[TTableSchema]:
         """Computes a schema for a new or dynamic table and normalizes identifiers"""
         root_table_schema = resource.compute_table_schema(items, meta)
+        nested_tables_schema = resource.compute_nested_table_schemas(
+            root_table_schema["name"], self.naming, items, meta
+        )
         # we need to re-normalize name to support legacy normalization mode which we will
         # drop in next major version
         # TODO: drop in 2.0 also drop SCHEMA__USE_BREAK_PATH_ON_NORMALIZE
         root_table_schema["name"] = self._normalize_table_identifier(root_table_schema["name"])
-        nested_tables_schema = resource.compute_nested_table_schemas(
-            root_table_schema["name"], self.naming, items, meta
-        )
         return [
             utils.normalize_table_identifiers(table_schema, self.naming)
             for table_schema in (root_table_schema, *nested_tables_schema)
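
Why the call moved: nested table schemas are now computed from the root table name before the legacy re-normalization rewrites it, and a single final pass then normalizes every table, root and nested, together. A hypothetical plain-Python sketch of that ordering (the `normalize` helper and the literal names are stand-ins, not dlt's naming convention):

    def normalize(name: str) -> str:
        return name.lower()  # stand-in for self._normalize_table_identifier

    root_table_schema = {"name": "Events"}
    # 1. nested schemas are derived from the raw root name ...
    nested_tables_schema = [{"name": root_table_schema["name"] + "__DataBlob"}]
    # 2. ... then the legacy step re-normalizes only the root name ...
    root_table_schema["name"] = normalize(root_table_schema["name"])
    # 3. ... and the final pass normalizes all tables at once
    tables = [dict(t, name=normalize(t["name"])) for t in (root_table_schema, *nested_tables_schema)]
    assert [t["name"] for t in tables] == ["events", "events__datablob"]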

dlt/extract/source.py

Lines changed: 3 additions & 3 deletions

@@ -320,16 +320,16 @@ def schema(self) -> Schema:
     def schema(self, value: Schema) -> None:
         self._schema = value
 
-    def discover_schema(self, item: TDataItem = None) -> Schema:
+    def discover_schema(self, item: TDataItem = None, meta: Any = None) -> Schema:
         """Computes table schemas for all selected resources in the source and merges them with a copy of current source schema. If `item` is provided,
         dynamic tables will be evaluated, otherwise those tables will be ignored."""
         schema = self._schema.clone(update_normalizers=True)
         for r in self.selected_resources.values():
             # names must be normalized here
             with contextlib.suppress(DataItemRequiredForDynamicTableHints):
-                root_table_schema = r.compute_table_schema(item)
+                root_table_schema = r.compute_table_schema(item, meta)
                 nested_tables_schema = r.compute_nested_table_schemas(
-                    root_table_schema["name"], schema.naming, item
+                    root_table_schema["name"], schema.naming, item, meta
                 )
                 # NOTE must ensure that `schema.update_table()` is called in an order that respect parent-child relationships
                 for table_schema in (root_table_schema, *nested_tables_schema):
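
A hedged usage sketch for the extended signature: passing a sample item lets `discover_schema` evaluate a dynamic `table_name` lambda without running an extract, and the new `meta` argument is simply forwarded to the hint computation. The resource below mirrors the test data added later in this commit; the exact call pattern is illustrative, not prescriptive:

    import dlt
    from dlt.extract import DltResource, DltSource

    data = [{"Event": "issue", "DataBlob": [{"ID": 1, "Date": "2024-01-01"}]}]
    events = DltResource.from_data(
        data,
        name="events",
        hints=dlt.mark.make_hints(table_name=lambda e: e["Event"]),
    )
    source = DltSource(dlt.Schema("hintable"), "module", [events])
    # with an item, the dynamic table name resolves; meta defaults to None
    schema = source.discover_schema(item=data[0])
    assert "issue" in schema.tables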

dlt/normalize/items_normalizers.py

Lines changed: 3 additions & 1 deletion

@@ -133,7 +133,9 @@ def _normalize_chunk(
             schema_contract = self._table_contracts.setdefault(
                 table_name,
                 schema.resolve_contract_settings_for_table(
-                    table_name if table_name in schema.tables else parent_table
+                    table_name
+                    if table_name in schema.tables
+                    else parent_table or table_name
                 ),  # parent_table, if present, exists in the schema
             )
             partial_table, filters = schema.apply_schema_contract(
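
The added `or table_name` covers brand-new root tables, for example those produced by dynamic table names: they are not yet in the schema and have no parent, so the contract is now resolved against the new table's own name instead of `None`. A minimal standalone sketch of the lookup rule (a hypothetical helper, not the dlt API):

    def contract_lookup_name(table_name, parent_table, known_tables):
        # prefer the table itself if the schema already knows it; otherwise use its
        # parent; brand-new root tables (no parent yet) resolve against their own name
        return table_name if table_name in known_tables else parent_table or table_name

    assert contract_lookup_name("events", None, {"events"}) == "events"            # existing table
    assert contract_lookup_name("events__data_blob", "events", set()) == "events"  # new nested table
    assert contract_lookup_name("issue", None, set()) == "issue"                    # new dynamic root table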

tests/extract/test_decorators.py

Lines changed: 5 additions & 4 deletions

@@ -17,13 +17,14 @@
 from dlt.common.pipeline import StateInjectableContext, TPipelineState
 from dlt.common.schema import Schema
 from dlt.common.schema.utils import new_table, new_column
-from dlt.common.schema.typing import TTableSchemaColumns
+from dlt.common.schema.typing import TTableReference, TTableSchemaColumns
 from dlt.common.schema.exceptions import InvalidSchemaName
-from dlt.common.typing import TDataItem
+from dlt.common.typing import TDataItem, TTableNames
 
 from dlt.cli.source_detection import detect_source_configs
 from dlt.common.utils import custom_environ
 from dlt.extract.decorators import _DltSingleSource, DltSourceFactoryWrapper
+from dlt.extract.hints import TResourceNestedHints
 from dlt.extract.reference import SourceReference
 from dlt.extract import DltResource, DltSource
 from dlt.extract.exceptions import (
@@ -320,7 +321,7 @@ def get_users():
 
 
 def test_apply_hints_reference() -> None:
-    example_reference = {
+    example_reference: TTableReference = {
         "columns": ["User ID", "user_name"],
         "referenced_table": "users",
         "referenced_columns": ["id", "name"],
@@ -342,7 +343,7 @@ def campaign_details(campaign):
 
 
 def test_nested_hints_decorator() -> None:
-    nested_hints = {
+    nested_hints: Dict[TTableNames, TResourceNestedHints] = {
         "steps": dlt.mark.make_nested_hints(
             columns=[{"name": "timestamp", "data_type": "timestamp"}]
         ),

tests/extract/test_extract.py

Lines changed: 102 additions & 30 deletions

@@ -5,6 +5,7 @@
 import dlt
 from dlt.common import json
 from dlt.common.data_types.typing import TDataType
+from dlt.common.schema.utils import is_nested_table, may_be_nested
 from dlt.common.storages import (
     SchemaStorage,
     SchemaStorageConfiguration,
@@ -23,6 +24,19 @@
 from tests.extract.utils import expect_extracted_file
 
 
+NESTED_DATA = [
+    {
+        "id": 1,
+        "outer1": [
+            {"outer1_id": "2", "innerfoo": [{"innerfoo_id": "3"}]},
+        ],
+        "outer2": [
+            {"outer2_id": "4", "innerbar": [{"innerbar_id": "5"}]},
+        ],
+    }
+]
+
+
 @pytest.fixture
 def extract_step() -> Extract:
     clean_test_storage(init_normalize=True)
@@ -279,19 +293,8 @@ def with_table_hints():
 
 
 def test_extract_nested_hints(extract_step: Extract) -> None:
-    data = [
-        {
-            "id": 1,
-            "outer1": [
-                {"outer1_id": "2", "innerfoo": [{"innerfoo_id": "3"}]},
-            ],
-            "outer2": [
-                {"outer2_id": "4", "innerbar": [{"innerbar_id": "5"}]},
-            ],
-        }
-    ]
     resource_name = "with_nested_hints"
-    nested_resource = DltResource.from_data(data, name=resource_name)
+    nested_resource = DltResource.from_data(NESTED_DATA, name=resource_name)
 
     # Check 1: apply nested hints
     outer1_id_new_type: TDataType = "double"
@@ -318,21 +321,21 @@ def test_extract_nested_hints(extract_step: Extract) -> None:
 
     # root table exists even though there are no explicit hints
     assert pre_extract_schema.get_table(resource_name)
-    assert (
-        pre_extract_schema.get_table("with_nested_hints__outer1")["parent"] == "with_nested_hints"
-    )
-    assert (
-        pre_extract_schema.get_table("with_nested_hints__outer1")["columns"]
-        == nested_hints["outer1"]["columns"]
-    )
-    assert (
-        pre_extract_schema.get_table("with_nested_hints__outer2__innerbar")["parent"]
-        == "with_nested_hints__outer2"
-    )
-    assert (
-        pre_extract_schema.get_table("with_nested_hints__outer2__innerbar")["columns"]
-        == nested_hints[("outer2", "innerbar")]["columns"]
-    )
+    outer1_tab = pre_extract_schema.get_table("with_nested_hints__outer1")
+    assert outer1_tab["parent"] == "with_nested_hints"
+    assert outer1_tab["columns"] == nested_hints["outer1"]["columns"]
+    # no resource on nested table
+    assert "resource" not in outer1_tab
+    assert is_nested_table(outer1_tab) is True
+    assert may_be_nested(outer1_tab) is True
+
+    outer2_innerbar_tab = pre_extract_schema.get_table("with_nested_hints__outer2__innerbar")
+    assert outer2_innerbar_tab["parent"] == "with_nested_hints__outer2"
+    assert outer2_innerbar_tab["columns"] == nested_hints[("outer2", "innerbar")]["columns"]
+    assert "resource" not in outer2_innerbar_tab
+    assert is_nested_table(outer2_innerbar_tab) is True
+    assert may_be_nested(outer2_innerbar_tab) is True
+
     # this table is generated to ensure `innerbar` has a parent that links it to the root table
     # NOTE: nested tables do not have parent set
    assert pre_extract_schema.get_table(implicit_parent) == {
@@ -341,19 +344,88 @@ def test_extract_nested_hints(extract_step: Extract) -> None:
         "columns": {},
     }
 
-    source = DltSource(dlt.Schema("hintable"), "module", [nested_resource])
     extract_step.extract(source, 20, 1)
     # schema after extractions must be same as discovered schema
     assert source.schema._schema_tables == pre_extract_schema._schema_tables
 
 
-def test_break_nesting_with_primary_key() -> None:
+def test_break_nesting_with_primary_key(extract_step: Extract) -> None:
+    resource_name = "with_nested_hints"
+    nested_resource = DltResource.from_data(NESTED_DATA, name=resource_name)
+    nested_hints: Dict[TTableNames, TResourceNestedHints] = {
+        "outer1": {"columns": {"outer1_id": {"name": "outer1_id", "data_type": "bigint"}}},
+        ("outer1", "innerbar"): {"primary_key": "innerfoo_id"},
+    }
+    nested_resource.apply_hints(nested_hints=nested_hints)
+    assert nested_resource.nested_hints == nested_hints
+
+    source = DltSource(dlt.Schema("hintable"), "module", [nested_resource])
+    pre_extract_schema = source.discover_schema()
     # primary key will break nesting
+    # print(pre_extract_schema.to_pretty_yaml())
+    innerfoo_tab = pre_extract_schema.tables["with_nested_hints__outer1__innerbar"]
+    assert innerfoo_tab["columns"]["innerfoo_id"]["primary_key"] is True
     # resource must be present
+    assert innerfoo_tab["resource"] == "with_nested_hints"
     # parent cannot be present
+    assert "parent" not in innerfoo_tab["columns"]["innerfoo_id"]
     # is_nested_table must be false
+    assert is_nested_table(innerfoo_tab) is False
+    assert may_be_nested(innerfoo_tab) is False
+    extract_step.extract(source, 20, 1)
+    # schema after extractions must be same as discovered schema
+    assert source.schema._schema_tables == pre_extract_schema._schema_tables
+
+
+def test_nested_hints_dynamic_table_names(extract_step: Extract) -> None:
+    data = [
+        {"Event": "issue", "DataBlob": [{"ID": 1, "Name": "first", "Date": "2024-01-01"}]},
+        {"Event": "purchase", "DataBlob": [{"PID": "20-1", "Name": "first", "Date": "2024-01-01"}]},
+    ]
+    events = DltResource.from_data(
+        data,
+        name="events",
+        hints=dlt.mark.make_hints(
+            table_name=lambda e: e["Event"],
+            nested_hints={
+                "DataBlob": dlt.mark.make_nested_hints(
+                    columns=[{"name": "Date", "data_type": "date"}]
+                )
+            },
+        ),
+    )
+
+    source = DltSource(dlt.Schema("hintable"), "module", [events])
+    extract_step.extract(source, 20, 1)
+    # make sure that tables exist and types are applies
+    assert "issue" in source.schema.tables
+    assert "purchase" in source.schema.tables
+    assert source.schema.tables["issue__data_blob"]["columns"]["date"]["data_type"] == "date"
+    assert source.schema.tables["purchase__data_blob"]["columns"]["date"]["data_type"] == "date"
+
+
+def test_nested_hints_table_name(extract_step: Extract) -> None:
+    data = [
+        {"Event": "issue", "DataBlob": [{"ID": 1, "Name": "first", "Date": "2024-01-01"}]},
+        {"Event": "purchase", "DataBlob": [{"PID": "20-1", "Name": "first", "Date": "2024-01-01"}]},
+    ]
+    events = DltResource.from_data(
+        data,
+        name="events",
+        hints=dlt.mark.make_hints(
+            table_name="events_table",
+            nested_hints={
+                "DataBlob": dlt.mark.make_nested_hints(
+                    columns=[{"name": "Date", "data_type": "date"}]
+                )
+            },
+        ),
+    )
 
-    pass
+    source = DltSource(dlt.Schema("hintable"), "module", [events])
+    extract_step.extract(source, 20, 1)
+    assert "events_table" in source.schema.tables
+    assert source.schema.tables["events_table__data_blob"]["columns"]["date"]["data_type"] == "date"
 
 
 def test_extract_metrics_on_exception_no_flush(extract_step: Extract) -> None:

tests/extract/test_sources.py

Lines changed: 53 additions & 7 deletions

@@ -1848,20 +1848,66 @@ def test_apply_nested_hints():
     ]
 
     nested_resource.apply_hints(nested_hints=nested_hints)
-    print(nested_resource.nested_hints)
     assert nested_resource.nested_hints == nested_hints
 
     nested_schemas = nested_resource.compute_nested_table_schemas(
         resource_name, naming=SnakeCaseNamingConvention()
     )
     assert nested_schemas == expected_nested_schemas
 
-    # TODO: repeat apply_hints with different nested hints. we have a different code path
-
-    # TODO: also test nested hints that declare primary or merge keys. that should
-    # - generate 'resource' name
-    # - drop the `parent`
-    # effectively breaking the nesting chain`
+    # apply hints again
+    nested_hints = {
+        ("outer1",): {
+            "columns": {
+                "outer1_id": {"name": "outer1_id", "data_type": "decimal", "precision": 18}
+            },
+            "references": [
+                {
+                    "columns": ["outer1_id"],
+                    "referenced_columns": ["outer1_fk_1"],
+                    "referenced_table": "external_table",
+                }
+            ],
+            "file_format": "parquet",
+            "table_format": "delta",
+            "schema_contract": "discard_value",
+        },
+        ("outer2", "innerbar"): {"merge_key": "innerfoo_id"},
+    }
+    nested_resource.apply_hints(nested_hints=nested_hints)
+    nested_schemas = nested_resource.compute_nested_table_schemas(
+        resource_name, naming=SnakeCaseNamingConvention()
+    )
+    # currently new hints overwrite fully old hints. nothing gets merged
+    assert nested_schemas == [
+        {
+            "columns": {
+                "outer1_id": {"name": "outer1_id", "data_type": "decimal", "precision": 18}
+            },
+            "references": [
+                {
+                    "columns": ["outer1_id"],
+                    "referenced_columns": ["outer1_fk_1"],
+                    "referenced_table": "external_table",
+                }
+            ],
+            "file_format": "parquet",
+            "table_format": "delta",
+            "schema_contract": "discard_value",
+            "name": "with_nested_hints__outer1",
+            "parent": "with_nested_hints",
+        },
+        {
+            "columns": {
+                "innerfoo_id": {"name": "innerfoo_id", "nullable": False, "merge_key": True}
+            },
+            "name": "with_nested_hints__outer2__innerbar",
+            "resource": "with_nested_hints",
+        },
+    ]
+    # with_nested_hints__outer2__innerbar not a nested table
+    assert utils.is_nested_table(nested_schemas[1]) is False
+    assert utils.may_be_nested(nested_schemas[1]) is False
 
 
 def test_resource_no_template() -> None:
