From 348a86d12c0c3990c7100d92c514456a058d298a Mon Sep 17 00:00:00 2001 From: Markus Binsteiner Date: Mon, 31 May 2021 14:18:23 +0200 Subject: [PATCH] refactor: support type conversions & metadata extranction via modules --- .pre-commit-config.yaml | 2 +- dev/dev.ipynb | 75 +++++++ docs/modules/module_development.ipynb | 52 +++++ docs/usage.md | 8 +- onboarding.folder_to_table.json | 20 ++ setup.cfg | 6 + src/kiara/data/registry.py | 27 +++ src/kiara/data/types/__init__.py | 70 ++---- src/kiara/data/types/graphs.py | 27 +++ src/kiara/data/types/type_mgmt.py | 244 +++++++++++++++++++++ src/kiara/data/values.py | 86 ++++++-- src/kiara/interfaces/cli/__init__.py | 301 ++++++++++++++------------ src/kiara/kiara.py | 203 ++++++----------- src/kiara/metadata/__init__.py | 0 src/kiara/module.py | 40 +++- src/kiara/module_config.py | 85 +++++--- src/kiara/modules/metadata.py | 154 +++++++++++++ src/kiara/modules/type_conversion.py | 128 +++++++++++ src/kiara/pipeline/module.py | 19 +- src/kiara/pipeline/pipeline.py | 29 ++- src/kiara/pipeline/structure.py | 158 ++++++++------ src/kiara/pipeline/utils.py | 211 ++++++++++++++++++ src/kiara/profiles.py | 197 +++++++++++++++++ src/kiara/utils/__init__.py | 25 ++- src/kiara/utils/class_loading.py | 1 + src/kiara/utils/output.py | 178 +++++++++++++++ src/kiara/utils/pretty_print.py | 75 ------- src/kiara/workflow.py | 5 +- 28 files changed, 1889 insertions(+), 537 deletions(-) create mode 100644 dev/dev.ipynb create mode 100644 docs/modules/module_development.ipynb create mode 100644 onboarding.folder_to_table.json create mode 100644 src/kiara/data/types/graphs.py create mode 100644 src/kiara/data/types/type_mgmt.py delete mode 100644 src/kiara/metadata/__init__.py create mode 100644 src/kiara/modules/metadata.py create mode 100644 src/kiara/modules/type_conversion.py create mode 100644 src/kiara/pipeline/utils.py create mode 100644 src/kiara/profiles.py create mode 100644 src/kiara/utils/output.py delete mode 100644 src/kiara/utils/pretty_print.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5ef648fea..7f6741380 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -44,7 +44,7 @@ repos: files: "^src/" pass_filenames: true args: ["--config-file", "setup.cfg"] - additional_dependencies: [pydantic>=1.8.0, rich>=10.0.0, ruamel.yaml, anyio>=3.0.0, pyzmq>=22.0.3] + additional_dependencies: [pydantic>=1.8.0, rich>=10.0.0, ruamel.yaml, anyio>=3.0.0, pyzmq>=22.0.3, bidict] - repo: git://github.com/pre-commit/pre-commit-hooks diff --git a/dev/dev.ipynb b/dev/dev.ipynb new file mode 100644 index 000000000..4cc9d869e --- /dev/null +++ b/dev/dev.ipynb @@ -0,0 +1,75 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The autoreload extension is already loaded. To reload it, use:\n", + " %reload_ext autoreload\n" + ] + }, + { + "data": { + "image/png": "\n", + "text/plain": "" + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2\n", + "\n", + "from kiara import Kiara\n", + "from rich.jupyter import print\n", + "\n", + "from kiara.utils.jupyter import graph_to_image, save_image\n", + "\n", + "kiara: Kiara = Kiara.instance()\n", + "\n", + "# print(kiara.available_module_types)\n", + "\n", + "# # print('---')\n", + "workflow = kiara.create_workflow(\"language.topic_modeling\")\n", + "#\n", + "# kiara.explain(workflow.steps)\n", + "#\n", + "save_image(workflow.structure.data_flow_graph_simple, path=\"/tmp/data_flow.png\")\n", + "\n", + "graph_to_image(workflow.structure.data_flow_graph_simple)\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.2" + } + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/docs/modules/module_development.ipynb b/docs/modules/module_development.ipynb new file mode 100644 index 000000000..3c623465f --- /dev/null +++ b/docs/modules/module_development.ipynb @@ -0,0 +1,52 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "collapsed": true, + "pycharm": { + "name": "#%% md\n" + } + }, + "source": [ + "# Developing *kiara* modules\n", + "\n", + "This page will show you how to create your own *kiara* modules. It's early days still, so the way this is done\n", + "currently is not as pythonic, user-friendly and easy as I hope it will eventually be. I do hope it is easy enough\n", + "for everyone with a bit of Python experience to be able to create their own, simple modules, though.\n", + "\n", + "## (Optional) Create a project structure\n", + "\n", + "*kiara* modules live in Python packages. Although technically it would be possible to just use Python files,\n", + "this is not supported for now, possibly ever. The main reason for that is that it is very important to be\n", + "able to pinpoint the exact version of a module that was used to create/transform some data. Python packages\n", + "can be versioned (as well as their dependencies) relatively easy, in a generic way. Simple scripts can't,\n", + "at least not to the extend we need.\n", + "\n", + "Creating Python packages correctly is not trivial, which is why I created a [project template](https://github.com/DHARPA-Project/kiara_modules.project_template) that includes\n", + "all the necessary bits and integrations to make this as painless as possible." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/docs/usage.md b/docs/usage.md index a8f257958..552fe665f 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -23,11 +23,11 @@ Display information about a modules, like description, configuration schema, sou #### ...for a core module -{{ cli("kiara", "module", "describe-type", "and") }} +{{ cli("kiara", "module", "explain-type", "and") }} #### ...for a pipeline module -{{ cli("kiara", "module", "describe-type", "nand") }} +{{ cli("kiara", "module", "explain-type", "nand") }} ### get properties of an instantiated module @@ -38,11 +38,11 @@ This command also can take module configuration, in different forms. This will b #### ...for a core module -{{ cli("kiara", "module", "describe-instance", "and") }} +{{ cli("kiara", "module", "explain-instance", "and") }} #### ...for a pipeline module -{{ cli("kiara", "module", "describe-instance", "nand") }} +{{ cli("kiara", "module", "explain-instance", "nand") }} ## pipeline-specific sub-commands diff --git a/onboarding.folder_to_table.json b/onboarding.folder_to_table.json new file mode 100644 index 000000000..5dafaa6f9 --- /dev/null +++ b/onboarding.folder_to_table.json @@ -0,0 +1,20 @@ +{ + "table": { + "id": [ + 0, + 1 + ], + "rel_path": [ + "csv_1.csv", + "csv_2.csv" + ], + "file_name": [ + "csv_1.csv", + "csv_2.csv" + ], + "content": [ + "a,b,c\nd,e,f\n", + "a,b,c\nd,e,f\n" + ] + } +} \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index dc5589c86..fa80a2e1f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -25,6 +25,7 @@ packages = find_namespace: install_requires = anyio>=3.0.0,<4.0.0 appdirs>=1.4.4,<2.0.0 + bidict>=0.21.0 deepdiff>=5.2.0,<6.0.0 filetype>=1.0.0,<2.0.0 networkx>=2.5,<3.0 @@ -32,6 +33,7 @@ install_requires = pyarrow>=4.0.0,<5.0.0 pydantic>=1.7.0,<2.0.0 python-dateutil>=2.8.0 + python-slugify>=5.0.0 pyyaml>=5.4.0,<6.0.0 pyzmq>=22.0.0,<23.0.0 rich>=9.0.0,<11.0.0 @@ -52,6 +54,7 @@ console_scripts = kiara = kiara.interfaces.cli:cli kiara.modules = pipeline = kiara.pipeline.module:PipelineModule + metadata.extract_python_class = kiara.modules.metadata:ExtractPythonClass [options.extras_require] cli = @@ -210,6 +213,9 @@ ignore_missing_imports = true [mypy-filetype] ignore_missing_imports = true +[mypy-kiara_modules.*] +ignore_missing_imports = true + [mypy-IPython.*] ignore_missing_imports = true diff --git a/src/kiara/data/registry.py b/src/kiara/data/registry.py index 8d5c4a938..99b3ef917 100644 --- a/src/kiara/data/registry.py +++ b/src/kiara/data/registry.py @@ -362,6 +362,33 @@ def get_value_data(self, item: typing.Union[str, KiaraValue]) -> typing.Any: return value + def get_value_metadata( + self, + value: typing.Union[KiaraValue, str], + *metadata_keys: str, + also_return_schema: bool = False, + ): + + value = self.get_value_item(value) + result = {} + missing = set() + for metadata_key in metadata_keys: + if metadata_keys in value.metadata.keys(): + result[metadata_key] = value.metadata[metadata_key]["metadata"] + else: + missing.add(metadata_key) + + if not missing: + return result + + _md = self._kiara.get_value_metadata(value, metadata_keys=missing) + result.update(_md) + + if also_return_schema: + return result + else: + return {k: v["metadata"] for k, v in result.items()} + def get_value_hash( self, item: typing.Union[str, KiaraValue] ) -> typing.Union[int, ValueHashMarker]: diff --git a/src/kiara/data/types/__init__.py b/src/kiara/data/types/__init__.py index 36e3bf3ba..019738bf2 100644 --- a/src/kiara/data/types/__init__.py +++ b/src/kiara/data/types/__init__.py @@ -20,18 +20,17 @@ """ import datetime -import networkx -import networkx as nx -import pyarrow import typing from dateutil import parser from deepdiff import DeepHash from enum import Enum -from networkx import DiGraph from rich.console import Console, ConsoleOptions, RenderResult from kiara.utils import camel_case_to_snake_case +if typing.TYPE_CHECKING: + from kiara.data.values import Value + class ValueHashMarker(Enum): @@ -93,9 +92,19 @@ def get_type_transformation_configs( The name of the transformation is the key of the result dictionary, the configuration is a module configuration (dictionary wth 'module_type' and optional 'module_config', 'input_name' and 'output_name' keys). """ - return { - "to_string": {"module_type": "strings.pretty_print", "input_name": "item"} - } + return {"string": {"module_type": "strings.pretty_print", "input_name": "item"}} + + @classmethod + def check_value_type(cls, value: "Value") -> typing.Optional["ValueType"]: + return cls.check_data_type(value.get_value_data()) + + @classmethod + def check_data_type(cls, data: typing.Any) -> typing.Optional["ValueType"]: + return None + + @classmethod + def relevant_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]: + return None def __init__(self, **type_config: typing.Any): @@ -349,50 +358,3 @@ def parse_value(self, v: typing.Any) -> typing.Any: def validate(cls, value: typing.Any): assert isinstance(value, datetime.datetime) - - -class TableType(ValueType): - def validate(cls, value: typing.Any) -> None: - assert isinstance(value, pyarrow.Table) - - def extract_type_metadata( - cls, value: typing.Any - ) -> typing.Mapping[str, typing.Any]: - - table: pyarrow.Table = value - table_schema = {} - for name in table.schema.names: - field = table.schema.field(name) - md = field.metadata - if not md: - md = {} - _type = field.type - _d = {"item_type": str(_type), "arrow_type_id": _type.id, "metadata": md} - table_schema[name] = _d - - return { - "column_names": table.column_names, - "schema": table_schema, - "rows": table.num_rows, - "size_in_bytes": table.nbytes, - } - - -class NetworkGraphType(ValueType): - def validate(cls, value: typing.Any) -> typing.Any: - - if not isinstance(value, networkx.Graph): - raise ValueError(f"Invalid type '{type(value)}' for graph: {value}") - return value - - def extract_type_metadata( - cls, value: typing.Any - ) -> typing.Mapping[str, typing.Any]: - - graph: nx.Graph = value - return { - "directed": isinstance(value, DiGraph), - "number_of_nodes": len(graph.nodes), - "number_of_edges": len(graph.edges), - "density": nx.density(graph), - } diff --git a/src/kiara/data/types/graphs.py b/src/kiara/data/types/graphs.py new file mode 100644 index 000000000..3f3fa84c2 --- /dev/null +++ b/src/kiara/data/types/graphs.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +import networkx +import networkx as nx +import typing +from networkx import DiGraph + +from kiara.data.types import ValueType + + +class NetworkGraphType(ValueType): + def validate(cls, value: typing.Any) -> typing.Any: + + if not isinstance(value, networkx.Graph): + raise ValueError(f"Invalid type '{type(value)}' for graph: {value}") + return value + + def extract_type_metadata( + cls, value: typing.Any + ) -> typing.Mapping[str, typing.Any]: + + graph: nx.Graph = value + return { + "directed": isinstance(value, DiGraph), + "number_of_nodes": len(graph.nodes), + "number_of_edges": len(graph.edges), + "density": nx.density(graph), + } diff --git a/src/kiara/data/types/type_mgmt.py b/src/kiara/data/types/type_mgmt.py new file mode 100644 index 000000000..a6a435722 --- /dev/null +++ b/src/kiara/data/types/type_mgmt.py @@ -0,0 +1,244 @@ +# -*- coding: utf-8 -*- +import typing + +from kiara.data import Value +from kiara.data.types import ValueType + +if typing.TYPE_CHECKING: + from kiara.kiara import Kiara + + +class TypeMgmt(object): + def __init__(self, kiara: "Kiara"): + + self._kiara: Kiara = kiara + self._value_types: typing.Optional[ + typing.Dict[str, typing.Type[ValueType]] + ] = None + self._value_type_transformations: typing.Dict[ + str, typing.Dict[str, typing.Any] + ] = {} + self._registered_python_classes: typing.Dict[typing.Type, typing.List[str]] = None # type: ignore + + @property + def value_types(self) -> typing.Mapping[str, typing.Type[ValueType]]: + + if self._value_types is not None: + return self._value_types + + all_value_type_classes = ValueType.__subclasses__() + value_type_dict: typing.Dict[str, typing.Type[ValueType]] = {} + for cls in all_value_type_classes: + type_name = cls.type_name() + if type_name in value_type_dict.keys(): + raise Exception( + f"Can't initiate types: duplicate type name '{type_name}'" + ) + value_type_dict[type_name] = cls + + self._value_types = value_type_dict + return self._value_types + + @property + def value_type_names(self) -> typing.List[str]: + return list(self.value_types.keys()) + + @property + def registered_python_classes( + self, + ) -> typing.Mapping[typing.Type, typing.Iterable[str]]: + + if self._registered_python_classes is not None: + return self._registered_python_classes + + registered_types = {} + for name, v_type in self.value_types.items(): + rel = v_type.relevant_python_types() + if rel: + for cls in rel: + registered_types.setdefault(cls, []).append(name) + + self._registered_python_classes = registered_types + return self._registered_python_classes + + def determine_type(self, data: typing.Any) -> typing.Optional[ValueType]: + + if isinstance(data, Value): + data = data.get_value_data() + + result: typing.List[ValueType] = [] + + registered_types = set(self.registered_python_classes.get(data.__class__, [])) + for cls in data.__class__.__bases__: + reg = self.registered_python_classes.get(cls) + if reg: + registered_types.update(reg) + + if registered_types: + for rt in registered_types: + _cls: typing.Type[ValueType] = self.get_value_type_cls(rt) + match = _cls.check_data_type(data) + if match: + result.append(match) + + # TODO: re-run all checks on all modules, not just the ones that registered interest in the class + + if len(result) == 0: + return None + elif len(result) > 1: + result_str = [x.type_name() for x in result] + raise Exception( + f"Multiple value types found for value: {', '.join(result_str)}." + ) + else: + return result[0] + + def get_value_type_cls(self, type_name: str) -> typing.Type[ValueType]: + + t = self.value_types.get(type_name, None) + if t is None: + raise Exception( + f"No value type '{type_name}', available types: {', '.join(self.value_types.keys())}" + ) + return t + + def get_value_type_transformations( + self, value_type_name: str + ) -> typing.Mapping[str, typing.Mapping[str, typing.Any]]: + """Return available transform pipelines for value types.""" + + if value_type_name in self._value_type_transformations.keys(): + return self._value_type_transformations[value_type_name] + + type_cls = self.get_value_type_cls(type_name=value_type_name) + _configs = type_cls.get_type_transformation_configs() + if _configs is None: + configs = {} + else: + configs = dict(_configs) + for base in type_cls.__bases__: + if hasattr(base, "get_type_transformation_configs"): + _b_configs = base.get_type_transformation_configs() # type: ignore + if not _b_configs: + continue + for k, v in _b_configs.items(): + if k not in configs.keys(): + configs[k] = v + + # TODO: check input type compatibility? + result: typing.Dict[str, typing.Dict[str, typing.Any]] = {} + for name, config in configs.items(): + config = dict(config) + module_type = config.pop("module_type", None) + if not module_type: + raise Exception( + f"Can't create transformation '{name}' for type '{value_type_name}', no module type specified in config: {config}" + ) + module_config = config.pop("module_config", {}) + module = self._kiara.create_module( + f"_transform_{value_type_name}_{name}", + module_type=module_type, + module_config=module_config, + ) + + if "input_name" not in config.keys(): + + if len(module.input_schemas) == 1: + config["input_name"] = next(iter(module.input_schemas.keys())) + else: + required_inputs = [ + inp + for inp, schema in module.input_schemas.items() + if schema.is_required() + ] + if len(required_inputs) == 1: + config["input_name"] = required_inputs[0] + else: + raise Exception( + f"Can't create transformation '{name}' for type '{value_type_name}': can't determine input name between those options: '{', '.join(required_inputs)}'" + ) + + if "output_name" not in config.keys(): + + if len(module.input_schemas) == 1: + config["output_name"] = next(iter(module.output_schemas.keys())) + else: + required_outputs = [ + inp + for inp, schema in module.output_schemas.items() + if schema.is_required() + ] + if len(required_outputs) == 1: + config["output_name"] = required_outputs[0] + else: + raise Exception( + f"Can't create transformation '{name}' for type '{value_type_name}': can't determine output name between those options: '{', '.join(required_outputs)}'" + ) + + result[name] = { + "module": module, + "module_type": module_type, + "module_config": module_config, + "transformation_config": config, + } + + self._value_type_transformations[value_type_name] = result + return self._value_type_transformations[value_type_name] + + # def get_available_transformations_for_type( + # self, value_type_name: str + # ) -> typing.Iterable[str]: + # + # return self.get_value_type_transformations(value_type_name=value_type_name) + + # def transform_value( + # self, + # transformation_alias: str, + # value: Value, + # other_inputs: typing.Optional[typing.Mapping[str, typing.Any]] = None, + # ) -> Value: + # + # transformations = self.get_value_type_transformations(value.value_schema.type) + # + # if transformation_alias not in transformations.keys(): + # raise Exception( + # f"Can't transform value of type '{value.value_schema.type}', transformation '{transformation_alias}' not available for this type. Available: {', '.join(transformations.keys())}" + # ) + # + # config = transformations[transformation_alias] + # + # transformation_config = config["transformation_config"] + # input_name = transformation_config["input_name"] + # + # module: KiaraModule = config["module"] + # + # constants = module.get_config_value("constants") + # inputs = dict(constants) + # + # if other_inputs: + # + # for k, v in other_inputs.items(): + # if k in constants.keys(): + # raise Exception(f"Invalid value '{k}' for 'other_inputs', this is a constant that can't be overwrittern.") + # inputs[k] = v + # + # defaults = transformation_config.get("defaults", None) + # if defaults: + # for k, v in defaults.items(): + # if k in constants.keys(): + # raise Exception(f"Invalid default value '{k}', this is a constant that can't be overwrittern.") + # if k not in inputs.keys(): + # inputs[k] = v + # + # if input_name in inputs.keys(): + # raise Exception( + # f"Invalid value for inputs in transform arguments, can't contain the main input key '{input_name}'." + # ) + # + # inputs[input_name] = value + # + # result = module.run(**inputs) + # output_name = transformation_config["output_name"] + # + # result_value = result.get_value_obj(output_name) + # return result_value diff --git a/src/kiara/data/values.py b/src/kiara/data/values.py index e5e231558..b923bbeda 100644 --- a/src/kiara/data/values.py +++ b/src/kiara/data/values.py @@ -277,17 +277,25 @@ def _create_value_table(self, padding=(0, 1)) -> Table: return table + def get_metadata( + self, *metadata_keys: str, also_return_schema: bool = False + ) -> typing.Mapping[str, typing.Mapping[str, typing.Any]]: + + raise NotImplementedError() + def transform( self, - transformation_alias: str, + target_type: str, return_data: bool = True, - other_inputs: typing.Optional[typing.Mapping[str, typing.Any]] = None, - ) -> typing.Any: - - transformed = self._kiara.transform_value( - transformation_alias=transformation_alias, - value=self, - other_inputs=other_inputs, + config: typing.Optional[typing.Mapping[str, typing.Any]] = None, + register_result: bool = False, + ) -> typing.Union[typing.Mapping[str, typing.Any], "Value"]: + + transformed = self._kiara.transform_data( + data=self, + target_type=target_type, + config=config, + register_result=register_result, ) if not return_data: return transformed @@ -394,6 +402,27 @@ def get_value_hash(self) -> typing.Any: else: return self.value_hash + def get_metadata( + self, *metadata_keys: str, also_return_schema: bool = False + ) -> typing.Mapping[str, typing.Mapping[str, typing.Any]]: + + result = {} + missing = set() + for metadata_key in metadata_keys: + if metadata_keys in self.metadata.keys(): + if also_return_schema: + result[metadata_key] = self.metadata[metadata_key] + else: + result[metadata_key] = self.metadata[metadata_key]["metadata"] + else: + missing.add(metadata_key) + + if not missing: + return result + _md = self.registry.get_value_metadata(self, *missing) + result.update(_md) + return result + def __eq__(self, other): # TODO: compare all attributes if id is equal, just to make sure... @@ -834,12 +863,13 @@ class ValuesInfo(object): def __init__(self, value_set: ValueSet, title: typing.Optional[str] = None): self._value_set: ValueSet = value_set + self._title: typing.Optional[str] = title def create_value_data_table( self, show_headers: bool = False, - transformer: typing.Optional[str] = None, - transformer_config: typing.Optional[typing.Mapping[str, typing.Any]] = None, + convert_module_type: typing.Optional[str] = None, + convert_config: typing.Optional[typing.Mapping[str, typing.Any]] = None, ) -> Table: table = Table(show_header=show_headers, box=box.SIMPLE) @@ -851,7 +881,9 @@ def create_value_data_table( if not value.is_set: if value.item_is_valid(): - value_str = "-- not set --" + value_str: typing.Union[ + ConsoleRenderable, RichCast, str + ] = "-- not set --" else: value_str = "[red]-- not set --[/red]" elif value.is_none: @@ -860,13 +892,14 @@ def create_value_data_table( else: value_str = "[red]-- no value --[/red]" else: - if not transformer: + if not convert_module_type: value_str = value.get_value_data() else: - value_str = value.transform( - transformer, return_data=True, other_inputs=transformer_config + _value_str = value.transform( + convert_module_type, return_data=True, config=convert_config ) - if not isinstance(value_str, (ConsoleRenderable, RichCast, str)): + + if not isinstance(_value_str, (ConsoleRenderable, RichCast, str)): value_str = str(value_str) table.add_row(field_name, value_str) @@ -1042,6 +1075,9 @@ class StepInputField(ValueField): connected_pipeline_input: typing.Optional[str] = Field( default=None, description="A potential pipeline input." ) + is_constant: bool = Field( + "Whether this input is a constant and can't be changed by the user." + ) @root_validator(pre=True) def ensure_single_connected_item(cls, values): @@ -1101,6 +1137,9 @@ class PipelineInputField(ValueField): description="The step inputs that are connected to this pipeline input", default_factory=list, ) + is_constant: bool = Field( + "Whether this input is a constant and can't be changed by the user." + ) @property def alias(self) -> str: @@ -1117,6 +1156,23 @@ def alias(self) -> str: return generate_step_alias(PIPELINE_PARENT_MARKER, self.value_name) +class ValueSetType(ValueType): + def validate(cls, value: typing.Any) -> None: + assert isinstance(value, ValueSet) + + @classmethod + def check_data_type(cls, data: typing.Any) -> typing.Optional[ValueType]: + + if isinstance(data, ValueSet): + return ValueSetType() + else: + return None + + @classmethod + def relevant_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]: + return [ValueSet] + + Value.update_forward_refs() DataValue.update_forward_refs() LinkedValue.update_forward_refs() diff --git a/src/kiara/interfaces/cli/__init__.py b/src/kiara/interfaces/cli/__init__.py index 4f4b177d1..70a82a50a 100644 --- a/src/kiara/interfaces/cli/__init__.py +++ b/src/kiara/interfaces/cli/__init__.py @@ -6,22 +6,21 @@ import os.path import sys import typing +from kiara_modules.core.json import DEFAULT_TO_JSON_CONFIG +from pathlib import Path +from rich import box +from rich.console import Console, RenderGroup from rich.panel import Panel +from rich.syntax import Syntax from kiara import Kiara -from kiara.data.values import ValuesInfo from kiara.interfaces import get_console from kiara.module import KiaraModule, ModuleInfo from kiara.pipeline.controller import BatchController from kiara.pipeline.module import PipelineModuleInfo from kiara.processing.parallel import ThreadPoolProcessor from kiara.utils import create_table_from_field_schemas, dict_from_cli_args - -# from importlib.metadata import entry_points - - -# from asciinet import graph_to_ascii - +from kiara.utils.output import OutputDetails try: import uvloop @@ -43,7 +42,7 @@ def rich_print(msg: typing.Any = None) -> None: def _create_module_instance( ctx, module_type: str, module_config: typing.Iterable[typing.Any] -): +) -> KiaraModule: config = dict_from_cli_args(*module_config) kiara_obj = ctx.obj["kiara"] @@ -115,10 +114,10 @@ def list_modules(ctx, only_pipeline_modules: bool, only_core_modules: bool): kiara_obj.explain(p) -@module.command(name="describe-type") +@module.command(name="explain-type") @click.argument("module_type", nargs=1, required=True) @click.pass_context -def describe_module_type(ctx, module_type: str): +def explain_module_type(ctx, module_type: str): """Print details of a (Python) module.""" kiara_obj: Kiara = ctx.obj["kiara"] @@ -139,14 +138,14 @@ def describe_module_type(ctx, module_type: str): rich_print(info) -@module.command("describe-instance") +@module.command("explain-instance") @click.argument("module_type", nargs=1) @click.argument( "module_config", nargs=-1, ) @click.pass_context -def describe_module(ctx, module_type: str, module_config: typing.Iterable[typing.Any]): +def explain_module(ctx, module_type: str, module_config: typing.Iterable[typing.Any]): """Describe a step. A step, in this context, is basically a an instantiated module class, incl. (optional) config.""" @@ -265,6 +264,23 @@ def explain_steps(ctx, pipeline_type: str): kiara_obj.explain(structure.to_details().steps_info) +@cli.group(name="type") +@click.pass_context +def type_group(ctx): + """Information about available modules, and details about them.""" + + +@type_group.command(name="list") +@click.pass_context +def list_types(ctx): + + kiara_obj: Kiara = ctx.obj["kiara"] + + print() + for type_name, type in kiara_obj.value_types.items(): + rich_print(f"{type_name}: {type}") + + @cli.command() @click.argument("module", nargs=1) @click.argument("inputs", nargs=-1, required=False) @@ -276,40 +292,20 @@ def explain_steps(ctx, pipeline_type: str): multiple=True, ) @click.option( - "--data-details", + "--workflow-details", "-d", - help="Print details/metadata about input/output data.", + help="Display additional workflow details.", is_flag=True, - default=False, ) @click.option( - "--only-output", - "-o", - help="Only print output data. Overrides all other display options.", - is_flag=True, - default=False, + "--output", "-o", help="The output format and configuration.", multiple=True ) @click.pass_context -async def run(ctx, module, inputs, module_config, data_details, only_output): +async def run(ctx, module, inputs, module_config, output, workflow_details): if module_config: raise NotImplementedError() - display_state = True - display_input_values = True - display_input_details = False - display_output_values = True - display_output_details = False - - if data_details: - display_input_details = True - display_output_details = True - - if only_output: - display_state = False - display_input_values = False - display_input_details = False - kiara_obj: Kiara = ctx.obj["kiara"] if module in kiara_obj.available_module_types: @@ -335,8 +331,8 @@ async def run(ctx, module, inputs, module_config, data_details, only_output): ) one_required = False - for schema in module_obj.input_schemas.values(): - if schema.is_required(): + for input_name in module_obj.input_names: + if module_obj.input_required(input_name): one_required = True break @@ -350,17 +346,41 @@ async def run(ctx, module, inputs, module_config, data_details, only_output): "No inputs provided, not running the workflow. To run it, provide input following this schema:" ) rich_print(inputs_table) - sys.exit(0) + sys.exit(1) + + output_details = OutputDetails.from_data(output) + if output_details.format != "terminal": + pass + + force_overwrite = output_details.config.get("force", False) + + # SUPPORTED_TARGETS = ["terminal", "file"] + # if output_details.target not in SUPPORTED_TARGETS: + # print() + # rich_print(f"Invalid output target '{output_details.target}', must be one of: [i]{', '.join(SUPPORTED_TARGETS)}[/i]") + # sys.exit(1) + + target_file: typing.Optional[Path] = None + if output_details.target != "terminal": + if output_details.target == "file": + target_dir = Path.cwd() + target_file = target_dir / f"{module_name}.{output_details.format}" + else: + target_file = Path( + os.path.realpath(os.path.expanduser(output_details.target)) + ) + + if target_file.exists() and not force_overwrite: + print() + print( + f"Can't run workflow, the target files already exist, and '--output force=true' not specified: {target_file}" + ) + sys.exit(1) processor = ThreadPoolProcessor() # processor = None controller = BatchController(processor=processor) - workflow = kiara_obj.create_workflow(module_name, controller=controller) - - # l = DebugListener() - # workflow.pipeline.add_listener(l) - list_keys = [] for name, value in workflow.inputs.items(): @@ -374,49 +394,88 @@ async def run(ctx, module, inputs, module_config, data_details, only_output): else: workflow.controller.process_pipeline() - transformer = "to_string" - transformer_config = {"max_lines": 6} + if workflow_details: + kiara_obj.explain(workflow.current_state) - if display_input_values: - vi = ValuesInfo(workflow.inputs) - vt = vi.create_value_data_table( - show_headers=True, - transformer=transformer, - transformer_config=transformer_config, - ) - rich_print(Panel(vt, title_align="left", title="Workflow input data")) - print() - - if display_input_details: - vi = ValuesInfo(workflow.inputs) - vt = vi.create_value_info_table(show_headers=True) - rich_print(Panel(vt, title_align="left", title="Workflow input details")) - print() - - if display_state: - state_panel = Panel( - workflow.current_state, - title="Workflow state", - title_align="left", - padding=(1, 0, 0, 2), - ) - rich_print(state_panel) - print() - - if display_output_details: - vi = ValuesInfo(workflow.outputs) - vt = vi.create_value_info_table(show_headers=True) - rich_print(Panel(vt, title_align="left", title="Workflow output details")) - print() - - if display_output_values: - vi = ValuesInfo(workflow.outputs) - vt = vi.create_value_data_table( - show_headers=True, - transformer=transformer, - transformer_config=transformer_config, - ) - rich_print(Panel(vt, title_align="left", title="Workflow output data")) + if output_details.target == "terminal": + if output_details.format == "terminal": + print() + pretty_print = kiara_obj.create_workflow("strings.pretty_print") + pretty_print.inputs.set_value("item", workflow.outputs) + + renderables = pretty_print.outputs.get_value_data("renderables") + output = Panel(RenderGroup(*renderables), box=box.SIMPLE) + rich_print("[b]Output data[/b]") + rich_print(output) + else: + + format = output_details.format + available_formats = kiara_obj.get_convert_target_types( + source_type="value_set" + ) + if format not in available_formats: + print() + print( + f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." + ) + sys.exit(1) + + config = {} + config.update(DEFAULT_TO_JSON_CONFIG) + + transformed = kiara_obj.transform_data( + workflow.outputs, + source_type="value_set", + target_type=format, + config=config, + ) + transformed_value = transformed.get_value_data("target_value") + + if format in ["json", "yaml"]: + transformed_str = Syntax( + transformed_value, lexer_name=format, background_color="default" + ) + rich_print(transformed_str) + else: + print(transformed_value) + + else: + if output_details.format == "terminal": + pretty_print = kiara_obj.create_workflow("strings.pretty_print") + pretty_print.inputs.set_value("item", workflow.outputs) + + renderables = pretty_print.outputs.get_value_data("renderables") + output = Panel(RenderGroup(*renderables), box=box.SIMPLE) + with open(target_file, "wt") as f: + console = Console(record=True, file=f) + console.print(output) + else: + + format = output_details.format + available_formats = kiara_obj.get_convert_target_types( + source_type="value_set" + ) + if format not in available_formats: + print() + print( + f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." + ) + sys.exit(1) + + config = {} + config.update(DEFAULT_TO_JSON_CONFIG) + + transformed = kiara_obj.transform_data( + workflow.outputs, + source_type="value_set", + target_type=format, + config=config, + ) + transformed_value = transformed.get_value_data("target_value") + + target_file.parent.mkdir(parents=True, exist_ok=True) + # TODO: check whether to write text or bytes + target_file.write_text(transformed_value) # @cli.command() @@ -426,60 +485,34 @@ async def run(ctx, module, inputs, module_config, data_details, only_output): # # os.environ["DEBUG"] = "true" # -# # kiara = Kiara.instance() -# -# ctx = Context.instance() -# -# sub = ctx.socket(zmq.SUB) -# sub.setsockopt_string(zmq.SUBSCRIBE, "") -# sub.connect("tcp://127.0.0.1:5556") -# while True: -# message = sub.recv() -# print("Received request: %s" % message) -# # socket.send(b"World") -# -# module_info = kiara.get_module_info("import_local_folder") -# # kiara.explain(module_info) -# -# workflow = kiara.create_workflow("import_local_folder") -# workflow.inputs.path = ( -# "/home/markus/projects/dharpa/notebooks/TopicModelling/data_tm_workflow" -# ) -# -# # kiara.explain(workflow.outputs) -# kiara.explain(workflow.outputs.file_bundle) -# - -# @cli.command() -# @click.pass_context -# def dev2(ctx): -# import os -# -# os.environ["DEBUG"] = "true" +# kiara = Kiara.instance() # -# # kiara = Kiara.instance() +# workflow = kiara.create_workflow("onboarding.import_local_folder") # -# ctx = Context.instance() +# extended = { +# "steps": [ +# { +# "module_type": "tabular.create_table_from_text_files", +# "module_config": { +# "columns": ["id", "rel_path", "file_name", "content"] +# }, +# "step_id": "create_table_from_files", +# } +# ], +# "input_aliases": {"create_table_from_files__files": "file_bundle"}, +# } # -# pub = ctx.socket(zmq.PUB) -# pub.connect("tcp://127.0.0.1:5555") -# while True: -# topic = random.randrange(9999, 10005) -# messagedata = random.randrange(1, 215) - 80 -# print("%d %d" % (topic, messagedata)) -# pub.send_string("%d %d" % (topic, messagedata)) -# time.sleep(1) +# structure = workflow.structure +# new_structure = structure.extend(extended) # -# module_info = kiara.get_module_info("import_local_folder") -# # kiara.explain(module_info) +# workflow = kiara.create_workflow("tabular.import_table_from_folder") +# workflow.inputs.set_value("path", "/home/markus/projects/dharpa/data/csvs") # -# workflow = kiara.create_workflow("import_local_folder") -# workflow.inputs.path = ( -# "/home/markus/projects/dharpa/notebooks/TopicModelling/data_tm_workflow" -# ) +# import pp # -# # kiara.explain(workflow.outputs) -# kiara.explain(workflow.outputs.file_bundle) +# table = workflow.outputs.get_value_obj("table") +# md = table.get_metadata() +# pp(md) if __name__ == "__main__": diff --git a/src/kiara/kiara.py b/src/kiara/kiara.py index 670a95f8c..3a5a66504 100644 --- a/src/kiara/kiara.py +++ b/src/kiara/kiara.py @@ -14,6 +14,7 @@ from kiara.data import Value from kiara.data.registry import DataRegistry from kiara.data.types import ValueType +from kiara.data.types.type_mgmt import TypeMgmt from kiara.interfaces import get_console from kiara.module_config import KiaraWorkflowConfig, PipelineModuleConfig from kiara.module_mgmt import ModuleManager @@ -22,6 +23,7 @@ from kiara.pipeline.controller import PipelineController from kiara.pipeline.pipeline import Pipeline from kiara.processing import Job, ModuleProcessor +from kiara.profiles import ModuleProfileMgmt from kiara.utils import get_auto_workflow_alias, get_data_from_file, is_debug from kiara.workflow import KiaraWorkflow @@ -67,6 +69,8 @@ def __init__(self, config: typing.Optional[KiaraConfig] = None): self._default_pipeline_mgr = PipelineModuleManager(folders=None) self._custom_pipelines_mgr = PipelineModuleManager(folders={}) + self._profile_mgmt = ModuleProfileMgmt(kiara=self) + self.start_zmq_device() self.start_log_thread() @@ -91,12 +95,7 @@ def __init__(self, config: typing.Optional[KiaraConfig] = None): self._modules: typing.Dict[str, ModuleManager] = {} - self._value_types: typing.Optional[ - typing.Dict[str, typing.Type[ValueType]] - ] = None - self._value_type_transformations: typing.Dict[ - str, typing.Dict[str, typing.Any] - ] = {} + self._type_mgmt: TypeMgmt = TypeMgmt(self) self._data_registry: DataRegistry = DataRegistry(self) @@ -152,159 +151,93 @@ def explain(self, item: typing.Any): @property def value_types(self) -> typing.Mapping[str, typing.Type[ValueType]]: - - if self._value_types is not None: - return self._value_types - - all_value_type_classes = ValueType.__subclasses__() - value_type_dict: typing.Dict[str, typing.Type[ValueType]] = {} - for cls in all_value_type_classes: - type_name = cls.type_name() - if type_name in value_type_dict.keys(): - raise Exception( - f"Can't initiate types: duplicate type name '{type_name}'" - ) - value_type_dict[type_name] = cls - - self._value_types = value_type_dict - return self._value_types + return self._type_mgmt.value_types @property def value_type_names(self) -> typing.List[str]: - return list(self.value_types.keys()) - - def get_value_type_cls(self, type_name: str) -> typing.Type[ValueType]: + return self._type_mgmt.value_type_names - t = self.value_types.get(type_name, None) - if t is None: - raise Exception( - f"No value type '{type_name}', available types: {', '.join(self.value_types.keys())}" - ) - return t + def determine_type(self, data: typing.Any) -> typing.Optional[ValueType]: - def get_value_type_transformations( - self, value_type_name: str - ) -> typing.Mapping[str, typing.Mapping[str, typing.Any]]: - """Return available transform pipelines for value types.""" + return self._type_mgmt.determine_type(data) - if value_type_name in self._value_type_transformations.keys(): - return self._value_type_transformations[value_type_name] - - type_cls = self.get_value_type_cls(type_name=value_type_name) - _configs = type_cls.get_type_transformation_configs() - if _configs is None: - configs = {} - else: - configs = dict(_configs) - for base in type_cls.__bases__: - if hasattr(base, "get_type_transformation_configs"): - _b_configs = base.get_type_transformation_configs() # type: ignore - if not _b_configs: - continue - for k, v in _b_configs.items(): - if k not in configs.keys(): - configs[k] = v - - # TODO: check input type compatibility? - result: typing.Dict[str, typing.Dict[str, typing.Any]] = {} - for name, config in configs.items(): - config = dict(config) - module_type = config.pop("module_type", None) - if not module_type: - raise Exception( - f"Can't create transformation '{name}' for type '{value_type_name}', no module type specified in config: {config}" - ) - module_config = config.pop("module_config", {}) - module = self.create_module( - f"_transform_{value_type_name}_{name}", - module_type=module_type, - module_config=module_config, - ) + def get_value_metadata( + self, + value: Value, + metadata_keys: typing.Union[None, str, typing.Iterable[str]] = None, + ): - if "input_name" not in config.keys(): + value_type = value.value_schema.type + # TODO: validate type exists - if len(module.input_schemas) == 1: - config["input_name"] = next(iter(module.input_schemas.keys())) - else: - required_inputs = [ - inp - for inp, schema in module.input_schemas.items() - if schema.is_required() - ] - if len(required_inputs) == 1: - config["input_name"] = required_inputs[0] - else: - raise Exception( - f"Can't create transformation '{name}' for type '{value_type_name}': can't determine input name between those options: '{', '.join(required_inputs)}'" - ) + all_profiles_for_type = self._profile_mgmt.extract_metadata_profiles.get( + value_type, None + ) + if all_profiles_for_type is None: + all_profiles_for_type = {} - if "output_name" not in config.keys(): + if not metadata_keys: + metadata_keys = all_profiles_for_type.keys() + elif isinstance(metadata_keys, str): + metadata_keys = [metadata_keys] - if len(module.input_schemas) == 1: - config["output_name"] = next(iter(module.output_schemas.keys())) - else: - required_outputs = [ - inp - for inp, schema in module.output_schemas.items() - if schema.is_required() - ] - if len(required_outputs) == 1: - config["output_name"] = required_outputs[0] - else: - raise Exception( - f"Can't create transformation '{name}' for type '{value_type_name}': can't determine output name between those options: '{', '.join(required_outputs)}'" - ) + result = {} - result[name] = { - "module": module, - "module_type": module_type, - "module_config": module_config, - "transformation_config": config, - } + for mk in metadata_keys: + if not all_profiles_for_type or mk not in all_profiles_for_type: + raise Exception( + f"Can't extract metadata profile '{mk}' for type '{value_type}': metadata profile does not exist (for this type, anyway)." + ) + profile = all_profiles_for_type[mk] + module = profile.create_module(kiara=self) + metadata_result = module.run(value=value) + result[mk] = metadata_result.get_all_value_data() - self._value_type_transformations[value_type_name] = result - return self._value_type_transformations[value_type_name] + return result - def get_available_transformations_for_type( - self, value_type_name: str - ) -> typing.Iterable[str]: + def get_value_type_cls(self, type_name: str) -> typing.Type[ValueType]: - return self.get_value_type_transformations(value_type_name=value_type_name) + return self._type_mgmt.get_value_type_cls(type_name=type_name) - def transform_value( + def transform_data( self, - transformation_alias: str, - value: Value, - other_inputs: typing.Optional[typing.Mapping[str, typing.Any]] = None, + data: typing.Any, + target_type: str, + source_type: typing.Optional[str] = None, + config: typing.Optional[typing.Mapping[str, typing.Any]] = None, + register_result: bool = False, ) -> Value: - transformations = self.get_value_type_transformations(value.value_schema.type) - if transformation_alias not in transformations.keys(): - raise Exception( - f"Can't transform value of type '{value.value_schema.type}', transformation '{transformation_alias}' not available for this type. Available: {', '.join(transformations.keys())}" - ) + if register_result: + raise NotImplementedError() - config = transformations[transformation_alias] - input_name = config["transformation_config"]["input_name"] + if not source_type: + if isinstance(data, Value): + source_type = data.type_name + else: + _source_type = self._type_mgmt.determine_type(data) + if not _source_type: + raise Exception( + f"Can't transform data to '{target_type}': can not determine source type." + ) + source_type = _source_type.type_name() - module: KiaraModule = config["module"] + module = self._profile_mgmt.get_type_conversion_module( + source_type=source_type, target_type=target_type # type: ignore + ) + from kiara.modules.type_conversion import TypeConversionModule - if other_inputs is None: - inputs = {} - else: - inputs = dict(other_inputs) - if input_name in other_inputs.keys(): - raise Exception( - f"Invalid value for 'other_inputs' in transform arguments, can't contain the main input key '{input_name}'." - ) + if isinstance(module, TypeConversionModule): - inputs[input_name] = value + result = module.run(source_value=data, config=config) + return result.get_value_obj("target_value") + + else: + raise NotImplementedError() - result = module.run(**inputs) - output_name = config["transformation_config"]["output_name"] + def get_convert_target_types(self, source_type: str) -> typing.Iterable[str]: - result_value = result.get_value_obj(output_name) - return result_value + return self._profile_mgmt.type_convert_profiles.get(source_type, []) def add_module_manager(self, module_manager: ModuleManager): diff --git a/src/kiara/metadata/__init__.py b/src/kiara/metadata/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/kiara/module.py b/src/kiara/module.py index 44f7ec648..88a9498de 100644 --- a/src/kiara/module.py +++ b/src/kiara/module.py @@ -326,6 +326,19 @@ def config(self) -> KIARA_CONFIG: """ return self._config + def input_required(self, input_name: str): + + if input_name not in self._input_schemas.keys(): + raise Exception(f"No input '{input_name}' for module '{self.id}'.") + + if not self._input_schemas[input_name].is_required(): + return False + + if input_name in self.get_config_value("constants"): + return False + else: + return True + def get_config_value(self, key: str) -> typing.Any: """Retrieve the value for a specific configuration option. @@ -370,6 +383,7 @@ def input_schemas(self) -> typing.Mapping[str, ValueSchema]: result = {} for k, v in _input_schemas.items(): + if isinstance(v, ValueSchema): result[k] = v elif isinstance(v, typing.Mapping): @@ -380,6 +394,23 @@ def input_schemas(self) -> typing.Mapping[str, ValueSchema]: raise Exception( f"Invalid return type when tryping to create schema for '{self.id}': {type(v)}" ) + default_value = self.config.defaults.get(k, None) + constant_value = self.config.constants.get(k, None) + + # value_to_test = None + if default_value is not None and constant_value is not None: + raise Exception( + f"Module configuration error. Value '{k}' set in both 'constants' and 'defaults', this is not allowed." + ) + # elif default_value is not None: + # value_to_test = default_value + # elif constant_value is not None: + # value_to_test = constant_value + + # TODO: perform validation for constants/defaults + + if default_value is not None: + result[k].default = default_value self._input_schemas = result @@ -478,6 +509,10 @@ def clean_value(v: typing.Any) -> typing.Any: for k, v in inputs.items(): v = clean_value(v) if not isinstance(v, Value): + if k not in self.input_schemas.keys(): + raise Exception( + f"Invalid input name '{k}. Not part of the schema, allowed input names: {', '.join(self.input_names)}" + ) schema = self.input_schemas[k] v = NonRegistryValue( _init_value=v, # type: ignore @@ -590,8 +625,11 @@ def __rich_console__( config_str = json.dumps(config, indent=2) c = Syntax(config_str, "json", background_color="default") table.add_row("config", c) + + constants = self.config.get("constants") + inputs_table = create_table_from_field_schemas( - _show_header=True, **self.input_schemas + _show_header=True, _constants=constants, **self.input_schemas ) table.add_row("inputs", inputs_table) outputs_table = create_table_from_field_schemas( diff --git a/src/kiara/module_config.py b/src/kiara/module_config.py index 40e08ea4d..eec0d3fd8 100644 --- a/src/kiara/module_config.py +++ b/src/kiara/module_config.py @@ -21,16 +21,20 @@ def create_step_value_address( value_address_config: typing.Union[str, typing.Mapping[str, typing.Any]], - field_name: str, + default_field_name: str, ) -> StepValueAddress: + if isinstance(value_address_config, StepValueAddress): + return value_address_config + sub_value: typing.Optional[typing.Mapping[str, typing.Any]] = None + if isinstance(value_address_config, str): tokens = value_address_config.split(".") if len(tokens) == 1: step_id = value_address_config - output_name = field_name + output_name = default_field_name elif len(tokens) == 2: step_id = tokens[0] output_name = tokens[1] @@ -45,6 +49,10 @@ def create_step_value_address( step_id = value_address_config["step_id"] output_name = value_address_config["output_name"] sub_value = value_address_config.get("sub_value", None) + else: + raise TypeError( + f"Invalid type for creating step value address: {type(value_address_config)}" + ) if sub_value is not None and not isinstance(sub_value, typing.Mapping): raise ValueError( @@ -57,6 +65,30 @@ def create_step_value_address( return input_link +def ensure_step_value_addresses( + link: typing.Union[str, typing.Mapping, typing.Iterable], default_field_name: str +) -> typing.List[StepValueAddress]: + + if isinstance(link, (str, typing.Mapping)): + input_links: typing.List[StepValueAddress] = [ + create_step_value_address( + value_address_config=link, default_field_name=default_field_name + ) + ] + + elif isinstance(link, typing.Iterable): + input_links = [] + for o in link: + il = create_step_value_address( + value_address_config=o, default_field_name=default_field_name + ) + input_links.append(il) + else: + raise TypeError(f"Can't parse input map, invalid type for output: {link}") + + return input_links + + class PipelineStepConfig(BaseModel): """A class to hold the configuration of one module within a [PipelineModule][kiara.pipeline.module.PipelineModule].""" @@ -81,42 +113,45 @@ def ensure_input_links_valid(cls, v): result = {} for input_name, output in v.items(): - if isinstance(output, (str, typing.Mapping)): - input_links = [ - create_step_value_address( - value_address_config=output, field_name=input_name - ) - ] - - elif isinstance(output, collections.abc.Sequence): - input_links = [] - for o in output: - il = create_step_value_address( - value_address_config=o, field_name=input_name - ) - input_links.append(il) - else: - raise TypeError( - f"Can't parse input map, invalid type for output: {output}" - ) - + input_links = ensure_step_value_addresses( + default_field_name=input_name, link=output + ) result[input_name] = input_links return result +class PipelineStructureConfig(BaseModel): + + parent_id: str = Field(description="The id of the parent of this structure.") + steps: typing.List[PipelineStepConfig] + input_aliases: typing.Union[None, str, typing.Dict[str, str]] = None + output_aliases: typing.Union[None, str, typing.Dict[str, str]] = None + + class KiaraModuleConfig(BaseModel): - """Base class that describes the configuration a [KiaraModule][kiara.module.KiaraModule] class accepts. + """Base class that describes the configuration a [``KiaraModule``][kiara.module.KiaraModule] class accepts. This is stored in the ``_config_cls`` class attribute in each ``KiaraModule`` class. By default, such a ``KiaraModule`` is not configurable. + There are two config options every ``KiaraModule`` supports: + + - ``constants``, and + - ``defaults`` + + Constants are pre-set inputs, and users can't change them and an error is thrown if they try. Defaults are default + values that override the schema defaults, and those can be overwritten by users. If both a constant and a default + value is set for an input field, an error is thrown. """ _config_hash: str = PrivateAttr(default=None) constants: typing.Dict[str, typing.Any] = Field( default_factory=dict, description="Value constants for this module." ) + defaults: typing.Dict[str, typing.Any] = Field( + default_factory=dict, description="Value defaults for this module." + ) class Config: extra = Extra.forbid @@ -267,9 +302,7 @@ def create_structure( ps = PipelineStructure( parent_id=parent_id, - steps=self.steps, - input_aliases=self.input_aliases, - output_aliases=self.output_aliases, + config=self, kiara=kiara, ) return ps @@ -277,7 +310,6 @@ def create_structure( def create_pipeline( self, parent_id: typing.Optional[str] = None, - constants: typing.Optional[typing.Mapping[str, typing.Any]] = None, controller: typing.Optional["PipelineController"] = None, kiara: typing.Optional["Kiara"] = None, ): @@ -290,7 +322,6 @@ def create_pipeline( pipeline = Pipeline( structure=structure, - constants=constants, controller=controller, ) return pipeline diff --git a/src/kiara/modules/metadata.py b/src/kiara/modules/metadata.py new file mode 100644 index 000000000..55f5cfeee --- /dev/null +++ b/src/kiara/modules/metadata.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +import abc +import typing +from pydantic import BaseModel, Field + +from kiara import KiaraModule +from kiara.data import Value, ValueSet +from kiara.data.values import ValueSchema +from kiara.exceptions import KiaraProcessingException +from kiara.module_config import KiaraModuleConfig + + +class MetadataModuleConfig(KiaraModuleConfig): + + type: str = Field(description="The data type this module will be used for.") + + +class ExtractMetadataModule(KiaraModule): + """Extract metadata for the 'table' type.""" + + _config_cls = MetadataModuleConfig + + @classmethod + @abc.abstractmethod + def _get_supported_types(self) -> typing.Union[str, typing.Iterable[str]]: + pass + + @classmethod + def get_metadata_key(cls) -> str: + return cls._module_type_name # type: ignore + + @classmethod + def get_supported_value_types(cls) -> typing.Set[str]: + _types = cls._get_supported_types() + if isinstance(_types, str): + _types = [_types] + + return set(_types) + + def __init__(self, *args, **kwargs): + + self._metadata_schema: typing.Optional[str] = None + super().__init__(*args, **kwargs) + + @property + def value_type(self) -> str: + data_type = self.get_config_value("type") + sup_types = self.get_supported_value_types() + if "*" not in sup_types and data_type not in sup_types: + raise ValueError( + f"Invalid module configuration, type '{data_type}' not supported. Supported types: {', '.join(self.get_supported_value_types())}." + ) + + return data_type + + @property + def metadata_schema(self) -> str: + if self._metadata_schema is not None: + return self._metadata_schema + + schema = self._get_metadata_schema(type=self.value_type) + if isinstance(schema, type) and issubclass(schema, BaseModel): + schema = schema.schema_json() + elif not isinstance(schema, str): + raise TypeError(f"Invalid type for metadata schema: {type(schema)}") + + self._metadata_schema = schema + return self._metadata_schema + + @abc.abstractmethod + def _get_metadata_schema( + self, type: str + ) -> typing.Union[str, typing.Type[BaseModel]]: + """Create the metadata schema for the configured type.""" + + @abc.abstractmethod + def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]: + pass + + def create_input_schema( + self, + ) -> typing.Mapping[ + str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]] + ]: + + inputs = { + "value": { + "type": self.value_type, + "doc": f"A value of type '{self.value_type}'", + "optional": False, + } + } + return inputs + + def create_output_schema( + self, + ) -> typing.Mapping[ + str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]] + ]: + outputs = { + "metadata": {"type": "dict", "doc": "The metadata for the provided value."}, + "metadata_schema": { + "type": "string", + "doc": "The (json) schema for the metadata.", + }, + } + + return outputs + + def process(self, inputs: ValueSet, outputs: ValueSet) -> None: + + value = inputs.get_value_obj("value") + if value.value_schema.type != self.value_type: + raise KiaraProcessingException( + f"Can't extract metadata for value of type '{value.value_schema.type}'. Expected type '{self.value_type}'." + ) + + outputs.set_value("metadata_schema", self.metadata_schema) + metadata = self.extract_metadata(value) + # TODO: validate metadata? + outputs.set_value("metadata", metadata) + + +class ExtractPythonClass(ExtractMetadataModule): + @classmethod + def _get_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]: + return "*" + + @classmethod + def get_metadata_key(cls) -> str: + return "python_cls" + + def _get_metadata_schema( + self, type: str + ) -> typing.Union[str, typing.Type[BaseModel]]: + class PythonClassModel(BaseModel): + class_name: str = Field(description="The name of the Python class") + module_name: str = Field( + description="The name of the Python module this class lives in." + ) + full_name: str = Field(description="The full class namespace.") + + return PythonClassModel + + def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]: + + item = value.get_value_data() + cls = item.__class__ + + return { + "class_name": cls.__name__, + "module_name": cls.__module__, + "full_name": f"{cls.__module__}.{cls.__name__}", + } diff --git a/src/kiara/modules/type_conversion.py b/src/kiara/modules/type_conversion.py new file mode 100644 index 000000000..e1e36591d --- /dev/null +++ b/src/kiara/modules/type_conversion.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +import abc +import typing +from pydantic import Field + +from kiara import KiaraModule +from kiara.data.values import Value, ValueSchema, ValueSet +from kiara.exceptions import KiaraProcessingException +from kiara.module_config import KiaraModuleConfig + + +class TypeConversionModuleConfig(KiaraModuleConfig): + + source_type: str = Field(description="The source type.") + target_type: str = Field(description="The target type.") + + +class TypeConversionModule(KiaraModule): + + _config_cls = TypeConversionModuleConfig + + @classmethod + @abc.abstractmethod + def _get_supported_source_types(self) -> typing.Union[typing.Iterable[str], str]: + pass + + @classmethod + @abc.abstractmethod + def _get_target_types(self) -> typing.Union[typing.Iterable[str], str]: + pass + + @classmethod + def get_supported_source_types(self) -> typing.Set[str]: + + _types: typing.Iterable[str] = self._get_supported_source_types() + if isinstance(_types, str): + _types = [_types] + + if "config" in _types: + raise Exception("Invalid source type, type name 'config' is invalid.") + return set(_types) + + @classmethod + def get_supported_target_types(self) -> typing.Set[str]: + + _types: typing.Iterable[str] = self._get_target_types() + if isinstance(_types, str): + _types = [_types] + return set(_types) + + def __init__(self, *args, **kwargs): + + super().__init__(*args, **kwargs) + + @property + def source_type(self) -> str: + data_type = self.get_config_value("source_type") + supported = self.get_supported_source_types() + if "*" not in supported and data_type not in supported: + raise ValueError( + f"Invalid module configuration, source type '{data_type}' not supported. Supported types: {', '.join(self.get_supported_source_types())}." + ) + + return data_type + + @property + def target_type(self) -> str: + data_type = self.get_config_value("target_type") + if data_type not in self.get_supported_target_types(): + raise ValueError( + f"Invalid module configuration, target type '{data_type}' not supported. Supported types: {', '.join(self.get_supported_target_types())}." + ) + + return data_type + + def create_input_schema( + self, + ) -> typing.Mapping[ + str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]] + ]: + + inputs: typing.Mapping[str, typing.Any] = { + "source_value": { + "type": self.source_type, + "doc": f"A value of type '{self.source_type}'.", + }, + "config": { + "type": "dict", + "doc": "The configuration for the transformation.", + "optional": True, + }, + } + + return inputs + + def create_output_schema( + self, + ) -> typing.Mapping[ + str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]] + ]: + outputs = { + "target_value": { + "type": self.target_type, + "doc": f"A value of type '{self.target_type}'.", + } + } + return outputs + + def process(self, inputs: ValueSet, outputs: ValueSet) -> None: + + value = inputs.get_value_obj("source_value") + if value.value_schema.type != self.source_type: + raise KiaraProcessingException( + f"Can't convert value of source type '{value.value_schema.type}'. Expected type '{self.source_type}'." + ) + config = inputs.get_value_data("config") + if config is None: + config = {} + + target_value = self.convert(value=value, config=config) + # TODO: validate value? + outputs.set_value("target_value", target_value) + + @abc.abstractmethod + def convert( + self, value: Value, config: typing.Mapping[str, typing.Any] + ) -> typing.Any: + pass diff --git a/src/kiara/pipeline/module.py b/src/kiara/pipeline/module.py index 3f3bb0725..c829be530 100644 --- a/src/kiara/pipeline/module.py +++ b/src/kiara/pipeline/module.py @@ -59,7 +59,6 @@ def __init__( if controller is None: controller = BatchController() - self._pipeline_structure: typing.Optional[PipelineStructure] = None self._pipeline_controller: PipelineController = controller super().__init__( id=id, @@ -68,21 +67,23 @@ def __init__( meta=meta, kiara=kiara, ) + self._pipeline_structure: PipelineStructure = self._create_structure() + assert not self._config.constants + self._config.constants = dict(self._pipeline_structure.constants) @property def structure(self) -> PipelineStructure: """The ``PipelineStructure`` of this module.""" - if self._pipeline_structure is None: - self._pipeline_structure = PipelineStructure( - parent_id=self.full_id, - steps=self._config.steps, - input_aliases=self._config.input_aliases, - output_aliases=self._config.output_aliases, - kiara=self._kiara, - ) return self._pipeline_structure + def _create_structure(self) -> PipelineStructure: + + pipeline_structure = PipelineStructure( + parent_id=self.full_id, config=self.config, kiara=self._kiara + ) + return pipeline_structure + def create_input_schema(self) -> typing.Mapping[str, ValueSchema]: return self.structure.pipeline_input_schema diff --git a/src/kiara/pipeline/pipeline.py b/src/kiara/pipeline/pipeline.py index 845005b8d..054668f72 100644 --- a/src/kiara/pipeline/pipeline.py +++ b/src/kiara/pipeline/pipeline.py @@ -58,7 +58,7 @@ class Pipeline(object): def __init__( self, structure: PipelineStructure, - constants: typing.Optional[typing.Mapping[str, typing.Any]] = None, + # constants: typing.Optional[typing.Mapping[str, typing.Any]] = None, controller: typing.Optional[PipelineController] = None, ): @@ -75,10 +75,6 @@ def __init__( self._kiara: Kiara = self._structure._kiara - if constants is None: - constants = {} - self._constants: typing.Mapping[str, typing.Any] = constants - self._data_registry: DataRegistry = self._kiara.data_registry self._init_values() @@ -226,15 +222,26 @@ def _init_values(self): # if the pipeline input wasn't created by another step input before, # we need to take care of it here - constant = self._constants.get( - connected_pipeline_input_name, None - ) + if pipeline_input_field.is_constant: + init_value = self.structure.constants[ + pipeline_input_field.value_name + ] + origin = f"pipeline_input:{self.structure.pipeline_id}.{input_name} (constant)" + else: + init_value = self.structure.defaults.get( + pipeline_input_field.value_name, None + ) + if init_value is not None: + origin = f"pipeline_input:{self.structure.pipeline_id}.{input_name} (default)" + else: + origin = f"pipeline_input:{self.structure.pipeline_id}.{input_name}" + pipeline_input = self._data_registry.register_value( value_schema=pipeline_input_field.value_schema, value_fields=pipeline_input_field, - is_constant=False if constant is None else True, - initial_value=constant, - origin=f"pipeline_input:{self.structure.pipeline_id}.{input_name}", + is_constant=pipeline_input_field.is_constant, + initial_value=init_value, + origin=origin, ) self._data_registry.register_callback( self.values_updated, pipeline_input diff --git a/src/kiara/pipeline/structure.py b/src/kiara/pipeline/structure.py index 7c103254f..b348e6a68 100644 --- a/src/kiara/pipeline/structure.py +++ b/src/kiara/pipeline/structure.py @@ -2,9 +2,10 @@ import networkx as nx import typing import uuid +from bidict import frozenbidict from functools import lru_cache from networkx import NetworkXNoPath, NodeNotFound -from pydantic import BaseModel, Extra, Field, PrivateAttr +from pydantic import BaseModel, Extra, Field, PrivateAttr, validator from rich import box from rich.console import Console, ConsoleOptions, RenderGroup, RenderResult from rich.markdown import Markdown @@ -22,10 +23,12 @@ ) from kiara.defaults import DEFAULT_NO_DESC_VALUE, PIPELINE_PARENT_MARKER, SpecialValue from kiara.module import KiaraModule +from kiara.pipeline.utils import create_pipeline_structure_desc, extend_pipeline if typing.TYPE_CHECKING: from kiara.kiara import Kiara - from kiara.module_config import PipelineStepConfig + from kiara.module_config import PipelineModuleConfig, PipelineStepConfig + from kiara.pipeline.pipeline import Pipeline class PipelineStep(BaseModel): @@ -62,6 +65,15 @@ def create_steps( _module: typing.Optional[KiaraModule] = PrivateAttr(default=None) + @validator("step_id") + def _validate_step_id(cls, v): + + assert isinstance(v, str) + if "." in v: + raise ValueError("Step ids can't contain '.' characters.") + + return v + step_id: str parent_id: str module_type: str = Field(description="The module type.") @@ -235,13 +247,16 @@ class PipelineStructure(object): def __init__( self, parent_id: str, - steps: typing.Iterable["PipelineStepConfig"], - input_aliases: typing.Union[str, typing.Mapping[str, str]] = None, - output_aliases: typing.Union[str, typing.Mapping[str, str]] = None, - add_all_workflow_outputs: bool = False, + config: "PipelineModuleConfig", kiara: typing.Optional["Kiara"] = None, ): + self._structure_config: "PipelineModuleConfig" = config + + steps = self._structure_config.steps + input_aliases = self._structure_config.input_aliases + output_aliases = self._structure_config.output_aliases + if not steps: raise Exception("No steps provided.") @@ -276,12 +291,16 @@ def __init__( self._steps, output_aliases, "outputs" ) - self._input_aliases: typing.Mapping[str, str] = input_aliases # type: ignore + self._input_aliases: frozenbidict[str, str] = frozenbidict(input_aliases) # type: ignore if output_aliases is None: output_aliases = {} - self._output_aliases: typing.Mapping[str, str] = output_aliases # type: ignore + self._output_aliases: frozenbidict[str, str] = frozenbidict(output_aliases) # type: ignore - self._add_all_workflow_outputs: bool = add_all_workflow_outputs + # this is hardcoded for now + self._add_all_workflow_outputs: bool = False + + self._constants: typing.Dict[str, typing.Any] = None # type: ignore + self._defaults: typing.Dict[str, typing.Any] = None # type: ignore self._execution_graph: nx.DiGraph = None # type: ignore self._data_flow_graph: nx.DiGraph = None # type: ignore @@ -296,6 +315,10 @@ def __init__( def pipeline_id(self) -> str: return self._pipeline_id + @property + def structure_config(self) -> "PipelineModuleConfig": + return self._structure_config + @property def steps(self) -> typing.Iterable[PipelineStep]: return self._steps @@ -317,6 +340,20 @@ def step_ids(self) -> typing.Iterable[str]: self._process_steps() return self._steps_details.keys() + @property + def constants(self) -> typing.Mapping[str, typing.Any]: + + if self._constants is None: + self._process_steps() + return self._constants + + @property + def defaults(self) -> typing.Mapping[str, typing.Any]: + + if self._defaults is None: + self._process_steps() + return self._defaults + def get_step(self, step_id: str) -> PipelineStep: d = self.steps_details.get(step_id, None) @@ -436,6 +473,8 @@ def _process_steps(self): data_flow_graph = nx.DiGraph() data_flow_graph_simple = nx.DiGraph() processing_stages = [] + constants = {} + structure_defaults = {} # temp variable, to hold all outputs outputs: typing.Dict[str, StepOutputField] = {} @@ -518,9 +557,16 @@ def _process_steps(self): other_step_dependency: typing.Set = set() # go through all the inputs of a module, create input points and connect them to either # other module outputs, or pipeline inputs (which need to be created) + + module_constants: typing.Mapping[ + str, typing.Any + ] = step.module.get_config_value("constants") + for input_name, schema in step.module.input_schemas.items(): matching_input_links: typing.List[StepValueAddress] = [] + is_constant = input_name in module_constants.keys() + for value_name, input_links in step.input_links.items(): if value_name == input_name: for input_link in input_links: @@ -541,7 +587,7 @@ def _process_steps(self): if output_id not in outputs.keys(): raise Exception( - f"Can't connect input '{input_name}' for step '{step.step_id}': no output '{output_id}' available." + f"Can't connect input '{input_name}' for step '{step.step_id}': no output '{output_id}' available. Available output names: {', '.join(outputs.keys())}" ) connected_output_points.append(outputs[output_id]) connected_outputs.append(input_link) @@ -553,6 +599,7 @@ def _process_steps(self): pipeline_id=self._pipeline_id, value_name=input_name, value_schema=schema, + is_constant=is_constant, connected_pipeline_input=None, connected_outputs=connected_outputs, ) @@ -572,6 +619,7 @@ def _process_steps(self): pipeline_input_name = generate_pipeline_endpoint_name( step_id=step.step_id, value_name=input_name ) + # check whether this input has an alias associated with it if self._input_aliases: if pipeline_input_name in self._input_aliases.keys(): # this means we use the pipeline alias @@ -585,12 +633,14 @@ def _process_steps(self): connected_pipeline_input = existing_pipeline_input_points[ pipeline_input_name ] + assert connected_pipeline_input.is_constant == is_constant else: # we need to create the pipeline input connected_pipeline_input = PipelineInputField( value_name=pipeline_input_name, value_schema=schema, pipeline_id=self._pipeline_id, + is_constant=is_constant, ) existing_pipeline_input_points[ @@ -603,6 +653,20 @@ def _process_steps(self): data_flow_graph_simple.add_node( connected_pipeline_input, type=PipelineInputField.__name__ ) + if is_constant: + constants[ + pipeline_input_name + ] = step.module.get_config_value("constants")[input_name] + + default_val = step.module.get_config_value("defaults").get( + input_name, None + ) + if is_constant and default_val is not None: + raise Exception( + f"Module config invalid for step '{step.step_id}': both default value and constant provided for input '{input_name}'." + ) + elif default_val is not None: + structure_defaults[pipeline_input_name] = default_val step_input_point = StepInputField( step_id=step.step_id, @@ -652,6 +716,8 @@ def _process_steps(self): steps_details[_step_id]["processing_stage"] = i steps_details[_step_id]["step"].processing_stage = i + self._constants = constants + self._defaults = structure_defaults self._steps_details = steps_details self._execution_graph = execution_graph self._data_flow_graph = data_flow_graph @@ -716,64 +782,24 @@ def _process_steps(self): if optional: inp.value_schema.optional = True - def to_details(self) -> "PipelineStructureDesc": - - steps = {} - workflow_inputs: typing.Dict[str, typing.List[str]] = {} - workflow_outputs: typing.Dict[str, str] = {} - - for m_id, details in self.steps_details.items(): - - step = details["step"] - - input_connections: typing.Dict[str, typing.List[str]] = {} - for k, v in details["inputs"].items(): - - if v.connected_pipeline_input is not None: - connected_item = v.connected_pipeline_input - input_connections[k] = [ - generate_step_alias(PIPELINE_PARENT_MARKER, connected_item) - ] - workflow_inputs.setdefault(f"{connected_item}", []).append(v.alias) - elif v.connected_outputs is not None: - assert len(v.connected_outputs) > 0 - for co in v.connected_outputs: - input_connections.setdefault(k, []).append(co.alias) - else: - raise TypeError(f"Invalid connection type: {type(connected_item)}") - - output_connections: typing.Dict[str, typing.Any] = {} - for k, v in details["outputs"].items(): - for connected_item in v.connected_inputs: + def extend( + self, + other: typing.Union[ + "Pipeline", + "PipelineStructure", + "PipelineModuleConfig", + typing.Mapping[str, typing.Any], + ], + input_links: typing.Optional[ + typing.Mapping[str, typing.Iterable[StepValueAddress]] + ] = None, + ) -> "PipelineStructure": + + return extend_pipeline(self, other) - output_connections.setdefault(k, []).append( - generate_step_alias( - connected_item.step_id, connected_item.value_name - ) - ) - if v.pipeline_output: - output_connections.setdefault(k, []).append( - generate_step_alias(PIPELINE_PARENT_MARKER, v.pipeline_output) - ) - workflow_outputs[v.pipeline_output] = v.alias - - steps[step.step_id] = StepDesc( - step=step, - processing_stage=details["processing_stage"], - input_connections=input_connections, - output_connections=output_connections, - required=step.required, - ) + def to_details(self) -> "PipelineStructureDesc": - return PipelineStructureDesc( - pipeline_id=self._pipeline_id, - steps=steps, - processing_stages=self.processing_stages, - pipeline_input_connections=workflow_inputs, - pipeline_output_connections=workflow_outputs, - pipeline_inputs=self.pipeline_inputs, - pipeline_outputs=self.pipeline_outputs, - ) + return create_pipeline_structure_desc(self) def __rich_console__( self, console: Console, options: ConsoleOptions diff --git a/src/kiara/pipeline/utils.py b/src/kiara/pipeline/utils.py new file mode 100644 index 000000000..a1987226f --- /dev/null +++ b/src/kiara/pipeline/utils.py @@ -0,0 +1,211 @@ +# -*- coding: utf-8 -*- +import typing +from bidict import bidict + +from kiara.data.values import StepValueAddress, generate_step_alias +from kiara.defaults import PIPELINE_PARENT_MARKER + +if typing.TYPE_CHECKING: + from kiara.module_config import PipelineModuleConfig + from kiara.pipeline.pipeline import Pipeline + from kiara.pipeline.structure import PipelineStructure, PipelineStructureDesc + + +def create_pipeline_structure_desc( + pipeline: typing.Union["Pipeline", "PipelineStructure"] +) -> "PipelineStructureDesc": + + from kiara.pipeline.pipeline import Pipeline + from kiara.pipeline.structure import ( + PipelineStructure, + PipelineStructureDesc, + StepDesc, + ) + + if isinstance(pipeline, Pipeline): + structure: PipelineStructure = pipeline.structure + elif isinstance(pipeline, PipelineStructure): + structure = pipeline + + steps = {} + workflow_inputs: typing.Dict[str, typing.List[str]] = {} + workflow_outputs: typing.Dict[str, str] = {} + + for m_id, details in structure.steps_details.items(): + + step = details["step"] + + input_connections: typing.Dict[str, typing.List[str]] = {} + for k, v in details["inputs"].items(): + + if v.connected_pipeline_input is not None: + connected_item = v.connected_pipeline_input + input_connections[k] = [ + generate_step_alias(PIPELINE_PARENT_MARKER, connected_item) + ] + workflow_inputs.setdefault(f"{connected_item}", []).append(v.alias) + elif v.connected_outputs is not None: + assert len(v.connected_outputs) > 0 + for co in v.connected_outputs: + input_connections.setdefault(k, []).append(co.alias) + else: + raise TypeError(f"Invalid connection type: {type(connected_item)}") + + output_connections: typing.Dict[str, typing.Any] = {} + for k, v in details["outputs"].items(): + for connected_item in v.connected_inputs: + + output_connections.setdefault(k, []).append( + generate_step_alias( + connected_item.step_id, connected_item.value_name + ) + ) + if v.pipeline_output: + output_connections.setdefault(k, []).append( + generate_step_alias(PIPELINE_PARENT_MARKER, v.pipeline_output) + ) + workflow_outputs[v.pipeline_output] = v.alias + + steps[step.step_id] = StepDesc( + step=step, + processing_stage=details["processing_stage"], + input_connections=input_connections, + output_connections=output_connections, + required=step.required, + ) + + return PipelineStructureDesc( + pipeline_id=structure._pipeline_id, + steps=steps, + processing_stages=structure.processing_stages, + pipeline_input_connections=workflow_inputs, + pipeline_output_connections=workflow_outputs, + pipeline_inputs=structure.pipeline_inputs, + pipeline_outputs=structure.pipeline_outputs, + ) + + +def extend_pipeline( + pipeline: typing.Union["Pipeline", "PipelineStructure"], + other: typing.Union[ + "Pipeline", + "PipelineStructure", + "PipelineModuleConfig", + typing.Mapping[str, typing.Any], + ], + input_links: typing.Optional[ + typing.Mapping[str, typing.Iterable[StepValueAddress]] + ] = None, +): + + from kiara.module_config import PipelineModuleConfig + from kiara.pipeline.pipeline import Pipeline + from kiara.pipeline.structure import PipelineStructure + + if isinstance(pipeline, Pipeline): + structure: PipelineStructure = pipeline.structure + elif isinstance(pipeline, PipelineStructure): + structure = pipeline + + other_pipeline_config: typing.Optional[PipelineModuleConfig] = None + + other_name = "extended" + + if isinstance(other, typing.Mapping): + other_pipeline_config = PipelineModuleConfig(**other) + elif isinstance(other, PipelineModuleConfig): + other_pipeline_config = other + elif isinstance(other, PipelineStructure): + other_pipeline_config = other.structure_config + other_name = other.pipeline_id + + if other_pipeline_config is None: + from kiara.pipeline.pipeline import Pipeline + + if isinstance(other, Pipeline): + other_pipeline_config = other.structure.structure_config + + if other_pipeline_config is None: + raise TypeError( + f"Can't extend pipeline structure, invalid type: {type(other)}." + ) + + other_structure: PipelineStructure = PipelineStructure( + parent_id="_", config=other_pipeline_config, kiara=structure._kiara + ) + + step_id_overlap = [ + el for el in set(structure.step_ids) if el in other_structure.step_ids + ] + if step_id_overlap: + raise Exception( + f"Can't extend pipeline, duplicate step id(s) are not allowed: {', '.join(step_id_overlap)}." + ) + + if input_links is None: + input_links = {} + else: + input_links = dict(input_links) + + full_input_links: typing.Dict[ + str, typing.Dict[str, typing.Iterable[StepValueAddress]] + ] = {} + for input_name, input in other_structure.pipeline_inputs.items(): + if input_name not in input_links.keys(): + # this means we try to see whether there is an output in this structure with the same name + if input_name in structure.pipeline_outputs.keys(): + output_step_address: typing.Iterable[StepValueAddress] = [ + structure.pipeline_outputs[input_name].connected_output + ] + else: + # output_step_address = "NEW INPUT" + raise NotImplementedError() + else: + output_step_address = input_links[input_name] + + connected_inputs = other_structure.pipeline_inputs[input_name].connected_inputs + if len(connected_inputs) != 1: + raise NotImplementedError() + else: + connected_input = connected_inputs[0] + full_input_links.setdefault(connected_input.step_id, {})[ + connected_input.value_name + ] = output_step_address + + new_input_aliases = bidict(structure._input_aliases) + if not other_structure._output_aliases: + new_output_aliases: typing.Union[str, typing.Mapping[str, str]] = "auto" + else: + new_output_aliases = bidict(other_structure._output_aliases) + + config = structure.structure_config.dict( + exclude={"input_aliases", "output_aliases", "steps"} + ) + config["input_aliases"] = new_input_aliases + config["output_aliases"] = new_output_aliases + + new_steps = [ + step.dict(exclude={"parent_id", "processing_stage", "required"}) + for step in structure.steps + ] + + for step in other_structure.steps: + step_dict = step.dict(exclude={"parent_id", "processing_stage", "required"}) + if step.step_id in full_input_links.keys(): + if step.step_id not in full_input_links.keys(): + new_steps.append(step_dict) + else: + for input_name in full_input_links[step.step_id].keys(): + if input_name in step_dict["input_links"].keys(): + raise NotImplementedError() + step_dict["input_links"][input_name] = full_input_links[ + step.step_id + ][input_name] + new_steps.append(step_dict) + + config["steps"] = new_steps + pmc = PipelineModuleConfig(**config) + new_structure = pmc.create_structure( + f"{structure.pipeline_id}_{other_name}", kiara=structure._kiara + ) + return new_structure diff --git a/src/kiara/profiles.py b/src/kiara/profiles.py new file mode 100644 index 000000000..c90bcf506 --- /dev/null +++ b/src/kiara/profiles.py @@ -0,0 +1,197 @@ +# -*- coding: utf-8 -*- +import typing +from pydantic import BaseModel, Extra, Field, PrivateAttr + +if typing.TYPE_CHECKING: + from kiara import Kiara + from kiara.module import KiaraModule + + +class ModuleProfileConfig(BaseModel): + class Config: + extra = Extra.forbid + validate_all = True + + _module: typing.Optional["KiaraModule"] = PrivateAttr(default=None) + module_type: str = Field(description="The module type.") + module_config: typing.Dict[str, typing.Any] = Field( + default_factory=dict, description="The configuration for the module." + ) + + def create_module(self, kiara: "Kiara"): + + if self._module is None: + self._module = kiara.create_module( + id=f"extract_metadata_{self.module_type}", + module_type=self.module_type, + module_config=self.module_config, + ) + return self._module + + +class TypeConversionProfileConfig(ModuleProfileConfig): + + convert_source_type: str = Field(description="The source type.") + convert_target_type: str = Field(description="The target type.") + doc: str = Field( + description="Documentation of what this profile does/is appropriate for.", + default="-- n/a --", + ) + + +class ExtractMetadataProfileConfig(ModuleProfileConfig): + + value_type: str = Field( + description="The type of the value to extract metadata from." + ) + + +class ModuleProfileMgmt(object): + def __init__(self, kiara: "Kiara"): + + self._kiara: "Kiara" = kiara + self._convert_profiles: typing.Dict[ + str, typing.Dict[str, TypeConversionProfileConfig] + ] = None # type: ignore + self._metadata_profiles: typing.Dict[ + str, typing.Dict[str, ExtractMetadataProfileConfig] + ] = None # type: ignore + + @property + def extract_metadata_profiles( + self, + ) -> typing.Mapping[str, typing.Mapping[str, ExtractMetadataProfileConfig]]: + + if self._metadata_profiles is not None: + return self._metadata_profiles + + from kiara.modules.metadata import ExtractMetadataModule + + all_metadata_profiles = {} + for module_type in self._kiara.available_module_types: + + cls = self._kiara.get_module_class(module_type=module_type) + + if issubclass(cls, ExtractMetadataModule): + value_types = cls.get_supported_value_types() + if "*" in value_types: + value_types = self._kiara._type_mgmt.value_type_names + metadata_key = cls.get_metadata_key() + + for value_type in value_types: + if ( + metadata_key + in all_metadata_profiles.setdefault(value_type, {}).keys() + ): + raise Exception( + f"Multiple profiles for type '{value_type}' and metadata key '{metadata_key}'. This is not allowed." + ) # yet, anyway + + mc = {"type": value_type} + all_metadata_profiles[value_type][ + metadata_key + ] = ExtractMetadataProfileConfig( + module_type=module_type, module_config=mc, value_type=value_type + ) + + if hasattr(cls, "_extract_metadata_profiles"): + profiles: typing.Mapping[str, typing.Mapping[str, typing.Any]] = cls._convert_profiles # type: ignore + for value_type, extract_details in profiles.items(): + for metadata_key, module_config in extract_details.items(): + if ( + metadata_key + in all_metadata_profiles.setdefault(value_type, {}).keys() + ): + raise Exception( + f"Multiple profiles for type '{value_type}' and metadata key '{metadata_key}'. This is not allowed." + ) # yet, anyway + all_metadata_profiles[value_type][ + metadata_key + ] = ExtractMetadataProfileConfig( + module_type=module_type, + module_config=module_config, + value_type=value_type, + ) + + self._metadata_profiles = all_metadata_profiles + return self._metadata_profiles + + @property + def type_convert_profiles( + self, + ) -> typing.Mapping[str, typing.Mapping[str, TypeConversionProfileConfig]]: + + if self._convert_profiles is not None: + return self._convert_profiles + + from kiara.modules.type_conversion import TypeConversionModule + + all_convert_profiles = {} + for module_type in self._kiara.available_module_types: + cls = self._kiara.get_module_class(module_type=module_type) + + if issubclass(cls, TypeConversionModule): + source_types = cls.get_supported_source_types() + if "*" in source_types: + source_types = self._kiara._type_mgmt.value_type_names + target_types = cls.get_supported_target_types() + + for source_type in source_types: + for target_type in target_types: + if ( + target_type + in all_convert_profiles.setdefault(source_type, {}).keys() + ): + raise Exception( + f"Multiple convert targets for '{source_type} -> {target_type}', this is not allowed." + ) # yet, anyway + mc = {"source_type": source_type, "target_type": target_type} + all_convert_profiles[source_type][ + target_type + ] = TypeConversionProfileConfig( + module_type=module_type, + module_config=mc, + convert_source_type=source_type, + convert_target_type=target_type, + ) + + if hasattr(cls, "_convert_profiles"): + profiles: typing.Mapping[str, typing.Mapping[str, typing.Any]] = cls._convert_profiles # type: ignore + for source_type, module_config in profiles.items(): + for target_type, details in module_config.items(): + if ( + target_type + in all_convert_profiles.setdefault(source_type, {}).keys() + ): + raise Exception( + f"Multiple convert targets for '{source_type} -> {target_type}', this is not allowed." + ) # yet, anyway + all_convert_profiles[source_type][ + target_type + ] = TypeConversionProfileConfig( + module_type=module_type, + module_config=module_config, + convert_source_type=source_type, + convert_target_type=target_type, + ) + + self._convert_profiles = all_convert_profiles + return self._convert_profiles + + def get_type_conversion_module( + self, source_type: str, target_type: str + ) -> "KiaraModule": + + all_source_profiles = self.type_convert_profiles.get(source_type, None) + if not all_source_profiles: + raise Exception( + f"No type conversion profiles for source type '{source_type}'." + ) + + convert_profile = all_source_profiles.get(target_type, None) + if not convert_profile: + raise Exception( + f"No target conversion profile '{target_type}' for source type '{source_type}' available." + ) + + return convert_profile.create_module(self._kiara) diff --git a/src/kiara/utils/__init__.py b/src/kiara/utils/__init__.py index 3380214a2..9667aa904 100644 --- a/src/kiara/utils/__init__.py +++ b/src/kiara/utils/__init__.py @@ -16,6 +16,7 @@ model_process_schema, ) from rich import box +from rich.console import ConsoleRenderable, RichCast from rich.table import Table from ruamel.yaml import YAML from types import ModuleType @@ -48,6 +49,10 @@ def log_message(msg: str): log.debug(msg) +def is_rich_renderable(item: typing.Any): + return isinstance(item, (ConsoleRenderable, RichCast, str)) + + def get_data_from_file( path: Union[str, Path], content_type: typing.Optional[str] = None ) -> typing.Any: @@ -160,6 +165,7 @@ def create_table_from_field_schemas( _add_default: bool = True, _add_required: bool = True, _show_header: bool = False, + _constants: typing.Optional[typing.Mapping[str, typing.Any]] = None, **fields: "ValueSchema", ): @@ -167,10 +173,14 @@ def create_table_from_field_schemas( table.add_column("Field name", style="i") table.add_column("Type") table.add_column("Description") + if _add_required: table.add_column("Required") if _add_default: - table.add_column("Default") + if _constants: + table.add_column("Default / Constant") + else: + table.add_column("Default") for field_name, schema in fields.items(): @@ -192,10 +202,17 @@ def create_table_from_field_schemas( row.append(req_str) if _add_default: - if schema.default in [None, SpecialValue.NO_VALUE, SpecialValue.NOT_SET]: - d = "-- no default --" + if _constants and field_name in _constants.keys(): + d = f"[b]{_constants[field_name]}[/b] (constant)" else: - d = str(schema.default) + if schema.default in [ + None, + SpecialValue.NO_VALUE, + SpecialValue.NOT_SET, + ]: + d = "-- no default --" + else: + d = str(schema.default) row.append(d) table.add_row(*row) diff --git a/src/kiara/utils/class_loading.py b/src/kiara/utils/class_loading.py index 0c5a8f251..e018e5145 100644 --- a/src/kiara/utils/class_loading.py +++ b/src/kiara/utils/class_loading.py @@ -185,6 +185,7 @@ def find_all_kiara_modules() -> typing.Dict[str, typing.Type["KiaraModule"]]: if isinstance(plugin.plugin, type) and issubclass(plugin.plugin, KiaraModule): ep = plugin.entry_point module_cls = ep.load() + setattr(module_cls, "_module_type_name", name) result_entrypoints[name] = module_cls elif ( isinstance(plugin.plugin, tuple) diff --git a/src/kiara/utils/output.py b/src/kiara/utils/output.py new file mode 100644 index 000000000..3e981c49b --- /dev/null +++ b/src/kiara/utils/output.py @@ -0,0 +1,178 @@ +# -*- coding: utf-8 -*- +import typing +from pyarrow import Table +from pydantic import BaseModel, Field, root_validator +from rich import box +from rich.console import RenderableType +from rich.table import Table as RichTable + +from kiara.utils import dict_from_cli_args + + +class OutputDetails(BaseModel): + @classmethod + def from_data(cls, data: typing.Any): + + if isinstance(data, str): + if "=" in data: + data = [data] + else: + data = [f"format={data}"] + + if isinstance(data, typing.Iterable): + data = list(data) + if len(data) == 1 and isinstance(data[0], str) and "=" not in data[0]: + data = [f"format={data[0]}"] + output_details_dict = dict_from_cli_args(*data) + else: + raise TypeError( + f"Can't parse output detail config: invalid input type '{type(data)}'." + ) + + output_details = OutputDetails(**output_details_dict) + return output_details + + format: str = Field(description="The output format.") + target: str = Field(description="The output target.") + config: typing.Dict[str, typing.Any] = Field( + description="Output configuration.", default_factory=dict + ) + + @root_validator(pre=True) + def _set_defaults(cls, values): + + target: str = values.pop("target", "terminal") + format: str = values.pop("format", None) + if format is None: + if target == "terminal": + format = "terminal" + else: + if target == "file": + format = "json" + else: + ext = target.split(".")[-1] + if ext in ["yaml", "json"]: + format = ext + else: + format = "json" + result = {"format": format, "target": target, "config": dict(values)} + + return result + + +def pretty_print_arrow_table( + table: Table, + rows_head: typing.Optional[int] = None, + rows_tail: typing.Optional[int] = None, + max_row_height: typing.Optional[int] = None, + max_cell_length: typing.Optional[int] = None, +) -> RenderableType: + + rich_table = RichTable(box=box.SIMPLE) + for cn in table.column_names: + rich_table.add_column(cn) + + num_split_rows = 1 + + if rows_head is not None: + + if rows_head < 0: + rows_head = 0 + + if rows_head > table.num_rows: + rows_head = table.num_rows + rows_tail = None + num_split_rows = 0 + + if rows_tail is not None: + if rows_head + rows_tail >= table.num_rows: # type: ignore + rows_head = table.num_rows + rows_tail = None + num_split_rows = 0 + else: + num_split_rows = 0 + + if rows_head is not None: + head = table.slice(0, rows_head) + num_rows = rows_head + else: + head = table + num_rows = table.num_rows + + table_dict = head.to_pydict() + for i in range(0, num_rows): + row = [] + for cn in table.column_names: + cell = table_dict[cn][i] + cell_str = str(cell) + if max_row_height and max_row_height > 0 and "\n" in cell_str: + lines = cell_str.split("\n") + if len(lines) > max_row_height: + if max_row_height == 1: + lines = lines[0:1] + else: + half = int(len(lines) / 2) + lines = lines[0:half] + [".."] + lines[-half:] + cell_str = "\n".join(lines) + + if max_cell_length and max_cell_length > 0: + lines = [] + for line in cell_str.split("\n"): + if len(line) > max_cell_length: + line = line[0:(max_cell_length)] + " ..." + else: + line = line + lines.append(line) + cell_str = "\n".join(lines) + + row.append(cell_str) + + rich_table.add_row(*row) + + if num_split_rows: + for i in range(0, num_split_rows): + row = [] + for _ in table.column_names: + row.append("...") + rich_table.add_row(*row) + + if rows_head: + if rows_tail is not None: + if rows_tail < 0: + rows_tail = 0 + + tail = table.slice(table.num_rows - rows_tail) + table_dict = tail.to_pydict() + for i in range(0, num_rows): + + row = [] + for cn in table.column_names: + cell = table_dict[cn][i] + cell_str = str(cell) + + if max_row_height and max_row_height > 0 and "\n" in cell_str: + lines = cell_str.split("\n") + if len(lines) > max_row_height: + if max_row_height == 1: + lines = lines[0:1] + else: + half = int(len(lines) / 2) + lines = lines[0:half] + [".."] + lines[-half:] + cell_str = "\n".join(lines) + + if max_cell_length and max_cell_length > 0: + lines = [] + for line in cell_str.split("\n"): + + if len(line) > max_cell_length: + line = line[0:(max_cell_length)] + " ..." + else: + line = line + lines.append(line) + cell_str = "\n".join(lines) + + row.append(cell_str) + + rich_table.add_row(*row) + + return rich_table diff --git a/src/kiara/utils/pretty_print.py b/src/kiara/utils/pretty_print.py deleted file mode 100644 index 46557930b..000000000 --- a/src/kiara/utils/pretty_print.py +++ /dev/null @@ -1,75 +0,0 @@ -# -*- coding: utf-8 -*- -import typing -from pyarrow import Table -from rich import box -from rich.console import RenderableType -from rich.table import Table as RichTable - - -def pretty_print_arrow_table( - table: Table, - num_head: typing.Optional[int] = None, - num_tail: typing.Optional[int] = None, -) -> RenderableType: - - rich_table = RichTable(box=box.SIMPLE) - for cn in table.column_names: - rich_table.add_column(cn) - - num_split_rows = 1 - - if num_head is not None: - - if num_head < 0: - num_head = 0 - - if num_head > table.num_rows: - num_head = table.num_rows - num_tail = None - num_split_rows = 0 - - if num_tail is not None: - if num_head + num_tail >= table.num_rows: # type: ignore - num_head = table.num_rows - num_tail = None - num_split_rows = 0 - else: - num_split_rows = 0 - - if num_head is not None: - head = table.slice(0, num_head) - num_rows = num_head - else: - head = table - num_rows = table.num_rows - - table_dict = head.to_pydict() - for i in range(0, num_rows): - row = [] - for cn in table.column_names: - row.append(str(table_dict[cn][i])) - - rich_table.add_row(*row) - - if num_split_rows: - for i in range(0, num_split_rows): - row = [] - for _ in table.column_names: - row.append("...") - rich_table.add_row(*row) - - if num_head: - if num_tail is not None: - if num_tail < 0: - num_tail = 0 - - tail = table.slice(table.num_rows - num_tail) - table_dict = tail.to_pydict() - for i in range(0, num_rows): - row = [] - for cn in table.column_names: - row.append(str(table_dict[cn][i])) - - rich_table.add_row(*row) - - return rich_table diff --git a/src/kiara/workflow.py b/src/kiara/workflow.py index b8427f1d1..a321a1f42 100644 --- a/src/kiara/workflow.py +++ b/src/kiara/workflow.py @@ -3,6 +3,7 @@ from rich import box from rich.console import Console, ConsoleOptions, RenderResult from rich.table import Table +from slugify import slugify from kiara.data.values import ValueSet from kiara.module_config import KiaraWorkflowConfig @@ -45,7 +46,9 @@ def __init__( "steps": [ { "module_type": self._workflow_config.module_type, - "step_id": self._workflow_config.module_type, + "step_id": slugify( + self._workflow_config.module_type, separator="_" + ), "module_config": self._workflow_config.module_config, } ],