Skip to content

Commit 3fb0a0b

Browse files
committed
[components] Remove AutomationConditionModel in favor of raw python object
1 parent f4d3125 commit 3fb0a0b

File tree

9 files changed

+123
-82
lines changed

9 files changed

+123
-82
lines changed

python_modules/libraries/dagster-components/dagster_components/core/component_rendering.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence, Type, TypeVar, Union
55

66
import dagster._check as check
7+
from dagster._core.definitions.declarative_automation.automation_condition import (
8+
AutomationCondition,
9+
)
710
from dagster._record import record
811
from jinja2.nativetypes import NativeTemplate
912
from pydantic import BaseModel, Field
@@ -17,6 +20,13 @@
1720
CONTEXT_KEY = "required_rendering_scope"
1821

1922

23+
def automation_condition_scope() -> Mapping[str, Any]:
24+
return {
25+
"eager": AutomationCondition.eager,
26+
"on_cron": AutomationCondition.on_cron,
27+
}
28+
29+
2030
def RenderingScope(field: Optional[FieldInfo] = None, *, required_scope: AbstractSet[str]) -> Any:
2131
"""Defines a Pydantic Field that requires a specific scope to be available before rendering.
2232
@@ -50,7 +60,9 @@ class TemplatedValueResolver:
5060

5161
@staticmethod
5262
def default() -> "TemplatedValueResolver":
53-
return TemplatedValueResolver(context={"env": _env})
63+
return TemplatedValueResolver(
64+
context={"env": _env, "automation_condition": automation_condition_scope()}
65+
)
5466

5567
def with_context(self, **additional_context) -> "TemplatedValueResolver":
5668
return TemplatedValueResolver(context={**self.context, **additional_context})

python_modules/libraries/dagster-components/dagster_components/core/dsl_schema.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,34 @@ class OpSpecBaseModel(BaseModel):
2020
tags: Optional[Dict[str, str]] = None
2121

2222

23-
class AutomationConditionModel(BaseModel):
24-
type: str
25-
params: Mapping[str, Any] = {}
23+
class AssetAttributesModel(BaseModel):
24+
key: Optional[str] = None
25+
deps: Sequence[str] = []
26+
description: Optional[str] = None
27+
metadata: Union[str, Mapping[str, Any]] = {}
28+
group_name: Optional[str] = None
29+
skippable: bool = False
30+
code_version: Optional[str] = None
31+
owners: Sequence[str] = []
32+
tags: Union[str, Mapping[str, str]] = {}
33+
automation_condition: Optional[Union[str, AutomationCondition]] = RenderingScope(
34+
Field(None), required_scope={"automation_condition"}
35+
)
36+
37+
class Config:
38+
# required for AutomationCondition
39+
arbitrary_types_allowed = True
2640

27-
def to_automation_condition(self) -> AutomationCondition:
28-
return getattr(AutomationCondition, self.type)(**self.params)
41+
def get_resolved_attributes(self, value_resolver: TemplatedValueResolver) -> Mapping[str, Any]:
42+
return value_resolver.render_obj(self.model_dump(exclude_unset=True))
2943

3044

3145
class AssetSpecProcessor(ABC, BaseModel):
3246
target: str = "*"
33-
description: Optional[str] = None
34-
metadata: Optional[Mapping[str, Any]] = None
35-
group_name: Optional[str] = None
36-
tags: Optional[Mapping[str, str]] = None
37-
automation_condition: Optional[AutomationConditionModel] = None
47+
attributes: AssetAttributesModel
48+
49+
class Config:
50+
arbitrary_types_allowed = True
3851

3952
def _apply_to_spec(self, spec: AssetSpec, attributes: Mapping[str, Any]) -> AssetSpec: ...
4053

@@ -48,10 +61,9 @@ def apply_to_spec(
4861
return spec
4962

5063
# add the original spec to the context and resolve values
51-
attributes = value_resolver.with_context(asset=spec).render_obj(
52-
self.model_dump(exclude={"target", "operation"}, exclude_unset=True)
64+
return self._apply_to_spec(
65+
spec, self.attributes.get_resolved_attributes(value_resolver.with_context(asset=spec))
5366
)
54-
return self._apply_to_spec(spec, attributes)
5567

5668
def apply(self, defs: Definitions, value_resolver: TemplatedValueResolver) -> Definitions:
5769
target_selection = AssetSelection.from_string(self.target, include_sources=True)

python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,24 @@
11
import shutil
22
from pathlib import Path
3-
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
3+
from typing import TYPE_CHECKING, Mapping, Sequence
44

5-
from dagster._core.definitions.asset_key import AssetKey
65
from dagster._core.definitions.asset_spec import AssetSpec
76
from dagster._core.definitions.assets import AssetsDefinition
87
from dagster._core.definitions.decorators.asset_decorator import multi_asset
98
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
109
from dagster._core.pipes.subprocess import PipesSubprocessClient
11-
from dagster._utils.warnings import suppress_dagster_warnings
1210
from pydantic import BaseModel
1311

1412
from dagster_components.core.component import Component, ComponentLoadContext, component_type
15-
from dagster_components.core.dsl_schema import AutomationConditionModel
13+
from dagster_components.core.dsl_schema import AssetAttributesModel
1614

1715
if TYPE_CHECKING:
1816
from dagster._core.definitions.definitions_class import Definitions
1917

2018

21-
class AssetSpecModel(BaseModel):
22-
key: str
23-
deps: Sequence[str] = []
24-
description: Optional[str] = None
25-
metadata: Mapping[str, Any] = {}
26-
group_name: Optional[str] = None
27-
skippable: bool = False
28-
code_version: Optional[str] = None
29-
owners: Sequence[str] = []
30-
tags: Mapping[str, str] = {}
31-
automation_condition: Optional[AutomationConditionModel] = None
32-
33-
@suppress_dagster_warnings
34-
def to_asset_spec(self) -> AssetSpec:
35-
return AssetSpec(
36-
**{
37-
**self.__dict__,
38-
"key": AssetKey.from_user_string(self.key),
39-
"automation_condition": self.automation_condition.to_automation_condition()
40-
if self.automation_condition
41-
else None,
42-
},
43-
)
44-
45-
4619
class PipesSubprocessScriptParams(BaseModel):
4720
path: str
48-
assets: Sequence[AssetSpecModel]
21+
assets: Sequence[AssetAttributesModel]
4922

5023

5124
class PipesSubprocessScriptCollectionParams(BaseModel):
@@ -78,11 +51,14 @@ def load(cls, context: ComponentLoadContext) -> "PipesSubprocessScriptCollection
7851
script_path = context.path / script.path
7952
if not script_path.exists():
8053
raise FileNotFoundError(f"Script {script_path} does not exist")
81-
path_specs[script_path] = [spec.to_asset_spec() for spec in script.assets]
54+
path_specs[script_path] = [
55+
AssetSpec(**asset.get_resolved_attributes(context.templated_value_resolver))
56+
for asset in script.assets
57+
]
8258

8359
return cls(dirpath=context.path, path_specs=path_specs)
8460

85-
def build_defs(self, load_context: "ComponentLoadContext") -> "Definitions":
61+
def build_defs(self, context: "ComponentLoadContext") -> "Definitions":
8662
from dagster._core.definitions.definitions_class import Definitions
8763

8864
return Definitions(

python_modules/libraries/dagster-components/dagster_components_tests/code_locations/dbt_project_location/components/jaffle_shop_dbt/component.yaml

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ params:
55
project_dir: jaffle_shop
66

77
asset_attributes:
8-
- tags:
9-
foo: bar
10-
metadata:
11-
something: 1
12-
automation_condition:
13-
type: on_cron
14-
params:
15-
cron_schedule: "@daily"
16-
- tags:
17-
another: one
8+
- attributes:
9+
tags:
10+
foo: bar
11+
metadata:
12+
something: 1
13+
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
14+
- attributes:
15+
tags:
16+
another: one

python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/scripts/component.yaml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,9 @@ params:
55
- path: script_one.py
66
assets:
77
- key: a
8-
automation_condition:
9-
type: eager
8+
automation_condition: "{{ automation_condition.eager() }}"
109
- key: b
11-
automation_condition:
12-
type: on_cron
13-
params:
14-
cron_schedule: "@daily"
10+
automation_condition: "{{ automation_condition.on_cron('@daily') }}"
1511
deps: [up1, up2]
1612
- path: script_two.py
1713
assets:

python_modules/libraries/dagster-components/dagster_components_tests/registry_tests/__init__.py

Whitespace-only changes.

python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def test_python_params() -> None:
3434
{
3535
"path": "script_one.py",
3636
"assets": [
37-
{"key": "a", "automation_condition": {"type": "eager"}},
37+
{
38+
"key": "a",
39+
"automation_condition": "{{ automation_condition.eager() }}",
40+
},
3841
{
3942
"key": "b",
40-
"automation_condition": {
41-
"type": "on_cron",
42-
"params": {"cron_schedule": "@daily"},
43-
},
43+
"automation_condition": "{{ automation_condition.on_cron('@daily') }}",
4444
"deps": ["up1", "up2"],
4545
},
4646
],

python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_spec_processing.py

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import pytest
2-
from dagster import AssetKey, AssetSpec, Definitions
2+
from dagster import AssetKey, AssetSpec, AutomationCondition, Definitions
33
from dagster_components.core.dsl_schema import (
44
AssetAttributes,
5+
AssetAttributesModel,
56
MergeAttributes,
67
ReplaceAttributes,
78
TemplatedValueResolver,
@@ -23,7 +24,11 @@ class M(BaseModel):
2324

2425

2526
def test_replace_attributes() -> None:
26-
op = ReplaceAttributes(operation="replace", target="group:g2", tags={"newtag": "newval"})
27+
op = ReplaceAttributes(
28+
operation="replace",
29+
target="group:g2",
30+
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
31+
)
2732

2833
newdefs = op.apply(defs, TemplatedValueResolver.default())
2934
asset_graph = newdefs.get_asset_graph()
@@ -33,7 +38,11 @@ def test_replace_attributes() -> None:
3338

3439

3540
def test_merge_attributes() -> None:
36-
op = MergeAttributes(operation="merge", target="group:g2", tags={"newtag": "newval"})
41+
op = MergeAttributes(
42+
operation="merge",
43+
target="group:g2",
44+
attributes=AssetAttributesModel(tags={"newtag": "newval"}),
45+
)
3746

3847
newdefs = op.apply(defs, TemplatedValueResolver.default())
3948
asset_graph = newdefs.get_asset_graph()
@@ -43,7 +52,9 @@ def test_merge_attributes() -> None:
4352

4453

4554
def test_render_attributes_asset_context() -> None:
46-
op = MergeAttributes(tags={"group_name_tag": "group__{{ asset.group_name }}"})
55+
op = MergeAttributes(
56+
attributes=AssetAttributesModel(tags={"group_name_tag": "group__{{ asset.group_name }}"})
57+
)
4758

4859
newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
4960
asset_graph = newdefs.get_asset_graph()
@@ -54,33 +65,68 @@ def test_render_attributes_asset_context() -> None:
5465

5566
def test_render_attributes_custom_context() -> None:
5667
op = ReplaceAttributes(
57-
operation="replace", target="group:g2", tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"}
68+
operation="replace",
69+
target="group:g2",
70+
attributes=AssetAttributesModel(
71+
tags={"a": "{{ foo }}", "b": "prefix_{{ foo }}"},
72+
metadata="{{ metadata }}",
73+
automation_condition="{{ custom_cron('@daily') }}",
74+
),
5875
)
5976

60-
newdefs = op.apply(defs, TemplatedValueResolver.default().with_context(foo="theval"))
77+
def _custom_cron(s):
78+
return AutomationCondition.cron_tick_passed(s) & ~AutomationCondition.in_progress()
79+
80+
metadata = {"a": 1, "b": "str", "d": 1.23}
81+
newdefs = op.apply(
82+
defs,
83+
TemplatedValueResolver.default().with_context(
84+
foo="theval", metadata=metadata, custom_cron=_custom_cron
85+
),
86+
)
6187
asset_graph = newdefs.get_asset_graph()
6288
assert asset_graph.get(AssetKey("a")).tags == {}
63-
assert asset_graph.get(AssetKey("b")).tags == {"a": "theval", "b": "prefix_theval"}
64-
assert asset_graph.get(AssetKey("c")).tags == {"a": "theval", "b": "prefix_theval"}
89+
assert asset_graph.get(AssetKey("a")).metadata == {}
90+
assert asset_graph.get(AssetKey("a")).automation_condition is None
91+
92+
for k in ["b", "c"]:
93+
node = asset_graph.get(AssetKey(k))
94+
assert node.tags == {"a": "theval", "b": "prefix_theval"}
95+
assert node.metadata == metadata
96+
assert node.automation_condition == _custom_cron("@daily")
6597

6698

6799
@pytest.mark.parametrize(
68100
"python,expected",
69101
[
70102
# default to merge and a * target
71-
({"tags": {"a": "b"}}, MergeAttributes(target="*", tags={"a": "b"})),
72103
(
73-
{"operation": "replace", "tags": {"a": "b"}},
74-
ReplaceAttributes(operation="replace", target="*", tags={"a": "b"}),
104+
{"attributes": {"tags": {"a": "b"}}},
105+
MergeAttributes(target="*", attributes=AssetAttributesModel(tags={"a": "b"})),
106+
),
107+
(
108+
{"operation": "replace", "attributes": {"tags": {"a": "b"}}},
109+
ReplaceAttributes(
110+
operation="replace",
111+
target="*",
112+
attributes=AssetAttributesModel(tags={"a": "b"}),
113+
),
75114
),
76115
# explicit target
77116
(
78-
{"tags": {"a": "b"}, "target": "group:g2"},
79-
MergeAttributes(target="group:g2", tags={"a": "b"}),
117+
{"attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
118+
MergeAttributes(
119+
target="group:g2",
120+
attributes=AssetAttributesModel(tags={"a": "b"}),
121+
),
80122
),
81123
(
82-
{"operation": "replace", "tags": {"a": "b"}, "target": "group:g2"},
83-
ReplaceAttributes(operation="replace", target="group:g2", tags={"a": "b"}),
124+
{"operation": "replace", "attributes": {"tags": {"a": "b"}}, "target": "group:g2"},
125+
ReplaceAttributes(
126+
operation="replace",
127+
target="group:g2",
128+
attributes=AssetAttributesModel(tags={"a": "b"}),
129+
),
84130
),
85131
],
86132
)

0 commit comments

Comments
 (0)