Skip to content

Commit

Permalink
[dagster-sigma] Move contextual data from DagsterSigmaTranslator to container classes (#26798)
Browse files Browse the repository at this point in the history

## Summary & Motivation

Same as #26654 but for Sigma.

## How I Tested These Changes

Updated tests with BK

## Changelog

[dagster-sigma] Type hints in the signature of
`DagsterSigmaTranslator.get_asset_spec` have been updated - the
parameter `data` is now of type `Union[SigmaDatasetTranslatorData,
SigmaWorkbookTranslatorData]` instead of `Union[SigmaDataset,
SigmaWorkbook]`. Custom Sigma translators should be updated.
  • Loading branch information
maximearmstrong authored Jan 3, 2025
1 parent e81309e commit a2c8ece
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
class MyCustomSigmaTranslator(DagsterSigmaTranslator):
def get_asset_spec(self, data: SigmaWorkbook) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
# we customize the team owner tag for all Sigma assets
return default_spec.replace_attributes(owners=["team:my_team"])

Expand Down
39 changes: 18 additions & 21 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,7 @@
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Iterator,
List,
Mapping,
Optional,
Sequence,
Type,
Union,
)
from typing import AbstractSet, Any, Dict, Iterator, List, Mapping, Optional, Sequence, Type, Union

import aiohttp
import dagster._check as check
Expand All @@ -45,10 +33,12 @@
from dagster_sigma.translator import (
DagsterSigmaTranslator,
SigmaDataset,
SigmaDatasetTranslatorData,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
SigmaWorkbookMetadataSet,
SigmaWorkbookTranslatorData,
_inode_from_url,
)

