diff --git a/geomagio/Algorithm.py b/geomagio/Algorithm.py index 604569e5950af47b628018a20c3c9cca3b8b8edf..d4a2a45a8c78d23c334aeda6739ab8fba65b1cbb 100644 --- a/geomagio/Algorithm.py +++ b/geomagio/Algorithm.py @@ -56,9 +56,14 @@ class Algorithm(object): def get_input_interval(self, start, end): """Get Input Interval + start : UTCDateTime + start time of requested output + end : UTCDateTime + end time of requested output + Returns ------- - tuple - start/end time pair of the input interval + tuple : (input_start, input_end) + start and end of required input to generate requested output. """ return (start, end) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index ebeba5f48b3c8ca336a1981ec96b322a3c8868d8..d05f49857cd4f3c7466081fd6f9ed9f5371006b2 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -1,8 +1,7 @@ """Controller class for geomag algorithms""" -import TimeseriesUtilities as TUtils +import TimeseriesUtility import TimeseriesFactoryException -import copy class Controller(object): @@ -41,21 +40,26 @@ class Controller(object): The dictionary of all the command line arguments. Could in theory contain other options passed in by the controller. """ - input_channels = self._algorithm.get_input_channels() - algorithm_start, algorithm_end = self._algorithm.get_input_interval( - options.starttime, options.endtime) - - timeseries = self._inputFactory.get_timeseries(algorithm_start, - algorithm_end, channels=input_channels) - - processed = self._algorithm.process(timeseries) - output_channels = self._algorithm.get_output_channels() - - output_channels = self._get_output_channels(output_channels, + algorithm = self._algorithm + input_channels = algorithm.get_input_channels() + output_channels = self._get_output_channels( + algorithm.get_output_channels(), options.outchannels) - - self._outputFactory.put_timeseries(timeseries=processed, - starttime=options.starttime, endtime=options.endtime, + # get input + start, end = self._algorithm.get_input_interval( + start=options.starttime, + end=options.endtime) + timeseries = self._inputFactory.get_timeseries( + starttime=start, + endtime=end, + channels=input_channels) + # process + processed = algorithm.process(timeseries) + # output + self._outputFactory.put_timeseries( + timeseries=processed, + starttime=options.starttime, + endtime=options.endtime, channels=output_channels) def run_as_update(self, options): @@ -77,45 +81,45 @@ class Controller(object): if new data is available there as well. Calls run for each new period, oldest to newest. """ - input_channels = self._algorithm.get_input_channels() - output_channels = self._algorithm.get_output_channels() - - output_channels = self._get_output_channels(output_channels, + algorithm = self._algorithm + input_channels = algorithm.get_input_channels() + output_channels = self._get_output_channels( + algorithm.get_output_channels(), options.outchannels) - - timeseries_source = self._inputFactory.get_timeseries( - options.starttime, options.endtime, channels=input_channels) - timeseries_target = self._outputFactory.get_timeseries( - options.starttime, options.endtime, channels=output_channels) - - source_gaps = TUtils.get_timeseries_gaps( - timeseries_source, input_channels, options.starttime, - options.endtime) - target_gaps = TUtils.get_timeseries_gaps( - timeseries_target, output_channels, options.starttime, - options.endtime) - source_gaps = TUtils.get_merged_gaps(source_gaps, input_channels) - target_gaps = TUtils.get_merged_gaps(target_gaps, output_channels) - del timeseries_source - del timeseries_target - - if ((not len(source_gaps) or - len(source_gaps) and source_gaps[0][0] != options.starttime) - and - len(target_gaps) and target_gaps[0][0] == options.starttime): - new_options = copy.deepcopy(options) - new_options.starttime = (options.starttime - - (options.endtime - options.starttime)) - new_options.endtime = (options.starttime - - TUtils.get_seconds_of_interval(options.interval)) - self.run_as_update(new_options) - for target_gap in target_gaps: - if not TUtils.gap_is_new_data(source_gaps, target_gap): + # request output to see what has already been generated + output_timeseries = self._outputFactory.get_timeseries( + starttime=options.starttime, + endtime=options.endtime, + channels=output_channels) + # find gaps in output, so they can be updated + output_gaps = TimeseriesUtility.get_stream_gaps(output_timeseries) + for gap in output_gaps: + start, end = algorithm.get_input_interval( + start=gap[0], + end=gap[1]) + input_timeseries = self._inputFactory.get_timeseries( + starttime=start, + endtime=end, + channels=input_channels) + input_gaps = TimeseriesUtility.get_stream_gaps(input_timeseries) + if len(input_gaps) > 0: + # TODO: are certain gaps acceptable? continue - new_options = copy.deepcopy(options) - new_options.starttime = target_gap[0] - new_options.endtime = target_gap[1] - self.run(new_options) + # check for fillable gap at start + if gap[0] == options.starttime: + # found fillable gap at start, recurse to previous interval + interval = options.endtime - options.starttime + self.run_as_update({ + 'outchannels': options.outchannels, + 'starttime': options.starttime - interval, + 'endtime': options.starttime + }) + # fill gap + self.run({ + 'outchannels': options.outchannels, + 'starttime': gap[0], + 'endtime': gap[1] + }) def _get_output_channels(self, algorithm_channels, commandline_channels): """get output channels diff --git a/geomagio/TimeseriesUtilities.py b/geomagio/TimeseriesUtilities.py deleted file mode 100644 index 2ea33df0ad69464c85f99db97f284be659bcbe42..0000000000000000000000000000000000000000 --- a/geomagio/TimeseriesUtilities.py +++ /dev/null @@ -1,171 +0,0 @@ -"""Timeseries Utilities""" -import numpy - - -def get_timeseries_gaps(timeseries, channels, starttime, endtime): - """Get gaps in a given timeseries - Parameters - ---------- - timeseries: obspy.core.stream - the stream to check for gaps in - channels: array_like - list of channels to look for gaps in - starttime: obspy.core.UTCDateTime - time of first sample. - endtime: obspy.core.UTCDateTime - time of last sample. - - Returns - ------- - dictionary of channel gaps arrays - - Notes - ----- - Returns a dictionary with channel: gaps array pairs. Where the gaps array - consists of arrays of starttime/endtime pairs representing each gap. - """ - gaps = {} - for channel in channels: - stream_gap = get_stream_gaps( - timeseries.select(channel=channel), starttime, endtime) - gaps[channel] = stream_gap - - return gaps - - -def get_stream_gaps(stream, starttime, endtime): - """Gets gaps in a stream representing a single channel - Parameters - ---------- - stream: obspy.core.stream - a stream containing a single channel of data. - starttime: obspy.core.UTCDateTime - time of first sample. - endtime: obspy.core.UTCDateTime - time of last sample. - - Returns - ------- - array of gaps - """ - gaps = [] - gap = None - - i = 0 - data = stream[0].data - length = len(data) - for i in xrange(0, length): - if numpy.isnan(data[i]) and gap is None: - gap = [starttime + i * 60] - if not numpy.isnan(data[i]) and gap is not None: - gap.append(starttime + (i - 1) * 60) - gaps.append(gap) - gap = None - if gap is not None: - gap.append(endtime) - gaps.append(gap) - - return gaps - - -def get_merged_gaps(gaps, channels): - """Get gaps merged across channels/streams - Parameters - ---------- - gaps: dictionary - contains channel/gap array pairs - channels: array_like - array of channels to look for gaps in - - Returns - ------- - array_like - an array of startime/endtime arrays representing gaps. - - Notes - ----- - Takes an dictionary of gaps, and merges those gaps across channels, - returning an array of the merged gaps. - """ - gap_stream = [] - for channel in channels: - gap_stream.extend(gaps[channel]) - - if len(gap_stream) == 0: - return [] - - sorted_gaps = sorted(gap_stream, key=lambda starttime: starttime[1]) - merged_gaps = [] - - gap = sorted_gaps[0] - for i in range(1, len(sorted_gaps)): - nxtgap = sorted_gaps[i] - if nxtgap[0] >= gap[0] and nxtgap[0] <= gap[1]: - if nxtgap[1] > gap[1]: - gap[1] = nxtgap[1] - else: - merged_gaps.append(gap) - gap = nxtgap - merged_gaps.append(gap) - - return merged_gaps - - -def is_new_data(input_gaps, output_gaps): - """Is new data available in gaps - Parameters - ---------- - input_gaps: array_like - an array of startime/endtime gap pairs holding the input gaps - output_gaps: array_like - an array of starttime/endtime gap pairs holding the ouput gaps - - Returns - boolean - True if there's new data available, False otherwise - """ - for output_gap in output_gaps: - for input_gap in input_gaps: - if (output_gap[0] >= input_gap[0] and - output_gap[0] <= input_gap[1] and - output_gap[1] <= input_gap[1]): - return False - return True - - -def gap_is_new_data(input_gaps, output_gap): - """Is new data available for a single gap - Parameters - ---------- - input_gaps: array_like - an array of startime/endtime gap pairs holding the input gaps - output_gaps: array_like - starttime/endtime pair representing a single gap - - Returns - boolean - True if there's new data available for the gap, False otherwise - """ - for input_gap in input_gaps: - if (output_gap[0] >= input_gap[0] and - output_gap[0] <= input_gap[1] and - output_gap[1] <= input_gap[1]): - return False - return True - - -def get_seconds_of_interval(interval): - """Gets number of seconds for a given interval string - Parameters - ---------- - interval: string - The string representing an interval size - """ - if interval == 'second': - return 1 - if interval == 'minute': - return 60 - if interval == 'hourly': - return 3600 - if interval == 'daily': - return 86400 diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py new file mode 100644 index 0000000000000000000000000000000000000000..20aa4974bbb5d94db587a74030278a2ef063b20e --- /dev/null +++ b/geomagio/TimeseriesUtility.py @@ -0,0 +1,112 @@ +"""Timeseries Utilities""" +import numpy + + +def get_stream_gaps(stream): + """Get gaps in a given stream + Parameters + ---------- + stream: obspy.core.Stream + the stream to check for gaps + channels: array_like + list of channels to check for gaps + + Returns + ------- + dictionary of channel gaps arrays + + Notes + ----- + Returns a dictionary with channel: gaps array pairs. Where the gaps array + consists of arrays of starttime/endtime pairs representing each gap. + """ + gaps = {} + for trace in stream: + channel = trace.stats.channel + gaps[channel] = get_trace_gaps(trace) + return gaps + + +def get_trace_gaps(trace): + """Gets gaps in a trace representing a single channel + Parameters + ---------- + trace: obspy.core.Trace + a stream containing a single channel of data. + + Returns + ------- + array of gaps, which is empty when there are no gaps. + each gap is an array [start of gap, end of gap, next sample] + """ + gaps = [] + gap = None + data = trace.data + stats = trace.stats + starttime = stats.starttime + length = len(data) + delta = stats.delta + for i in xrange(0, length): + if numpy.isnan(data[i]): + if gap is None: + # start of a gap + gap = [starttime + i * delta] + else: + if gap is not None: + # end of a gap + gap.extend([ + starttime + (i - 1) * delta, + starttime + i * delta]) + gaps.append(gap) + gap = None + # check for gap at end + if gap is not None: + gap.extend([ + starttime + (length - 1) * delta, + starttime + length * delta]) + gaps.append(gap) + return gaps + + +def get_merged_gaps(gaps): + """Get gaps merged across channels/streams + Parameters + ---------- + gaps: dictionary + contains channel/gap array pairs + + Returns + ------- + array_like + an array of startime/endtime arrays representing gaps. + + Notes + ----- + Takes an dictionary of gaps, and merges those gaps across channels, + returning an array of the merged gaps. + """ + merged_gaps = [] + for key in gaps: + merged_gaps.extend(gaps[key]) + # sort gaps so earlier gaps are before later gaps + sorted_gaps = sorted(merged_gaps, key=lambda gap: gap[0]) + # merge gaps that overlap + merged_gaps = [] + merged_gap = None + for gap in sorted_gaps: + if merged_gap is None: + # start of gap + merged_gap = gap + elif gap[0] > merged_gap[2]: + # next gap starts after current gap ends + merged_gaps.append(merged_gap) + merged_gap = gap + elif gap[0] <= merged_gap[2]: + # next gap starts at or before next data + if gap[1] > merged_gap[1]: + # next gap ends after current gap ends, extend current + merged_gap[1] = gap[1] + merged_gap[2] = gap[2] + if merged_gap is not None: + merged_gaps.append(merged_gap) + return merged_gaps diff --git a/geomagio/TimeseriesUtility_test.py b/geomagio/TimeseriesUtility_test.py new file mode 100644 index 0000000000000000000000000000000000000000..5f5ea092a89616b7154f7a8af9a2370b6bef548c --- /dev/null +++ b/geomagio/TimeseriesUtility_test.py @@ -0,0 +1,92 @@ +#! /usr/bin/env python +from nose.tools import assert_equals +from StreamConverter_test import __create_trace +import numpy +import TimeseriesUtility +from obspy.core import Stream, UTCDateTime + + +def test_get_stream_gaps(): + """geomag.TimeseriesUtility_test.test_get_stream_gaps + + confirms that gaps are found in a stream + """ + stream = Stream([ + __create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]), + __create_trace('Z', [0, 0, 0, 1, 1, 1]) + ]) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 second + trace.stats.delta = 1 + # find gaps + gaps = TimeseriesUtility.get_stream_gaps(stream) + assert_equals(len(gaps['H']), 2) + # gap at start of H + gap = gaps['H'][0] + assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:00Z')) + assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:00Z')) + # gap at end of H + gap = gaps['H'][1] + assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:03Z')) + assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:04Z')) + # no gaps in Z channel + assert_equals(len(gaps['Z']), 0) + + +def test_get_trace_gaps(): + """geomag.TimeseriesUtility_test.test_get_trace_gaps + + confirm that gaps are found in a trace + """ + trace = __create_trace('H', [1, 1, numpy.nan, numpy.nan, 0, 1]) + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 minute + trace.stats.delta = 60 + # find gap + gaps = TimeseriesUtility.get_trace_gaps(trace) + assert_equals(len(gaps), 1) + gap = gaps[0] + assert_equals(gap[0], UTCDateTime('2015-01-01T00:02:00Z')) + assert_equals(gap[1], UTCDateTime('2015-01-01T00:03:00Z')) + + +def test_get_merged_gaps(): + """geomag.TimeseriesUtility_test.test_get_merged_gaps + + confirm that gaps are merged + """ + merged = TimeseriesUtility.get_merged_gaps({ + 'H': [ + # gap for 2 seconds, that starts after next gap + [ + UTCDateTime('2015-01-01T00:00:01Z'), + UTCDateTime('2015-01-01T00:00:03Z'), + UTCDateTime('2015-01-01T00:00:04Z') + ] + ], + # gap for 1 second, that occurs before previous gap + 'Z': [ + [ + UTCDateTime('2015-01-01T00:00:00Z'), + UTCDateTime('2015-01-01T00:00:00Z'), + UTCDateTime('2015-01-01T00:00:01Z') + ], + [ + UTCDateTime('2015-01-01T00:00:05Z'), + UTCDateTime('2015-01-01T00:00:07Z'), + UTCDateTime('2015-01-01T00:00:08Z') + ], + ] + }) + assert_equals(len(merged), 2) + # first gap combines H and Z gaps + gap = merged[0] + assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:00Z')) + assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:03Z')) + # second gap is second Z gap + gap = merged[1] + assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:05Z')) + assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:07Z')) diff --git a/geomagio/__init__.py b/geomagio/__init__.py index 213e65e455ab34b86cadc1e7eb92a2e3bfb43e31..8ca36785f120cefbfe37ed3f4702252530a6533b 100644 --- a/geomagio/__init__.py +++ b/geomagio/__init__.py @@ -9,8 +9,8 @@ from AlgorithmException import AlgorithmException from Controller import Controller from TimeseriesFactory import TimeseriesFactory from TimeseriesFactoryException import TimeseriesFactoryException +import TimeseriesUtility from XYZAlgorithm import XYZAlgorithm -import TimeseriesUtilities __all__ = [ 'Algorithm', @@ -20,6 +20,6 @@ __all__ = [ 'StreamConverter', 'TimeseriesFactory', 'TimeseriesFactoryException', - 'XYZAlgorithm', - 'TimeseriesUtilities' + 'TimeseriesUtility', + 'XYZAlgorithm' ]