Skip to content

Commit

Permalink
Add support for runtime types (#2263)
Browse files Browse the repository at this point in the history
Introduces a new RuntimeProcessorType enumeration class. The names of its
enumerated members are then referenced as constants within the Runtimes
schemas and their instances (this PR also addresses the migration of
existing instances).

Fixes #2258 
Fixes #2261
  • Loading branch information
kevin-bates authored Nov 10, 2021
1 parent 99793f9 commit 25bdb73
Show file tree
Hide file tree
Showing 56 changed files with 417 additions and 288 deletions.
5 changes: 1 addition & 4 deletions elyra/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@ class Metadata(object):
resource = None
display_name = None
schema_name = None
version = None
metadata = {}
reason = None

def __init__(self, **kwargs: Any) -> None:
self.name = kwargs.get('name')
self.display_name = kwargs.get('display_name')
self.schema_name = kwargs.get('schema_name')
self.version = kwargs.get('version', 0)
self.metadata = kwargs.get('metadata', {})
self.resource = kwargs.get('resource')
self.reason = kwargs.get('reason')
Expand Down Expand Up @@ -112,8 +110,7 @@ def from_dict(cls: Type[M], schemaspace: str, metadata_dict: dict) -> M:
def to_dict(self, trim: bool = False) -> dict:
# Exclude resource, and reason only if trim is True since we don't want to persist that information.
# Method prepare_write will be used to remove name prior to writes.
d = dict(name=self.name, display_name=self.display_name, metadata=self.metadata,
schema_name=self.schema_name, version=self.version or 0)
d = dict(name=self.name, display_name=self.display_name, metadata=self.metadata, schema_name=self.schema_name)
if not trim:
if self.resource:
d['resource'] = self.resource
Expand Down
8 changes: 8 additions & 0 deletions elyra/metadata/schemas/airflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"display_name": "Apache Airflow",
"schemaspace": "runtimes",
"schemaspace_id": "130b8e00-de7c-4b32-b553-b4a52824a3b5",
"metadata_class_name": "elyra.pipeline.runtimes_metadata.RuntimesMetadata",
"runtime_type": "APACHE_AIRFLOW",
"uihints": {
"title": "Apache Airflow runtimes",
"icon": "elyra:runtimes",
Expand All @@ -28,6 +30,12 @@
"description": "Additional data specific to this metadata",
"type": "object",
"properties": {
"runtime_type": {
"title": "Runtime Type",
"description": "The runtime associated with this instance",
"type": "string",
"const": "APACHE_AIRFLOW"
},
"description": {
"title": "Description",
"description": "Description of this Apache Airflow configuration",
Expand Down
7 changes: 7 additions & 0 deletions elyra/metadata/schemas/kfp.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"schemaspace": "runtimes",
"schemaspace_id": "130b8e00-de7c-4b32-b553-b4a52824a3b5",
"metadata_class_name": "elyra.pipeline.kfp.kfp_metadata.KfpMetadata",
"runtime_type": "KUBEFLOW_PIPELINES",
"uihints": {
"title": "Kubeflow Pipelines runtimes",
"icon": "elyra:runtimes",
Expand All @@ -29,6 +30,12 @@
"description": "Additional data specific to this metadata",
"type": "object",
"properties": {
"runtime_type": {
"title": "Runtime Type",
"description": "The runtime associated with this instance",
"type": "string",
"const": "KUBEFLOW_PIPELINES"
},
"description": {
"title": "Description",
"description": "Description of this Kubeflow Pipelines configuration",
Expand Down
19 changes: 6 additions & 13 deletions elyra/metadata/schemas/local-directory-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@
"type": "string",
"minLength": 1
},
"version": {
"title": "Version",
"description": "The version associated with this instance",
"type": "integer",
"const": 1
},
"metadata": {
"description": "Additional data specific to this metadata",
"type": "object",
Expand All @@ -39,12 +33,11 @@
"description": "Description of this Component Catalog",
"type": "string"
},
"runtime": {
"title": "Runtime",
"description": "The runtime associated with this Component Catalog",
"runtime_type": {
"title": "Runtime Type",
"description": "The type of runtime associated with this Component Catalog",
"type": "string",
"$comment": "This enum is dynamically generated to contain the available runtime values.",
"enum": ["{currently-configured-runtimes}"],
"enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"],
"uihints": {
"field_type": "dropdown",
"category": "Runtime"
Expand Down Expand Up @@ -86,8 +79,8 @@
}
}
},
"required": ["runtime", "paths"]
"required": ["runtime_type", "paths"]
}
},
"required": ["schema_name", "display_name", "version", "metadata"]
"required": ["schema_name", "display_name", "metadata"]
}
19 changes: 6 additions & 13 deletions elyra/metadata/schemas/local-file-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@
"type": "string",
"minLength": 1
},
"version": {
"title": "Version",
"description": "The version associated with this instance",
"type": "integer",
"const": 1
},
"metadata": {
"description": "Additional data specific to this metadata",
"type": "object",
Expand All @@ -39,12 +33,11 @@
"description": "Description of this Component Catalog",
"type": "string"
},
"runtime": {
"title": "Runtime",
"description": "The runtime associated with this Component Catalog",
"runtime_type": {
"title": "Runtime Type",
"description": "The type of runtime associated with this Component Catalog",
"type": "string",
"$comment": "This enum is dynamically generated to contain the available runtime values.",
"enum": ["{currently-configured-runtimes}"],
"enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"],
"uihints": {
"field_type": "dropdown",
"category": "Runtime"
Expand Down Expand Up @@ -84,8 +77,8 @@
}
}
},
"required": ["runtime", "paths"]
"required": ["runtime_type", "paths"]
}
},
"required": ["schema_name", "display_name", "version", "metadata"]
"required": ["schema_name", "display_name", "metadata"]
}
19 changes: 6 additions & 13 deletions elyra/metadata/schemas/url-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@
"type": "string",
"minLength": 1
},
"version": {
"title": "Version",
"description": "The version associated with this instance",
"type": "integer",
"const": 1
},
"metadata": {
"description": "Additional data specific to this metadata",
"type": "object",
Expand All @@ -39,12 +33,11 @@
"description": "Description of this Component Catalog",
"type": "string"
},
"runtime": {
"title": "Runtime",
"description": "The runtime associated with this Component Catalog",
"runtime_type": {
"title": "Runtime Type",
"description": "The type of runtime associated with this Component Catalog",
"type": "string",
"$comment": "This enum is dynamically generated to contain the available runtime values.",
"enum": ["{currently-configured-runtimes}"],
"enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"],
"uihints": {
"field_type": "dropdown",
"category": "Runtime"
Expand Down Expand Up @@ -77,8 +70,8 @@
}
}
},
"required": ["runtime", "paths"]
"required": ["runtime_type", "paths"]
}
},
"required": ["schema_name", "display_name", "version", "metadata"]
"required": ["schema_name", "display_name", "metadata"]
}
30 changes: 10 additions & 20 deletions elyra/metadata/schemaspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from typing import Dict

