Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototyping secrets requirements and demonstrating loading behavior #26783

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathlib import Path
from types import ModuleType
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Expand All @@ -33,6 +34,9 @@

from dagster_components.core.component_rendering import TemplatedValueResolver

if TYPE_CHECKING:
from dagster_components.core.component_decl_builder import ComponentFileModel


class ComponentDeclNode: ...

Expand Down Expand Up @@ -260,6 +264,14 @@ def load_params(self, params_schema: Type[T]) -> T:
)
return TypeAdapter(params_schema).validate_python(preprocessed_params)

@property
def component_file_model(self) -> "ComponentFileModel":
from dagster_components.core.component_decl_builder import YamlComponentDecl

if not isinstance(self.decl_node, YamlComponentDecl):
check.failed(f"Unsupported decl_node type {type(self.decl_node)}")
return self.decl_node.component_file_model


COMPONENT_REGISTRY_KEY_ATTR = "__dagster_component_registry_key"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import Any, Mapping, Optional, Sequence, Union
from typing import Any, List, Mapping, Optional, Sequence, Union

from dagster._record import record
from dagster._utils.pydantic_yaml import parse_yaml_file_to_pydantic
Expand All @@ -8,9 +8,14 @@
from dagster_components.core.component import ComponentDeclNode


class RequiresModel(BaseModel):
env: Optional[List[str]] = None


class ComponentFileModel(BaseModel):
type: str
params: Optional[Mapping[str, Any]] = None
requires: Optional[RequiresModel] = None


@record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ def component_type_from_yaml_decl(
for py_file in decl_node.path.glob("*.py"):
module_name = py_file.stem

module = load_module_from_path(module_name, decl_node.path / f"{module_name}.py")
# module = load_python_file(
# python_file=str(decl_node.path / f"{module_name}.py"),
# working_directory=str(decl_node.path.parent),
# )
py_path = str(decl_node.path / f"{module_name}.py")
module = load_module_from_path(module_name, py_path)

# print("**************")
# print(f"Using {module_name} at {py_path} found module: {module} ")
# print("**************")

for _name, obj in inspect.getmembers(module, inspect.isclass):
assert isinstance(obj, Type)
Expand All @@ -89,6 +98,32 @@ def build_components_from_component_folder(
return load_components_from_context(context.for_decl_node(component_folder))


def loading_context_for_component_path(
path: Path,
registry: ComponentTypeRegistry,
resources: Mapping[str, object],
) -> ComponentLoadContext:
decl_node = path_to_decl_node(path=path)
if not decl_node:
raise Exception(f"No component found at path {path}")

return ComponentLoadContext(
resources=resources,
registry=registry,
decl_node=decl_node,
templated_value_resolver=TemplatedValueResolver.default(),
)


def build_components_from_component_path(
path: Path,
registry: ComponentTypeRegistry,
resources: Mapping[str, object],
) -> Sequence[Component]:
context = loading_context_for_component_path(path, registry, resources)
return load_components_from_context(context)


def build_defs_from_component_path(
path: Path,
registry: ComponentTypeRegistry,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import AbstractSet

from dagster import Definitions, asset
from dagster_components import Component, ComponentLoadContext, component_type
from pydantic import BaseModel


class FootranParams(BaseModel):
target_address: str


@component_type(name="footran")
class FootranComponent(Component):
def __init__(self, target_address: str):
self.target_address = target_address

params_schema = FootranParams

@classmethod
def load(cls, context: ComponentLoadContext):
loaded_params = context.load_params(cls.params_schema)
return cls(target_address=loaded_params.target_address)

def build_defs(self, context: ComponentLoadContext):
@asset
def an_asset() -> None: ...

return Definitions()

def required_env_vars(self, context: ComponentLoadContext) -> AbstractSet[str]:
requires = context.component_file_model.requires
return set(requires.env) if requires and requires.env else set()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
type: .footran

requires:
env:
- FOOTRAN_API_KEY
- FOOTRAN_API_SECRET

params:
target_address: some_address
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from pathlib import Path
from typing import List, Tuple, cast

from dagster_components.core.component import ComponentTypeRegistry
from dagster_components.core.component_defs_builder import (
build_components_from_component_path,
loading_context_for_component_path,
)

from dagster_components_tests.scope_tests.footran_component.component import FootranComponent


def test_custom_scope() -> None:
components = build_components_from_component_path(
path=Path(__file__).parent / "footran_component",
registry=ComponentTypeRegistry.empty(),
resources={},
)

assert len(components) == 1
component = components[0]

# This fails due to lack of proper module caching with the right package name
# assert isinstance(component, FootranComponent)
assert type(component).__name__ == "FootranComponent"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smackesey i want to discuss this at tmrw's standup

component = cast(FootranComponent, component)

assert component.target_address == "some_address"
load_context = loading_context_for_component_path(
path=Path(__file__).parent / "footran_component",
registry=ComponentTypeRegistry.empty(),
resources={},
)
assert component.required_env_vars(load_context) == {"FOOTRAN_API_KEY", "FOOTRAN_API_SECRET"}


import os


def find_package_root(file_path: str) -> Tuple[str, List[str]]:
"""Starting from the directory containing 'file_path', walk upward
until we find a directory that is NOT a Python package
(i.e., no __init__.py).

Returns the absolute path of the 'package root' directory (which
itself may have an __init__.py if it's still part of a package),
plus the list of path components under that root.

Example:
If file_path = "/home/user/project/my_pkg/sub_pkg/module.py"
and /home/user/project/my_pkg/__init__.py exists,
and /home/user/project/__init__.py does NOT exist,
then the package root is "/home/user/project"
and the sub-path is ["my_pkg", "sub_pkg", "module.py"].
"""
file_path = os.path.abspath(file_path)
if not os.path.isfile(file_path):
raise ValueError(f"Path {file_path} is not a file.")

dir_path = os.path.dirname(file_path)
filename = os.path.basename(file_path)

# We will accumulate path components in reverse, then reverse them at the end
components = [Path(filename).stem]

# Traverse upward while the directory contains an __init__.py,
# meaning it is part of a package.
while True:
init_py = os.path.join(dir_path, "__init__.py")
parent_dir = os.path.dirname(dir_path)

if not os.path.isfile(init_py):
# The current dir_path is not a python package directory
# so break: we've found the package "root" (which might
# itself still have an __init__.py, or might be outside any package).
break

# If we do have an __init__.py, then the directory is part of the package,
# so add that directory name to our components and go one level up.
components.append(os.path.basename(dir_path))

# Move upward in the filesystem
dir_path = parent_dir

# If we reach the filesystem root, stop
if dir_path == "/" or not dir_path:
break

components.reverse() # Now it's from top-level package -> ... -> filename
return dir_path, components


def package_name(file_path: str) -> str:
_, components = find_package_root(file_path)
return ".".join(components)


def test_get_full_module_name() -> None:
comp_path = Path(__file__).parent / "footran_component" / "component.py"
assert comp_path.exists()

dir_path, components = find_package_root(str(comp_path))

# import code

# code.interact(local=locals())

assert (
package_name(str(comp_path))
== "dagster_components_tests.scope_tests.footran_component.component"
)