Skip to content
Snippets Groups Projects
AdjustedMatrix.py 4.17 KiB
Newer Older
from obspy import Stream, UTCDateTime
from pydantic import BaseModel
from typing import Any, List, Optional
from ..residual.Reading import Reading, get_absolutes_xyz, get_ordinates
from .. import ChannelConverter
from .. import pydantic_utcdatetime

class AdjustedMatrix(BaseModel):
    """Attributes pertaining to adjusted(affine) matrices, applied by the AdjustedAlgorithm

    Attributes
    ----------
    matrix: affine matrix generated by Affine's calculate method
    pier_correction: pier correction generated by Affine's calculate method
    starttime: beginning of interval that matrix is valid for
    endtime: end of interval that matrix is valid for
    NOTE: valid intervals are only generated when bad data is encountered.
    Matrix is non-constrained otherwise
    """

    matrix: Optional[List[List[float]]] = None
    metrics: Optional[List[Metric]] = None
    starttime: Optional[UTCDateTime] = None
    endtime: Optional[UTCDateTime] = None
    def process(
        self,
        stream: Stream,
        inchannels=None,
        outchannels=None,
        """Apply matrix to raw data. Apply pier correction to F when necessary.

        If inchannels is specified, look for inchannels in stream and fail if
        all are not present. If outchannels is specified, return outchannels.
        Both inchannels and outchannels must be consistent with self.matrix.

        If inchannels is not specified, default to the first n-1 non-F channels
        in stream, where n is the dimension of self.matrix. If outchannels is
        not specified, outchannels will match inchannels.

        NOTE: if non-F inchannels and outchannels are not compatible with
              self.matrix, return NaNs for non-F outchannels
        NOTE: if F is not in both inchannels and outchannels, return NaNs for F
        """
        inchannels = inchannels or [trace.stats.channel for trace in stream]
        outchannels = outchannels or inchannels

        # new in/outchannels without "F"
        inchannels_noF = [c for c in inchannels if c != "F"]
        outchannels_noF = [c for c in outchannels if c != "F"]

        raws = np.vstack(
            [stream.select(channel=channel)[0].data for channel in inchannels_noF]
            + [np.ones_like(stream[0].data)]
        )
        if (
            len(inchannels_noF) == len(outchannels_noF)
            and len(inchannels_noF) == len(self.matrix) - 1
        ):
            # matrix multiplication
            adjusted = self.matrix @ raws
        else:
            # return NaNs if non-F inchannels or outchannels are not
            # compatible with self.matrix
            adjusted = np.full((len(self.matrix), raws.shape[1]), np.nan)
        if "F" in inchannels and "F" in outchannels:
            # return F only if specified in both inchannels and outchannels
            f = stream.select(channel="F")[0].data + self.pier_correction
            adjusted[-1] = f
        else:
            adjusted[-1] = np.nan
    def get_metrics(self, readings: List[Reading]) -> List[Metric]:
        """Computes mean absolute error and standard deviation between expected and predicted values
        Metrics are computed for X, Y, Z, and dF values
        readings: list of valid readings
        matrix: composed matrix

        Outputs
        -------
        metrics: list of Metric objects
        """
        absolutes = get_absolutes_xyz(readings=readings)
        ordinates = get_ordinates(readings=readings)
        stacked_ordinates = np.vstack((ordinates, np.ones_like(ordinates[0])))
        predicted = self.matrix @ stacked_ordinates
        metrics = []
        elements = ["X", "Y", "Z", "dF"]
        expected = list(absolutes) + [
            ChannelConverter.get_computed_f_using_squares(*absolutes)
        ]
        predicted = list(predicted[0:3]) + [
            ChannelConverter.get_computed_f_using_squares(*predicted[0:3])
        ]
        return [
            get_metric(element=elements[i], expected=expected[i], actual=predicted[i])
            for i in range(len(elements))
        ]