Skip to content

Commit b62aa49

Browse files
committed
Fixed stealthy issue with closures and dangling FedASDF instance
1 parent f65e1f8 commit b62aa49

File tree

1 file changed

+22
-16
lines changed

1 file changed

+22
-16
lines changed

seismic/extract_event_traces.py

Lines changed: 22 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,6 @@
1111
from obspy.core.event import Catalog
1212
from obspy.core import Stream, Trace, UTCDateTime
1313
import click
14-
1514
from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
1615
from seismic.stream_processing import zne_order
1716
from seismic.stream_io import safe_iter_event_data, write_h5_event_stream
@@ -34,11 +33,11 @@
3433
# descriptions
3534
DESCS = {'P': 'P-wave', 'S': 'S-wave', 'SW': 'Surface-wave'}
3635

37-
def asdf_get_waveforms(asdf_dataset, network, station, location, channel, starttime,
36+
def asdf_get_waveforms(ds:FederatedASDFDataSet, network, station, location, channel, starttime,
3837
endtime):
3938
"""Custom waveform getter function to retrieve waveforms from FederatedASDFDataSet.
4039
41-
:param asdf_dataset: Instance of FederatedASDFDataSet to query
40+
:param ds: Instance of FederatedASDFDataSet to query
4241
:type ds: seismic.ASDFdatabase.FederatedASDFDataSet
4342
:param network: Network code
4443
:type network: str
@@ -56,14 +55,14 @@ def asdf_get_waveforms(asdf_dataset, network, station, location, channel, startt
5655
:rtype: obspy.Stream of obspy.Traces
5756
"""
5857
st = Stream()
59-
matching_stations = asdf_dataset.get_stations(starttime, endtime, network=network, station=station,
58+
matching_stations = ds.get_stations(starttime, endtime, network=network, station=station,
6059
location=location)
6160
if matching_stations:
6261
channel = channel.replace('?', '.') # replace greedy matching by single-character matching
6362
ch_matcher = re.compile(channel)
6463
for net, sta, loc, cha, _, _, _ in matching_stations:
6564
if ch_matcher.match(cha):
66-
st += asdf_dataset.get_waveforms(net, sta, loc, cha, starttime, endtime,
65+
st += ds.get_waveforms(net, sta, loc, cha, starttime, endtime,
6766
trace_count_threshold=50)
6867
# end if
6968
# end for
@@ -173,14 +172,11 @@ def pick(self, ztrace, ntrace, etrace, phase='P'):
173172
# end func
174173
# end class
175174

176-
def extract_data(fds, catalog, inventory, event_trace_datafile, log_folder,
175+
def extract_data(recording_timespan_getter, waveform_getter,
176+
catalog, inventory, event_trace_datafile, log_folder,
177177
wave, request_window, time_range, distance_range, magnitude_range,
178178
depth_range, min_areal_separation_km, resample_hz, tt_model='iasp91', pad=10,
179179
dry_run=True):
180-
def closure_get_waveforms(network, station, location, channel, starttime, endtime):
181-
return asdf_get_waveforms(fds, network, station, location, channel, starttime, endtime)
182-
# end func
183-
184180
assert wave in ['P', 'S', 'SW'], 'Only P, S and SW (surface wave) is supported. Aborting..'
185181

186182
# initialize phase-map dict
@@ -259,7 +255,7 @@ def closure_get_waveforms(network, station, location, channel, starttime, endtim
259255
sta_lon, sta_lat = coord['longitude'], coord['latitude']
260256

261257
# set start- and end-times
262-
st, et = fds.get_recording_timespan(network=net, station=sta, location=loc)
258+
st, et = recording_timespan_getter(network=net, station=sta, location=loc)
263259
if(time_range[0] is None):
264260
time_range[0] = st
265261
else:
@@ -293,7 +289,7 @@ def closure_get_waveforms(network, station, location, channel, starttime, endtim
293289
stream_count = 0
294290
sta_stream = Stream()
295291
status = DataFrame()
296-
for s in safe_iter_event_data(curr_cat, curr_inv, closure_get_waveforms,
292+
for s in safe_iter_event_data(curr_cat, curr_inv, waveform_getter,
297293
use_rfstats=rfstats_map[wave],
298294
phase=phase_map[wave],
299295
tt_model=tt_model, pbar=None,
@@ -540,7 +536,7 @@ def main(data_source, network_list, station_list, gcmt_catalog_file, output_file
540536
# end for
541537
# end for
542538

543-
if(len(netsta) == 0):
539+
if(len(netsta_df) == 0):
544540
log.error('Inventory is empty! Aborting..')
545541
parallel_abort('')
546542
else:
@@ -554,14 +550,24 @@ def main(data_source, network_list, station_list, gcmt_catalog_file, output_file
554550
if(rank == 0):
555551
assert not os.path.exists(output_file), \
556552
"Output file {} already exists, please remove!".format(output_file)
557-
log.info("Traces will be written to: {}".format(output_file))
553+
log.info("Traces will be written to: {}\n".format(output_file))
558554
# end if
559555

556+
# define closures for getting recording timespans and waveforms
557+
def recording_timespan_getter(network, station, location):
558+
return fds.get_recording_timespan(network=network, station=station, location=location)
559+
# end func
560+
561+
def waveform_getter(network, station, location, channel, starttime, endtime):
562+
return asdf_get_waveforms(fds, network, station, location, channel, starttime, endtime)
563+
# end func
564+
560565
for wave, flag in owave_types.items():
561566
if(not flag): continue
562567

563-
log.info('Processing {} events..'.format(DESCS[wave]))
564-
extract_data(fds, catalog, inventory, output_file, log_folder,
568+
if(rank == 0): log.info('Processing {} events..'.format(DESCS[wave]))
569+
extract_data(recording_timespan_getter, waveform_getter, catalog,
570+
inventory, output_file, log_folder,
565571
wave, request_window[wave], [start_time, end_time],
566572
distance_range[wave], magnitude_range[wave],
567573
depth_range[wave], areal_separation_km[wave],

0 commit comments

Comments
 (0)