import entrypoints

from elyra.metadata.schema import Schemaspace


Expand Down Expand Up @@ -67,27 +65,19 @@ def __init__(self, *args, **kwargs):
display_name=ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME,
description="Schemaspace for instances of Elyra component registries configurations")

# get set of registered runtimes
self._runtime_processor_names = set()
for processor in entrypoints.get_group_all('elyra.pipeline.processors'):
# load the names of the runtime processors (skip 'local')
if processor.name == 'local':
continue
self._runtime_processor_names.add(processor.name)

def filter_schema(self, schema: Dict) -> Dict:
"""Replace contents of Runtimes value with set of runtimes if using templated value."""

# Component-registry requires that `runtime` be a defined property so ensure its existence.
instance_properties = schema.get('properties', {}).get('metadata', {}).get('properties', {})
runtime = instance_properties.get('runtime')
if not runtime:
raise ValueError(f"{ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME} schemas are "
f"required to define a 'runtime' (string-valued) property and schema "
f"\'{schema.get('name')}\' does not define 'runtime'.")

if runtime.get('enum') == ["{currently-configured-runtimes}"]:
runtime['enum'] = list(self._runtime_processor_names)
# Component-registry requires that `runtime_type` be a defined property so ensure its existence.
# Since schema 'component-registry' is deprecated, skip its check.
is_deprecated = schema.get('deprecated', False)
if not is_deprecated: # Skip deprecated schemas
instance_properties = schema.get('properties', {}).get('metadata', {}).get('properties', {})
runtime_type = instance_properties.get('runtime_type')
if not runtime_type:
raise ValueError(f"{ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME} schemas are "
f"required to define a 'runtime_type' (string-valued) property and schema "
f"\'{schema.get('name')}\' does not define 'runtime_type'.")

