@@ -26,7 +26,7 @@ def __init__(
26
26
metadata_format = ECHO10_C ,
27
27
messages_override = None ,
28
28
checks_override = None ,
29
- rules_override = None
29
+ rules_override = None ,
30
30
):
31
31
"""
32
32
Args:
@@ -53,13 +53,13 @@ def __init__(
53
53
self .rules_override ,
54
54
self .checks ,
55
55
self .checks_override ,
56
- metadata_format = metadata_format
56
+ metadata_format = metadata_format ,
57
+ )
58
+ self .schema_validator = SchemaValidator (
59
+ self .messages_override or self .messages , metadata_format
57
60
)
58
- self .schema_validator = SchemaValidator (self .messages_override or self .messages , metadata_format )
59
61
self .tracker = Tracker (
60
- self .rule_mapping ,
61
- self .rules_override ,
62
- metadata_format = metadata_format
62
+ self .rule_mapping , self .rules_override , metadata_format = metadata_format
63
63
)
64
64
65
65
@staticmethod
@@ -76,15 +76,9 @@ def load_schemas(self):
76
76
self .checks = Checker ._json_load_schema ("checks" )
77
77
self .rule_mapping = Checker ._json_load_schema ("rule_mapping" )
78
78
self .messages = Checker ._json_load_schema ("check_messages" )
79
- self .messages_override = Checker ._json_load_schema (
80
- self .msgs_override_file
81
- )
82
- self .rules_override = Checker ._json_load_schema (
83
- self .rules_override_file
84
- )
85
- self .checks_override = Checker ._json_load_schema (
86
- self .checks_override_file
87
- )
79
+ self .messages_override = Checker ._json_load_schema (self .msgs_override_file )
80
+ self .rules_override = Checker ._json_load_schema (self .rules_override_file )
81
+ self .checks_override = Checker ._json_load_schema (self .checks_override_file )
88
82
89
83
@staticmethod
90
84
def map_to_function (data_type , function ):
@@ -112,19 +106,19 @@ def message(self, rule_id, msg_type):
112
106
msg_type can be any one of 'failure', 'remediation'
113
107
"""
114
108
messages = self .messages_override .get (rule_id ) or self .messages .get (rule_id )
115
- return messages [msg_type ] if messages else ''
109
+ return messages [msg_type ] if messages else ""
116
110
117
111
def build_message (self , result , rule_id ):
118
112
"""
119
113
Formats the message for `rule_id` based on the result
120
114
"""
121
115
failure_message = self .message (rule_id , "failure" )
122
- rule_mapping = self .rules_override .get (
116
+ rule_mapping = self .rules_override .get (rule_id ) or self . rule_mapping . get (
123
117
rule_id
124
- ) or self . rule_mapping . get ( rule_id )
118
+ )
125
119
severity = rule_mapping .get ("severity" , "error" )
126
120
messages = []
127
- if not (result ["valid" ]) and result .get ("value" ):
121
+ if not (result ["valid" ]) and result .get ("value" ):
128
122
for value in result ["value" ]:
129
123
formatted_message = failure_message
130
124
value = value if isinstance (value , tuple ) else (value ,)
@@ -143,7 +137,9 @@ def _check_dependency_validity(self, dependency, field_dict):
143
137
"""
144
138
Checks if the dependent check called `dependency` is valid
145
139
"""
146
- dependency_fields = field_dict ["fields" ] if len (dependency ) == 1 else [dependency [1 ]]
140
+ dependency_fields = (
141
+ field_dict ["fields" ] if len (dependency ) == 1 else [dependency [1 ]]
142
+ )
147
143
for field in dependency_fields :
148
144
if not self .tracker .read_data (dependency [0 ], field ).get ("valid" ):
149
145
return False
@@ -162,27 +158,26 @@ def _run_func(self, func, check, rule_id, metadata_content, result_dict):
162
158
"""
163
159
Run the check function for `rule_id` and update `result_dict`
164
160
"""
165
- rule_mapping = self .rules_override .get (
161
+ rule_mapping = self .rules_override .get (rule_id ) or self . rule_mapping . get (
166
162
rule_id
167
- ) or self . rule_mapping . get ( rule_id )
163
+ )
168
164
external_data = rule_mapping .get ("data" , [])
169
165
relation = rule_mapping .get ("relation" )
170
- list_of_fields_to_apply = \
171
- rule_mapping .get ("fields_to_apply" ).get (self .metadata_format , {})
172
-
166
+ list_of_fields_to_apply = rule_mapping .get ("fields_to_apply" ).get (
167
+ self .metadata_format , {}
168
+ )
169
+
173
170
for field_dict in list_of_fields_to_apply :
174
- dependencies = self .scheduler .get_all_dependencies (rule_mapping , check , field_dict )
171
+ dependencies = self .scheduler .get_all_dependencies (
172
+ rule_mapping , check , field_dict
173
+ )
175
174
main_field = field_dict ["fields" ][0 ]
176
175
external_data = field_dict .get ("data" , external_data )
177
176
result_dict .setdefault (main_field , {})
178
177
if not self ._check_dependencies_validity (dependencies , field_dict ):
179
178
continue
180
179
result = self .custom_checker .run (
181
- func ,
182
- metadata_content ,
183
- field_dict ,
184
- external_data ,
185
- relation
180
+ func , metadata_content , field_dict , external_data , relation
186
181
)
187
182
188
183
self .tracker .update_data (rule_id , main_field , result ["valid" ])
@@ -211,14 +206,16 @@ def perform_custom_checks(self, metadata_content):
211
206
) or self .rule_mapping .get (rule_id )
212
207
check_id = rule_mapping .get ("check_id" , rule_id )
213
208
check = self .checks_override .get (check_id ) or self .checks .get (check_id )
214
- func = Checker .map_to_function (check ["data_type" ], check ["check_function" ])
209
+ func = Checker .map_to_function (
210
+ check ["data_type" ], check ["check_function" ]
211
+ )
215
212
if func :
216
213
self ._run_func (func , check , rule_id , metadata_content , result_dict )
217
214
except Exception as e :
218
215
pyquarc_errors .append (
219
216
{
220
217
"message" : f"Running check for the rule: '{ rule_id } ' failed." ,
221
- "details" : str (e )
218
+ "details" : str (e ),
222
219
}
223
220
)
224
221
return result_dict , pyquarc_errors
@@ -233,6 +230,7 @@ def run(self, metadata_content):
233
230
Returns:
234
231
(dict): The results of the jsonschema check and all custom checks
235
232
"""
233
+
236
234
def _xml_postprocessor (_ , key , value ):
237
235
"""
238
236
Sometimes the XML values contain attributes.
@@ -259,11 +257,7 @@ def _xml_postprocessor(_, key, value):
259
257
parser = parse
260
258
kwargs = {"postprocessor" : _xml_postprocessor }
261
259
json_metadata = parser (metadata_content , ** kwargs )
262
- result_schema = self .perform_schema_check (
263
- metadata_content
264
- )
260
+ result_schema = self .perform_schema_check (metadata_content )
265
261
result_custom , pyquarc_errors = self .perform_custom_checks (json_metadata )
266
- result = {
267
- ** result_schema , ** result_custom
268
- }
262
+ result = {** result_schema , ** result_custom }
269
263
return result , pyquarc_errors
0 commit comments