Skip to content
Snippets Groups Projects
Commit 210de01c authored by Cain, Payton David's avatar Cain, Payton David
Browse files

Add jmfee commit for run/run_as_update arguments

parent cbdbcc83
No related branches found
No related tags found
2 merge requests!146Release CMO metadata to production,!52Update legacy
"""Controller class for geomag algorithms""" """Controller class for geomag algorithms"""
from __future__ import absolute_import, print_function
from builtins import str as unicode
import argparse import argparse
import sys
from io import BytesIO from io import BytesIO
import sys
from typing import List, Optional, Tuple, Union
from obspy.core import Stream, UTCDateTime from obspy.core import Stream, UTCDateTime
from .algorithm import algorithms, AlgorithmException from .algorithm import algorithms, AlgorithmException
from .PlotTimeseriesFactory import PlotTimeseriesFactory from .PlotTimeseriesFactory import PlotTimeseriesFactory
from .StreamTimeseriesFactory import StreamTimeseriesFactory from .StreamTimeseriesFactory import StreamTimeseriesFactory
...@@ -140,7 +141,7 @@ class Controller(object): ...@@ -140,7 +141,7 @@ class Controller(object):
) )
return timeseries return timeseries
def run(self, options, input_timeseries=None): def _run(self, options, input_timeseries=None):
"""run controller """run controller
Parameters Parameters
---------- ----------
...@@ -151,15 +152,88 @@ class Controller(object): ...@@ -151,15 +152,88 @@ class Controller(object):
Used by run_as_update to save a double input read, since it has Used by run_as_update to save a double input read, since it has
already read the input to confirm data can be produced. already read the input to confirm data can be produced.
""" """
self.run(
observatory=options.observatory,
starttime=options.starttime,
endtime=options.endtime,
input_channels=options.inchannels,
input_timeseries=input_timeseries,
output_channels=options.outchannels,
no_trim=options.no_trim,
rename_input_channel=options.rename_input_channel,
rename_output_channel=options.rename_output_channel,
realtime=options.realtime,
)
def _run_as_update(self, options, update_count=0):
"""Updates data.
Parameters
----------
options: dictionary
The dictionary of all the command line arguments. Could in theory
contain other options passed in by the controller.
Notes
-----
Finds gaps in the target data, and if there's new data in the input
source, calls run with the start/end time of a given gap to fill
in.
It checks the start of the target data, and if it's missing, and
there's new data available, it backs up the starttime/endtime,
and recursively calls itself, to check the previous period, to see
if new data is available there as well. Calls run for each new
period, oldest to newest.
"""
self.run_as_update(
observatory=options.observatory,
output_observatory=options.output_observatory,
starttime=options.starttime,
endtime=options.endtime,
input_channels=options.input_channels,
output_channels=options.output_channels,
no_trim=options.no_trim,
realtime=options.realtime,
rename_input_channel=options.rename_input_channel,
rename_output_channel=options.rename_output_channel,
update_limit=options.update_limit,
)
def run(
self,
observatory: List[str],
starttime: UTCDateTime,
endtime: UTCDateTime,
input_channels: Optional[List[str]] = None,
input_timeseries: Optional[Stream] = None,
output_channels: Optional[List[str]] = None,
no_trim: bool = False,
realtime: Union[bool, int] = False,
rename_input_channel: Optional[List[List[str]]] = None,
rename_output_channel: Optional[List[List[str]]] = None,
):
"""Run algorithm for a specific time range.
Parameters
----------
observatory: the observatory or list of observatories for processing
starttime: time of first data
endtime: time of last data
input_channels: list of channels to read
input_timeseries: used by run_as_update, which has already read input.
output_channels: list of channels to write
no_trim: whether to trim output to starttime/endtime interval
realtime: number of seconds in realtime interval
rename_input_channel: list of input channel renames
rename_output_channel: list of output channel renames
"""
algorithm = self._algorithm algorithm = self._algorithm
input_channels = options.inchannels or algorithm.get_input_channels() input_channels = input_channels or algorithm.get_input_channels()
output_channels = options.outchannels or algorithm.get_output_channels() output_channels = output_channels or algorithm.get_output_channels()
next_starttime = algorithm.get_next_starttime() next_starttime = algorithm.get_next_starttime()
starttime = next_starttime or options.starttime starttime = next_starttime or starttime
endtime = options.endtime
# input # input
timeseries = input_timeseries or self._get_input_timeseries( timeseries = input_timeseries or self._get_input_timeseries(
observatory=options.observatory, observatory=observatory,
starttime=starttime, starttime=starttime,
endtime=endtime, endtime=endtime,
channels=input_channels, channels=input_channels,
...@@ -168,30 +242,30 @@ class Controller(object): ...@@ -168,30 +242,30 @@ class Controller(object):
# no data to process # no data to process
return return
# pre-process # pre-process
if next_starttime and options.realtime: if next_starttime and realtime:
# when running a stateful algorithms with the realtime option # when running a stateful algorithms with the realtime option
# pad/trim timeseries to the interval: # pad/trim timeseries to the interval:
# [next_starttime, max(timeseries.endtime, now-options.realtime)] # [next_starttime, max(timeseries.endtime, now-options.realtime)]
input_start, input_end = TimeseriesUtility.get_stream_start_end_times( input_start, input_end = TimeseriesUtility.get_stream_start_end_times(
timeseries, without_gaps=True timeseries, without_gaps=True
) )
realtime_gap = endtime - options.realtime realtime_gap = endtime - realtime
if input_end < realtime_gap: if input_end < realtime_gap:
input_end = realtime_gap input_end = realtime_gap
# pad to the start of the "realtime gap" # pad to the start of the "realtime gap"
TimeseriesUtility.pad_timeseries(timeseries, next_starttime, input_end) TimeseriesUtility.pad_timeseries(timeseries, next_starttime, input_end)
# process # process
if options.rename_input_channel: if rename_input_channel:
timeseries = self._rename_channels( timeseries = self._rename_channels(
timeseries=timeseries, renames=options.rename_input_channel timeseries=timeseries, renames=rename_input_channel
) )
processed = algorithm.process(timeseries) processed = algorithm.process(timeseries)
# trim if --no-trim is not set # trim if --no-trim is not set
if not options.no_trim: if not no_trim:
processed.trim(starttime=starttime, endtime=endtime) processed.trim(starttime=starttime, endtime=endtime)
if options.rename_output_channel: if rename_output_channel:
processed = self._rename_channels( processed = self._rename_channels(
timeseries=processed, renames=options.rename_output_channel timeseries=processed, renames=rename_output_channel
) )
# output # output
self._outputFactory.put_timeseries( self._outputFactory.put_timeseries(
...@@ -201,13 +275,36 @@ class Controller(object): ...@@ -201,13 +275,36 @@ class Controller(object):
channels=output_channels, channels=output_channels,
) )
def run_as_update(self, options, update_count=0): def run_as_update(
"""Updates data. self,
observatory: List[str],
output_observatory: List[str],
starttime: UTCDateTime,
endtime: UTCDateTime,
input_channels: Optional[List[str]] = None,
output_channels: Optional[List[str]] = None,
no_trim: bool = False,
realtime: Union[bool, int] = False,
rename_input_channel: Optional[List[List[str]]] = None,
rename_output_channel: Optional[List[List[str]]] = None,
update_limit: int = 1,
update_count: int = 0,
):
"""Try to fill gaps in output data.
Parameters Parameters
---------- ----------
options: dictionary observatory: list of observatories for input
The dictionary of all the command line arguments. Could in theory output_observatory: list of observatories for output
contain other options passed in by the controller. starttime: time of first data
endtime: time of last data
input_channels: list of channels to read
input_timeseries: used by run_as_update, which has already read input.
output_channels: list of channels to write
no_trim: whether to trim output to starttime/endtime interval
realtime: number of seconds in realtime interval
rename_input_channel: list of input channel renames
rename_output_channel: list of output channel renames
Notes Notes
----- -----
...@@ -221,28 +318,27 @@ class Controller(object): ...@@ -221,28 +318,27 @@ class Controller(object):
period, oldest to newest. period, oldest to newest.
""" """
# If an update_limit is set, make certain we don't step past it. # If an update_limit is set, make certain we don't step past it.
if options.update_limit != 0: if update_limit != 0:
if update_count >= options.update_limit: if update_count >= update_limit:
return return
algorithm = self._algorithm algorithm = self._algorithm
if algorithm.get_next_starttime() is not None: if algorithm.get_next_starttime() is not None:
raise AlgorithmException("Stateful algorithms cannot use run_as_update") raise AlgorithmException("Stateful algorithms cannot use run_as_update")
input_channels = options.inchannels or algorithm.get_input_channels() input_channels = input_channels or algorithm.get_input_channels()
output_observatory = options.output_observatory output_channels = output_channels or algorithm.get_output_channels()
output_channels = options.outchannels or algorithm.get_output_channels()
print( print(
"checking gaps", "checking gaps",
options.starttime, starttime,
options.endtime, endtime,
output_observatory, output_observatory,
output_channels, output_channels,
file=sys.stderr, file=sys.stderr,
) )
# request output to see what has already been generated # request output to see what has already been generated
output_timeseries = self._get_output_timeseries( output_timeseries = self._get_output_timeseries(
observatory=options.output_observatory, observatory=output_observatory,
starttime=options.starttime, starttime=starttime,
endtime=options.endtime, endtime=endtime,
channels=output_channels, channels=output_channels,
) )
if len(output_timeseries) > 0: if len(output_timeseries) > 0:
...@@ -253,15 +349,15 @@ class Controller(object): ...@@ -253,15 +349,15 @@ class Controller(object):
else: else:
output_gaps = [ output_gaps = [
[ [
options.starttime, starttime,
options.endtime, endtime,
# next sample time not used # next sample time not used
None, None,
] ]
] ]
for output_gap in output_gaps: for output_gap in output_gaps:
input_timeseries = self._get_input_timeseries( input_timeseries = self._get_input_timeseries(
observatory=options.observatory, observatory=observatory,
starttime=output_gap[0], starttime=output_gap[0],
endtime=output_gap[1], endtime=output_gap[1],
channels=input_channels, channels=input_channels,
...@@ -271,26 +367,50 @@ class Controller(object): ...@@ -271,26 +367,50 @@ class Controller(object):
): ):
continue continue
# check for fillable gap at start # check for fillable gap at start
if output_gap[0] == options.starttime: if output_gap[0] == starttime:
# found fillable gap at start, recurse to previous interval # found fillable gap at start, recurse to previous interval
interval = options.endtime - options.starttime interval = endtime - starttime
starttime = options.starttime - interval gap_starttime = starttime - interval
endtime = options.starttime - 1 gap_endtime = starttime - 1
options.starttime = starttime options.starttime = starttime
options.endtime = endtime options.endtime = endtime
self.run_as_update(options, update_count + 1) self.run_as_update(
observatory=observatory,
output_observatory=output_observatory,
starttime=gap_starttime,
endtime=gap_endtime,
input_channels=input_channels,
output_channels=output_channels,
no_trim=no_trim,
realtime=realtime,
rename_input_channel=rename_input_channel,
rename_output_channel=rename_output_channel,
update_limit=update_limit,
update_count=update_count + 1,
)
# fill gap # fill gap
options.starttime = output_gap[0] gap_starttime = output_gap[0]
options.endtime = output_gap[1] gap_endtime = output_gap[1]
print( print(
"processing", "processing",
options.starttime, gap_starttime,
options.endtime, gap_endtime,
output_observatory, output_observatory,
output_channels, output_channels,
file=sys.stderr, file=sys.stderr,
) )
self.run(options, input_timeseries) self.run(
observatory=observatory,
starttime=gap_starttime,
endtime=gap_endtime,
input_channels=input_channels,
input_timeseries=input_timeseries,
output_channels=output_channels,
no_trim=no_trim,
realtime=realtime,
rename_input_channel=rename_input_channel,
rename_output_channel=rename_output_channel,
)
def get_input_factory(args): def get_input_factory(args):
...@@ -473,7 +593,7 @@ def main(args): ...@@ -473,7 +593,7 @@ def main(args):
parse_deprecated_arguments(args) parse_deprecated_arguments(args)
# make sure observatory is a tuple # make sure observatory is a tuple
if isinstance(args.observatory, (str, unicode)): if isinstance(args.observatory, str):
args.observatory = (args.observatory,) args.observatory = (args.observatory,)
if args.output_observatory is None: if args.output_observatory is None:
...@@ -541,9 +661,9 @@ def _main(args): ...@@ -541,9 +661,9 @@ def _main(args):
controller = Controller(input_factory, output_factory, algorithm) controller = Controller(input_factory, output_factory, algorithm)
if args.update: if args.update:
controller.run_as_update(args) controller._run_as_update(args)
else: else:
controller.run(args) controller._run(args)
def parse_args(args): def parse_args(args):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment