From de8f92f9318f5f3241161f3131c4ddbe6c14fec2 Mon Sep 17 00:00:00 2001
From: Julian Geiger
Date: Mon, 10 Feb 2025 18:56:41 +0100
Subject: [PATCH] Commit changes to continue.

---
 src/aiida/cmdline/commands/cmd_profile.py |  71 +++--
 src/aiida/cmdline/params/options/main.py  |  23 +-
 src/aiida/common/utils.py                 |   2 +
 src/aiida/repository/repository.py        |   1 +
 src/aiida/tools/dumping/base.py           |   4 +
 src/aiida/tools/dumping/collection.py     | 318 +++++++++++++++++-----
 src/aiida/tools/dumping/config.py         |  11 +
 src/aiida/tools/dumping/process.py        |   9 +-
 src/aiida/tools/dumping/profile.py        | 131 +++++----
 src/aiida/tools/dumping/utils.py          |  88 ++++--
 tests/tools/dumping/test_collection.py    | 151 +++++-----
 11 files changed, 553 insertions(+), 256 deletions(-)
 create mode 100644 src/aiida/tools/dumping/config.py

diff --git a/src/aiida/cmdline/commands/cmd_profile.py b/src/aiida/cmdline/commands/cmd_profile.py
index 215af6c2b6..e616debd06 100644
--- a/src/aiida/cmdline/commands/cmd_profile.py
+++ b/src/aiida/cmdline/commands/cmd_profile.py
@@ -273,37 +273,57 @@ def profile_delete(force, delete_data, profiles):

 @verdi_profile.command('mirror')
 @options.PATH()
+@options.DRY_RUN()
 @options.OVERWRITE()
-# @options.INCREMENTAL()
 @options.DUMP_PROCESSES()
-@options.DEDUPLICATE()
+@options.GROUPS()
+@options.ORGANIZE_BY_GROUPS()
+# @options.DEDUPLICATE()
+# @click.option(
+#     '--check-dirs/--no-check-dirs',
+#     default=False,
+#     show_default=True,
+#     help='Check for existence of dump directories. Otherwise, incremental mirroring is only evaluated from the log.')
+@click.option(
+    '--symlink-duplicates/--no-symlink-duplicates',
+    default=True,
+    show_default=True,
+    help='Symlink data if the same node is contained in multiple groups.')
+@click.option(
+    '--delete-missing/--no-delete-missing',
+    default=False,
+    show_default=True,
+    help="If a previously dumped node is deleted from AiiDA's DB, also delete the corresponding dump directory.")
+@click.option(
+    '--extra-calc-dirs/--no-extra-calc-dirs',
+    default=False,
+    show_default=True,
+    help='If a top-level process calls sub-processes, also create designated directories for those sub-calculations.')
@options.INCLUDE_INPUTS()
 @options.INCLUDE_OUTPUTS()
 @options.INCLUDE_ATTRIBUTES()
 @options.INCLUDE_EXTRAS()
 @options.FLAT()
-@options.DUMP_CONFIG_FILE()
-@options.GROUPS()
-@options.ORGANIZE_BY_GROUPS()
-@options.DRY_RUN()
 @click.pass_context
 def profile_mirror(
     ctx,
     path,
-    overwrite,
-    organize_by_groups,
     dry_run,
+    overwrite,
     dump_processes,
-    deduplicate,
+    groups,
+    organize_by_groups,
+    symlink_duplicates,
+    delete_missing,
+    extra_calc_dirs,
+    # check_dirs,
     include_inputs,
     include_outputs,
     include_attributes,
     include_extras,
     flat,
-    dump_config_file,
-    groups,
 ):
-    """Dump all data in an AiiDA profile's storage to disk."""
+    """Dump all data in an AiiDA profile's storage to disk in a human-readable directory tree."""
     import json
     from datetime import datetime

@@ -313,6 +333,7 @@ def profile_mirror(
     from aiida.tools.dumping.base import BaseDumper
     from aiida.tools.dumping.logger import DumpLogger
     from aiida.tools.dumping.utils import prepare_dump_path
+    from aiida.tools.dumping.config import ProfileDumpConfig

     profile = ctx.obj['profile']

     if path is None:
         path = Path.cwd() / f'{profile.name}-mirror'

-    echo.echo_report(f'Mirroring data of profile `{profile.name}`at path: `{path}`.')
+    echo.echo_report(f'Mirroring data of profile `{profile.name}` at path: `{path}`.')

     SAFEGUARD_FILE: str = '.verdi_profile_mirror'  # noqa: N806
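+    # NOTE: The safeguard file marks a directory as a profile-mirror target and records the dump times.
+    # Only the last whitespace-separated token of its last line is parsed as an ISO-8601 timestamp below,
+    # so a line like the following (illustrative format, not prescribed by this patch) yields the time of
+    # the previous dump:
+    #   Last profile mirror time: 2025-02-10T18:56:41+01:00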
    safeguard_file_path: Path = path / SAFEGUARD_FILE

@@ -336,8 +357,6 @@ def profile_mirror(
     except FileExistsError as exc:
         echo.echo_critical(str(exc))

-    breakpoint()
-
     try:
         with safeguard_file_path.open('r') as fhandle:
             last_dump_time = datetime.fromisoformat(fhandle.readlines()[-1].strip().split()[-1]).astimezone()
@@ -346,9 +365,10 @@ def profile_mirror(

     if dry_run:
         node_counts = ProfileDumper._get_number_of_nodes_to_dump(last_dump_time)
-        node_counts_str = ' & '.join(f'{count} {node_type}' for node_type, count in node_counts.items())
-        dry_run_message = f'Dry run for mirroring of profile `{profile.name}`: {node_counts_str} to dump.\n'
+        dry_run_message = f'Dry run for mirroring of profile `{profile.name}`. Would dump:'
         echo.echo_report(dry_run_message)
+        for node_type, count in node_counts.items():
+            echo.echo_report(f'{node_type}: {count}')
         return

     if incremental:
@@ -376,18 +396,25 @@ def profile_mirror(
         flat=flat,
     )

+    profile_dump_config = ProfileDumpConfig(
+        dump_processes=dump_processes,
+        symlink_duplicates=symlink_duplicates,
+        delete_missing=delete_missing,
+        extra_calc_dirs=extra_calc_dirs,
+        organize_by_groups=organize_by_groups,
+    )
+
     profile_dumper = ProfileDumper(
+        profile=profile,
+        profile_dump_config=profile_dump_config,
         base_dumper=base_dumper,
         process_dumper=process_dumper,
         dump_logger=dump_logger,
         groups=groups,
-        organize_by_groups=organize_by_groups,
-        deduplicate=deduplicate,
-        profile=profile,
-        dump_processes=dump_processes,
     )

-    profile_dumper.dump()
+    profile_dumper.dump_processes()

     # Append the current time to the file
     last_dump_time = datetime.now().astimezone()
diff --git a/src/aiida/cmdline/params/options/main.py b/src/aiida/cmdline/params/options/main.py
index 82d4fda8d8..a806d1b1a2 100644
--- a/src/aiida/cmdline/params/options/main.py
+++ b/src/aiida/cmdline/params/options/main.py
@@ -53,12 +53,10 @@
     'DB_PORT',
     'DB_USERNAME',
     'DEBUG',
-    'DEDUPLICATE',
     'DESCRIPTION',
     'DICT_FORMAT',
     'DICT_KEYS',
     'DRY_RUN',
-    'DUMP_CONFIG_FILE',
     'DUMP_PROCESSES',
     'EXIT_STATUS',
     'EXPORT_FORMAT',
@@ -792,13 +790,13 @@ def set_log_level(ctx, _param, value):
     show_default=True,
 )

-DEDUPLICATE = OverridableOption(
-    '--deduplicate/--no-deduplicate',
-    is_flag=True,
-    default=True,
-    show_default=True,
-    help='',
-)
+# DEDUPLICATE = OverridableOption(
+#     '--deduplicate/--no-deduplicate',
+#     is_flag=True,
+#     default=True,
+#     show_default=True,
+#     help='',
+# )

 DUMP_PROCESSES = OverridableOption(
     '--dump-processes/--no-dump-processes',
@@ -808,13 +806,6 @@ def set_log_level(ctx, _param, value):
     help='Dump process data.',
 )

-DUMP_CONFIG_FILE = OverridableOption(
-    '--dump-config-file',
-    default=None,
-    type=types.FileOrUrl(),
-    help='Provide dumping options via a config file in YAML format.',
-)
-
 ORGANIZE_BY_GROUPS = OverridableOption(
     '--organize-by-groups/--no-organize-by-groups',
     default=True,
diff --git a/src/aiida/common/utils.py b/src/aiida/common/utils.py
index 1b2f2b14ce..8cd1046dfb 100644
--- a/src/aiida/common/utils.py
+++ b/src/aiida/common/utils.py
@@ -17,6 +17,8 @@
 from datetime import datetime
 from typing import Any, Dict
 from uuid import UUID
+from aiida.manage import get_manager, load_profile
+from aiida.manage.configuration.profile import Profile

 from .lang import classproperty

diff --git a/src/aiida/repository/repository.py b/src/aiida/repository/repository.py
index 992a96447d..32351ddeef 100644
--- a/src/aiida/repository/repository.py
+++ b/src/aiida/repository/repository.py
@@ -519,6 +519,7 @@ def copy_tree(self, target: Union[str, pathlib.Path], path: Optional[FilePath] =
             dirpath.mkdir(parents=True, exist_ok=True)

             with self.open(root / filename) as handle:
+                # TODO: Possibly skip writing files that already exist on disk
                 filepath.write_bytes(handle.read())

     # these methods are not actually used in aiida-core, but are here for completeness
diff --git a/src/aiida/tools/dumping/base.py b/src/aiida/tools/dumping/base.py
index bbe63c9301..6bbd5b505e 100644
--- a/src/aiida/tools/dumping/base.py
+++ b/src/aiida/tools/dumping/base.py
@@ -14,9 +14,13 @@

 @dataclass
 class BaseDumper:
+    """Container for the shared arguments of all dumper classes."""
+
     dump_parent_path: Path | None = None
     overwrite: bool = False
     incremental: bool = True
+    check_dirs: bool = False
+    # TODO: Make this a per-class attribute?
     last_dump_time: datetime | None = None

     def __post_init__(self):
diff --git a/src/aiida/tools/dumping/collection.py b/src/aiida/tools/dumping/collection.py
index edc8219b98..56352b4574 100644
--- a/src/aiida/tools/dumping/collection.py
+++ b/src/aiida/tools/dumping/collection.py
@@ -10,26 +10,27 @@

 from __future__ import annotations

 import os
 from datetime import datetime
-from functools import cached_property
 from pathlib import Path
-from typing import TYPE_CHECKING, NamedTuple, TypeVar
+from typing import TYPE_CHECKING, NamedTuple

 from aiida import orm
+from aiida.common.exceptions import NotExistent
 from aiida.common.log import AIIDA_LOGGER
 from aiida.tools.dumping.base import BaseDumper
+from aiida.tools.dumping.config import ProfileDumpConfig
 from aiida.tools.dumping.logger import DumpLog, DumpLogger
 from aiida.tools.dumping.process import ProcessDumper
 from aiida.tools.dumping.utils import filter_by_last_dump_time

 if TYPE_CHECKING:
     from collections.abc import Sequence

     from aiida.tools.dumping.logger import DumpDict

-T = TypeVar('T', bound='orm.ProcessNode')
-
 logger = AIIDA_LOGGER.getChild('tools.dumping')


@@ -38,98 +39,175 @@
 class ProcessesToDump(NamedTuple):
     calculations: Sequence[orm.CalculationNode]
     workflows: Sequence[orm.WorkflowNode]

+    @property
+    def is_empty(self) -> bool:
+        """Check if there are any processes to dump."""
+        return len(self.calculations) == 0 and len(self.workflows) == 0
+
+
+# @dataclass
+# class CollectionDumpConfig:
+#     dump_processes: bool = True
+#     symlink_duplicates: bool = True
+#     delete_missing: bool = False
+#     extra_calc_dirs: bool = False
+#     organize_by_groups: bool = True


 class CollectionDumper:
+    """Class to handle dumping of a collection of AiiDA ORM entities."""
+
     def __init__(
         self,
+        collection: orm.Group | str | Sequence[str] | Sequence[int],
+        profile_dump_config: ProfileDumpConfig | None = None,
         base_dumper: BaseDumper | None = None,
         process_dumper: ProcessDumper | None = None,
         dump_logger: DumpLogger | None = None,
-        collection: orm.Group | str | list[str] | None = None,
-        deduplicate: bool = True,
         output_path: Path | None = None,
-        processes_to_dump: ProcessesToDump | None = None,
     ):
-        self.deduplicate = deduplicate
-
-        # Collection could be a Group or a list of nodes
-        if isinstance(collection, str):
-            try:
-                collection = orm.load_group(collection)
-            except:
-                raise
+        """Initialize the CollectionDumper.

+        :param collection: The collection of AiiDA ORM entities to be dumped: a group, a group label, or a list of
+            node PKs or UUIDs.
+        :param profile_dump_config: Configuration options of the profile dump or None (gets instantiated).
+        :param base_dumper: Base dumper instance or None (gets instantiated).
+        :param process_dumper: Process dumper instance or None (gets instantiated).
+        :param dump_logger: Logger for the dumping (gets instantiated).
+        :param output_path: The parent output path for dumping the collection nodes.
+        """
-        self.collection = collection
+        self.collection = self._validate_collection(collection)
         self.base_dumper = base_dumper or BaseDumper()
         self.process_dumper = process_dumper or ProcessDumper()
         self.dump_logger = dump_logger or DumpLogger(dump_parent_path=self.base_dumper.dump_parent_path)

-        # Properly set the `output_path` attribute
-        if output_path is not None:
-            self.output_path = output_path
-        else:
-            self.output_path = Path.cwd()
+        self.output_path = output_path or Path.cwd()

-    @cached_property
-    def nodes(self) -> list[str]:
-        return self._get_nodes()
+        self.profile_dump_config = profile_dump_config or ProfileDumpConfig()

-    def _get_nodes(self) -> list[str]:
-        nodes: list[str] | None = None
-        if isinstance(self.collection, orm.Group):
-            nodes = [n.uuid for n in self.collection.nodes]
-        elif isinstance(self.collection, list) and len(self.collection) > 0:
-            if all(isinstance(n, str) for n in self.collection):
-                nodes = self.collection
-            else:
-                msg = 'A collection of nodes must be passed via their UUIDs.'
-                raise TypeError(msg)
-        else:
-            nodes = []
+        self._collection_nodes: Sequence[str] | Sequence[int] | None = None
+        self._processes_to_dump: ProcessesToDump | None = None
+
+    def _validate_collection(
+        self, collection: orm.Group | str | Sequence[str] | Sequence[int]
+    ) -> orm.Group | Sequence[str] | Sequence[int]:
+        """Validate the given collection identifier.
+
+        :param collection: The input collection to validate.
+        :return: The validated collection.
+        :raises NotExistent: If no ``orm.Group`` can be loaded for a given label.
+        :raises ValueError: If neither a group nor a list of strings or integers identifying nodes is passed.
+        """
+        if isinstance(collection, str):
+            try:
+                return orm.load_group(collection)
+            except Exception as exc:
+                msg = f'Could not load group: {collection}.'
+                raise NotExistent(msg) from exc
+
+        if (isinstance(collection, list) and all(isinstance(n, (str, int)) for n in collection)) or isinstance(
+            collection, orm.Group
+        ):
+            return collection
+
+        msg = f'{collection} is an invalid collection.'
+        raise ValueError(msg)
+
+    @property
+    def collection_nodes(self) -> Sequence[str] | Sequence[int]:
+        """Return the collection nodes, caching the result of the first retrieval.
+
+        :return: List of collection node identifiers.
+        """
+        if self._collection_nodes is None:
+            self._collection_nodes = self._get_collection_nodes()
+        return self._collection_nodes
+
+    def _get_collection_nodes(self) -> Sequence[str] | Sequence[int]:
+        """Retrieve the node PKs/UUIDs from the collection, filtered by the last dump time if incremental dumping
+        is selected.
+
+        :return: List of node identifiers.
+        """
+        if not self.collection:
+            return []
+
+        nodes = [n.uuid for n in self.collection.nodes] if isinstance(self.collection, orm.Group) else self.collection
+
+        if self.base_dumper.incremental and self.base_dumper.last_dump_time:
+            nodes = filter_by_last_dump_time(nodes, last_dump_time=self.base_dumper.last_dump_time)

-        # TODO: Possibly have `last_dump_time` as attribute of CollectionDumper instead
-        # nodes = filter_by_last_dump_time(nodes=nodes, last_dump_time=self.last_dump_time)
-        nodes = filter_by_last_dump_time(nodes=nodes, last_dump_time=self.base_dumper.last_dump_time)
         return nodes

-    @cached_property
+    @property
     def processes_to_dump(self) -> ProcessesToDump:
-        return self._get_processes_to_dump()
+        """Get the processes to dump from the collection of nodes.
+
+        :return: Instance of the ``ProcessesToDump`` class containing the selected calculations and workflows.
+        """
+        if self._processes_to_dump is None:
+            self._processes_to_dump = self._get_processes_to_dump()
+        return self._processes_to_dump

     def _get_processes_to_dump(self) -> ProcessesToDump:
-        nodes = [orm.load_node(n) for n in self.nodes]
-        workflows = [node for node in nodes if isinstance(node, orm.WorkflowNode)]
-        calculations = [node for node in nodes if isinstance(node, orm.CalculationNode)]
+        """Retrieve the processes from the collection nodes.

-        # Make sure that only top-level workflows are dumped in their own directories when de-duplcation is enabled
-        if self.deduplicate:
-            workflows = [workflow for workflow in workflows if workflow.caller is None]
+        Only top-level workflows are dumped in their own directories. Calculations are dumped in their own
+        designated directories only if they are not part of a workflow, unless ``extra_calc_dirs`` is enabled.

-        else:
-            # If no deduplication, also sub-calculations that were called by workflows of the group, and which are not
-            # contained in the group.nodes directly are being dumped explicitly
+        :return: Instance of the ``ProcessesToDump`` class containing the selected calculations and workflows.
+        """
+        if not self.collection_nodes:
+            return ProcessesToDump(calculations=[], workflows=[])
+
+        # Use a single query rather than `[orm.load_node(n) for n in self.collection_nodes]`, as the list
+        # comprehension fetches each node from the DB individually
+        qb_filter_key = 'uuid' if isinstance(self.collection_nodes[0], str) else 'id'
+        nodes_orm = (
+            orm.QueryBuilder()
+            .append(orm.Node, filters={qb_filter_key: {'in': self.collection_nodes}})
+            .all(flat=True)
+        )
+
+        workflows = [node for node in nodes_orm if isinstance(node, orm.WorkflowNode)]
+        calculations = [node for node in nodes_orm if isinstance(node, orm.CalculationNode)]
+
+        # Make sure that only top-level workflows are dumped in their own directories
+        workflows = [workflow for workflow in workflows if workflow.caller is None]
+
+        # If enabled, sub-calculations that were called by workflows of the group, and which are not contained in
+        # `group.nodes` directly, are also dumped explicitly in their own directories
+        if self.profile_dump_config.extra_calc_dirs:
             called_calculations = []
             for workflow in workflows:
                 called_calculations += [
                     node for node in workflow.called_descendants if isinstance(node, orm.CalculationNode)
                 ]
-            calculations += called_calculations
+            # Convert to set to avoid duplicates
+            calculations = list(set(calculations + called_calculations))
+        else:
+            calculations = [calculation for calculation in calculations if calculation.caller is None]

         return ProcessesToDump(
             calculations=calculations,
             workflows=workflows,
         )

-    def should_dump_processes(self) -> bool:
-        # if self.processes_to_dump is None:
-        #     self._get_processes_to_dump()
-        return (len(self.processes_to_dump.calculations) + len(self.processes_to_dump.workflows)) > 0
-
     def _dump_calculations(self, calculations: Sequence[orm.CalculationNode]) -> None:
+        """Dump a collection of calculations.
+
+        Deduplication is already handled in ``_get_processes_to_dump``, which operates on node PKs/UUIDs rather
+        than on the AiiDA ORM entities used here: calculations that are part of a workflow are not dumped into
+        their own, dedicated directory.
+
+        :param calculations: Sequence of ``orm.CalculationNode``s
+        :return: None
+        """
         calculations_path = self.output_path / 'calculations'
-        dumped_calculations = {}
+        dumped_calculations: dict[str, DumpLog] = {}
+
+        logged_calculations: DumpDict = self.dump_logger.get_log()['calculations']

         for calculation in calculations:
             calculation_dumper = self.process_dumper

@@ -138,6 +216,13 @@ def _dump_calculations(self, calculations: Sequence[orm.CalculationNode]) -> Non
                 process_node=calculation, prefix=None
             )

+            if self.profile_dump_config.symlink_duplicates and calculation.uuid in logged_calculations.keys():
+                calculation_dump_path.parent.mkdir(exist_ok=True, parents=True)
+                os.symlink(
+                    src=logged_calculations[calculation.uuid].path,
+                    dst=calculation_dump_path,
+                )
+                # The node was already dumped and logged elsewhere; the symlink is enough
+                continue
+
             # Deduplication by top-level caller (`if calculation.caller is None`) is handled in
             # `_get_processes_to_dump`
             calculation_dumper._dump_calculation(calculation_node=calculation, output_path=calculation_dump_path)

@@ -149,11 +234,16 @@ def _dump_calculations(self, calculations: Sequence[orm.CalculationNode]) -> Non
         self.dump_logger.update_calculations(new_calculations=dumped_calculations)

     def _dump_workflows(self, workflows: Sequence[orm.WorkflowNode]) -> None:
+        """Dump a collection of workflows.
+
+        :param workflows: Sequence of ``orm.WorkflowNode``s
+        :return: None
+        """
         workflow_path: Path = self.output_path / 'workflows'
         dumped_workflows: dict[str, DumpLog] = {}

         workflow_path.mkdir(exist_ok=True, parents=True)

+        logged_workflows: DumpDict = self.dump_logger.get_log()['workflows']
+
         for workflow in workflows:
             workflow_dumper: ProcessDumper = self.process_dumper

             workflow_dump_path = workflow_path / workflow_dumper._generate_default_dump_path(
                 process_node=workflow, prefix=None
             )
-            logged_workflows: DumpDict = self.dump_logger.get_log()['workflows']

-            # Symlink here, if deduplication enabled and workflow was already dumped
-            if self.deduplicate and workflow in logged_workflows.keys():
+            # Symlink here if symlinking of duplicates is enabled and the workflow was already dumped
+            if self.profile_dump_config.symlink_duplicates and workflow.uuid in logged_workflows.keys():
+                workflow_dump_path.parent.mkdir(exist_ok=True, parents=True)
+
                 os.symlink(
                     src=logged_workflows[workflow.uuid].path,
                     dst=workflow_dump_path,
@@ -175,18 +265,110 @@ def _dump_workflows(self, workflows: Sequence[orm.WorkflowNode]) -> None:
                     output_path=workflow_dump_path,
                 )

-            dumped_workflows[workflow.uuid] = DumpLog(
-                path=workflow_dump_path,
-                time=datetime.now().astimezone(),
-            )
+            dumped_workflows[workflow.uuid] = DumpLog(
+                path=workflow_dump_path,
+                time=datetime.now().astimezone(),
+            )

         self.dump_logger.update_workflows(new_workflows=dumped_workflows)

     def dump(self) -> None:
+        """Top-level method that actually performs the dumping of the AiiDA data for the collection.
+
+        :return: None
+        """
+        self.output_path.mkdir(exist_ok=True, parents=True)

-        collection_processes: ProcessesToDump = self._get_processes_to_dump()
+        collection_processes: ProcessesToDump = self.processes_to_dump

+        if not collection_processes.is_empty:
+            # First dump workflows, then calculations
+            if len(collection_processes.workflows) > 0:
+                self._dump_workflows(workflows=collection_processes.workflows)
+            if len(collection_processes.calculations) > 0:
+                self._dump_calculations(calculations=collection_processes.calculations)
+
+    # TODO: See if I can generalize the dump sub-methods
+    # def _dump_processes(
+    #     self,
+    #     # processes: Sequence[orm.CalculationNode | orm.WorkflowNode],
+    #     processes: Sequence[orm.CalculationNode] | Sequence[orm.WorkflowNode],
+    # ) -> None:
+    #     """Dump a collection of calculations or workflows.

+    #     :param processes: Sequence of ``orm.CalculationNode``s or ``orm.WorkflowNode``s
+    #     :param process_type: Type of processes, either 'calculations' or 'workflows'
+    #     :return: None
+    #     """

+    #     # From, e.g., 'aiida.workflows:core.arithmetic.multiply_add' to 'workflows'
+    #     if isinstance(processes[0], orm.CalculationNode):
+    #         process_type_str = 'calculations'
+    #     elif isinstance(processes[0], orm.WorkflowNode):
+    #         process_type_str = 'workflows'
+    #     # else:
+    #     #     process_type_str = processes[0].process_type.split(':')[0].split('.')[1]
+    #     process_type_path = self.output_path / process_type_str
+    #     process_type_path.mkdir(exist_ok=True, parents=True)

+    #     dumped_processes: dict[str, DumpLog] = {}
+    #     logged_processes: DumpDict = self.dump_logger.get_log()[process_type_str]

+    #     for process in processes:
+    #         process_dumper = self.process_dumper

+    #         process_dump_path = process_type_path / process_dumper._generate_default_dump_path(
+    #             process_node=process, prefix=None
+    #         )

+    #         # Target directory already exists, skip this process
+    #         if process_dump_path.exists():
+    #             continue

+    #         else:
+    #             # Symlink here, if deduplication enabled and process was already dumped
+    #             # TODO: Possibly check dirs here
+    #             # TODO: Alternatively have a method/endpoint to delete one calculation from the dump,
+    #             # TODO: which would also update the log.
+    #             # Otherwise, one might delete a calculation, maybe because it was wrong, and then it won't be dumped
+ # if self.deduplicate and process.uuid in logged_processes.keys(): + # try: + # os.symlink( + # src=logged_processes[process.uuid].path, + # dst=process_dump_path, + # ) + # except: + # # raise + # pass + # # breakpoint() + # else: + # if process_type_str == 'calculations': + # process_dumper._dump_calculation(calculation_node=process, output_path=process_dump_path) + # elif process_type_str == 'workflows': + # process_dumper._dump_workflow( + # workflow_node=process, + # output_path=process_dump_path, + # ) + + + # dumped_processes[process.uuid] = DumpLog( + # path=process_dump_path, + # time=datetime.now().astimezone(), + # ) + + # # breakpoint() + + # if process_type_str == 'calculations': + # self.dump_logger.update_calculations(new_calculations=dumped_processes) + # elif process_type_str == 'workflows': + # self.dump_logger.update_workflows(new_workflows=dumped_processes) - if len(collection_processes.calculations) > 0: - self._dump_calculations(calculations=collection_processes.calculations) - if len(collection_processes.workflows) > 0: - self._dump_workflows(workflows=collection_processes.workflows) diff --git a/src/aiida/tools/dumping/config.py b/src/aiida/tools/dumping/config.py new file mode 100644 index 0000000000..cd8537ce3b --- /dev/null +++ b/src/aiida/tools/dumping/config.py @@ -0,0 +1,11 @@ +from dataclasses import dataclass + + +@dataclass +class ProfileDumpConfig: + dump_processes: bool = True + symlink_duplicates: bool = True # + delete_missing: bool = False # profile + extra_calc_dirs: bool = False # collection + organize_by_groups: bool = True # profile + diff --git a/src/aiida/tools/dumping/process.py b/src/aiida/tools/dumping/process.py index f65da5a15e..8a4f962bf2 100644 --- a/src/aiida/tools/dumping/process.py +++ b/src/aiida/tools/dumping/process.py @@ -42,6 +42,8 @@ class ProcessDumper: + """Class to handle dumping of an AiiDA process.""" + def __init__( self, base_dumper: BaseDumper | None = None, @@ -52,6 +54,10 @@ def __init__( flat: bool = False, dump_unsealed: bool = False, ) -> None: + """Initialize the CollectionDumper. 
+
+        :param base_dumper: Base dumper instance or None (gets instantiated).
+        :param include_inputs: Also dump the linked input nodes of ``CalculationNode``s.
+        :param include_outputs: Also dump the linked output nodes of ``CalculationNode``s.
+        :param include_attributes: Include node attributes in the metadata file written for every ``ProcessNode``.
+        :param include_extras: Include node extras in the metadata file written for every ``ProcessNode``.
+        :param flat: Dump files in a flat directory for every step of the process.
+        :param dump_unsealed: Also allow dumping of unsealed process nodes.
+        """
         self.include_inputs = include_inputs
         self.include_outputs = include_outputs
         self.include_attributes = include_attributes
         self.include_extras = include_extras
@@ -218,8 +224,7 @@ def dump(
         # for key, value in kwargs.items():
         #     setattr(self, key, value)

-        if output_path is None:
-            output_path = self._generate_default_dump_path(process_node=process_node)
+        output_path = output_path or self._generate_default_dump_path(process_node=process_node)

         prepare_dump_path(
             path_to_validate=output_path, overwrite=self.base_dumper.overwrite, incremental=self.base_dumper.incremental
diff --git a/src/aiida/tools/dumping/profile.py b/src/aiida/tools/dumping/profile.py
index 6b9f33a58e..04374ebe16 100644
--- a/src/aiida/tools/dumping/profile.py
+++ b/src/aiida/tools/dumping/profile.py
@@ -7,11 +7,12 @@
 # For further information please visit http://www.aiida.net               #
 ###########################################################################

-# TODO: Use `batch_iter` from aiida.tools.archive.common
+# TODO: Possibly use `batch_iter` from aiida.tools.archive.common

 from __future__ import annotations

-from typing import cast
+from typing import Sequence, cast

 from aiida import orm
 from aiida.common.log import AIIDA_LOGGER
@@ -19,6 +20,7 @@
 from aiida.manage.configuration.profile import Profile
 from aiida.tools.dumping.base import BaseDumper
 from aiida.tools.dumping.collection import CollectionDumper
+from aiida.tools.dumping.config import ProfileDumpConfig
 from aiida.tools.dumping.logger import DumpLogger
 from aiida.tools.dumping.process import ProcessDumper
 from aiida.tools.dumping.utils import filter_by_last_dump_time
@@ -27,73 +29,67 @@

 class ProfileDumper:
+    """Class to handle dumping of the data of an AiiDA profile."""
+
     def __init__(
         self,
         profile: str | Profile | None = None,
+        profile_dump_config: ProfileDumpConfig | None = None,
         base_dumper: BaseDumper | None = None,
         process_dumper: ProcessDumper | None = None,
         dump_logger: DumpLogger | None = None,
-        organize_by_groups: bool = True,
-        deduplicate: bool = True,
-        groups: list[str | orm.Group] | None = None,
-        dump_processes: bool = True,
+        # deduplicate: bool = True,
+        groups: Sequence[str | orm.Group] | None = None,
     ):
-        self.organize_by_groups = organize_by_groups
-        self.deduplicate = deduplicate
-        self.dump_processes = dump_processes
+        """Initialize the ProfileDumper.
+
+        :param profile: The selected profile to dump. If None, the default profile is loaded.
+        :param profile_dump_config: Configuration options of the profile dump or None (gets instantiated).
+        :param base_dumper: Base dumper instance or None (gets instantiated).
+        :param process_dumper: Process dumper instance or None (gets instantiated).
+        :param dump_logger: Logger for the dumping (gets instantiated).
+        :param groups: Dump data only for the selected groups.
+        """
+        self.groups = groups

         self.base_dumper = base_dumper or BaseDumper()
         self.process_dumper = process_dumper or ProcessDumper()
         self.dump_logger = dump_logger or DumpLogger(dump_parent_path=self.base_dumper.dump_parent_path)

-        # Load the profile
-        if isinstance(profile, str):
-            profile = load_profile(profile)
+        self.profile_dump_config = profile_dump_config or ProfileDumpConfig()

-        if profile is None:
-            manager = get_manager()
-            profile = manager.get_profile()
-
-        assert profile is not None
+        if not isinstance(profile, Profile):
+            profile = load_profile(profile=profile, allow_switch=True)

         self.profile = profile

-    def dump(self):
-        # No groups selected, dump data which is not part of any group
-        # If groups selected, however, this data should not also be dumped automatically
-        if not self.groups:
-            self._dump_processes_not_in_any_group()
-
-            # Still, even without selecting groups, by default, all profile data should be dumped
-            # Thus, we obtain all groups in the profile here
-            profile_groups = orm.QueryBuilder().append(orm.Group).all(flat=True)
-            self._dump_processes_per_group(groups=profile_groups)
-
-        else:
-            self._dump_processes_per_group(groups=self.groups)
-
     def _dump_processes_not_in_any_group(self):
-        # === Dump the data that is not associated with any group ===
+        """Dump the profile's process data that is not contained in any group."""

-        # `dump_parent_path` is set in the `post_init` method of the `BaseDumper` dataclass
+        # `dump_parent_path` is set to the CWD in the `__post_init__` method of the `BaseDumper` dataclass if not given
         assert self.base_dumper.dump_parent_path is not None

-        if self.organize_by_groups:
+        if self.profile_dump_config.organize_by_groups:
             output_path = self.base_dumper.dump_parent_path / 'no-group'
         else:
             output_path = self.base_dumper.dump_parent_path

-        no_group_nodes = self._get_no_group_nodes()
+        no_group_nodes = self._get_no_group_processes()

         no_group_dumper = CollectionDumper(
+            collection=no_group_nodes,
+            profile_dump_config=self.profile_dump_config,
             base_dumper=self.base_dumper,
             process_dumper=self.process_dumper,
-            collection=no_group_nodes,
-            deduplicate=self.deduplicate,
             dump_logger=self.dump_logger,
             output_path=output_path,
         )

-        if self.dump_processes and no_group_dumper.should_dump_processes():
+        # Only issue the report message when there are actual processes to dump. This might not be the case if
+        # there are no new processes since the last dumping
+        if self.profile_dump_config.dump_processes and not no_group_dumper.processes_to_dump.is_empty:
             logger.report(f'Dumping processes not in any group for profile `{self.profile.name}`...')

             no_group_dumper.dump()

@@ -104,59 +100,84 @@ def _dump_processes_per_group(self, groups):
         assert self.base_dumper.dump_parent_path is not None

         for group in groups:
-            if self.organize_by_groups:
+            if self.profile_dump_config.organize_by_groups:
                 output_path = self.base_dumper.dump_parent_path / f'group-{group.label}'
             else:
                 output_path = self.base_dumper.dump_parent_path

             group_dumper = CollectionDumper(
                 base_dumper=self.base_dumper,
+                profile_dump_config=self.profile_dump_config,
                 process_dumper=self.process_dumper,
                 dump_logger=self.dump_logger,
                 collection=group,
-                deduplicate=self.deduplicate,
                 output_path=output_path,
             )

-            if self.dump_processes and group_dumper.should_dump_processes():
+            # Only issue the report message when there are actual processes to dump for the group. This might not be
+            # the case for, e.g., pseudopotential groups, or if there are no new processes since the last dumping
+            if self.profile_dump_config.dump_processes and not group_dumper.processes_to_dump.is_empty:
                 logger.report(f'Dumping processes in group {group.label} for profile `{self.profile.name}`...')
                 group_dumper.dump()

-    def _get_no_group_nodes(self) -> list[str]:
-        # Get all nodes that are _not_ in any group
+    def _get_no_group_processes(self) -> Sequence[str] | Sequence[int]:
+        """Obtain the processes in the profile that are not part of any group.
+
+        :return: List of UUIDs of the selected process nodes.
+        """
         group_qb = orm.QueryBuilder().append(orm.Group)
-        profile_groups = cast(list[orm.Group], group_qb.all(flat=True))
-        node_qb = orm.QueryBuilder().append(orm.Node, project=['uuid'])
-        profile_nodes = cast(list[str], node_qb.all(flat=True))
+        profile_groups = cast(Sequence[orm.Group], group_qb.all(flat=True))
+        process_qb = orm.QueryBuilder().append(orm.ProcessNode, project=['uuid'])
+        profile_nodes = cast(Sequence[str], process_qb.all(flat=True))

-        nodes_in_groups: list[str] = [node.uuid for group in profile_groups for node in group.nodes]
+        nodes_in_groups: Sequence[str] = [node.uuid for group in profile_groups for node in group.nodes]

         # Need to expand here also with the called_descendants of `ProcessNode`s, otherwise the called
         # `CalculationNode`s of `WorkflowNode`s that are part of a group are dumped twice
-
-        sub_nodes_in_groups: list[str] = [
+        sub_nodes_in_groups: Sequence[str] = [
             node.uuid
             for n in nodes_in_groups
-            if isinstance((workflow_node := orm.load_node(n)), orm.WorkflowNode)
-            for node in workflow_node.called_descendants
+            if isinstance((process_node := orm.load_node(n)), orm.ProcessNode)
+            for node in process_node.called_descendants
         ]

-        # sub_nodes_in_groups: list[str] = [node.uuid for node in sub_nodes_in_groups]
         nodes_in_groups += sub_nodes_in_groups

-        nodes: list[str] = [profile_node for profile_node in profile_nodes if profile_node not in nodes_in_groups]
-        nodes = filter_by_last_dump_time(nodes=nodes, last_dump_time=self.base_dumper.last_dump_time)
+        process_nodes: Sequence[str | int] = [
+            profile_node for profile_node in profile_nodes if profile_node not in nodes_in_groups
+        ]
+        process_nodes = filter_by_last_dump_time(nodes=process_nodes, last_dump_time=self.base_dumper.last_dump_time)

-        return nodes
+        return process_nodes
+
+    def dump_processes(self):
+        """Dump the profile's process data, grouped by the profile's groups if requested."""
+        # No groups selected -> dump the data which is not part of any group
+        # If groups are selected, however, that data should not also be dumped automatically
+        if not self.groups:
+            self._dump_processes_not_in_any_group()
+
+            # Still, even without selecting groups, by default, all profile data should be dumped
+            # Thus, we obtain all groups in the profile here
+            profile_groups = orm.QueryBuilder().append(orm.Group).all(flat=True)
+            self._dump_processes_per_group(groups=profile_groups)
+
+        else:
+            self._dump_processes_per_group(groups=self.groups)

     @staticmethod
     def _get_number_of_nodes_to_dump(last_dump_time) -> dict[str, int]:
         result = {}
         for node_type in (orm.CalculationNode, orm.WorkflowNode):
             qb = orm.QueryBuilder().append(node_type, project=['uuid'])
-            nodes = cast(list[str], qb.all(flat=True))
+            nodes = cast(Sequence[str], qb.all(flat=True))
             nodes = filter_by_last_dump_time(nodes=nodes, last_dump_time=last_dump_time)
             result[node_type.class_node_type.split('.')[-2] + 's'] = len(nodes)
         return result
diff --git a/src/aiida/tools/dumping/utils.py b/src/aiida/tools/dumping/utils.py
index 0573fede09..d2f216c539 100644
--- a/src/aiida/tools/dumping/utils.py
+++ b/src/aiida/tools/dumping/utils.py
@@ -13,6 +13,7 @@
 import shutil
 from datetime import datetime
 from pathlib import Path
+from typing import cast

 from aiida import orm
 from aiida.common.log import AIIDA_LOGGER
@@ -40,6 +41,8 @@ def prepare_dump_path(
         `incremental` are enabled.
     :raises FileNotFoundError: If no `safeguard_file` is found."""

+    # TODO: Handle symlinks
+
     if overwrite and incremental:
         msg = 'Both overwrite and incremental set to True. Only specify one.'
         raise ValueError(msg)
@@ -63,9 +66,16 @@ def prepare_dump_path(
         safeguard_exists = (path_to_validate / safeguard_file).is_file()

         if safeguard_exists:
             msg = '`--overwrite` option selected. Will recreate directory.'
             logger.report(msg)
-            shutil.rmtree(path_to_validate)
+            try:
+                shutil.rmtree(path_to_validate)
+            except OSError:
+                # `shutil.rmtree` fails for symbolic links with
+                # OSError: Cannot call rmtree on a symbolic link
+                _delete_dir_recursively(path_to_validate)

         else:
             msg = (
@@ -79,20 +89,64 @@ def prepare_dump_path(
     (path_to_validate / safeguard_file).touch()


-def sanitize_file_extension(filename: str | Path):
-    if isinstance(filename, Path):
-        filename = str(filename)
-    if filename.endswith('.mpl_pdf'):
-        filename = filename.replace('.mpl_pdf', '.pdf')
-    if filename.endswith('.mpl_png'):
-        filename = filename.replace('.mpl_png', '.png')
-
-    return Path(filename)
-
-
-def filter_by_last_dump_time(nodes: list[str], last_dump_time: datetime | None = None) -> list[str]:
-    if last_dump_time is not None:
-        orm_nodes = [orm.load_node(node) for node in nodes]
-        return [node.uuid for node in orm_nodes if node.mtime > last_dump_time]
-    else:
+def _delete_dir_recursively(path):
+    """Delete folder, sub-folders and files.
+
+    Implementation taken from: https://stackoverflow.com/a/70285390/9431838
+    """
+    for f in path.glob('**/*'):
+        if f.is_symlink():
+            f.unlink(missing_ok=True)  # missing_ok is added in python 3.8
+        elif f.is_file():
+            f.unlink()
+        elif f.is_dir():
+            try:
+                f.rmdir()  # delete empty sub-folder
+            except OSError:  # sub-folder is not empty
+                _delete_dir_recursively(f)  # recurse the current sub-folder
+            except Exception as exception:  # capture other exceptions
+                logger.warning(f'exception name: {exception.__class__.__name__}')
+                logger.warning(f'exception msg: {exception}')
+
+    try:
+        path.rmdir()  # time to delete an empty folder
+    except NotADirectoryError:
+        path.unlink()  # delete folder even if it is a symlink, linux
+    except Exception as exception:
+        logger.warning(f'exception name: {exception.__class__.__name__}')
+        logger.warning(f'exception msg: {exception}')
+
+
+def _get_filtered_nodes(nodes: list[str | int], last_dump_time: datetime, key: str = 'uuid') -> list[str | int]:
+    """Helper function to get ``orm.Node``s from the DB based on ``id``/``uuid`` and filter them by ``mtime``.
+
+    :param nodes: Collection of node PKs or UUIDs.
+    :param last_dump_time: Last time nodes were dumped to disk.
+    :param key: Identifier to obtain nodes with, either ``id`` or ``uuid``.
+    :return: List of nodes filtered by ``last_dump_time``.
+    """
+
+    qb = orm.QueryBuilder().append(orm.Node, filters={key: {'in': nodes}})
+    nodes_orm: list[orm.Node] = cast(list[orm.Node], qb.all(flat=True))
+    return [getattr(node, key) for node in nodes_orm if node.mtime > last_dump_time]
+
+
+def filter_by_last_dump_time(nodes: list[str | int], last_dump_time: datetime | None) -> list[str | int]:
+    """Filter a list of nodes by the last dump time of the corresponding dumper.
+
+    :param nodes: A list of node identifiers, which can be either UUIDs (str) or PKs (int).
+    :param last_dump_time: Only include nodes modified after this timestamp. If None, no filtering is applied.
+    :return: A list of node identifiers whose nodes were modified after ``last_dump_time``.
+    """
+
+    # TODO: Possibly use a QueryBuilder filter directly. Though, `nodes` is directly accessible from `orm.Group.nodes`
+
+    if not nodes or last_dump_time is None:
         return nodes
+
+    key = 'uuid' if isinstance(nodes[0], str) else 'id'
+    return _get_filtered_nodes(
+        nodes=nodes,
+        last_dump_time=last_dump_time,
+        key=key,
+    )
diff --git a/tests/tools/dumping/test_collection.py b/tests/tools/dumping/test_collection.py
index 5ad3ddd01b..6b79dd1195 100644
--- a/tests/tools/dumping/test_collection.py
+++ b/tests/tools/dumping/test_collection.py
@@ -1,4 +1,3 @@
-###########################################################################
 # Copyright (c), The AiiDA team. All rights reserved.                     #
 # This file is part of the AiiDA code.                                    #
 #                                                                         #
@@ -17,7 +16,7 @@
 import pytest

 from aiida import orm
-from aiida.tools.dumping import CollectionDumper, collection
+from aiida.tools.dumping import CollectionDumper
+from aiida.tools.dumping.config import ProfileDumpConfig

 from .test_utils import compare_tree

@@ -28,7 +27,7 @@
 #     generate_calculation_node_add_class()
 #     You can also do any additional setup here

-@pytest.mark.usefixtures('aiida_profile_clean')
+# @pytest.mark.usefixtures('aiida_profile_clean')
 @pytest.fixture()
 def setup_no_process_group() -> orm.Group:
     no_process_group, _ = orm.Group.collection.get_or_create(label='no-process')
@@ -38,7 +37,7 @@ def setup_no_process_group() -> orm.Group:
     return no_process_group


-@pytest.mark.usefixtures('aiida_profile_clean')
+# @pytest.mark.usefixtures('aiida_profile_clean')
 @pytest.fixture()
 def setup_add_group(generate_calculation_node_add) -> orm.Group:
     add_group, _ = orm.Group.collection.get_or_create(label='add')
@@ -48,7 +47,7 @@ def setup_add_group(generate_calculation_node_add) -> orm.Group:
     return add_group


-@pytest.mark.usefixtures('aiida_profile_clean')
+# @pytest.mark.usefixtures('aiida_profile_clean')
 @pytest.fixture()
 def setup_multiply_add_group(generate_workchain_multiply_add) -> orm.Group:
     multiply_add_group, _ = orm.Group.collection.get_or_create(label='multiply-add')
@@ -58,7 +57,7 @@ def setup_multiply_add_group(generate_workchain_multiply_add) -> orm.Group:
     return multiply_add_group


-@pytest.mark.usefixtures('aiida_profile_clean')
+# @pytest.mark.usefixtures('aiida_profile_clean')
 @pytest.fixture()
 def duplicate_group():
     def _duplicate_group(source_group: orm.Group, dest_group_label: str):
@@ -69,67 +68,68 @@ def _duplicate_group(source_group: orm.Group, dest_group_label: str):
     return _duplicate_group


-@pytest.mark.usefixtures('aiida_profile_clean_class')
+# @pytest.mark.usefixtures('aiida_profile_clean_class')
 class TestCollectionDumper:
-    def test_should_dump_processes(self, setup_no_process_group, setup_add_group):
-        """"""
-        no_process_group: orm.Group = setup_no_process_group
-        add_group: orm.Group = setup_add_group
-
-        collection_dumper = CollectionDumper(collection=no_process_group)
-
-        assert collection_dumper.should_dump_processes() is False

+    # @pytest.mark.usefixtures('aiida_profile_clean')
+    # def test_should_dump_processes(self, setup_no_process_group, setup_add_group):
+    #     """"""
+    #     no_process_group: orm.Group = setup_no_process_group
+    #     add_group: orm.Group = setup_add_group

-        collection_dumper = CollectionDumper(collection=add_group)
+    #     collection_dumper = CollectionDumper(collection=no_process_group)

-        assert collection_dumper.should_dump_processes() is True
+    #     assert collection_dumper._should_dump_processes() is False

+    #     collection_dumper = CollectionDumper(collection=add_group)

-    def test_get_nodes_add_group(self, setup_add_group):
-        add_group: orm.Group = setup_add_group
+    #     assert collection_dumper._should_dump_processes() is True

-        collection_dumper = CollectionDumper(collection=add_group)
+    @pytest.mark.usefixtures('aiida_profile_clean')
+    def test_resolve_collection_nodes(self, setup_add_group, generate_calculation_node_add):
+        add_group: orm.Group = setup_add_group
+        add_nodes = list(add_group.nodes)

-        nodes = collection_dumper._get_nodes()
-        assert len(nodes) == 1
+        add_dumper = CollectionDumper(collection=add_group)

-        # add_group: orm.Group = setup_add_group
-
-        # collection_dumper = CollectionDumper(collection=add_group)
-        # nodes = collection_dumper._get_nodes()
-        # group_node = orm.load_node(nodes[0])
-        # group_node_uuid = nodes[0]
-
-        # assert len(nodes) == 1
-        # assert isinstance(nodes[0], str)
-        # assert isinstance(group_node, orm.CalcJobNode)
-        # assert nodes[0] == group_node_uuid
+        nodes = add_dumper._get_collection_nodes()
+        assert len(nodes) == 1
+        assert isinstance(nodes[0], str)
+        assert nodes[0] == add_nodes[0].uuid
+        assert isinstance(orm.load_node(nodes[0]), orm.CalcJobNode)

-        # # Now, add another CalcJobNode to the profile
-        # # As not part of the group, should not be returned
-        # cj_node1 = generate_calculation_node_add()
-        # nodes = collection_dumper._get_nodes()
-        # assert len(nodes) == 1
+        # Now, add another CalcJobNode to the profile. As it is not part of the group, it should not be returned.
+        # Also, `last_dump_time` is None here by default, so no filtering is applied, and the returned collection
+        # still contains the previous node
+        cj_node1 = generate_calculation_node_add()
+        nodes = add_dumper._get_collection_nodes()
+        assert len(nodes) == 1
+        assert isinstance(nodes[0], str)
+        assert nodes[0] == add_nodes[0].uuid
+        assert isinstance(orm.load_node(nodes[0]), orm.CalcJobNode)

-        # # Now, add the node to the group, should be captured by get_nodes
-        # add_group.add_nodes([cj_node1])
-        # nodes = collection_dumper._get_nodes()
-        # assert len(nodes) == 2
+        # Now, add the node to the group; it should be captured by `_get_collection_nodes`
+        add_group.add_nodes([cj_node1])
+        nodes = add_dumper._get_collection_nodes()
+        assert len(nodes) == 2
+        assert set(nodes) == {add_nodes[0].uuid, cj_node1.uuid}

-        # # Filtering by time should work
-        # collection_dumper.base_dumper.last_dump_time = datetime.now().astimezone()
+        # Filtering by time should work -> now, only cj_node2 gets returned
+        add_dumper.base_dumper.last_dump_time = datetime.now().astimezone()

-        # cj_node2 = generate_calculation_node_add()
-        # add_group.add_nodes([cj_node2])
+        cj_node2 = generate_calculation_node_add()
+        add_group.add_nodes([cj_node2])

-        # nodes = collection_dumper._get_nodes()
-        # assert len(nodes) == 1
-        # assert nodes[0] == cj_node2.uuid
+        nodes = add_dumper._get_collection_nodes()
+        assert len(nodes) == 1
+        assert nodes[0] == cj_node2.uuid

-        # with pytest.raises(TypeError):
-        #     collection_dumper = CollectionDumper(collection=[1])
-        #     collection_dumper._get_nodes()
+        # Invalid collections should raise at construction time, during `_validate_collection`
+        for invalid_collection in [{'foo': 'bar'}, [1.0, 1.1]]:
+            with pytest.raises(ValueError):
+                CollectionDumper(collection=invalid_collection)

+    @pytest.mark.usefixtures('aiida_profile_clean')
     def test_get_processes_to_dump(self, setup_add_group, setup_multiply_add_group, duplicate_group):
         add_group: orm.Group = setup_add_group
         multiply_add_group: orm.Group = setup_multiply_add_group
@@ -154,21 +154,15 @@ def test_get_processes_to_dump(self, setup_add_group, setup_multiply_add_group,

         # TODO: Test here also de-duplication with a Workflow with a sub-workflow

-    def test_dump_calculations(self, setup_add_group, setup_multiply_add_group, tmp_path):
+    @pytest.mark.usefixtures('aiida_profile_clean')
+    def test_dump_calculations_add(self, setup_add_group, tmp_path):
         add_group: orm.Group = setup_add_group
-        multiply_add_group: orm.Group = setup_multiply_add_group
-
-        add_group_path = Path('add_group')
-        multiply_add_group_path = Path('multiply_add_group')
-
-        add_dumper = CollectionDumper(collection=add_group, output_path=tmp_path / add_group_path)
-        multiply_add_dumper = CollectionDumper(
-            collection=multiply_add_group, output_path=tmp_path / multiply_add_group_path
-        )
+        add_group_label = add_group.label
+        add_group_path = tmp_path / add_group_label

-        add_processes_to_dump = add_dumper._get_processes_to_dump()
+        add_dumper = CollectionDumper(collection=add_group, output_path=add_group_path)

-        add_dumper._dump_calculations(add_processes_to_dump.calculations)
+        add_dumper._dump_calculations(add_dumper._get_processes_to_dump().calculations)

         expected_tree = {
             'calculations': {
@@ -182,39 +176,44 @@ def test_dump_calculations(self, setup_add_group, setup_multiply_add_group, tmp_

         compare_tree(expected=expected_tree, base_path=tmp_path, relative_path=add_group_path)

-        multiply_add_processes_to_dump = multiply_add_dumper._get_processes_to_dump()
+    @pytest.mark.usefixtures('aiida_profile_clean')
+    def test_dump_calculations_multiply_add(self, setup_multiply_add_group, tmp_path):
+        multiply_add_group: orm.Group = setup_multiply_add_group
+        multiply_add_group_label = multiply_add_group.label
+        multiply_add_group_path = tmp_path / multiply_add_group_label

-        # No calculations to dump when deduplication is enabled
-        multiply_add_dumper._dump_calculations(multiply_add_processes_to_dump.calculations)
-        multiply_add_test_path: Path = multiply_add_group_path / 'calculations'
+        multiply_add_dumper = CollectionDumper(collection=multiply_add_group, output_path=multiply_add_group_path)

-        assert not multiply_add_test_path.exists()
+        # No calculations to dump in their own directories when deduplication is enabled
+        multiply_add_dumper._dump_calculations(multiply_add_dumper._get_processes_to_dump().calculations)
+        assert not (multiply_add_group_path / 'calculations').exists()

+        # Now, enable extra calculation directories (previously `deduplicate=False`) -> should dump calculations
+        multiply_add_dumper_no_dedup = CollectionDumper(
+            collection=multiply_add_group,
+            output_path=multiply_add_group_path,
+            profile_dump_config=ProfileDumpConfig(extra_calc_dirs=True),
+        )

-        multiply_add_processes_to_dump = multiply_add_dumper_no_dedup._get_processes_to_dump()
-
-        # calculations to dump when deduplication is enabled
-        multiply_add_dumper_no_dedup._dump_calculations(multiply_add_processes_to_dump.calculations)
+        multiply_add_dumper_no_dedup._dump_calculations(
+            multiply_add_dumper_no_dedup._get_processes_to_dump().calculations
+        )

         expected_tree_no_dedup = {
             'calculations': {
-                'ArithmeticAddCalculation-15': {
+                'ArithmeticAddCalculation-8': {
                     'inputs': ['_aiidasubmit.sh', 'aiida.in'],
                     'node_inputs': [],
                     'outputs': ['_scheduler-stderr.txt', '_scheduler-stdout.txt', 'aiida.out'],
                 },
-                'multiply-13': {
+                'multiply-6': {
                     'inputs': ['source_file'],
                     'node_inputs': [],
                 },
             }
         }

-        compare_tree(expected=expected_tree_no_dedup, base_path=tmp_path, relative_path=multiply_add_group_path)
+        compare_tree(expected=expected_tree_no_dedup, base_path=tmp_path, relative_path=Path(multiply_add_group_label))

-        pytest.set_trace()
-
     # def test_dump_workflows(self):
     #     pass
@@ -261,4 +260,4 @@

     #     with pytest.raises(TypeError):
    #         collection_dumper = CollectionDumper(collection=[1])
-    #         collection_dumper._get_nodes()
\ No newline at end of file
+    #         collection_dumper._get_nodes()
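
Usage note (not part of the patch): a minimal sketch of how the reworked Python API introduced here fits together, roughly the programmatic equivalent of `verdi profile mirror`. The dump path `profile-mirror` and the group label `my-group` are hypothetical placeholders.

    from pathlib import Path

    from aiida import load_profile, orm
    from aiida.tools.dumping.base import BaseDumper
    from aiida.tools.dumping.config import ProfileDumpConfig
    from aiida.tools.dumping.profile import ProfileDumper

    load_profile()  # loads the default profile

    # Roughly what `verdi profile mirror` assembles from its CLI flags
    config = ProfileDumpConfig(
        dump_processes=True,      # dump process data at all
        symlink_duplicates=True,  # symlink nodes contained in multiple groups
        organize_by_groups=True,  # mirror the group structure on disk
    )

    base_dumper = BaseDumper(dump_parent_path=Path('profile-mirror'), incremental=True)

    profile_dumper = ProfileDumper(
        profile_dump_config=config,
        base_dumper=base_dumper,
        groups=[orm.load_group('my-group')],  # hypothetical group; omit to dump the whole profile
    )
    profile_dumper.dump_processes()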
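Similarly, a short sketch of the reworked incremental-filtering helper from `src/aiida/tools/dumping/utils.py`. The UUID below is a hypothetical placeholder, and a loaded profile is assumed for the underlying `QueryBuilder` query.

    from datetime import datetime, timedelta

    from aiida.tools.dumping.utils import filter_by_last_dump_time

    cutoff = datetime.now().astimezone() - timedelta(hours=1)

    # UUID strings are matched against the `uuid` column; integer PKs against `id`
    recent = filter_by_last_dump_time(
        nodes=['00000000-0000-0000-0000-000000000000'],  # hypothetical UUID
        last_dump_time=cutoff,
    )

    # A `last_dump_time` of None disables the filtering and returns the input unchanged
    assert filter_by_last_dump_time(nodes=[1, 2, 3], last_dump_time=None) == [1, 2, 3]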