Skip to content
Snippets Groups Projects
FDSNFactory.py 13.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • from __future__ import absolute_import
    from datetime import datetime
    
    from typing import List, Optional, Tuple
    
    
    import numpy
    import sys
    import numpy.ma
    from obspy import Stream, UTCDateTime, Trace, read, read_inventory
    
    from obspy.clients.fdsn.header import FDSNNoDataException
    
    from obspy.clients.iris import Client
    from obspy.clients.fdsn import Client as FDSNClient
    
    from .. import ChannelConverter, TimeseriesUtility
    from ..geomag_types import DataInterval, DataType
    from ..TimeseriesFactory import TimeseriesFactory
    from ..TimeseriesFactoryException import TimeseriesFactoryException
    from ..ObservatoryMetadata import ObservatoryMetadata
    from ..VariometerMetadata import VariometerMetadata
    from .RawInputClient import RawInputClient
    from .FDSNSNCL import FDSNSNCL
    
    
    
    class FDSNFactory(TimeseriesFactory):
        """TimeseriesFactory for Edge related data.
    
        Parameters
        ----------
        host: str
            a string representing the IP number of the host to connect to.
        port: integer
            the port number the waveserver is listening on.
        write_port: integer
            the port number the client is writing to.
        cwbport: int
            the port number of the cwb host to connect to.
        tag: str
            A tag used by edge to log and associate a socket with a given data
            source
        forceout: bool
            Tells edge to forceout a packet to miniseed.  Generally used when
            the user knows no more data is coming.
        observatory: str
            the observatory code for the desired observatory.
        channels: array
            an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
            Known since channel names are mapped based on interval and type,
            others are passed through, see #_get_edge_channel().
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        observatoryMetadata: ObservatoryMetadata object
            an ObservatoryMetadata object used to replace the default
            ObservatoryMetadata.
        locationCode: str
            the location code for the given edge server, overrides type
            in get_timeseries/put_timeseries
        cwbhost: str
            a string represeting the IP number of the cwb host to connect to.
    
        See Also
        --------
        TimeseriesFactory
    
        Notes
        -----
        This is designed to read from any earthworm style waveserver, but it
            currently only writes to an edge. Edge mimics an earthworm style
            waveserver close enough that we hope to maintain that compatibility
            for reading.
        """
    
        def __init__(
            self,
    
            tag: str = "GeomagAlg",
            forceout: bool = False,
            observatory: Optional[str] = None,
            network: str = "NT",
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
            variometerMetadata: Optional[VariometerMetadata] = None,
            locationCode: Optional[str] = None,
    
        ):
            TimeseriesFactory.__init__(self, observatory, channels, type, interval)
    
            self.tag = tag
            self.forceout = forceout
            self.interval = interval
            self.observatoryMetadata = variometerMetadata or VariometerMetadata()
            self.network = network
            self.locationCode = locationCode
    
            if snclMapper == "geomag":
                self.get_sncl = FDSNSNCL.get_sncl
            else:
                raise TimeseriesFactoryException("Unrecognized SNCL mapper")
    
    
        def get_timeseries(
            self,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            observatory: Optional[str] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
    
    Wilbur, Spencer Franklin's avatar
    Wilbur, Spencer Franklin committed
            add_empty_channels: bool = True,
    
        ) -> Stream:
            """Get timeseries data
    
            Parameters
            ----------
            starttime: UTCDateTime
                time of first sample.
            endtime: UTCDateTime
                time of last sample.
            observatory: str
                observatory code.
            channels: array
                list of channels to load
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            add_empty_channels: bool
                if True, returns channels without data as empty traces
    
            Returns
            -------
            timeseries: Stream
                timeseries object with requested data
    
            Raises
            ------
            TimeseriesFactoryException
                if invalid values are requested, or errors occur while
                retrieving timeseries.
            """
            observatory = observatory or self.observatory
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
    
            if starttime > endtime:
                raise TimeseriesFactoryException(
                    'Starttime before endtime "%s" "%s"' % (starttime, endtime)
                )
    
            # I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish
    
            # so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of
    
            # post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that
            # the IRIS DMC will return empty data but not NaN values.
    
            ######################
    
            # # obspy factories sometimes write to stdout, instead of stderr
            # original_stdout = sys.stdout
            # try:
            #     # send stdout to stderr
            #     sys.stdout = sys.stderr
            #     # get the timeseries
            timeseries = Stream()
    
                data = self._get_timeseries(
                    starttime,
                    endtime,
                    observatory,
                    channel,
                    type,
                    interval,
                    add_empty_channels,
    
                if len(data) == 0:
                    continue
                timeseries += data
            # finally:
            #     # restore stdout
            #     sys.stdout = original_stdout
            self._post_process(timeseries, starttime, endtime, channels)
    
    
        def _get_timeseries(
            self,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            observatory: str,
            channel: str,
            type: DataType,
            interval: DataInterval,
            add_empty_channels: bool = True,
        ) -> Trace:
            """get timeseries data for a single channel.
    
            Parameters
            ----------
            starttime: UTCDateTime
                the starttime of the requested data
            endtime: UTCDateTime
                the endtime of the requested data
            observatory: str
                observatory code
            channel: str
                single character channel {H, E, D, Z, F}
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            add_empty_channels: bool
                if True, returns channels without data as empty traces
    
            Returns
            -------
            data: Trace
                timeseries trace of the requested channel data
            """
    
                station=observatory,
                data_type=type,
                interval=interval,
                element=channel,
                network=self.network,
                location=self.locationCode,
            )
    
            print("the channel reported to SCNL is ", channel)
    
            # geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
            # according to its author, EdgeCWB is inclusive of starttime, but exclusive of
            # endtime, to satisfy seismic standards/requirements, to precision delta/2;
            half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
    
    
            # Rotate the trace into a right handed coordinate frame.
            # This will worrk assuming the metadata is reported correctly.
    
            # Channel that require rotations
            channel_rotations = ["X", "Y", "Z"]
    
    
            try:
                data = self.Client.get_waveforms(
                    network=sncl.network,
                    station=sncl.station,
                    location=sncl.location,
                    channel=sncl.channel,
                    starttime=starttime,
                    endtime=endtime + half_delta,
                    attach_response=True,
                )
    
    
            if channel in channel_rotations:
                print("The rotation function has been called for channel", channel)
                data = self._rotate_and_return_requested_channel(
                    data, sncl, starttime, endtime, channel
                )
    
            if data.count() == 0 and add_empty_channels:
                data += self._get_empty_trace(
                    starttime=starttime,
                    endtime=endtime,
                    observatory=observatory,
                    channel=channel,
                    data_type=type,
                    interval=interval,
                    network=sncl.network,
                    location=sncl.location,
                )
            if data.count() != 0:
                TimeseriesUtility.pad_and_trim_trace(
                    trace=data[0], starttime=starttime, endtime=endtime
                )
    
            self._set_metadata(data, observatory, channel, type, interval)
    
    Wilbur, Spencer Franklin's avatar
    Wilbur, Spencer Franklin committed
    
    
            return data
    
        def _post_process(
            self,
            timeseries: Stream,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            channels: List[str],
        ):
            """Post process a timeseries stream after the raw data is
    
                    is fetched from querymom. Specifically changes
    
                    any MaskedArray to a ndarray with nans representing gaps.
                    Then calls pad_timeseries to deal with gaps at the
                    beggining or end of the streams.
    
            Parameters
            ----------
            timeseries: Stream
                The timeseries stream as returned by the call to get_waveforms
            starttime: UTCDateTime
                the starttime of the requested data
            endtime: UTCDateTime
                the endtime of the requested data
            channels: array
                list of channels to load
    
            Notes: the original timeseries object is changed.
            """
    
            TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
    
            disclaimer_texts = [
                "DISCLAIMER",
                "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
            ]
    
    
                if "comments" not in trace.stats:
                    trace.stats.comments = []
                # Check if the disclaimer is already in the comments to avoid duplication
                if not any("DISCLAIMER" in comment for comment in trace.stats.comments):
                    trace.stats.comments.extend(disclaimer_texts)
    
        def _set_metadata(
            self,
            stream: Stream,
            observatory: str,
            channel: str,
    
        ):
            """set metadata for a given stream/channel
            Parameters
            ----------
            observatory: str
                observatory code
            channel: str
                edge channel code {MVH, MVE, MVD, ...}
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            """
            for trace in stream:
                self.observatoryMetadata.set_metadata(
                    trace.stats, observatory, channel, type, interval
                )
    
    
            self,
            data: Stream,
            sncl,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            requested_channel: str,
    
            Retrieves the necessary *F channels, rotates them to ZNE, and returns the requested channel.
            Channels are mapped as follows: 'X' -> '*F2', 'Y' -> '*F1', 'Z' -> '*FZ'.
    
            # Initialize FDSN client and get the necessary data
            FDSN = self.Client
            # Determine if the requested channel is X, Y, or Z
    
            # Pull the LF* channels needed for rotation
            f_channels = ["?F1", "?F2", "?FZ"]
    
            inv = FDSN.get_stations(
                network=sncl.network,
                station=sncl.station,
                location=sncl.location,  # Use location if necessary
                channel=",".join(f_channels),  # Request *F1, *F2, and *FZ
                starttime=starttime,
                endtime=endtime,
                level="response",
            )
    
            # Rotate the stream to ZNE
            data.rotate(method="->ZNE", inventory=inv)
            print(data)
    
            # # Now return only the requested channel (mapped to Z, N, or E)
            # if requested_channel == "X":
            #     return data.select(channel="?FN")  # N after rotation
            # elif requested_channel == "Y":
            #     return data.select(channel="?FE")  # E after rotation
            # elif requested_channel == "Z":
            #     return data.select(channel="?FZ")  # Z remains Z