Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-sigma] Use Sigma translator instance in spec loader and state-backed defs #26799

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dagster._serdes.serdes import deserialize_value
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from dagster._utils.warnings import deprecation_warning
from pydantic import Field, PrivateAttr
from sqlglot import exp, parse_one

Expand Down Expand Up @@ -709,7 +710,9 @@ def build_defs(
@experimental
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
dagster_sigma_translator: Optional[
Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslator]]
] = None,
sigma_filter: Optional[SigmaFilter] = None,
fetch_column_data: bool = True,
fetch_lineage_data: bool = True,
Expand All @@ -719,8 +722,9 @@ def load_sigma_asset_specs(

Args:
organization (SigmaOrganization): The Sigma organization to fetch assets from.
dagster_sigma_translator (Type[DagsterSigmaTranslator]): The translator to use
to convert Sigma content into AssetSpecs. Defaults to DagsterSigmaTranslator.
dagster_sigma_translator (Optional[Union[DagsterSigmaTranslator, Type[DagsterSigmaTranslatorr]]]):
The translator to use to convert Sigma content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterSigmaTranslator`.
sigma_filter (Optional[SigmaFilter]): Filters the set of Sigma objects to fetch.
fetch_column_data (bool): Whether to fetch column data for datasets, which can be slow.
fetch_lineage_data (bool): Whether to fetch any lineage data for workbooks and datasets.
Expand All @@ -730,6 +734,16 @@ def load_sigma_asset_specs(
Returns:
List[AssetSpec]: The set of assets representing the Sigma content in the organization.
"""
if isinstance(dagster_sigma_translator, type):
deprecation_warning(
subject="Support of `dagster_sigma_translator` as a Type[DagsterSigmaTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterSigmaTranslator or subclass to `dagster_sigma_translator` instead."
),
)
dagster_sigma_translator = dagster_sigma_translator()

snapshot = None
if snapshot_path and not os.getenv(SNAPSHOT_ENV_VAR_NAME):
snapshot = deserialize_value(Path(snapshot_path).read_text(), RepositoryLoadData)
Expand All @@ -738,7 +752,7 @@ def load_sigma_asset_specs(
return check.is_list(
SigmaOrganizationDefsLoader(
organization=initialized_organization,
translator_cls=dagster_sigma_translator,
translator=dagster_sigma_translator or DagsterSigmaTranslator(),
sigma_filter=sigma_filter,
fetch_column_data=fetch_column_data,
fetch_lineage_data=fetch_lineage_data,
Expand Down Expand Up @@ -767,7 +781,7 @@ def _get_translator_spec_assert_keys_match(
@dataclass
class SigmaOrganizationDefsLoader(StateBackedDefinitionsLoader[SigmaOrganizationData]):
organization: SigmaOrganization
translator_cls: Type[DagsterSigmaTranslator]
translator: DagsterSigmaTranslator
snapshot: Optional[RepositoryLoadData]
sigma_filter: Optional[SigmaFilter] = None
fetch_column_data: bool = True
Expand All @@ -790,7 +804,6 @@ def fetch_state(self) -> SigmaOrganizationData:
)

def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
translator = self.translator_cls()
translator_data_workbooks = [
SigmaWorkbookTranslatorData(workbook=workbook, organization_data=state)
for workbook in state.workbooks
Expand All @@ -800,7 +813,7 @@ def defs_from_state(self, state: SigmaOrganizationData) -> Definitions:
for dataset in state.datasets
]
asset_specs = [
_get_translator_spec_assert_keys_match(translator, obj)
_get_translator_spec_assert_keys_match(self.translator, obj)
for obj in [*translator_data_workbooks, *translator_data_datasets]
]
return Definitions(assets=asset_specs)
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ def get_asset_spec(self, data) -> AssetSpec:
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator())
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from dagster import EnvVar, define_asset_job
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.env import environ
from dagster_sigma import (
DagsterSigmaTranslator,
SigmaBaseUrl,
SigmaOrganization,
load_sigma_asset_specs,
)

fake_client_id = "fake_client_id"
fake_client_secret = "fake_client_secret"

with environ({"SIGMA_CLIENT_ID": fake_client_id, "SIGMA_CLIENT_SECRET": fake_client_secret}):
fake_token = "fake_token"

class MyCoolTranslator(DagsterSigmaTranslator):
def get_asset_spec(self, data) -> AssetSpec:
spec = super().get_asset_spec(data)
return spec.replace_attributes(
key=spec.key.with_prefix("my_prefix"),
)

resource = SigmaOrganization(
base_url=SigmaBaseUrl.AWS_US,
client_id=EnvVar("SIGMA_CLIENT_ID"),
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
)

# Pass the translator type
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import pytest
import responses
from click.testing import CliRunner
from dagster._core.code_pointer import CodePointer
Expand Down Expand Up @@ -117,3 +118,26 @@ def test_load_assets_organization_data_translator(
assert all(
key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
), repository_def.assets_defs_by_key


@responses.activate
def test_load_assets_organization_data_translator_legacy(
sigma_auth_token: str, sigma_sample_data: None
) -> None:
with instance_for_test() as _instance:
with pytest.warns(
DeprecationWarning,
match=r"Support of `dagster_sigma_translator` as a Type\[DagsterSigmaTranslator\]",
):
repository_def = initialize_repository_def_from_pointer(
CodePointer.from_python_file(
str(Path(__file__).parent / "pending_repo_with_translator_legacy.py"),
"defs",
None,
),
)

assert len(repository_def.assets_defs_by_key) == 2
assert all(
key.path[0] == "my_prefix" for key in repository_def.assets_defs_by_key.keys()
), repository_def.assets_defs_by_key