Skip to content
Snippets Groups Projects
FDSNFactory.py 13.4 KiB
Newer Older
from __future__ import absolute_import
from datetime import datetime
from typing import List, Optional, Tuple

import numpy
import sys
import numpy.ma
from obspy import Stream, UTCDateTime, Trace, read, read_inventory
from obspy.clients.fdsn.header import FDSNNoDataException
from obspy.clients.iris import Client
from obspy.clients.fdsn import Client as FDSNClient

from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from ..VariometerMetadata import VariometerMetadata
from .RawInputClient import RawInputClient
from .FDSNSNCL import FDSNSNCL


class FDSNFactory(TimeseriesFactory):
    """TimeseriesFactory for Edge related data.

    Parameters
    ----------
    host: str
        a string representing the IP number of the host to connect to.
    port: integer
        the port number the waveserver is listening on.
    write_port: integer
        the port number the client is writing to.
    cwbport: int
        the port number of the cwb host to connect to.
    tag: str
        A tag used by edge to log and associate a socket with a given data
        source
    forceout: bool
        Tells edge to forceout a packet to miniseed.  Generally used when
        the user knows no more data is coming.
    observatory: str
        the observatory code for the desired observatory.
    channels: array
        an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
        Known since channel names are mapped based on interval and type,
        others are passed through, see #_get_edge_channel().
    type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
        data type
    interval: {'second', 'minute', 'hour', 'day'}
        data interval
    observatoryMetadata: ObservatoryMetadata object
        an ObservatoryMetadata object used to replace the default
        ObservatoryMetadata.
    locationCode: str
        the location code for the given edge server, overrides type
        in get_timeseries/put_timeseries
    cwbhost: str
        a string represeting the IP number of the cwb host to connect to.

    See Also
    --------
    TimeseriesFactory

    Notes
    -----
    This is designed to read from any earthworm style waveserver, but it
        currently only writes to an edge. Edge mimics an earthworm style
        waveserver close enough that we hope to maintain that compatibility
        for reading.
    """

    def __init__(
        self,
        tag: str = "GeomagAlg",
        forceout: bool = False,
        observatory: Optional[str] = None,
        network: str = "NT",
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
        variometerMetadata: Optional[VariometerMetadata] = None,
        locationCode: Optional[str] = None,
    ):
        TimeseriesFactory.__init__(self, observatory, channels, type, interval)
        self.tag = tag
        self.forceout = forceout
        self.interval = interval
        self.observatoryMetadata = variometerMetadata or VariometerMetadata()
        self.network = network
        self.locationCode = locationCode
        if snclMapper == "geomag":
            self.get_sncl = FDSNSNCL.get_sncl
        else:
            raise TimeseriesFactoryException("Unrecognized SNCL mapper")

    def get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: Optional[str] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
Wilbur, Spencer Franklin's avatar
Wilbur, Spencer Franklin committed
        add_empty_channels: bool = True,
    ) -> Stream:
        """Get timeseries data

        Parameters
        ----------
        starttime: UTCDateTime
            time of first sample.
        endtime: UTCDateTime
            time of last sample.
        observatory: str
            observatory code.
        channels: array
            list of channels to load
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        add_empty_channels: bool
            if True, returns channels without data as empty traces

        Returns
        -------
        timeseries: Stream
            timeseries object with requested data

        Raises
        ------
        TimeseriesFactoryException
            if invalid values are requested, or errors occur while
            retrieving timeseries.
        """
        observatory = observatory or self.observatory
        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        if starttime > endtime:
            raise TimeseriesFactoryException(
                'Starttime before endtime "%s" "%s"' % (starttime, endtime)
            )
        # I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish
        # so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of
        # post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that
        # the IRIS DMC will return empty data but not NaN values.
        ######################

        # # obspy factories sometimes write to stdout, instead of stderr
        # original_stdout = sys.stdout
        # try:
        #     # send stdout to stderr
        #     sys.stdout = sys.stderr
        #     # get the timeseries
        timeseries = Stream()
            data = self._get_timeseries(
                starttime,
                endtime,
                observatory,
                channel,
                type,
                interval,
                add_empty_channels,
            if len(data) == 0:
                continue
            timeseries += data
        # finally:
        #     # restore stdout
        #     sys.stdout = original_stdout
        self._post_process(timeseries, starttime, endtime, channels)

    def _get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: str,
        channel: str,
        type: DataType,
        interval: DataInterval,
        add_empty_channels: bool = True,
    ) -> Trace:
        """get timeseries data for a single channel.

        Parameters
        ----------
        starttime: UTCDateTime
            the starttime of the requested data
        endtime: UTCDateTime
            the endtime of the requested data
        observatory: str
            observatory code
        channel: str
            single character channel {H, E, D, Z, F}
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        add_empty_channels: bool
            if True, returns channels without data as empty traces

        Returns
        -------
        data: Trace
            timeseries trace of the requested channel data
        """
            station=observatory,
            data_type=type,
            interval=interval,
            element=channel,
            network=self.network,
            location=self.locationCode,
        )
        # geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
        # according to its author, EdgeCWB is inclusive of starttime, but exclusive of
        # endtime, to satisfy seismic standards/requirements, to precision delta/2;
        half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2

        # Rotate the trace into a right handed coordinate frame.
        # This will worrk assuming the metadata is reported correctly.

        # Channel that require rotations
        channel_rotations = ["X", "Y", "Z"]

        try:
            data = self.Client.get_waveforms(
                network=sncl.network,
                station=sncl.station,
                location=sncl.location,
                channel=sncl.channel,
                starttime=starttime,
                endtime=endtime + half_delta,
                attach_response=True,
            )

        if channel in channel_rotations:
            data = self._rotate_and_return_requested_channel(
                data, sncl, starttime, endtime, channel
            )
        if data.count() == 0 and add_empty_channels:
            data += self._get_empty_trace(
                starttime=starttime,
                endtime=endtime,
                observatory=observatory,
                channel=channel,
                data_type=type,
                interval=interval,
                network=sncl.network,
                location=sncl.location,
            )
        if data.count() != 0:
            TimeseriesUtility.pad_and_trim_trace(
                trace=data[0], starttime=starttime, endtime=endtime
            )
        self._set_metadata(data, observatory, channel, type, interval)
