Skip to content

Commit

Permalink
feat: allow and manage module namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed May 27, 2021
1 parent 9167beb commit d603289
Show file tree
Hide file tree
Showing 23 changed files with 679 additions and 270 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
# Changelog

## Version 0.0.3 (Upcoming)

## Version 0.0.2 (Upcoming)


## Version 0.0.2

- metadata extraction method renamed to 'extract_type_metadata'; also, the type metadata format changed slightly: information extracted by type goes into 'type' subkey, python class information gets added under 'python' (automatically)
- type-hint signature of parameters in ``process`` method in a ``KiaraModule`` changed from ``StepInputs``/``StepOutputs`` to ``ValueSet``
- change all input and output data access within modules to use ``ValueSet.get_value_data()`` ``ValueSet.set_value(s)`` instead of direct attribute access -- for now, direct attribute access is removed because it's not clear whether the access should be for the value object, or the data itself
- 'dict' attribute in ValueData class renamed to 'get_all_value_data'
- add 'ModuleProcessor' class, to be able to implement different module execution strategies (multithreaded, multiprocess, ...)
- added 'ModuleProcessor' class, to be able to implement different module execution strategies (multithreaded, multiprocess, ...)
- renamed ``kiara.config`` module to ``kiara.module_config``
- modules are not split up into several packages: ``kiara_modules.core`` being the most important one, others are topic specific (``language_processing``, ``network_analysis``, ...)
- modules are now split up into several packages: ``kiara_modules.core`` being the most important one, others are topic specific (``language_processing``, ``network_analysis``, ...)

## Version 0.0.1

Expand Down
19 changes: 19 additions & 0 deletions src/kiara/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
# -*- coding: utf-8 -*-
# isort: skip_file

__all__ = [
"Kiara",
"explain",
"KiaraModule",
"Pipeline",
"PipelineStructure",
"PipelineController",
"PipelineModule",
"DataRegistry",
"find_kiara_modules_under",
"find_kiara_pipelines_under",
"KiaraEntryPointItem",
"get_version",
]
import os

from .kiara import Kiara, explain # noqa
Expand All @@ -10,6 +24,11 @@
from .pipeline.controller import PipelineController # noqa
from .pipeline.module import PipelineModule # noqa
from .data.registry import DataRegistry # noqa
from .utils.class_loading import (
find_kiara_modules_under,
find_kiara_pipelines_under,
KiaraEntryPointItem,
)

"""Top-level package for kiara."""

Expand Down
8 changes: 3 additions & 5 deletions src/kiara/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
from pydantic import BaseSettings, Extra, Field, validator