Expand Down Expand Up @@ -719,9 +709,7 @@ def build_defs(
@experimental
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: Callable[
[SigmaOrganizationData], DagsterSigmaTranslator
] = DagsterSigmaTranslator,
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = True,
fetch_lineage_data: bool = True,
Expand All @@ -731,7 +719,7 @@ def load_sigma_asset_specs(
Args:
organization (SigmaOrganization): The Sigma organization to fetch assets from.
dagster_sigma_translator (Callable[[SigmaOrganizationData], DagsterSigmaTranslator]): The translator to use
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
Expand Down Expand Up @@ -763,7 +751,8 @@ def load_sigma_asset_specs(


def _get_translator_spec_assert_keys_match(
translator: DagsterSigmaTranslator, data: Union[SigmaDataset, SigmaWorkbook]
translator: DagsterSigmaTranslator,
data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData],
) -> AssetSpec:
key = translator.get_asset_key(data)
spec = translator.get_asset_spec(data)
Expand All @@ -778,7 +767,7 @@ def _get_translator_spec_assert_keys_match(
@dataclass
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
organization: SigmaOrganization
translator_cls: Callable[[SigmaOrganizationData], DagsterSigmaTranslator]
translator_cls: Type[DagsterSigmaTranslator]
snapshot: Optional[RepositoryLoadData]
sigma_filter: Optional[SigmaFilter] = None
fetch_column_data: bool = True
Expand All @@ -801,9 +790,17 @@ def fetch_state(self) -> SigmaOrganizationData:
)

def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
translator = self.translator_cls(state)
translator = self.translator_cls()
translator_data_workbooks = [
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
for workbook in state.workbooks
]
translator_data_datasets = [
SigmaDatasetTranslatorData(dataset=dataset, organization_data=state)
for dataset in state.datasets
]
asset_specs = [
_get_translator_spec_assert_keys_match(translator, obj)
for obj in [*state.workbooks, *state.datasets]
for obj in [*translator_data_workbooks, *translator_data_datasets]
]
return Definitions(assets=asset_specs)
87 changes: 73 additions & 14 deletions python_modules/libraries/dagster-sigma/dagster_sigma/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,58 @@ def get_table_path(self) -> List[str]:
return self.properties["path"].split("/")[1:] + [self.properties["name"]]


@record
class SigmaWorkbookTranslatorData:
    """A record representing a Sigma workbook and the Sigma organization data.

    Every property on this class forwards to the attribute of the same name
    on the wrapped ``workbook``.
    """

    # The Sigma workbook wrapped by this record.
    workbook: "SigmaWorkbook"
    # The Sigma organization data carried alongside the workbook.
    organization_data: "SigmaOrganizationData"

    @property
    def properties(self) -> Dict[str, Any]:
        """Return ``properties`` of the wrapped workbook."""
        return self.workbook.properties

    @property
    def lineage(self) -> List[Dict[str, Any]]:
        """Return ``lineage`` of the wrapped workbook."""
        return self.workbook.lineage

    @property
    def datasets(self) -> AbstractSet[str]:
        """Return ``datasets`` of the wrapped workbook."""
        return self.workbook.datasets

    @property
    def direct_table_deps(self) -> AbstractSet[str]:
        """Return ``direct_table_deps`` of the wrapped workbook."""
        return self.workbook.direct_table_deps

    @property
    def owner_email(self) -> Optional[str]:
        """Return ``owner_email`` of the wrapped workbook (may be ``None``)."""
        return self.workbook.owner_email

    @property
    def materialization_schedules(self) -> Optional[List[Dict[str, Any]]]:
        """Return ``materialization_schedules`` of the wrapped workbook (may be ``None``)."""
        return self.workbook.materialization_schedules


@record
class SigmaDatasetTranslatorData:
    """A record representing a Sigma dataset and the Sigma organization data.

    Every property on this class forwards to the attribute of the same name
    on the wrapped ``dataset``.
    """

    # The Sigma dataset wrapped by this record.
    dataset: "SigmaDataset"
    # The Sigma organization data carried alongside the dataset.
    organization_data: "SigmaOrganizationData"

    @property
    def properties(self) -> Dict[str, Any]:
        """Return ``properties`` of the wrapped dataset."""
        return self.dataset.properties

    @property
    def columns(self) -> AbstractSet[str]:
        """Return ``columns`` of the wrapped dataset."""
        return self.dataset.columns

    @property
    def inputs(self) -> AbstractSet[str]:
        """Return ``inputs`` of the wrapped dataset."""
        return self.dataset.inputs


@whitelist_for_serdes
@record
class SigmaOrganizationData:
Expand All @@ -111,24 +163,21 @@ class DagsterSigmaTranslator:
Subclass this class to provide custom translation logic.
"""

def __init__(self, context: SigmaOrganizationData):
self._context = context

@property
def organization_data(self) -> SigmaOrganizationData:
return self._context

@deprecated(
breaking_version="1.10",
additional_warn_text="Use `DagsterSigmaTranslator.get_asset_spec(...).key` instead",
)
def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
def get_asset_key(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetKey:
"""Get the AssetKey for a Sigma object, such as a workbook or dataset."""
return self.get_asset_spec(data).key

def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
def get_asset_spec(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetSpec:
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
if isinstance(data, SigmaWorkbook):
if isinstance(data, SigmaWorkbookTranslatorData):
metadata = {
**SigmaWorkbookMetadataSet(
web_url=MetadataValue.url(data.properties["url"]),
Expand All @@ -148,25 +197,35 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
),
),
}
datasets = [self._context.get_datasets_by_inode()[inode] for inode in data.datasets]
datasets = [
data.organization_data.get_datasets_by_inode()[inode] for inode in data.datasets
]
tables = [
self._context.get_tables_by_inode()[inode] for inode in data.direct_table_deps
data.organization_data.get_tables_by_inode()[inode]
for inode in data.direct_table_deps
]

return AssetSpec(
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "workbook"},
deps={
*[self.get_asset_key(dataset) for dataset in datasets],
*[
self.get_asset_key(
SigmaDatasetTranslatorData(
dataset=dataset, organization_data=data.organization_data
)
)
for dataset in datasets
],
*[
asset_key_from_table_name(".".join(table.get_table_path()).lower())
for table in tables
],
},
owners=[data.owner_email] if data.owner_email else None,
)
elif isinstance(data, SigmaDataset):
elif isinstance(data, SigmaDatasetTranslatorData):
metadata = {
"dagster_sigma/web_url": MetadataValue.url(data.properties["url"]),
"dagster_sigma/created_at": MetadataValue.timestamp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from dagster_sigma.translator import (
DagsterSigmaTranslator,
SigmaDataset,
SigmaDatasetTranslatorData,
SigmaOrganizationData,
SigmaTable,
SigmaWorkbook,
SigmaWorkbookTranslatorData,
)

from dagster_sigma_tests.conftest import (
Expand All @@ -33,16 +35,19 @@ def test_workbook_translation() -> None:

sample_dataset = SigmaDataset(properties=SAMPLE_DATASET_DATA, columns=set(), inputs=set())

translator = DagsterSigmaTranslator(
SigmaOrganizationData(
workbooks=[sample_workbook],
datasets=[sample_dataset],
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
translator = DagsterSigmaTranslator()

asset_spec = translator.get_asset_spec(
SigmaWorkbookTranslatorData(
workbook=sample_workbook,
organization_data=SigmaOrganizationData(
workbooks=[sample_workbook],
datasets=[sample_dataset],
tables=[SigmaTable(properties=SAMPLE_TABLE_DATA)],
),
)
)

asset_spec = translator.get_asset_spec(sample_workbook)

assert asset_spec.key.path == ["Sample_Workbook"]
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_WORKBOOK_DATA["url"]
assert asset_spec.metadata["dagster_sigma/version"] == 5
Expand All @@ -63,11 +68,16 @@ def test_dataset_translation() -> None:
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
)

translator = DagsterSigmaTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)
translator = DagsterSigmaTranslator()

asset_spec = translator.get_asset_spec(sample_dataset)
asset_spec = translator.get_asset_spec(
SigmaDatasetTranslatorData(
dataset=sample_dataset,
organization_data=SigmaOrganizationData(
workbooks=[], datasets=[sample_dataset], tables=[]
),
)
)

assert asset_spec.key.path == ["Orders_Dataset"]
assert asset_spec.metadata["dagster_sigma/web_url"].value == SAMPLE_DATASET_DATA["url"]
Expand All @@ -91,9 +101,11 @@ def test_dataset_translation() -> None:

def test_dataset_translation_custom_translator() -> None:
class MyCustomTranslator(DagsterSigmaTranslator):
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
def get_asset_spec(
self, data: Union[SigmaDatasetTranslatorData, SigmaWorkbookTranslatorData]
) -> AssetSpec:
spec = super().get_asset_spec(data)
if isinstance(data, SigmaDataset):
if isinstance(data, SigmaDatasetTranslatorData):
spec = spec.replace_attributes(
key=spec.key.with_prefix("sigma"), description="Custom description"
)
Expand All @@ -105,11 +117,16 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
inputs={"TESTDB.JAFFLE_SHOP.STG_ORDERS"},
)

translator = MyCustomTranslator(
SigmaOrganizationData(workbooks=[], datasets=[sample_dataset], tables=[])
)
translator = MyCustomTranslator()

asset_spec = translator.get_asset_spec(sample_dataset)
asset_spec = translator.get_asset_spec(
SigmaDatasetTranslatorData(
dataset=sample_dataset,
organization_data=SigmaOrganizationData(
workbooks=[], datasets=[sample_dataset], tables=[]
),
)
)

assert asset_spec.key.path == ["sigma", "Orders_Dataset"]
assert asset_spec.description == "Custom description"

1 comment on commit a2c8ece

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-hv9je8ddf-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit a2c8ece.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.