Wilbur, Spencer Franklin's avatar
Wilbur, Spencer Franklin committed

        return data

    def _post_process(
        self,
        timeseries: Stream,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        channels: List[str],
    ):
        """Post process a timeseries stream after the raw data is
                is fetched from querymom. Specifically changes
                any MaskedArray to a ndarray with nans representing gaps.
                Then calls pad_timeseries to deal with gaps at the
                beggining or end of the streams.

        Parameters
        ----------
        timeseries: Stream
            The timeseries stream as returned by the call to get_waveforms
        starttime: UTCDateTime
            the starttime of the requested data
        endtime: UTCDateTime
            the endtime of the requested data
        channels: array
            list of channels to load

        Notes: the original timeseries object is changed.
        """

        TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
        disclaimer_texts = [
            "DISCLAIMER",
            "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
        ]

            if "comments" not in trace.stats:
                trace.stats.comments = []
            # Check if the disclaimer is already in the comments to avoid duplication
            if not any("DISCLAIMER" in comment for comment in trace.stats.comments):
                trace.stats.comments.extend(disclaimer_texts)
    def _set_metadata(
        self,
        stream: Stream,
        observatory: str,
        channel: str,
    ):
        """set metadata for a given stream/channel
        Parameters
        ----------
        observatory: str
            observatory code
        channel: str
            edge channel code {MVH, MVE, MVD, ...}
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        """
        for trace in stream:
            self.observatoryMetadata.set_metadata(
                trace.stats, observatory, channel, type, interval
            )

        self,
        data: Stream,
        sncl,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        requested_channel: str,
        Retrieves the necessary *F channels, rotates them to ZNE, and returns the requested channel.
        Channels are mapped as follows: 'X' -> '*F2', 'Y' -> '*F1', 'Z' -> '*FZ'.
        # Initialize FDSN client and get the necessary data
        FDSN = self.Client
        # Determine if the requested channel is X, Y, or Z
        inv = FDSN.get_stations(
            network=sncl.network,
            station=sncl.station,
            location=sncl.location,  # Use location if necessary
            channel=sncl.channel,  # Request *F1, *F2, and *FZ
            starttime=starttime,
            endtime=endtime,
            level="response",
        )
        # Rotate the stream to ZNE
        data.rotate(method="->ZNE", inventory=inv)

        # After rotation, extract the correct channel based on requested_channel
        if requested_channel == "X":
            selected_channel = "*FN"
        elif requested_channel == "Y":
            selected_channel = "*FE"
        elif requested_channel == "Z":
            selected_channel = "*FZ"
        else:
            raise ValueError(f"Invalid channel {requested_channel}")
        # Filter the rotated stream to return only the trace for the selected channel
        rotated_stream = data.select(channel=selected_channel)
        # If no data found for the selected channel, return an empty stream
        if len(rotated_stream) == 0:
Wilbur, Spencer Franklin's avatar
Wilbur, Spencer Franklin committed

        return rotated_stream