From 1d3db2d422cd9b1557758a2e036b115053ee8d49 Mon Sep 17 00:00:00 2001 From: Hal Simpson <hasimpson@usgs.gov> Date: Wed, 1 Jul 2015 00:12:59 -0600 Subject: [PATCH] Abandoned realtime update method over all new data, replaced with recursive method for a single timespan at a time, oldest to newest. --- geomagio/Controller.py | 152 +++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 73 deletions(-) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 2f7b3bf3..b37031a1 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -14,7 +14,25 @@ class Controller(object): the factory that will read in timeseries data outputFactory: TimeseriesFactory the factory that will output the timeseries data - algorithm: the algorithm(s) that will take procees the timeseries data + algorithm: Algorithm + the algorithm(s) that will procees the timeseries data + update: boolean + indicates that data is to be updated. + interval: string + the data interval {daily, hourly, minute, second} + update_realtime: boolean + indicates + + Notes + ----- + Has 2(3) basic modes. + Run simply sends all the data in a stream to edge. If a startime/endtime is + provided, it will send the data from the stream that is within that + time span. + Update will update any data that has changed between the source, and + the target during a given timeframe. If the update_realtime flag + is set, it will attempt to recursively backup so it can update all + missing data. """ def __init__(self, inputFactory, outputFactory, algorithm, update=False, @@ -25,88 +43,76 @@ class Controller(object): self._update = update self._interval = interval self._update_realtime = update_realtime + self._interval_in_seconds = TUtils.get_seconds_of_interval(interval) def run(self, starttime, endtime): - """run an algorithm as setup up by the main script. - + """run controller Parameters ---------- - starttime : UTCDateTime - time of first sample to be worked on. - endtime : UTCDateTime - time of last sample to be worked on. + starttime: obspy.core.UTCDateTime + time of first sample. None if starttime should come from dataset + endtime: obspy.core.UTCDateTime + endtime of last sampel. None if endtime should come from dataset """ - if (self._update): - self.run_as_update(starttime, endtime) - else: - self.run_as_timeseries(starttime, endtime) - - def run_as_update(self, starttime, endtime): input_channels = self._algorithm.get_input_channels() - output_channels = self._algorithm.get_output_channels() - seconds_of_interval = TUtils.get_seconds_of_interval(self._interval) - backup_start = starttime - backup_end = endtime - backup = self._update_realtime - - # get new input_channel data, and last output data. - timeseries_in = self._inputFactory.get_timeseries(backup_start, - endtime, channels=input_channels) - timeseries_update = self._outputFactory.get_timeseries(backup_start, - endtime, channels=output_channels) - # repeat until ouput is succesfully backfilled. - while backup: - first_in_exists = self._first_value_exists(timeseries_in, - input_channels) - first_out_exists = self._first_value_exists(timeseries_update, - output_channels) - if first_in_exists and not first_out_exists: - new_backup_end = backup_start - seconds_of_interval - backup_start = backup_start - (backup_end - backup_start) - backup_end = new_backup_end - timeseries_in += self._inputFactory.get_timeseries( - backup_start, backup_end, channels=input_channels) - timeseries_update += self._outputFactory.get_timeseries( - backup_start, backup_end, channels=output_channels) - timeseries_in.merge() - timeseries_update.merge() - else: - backup = False - print backup_start, backup_end - starttime = backup_start + algorithm_start, algorithm_end = self._algorithm.get_input_interval( + starttime, endtime) - input_gaps = TUtils.get_timeseries_gaps( - timeseries_in, input_channels, starttime, endtime) - output_gaps = TUtils.get_timeseries_gaps( - timeseries_update, output_channels, starttime, endtime) + timeseries = self._inputFactory.get_timeseries(algorithm_start, + algorithm_end, channels=input_channels) - input_merged_gaps = TUtils.get_merged_gaps( - input_gaps, input_channels) - output_merged_gaps = TUtils.get_merged_gaps( - output_gaps, output_channels) - - if TUtils.is_new_data(input_merged_gaps, output_merged_gaps): - pass - - # TODO call algorithm - - # TODO call output - - def _first_value_exists(self, timeseries, channels): - for channel in channels: - stream = timeseries.select(channel=channel) - if len(stream[0].data) and not numpy.isnan(stream[0].data[0]): - return True - return False + processed = self._algorithm.process(timeseries) + output_channels = self._algorithm.get_output_channels() + self._outputFactory.put_timeseries(timeseries=processed, + starttime=starttime, endtime=endtime, + channels=output_channels) - def run_as_timeseries(self, starttime, endtime): + def run_as_update(self, starttime, endtime): + """Updates data. + Parameters + ---------- + starttime: obspy.core.UTCDateTime + time of first sample. None if starttime should come from dataset + endtime: obspy.core.UTCDateTime + endtime of last sampel. None if endtime should come from dataset + + Notes + ----- + Finds gaps in the target data, and if there's new data in the input + source, calls run with the start/end time of a given gap to fill + in. + If the update_realtime flag is set, it checks the start of the target + data, and if it's missing, and there's new data available, it backs + up the starttime/endtime, and recursively calls itself, to check + the previous period, to see if new data is available there as well. + Calls run for each new period, oldest to newest. + """ input_channels = self._algorithm.get_input_channels() + output_channels = self._algorithm.get_output_channels() - timeseries = self._inputFactory.get_timeseries(starttime, endtime, - channels=input_channels) + timeseries_source = self._inputFactory.get_timeseries(starttime, + endtime, channels=input_channels) + timeseries_target = self._outputFactory.get_timeseries(starttime, + endtime, channels=output_channels) - processed = self._algorithm.process(timeseries) - output_channels = self._algorithm.get_output_channels() - self._outputFactory.put_timeseries(timeseries=processed, - channels=output_channels) + source_gaps = TUtils.get_timeseries_gaps( + timeseries_source, input_channels, starttime, endtime) + target_gaps = TUtils.get_timeseries_gaps( + timeseries_target, output_channels, starttime, endtime) + source_gaps = TUtils.get_merged_gaps( + source_gaps, input_channels) + target_gaps = TUtils.get_merged_gaps( + target_gaps, output_channels) + del timeseries_source + del timeseries_target + if (self._update_realtime and + (not len(source_gaps) or + len(source_gaps) and source_gaps[0][0] != starttime) and + len(target_gaps) and target_gaps[0][0] == starttime): + self.run_as_update((starttime - (endtime - starttime)), + (starttime - self._interval_in_seconds)) + for target_gap in target_gaps: + if not TUtils.gap_is_new_data(source_gaps, target_gap): + continue + self.run(target_gap[0], target_gap[1]) -- GitLab