Skip to content
Snippets Groups Projects
EdgeFactory.py 17.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • """Factory that loads data from earthworm and writes to Edge.
    
    EdgeFactory uses obspy earthworm class to read data from any
    earthworm standard Waveserver using the obspy getWaveform call.
    
    Writing will be implemented with Edge specific capabilities,
    to take advantage of it's newer realtime abilities.
    
    Edge is the USGS earthquake hazard centers replacement for earthworm.
    """
    
    Yash Shah's avatar
    Yash Shah committed
    from __future__ import absolute_import
    
    from datetime import datetime
    
    from typing import List, Optional
    
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
    import numpy.ma
    
    from obspy import Stream, Trace, UTCDateTime
    
    from obspy.clients import earthworm, neic
    
    from .. import ChannelConverter, TimeseriesUtility
    
    from ..geomag_types import DataInterval, DataType
    
    from ..TimeseriesFactory import TimeseriesFactory
    from ..TimeseriesFactoryException import TimeseriesFactoryException
    
    from ..ObservatoryMetadata import ObservatoryMetadata
    
    Yash Shah's avatar
    Yash Shah committed
    from .RawInputClient import RawInputClient
    
    from .SNCL import SNCL
    
    from .LegacySNCL import LegacySNCL
    
    class EdgeFactory(TimeseriesFactory):
    
        """TimeseriesFactory for Edge related data.
    
        Parameters
        ----------
        host: str
            a string representing the IP number of the host to connect to.
        port: integer
            the port number the waveserver is listening on.
    
        write_port: integer
            the port number the client is writing to.
        tag: str
            A tag used by edge to log and associate a socket with a given data
            source
        forceout: bool
            Tells edge to forceout a packet to miniseed.  Generally used when
            the user knows no more data is coming.
    
        observatory: str
            the observatory code for the desired observatory.
        channels: array
    
            an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
            Known since channel names are mapped based on interval and type,
            others are passed through, see #_get_edge_channel().
    
        type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
            data type
        interval: {'second', 'minute', 'hour', 'day'}
            data interval
    
        observatoryMetadata: ObservatoryMetadata object
            an ObservatoryMetadata object used to replace the default
            ObservatoryMetadata.
    
        locationCode: str
            the location code for the given edge server, overrides type
    
            in get_timeseries/put_timeseries
    
        scaleFactor: int
            all data written to edge (via raw input client) will be scaled
            by this integer prior to write; all data read from edge will be
            will be divided by this integer after read; default = 1000
    
        snclMapper: {'geomag','legacy'}
    
            a mapper of common channel names to SEED SNCL codes (that is,
            station, network, channel, location codes); default = legacy
    
    
        See Also
        --------
        TimeseriesFactory
    
    
        Notes
        -----
        This is designed to read from any earthworm style waveserver, but it
            currently only writes to an edge. Edge mimics an earthworm style
            waveserver close enough that we hope to maintain that compatibility
            for reading.
    
        def __init__(
            self,
    
            port: int = 2060,
            write_port: int = 7981,
            tag: str = "GeomagAlg",
            forceout: bool = False,
            observatory: Optional[str] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
            observatoryMetadata: Optional[ObservatoryMetadata] = None,
            locationCode: Optional[str] = None,
    
            scaleFactor: int = 1000,
    
            snclMapper: str = "legacy",
    
            TimeseriesFactory.__init__(self, observatory, channels, type, interval)
    
            if port == 2060:
                # earthworm (phasing this out gradually)
                self.client = earthworm.Client(host, port)
            else:
                # CWBQuery/miniseed protocol (preferred)
                self.client = neic.Client(host, port)
    
            self.write_port = write_port
    
            self.tag = tag
    
            self.interval = interval
            self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
            self.locationCode = locationCode
    
            self.scaleFactor = scaleFactor
    
            if snclMapper == "legacy":
                self.get_sncl = LegacySNCL.get_sncl
            elif snclMapper == "geomag":
                self.get_sncl = SNCL.get_sncl
            else:
                raise TimeseriesFactoryException("Unrecognized SNCL mapper")
    
        def get_timeseries(
            self,
    
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            observatory: Optional[str] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
    
            add_empty_channels: bool = True,
    
        ) -> Stream:
    
            starttime: UTCDateTime
    
            endtime: UTCDateTime
    
                observatory code.
    
            channels: array
    
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            add_empty_channels: bool
    
                if True, returns channels without data as empty traces
    
            timeseries: Stream
                timeseries object with requested data
    
    
            Raises
            ------
            TimeseriesFactoryException
                if invalid values are requested, or errors occur while
                retrieving timeseries.
            """
            observatory = observatory or self.observatory
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
    
    
            if starttime > endtime:
                raise TimeseriesFactoryException(
    
                    'Starttime before endtime "%s" "%s"' % (starttime, endtime)
                )
    
            # obspy factories sometimes write to stdout, instead of stderr
    
            original_stdout = sys.stdout
            try:
    
                # get the timeseries
    
                timeseries = Stream()
    
                for channel in channels:
    
                    data = self._get_timeseries(
    
                        starttime,
                        endtime,
                        observatory,
                        channel,
                        type,
                        interval,
                        add_empty_channels,
    
                    if len(data) == 0:
                        continue
    
                    timeseries += data
            finally:
    
                sys.stdout = original_stdout
    
            self._post_process(timeseries, starttime, endtime, channels)
    
        def put_timeseries(
            self,
    
            timeseries: Stream,
            starttime: Optional[UTCDateTime] = None,
            endtime: Optional[UTCDateTime] = None,
            observatory: Optional[str] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
    
            timeseries: Stream
    
                timeseries object with data to be written
            observatory: str
    
                observatory code
            channels: array
    
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
    
            Notes
            -----
            Streams sent to timeseries are expected to have a single trace per
                channel and that trace should have an ndarray, with nan's
                representing gaps.
    
            observatory = observatory or stats.station or self.observatory
    
            channels = channels or self.channels
    
            type = type or self.type or stats.data_type
            interval = interval or self.interval or stats.data_interval
    
    
            if starttime is None or endtime is None:
    
                starttime, endtime = TimeseriesUtility.get_stream_start_end_times(
    
            for channel in channels:
                if timeseries.select(channel=channel).count() == 0:
    
                        'Missing channel "%s" for output, available channels %s'
                        % (channel, str(TimeseriesUtility.get_channels(timeseries)))
                    )
    
                self._put_channel(
                    timeseries, observatory, channel, type, interval, starttime, endtime
                )
    
        def _convert_timeseries_to_decimal(self, stream: Stream):
    
            """convert geomag edge timeseries data stored as ints, to decimal by
                dividing by 1000.00
    
            stream: Stream
                a stream retrieved from a geomag edge representing one channel
    
            Notes
            -----
            This routine changes the values in the timeseries. The user should
                make a copy before calling if they don't want that side effect.
    
                trace.data = numpy.divide(trace.data, self.scaleFactor)
    
        def _convert_trace_to_int(self, trace_in: Trace) -> Trace:
    
            """convert geomag edge traces stored as decimal, to ints by multiplying
               by 1000
    
            Parameters
            ----------
    
            trace: Trace
                a trace retrieved from a geomag edge representing one channel
    
            trace: Trace
    
            Notes
            -----
            this doesn't work on ndarray with nan's in it.
            the trace must be a masked array.
            """
    
            trace.data = numpy.multiply(trace.data, self.scaleFactor)
    
            trace.data = trace.data.astype(int)
    
    
        def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream:
    
            """convert geomag edge traces in a timeseries stream to a MaskedArray
                This allows for gaps and splitting.
            Parameters
            ----------
    
            stream: Stream
                a stream retrieved from a geomag edge representing one channel
            channel: str
                the channel to be masked
    
            stream: Stream
                a stream with all traces converted to masked arrays
    
            stream = timeseries.copy()
            for trace in stream.select(channel=channel):
    
                trace.data = numpy.ma.masked_invalid(trace.data)
    
        def _get_timeseries(
            self,
    
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            observatory: str,
            channel: str,
            type: DataType,
            interval: DataInterval,
    
            add_empty_channels: bool = True,
    
        ) -> Trace:
    
            """get timeseries data for a single channel.
    
            Parameters
            ----------
    
            starttime: UTCDateTime
    
                the starttime of the requested data
    
            endtime: UTCDateTime
    
                the endtime of the requested data
    
            observatory: str
    
            channel: str
    
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            add_empty_channels: bool
    
                if True, returns channels without data as empty traces
    
            data: Trace
    
                timeseries trace of the requested channel data
            """
    
            sncl = self.get_sncl(
    
                station=observatory,
                data_type=type,
                interval=interval,
                element=channel,
    
                location=self.locationCode,
    
    Erin (Josh) Rigler's avatar
    Erin (Josh) Rigler committed
            # geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
            # according to its author, EdgeCWB is inclusive of starttime, but exclusive of
            # endtime, to satisfy seismic standards/requirements, to precision delta/2;
            half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
    
            try:
                data = self.client.get_waveforms(
    
                    sncl.network,
                    sncl.station,
    
                    sncl.location,
    
    Erin (Josh) Rigler's avatar
    Erin (Josh) Rigler committed
                    endtime + half_delta,
    
                )
            except TypeError:
                # get_waveforms() fails if no data is returned from Edge
    
                data = Stream()
    
            # make sure data is 32bit int
            for trace in data:
    
                trace.data = trace.data.astype("i4")
    
            if data.count() == 0 and add_empty_channels:
    
                    starttime=starttime,
                    endtime=endtime,
                    observatory=observatory,
    
                    data_type=type,
                    interval=interval,
    
                    location=sncl.location,
    
            self._set_metadata(data, observatory, channel, type, interval)
    
        def _post_process(
            self,
            timeseries: Stream,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            channels: List[str],
        ):
    
            """Post process a timeseries stream after the raw data is
                    is fetched from a waveserver. Specifically changes
                    any MaskedArray to a ndarray with nans representing gaps.
    
                    Then calls pad_timeseries to deal with gaps at the
    
                    beggining or end of the streams.
    
            Parameters
            ----------
    
            timeseries: Stream
    
                The timeseries stream as returned by the call to get_waveforms
    
            starttime: UTCDateTime
    
                the starttime of the requested data
    
            endtime: UTCDateTime
    
                the endtime of the requested data
    
            channels: array
    
                list of channels to load
    
            Notes: the original timeseries object is changed.
            """
    
            self._convert_timeseries_to_decimal(timeseries)
    
            for trace in timeseries:
    
                if isinstance(trace.data, numpy.ma.MaskedArray):
                    trace.data.set_fill_value(numpy.nan)
                    trace.data = trace.data.filled()
    
    
            if "D" in channels:
                for trace in timeseries.select(channel="D"):
                    trace.data = ChannelConverter.get_radians_from_minutes(trace.data)
    
    Cain, Payton David's avatar
    Cain, Payton David committed
            TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
    
        def _put_channel(
    
            timeseries: Stream,
            observatory: str,
            channel: str,
            type: DataType,
            interval: DataInterval,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
    
            timeseries: Stream
    
                timeseries object with data to be written
    
                observatory code
    
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
            starttime: UTCDateTime
            endtime: UTCDateTime
    
    
            Notes
            -----
            RawInputClient seems to only work when sockets are
    
            sncl = self.get_sncl(
    
                station=observatory,
                data_type=type,
                interval=interval,
                element=channel,
    
                location=self.locationCode,
    
            ric = RawInputClient(
    
                self.host,
                self.write_port,
    
                sncl.station,
                sncl.channel,
    
                sncl.location,
    
            stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel)
    
            # Make certain there's actually data
            if not numpy.ma.any(stream.select(channel=channel)[0].data):
                return
    
    
            for trace in stream.select(channel=channel).split():
    
                trace_send.trim(starttime, endtime)
    
                if channel == "D":
    
                    trace_send.data = ChannelConverter.get_minutes_from_radians(
    
                        trace_send.data
                    )
    
                trace_send = self._convert_trace_to_int(trace_send)
                ric.send_trace(interval, trace_send)
    
            stream: Stream,
    
            observatory: str,
            channel: str,
            type: str,
            interval: str,
        ):
            """set metadata for a given stream/channel
            Parameters
            ----------
    
            observatory: str
    
            channel: str
    
                edge channel code {MVH, MVE, MVD, ...}
    
            type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
                data type
            interval: {'second', 'minute', 'hour', 'day'}
                data interval
    
            """
            for trace in stream:
                self.observatoryMetadata.set_metadata(
                    trace.stats, observatory, channel, type, interval
                )