diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 5b800d7ec241ffeda6171906daa951a01fbd2ae2..311e56443b1583f85d4784a2e474a946fe8b92d5 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -17,7 +17,6 @@ from geomagio import TimeseriesFactory, TimeseriesFactoryException from obspy import earthworm from ObservatoryMetadata import ObservatoryMetadata import numpy.ma as ma -import sys HOURSECONDS = 3600 DAYMINUTES = 1440 @@ -129,6 +128,8 @@ class EdgeFactory(TimeseriesFactory): self._post_process(timeseries, starttime, endtime, channels) + print timeseries + return timeseries def put_timeseries(self, timeseries, observatory=None, @@ -161,71 +162,47 @@ class EdgeFactory(TimeseriesFactory): for channel in channels: if timeseries.select(channel=channel).count() == 0: - sys.stderr.write('Missing channel %s for output, continuing \n' - % channel) - continue + raise TimeseriesFactoryException( + 'Missing channel "%s" for output' % channel) + + for channel in channels: self._convert_stream_to_masked(timeseries=timeseries, channel=channel) for trace in timeseries.select(channel=channel).split(): self._put_trace(trace, observatory, channel, type, interval) - def _put_trace(self, trace, observatory, channel, type, interval): - """Put trace + def _clean_timeseries(self, timeseries, starttime, endtime): + """Realigns timeseries data so the start and endtimes are the same + as what was originally asked for, even if the data was during + a gap. Parameters ---------- - trace: obspy.core.Stream.Trace - trace object with data to be written - observatory: str - observatory code. - channel: str - channel to be written - type: {'variation', 'quasi-definitive', 'definitive'} - data type. - interval: {'daily', 'hourly', 'minute', 'second'} - data interval. - """ - station = self._get_edge_station(observatory, channel, - type, interval) - location = self._get_edge_location(observatory, channel, - type, interval) - network = self._get_edge_network(observatory, channel, - type, interval) - edge_channel = self._get_edge_channel(observatory, channel, - type, interval) - - self.ric = RawInputClient(self.tag, self.host, self.port, - self.cwbhost, self.cwbport) + timeseries: obspy.core.stream + The timeseries stream as returned by the call to getWaveform + starttime: obspy.core.UTCDateTime + the starttime of the requested data + endtime: obspy.core.UTCDateTime + the endtime of the requested data - totalsamps = len(trace.data) - starttime = trace.stats.starttime - if self.interval == 'second': - nsamp = HOURSECONDS - timeoffset = 1 - samplerate = 1. - elif self.interval == 'minute': - nsamp = DAYMINUTES - timeoffset = 60 - samplerate = 1. / 60 + Notes: the original timeseries object is changed. + """ + for trace in timeseries: + trace_starttime = UTCDateTime(trace.stats.starttime) + trace_endtime = UTCDateTime(trace.stats.endtime) - for i in xrange(0, totalsamps, nsamp): - location = self.locationCode or location - seedname = self.ric.create_seedname(station, - edge_channel, location, network) - if totalsamps - i < nsamp: - endsample = totalsamps - else: - endsample = i + nsamp - nsamp = endsample - i - endtime = starttime + (nsamp - 1) * timeoffset - trace_send = trace.slice(starttime, endtime) - self._convert_trace_to_int(trace_send) - print 'send' - self.ric.send(seedname, nsamp, trace_send.data, starttime, - samplerate, 0, 0, 0, 0) - starttime += nsamp * timeoffset - self.ric.forceout(seedname) - self.ric.close() + if trace.stats.starttime > starttime: + cnt = int((trace_starttime - starttime) / trace.stats.delta) + trace.data = numpy.concatenate([ + numpy.full(cnt, numpy.nan, dtype=numpy.float64), + trace.data]) + trace.stats.starttime = starttime + if trace_endtime < endtime: + cnt = int((endtime - trace_endtime) / trace.stats.delta) + trace.data = numpy.concatenate([ + trace.data, + numpy.full(cnt, numpy.nan, dtype=numpy.float64)]) + trace.stats.endttime = endtime def _convert_trace_to_decimal(self, stream): """convert geomag edge traces stored as ints, to decimal by dividing @@ -268,11 +245,16 @@ class EdgeFactory(TimeseriesFactory): for trace in timeseries.select(channel=channel): trace.data = ma.masked_invalid(trace.data) - def _get_edge_network(self, observatory, channel, type, interval): - """get edge network code. + def _create_missing_channel(self, starttime, endtime, observatory, + channel, type, interval, network, station, location): + """fill a missing channel with nans. Parameters ---------- + starttime: obspy.core.UTCDateTime + the starttime of the requested data + endtime: obspy.core.UTCDateTime + the endtime of the requested data observatory : str observatory code channel : str @@ -281,34 +263,36 @@ class EdgeFactory(TimeseriesFactory): data type {definitive, quasi-definitive, variation} interval : str interval length {minute, second} - + network: str + the network code + station: str + the observatory station code + location: str + the location code Returns ------- - network - always NT + obspy.core.Stream + stream of the requested channel data """ - return 'NT' - - def _get_edge_station(self, observatory, channel, type, interval): - """get edge station. - - Parameters - ---------- - observatory : str - observatory code - channel : str - single character channel {H, E, D, Z, F} - type : str - data type {definitive, quasi-definitive, variation} - interval : str - interval length {minute, second} + stats = obspy.core.Stats() + stats.channel = channel + stats.starttime = starttime + stats.network = network + stats.station = station + stats.location = location + if interval == 'second': + stats.sampling_rate = 1. + elif interval == 'minute': + stats.sampling_rate = 1 / 60. + elif interval == 'hourly': + stats.sampling_rate = 1 / 3600 + elif interval == 'daily': + stats.sampling_rate = 1 / 86400 + length = int((endtime - starttime) * stats.sampling_rate) + stats.npts = length + 1 - Returns - ------- - station - the observatory is returned as the station - """ - return observatory + data = numpy.full(length, numpy.nan, dtype=numpy.float64) + return obspy.core.Stream(obspy.core.Trace(data, stats)) def _get_edge_channel(self, observatory, channel, type, interval): """get edge channel. @@ -380,6 +364,48 @@ class EdgeFactory(TimeseriesFactory): location = 'D0' return location + def _get_edge_network(self, observatory, channel, type, interval): + """get edge network code. + + Parameters + ---------- + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : str + data type {definitive, quasi-definitive, variation} + interval : str + interval length {minute, second} + + Returns + ------- + network + always NT + """ + return 'NT' + + def _get_edge_station(self, observatory, channel, type, interval): + """get edge station. + + Parameters + ---------- + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : str + data type {definitive, quasi-definitive, variation} + interval : str + interval length {minute, second} + + Returns + ------- + station + the observatory is returned as the station + """ + return observatory + def _get_interval_code(self, interval): """get edge interval code. @@ -450,43 +476,13 @@ class EdgeFactory(TimeseriesFactory): data = self.client.getWaveform(network, station, location, edge_channel, starttime, endtime) data.merge() + if data.count() == 0: + data = self._create_missing_channel(starttime, endtime, observatory, + channel, type, interval, network, station, location) self._set_metadata(data, observatory, channel, type, interval) return data - def _clean_timeseries(self, timeseries, starttime, endtime): - """Realigns timeseries data so the start and endtimes are the same - as what was originally asked for, even if the data was during - a gap. - - Parameters - ---------- - timeseries: obspy.core.stream - The timeseries stream as returned by the call to getWaveform - starttime: obspy.core.UTCDateTime - the starttime of the requested data - endtime: obspy.core.UTCDateTime - the endtime of the requested data - - Notes: the original timeseries object is changed. - """ - for trace in timeseries: - trace_starttime = UTCDateTime(trace.stats.starttime) - trace_endtime = UTCDateTime(trace.stats.endtime) - - if trace.stats.starttime > starttime: - cnt = int((trace_starttime - starttime) / trace.stats.delta) - trace.data = numpy.concatenate([ - numpy.full(cnt, numpy.nan, dtype=numpy.float64), - trace.data]) - trace.stats.starttime = starttime - if trace_endtime < endtime: - cnt = int((endtime - trace_endtime) / trace.stats.delta) - trace.data = numpy.concatenate([ - trace.data, - numpy.full(cnt, numpy.nan, dtype=numpy.float64)]) - trace.stats.endttime = endtime - def _post_process(self, timeseries, starttime, endtime, channels): """Post process a timeseries stream after the raw data is is fetched from a waveserver. Specifically changes @@ -515,6 +511,63 @@ class EdgeFactory(TimeseriesFactory): self._clean_timeseries(timeseries, starttime, endtime) + def _put_trace(self, trace, observatory, channel, type, interval): + """Put trace + + Parameters + ---------- + trace: obspy.core.Stream.Trace + trace object with data to be written + observatory: str + observatory code. + channel: str + channel to be written + type: {'variation', 'quasi-definitive', 'definitive'} + data type. + interval: {'daily', 'hourly', 'minute', 'second'} + data interval. + """ + station = self._get_edge_station(observatory, channel, + type, interval) + location = self._get_edge_location(observatory, channel, + type, interval) + network = self._get_edge_network(observatory, channel, + type, interval) + edge_channel = self._get_edge_channel(observatory, channel, + type, interval) + + self.ric = RawInputClient(self.tag, self.host, self.port, + self.cwbhost, self.cwbport) + + totalsamps = len(trace.data) + starttime = trace.stats.starttime + if self.interval == 'second': + nsamp = HOURSECONDS + timeoffset = 1 + samplerate = 1. + elif self.interval == 'minute': + nsamp = DAYMINUTES + timeoffset = 60 + samplerate = 1. / 60 + + for i in xrange(0, totalsamps, nsamp): + location = self.locationCode or location + seedname = self.ric.create_seedname(station, + edge_channel, location, network) + if totalsamps - i < nsamp: + endsample = totalsamps + else: + endsample = i + nsamp + nsamp = endsample - i + endtime = starttime + (nsamp - 1) * timeoffset + trace_send = trace.slice(starttime, endtime) + self._convert_trace_to_int(trace_send) + self.ric.send(seedname, nsamp, trace_send.data, starttime, + samplerate, 0, 0, 0, 0) + starttime += nsamp * timeoffset + self.ric.forceout(seedname) + self.ric.close() + def _set_metadata(self, stream, observatory, channel, type, interval): """set metadata for a given stream/channel Parameters