diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..f3f336ec8 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,2 @@ +[*.md] +ij_any_wrap_long_lines = true diff --git a/docs/architecture/data/storage.md b/docs/architecture/data/storage.md new file mode 100644 index 000000000..525aedbbd --- /dev/null +++ b/docs/architecture/data/storage.md @@ -0,0 +1,7 @@ +# Data storage + +This is a document to describe my plans for storing data (and metadata) in *kiara*. (Almost) nothing I describe here is inmplemented yet, so it only reflects my current thinking. I think the overall strategy will hold, but there might be changes here and there. + +## The problem + +*kiara*s main functionality centers around transforming input data sets to output data sets. Those outputs need to be stored, to be of any use later on. Obviously. When deciding how to do this, we must take into account concerns about performance, disk- and memory-usage, data versioning, which metadata to attach, in what way, how to deal with metadata schemas (and versioning of both), etc. diff --git a/setup.cfg b/setup.cfg index cbc8900c5..bf919dc1d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,7 +55,7 @@ console_scripts = kiara = kiara.interfaces.cli:cli kiara.modules = pipeline = kiara.pipeline.module:PipelineModule - metadata.extract_python_class = kiara.modules.metadata:ExtractPythonClass + metadata.python_class = kiara.modules.metadata:ExtractPythonClass [options.extras_require] cli = diff --git a/src/kiara/interfaces/cli/__init__.py b/src/kiara/interfaces/cli/__init__.py index 5701f9b30..f0b695eb1 100644 --- a/src/kiara/interfaces/cli/__init__.py +++ b/src/kiara/interfaces/cli/__init__.py @@ -3,28 +3,15 @@ """A command-line interface for *Kiara*. """ import asyncclick as click -import json -import os.path -import sys -import typing -from kiara_modules.core.json import DEFAULT_TO_JSON_CONFIG -from kiara_modules.core.string import DEFAULT_PRETTY_PRINT_CONFIG -from pathlib import Path -from rich import box -from rich.console import Console, RenderGroup -from rich.panel import Panel -from rich.syntax import Syntax from kiara import Kiara -from kiara.data.values import Value, ValuesInfo -from kiara.interfaces import get_console -from kiara.module import KiaraModule, ModuleInfo -from kiara.pipeline.controller import BatchController -from kiara.pipeline.module import PipelineModuleInfo -from kiara.pipeline.pipeline import StepStatus -from kiara.processing.parallel import ThreadPoolProcessor -from kiara.utils import create_table_from_field_schemas, dict_from_cli_args, is_debug -from kiara.utils.output import OutputDetails + +from .data.commands import data +from .metadata.commands import metadata +from .module.commands import module +from .pipeline.commands import pipeline +from .run import run +from .type.command import type_group try: import uvloop @@ -36,31 +23,6 @@ click.anyio_backend = "asyncio" -def rich_print(msg: typing.Any = None) -> None: - - if msg is None: - msg = "" - console = get_console() - console.print(msg) - - -def _create_module_instance( - ctx, module_type: str, module_config: typing.Iterable[typing.Any] -) -> KiaraModule: - config = dict_from_cli_args(*module_config) - - kiara_obj = ctx.obj["kiara"] - if os.path.isfile(module_type): - module_type = kiara_obj.register_pipeline_description( - module_type, raise_exception=True - ) - - module_obj = kiara_obj.create_module( - id=module_type, module_type=module_type, module_config=config - ) - return module_obj - - @click.group() @click.pass_context def cli(ctx): @@ -70,576 +32,12 @@ def cli(ctx): ctx.obj["kiara"] = Kiara.instance() -@cli.group() -@click.pass_context -def module(ctx): - """Information about available modules, and details about them.""" - - -@module.command(name="list") -@click.option( - "--only-pipeline-modules", "-p", is_flag=True, help="Only list pipeline modules." -) -@click.option( - "--only-core-modules", - "-c", - is_flag=True, - help="Only list core (aka 'Python') modules.", -) -@click.pass_context -def list_modules(ctx, only_pipeline_modules: bool, only_core_modules: bool): - """List available module types.""" - - if only_pipeline_modules and only_core_modules: - rich_print() - rich_print( - "Please provide either '--only-core-modules' or '--only-pipeline-modules', not both." - ) - sys.exit(1) - - kiara_obj: Kiara = ctx.obj["kiara"] - - if only_pipeline_modules: - title = "Available pipeline modules" - m_list = kiara_obj.create_modules_list( - list_pipeline_modules=True, list_non_pipeline_modules=False - ) - elif only_core_modules: - title = "Available core modules" - m_list = kiara_obj.create_modules_list( - list_pipeline_modules=False, list_non_pipeline_modules=True - ) - else: - title = "Available modules" - m_list = kiara_obj.modules_list - - p = Panel(m_list, title_align="left", title=title) - print() - kiara_obj.explain(p) - - -@module.command(name="explain-type") -@click.argument("module_type", nargs=1, required=True) -@click.pass_context -def explain_module_type(ctx, module_type: str): - """Print details of a module type. - - This is different to the 'explain-instance' command, because module types need to be - instantiated with configuration, before we can query all their properties (like - input/output types). - """ - - kiara_obj: Kiara = ctx.obj["kiara"] - - if os.path.isfile(module_type): - _module_type: str = kiara_obj.register_pipeline_description( # type: ignore - module_type, raise_exception=True - ) # type: ignore - else: - _module_type = module_type - - m_cls = kiara_obj.get_module_class(_module_type) - if _module_type == "pipeline" or not m_cls.is_pipeline(): - info = ModuleInfo(module_type=_module_type, _kiara=kiara_obj) - else: - info = ModuleInfo(module_type=_module_type, _kiara=kiara_obj) - rich_print() - rich_print(info) - - -@module.command("explain-instance") -@click.argument("module_type", nargs=1) -@click.argument( - "module_config", - nargs=-1, -) -@click.pass_context -def explain_module(ctx, module_type: str, module_config: typing.Iterable[typing.Any]): - """Describe a module instance. - - This command shows information and metadata about an instantiated *kiara* module. - """ - - module_obj = _create_module_instance( - ctx, module_type=module_type, module_config=module_config - ) - rich_print() - rich_print(module_obj) - - -@cli.group() -@click.pass_context -def pipeline(ctx): - """Pipeline-related sub-commands.""" - - -@pipeline.command() -@click.argument("pipeline-type", nargs=1) -@click.option( - "--full", - "-f", - is_flag=True, - help="Display full data-flow graph, incl. intermediate input/output connections.", -) -@click.pass_context -def data_flow_graph(ctx, pipeline_type: str, full: bool): - """Print the data flow graph for a pipeline structure.""" - - kiara_obj = ctx.obj["kiara"] - if os.path.isfile(pipeline_type): - pipeline_type = kiara_obj.register_pipeline_description( - pipeline_type, raise_exception=True - ) - - m_cls = kiara_obj.get_module_class(pipeline_type) - if not m_cls.is_pipeline(): - rich_print() - rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") - sys.exit(1) - - info = PipelineModuleInfo(module_type=pipeline_type) - - info.print_data_flow_graph(simplified=not full) - - -@pipeline.command() -@click.argument("pipeline-type", nargs=1) -@click.pass_context -def execution_graph(ctx, pipeline_type: str): - """Print the execution graph for a pipeline structure.""" - - kiara_obj = ctx.obj["kiara"] - - if os.path.isfile(pipeline_type): - pipeline_type = kiara_obj.register_pipeline_description( - pipeline_type, raise_exception=True - ) - - m_cls = kiara_obj.get_module_class(pipeline_type) - if not m_cls.is_pipeline(): - rich_print() - rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") - sys.exit(1) - - info = PipelineModuleInfo(module_type=pipeline_type) - info.print_execution_graph() - - -@pipeline.command() -@click.argument("pipeline-type", nargs=1) -@click.pass_context -def structure(ctx, pipeline_type: str): - """Print details about a pipeline structure.""" - - kiara_obj = ctx.obj["kiara"] - - if os.path.isfile(pipeline_type): - pipeline_type = kiara_obj.register_pipeline_description( - pipeline_type, raise_exception=True - ) - - m_cls = kiara_obj.get_module_class(pipeline_type) - if not m_cls.is_pipeline(): - rich_print() - rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") - sys.exit(1) - - info = PipelineModuleInfo(module_type=pipeline_type, _kiara=kiara_obj) - structure = info.create_structure() - print() - kiara_obj.explain(structure) - - -@pipeline.command() -@click.argument("pipeline-type", nargs=1) -@click.pass_context -def explain_steps(ctx, pipeline_type: str): - - kiara_obj = ctx.obj["kiara"] - - if os.path.isfile(pipeline_type): - pipeline_type = kiara_obj.register_pipeline_description( - pipeline_type, raise_exception=True - ) - - m_cls = kiara_obj.get_module_class(pipeline_type) - if not m_cls.is_pipeline(): - rich_print() - rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") - sys.exit(1) - - info = PipelineModuleInfo(module_type=pipeline_type) - structure = info.create_structure() - print() - kiara_obj.explain(structure.to_details().steps_info) - - -@cli.command() -@click.argument("module", nargs=1) -@click.argument("inputs", nargs=-1, required=False) -@click.option("--id", "-i", help="Set workflow id.", required=False) -@click.option( - "--module-config", - "-c", - required=False, - help="(Optional) module configuration.", - multiple=True, -) -@click.option( - "--explain", - "-e", - help="Display additional workflow details.", - is_flag=True, -) -@click.option( - "--output", "-o", help="The output format and configuration.", multiple=True -) -@click.option( - "--save", "-s", help="Save the outputs into the kiara data store.", is_flag=True -) -@click.pass_context -async def run(ctx, module, inputs, module_config, output, explain, save, id): - """Execute a workflow run.""" - - if module_config: - module_config = dict_from_cli_args(*module_config) - - kiara_obj: Kiara = ctx.obj["kiara"] - - if module in kiara_obj.available_module_types: - module_name = module - elif f"core.{module}" in kiara_obj.available_module_types: - module_name = f"core.{module}" - elif os.path.isfile(module): - module_name = kiara_obj.register_pipeline_description( - module, raise_exception=True - ) - else: - rich_print( - f"\nInvalid module name '[i]{module}[/i]'. Must be a path to a pipeline file, or one of the available modules:\n" - ) - for n in kiara_obj.available_module_types: - rich_print(f" - [i]{n}[/i]") - sys.exit(1) - - if not inputs: - - module_obj: KiaraModule = _create_module_instance( - ctx=ctx, module_type=module_name, module_config=module_config - ) - - one_required = False - for input_name in module_obj.input_names: - if module_obj.input_required(input_name): - one_required = True - break - - if one_required: - - inputs_table = create_table_from_field_schemas( - _show_header=True, **module_obj.input_schemas - ) - print() - print( - "No inputs provided, not running the workflow. To run it, provide input following this schema:" - ) - rich_print(inputs_table) - sys.exit(0) - - output_details = OutputDetails.from_data(output) - silent = False - if output_details.format == "silent": - silent = True - - force_overwrite = output_details.config.get("force", False) - - # SUPPORTED_TARGETS = ["terminal", "file"] - # if output_details.target not in SUPPORTED_TARGETS: - # print() - # rich_print(f"Invalid output target '{output_details.target}', must be one of: [i]{', '.join(SUPPORTED_TARGETS)}[/i]") - # sys.exit(1) - - target_file: typing.Optional[Path] = None - if output_details.target != "terminal": - if output_details.target == "file": - target_dir = Path.cwd() - target_file = target_dir / f"{module_name}.{output_details.format}" - else: - target_file = Path( - os.path.realpath(os.path.expanduser(output_details.target)) - ) - - if target_file.exists() and not force_overwrite: - print() - print( - f"Can't run workflow, the target files already exist, and '--output force=true' not specified: {target_file}" - ) - sys.exit(1) - - processor = ThreadPoolProcessor() - processor = None - controller = BatchController(processor=processor) - - workflow_id = id - if workflow_id is None: - workflow_id = f"{module_name}_0" - - workflow = kiara_obj.create_workflow( - module_name, - module_config=module_config, - workflow_id=workflow_id, - controller=controller, - ) - - if save: - - invalid = set() - for ov in workflow.outputs.values(): - existing = kiara_obj.data_store.check_existing_aliases(*ov.aliases) - invalid.update(existing) - - if invalid: - print() - print( - f"Can't run workflow, value aliases for saving already exist: {', '.join(invalid)}. Set another workflow id?" - ) - sys.exit(1) - - list_keys = [] - - for name, value in workflow.inputs.items(): - if value.value_schema.type in ["array", "list"]: - list_keys.append(name) - - workflow_input = dict_from_cli_args(*inputs, list_keys=list_keys) - failed = False - try: - if workflow_input: - workflow.inputs.set_values(**workflow_input) - else: - workflow.controller.process_pipeline() - except Exception as e: - print() - print(e) - failed = True - - if explain: - print() - kiara_obj.explain(workflow.current_state) - - if workflow.status == StepStatus.RESULTS_READY: - vi = ValuesInfo(workflow.outputs) - vi_table = vi.create_value_info_table( - ensure_metadata=True, show_headers=True - ) - panel = Panel(Panel(vi_table), box=box.SIMPLE) - rich_print("[b]Output data details[/b]") - rich_print(panel) - - if failed: - sys.exit(1) - - if not silent: - - if output_details.target == "terminal": - if output_details.format == "terminal": - print() - pretty_print = kiara_obj.create_workflow("string.pretty_print") - pretty_print_inputs: typing.Dict[str, typing.Any] = { - "item": workflow.outputs - } - pretty_print_inputs.update(DEFAULT_PRETTY_PRINT_CONFIG) - - pretty_print.inputs.set_values(**pretty_print_inputs) - - renderables = pretty_print.outputs.get_value_data("renderables") - if renderables: - output = Panel(RenderGroup(*renderables), box=box.SIMPLE) - rich_print("[b]Output data[/b]") - rich_print(output) - else: - rich_print("No output.") - else: - - format = output_details.format - available_formats = kiara_obj.get_convert_target_types( - source_type="value_set" - ) - if format not in available_formats: - print() - print( - f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." - ) - sys.exit(1) - - config = {} - config.update(DEFAULT_TO_JSON_CONFIG) - - try: - transformed = kiara_obj.transform_data( - workflow.outputs, - source_type="value_set", - target_type=format, - config=config, - ) - transformed_value = transformed.get_value_data("target_value") - - if format in ["json", "yaml"]: - transformed_str = Syntax( - transformed_value, - lexer_name=format, - background_color="default", - ) - rich_print(transformed_str) - else: - print(transformed_value) - except Exception as e: - print() - rich_print(f"Can't transform outputs into '{format}': {e}") - sys.exit(1) - - else: - if output_details.format == "terminal": - - pretty_print = kiara_obj.create_workflow("string.pretty_print") - - pretty_print_inputs = {"item": value} - pretty_print_inputs.update(DEFAULT_PRETTY_PRINT_CONFIG) - pretty_print.inputs.set_values(**pretty_print_inputs) - - renderables = pretty_print.outputs.get_value_data("renderables") - output = Panel(RenderGroup(*renderables), box=box.SIMPLE) - with open(target_file, "wt") as f: - console = Console(record=True, file=f) - console.print(output) - else: - - format = output_details.format - available_formats = kiara_obj.get_convert_target_types( - source_type="value_set" - ) - if format not in available_formats: - print() - print( - f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." - ) - sys.exit(1) - - config = {} - config.update(DEFAULT_TO_JSON_CONFIG) - - transformed = kiara_obj.transform_data( - workflow.outputs, - source_type="value_set", - target_type=format, - config=config, - ) - transformed_value = transformed.get_value_data() - - target_file.parent.mkdir(parents=True, exist_ok=True) - # TODO: check whether to write text or bytes - target_file.write_text(transformed_value) - - if save: - for field, value in workflow.outputs.items(): - rich_print(f"Saving '[i]{field}[/i]'...") - try: - value_id = value.save() - rich_print(f" -> done, id: [i]{value_id}[/i]") - - except Exception as e: - if is_debug(): - import traceback - - traceback.print_exc() - rich_print(f" -> failed: [red]{e}[/red]") - print() - - -@cli.group() -@click.pass_context -def data(ctx): - """Data-related sub-commands.""" - - -@data.command(name="list") -@click.option("--details", "-d", help="Display data item details.", is_flag=True) -@click.option("--ids", "-i", help="List value ids instead of aliases.", is_flag=True) -@click.pass_context -def list_values(ctx, details, ids): - - kiara_obj: Kiara = ctx.obj["kiara"] - - print() - if ids: - for id, d in kiara_obj.data_store.values_metadata.items(): - if not details: - rich_print(f" - [b]{id}[/b]: {d['type']}") - else: - rich_print(f"[b]{id}[/b]: {d['type']}\n") - md = kiara_obj.data_store.get_value_metadata(value_id=id) - s = Syntax(json.dumps(md, indent=2), "json") - rich_print(s) - print() - else: - for alias, v_id in kiara_obj.data_store.aliases.items(): - v_type = kiara_obj.data_store.get_value_type(v_id) - if not details: - rich_print(f" - [b]{alias}[/b]: {v_type}") - else: - rich_print(f"[b]{alias}[/b]: {v_type}\n") - md = kiara_obj.data_store.get_value_metadata(value_id=v_id) - s = Syntax(json.dumps(md, indent=2), "json") - rich_print(s) - print() - - -@data.command(name="explain") -@click.argument("value_id", nargs=1, required=True) -@click.pass_context -def explain_value(ctx, value_id: str): - - kiara_obj: Kiara = ctx.obj["kiara"] - - print() - value = kiara_obj.data_store.load_value(value_id=value_id) - rich_print(value) - - -@data.command(name="load") -@click.argument("value_id", nargs=1, required=True) -@click.pass_context -def load_value(ctx, value_id: str): - - kiara_obj: Kiara = ctx.obj["kiara"] - - print() - value = kiara_obj.data_store.load_value(value_id=value_id) - - pretty_print_config: typing.Dict[str, typing.Any] = {"item": value} - pretty_print_config.update(DEFAULT_PRETTY_PRINT_CONFIG) - renderables: Value = kiara_obj.run( # type: ignore - "string.pretty_print", inputs=pretty_print_config, output_name="renderables" - ) - rich_print(*renderables.get_value_data()) - - -@cli.group(name="type") -@click.pass_context -def type_group(ctx): - """Information about available value types, and details about them.""" - - -@type_group.command(name="list") -@click.pass_context -def list_types(ctx): - """List available types (work in progress).""" - - kiara_obj: Kiara = ctx.obj["kiara"] - - print() - for type_name, type in kiara_obj.value_types.items(): - rich_print(f"{type_name}: {type}") +cli.add_command(run) +cli.add_command(data) +cli.add_command(metadata) +cli.add_command(type_group) +cli.add_command(module) +cli.add_command(pipeline) if __name__ == "__main__": diff --git a/src/kiara/interfaces/cli/data/__init__.py b/src/kiara/interfaces/cli/data/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/data/commands.py b/src/kiara/interfaces/cli/data/commands.py new file mode 100644 index 000000000..07c29e6aa --- /dev/null +++ b/src/kiara/interfaces/cli/data/commands.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +import asyncclick as click +import json +import typing +from kiara_modules.core.string import DEFAULT_PRETTY_PRINT_CONFIG +from rich.syntax import Syntax + +from kiara import Kiara +from kiara.data.values import Value +from kiara.utils.output import rich_print + + +@click.group() +@click.pass_context +def data(ctx): + """Data-related sub-commands.""" + + +@data.command(name="list") +@click.option("--details", "-d", help="Display data item details.", is_flag=True) +@click.option("--ids", "-i", help="List value ids instead of aliases.", is_flag=True) +@click.pass_context +def list_values(ctx, details, ids): + + kiara_obj: Kiara = ctx.obj["kiara"] + + print() + if ids: + for id, d in kiara_obj.data_store.values_metadata.items(): + if not details: + rich_print(f" - [b]{id}[/b]: {d['type']}") + else: + rich_print(f"[b]{id}[/b]: {d['type']}\n") + md = kiara_obj.data_store.get_value_metadata(value_id=id) + s = Syntax(json.dumps(md, indent=2), "json") + rich_print(s) + print() + else: + for alias, v_id in kiara_obj.data_store.aliases.items(): + v_type = kiara_obj.data_store.get_value_type(v_id) + if not details: + rich_print(f" - [b]{alias}[/b]: {v_type}") + else: + rich_print(f"[b]{alias}[/b]: {v_type}\n") + md = kiara_obj.data_store.get_value_metadata(value_id=v_id) + s = Syntax(json.dumps(md, indent=2), "json") + rich_print(s) + print() + + +@data.command(name="explain") +@click.argument("value_id", nargs=1, required=True) +@click.pass_context +def explain_value(ctx, value_id: str): + + kiara_obj: Kiara = ctx.obj["kiara"] + + print() + value = kiara_obj.data_store.load_value(value_id=value_id) + rich_print(value) + + +@data.command(name="load") +@click.argument("value_id", nargs=1, required=True) +@click.pass_context +def load_value(ctx, value_id: str): + + kiara_obj: Kiara = ctx.obj["kiara"] + + print() + value = kiara_obj.data_store.load_value(value_id=value_id) + + pretty_print_config: typing.Dict[str, typing.Any] = {"item": value} + pretty_print_config.update(DEFAULT_PRETTY_PRINT_CONFIG) + renderables: Value = kiara_obj.run( # type: ignore + "string.pretty_print", inputs=pretty_print_config, output_name="renderables" + ) + rich_print(*renderables.get_value_data()) diff --git a/src/kiara/interfaces/cli/metadata/__init__.py b/src/kiara/interfaces/cli/metadata/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/metadata/commands.py b/src/kiara/interfaces/cli/metadata/commands.py new file mode 100644 index 000000000..0a22c2e5d --- /dev/null +++ b/src/kiara/interfaces/cli/metadata/commands.py @@ -0,0 +1,8 @@ +# -*- coding: utf-8 -*- +import asyncclick as click + + +@click.group() +@click.pass_context +def metadata(ctx): + """Metadata-related sub-commands.""" diff --git a/src/kiara/interfaces/cli/module/__init__.py b/src/kiara/interfaces/cli/module/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/module/commands.py b/src/kiara/interfaces/cli/module/commands.py new file mode 100644 index 000000000..4cb294b5a --- /dev/null +++ b/src/kiara/interfaces/cli/module/commands.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +import asyncclick as click +import os.path +import sys +import typing +from rich.panel import Panel + +from kiara import Kiara +from kiara.interfaces.cli.utils import _create_module_instance +from kiara.module import ModuleInfo +from kiara.utils.output import rich_print + + +@click.group() +@click.pass_context +def module(ctx): + """Information about available modules, and details about them.""" + + +@module.command(name="list") +@click.option( + "--only-pipeline-modules", "-p", is_flag=True, help="Only list pipeline modules." +) +@click.option( + "--only-core-modules", + "-c", + is_flag=True, + help="Only list core (aka 'Python') modules.", +) +@click.pass_context +def list_modules(ctx, only_pipeline_modules: bool, only_core_modules: bool): + """List available module types.""" + + if only_pipeline_modules and only_core_modules: + rich_print() + rich_print( + "Please provide either '--only-core-modules' or '--only-pipeline-modules', not both." + ) + sys.exit(1) + + kiara_obj: Kiara = ctx.obj["kiara"] + + if only_pipeline_modules: + title = "Available pipeline modules" + m_list = kiara_obj.create_modules_list( + list_pipeline_modules=True, list_non_pipeline_modules=False + ) + elif only_core_modules: + title = "Available core modules" + m_list = kiara_obj.create_modules_list( + list_pipeline_modules=False, list_non_pipeline_modules=True + ) + else: + title = "Available modules" + m_list = kiara_obj.modules_list + + p = Panel(m_list, title_align="left", title=title) + print() + kiara_obj.explain(p) + + +@module.command(name="explain-type") +@click.argument("module_type", nargs=1, required=True) +@click.pass_context +def explain_module_type(ctx, module_type: str): + """Print details of a module type. + + This is different to the 'explain-instance' command, because module types need to be + instantiated with configuration, before we can query all their properties (like + input/output types). + """ + + kiara_obj: Kiara = ctx.obj["kiara"] + + if os.path.isfile(module_type): + _module_type: str = kiara_obj.register_pipeline_description( # type: ignore + module_type, raise_exception=True + ) # type: ignore + else: + _module_type = module_type + + m_cls = kiara_obj.get_module_class(_module_type) + if _module_type == "pipeline" or not m_cls.is_pipeline(): + info = ModuleInfo(module_type=_module_type, _kiara=kiara_obj) + else: + info = ModuleInfo(module_type=_module_type, _kiara=kiara_obj) + rich_print() + rich_print(info) + + +@module.command("explain-instance") +@click.argument("module_type", nargs=1) +@click.argument( + "module_config", + nargs=-1, +) +@click.pass_context +def explain_module(ctx, module_type: str, module_config: typing.Iterable[typing.Any]): + """Describe a module instance. + + This command shows information and metadata about an instantiated *kiara* module. + """ + + module_obj = _create_module_instance( + ctx, module_type=module_type, module_config=module_config + ) + rich_print() + rich_print(module_obj) diff --git a/src/kiara/interfaces/cli/pipeline/__init__.py b/src/kiara/interfaces/cli/pipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/pipeline/commands.py b/src/kiara/interfaces/cli/pipeline/commands.py new file mode 100644 index 000000000..df8d310d6 --- /dev/null +++ b/src/kiara/interfaces/cli/pipeline/commands.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +import asyncclick as click +import os.path +import sys + +from kiara.pipeline.module import PipelineModuleInfo +from kiara.utils.output import rich_print + + +@click.group() +@click.pass_context +def pipeline(ctx): + """Pipeline-related sub-commands.""" + + +@pipeline.command() +@click.argument("pipeline-type", nargs=1) +@click.option( + "--full", + "-f", + is_flag=True, + help="Display full data-flow graph, incl. intermediate input/output connections.", +) +@click.pass_context +def data_flow_graph(ctx, pipeline_type: str, full: bool): + """Print the data flow graph for a pipeline structure.""" + + kiara_obj = ctx.obj["kiara"] + if os.path.isfile(pipeline_type): + pipeline_type = kiara_obj.register_pipeline_description( + pipeline_type, raise_exception=True + ) + + m_cls = kiara_obj.get_module_class(pipeline_type) + if not m_cls.is_pipeline(): + rich_print() + rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") + sys.exit(1) + + info = PipelineModuleInfo(module_type=pipeline_type) + + info.print_data_flow_graph(simplified=not full) + + +@pipeline.command() +@click.argument("pipeline-type", nargs=1) +@click.pass_context +def execution_graph(ctx, pipeline_type: str): + """Print the execution graph for a pipeline structure.""" + + kiara_obj = ctx.obj["kiara"] + + if os.path.isfile(pipeline_type): + pipeline_type = kiara_obj.register_pipeline_description( + pipeline_type, raise_exception=True + ) + + m_cls = kiara_obj.get_module_class(pipeline_type) + if not m_cls.is_pipeline(): + rich_print() + rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") + sys.exit(1) + + info = PipelineModuleInfo(module_type=pipeline_type) + info.print_execution_graph() + + +@pipeline.command() +@click.argument("pipeline-type", nargs=1) +@click.pass_context +def structure(ctx, pipeline_type: str): + """Print details about a pipeline structure.""" + + kiara_obj = ctx.obj["kiara"] + + if os.path.isfile(pipeline_type): + pipeline_type = kiara_obj.register_pipeline_description( + pipeline_type, raise_exception=True + ) + + m_cls = kiara_obj.get_module_class(pipeline_type) + if not m_cls.is_pipeline(): + rich_print() + rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") + sys.exit(1) + + info = PipelineModuleInfo(module_type=pipeline_type, _kiara=kiara_obj) + structure = info.create_structure() + print() + kiara_obj.explain(structure) + + +@pipeline.command() +@click.argument("pipeline-type", nargs=1) +@click.pass_context +def explain_steps(ctx, pipeline_type: str): + + kiara_obj = ctx.obj["kiara"] + + if os.path.isfile(pipeline_type): + pipeline_type = kiara_obj.register_pipeline_description( + pipeline_type, raise_exception=True + ) + + m_cls = kiara_obj.get_module_class(pipeline_type) + if not m_cls.is_pipeline(): + rich_print() + rich_print(f"Module '{pipeline_type}' is not a pipeline-type module.") + sys.exit(1) + + info = PipelineModuleInfo(module_type=pipeline_type) + structure = info.create_structure() + print() + kiara_obj.explain(structure.to_details().steps_info) diff --git a/src/kiara/interfaces/cli/run.py b/src/kiara/interfaces/cli/run.py new file mode 100644 index 000000000..1dc689c5e --- /dev/null +++ b/src/kiara/interfaces/cli/run.py @@ -0,0 +1,304 @@ +# -*- coding: utf-8 -*- +import asyncclick as click +import os.path +import sys +import typing +from kiara_modules.core.json import DEFAULT_TO_JSON_CONFIG +from kiara_modules.core.string import DEFAULT_PRETTY_PRINT_CONFIG +from pathlib import Path +from rich import box +from rich.console import Console, RenderGroup +from rich.panel import Panel +from rich.syntax import Syntax + +from kiara import Kiara +from kiara.data.values import ValuesInfo +from kiara.interfaces.cli.utils import _create_module_instance +from kiara.module import KiaraModule +from kiara.pipeline.controller import BatchController +from kiara.pipeline.pipeline import StepStatus +from kiara.processing.parallel import ThreadPoolProcessor +from kiara.utils import create_table_from_field_schemas, dict_from_cli_args, is_debug +from kiara.utils.output import OutputDetails, rich_print + + +@click.command() +@click.argument("module", nargs=1) +@click.argument("inputs", nargs=-1, required=False) +@click.option("--id", "-i", help="Set workflow id.", required=False) +@click.option( + "--module-config", + "-c", + required=False, + help="(Optional) module configuration.", + multiple=True, +) +@click.option( + "--explain", + "-e", + help="Display additional workflow details.", + is_flag=True, +) +@click.option( + "--output", "-o", help="The output format and configuration.", multiple=True +) +@click.option( + "--save", "-s", help="Save the outputs into the kiara data store.", is_flag=True +) +@click.pass_context +async def run(ctx, module, inputs, module_config, output, explain, save, id): + """Execute a workflow run.""" + + if module_config: + module_config = dict_from_cli_args(*module_config) + + kiara_obj: Kiara = ctx.obj["kiara"] + + if module in kiara_obj.available_module_types: + module_name = module + elif f"core.{module}" in kiara_obj.available_module_types: + module_name = f"core.{module}" + elif os.path.isfile(module): + module_name = kiara_obj.register_pipeline_description( + module, raise_exception=True + ) + else: + rich_print( + f"\nInvalid module name '[i]{module}[/i]'. Must be a path to a pipeline file, or one of the available modules:\n" + ) + for n in kiara_obj.available_module_types: + rich_print(f" - [i]{n}[/i]") + sys.exit(1) + + if not inputs: + + module_obj: KiaraModule = _create_module_instance( + ctx=ctx, module_type=module_name, module_config=module_config + ) + + one_required = False + for input_name in module_obj.input_names: + if module_obj.input_required(input_name): + one_required = True + break + + if one_required: + + inputs_table = create_table_from_field_schemas( + _show_header=True, **module_obj.input_schemas + ) + print() + print( + "No inputs provided, not running the workflow. To run it, provide input following this schema:" + ) + rich_print(inputs_table) + sys.exit(0) + + output_details = OutputDetails.from_data(output) + silent = False + if output_details.format == "silent": + silent = True + + force_overwrite = output_details.config.get("force", False) + + # SUPPORTED_TARGETS = ["terminal", "file"] + # if output_details.target not in SUPPORTED_TARGETS: + # print() + # rich_print(f"Invalid output target '{output_details.target}', must be one of: [i]{', '.join(SUPPORTED_TARGETS)}[/i]") + # sys.exit(1) + + target_file: typing.Optional[Path] = None + if output_details.target != "terminal": + if output_details.target == "file": + target_dir = Path.cwd() + target_file = target_dir / f"{module_name}.{output_details.format}" + else: + target_file = Path( + os.path.realpath(os.path.expanduser(output_details.target)) + ) + + if target_file.exists() and not force_overwrite: + print() + print( + f"Can't run workflow, the target files already exist, and '--output force=true' not specified: {target_file}" + ) + sys.exit(1) + + processor = ThreadPoolProcessor() + processor = None + controller = BatchController(processor=processor) + + workflow_id = id + if workflow_id is None: + workflow_id = f"{module_name}_0" + + workflow = kiara_obj.create_workflow( + module_name, + module_config=module_config, + workflow_id=workflow_id, + controller=controller, + ) + + if save: + + invalid = set() + for ov in workflow.outputs.values(): + existing = kiara_obj.data_store.check_existing_aliases(*ov.aliases) + invalid.update(existing) + + if invalid: + print() + print( + f"Can't run workflow, value aliases for saving already exist: {', '.join(invalid)}. Set another workflow id?" + ) + sys.exit(1) + + list_keys = [] + + for name, value in workflow.inputs.items(): + if value.value_schema.type in ["array", "list"]: + list_keys.append(name) + + workflow_input = dict_from_cli_args(*inputs, list_keys=list_keys) + failed = False + try: + if workflow_input: + workflow.inputs.set_values(**workflow_input) + else: + workflow.controller.process_pipeline() + except Exception as e: + print() + print(e) + failed = True + + if explain: + print() + kiara_obj.explain(workflow.current_state) + + if workflow.status == StepStatus.RESULTS_READY: + vi = ValuesInfo(workflow.outputs) + vi_table = vi.create_value_info_table( + ensure_metadata=True, show_headers=True + ) + panel = Panel(Panel(vi_table), box=box.SIMPLE) + rich_print("[b]Output data details[/b]") + rich_print(panel) + + if failed: + sys.exit(1) + + if not silent: + + if output_details.target == "terminal": + if output_details.format == "terminal": + print() + pretty_print = kiara_obj.create_workflow("string.pretty_print") + pretty_print_inputs: typing.Dict[str, typing.Any] = { + "item": workflow.outputs + } + pretty_print_inputs.update(DEFAULT_PRETTY_PRINT_CONFIG) + + pretty_print.inputs.set_values(**pretty_print_inputs) + + renderables = pretty_print.outputs.get_value_data("renderables") + if renderables: + output = Panel(RenderGroup(*renderables), box=box.SIMPLE) + rich_print("[b]Output data[/b]") + rich_print(output) + else: + rich_print("No output.") + else: + + format = output_details.format + available_formats = kiara_obj.get_convert_target_types( + source_type="value_set" + ) + if format not in available_formats: + print() + print( + f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." + ) + sys.exit(1) + + config = {} + config.update(DEFAULT_TO_JSON_CONFIG) + + try: + transformed = kiara_obj.transform_data( + workflow.outputs, + source_type="value_set", + target_type=format, + config=config, + ) + transformed_value = transformed.get_value_data("target_value") + + if format in ["json", "yaml"]: + transformed_str = Syntax( + transformed_value, + lexer_name=format, + background_color="default", + ) + rich_print(transformed_str) + else: + print(transformed_value) + except Exception as e: + print() + rich_print(f"Can't transform outputs into '{format}': {e}") + sys.exit(1) + + else: + if output_details.format == "terminal": + + pretty_print = kiara_obj.create_workflow("string.pretty_print") + + pretty_print_inputs = {"item": value} + pretty_print_inputs.update(DEFAULT_PRETTY_PRINT_CONFIG) + pretty_print.inputs.set_values(**pretty_print_inputs) + + renderables = pretty_print.outputs.get_value_data("renderables") + output = Panel(RenderGroup(*renderables), box=box.SIMPLE) + with open(target_file, "wt") as f: + console = Console(record=True, file=f) + console.print(output) + else: + + format = output_details.format + available_formats = kiara_obj.get_convert_target_types( + source_type="value_set" + ) + if format not in available_formats: + print() + print( + f"Can't convert to output format '{format}', this format is not supported. Available formats: {', '.join(available_formats)}." + ) + sys.exit(1) + + config = {} + config.update(DEFAULT_TO_JSON_CONFIG) + + transformed = kiara_obj.transform_data( + workflow.outputs, + source_type="value_set", + target_type=format, + config=config, + ) + transformed_value = transformed.get_value_data() + + target_file.parent.mkdir(parents=True, exist_ok=True) + # TODO: check whether to write text or bytes + target_file.write_text(transformed_value) + + if save: + for field, value in workflow.outputs.items(): + rich_print(f"Saving '[i]{field}[/i]'...") + try: + value_id = value.save() + rich_print(f" -> done, id: [i]{value_id}[/i]") + + except Exception as e: + if is_debug(): + import traceback + + traceback.print_exc() + rich_print(f" -> failed: [red]{e}[/red]") + print() diff --git a/src/kiara/interfaces/cli/type/__init__.py b/src/kiara/interfaces/cli/type/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/kiara/interfaces/cli/type/command.py b/src/kiara/interfaces/cli/type/command.py new file mode 100644 index 000000000..e9eca9cce --- /dev/null +++ b/src/kiara/interfaces/cli/type/command.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +import asyncclick as click + +from kiara import Kiara +from kiara.utils.output import rich_print + + +@click.group(name="type") +@click.pass_context +def type_group(ctx): + """Information about available value types, and details about them.""" + + +@type_group.command(name="list") +@click.pass_context +def list_types(ctx): + """List available types (work in progress).""" + + kiara_obj: Kiara = ctx.obj["kiara"] + + print() + for type_name, type in kiara_obj.value_types.items(): + rich_print(f"{type_name}: {type}") diff --git a/src/kiara/interfaces/cli/utils.py b/src/kiara/interfaces/cli/utils.py new file mode 100644 index 000000000..d110beb63 --- /dev/null +++ b/src/kiara/interfaces/cli/utils.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +import os.path +import typing + +from kiara import KiaraModule +from kiara.utils import dict_from_cli_args + + +def _create_module_instance( + ctx, module_type: str, module_config: typing.Iterable[typing.Any] +) -> KiaraModule: + config = dict_from_cli_args(*module_config) + + kiara_obj = ctx.obj["kiara"] + if os.path.isfile(module_type): + module_type = kiara_obj.register_pipeline_description( + module_type, raise_exception=True + ) + + module_obj = kiara_obj.create_module( + id=module_type, module_type=module_type, module_config=config + ) + return module_obj diff --git a/src/kiara/modules/metadata.py b/src/kiara/modules/metadata.py index c1b8fcd8d..cf08b93c7 100644 --- a/src/kiara/modules/metadata.py +++ b/src/kiara/modules/metadata.py @@ -134,7 +134,7 @@ def process(self, inputs: ValueSet, outputs: ValueSet) -> None: class ExtractPythonClass(ExtractMetadataModule): """Extract metadata about the Python type of this value.""" - _module_type_name = "python_class" + _module_type_name = "metadata.python_class" @classmethod def _get_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]: diff --git a/src/kiara/utils/class_loading.py b/src/kiara/utils/class_loading.py index f079014bb..7c95ab106 100644 --- a/src/kiara/utils/class_loading.py +++ b/src/kiara/utils/class_loading.py @@ -11,6 +11,7 @@ from kiara import KiaraModule from kiara.defaults import RELATIVE_PIPELINES_PATH +from kiara.metadata import MetadataModel from kiara.utils import ( _get_all_subclasses, _import_modules_recursively, @@ -25,9 +26,33 @@ KiaraEntryPointIterable = typing.Iterable[KiaraEntryPointItem] -def find_kiara_modules_under( - module: typing.Union[str, ModuleType], prefix: typing.Optional[str] = "" -) -> typing.Mapping[str, typing.Type[KiaraModule]]: +SUBCLASS_TYPE = typing.TypeVar("SUBCLASS_TYPE") + + +def _get_subclass_name(module: typing.Type[KiaraModule]): + + name = camel_case_to_snake_case(module.__name__) + return name + + +def find_subclasses_under( + base_class: typing.Type[SUBCLASS_TYPE], + module: typing.Union[str, ModuleType], + prefix: typing.Optional[str] = "", + remove_namespace_tokens: typing.Optional[typing.Iterable[str]] = None, + module_name_func: typing.Callable = None, +) -> typing.Mapping[str, typing.Type[SUBCLASS_TYPE]]: + """Find all (non-abstract) subclasses of a base class that live under a module (recursively). + + Arguments: + base_class: the parent class + module: the module to search + prefix: a string to use as a result items namespace prefix, defaults to an empty string, use 'None' to indicate the module path should be used + remove_namespace_tokens: a list of strings to remove from module names when autogenerating subclass ids, and prefix is None + + Returns: + a map containing the (fully namespaced) id of the subclass as key, and the actual class object as value + """ if hasattr(sys, "frozen"): raise NotImplementedError("Pyinstaller bundling not supported yet.") @@ -37,7 +62,7 @@ def find_kiara_modules_under( _import_modules_recursively(module) - subclasses: typing.Iterable[typing.Type[KiaraModule]] = _get_all_subclasses( + subclasses: typing.Iterable[typing.Type[SUBCLASS_TYPE]] = _get_all_subclasses( KiaraModule ) @@ -54,15 +79,19 @@ def find_kiara_modules_under( log.debug(f"Ignoring abstract subclass: {sc}") continue - name = _get_module_name(sc) + if module_name_func is None: + module_name_func = _get_subclass_name + name = module_name_func(sc) path = sc.__module__[len(module.__name__) + 1 :] # noqa full_name = f"{path}.{name}" if prefix is None: prefix = module.__name__ - if prefix.startswith("kiara_modules."): - prefix = prefix[0:-14] + if remove_namespace_tokens: + for rnt in remove_namespace_tokens: + if prefix.startswith(rnt): + prefix = prefix[0 : -len(rnt)] # noqa if prefix: full_name = f"{prefix}.{full_name}" @@ -72,6 +101,174 @@ def find_kiara_modules_under( return result +def load_all_subclasses_for_entry_point( + entry_point_name: str, + base_class: typing.Type[SUBCLASS_TYPE], + set_id_attribute: typing.Union[None, str] = None, + remove_namespace_tokens: typing.Optional[typing.Iterable[str]] = None, +) -> typing.Dict[str, typing.Type[SUBCLASS_TYPE]]: + """Find all subclasses of a base class via package entry points. + + Arguments: + entry_point_name: the entry point name to query entries for + base_class: the base class to look for + set_id_attribute: whether to set the entry point id as attribute to the class, if None, no id attribute will be set, if a string, the attribute with that name will be set + remove_namespace_tokens: a list of strings to remove from module names when autogenerating subclass ids, and prefix is None + + TODO + """ + + log2 = logging.getLogger("stevedore") + out_hdlr = logging.StreamHandler(sys.stdout) + out_hdlr.setFormatter( + logging.Formatter(f"{entry_point_name} plugin search error -> %(message)s") + ) + out_hdlr.setLevel(logging.INFO) + log2.addHandler(out_hdlr) + log2.setLevel(logging.INFO) + + log.debug(f"Finding {entry_point_name} items from search paths...") + + mgr = ExtensionManager( + namespace=entry_point_name, + invoke_on_load=False, + propagate_map_exceptions=True, + ) + + result_entrypoints: typing.Dict[str, typing.Type] = {} + result_dynamic: typing.Dict[str, typing.Type] = {} + for plugin in mgr: + + name = plugin.name + + if isinstance(plugin.plugin, type) and issubclass(plugin.plugin, base_class): + ep = plugin.entry_point + module_cls = ep.load() + if set_id_attribute: + if hasattr(module_cls, set_id_attribute): + if not getattr(module_cls, set_id_attribute) == name: + log.warning( + f"Item id mismatch for type {entry_point_name}: {getattr(module_cls, set_id_attribute)} != {name}, entry point key takes precedence {name})" + ) + setattr(module_cls, set_id_attribute, name) + + else: + setattr(module_cls, set_id_attribute, name) + result_entrypoints[name] = module_cls + elif ( + isinstance(plugin.plugin, tuple) + and len(plugin.plugin) >= 1 + and callable(plugin.plugin[0]) + ) or callable(plugin.plugin): + modules = _callable_wrapper(plugin.plugin) + + for k, v in modules.items(): + _name = f"{name}.{k}" + if _name in result_dynamic.keys(): + raise Exception( + f"Duplicate module name for type {entry_point_name}: {_name}" + ) + result_dynamic[_name] = v + + else: + raise NotImplementedError() + + for k, v in result_dynamic.items(): + if k in result_entrypoints.keys(): + raise Exception(f"Duplicate module name for type {entry_point_name}: {k}") + result_entrypoints[k] = v + + result: typing.Dict[str, typing.Type[SUBCLASS_TYPE]] = {} + + for k, v in result_entrypoints.items(): + if remove_namespace_tokens: + for rnt in remove_namespace_tokens: + if k.startswith(rnt): + k = k[len(rnt) :] # noqa + result[k] = v + + return result + + +def find_all_kiara_modules() -> typing.Dict[str, typing.Type["KiaraModule"]]: + """Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points. + + TODO + """ + + return load_all_subclasses_for_entry_point( + entry_point_name="kiara.modules", + base_class=KiaraModule, # type: ignore + set_id_attribute="_module_type_name", + remove_namespace_tokens=["core."], + ) + + +def find_all_metadata_schemas() -> typing.Dict[str, typing.Type["MetadataModel"]]: + """Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points. + + TODO + """ + + return load_all_subclasses_for_entry_point( + entry_point_name="kiara.metadata_schema", + base_class=MetadataModel, + set_id_attribute="_metadata_key", + remove_namespace_tokens=["core."], + ) + + +def _get_and_set_module_name(module: typing.Type[KiaraModule]): + + if hasattr(module, "_module_type_name"): + return module._module_type_name # type: ignore + else: + name = camel_case_to_snake_case(module.__name__) + if name.endswith("_module"): + name = name[0:-7] + if not inspect.isabstract(module): + setattr(module, "_module_type_name", name) + return name + + +def _get_and_set_metadata_model_name(module: typing.Type[KiaraModule]): + + if hasattr(module, "_metadata_key"): + return module._metadata_key # type: ignore + else: + name = camel_case_to_snake_case(module.__name__) + if name.endswith("_model"): + name = name[0:-6] + if not inspect.isabstract(module): + setattr(module, "_metadata_key", name) + return name + + +def find_kiara_modules_under( + module: typing.Union[str, ModuleType], prefix: typing.Optional[str] = "" +) -> typing.Mapping[str, typing.Type[KiaraModule]]: + return find_subclasses_under( + base_class=KiaraModule, # type: ignore + module=module, + prefix=prefix, + remove_namespace_tokens=["kiara_modules."], + module_name_func=_get_and_set_module_name, + ) + + +def find_metadata_models_under( + module: typing.Union[str, ModuleType], prefix: typing.Optional[str] = "" +) -> typing.Mapping[str, typing.Type[MetadataModel]]: + + return find_subclasses_under( + base_class=MetadataModel, + module=module, + prefix=prefix, + remove_namespace_tokens=[], + module_name_func=_get_and_set_metadata_model_name, + ) + + def find_kiara_pipelines_under( module: typing.Union[str, ModuleType] ) -> typing.List[str]: @@ -155,83 +352,6 @@ def find_all_kiara_pipeline_paths() -> typing.Dict[str, typing.List[str]]: return result -def find_all_kiara_modules() -> typing.Dict[str, typing.Type["KiaraModule"]]: - """Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points. - - TODO - """ - - log2 = logging.getLogger("stevedore") - out_hdlr = logging.StreamHandler(sys.stdout) - out_hdlr.setFormatter( - logging.Formatter("kiara module plugin search error -> %(message)s") - ) - out_hdlr.setLevel(logging.INFO) - log2.addHandler(out_hdlr) - log2.setLevel(logging.INFO) - - log.debug("Finding kiara modules from search paths...") - - mgr = ExtensionManager( - namespace="kiara.modules", - invoke_on_load=False, - propagate_map_exceptions=True, - ) - - result_entrypoints: typing.Dict[str, typing.Type] = {} - result_dynamic: typing.Dict[str, typing.Type] = {} - for plugin in mgr: - - name = plugin.name - - if isinstance(plugin.plugin, type) and issubclass(plugin.plugin, KiaraModule): - ep = plugin.entry_point - module_cls = ep.load() - setattr(module_cls, "_module_type_name", name) - result_entrypoints[name] = module_cls - elif ( - isinstance(plugin.plugin, tuple) - and len(plugin.plugin) >= 1 - and callable(plugin.plugin[0]) - ) or callable(plugin.plugin): - modules = _find_kiara_modules_using_callable(plugin.plugin) - - for k, v in modules.items(): - _name = f"{name}.{k}" - if _name in result_dynamic.keys(): - raise Exception(f"Duplicate module name: {_name}") - result_dynamic[_name] = v - else: - raise NotImplementedError() - - for k, v in result_dynamic.items(): - if k in result_entrypoints.keys(): - raise Exception(f"Duplicate module name: {k}") - result_entrypoints[k] = v - - result: typing.Dict[str, typing.Type[KiaraModule]] = {} - - for k, v in result_entrypoints.items(): - if k.startswith("core."): - k = k[5:] - result[k] = v - - return result - - -def _get_module_name(module: typing.Type[KiaraModule]): - - if hasattr(module, "_module_type_name"): - return module._module_type_name # type: ignore - else: - name = camel_case_to_snake_case(module.__name__) - if name.endswith("_module"): - name = name[0:-7] - if not inspect.isabstract(module): - setattr(module, "_module_type_name", name) - return name - - def _find_pipeline_folders_using_callable( func: typing.Union[typing.Callable, typing.Tuple] ) -> typing.List[str]: @@ -240,12 +360,12 @@ def _find_pipeline_folders_using_callable( return _callable_wrapper(func=func) # type: ignore -def _find_kiara_modules_using_callable( - func: typing.Union[typing.Callable, typing.Tuple] -) -> typing.Mapping[str, typing.Type[KiaraModule]]: - - # TODO: typecheck? - return _callable_wrapper(func=func) # type: ignore +# def _find_kiara_modules_using_callable( +# func: typing.Union[typing.Callable, typing.Tuple] +# ) -> typing.Mapping[str, typing.Type[KiaraModule]]: +# +# # TODO: typecheck? +# return _callable_wrapper(func=func) # type: ignore def _callable_wrapper(func: typing.Union[typing.Callable, typing.Tuple]) -> typing.Any: diff --git a/src/kiara/utils/output.py b/src/kiara/utils/output.py index 3e981c49b..26c9d41c1 100644 --- a/src/kiara/utils/output.py +++ b/src/kiara/utils/output.py @@ -6,6 +6,7 @@ from rich.console import RenderableType from rich.table import Table as RichTable +from kiara.interfaces import get_console from kiara.utils import dict_from_cli_args @@ -176,3 +177,11 @@ def pretty_print_arrow_table( rich_table.add_row(*row) return rich_table + + +def rich_print(msg: typing.Any = None) -> None: + + if msg is None: + msg = "" + console = get_console() + console.print(msg)