Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-sigma] Move contextual data from DagsterSigmaTranslator to container classes #26798

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
class MyCustomSigmaTranslator(DagsterSigmaTranslator):
def get_asset_spec(self, data: SigmaWorkbook) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
# we customize the team owner tag for all Sigma assets
return default_spec.replace_attributes(owners=["team:my_team"])

Expand Down
39 changes: 18 additions & 21 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,7 @@
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Type,
Union,
)
from typing import AbstractSet, Any, Dict, Iterator, List, Mapping, Optional, Sequence, Type, Union

import aiohttp
import dagster._check as check
Expand All @@ -45,10 +33,12 @@
from dagster_sigma.translator import (
DagsterSigmaTranslator,
SigmaDataset,
SigmaDatasetTranslatorData,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
SigmaWorkbookMetadataSet,
SigmaWorkbookTranslatorData,
_inode_from_url,
)

Expand Down Expand Up @@ -719,9 +709,7 @@ def build_defs(
@experimental
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: Callable[
[SigmaOrganizationData], DagsterSigmaTranslator
] = DagsterSigmaTranslator,
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = True,
fetch_lineage_data: bool = True,
Expand All @@ -731,7 +719,7 @@ def load_sigma_asset_specs(

Args:
organization (SigmaOrganization): The Sigma organization to fetch assets from.
dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
Expand Down Expand Up @@ -763,7 +751,8 @@ def load_sigma_asset_specs(


def _get_translator_spec_assert_keys_match(
translator: DagsterSigmaTranslator, data: Union[SigmaDataset, SigmaWorkbook]
translator: DagsterSigmaTranslator,
data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData],
) -> AssetSpec:
key = translator.get_asset_key(data)
spec = translator.get_asset_spec(data)
Expand All @@ -778,7 +767,7 @@ def _get_translator_spec_assert_keys_match(
@dataclass
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
organization: SigmaOrganization
translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator]
translator_cls: Type[DagsterSigmaTranslator]
snapshot: Optional[RepositoryLoadData]
sigma_filter: Optional[SigmaFilter] = None
fetch_column_data: bool = True
Expand All @@ -801,9 +790,17 @@ def fetch_state(self) -> SigmaOrganizationData:
)

def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
translator = self.translator_cls(state)
translator = self.translator_cls()
translator_data_workbooks = [
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
for workbook in state.workbooks
]
translator_data_datasets = [
SigmaDatasetTranslatorData(dataset=dataset, organization_data=state)
for dataset in state.datasets
]
asset_specs = [
_get_translator_spec_assert_keys_match(translator, obj)
for obj in [*state.workbooks, *state.datasets]
for obj in [*translator_data_workbooks, *translator_data_datasets]
]
return Definitions(assets=asset_specs)
87 changes: 73 additions & 14 deletions python_modules/libraries/dagster-sigma/dagster_sigma/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,58 @@ def get_table_path(self) -> List[str]:
return self.properties["path"].split("/")[1:] + [self.properties["name"]]


@record
class SigmaWorkbookTranslatorData:
    """Pairs a single Sigma workbook with the organization data it belongs to.

    The read-only properties below forward to the same-named attributes of the
    wrapped ``workbook``, so this record can be read like the workbook itself
    while also carrying ``organization_data`` alongside it.
    """

    # The workbook being translated.
    workbook: "SigmaWorkbook"
    # The organization-wide data passed along with the workbook.
    organization_data: "SigmaOrganizationData"

    @property
    def properties(self) -> Dict[str, Any]:
        """Forward to ``workbook.properties``."""
        return self.workbook.properties

    @property
    def lineage(self) -> List[Dict[str, Any]]:
        """Forward to ``workbook.lineage``."""
        return self.workbook.lineage

    @property
    def datasets(self) -> AbstractSet[str]:
        """Forward to ``workbook.datasets``."""
        return self.workbook.datasets

    @property
    def direct_table_deps(self) -> AbstractSet[str]:
        """Forward to ``workbook.direct_table_deps``."""
        return self.workbook.direct_table_deps

    @property
    def owner_email(self) -> Optional[str]:
        """Forward to ``workbook.owner_email``; may be ``None``."""
        return self.workbook.owner_email

    @property
    def materialization_schedules(self) -> Optional[List[Dict[str, Any]]]:
        """Forward to ``workbook.materialization_schedules``; may be ``None``."""
        return self.workbook.materialization_schedules


@record
class SigmaDatasetTranslatorData:
    """Pairs a single Sigma dataset with the organization data it belongs to.

    The read-only properties below forward to the same-named attributes of the
    wrapped ``dataset``, so this record can be read like the dataset itself
    while also carrying ``organization_data`` alongside it.
    """

    # The dataset being translated.
    dataset: "SigmaDataset"
    # The organization-wide data passed along with the dataset.
    organization_data: "SigmaOrganizationData"

    @property
    def properties(self) -> Dict[str, Any]:
        """Forward to ``dataset.properties``."""
        return self.dataset.properties

    @property
    def columns(self) -> AbstractSet[str]:
        """Forward to ``dataset.columns``."""
        return self.dataset.columns

    @property
    def inputs(self) -> AbstractSet[str]:
        """Forward to ``dataset.inputs``."""
        return self.dataset.inputs


@whitelist_for_serdes
@record
class SigmaOrganizationData:
Expand All @@ -111,24 +163,21 @@ class DagsterSigmaTranslator:
Subclass this class to provide custom translation logic.
"""

def __init__(self, context: SigmaOrganizationData):
self._context = context

@property
def organization_data(self) -> SigmaOrganizationData:
return self._context

@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterSigmaTranslator.get_asset_spec(...).key` instead",
)
def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
def get_asset_key(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetKey:
"""Get the AssetKey for a Sigma object, such as a workbook or dataset."""
return self.get_asset_spec(data).key

def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
def get_asset_spec(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetSpec:
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
if isinstance(data, SigmaWorkbook):
if isinstance(data, SigmaWorkbookTranslatorData):
metadata = {
**SigmaWorkbookMetadataSet(
web_url=MetadataValue.url(data.properties["url"]),
Expand All @@ -148,25 +197,35 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
),
),
}
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
datasets = [
data.organization_data.get_datasets_by_inode()[inode] for inode in data.datasets
]
tables = [
self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps
data.organization_data.get_tables_by_inode()[inode]
for inode in data.direct_table_deps
]

return AssetSpec(
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "workbook"},
deps={
*[self.get_asset_key(dataset) for dataset in datasets],
*[
self.get_asset_key(
SigmaDatasetTranslatorData(
dataset=dataset, organization_data=data.organization_data
)
)
for dataset in datasets
],
*[
asset_key_from_table_name(".".join(table.get_table_path()).lower())
for table in tables
],
},
owners=[data.owner_email] if data.owner_email else None,
)
elif isinstance(data, SigmaDataset):
elif isinstance(data, SigmaDatasetTranslatorData):
metadata = {
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
"dagster_sigma/created_at": MetadataValue.timestamp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from dagster_sigma.translator import (
DagsterSigmaTranslator,
SigmaDataset,
SigmaDatasetTranslatorData,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
SigmaWorkbookTranslatorData,
)

from dagster_sigma_tests.conftest import (
Expand All @@ -33,16 +35,19 @@ def test_workbook_translation() -> None:

sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set())

translator = DagsterSigmaTranslator(
SigmaOrganizationData(
workbooks=[sample_workbook],
datasets=[sample_dataset],
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
translator = DagsterSigmaTranslator()

asset_spec = translator.get_asset_spec(
SigmaWorkbookTranslatorData(
workbook=sample_workbook,
organization_data=SigmaOrganizationData(
workbooks=[sample_workbook],
datasets=[sample_dataset],
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
),
)
)

asset_spec = translator.get_asset_spec(sample_workbook)

assert asset_spec.key.path == ["Sample_Workbook"]
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_WORKBOOK_DATA["url"]
assert asset_spec.metadata["dagster_sigma/version"] == 5
Expand All @@ -63,11 +68,16 @@ def test_dataset_translation() -> None:
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
)

translator = DagsterSigmaTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)
translator = DagsterSigmaTranslator()

asset_spec = translator.get_asset_spec(sample_dataset)
asset_spec = translator.get_asset_spec(
SigmaDatasetTranslatorData(
dataset=sample_dataset,
organization_data=SigmaOrganizationData(
workbooks=[], datasets=[sample_dataset], tables=[]
),
)
)

assert asset_spec.key.path == ["Orders_Dataset"]
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_DATASET_DATA["url"]
Expand All @@ -91,9 +101,11 @@ def test_dataset_translation() -> None:

def test_dataset_translation_custom_translator() -> None:
class MyCustomTranslator(DagsterSigmaTranslator):
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
def get_asset_spec(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetSpec:
spec = super().get_asset_spec(data)
if isinstance(data, SigmaDataset):
if isinstance(data, SigmaDatasetTranslatorData):
spec = spec.replace_attributes(
key=spec.key.with_prefix("sigma"), description="Custom description"
)
Expand All @@ -105,11 +117,16 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
)

translator = MyCustomTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)
translator = MyCustomTranslator()

asset_spec = translator.get_asset_spec(sample_dataset)
asset_spec = translator.get_asset_spec(
SigmaDatasetTranslatorData(
dataset=sample_dataset,
organization_data=SigmaOrganizationData(
workbooks=[], datasets=[sample_dataset], tables=[]
),
)
)

assert asset_spec.key.path == ["sigma", "Orders_Dataset"]
assert asset_spec.description == "Custom description"
Loading