-
Jeremy M Fee authoredJeremy M Fee authored
Controller.py 29.98 KiB
"""Controller class for geomag algorithms"""
from __future__ import absolute_import, print_function
from builtins import str as unicode
import argparse
import sys
from io import BytesIO
from obspy.core import Stream, UTCDateTime
from .algorithm import algorithms
from .PlotTimeseriesFactory import PlotTimeseriesFactory
from .StreamTimeseriesFactory import StreamTimeseriesFactory
from . import TimeseriesUtility, Util
# factory packages
from . import binlog
from . import edge
from . import iaga2002
from . import imfjson
from . import pcdcp
from . import imfv122
from . import imfv283
from . import temperature
from . import vbf
class Controller(object):
"""Controller for geomag algorithms.
Parameters
----------
inputFactory: TimeseriesFactory
the factory that will read in timeseries data
outputFactory: TimeseriesFactory
the factory that will output the timeseries data
algorithm: Algorithm
the algorithm(s) that will procees the timeseries data
Notes
-----
Has 2 basic modes.
Run simply sends all the data in a stream to edge. If a startime/endtime is
provided, it will send the data from the stream that is within that
time span.
Update will update any data that has changed between the source, and
the target during a given timeframe. It will also attempt to
recursively backup so it can update all missing data.
"""
def __init__(self, inputFactory, outputFactory, algorithm):
self._inputFactory = inputFactory
self._algorithm = algorithm
self._outputFactory = outputFactory
def _get_input_timeseries(self, observatory, channels, starttime, endtime):
"""Get timeseries from the input factory for requested options.
Parameters
----------
observatory : array_like
observatories to request.
channels : array_like
channels to request.
starttime : obspy.core.UTCDateTime
time of first sample to request.
endtime : obspy.core.UTCDateTime
time of last sample to request.
renames : array_like
list of channels to rename
each list item should be array_like:
the first element is the channel to rename,
the last element is the new channel name
Returns
-------
timeseries : obspy.core.Stream
"""
timeseries = Stream()
for obs in observatory:
# get input interval for observatory
# do this per observatory in case an
# algorithm needs different amounts of data
input_start, input_end = self._algorithm.get_input_interval(
start=starttime,
end=endtime,
observatory=obs,
channels=channels)
timeseries += self._inputFactory.get_timeseries(
observatory=obs,
starttime=input_start,
endtime=input_end,
channels=channels)
return timeseries
def _rename_channels(self, timeseries, renames):
"""Rename trace channel names.
Parameters
----------
timeseries : obspy.core.Stream
stream with channels to rename
renames : array_like
list of channels to rename
each list item should be array_like:
the first element is the channel to rename,
the last element is the new channel name
Returns
-------
timeseries : obspy.core.Stream
"""
for r in renames:
from_name, to_name = r[0], r[-1]
for t in timeseries.select(channel=from_name):
t.stats.channel = to_name
return timeseries
def _get_output_timeseries(self, observatory, channels, starttime,
endtime):
"""Get timeseries from the output factory for requested options.
Parameters
----------
observatory : array_like
observatories to request.
channels : array_like
channels to request.
starttime : obspy.core.UTCDateTime
time of first sample to request.
endtime : obspy.core.UTCDateTime
time of last sample to request.
Returns
-------
timeseries : obspy.core.Stream
"""
timeseries = Stream()
for obs in observatory:
timeseries += self._outputFactory.get_timeseries(
observatory=obs,
starttime=starttime,
endtime=endtime,
channels=channels)
return timeseries
def run(self, options, input_timeseries=None):
"""run controller
Parameters
----------
options: dictionary
The dictionary of all the command line arguments. Could in theory
contain other options passed in by the controller.
input_timeseries : obspy.core.Stream
Used by run_as_update to save a double input read, since it has
already read the input to confirm data can be produced.
"""
algorithm = self._algorithm
input_channels = options.inchannels or \
algorithm.get_input_channels()
output_channels = options.outchannels or \
algorithm.get_output_channels()
# input
timeseries = input_timeseries or self._get_input_timeseries(
observatory=options.observatory,
starttime=options.starttime,
endtime=options.endtime,
channels=input_channels)
if timeseries.count() == 0:
return
# process
if options.rename_input_channel:
timeseries = self._rename_channels(
timeseries=timeseries,
renames=options.rename_input_channel)
processed = algorithm.process(timeseries)
# trim if --no-trim is not set
if not options.no_trim:
processed.trim(starttime=options.starttime,
endtime=options.endtime)
if options.rename_output_channel:
processed = self._rename_channels(
timeseries=processed,
renames=options.rename_output_channel)
# output
self._outputFactory.put_timeseries(
timeseries=processed,
starttime=options.starttime,
endtime=options.endtime,
channels=output_channels)
def run_as_update(self, options, update_count=0):
"""Updates data.
Parameters
----------
options: dictionary
The dictionary of all the command line arguments. Could in theory
contain other options passed in by the controller.
Notes
-----
Finds gaps in the target data, and if there's new data in the input
source, calls run with the start/end time of a given gap to fill
in.
It checks the start of the target data, and if it's missing, and
there's new data available, it backs up the starttime/endtime,
and recursively calls itself, to check the previous period, to see
if new data is available there as well. Calls run for each new
period, oldest to newest.
"""
# If an update_limit is set, make certain we don't step past it.
if options.update_limit != 0:
if update_count >= options.update_limit:
return
algorithm = self._algorithm
input_channels = options.inchannels or \
algorithm.get_input_channels()
output_observatory = options.output_observatory
output_channels = options.outchannels or \
algorithm.get_output_channels()
print('checking gaps', options.starttime, options.endtime,
output_observatory, output_channels,
file=sys.stderr)
# request output to see what has already been generated
output_timeseries = self._get_output_timeseries(
observatory=options.output_observatory,
starttime=options.starttime,
endtime=options.endtime,
channels=output_channels)
if len(output_timeseries) > 0:
# find gaps in output, so they can be updated
output_gaps = TimeseriesUtility.get_merged_gaps(
TimeseriesUtility.get_stream_gaps(output_timeseries))
else:
output_gaps = [[
options.starttime,
options.endtime,
# next sample time not used
None
]]
for output_gap in output_gaps:
input_timeseries = self._get_input_timeseries(
observatory=options.observatory,
starttime=output_gap[0],
endtime=output_gap[1],
channels=input_channels)
if not algorithm.can_produce_data(
starttime=output_gap[0],
endtime=output_gap[1],
stream=input_timeseries):
continue
# check for fillable gap at start
if output_gap[0] == options.starttime:
# found fillable gap at start, recurse to previous interval
interval = options.endtime - options.starttime
starttime = options.starttime - interval
endtime = options.starttime - 1
options.starttime = starttime
options.endtime = endtime
self.run_as_update(options, update_count + 1)
# fill gap
options.starttime = output_gap[0]
options.endtime = output_gap[1]
print('processing', options.starttime, options.endtime,
output_observatory, output_channels,
file=sys.stderr)
self.run(options, input_timeseries)
def get_input_factory(args):
"""Parse input factory arguments.
Parameters
----------
args : argparse.Namespace
arguments
Returns
-------
TimeseriesFactory
input timeseries factory
"""
input_factory = None
input_factory_args = None
input_stream = None
# standard arguments
input_factory_args = {}
input_factory_args['interval'] = args.interval
input_factory_args['observatory'] = args.observatory
input_factory_args['type'] = args.type
# stream/url arguments
if args.input_file is not None:
input_stream = open(args.input_file, 'r')
elif args.input_stdin:
input_stream = sys.stdin
elif args.input_url is not None:
if '{' in args.input_url:
input_factory_args['urlInterval'] = args.input_url_interval
input_factory_args['urlTemplate'] = args.input_url
else:
input_stream = BytesIO(Util.read_url(args.input_url))
input_type = args.input
if input_type == 'edge':
input_factory = edge.EdgeFactory(
host=args.input_host,
port=args.input_port,
locationCode=args.locationcode,
**input_factory_args)
elif input_type == 'goes':
# TODO: deal with other goes arguments
input_factory = imfv283.GOESIMFV283Factory(
directory=args.input_goes_directory,
getdcpmessages=args.input_goes_getdcpmessages,
password=args.input_goes_password,
server=args.input_goes_server,
user=args.input_goes_user,
**input_factory_args)
else:
# stream compatible factories
if input_type == 'iaga2002':
input_factory = iaga2002.IAGA2002Factory(**input_factory_args)
elif input_type == 'imfv122':
input_factory = imfv122.IMFV122Factory(**input_factory_args)
elif input_type == 'imfv283':
input_factory = imfv283.IMFV283Factory(**input_factory_args)
elif input_type == 'pcdcp':
input_factory = pcdcp.PCDCPFactory(**input_factory_args)
# wrap stream
if input_stream is not None:
input_factory = StreamTimeseriesFactory(
factory=input_factory,
stream=input_stream)
return input_factory
def get_output_factory(args):
"""Parse output factory arguments.
Parameters
----------
args : argparse.Namespace
arguments
Returns
-------
TimeseriesFactory
output timeseries factory
"""
output_factory = None
output_factory_args = None
output_stream = None
output_url = None
# standard arguments
output_factory_args = {}
output_factory_args['interval'] = args.interval
output_factory_args['observatory'] = args.output_observatory
output_factory_args['type'] = args.type
# stream/url arguments
if args.output_file is not None:
output_stream = open(args.output_file, 'wb')
elif args.output_stdout:
try:
# python 3
output_stream = sys.stdout.buffer
except AttributeError:
# python 2
output_stream = sys.stdout
elif args.output_url is not None:
output_url = args.output_url
output_factory_args['urlInterval'] = args.output_url_interval
output_factory_args['urlTemplate'] = output_url
output_type = args.output
if output_type == 'edge':
# TODO: deal with other edge arguments
locationcode = args.outlocationcode or args.locationcode or None
output_factory = edge.EdgeFactory(
host=args.output_host,
port=args.output_edge_read_port,
write_port=args.output_port,
locationCode=locationcode,
tag=args.output_edge_tag,
forceout=args.output_edge_forceout,
**output_factory_args)
elif output_type == 'plot':
output_factory = PlotTimeseriesFactory()
else:
# stream compatible factories
if output_type == 'binlog':
output_factory = binlog.BinLogFactory(**output_factory_args)
elif output_type == 'iaga2002':
output_factory = iaga2002.IAGA2002Factory(**output_factory_args)
elif output_type == 'imfjson':
output_factory = imfjson.IMFJSONFactory(**output_factory_args)
elif output_type == 'pcdcp':
output_factory = pcdcp.PCDCPFactory(**output_factory_args)
elif output_type == 'temperature':
output_factory = temperature.TEMPFactory(**output_factory_args)
elif output_type == 'vbf':
output_factory = vbf.VBFFactory(**output_factory_args)
# wrap stream
if output_stream is not None:
output_factory = StreamTimeseriesFactory(
factory=output_factory,
stream=output_stream)
return output_factory
def main(args):
"""command line factory for geomag algorithms
Inputs
------
use geomag.py --help to see inputs, or see parse_args.
Notes
-----
parses command line options using argparse, then calls the controller
with instantiated I/O factories, and algorithm(s)
"""
# TODO: remove argument mapping in future version
# map legacy input arguments
usingDeprecated = False
if args.input_edge is not None:
args.input = 'edge'
args.input_host = args.input_edge
args.input_port = args.input_edge_port
usingDeprecated = True
elif args.input_iaga_file is not None:
args.input = 'iaga2002'
args.input_file = args.input_iaga_file
usingDeprecated = True
elif args.input_iaga_stdin:
args.input = 'iaga2002'
args.input_stdin = True
usingDeprecated = True
elif args.input_iaga_url is not None:
args.input = 'iaga2002'
args.input_url = args.input_iaga_url
usingDeprecated = True
elif args.input_imfv283_file is not None:
args.input = 'imfv283'
args.input_file = args.input_imfv283_file
usingDeprecated = True
elif args.input_imfv283_url is not None:
args.input = 'imfv283'
args.input_url = args.input_imfv283_url
usingDeprecated = True
elif args.input_imfv283_goes:
args.input = 'goes'
usingDeprecated = True
# map legacy output arguments
if args.output_edge is not None:
args.output = 'edge'
args.output_host = args.output_edge
args.output_port = args.edge_write_port
usingDeprecated = True
elif args.output_iaga_file is not None:
args.output = 'iaga2002'
args.output_file = args.output_iaga_file
usingDeprecated = True
elif args.output_iaga_stdout:
args.output = 'iaga2002'
args.output_stdout = True
usingDeprecated = True
elif args.output_iaga_url is not None:
args.output = 'iaga2002'
args.output_url = args.output_iaga_url
usingDeprecated = True
elif args.output_pcdcp_file is not None:
args.output = 'pcdcp'
args.output_file = args.output_pcdcp_file
usingDeprecated = True
elif args.output_pcdcp_stdout:
args.output = 'pcdcp'
args.output_stdout = True
usingDeprecated = True
elif args.output_pcdcp_url is not None:
args.output = 'pcdcp'
args.output_url = args.output_pcdcp_url
usingDeprecated = True
elif args.output_plot:
args.output = 'plot'
usingDeprecated = True
if usingDeprecated:
print('WARNING: you are using deprecated arguments,' +
' please update your usage', file=sys.stderr)
# TODO check for unused arguments.
# make sure observatory is a tuple
if isinstance(args.observatory, (str, unicode)):
args.observatory = (args.observatory,)
if args.output_observatory is None:
args.output_observatory = args.observatory
elif args.observatory_foreach:
raise Exception("Cannot specify" +
" --output-observatory and --observatory-foreach")
if args.observatory_foreach:
observatory = args.observatory
observatory_exception = None
for obs in observatory:
args.observatory = (obs,)
args.output_observatory = (obs,)
try:
_main(args)
except Exception as e:
print("Exception processing observatory {}".format(obs),
str(e),
file=sys.stderr)
if observatory_exception:
print("Exceptions occurred during processing", file=sys.stderr)
sys.exit(1)
else:
_main(args)
def _main(args):
"""Actual main method logic, called by main
Parameters
----------
args : argparse.Namespace
command line arguments
"""
# create controller
input_factory = get_input_factory(args)
output_factory = get_output_factory(args)
algorithm = algorithms[args.algorithm]()
algorithm.configure(args)
controller = Controller(input_factory, output_factory, algorithm)
if args.realtime:
now = UTCDateTime()
args.endtime = UTCDateTime(now.year, now.month, now.day,
now.hour, now.minute)
if args.interval == 'minute':
args.starttime = args.endtime - 3600
else:
args.starttime = args.endtime - 600
if args.update:
controller.run_as_update(args)
else:
controller.run(args)
def parse_args(args):
"""parse input arguments
Parameters
----------
args : list of strings
Returns
-------
argparse.Namespace
dictionary like object containing arguments.
"""
parser = argparse.ArgumentParser(
description='Use @ to read commands from a file.',
fromfile_prefix_chars='@',)
parser.add_argument('--starttime',
type=UTCDateTime,
default=None,
help='UTC date YYYY-MM-DD HH:MM:SS')
parser.add_argument('--endtime',
type=UTCDateTime,
default=None,
help='UTC date YYYY-MM-DD HH:MM:SS')
parser.add_argument('--observatory',
default=(None,),
help='Observatory code ie BOU, CMO, etc.' +
' CAUTION: Using multiple observatories is not' +
' recommended in most cases; especially with' +
' single observatory formats like IAGA and PCDCP.',
nargs='*',
type=str)
parser.add_argument('--output-observatory',
default=None,
help='Defaults to valur of --observatory argument.' +
' Observatory code ie BOU, CMO, etc.' +
' CAUTION: Using multiple observatories is not' +
' recommended in most cases; especially with' +
' single observatory formats like IAGA and PCDCP.',
nargs='*',
type=str)
parser.add_argument('--observatory-foreach',
action='store_true',
default=False,
help='When specifying multiple observatories, process'
' each observatory separately')
parser.add_argument('--inchannels',
nargs='*',
help='Channels H, E, Z, etc')
parser.add_argument('--outchannels',
nargs='*',
default=None,
help='Channels H, E, Z, etc')
parser.add_argument('--type',
default='variation',
choices=['variation', 'quasi-definitive', 'definitive'])
parser.add_argument('--rename-input-channel',
action='append',
help='Rename an input channel after it is read',
metavar=('FROM', 'TO'),
nargs=2)
parser.add_argument('--rename-output-channel',
action='append',
help='Rename an output channel before it is written',
metavar=('FROM', 'TO'),
nargs=2)
parser.add_argument('--locationcode',
help='EDGE location code, e.g. "R0", "R1"',
type=edge.LocationCode)
parser.add_argument('--outlocationcode',
help='EDGE output location code'
' (if different from --locationcode)',
type=edge.LocationCode)
parser.add_argument('--interval',
default='minute',
choices=['hourly', 'minute', 'second'])
parser.add_argument('--update',
action='store_true',
default=False,
help='Used to update data')
parser.add_argument('--update-limit',
type=int,
default=0,
help='Used to limit the number of iterations update will recurse')
parser.add_argument('--no-trim',
action='store_true',
default=False,
help='Ensures output data will not be trimmed down'),
parser.add_argument('--input-edge-port',
type=int,
default=2060,
help='deprecated. \
Input port # for edge input, defaults to 2060')
parser.add_argument('--output-edge-port',
type=int,
dest='edge_write_port',
default=7981,
help='deprecated. \
Edge port for writing realtime data, defaults to 7981')
parser.add_argument('--output-edge-cwb-port',
type=int,
dest='edge_write_port',
default='7981',
help='deprecated. \
Edge port for writing older data. Not used by geomag.')
parser.add_argument('--output-edge-read-port',
type=int,
default=2060,
help='deprecated. \
Edge port for reading output data, defaults to 2060')
parser.add_argument('--output-edge-tag',
default='GEOMAG',
help='ID Tag for edge connections, defaults to GEOMAG')
parser.add_argument('--output-edge-forceout',
action='store_true',
default=False,
help='Flag to force data into miniseed blocks. Should only ' +
'be used when certain the data is self contained.')
parser.add_argument('--realtime',
action='store_true',
default=False,
help='Flag to run the last hour if interval is minute, ' +
'or the last 10 minutes if interval is seconds')
parser.add_argument('--input-goes-directory',
default='.',
help='Directory for support files for goes input of imfv283 data')
parser.add_argument('--input-goes-getdcpmessages',
default='',
help='Location of getDcpMessages.')
parser.add_argument('--input-goes-password',
default='',
help='Password for goes user')
parser.add_argument('--input-goes-server',
nargs='*',
help='The server name(s) to retrieve the GOES data from')
parser.add_argument('--input-goes-user',
default='GEOMAG',
help='The user name to use to retrieve data from GOES')
# Input group
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument('--input',
help='Input format',
choices=(
'edge',
'goes',
'iaga2002',
'imfv122',
'imfv283',
'pcdcp'))
parser.add_argument('--input-file',
help='Read from specified file')
parser.add_argument('--input-host',
default='cwbpub.cr.usgs.gov',
help='Hostname or IP address')
parser.add_argument('--input-port',
default=2060,
help='Port number',
type=int)
parser.add_argument('--input-stdin',
action='store_true',
default=False,
help='Read from standard input')
parser.add_argument('--input-url',
help='Read from a url pattern')
parser.add_argument('--input-url-interval',
default=86400,
help='Read url interval in seconds',
type=int)
input_group.add_argument('--input-edge',
help='deprecated. \
Host IP #, see --input-edge-port for optional args')
input_group.add_argument('--input-iaga-file',
help='deprecated. Reads from the specified file.')
input_group.add_argument('--input-iaga-stdin',
action='store_true',
default=False,
help='deprecated. \
Pass in an iaga file using redirection from stdin.')
input_group.add_argument('--input-iaga-url',
help='deprecated. \
Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s')
input_group.add_argument('--input-imfv283-file',
help='deprecated. Reads from the specified file.')
input_group.add_argument('--input-imfv283-stdin',
action='store_true',
default=False,
help='deprecated. \
Pass in a file using redirection from stdin')
input_group.add_argument('--input-imfv283-url',
help='deprecated. Example file://./')
input_group.add_argument('--input-imfv283-goes',
action='store_true',
default=False,
help='deprecated. \
Retrieves data directly from a goes server to read')
input_group.add_argument('--input-pcdcp-file',
help='deprecated. Reads from the specified file.')
input_group.add_argument('--input-pcdcp-stdin',
action='store_true',
default=False,
help='deprecated. \
Pass in an pcdcp file using redirection from stdin.')
input_group.add_argument('--input-pcdcp-url',
help='deprecated. Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s')
# Output group
output_group = parser.add_mutually_exclusive_group(required=True)
output_group.add_argument('--output-iaga-file',
help='deprecated. Write to a single iaga file.')
output_group.add_argument('--output-iaga-stdout',
action='store_true', default=False,
help='deprecated. Write to stdout.')
output_group.add_argument('--output-iaga-url',
help='deprecated. \
Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s')
output_group.add_argument('--output-pcdcp-file',
help='deprecated. Write to a single pcdcp file.')
output_group.add_argument('--output-pcdcp-stdout',
action='store_true', default=False,
help='deprecated. Write to stdout.')
output_group.add_argument('--output-pcdcp-url',
help='deprecated. Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s')
output_group.add_argument('--output-edge',
help='deprecated. \
Edge IP #. See --output-edge-* for other optional arguments')
output_group.add_argument('--output-plot',
action='store_true',
default=False,
help='deprecated. Plot the algorithm output using matplotlib')
# output arguments
output_group.add_argument('--output',
choices=(
'binlog',
'edge',
'iaga2002',
'imfjson',
'pcdcp',
'plot',
'temperature',
'vbf'
),
# TODO: set default to 'iaga2002'
help='Output format')
parser.add_argument('--output-file',
help='Write to specified file')
parser.add_argument('--output-host',
default='cwbpub.cr.usgs.gov',
help='Write to specified host')
parser.add_argument('--output-port',
default=7981,
help='Write to specified port',
type=int)
parser.add_argument('--output-stdout',
action='store_true',
default=False,
help='Write to standard output')
parser.add_argument('--output-url',
help='Write to a file:// url pattern')
parser.add_argument('--output-url-interval',
default=86400,
help='Output interval in seconds',
type=int)
# Algorithms group
parser.add_argument('--algorithm',
choices=[k for k in algorithms],
default='identity')
for k in algorithms:
algorithms[k].add_arguments(parser)
return parser.parse_args(args)