-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmerge_expectations.py
executable file
·114 lines (99 loc) · 3.87 KB
/
merge_expectations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
from collections import defaultdict
import argparse
import json
import pathlib
import sys
def update_or_equal(dict_, key, new_value):
    """Insert new_value under key, or verify it matches the existing entry.

    Returns the number of hard conflicts detected (0 or 1). Entries that
    differ *only* in their "mtime" field are tolerated: the stored mtime is
    reset to None and a warning goes to stderr, at most once per key.
    """
    if key not in dict_:
        # First sighting of this key; just record it.
        dict_[key] = new_value
        return 0
    existing = dict_[key]
    if existing == new_value:
        return 0
    # Decide whether this is a real conflict by comparing everything
    # except "mtime" (KeyError here matches the original contract: every
    # entry is expected to carry an "mtime" field).
    stripped_old = dict(existing)
    stripped_old.pop("mtime")
    stripped_new = dict(new_value)
    stripped_new.pop("mtime")
    if stripped_old != stripped_new:
        print(f"ERROR: CONFLICT for key {key}:\n{existing}\n{new_value}")
        return 1
    # Only the mtimes disagree. This is quite common, so report it once per
    # key: mark the stored entry (mtime=None) so later disagreements stay
    # silent. The stored dict is mutated in place, so dict_ needs no update.
    if existing["mtime"] is not None:
        print(
            f"Warning: Conflicting mtime for {key} (e.g. {existing['mtime']} vs. {new_value['mtime']})",
            file=sys.stderr,
        )
        existing["mtime"] = None
    return 0
def do_merge(sources):
    """Merge several lists of file expectations into one deduplicated list.

    Each source is a list of file-expectation dicts (keys include "type",
    "name", "filetype", "mtime", "children"). Returns (expectations, errors)
    where expectations is the merged list sorted by "name" and errors counts
    hard conflicts reported by update_or_equal. Every directory that has
    entries beneath it gets its "children" field filled with the sorted
    child names, so the expectation becomes "this dir contains *only* these".
    """
    # Step 1: Resolve potential conflicts and prepare data:
    path_to_filedict = dict()
    path_to_all_children = defaultdict(set)
    # TODO: Stream instead of doing multi-pass.
    # Also, memory-efficiency in general.
    errors = 0
    for source_index, source in enumerate(sources):
        for file_expectation in source:
            # FIX: message previously referenced undefined name `i`, which
            # would raise NameError exactly when the assert fired.
            assert (
                file_expectation["type"] == "file"
            ), f"Entry in source #{source_index} (0-indexed) does not have type=file. Maybe target and sources mixed up?"
            name = file_expectation["name"]
            path = pathlib.Path(name)
            errors += update_or_equal(path_to_filedict, path.as_posix(), file_expectation)
            if name != ".":
                # Record this entry as a child of its parent directory;
                # "." is the root and has no parent to record under.
                path_to_all_children[path.parent.as_posix()].add(path.name)
    # Step 2: Generate new expectations
    # Specifically, we now expect that each directory *only* contains the mentioned files.
    for parent, children in path_to_all_children.items():
        assert parent in path_to_filedict
        parent_entry = path_to_filedict[parent]
        assert parent_entry["filetype"] == "dir"
        assert parent_entry["children"] is None
        parent_entry["children"] = sorted(children)
    # Sanity check that we caught all dirs. FIX: the old check tested
    # `"children" not in entry`, which never fires because entries carry the
    # key with value None; and it printed `entry.name`, an AttributeError on
    # a dict. A dir whose "children" is still None had nothing recorded
    # beneath it, so its contents will not be verified — report that.
    for entry in path_to_filedict.values():
        if entry["filetype"] != "dir":
            continue
        if entry.get("children") is None:
            print(f"Not checking children of {entry['name']}", file=sys.stderr)
    # Step 3: Emit deduplicated, rearranged data
    expectations = list(path_to_filedict.values())
    del path_to_filedict  # Early gc, just in case it helps
    expectations.sort(key=lambda e: e["name"])
    return expectations, errors
def run(args):
    """Load all source expectation files, merge them, write the result.

    Reads each JSON file in args.source_filenames, merges them via
    do_merge, and dumps the merged list to args.result_filename. If merging
    encountered conflicts, exits with status 1 *after* writing the output —
    the file is still usable, but will cause false positives.
    """
    sources = []
    for source_filename in args.source_filenames:
        with open(source_filename, "r") as fp:
            sources.append(json.load(fp))
    result, errors = do_merge(sources)
    with open(args.result_filename, "w") as fp:
        json.dump(result, fp)
    if errors:
        print(f"Encountered {errors} errors. Output file is usable, but will cause false positives.")
        # FIX: use sys.exit instead of the `site`-provided exit(), which is
        # meant for interactive sessions and is not guaranteed to exist.
        sys.exit(1)
def build_parser():
    """Build the CLI parser: one result file followed by one-or-more sources.

    (The "two or more" expectation in the metavar is enforced by the caller,
    not by argparse.)
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("result_filename", metavar="RESULT.total.json")
    arg_parser.add_argument(
        "source_filenames",
        nargs="+",
        metavar="TWO_OR_MORE_SOURCES.deb.json",
    )
    return arg_parser
if __name__ == "__main__":
    args = build_parser().parse_args()
    if len(args.source_filenames) == 1:
        # Merging a single source is a no-op; almost certainly a usage error.
        # FIX: dropped the pointless f-prefix (no placeholders) and switched
        # to sys.exit, matching best practice for non-interactive scripts.
        print("Only one source file given. This does not usually make sense, aborting.", file=sys.stderr)
        sys.exit(1)
    run(args)