diff --git a/geomagio/Controller.py b/geomagio/Controller.py
index 3305ecb683635b1e426134f3fbb518825ee2350d..d8e31bdcfa1cefb84e641f56b5a4ea842d2222e4 100644
--- a/geomagio/Controller.py
+++ b/geomagio/Controller.py
@@ -1,11 +1,12 @@
 """Controller class for geomag algorithms"""
-from __future__ import absolute_import, print_function
-from builtins import str as unicode
 
 import argparse
-import sys
 from io import BytesIO
+import sys
+from typing import List, Optional, Tuple, Union
+
 from obspy.core import Stream, UTCDateTime
+
 from .algorithm import algorithms, AlgorithmException
 from .PlotTimeseriesFactory import PlotTimeseriesFactory
 from .StreamTimeseriesFactory import StreamTimeseriesFactory
@@ -140,7 +141,7 @@ class Controller(object):
         )
         return timeseries
 
-    def run(self, options, input_timeseries=None):
+    def _run(self, options, input_timeseries=None):
         """run controller
         Parameters
         ----------
@@ -151,15 +152,89 @@ class Controller(object):
             Used by run_as_update to save a double input read, since it has
             already read the input to confirm data can be produced.
         """
+        self.run(
+            observatory=options.observatory,
+            starttime=options.starttime,
+            endtime=options.endtime,
+            input_channels=options.inchannels,
+            input_timeseries=input_timeseries,
+            output_channels=options.outchannels,
+            no_trim=options.no_trim,
+            rename_input_channel=options.rename_input_channel,
+            rename_output_channel=options.rename_output_channel,
+            realtime=options.realtime,
+        )
+
+    def _run_as_update(self, options, update_count=0):
+        """Updates data.
+        Parameters
+        ----------
+        options: dictionary
+            The dictionary of all the command line arguments. Could in theory
+            contain other options passed in by the controller.
+
+        Notes
+        -----
+        Finds gaps in the target data, and if there's new data in the input
+            source, calls run with the start/end time of a given gap to fill
+            in.
+        It checks the start of the target data, and if it's missing, and
+            there's new data available, it backs up the starttime/endtime,
+            and recursively calls itself, to check the previous period, to see
+            if new data is available there as well. Calls run for each new
+            period, oldest to newest.
+        """
+        self.run_as_update(
+            observatory=options.observatory,
+            output_observatory=options.output_observatory,
+            starttime=options.starttime,
+            endtime=options.endtime,
+            input_channels=options.inchannels,
+            output_channels=options.outchannels,
+            no_trim=options.no_trim,
+            realtime=options.realtime,
+            rename_input_channel=options.rename_input_channel,
+            rename_output_channel=options.rename_output_channel,
+            update_limit=options.update_limit,
+            update_count=update_count,
+        )
+
+    def run(
+        self,
+        observatory: List[str],
+        starttime: UTCDateTime,
+        endtime: UTCDateTime,
+        input_channels: Optional[List[str]] = None,
+        input_timeseries: Optional[Stream] = None,
+        output_channels: Optional[List[str]] = None,
+        no_trim: bool = False,
+        realtime: Union[bool, int] = False,
+        rename_input_channel: Optional[List[List[str]]] = None,
+        rename_output_channel: Optional[List[List[str]]] = None,
+    ):
+        """Run algorithm for a specific time range.
+
+        Parameters
+        ----------
+        observatory: the observatory or list of observatories for processing
+        starttime: time of first data
+        endtime: time of last data
+        input_channels: list of channels to read
+        input_timeseries: used by run_as_update, which has already read input.
+        output_channels: list of channels to write
+        no_trim: whether to trim output to starttime/endtime interval
+        realtime: number of seconds in realtime interval
+        rename_input_channel: list of input channel renames
+        rename_output_channel: list of output channel renames
+        """
         algorithm = self._algorithm
