Skip to content
Snippets Groups Projects
AdjustedMatrix.py 4.17 KiB
Newer Older
  • Learn to ignore specific revisions
  • from obspy import Stream, UTCDateTime
    
    from pydantic import BaseModel
    
    from typing import Any, List, Optional
    
    from ..residual.Reading import Reading, get_absolutes_xyz, get_ordinates
    
    from .. import ChannelConverter
    
    from .. import pydantic_utcdatetime
    
    
    class AdjustedMatrix(BaseModel):
    
        """Attributes pertaining to adjusted(affine) matrices, applied by the AdjustedAlgorithm
    
        Attributes
        ----------
        matrix: affine matrix generated by Affine's calculate method
        pier_correction: pier correction generated by Affine's calculate method
        starttime: beginning of interval that matrix is valid for
        endtime: end of interval that matrix is valid for
        NOTE: valid intervals are only generated when bad data is encountered.
        Matrix is non-constrained otherwise
        """
    
    
        matrix: Optional[List[List[float]]] = None
    
        metrics: Optional[List[Metric]] = None
    
        starttime: Optional[UTCDateTime] = None
        endtime: Optional[UTCDateTime] = None
    
        def process(
            self,
            stream: Stream,
    
            inchannels=None,
            outchannels=None,
    
            """Apply matrix to raw data. Apply pier correction to F when necessary.
    
            If inchannels is specified, look for inchannels in stream and fail if
            all are not present. If outchannels is specified, return outchannels.
            Both inchannels and outchannels must be consistent with self.matrix.
    
            If inchannels is not specified, default to the first n-1 non-F channels
            in stream, where n is the dimension of self.matrix. If outchannels is
            not specified, outchannels will match inchannels.
    
            NOTE: if non-F inchannels and outchannels are not compatible with
                  self.matrix, return NaNs for non-F outchannels
            NOTE: if F is not in both inchannels and outchannels, return NaNs for F
            """
            inchannels = inchannels or [trace.stats.channel for trace in stream]
            outchannels = outchannels or inchannels
    
            # new in/outchannels without "F"
            inchannels_noF = [c for c in inchannels if c != "F"]
            outchannels_noF = [c for c in outchannels if c != "F"]
    
    
            raws = np.vstack(
    
                [stream.select(channel=channel)[0].data for channel in inchannels_noF]
    
                + [np.ones_like(stream[0].data)]
            )
    
            if (
                len(inchannels_noF) == len(outchannels_noF)
                and len(inchannels_noF) == len(self.matrix) - 1
            ):
                # matrix multiplication
                adjusted = self.matrix @ raws
            else:
                # return NaNs if non-F inchannels or outchannels are not
                # compatible with self.matrix
                adjusted = np.full((len(self.matrix), raws.shape[1]), np.nan)
    
            if "F" in inchannels and "F" in outchannels:
    
                # return F only if specified in both inchannels and outchannels
    
                f = stream.select(channel="F")[0].data + self.pier_correction
                adjusted[-1] = f
    
            else:
                adjusted[-1] = np.nan
    
        def get_metrics(self, readings: List[Reading]) -> List[Metric]:
    
            """Computes mean absolute error and standard deviation between expected and predicted values
            Metrics are computed for X, Y, Z, and dF values
    
            readings: list of valid readings
    
            matrix: composed matrix
    
            Outputs
            -------
            metrics: list of Metric objects
            """
    
            absolutes = get_absolutes_xyz(readings=readings)
            ordinates = get_ordinates(readings=readings)
            stacked_ordinates = np.vstack((ordinates, np.ones_like(ordinates[0])))
            predicted = self.matrix @ stacked_ordinates
    
            metrics = []
            elements = ["X", "Y", "Z", "dF"]
    
            expected = list(absolutes) + [
                ChannelConverter.get_computed_f_using_squares(*absolutes)
            ]
            predicted = list(predicted[0:3]) + [
                ChannelConverter.get_computed_f_using_squares(*predicted[0:3])
            ]
            return [
                get_metric(element=elements[i], expected=expected[i], actual=predicted[i])
                for i in range(len(elements))
            ]