diff --git a/examples/docs_snippets/docs_snippets/integrations/sigma/customize-sigma-asset-defs.py b/examples/docs_snippets/docs_snippets/integrations/sigma/customize-sigma-asset-defs.py index 4daac3247cf14..c33fcdbce0df0 100644 --- a/examples/docs_snippets/docs_snippets/integrations/sigma/customize-sigma-asset-defs.py +++ b/examples/docs_snippets/docs_snippets/integrations/sigma/customize-sigma-asset-defs.py @@ -19,7 +19,7 @@ class MyCustomSigmaTranslator(DagsterSigmaTranslator): def get_asset_spec(self, data: SigmaWorkbook) -> dg.AssetSpec: # We create the default asset spec using super() - default_spec = super().get_asset_spec(data) + default_spec = super().get_asset_spec(data) # type: ignore # we customize the team owner tag for all Sigma assets return default_spec.replace_attributes(owners=["team:my_team"]) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py index 5e533517fc1a2..b6d5ae249c099 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/resource.py @@ -9,19 +9,7 @@ from dataclasses import dataclass from enum import Enum from pathlib import Path -from typing import ( - AbstractSet, - Any, - Callable, - Dict, - Iterator, - List, - Mapping, - Optional, - Sequence, - Type, - Union, -) +from typing import AbstractSet, Any, Dict, Iterator, List, Mapping, Optional, Sequence, Type, Union import aiohttp import dagster._check as check @@ -45,10 +33,12 @@ from dagster_sigma.translator import ( DagsterSigmaTranslator, SigmaDataset, + SigmaDatasetTranslatorData, SigmaOrganizationData, SigmaTable, SigmaWorkbook, SigmaWorkbookMetadataSet, + SigmaWorkbookTranslatorData, _inode_from_url, ) @@ -719,9 +709,7 @@ def build_defs( @experimental def load_sigma_asset_specs( organization: SigmaOrganization, - dagster_sigma_translator: Callable[ - [SigmaOrganizationData], DagsterSigmaTranslator - ] = DagsterSigmaTranslator, + dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator, sigma_filter: Optional[SigmaFilter] = None, fetch_column_data: bool = True, fetch_lineage_data: bool = True, @@ -731,7 +719,7 @@ def load_sigma_asset_specs( Args: organization (SigmaOrganization): The Sigma organization to fetch assets from. - dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use + dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator. sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch. fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow. @@ -763,7 +751,8 @@ def load_sigma_asset_specs( def _get_translator_spec_assert_keys_match( - translator: DagsterSigmaTranslator, data: Union[SigmaDataset, SigmaWorkbook] + translator: DagsterSigmaTranslator, + data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData], ) -> AssetSpec: key = translator.get_asset_key(data) spec = translator.get_asset_spec(data) @@ -778,7 +767,7 @@ def _get_translator_spec_assert_keys_match( @dataclass class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]): organization: SigmaOrganization - translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator] + translator_cls: Type[DagsterSigmaTranslator] snapshot: Optional[RepositoryLoadData] sigma_filter: Optional[SigmaFilter] = None fetch_column_data: bool = True @@ -801,9 +790,17 @@ def fetch_state(self) -> SigmaOrganizationData: ) def defs_from_state(self, state: SigmaOrganizationData) -> Definitions: - translator = self.translator_cls(state) + translator = self.translator_cls() + translator_data_workbooks = [ + SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state) + for workbook in state.workbooks + ] + translator_data_datasets = [ + SigmaDatasetTranslatorData(dataset=dataset, organization_data=state) + for dataset in state.datasets + ] asset_specs = [ _get_translator_spec_assert_keys_match(translator, obj) - for obj in [*state.workbooks, *state.datasets] + for obj in [*translator_data_workbooks, *translator_data_datasets] ] return Definitions(assets=asset_specs) diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py b/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py index de54d74219115..e2f0d7095355c 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma/translator.py @@ -90,6 +90,58 @@ def get_table_path(self) -> List[str]: return self.properties["path"].split("/")[1:] + [self.properties["name"]] +@record +class SigmaWorkbookTranslatorData: + """A record representing a Sigma workbook and the Sigma organization data.""" + + workbook: "SigmaWorkbook" + organization_data: "SigmaOrganizationData" + + @property + def properties(self) -> Dict[str, Any]: + return self.workbook.properties + + @property + def lineage(self) -> List[Dict[str, Any]]: + return self.workbook.lineage + + @property + def datasets(self) -> AbstractSet[str]: + return self.workbook.datasets + + @property + def direct_table_deps(self) -> AbstractSet[str]: + return self.workbook.direct_table_deps + + @property + def owner_email(self) -> Optional[str]: + return self.workbook.owner_email + + @property + def materialization_schedules(self) -> Optional[List[Dict[str, Any]]]: + return self.workbook.materialization_schedules + + +@record +class SigmaDatasetTranslatorData: + """A record representing a Sigma dataset and the Sigma organization data.""" + + dataset: "SigmaDataset" + organization_data: "SigmaOrganizationData" + + @property + def properties(self) -> Dict[str, Any]: + return self.dataset.properties + + @property + def columns(self) -> AbstractSet[str]: + return self.dataset.columns + + @property + def inputs(self) -> AbstractSet[str]: + return self.dataset.inputs + + @whitelist_for_serdes @record class SigmaOrganizationData: @@ -111,24 +163,21 @@ class DagsterSigmaTranslator: Subclass this class to provide custom translation logic. """ - def __init__(self, context: SigmaOrganizationData): - self._context = context - - @property - def organization_data(self) -> SigmaOrganizationData: - return self._context - @deprecated( breaking_version="1.10", additional_warn_text="Use `DagsterSigmaTranslator.get_asset_spec(...).key` instead", ) - def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey: + def get_asset_key( + self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData] + ) -> AssetKey: """Get the AssetKey for a Sigma object, such as a workbook or dataset.""" return self.get_asset_spec(data).key - def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: + def get_asset_spec( + self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData] + ) -> AssetSpec: """Get the AssetSpec for a Sigma object, such as a workbook or dataset.""" - if isinstance(data, SigmaWorkbook): + if isinstance(data, SigmaWorkbookTranslatorData): metadata = { **SigmaWorkbookMetadataSet( web_url=MetadataValue.url(data.properties["url"]), @@ -148,9 +197,12 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: ), ), } - datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets] + datasets = [ + data.organization_data.get_datasets_by_inode()[inode] for inode in data.datasets + ] tables = [ - self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps + data.organization_data.get_tables_by_inode()[inode] + for inode in data.direct_table_deps ] return AssetSpec( @@ -158,7 +210,14 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: metadata=metadata, kinds={"sigma", "workbook"}, deps={ - *[self.get_asset_key(dataset) for dataset in datasets], + *[ + self.get_asset_key( + SigmaDatasetTranslatorData( + dataset=dataset, organization_data=data.organization_data + ) + ) + for dataset in datasets + ], *[ asset_key_from_table_name(".".join(table.get_table_path()).lower()) for table in tables @@ -166,7 +225,7 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: }, owners=[data.owner_email] if data.owner_email else None, ) - elif isinstance(data, SigmaDataset): + elif isinstance(data, SigmaDatasetTranslatorData): metadata = { "dagster_sigma/web_url": MetadataValue.url(data.properties["url"]), "dagster_sigma/created_at": MetadataValue.timestamp( diff --git a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py index a32aa3977518e..3b1170d488b72 100644 --- a/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py +++ b/python_modules/libraries/dagster-sigma/dagster_sigma_tests/test_translator.py @@ -7,9 +7,11 @@ from dagster_sigma.translator import ( DagsterSigmaTranslator, SigmaDataset, + SigmaDatasetTranslatorData, SigmaOrganizationData, SigmaTable, SigmaWorkbook, + SigmaWorkbookTranslatorData, ) from dagster_sigma_tests.conftest import ( @@ -33,16 +35,19 @@ def test_workbook_translation() -> None: sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set()) - translator = DagsterSigmaTranslator( - SigmaOrganizationData( - workbooks=[sample_workbook], - datasets=[sample_dataset], - tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)], + translator = DagsterSigmaTranslator() + + asset_spec = translator.get_asset_spec( + SigmaWorkbookTranslatorData( + workbook=sample_workbook, + organization_data=SigmaOrganizationData( + workbooks=[sample_workbook], + datasets=[sample_dataset], + tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)], + ), ) ) - asset_spec = translator.get_asset_spec(sample_workbook) - assert asset_spec.key.path == ["Sample_Workbook"] assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_WORKBOOK_DATA["url"] assert asset_spec.metadata["dagster_sigma/version"] == 5 @@ -63,11 +68,16 @@ def test_dataset_translation() -> None: inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"}, ) - translator = DagsterSigmaTranslator( - SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[]) - ) + translator = DagsterSigmaTranslator() - asset_spec = translator.get_asset_spec(sample_dataset) + asset_spec = translator.get_asset_spec( + SigmaDatasetTranslatorData( + dataset=sample_dataset, + organization_data=SigmaOrganizationData( + workbooks=[], datasets=[sample_dataset], tables=[] + ), + ) + ) assert asset_spec.key.path == ["Orders_Dataset"] assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_DATASET_DATA["url"] @@ -91,9 +101,11 @@ def test_dataset_translation() -> None: def test_dataset_translation_custom_translator() -> None: class MyCustomTranslator(DagsterSigmaTranslator): - def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: + def get_asset_spec( + self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData] + ) -> AssetSpec: spec = super().get_asset_spec(data) - if isinstance(data, SigmaDataset): + if isinstance(data, SigmaDatasetTranslatorData): spec = spec.replace_attributes( key=spec.key.with_prefix("sigma"), description="Custom description" ) @@ -105,11 +117,16 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec: inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"}, ) - translator = MyCustomTranslator( - SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[]) - ) + translator = MyCustomTranslator() - asset_spec = translator.get_asset_spec(sample_dataset) + asset_spec = translator.get_asset_spec( + SigmaDatasetTranslatorData( + dataset=sample_dataset, + organization_data=SigmaOrganizationData( + workbooks=[], datasets=[sample_dataset], tables=[] + ), + ) + ) assert asset_spec.key.path == ["sigma", "Orders_Dataset"] assert asset_spec.description == "Custom description"