# FDSNFactory.py
"""Factory that loads data from FDSN to edge
"""
from __future__ import absolute_import
from datetime import datetime
from typing import List, Optional

import numpy
import sys
import numpy.ma
from obspy import Stream, UTCDateTime, Trace, read, read_inventory
from obspy.clients.iris import Client
from obspy.clients.fdsn import Client as FDSNClient

from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from ..VariometerMetadata import VariometerMetadata
from .RawInputClient import RawInputClient
from .FDSNSNCL import FDSNSNCL


class FDSNFactory(TimeseriesFactory):
    """TimeseriesFactory for Edge related data.

    Parameters
    ----------
    host: str
        a string representing the IP number of the host to connect to.
    port: integer
        the port number the waveserver is listening on.
    write_port: integer
        the port number the client is writing to.
    cwbport: int
        the port number of the cwb host to connect to.
    tag: str
        A tag used by edge to log and associate a socket with a given data
        source
    forceout: bool
        Tells edge to forceout a packet to miniseed.  Generally used when
        the user knows no more data is coming.
    observatory: str
        the observatory code for the desired observatory.
    channels: array
        an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
        Known since channel names are mapped based on interval and type,
        others are passed through, see #_get_edge_channel().
    type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
        data type
    interval: {'second', 'minute', 'hour', 'day'}
        data interval
    observatoryMetadata: ObservatoryMetadata object
        an ObservatoryMetadata object used to replace the default
        ObservatoryMetadata.
    locationCode: str
        the location code for the given edge server, overrides type
        in get_timeseries/put_timeseries
    cwbhost: str
        a string representing the IP number of the cwb host to connect to.

    See Also
    --------
    TimeseriesFactory

    Notes
    -----
    This is designed to read from any earthworm style waveserver, but it
        currently only writes to an edge. Edge mimics an earthworm style
        waveserver close enough that we hope to maintain that compatibility
        for reading.
    """

    def __init__(
        self,
        tag: str = "GeomagAlg",
        forceout: bool = False,
        observatory: Optional[str] = None,
        network: str = "NT",
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
        variometerMetadata: Optional[VariometerMetadata] = None,
        locationCode: Optional[str] = None,
    ):
        """Initialize the factory.

        Delegates observatory/channels/type/interval to TimeseriesFactory,
        then records the edge tag, forceout flag, network, and location code.
        A default VariometerMetadata is substituted when none is provided.
        """
        super().__init__(observatory, channels, type, interval)
        # edge logging/association tag and forceout behavior
        self.tag = tag
        self.forceout = forceout
        # NOTE: interval is also stored by the base class; kept for parity
        # with the original implementation.
        self.interval = interval
        self.network = network
        self.locationCode = locationCode
        # metadata provider used by _set_metadata (falls back to defaults)
        self.observatoryMetadata = variometerMetadata or VariometerMetadata()

    def get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: Optional[str] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
        add_empty_channels: bool = True,
    ) -> Stream:
        """Get timeseries data

        Parameters
        ----------
        starttime: UTCDateTime
            time of first sample.
        endtime: UTCDateTime
            time of last sample.
        observatory: str
            observatory code.
        channels: array
            list of channels to load
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        add_empty_channels: bool
            if True, returns channels without data as empty traces

        Returns
        -------
        timeseries: Stream
            timeseries object with requested data

        Raises
        ------
        TimeseriesFactoryException
            if invalid values are requested, or errors occur while
            retrieving timeseries.
        """
        observatory = observatory or self.observatory
        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        if starttime > endtime:
            raise TimeseriesFactoryException(
                'Starttime before endtime "%s" "%s"' % (starttime, endtime)
            )

        # obspy factories sometimes write to stdout, instead of stderr;
        # redirect stdout to stderr while fetching so pipeline output
        # (which may be written to stdout) is not corrupted.
        original_stdout = sys.stdout
        try:
            sys.stdout = sys.stderr
            # fetch each requested channel and accumulate into one stream
            timeseries = Stream()
            for channel in channels:
                data = self._get_timeseries(
                    starttime,
                    endtime,
                    observatory,
                    channel,
                    type,
                    interval,
                    add_empty_channels,
                )
                if len(data) == 0:
                    continue
                timeseries += data
        finally:
            # always restore stdout
            sys.stdout = original_stdout
        # convert masked arrays to NaN-filled arrays and pad gaps at the
        # edges of the requested interval
        self._post_process(timeseries, starttime, endtime, channels)
        return timeseries

    def _get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: str,
        channel: str,
        type: DataType,
        interval: DataInterval,
        add_empty_channels: bool = True,
    ) -> Trace:
        """get timeseries data for a single channel.

        Parameters
        ----------
        starttime: UTCDateTime
            the starttime of the requested data
        endtime: UTCDateTime
            the endtime of the requested data
        observatory: str
            observatory code
        channel: str
            single character channel {H, E, D, Z, F}
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        add_empty_channels: bool
            if True, returns channels without data as empty traces

        Returns
        -------
        data: Trace
            timeseries trace of the requested channel data
        """
        sncl = FDSNSNCL.get_sncl(
            station=observatory,
            data_type=type,
            interval=interval,
            element=channel,
            network=self.network,
            location=self.locationCode,
        )
        # geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
        # according to its author, EdgeCWB is inclusive of starttime, but exclusive of
        # endtime, to satisfy seismic standards/requirements, to precision delta/2;
        half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
        # NOTE(review): self.Client is never assigned in __init__; assumes an
        # FDSN client is attached to the factory elsewhere -- TODO confirm.
        try:
            data = self.Client.get_waveforms(
                network=sncl.network,
                station=sncl.station,
                location=sncl.location,
                channel=sncl.channel,
                starttime=starttime,
                endtime=endtime + half_delta,
                attach_response=True,
            )
        except TypeError:
            # treat a client TypeError as "no data available"
            data = Stream()

        # make sure data is 32bit int
        for trace in data:
            trace.data = trace.data.astype("i4")
        data.merge()
        if data.count() == 0:
            # No data returned. Optionally substitute an all-gap trace, and
            # skip orientation/response processing: the synthetic trace has no
            # attached response (remove_response would raise), and when
            # add_empty_channels is False, indexing data[0] would raise
            # IndexError.
            if add_empty_channels:
                data += self._get_empty_trace(
                    starttime=starttime,
                    endtime=endtime,
                    observatory=observatory,
                    channel=channel,
                    data_type=type,
                    interval=interval,
                    network=sncl.network,
                    location=sncl.location,
                )
        else:
            TimeseriesUtility.pad_and_trim_trace(
                trace=data[0], starttime=starttime, endtime=endtime
            )
            # Check the reported azimuth for the LF1 (E or Y) and LF2 (H or X)
            # channels, and the dip for LFZ, to determine if instrument axes
            # have been reversed; flip the sign of reversed channels.
            azi, dip = self._get_orientations(data[0], starttime)

            # Checks Azimuth for LF2 channel
            if 270 > azi and azi > 90.0 and data[0].stats.channel == "LF2":
                data[0].data *= -1

            # Checks Azimuth for LF1 channel
            elif azi > 180 and azi < 360 and data[0].stats.channel == "LF1":
                data[0].data *= -1

            # Checks Dip for LFZ channel
            elif dip > 0 and dip < 180 and data[0].stats.channel == "LFZ":
                data[0].data *= -1

            # Remove channel Response:
            # The function "remove_response" appears to be doing what we want;
            # i.e. convert from counts to NT, but this may be a placeholder
            # at least until we see how this function behaves if
            # a station has a frequency response.
            data.remove_response(output="DEF", zero_mean=False, taper=False)

        self._set_metadata(data, observatory, channel, type, interval)

        return data

    def _post_process(
        self,
        timeseries: Stream,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        channels: List[str],
    ):
        """Post process a timeseries stream after the raw data is
                is fetched from querymom. Specifically changes
                any MaskedArray to a ndarray with nans representing gaps.
                Then calls pad_timeseries to deal with gaps at the
                beggining or end of the streams.

        Parameters
        ----------
        timeseries: Stream
            The timeseries stream as returned by the call to get_waveforms
        starttime: UTCDateTime
            the starttime of the requested data
        endtime: UTCDateTime
            the endtime of the requested data
        channels: array
            list of channels to load

        Notes: the original timeseries object is changed.
        """
        TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
        # provisional-data disclaimer appended to every trace's comments
        disclaimer = [
            "DISCLAIMER",
            "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
            "DISCLAIMER",
        ]
        for trace in timeseries:
            stats = trace.stats
            if "comments" in stats:
                stats.comments.extend(disclaimer)
            else:
                # copy so each trace owns an independent comments list
                stats.comments = list(disclaimer)
    def _set_metadata(
        self,
        stream: Stream,
        observatory: str,
        channel: str,
        type: DataType,
        interval: DataInterval,
    ):
        """Apply observatory metadata to every trace in a stream.

        Parameters
        ----------
        observatory: str
            observatory code
        channel: str
            edge channel code {MVH, MVE, MVD, ...}
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
        """
        # delegate per-trace stats updates to the configured metadata object
        for tr in stream:
            self.observatoryMetadata.set_metadata(
                tr.stats, observatory, channel, type, interval
            )

    def _get_orientations(
        self, trace: Trace, starttime: UTCDateTime
    ) -> tuple[float, float]:
        """Look up the reported azimuth and dip for a trace's channel.

        Queries IRIS FDSN station metadata for the trace's network/station/
        channel, reads the orientation at ``starttime``, stores the azimuth
        on ``trace.stats.azimuth``, appends a provisional-data disclaimer to
        the trace comments, and returns ``(azimuth, dip)`` in degrees.

        NOTE(review): ``data_type`` is passed ``trace.stats.location`` (the
        location code, not a data type) — looks suspicious; confirm against
        FDSNSNCL.get_sncl's expectations.
        """
        # Retrieve station orientation information using FDSN for each trace
        sncl = FDSNSNCL.get_sncl(
            data_type=trace.stats.location,
            element=trace.stats.channel,
            interval=self.interval,
            station=trace.stats.station,
            network=trace.stats.network,
            location=trace.stats.location,
        )

        # NOTE(review): a new client is created per call and the station query
        # is not time-bounded or location-filtered; only get_orientation below
        # selects by time.
        FDSN = FDSNClient("IRIS")
        inv = FDSN.get_stations(
            network=sncl.network,
            station=sncl.station,
            channel=sncl.channel,
            level="channel",
        )

        # Construct the channel code using the current trace's information
        channel_code = f"{sncl.network}.{sncl.station}.{sncl.location}.{sncl.channel}"

        # Get orientation for the constructed channel code and time
        orient = inv.get_orientation(channel_code, starttime)

        azimuth = orient["azimuth"]
        dip = orient["dip"]

        # Set azimuth in trace.stats.azimuth
        trace.stats.azimuth = azimuth

        # Add comments to trace.stats (extend existing list, else create one)
        if "comments" in trace.stats:
            trace.stats.comments.extend(
                [
                    "DISCLAIMER",
                    "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
                    "DISCLAIMER",
                ]
            )
        else:
            trace.stats.comments = [
                "DISCLAIMER",
                "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
                "DISCLAIMER",
            ]

        # Return both azimuth and dip
        return azimuth, dip