diff --git a/geomagio/adjusted/AdjustedMatrix.py b/geomagio/adjusted/AdjustedMatrix.py index d94d83bb7fbfe9612653f29dfed2a0324f00cb1f..38714db9c0b1e0a22deac425b321ba44314edd8c 100644 --- a/geomagio/adjusted/AdjustedMatrix.py +++ b/geomagio/adjusted/AdjustedMatrix.py @@ -1,12 +1,12 @@ import numpy as np -from obspy import UTCDateTime +from obspy import Stream, UTCDateTime from pydantic import BaseModel from typing import Any, List, Optional from .. import ChannelConverter from .. import pydantic_utcdatetime -from .Metric import Metric from ..residual.Reading import Reading, get_absolutes_xyz, get_ordinates +from .Metric import Metric class AdjustedMatrix(BaseModel): @@ -29,15 +29,25 @@ class AdjustedMatrix(BaseModel): endtime: Optional[UTCDateTime] = None time: Optional[UTCDateTime] = None - def process(self, values: List[List[float]], outchannels=["X", "Y", "Z", "F"]): + def process( + self, + stream: Stream, + inchannels=["H", "E", "Z", "F"], + outchannels=["X", "Y", "Z", "F"], + ): """ Apply matrix to raw data. Apply pier correction to F when necessary """ - data = np.vstack([values[0:3]] + [np.ones_like(values[0])]) - adjusted = self.matrix @ data - if "F" in outchannels: - f = values[-1] + self.pier_correction - adjusted = np.vstack([adjusted[0 : len(outchannels) - 1]] + [f]) - else: - adjusted = adjusted[0 : len(outchannels)] + raws = np.vstack( + [ + stream.select(channel=channel)[0].data + for channel in inchannels + if channel != "F" + ] + + [np.ones_like(stream[0].data)] + ) + adjusted = self.matrix @ raws + if "F" in inchannels and "F" in outchannels: + f = stream.select(channel="F")[0].data + self.pier_correction + adjusted[-1] = f return adjusted def get_metrics(self, readings: List[Reading]) -> List[Metric]: diff --git a/geomagio/adjusted/Affine.py b/geomagio/adjusted/Affine.py index 8fecfb8b72e6e18fea9a9f1c19fcf96d5c3711dd..055461d5728ab285abce503df445a30edd123948 100644 --- a/geomagio/adjusted/Affine.py +++ b/geomagio/adjusted/Affine.py @@ -8,11 +8,9 @@ from .. import pydantic_utcdatetime from ..residual.Reading import ( Reading, get_absolutes_xyz, - get_baselines, get_ordinates, ) from .AdjustedMatrix import AdjustedMatrix - from .transform import RotationTranslationXY, TranslateOrigins, Transform @@ -74,7 +72,7 @@ class Affine(BaseModel): or (epoch_end is None or r.time < epoch_end) ] M = self.calculate_matrix(time, readings) - # if readings are trimmed by bad data, mark the vakid interval + # if readings are trimmed by bad data, mark the valid interval if M: M.starttime = epoch_start M.endtime = epoch_end @@ -99,20 +97,16 @@ class Affine(BaseModel): AdjustedMatrix object containing result """ absolutes = get_absolutes_xyz(readings) - baselines = get_baselines(readings) ordinates = get_ordinates(readings) - times = get_times(readings) Ms = [] weights = [] inputs = ordinates for transform in self.transforms: weights = transform.get_weights( + readings=readings, time=time.timestamp, - times=times, ) - # zero out statistically 'bad' baselines - weights = filter_iqrs(multiseries=baselines, weights=weights) # return None if no valid observations if np.sum(weights) == 0: return AdjustedMatrix(time=time) @@ -122,9 +116,8 @@ class Affine(BaseModel): ) # apply latest M matrix to inputs to get intermediate inputs - inputs = np.dot( - M, np.vstack([inputs[0], inputs[1], inputs[2], np.ones_like(inputs[0])]) - )[0:3] + inputs = np.vstack([*inputs, np.ones_like(inputs[0])]) + inputs = np.dot(M, inputs)[0:3] Ms.append(M) # compose affine transform matrices using reverse ordered matrices @@ -140,82 +133,6 @@ class Affine(BaseModel): return matrix -def filter_iqr( - series: List[float], threshold: int = 3.0, weights: List[int] = None -) -> List[int]: - """ - Identify "good" elements in series by calculating potentially weighted - 25%, 50% (median), and 75% quantiles of series, the number of 25%-50% - quantile ranges below, or 50%-75% quantile ranges above each value of - series falls from the median, and finally, setting elements of good to - True that fall within these multiples of quantile ranges. - - NOTE: NumPy has a percentile function, but it does not yet handle - weights. This algorithm was adapted from the PyPI - package wquantiles (https://pypi.org/project/wquantiles/) - - Inputs: - series: array of observations to filter - - Options: - threshold: threshold in fractional number of 25%-50% (50%-75%) - quantile ranges below (above) the median each element of - series may fall and still be considered "good" - Default set to 6. - weights: weights to assign to each element of series. Default set to 1. - - Output: - good: Boolean array where True values correspond to "good" data - - """ - - if weights is None: - weights = np.ones_like(series) - - # initialize good as all True for weights greater than 0 - good = (weights > 0).astype(bool) - if np.size(good) <= 1: - # if a singleton is passed, assume it is "good" - return good - - good_old = ~good - while not (good_old == good).all(): - good_old = good - - wq25 = weighted_quartile(series[good], weights[good], 0.25) - wq50 = weighted_quartile(series[good], weights[good], 0.50) - wq75 = weighted_quartile(series[good], weights[good], 0.75) - - # NOTE: it is necessary to include good on the RHS here - # to prevent oscillation between two equally likely - # "optimal" solutions; this is a common problem with - # expectation maximization algorithms - good = ( - good - & (series >= (wq50 - threshold * (wq50 - wq25))) - & (series <= (wq50 + threshold * (wq75 - wq50))) - ) - - return good - - -def filter_iqrs( - multiseries: List[List[float]], - weights: List[float], - threshold: float = 3.0, -) -> List[float]: - """Filters "bad" weights generated by unreliable readings""" - good = None - for series in multiseries: - filtered = filter_iqr(series, threshold=threshold, weights=weights) - if good is None: - good = filtered - else: - good = good & filtered - - return weights * good - - def get_epochs( epochs: List[float], time: UTCDateTime, @@ -244,27 +161,3 @@ def get_epochs( if epoch_start is None or e > epoch_start: epoch_start = e return epoch_start, epoch_end - - -def get_times(readings: List[UTCDateTime]): - return np.array([reading.get_absolute("H").endtime for reading in readings]) - - -def weighted_quartile(data: List[float], weights: List[float], quant: float) -> float: - """Get weighted quartile to determine statistically good/bad data - - Attributes - ---------- - data: filtered array of observations - weights: array of vector distances/metrics - quant: statistical percentile of input data - """ - # sort data and weights - ind_sorted = np.argsort(data) - sorted_data = data[ind_sorted] - sorted_weights = weights[ind_sorted] - # compute auxiliary arrays - Sn = np.cumsum(sorted_weights) - Pn = (Sn - 0.5 * sorted_weights) / Sn[-1] - # interpolate to weighted quantile - return np.interp(quant, Pn, sorted_data) diff --git a/geomagio/adjusted/Metric.py b/geomagio/adjusted/Metric.py index f02e29791eee9a02a757f4ebdb49ade54ca29ab6..983cb538f5b2dd07fce1699fb5f8cf4dca1ac6cb 100644 --- a/geomagio/adjusted/Metric.py +++ b/geomagio/adjusted/Metric.py @@ -1,4 +1,3 @@ -import numpy as np from pydantic import BaseModel diff --git a/geomagio/adjusted/transform/Transform.py b/geomagio/adjusted/transform/Transform.py index 4d1f0c75bd92c657005f1ceda395930087d6c574..42d6b0210560387e246303fcbf76baa84851a1c3 100644 --- a/geomagio/adjusted/transform/Transform.py +++ b/geomagio/adjusted/transform/Transform.py @@ -3,6 +3,8 @@ from obspy import UTCDateTime from pydantic import BaseModel from typing import List, Optional, Tuple +from ...residual.Reading import Reading, get_baselines, get_times + class Transform(BaseModel): """Method for generating an affine matrix. @@ -34,7 +36,7 @@ class Transform(BaseModel): """ return - def get_weights(self, times: UTCDateTime, time: int = None) -> List[float]: + def get_weights(self, readings: List[Reading], time: int = None) -> List[float]: """ Calculate time-dependent weights according to exponential decay. @@ -44,15 +46,18 @@ class Transform(BaseModel): weights: array of vector distances/metrics """ + times = get_times(readings) # convert to array of floats times = np.asarray(times).astype(float) if time is None: time = float(max(times)) + baselines = get_baselines(readings) + # if memory is actually infinite, return equal weights if np.isinf(self.memory): - return np.ones(times.shape) + return filter_iqrs(multiseries=baselines, weights=np.ones(times.shape)) # initialize weights weights = np.zeros(times.shape) @@ -64,4 +69,102 @@ class Transform(BaseModel): if not self.acausal: weights[times > time] = 0.0 + weights = filter_iqrs(multiseries=baselines, weights=weights) + return weights + + +def filter_iqr( + series: List[float], threshold: int = 3.0, weights: List[int] = None +) -> List[int]: + """ + Identify "good" elements in series by calculating potentially weighted + 25%, 50% (median), and 75% quantiles of series, the number of 25%-50% + quantile ranges below, or 50%-75% quantile ranges above each value of + series falls from the median, and finally, setting elements of good to + True that fall within these multiples of quantile ranges. + + NOTE: NumPy has a percentile function, but it does not yet handle + weights. This algorithm was adapted from the PyPI + package wquantiles (https://pypi.org/project/wquantiles/) + + Inputs: + series: array of observations to filter + + Options: + threshold: threshold in fractional number of 25%-50% (50%-75%) + quantile ranges below (above) the median each element of + series may fall and still be considered "good" + Default set to 6. + weights: weights to assign to each element of series. Default set to 1. + + Output: + good: Boolean array where True values correspond to "good" data + + """ + + if weights is None: + weights = np.ones_like(series) + + # initialize good as all True for weights greater than 0 + good = (weights > 0).astype(bool) + if np.size(good) <= 1: + # if a singleton is passed, assume it is "good" + return good + + good_old = ~good + while not (good_old == good).all(): + good_old = good + + wq25 = weighted_quartile(series[good], weights[good], 0.25) + wq50 = weighted_quartile(series[good], weights[good], 0.50) + wq75 = weighted_quartile(series[good], weights[good], 0.75) + + # NOTE: it is necessary to include good on the RHS here + # to prevent oscillation between two equally likely + # "optimal" solutions; this is a common problem with + # expectation maximization algorithms + good = ( + good + & (series >= (wq50 - threshold * (wq50 - wq25))) + & (series <= (wq50 + threshold * (wq75 - wq50))) + ) + + return good + + +def filter_iqrs( + multiseries: List[List[float]], + weights: List[float], + threshold: float = 3.0, +) -> List[float]: + """Filters "bad" weights generated by unreliable readings""" + good = None + for series in multiseries: + filtered = filter_iqr(series, threshold=threshold, weights=weights) + if good is None: + good = filtered + else: + good = good & filtered + + return weights * good + + +def weighted_quartile(data: List[float], weights: List[float], quant: float) -> float: + """Get weighted quartile to determine statistically good/bad data + + Attributes + ---------- + data: filtered array of observations + weights: array of vector distances/metrics + quant: statistical percentile of input data + """ + # sort data and weights + ind_sorted = np.argsort(data) + sorted_data = data[ind_sorted] + sorted_weights = weights[ind_sorted] + # compute auxiliary arrays + Sn = np.cumsum(sorted_weights) + Pn = (Sn - 0.5 * sorted_weights) / Sn[-1] + # interpolate to weighted quantile + return np.interp(quant, Pn, sorted_data) diff --git a/geomagio/algorithm/AdjustedAlgorithm.py b/geomagio/algorithm/AdjustedAlgorithm.py index 78a38ae25e69be62ababc3044ec162360a3e8af0..ceb87ba52efe7385d46917d54e4f163f363a2556 100644 --- a/geomagio/algorithm/AdjustedAlgorithm.py +++ b/geomagio/algorithm/AdjustedAlgorithm.py @@ -131,10 +131,11 @@ class AdjustedAlgorithm(Algorithm): out = None inchannels = self.get_input_channels() outchannels = self.get_output_channels() - raws = np.vstack( - [stream.select(channel=channel)[0].data for channel in inchannels] + adjusted = self.matrix.process( + stream, + inchannels=inchannels, + outchannels=outchannels, ) - adjusted = self.matrix.process(values=raws, outchannels=outchannels) out = Stream( [ self.create_trace( @@ -142,7 +143,7 @@ class AdjustedAlgorithm(Algorithm): stream.select(channel=inchannels[i])[0].stats, adjusted[i], ) - for i in range(len(adjusted)) + for i in range(len(outchannels)) ] ) return out diff --git a/geomagio/residual/Reading.py b/geomagio/residual/Reading.py index 1802b707747daa687b399dc53c6c68eefa5b5608..c91fcd6cc91e3c365bacdb1f65ae52cbe37a8c79 100644 --- a/geomagio/residual/Reading.py +++ b/geomagio/residual/Reading.py @@ -176,3 +176,7 @@ def get_ordinates( e_ord = h_abs * np.radians(d_ord) h_ord = np.sqrt(h_ord ** 2 - e_ord ** 2) return (h_ord, e_ord, z_ord) + + +def get_times(readings: List[UTCDateTime]): + return np.array([reading.get_absolute("H").endtime for reading in readings])