Newer
Older
Hal Simpson
committed
"""Controller class for geomag algorithms"""
from __future__ import absolute_import, print_function
import argparse
import sys
from obspy.core import Stream, UTCDateTime
from .PlotTimeseriesFactory import PlotTimeseriesFactory
from .StreamTimeseriesFactory import StreamTimeseriesFactory
from . import TimeseriesUtility, Util
from . import binlog
from . import edge
from . import iaga2002
from . import imfjson
from . import pcdcp
from . import imfv122
from . import imfv283
from . import temperature
from . import vbf
class Controller(object):
"""Controller for geomag algorithms.
Parameters
----------
inputFactory
    the factory that will read in timeseries data
outputFactory
    the factory that will output the timeseries data
algorithm: Algorithm
    the algorithm(s) that will process the timeseries data

Notes
-----
Run simply sends all the data in a stream to edge. If a starttime/endtime
    is provided, it will send the data from the stream that is within that
    time span.
Update will update any data that has changed between the source, and
    the target during a given timeframe. It will also attempt to
    recursively backup so it can update all missing data.
"""
def __init__(self, inputFactory, outputFactory, algorithm):
self._inputFactory = inputFactory
self._algorithm = algorithm
self._outputFactory = outputFactory
def _get_input_timeseries(self, observatory, channels, starttime, endtime):
    """Get timeseries from the input factory for requested options.

    Parameters
    ----------
    observatory : array_like
        observatories to request.
    channels : array_like
        channels to request.
    starttime : obspy.core.UTCDateTime
        time of first sample to request.
    endtime : obspy.core.UTCDateTime
        time of last sample to request.

    Returns
    -------
    timeseries : obspy.core.Stream
        data from every observatory for which the algorithm reported an
        input interval; observatories without an interval are skipped.
    """
    timeseries = Stream()
    for obs in observatory:
        # get input interval for observatory
        # do this per observatory in case an
        # algorithm needs different amounts of data
        input_start, input_end = self._algorithm.get_input_interval(
            start=starttime, end=endtime, observatory=obs, channels=channels
        )
        if input_start is None or input_end is None:
            # the algorithm gave no interval for this observatory
            continue
        timeseries += self._inputFactory.get_timeseries(
            observatory=obs,
            starttime=input_start,
            endtime=input_end,
            channels=channels,
        )
    return timeseries
def _rename_channels(self, timeseries, renames):
"""Rename trace channel names.
Parameters
----------
timeseries : obspy.core.Stream
stream with channels to rename
renames : array_like
list of channels to rename
each list item should be array_like:
the first element is the channel to rename,
the last element is the new channel name
Returns
-------
timeseries : obspy.core.Stream
"""
for r in renames:
from_name, to_name = r[0], r[-1]
for t in timeseries.select(channel=from_name):
t.stats.channel = to_name
return timeseries
def _get_output_timeseries(self, observatory, channels, starttime, endtime):
    """Get timeseries from the output factory for requested options.

    Parameters
    ----------
    observatory : array_like
        observatories to request.
    channels : array_like
        channels to request.
    starttime : obspy.core.UTCDateTime
        time of first sample to request.
    endtime : obspy.core.UTCDateTime
        time of last sample to request.

    Returns
    -------
    timeseries : obspy.core.Stream
        data read from the output factory for every requested observatory.
    """
    timeseries = Stream()
    # one request per observatory, all using the same interval and channels
    for obs in observatory:
        timeseries += self._outputFactory.get_timeseries(
            observatory=obs, starttime=starttime, endtime=endtime, channels=channels
        )
    return timeseries
def run(self, options, input_timeseries=None):
    """run controller

    Reads input (unless it is supplied), runs the algorithm on it and
    writes the processed result to the output factory.

    Parameters
    ----------
    options : dictionary
        The dictionary of all the command line arguments. Could in theory
        contain other options passed in by the controller.
        Values are read with attribute access (``options.starttime``).
    input_timeseries : obspy.core.Stream
        Used by run_as_update to save a double input read, since it has
        already read the input to confirm data can be produced.

    Returns
    -------
    None
        Returns early, without writing output, when the input has no data.
    """
    algorithm = self._algorithm
    input_channels = options.inchannels or algorithm.get_input_channels()
    output_channels = options.outchannels or algorithm.get_output_channels()
    # a start time reported by the algorithm takes precedence over the
    # requested one
    next_starttime = algorithm.get_next_starttime()
    starttime = next_starttime or options.starttime
    endtime = options.endtime
    # input
    timeseries = input_timeseries or self._get_input_timeseries(
        observatory=options.observatory,
        starttime=starttime,
        endtime=endtime,
        channels=input_channels,
    )
    if timeseries.count() == 0:
        # no data to process
        return
    # pre-process
    if next_starttime and options.realtime:
        # when running a stateful algorithms with the realtime option
        # pad/trim timeseries to the interval:
        # [next_starttime, max(timeseries.endtime, now-options.realtime)]
        _, input_end = TimeseriesUtility.get_stream_start_end_times(
            timeseries, without_gaps=True
        )
        realtime_gap = endtime - options.realtime
        if input_end < realtime_gap:
            input_end = realtime_gap
        # pad to the start of the "realtime gap"
        TimeseriesUtility.pad_timeseries(timeseries, next_starttime, input_end)
    # process
    if options.rename_input_channel:
        timeseries = self._rename_channels(
            timeseries=timeseries, renames=options.rename_input_channel
        )
    processed = algorithm.process(timeseries)
    if options.output_interval in ["hour", "day"]:
        # hour/day output: use the bounds of the first processed trace
        starttime = processed[0].stats.starttime
        endtime = processed[0].stats.endtime
    # NOTE(review): trim is applied for every output interval here; confirm
    # it was not meant to be skipped for "hour"/"day" output.
    processed.trim(starttime=starttime, endtime=endtime)
    if options.rename_output_channel:
        processed = self._rename_channels(
            timeseries=processed, renames=options.rename_output_channel
        )
    # output
    self._outputFactory.put_timeseries(
        timeseries=processed,
        starttime=starttime,
        endtime=endtime,
        channels=output_channels,
    )
Hal Simpson
committed
Hal Simpson
committed
def run_as_update(self, options, update_count=0):
Loading
Loading full blame...