-        input_channels = options.inchannels or algorithm.get_input_channels()
-        output_channels = options.outchannels or algorithm.get_output_channels()
+        input_channels = input_channels or algorithm.get_input_channels()
+        output_channels = output_channels or algorithm.get_output_channels()
         next_starttime = algorithm.get_next_starttime()
-        starttime = next_starttime or options.starttime
-        endtime = options.endtime
+        starttime = next_starttime or starttime
         # input
         timeseries = input_timeseries or self._get_input_timeseries(
-            observatory=options.observatory,
+            observatory=observatory,
             starttime=starttime,
             endtime=endtime,
             channels=input_channels,
@@ -168,30 +243,30 @@ class Controller(object):
             # no data to process
             return
         # pre-process
-        if next_starttime and options.realtime:
+        if next_starttime and realtime:
             # when running a stateful algorithms with the realtime option
             # pad/trim timeseries to the interval:
             # [next_starttime, max(timeseries.endtime, now-options.realtime)]
             input_start, input_end = TimeseriesUtility.get_stream_start_end_times(
                 timeseries, without_gaps=True
             )
-            realtime_gap = endtime - options.realtime
+            realtime_gap = endtime - realtime
             if input_end < realtime_gap:
                 input_end = realtime_gap
             # pad to the start of the "realtime gap"
             TimeseriesUtility.pad_timeseries(timeseries, next_starttime, input_end)
         # process
-        if options.rename_input_channel:
+        if rename_input_channel:
             timeseries = self._rename_channels(
-                timeseries=timeseries, renames=options.rename_input_channel
+                timeseries=timeseries, renames=rename_input_channel
             )
         processed = algorithm.process(timeseries)
         # trim if --no-trim is not set
-        if not options.no_trim:
+        if not no_trim:
             processed.trim(starttime=starttime, endtime=endtime)
-        if options.rename_output_channel:
+        if rename_output_channel:
             processed = self._rename_channels(
-                timeseries=processed, renames=options.rename_output_channel
+                timeseries=processed, renames=rename_output_channel
             )
         # output
         self._outputFactory.put_timeseries(
@@ -201,13 +276,37 @@ class Controller(object):
             channels=output_channels,
         )
 
-    def run_as_update(self, options, update_count=0):
-        """Updates data.
+    def run_as_update(
+        self,
+        observatory: List[str],
+        output_observatory: List[str],
+        starttime: UTCDateTime,
+        endtime: UTCDateTime,
+        input_channels: Optional[List[str]] = None,
+        output_channels: Optional[List[str]] = None,
+        no_trim: bool = False,
+        realtime: Union[bool, int] = False,
+        rename_input_channel: Optional[List[List[str]]] = None,
+        rename_output_channel: Optional[List[List[str]]] = None,
+        update_limit: int = 1,
+        update_count: int = 0,
+    ):
+        """Try to fill gaps in output data.
+
         Parameters
         ----------
-        options: dictionary
-            The dictionary of all the command line arguments. Could in theory
-            contain other options passed in by the controller.
+        observatory: list of observatories for input
+        output_observatory: list of observatories for output
+        starttime: time of first data
+        endtime: time of last data
+        input_channels: list of channels to read
+        output_channels: list of channels to write
+        no_trim: whether to trim output to starttime/endtime interval
+        realtime: number of seconds in realtime interval
+        rename_input_channel: list of input channel renames
+        rename_output_channel: list of output channel renames
+        update_limit: maximum number of recursive updates, 0 for no limit
+        update_count: number of recursive updates already performed
 
         Notes
         -----
@@ -221,28 +320,27 @@ class Controller(object):
             period, oldest to newest.
         """
         # If an update_limit is set, make certain we don't step past it.
-        if options.update_limit != 0:
-            if update_count >= options.update_limit:
+        if update_limit != 0:
+            if update_count >= update_limit:
                 return
         algorithm = self._algorithm
         if algorithm.get_next_starttime() is not None:
             raise AlgorithmException("Stateful algorithms cannot use run_as_update")
-        input_channels = options.inchannels or algorithm.get_input_channels()
-        output_observatory = options.output_observatory
-        output_channels = options.outchannels or algorithm.get_output_channels()
+        input_channels = input_channels or algorithm.get_input_channels()
+        output_channels = output_channels or algorithm.get_output_channels()
         print(
             "checking gaps",
-            options.starttime,
-            options.endtime,
+            starttime,
+            endtime,
             output_observatory,
             output_channels,
             file=sys.stderr,
         )
         # request output to see what has already been generated
         output_timeseries = self._get_output_timeseries(
-            observatory=options.output_observatory,
-            starttime=options.starttime,
-            endtime=options.endtime,
+            observatory=output_observatory,
+            starttime=starttime,
+            endtime=endtime,
             channels=output_channels,
         )
         if len(output_timeseries) > 0:
@@ -253,15 +351,15 @@ class Controller(object):
         else:
             output_gaps = [
                 [
-                    options.starttime,
-                    options.endtime,
+                    starttime,
+                    endtime,
                     # next sample time not used
                     None,
                 ]
             ]
         for output_gap in output_gaps:
             input_timeseries = self._get_input_timeseries(
-                observatory=options.observatory,
+                observatory=observatory,
                 starttime=output_gap[0],
                 endtime=output_gap[1],
                 channels=input_channels,
@@ -271,26 +369,48 @@ class Controller(object):
             ):
                 continue
             # check for fillable gap at start
-            if output_gap[0] == options.starttime:
+            if output_gap[0] == starttime:
                 # found fillable gap at start, recurse to previous interval
-                interval = options.endtime - options.starttime
-                starttime = options.starttime - interval
-                endtime = options.starttime - 1
-                options.starttime = starttime
-                options.endtime = endtime
-                self.run_as_update(options, update_count + 1)
+                interval = endtime - starttime
+                gap_starttime = starttime - interval
+                gap_endtime = starttime - 1
+                self.run_as_update(
+                    observatory=observatory,
+                    output_observatory=output_observatory,
+                    starttime=gap_starttime,
+                    endtime=gap_endtime,
+                    input_channels=input_channels,
+                    output_channels=output_channels,
+                    no_trim=no_trim,
+                    realtime=realtime,
+                    rename_input_channel=rename_input_channel,
+                    rename_output_channel=rename_output_channel,
+                    update_limit=update_limit,
+                    update_count=update_count + 1,
+                )
             # fill gap
-            options.starttime = output_gap[0]
-            options.endtime = output_gap[1]
+            gap_starttime = output_gap[0]
+            gap_endtime = output_gap[1]
             print(
                 "processing",
-                options.starttime,
-                options.endtime,
+                gap_starttime,
+                gap_endtime,
                 output_observatory,
                 output_channels,
                 file=sys.stderr,
             )
-            self.run(options, input_timeseries)
+            self.run(
+                observatory=observatory,
+                starttime=gap_starttime,
+                endtime=gap_endtime,
+                input_channels=input_channels,
+                input_timeseries=input_timeseries,
+                output_channels=output_channels,
+                no_trim=no_trim,
+                realtime=realtime,
+                rename_input_channel=rename_input_channel,
+                rename_output_channel=rename_output_channel,
+            )
 
 
 def get_input_factory(args):
@@ -473,7 +593,7 @@ def main(args):
     parse_deprecated_arguments(args)
 
     # make sure observatory is a tuple
-    if isinstance(args.observatory, (str, unicode)):
+    if isinstance(args.observatory, str):
         args.observatory = (args.observatory,)
 
     if args.output_observatory is None:
@@ -541,9 +661,9 @@ def _main(args):
     controller = Controller(input_factory, output_factory, algorithm)
 
     if args.update:
-        controller.run_as_update(args)
+        controller._run_as_update(args)
     else:
-        controller.run(args)
+        controller._run(args)
 
 
 def parse_args(args):