from typing import Any, List, Optional

import numpy as np
from obspy import Stream, UTCDateTime
from pydantic import BaseModel

from ..residual.Reading import Reading, get_absolutes_xyz, get_ordinates
from .. import ChannelConverter
from .. import pydantic_utcdatetime
from .Metric import Metric, get_metric


class AdjustedMatrix(BaseModel):
    """Attributes pertaining to adjusted (affine) matrices, applied by the
    AdjustedAlgorithm.

    Attributes
    ----------
    matrix: affine matrix generated by Affine's calculate method
    pier_correction: pier correction generated by Affine's calculate method
    metrics: optional list of Metric objects associated with this matrix
    starttime: beginning of interval that matrix is valid for
    endtime: end of interval that matrix is valid for
    time: optional timestamp for the matrix
        NOTE(review): exact meaning is not visible in this file; presumably
        the time the matrix was computed for - confirm against callers.

    NOTE: valid intervals are only generated when bad data is encountered.
    Matrix is non-constrained otherwise
    """

    matrix: Optional[List[List[float]]] = None
    pier_correction: float = 0
    metrics: Optional[List[Metric]] = None
    starttime: Optional[UTCDateTime] = None
    endtime: Optional[UTCDateTime] = None
    time: Optional[UTCDateTime] = None

    def process(
        self,
        stream: Stream,
        inchannels: Optional[List[str]] = None,
        outchannels: Optional[List[str]] = None,
    ) -> np.ndarray:
        """Apply matrix to raw data. Apply pier correction to F when necessary.

        If inchannels is specified, look for inchannels in stream and fail
        if all are not present. Both inchannels and outchannels must be
        consistent with self.matrix.

        If inchannels is not specified, it defaults to the channels of every
        trace in stream. If outchannels is not specified, outchannels will
        match inchannels.

        NOTE: if non-F inchannels and outchannels are not compatible with
        self.matrix, return NaNs for non-F outchannels
        NOTE: if F is not in both inchannels and outchannels, return NaNs
        for F

        Parameters
        ----------
        stream: stream holding one trace per input channel
        inchannels: names of the channels to read from stream
        outchannels: names of the channels being produced; only its length
            (without "F") and whether it contains "F" are used here

        Returns
        -------
        adjusted: 2-D array with one row per row of self.matrix and one
            column per sample. The last row holds F plus pier_correction
            when "F" is in both inchannels and outchannels, NaN otherwise.

        Raises
        ------
        IndexError: when a requested channel has no trace in stream
        TypeError: when self.matrix has not been set
        """
        inchannels = inchannels or [trace.stats.channel for trace in stream]
        outchannels = outchannels or inchannels

        # F is handled separately from the matrix multiplication
        inchannels_noF = [c for c in inchannels if c != "F"]
        outchannels_noF = [c for c in outchannels if c != "F"]

        # stack raw channels and append a row of ones, so that the affine
        # matrix can be applied with a single multiplication
        raws = np.vstack(
            [stream.select(channel=channel)[0].data for channel in inchannels_noF]
            + [np.ones_like(stream[0].data)]
        )

        matrix_size = len(self.matrix)
        compatible = (
            len(inchannels_noF) == len(outchannels_noF)
            and len(inchannels_noF) == matrix_size - 1
        )
        if compatible:
            # matrix multiplication
            adjusted = self.matrix @ raws
        else:
            # return NaNs if non-F inchannels or outchannels are not
            # compatible with self.matrix
            adjusted = np.full((matrix_size, raws.shape[1]), np.nan)

        # the last row (the row of ones before multiplication) carries F
        if "F" in inchannels and "F" in outchannels:
            # return F only if specified in both inchannels and outchannels
            adjusted[-1] = stream.select(channel="F")[0].data + self.pier_correction
        else:
            adjusted[-1] = np.nan
        return adjusted

    def get_metrics(self, readings: List[Reading]) -> List[Metric]:
        """Compute metrics between expected and predicted values.

        Expected values are the absolutes from the readings; predicted
        values are obtained by applying self.matrix to the ordinates from
        the readings. Metrics are computed for X, Y, Z, and dF, where the
        dF entry compares F computed from the expected X, Y, Z with F
        computed from the predicted X, Y, Z.

        Parameters
        ----------
        readings: list of valid readings

        Returns
        -------
        metrics: list of Metric objects, one per element, built by
            get_metric
        """
        absolutes = get_absolutes_xyz(readings=readings)
        ordinates = get_ordinates(readings=readings)
        # append a row of ones so the affine matrix can be applied directly
        stacked_ordinates = np.vstack((ordinates, np.ones_like(ordinates[0])))
        # only the first three rows (X, Y, Z) of the result are used
        predicted_xyz = (self.matrix @ stacked_ordinates)[0:3]

        elements = ["X", "Y", "Z", "dF"]
        expected = list(absolutes) + [
            ChannelConverter.get_computed_f_using_squares(*absolutes)
        ]
        predicted = list(predicted_xyz) + [
            ChannelConverter.get_computed_f_using_squares(*predicted_xyz)
        ]
        return [
            get_metric(element=element, expected=expected_values, actual=actual_values)
            for element, expected_values, actual_values in zip(
                elements, expected, predicted
            )
        ]