Skip to content

Commit 345367a

Browse files
author
Michał Sośnicki
committed
chore(exporter): add ErrorReporter
1 parent d84d74c commit 345367a

File tree

11 files changed

+198
-133
lines changed

11 files changed

+198
-133
lines changed

src/neptune_exporter/export_manager.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from tqdm import tqdm
2121
import pyarrow as pa
2222
import pyarrow.compute as pc
23+
from neptune_exporter.exporters.error_reporter import ErrorReporter
2324
from neptune_exporter.exporters.exporter import NeptuneExporter
2425
from neptune_exporter.storage.parquet_writer import ParquetWriter, RunWriterContext
2526
from neptune_exporter.storage.parquet_reader import ParquetReader
@@ -33,13 +34,15 @@ def __init__(
3334
exporter: NeptuneExporter,
3435
reader: ParquetReader,
3536
writer: ParquetWriter,
37+
error_reporter: ErrorReporter,
3638
files_destination: Path,
3739
batch_size: int = 16,
3840
progress_bar: bool = True,
3941
):
4042
self._exporter = exporter
4143
self._reader = reader
4244
self._writer = writer
45+
self._error_reporter = error_reporter
4346
self._files_destination = files_destination
4447
self._batch_size = batch_size
4548
self._progress_bar = progress_bar
@@ -185,10 +188,10 @@ def run(
185188
# Update progress bar for completed batch
186189
runs_pbar.update(len(batch_run_ids))
187190

188-
exception_infos = self._exporter.get_exception_infos()
189-
if exception_infos:
191+
exception_summary = self._error_reporter.get_summary()
192+
if exception_summary.exception_count > 0:
190193
self._logger.error(
191-
f"{len(exception_infos)} exceptions occurred during export. See the logs for details."
194+
f"{exception_summary.exception_count} exceptions occurred during export. See the logs for details."
192195
)
193196

194197
return total_runs
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#
2+
# Copyright (c) 2025, Neptune Labs Sp. z o.o.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from dataclasses import dataclass
17+
import json
18+
from pathlib import Path
19+
import threading
20+
from typing import Optional
21+
22+
from neptune_exporter.types import ProjectId, SourceRunId
23+
24+
25+
@dataclass
26+
class ErrorSummary:
27+
exception_count: int
28+
29+
30+
class ErrorReporter:
31+
def __init__(
32+
self,
33+
path: Path,
34+
) -> None:
35+
self.path = path
36+
self._lock = threading.Lock()
37+
self._summary = ErrorSummary(exception_count=0)
38+
39+
def get_summary(self) -> ErrorSummary:
40+
with self._lock:
41+
return ErrorSummary(exception_count=self._summary.exception_count)
42+
43+
def record_exception(
44+
self,
45+
project_id: ProjectId,
46+
run_id: SourceRunId,
47+
attribute_path: Optional[str],
48+
attribute_type: Optional[str],
49+
exception: Exception,
50+
) -> None:
51+
with self._lock:
52+
self._summary.exception_count += 1
53+
self._write_record(
54+
project_id=project_id,
55+
run_id=run_id,
56+
attribute_path=attribute_path,
57+
attribute_type=attribute_type,
58+
exception=exception,
59+
)
60+
61+
def record_batch_exception(
62+
self,
63+
project_id: ProjectId,
64+
run_ids: list[SourceRunId],
65+
attribute_paths: Optional[list[str]],
66+
exception: Exception,
67+
) -> None:
68+
with self._lock:
69+
for run_id in run_ids:
70+
if attribute_paths is None:
71+
self._summary.exception_count += 1
72+
self._write_record(
73+
project_id=project_id,
74+
run_id=run_id,
75+
attribute_path=None,
76+
attribute_type=None,
77+
exception=exception,
78+
)
79+
else:
80+
for attribute_path in attribute_paths:
81+
self._summary.exception_count += 1
82+
self._write_record(
83+
project_id=project_id,
84+
run_id=run_id,
85+
attribute_path=attribute_path,
86+
attribute_type=None,
87+
exception=exception,
88+
)
89+
90+
def _write_record(
91+
self,
92+
project_id: ProjectId,
93+
run_id: SourceRunId,
94+
attribute_path: Optional[str],
95+
attribute_type: Optional[str],
96+
exception: Exception,
97+
) -> None:
98+
with open(self.path, "a") as f:
99+
f.write(
100+
json.dumps(
101+
{
102+
"project_id": project_id,
103+
"run_id": run_id,
104+
"attribute_path": attribute_path,
105+
"attribute_type": attribute_type,
106+
"exception": exception.__class__.__name__,
107+
}
108+
)
109+
+ "\n"
110+
)

src/neptune_exporter/exporters/exporter.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,13 @@
1616
"""Core exporter abstract base class and types."""
1717

1818
from abc import ABC, abstractmethod
19-
from dataclasses import dataclass
2019
from typing import Generator, Optional, Sequence
2120
from pathlib import Path
2221
import pyarrow as pa
2322

2423
from neptune_exporter.types import ProjectId, SourceRunId
2524

2625

27-
@dataclass
28-
class ExceptionInfo:
29-
project_id: ProjectId
30-
run_id: SourceRunId
31-
attribute_path: Optional[str]
32-
attribute_type: Optional[str]
33-
exception: Exception
34-
35-
3626
class NeptuneExporter(ABC):
3727
"""Abstract base class for Neptune data exporters."""
3828

@@ -89,11 +79,6 @@ def download_files(
8979
"""Download files from Neptune runs."""
9080
pass
9181

92-
@abstractmethod
93-
def get_exception_infos(self) -> list[ExceptionInfo]:
94-
"""Get list of exceptions that occurred during export."""
95-
pass
96-
9782
def close(self) -> None:
9883
"""Close the exporter."""
9984
pass

src/neptune_exporter/exporters/neptune2.py

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
from neptune import management
3333

3434
from neptune_exporter import model
35-
from neptune_exporter.exporters.exporter import ExceptionInfo, NeptuneExporter
35+
from neptune_exporter.exporters.exporter import NeptuneExporter
36+
from neptune_exporter.exporters.error_reporter import ErrorReporter
3637
from neptune_exporter.types import ProjectId, SourceRunId
3738

3839
_ATTRIBUTE_TYPE_MAP = {
@@ -72,19 +73,20 @@
7273
class Neptune2Exporter(NeptuneExporter):
7374
def __init__(
7475
self,
76+
error_reporter: ErrorReporter,
7577
api_token: Optional[str] = None,
7678
max_workers: int = 16,
7779
show_client_logs: bool = False,
7880
include_trashed_runs: bool = False,
7981
):
82+
self._error_reporter = error_reporter
8083
self._include_trashed_runs = include_trashed_runs
8184
self._api_token = api_token
8285
self._quantize_base = Decimal("1.000000")
8386
self._executor = ThreadPoolExecutor(max_workers=max_workers)
8487
self._logger = logging.getLogger(__name__)
8588
self._show_client_logs = show_client_logs
8689
self._initialize_client(show_client_logs=show_client_logs)
87-
self._exception_infos: list[ExceptionInfo] = []
8890

8991
def _initialize_client(self, show_client_logs: bool) -> None:
9092
if show_client_logs:
@@ -696,28 +698,6 @@ def _should_include_attribute(
696698

697699
return True
698700

699-
def get_exception_infos(self) -> list[ExceptionInfo]:
700-
"""Get list of exceptions that occurred during export."""
701-
return self._exception_infos
702-
703-
def _record_exception(
704-
self,
705-
project_id: ProjectId,
706-
run_id: SourceRunId,
707-
attribute_path: Optional[str],
708-
attribute_type: Optional[str],
709-
exception: Exception,
710-
) -> None:
711-
self._exception_infos.append(
712-
ExceptionInfo(
713-
project_id=project_id,
714-
run_id=run_id,
715-
attribute_path=attribute_path,
716-
attribute_type=attribute_type,
717-
exception=exception,
718-
)
719-
)
720-
721701
def _handle_run_exception(
722702
self, project_id: ProjectId, run_id: SourceRunId, exception: Exception
723703
) -> None:
@@ -740,7 +720,7 @@ def _handle_run_exception(
740720
f"Skipping project {project_id}, run {run_id} because of unexpected error.",
741721
exc_info=True,
742722
)
743-
self._record_exception(
723+
self._error_reporter.record_exception(
744724
project_id=project_id,
745725
run_id=run_id,
746726
attribute_path=None,
@@ -786,7 +766,7 @@ def _handle_attribute_exception(
786766
f"Skipping project {project_id}, run {run_id}, attribute {attribute_path} ({attribute_type}) because of unexpected error.",
787767
exc_info=True,
788768
)
789-
self._record_exception(
769+
self._error_reporter.record_exception(
790770
project_id=project_id,
791771
run_id=run_id,
792772
attribute_path=attribute_path,

src/neptune_exporter/exporters/neptune3.py

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
import logging
2828

2929
from neptune_exporter import model
30-
from neptune_exporter.exporters.exporter import ExceptionInfo, NeptuneExporter
30+
from neptune_exporter.exporters.exporter import NeptuneExporter
31+
from neptune_exporter.exporters.error_reporter import ErrorReporter
3132
from neptune_exporter.types import ProjectId, SourceRunId
3233

3334

@@ -51,6 +52,7 @@
5152
class Neptune3Exporter(NeptuneExporter):
5253
def __init__(
5354
self,
55+
error_reporter: ErrorReporter,
5456
api_token: Optional[str] = None,
5557
series_attribute_batch_size: int = 128,
5658
file_attribute_batch_size: int = 16,
@@ -67,7 +69,7 @@ def __init__(
6769
self._executor = ThreadPoolExecutor(max_workers=max_workers)
6870
self._logger = logging.getLogger(__name__)
6971
self._initialize_client(api_token=api_token, show_client_logs=show_client_logs)
70-
self._exception_infos: list[ExceptionInfo] = []
72+
self._error_reporter = error_reporter
7173

7274
def _initialize_client(
7375
self, api_token: Optional[str], show_client_logs: bool
@@ -693,40 +695,6 @@ def _convert_files_to_schema(
693695
}
694696
)
695697

696-
def get_exception_infos(self) -> list[ExceptionInfo]:
697-
"""Get list of exceptions that occurred during export."""
698-
return self._exception_infos
699-
700-
def _record_exception(
701-
self,
702-
project_id: ProjectId,
703-
run_ids: list[SourceRunId],
704-
attribute_paths: Optional[list[str]],
705-
exception: Exception,
706-
) -> None:
707-
for run_id in run_ids:
708-
if attribute_paths is None:
709-
self._exception_infos.append(
710-
ExceptionInfo(
711-
project_id=project_id,
712-
run_id=run_id,
713-
attribute_path=None,
714-
attribute_type=None,
715-
exception=exception,
716-
)
717-
)
718-
else:
719-
for attribute_path in attribute_paths:
720-
self._exception_infos.append(
721-
ExceptionInfo(
722-
project_id=project_id,
723-
run_id=run_id,
724-
attribute_path=attribute_path,
725-
attribute_type=None,
726-
exception=exception,
727-
)
728-
)
729-
730698
def _handle_batch_exception(
731699
self,
732700
project_id: ProjectId,
@@ -755,7 +723,7 @@ def _handle_batch_exception(
755723
exc_info=True,
756724
)
757725

758-
self._record_exception(
726+
self._error_reporter.record_batch_exception(
759727
project_id=project_id,
760728
run_ids=run_ids,
761729
attribute_paths=attribute_batch,
@@ -789,7 +757,7 @@ def _handle_runs_exception(
789757
exc_info=True,
790758
)
791759

792-
self._record_exception(
760+
self._error_reporter.record_batch_exception(
793761
project_id=project_id,
794762
run_ids=run_ids,
795763
attribute_paths=None,

0 commit comments

Comments
 (0)