Skip to content

Commit 9004830

Browse files
Michał Sośnickimichalsosn
authored andcommitted
fix: sort remaining metadata in topo sort
1 parent 12bb37f commit 9004830

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

src/neptune_exporter/loader_manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import datetime
1617
import heapq
1718
from pathlib import Path
1819
from typing import NewType, Optional
@@ -123,9 +124,9 @@ def _topological_sort_runs(
123124
# Otherwise, run is a root (orphaned or no parent)
124125

125126
# Kahn's algorithm: start with nodes with in-degree 0
126-
# Results are sorted by metadata.creation_time
127+
# Results are sorted by metadata.creation_time, those without creation time are last
127128
queue = [
128-
(metadata.creation_time, metadata)
129+
(metadata.creation_time or datetime.datetime.max, metadata)
129130
for metadata in run_metadata
130131
if in_degree[SourceRunId(metadata.run_id)] == 0
131132
]
@@ -141,7 +142,11 @@ def _topological_sort_runs(
141142
if source_run_id in parent_to_children:
142143
for child_metadata in parent_to_children[source_run_id]:
143144
heapq.heappush(
144-
queue, (child_metadata.creation_time, child_metadata)
145+
queue,
146+
(
147+
child_metadata.creation_time or datetime.datetime.max,
148+
child_metadata,
149+
),
145150
)
146151

147152
# Check for cycles (shouldn't happen in Neptune, but defensive)
@@ -157,6 +162,9 @@ def _topological_sort_runs(
157162
for metadata in run_metadata
158163
if SourceRunId(metadata.run_id) not in result_ids
159164
]
165+
remaining_metadata.sort(
166+
key=lambda x: (x.creation_time or datetime.datetime.max, x.run_id)
167+
)
160168
result.extend(remaining_metadata)
161169

162170
return result

tests/unit/test_loader_manager.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,10 @@ def test_topological_sort_creation_time_order():
204204
# Setup: two runs with different creation times
205205
run1_id = "RUN-1"
206206
run2_id = "RUN-2"
207+
run3_id = "RUN-3"
207208
creation_time1 = datetime.datetime(2025, 1, 1, 12, 0, 0)
208-
creation_time2 = datetime.datetime(2025, 1, 1, 12, 0, 1)
209+
creation_time2 = None
210+
creation_time3 = datetime.datetime(2025, 1, 1, 12, 0, 2)
209211

210212
run1_metadata = RunMetadata(
211213
project_id="project1",
@@ -227,9 +229,22 @@ def test_topological_sort_creation_time_order():
227229
creation_time=creation_time2,
228230
)
229231

232+
run3_metadata = RunMetadata(
233+
project_id="project1",
234+
run_id=run3_id,
235+
custom_run_id=None,
236+
experiment_name=None,
237+
parent_source_run_id=None,
238+
fork_step=None,
239+
creation_time=creation_time3,
240+
)
230241
# Mock reader
231-
mock_reader.list_run_files.return_value = [run2_id, run1_id]
232-
mock_reader.read_run_metadata.side_effect = [run2_metadata, run1_metadata]
242+
mock_reader.list_run_files.return_value = [run3_id, run2_id, run1_id]
243+
mock_reader.read_run_metadata.side_effect = [
244+
run3_metadata,
245+
run2_metadata,
246+
run1_metadata,
247+
]
233248
mock_reader.read_run_data.return_value = iter([pa.table({"col": [1]})])
234249

235250
# Track creation order
@@ -248,9 +263,10 @@ def track_create_run(*args, **kwargs):
248263
manager._load_project(project_dir, runs=None)
249264

250265
# Verify: runs should be processed in creation time order
251-
assert len(created_runs) == 2
266+
assert len(created_runs) == 3
252267
assert created_runs[0] == run1_id
253-
assert created_runs[1] == run2_id
268+
assert created_runs[1] == run3_id
269+
assert created_runs[2] == run2_id
254270

255271

256272
def test_topological_sort_multiple_children():

0 commit comments

Comments
 (0)