"""
Parsers provided by aiida_aurora.
Register parsers via the "aiida.parsers" entry point in setup.json.
"""
import json
import os
import re

import numpy as np

from aiida.common import exceptions
from aiida.engine import ExitCode
from aiida.orm import ArrayData, SinglefileData
from aiida.parsers.parser import Parser
from aiida.plugins import CalculationFactory

BatteryCyclerExperiment = CalculationFactory("aurora.cycler")


class TomatoParser(Parser):
    """
    Parser class for parsing the output of a BatteryCyclerExperiment generated by tomato.

    Two files should be found in the working directory:
    - a ZIP file, containing the raw data
    - a JSON file, containing the data pre-processed by tomato & yadg

    Priority is to retrieve the parsed JSON file. The raw data ZIP file is
    also retained if found.
    """

    def __init__(self, node):
        """
        Initialize Parser instance.

        Checks that the ProcessNode being passed was produced by a BatteryCyclerExperiment.

        :param node: ProcessNode of calculation
        :type node: :class:`aiida.orm.ProcessNode`
        """
        super().__init__(node)
        if not issubclass(node.process_class, BatteryCyclerExperiment):
            raise exceptions.ParsingError("Can only parse BatteryCyclerExperiment")

    def parse(self, **kwargs):
        """
        Parse outputs, store results in database.

        :returns: an exit code, if parsing fails (or nothing if parsing succeeds)
        """
        retrieved_temporary_folder = kwargs["retrieved_temporary_folder"]
        output_json_filename = self.node.get_option("output_filename") + ".json"
        output_zip_filename = os.path.join(retrieved_temporary_folder, self.node.get_option("output_filename") + ".zip")
        files_retrieved = self.retrieved.list_object_names()

        # Check that the zip file is present
        if os.path.isfile(output_zip_filename):
            try:
                self.logger.debug(f"Storing '{output_zip_filename}'")
                output_raw_data_node = SinglefileData(output_zip_filename)
                self.out("raw_data", output_raw_data_node)
                output_raw_data_node_created = True
            except Exception:
                self.logger.warning(f"The raw data zip file '{output_zip_filename}' could not be read.")
                output_raw_data_node_created = False
        else:
            self.logger.warning(f"The raw data zip file '{output_zip_filename}' is missing.")
            output_raw_data_node_created = False

        # Check that the json file is present
        if output_json_filename not in files_retrieved:
            self.logger.error(f"The output json file '{output_json_filename}' is missing.")
            if output_raw_data_node_created:
                # only the json file is missing
                return self.exit_codes.ERROR_OUTPUT_JSON_MISSING
            # both files are missing
            return self.exit_codes.ERROR_OUTPUT_FILES_MISSING

        # If a json file was found, parse it and add the output node
        try:
            self.logger.debug(f"Parsing '{output_json_filename}'")
            with self.retrieved.open(output_json_filename, "r") as handle:
                output_results_node = self.parse_tomato_results(json.load(handle), self.logger)
            self.out("results", output_results_node)
        except OSError:
            self.logger.error(f"Error opening the json file '{output_json_filename}'.")
            return self.exit_codes.ERROR_OUTPUT_JSON_READ
        except json.JSONDecodeError:
            self.logger.error(f"Error parsing json file '{output_json_filename}'.")
            return self.exit_codes.ERROR_OUTPUT_JSON_PARSE

        # If found, the zip file was already stored in a SinglefileData node
        if not output_raw_data_node_created:
            return self.exit_codes.WARNING_OUTPUT_ZIP_MISSING

        # If files were retrieved and parsed but the job completed with error
        # or was cancelled, use the specific exit codes
        job_annotation = self.node.get_attribute("last_job_info").get("annotation")
        if job_annotation == "Completed with error":
            return self.exit_codes.WARNING_COMPLETED_ERROR
        elif job_annotation == "Cancelled":
            return self.exit_codes.WARNING_COMPLETED_CANCELLED

        return ExitCode(0)
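
    # The exit codes used above (e.g. ERROR_OUTPUT_JSON_MISSING) are assumed
    # to be declared on the BatteryCyclerExperiment process spec; a
    # hypothetical declaration (status numbers invented for illustration):
    #
    #     spec.exit_code(303, "ERROR_OUTPUT_JSON_MISSING",
    #                    message="The output json file is missing.")
    #     spec.exit_code(304, "ERROR_OUTPUT_FILES_MISSING",
    #                    message="Both the json and zip output files are missing.")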

    @staticmethod
    def parse_tomato_results(data_dic, logger=None):
        """
        Parse results.json file.

        :returns: a :class:`aiida.orm.ArrayData` in this way:
            - `metadata` is stored as attribute
            - `data` is split in steps, physical quantity name, and n/s/u identifier (nominal value, std error, units)
        The name of each array is: `'step{step_number}_{raw_quantity_name}_{identifier}'`
        """
        parsed = {}
        data = data_dic["steps"][0]["data"]
        keys = ["Ewe", "I"]  # HACK hardcoded
        fill = {"n": np.nan, "s": np.nan, "u": ""}

        if logger:
            logger.debug(f"parse_tomato_results: storing {keys}")

        for key in keys:
            clean_key = re.sub("[^0-9a-zA-Z_]", "_", key)  # TODO necessary?
            for ident in ("n", "s", "u"):  # nominal value, std error, units
                values = [step["raw"].get(key, fill)[ident] for step in data]
                parsed[f"step0_{clean_key}_{ident}"] = np.array(values)
        parsed["step0_uts"] = np.array([step["uts"] for step in data])

        node = ArrayData()
        for key, value in parsed.items():
            node.set_array(key, value)
        node.set_attribute_many(data_dic["metadata"])

        if logger:
            logger.debug(f"parse_tomato_results: {list(parsed.keys())} stored")

        return node
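
# Minimal usage sketch for inspecting this parser's outputs from a finished
# calculation; the PK is hypothetical and assumes the cycler job already ran:
#
#     from aiida import load_profile
#     from aiida.orm import load_node
#
#     load_profile()
#     calc = load_node(1234)            # hypothetical PK of a cycler job
#     results = calc.outputs.results    # ArrayData created by TomatoParser
#     voltage = results.get_array("step0_Ewe_n")
#     timestamps = results.get_array("step0_uts")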