1
+ #!/usr/bin/env python3
2
+
3
+ import os
4
+ import sys
5
+ import glob
6
+ import yaml
7
+ import re
8
+ import argparse
9
+
10
+ # Valid result choices, including an OR case
11
+ VALID_RESULTS = [
12
+ "PASS" ,
13
+ "FAIL" ,
14
+ "NOT-APPLICABLE" ,
15
+ "FAIL OR NOT-APPLICABLE"
16
+ ]
17
+
18
+ def prompt_user_choice (prompt , choices , allow_empty = False ):
19
+ """
20
+ Prompt the user with a set of valid choices, or optionally allow an empty response.
21
+ Returns the user's selection (string).
22
+ If allow_empty is True, user can press Enter to skip (returns '').
23
+ """
24
+ choices_str = "/" .join (choices )
25
+ while True :
26
+ user_input = input (f"{ prompt } [{ choices_str } ]{ ' (or press Enter to skip)' if allow_empty else '' } : " )
27
+ user_input = user_input .strip ().upper ()
28
+ if allow_empty and user_input == "" :
29
+ return ""
30
+ valid_upper = [c .upper () for c in choices ]
31
+ if user_input in valid_upper :
32
+ idx = valid_upper .index (user_input )
33
+ return choices [idx ]
34
+ print (f"Invalid input. Please choose one of: { choices_str } " )
35
+
36
+
37
+ def load_yaml (filepath ):
38
+ """Safely load YAML from a file, returning a dict (or empty dict on error)."""
39
+ try :
40
+ with open (filepath , "r" ) as f :
41
+ data = yaml .safe_load (f )
42
+ if not isinstance (data , dict ):
43
+ return {}
44
+ return data
45
+ except Exception as e :
46
+ print (f"Error loading YAML file { filepath } : { e } " )
47
+ return {}
48
+
49
+
50
+ def extract_ocp_version (filename ):
51
+ """
52
+ Extracts version from the filename by matching a dash followed by something
53
+ like 4.15, 4.18, etc. Example: 'rhcos4-moderate-4.15.yml' -> '4.15'.
54
+ Returns None if no match is found.
55
+ """
56
+ # We look for a pattern '-X.Y' or '-X.Y.Z', etc. near the end before the .yml
57
+ match = re .search (r'-([\d.]+)\.yml$' , os .path .basename (filename ))
58
+ if match :
59
+ return match .group (1 )
60
+ return None
61
+
62
+
63
+ def main ():
64
+ parser = argparse .ArgumentParser (
65
+ description = "Update rule assertions in YAML files (interactive or via flags)."
66
+ )
67
+ parser .add_argument ("--rule" ,
68
+ help = "Exact rule name to search for (underscores will be turned into dashes)." )
69
+ parser .add_argument ("--bulk" , action = "store_true" ,
70
+ help = "If specified, apply updates in bulk to all matching assertions." )
71
+ parser .add_argument ("--default-result" ,
72
+ help = "Set this as the new default_result for matching rules." )
73
+ parser .add_argument ("--result-after-remediation" ,
74
+ help = "Set this as the new result_after_remediation for matching rules." )
75
+ parser .add_argument ("--exclude-version" , action = "append" , default = [],
76
+ help = "Exclude certain OCP versions from changes. Can be used multiple times." )
77
+ args = parser .parse_args ()
78
+
79
+ # 1) Get the rule name (either from CLI or prompt)
80
+ if args .rule :
81
+ rule_name = args .rule .strip ()
82
+ else :
83
+ rule_name = input ("Enter the rule name to search for: " ).strip ()
84
+
85
+ if not rule_name :
86
+ print ("No rule name provided. Exiting." )
87
+ sys .exit (0 )
88
+
89
+ # Replace underscores with dashes in the user rule.
90
+ # We only match if the normalized key ends with this string.
91
+ rule_name = rule_name .replace ("_" , "-" )
92
+
93
+ # Path to your assertion files
94
+ assertions_dir = "assertions/ocp4"
95
+
96
+ # Collect excluded versions
97
+ exclude_versions = set (args .exclude_version ) if args .exclude_version else set ()
98
+ if exclude_versions :
99
+ print (f"\n Excluding OCP versions: { ', ' .join (sorted (exclude_versions ))} " )
100
+
101
+ # Find all YAML files
102
+ yaml_files = glob .glob (os .path .join (assertions_dir , "**/*.yml" ), recursive = True )
103
+
104
+ # Collect all matches: (yaml_file, key_in_yaml, default_res, remediation_res, version)
105
+ matches = []
106
+ for yf in yaml_files :
107
+ # Extract the OCP version from the filename
108
+ file_version = extract_ocp_version (yf )
109
+ # If version is in the excluded set, skip entirely
110
+ if file_version and file_version in exclude_versions :
111
+ continue
112
+
113
+ data = load_yaml (yf )
114
+ rule_results = data .get ("rule_results" , {})
115
+ if not isinstance (rule_results , dict ):
116
+ continue
117
+
118
+ # For each key, check if it ends with rule_name after normalizing
119
+ for key , val in rule_results .items ():
120
+ normalized_key = key .replace ("_" , "-" )
121
+ if normalized_key .endswith (rule_name ):
122
+ dres = val .get ("default_result" )
123
+ rres = val .get ("result_after_remediation" )
124
+ matches .append ((yf , key , dres , rres , file_version ))
125
+
126
+ if not matches :
127
+ print (f"\n No matching rule_results found that end with '{ rule_name } ' "
128
+ f"(or they were all excluded by version)." )
129
+ sys .exit (0 )
130
+
131
+ # Summarize matches
132
+ print (f"\n Found { len (matches )} matching rule_results that end with '{ rule_name } ':\n " )
133
+ for i , (fpath , key , dres , rres , ver ) in enumerate (matches , start = 1 ):
134
+ print (f"{ i } . File: { fpath } " )
135
+ if ver :
136
+ print (f" OCP Version: { ver } " )
137
+ else :
138
+ print (" OCP Version: <not detected>" )
139
+ print (f" Key: { key } " )
140
+ print (f" default_result: { dres } " )
141
+ print (f" result_after_remediation: { rres if rres else '<not set>' } " )
142
+ print ("" )
143
+
144
+ # We'll store changes in memory before writing
145
+ files_to_update = {} # file_path -> updated_data
146
+
147
+ # 2) Decide Bulk or Individual
148
+ if args .bulk :
149
+ do_bulk = True
150
+ else :
151
+ choice = input ("Do you want to update ALL of these assertions in bulk? [y/N]: " ).strip ().lower ()
152
+ do_bulk = (choice == "y" )
153
+
154
+ if do_bulk :
155
+ # 3a) Bulk update
156
+ if args .default_result :
157
+ # Validate
158
+ dr = args .default_result .upper ()
159
+ if dr not in [v .upper () for v in VALID_RESULTS ]:
160
+ print (f"ERROR: '{ args .default_result } ' is not a valid result. Choose from { VALID_RESULTS } ." )
161
+ sys .exit (1 )
162
+ idx = [v .upper () for v in VALID_RESULTS ].index (dr )
163
+ new_default_result = VALID_RESULTS [idx ]
164
+ else :
165
+ new_default_result = prompt_user_choice ("New default_result" , VALID_RESULTS )
166
+
167
+ if args .result_after_remediation :
168
+ rr = args .result_after_remediation .upper ()
169
+ if rr not in [v .upper () for v in VALID_RESULTS ]:
170
+ print (f"ERROR: '{ args .result_after_remediation } ' is not valid. Choose from { VALID_RESULTS } ." )
171
+ sys .exit (1 )
172
+ idx = [v .upper () for v in VALID_RESULTS ].index (rr )
173
+ new_remediation_result = VALID_RESULTS [idx ]
174
+ else :
175
+ set_remediation = input ("Do you want to set 'result_after_remediation'? [y/N]: " ).strip ().lower ()
176
+ if set_remediation == "y" :
177
+ new_remediation_result = prompt_user_choice ("New result_after_remediation" , VALID_RESULTS )
178
+ else :
179
+ new_remediation_result = ""
180
+
181
+ # Apply these updates to all matches
182
+ for (yf , key , _ , _ , file_version ) in matches :
183
+ # Already know file_version is not in excluded set, so proceed
184
+ if yf not in files_to_update :
185
+ files_to_update [yf ] = load_yaml (yf )
186
+
187
+ data = files_to_update [yf ]
188
+ rr = data .get ("rule_results" , {})
189
+ if key not in rr :
190
+ rr [key ] = {}
191
+ rr [key ]["default_result" ] = new_default_result
192
+ if new_remediation_result :
193
+ rr [key ]["result_after_remediation" ] = new_remediation_result
194
+ else :
195
+ rr [key ].pop ("result_after_remediation" , None )
196
+ data ["rule_results" ] = rr
197
+
198
+ else :
199
+ # 3b) Individual update
200
+ for i , (yf , key , dres , rres , file_version ) in enumerate (matches , start = 1 ):
201
+ print (f"\n Match #{ i } in file: { yf } , key: { key } " )
202
+ if file_version :
203
+ print (f" OCP Version: { file_version } " )
204
+ else :
205
+ print (" OCP Version: <not detected>" )
206
+ print (f" Current default_result: { dres } " )
207
+ print (f" Current result_after_remediation: { rres if rres else '<not set>' } " )
208
+
209
+ if args .default_result :
210
+ df = args .default_result .upper ()
211
+ if df not in [v .upper () for v in VALID_RESULTS ]:
212
+ print (f"ERROR: '{ args .default_result } ' is not valid. Choose from { VALID_RESULTS } ." )
213
+ sys .exit (1 )
214
+ idx = [v .upper () for v in VALID_RESULTS ].index (df )
215
+ new_default_result = VALID_RESULTS [idx ]
216
+ # We assume user wants to update
217
+ else :
218
+ choice = input (" Do you want to update this assertion? [y/N]: " ).strip ().lower ()
219
+ if choice == "y" :
220
+ new_default_result = prompt_user_choice (" New default_result" , VALID_RESULTS )
221
+ else :
222
+ continue
223
+
224
+ if args .result_after_remediation :
225
+ r = args .result_after_remediation .upper ()
226
+ if r not in [v .upper () for v in VALID_RESULTS ]:
227
+ print (f"ERROR: '{ args .result_after_remediation } ' is not valid. { VALID_RESULTS } " )
228
+ sys .exit (1 )
229
+ idx = [v .upper () for v in VALID_RESULTS ].index (r )
230
+ new_remediation_result = VALID_RESULTS [idx ]
231
+ else :
232
+ set_r = input (" Do you want to set 'result_after_remediation'? [y/N]: " ).strip ().lower ()
233
+ if set_r == "y" :
234
+ new_remediation_result = prompt_user_choice (" New result_after_remediation" , VALID_RESULTS )
235
+ else :
236
+ new_remediation_result = ""
237
+
238
+ if yf not in files_to_update :
239
+ files_to_update [yf ] = load_yaml (yf )
240
+
241
+ data = files_to_update [yf ]
242
+ rr = data .get ("rule_results" , {})
243
+ if key not in rr :
244
+ rr [key ] = {}
245
+
246
+ rr [key ]["default_result" ] = new_default_result
247
+ if new_remediation_result :
248
+ rr [key ]["result_after_remediation" ] = new_remediation_result
249
+ else :
250
+ rr [key ].pop ("result_after_remediation" , None )
251
+
252
+ data ["rule_results" ] = rr
253
+
254
+ # 4) Final confirmation, then write
255
+ if not files_to_update :
256
+ print ("\n No changes were made." )
257
+ sys .exit (0 )
258
+
259
+ print ("\n The following files would be updated:" )
260
+ for fpath in files_to_update :
261
+ print (f" - { fpath } " )
262
+
263
+ confirm = input ("\n Confirm saving changes to disk? [y/N]: " ).strip ().lower ()
264
+ if confirm != "y" :
265
+ print ("Changes NOT saved." )
266
+ sys .exit (0 )
267
+
268
+ # Write to disk
269
+ for fpath , updated_data in files_to_update .items ():
270
+ try :
271
+ with open (fpath , "w" ) as wf :
272
+ yaml .dump (updated_data , wf , sort_keys = False , Dumper = yaml .SafeDumper )
273
+ print (f"Saved: { fpath } " )
274
+ except Exception as e :
275
+ print (f"Error writing { fpath } : { e } " )
276
+
277
+ print ("\n All done. Changes have been saved." )
278
+
279
+
280
+ if __name__ == "__main__" :
281
+ main ()
0 commit comments