Skip to content
Snippets Groups Projects
EdgeFactory.py 16.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • """Factory that loads data from earthworm and writes to Edge.
    
    EdgeFactory uses obspy earthworm class to read data from any
    earthworm standard Waveserver using the obspy getWaveform call.
    
    Writing will be implemented with Edge specific capabilities,
    to take advantage of it's newer realtime abilities.
    
    Edge is the USGS earthquake hazard centers replacement for earthworm.
    """
    
    from obspy.core.utcdatetime import UTCDateTime
    import numpy
    
    from RawInputClient import RawInputClient
    
    from geomagio import TimeseriesFactory, TimeseriesFactoryException
    from obspy import earthworm
    from ObservatoryMetadata import ObservatoryMetadata
    
    import numpy.ma as ma
    
    HOURSECONDS = 3600
    DAYMINUTES = 1440
    
    
    class EdgeFactory(TimeseriesFactory):
    
        """TimeseriesFactory for Edge related data.
    
        Parameters
        ----------
        host: str
            a string representing the IP number of the host to connect to.
        port: integer
            the port number the waveserver is listening on.
        observatoryMetadata: ObservatoryMetadata object
            an ObservatoryMetadata object used to replace the default
            ObservatoryMetadata.
    
        locationCode: str
            the location code for the given edge server, overrides type
            in get_timeseries/put_timeseries.
    
    
        See Also
        --------
        TimeseriesFactory
        """
    
    
        def __init__(self, host=None, port=None, observatory=None,
    
                channels=None, type=None, interval=None,
    
                observatoryMetadata=None, locationCode=None,
                cwbhost='', cwbport = 0, tag='EdgeFactory'):
    
            TimeseriesFactory.__init__(self, observatory, channels, type, interval)
            self.client = earthworm.Client(host, port)
    
    
            self.ric = ric = RawInputClient(tag, host, port, cwbhost, cwbport)
    
            self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
    
            self.locationCode=locationCode
    
        def get_timeseries(self, starttime, endtime, observatory=None,
                channels=None, type=None, interval=None):
            """Get timeseries data
    
            Parameters
            ----------
            starttime : obspy.core.UTCDateTime
                time of first sample.
            endtime : obspy.core.UTCDateTime
                time of last sample.
    
            observatory : str
                observatory code.
    
            channels : array_like
                list of channels to load
            type : {'variation', 'quasi-definitive'}
                data type.
            interval : {'minute', 'second'}
                data interval.
    
            Returns
            -------
            obspy.core.Stream
                timeseries object with requested data.
    
            Raises
            ------
            TimeseriesFactoryException
                if invalid values are requested, or errors occur while
                retrieving timeseries.
            """
            observatory = observatory or self.observatory
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
    
    
            if starttime > endtime:
                raise TimeseriesFactoryException(
                    'Starttime before endtime "%s" "%s"' % (starttime, endtime))
    
    
            for channel in channels:
                data = self._get_timeseries(starttime, endtime, observatory,
                        channel, type, interval)
    
            self._post_process(timeseries, starttime, endtime, channels)
    
    
        def put_timeseries(self, timeseries, starttime=None, endtime=None,
                    observatory = None, channels=None, type=None, interval=None):
    
            """Put timeseries data
    
            Parameters
            ----------
            observatory : str
                observatory code.
            starttime : obspy.core.UTCDateTime
                time of first sample.
            endtime : obspy.core.UTCDateTime
                time of last sample.
            channels : array_like
                list of channels to load
            type : {'variation', 'quasi-definitive'}
                data type.
            interval : {'minute', 'second'}
                data interval.
    
            Returns
            -------
            obspy.core.Stream
                timeseries object with requested data.
    
            Raises
            ------
            TimeseriesFactoryException
                if invalid values are requested, or errors occur while
                retrieving timeseries.
            """
    
            observatory = observatory or self.observatory
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
            stats = timeseries[0].stats
    
            starttime = starttime or stats.starttime
            endtime = endtime or stats.endtime
    
            if starttime > endtime:
                raise TimeseriesFactoryException(
                    'Starttime before endtime "%s" "%s"' % (starttime, endtime))
    
            for c in channels:
                timeseries.select(channel=c)[0].data = ma.masked_invalid(
                        timeseries.select(channel=c)[0].data)
                for tr in timeseries.select(channel=c).split():
                    # print tr.data
                    self._send_trace(tr, observatory, c, type, interval)
                    # tr.write(tr.id + ".asc", format='SH_ASC')
    
            self.ric.close()
    
        def _send_trace(self, trace, observatory, channel, type, interval):
            station = self._get_edge_station(observatory, channel,
                    type, interval)
            location = self._get_edge_location(observatory, channel,
                    type, interval)
            network = self._get_edge_network(observatory, channel,
                    type, interval)
            edge_channel = self._get_edge_channel(observatory, channel,
                    type, interval)
            totalsamps = len(trace.data)
            starttime = trace.stats.starttime
            if self.interval == 'second':
                nsamp = HOURSECONDS
                timeoffset = 1
            elif self.interval == 'minute':
                nsamp = DAYMINUTES
                timeoffset = 60
    
            print trace.stats.npts
            print totalsamps
    
            for i in xrange(0, totalsamps, nsamp):
                location = self.locationCode or location
                seedname = self.ric.create_seedname(station,
                        edge_channel, location, network)
                if i - totalsamps < nsamp:
                    endsample = totalsamps
                else:
                    endsample = i + nsamp
                nsamp = endsample - i
                endtime = starttime + nsamp * timeoffset
                trace_send = trace.slice(starttime, endtime)
                self._convert_trace_to_int(trace_send)
                samplerate = 1. / 60
                print seedname
                print nsamp
                print starttime
                print endtime
                print samplerate
                print ' '
                self.ric.send(seedname, nsamp, trace_send, starttime,
                    samplerate, 0, 0, 0, 0)
                starttime += nsamp * timeoffset
                self.ric.forceout(seedname)
    
        def _convert_trace_to_decimal(self, stream):
            """convert geomag edge traces stored as ints, to decimal by dividing
               by 1000.00
            Parameters
            ----------
            stream : obspy.core.stream
                a stream retrieved from a geomag edge representing one channel.
            """
            for trace in stream:
                trace.data = numpy.divide(trace.data, 1000.00)
    
    
        # this doesn't work on ndarray with nan's in it.
        # the trace must be a masked array.
        def _convert_trace_to_int(self, trace):
            trace.data = numpy.multiply(trace.data, 1000.00)
            trace.data = trace.data.astype(int)
    
    
        def _get_edge_network(self, observatory, channel, type, interval):
            """get edge network code.
    
            Parameters
            ----------
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            network
                always NT
            """
            return 'NT'
    
        def _get_edge_station(self, observatory, channel, type, interval):
            """get edge station.
    
            Parameters
            ----------
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            station
                the observatory is returned as the station
            """
            return observatory
    
        def _get_edge_channel(self, observatory, channel, type, interval):
            """get edge channel.
    
            Parameters
            ----------
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            edge_channel
                {MVH, MVE, MVD, etc}
            """
            edge_interval_code = self._get_interval_code(interval)
            edge_channel = None
            if channel == 'D':
                edge_channel = edge_interval_code + 'VD'
            elif channel == 'E':
                edge_channel = edge_interval_code + 'VE'
            elif channel == 'F':
                edge_channel = edge_interval_code + 'SF'
            elif channel == 'H':
                edge_channel = edge_interval_code + 'VH'
            elif channel == 'Z':
                edge_channel = edge_interval_code + 'VZ'
            else:
                raise TimeseriesFactoryException(
                    'Unexpected channel code "%s"' % channel)
            return edge_channel
    
        def _get_edge_location(self, observatory, channel, type, interval):
            """get edge location.
    
            The edge location code is currently determined by the type
                passed in.
    
            Parameters
            ----------
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            location
                returns an edge location code
            """
            location = None
    
            if self.locationCode is not None:
                location = self.locationCode
            else:
                if type == 'variation':
                    location = 'R0'
                elif type == 'quasi-definitive':
                    location = 'Q0'
                elif type == 'definitive':
                    location = 'D0'
    
            return location
    
        def _get_interval_code(self, interval):
            """get edge interval code.
    
            Converts the metadata interval string, into an edge single character
                edge code.
    
            Parameters
            ----------
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            interval type
            """
            interval_code = None
            if interval == 'daily':
                interval_code = 'D'
            elif interval == 'hourly':
                interval_code = 'H'
            elif interval == 'minute':
                interval_code = 'M'
            elif interval == 'second':
                interval_code = 'S'
            else:
                raise TimeseriesFactoryException(
                        'Unexpected interval "%s"' % interval)
            return interval_code
    
        def _get_timeseries(self, starttime, endtime, observatory,
                    channel, type, interval):
            """get timeseries data for a single channel.
    
            Parameters
            ----------
            starttime: obspy.core.UTCDateTime
                the starttime of the requested data
            endtime: obspy.core.UTCDateTime
                the endtime of the requested data
            observatory : str
                observatory code
    
            channel : str
                single character channel {H, E, D, Z, F}
    
                data type {definitive, quasi-definitive, variation}
    
            interval : str
                interval length {minute, second}
    
            Returns
            -------
            obspy.core.trace
                timeseries trace of the requested channel data
            """
            station = self._get_edge_station(observatory, channel,
                    type, interval)
            location = self._get_edge_location(observatory, channel,
                    type, interval)
            network = self._get_edge_network(observatory, channel,
                    type, interval)
    
            edge_channel = self._get_edge_channel(observatory, channel,
    
                    type, interval)
            data = self.client.getWaveform(network, station, location,
    
            self._set_metadata(data,
                    observatory, channel, type, interval)
    
        def _clean_timeseries(self, timeseries, starttime, endtime):
            """Realigns timeseries data so the start and endtimes are the same
                as what was originally asked for, even if the data was during
                a gap.
    
            Parameters
            ----------
            timeseries: obspy.core.stream
                The timeseries stream as returned by the call to getWaveform
            starttime: obspy.core.UTCDateTime
                the starttime of the requested data
            endtime: obspy.core.UTCDateTime
                the endtime of the requested data
    
            Notes: the original timeseries object is changed.
            """
    
            for trace in timeseries:
                trace_starttime = UTCDateTime(trace.stats.starttime)
                trace_endtime = UTCDateTime(trace.stats.endtime)
    
                if trace.stats.starttime > starttime:
                    cnt = int((trace_starttime - starttime) / trace.stats.delta)
                    trace.data = numpy.concatenate([
                            numpy.full(cnt, numpy.nan, dtype=numpy.float64),
                            trace.data])
                    trace.stats.starttime = starttime
                if trace_endtime < endtime:
                    cnt = int((endtime - trace_endtime) / trace.stats.delta)
                    trace.data = numpy.concatenate([
                            trace.data,
                            numpy.full(cnt, numpy.nan, dtype=numpy.float64)])
                    trace.stats.endttime = endtime
    
    
        def _post_process(self, timeseries, starttime, endtime, channels):
            """Post process a timeseries stream after the raw data is
                    is fetched from a waveserver. Specifically changes
                    any MaskedArray to a ndarray with nans representing gaps.
                    Then calls _clean_timeseries to deal with gaps at the
                    beggining or end of the streams.
    
            Parameters
            ----------
            timeseries: obspy.core.stream
                The timeseries stream as returned by the call to getWaveform
            starttime: obspy.core.UTCDateTime
                the starttime of the requested data
            endtime: obspy.core.UTCDateTime
                the endtime of the requested data
            channels: array_like
                list of channels to load
    
            Notes: the original timeseries object is changed.
            """
    
            self._convert_trace_to_decimal(timeseries)
    
            for trace in timeseries:
    
                if isinstance(trace.data, numpy.ma.MaskedArray):
                    trace.data.set_fill_value(numpy.nan)
                    trace.data = trace.data.filled()
    
    
            self._clean_timeseries(timeseries, starttime, endtime)
    
            # TODO add in test for missing channel,  if so,  make it all nans?
    
    
        def _set_metadata(self, stream, observatory, channel, type, interval):
    
            """set metadata for a given stream/channel
            Parameters
            ----------
            observatory : str
                observatory code
            channel : str
                edge channel code {MVH, MVE, MVD, ...}
            type : str
                data type {definitive, quasi-definitive, variation}
            interval : str
                interval length {minute, second}
            """
    
    
                self.observatoryMetadata.set_metadata(trace.stats, observatory,
                        channel, type, interval)