diff --git a/seismic/catalogue/__init__.py b/seismic/catalogue/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/seismic/catalogue/gcmt.py b/seismic/catalogue/gcmt.py new file mode 100644 index 00000000..d18ecaa5 --- /dev/null +++ b/seismic/catalogue/gcmt.py @@ -0,0 +1,180 @@ +import numpy as np +import pandas as pd +from pandas.core.series import Series +from scipy.spatial import cKDTree +from pyproj import Geod +from obspy.core import UTCDateTime +from collections import defaultdict +from obspy.geodetics.base import degrees2kilometers +from seismic.misc import rtp2xyz + +def azimuth_difference(a, b, p, ellipse='WGS84'): + geod = Geod(ellps=ellipse) + + # ab Geodesic from A to B. + # az_ab Azimuth of geodesic from A to B. + az_ab, _, _ = geod.inv(a[0], a[1], b[0], b[1]) + + # ap Geodesic from A to P. + # az_ap Azimuth of geodesic from A to P. + az_ap, _, d_ap = geod.inv(a[0], a[1], p[0], p[1]) + + # bp Geodesic from B to P. + # az_bp Azimuth of geodesic from B to P. + az_bp, _, d_bp = geod.inv(b[0], b[1], p[0], p[1]) + + if(d_ap > d_bp): + return np.min(np.array([np.fabs(az_ab - az_ap), np.fabs(az_ab-az_bp)])) + else: + return azimuth_difference(b, a, p) + # end if +# end func + +class GCMTCatalog: + def __init__(self, fn, ellipse='WGS84'): + def read_gcmt_catalog(fn): + """ + @param fn: GCMT catalog file name in text format. The expected columns (space-separated) are: + lonp lon lat dep mrr mtt mff mrt mrf mtf exp EventOrigintim DC CLVD VOL Mw str1 dip1 rak1 \ + str2 dip2 rak2 plunP azP plunT azT Hlonp Hlon Hlat Hdep Ctime Chdur MwG base Bst Bco Bmp Bper \ + Sst Sco Smp Sper Mst Mco Mmp Mper + @return: catalog in a pandas dataframe + """ + def convert_to_timestamp(eotime): + a = str(eotime) + b = a[0:4] + '-' + a[4:6] + '-' + a[6:8] + 'T' + a[8:10] + ':' + a[10:12] + ':' + a[12:] + return UTCDateTime(b).timestamp + + # end func + + cat = pd.read_csv(fn, header=[1], delimiter='\s+') + cat['EventOrigintim'] = cat['EventOrigintim'].map(convert_to_timestamp) + + return cat + # end func + + self.fn = fn + self.cat = read_gcmt_catalog(fn) + self.EARTH_RADIUS_KM = 6371. 
+ self.tree = None + self.geod = Geod(ellps=ellipse) + # end func + + def get_compatible_events(self, station_lon1, station_lat1, + station_lon2, station_lat2, + max_areal_separation_km=15, + max_depth_separation_km=10, + min_magnitude=-1, + az_range=80, min_event_dist_deg=10, + max_event_dist_deg=100, + max_mt_angle=15): + """ + For a given pair of stations, finds a list of pairs of proximal earthquakes that meet the + given criteria for event proximity, distance, azimuth and magnitude + @param station_lon1: longitude of station 1 + @param station_lat1: latitude of station 1 + @param station_lon2: longitude of station 2 + @param station_lat2: latitude of station 2 + @param max_areal_separation_km: maximum areal separation of event-pair + @param max_depth_separation_km: maximum separation of event-pair in depth + @param min_magnitude: minimum magnitude of earthquakes + @param az_range: (+/-) azimuth range + @param min_event_dist_deg: minimum distance of event epicentres from either station in degrees + @param max_event_dist_deg: maximum distance of event epicentres from either station in degrees + @param max_mt_angle: maximum moment-tensor angle between event-pairs + @return: dictionary indexed by a pair of event IDs, with the moment-tensor angle between + them as the value + """ + + # create kDTree for spatial queries if not done so yet + if(self.tree is None): + # create kdtree for spatial queries + r = np.ones(len(self.cat)) * self.EARTH_RADIUS_KM + t = np.radians(90 - self.cat['lat']) + p = np.radians(self.cat['lon']) + + xyz = rtp2xyz(r, t, p) + self.tree = cKDTree(xyz) + # end if + + MIN_DIST = degrees2kilometers(min_event_dist_deg) * 1e3 + MAX_DIST = degrees2kilometers(max_event_dist_deg) * 1e3 + + p1 = [station_lon1, station_lat1] + p2 = [station_lon2, station_lat2] + az, baz, dist = self.geod.inv(p1[0], p1[1], p2[0], p2[1]) + + eaz1, ebaz1, edist1 = self.geod.inv(np.ones(len(self.cat)) * p1[0], + np.ones(len(self.cat)) * p1[1], + self.cat['lon'], self.cat['lat']) + eaz2, ebaz2, edist2 = self.geod.inv(np.ones(len(self.cat)) * p2[0], + np.ones(len(self.cat)) * p2[1], + self.cat['lon'], self.cat['lat']) + + # find event IDs that match the given criteria + good_ids = ((eaz1 > (baz - az_range)) & (eaz1 < (baz + az_range))) | \ + ((eaz2 > (az - az_range)) & (eaz2 < (az + az_range))) + good_ids &= ((edist1 >= MIN_DIST) & (edist1 <= MAX_DIST)) & \ + ((edist2 >= MIN_DIST) & (edist2 <= MAX_DIST)) + good_ids &= (self.cat['Mw'] >= min_magnitude) + + # find all proximal events + qr = np.ones(np.sum(good_ids)) * self.EARTH_RADIUS_KM + qt = np.radians(90 - self.cat['lat'][good_ids]) + qp = np.radians(self.cat['lon'][good_ids]) + + qxyz = rtp2xyz(qr, qt, qp) + id_lists = self.tree.query_ball_point(qxyz, max_areal_separation_km) + + result = defaultdict(list) + # find angles between all proximal events + for ids in id_lists: + if (len(ids) < 2): continue + + ids = np.array(ids) + # drop events by magnitude + ids = ids[(self.cat['Mw'][ids] >= min_magnitude)] + + gmat = np.array(self.cat.iloc[ids, 4:10]).T + gmat_norm = np.linalg.norm(gmat, axis=0) + + gmat /= gmat_norm + angles = np.arccos(np.clip(np.matmul(gmat.T, gmat), -1, 1)) + + mask = np.zeros(angles.shape) + mask[np.mask_indices(angles.shape[0], np.tril)] = 1 + angles = np.degrees(np.ma.masked_array(angles, mask=mask)) + + s_ids = np.argsort(angles.flatten()) + s_ids_i, s_ids_j = np.unravel_index(s_ids, angles.shape) + + for i, j in zip(s_ids_i, s_ids_j): + if (not np.ma.is_masked(angles[i, j])): + # drop events by depth-difference limit + 
depth_difference = np.fabs(self.cat['dep'][ids[i]] - self.cat['dep'][ids[j]]) + if(depth_difference > max_depth_separation_km): continue + + if(angles[i, j] > max_mt_angle): break + result[(ids[i], ids[j])] = angles[i, j] + # end if + # end for + # end for + + return result + # end func + + @staticmethod + def get_mt_angle(e1:Series, e2:Series): + """ + @param e1: an event row from a GCMTCatalog instance + @param e2: an event row from a GCMTCatalog instance + @return: moment-tensor angle between the two events + """ + + mt_angle = np.degrees(np.arccos(np.min([np.dot(e1.iloc[4:10], + e2.iloc[4:10]) / \ + (np.linalg.norm(e1.iloc[4:10]) * + np.linalg.norm(e2.iloc[4:10])), 1.]))) + return mt_angle + # end func +# end class diff --git a/seismic/misc.py b/seismic/misc.py index 4fcec7a3..925ca169 100644 --- a/seismic/misc.py +++ b/seismic/misc.py @@ -86,6 +86,50 @@ def rtp2xyz(r, theta, phi): return xout # end func +def read_key_value_pairs(file_path:str, keys:list, strict=False)->dict: + """ + Reads a text file containing colon-separated key-value pairs and returns a dictionary with values for specified keys. + Raises a ValueError if any of the specified keys are not found. + + :param file_path: Path to the text file. + :param keys: A list of keys to search for in the file. + :param strict: Ensures all keys are found in the file + :return: A dictionary with the specified keys and their corresponding values. + :raises ValueError: If any key is not found in the file, if strict is set to True. + """ + result = {} + keys_found = set() + + f = None + try: + f = open(file_path, 'r') + except Exception as e: + raise e + else: + with open(file_path, 'r') as f: + for line in f: + # Split the line by colon to get the key-value pair + if ':' in line: + key, value = line.strip().split(':', 1) + key, value = key.strip(), value.strip() # Clean up extra spaces + # If the key is in the provided list, add it to the result + if key in keys: + result[key] = value + keys_found.add(key) + # end if + # end for + # end with + # end try + + # Check if any keys were not found + missing_keys = set(keys) - keys_found + if missing_keys: + raise ValueError(f"The following keys were not found in the file: {', '.join(missing_keys)}") + # end if + + return result +# end func + def print_exception(e: Exception): exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] diff --git a/seismic/xcorqc/correlator.py b/seismic/xcorqc/correlator.py index d3f895ef..7f2f646f 100644 --- a/seismic/xcorqc/correlator.py +++ b/seismic/xcorqc/correlator.py @@ -31,6 +31,7 @@ from seismic.xcorqc.xcorqc import IntervalStackXCorr from seismic.xcorqc.utils import getStationInventory, read_location_preferences, Dataset +from seismic.xcorqc.subset_stacker import SubsetStacker from seismic.misc import get_git_revision_hash, rtp2xyz, split_list from seismic.misc_p import ProgressTracker from itertools import product @@ -43,10 +44,9 @@ def process(data_source1, data_source2, output_path, start_time='1970-01-01T00:00:00', end_time='2100-01-01T00:00:00', instrument_response_inventory=None, instrument_response_output='vel', water_level=50, clip_to_2std=False, whitening=False, whitening_window_frequency=0, - one_bit_normalize=False, location_preferences=None, - ds1_zchan=None, ds1_nchan=None, ds1_echan=None, - ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None, - envelope_normalize=False, ensemble_stack=False, apply_stacking=True, + one_bit_normalize=False, location_preferences=None, ds1_zchan=None, 
ds1_nchan=None, + ds1_echan=None, ds2_zchan=None, ds2_nchan=None, ds2_echan=None, corr_chan=None, + envelope_normalize=False, ensemble_stack=False, subset_stacker=None, apply_simple_stacking=True, restart=False, dry_run=False, no_tracking_tag=False, scratch_folder=None): """ :param data_source1: Text file containing paths to ASDF files @@ -120,6 +120,7 @@ def outputConfigParameters(): if(whitening): f.write('%35s\t\t\t: %s\n' % ('--whitening-window-frequency', whitening_window_frequency)) f.write('%35s\t\t\t: %s\n' % ('--ensemble-stack', ensemble_stack)) + f.write('%35s\t\t\t: %s\n' % ('--subset-stack', subset_stacker is not None)) f.write('%35s\t\t\t: %s\n' % ('--restart', 'TRUE' if restart else 'FALSE')) f.write('%35s\t\t\t: %s\n' % ('--no-tracking-tag', 'TRUE' if no_tracking_tag else 'FALSE')) f.write('%35s\t\t\t: %s\n' % ('--scratch-folder', scratch_folder)) @@ -309,8 +310,8 @@ def get_loccha(cha1, cha2): interval_seconds, window_seconds, window_overlap, window_buffer_length, fmin, fmax, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, envelope_normalize, - ensemble_stack, apply_stacking, output_path, 2, time_tag, - scratch_folder, git_hash) + ensemble_stack, subset_stacker, apply_simple_stacking, output_path, 2, + time_tag, scratch_folder, git_hash) # end for # end for # end func @@ -319,9 +320,9 @@ def get_loccha(cha1, cha2): CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'], show_default=True) @click.command(context_settings=CONTEXT_SETTINGS) @click.argument('data-source1', - type=click.Path('r')) + type=click.Path(exists=True)) @click.argument('data-source2', - type=click.Path('r')) + type=click.Path(exists=True)) @click.argument('output-path', required=True, type=click.Path(exists=True)) @click.argument('window-seconds', required=True, @@ -363,7 +364,7 @@ def get_loccha(cha1, cha2): @click.option('--station-names2', default='*', type=str, help="Either station name(s) (space-delimited) or a text file containing NET.STA entries in each line to " "process in data-source-2; default is '*', which processes all available stations.") -@click.option('--pairs-to-compute', default=None, type=click.Path('r'), +@click.option('--pairs-to-compute', default=None, type=click.Path(exists=True), help="Text file containing station pairs (NET.STA.NET.STA) for which cross-correlations are to be computed." "Note that this parameter is intended as a way to restrict the number of computations to only the " "station-pairs listed in the text-file.") @@ -374,7 +375,7 @@ def get_loccha(cha1, cha2): type=str, help="Date and time (in UTC format) to stop at") @click.option('--instrument-response-inventory', default=None, - type=click.Path('r'), + type=click.Path(exists=True), help="FDSNxml inventory containing instrument response information. Note that when this parameter is provided, " "instrument response corrections are automatically applied for matching stations with response " "information.") @@ -401,7 +402,7 @@ def get_loccha(cha1, cha2): help="Apply one-bit normalization to data in each window. Note that the default time-domain normalization " "is N(0,1), i.e. 0-mean and unit variance") @click.option('--location-preferences', default=None, - type=click.Path('r'), + type=click.Path(exists=True), help="A comma-separated two-columned text file containing location code preferences for " "stations in the form: 'NET.STA, LOC'. Note that location code preferences need not " "be provided for all stations -- the default is None. 
This approach allows for " @@ -439,6 +440,20 @@ def get_loccha(cha1, cha2): "for a given station-pair. In other words, stacks over " "'interval-seconds' are in turn stacked to produce a " "single cross-correlation function") +@click.option('--subset-stack', is_flag=True, + help="Outputs a number of stacks of cross-correlation subsets as identified based on the presence " + "or absence of azimuthal earthquake energy within cross-correlation windows, as outlined in " + "Hejrani et al. (in prep.). Note that this option is not compatible with --ensemble-stack and " + "--stacking-interval-seconds. A file named subset_stack.conf is expected in the current working " + "folder, with appropriate colon-separated values for the keys below: " + "CMT_CATALOG_PATH: 'path/to/CMT/catalog'" + "SW_VMIN: 2" + "SW_VMAX: 5.5" + "DIST_MIN: 0" + "DIST_MAX: 999" + "EMAG_MIN: 6" + "EMAG_MAX: 999" + "AZ_TOL: 30") @click.option('--restart', default=False, is_flag=True, help='Restart job') @click.option('--dry-run', default=False, is_flag=True, help='Dry run for printing out station-pairs and ' 'additional stats.') @@ -451,7 +466,7 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap start_time, end_time, instrument_response_inventory, instrument_response_output, water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, location_preferences, ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize, - ensemble_stack, restart, dry_run, no_tracking_tag, scratch_folder): + ensemble_stack, subset_stack, restart, dry_run, no_tracking_tag, scratch_folder): """ DATA_SOURCE1: Text file containing paths to ASDF files \n DATA_SOURCE2: Text file containing paths to ASDF files \n @@ -476,11 +491,16 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap if(stacking_interval_seconds is not None): if(stacking_interval_seconds < window_seconds): raise ValueError('Invalid value for --stacking-interval-seconds, must be > WINDOW_SECONDS') - if (stacking_interval_seconds > window_seconds * read_ahead_windows): + if(stacking_interval_seconds > window_seconds * read_ahead_windows): raise ValueError("""Invalid value for --stacking-interval-seconds, \ must be < WINDOW_SECONDS*READ_AHEAD_WINDOWS""") - # end if + if(subset_stack): raise ValueError('Subset-stacking based on a GCMT catalog is incompatible with stacking over ' + 'fixed intervals, e.g. over 24 hrs, as done through --stacking-interval-seconds') + # end if + if(ensemble_stack and subset_stack): + raise ValueError('Ensemble-stacking and subset-stacking are mutually exclusive options') + # end if ####################################################### # Compute amount of data to be read in in each IO call, @@ -489,7 +509,7 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap ####################################################### interval_seconds = None read_ahead_window_seconds = None - apply_stacking = None + apply_simple_stacking = False if(stacking_interval_seconds is None): if(ensemble_stack): raise ValueError('--ensemble-stack is only applicable with --stacking-interval-seconds. 
Aborting..') @@ -500,21 +520,27 @@ def main(data_source1, data_source2, output_path, window_seconds, window_overlap window_seconds * window_buffer_length * 2 + \ window_overlap * window_seconds * 2 interval_seconds = read_ahead_window_seconds - apply_stacking = False + apply_simple_stacking = False + #print(read_ahead_window_seconds) else: read_ahead_window_seconds = window_seconds * read_ahead_windows interval_seconds = stacking_interval_seconds - apply_stacking = True + apply_simple_stacking = True # end if + # instantiate subset-stacker if requested + subset_stacker = None + if(subset_stack): subset_stacker = SubsetStacker() + process(data_source1, data_source2, output_path, interval_seconds, window_seconds, window_overlap, window_buffer_length, read_ahead_window_seconds, resample_rate, taper_length, nearest_neighbours, pair_min_dist, pair_max_dist, fmin, fmax, station_names1, station_names2, pairs_to_compute, start_time, end_time, instrument_response_inventory, instrument_response_output, water_level, clip_to_2std, whitening, whitening_window_frequency, one_bit_normalize, location_preferences, ds1_zchan, ds1_nchan, ds1_echan, ds2_zchan, ds2_nchan, ds2_echan, corr_chan, envelope_normalize, - ensemble_stack, apply_stacking, restart, dry_run, no_tracking_tag, scratch_folder) + ensemble_stack, subset_stacker, apply_simple_stacking, restart, dry_run, no_tracking_tag, + scratch_folder) # end func if __name__ == '__main__': diff --git a/seismic/xcorqc/subset_stacker.py b/seismic/xcorqc/subset_stacker.py new file mode 100644 index 00000000..35e5aada --- /dev/null +++ b/seismic/xcorqc/subset_stacker.py @@ -0,0 +1,267 @@ +from mpi4py import MPI +from seismic.catalogue.gcmt import GCMTCatalog +from seismic.xcorqc.utils import SpooledMatrix +import numpy as np +from scipy.io import loadmat +from scipy.interpolate import interp2d +from obspy.geodetics import degrees2kilometers +from seismic.xcorqc.utils import read_subset_stacker_config +import os + +DEG2KM = degrees2kilometers(1) + +class ExtendedTTInterpolator: + def __init__(self): + tt_folder = os.path.dirname(os.path.abspath(__file__)) + '/tt/' + ptable = loadmat(os.path.join(tt_folder, 'P.mat')) + stable = loadmat(os.path.join(tt_folder, 'S.mat')) + + self.pio = interp2d(ptable['deg'].squeeze(), + ptable['depths'].squeeze(), ptable['tt'].T, kind='cubic') + self.sio = interp2d(stable['deg'].squeeze(), + stable['depths'].squeeze(), stable['tt'].T, kind='cubic') + # end func + + def get_tt(self, phase, dist_deg, depth_km): + def get_values(io, x, y): + if (isinstance(x, np.ndarray) and isinstance(y, np.ndarray)): + assert len(x) == len(y), 'Length of x and y must be the same' + rv = np.zeros(len(x)) + for i in np.arange(len(x)): + rv[i] = io(x[i], y[i]) + # end for + return rv + else: + rv = io(x, y) + return rv[0] + # end if + # end func + + if (phase == 'P'): + return get_values(self.pio, dist_deg, depth_km) + elif (phase == 'S'): + return get_values(self.sio, dist_deg, depth_km) + else: + raise ValueError('TT tables for phase {} not available. Aborting..'. 
\ + format(phase)) + # end if + # end func +# end class + +class SubsetStacker(): + def __init__(self): + self.comm = MPI.COMM_WORLD + self.nproc = self.comm.Get_size() + self.rank = self.comm.Get_rank() + + # read configuration + d = read_subset_stacker_config() + self.gcmt_catalog_fn = d['CMT_CATALOG_PATH'] + self.V_MINMAX = [d['SW_VMIN'], d['SW_VMAX']] # km/s + self.DIST_MINMAX = [d['DIST_MIN'], d['DIST_MAX']] + self.EMAG_MINMAX = [d['EMAG_MIN'], d['EMAG_MAX']] + self.AZ_TOL = d['AZ_TOL'] + + self.gc = None + if(self.rank == 0): + self.gc = GCMTCatalog(self.gcmt_catalog_fn, ellipse='sphere') + # end if + + # broadcast gcmt-catalog instance to all ranks + self.comm.Barrier() + self.gc = self.comm.bcast(self.gc, root=0) + + # instantiate extended travel-time interpolator + self.tti = ExtendedTTInterpolator() + # end func + + def stack(self, spooled_matrix:SpooledMatrix, + window_start_times:np.ndarray, + windows_end_times: np.ndarray, + slon1:float, slat1:float, slon2:float, slat2:float): + """ + @param spooled_matrix: SpooledMatrix containing cross-correlation entries as rows + @param window_start_times: start-times corresponding to each row in SpooledMatrix + @param window_end_times: end-time corresponding to each row in SpooledMatrix + @param slon1: longitude of site 1 + @param slat1: latitude of site 1 + @param slon2: longitude of site 1 + @param slat2: latitude of site 2 + @return: returns np.ndarrays mean, mean_Xei, mean_Xec, mean_XeiUXec and mean_Xeo + as defined in the manuscript + """ + + def get_affected_indices(source_eids, pat, swat, swet): + """ + Finds indices of CC windows affected by P and SW energy + @param source_eids: source event indices, e.g. for station 1 within/outside azimuth + @param pat: P arrival time for given station + @param swat: SW arrival time for given station + @param swet: SW end time for given station + @return: returns CC window indices affected by P and/or SW energy + """ + affected_indices = np.zeros(len(xcst), dtype='?') + + for eid in np.where(source_eids)[0]: + for i in np.where((pat[eid] >= xcst) & (pat[eid] <= xcet))[0]: + affected_indices[i] = 1 + for j in np.where((swat[eid] >= xcst) & (swat[eid] <= xcet))[0]: + affected_indices[j] = 1 + if (j > i): affected_indices[i:j] = 1 + + for k in np.where((swet[eid] >= xcst) & (swet[eid] <= xcet))[0]: + affected_indices[k] = 1 + if (k > j): affected_indices[j:k] = 1 + # end for + # end for + # end for + # end for + return affected_indices + # end func + + assert (spooled_matrix.nrows == len(window_start_times) == len(windows_end_times)), \ + 'Number of rows in the spooled-matrix should equal the number of window start- and ' \ + 'end-times provided' + + xcst = window_start_times + xcet = windows_end_times + + # relevant events + reids = (self.gc.cat['EventOrigintim'] >= xcst[0]) & \ + (self.gc.cat['EventOrigintim'] <= xcet[-1]) & \ + (self.gc.cat['MwG'] >= self.EMAG_MINMAX[0]) & \ + (self.gc.cat['MwG'] <= self.EMAG_MINMAX[1]) + cat = self.gc.cat[reids] # subset of events relevant for CC being processed + + # compute distances and azimuths of relevant ecents from the two stations + p1 = [slon1, slat1] + p2 = [slon2, slat2] + az, baz, dist = self.gc.geod.inv(p1[0], p1[1], p2[0], p2[1]) + + #print(az, baz) + eaz1, ebaz1, edist1 = self.gc.geod.inv(np.ones(len(cat)) * p1[0], + np.ones(len(cat)) * p1[1], + cat['lon'], cat['lat']) + eaz2, ebaz2, edist2 = self.gc.geod.inv(np.ones(len(cat)) * p2[0], + np.ones(len(cat)) * p2[1], + cat['lon'], cat['lat']) + + edistkm1 = edist1 / 1e3 + edistkm2 = edist2 / 1e3 + + 
edistdeg1 = edistkm1 / DEG2KM + edistdeg2 = edistkm2 / DEG2KM + + # compute P and SW arrival times for relevant events at the two stations + edepth_km = np.array(cat['dep']) + otime = np.array(cat['EventOrigintim']) + + ptt1 = self.tti.get_tt('P', edistdeg1, edepth_km) + ptt2 = self.tti.get_tt('P', edistdeg2, edepth_km) + + pat1 = ptt1 + otime + pat2 = ptt2 + otime + + swat1 = (edistkm1 / self.V_MINMAX[1]) + otime + swet1 = (edistkm1 / self.V_MINMAX[0]) + otime + + swat2 = (edistkm2 / self.V_MINMAX[1]) + otime + swet2 = (edistkm2 / self.V_MINMAX[0]) + otime + + # find event indices that meet distance criteria for stations 1 and 2 + eids1 = (edistdeg1 >= self.DIST_MINMAX[0]) & (edistdeg1 <= self.DIST_MINMAX[1]) + eids2 = (edistdeg2 >= self.DIST_MINMAX[0]) & (edistdeg2 <= self.DIST_MINMAX[1]) + + # find event indices within azimuth of stations 1 and 2 + eids1_inside_az = eids1 & ((eaz1 >= (baz - self.AZ_TOL)) & (eaz1 <= (baz + self.AZ_TOL))) + eids2_inside_az = eids2 & ((eaz2 >= (az - self.AZ_TOL)) & (eaz2 <= (az + self.AZ_TOL))) + + eids_inside_az = eids1_inside_az | eids2_inside_az + + # find event indices outside azimuth of both stations + eids_outside_az = ~(eids_inside_az) + + if(True): + # sanity check + test = (eids1 & ((eaz1 < (baz - self.AZ_TOL)) | (eaz1 > (baz + self.AZ_TOL)))) & \ + (eids2 & ((eaz2 < (az - self.AZ_TOL)) | (eaz2 > (az + self.AZ_TOL)))) + + assert np.alltrue(eids_outside_az == test) + assert len(set(np.where(eids1_inside_az | eids2_inside_az)[0]).intersection( \ + set(np.where(eids_outside_az)[0]))) == 0 + # end if + + # find indices of CCs inside/outside azimuth of relevant events + ccids1_inside_az = get_affected_indices(eids1_inside_az, pat1, swat1, swet1) + ccids2_inside_az = get_affected_indices(eids2_inside_az, pat2, swat2, swet2) + ccids_inside_az = ccids1_inside_az | ccids2_inside_az + + #print('ccs inside azimuth for station 1: {}'.format(np.sum(ccids1_inside_az))) + #print('ccs inside azimuth for station 2: {}'.format(np.sum(ccids2_inside_az))) + + ccids1_outside_az = get_affected_indices(eids_outside_az, pat1, swat1, swet1) + ccids2_outside_az = get_affected_indices(eids_outside_az, pat2, swat2, swet2) + ccids_outside_az = ccids1_outside_az | ccids2_outside_az + + #print('ccs outside azimuth for station 1: {}'.format(np.sum(ccids1_outside_az))) + #print('ccs outside azimuth for station 2: {}'.format(np.sum(ccids2_outside_az))) + + # aliases to indices as named in the manuscript + idsXei = ccids_inside_az # inside az + idsXec = ~(ccids_inside_az | ccids_outside_az) # no events + idsXeiUXec = idsXei | idsXec # no events outside az range + idsXeo = ccids_outside_az + + # compute means + mean = mean_Xei = mean_Xec = mean_XeiUXec = mean_Xeo = None + mean = np.zeros(spooled_matrix.ncols) + mean_Xei = np.zeros(spooled_matrix.ncols) + mean_Xec = np.zeros(spooled_matrix.ncols) + mean_XeiUXec = np.zeros(spooled_matrix.ncols) + mean_Xeo = np.zeros(spooled_matrix.ncols) + + for i in np.arange(spooled_matrix.nrows): + row = spooled_matrix.read_row(i) + mean += row + + if (idsXei[i]): mean_Xei += row + if (idsXec[i]): mean_Xec += row + if (idsXeiUXec[i]): mean_XeiUXec += row + if (idsXeo[i]): mean_Xeo += row + # end for + + wc = spooled_matrix.nrows + wc_Xei = np.sum(idsXei) + wc_Xec = np.sum(idsXec) + wc_XeiUXec = np.sum(idsXeiUXec) + wc_Xeo = np.sum(idsXeo) + + mean /= float(wc) + mean_Xei /= float(wc_Xei) + mean_Xec /= float(wc_Xec) + mean_XeiUXec /= float(wc_XeiUXec) + mean_Xeo /= float(wc_Xeo) + + #""" + np.savez('stack3outputs.npz', xcf=mean, + xcf1=mean_Xei, 
xcf2=mean_Xec, xcf3=mean_XeiUXec, + xcf4=mean_Xeo, idsXei=idsXei, idsXec=idsXec, + idsXeiUXec=idsXeiUXec, idsXeo=idsXeo) + #""" + + return mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo, \ + wc, wc_Xei, wc_Xec, wc_XeiUXec, wc_Xeo + # end func +# end class + +if __name__=="__main__": + ss = SubsetStacker() + + for i in np.arange(ss.nproc): + if(i==ss.rank): + pass + #print('Rank: {}\n================'.format(i)) + #print(ss.gc.cat['EventOrigintim']) + # end if + # end for +# end if \ No newline at end of file diff --git a/seismic/xcorqc/tt/P.mat b/seismic/xcorqc/tt/P.mat new file mode 100644 index 00000000..f5567eb1 Binary files /dev/null and b/seismic/xcorqc/tt/P.mat differ diff --git a/seismic/xcorqc/tt/S.mat b/seismic/xcorqc/tt/S.mat new file mode 100644 index 00000000..2914922c Binary files /dev/null and b/seismic/xcorqc/tt/S.mat differ diff --git a/seismic/xcorqc/utils.py b/seismic/xcorqc/utils.py index edb40597..d56e8354 100644 --- a/seismic/xcorqc/utils.py +++ b/seismic/xcorqc/utils.py @@ -8,9 +8,10 @@ from tempfile import SpooledTemporaryFile from scipy.interpolate import interp1d from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet -from seismic.misc import rtp2xyz +from seismic.misc import rtp2xyz, read_key_value_pairs from seismic.misc import get_git_revision_hash, rtp2xyz, split_list import os, psutil +from netCDF4 import Dataset as ncDataset class Dataset: def __init__(self, asdf_file_name, netsta_list='*'): @@ -168,6 +169,28 @@ def get_unique_station_pairs(self, other_dataset, nn=1, require_overlap=True, # end func # end class +def read_subset_stacker_config()->dict: + keys = ["CMT_CATALOG_PATH", + "SW_VMIN", + "SW_VMAX", + "DIST_MIN", + "DIST_MAX", + "EMAG_MIN", + "EMAG_MAX", + "AZ_TOL"] + fn = os.path.join(os.getcwd(), 'subset_stack.conf') + try: + d = read_key_value_pairs(fn, keys, strict=True) + for k in keys[1:]: + d[k] = float(d[k]) + # end for + except Exception as e: + print('Reading {} failed with error: {}'.format(fn, e)) + # end try + + return d +# end func + def read_location_preferences(location_preferences_fn): result = defaultdict(lambda: None) @@ -368,6 +391,38 @@ def __init__(self, ncols, dtype=np.float32, max_size_mb=2048, prefix='', dir=Non self._file = SpooledTemporaryFile(prefix = self._prefix, mode = 'w+b', max_size = max_size_mb * 1024**2, dir=dir) # end func + @classmethod + def from_nc(cls, nc_file): + """ + Alternative instantiation for testing purposes + @param nc_file: + @return: + """ + try: + ds = ncDataset(nc_file) + xcorr = np.array(ds.variables['xcorr']) + shp = xcorr.shape + ncols = 0 + + if(len(shp) == 1): ncols = shp[0] + elif(len(shp) == 2): ncols = shp[1] + + sm = cls(ncols, dtype=xcorr.dtype) + + if(len(shp) == 1): + sm.write_row(xcorr) + elif(len(shp) == 2): + for i in np.arange(shp[0]): + sm.write_row(xcorr[i, :]) + # end for + # end if + + return sm + except Exception as e: + print('Failed to load {} with error: {}'.format(nc_file, str(e))) + # end try + # end func + @property def ncols(self): return self._ncols diff --git a/seismic/xcorqc/xcorqc.py b/seismic/xcorqc/xcorqc.py index 6a68f6f3..d50cf2ec 100644 --- a/seismic/xcorqc/xcorqc.py +++ b/seismic/xcorqc/xcorqc.py @@ -34,6 +34,7 @@ from seismic.xcorqc.fft import * from seismic.ASDFdatabase.FederatedASDFDataSet import FederatedASDFDataSet from seismic.xcorqc.utils import get_stream, fill_gaps, MemoryTracker +from seismic.xcorqc.subset_stacker import SubsetStacker from netCDF4 import Dataset from functools import reduce from seismic.xcorqc.utils import 
SpooledMatrix @@ -116,7 +117,7 @@ def xcorr2(tr1, tr2, sta1_inv=None, sta2_inv=None, interval_seconds=86400, taper_length=0.05, resample_rate=None, flo=None, fhi=None, clip_to_2std=False, whitening=False, whitening_window_frequency=0, one_bit_normalize=False, envelope_normalize=False, - apply_stacking=True, verbose=1, logger=None): + apply_simple_stacking=True, verbose=1, logger=None): # Length of window_buffer in seconds window_buffer_seconds = window_buffer_length * window_seconds @@ -179,12 +180,12 @@ def xcorr2(tr1, tr2, sta1_inv=None, sta2_inv=None, # Track back to avoid losing data while traversing onto the next interval. # Note that this only applies when stacking is not applied. - if((intervalCount > 0) and (not apply_stacking)): + if((intervalCount > 0) and (not apply_simple_stacking)): itr1s -= (2*window_buffer_seconds*sr1_orig + window_samples_1 * window_overlap) itr2s -= (2*window_buffer_seconds*sr2_orig + window_samples_2 * window_overlap) # end if - if(apply_stacking): + if(apply_simple_stacking): # The starting time of stacking-intervals is relative to hour 0000, # and not the start-times of input traces. To maintain this consistency, # when input trace start-times are not aligned to hour 0000, we stack data for a @@ -455,7 +456,7 @@ def xcorr2(tr1, tr2, sta1_inv=None, sta2_inv=None, itr1s = itr1e itr2s = itr2e - if(apply_stacking): + if(apply_simple_stacking): if windowCount > 0: mean = reduce((lambda tx, ty: tx + ty), intervalXcorrList) / float(windowCount) else: @@ -522,7 +523,8 @@ def IntervalStackXCorr(refds, tempds, clip_to_2std=False, whitening=False, whitening_window_frequency=0, one_bit_normalize=False, envelope_normalize=False, ensemble_stack=False, - apply_stacking=True, + subset_stacker: SubsetStacker=None, + apply_simple_stacking=True, outputPath='/tmp', verbose=1, tracking_tag='', scratch_folder=None, git_hash=''): """ @@ -595,8 +597,11 @@ def IntervalStackXCorr(refds, tempds, :param envelope_normalize: Envelope via Hilbert transforms and normalize :type ensemble_stack: bool :param ensemble_stack: Outputs a single CC function stacked over all data for a given station-pair - :type apply_stacking: bool - :param apply_stacking: stacks cross-correlation windows over intervals + :type subset_stacker: SubsetStacker + :param subset_stacker: Custom stacker to stack over subsets of CC windows, identified based on the + presence/absence of azimuthal earthquake energy + :type apply_simple_stacking: bool + :param apply_simple_stacking: stacks cross-correlation windows over intervals :type outputPath: str :param outputPath: Folder to write results to :type verbose: int @@ -690,7 +695,7 @@ def IntervalStackXCorr(refds, tempds, while cTime < endTime: # track back to avoid losing data while traversing onto the next # block of data - if((cTime > startTime) and (not apply_stacking)): + if((cTime > startTime) and (not apply_simple_stacking)): cTime -= (2*window_buffer_length*window_seconds + window_seconds*window_overlap) # end if @@ -778,7 +783,7 @@ def IntervalStackXCorr(refds, tempds, whitening_window_frequency=whitening_window_frequency, one_bit_normalize=one_bit_normalize, envelope_normalize=envelope_normalize, - apply_stacking=apply_stacking, + apply_simple_stacking=apply_simple_stacking, verbose=verbose, logger=logger) # Continue if no results were returned due to data-gaps @@ -887,40 +892,68 @@ def IntervalStackXCorr(refds, tempds, root_grp.createDimension('interval', flattenedIntervalStartTimes.shape[0]) root_grp.createDimension('window', 
flattenedWindowStartTimes.shape[0]) - # Variables + # create and polulate variables interval = root_grp.createVariable('interval', 'f4', ('interval',)) ist = root_grp.createVariable('IntervalStartTimes', 'i8', ('interval',)) iet = root_grp.createVariable('IntervalEndTimes', 'i8', ('interval',)) wst = root_grp.createVariable('WindowStartTimes', 'i8', ('window',)) wet = root_grp.createVariable('WindowEndTimes', 'i8', ('window',)) - xc = None - nsw = None - if(apply_stacking): + interval[:] = np.arange(flattenedIntervalStartTimes.shape[0]) + ist[:] = flattenedIntervalStartTimes + iet[:] = flattenedIntervalEndTimes + wst[:] = flattenedWindowStartTimes + wet[:] = flattenedWindowEndTimes + + if(apply_simple_stacking): nsw = root_grp.createVariable('NumStackedWindows', 'f4', ('interval',)) xc = root_grp.createVariable('xcorr', 'f4', ('interval', 'lag',), chunksizes=(1, spooledXcorr.ncols), zlib=True) + nsw[:] = flattenedWindowCounts + for irow in np.arange(spooledXcorr.nrows): + xc[irow, :] = spooledXcorr.read_row(irow) + # end for + elif subset_stacker is not None: + xc = root_grp.createVariable('xcorr', 'f4', ('lag',)) + xc_Xei = root_grp.createVariable('xcorr_Xei', 'f4', ('lag',)) + xc_Xec = root_grp.createVariable('xcorr_Xec', 'f4', ('lag',)) + xc_XeiUXec = root_grp.createVariable('xcorr_XeiUXec', 'f4', ('lag',)) + xc_Xeo = root_grp.createVariable('xcorr_Xeo', 'f4', ('lag',)) + + xc_wc = root_grp.createVariable('xcorr_NumStackedWindows', 'i8') + xc_Xei_wc = root_grp.createVariable('xcorr_Xei_NumStackedWindows', 'i8') + xc_Xec_wc = root_grp.createVariable('xcorr_Xec_NumStackedWindows', 'i8') + xc_XeiUXec_wc = root_grp.createVariable('xcorr_XeiUXec_NumStackedWindows', 'i8') + xc_Xeo_wc = root_grp.createVariable('xcorr_Xeo_NumStackedWindows', 'i8') + + slon1, slat1 = refds.unique_coordinates[ref_net_sta] + slon2, slat2 = tempds.unique_coordinates[temp_net_sta] + mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo, \ + wc, wc_Xei, wc_Xec, wc_XeiUXec, wc_Xeo = \ + subset_stacker.stack(spooledXcorr, flattenedWindowStartTimes, + flattenedWindowEndTimes, + slon1, slat1, slon2, slat2) + xc[:] = mean + xc_Xei[:] = mean_Xei + xc_Xec[:] = mean_Xec + xc_XeiUXec[:] = mean_XeiUXec + xc_Xeo[:] = mean_Xeo + + # add window counts + xc_wc[:] = wc + xc_Xei_wc[:] = wc_Xei + xc_Xec_wc[:] = wc_Xec + xc_XeiUXec_wc[:] = wc_XeiUXec + xc_Xeo_wc[:] = wc_Xeo else: xc = root_grp.createVariable('xcorr', 'f4', ('window', 'lag',), chunksizes=(1, spooledXcorr.ncols), zlib=True) + for irow in np.arange(spooledXcorr.nrows): + xc[irow, :] = spooledXcorr.read_row(irow) + # end for # end if - - # Populate variables - if(apply_stacking): - nsw[:] = flattenedWindowCounts - # end if - - interval[:] = np.arange(flattenedIntervalStartTimes.shape[0]) - ist[:] = flattenedIntervalStartTimes - iet[:] = flattenedIntervalEndTimes - wst[:] = flattenedWindowStartTimes - wet[:] = flattenedWindowEndTimes - - for irow in np.arange(spooledXcorr.nrows): - xc[irow, :] = spooledXcorr.read_row(irow) - # end for # end if lag[:] = x @@ -947,7 +980,9 @@ def IntervalStackXCorr(refds, tempds, 'zero_mean_1std_normalize': int(clip_to_2std is False and one_bit_normalize is False), 'spectral_whitening': int(whitening), 'envelope_normalize': int(envelope_normalize), - 'ensemble_stack': int(ensemble_stack)} + 'ensemble_stack': int(ensemble_stack), + 'simple_stack': int(apply_simple_stacking), + 'subset_stack': int(subset_stacker is not None)} if whitening: params['whitening_window_frequency'] = whitening_window_frequency diff --git 
a/tests/test_seismic/xcorqc/test_correlator.py b/tests/test_seismic/xcorqc/test_correlator.py index 34045b12..3b0e6f88 100644 --- a/tests/test_seismic/xcorqc/test_correlator.py +++ b/tests/test_seismic/xcorqc/test_correlator.py @@ -55,9 +55,9 @@ expected_folder = '%s/data/expected/'%(path) output_folder = str(tempfile.mkdtemp()) +#output_folder = '/tmp' os.mkdir(os.path.join(output_folder, 'stacked')) os.mkdir(os.path.join(output_folder, 'unstacked')) -#output_folder = '/tmp' def test_correlator(): start_time = '2006-11-03T00:00:00' @@ -76,7 +76,7 @@ def test_correlator(): netsta2, None, start_time, end_time, None, 'vel', 50, False, True, 0.02, True, loc_pref, '*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False, - True, False, False, True, None) + None, True, False, False, True, None) # Read result @@ -109,7 +109,7 @@ def test_correlator(): netsta2, None, start_time, end_time, None, 'vel', 50, False, True, 0.02, True, loc_pref, '*Z', '*N', '*E', '*Z', '*N', '*E', 'z', False, False, - False, False, False, True, None) + None, False, False, False, True, None) # Read result diff --git a/tests/test_seismic/xcorqc/test_xcorr.py b/tests/test_seismic/xcorqc/test_xcorr.py index 18b4c526..2163fd4d 100644 --- a/tests/test_seismic/xcorqc/test_xcorr.py +++ b/tests/test_seismic/xcorqc/test_xcorr.py @@ -175,7 +175,7 @@ def expected_lines_iter(): interval_seconds=isec, window_overlap=olap, window_buffer_length=wbl, - apply_stacking=True) + apply_simple_stacking=True) olines.append("============[test: {}]============\n".format(i)) olines.append('Params: raw: {}, wsec: {}, olap: {}, wbl: {}\n'.format(raw, wsec, olap, wbl)) @@ -260,7 +260,7 @@ def expected_lines_iter(): interval_seconds=isec, window_overlap=olap, window_buffer_length=wbl, - apply_stacking=True) + apply_simple_stacking=True) olines.append("============[test: {}]============\n".format(i)) olines.append('Params: wsec: {}, isec: {}, olap: {}, wbl: {}\n'.format(wsec, isec, olap, wbl))
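
Usage notes for the subset-stacking additions above follow. For reference, a minimal subset_stack.conf of the kind --subset-stack expects in the current working folder might look as below; the values mirror the defaults quoted in the option's help text, and the catalogue path is a placeholder:

CMT_CATALOG_PATH: /path/to/gcmt_catalog.txt
SW_VMIN: 2
SW_VMAX: 5.5
DIST_MIN: 0
DIST_MAX: 999
EMAG_MIN: 6
EMAG_MAX: 999
AZ_TOL: 30

read_subset_stacker_config() reads this file via read_key_value_pairs() with strict=True and casts every value except the catalogue path to float. A minimal stand-alone sketch of the same parsing (the file is assumed to sit in the working directory):

from seismic.misc import read_key_value_pairs

keys = ['CMT_CATALOG_PATH', 'SW_VMIN', 'SW_VMAX', 'DIST_MIN',
        'DIST_MAX', 'EMAG_MIN', 'EMAG_MAX', 'AZ_TOL']
conf = read_key_value_pairs('subset_stack.conf', keys, strict=True)
# values come back as strings; numeric entries must be cast by the caller
sw_vmin, sw_vmax = float(conf['SW_VMIN']), float(conf['SW_VMAX'])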
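
The moment-tensor angle used by GCMTCatalog (get_mt_angle and the pairwise computation in get_compatible_events) is the angle between the two six-component tensors (columns mrr..mtf) treated as vectors. A self-contained sketch with made-up tensor components:

import numpy as np

def mt_angle_deg(m1, m2):
    # angle between two moment tensors treated as 6-vectors, in degrees
    c = np.dot(m1, m2) / (np.linalg.norm(m1) * np.linalg.norm(m2))
    return np.degrees(np.arccos(np.clip(c, -1.0, 1.0)))

m1 = np.array([1.0, -0.5, -0.5, 0.1, 0.0, 0.2])  # hypothetical mrr, mtt, mff, mrt, mrf, mtf
m2 = np.array([0.9, -0.4, -0.5, 0.2, 0.1, 0.1])
print(mt_angle_deg(m1, m2))  # event pairs with angles above max_mt_angle are rejected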
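
A typical call of the new catalogue class, assuming a GCMT text catalogue at a placeholder path and a hypothetical station pair:

from seismic.catalogue.gcmt import GCMTCatalog

gc = GCMTCatalog('/path/to/gcmt_catalog.txt')  # placeholder path
pairs = gc.get_compatible_events(station_lon1=133.0, station_lat1=-23.7,
                                 station_lon2=136.5, station_lat2=-30.6,
                                 min_magnitude=5.5, max_mt_angle=15)
for (eid1, eid2), angle in pairs.items():
    # eid1/eid2 index rows of gc.cat; angle is the moment-tensor angle between them
    e1, e2 = gc.cat.iloc[eid1], gc.cat.iloc[eid2]
    print(eid1, eid2, angle, GCMTCatalog.get_mt_angle(e1, e2))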
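
The window classification inside SubsetStacker.stack hinges on predicted arrival windows: P at the interpolated travel time and surface-wave energy between dist/SW_VMAX and dist/SW_VMIN after the origin time. The toy sketch below illustrates the surface-wave bookkeeping for a single hypothetical event and station using a simplified overlap test; it approximates, rather than reproduces, get_affected_indices:

import numpy as np
from obspy.geodetics import degrees2kilometers

otime = 0.0                               # event origin time (s), hypothetical
edist_km = 35.0 * degrees2kilometers(1)   # 35-degree epicentral distance, hypothetical
sw_vmin, sw_vmax = 2.0, 5.5               # km/s, as in subset_stack.conf

swat = otime + edist_km / sw_vmax   # earliest surface-wave arrival
swet = otime + edist_km / sw_vmin   # latest surface-wave energy considered

# hypothetical hour-long CC window start/end times
xcst = np.arange(0, 6 * 3600, 3600, dtype=float)
xcet = xcst + 3600.0

# a window counts as 'affected' if the surface-wave window overlaps it
affected = (swat <= xcet) & (swet >= xcst)
print(affected)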
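
SpooledMatrix.from_nc exists so that the stacker can be exercised against previously written, unstacked cross-correlation files. A sketch of that workflow, assuming a placeholder file name, hypothetical station coordinates, and a valid subset_stack.conf (plus the catalogue it points to) in the working directory:

import numpy as np
from netCDF4 import Dataset as ncDataset
from seismic.xcorqc.utils import SpooledMatrix
from seismic.xcorqc.subset_stacker import SubsetStacker

nc_fn = 'AU.STA1.AU.STA2.nc'       # placeholder unstacked CC file
sm = SpooledMatrix.from_nc(nc_fn)

ds = ncDataset(nc_fn)
wst = np.array(ds.variables['WindowStartTimes'])
wet = np.array(ds.variables['WindowEndTimes'])

ss = SubsetStacker()               # reads subset_stack.conf from the current folder
mean, mean_Xei, mean_Xec, mean_XeiUXec, mean_Xeo, \
    wc, wc_Xei, wc_Xec, wc_XeiUXec, wc_Xeo = ss.stack(
        sm, wst, wet,
        slon1=133.0, slat1=-23.7,  # hypothetical station coordinates
        slon2=136.5, slat2=-30.6)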