diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index ad4d5786ab478c991382bb176f04ac0b4606c9d2..b569683a3cab425893fb84728a06d84dd1488c37 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -16,10 +16,8 @@ from RawInputClient import RawInputClient from geomagio import TimeseriesFactory, TimeseriesFactoryException from obspy import earthworm from ObservatoryMetadata import ObservatoryMetadata -import numpy.ma as ma - -HOURSECONDS = 3600 -DAYMINUTES = 1440 +from datetime import datetime +import numpy.ma class EdgeFactory(TimeseriesFactory): @@ -75,6 +73,7 @@ class EdgeFactory(TimeseriesFactory): self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.tag = tag self.locationCode = locationCode + self.interval = interval self.host = host self.port = port self.cwbhost = cwbhost or '' @@ -128,12 +127,10 @@ class EdgeFactory(TimeseriesFactory): self._post_process(timeseries, starttime, endtime, channels) - print timeseries - return timeseries - def put_timeseries(self, timeseries, observatory=None, - channels=None, type=None, interval=None): + def put_timeseries(self, timeseries, starttime=None, endtime=None, + observatory=None, channels=None, type=None, interval=None): """Put timeseries data Parameters @@ -155,10 +152,14 @@ class EdgeFactory(TimeseriesFactory): channel and that trace should have an ndarray, with nan's representing gaps. """ - observatory = observatory or self.observatory + stats = timeseries[0].stats + observatory = observatory or self.observatory or stats.station channels = channels or self.channels - type = type or self.type - interval = interval or self.interval + type = type or self.type or stats.data_type + interval = interval or self.interval or stats.data_interval + + if (starttime == None or endtime == None): + starttime, endtime = self._get_stream_start_end_times(timeseries) for channel in channels: if timeseries.select(channel=channel).count() == 0: @@ -168,8 +169,8 @@ class EdgeFactory(TimeseriesFactory): for channel in channels: self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - for trace in timeseries.select(channel=channel).split(): - self._put_trace(trace, observatory, channel, type, interval) + self._put_channel(timeseries, observatory, channel, type, + interval, starttime, endtime) def _clean_timeseries(self, timeseries, starttime, endtime): """Realigns timeseries data so the start and endtimes are the same @@ -243,7 +244,7 @@ class EdgeFactory(TimeseriesFactory): the channel to be masked. """ for trace in timeseries.select(channel=channel): - trace.data = ma.masked_invalid(trace.data) + trace.data = numpy.ma.masked_invalid(trace.data) def _create_missing_channel(self, starttime, endtime, observatory, channel, type, interval, network, station, location): @@ -484,6 +485,30 @@ class EdgeFactory(TimeseriesFactory): observatory, channel, type, interval) return data + def _get_stream_start_end_times(self, timeseries): + """get start and end times from a stream. + Traverses all traces, and find the earliest starttime, and + the latest endtime. + Parameters + ---------- + timeseries: obspy.core.stream + The timeseries stream + + Returns + ------- + tuple: (starttime, endtime) + starttime: UTCDateTime + endtime: UTCDateTime + """ + starttime = UTCDateTime(datetime.now()) + endtime = UTCDateTime(0) + for trace in timeseries: + if trace.stats.starttime < starttime: + starttime = trace.stats.starttime + if trace.stats.endtime > endtime: + endtime = trace.stats.endtime + return (starttime, endtime) + def _post_process(self, timeseries, starttime, endtime, channels): """Post process a timeseries stream after the raw data is is fetched from a waveserver. Specifically changes @@ -512,21 +537,28 @@ class EdgeFactory(TimeseriesFactory): self._clean_timeseries(timeseries, starttime, endtime) - def _put_trace(self, trace, observatory, channel, type, interval): - """Put trace + def _put_channel(self, timeseries, observatory, channel, type, interval, + starttime, endtime): + """Put a channel worth of data Parameters ---------- - trace: obspy.core.Stream.Trace - trace object with data to be written + timeseries: obspy.core.Stream + timeseries object with data to be written observatory: str observatory code. channel: str - channel to be written + channel to load type: {'variation', 'quasi-definitive', 'definitive'} data type. interval: {'daily', 'hourly', 'minute', 'second'} data interval. + starttime: UTCDateTime + endtime: UTCDateTime + + Notes + ----- + RawInputClient seems to only work when sockets are """ station = self._get_edge_station(observatory, channel, type, interval) @@ -537,35 +569,24 @@ class EdgeFactory(TimeseriesFactory): edge_channel = self._get_edge_channel(observatory, channel, type, interval) - self.ric = RawInputClient(self.tag, self.host, self.port, - self.cwbhost, self.cwbport) - - totalsamps = len(trace.data) - starttime = trace.stats.starttime - if self.interval == 'second': - nsamp = HOURSECONDS - timeoffset = 1 - samplerate = 1. - elif self.interval == 'minute': - nsamp = DAYMINUTES - timeoffset = 60 - samplerate = 1. / 60 - - for i in xrange(0, totalsamps, nsamp): - location = self.locationCode or location - seedname = self.ric.create_seedname(station, - edge_channel, location, network) - if totalsamps - i < nsamp: - endsample = totalsamps - else: - endsample = i + nsamp - nsamp = endsample - i - endtime = starttime + (nsamp - 1) * timeoffset - trace_send = trace.slice(starttime, endtime) + now = UTCDateTime(datetime.utcnow()) + if ((now - endtime) > 864000) and (self.cwbport > 0): + host = self.cwbhost + port = self.cwbport + else: + host = self.host + port = self.port + + self.ric = RawInputClient(self.tag, host, port) + + seedname = self.ric.create_seedname(station, + edge_channel, location, network) + + for trace in timeseries.select(channel=channel).split(): + trace.trim(starttime, endtime) + trace_send = trace.copy() self._convert_trace_to_int(trace_send) - self.ric.send(seedname, nsamp, trace_send.data, starttime, - samplerate, 0, 0, 0, 0) - starttime += nsamp * timeoffset + self.ric.send_trace(seedname, interval, trace_send) self.ric.forceout(seedname) self.ric.close()