Source code for filterpicker.filterpicker

import os
import numpy as np
import matplotlib.pyplot as plt


# ---------------------------------------- Class Def


[docs]class FilterPicker(object): """ Main Class for the FilterPicker algorithm Implementation of A.Lomax FilterPicker in Python language. Support available for Python >= 3.5 At the moment the picker works on the entire trace, not suitable for online picking. Support will be given soon ... Args: dt (int, float): is the sampling time-delta for the given array in seconds. data (list, tuple, np.array): is the actual time-series array. It must contain only numbers. filter_window (int, float): picker's parameter longterm_window (int, float): picker's parameter t_up (int, float): picker's parameter threshold_1 (int, float): picker's parameter threshold_2 (int, float): picker's parameter base (int): picker's parameter For an exhaustive description of the picker's parameters the reader is referred to the reference paper References: Lomax, A., C. Satriano and M. Vassallo (2012), Automatic picker developments and optimization: FilterPicker - a robust, broadband picker for real-time seismic monitoring and earthquake early-warning, Seism. Res. Lett. , 83, 531-540, doi: [10.1785/gssrl.83.3.531](10.1785/gssrl.83.3.531) """ def __init__(self, dt, data, filter_window=3.0, # dt=0.01 sec --> 300 samples longterm_window=5.0, # dt=0.01 sec --> 500 samples t_up=0.2, # dt=0.01 sec --> 500 samples threshold_1=10, threshold_2=10, base=2): self.dt = dt self.y = data self.tf = self._sec2sample(filter_window, 1.0/self.dt) self.tl = self._sec2sample(longterm_window, 1.0/self.dt) self.tup = self._sec2sample(t_up, 1.0/self.dt) self.thr1 = float(threshold_1) self.thr2 = float(threshold_2) self.numSMP = len(self.y) # MB: next line can be changed to work on smaller portion self.veclim = (0, len(self.y)) self.base = base # For new 'get' methods self.ididrun = False def _sec2sample(self, value, df): """ Convert seconds to samples number Utility method to define convert USER input parameters (seconds) into obspy pickers `n_sample` units. Formula: `int(round( INSEC * df))` Args: value (int, float): input seconds df (int, float): number of samples per seconds Returns: number of samples """ _tmp = value * df _tmp += 0.00111 # trick/workaround to properly round XXX.5 to XXX+1.0 return int(np.round(_tmp)) def _setup(self): """ Finalize the class attribute """ self.ididrun = True # Filter window in dT PRM_Tflt = self.base**np.ceil(np.log(self.tf)/np.log(self.base)) self.PRM_Tlng = self.tl self.PRM_Clng = 1 - (1/self.PRM_Tlng) self.PRM_Tup = np.max([1, self.tup]) self.MIN_SIG = np.finfo(float).tiny Navg = np.min([np.round(self.PRM_Tlng), self.numSMP]) self.y0 = np.mean(self.y[0:Navg]) self.numBnd = int(np.ceil(np.log(PRM_Tflt)/np.log(self.base))) self.Fn = np.zeros([self.numBnd, self.numSMP]) self.FnL = np.zeros([self.numBnd, self.numSMP]) self.Yout = np.zeros([self.numBnd, self.numSMP]) return True def _loopOverBands(self): """ Core loop of the picker's algorithm """ # MB: For each band the CF is created for n in range(1, self.numBnd): # --- Define Poles for filtering bands w = (self.base**n*self.dt) / (2*np.pi) cHP = w/(w+self.dt) cLP = self.dt/(w+self.dt) yHP1p = 0 yHP2p = 0 yLPp = 0 # avg = 0 vrn = 0 sig = 0 tmpFNL = self.thr1/2 for i in range(self.veclim[0], self.veclim[1]): # First high-pass if i != 0: yHP1 = cHP*(yHP1p + self.y[i] - self.y[i-1]) else: yHP1 = cHP*(yHP1p + self.y[i] - self.y0) # yHP2 = cHP*(yHP2p + yHP1 - yHP1p) # Second high-pass yLP = yLPp + cLP*(yHP2 - yLPp) # Low-pass En = yLP**2 # Envelope # MB: store anyway for further plots self.Yout[n, i] = yLP # yHP1p = yHP1 yHP2p = yHP2 yLPp = yLP if (sig > self.MIN_SIG): if ((En-avg) / sig > 5*self.thr1): En = 5*self.thr1 * sig+avg self.Fn[n, i] = 5*self.thr1 else: self.Fn[n, i] = (En-avg)/sig # CF if (self.Fn[n, i] < 1): self.Fn[n, i] = 0.0 avg = self.PRM_Clng*avg + (1-self.PRM_Clng)*En vrn = self.PRM_Clng*vrn + (1-self.PRM_Clng)*(En-avg) ** 2 sig = np.sqrt(vrn) # Standard deviation tmpFNL = self.PRM_Clng*tmpFNL + (1-self.PRM_Clng)*self.Fn[n, i] tmpFNL = min(self.thr1/2, tmpFNL) tmpFNL = max(0.5, tmpFNL) self.FnL[n, i] = tmpFNL # self.FnS = self.Fn.max(0) # Maximum of all bands self.FnS[self.FnS < 0] = 0 convVec = (np.concatenate(([0.0], np.ones(self.PRM_Tup-2), [0.0])) / self.PRM_Tup) self.FnMAvg = np.convolve(self.FnS, convVec, 'valid') self.PotTrg = np.where(self.FnS > self.thr1)[0] return True def _analyzeTrigger(self): """ Final part of picker's algorithm """ # Next line find index to remove from array remidx = np.where((self.PotTrg < (self.PRM_Tlng + self.veclim[0])) | (self.PotTrg > (self.numSMP - self.PRM_Tup)) )[0] self.PotTrg = np.delete(self.PotTrg, remidx) flagTrg = np.full(np.shape(self.PotTrg), True) pickIDX = np.full(np.shape(self.PotTrg), np.nan) pickUNC = np.full(np.shape(self.PotTrg), np.nan) pickFRQ = np.full(np.shape(self.PotTrg), np.nan) for i in range(0, len(flagTrg)): # *** MB check index if flagTrg[i]: tmpTrg = self.PotTrg[i] if self.FnMAvg[tmpTrg] > self.thr2: bandTrg = np.where(self.Fn[:, tmpTrg] > self.thr1)[0][0] firstPot = np.where((self.Fn[bandTrg, np.arange(tmpTrg, -1, -1)] - self.FnL[bandTrg, np.arange(tmpTrg, -1, -1)]) < 0)[0][0] # if not firstPot: firstPot = 0 pickFRQ[i] = bandTrg pickIDX[i] = (tmpTrg - firstPot) limT = (self.base**bandTrg)/20 if firstPot < limT: pickUNC[i] = limT * self.dt else: pickUNC[i] = firstPot * self.dt # Disable next potential picks until FnS drops below 2 try: idxDrop = np.where(self.FnS[tmpTrg:] < 2)[0][0] except IndexError: idxDrop = None if idxDrop: flagTrg[(self.PotTrg > tmpTrg) & (self.PotTrg <= (tmpTrg + idxDrop))] = 0 # self.PotTrg = self.PotTrg[~np.isnan(pickIDX)] self.pickIDX = pickIDX[~np.isnan(pickIDX)] self.pickUNC = pickUNC[~np.isnan(pickUNC)] self.pickFRQ = pickFRQ[~np.isnan(pickFRQ)] return True def _normalizeTrace(self, nparr, rangeVal=[-1, 1]): """ Utility function for time-series normalization This simple method will normalize (scaling) the trace between rangeVal. Args: nparr (np.array): the input time-series rangeval (list): a 2-element list defining upper and lower value or the scaling Returns: outarr (np.array): a normalized copy of the orginal array """ outarr = np.zeros(shape=nparr.shape) outarr = (((nparr - np.min(nparr)) / (np.max(nparr) - np.min(nparr))) * (rangeVal[1] - rangeVal[0])) outarr = outarr + rangeVal[0] return outarr # ----------------------------------------------- PUBLIC
[docs] def run(self): """ Main method for picking Orchestrator of the picker, calls in sequence the needed functions. Returns: pickIDX (list): the time-series indexes of picks pickUNC (list): the time-series indexes of picks pickFRQ (list): the frequency band where it was triggered """ self._setup() self._loopOverBands() self._analyzeTrigger() return self.pickIDX*self.dt, self.pickUNC, self.pickFRQ+1
[docs] def plot(self, show=True, fp_param_title=True, fig_title=None): """ Main picker's plot routine. Create a comprehensive figure of the different CFs and picks. If show=True the fgire is displayed realtime. Optionals: show (bool): if True, the figure will be displayed at run time fp_param_title (bool): if True, the figure's title will be composed with the picking parameters value adopted fig_title (str): default to None. If a string is given, that will be the figure's title. Returns fig (plt.figure): figure's handle axLst (list): list of plot axis handles """ fig, axLst = plt.subplots(nrows=self.numBnd, sharex=True, sharey=True, figsize=(10, 7)) ymax = np.max(self.FnS[self.PRM_Tlng:]) timeax = np.arange(0, self.numSMP, 1)*self.dt pltVec = self._normalizeTrace(self.y, rangeVal=[0, ymax]) # Raw Plot axLst[0].plot(timeax, pltVec[:], '-k', lw=2, label='Raw Input') axLst[0].plot(timeax, self.FnS, ':k', lw=1.5, label='max CF') axLst[0].plot(np.arange(0, len(self.FnMAvg), 1)*self.dt, self.FnMAvg, ':r', lw=1.5, label='Mov. Avg. (Tup)') # 24052019 Next if avoid crashing of software if no pick found if self.PotTrg.size and self.pickIDX.size: for _xx in range(self.PotTrg.size): axLst[0].axvline(self.PotTrg[_xx]*self.dt, color='b', ls='solid', lw=2, label='Triggers') for _yy in range(self.pickIDX.size): axLst[0].axvline(self.pickIDX[_yy]*self.dt, color='r', ls='solid', lw=2, label='Picks') axLst[0].set_xlim([timeax[0], timeax[-1]]) axLst[0].legend(loc='upper left') # for _kk in range(1, self.numBnd): # LP pltVec = (ymax*self.Yout[_kk, :] / (2*np.max(np.abs(self.Yout[_kk, :])))) axLst[_kk].plot( timeax, pltVec, '-', label=('{:.2f}'.format(1/(self.base**(_kk-1)*self.dt)) + ' HZ Lp')) # CF pltVec = self.Fn[_kk, :] axLst[_kk].plot( timeax, pltVec, '-', color='gold', label=('{:.2f}'.format(1/(self.base**(_kk-1)*self.dt)) + ' HZ Cf')) # HZ Accum. pltVec = self.FnL[_kk, :] axLst[_kk].plot( timeax, pltVec, '-', color='black', label=('{:.2f}'.format(1/(self.base**(_kk-1)*self.dt)) + ' HZ Accum.')) # Additional axLst[_kk].set_xlim([timeax[0], timeax[-1]]) axLst[_kk].legend(loc='upper left') plt.tight_layout() plt.subplots_adjust(hspace=0.0) # if fp_param_title: fig.suptitle((('TL, TF, Tup, S1, S2' + os.linesep + '%5.2f, %5.2f, %5.2f, %5.2f, %5.2f') % (self.tf, self.tl, self.tup, self.thr1, self.thr2)), fontsize=16, fontweight="bold") else: if fig_title: fig.suptitle(fig_title, fontsize=16, fontweight="bold") if show: plt.show() # return fig, axLst
[docs] def get_evaluation_function(self): """ Returns the carachteristic functions where picks are triggered. """ if self.ididrun: return self.FnS else: raise AttributeError("Missing evaluation function! " "Use the 'run' method before hand")
[docs] def get_bands(self): """ Returns the carachteristic function of each band """ out_dict = {} ymax = np.max(self.FnS[self.PRM_Tlng:]) # if self.ididrun: for _kk in range(1, self.numBnd): labkk = '{:.2f}'.format(1/(self.base**(_kk-1)*self.dt)) out_dict[labkk+" Hz LP"] = ( ymax*self.Yout[_kk, :] / (2*np.max(np.abs(self.Yout[_kk, :]))) ) out_dict[labkk+" Hz CF"] = self.Fn[_kk, :] out_dict[labkk+" Hz Cum."] = self.FnL[_kk, :] # return out_dict else: raise AttributeError("Missing filtered carachteristic functions! " "Use the 'run' method before hand")