From d5b774d02d739ce215dfcbf1781a294c9ac868d3 Mon Sep 17 00:00:00 2001 From: Hal Simpson <hasimpson@usgs.gov> Date: Wed, 6 May 2015 00:13:12 -0600 Subject: [PATCH] Completed comments. Changed _send_trace to _put_trace, and moved RawInputClient initialization and close into _put_trace. Added routines to change a trace to a masked array, and change channels into int. --- geomagio/edge/EdgeFactory.py | 177 ++++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 64 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index fbe558ebc..86187c3e2 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -17,10 +17,12 @@ from geomagio import TimeseriesFactory, TimeseriesFactoryException from obspy import earthworm from ObservatoryMetadata import ObservatoryMetadata import numpy.ma as ma +import sys HOURSECONDS = 3600 DAYMINUTES = 1440 + class EdgeFactory(TimeseriesFactory): """TimeseriesFactory for Edge related data. @@ -30,28 +32,55 @@ class EdgeFactory(TimeseriesFactory): a string representing the IP number of the host to connect to. port: integer the port number the waveserver is listening on. + observatory: str + the observatory code for the desired observatory. + channels: array + an array of channels {H, D, E, F, Z} + type: str + the data type {variation, quasi-definitive, definitive} + interval: str + the data interval {daily, hourly, minute, second} observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default ObservatoryMetadata. locationCode: str the location code for the given edge server, overrides type - in get_timeseries/put_timeseries. + in get_timeseries/put_timeseries + cwbhost: str + a string represeting the IP number of the cwb host to connect to. + cwbport: int + the port number of the cwb host to connect to. + tag: str + A tag used by edge to log and associate a socket with a given data + source See Also -------- TimeseriesFactory + + Notes + ----- + This is designed to read from any earthworm style waveserver, but it + currently only writes to an edge. Edge mimics an earthworm style + waveserver close enough that we hope to maintain that compatibility + for reading. """ def __init__(self, host=None, port=None, observatory=None, channels=None, type=None, interval=None, observatoryMetadata=None, locationCode=None, - cwbhost='', cwbport = 0, tag='EdgeFactory'): + cwbhost=None, cwbport=0, tag='GeomagAlg'): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.client = earthworm.Client(host, port) - self.ric = ric = RawInputClient(tag, host, port, cwbhost, cwbport) self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() - self.locationCode=locationCode + self.tag = tag + self.locationCode = locationCode + self.host = host + self.port = port + self.cwbhost = cwbhost or '' + self.cwbport = cwbport + self.ric = None def get_timeseries(self, starttime, endtime, observatory=None, channels=None, type=None, interval=None): @@ -59,17 +88,17 @@ class EdgeFactory(TimeseriesFactory): Parameters ---------- - starttime : obspy.core.UTCDateTime + starttime: obspy.core.UTCDateTime time of first sample. - endtime : obspy.core.UTCDateTime + endtime: obspy.core.UTCDateTime time of last sample. - observatory : str + observatory: str observatory code. - channels : array_like + channels: array_like list of channels to load - type : {'variation', 'quasi-definitive'} + type: {'variation', 'quasi-definitive', 'definitive'} data type. - interval : {'minute', 'second'} + interval: {'daily', 'hourly', 'minute', 'second'} data interval. Returns @@ -102,60 +131,64 @@ class EdgeFactory(TimeseriesFactory): return timeseries - def put_timeseries(self, timeseries, starttime=None, endtime=None, - observatory = None, channels=None, type=None, interval=None): + def put_timeseries(self, timeseries, observatory=None, + channels=None, type=None, interval=None): """Put timeseries data Parameters ---------- - observatory : str + timeseries: obspy.core.Stream + timeseries object with data to be written + observatory: str observatory code. - starttime : obspy.core.UTCDateTime - time of first sample. - endtime : obspy.core.UTCDateTime - time of last sample. - channels : array_like + channels: array_like list of channels to load - type : {'variation', 'quasi-definitive'} + type: {'variation', 'quasi-definitive', 'definitive'} data type. - interval : {'minute', 'second'} + interval: {'daily', 'hourly', 'minute', 'second'} data interval. - Returns - ------- - obspy.core.Stream - timeseries object with requested data. - - Raises - ------ - TimeseriesFactoryException - if invalid values are requested, or errors occur while - retrieving timeseries. + Notes + ----- + Streams sent to timeseries are expected to have a single trace per + channel and that trace should have an ndarray, with nan's + representing gaps. """ observatory = observatory or self.observatory channels = channels or self.channels type = type or self.type interval = interval or self.interval - stats = timeseries[0].stats - - starttime = starttime or stats.starttime - endtime = endtime or stats.endtime - - if starttime > endtime: - raise TimeseriesFactoryException( - 'Starttime before endtime "%s" "%s"' % (starttime, endtime)) - for c in channels: - timeseries.select(channel=c)[0].data = ma.masked_invalid( - timeseries.select(channel=c)[0].data) - for tr in timeseries.select(channel=c).split(): - # print tr.data - self._send_trace(tr, observatory, c, type, interval) - # tr.write(tr.id + ".asc", format='SH_ASC') + self.ric = RawInputClient(self.tag, self.host, self.port, + self.cwbhost, self.cwbport) + for channel in channels: + if timeseries.select(channel=channel).count() == 0: + sys.stderr.write('Missing channel %s for output, continuing \n' + % channel) + continue + self._convert_stream_to_masked(timeseries=timeseries, + channel=channel) + for trace in timeseries.select(channel=channel).split(): + self._put_trace(trace, observatory, channel, type, interval) self.ric.close() - def _send_trace(self, trace, observatory, channel, type, interval): + def _put_trace(self, trace, observatory, channel, type, interval): + """Put trace + + Parameters + ---------- + trace: obspy.core.Stream.Trace + trace object with data to be written + observatory: str + observatory code. + channel: str + channel to be written + type: {'variation', 'quasi-definitive', 'definitive'} + data type. + interval: {'daily', 'hourly', 'minute', 'second'} + data interval. + """ station = self._get_edge_station(observatory, channel, type, interval) location = self._get_edge_location(observatory, channel, @@ -164,41 +197,34 @@ class EdgeFactory(TimeseriesFactory): type, interval) edge_channel = self._get_edge_channel(observatory, channel, type, interval) + totalsamps = len(trace.data) starttime = trace.stats.starttime if self.interval == 'second': nsamp = HOURSECONDS timeoffset = 1 + samplerate = 1. elif self.interval == 'minute': nsamp = DAYMINUTES timeoffset = 60 - - print trace.stats.npts - print totalsamps + samplerate = 1. / 60 for i in xrange(0, totalsamps, nsamp): location = self.locationCode or location seedname = self.ric.create_seedname(station, edge_channel, location, network) - if i - totalsamps < nsamp: + if totalsamps - i < nsamp: endsample = totalsamps else: endsample = i + nsamp nsamp = endsample - i - endtime = starttime + nsamp * timeoffset + endtime = starttime + (nsamp - 1) * timeoffset trace_send = trace.slice(starttime, endtime) self._convert_trace_to_int(trace_send) - samplerate = 1. / 60 - print seedname - print nsamp - print starttime - print endtime - print samplerate - print ' ' - self.ric.send(seedname, nsamp, trace_send, starttime, - samplerate, 0, 0, 0, 0) + self.ric.send(seedname, nsamp, trace_send.data, starttime, + samplerate, 0, 0, 0, 0) starttime += nsamp * timeoffset - self.ric.forceout(seedname) + self.ric.forceout(seedname) def _convert_trace_to_decimal(self, stream): """convert geomag edge traces stored as ints, to decimal by dividing @@ -211,12 +237,36 @@ class EdgeFactory(TimeseriesFactory): for trace in stream: trace.data = numpy.divide(trace.data, 1000.00) - # this doesn't work on ndarray with nan's in it. - # the trace must be a masked array. def _convert_trace_to_int(self, trace): + """convert geomag edge traces stored as decimal, to ints by multiplying + by 1000 + + Parameters + ---------- + stream : obspy.core.stream + a stream retrieved from a geomag edge representing one channel. + + Notes + ----- + this doesn't work on ndarray with nan's in it. + the trace must be a masked array. + """ trace.data = numpy.multiply(trace.data, 1000.00) trace.data = trace.data.astype(int) + def _convert_stream_to_masked(self, timeseries, channel): + """convert geomag edge traces in a timeseries stream to a MaskedArray + This allows for gaps and splitting. + Parameters + ---------- + stream : obspy.core.stream + a stream retrieved from a geomag edge representing one channel. + channel: string + the channel to be masked. + """ + for trace in timeseries.select(channel=channel): + trace.data = ma.masked_invalid(trace.data) + def _get_edge_network(self, observatory, channel, type, interval): """get edge network code. @@ -463,7 +513,6 @@ class EdgeFactory(TimeseriesFactory): trace.data = trace.data.filled() self._clean_timeseries(timeseries, starttime, endtime) - # TODO add in test for missing channel, if so, make it all nans? def _set_metadata(self, stream, observatory, channel, type, interval): """set metadata for a given stream/channel -- GitLab