Skip to content
Snippets Groups Projects
Commit d82d3cd9 authored by Jeremy M Fee's avatar Jeremy M Fee
Browse files

Add support for renaming input/output channels, and multiple observatories

parent c998bedf
No related branches found
No related tags found
No related merge requests found
......@@ -3,11 +3,9 @@
import argparse
import sys
from obspy.core import UTCDateTime
from obspy.core import Stream, UTCDateTime
from algorithm import algorithms
import TimeseriesUtility
from TimeseriesFactoryException import TimeseriesFactoryException
from Util import ObjectView
import edge
import iaga2002
......@@ -43,6 +41,97 @@ class Controller(object):
self._algorithm = algorithm
self._outputFactory = outputFactory
def _get_input_timeseries(self, observatory, channels, starttime, endtime):
"""Get timeseries from the input factory for requested options.
Parameters
----------
observatory : array_like
observatories to request.
channels : array_like
channels to request.
starttime : obspy.core.UTCDateTime
time of first sample to request.
endtime : obspy.core.UTCDateTime
time of last sample to request.
renames : array_like
list of channels to rename
each list item should be array_like:
the first element is the channel to rename,
the last element is the new channel name
Returns
-------
timeseries : obspy.core.Stream
"""
timeseries = Stream()
for obs in list(observatory):
# get input interval for observatory
# do this per observatory in case an
# algorithm needs different amounts of data
input_start, input_end = self._algorithm.get_input_interval(
start=starttime,
end=endtime,
observatory=obs,
channel=channels)
timeseries += self._inputFactory.get_timeseries(
observatory=obs,
starttime=input_start,
endtime=input_end,
channels=channels)
return timeseries
def _rename_channels(self, timeseries, renames):
"""Rename trace channel names.
Parameters
----------
timeseries : obspy.core.Stream
stream with channels to rename
renames : array_like
list of channels to rename
each list item should be array_like:
the first element is the channel to rename,
the last element is the new channel name
Returns
-------
timeseries : obspy.core.Stream
"""
for r in renames:
from_name, to_name = r[0], r[-1]
for t in timeseries.select(channel=from_name):
t.stats.channel = to_name
return timeseries
def _get_output_timeseries(self, observatory, channels, starttime,
endtime):
"""Get timeseries from the output factory for requested options.
Parameters
----------
observatory : array_like
observatories to request.
channels : array_like
channels to request.
starttime : obspy.core.UTCDateTime
time of first sample to request.
endtime : obspy.core.UTCDateTime
time of last sample to request.
Returns
-------
timeseries : obspy.core.Stream
"""
timeseries = Stream()
for obs in list(observatory):
timeseries += self._outputFactory.get_timeseries(
observatory=obs,
starttime=starttime,
endtime=endtime,
channels=channels)
return timeseries
def run(self, options):
"""run controller
Parameters
......@@ -52,22 +141,28 @@ class Controller(object):
contain other options passed in by the controller.
"""
algorithm = self._algorithm
input_channels = algorithm.get_input_channels()
output_channels = self._get_output_channels(
algorithm.get_output_channels(),
options.outchannels)
# get input
start, end = self._algorithm.get_input_interval(
start=options.starttime,
end=options.endtime)
timeseries = self._inputFactory.get_timeseries(
starttime=start,
endtime=end,
input_channels = options.inchannels or \
algorithm.get_input_channels()
output_channels = options.outchannels or \
algorithm.get_output_channels()
# input
timeseries = self._get_input_timeseries(
observatory=options.observatory,
starttime=options.starttime,
endtime=options.endtime,
channels=input_channels)
if timeseries.count() == 0:
return
# process
if options.rename_input_channel:
timeseries = self._rename_channels(
timeseries=timeseries,
renames=options.rename_input_channel)
processed = algorithm.process(timeseries)
if options.rename_output_channel:
processed = self._rename_channels(
timeseries=processed,
renames=options.rename_output_channel)
# output
self._outputFactory.put_timeseries(
timeseries=processed,
......@@ -95,12 +190,13 @@ class Controller(object):
period, oldest to newest.
"""
algorithm = self._algorithm
input_channels = algorithm.get_input_channels()
output_channels = self._get_output_channels(
algorithm.get_output_channels(),
options.outchannels)
input_channels = options.inchannels or \
algorithm.get_input_channels()
output_channels = options.outchannels or \
algorithm.get_output_channels()
# request output to see what has already been generated
output_timeseries = self._outputFactory.get_timeseries(
output_timeseries = self._get_output_timeseries(
observatory=options.observatory,
starttime=options.starttime,
endtime=options.endtime,
channels=output_channels)
......@@ -109,12 +205,10 @@ class Controller(object):
output_gaps = TimeseriesUtility.get_merged_gaps(
TimeseriesUtility.get_stream_gaps(output_timeseries))
for output_gap in output_gaps:
input_start, input_end = algorithm.get_input_interval(
start=output_gap[0],
end=output_gap[1])
input_timeseries = self._inputFactory.get_timeseries(
starttime=input_start,
endtime=input_end,
input_timeseries = self._get_input_timeseries(
observatory=options.observatory,
starttime=output_gap[0],
endtime=output_gap[1],
channels=input_channels)
if not algorithm.can_produce_data(
starttime=output_gap[0],
......@@ -125,40 +219,13 @@ class Controller(object):
if output_gap[0] == options.starttime:
# found fillable gap at start, recurse to previous interval
interval = options.endtime - options.starttime
self.run_as_update(ObjectView({
'outchannels': options.outchannels,
'starttime': options.starttime - interval - delta,
'endtime': options.starttime - delta
}))
options.starttime = options.starttime - interval - delta
options.endtime = options.starttime - delta
self.run_as_update(options)
# fill gap
self.run(ObjectView({
'outchannels': options.outchannels,
'starttime': output_gap[0],
'endtime': output_gap[1]
}))
def _get_output_channels(self, algorithm_channels, commandline_channels):
"""get output channels
Parameters
----------
algorithm_channels: array_like
list of channels required by the algorithm
commandline_channels: array_like
list of channels requested by the user
Notes
-----
We want to return the channels requested by the user, but we require
that they be in the list of channels for the algorithm.
"""
if commandline_channels is not None:
for channel in commandline_channels:
if channel not in algorithm_channels:
raise TimeseriesFactoryException(
'Output "%s" Channel not in Algorithm'
% channel)
return commandline_channels
return algorithm_channels
options.starttime = output_gap[0]
options.endtime = output_gap[1]
self.run(options)
def main(args):
......@@ -347,7 +414,11 @@ def parse_args(args):
help='UTC date YYYY-MM-DD HH:MM:SS')
parser.add_argument('--observatory',
help='Observatory code ie BOU, CMO, etc')
help='Observatory code ie BOU, CMO, etc.' +
' CAUTION: Using multiple observatories is not' +
' recommended in most cases; especially with' +
' single observatory formats like IAGA and PCDCP.',
nargs='*')
parser.add_argument('--inchannels',
nargs='*',
help='Channels H, E, Z, etc')
......@@ -358,6 +429,16 @@ def parse_args(args):
parser.add_argument('--type',
default='variation',
choices=['variation', 'quasi-definitive', 'definitive'])
parser.add_argument('--rename-input-channel',
action='append',
help='Rename an input channel after it is read',
metavar=('FROM', 'TO'),
nargs=2)
parser.add_argument('--rename-output-channel',
action='append',
help='Rename an output channel before it is written',
metavar=('FROM', 'TO'),
nargs=2)
parser.add_argument('--locationcode',
choices=['R0', 'R1', 'RM', 'Q0', 'D0', 'C0'])
parser.add_argument('--outlocationcode',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment