diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 61cc2093bffa2efb8529361948b11aa62bd9597f..fbe558ebcee419558f4640749b845b88bd1c35fb 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -12,10 +12,14 @@ Edge is the USGS earthquake hazard centers replacement for earthworm. import obspy.core from obspy.core.utcdatetime import UTCDateTime import numpy +from RawInputClient import RawInputClient from geomagio import TimeseriesFactory, TimeseriesFactoryException from obspy import earthworm from ObservatoryMetadata import ObservatoryMetadata +import numpy.ma as ma +HOURSECONDS = 3600 +DAYMINUTES = 1440 class EdgeFactory(TimeseriesFactory): """TimeseriesFactory for Edge related data. @@ -29,6 +33,9 @@ class EdgeFactory(TimeseriesFactory): observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default ObservatoryMetadata. + locationCode: str + the location code for the given edge server, overrides type + in get_timeseries/put_timeseries. See Also -------- @@ -37,11 +44,14 @@ class EdgeFactory(TimeseriesFactory): def __init__(self, host=None, port=None, observatory=None, channels=None, type=None, interval=None, - observatoryMetadata=None): + observatoryMetadata=None, locationCode=None, + cwbhost='', cwbport = 0, tag='EdgeFactory'): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.client = earthworm.Client(host, port) + self.ric = ric = RawInputClient(tag, host, port, cwbhost, cwbport) self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() + self.locationCode=locationCode def get_timeseries(self, starttime, endtime, observatory=None, channels=None, type=None, interval=None): @@ -92,8 +102,8 @@ class EdgeFactory(TimeseriesFactory): return timeseries - def put_timeseries(self, starttime, endtime, observatory=None, - channels=None, type=None, interval=None): + def put_timeseries(self, timeseries, starttime=None, endtime=None, + observatory = None, channels=None, type=None, interval=None): """Put timeseries data Parameters @@ -122,7 +132,73 @@ class EdgeFactory(TimeseriesFactory): if invalid values are requested, or errors occur while retrieving timeseries. """ - raise NotImplementedError('"put_timeseries" not implemented') + observatory = observatory or self.observatory + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + stats = timeseries[0].stats + + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + + if starttime > endtime: + raise TimeseriesFactoryException( + 'Starttime before endtime "%s" "%s"' % (starttime, endtime)) + + for c in channels: + timeseries.select(channel=c)[0].data = ma.masked_invalid( + timeseries.select(channel=c)[0].data) + for tr in timeseries.select(channel=c).split(): + # print tr.data + self._send_trace(tr, observatory, c, type, interval) + # tr.write(tr.id + ".asc", format='SH_ASC') + + self.ric.close() + + def _send_trace(self, trace, observatory, channel, type, interval): + station = self._get_edge_station(observatory, channel, + type, interval) + location = self._get_edge_location(observatory, channel, + type, interval) + network = self._get_edge_network(observatory, channel, + type, interval) + edge_channel = self._get_edge_channel(observatory, channel, + type, interval) + totalsamps = len(trace.data) + starttime = trace.stats.starttime + if self.interval == 'second': + nsamp = HOURSECONDS + timeoffset = 1 + elif self.interval == 'minute': + nsamp = DAYMINUTES + timeoffset = 60 + + print trace.stats.npts + print totalsamps + + for i in xrange(0, totalsamps, nsamp): + location = self.locationCode or location + seedname = self.ric.create_seedname(station, + edge_channel, location, network) + if i - totalsamps < nsamp: + endsample = totalsamps + else: + endsample = i + nsamp + nsamp = endsample - i + endtime = starttime + nsamp * timeoffset + trace_send = trace.slice(starttime, endtime) + self._convert_trace_to_int(trace_send) + samplerate = 1. / 60 + print seedname + print nsamp + print starttime + print endtime + print samplerate + print ' ' + self.ric.send(seedname, nsamp, trace_send, starttime, + samplerate, 0, 0, 0, 0) + starttime += nsamp * timeoffset + self.ric.forceout(seedname) def _convert_trace_to_decimal(self, stream): """convert geomag edge traces stored as ints, to decimal by dividing @@ -135,6 +211,12 @@ class EdgeFactory(TimeseriesFactory): for trace in stream: trace.data = numpy.divide(trace.data, 1000.00) + # this doesn't work on ndarray with nan's in it. + # the trace must be a masked array. + def _convert_trace_to_int(self, trace): + trace.data = numpy.multiply(trace.data, 1000.00) + trace.data = trace.data.astype(int) + def _get_edge_network(self, observatory, channel, type, interval): """get edge network code. @@ -236,12 +318,15 @@ class EdgeFactory(TimeseriesFactory): returns an edge location code """ location = None - if type == 'variation': - location = 'R0' - elif type == 'quasi-definitive': - location = 'Q0' - elif type == 'definitive': - location = 'D0' + if self.locationCode is not None: + location = self.locationCode + else: + if type == 'variation': + location = 'R0' + elif type == 'quasi-definitive': + location = 'Q0' + elif type == 'definitive': + location = 'D0' return location def _get_interval_code(self, interval):