Skip to content

Commit

Permalink
Ele 3634 add snapshot to the report (#1703)
Browse files Browse the repository at this point in the history
* add snapshot to the report - models, lineage, and groups

* extend entity fields

* filters help text

* fix lineage

* cli create temp table macro

* fix groups and owners

* remove empty line

* update help text

Co-authored-by: Mika Kerman <[email protected]>

* add table name to snapshot

* fix table name

---------

Co-authored-by: Mika Kerman <[email protected]>
  • Loading branch information
NoyaArie and MikaKerman authored Jan 2, 2025
1 parent 69393bd commit e8dbdbb
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 10 deletions.
20 changes: 18 additions & 2 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -31,6 +32,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down Expand Up @@ -83,7 +85,14 @@ def get_dwh_view(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
filtered_artifacts: List[GROUPABLE_ARTIFACT] = [
artifact
for artifact in artifacts
if isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema))
if isinstance(
artifact,
(
NormalizedSourceSchema,
NormalizedModelSchema,
NormalizedSnapshotSchema,
),
)
]
return self.get_fqn_view(filtered_artifacts)

Expand All @@ -107,7 +116,14 @@ def get_fqn_view(self, artifacts: List[GROUPABLE_ARTIFACT]) -> TreeGroupSchema:
for artifact in artifacts
if artifact.unique_id is not None
and artifact.fqn is not None
and isinstance(artifact, (NormalizedSourceSchema, NormalizedModelSchema))
and isinstance(
artifact,
(
NormalizedSourceSchema,
NormalizedModelSchema,
NormalizedSnapshotSchema,
),
)
)
tree_builder = TreeBuilder[GroupItemSchema](separator=".")
for artifact in filtered_artifacts:
Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/api/lineage/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from elementary.utils.pydantic_shim import BaseModel, validator

NodeUniqueIdType = str
NodeType = Literal["seed", "model", "source", "exposure"]
NodeType = Literal["snapshot", "seed", "model", "source", "exposure"]
NodeSubType = Literal["table", "view"]


