diff --git a/geomagio/algorithm/SqDistAlgorithm.py b/geomagio/algorithm/SqDistAlgorithm.py index 6ac282abb333b7d00136c0c431e2f8fb08876a49..d9aa1584a957f8b79023dc6f607ee927ca4dd2ad 100644 --- a/geomagio/algorithm/SqDistAlgorithm.py +++ b/geomagio/algorithm/SqDistAlgorithm.py @@ -25,10 +25,25 @@ from scipy.optimize import fmin_l_bfgs_b class SqDistAlgorithm(Algorithm): """Solar Quiet, Secular Variation, and Disturbance algorithm""" - def __init__(self, alpha=None, beta=None, gamma=None, phi=1, m=1, - yhat0=None, s0=None, l0=None, b0=None, sigma0=None, - zthresh=6, fc=0, hstep=0, statefile=None, mag=False, - smooth=1): + def __init__( + self, + alpha=None, + beta=None, + gamma=None, + phi=1, + m=1, + yhat0=None, + s0=None, + l0=None, + b0=None, + sigma0=None, + zthresh=6, + fc=0, + hstep=0, + statefile=None, + mag=False, + smooth=1, + ): Algorithm.__init__(self, inchannels=None, outchannels=None) self.alpha = alpha self.beta = beta @@ -73,11 +88,13 @@ class SqDistAlgorithm(Algorithm): end of input required to generate requested output. """ if self.mag: - channels = ('H') - if observatory == self.last_observatory \ - and len(channels) == 1 \ - and channels[0] == self.last_channel \ - and start == self.next_starttime: + channels = "H" + if ( + observatory == self.last_observatory + and len(channels) == 1 + and channels[0] == self.last_channel + and start == self.next_starttime + ): # state is up to date, only need new data return (start, end) # state not up to date, need to prime @@ -112,22 +129,22 @@ class SqDistAlgorithm(Algorithm): return data = None try: - with open(self.statefile, 'r') as f: + with open(self.statefile, "r") as f: data = f.read() data = json.loads(data) except Exception: pass - if data is None or data == '': + if data is None or data == "": return - self.yhat0 = data['yhat0'] - self.s0 = data['s0'] - self.l0 = data['l0'] - self.b0 = data['b0'] - self.sigma0 = data['sigma0'] - self.last_observatory = data['last_observatory'] - self.last_channel = data['last_channel'] - self.last_delta = 'last_delta' in data and data['last_delta'] or None - self.next_starttime = UTCDateTime(data['next_starttime']) + self.yhat0 = data["yhat0"] + self.s0 = data["s0"] + self.l0 = data["l0"] + self.b0 = data["b0"] + self.sigma0 = data["sigma0"] + self.last_observatory = data["last_observatory"] + self.last_channel = data["last_channel"] + self.last_delta = "last_delta" in data and data["last_delta"] or None + self.next_starttime = UTCDateTime(data["next_starttime"]) def save_state(self): """Save algorithm state to a file. @@ -137,17 +154,17 @@ class SqDistAlgorithm(Algorithm): if self.statefile is None: return data = { - 'yhat0': list(self.yhat0), - 's0': list(self.s0), - 'l0': self.l0, - 'b0': self.b0, - 'sigma0': list(self.sigma0), - 'last_observatory': self.last_observatory, - 'last_channel': self.last_channel, - 'last_delta': self.last_delta, - 'next_starttime': str(self.next_starttime) + "yhat0": list(self.yhat0), + "s0": list(self.s0), + "l0": self.l0, + "b0": self.b0, + "sigma0": list(self.sigma0), + "last_observatory": self.last_observatory, + "last_channel": self.last_channel, + "last_delta": self.last_delta, + "next_starttime": str(self.next_starttime), } - with open(self.statefile, 'w') as f: + with open(self.statefile, "w") as f: f.write(json.dumps(data)) def process(self, stream): @@ -169,15 +186,19 @@ class SqDistAlgorithm(Algorithm): if self.mag: # convert stream to mag - if stream.select(channel='H').count() > 0 \ - and stream.select(channel='E').count() > 0: + if ( + stream.select(channel="H").count() > 0 + and stream.select(channel="E").count() > 0 + ): stream = StreamConverter.get_mag_from_obs(stream) - elif stream.select(channel='X').count() > 0 \ - and stream.select(channel='Y').count() > 0: + elif ( + stream.select(channel="X").count() > 0 + and stream.select(channel="Y").count() > 0 + ): stream = StreamConverter.get_mag_from_geo(stream) else: - raise AlgorithmException('Unable to convert to magnetic H') - stream = stream.select(channel='H') + raise AlgorithmException("Unable to convert to magnetic H") + stream = stream.select(channel="H") for trace in stream.traces: out += self.process_one(trace) @@ -206,44 +227,52 @@ class SqDistAlgorithm(Algorithm): """ out = Stream() # check state - if self.last_observatory is not None \ - or self.last_channel is not None \ - or self.last_delta is not None \ - or self.next_starttime is not None: + if ( + self.last_observatory is not None + or self.last_channel is not None + or self.last_delta is not None + or self.next_starttime is not None + ): # have state, verify okay to proceed - if trace.stats.station != self.last_observatory \ - or trace.stats.channel != self.last_channel \ - or trace.stats.delta != self.last_delta \ - or trace.stats.starttime != self.next_starttime: + if ( + trace.stats.station != self.last_observatory + or trace.stats.channel != self.last_channel + or trace.stats.delta != self.last_delta + or trace.stats.starttime != self.next_starttime + ): # state not correct raise AlgorithmException( - 'Inconsistent SQDist algorithm state' + - ' process(%s, %s, %s, %s) <> state(%s, %s, %s, %s)' % - (trace.stats.station, - trace.stats.channel, - trace.stats.delta, - trace.stats.starttime, - self.last_observatory, - self.last_channel, - self.last_delta, - self.next_starttime)) + "Inconsistent SQDist algorithm state" + + " process(%s, %s, %s, %s) <> state(%s, %s, %s, %s)" + % ( + trace.stats.station, + trace.stats.channel, + trace.stats.delta, + trace.stats.starttime, + self.last_observatory, + self.last_channel, + self.last_delta, + self.next_starttime, + ) + ) # process yhat, shat, sigmahat, yhat0, s0, l0, b0, sigma0 = self.additive( - yobs=trace.data, - m=self.m, - alpha=self.alpha, - beta=self.beta, - gamma=self.gamma, - phi=self.phi, - yhat0=self.yhat0, - s0=self.s0, - l0=self.l0, - b0=self.b0, - sigma0=self.sigma0, - zthresh=self.zthresh, - fc=self.fc, - hstep=self.hstep, - smooth=self.smooth) + yobs=trace.data, + m=self.m, + alpha=self.alpha, + beta=self.beta, + gamma=self.gamma, + phi=self.phi, + yhat0=self.yhat0, + s0=self.s0, + l0=self.l0, + b0=self.b0, + sigma0=self.sigma0, + zthresh=self.zthresh, + fc=self.fc, + hstep=self.hstep, + smooth=self.smooth, + ) # update state self.yhat0 = yhat0 self.s0 = s0 @@ -253,8 +282,9 @@ class SqDistAlgorithm(Algorithm): self.last_observatory = trace.stats.station self.last_channel = trace.stats.channel self.last_delta = trace.stats.delta - self.next_starttime = trace.stats.starttime + \ - (trace.stats.delta * trace.stats.npts) + self.next_starttime = trace.stats.starttime + ( + trace.stats.delta * trace.stats.npts + ) self.save_state() # create updated traces channel = trace.stats.channel @@ -270,16 +300,31 @@ class SqDistAlgorithm(Algorithm): # if the data array is longer than expected, as when self.fc is non- # zero; HOWEVER, if self.hstep is non-zero, both stats.starttime and # stats.endtime must be adjusted to accomadate the time-shift. - out += self.create_trace(channel + '_Dist', trace.stats, dist) - out += self.create_trace(channel + '_SQ', trace.stats, sq) - out += self.create_trace(channel + '_SV', trace.stats, sv) - out += self.create_trace(channel + '_Sigma', trace.stats, sigmahat) + out += self.create_trace(channel + "_Dist", trace.stats, dist) + out += self.create_trace(channel + "_SQ", trace.stats, sq) + out += self.create_trace(channel + "_SV", trace.stats, sv) + out += self.create_trace(channel + "_Sigma", trace.stats, sigmahat) return out @classmethod - def additive(cls, yobs, m, alpha, beta, gamma, phi=1, - yhat0=None, s0=None, l0=None, b0=None, sigma0=None, - zthresh=6, fc=0, hstep=0, smooth=1): + def additive( + cls, + yobs, + m, + alpha, + beta, + gamma, + phi=1, + yhat0=None, + s0=None, + l0=None, + b0=None, + sigma0=None, + zthresh=6, + fc=0, + hstep=0, + smooth=1, + ): """Primary function for Holt-Winters smoothing/forecasting with damped linear trend and additive seasonal component. @@ -360,19 +405,19 @@ class SqDistAlgorithm(Algorithm): """ if alpha is None: - raise AlgorithmException('alpha is required') + raise AlgorithmException("alpha is required") if beta is None: - raise AlgorithmException('beta is required') + raise AlgorithmException("beta is required") if gamma is None: - raise AlgorithmException('gamma is required') + raise AlgorithmException("gamma is required") if phi is None: - raise AlgorithmException('phi is required') + raise AlgorithmException("phi is required") # set some default values if l0 is None: - l = np.nanmean(yobs[0:int(m)]) + l = np.nanmean(yobs[0 : int(m)]) if np.isnan(l): - l = 0. + l = 0.0 else: l = l0 if not np.isscalar(l0): @@ -404,8 +449,7 @@ class SqDistAlgorithm(Algorithm): else: sigma = list(sigma0) if len(sigma) != (hstep + 1): - raise AlgorithmException( - "sigma0 must have length %d" % (hstep + 1)) + raise AlgorithmException("sigma0 must have length %d" % (hstep + 1)) # generate a vector of weights that will "smooth" seasonal # variations locally...the quotes are because what we really @@ -416,16 +460,16 @@ class SqDistAlgorithm(Algorithm): # smooth parameter should specify the required cut-off period in terms # of discrete samples; for now, we generate a Gaussian filter according # to White et al. (USGS SIR 2014-5045). - fom = 10**(-3 / 20.) # halve power at corner frequency + fom = 10 ** (-3 / 20.0) # halve power at corner frequency omg = np.pi / np.float64(smooth) # corner angular frequency - sig = np.sqrt(-2 * np.log(fom) / omg**2) + np.finfo(float).eps # sig>0 - ts = np.linspace(np.max((-m, -3 * np.round(sig))), - np.min((m, 3 * np.round(sig))), - np.int(np.round( - np.min((2 * m, 6 * np.round(sig))) + 1 - ))) + sig = np.sqrt(-2 * np.log(fom) / omg ** 2) + np.finfo(float).eps # sig>0 + ts = np.linspace( + np.max((-m, -3 * np.round(sig))), + np.min((m, 3 * np.round(sig))), + np.int(np.round(np.min((2 * m, 6 * np.round(sig))) + 1)), + ) nts = ts.size - weights = np.exp(-0.5 * (ts / sig)**2) + weights = np.exp(-0.5 * (ts / sig) ** 2) weights = weights / np.sum(weights) # @@ -443,8 +487,11 @@ class SqDistAlgorithm(Algorithm): phiHminus1 = 0 for h in range(1, hstep): phiHminus1 = phiHminus1 + phi ** (h - 1) - sumc2_H = sumc2_H + (alpha * (1 + phiHminus1 * beta) + - gamma * (1 if (h % m == 0) else 0)) ** 2 + sumc2_H = ( + sumc2_H + + (alpha * (1 + phiHminus1 * beta) + gamma * (1 if (h % m == 0) else 0)) + ** 2 + ) phiJminus1 = phiHminus1 sumc2 = sumc2_H jstep = hstep @@ -475,7 +522,7 @@ class SqDistAlgorithm(Algorithm): # fc>0, so simulate beyond last input et = np.nan - if (np.isnan(et) or np.abs(et) > zthresh * sigma[i]): + if np.isnan(et) or np.abs(et) > zthresh * sigma[i]: # forecast (i.e., update l, b, and s assuming et==0) # no change in seasonal adjustments @@ -492,8 +539,14 @@ class SqDistAlgorithm(Algorithm): # valid observation phiJminus1 = phiJminus1 + phi ** jstep jstep = jstep + 1 - sumc2 = sumc2 + (alpha * (1 + phiJminus1 * beta) + - gamma * (1 if (jstep % m == 0) else 0)) ** 2 + sumc2 = ( + sumc2 + + ( + alpha * (1 + phiJminus1 * beta) + + gamma * (1 if (jstep % m == 0) else 0) + ) + ** 2 + ) else: # still update sigma using et when et > zthresh * sigma @@ -512,12 +565,14 @@ class SqDistAlgorithm(Algorithm): # distribute error correction across range of seasonal # corrections according to weights calculated above s[i + m] = s[i] + gamma * (1 - alpha) * et * weights[nts // 2] - s[i + m - nts // 2:i + m] = (s[i + m - nts // 2:i + m] + - gamma * (1 - alpha) * et * - weights[:nts // 2]) - s[i + 1:i + nts // 2 + 1] = (s[i + 1:i + nts // 2 + 1] + - gamma * (1 - alpha) * et * - weights[nts // 2 + 1:]) + s[i + m - nts // 2 : i + m] = ( + s[i + m - nts // 2 : i + m] + + gamma * (1 - alpha) * et * weights[: nts // 2] + ) + s[i + 1 : i + nts // 2 + 1] = ( + s[i + 1 : i + nts // 2 + 1] + + gamma * (1 - alpha) * et * weights[nts // 2 + 1 :] + ) # update l and b using equation-error formulation l = l + phi * b + alpha * et @@ -533,11 +588,11 @@ class SqDistAlgorithm(Algorithm): # freeze state with last input for reinitialization if i == (len(yobs) - 1): - yhat0 = yhat[len(yobs):(len(yobs) + hstep)].copy() - s0 = s[len(yobs):(len(yobs) + m)].copy() - r[i + 1] + yhat0 = yhat[len(yobs) : (len(yobs) + hstep)].copy() + s0 = s[len(yobs) : (len(yobs) + m)].copy() - r[i + 1] l0 = l + r[i + 1] b0 = b - sigma0 = sigma[len(yobs):(len(yobs) + hstep + 1)].copy() + sigma0 = sigma[len(yobs) : (len(yobs) + hstep + 1)].copy() # endfor i in range(len(yobs) + fc) @@ -545,20 +600,38 @@ class SqDistAlgorithm(Algorithm): l = l + r[-1] s = np.array(s) - np.hstack((r, np.tile(r[-1], m - 1))) - return (yhat[:len(yobs) + fc], - s[:len(yobs) + fc], - sigma[1:len(yobs) + fc + 1], - yhat0, - s0, - l0, - b0, - sigma0) + return ( + yhat[: len(yobs) + fc], + s[: len(yobs) + fc], + sigma[1 : len(yobs) + fc + 1], + yhat0, + s0, + l0, + b0, + sigma0, + ) @classmethod - def estimate_parameters(cls, yobs, m, alpha=None, beta=None, gamma=None, - phi=1, yhat0=None, s0=None, l0=None, b0=None, sigma0=None, - zthresh=6, fc=0, hstep=0, - alpha0=0.3, beta0=0.1, gamma0=0.1): + def estimate_parameters( + cls, + yobs, + m, + alpha=None, + beta=None, + gamma=None, + phi=1, + yhat0=None, + s0=None, + l0=None, + b0=None, + sigma0=None, + zthresh=6, + fc=0, + hstep=0, + alpha0=0.3, + beta0=0.1, + gamma0=0.1, + ): """Estimate alpha, beta, and gamma parameters based on observed data. Parameters @@ -626,13 +699,15 @@ class SqDistAlgorithm(Algorithm): boundaries = [ (alpha, alpha) if alpha is not None else (0, 1), (beta, beta) if beta is not None else (0, 1), - (gamma, gamma) if gamma is not None else (0, 1) + (gamma, gamma) if gamma is not None else (0, 1), ] - initial_values = np.array([ - alpha if alpha is not None else alpha0, - beta if beta is not None else beta0, - gamma if gamma is not None else gamma0 - ]) + initial_values = np.array( + [ + alpha if alpha is not None else alpha0, + beta if beta is not None else beta0, + gamma if gamma is not None else gamma0, + ] + ) def func(params, *args): """Function that computes root-mean-squared-error based on current @@ -647,15 +722,24 @@ class SqDistAlgorithm(Algorithm): alpha, beta, gamma = params # call Holt-Winters with additive seasonality yhat, _, _, _, _, _, _, _ = cls.additive( - yobs, m, - alpha=alpha, beta=beta, gamma=gamma, l0=l0, b0=b0, s0=s0, - zthresh=zthresh, hstep=hstep) + yobs, + m, + alpha=alpha, + beta=beta, + gamma=gamma, + l0=l0, + b0=b0, + s0=s0, + zthresh=zthresh, + hstep=hstep, + ) # compute root-mean-squared-error of predictions error = np.sqrt(np.nanmean(np.square(np.subtract(yobs, yhat)))) return error - parameters = fmin_l_bfgs_b(func, x0=initial_values, args=(), - bounds=boundaries, approx_grad=True) + parameters = fmin_l_bfgs_b( + func, x0=initial_values, args=(), bounds=boundaries, approx_grad=True + ) alpha, beta, gamma = parameters[0] rmse = parameters[1] return (alpha, beta, gamma, rmse) @@ -669,40 +753,47 @@ class SqDistAlgorithm(Algorithm): parser: ArgumentParser command line argument parser """ - parser.add_argument('--sqdist-alpha', - # default=1.0 / 1440.0 / 30, - default=0, - help='Smoothing parameter for secular variation', - type=float) - parser.add_argument('--sqdist-beta', - default=0, - help='Smoothing parameter for slope', - type=float) - parser.add_argument('--sqdist-gamma', - # default=1.0 / 30, - default=0, - help='Smoothing parameter for solar quiet', - type=float) - parser.add_argument('--sqdist-m', - # default=1440, - default=1, - help='SqDist m parameter', - type=int) - parser.add_argument('--sqdist-mag', - action='store_true', - default=False, - help='Generate sqdist based on magnetic H component') - parser.add_argument('--sqdist-statefile', - default=None, - help='File to store state between calls to algorithm') - parser.add_argument('--sqdist-zthresh', - default=6, - help='Set Z-score threshold', - type=float) - parser.add_argument('--sqdist-smooth', - default=1, - help='Local SQ smoothing parameter', - type=int) + parser.add_argument( + "--sqdist-alpha", + # default=1.0 / 1440.0 / 30, + default=0, + help="Smoothing parameter for secular variation", + type=float, + ) + parser.add_argument( + "--sqdist-beta", default=0, help="Smoothing parameter for slope", type=float + ) + parser.add_argument( + "--sqdist-gamma", + # default=1.0 / 30, + default=0, + help="Smoothing parameter for solar quiet", + type=float, + ) + parser.add_argument( + "--sqdist-m", + # default=1440, + default=1, + help="SqDist m parameter", + type=int, + ) + parser.add_argument( + "--sqdist-mag", + action="store_true", + default=False, + help="Generate sqdist based on magnetic H component", + ) + parser.add_argument( + "--sqdist-statefile", + default=None, + help="File to store state between calls to algorithm", + ) + parser.add_argument( + "--sqdist-zthresh", default=6, help="Set Z-score threshold", type=float + ) + parser.add_argument( + "--sqdist-smooth", default=1, help="Local SQ smoothing parameter", type=int + ) def configure(self, arguments): """Configure algorithm using comand line arguments.