
Commit 0e9fd44

Fix anonymous reading in materialize and add rate limited logging. (#898)
* Fix anonymous reading in materialize and add rate-limited logging.
* In materialize, try reading with the configured credentials; if that fails, fall back to reading anonymously when an anonymous probe of the path succeeds.
* Add rate-limited logging to reading via materialize in local mode.
* Check for a missing root before checking whether the node will be a source, since that ordering makes more sense.
* Switch ntsb_loader_materialized.py over to reading in local mode; it was working (with the anonymous fix) but was very slow, hence the logging.
1 parent 1b102e3 commit 0e9fd44
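
The anonymous fallback described in the commit message amounts to probing the materialize root once with the configured credentials and, on failure, retrying with an unsigned client. A minimal standalone sketch of that pattern using pyarrow (the probe_s3 helper is illustrative only and not part of this commit):

    from pyarrow.fs import S3FileSystem

    def probe_s3(path: str) -> S3FileSystem:
        """Return an S3 filesystem that can read `path`, preferring credentials."""
        fs = S3FileSystem()  # uses whatever credentials the environment provides
        try:
            fs.get_file_info(path)  # cheap probe; raises OSError if access fails
            return fs
        except OSError:
            pass
        # Credentials missing or rejected: retry without signing the request.
        anon = S3FileSystem(anonymous=True)
        anon.get_file_info(path)  # still raises if the bucket is not publicly readable
        return anon

    # e.g. probe_s3("aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29")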

4 files changed, +102 -7 lines changed

examples/query/ntsb_loader_materialized.py

Lines changed: 7 additions & 5 deletions

@@ -73,9 +73,11 @@
 }


-context = sycamore.init()
-_ = context.read.materialize("s3://aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29").write.opensearch(
-    os_client_args=os_client_args,
-    index_name=INDEX,
-    index_settings=index_settings,
+context = sycamore.init(exec_mode=sycamore.EXEC_LOCAL)
+(
+    context.read.materialize("s3://aryn-public/materialize/examples/luna/ntsb_loader_2024-08-29").write.opensearch(
+        os_client_args=os_client_args,
+        index_name=INDEX,
+        index_settings=index_settings,
+    )
 )

lib/sycamore/sycamore/materialize.py

Lines changed: 38 additions & 2 deletions

@@ -95,17 +95,45 @@ def __init__(

         super().__init__(child, **kwargs)

+        self._maybe_anonymous()
+
+    def _maybe_anonymous(self):
+        if self._root is None:
+            return
+        from pyarrow.fs import S3FileSystem
+
+        if not isinstance(self._fs, S3FileSystem):
+            return
+
+        try:
+            self._fs.get_file_info(str(self._root))
+            return
+        except OSError as e:
+            logging.warning(f"Got error {e} trying to get file info on {self._root}, trying again in anonymous mode")
+
+        fs = S3FileSystem(anonymous=True)
+        try:
+            fs.get_file_info(str(self._root))
+            self._fs = fs
+            self._fshelper = _PyArrowFsHelper(self._fs)
+            return
+        except OSError as e:
+            logging.warning(
+                f"Got error {e} trying to anonymously get file info on {self._root}. Likely to fail shortly."
+            )
+            return
+
     def prepare(self):
         """
         Clean up the materialize location if necessary.
         Validate that cleaning worked, but only once all materializes have finished cleaning.
         This protects against multiple materializes pointing to the same location.
         """

-        if self._will_be_source():
+        if self._root is None:
             return

-        if self._root is None:
+        if self._will_be_source():
             return

         if not self._clean_root:

@@ -207,13 +235,21 @@ def local_source(self) -> list[Document]:
         logger.info(f"Using {self._orig_path} as cached source of data")
         if not self._fshelper.file_exists(self._success_path()):
             logging.warning(f"materialize.success not found in {self._orig_path}. Returning partial data")
+        from sycamore.utils.sycamore_logger import LoggerFilter
+
+        limited_logger = logging.getLogger(__name__ + ".limited_local_source")
+        limited_logger.addFilter(LoggerFilter())
         ret = []
+        count = 0
         for fi in self._fshelper.list_files(self._root):
             n = Path(fi.path)
             if n.suffix == ".pickle":
+                limited_logger.info(f" reading file {count} from {str(n)}")
+                count = count + 1
                 f = self._fs.open_input_stream(str(n))
                 ret.append(Document.deserialize(f.read()))
                 f.close()
+        logger.info(f" read {count} total files")

         return ret
Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+import logging
+import time
+
+from sycamore.utils.sycamore_logger import LoggerFilter
+
+
+def test_logger_ratelimit(caplog):
+    logger = logging.getLogger("test_sycamore")
+
+    with caplog.at_level(logging.INFO):
+        for i in range(5):
+            logger.info(f"Unbounded {i}")
+
+        logger.addFilter(LoggerFilter())
+        for i in range(5):
+            logger.info(f"Bounded {i}")
+
+        time.sleep(1)
+        logger.info("Bounded After")
+
+    for i in range(5):
+        assert f"Unbounded {i}\n" in caplog.text
+
+    assert "Bounded 0" in caplog.text
+    for i in range(1, 5):
+        assert f"Bounded {i}\n" not in caplog.text
+
+    assert "Bounded After\n" in caplog.text

lib/sycamore/sycamore/utils/sycamore_logger.py

Lines changed: 29 additions & 0 deletions

@@ -2,6 +2,8 @@
 import sys
 import time

+from datetime import datetime, timedelta
+
 handler_setup = False



@@ -27,3 +29,30 @@ def get_logger():
     """Get an application logger"""
     logger = logging.getLogger("sycamore")
     return logger
+
+
+class LoggerFilter(logging.Filter):
+    def __init__(self, seconds=1):
+        """
+        A filter to limit the rate of log messages.
+
+            logger = logging.getLogger(__name__)
+            logger.addFilter(LoggerFilter())
+
+
+        Args:
+            seconds: Minimum seconds between log messages.
+        """
+
+        self._min_interval = timedelta(seconds=seconds)
+        self._next_log = datetime.now()
+
+    def filter(self, record=None):
+        if record is None:
+            assert False
+        now = datetime.now()
+        if now >= self._next_log:
+            self._next_log = now + self._min_interval
+            return True
+
+        return False
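
For reference, attaching this filter to a logger drops every record that arrives within `seconds` of the last one that passed. A short usage sketch (the logger name and loop are illustrative, not part of this commit):

    import logging
    from sycamore.utils.sycamore_logger import LoggerFilter

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("my_hot_loop")  # hypothetical logger name
    logger.addFilter(LoggerFilter(seconds=5))  # at most one record every 5 seconds

    for i in range(1_000_000):
        logger.info("processed %d items", i)  # most of these are silently dropped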
