diff --git a/seismic/xcorqc/correlator.py b/seismic/xcorqc/correlator.py
index 47285f91..7f2f646f 100644
--- a/seismic/xcorqc/correlator.py
+++ b/seismic/xcorqc/correlator.py
@@ -31,6 +31,7 @@
 from seismic.xcorqc.xcorqc import IntervalStackXCorr
 from seismic.xcorqc.utils import getStationInventory, read_location_preferences, Dataset
+from seismic.xcorqc.subset_stacker import SubsetStacker
 from seismic.misc import get_git_revision_hash, rtp2xyz, split_list
 from seismic.misc_p import ProgressTracker
 from itertools import product
@@ -45,7 +46,7 @@ def process(data_source1, data_source2, output_path,
             clip_to_2std=False, whitening=False, whitening_window_frequency=0, one_bit_normalize=False,
             location_preferences=None, ds1_zchan=None, ds1_nchan=None, ds1_echan=None,
             ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None,
-            envelope_normalize=False, ensemble_stack=False, subset_stack=False, apply_simple_stacking=True,
+            envelope_normalize=False, ensemble_stack=False, subset_stacker=None, apply_simple_stacking=True,
             restart=False, dry_run=False, no_tracking_tag=False, scratch_folder=None):
     """
     :param data_source1: Text file containing paths to ASDF files
@@ -119,7 +120,7 @@ def outputConfigParameters():
         if(whitening):
             f.write('%35s\t\t\t: %s\n' % ('--whitening-window-frequency', whitening_window_frequency))
         f.write('%35s\t\t\t: %s\n' % ('--ensemble-stack', ensemble_stack))
-        f.write('%35s\t\t\t: %s\n' % ('--subset-stack', subset_stack))
+        f.write('%35s\t\t\t: %s\n' % ('--subset-stack', subset_stacker is not None))
         f.write('%35s\t\t\t: %s\n' % ('--restart', 'TRUE' if restart else 'FALSE'))
         f.write('%35s\t\t\t: %s\n' % ('--no-tracking-tag', 'TRUE' if no_tracking_tag else 'FALSE'))
         f.write('%35s\t\t\t: %s\n' % ('--scratch-folder', scratch_folder))
@@ -309,7 +310,7 @@ def get_loccha(cha1, cha2):
                                 interval_seconds, window_seconds, window_overlap, window_buffer_length,
                                 fmin, fmax, clip_to_2std, whitening, whitening_window_frequency,
                                 one_bit_normalize, envelope_normalize,
-                                ensemble_stack, subset_stack, apply_simple_stacking, output_path, 2,
+                                ensemble_stack, subset_stacker, apply_simple_stacking, output_path, 2,
                                 time_tag, scratch_folder, git_hash)
         # end for
     # end for
@@ -528,13 +529,17 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap
         apply_simple_stacking = True
     # end if
 
+    # instantiate subset-stacker if requested
+    subset_stacker = None
+    if(subset_stack): subset_stacker = SubsetStacker()
+
     process(data_source1, data_source2, output_path, interval_seconds, window_seconds, window_overlap,
             window_buffer_length, read_ahead_window_seconds, resample_rate, taper_length, nearest_neighbours,
             pair_min_dist, pair_max_dist, fmin, fmax, station_names1, station_names2, pairs_to_compute,
             start_time, end_time, instrument_response_inventory, instrument_response_output, water_level,
             clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, location_preferences,
             ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize,
-            ensemble_stack, subset_stack, apply_simple_stacking, restart, dry_run, no_tracking_tag,
+            ensemble_stack, subset_stacker, apply_simple_stacking, restart, dry_run, no_tracking_tag,
             scratch_folder)
 # end func
diff --git a/seismic/xcorqc/subset_stacker.py b/seismic/xcorqc/subset_stacker.py
index 7fb9f34e..cfbfa67d 100644
--- a/seismic/xcorqc/subset_stacker.py
+++ b/seismic/xcorqc/subset_stacker.py
@@ -138,6 +138,7 @@ def get_affected_indices(source_eids, pat, swat, swet):
         p2 = [slon2, slat2]
         az, baz, dist = self.gc.geod.inv(p1[0], p1[1], p2[0], p2[1])
+        #print(az, baz)
         eaz1, ebaz1, edist1 = self.gc.geod.inv(np.ones(len(cat)) * p1[0],
                                                np.ones(len(cat)) * p1[1],
                                                cat['lon'], cat['lat'])
@@ -154,7 +155,6 @@ def get_affected_indices(source_eids, pat, swat, swet):
         # compute P and SW arrival times for relevant events at the two stations
         edepth_km = np.array(cat['dep'])
         otime = np.array(cat['EventOrigintim'])
-        mag = np.array(cat['MwG'])
 
         ptt1 = self.tti.get_tt('P', edistdeg1, edepth_km)
         ptt2 = self.tti.get_tt('P', edistdeg2, edepth_km)
@@ -196,10 +196,16 @@ def get_affected_indices(source_eids, pat, swat, swet):
         ccids2_inside_az = get_affected_indices(eids2_inside_az, pat2, swat2, swet2)
         ccids_inside_az = ccids1_inside_az | ccids2_inside_az
 
+        #print('ccs inside azimuth for station 1: {}'.format(np.sum(ccids1_inside_az)))
+        #print('ccs inside azimuth for station 2: {}'.format(np.sum(ccids2_inside_az)))
+
         ccids1_outside_az = get_affected_indices(eids_outside_az, pat1, swat1, swet1)
         ccids2_outside_az = get_affected_indices(eids_outside_az, pat2, swat2, swet2)
         ccids_outside_az = ccids1_outside_az | ccids2_outside_az
 
+        #print('ccs outside azimuth for station 1: {}'.format(np.sum(ccids1_outside_az)))
+        #print('ccs outside azimuth for station 2: {}'.format(np.sum(ccids2_outside_az)))
+
         # aliases to indices as named in the manuscript
         idsXei = ccids_inside_az                        # inside az
         idsXec = ~(ccids_inside_az | ccids_outside_az)  # no events
@@ -207,6 +213,7 @@ def get_affected_indices(source_eids, pat, swat, swet):
         idsXeo = ccids_outside_az
 
         # compute means
+        mean = mean_Xei = mean_Xec = mean_XeiUXec = mean_Xeo = None
         mean = np.zeros(spooled_matrix.ncols)
         mean_Xei = np.zeros(spooled_matrix.ncols)
         mean_Xec = np.zeros(spooled_matrix.ncols)
@@ -229,6 +236,14 @@ def get_affected_indices(source_eids, pat, swat, swet):
             mean_XeiUXec /= float(np.sum(idsXeiUXec))
             mean_Xeo /= float(np.sum(idsXeo))
 
+            """
+            np.savez('stack3outputs.npz', xcf=mean,
+                     xcf1=mean_Xei, xcf2=mean_Xec, xcf3=mean_XeiUXec,
+                     xcf4=mean_Xeo, idsXei=idsXei, idsXec=idsXec,
+                     idsXeiUXec=idsXeiUXec, idsXeo=idsXeo)
+            """
+        # end if
+
         return mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo
     # end func
 # end class
diff --git a/seismic/xcorqc/utils.py b/seismic/xcorqc/utils.py
index 9d87d715..d56e8354 100644
--- a/seismic/xcorqc/utils.py
+++ b/seismic/xcorqc/utils.py
@@ -11,6 +11,7 @@
 from seismic.misc import rtp2xyz, read_key_value_pairs
 from seismic.misc import get_git_revision_hash, rtp2xyz, split_list
 import os, psutil
+from netCDF4 import Dataset as ncDataset
 
 class Dataset:
     def __init__(self, asdf_file_name, netsta_list='*'):
@@ -180,8 +181,11 @@ def read_subset_stacker_config()->dict:
     fn = os.path.join(os.getcwd(), 'subset_stack.conf')
     try:
         d = read_key_value_pairs(fn, keys, strict=True)
+        for k in keys[1:]:
+            d[k] = float(d[k])
+        # end for
     except Exception as e:
-        print(e)
+        print('Reading {} failed with error: {}'.format(fn, e))
     # end try
 
     return d
@@ -387,6 +391,38 @@ def __init__(self, ncols, dtype=np.float32, max_size_mb=2048, prefix='', dir=Non
         self._file = SpooledTemporaryFile(prefix = self._prefix, mode = 'w+b',
                                           max_size = max_size_mb * 1024**2, dir=dir)
     # end func
 
+    @classmethod
+    def from_nc(cls, nc_file):
+        """
+        Alternative instantiation for testing purposes
+        @param nc_file: netCDF file containing an 'xcorr' variable (1D or 2D)
+        @return: SpooledMatrix populated with the rows of 'xcorr'
+        """
+        try:
+            ds = ncDataset(nc_file)
+            xcorr = np.array(ds.variables['xcorr'])
+            shp = xcorr.shape
+            ncols = 0
+
+            if(len(shp) == 1): ncols = shp[0]
+            elif(len(shp) == 2): ncols = shp[1]
+
+            sm = cls(ncols, dtype=xcorr.dtype)
+
+            if(len(shp) == 1):
+                sm.write_row(xcorr)
+            elif(len(shp) == 2):
+                for i in np.arange(shp[0]):
+                    sm.write_row(xcorr[i, :])
+                # end for
+            # end if
+
+            return sm
+        except Exception as e:
+            print('Failed to load {} with error: {}'.format(nc_file, str(e)))
+        # end try
+    # end func
+
     @property
     def ncols(self):
         return self._ncols
diff --git a/seismic/xcorqc/xcorqc.py b/seismic/xcorqc/xcorqc.py
index 1ec8b374..58d304f4 100644
--- a/seismic/xcorqc/xcorqc.py
+++ b/seismic/xcorqc/xcorqc.py
@@ -34,6 +34,7 @@
 from seismic.xcorqc.fft import *
 from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet
 from seismic.xcorqc.utils import get_stream, fill_gaps, MemoryTracker
+from seismic.xcorqc.subset_stacker import SubsetStacker
 from netCDF4 import Dataset
 from functools import reduce
 from seismic.xcorqc.utils import SpooledMatrix
@@ -522,7 +523,7 @@ def IntervalStackXCorr(refds, tempds,
                        clip_to_2std=False, whitening=False,
                        whitening_window_frequency=0, one_bit_normalize=False,
                        envelope_normalize=False, ensemble_stack=False,
-                       subset_stack=False,
+                       subset_stacker: SubsetStacker=None,
                        apply_simple_stacking=True,
                        outputPath='/tmp', verbose=1, tracking_tag='',
                        scratch_folder=None, git_hash=''):
@@ -596,9 +597,9 @@
     :param envelope_normalize: Envelope via Hilbert transforms and normalize
     :type ensemble_stack: bool
     :param ensemble_stack: Outputs a single CC function stacked over all data for a given station-pair
-    :type subset_stack: bool
-    :param subset_stack: Outputs stacks over subsets of CC windows, identified based on the presence/absence
-                         of azimuthal earthquake energy
+    :type subset_stacker: SubsetStacker
+    :param subset_stacker: Custom stacker to stack over subsets of CC windows, identified based on the
+                           presence/absence of azimuthal earthquake energy
     :type apply_simple_stacking: bool
     :param apply_simple_stacking: stacks cross-correlation windows over intervals
     :type outputPath: str
@@ -887,46 +888,58 @@
             else:
                 xc[:] = es
             # end if
-        elif subset_stack:
-            pass
         else:
             root_grp.createDimension('interval', flattenedIntervalStartTimes.shape[0])
             root_grp.createDimension('window', flattenedWindowStartTimes.shape[0])
 
-            # Variables
+            # create and populate variables
             interval = root_grp.createVariable('interval', 'f4', ('interval',))
             ist = root_grp.createVariable('IntervalStartTimes', 'i8', ('interval',))
             iet = root_grp.createVariable('IntervalEndTimes', 'i8', ('interval',))
             wst = root_grp.createVariable('WindowStartTimes', 'i8', ('window',))
             wet = root_grp.createVariable('WindowEndTimes', 'i8', ('window',))
 
-            xc = None
-            nsw = None
+            interval[:] = np.arange(flattenedIntervalStartTimes.shape[0])
+            ist[:] = flattenedIntervalStartTimes
+            iet[:] = flattenedIntervalEndTimes
+            wst[:] = flattenedWindowStartTimes
+            wet[:] = flattenedWindowEndTimes
+
             if(apply_simple_stacking):
                 nsw = root_grp.createVariable('NumStackedWindows', 'f4', ('interval',))
                 xc = root_grp.createVariable('xcorr', 'f4', ('interval', 'lag',),
                                              chunksizes=(1, spooledXcorr.ncols),
                                              zlib=True)
+                nsw[:] = flattenedWindowCounts
+                for irow in np.arange(spooledXcorr.nrows):
+                    xc[irow, :] = spooledXcorr.read_row(irow)
+                # end for
+            elif subset_stacker is not None:
+                xc = root_grp.createVariable('xcorr', 'f4', ('lag',))
+                xc_Xei = root_grp.createVariable('xcorr_Xei', 'f4', ('lag',))
+                xc_Xec = root_grp.createVariable('xcorr_Xec', 'f4', ('lag',))
+                xc_XeiUXec = root_grp.createVariable('xcorr_XeiUXec', 'f4', ('lag',))
+                xc_Xeo = root_grp.createVariable('xcorr_Xeo', 'f4', ('lag',))
+
+                slon1, slat1 = refds.unique_coordinates[ref_net_sta]
+                slon2, slat2 = tempds.unique_coordinates[temp_net_sta]
+                mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo = \
+                    subset_stacker.stack(spooledXcorr, flattenedWindowStartTimes,
+                                         flattenedWindowEndTimes,
+                                         slon1, slat1, slon2, slat2)
+                xc[:] = mean
+                xc_Xei[:] = mean_Xei
+                xc_Xec[:] = mean_Xec
+                xc_XeiUXec[:] = mean_XeiUXec
+                xc_Xeo[:] = mean_Xeo
             else:
                 xc = root_grp.createVariable('xcorr', 'f4', ('window', 'lag',),
                                              chunksizes=(1, spooledXcorr.ncols),
                                              zlib=True)
+                for irow in np.arange(spooledXcorr.nrows):
+                    xc[irow, :] = spooledXcorr.read_row(irow)
+                # end for
             # end if
-
-            # Populate variables
-            if(apply_simple_stacking):
-                nsw[:] = flattenedWindowCounts
-            # end if
-
-            interval[:] = np.arange(flattenedIntervalStartTimes.shape[0])
-            ist[:] = flattenedIntervalStartTimes
-            iet[:] = flattenedIntervalEndTimes
-            wst[:] = flattenedWindowStartTimes
-            wet[:] = flattenedWindowEndTimes
-
-            for irow in np.arange(spooledXcorr.nrows):
-                xc[irow, :] = spooledXcorr.read_row(irow)
-            # end for
         # end if
 
         lag[:] = x
diff --git a/tests/test_seismic/xcorqc/test_correlator.py b/tests/test_seismic/xcorqc/test_correlator.py
index 34045b12..3b0e6f88 100644
--- a/tests/test_seismic/xcorqc/test_correlator.py
+++ b/tests/test_seismic/xcorqc/test_correlator.py
@@ -55,9 +55,9 @@
 expected_folder = '%s/data/expected/'%(path)
 output_folder = str(tempfile.mkdtemp())
+#output_folder = '/tmp'
 os.mkdir(os.path.join(output_folder, 'stacked'))
 os.mkdir(os.path.join(output_folder, 'unstacked'))
-#output_folder = '/tmp'
 
 def test_correlator():
     start_time = '2006-11-03T00:00:00'
@@ -76,7 +76,7 @@ def test_correlator():
             netsta2, None, start_time, end_time,
             None, 'vel', 50, False, True, 0.02, True, loc_pref,
             '*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False,
-            True, False, False, True, None)
+            None, True, False, False, True, None)
 
     # Read result
@@ -109,7 +109,7 @@ def test_correlator():
             netsta2, None, start_time, end_time,
             None, 'vel', 50, False, True, 0.02, True, loc_pref,
             '*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False,
-            False, False, False, True, None)
+            None, False, False, False, True, None)
 
     # Read result
diff --git a/tests/test_seismic/xcorqc/test_xcorr.py b/tests/test_seismic/xcorqc/test_xcorr.py
index 18b4c526..2163fd4d 100644
--- a/tests/test_seismic/xcorqc/test_xcorr.py
+++ b/tests/test_seismic/xcorqc/test_xcorr.py
@@ -175,7 +175,7 @@ def expected_lines_iter():
                                    interval_seconds=isec,
                                    window_overlap=olap,
                                    window_buffer_length=wbl,
-                                   apply_stacking=True)
+                                   apply_simple_stacking=True)
 
         olines.append("============[test: {}]============\n".format(i))
         olines.append('Params: raw: {}, wsec: {}, olap: {}, wbl: {}\n'.format(raw, wsec, olap, wbl))
@@ -260,7 +260,7 @@ def expected_lines_iter():
                                    interval_seconds=isec,
                                    window_overlap=olap,
                                    window_buffer_length=wbl,
-                                   apply_stacking=True)
+                                   apply_simple_stacking=True)
 
         olines.append("============[test: {}]============\n".format(i))
         olines.append('Params: wsec: {}, isec: {}, olap: {}, wbl: {}\n'.format(wsec, isec, olap, wbl))
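
The usage sketch below is not part of the patch; it only illustrates how the reworked pieces fit together: SpooledMatrix.from_nc loads previously computed, windowed cross-correlations from a netCDF file, and SubsetStacker.stack is then called with the same argument pattern IntervalStackXCorr now uses when a SubsetStacker instance is passed in place of the old subset_stack flag. The file path, window epochs and station coordinates are hypothetical placeholders, and SubsetStacker() is assumed to find its configuration (e.g. subset_stack.conf and the associated earthquake catalogue) as it would in a normal run.

import numpy as np
from seismic.xcorqc.utils import SpooledMatrix
from seismic.xcorqc.subset_stacker import SubsetStacker

# Load windowed cross-correlations from an existing, unstacked output file;
# the 'xcorr' variable may be 1D (a single CC function) or 2D (one row per window).
spooled = SpooledMatrix.from_nc('unstacked_xcorr.nc')  # hypothetical file

# Placeholder window epochs (seconds) and station coordinates (lon, lat)
wst = 1162512000 + np.arange(spooled.nrows) * 3600
wet = wst + 3600
slon1, slat1 = 133.0, -25.0
slon2, slat2 = 135.0, -27.0

# Stack subsets of CC windows classified by the presence/absence of azimuthal
# earthquake energy, mirroring the call made inside IntervalStackXCorr
stacker = SubsetStacker()
mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo = \
    stacker.stack(spooled, wst, wet, slon1, slat1, slon2, slat2)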