Skip to content

Commit

Permalink
[dagster-sigma] Use Sigma translator instance in spec loader and stat…
Browse files Browse the repository at this point in the history
…e-backed defs
  • Loading branch information
maximearmstrong committed Jan 3, 2025
1 parent a2c8ece commit c090141
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 8 deletions.
27 changes: 20 additions & 7 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dagster._serdes.serdes import deserialize_value
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from dagster._utils.warnings import deprecation_warning
from pydantic import Field, PrivateAttr
from sqlglot import exp, parse_one

Expand Down Expand Up @@ -709,7 +710,9 @@ def build_defs(
@experimental
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
dagster_sigma_translator: Optional[
Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]]
] = None,
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = True,
fetch_lineage_data: bool = True,
Expand All @@ -719,8 +722,9 @@ def load_sigma_asset_specs(
Args:
organization (SigmaOrganization): The Sigma organization to fetch assets from.
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
dagster_sigma_translator (Optional[Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]]]):
The translator to use to convert Sigma content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterSigmaTranslator`.
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
fetch_lineage_data (bool): Whether to fetch any lineage data for workbooks and datasets.
Expand All @@ -730,6 +734,16 @@ def load_sigma_asset_specs(
Returns:
List[AssetSpec]: The set of assets representing the Sigma content in the organization.
"""
if isinstance(dagster_sigma_translator, type):
deprecation_warning(
subject="Support of `dagster_sigma_translator` as a Type[DagsterSigmaTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterSigmaTranslator or subclass to `dagster_sigma_translator` instead."
),
)
dagster_sigma_translator = dagster_sigma_translator()

snapshot = None
if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME):
snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData)
Expand All @@ -738,7 +752,7 @@ def load_sigma_asset_specs(
return check.is_list(
SigmaOrganizationDefsLoader(
organization=initialized_organization,
translator_cls=dagster_sigma_translator,
translator=dagster_sigma_translator or DagsterSigmaTranslator(),
sigma_filter=sigma_filter,
fetch_column_data=fetch_column_data,
fetch_lineage_data=fetch_lineage_data,
Expand Down Expand Up @@ -767,7 +781,7 @@ def _get_translator_spec_assert_keys_match(
@dataclass
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
organization: SigmaOrganization
translator_cls: Type[DagsterSigmaTranslator]
translator: DagsterSigmaTranslator
snapshot: Optional[RepositoryLoadData]
sigma_filter: Optional[SigmaFilter] = None
fetch_column_data: bool = True
Expand All @@ -790,7 +804,6 @@ def fetch_state(self) -> SigmaOrganizationData:
)

def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
translator = self.translator_cls()
translator_data_workbooks = [
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
for workbook in state.workbooks
Expand All @@ -800,7 +813,7 @@ def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
for dataset in state.datasets
]
asset_specs = [
_get_translator_spec_assert_keys_match(translator, obj)
_get_translator_spec_assert_keys_match(self.translator, obj)
for obj in [*translator_data_workbooks, *translator_data_datasets]
]
return Definitions(assets=asset_specs)
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ def get_asset_spec(self, data) -> AssetSpec:
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator())
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from dagster import EnvVar, define_asset_job
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.env import environ
from dagster_sigma import (
DagsterSigmaTranslator,
SigmaBaseUrl,
SigmaOrganization,
load_sigma_asset_specs,
)

# Placeholder credentials. They are exported below, via `environ`, under the same
# variable names that the resource's EnvVar fields reference.
fake_client_id = "fake_client_id"
fake_client_secret = "fake_client_secret"

# NOTE(review): indentation was lost in the view this file was reviewed from. The
# statements below are assumed to all sit inside the `with environ(...)` block so
# that the SIGMA_* variables are set while the specs are loaded -- confirm.
with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}):
    fake_token = "fake_token"

    # Custom translator: takes the spec produced by the base translator and
    # returns a copy whose asset key is prefixed with "my_prefix".
    class MyCoolTranslator(DagsterSigmaTranslator):
        def get_asset_spec(self, data) -> AssetSpec:
            spec = super().get_asset_spec(data)
            return spec.replace_attributes(
                key=spec.key.with_prefix("my_prefix"),
            )

    # Sigma resource whose credentials are read from the environment variables
    # set by the enclosing `environ` call.
    resource = SigmaOrganization(
        base_url=SigmaBaseUrl.AWS_US,
        client_id=EnvVar("SIGMA_CLIENT_ID"),
        client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
    )

    # Pass the translator type
    # (the class itself, not an instance). This is the legacy calling convention:
    # `load_sigma_asset_specs` emits a deprecation warning for a class argument
    # and instantiates it before use.
    sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
    # `defs` is the attribute name the accompanying test loads from this file.
    defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import pytest
import responses
from click.testing import CliRunner
from dagster._core.code_pointer import CodePointer
Expand Down Expand Up @@ -117,3 +118,26 @@ def test_load_assets_organization_data_translator(
assert all(
key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
), repository_def.assets_defs_by_key


@responses.activate
def test_load_assets_organization_data_translator_legacy(
    sigma_auth_token: str, sigma_sample_data: None
) -> None:
    """Check the legacy path where the translator is passed as a class.

    Loads ``defs`` from ``pending_repo_with_translator_legacy.py`` and expects:

    * a ``DeprecationWarning`` about passing ``dagster_sigma_translator`` as a
      ``Type[DagsterSigmaTranslator]`` to be raised while the repository loads;
    * exactly two asset definitions in the resulting repository;
    * every asset key to start with the ``my_prefix`` path component.
    """
    legacy_repo_file = Path(__file__).parent / "pending_repo_with_translator_legacy.py"
    expected_warning = r"Support of `dagster_sigma_translator` as a Type\[DagsterSigmaTranslator\]"

    with instance_for_test() as _instance:
        # Only the repository load is expected to trigger the warning, so the
        # warning check wraps just that step.
        with pytest.warns(DeprecationWarning, match=expected_warning):
            pointer = CodePointer.from_python_file(str(legacy_repo_file), "defs", None)
            repository_def = initialize_repository_def_from_pointer(pointer)

        assert len(repository_def.assets_defs_by_key) == 2
        prefixes_ok = all(
            key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
        )
        assert prefixes_ok, repository_def.assets_defs_by_key

0 comments on commit c090141

Please sign in to comment.