diff --git a/aiida_workgraph/calculations/python.py b/aiida_workgraph/calculations/python.py index 8a469863..90961605 100644 --- a/aiida_workgraph/calculations/python.py +++ b/aiida_workgraph/calculations/python.py @@ -259,7 +259,15 @@ def prepare_for_submission(self, folder: Folder) -> CalcInfo: # TODO: should check this recursively elif isinstance(value, (AttributeDict, dict)): # if the value is an AttributeDict, use recursively - input_values[key] = {k: v.value for k, v in value.items()} + if len(value.keys()) > 0 and list(value.keys())[0].startswith( + "list_data_" + ): + ndata = len(value.keys()) + input_values[key] = [ + value[f"list_data_{i}"].value for i in range(ndata) + ] + else: + input_values[key] = {k: v.value for k, v in value.items()} else: raise ValueError( f"Input data {value} is not supported. Only AiiDA data Node with a value attribute is allowed. " diff --git a/aiida_workgraph/calculations/python_parser.py b/aiida_workgraph/calculations/python_parser.py index 6c02c4c3..00f486c6 100644 --- a/aiida_workgraph/calculations/python_parser.py +++ b/aiida_workgraph/calculations/python_parser.py @@ -117,6 +117,12 @@ def serialize_output(self, result, output): else: serialized_result[key] = general_serializer(value) return serialized_result + elif isinstance(result, list): + serialized_result = {} + for i, value in enumerate(result): + key = f"list_data_{i}" + serialized_result[key] = general_serializer(value) + return serialized_result else: self.exit_codes.ERROR_INVALID_OUTPUT else: diff --git a/aiida_workgraph/decorator.py b/aiida_workgraph/decorator.py index 95a21655..1d19837e 100644 --- a/aiida_workgraph/decorator.py +++ b/aiida_workgraph/decorator.py @@ -274,6 +274,11 @@ def build_pythonjob_task(func: Callable) -> Task: tdata = {"executor": PythonJob, "task_type": "CALCJOB"} _, tdata_py = build_task_from_AiiDA(tdata) tdata = deepcopy(func.tdata) + # if the function has var_kwargs, we need to change the input type to Namespace + if tdata["var_kwargs"]: + for input in tdata["inputs"]: + if input["name"] == tdata["var_kwargs"]: + input["identifier"] = "workgraph.namespace" # merge the inputs and outputs from the PythonJob task to the function task # skip the already existed inputs and outputs inputs = tdata["inputs"] diff --git a/aiida_workgraph/engine/utils.py b/aiida_workgraph/engine/utils.py index 6c2d5a95..e4ddb03e 100644 --- a/aiida_workgraph/engine/utils.py +++ b/aiida_workgraph/engine/utils.py @@ -1,4 +1,3 @@ -from aiida_workgraph.orm.serializer import serialize_to_aiida_nodes from aiida import orm from aiida.common.extendeddicts import AttributeDict @@ -91,8 +90,6 @@ def prepare_for_python_task(task: dict, kwargs: dict, var_kwargs: dict) -> dict: ) # outputs output_info = task["outputs"] - # serialize the kwargs into AiiDA Data - function_kwargs = serialize_to_aiida_nodes(function_kwargs) # transfer the args to kwargs inputs = { "process_label": f"PythonJob<{task['name']}>", diff --git a/aiida_workgraph/utils/__init__.py b/aiida_workgraph/utils/__init__.py index 3ed54bde..1434340a 100644 --- a/aiida_workgraph/utils/__init__.py +++ b/aiida_workgraph/utils/__init__.py @@ -340,20 +340,28 @@ def serialize_pythonjob_properties(wgdata): if not task["metadata"]["node_type"].upper() == "PYTHONJOB": continue # get the names kwargs for the PythonJob, which are the inputs before _wait - input_kwargs = [] for input in task["inputs"]: if input["name"] == "_wait": break - input_kwargs.append(input["name"]) - for name in input_kwargs: - prop = task["properties"][name] - # if value is not None, not {} - if not ( - prop["value"] is None - or isinstance(prop["value"], dict) - and prop["value"] == {} - ): - prop["value"] = general_serializer(prop["value"]) + prop = task["properties"][input["name"]] + if input["identifier"] == "workgraph.namespace": + if isinstance(prop["value"], list): + prop["value"] = { + f"list_data_{i}": general_serializer(v) + for i, v in enumerate(prop["value"]) + } + elif isinstance(prop["value"], dict): + prop["value"] = { + k: general_serializer(v) for k, v in prop["value"].items() + } + else: + # if value is not None, not {} + if not ( + prop["value"] is None + or isinstance(prop["value"], dict) + and prop["value"] == {} + ): + prop["value"] = general_serializer(prop["value"]) def generate_bash_to_create_python_env( diff --git a/aiida_workgraph/web/backend/app/utils.py b/aiida_workgraph/web/backend/app/utils.py index 25c35751..5061629b 100644 --- a/aiida_workgraph/web/backend/app/utils.py +++ b/aiida_workgraph/web/backend/app/utils.py @@ -17,6 +17,8 @@ def get_executor_source(tdata: Any) -> Tuple[bool, Optional[str]]: source_code = "".join(source_lines) return source_code except (TypeError, OSError): + # In case of load function defined inside the Jupyter-notebook, + # OSError('source code not available') source_code = tdata["executor"].get("function_source_code", "") return source_code else: diff --git a/docs/source/built-in/pythonjob.ipynb b/docs/source/built-in/pythonjob.ipynb index 662a85c7..57aec6ca 100644 --- a/docs/source/built-in/pythonjob.ipynb +++ b/docs/source/built-in/pythonjob.ipynb @@ -1505,19 +1505,21 @@ "- **Querying**: The data in the namespace output is stored as an AiiDA data node, allowing for easy querying and retrieval.\n", "- **Data Provenance**: When the data is used as input for subsequent tasks, the origin of data is tracked.\n", "\n", - "### Example Use Case\n", - "\n", - "Consider a molecule adsorption calculation where the namespace output stores the surface slabs of the molecule adsorbed on different surface sites. The number of surface slabs can vary depending on the surface. These output surface slabs can be utilized as input to the next task to calculate the energy.\n", "\n", "### Defining Namespace Outputs\n", "\n", - "To declare a namespace output, set the `identifier` to `workgraph.namespace` in the `outputs` parameter of the `@task` decorator. For example:\n", + "To declare a namespace output, set the `identifier` to `workgraph.namespace` in the `outputs` parameter of the `@task` decorator. Take the equation of state (EOS) calculation as an example. The namespace output stores the scaled structures, which can vary depending on the scale list.\n", + "\n", "\n", "```python\n", - "@task(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n", - "def generate_surface_slabs():\n", - " # Function logic to generate surface slabs\n", - " return {\"slab1\": slab_data1, \"slab2\": slab_data2}\n", + "@task.pythonjob(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n", + "def scaled_structures(structure: Atoms, scales: list) -> list[Atoms]:\n", + " structures = {}\n", + " for i in range(len(scales)):\n", + " scaled_structure = structure.copy()\n", + " scaled_structure.set_cell(scaled_structure.cell * scales[i], scale_atoms=True)\n", + " structures[f\"scaled_{i}\"] = scaled_structure\n", + " return structures\n", "```\n", "\n", "\n", @@ -1570,7 +1572,22 @@ " x=wg.tasks[\"myfunc\"].outputs[\"add_multiply.add\"],\n", " )\n", "```\n", + "### List as Namespace Output and Input (Experimental)\n", + "\n", + "`PythonJob` also supports using a list of AiiDA data nodes as the output and input. Internally, the list output and input will be transferred to a dictionary with a special key (starting with `list_data_{index}`, where the index starts from 0). Note that this will be handled internally by the `PythonJob`, so the user will not be aware of this.\n", "\n", + "In the following example, we define a task that returns a list of `Atoms` objects:\n", + "\n", + "```python\n", + "@task.pythonjob(outputs=[{\"name\": \"structures\", \"identifier\": \"workgraph.namespace\"}])\n", + "def scaled_structures(structure: Atoms, scales: list) -> list[Atoms]:\n", + " structures = []\n", + " for scale in scales:\n", + " scaled_structure = structure.copy()\n", + " scaled_structure.set_cell(scaled_structure.cell * scale, scale_atoms=True)\n", + " structures.append(scaled_structure)\n", + " return structures\n", + "```\n", "\n", "## Second Real-world Workflow: Equation of state (EOS) WorkGraph\n", "\n" @@ -2418,7 +2435,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.0" + "version": "3.10.0" }, "vscode": { "interpreter": { diff --git a/tests/test_python.py b/tests/test_python.py index d0a13906..ea020ce4 100644 --- a/tests/test_python.py +++ b/tests/test_python.py @@ -231,6 +231,42 @@ def myfunc3(x, y): assert wg.tasks["myfunc3"].outputs["result"].value.value == 7 +def test_PythonJob_namespace_list(fixture_localhost): + """Test function with namespace output and input.""" + + # output namespace list + @task.pythonjob( + outputs=[ + { + "name": "result", + "identifier": "workgraph.namespace", + }, + ] + ) + def myfunc(x, y): + return [x + i for i in range(y)] + + # task use list as input + @task.pythonjob() + def myfunc2(x): + return sum(x) + + # + wg = WorkGraph("test_namespace_outputs") + wg.add_task(myfunc, name="myfunc") + wg.add_task(myfunc2, name="myfunc2", x=wg.tasks["myfunc"].outputs["result"]) + wg.run( + inputs={ + "myfunc": { + "x": 1, + "y": 4, + "computer": "localhost", + } + }, + ) + assert wg.tasks["myfunc2"].outputs["result"].value.value == 10 + + def test_PythonJob_parent_folder(fixture_localhost): """Test function with parent folder."""