Skip to content
Snippets Groups Projects
AdjustedMatrix.py 3.06 KiB
Newer Older
  • Learn to ignore specific revisions
  • from obspy import Stream, UTCDateTime
    
    from pydantic import BaseModel
    
    from typing import Any, List, Optional
    
    from .. import ChannelConverter
    
    from .. import pydantic_utcdatetime
    
    from ..residual.Reading import Reading, get_absolutes_xyz, get_ordinates
    
    from .Metric import Metric
    
    
    class AdjustedMatrix(BaseModel):
    
        """Attributes pertaining to adjusted(affine) matrices, applied by the AdjustedAlgorithm
    
        Attributes
        ----------
        matrix: affine matrix generated by Affine's calculate method
        pier_correction: pier correction generated by Affine's calculate method
        starttime: beginning of interval that matrix is valid for
        endtime: end of interval that matrix is valid for
        NOTE: valid intervals are only generated when bad data is encountered.
        Matrix is non-constrained otherwise
        """
    
    
        matrix: Optional[Any] = None
        pier_correction: Optional[float] = None
    
        metrics: Optional[List[Metric]] = None
    
        starttime: Optional[UTCDateTime] = None
        endtime: Optional[UTCDateTime] = None
    
        def process(
            self,
            stream: Stream,
            inchannels=["H", "E", "Z", "F"],
            outchannels=["X", "Y", "Z", "F"],
        ):
    
            """ Apply matrix to raw data. Apply pier correction to F when necessary """
    
            raws = np.vstack(
                [
                    stream.select(channel=channel)[0].data
                    for channel in inchannels
                    if channel != "F"
                ]
                + [np.ones_like(stream[0].data)]
            )
            adjusted = self.matrix @ raws
            if "F" in inchannels and "F" in outchannels:
                f = stream.select(channel="F")[0].data + self.pier_correction
                adjusted[-1] = f
    
        def get_metrics(self, readings: List[Reading]) -> List[Metric]:
    
            """Computes mean absolute error and standard deviation for X, Y, Z, and dF between expected and predicted values.
    
            Attributes
            ----------
    
            readings: list of Readings
    
            matrix: composed matrix
    
            Outputs
            -------
            metrics: list of Metric objects
            """
    
            absolutes = get_absolutes_xyz(readings=readings)
            ordinates = get_ordinates(readings=readings)
            stacked_ordinates = np.vstack((ordinates, np.ones_like(ordinates[0])))
            predicted = self.matrix @ stacked_ordinates
    
            metrics = []
            elements = ["X", "Y", "Z", "dF"]
    
            expected = np.vstack(
                (
                    absolutes,
                    ChannelConverter.get_computed_f_using_squares(*absolutes),
                )
    
            predicted = np.vstack(
                (
                    predicted[0:3],
                    ChannelConverter.get_computed_f_using_squares(*predicted[0:3]),
                )
    
            for i in range(len(elements)):
    
                diff = expected[i] - predicted[i]
                metrics.append(
                    Metric(
                        element=elements[i],
                        absmean=abs(np.nanmean(diff)),
                        stddev=np.std(diff),
                    )
                )