from kiara.defaults import kiara_app_dirs
from kiara.mgmt import (
ModuleManager,
PipelineModuleManagerConfig,
PythonModuleManagerConfig,
)
from kiara.module_mgmt import ModuleManager
from kiara.module_mgmt.pipelines import PipelineModuleManagerConfig
from kiara.module_mgmt.python_classes import PythonModuleManagerConfig
from kiara.processing.parallel import ThreadPoolProcessorConfig
from kiara.processing.synchronous import SynchronousProcessorConfig
from kiara.utils import get_data_from_file
Expand Down
5 changes: 4 additions & 1 deletion src/kiara/data/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def get_type_transformation_configs(
The name of the transformation is the key of the result dictionary, the configuration is a module configuration
(dictionary wth 'module_type' and optional 'module_config', 'input_name' and 'output_name' keys).
"""
return {"to_string": {"module_type": "pretty_print", "input_name": "item"}}
return {
"to_string": {"module_type": "strings.pretty_print", "input_name": "item"}
}

def __init__(self, **type_config: typing.Any):

Expand Down Expand Up @@ -356,6 +358,7 @@ def validate(cls, value: typing.Any) -> None:
def extract_type_metadata(
cls, value: typing.Any
) -> typing.Mapping[str, typing.Any]:

table: pyarrow.Table = value
table_schema = {}
for name in table.schema.names:
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

USER_PIPELINES_FOLDER = os.path.join(kiara_app_dirs.user_config_dir, "pipelines")

RELATIVE_PIPELINES_PATH = os.path.join("resources", "pipelines")
RELATIVE_PIPELINES_PATH = "pipelines"

MODULE_TYPE_KEY = "module_type"
"""The key to specify the type of a module."""
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/doc/mkdocs_macros_kiara.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
Value,
ValueSchema,
)
from kiara.mgmt import PipelineModuleManager
from kiara.module import ModuleInfo
from kiara.module_config import (
KiaraModuleConfig,
KiaraWorkflowConfig,
PipelineModuleConfig,
)
from kiara.module_mgmt.pipelines import PipelineModuleManager
from kiara.pipeline.pipeline import (
PipelineInputEvent,
PipelineOutputEvent,
Expand Down
39 changes: 28 additions & 11 deletions src/kiara/interfaces/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ async def run(ctx, module, inputs, module_config, data_details, only_output):

if module in kiara_obj.available_module_types:
module_name = module
elif f"core.{module}" in kiara_obj.available_module_types:
module_name = f"core.{module}"
elif os.path.isfile(module):
module_name = kiara_obj.register_pipeline_description(
module, raise_exception=True
Expand All @@ -327,18 +329,28 @@ async def run(ctx, module, inputs, module_config, data_details, only_output):
sys.exit(1)

if not inputs:
print()
print(
"No inputs provided, not running the workflow. To run it, provide input following this schema:"
)

module_obj: KiaraModule = _create_module_instance(
ctx=ctx, module_type=module_name, module_config=module_config
)
inputs_table = create_table_from_field_schemas(
_show_header=True, **module_obj.input_schemas
)
rich_print(inputs_table)
sys.exit(0)

one_required = False
for schema in module_obj.input_schemas.values():
if schema.is_required():
one_required = True
break

if one_required:

inputs_table = create_table_from_field_schemas(
_show_header=True, **module_obj.input_schemas
)
print()
print(
"No inputs provided, not running the workflow. To run it, provide input following this schema:"
)
rich_print(inputs_table)
sys.exit(0)

processor = ThreadPoolProcessor()
# processor = None
Expand All @@ -350,16 +362,21 @@ async def run(ctx, module, inputs, module_config, data_details, only_output):
# workflow.pipeline.add_listener(l)

list_keys = []

for name, value in workflow.inputs.items():
if value.value_schema.type in ["array", "list"]:
list_keys.append(name)

workflow_input = dict_from_cli_args(*inputs, list_keys=list_keys)
workflow.inputs.set_values(**workflow_input)

if workflow_input:
workflow.inputs.set_values(**workflow_input)
else:
workflow.controller.process_pipeline()

transformer = "to_string"
transformer_config = {"max_lines": 6}
print()

if display_input_values:
vi = ValuesInfo(workflow.inputs)
vt = vi.create_value_data_table(
Expand Down
17 changes: 14 additions & 3 deletions src/kiara/kiara.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
from kiara.data.registry import DataRegistry
from kiara.data.types import ValueType
from kiara.interfaces import get_console
from kiara.mgmt import ModuleManager, PipelineModuleManager, PythonModuleManager
from kiara.module_config import KiaraWorkflowConfig, PipelineModuleConfig
from kiara.module_mgmt import ModuleManager
from kiara.module_mgmt.pipelines import PipelineModuleManager
from kiara.module_mgmt.python_classes import PythonModuleManager
from kiara.pipeline.controller import PipelineController
from kiara.pipeline.pipeline import Pipeline
from kiara.processing import Job, ModuleProcessor
Expand Down Expand Up @@ -63,7 +65,7 @@ def __init__(self, config: typing.Optional[KiaraConfig] = None):
self._zmq_context: Context = Context.instance()
self._default_python_mgr = PythonModuleManager()
self._default_pipeline_mgr = PipelineModuleManager(folders=None)
self._custom_pipelines_mgr = PipelineModuleManager(folders=[])
self._custom_pipelines_mgr = PipelineModuleManager(folders={})

self.start_zmq_device()
self.start_log_thread()
Expand Down Expand Up @@ -327,6 +329,7 @@ def add_pipeline_folder(self, folder: typing.Union[Path, str]) -> typing.List[st
f"Can't add pipeline folder '{folder.as_posix()}': not a directory"
)

raise NotImplementedError()
added = self._custom_pipelines_mgr.add_pipelines_path(folder)
result = []
for a in added:
Expand All @@ -342,11 +345,12 @@ def register_pipeline_description(
self,
data: typing.Union[Path, str, typing.Mapping[str, typing.Any]],
module_type_name: typing.Optional[str] = None,
namespace: typing.Optional[str] = None,
raise_exception: bool = False,
) -> typing.Optional[str]:

name = self._custom_pipelines_mgr.register_pipeline(
data, module_type_name=module_type_name
data=data, module_type_name=module_type_name, namespace=namespace
)
if name in self._modules.keys():
if raise_exception:
Expand All @@ -373,6 +377,13 @@ def get_module_class(self, module_type: str) -> typing.Type["KiaraModule"]:
raise Exception(f"No module '{module_type}' available.")

cls = mm.get_module_class(module_type)
if not hasattr(cls, "_module_type_name"):
raise Exception(
f"Class does not have a '_module_type_name' attribute: {cls}"
)

assert module_type.endswith(cls._module_type_name) # type: ignore

if hasattr(cls, "_module_type_id") and cls._module_type_id != "pipeline" and cls._module_type_id != module_type: # type: ignore
raise Exception(
f"Can't create module class '{cls}', it already has a _module_type_id attribute and it's different to the module name '{module_type}': {cls._module_type_id}" # type: ignore
Expand Down
3 changes: 2 additions & 1 deletion src/kiara/module.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import abc
import deepdiff
import inspect
import json
Expand Down Expand Up @@ -176,7 +177,7 @@ def sync(self):
self._outputs_staging.clear() # type: ignore


class KiaraModule(typing.Generic[KIARA_CONFIG]):
class KiaraModule(typing.Generic[KIARA_CONFIG], abc.ABC):
"""The base class that every custom module in *Kiara* needs to inherit from.
The core of every ``KiaraModule`` is the [``process``][kiara.module.KiaraModule.process] method, which needs to be
Expand Down
109 changes: 109 additions & 0 deletions src/kiara/module_mgmt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
import abc
import logging
import typing

if typing.TYPE_CHECKING:
from kiara import Kiara, KiaraModule
from kiara.module_config import KiaraModuleConfig
from kiara.module_mgmt.pipelines import PipelineModuleManagerConfig
from kiara.module_mgmt.python_classes import (
PythonModuleManager,
PythonModuleManagerConfig,
)


log = logging.getLogger("kiara")


# extensions
# ------------------------------------------------------------------------


class ModuleManager(abc.ABC):
@classmethod
def from_config(
cls,
config: typing.Union[
typing.Mapping[str, typing.Any],
"PipelineModuleManagerConfig",
"PythonModuleManagerConfig",
],
) -> "ModuleManager":

from kiara.module_mgmt.pipelines import (
PipelineModuleManager,
PipelineModuleManagerConfig,
)
from kiara.module_mgmt.python_classes import (
PythonModuleManager,
PythonModuleManagerConfig,
)

if isinstance(config, typing.Mapping):
mm_type = config.get("module_manager_type", None)
if not mm_type:
raise ValueError(f"No module manager type provided in config: {config}")
if mm_type == "python":
config = PythonModuleManagerConfig(**config)
elif mm_type == "pipeline":
config = PipelineModuleManagerConfig(**config)
else:
raise ValueError(f"Invalid module manager type: {mm_type}")

if config.module_manager_type == "python":
mm: ModuleManager = PythonModuleManager(
**config.dict(exclude={"module_manager_type"})
)
elif config.module_manager_type == "pipeline":
mm = PipelineModuleManager(**config.dict(exclude={"module_manager_type"}))

return mm

@abc.abstractmethod
def get_module_types(self) -> typing.Iterable[str]:
pass

@abc.abstractmethod
def get_module_class(self, module_type: str) -> typing.Type["KiaraModule"]:
pass

def create_module_config(
self, module_type: str, module_config: typing.Mapping[str, typing.Any]
) -> "KiaraModuleConfig":

cls = self.get_module_class(module_type)
config = cls._config_cls(**module_config)

return config

def create_module(
self,
kiara: "Kiara",
id: str,
module_type: str,
module_config: typing.Optional[typing.Mapping[str, typing.Any]] = None,
parent_id: typing.Optional[str] = None,
) -> "KiaraModule":

module_cls = self.get_module_class(module_type)

module = module_cls(
id=id, parent_id=parent_id, module_config=module_config, kiara=kiara
)
return module


class WorkflowManager(object):
def __init__(self, module_manager: "PythonModuleManager"):

self._module_mgr: "PythonModuleManager" = module_manager

def create_workflow(
self,
workflow_id: str,
config: typing.Union[str, typing.Mapping[str, typing.Any]],
):

if isinstance(config, typing.Mapping):
raise NotImplementedError()
Loading

0 comments on commit d603289

Please sign in to comment.