diff --git a/geomagio/adjusted/AdjustedMatrix.py b/geomagio/adjusted/AdjustedMatrix.py
index 759067f86668df11f19d772db6668735c991facd..ee8b9c854de574084a9072e6d8db544e731e0003 100644
--- a/geomagio/adjusted/AdjustedMatrix.py
+++ b/geomagio/adjusted/AdjustedMatrix.py
@@ -1,9 +1,21 @@
-from pydantic import BaseModel
-from typing import List, Optional, Any
 from obspy import UTCDateTime
+from pydantic import BaseModel
+from typing import Optional, Any
 
 
 class AdjustedMatrix(BaseModel):
+    """Attributes pertaining to adjusted (affine) matrices, applied by the AdjustedAlgorithm
+
+    Attributes
+    ----------
+    matrix: affine matrix generated by Affine's calculate method
+    pier_correction: pier correction generated by Affine's calculate method
+    starttime: beginning of interval that matrix is valid for
+    endtime: end of interval that matrix is valid for
+    NOTE: valid intervals are only generated when bad data is encountered.
+    Matrix is non-constrained otherwise
+    """
+
     matrix: Any
     pier_correction: float
     starttime: Optional[UTCDateTime] = None
diff --git a/geomagio/adjusted/Affine.py b/geomagio/adjusted/Affine.py
index de91b2b5507dbfc63ef9b6551929d6c7c05c381e..4de30c94ae52d31383106eed2bcf08d208842af5 100644
--- a/geomagio/adjusted/Affine.py
+++ b/geomagio/adjusted/Affine.py
@@ -1,16 +1,23 @@
-import numpy as np
 from functools import reduce
+import numpy as np
 from obspy import UTCDateTime
 from pydantic import BaseModel
-from .. import pydantic_utcdatetime
-from typing import List, Any, Optional, Type
+from typing import List, Optional, Tuple
 
-from .Transform import Transform, TranslateOrigins, RotationTranslationXY
 from .AdjustedMatrix import AdjustedMatrix
 from ..residual import Reading
+from .Transform import Transform, TranslateOrigins, RotationTranslationXY
 
 
 def weighted_quartile(data: List[float], weights: List[float], quant: float) -> float:
+    """Get weighted quartile to determine statistically good/bad data
+
+    Attributes
+    ----------
+    data: filtered array of observations
+    weights: array of vector distances/metrics
+    quant: statistical percentile of input data
+    """
     # sort data and weights
     ind_sorted = np.argsort(data)
     sorted_data = data[ind_sorted]
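The docstring added to `weighted_quartile` above describes the idea but not the mechanics. For readers unfamiliar with weighted quantiles, here is a minimal, self-contained sketch of the same concept; it is not the repository's implementation, and the helper name `weighted_quantile` plus the sample numbers are invented for illustration:

```python
import numpy as np

def weighted_quantile(data, weights, quant):
    """Illustrative weighted quantile: sort observations, accumulate weights,
    and interpolate the value at the requested fraction of total weight."""
    data = np.asarray(data, dtype=float)
    weights = np.asarray(weights, dtype=float)
    order = np.argsort(data)
    data, weights = data[order], weights[order]
    # normalized midpoint positions of each observation's weight mass
    positions = (np.cumsum(weights) - 0.5 * weights) / np.sum(weights)
    return np.interp(quant, positions, data)

obs = [20.1, 20.3, 19.9, 35.0, 20.2]   # one obviously bad observation
w = [1.0, 1.0, 1.0, 0.2, 1.0]          # bad point carries little weight
print(weighted_quantile(obs, w, 0.25), weighted_quantile(obs, w, 0.75))
```

Down-weighted outliers barely move the quartiles, which is what lets `filter_iqr` (next hunk) use quartile ranges to flag "good" versus "bad" observations.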
@@ -39,18 +46,17 @@
     should use it instead.
 
     Inputs:
-    series - 1D NumPy array of observations to filter
+    series: array of observations to filter
 
     Options:
-    threshold - threshold in fractional number of 25%-50% (50%-75%)
-                quantile ranges below (above) the median each element of
-                series may fall and still be considered "good"
-                (default = 6)
-    weights - weights to assign to each element of series
-              (default = 1)
+    threshold: threshold in fractional number of 25%-50% (50%-75%)
+        quantile ranges below (above) the median each element of
+        series may fall and still be considered "good"
+        Default set to 6.
+    weights: weights to assign to each element of series. Default set to 1.
 
     Output:
-    good - Boolean array where True values correspond to "good" data
+    good: Boolean array where True values correspond to "good" data
 
     """
 
@@ -85,6 +91,18 @@
 
 
 class Affine(BaseModel):
+    """Creates AdjustedMatrix objects from readings
+
+    Attributes
+    ----------
+    observatory: 3-letter observatory code
+    starttime: beginning time for matrix creation
+    endtime: end time for matrix creation
+    acausal: when True, utilizes readings from after set endtime
+    update_interval: window of time a matrix is representative of
+    transforms: list of methods for matrix calculation
+    """
+
     observatory: str = None
     starttime: UTCDateTime = UTCDateTime() - (86400 * 7)
     endtime: UTCDateTime = UTCDateTime()
@@ -99,6 +117,16 @@ class Affine(BaseModel):
         arbitrary_types_allowed = True
 
     def calculate(self, readings: List[Reading]) -> List[AdjustedMatrix]:
+        """Calculates affine matrices for a range of times
+
+        Attributes
+        ----------
+        readings: list of readings containing absolutes
+
+        Outputs
+        -------
+        Ms: list of AdjustedMatrix objects created from calculations
+        """
         update_interval = self.update_interval or (self.endtime - self.starttime)
         all_readings = [r for r in readings if r.valid]
         Ms = []
@@ -133,7 +161,7 @@ class Affine(BaseModel):
         epoch_end: float,
         epochs: List[float],
         time: UTCDateTime,
-    ):
+    ) -> Tuple[float, float]:
         for e in epochs:
             if e > time:
                 if epoch_end is None or e < epoch_end:
@@ -146,7 +174,10 @@ class Affine(BaseModel):
     def get_times(self, readings: List[UTCDateTime]):
         return np.array([reading.get_absolute("H").endtime for reading in readings])
 
-    def get_ordinates(self, readings: List[Reading]):
+    def get_ordinates(
+        self, readings: List[Reading]
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Calculates ordinates from absolutes and baselines"""
         h_abs, d_abs, z_abs = self.get_absolutes(readings)
         h_bas, d_bas, z_bas = self.get_baselines(readings)
 
@@ -167,20 +198,29 @@ class Affine(BaseModel):
             z_o = z_ord
         return (h_o, e_o, z_o)
 
-    def get_baselines(self, readings: List[Reading]):
+    def get_baselines(
+        self, readings: List[Reading]
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Get H, D and Z baselines"""
         h_bas = np.array([reading.get_absolute("H").baseline for reading in readings])
         d_bas = np.array([reading.get_absolute("D").baseline for reading in readings])
         z_bas = np.array([reading.get_absolute("Z").baseline for reading in readings])
         return (h_bas, d_bas, z_bas)
 
-    def get_absolutes(self, readings: List[Reading]):
+    def get_absolutes(
+        self, readings: List[Reading]
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Get H, D and Z absolutes"""
         h_abs = np.array([reading.get_absolute("H").absolute for reading in readings])
         d_abs = np.array([reading.get_absolute("D").absolute for reading in readings])
         z_abs = np.array([reading.get_absolute("Z").absolute for reading in readings])
         return (h_abs, d_abs, z_abs)
 
-    def get_absolutes_xyz(self, readings: List[Reading]):
+    def get_absolutes_xyz(
+        self, readings: List[Reading]
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Get X, Y and Z absolutes from H, D and Z absolutes"""
         h_abs, d_abs, z_abs = self.get_absolutes(readings)
 
         # convert from cylindrical to Cartesian coordinates
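The hunk above ends at the cylindrical-to-Cartesian comment in `get_absolutes_xyz`. For context, the usual conversion is X = H·cos(D), Y = H·sin(D) with Z unchanged. A minimal sketch of that step follows; it is only an illustration of the formula, not the repository's exact code, and the sample values and variable names are hypothetical (declination is assumed to be in radians):

```python
import numpy as np

h_abs = np.array([20800.0, 20810.0])   # horizontal intensity (nT), hypothetical
d_abs = np.radians([10.0, 10.2])       # declination converted to radians, hypothetical
z_abs = np.array([47500.0, 47510.0])   # vertical component (nT), hypothetical

x_a = h_abs * np.cos(d_abs)            # geographic north component
y_a = h_abs * np.sin(d_abs)            # geographic east component
print(np.round(x_a, 1), np.round(y_a, 1), z_abs)
```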
@@ -192,6 +232,12 @@ class Affine(BaseModel):
     def calculate_matrix(
         self, time: UTCDateTime, readings: List[Reading]
     ) -> AdjustedMatrix:
+        """Calculates affine matrix for a given time
+
+        Outputs
+        -------
+        AdjustedMatrix object containing result
+        """
         absolutes = self.get_absolutes_xyz(readings)
         baselines = self.get_baselines(readings)
         ordinates = self.get_ordinates(readings)
@@ -237,7 +283,7 @@ class Affine(BaseModel):
         time: UTCDateTime,
         times: List[UTCDateTime],
         transform: Transform,
-    ):
+    ) -> np.array:
         weights = transform.get_weights(time=time.timestamp, times=times)
         # set weights for future observations to zero if not acausal
         if not self.acausal:
diff --git a/geomagio/adjusted/SpreadsheetSummaryFactory.py b/geomagio/adjusted/SpreadsheetSummaryFactory.py
index ab3c8c1d341e6d8bd9497676b8b64538db9cdc6a..b16411f12f1fc75053a54920f828800aa470c2e0 100644
--- a/geomagio/adjusted/SpreadsheetSummaryFactory.py
+++ b/geomagio/adjusted/SpreadsheetSummaryFactory.py
@@ -1,23 +1,32 @@
 import os
-from typing import List
 from obspy import UTCDateTime
 import openpyxl
-import numpy as np
+from typing import List
 
 from ..residual import Absolute, Angle, Reading
 from ..residual.SpreadsheetAbsolutesFactory import parse_relative_time
 
 
 class SpreadsheetSummaryFactory(object):
+    """Read absolutes from summary spreadsheets"""
+
     def __init__(
-        self, base_directory=r"Volumes/geomag/pub/Caldata/Checked Baseline Data"
+        self, base_directory: str = r"Volumes/geomag/pub/Caldata/Checked Baseline Data"
     ):
         self.base_directory = base_directory
 
     def get_readings(
         self, observatory: str, starttime: UTCDateTime, endtime: UTCDateTime
-    ):
+    ) -> List[Reading]:
+        """Gathers readings from factory's base directory
+
+        Attributes
+        ----------
+        observatory: 3-letter observatory code
+        starttime: beginning date of readings
+        endtime: end date of readings
+        """
         readings = []
         for year in range(starttime.year, endtime.year + 1):
             for (dirpath, _, filenames) in os.walk(self.base_directory):
@@ -43,7 +52,14 @@ class SpreadsheetSummaryFactory(object):
                 readings = self._parse_readings(sheet, path)
         return readings
 
-    def _parse_metadata(self, sheet, observatory) -> dict:
+    def _parse_metadata(self, sheet: openpyxl.worksheet, observatory: str) -> dict:
+        """Gather metadata from spreadsheet
+
+        Attributes
+        ----------
+        sheet: excel sheet containing residual summary values
+        observatory: 3-letter observatory code
+        """
         date = sheet["I1"].value
         date = f"{date.year}{date.month:02}{date.day:02}"
         return {
@@ -54,7 +70,19 @@
             "observer": sheet["I10"].value,
         }
 
-    def _parse_readings(self, sheet, path):
+    def _parse_readings(self, sheet: openpyxl.worksheet, path: str) -> List[Reading]:
+        """Get list of readings from spreadsheet
+
+        Attributes
+        ----------
+        sheet: excel sheet containing residual summary values
+        path: spreadsheet's filepath
+
+        Outputs
+        -------
+        List of valid readings from spreadsheet.
+        If all readings are valid, 4 readings are returned
+        """
         metadata = self._parse_metadata(sheet, path.split("/")[-1][0:3])
         date = sheet["I1"].value
         base_date = f"{date.year}{date.month:02}{date.day:02}"
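`get_readings` above walks the factory's base directory once per year in the requested range. As a rough picture of that selection loop, here is a hedged sketch; the function name `candidate_files` and the assumption that summary filenames begin with the observatory code and contain the year are illustrative only, not the repository's actual matching rule:

```python
import os
from obspy import UTCDateTime

def candidate_files(base_directory: str, observatory: str, starttime, endtime):
    """Collect paths whose names look like per-year summary spreadsheets
    (hypothetical convention: starts with observatory code, contains the year)."""
    matches = []
    for year in range(starttime.year, endtime.year + 1):
        for dirpath, _, filenames in os.walk(base_directory):
            for filename in filenames:
                if filename.startswith(observatory) and str(year) in filename:
                    matches.append(os.path.join(dirpath, filename))
    return matches

# usage (paths are illustrative only)
files = candidate_files("/tmp", "BOU", UTCDateTime("2020-01-01"), UTCDateTime("2020-12-31"))
```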
diff --git a/geomagio/adjusted/Transform.py b/geomagio/adjusted/Transform.py
index 534234f05ce90a16b374a4b648841190e514acec..ad79e5d24a575aa9e41e51f63a1330993207dfc2 100644
--- a/geomagio/adjusted/Transform.py
+++ b/geomagio/adjusted/Transform.py
@@ -1,11 +1,19 @@
 import numpy as np
-from typing import List, Tuple, Optional
-import scipy.linalg as spl
 from obspy import UTCDateTime
+import scipy.linalg as spl
+from typing import List, Tuple, Optional
 
 
 class Transform(object):
-    def __init__(self, memory=np.inf):
+    """Method for generating an affine matrix.
+
+    Attributes
+    ----------
+    memory: Controls the impact of measurements from the past.
+        Defaults to infinite (equal weighting)
+    """
+
+    def __init__(self, memory: float = np.inf):
         self.memory = memory
 
     def get_weights(self, times: UTCDateTime, time: int = None) -> List[float]:
@@ -13,23 +21,9 @@
         Calculate time-dependent weights according to exponential decay.
 
         Inputs:
-        times   - 1D array of times, or any time-like index whose
-                  relative values represent spacing between events
-        memory  - exp(-1) time scale; weights will be ~37% of max
-                  weight when time difference equals memory, and ~5%
-                  of max weight when time difference is 3X memory
-
-        Options:
-        epoch   - time at which weights maximize
-                  (default = max(times))
-
-        Outout:
-        weights - an M element array of vector distances/metrics
-
-        NOTE: ObsPy UTCDateTime objects can be passed in times, but
-              memory must then be specified in seconds
-        FIXME: Python datetime objects not supported yet
-
+        times: array of times, or any time-like index whose relative values represent spacing between events
+        Output:
+        weights: array of vector distances/metrics
         """
 
         # convert to array of floats
@@ -58,18 +52,49 @@
         absolutes: Tuple[List[float], List[float], List[float]],
         weights: List[float],
     ) -> np.array:
+        """Type skeleton inherited by any instance of Transform
+
+        Attributes
+        ----------
+        ordinates: H, E and Z ordinates
+        absolutes: X, Y and Z absolutes (NOTE: absolutes must be rotated from original H, E and Z values)
+        weights: time weights to apply during calculations of matrices
+        """
         return
 
 
 class LeastSq(Transform):
+    """Instance of Transform. Applies least squares to generate matrices"""
+
     def get_stacked_absolutes(self, absolutes):
+        """Formats absolutes for least squares method
+
+        Attributes
+        ----------
+        absolutes: Rotated X, Y, and Z absolutes
+
+        Output
+        ------
+        X, Y and Z absolutes placed end to end and transposed
+        """
         return np.vstack([absolutes[0], absolutes[1], absolutes[2]]).T.ravel()
 
     def get_weighted_values(
         self,
        values: Tuple[List[float], List[float], List[float]],
         weights: Optional[List[float]],
-    ):
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Application of weights for least squares methods, which call for square rooting of weights
+
+        Attributes
+        ----------
+        values: absolutes or ordinates
+
+        Outputs
+        -------
+        tuple of values with weights applied to each element
+
+        """
         if weights is None:
             return values
         weights = np.sqrt(weights)
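The `LeastSq.get_weighted_values` docstring above notes that least squares calls for square-rooting the weights. The reason is that scaling each row of the system by sqrt(w) makes an ordinary least-squares solver minimize the weighted residual. A minimal sketch under that assumption (the matrix `A`, vector `b`, and weights `w` here are made-up data, not anything from the repository):

```python
import numpy as np

# Scaling rows of A and b by sqrt(w) makes np.linalg.lstsq minimize
# sum(w * (A @ x - b)**2) instead of the unweighted residual.
rng = np.random.default_rng(0)
A = rng.normal(size=(10, 3))
x_true = np.array([1.0, -2.0, 0.5])
b = A @ x_true + rng.normal(scale=0.01, size=10)

w = np.ones(10)
w[3] = 0.0                      # e.g. an observation flagged as bad

sw = np.sqrt(w)
x_hat, *_ = np.linalg.lstsq(A * sw[:, None], b * sw, rcond=None)
print(np.round(x_hat, 3))
```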
@@ -80,15 +105,30 @@ class LeastSq(Transform):
         )
 
 
-class SingularValueDecomposition(Transform):
-    def get_stacked_values(self, values, weighted_values, ndims=3):
+class SVD(Transform):
+    """Instance of Transform. Applies singular value decomposition to generate matrices"""
+
+    def get_stacked_values(self, values, weighted_values, ndims=3) -> np.array:
+        """Supports intermediate mathematical steps by differencing and shaping values for SVD
+
+        Attributes
+        ----------
+        values: absolutes or ordinates
+        weighted_values: absolutes or ordinates with weights applied
+        ndims: number of rows desired in return array (case specific). Default set to 3 dimensions (XYZ/HEZ)
+
+        Outputs
+        -------
+        Stacked and differenced values from their weighted counterparts
+        """
         return np.vstack([[values[i] - weighted_values[i]] for i in range(ndims)])
 
     def get_weighted_values(
         self,
         values: Tuple[List[float], List[float], List[float]],
         weights: Optional[List[float]],
-    ):
+    ) -> Tuple[List[float], List[float], List[float]]:
+        """Application of weights for SVD methods, which call for weighted averages"""
         if weights is None:
             weights = np.ones_like(values[0])
         return (
@@ -105,6 +145,7 @@ class NoConstraints(LeastSq):
         absolutes: Tuple[List[float], List[float], List[float]],
         weights: List[float],
     ) -> np.array:
+        """Calculates affine with no constraints using least squares."""
         ordinates = self.get_weighted_values(ordinates, weights)
         absolutes = self.get_weighted_values(absolutes, weights)
         # LHS, or dependent variables
@@ -158,6 +199,7 @@ class ZRotationShear(LeastSq):
         absolutes: Tuple[List[float], List[float], List[float]],
         weights: List[float],
     ) -> np.array:
+        """Calculates affine using least squares, constrained to rotate about the Z axis."""
         ordinates = self.get_weighted_values(ordinates, weights)
         absolutes = self.get_weighted_values(absolutes, weights)
         # LHS, or dependent variables
@@ -195,6 +237,9 @@
 
 
 class ZRotationHscale(LeastSq):
+    """Calculates affine using least squares, constrained to rotate about the Z axis
+    and apply uniform horizontal scaling."""
+
     def calculate(
         self,
         ordinates: Tuple[List[float], List[float], List[float]],
         absolutes: Tuple[List[float], List[float], List[float]],
         weights: List[float],
     ) -> np.array:
@@ -238,12 +283,8 @@
 
 
 class ZRotationHscaleZbaseline(LeastSq):
-    def rotate_values(self, values):
-        return (
-            np.sqrt(values[0] ** 2 + values[1] ** 2),
-            np.arctan2(values[1], values[0]),
-            values[2],
-        )
+    """Calculates affine using least squares, constrained to rotate about the Z axis,
+    apply a uniform horizontal scaling, and apply a baseline shift for the Z axis."""
 
     def calculate(
         self,
@@ -288,7 +329,10 @@
 
 
-class RotationTranslation3D(SingularValueDecomposition):
+class RotationTranslation3D(SVD):
+    """Calculates affine using singular value decomposition,
+    constrained to 3D scaled rotation and translation (no shear)"""
+
     def calculate(
         self,
         ordinates: Tuple[List[float], List[float], List[float]],
@@ -336,6 +380,8 @@ class RotationTranslation3D(SingularValueDecomposition):
 
 
 class Rescale3D(LeastSq):
+    """Calculates affine using least squares, constrained to re-scale each axis"""
+
     def calculate(
         self,
         ordinates: Tuple[List[float], List[float], List[float]],
@@ -374,11 +420,15 @@ class Rescale3D(LeastSq):
 
 
 class TranslateOrigins(LeastSq):
+    """Calculates affine using least squares, constrained to translate origins"""
+
     def get_weighted_values(
         self,
         values: Tuple[List[float], List[float], List[float]],
         weights: Optional[List[float]],
     ):
+        """Weights are applied after matrix creation steps,
+        requiring weights to be stacked similar to ordinates and absolutes"""
         if weights is not None:
             weights = np.sqrt(weights)
             weights = np.vstack((weights, weights, weights)).T.ravel()
@@ -425,6 +475,8 @@
 
 
 class ShearYZ(LeastSq):
+    """Calculates affine using least squares, constrained to shear Y and Z, but not X."""
+
     def calculate(
         self,
         ordinates: Tuple[List[float], List[float], List[float]],
         absolutes: Tuple[List[float], List[float], List[float]],
         weights: List[float],
     ) -> np.array:
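The `RotationTranslation3D` docstring above describes an SVD fit constrained to rotation plus translation. The textbook way to solve that problem (often called the Kabsch or orthogonal Procrustes method) is to subtract weighted centroids, take the SVD of the cross-covariance, and compose R and t. The sketch below shows that general technique under those assumptions; `fit_rotation_translation` and the sample points are invented for illustration and are not claimed to be the exact math inside the repository's transforms:

```python
import numpy as np

def fit_rotation_translation(p, q, w):
    """Find R, t minimizing sum(w * |R @ p_i + t - q_i|^2) via SVD (Kabsch)."""
    w = w / w.sum()
    p_c = (p * w[:, None]).sum(axis=0)          # weighted centroids
    q_c = (q * w[:, None]).sum(axis=0)
    H = (p - p_c).T @ ((q - q_c) * w[:, None])  # weighted cross-covariance
    U, _, Vt = np.linalg.svd(H)
    d = np.sign(np.linalg.det(Vt.T @ U.T))      # guard against reflections
    R = Vt.T @ np.diag([1.0, 1.0, d]) @ U.T
    t = q_c - R @ p_c
    return R, t

# usage with made-up ordinate/absolute triplets
p = np.random.default_rng(1).normal(size=(6, 3))
theta = np.radians(3.0)
R_true = np.array([[np.cos(theta), -np.sin(theta), 0.0],
                   [np.sin(theta), np.cos(theta), 0.0],
                   [0.0, 0.0, 1.0]])
q = p @ R_true.T + np.array([10.0, -5.0, 2.0])
R, t = fit_rotation_translation(p, q, np.ones(6))
```

Differencing values against their weighted means, as `SVD.get_stacked_values` does, corresponds to the centroid-subtraction step in this approach.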
@@ -463,7 +515,11 @@
 
 
-class RotationTranslationXY(SingularValueDecomposition):
+class RotationTranslationXY(SVD):
+    """Calculates affine using singular value decomposition,
+    constrained to rotation and translation in XY (no scale or shear),
+    and only translation in Z"""
+
     def calculate(
         self,
         ordinates: Tuple[List[float], List[float], List[float]],
@@ -520,12 +576,15 @@
         ]
 
 
-class QRFactorization(SingularValueDecomposition):
+class QRFactorization(SVD):
+    """Calculates affine using singular value decomposition with QR factorization"""
+
     def get_weighted_values_lstsq(
         self,
         values: Tuple[List[float], List[float], List[float]],
         weights: Optional[List[float]],
     ):
+        """Applies least squares weights in two dimensions (X and Y)"""
         if weights is None:
             return values
         weights = np.sqrt(weights)
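All of the transforms above return an affine matrix that, per the new `AdjustedMatrix` docstring, is later applied by the AdjustedAlgorithm. For orientation, here is a hedged sketch of how such a matrix can be applied to raw (h, e, z) ordinates using homogeneous coordinates; it assumes a 4x4 matrix and made-up sample values, and only illustrates the algebra rather than the production code path:

```python
import numpy as np

matrix = np.eye(4)
matrix[0:3, 3] = [10.0, -5.0, 2.0]      # a pure translation, for illustration only

h = np.array([20800.0, 20801.0])
e = np.array([-120.0, -119.5])
z = np.array([47500.0, 47501.0])

stacked = np.vstack([h, e, z, np.ones_like(h)])   # 4 x N homogeneous ordinates
adjusted = matrix @ stacked
x_adj, y_adj, z_adj = adjusted[0], adjusted[1], adjusted[2]
```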