Newer
Older
Hal Simpson
committed
"""Controller class for geomag algorithms"""
import argparse
import sys
from typing import List, Optional, Tuple, Union
from obspy.core import Stream, UTCDateTime
Wilbur, Spencer Franklin
committed
from .algorithm import Algorithm, algorithms, AlgorithmException, FilterAlgorithm
from .DerivedTimeseriesFactory import DerivedTimeseriesFactory
from .PlotTimeseriesFactory import PlotTimeseriesFactory
from .StreamTimeseriesFactory import StreamTimeseriesFactory
from . import TimeseriesUtility, Util
from . import binlog
from . import edge
from . import iaga2002
from . import imfjson
from . import pcdcp
from . import imfv122
from . import imfv283
from . import temperature
from . import vbf
class Controller(object):
"""Controller for geomag algorithms.
Parameters
----------
inputFactory
the factory that will read in timeseries data
outputFactory
the factory that will output the timeseries data
Hal Simpson
committed
algorithm: Algorithm
the algorithm(s) that will process the timeseries data
Notes
-----
Hal Simpson
committed
Run simply sends all the data in a stream to edge. If a starttime/endtime is
provided, it will send the data from the stream that is within that
time span.
Update will update any data that has changed between the source, and
the target during a given timeframe. It will also attempt to
recursively backup so it can update all missing data.
def __init__(
    self,
    inputFactory,
    outputFactory,
    algorithm: Optional[Algorithm] = None,
    inputInterval: Optional[str] = None,
    outputInterval: Optional[str] = None,
):
    """Store the factories, algorithm and default intervals.

    Parameters
    ----------
    inputFactory
        factory whose ``get_timeseries`` is called to read input data.
    outputFactory
        factory whose ``get_timeseries`` is called to read output data.
    algorithm : Algorithm, optional
        algorithm used by ``_get_input_timeseries`` when the caller
        does not pass one explicitly.
    inputInterval : str, optional
        interval used for input requests when the caller gives none.
    outputInterval : str, optional
        interval used for output requests when the caller gives none.
    """
    # _get_input_timeseries falls back to self._algorithm and
    # _get_output_timeseries reads self._outputFactory, so both
    # constructor arguments must be kept on the instance.
    self._algorithm = algorithm
    self._inputFactory = inputFactory
    self._inputInterval = inputInterval
    self._outputFactory = outputFactory
    self._outputInterval = outputInterval
def _get_input_timeseries(
    self,
    observatory,
    channels,
    starttime,
    endtime,
    algorithm=None,
    interval=None,
):
    """Get timeseries from the input factory for requested options.

    Parameters
    ----------
    observatory : array_like
        observatories to request.
    channels : array_like
        channels to request.
    starttime : obspy.core.UTCDateTime
        start of the requested range; passed to the algorithm's
        ``get_input_interval`` to determine what is actually read.
    endtime : obspy.core.UTCDateTime
        end of the requested range; passed to the algorithm's
        ``get_input_interval`` to determine what is actually read.
    algorithm : Algorithm, optional
        algorithm that determines the input interval;
        defaults to ``self._algorithm``.
    interval : str, optional
        interval passed to the input factory;
        defaults to ``self._inputInterval``.

    Returns
    -------
    timeseries : obspy.core.Stream
        combined data for every observatory that was not skipped.
    """
    algorithm = algorithm or self._algorithm
    timeseries = Stream()
    for obs in observatory:
        # get input interval for observatory
        # do this per observatory in case an
        # algorithm needs different amounts of data
        input_start, input_end = algorithm.get_input_interval(
            start=starttime, end=endtime, observatory=obs, channels=channels
        )
        if input_start is None or input_end is None:
            # algorithm reported no usable interval for this observatory
            continue
        timeseries += self._inputFactory.get_timeseries(
            observatory=obs,
            starttime=input_start,
            endtime=input_end,
            channels=channels,
            interval=interval or self._inputInterval,
        )
    return timeseries
def _rename_channels(self, timeseries, renames):
    """Rename trace channel names.

    Parameters
    ----------
    timeseries : obspy.core.Stream
        stream with channels to rename; its traces are modified in place.
    renames : array_like
        list of channels to rename
        each list item should be array_like:
            the first element is the channel to rename,
            the last element is the new channel name
        ``None`` or an empty list leaves the stream untouched.

    Returns
    -------
    timeseries : obspy.core.Stream
        the same stream object that was passed in.
    """
    # "or ()" makes a missing rename list a no-op instead of a TypeError
    for rename in renames or ():
        from_name, to_name = rename[0], rename[-1]
        for trace in timeseries.select(channel=from_name):
            trace.stats.channel = to_name
    return timeseries
def _get_output_timeseries(
    self,
    observatory,
    channels,
    starttime,
    endtime,
    interval=None,
):
    """Get timeseries from the output factory for requested options.

    Parameters
    ----------
    observatory : array_like
        observatories to request.
    channels : array_like
        channels to request.
    starttime : obspy.core.UTCDateTime
        time of first sample to request.
    endtime : obspy.core.UTCDateTime
        time of last sample to request.
    interval : str, optional
        interval passed to the output factory;
        defaults to ``self._outputInterval``.

    Returns
    -------
    timeseries : obspy.core.Stream
        combined data for all requested observatories.
    """
    timeseries = Stream()
    # the factory is queried once per observatory and results are merged
    for obs in observatory:
        timeseries += self._outputFactory.get_timeseries(
            observatory=obs,
            starttime=starttime,
            endtime=endtime,
            channels=channels,
            interval=interval or self._outputInterval,
        )
    return timeseries
def _run(self, options, input_timeseries=None):
    """run controller

    Unpacks the parsed options and forwards them to ``self.run``
    as keyword arguments.

    Parameters
    ----------
    options: dictionary
        The dictionary of all the command line arguments. Could in theory
        contain other options passed in by the controller.
    input_timeseries : obspy.core.Stream
        Used by run_as_update to save a double input read, since it has
        already read the input to confirm data can be produced.
    """
    self.run(
        observatory=options.observatory,
        starttime=options.starttime,
        endtime=options.endtime,
        input_channels=options.inchannels,
        input_timeseries=input_timeseries,
        output_channels=options.outchannels,
        # direction-specific intervals take precedence over the shared one
        input_interval=options.input_interval or options.interval,
        output_interval=options.output_interval or options.interval,
        no_trim=options.no_trim,
        rename_input_channel=options.rename_input_channel,
        rename_output_channel=options.rename_output_channel,
        realtime=options.realtime,
    )
def _run_as_update(self, options, update_count=0):
"""Updates data.
Parameters
----------
options: dictionary
The dictionary of all the command line arguments. Could in theory
contain other options passed in by the controller.
Notes
-----
Finds gaps in the target data, and if there's new data in the input
source, calls run with the start/end time of a given gap to fill
in.
It checks t
Loading
Loading full blame...