Skip to content

Commit 473727f

Browse files
committed
fix(upgrade): use timestamps of partners at layers > 2
1 parent fca6eff commit 473727f

File tree

2 files changed

+29
-18
lines changed

2 files changed

+29
-18
lines changed

pychunkedgraph/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "3.0.7"
1+
__version__ = "3.0.8"

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,14 @@ def _populate_cx_edges_with_timestamps(
6666
for node, node_ts in zip(nodes, nodes_ts):
6767
CX_EDGES[node] = {}
6868
timestamps = timestamps_d[node]
69-
timestamps.add(node_ts)
69+
cx_edges_d_node_ts = _get_cx_edges_at_timestamp(node, response, node_ts)
70+
71+
edges = np.concatenate([empty_2d] + list(cx_edges_d_node_ts.values()))
72+
partner_parent_ts_d = get_parent_timestamps(cg, edges[:, 1])
73+
for v in partner_parent_ts_d.values():
74+
timestamps.update(v)
75+
CX_EDGES[node][node_ts] = cx_edges_d_node_ts
76+
7077
for ts in sorted(timestamps):
7178
if ts < earliest_ts:
7279
ts = earliest_ts
@@ -152,21 +159,25 @@ def update_chunk(
152159
nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True)
153160
_populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts, earliest_ts)
154161

155-
task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2))
156-
chunked_nodes = chunked(nodes, task_size)
157-
chunked_nodes_ts = chunked(nodes_ts, task_size)
158-
cg_info = cg.get_serialized_info()
159-
160-
tasks = []
161-
for chunk, ts_chunk in zip(chunked_nodes, chunked_nodes_ts):
162-
args = (cg_info, layer, chunk, ts_chunk, earliest_ts)
163-
tasks.append(args)
164-
165-
with mp.Pool(min(mp.cpu_count(), len(tasks))) as pool:
166-
_ = list(
167-
tqdm(
168-
pool.imap_unordered(_update_cross_edges_helper, tasks),
169-
total=len(tasks),
162+
if nodes:
163+
for node, node_ts in zip(nodes, nodes_ts):
164+
update_cross_edges(cg, layer, node, node_ts, earliest_ts)
165+
else:
166+
task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2))
167+
chunked_nodes = chunked(nodes, task_size)
168+
chunked_nodes_ts = chunked(nodes_ts, task_size)
169+
cg_info = cg.get_serialized_info()
170+
171+
tasks = []
172+
for chunk, ts_chunk in zip(chunked_nodes, chunked_nodes_ts):
173+
args = (cg_info, layer, chunk, ts_chunk, earliest_ts)
174+
tasks.append(args)
175+
176+
with mp.Pool(min(mp.cpu_count(), len(tasks))) as pool:
177+
_ = list(
178+
tqdm(
179+
pool.imap_unordered(_update_cross_edges_helper, tasks),
180+
total=len(tasks),
181+
)
170182
)
171-
)
172183
print(f"total elaspsed time: {time.time() - start}")

0 commit comments

Comments
 (0)