Expand Down
30 changes: 28 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -36,6 +38,7 @@
class ModelsAPI(APIClient):
_ARTIFACT_TYPE_DIR_MAP = {
SeedSchema: "seeds",
SnapshotSchema: "snapshots",
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
Expand Down Expand Up @@ -133,6 +136,16 @@ def get_seeds(self) -> Dict[str, NormalizedSeedSchema]:
seeds[seed_unique_id] = normalized_seed
return seeds

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
snapshot_results = self.models_fetcher.get_snapshots()
snapshots = dict()
if snapshot_results:
for snapshot_result in snapshot_results:
normalized_snapshot = self._normalize_dbt_artifact_dict(snapshot_result)
snapshot_unique_id = cast(str, normalized_snapshot.unique_id)
snapshots[snapshot_unique_id] = normalized_snapshot
return snapshots

def get_models(
self, exclude_elementary_models: bool = False
) -> Dict[str, NormalizedModelSchema]:
Expand Down Expand Up @@ -239,6 +252,12 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSeedSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

@overload
def _normalize_dbt_artifact_dict(
self, artifact: ModelSchema
Expand All @@ -258,15 +277,20 @@ def _normalize_dbt_artifact_dict(
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, SnapshotSchema, ModelSchema, ExposureSchema, SourceSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
SnapshotSchema: NormalizedSnapshotSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
Expand Down Expand Up @@ -308,7 +332,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -41,6 +42,11 @@ class NormalizedSeedSchema(NormalizedArtifactSchema, SeedSchema):
artifact_type: str = Field("seed", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedModelSchema(NormalizedArtifactSchema, ModelSchema):
artifact_type: str = Field("model", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -86,6 +95,8 @@ def get_report_data(
lineage_node_ids: List[str] = []
seeds = models_api.get_seeds()
lineage_node_ids.extend(seeds.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
models = models_api.get_models(exclude_elementary_models)
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, exposures, seeds, snapshots
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -209,8 +221,9 @@ def _serialize_models(
sources: Dict[str, NormalizedSourceSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **exposures, **seeds, **snapshots)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
2 changes: 1 addition & 1 deletion elementary/monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def decorator(func):
default=None,
help="Filter the report by last_invocation / invocation_id:<INVOCATION_ID> / invocation_time:<INVOCATION_TIME>."
if cmd in (Command.REPORT, Command.SEND_REPORT)
else "DEPRECATED! Please use --filters instead! - Filter the alerts by tag:<TAG> / owner:<OWNER> / model:<MODEL> / "
else "DEPRECATED! Please use --filters instead! - Filter the alerts by tags:<TAGS> / owners:<OWNERS> / models:<MODELS> / "
"statuses:<warn/fail/error/skipped> / resource_types:<model/test>.",
)(func)
return func
Expand Down
26 changes: 26 additions & 0 deletions elementary/monitor/dbt_project/macros/create_temp_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{% macro create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% do return(adapter.dispatch('create_temp_table','elementary_cli')(database_name, schema_name, table_name, sql_query)) %}
{%- endmacro %}

{% macro default__create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% do return(elementary.create_temp_table(database_name, schema_name, table_name, sql_query)) %}
{% endmacro %}

{% macro snowflake__create_temp_table(database_name, schema_name, table_name, sql_query) %}
{% set temp_table_exists, temp_table_relation = dbt.get_or_create_relation(database=database_name,
schema=schema_name,
identifier=table_name,
type='table') -%}
{% set temp_table_relation = elementary.edr_make_temp_relation(temp_table_relation) %}
{% set create_query %}
create or replace temporary table {{ temp_table_relation }}
as (
{{ sql_query }}
);

{% endset %}

{% do elementary.run_query(create_query) %}

{{ return(temp_table_relation) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
'seed' as type
from {{ ref('elementary', 'dbt_seeds') }}
union all
select
unique_id,
depends_on_nodes,
materialization,
'snapshot' as type
from {{ ref('elementary', 'dbt_snapshots') }}
union all
select
unique_id,
depends_on_nodes,
Expand Down
35 changes: 35 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{% macro get_snapshots() %}
{% set dbt_snapshots_relation = ref('elementary', 'dbt_snapshots') %}
{%- if elementary.relation_exists(dbt_snapshots_relation) -%}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
case when alias is not null then alias
else name end as table_name,
unique_id,
owner as owners,
tags,
package_name,
description,
meta,
materialization,
database_name,
schema_name,
depends_on_macros,
depends_on_nodes,
original_path as full_path,
path,
patch_path,
generated_at,
unique_key,
incremental_strategy
from {{ dbt_snapshots_relation }}
)
select * from dbt_artifacts_snapshots
{% endset %}

{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{%- endif -%}
{% endmacro %}
2 changes: 1 addition & 1 deletion elementary/monitor/dbt_project/macros/get_test_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
{% set test_results = [] %}

{% set elementary_database, elementary_schema = elementary.get_package_database_and_schema() %}
{% set ordered_test_results_relation = elementary.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_test_results) %}
{% set ordered_test_results_relation = elementary_cli.create_temp_table(elementary_database, elementary_schema, 'ordered_test_results', select_test_results) %}

{% set test_results_agate_sql %}
select * from {{ ordered_test_results_relation }}
Expand Down
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -43,6 +44,16 @@ def get_seeds(self) -> List[SeedSchema]:
seeds = [SeedSchema(**seed) for seed in seeds]
return seeds

def get_snapshots(self) -> List[SnapshotSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_snapshots"
)
snapshots = (
json.loads(run_operation_response[0]) if run_operation_response else []
)
snapshots = [SnapshotSchema(**snapshot) for snapshot in snapshots]
return snapshots

def get_models(self, exclude_elementary_models: bool = False) -> List[ModelSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_models",
Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ def load_meta(cls, meta):
ArtifactSchemaType = TypeVar("ArtifactSchemaType", bound=ArtifactSchema)


class SnapshotSchema(ArtifactSchema):
database_name: str
schema_name: str
depends_on_macros: str
depends_on_nodes: str
path: str
patch_path: Optional[str]
generated_at: str
unique_key: str
incremental_strategy: Optional[str]
table_name: str


class SeedSchema(ArtifactSchema):
database_name: Optional[str] = None
schema_name: str
Expand Down

0 comments on commit e8dbdbb

Please sign in to comment.