Skip to content

Commit 69250b9

Browse files
committed
[components] Add SlingReplicationCollection component
1 parent 8ccb2f0 commit 69250b9

File tree

7 files changed

+196
-102
lines changed

7 files changed

+196
-102
lines changed

python_modules/libraries/dagster-components/dagster_components/core/component_defs_builder.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ def defs_from_components(
119119
from dagster._core.definitions.definitions_class import Definitions
120120

121121
return Definitions.merge(
122-
*[*[c.build_defs(context) for c in components], Definitions(resources=resources)]
122+
*[
123+
*[
124+
c.build_defs(context.with_rendering_scope(c.get_rendering_scope()))
125+
for c in components
126+
],
127+
Definitions(resources=resources),
128+
]
123129
)
124130

125131

python_modules/libraries/dagster-components/dagster_components/lib/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from dagster_components.lib.dbt_project import DbtProjectComponent as DbtProjectComponent
88

99
if _has_dagster_embedded_elt:
10-
from dagster_components.lib.sling_replication import (
11-
SlingReplicationComponent as SlingReplicationComponent,
10+
from dagster_components.lib.sling_replication_collection import (
11+
SlingReplicationCollectionComponent as SlingReplicationCollectionComponent,
1212
)
1313

1414
from dagster_components.lib.definitions_component import (

python_modules/libraries/dagster-components/dagster_components/lib/sling_replication.py

Lines changed: 0 additions & 79 deletions
This file was deleted.
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
from pathlib import Path
2+
from typing import Any, Iterator, Mapping, Optional, Sequence, Union
3+
4+
from dagster._core.definitions.asset_key import AssetKey
5+
from dagster._core.definitions.assets import AssetsDefinition
6+
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
7+
from dagster._core.definitions.definitions_class import Definitions
8+
from dagster._core.definitions.events import AssetMaterialization
9+
from dagster._core.definitions.result import MaterializeResult
10+
from dagster_embedded_elt.sling import DagsterSlingTranslator, SlingResource, sling_assets
11+
from dagster_embedded_elt.sling.resources import AssetExecutionContext
12+
from pydantic import BaseModel
13+
from typing_extensions import Self
14+
15+
from dagster_components import Component, ComponentLoadContext
16+
from dagster_components.core.component import (
17+
ComponentGenerateRequest,
18+
TemplatedValueRenderer,
19+
component_type,
20+
)
21+
from dagster_components.core.dsl_schema import (
22+
AssetAttributes,
23+
AssetAttributesModel,
24+
AssetSpecProcessor,
25+
OpSpecBaseModel,
26+
)
27+
from dagster_components.generate import generate_component_yaml
28+
29+
30+
class SlingReplicationParams(BaseModel):
31+
path: str
32+
op: Optional[OpSpecBaseModel] = None
33+
translator: Optional[AssetAttributesModel] = None
34+
35+
36+
class SlingReplicationCollectionParams(BaseModel):
37+
sling: Optional[SlingResource] = None
38+
replications: Sequence[SlingReplicationParams]
39+
asset_attributes: Optional[AssetAttributes] = None
40+
41+
42+
class SlingReplicationTranslator(DagsterSlingTranslator):
43+
def __init__(
44+
self,
45+
*,
46+
params: Optional[AssetAttributesModel],
47+
value_renderer: TemplatedValueRenderer,
48+
):
49+
self.params = params or AssetAttributesModel()
50+
self.value_renderer = value_renderer
51+
52+
def _get_rendered_attribute(
53+
self, attribute: str, stream_definition: Mapping[str, Any], default_method
54+
) -> Any:
55+
renderer = self.value_renderer.with_context(stream_definition=stream_definition)
56+
rendered_attribute = self.params.render_properties(renderer).get(attribute)
57+
return (
58+
rendered_attribute
59+
if rendered_attribute is not None
60+
else default_method(stream_definition)
61+
)
62+
63+
def get_asset_key(self, stream_definition: Mapping[str, Any]) -> AssetKey:
64+
return self._get_rendered_attribute("key", stream_definition, super().get_asset_key)
65+
66+
def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
67+
return self._get_rendered_attribute("group_name", stream_definition, super().get_group_name)
68+
69+
def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, str]:
70+
return self._get_rendered_attribute("tags", stream_definition, super().get_tags)
71+
72+
def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
73+
return self._get_rendered_attribute("metadata", stream_definition, super().get_metadata)
74+
75+
def get_auto_materialize_policy(
76+
self, stream_definition: Mapping[str, Any]
77+
) -> Optional[AutoMaterializePolicy]:
78+
return self._get_rendered_attribute(
79+
"auto_materialize_policy", stream_definition, super().get_auto_materialize_policy
80+
)
81+
82+
83+
@component_type(name="sling_replication_collection")
84+
class SlingReplicationCollectionComponent(Component):
85+
params_schema = SlingReplicationCollectionParams
86+
87+
def __init__(
88+
self,
89+
dirpath: Path,
90+
resource: SlingResource,
91+
sling_replications: Sequence[SlingReplicationParams],
92+
asset_attributes: Sequence[AssetSpecProcessor],
93+
):
94+
self.dirpath = dirpath
95+
self.resource = resource
96+
self.sling_replications = sling_replications
97+
self.asset_attributes = asset_attributes
98+
99+
@classmethod
100+
def load(cls, context: ComponentLoadContext) -> Self:
101+
loaded_params = context.load_params(cls.params_schema)
102+
return cls(
103+
dirpath=context.path,
104+
resource=loaded_params.sling or SlingResource(),
105+
sling_replications=loaded_params.replications,
106+
asset_attributes=loaded_params.asset_attributes or [],
107+
)
108+
109+
def build_replication_asset(
110+
self, context: ComponentLoadContext, replication: SlingReplicationParams
111+
) -> AssetsDefinition:
112+
@sling_assets(
113+
name=replication.op.name if replication.op else Path(replication.path).stem,
114+
op_tags=replication.op.tags if replication.op else {},
115+
replication_config=self.dirpath / replication.path,
116+
dagster_sling_translator=SlingReplicationTranslator(
117+
params=replication.translator,
118+
value_renderer=context.templated_value_renderer,
119+
),
120+
)
121+
def _asset(context: AssetExecutionContext):
122+
yield from self.execute(context=context, sling=self.resource)
123+
124+
return _asset
125+
126+
def execute(
127+
self, context: AssetExecutionContext, sling: SlingResource
128+
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
129+
yield from sling.replicate(context=context)
130+
131+
def build_defs(self, context: ComponentLoadContext) -> Definitions:
132+
defs = Definitions(
133+
assets=[
134+
self.build_replication_asset(context, replication)
135+
for replication in self.sling_replications
136+
],
137+
)
138+
for transform in self.asset_attributes:
139+
defs = transform.apply(defs, context.templated_value_renderer)
140+
return defs
141+
142+
@classmethod
143+
def generate_files(cls, request: ComponentGenerateRequest, params: Any) -> None:
144+
generate_component_yaml(request, params)

python_modules/libraries/dagster-components/dagster_components_tests/code_locations/sling_location/components/ingest/component.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
type: dagster_components.sling_replication
1+
type: dagster_components.sling_replication_collection
22

33
params:
4+
replications:
5+
- path: ./replication.yaml
46
sling:
57
connections:
68
- name: DUCKDB

python_modules/libraries/dagster-components/dagster_components_tests/integration_tests/test_sling_integration_test.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
from dagster._core.definitions.result import MaterializeResult
1212
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
1313
from dagster._utils.env import environ
14+
from dagster_components import component_type
1415
from dagster_components.core.component_decl_builder import ComponentFileModel
1516
from dagster_components.core.component_defs_builder import (
1617
YamlComponentDecl,
1718
build_components_from_component_folder,
1819
)
19-
from dagster_components.lib.sling_replication import SlingReplicationComponent, component_type
20+
from dagster_components.lib.sling_replication_collection import SlingReplicationCollectionComponent
2021
from dagster_embedded_elt.sling import SlingResource
2122

2223
from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context
@@ -70,40 +71,52 @@ def test_python_params(sling_path: Path) -> None:
7071
path=sling_path / COMPONENT_RELPATH,
7172
component_file_model=ComponentFileModel(
7273
type="sling_replication",
73-
params={"sling": {}},
74+
params={"sling": {}, "replications": [{"path": "./replication.yaml"}]},
7475
),
7576
)
7677
context = script_load_context(decl_node)
77-
component = SlingReplicationComponent.load(context)
78-
assert component.op_spec is None
78+
component = SlingReplicationCollectionComponent.load(context)
79+
80+
replications = component.sling_replications
81+
assert len(replications) == 1
82+
op_spec = replications[0].op
83+
assert op_spec is None
7984
assert get_asset_keys(component) == {
8085
AssetKey("input_csv"),
8186
AssetKey("input_duckdb"),
8287
}
8388

8489
defs = component.build_defs(context)
8590
# inherited from directory name
86-
assert defs.get_assets_def("input_duckdb").op.name == "ingest"
91+
assert defs.get_assets_def("input_duckdb").op.name == "replication"
8792

8893

8994
def test_python_params_op_name(sling_path: Path) -> None:
9095
decl_node = YamlComponentDecl(
9196
path=sling_path / COMPONENT_RELPATH,
9297
component_file_model=ComponentFileModel(
9398
type="sling_replication",
94-
params={"sling": {}, "op": {"name": "my_op"}},
99+
params={
100+
"sling": {},
101+
"replications": [
102+
{"path": "./replication.yaml", "op": {"name": "my_op"}},
103+
],
104+
},
95105
),
96106
)
97107
context = script_load_context(decl_node)
98-
component = SlingReplicationComponent.load(context=context)
99-
assert component.op_spec
100-
assert component.op_spec.name == "my_op"
108+
component = SlingReplicationCollectionComponent.load(context=context)
109+
110+
replications = component.sling_replications
111+
assert len(replications) == 1
112+
op_spec = replications[0].op
113+
assert op_spec
114+
assert op_spec.name == "my_op"
101115
defs = component.build_defs(context)
102116
assert defs.get_asset_graph().get_all_asset_keys() == {
103117
AssetKey("input_csv"),
104118
AssetKey("input_duckdb"),
105119
}
106-
107120
assert defs.get_assets_def("input_duckdb").op.name == "my_op"
108121

109122

@@ -112,13 +125,21 @@ def test_python_params_op_tags(sling_path: Path) -> None:
112125
path=sling_path / COMPONENT_RELPATH,
113126
component_file_model=ComponentFileModel(
114127
type="sling_replication",
115-
params={"sling": {}, "op": {"tags": {"tag1": "value1"}}},
128+
params={
129+
"sling": {},
130+
"replications": [
131+
{"path": "./replication.yaml", "op": {"tags": {"tag1": "value1"}}},
132+
],
133+
},
116134
),
117135
)
118136
context = script_load_context(decl_node)
119-
component = SlingReplicationComponent.load(context=context)
120-
assert component.op_spec
121-
assert component.op_spec.tags == {"tag1": "value1"}
137+
component = SlingReplicationCollectionComponent.load(context=context)
138+
replications = component.sling_replications
139+
assert len(replications) == 1
140+
op_spec = replications[0].op
141+
assert op_spec
142+
assert op_spec.tags == {"tag1": "value1"}
122143
defs = component.build_defs(context)
123144
assert defs.get_assets_def("input_duckdb").op.tags == {"tag1": "value1"}
124145

@@ -138,7 +159,7 @@ def test_load_from_path(sling_path: Path) -> None:
138159

139160
def test_sling_subclass() -> None:
140161
@component_type(name="debug_sling_replication")
141-
class DebugSlingReplicationComponent(SlingReplicationComponent):
162+
class DebugSlingReplicationComponent(SlingReplicationCollectionComponent):
142163
def execute(
143164
self, context: AssetExecutionContext, sling: SlingResource
144165
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
@@ -148,7 +169,7 @@ def execute(
148169
path=STUB_LOCATION_PATH / COMPONENT_RELPATH,
149170
component_file_model=ComponentFileModel(
150171
type="debug_sling_replication",
151-
params={"sling": {}},
172+
params={"sling": {}, "replications": [{"path": "./replication.yaml"}]},
152173
),
153174
)
154175
component_inst = DebugSlingReplicationComponent.load(

0 commit comments

Comments
 (0)