# Component catalogs should have an associated 'metadata' class name
# If none is provided, use the ComponentCatalogMetadata class, which implements
Expand Down
7 changes: 4 additions & 3 deletions elyra/pipeline/airflow/component_parser_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
from elyra.pipeline.component import Component
from elyra.pipeline.component import ComponentParameter
from elyra.pipeline.component import ComponentParser
from elyra.pipeline.runtime_type import RuntimeProcessorType


class AirflowComponentParser(ComponentParser):
_component_platform = "airflow"
_file_types = [".py"]
_component_platform: RuntimeProcessorType = RuntimeProcessorType.APACHE_AIRFLOW
_file_types: List[str] = [".py"]

def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]:
components: List[Component] = list()
Expand All @@ -51,7 +52,7 @@ def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]:
catalog_type=registry_entry.catalog_type,
source_identifier=registry_entry.component_identifier,
definition=self.get_class_def_as_string(component_content),
runtime=self.component_platform,
runtime_type=self.component_platform.name,
categories=registry_entry.categories,
properties=component_properties
)
Expand Down
15 changes: 5 additions & 10 deletions elyra/pipeline/airflow/processor_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.git import GithubClient
from elyra.util.path import get_absolute_path


class AirflowPipelineProcessor(RuntimePipelineProcessor):
_type = 'airflow'
_type = RuntimeProcessorType.APACHE_AIRFLOW
_name = 'airflow'

# Provide users with the ability to identify a writable directory in the
# running container where the notebook | script is executed. The location
Expand Down Expand Up @@ -73,10 +75,6 @@ class AirflowPipelineProcessor(RuntimePipelineProcessor):
# Contains mappings from class to import statement for each available Airflow operator
class_import_map = {}

@property
def type(self):
return self._type

def __init__(self, root_dir, **kwargs):
super().__init__(root_dir, component_parser=AirflowComponentParser(), **kwargs)
if not self.class_import_map: # Only need to load once
Expand Down Expand Up @@ -432,16 +430,13 @@ def _process_list_value(self, value: str) -> Union[List, str]:

class AirflowPipelineProcessorResponse(PipelineProcessorResponse):

_type = 'airflow'
_type = RuntimeProcessorType.APACHE_AIRFLOW
_name = 'airflow'

def __init__(self, git_url, run_url, object_storage_url, object_storage_path):
super().__init__(run_url, object_storage_url, object_storage_path)
self.git_url = git_url

@property
def type(self):
return self._type

def to_json(self):
response = super().to_json()
response['git_url'] = self.git_url
Expand Down
17 changes: 10 additions & 7 deletions elyra/pipeline/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from traitlets.config import LoggingConfigurable

from elyra.pipeline.runtime_type import RuntimeProcessorType


class ComponentParameter(object):
"""
Expand Down Expand Up @@ -122,7 +124,7 @@ def __init__(self,
catalog_type: str,
source_identifier: Any,
definition: Optional[str] = None,
runtime: Optional[str] = None,
runtime_type: Optional[RuntimeProcessorType] = None,
op: Optional[str] = None,
categories: Optional[List[str]] = None,
properties: Optional[List[ComponentParameter]] = None,
Expand All @@ -136,7 +138,7 @@ def __init__(self,
location; one of ['url', 'filename', 'directory']
:param source_identifier: Source information to help locate the component definition
:param definition: The content of the specification file for this component
:param runtime: The runtime of the component (e.g. KFP or Airflow)
:param runtime_type: The runtime type of the component (e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, etc.)
:param op: The operation name of the component; used by generic components in rendering the palette
:param categories: A list of categories that this component belongs to; used to organize component
in the palette
Expand All @@ -156,7 +158,7 @@ def __init__(self,
self._source_identifier = source_identifier

self._definition = definition
self._runtime = runtime
self._runtime_type = runtime_type
self._op = op
self._categories = categories or []
self._properties = properties
Expand Down Expand Up @@ -209,8 +211,8 @@ def definition(self) -> str:
return self._definition

@property
def runtime(self) -> Optional[str]:
return self._runtime
def runtime_type(self) -> Optional[RuntimeProcessorType]:
return self._runtime_type

@property
def op(self) -> Optional[str]:
Expand Down Expand Up @@ -244,10 +246,11 @@ def _log_warning(msg: str, logger: Optional[Logger] = None):


class ComponentParser(LoggingConfigurable): # ABC
_component_platform = None
_component_platform: RuntimeProcessorType = None
_file_types: List[str] = None

@property
def component_platform(self) -> str:
def component_platform(self) -> RuntimeProcessorType:
return self._component_platform

@property
Expand Down
Loading

0 comments on commit 25bdb73

Please sign in to comment.