Skip to content
Snippets Groups Projects
Commit dbb14460 authored by Jeremy M Fee's avatar Jeremy M Fee
Browse files

Test and refactor TimeseriesUtility, update Controller run_as_update method

parent 0fcd7077
No related branches found
No related tags found
No related merge requests found
...@@ -56,9 +56,14 @@ class Algorithm(object): ...@@ -56,9 +56,14 @@ class Algorithm(object):
def get_input_interval(self, start, end): def get_input_interval(self, start, end):
"""Get Input Interval """Get Input Interval
start : UTCDateTime
start time of requested output
end : UTCDateTime
end time of requested output
Returns Returns
------- -------
tuple tuple : (input_start, input_end)
start/end time pair of the input interval start and end of required input to generate requested output.
""" """
return (start, end) return (start, end)
"""Controller class for geomag algorithms""" """Controller class for geomag algorithms"""
import TimeseriesUtilities as TUtils import TimeseriesUtility
import TimeseriesFactoryException import TimeseriesFactoryException
import copy
class Controller(object): class Controller(object):
...@@ -41,21 +40,26 @@ class Controller(object): ...@@ -41,21 +40,26 @@ class Controller(object):
The dictionary of all the command line arguments. Could in theory The dictionary of all the command line arguments. Could in theory
contain other options passed in by the controller. contain other options passed in by the controller.
""" """
input_channels = self._algorithm.get_input_channels() algorithm = self._algorithm
algorithm_start, algorithm_end = self._algorithm.get_input_interval( input_channels = algorithm.get_input_channels()
options.starttime, options.endtime) output_channels = self._get_output_channels(
algorithm.get_output_channels(),
timeseries = self._inputFactory.get_timeseries(algorithm_start,
algorithm_end, channels=input_channels)
processed = self._algorithm.process(timeseries)
output_channels = self._algorithm.get_output_channels()
output_channels = self._get_output_channels(output_channels,
options.outchannels) options.outchannels)
# get input
self._outputFactory.put_timeseries(timeseries=processed, start, end = self._algorithm.get_input_interval(
starttime=options.starttime, endtime=options.endtime, start=options.starttime,
end=options.endtime)
timeseries = self._inputFactory.get_timeseries(
starttime=start,
endtime=end,
channels=input_channels)
# process
processed = algorithm.process(timeseries)
# output
self._outputFactory.put_timeseries(
timeseries=processed,
starttime=options.starttime,
endtime=options.endtime,
channels=output_channels) channels=output_channels)
def run_as_update(self, options): def run_as_update(self, options):
...@@ -77,45 +81,45 @@ class Controller(object): ...@@ -77,45 +81,45 @@ class Controller(object):
if new data is available there as well. Calls run for each new if new data is available there as well. Calls run for each new
period, oldest to newest. period, oldest to newest.
""" """
input_channels = self._algorithm.get_input_channels() algorithm = self._algorithm
output_channels = self._algorithm.get_output_channels() input_channels = algorithm.get_input_channels()
output_channels = self._get_output_channels(
output_channels = self._get_output_channels(output_channels, algorithm.get_output_channels(),
options.outchannels) options.outchannels)
# request output to see what has already been generated
timeseries_source = self._inputFactory.get_timeseries( output_timeseries = self._outputFactory.get_timeseries(
options.starttime, options.endtime, channels=input_channels) starttime=options.starttime,
timeseries_target = self._outputFactory.get_timeseries( endtime=options.endtime,
options.starttime, options.endtime, channels=output_channels) channels=output_channels)
# find gaps in output, so they can be updated
source_gaps = TUtils.get_timeseries_gaps( output_gaps = TimeseriesUtility.get_stream_gaps(output_timeseries)
timeseries_source, input_channels, options.starttime, for gap in output_gaps:
options.endtime) start, end = algorithm.get_input_interval(
target_gaps = TUtils.get_timeseries_gaps( start=gap[0],
timeseries_target, output_channels, options.starttime, end=gap[1])
options.endtime) input_timeseries = self._inputFactory.get_timeseries(
source_gaps = TUtils.get_merged_gaps(source_gaps, input_channels) starttime=start,
target_gaps = TUtils.get_merged_gaps(target_gaps, output_channels) endtime=end,
del timeseries_source channels=input_channels)
del timeseries_target input_gaps = TimeseriesUtility.get_stream_gaps(input_timeseries)
if len(input_gaps) > 0:
if ((not len(source_gaps) or # TODO: are certain gaps acceptable?
len(source_gaps) and source_gaps[0][0] != options.starttime)
and
len(target_gaps) and target_gaps[0][0] == options.starttime):
new_options = copy.deepcopy(options)
new_options.starttime = (options.starttime -
(options.endtime - options.starttime))
new_options.endtime = (options.starttime -
TUtils.get_seconds_of_interval(options.interval))
self.run_as_update(new_options)
for target_gap in target_gaps:
if not TUtils.gap_is_new_data(source_gaps, target_gap):
continue continue
new_options = copy.deepcopy(options) # check for fillable gap at start
new_options.starttime = target_gap[0] if gap[0] == options.starttime:
new_options.endtime = target_gap[1] # found fillable gap at start, recurse to previous interval
self.run(new_options) interval = options.endtime - options.starttime
self.run_as_update({
'outchannels': options.outchannels,
'starttime': options.starttime - interval,
'endtime': options.starttime
})
# fill gap
self.run({
'outchannels': options.outchannels,
'starttime': gap[0],
'endtime': gap[1]
})
def _get_output_channels(self, algorithm_channels, commandline_channels): def _get_output_channels(self, algorithm_channels, commandline_channels):
"""get output channels """get output channels
......
"""Timeseries Utilities"""
import numpy
def get_timeseries_gaps(timeseries, channels, starttime, endtime):
"""Get gaps in a given timeseries
Parameters
----------
timeseries: obspy.core.stream
the stream to check for gaps in
channels: array_like
list of channels to look for gaps in
starttime: obspy.core.UTCDateTime
time of first sample.
endtime: obspy.core.UTCDateTime
time of last sample.
Returns
-------
dictionary of channel gaps arrays
Notes
-----
Returns a dictionary with channel: gaps array pairs. Where the gaps array
consists of arrays of starttime/endtime pairs representing each gap.
"""
gaps = {}
for channel in channels:
stream_gap = get_stream_gaps(
timeseries.select(channel=channel), starttime, endtime)
gaps[channel] = stream_gap
return gaps
def get_stream_gaps(stream, starttime, endtime):
"""Gets gaps in a stream representing a single channel
Parameters
----------
stream: obspy.core.stream
a stream containing a single channel of data.
starttime: obspy.core.UTCDateTime
time of first sample.
endtime: obspy.core.UTCDateTime
time of last sample.
Returns
-------
array of gaps
"""
gaps = []
gap = None
i = 0
data = stream[0].data
length = len(data)
for i in xrange(0, length):
if numpy.isnan(data[i]) and gap is None:
gap = [starttime + i * 60]
if not numpy.isnan(data[i]) and gap is not None:
gap.append(starttime + (i - 1) * 60)
gaps.append(gap)
gap = None
if gap is not None:
gap.append(endtime)
gaps.append(gap)
return gaps
def get_merged_gaps(gaps, channels):
"""Get gaps merged across channels/streams
Parameters
----------
gaps: dictionary
contains channel/gap array pairs
channels: array_like
array of channels to look for gaps in
Returns
-------
array_like
an array of startime/endtime arrays representing gaps.
Notes
-----
Takes an dictionary of gaps, and merges those gaps across channels,
returning an array of the merged gaps.
"""
gap_stream = []
for channel in channels:
gap_stream.extend(gaps[channel])
if len(gap_stream) == 0:
return []
sorted_gaps = sorted(gap_stream, key=lambda starttime: starttime[1])
merged_gaps = []
gap = sorted_gaps[0]
for i in range(1, len(sorted_gaps)):
nxtgap = sorted_gaps[i]
if nxtgap[0] >= gap[0] and nxtgap[0] <= gap[1]:
if nxtgap[1] > gap[1]:
gap[1] = nxtgap[1]
else:
merged_gaps.append(gap)
gap = nxtgap
merged_gaps.append(gap)
return merged_gaps
def is_new_data(input_gaps, output_gaps):
"""Is new data available in gaps
Parameters
----------
input_gaps: array_like
an array of startime/endtime gap pairs holding the input gaps
output_gaps: array_like
an array of starttime/endtime gap pairs holding the ouput gaps
Returns
boolean
True if there's new data available, False otherwise
"""
for output_gap in output_gaps:
for input_gap in input_gaps:
if (output_gap[0] >= input_gap[0] and
output_gap[0] <= input_gap[1] and
output_gap[1] <= input_gap[1]):
return False
return True
def gap_is_new_data(input_gaps, output_gap):
"""Is new data available for a single gap
Parameters
----------
input_gaps: array_like
an array of startime/endtime gap pairs holding the input gaps
output_gaps: array_like
starttime/endtime pair representing a single gap
Returns
boolean
True if there's new data available for the gap, False otherwise
"""
for input_gap in input_gaps:
if (output_gap[0] >= input_gap[0] and
output_gap[0] <= input_gap[1] and
output_gap[1] <= input_gap[1]):
return False
return True
def get_seconds_of_interval(interval):
"""Gets number of seconds for a given interval string
Parameters
----------
interval: string
The string representing an interval size
"""
if interval == 'second':
return 1
if interval == 'minute':
return 60
if interval == 'hourly':
return 3600
if interval == 'daily':
return 86400
"""Timeseries Utilities"""
import numpy
def get_stream_gaps(stream):
"""Get gaps in a given stream
Parameters
----------
stream: obspy.core.Stream
the stream to check for gaps
channels: array_like
list of channels to check for gaps
Returns
-------
dictionary of channel gaps arrays
Notes
-----
Returns a dictionary with channel: gaps array pairs. Where the gaps array
consists of arrays of starttime/endtime pairs representing each gap.
"""
gaps = {}
for trace in stream:
channel = trace.stats.channel
gaps[channel] = get_trace_gaps(trace)
return gaps
def get_trace_gaps(trace):
"""Gets gaps in a trace representing a single channel
Parameters
----------
trace: obspy.core.Trace
a stream containing a single channel of data.
Returns
-------
array of gaps, which is empty when there are no gaps.
each gap is an array [start of gap, end of gap, next sample]
"""
gaps = []
gap = None
data = trace.data
stats = trace.stats
starttime = stats.starttime
length = len(data)
delta = stats.delta
for i in xrange(0, length):
if numpy.isnan(data[i]):
if gap is None:
# start of a gap
gap = [starttime + i * delta]
else:
if gap is not None:
# end of a gap
gap.extend([
starttime + (i - 1) * delta,
starttime + i * delta])
gaps.append(gap)
gap = None
# check for gap at end
if gap is not None:
gap.extend([
starttime + (length - 1) * delta,
starttime + length * delta])
gaps.append(gap)
return gaps
def get_merged_gaps(gaps):
"""Get gaps merged across channels/streams
Parameters
----------
gaps: dictionary
contains channel/gap array pairs
Returns
-------
array_like
an array of startime/endtime arrays representing gaps.
Notes
-----
Takes an dictionary of gaps, and merges those gaps across channels,
returning an array of the merged gaps.
"""
merged_gaps = []
for key in gaps:
merged_gaps.extend(gaps[key])
# sort gaps so earlier gaps are before later gaps
sorted_gaps = sorted(merged_gaps, key=lambda gap: gap[0])
# merge gaps that overlap
merged_gaps = []
merged_gap = None
for gap in sorted_gaps:
if merged_gap is None:
# start of gap
merged_gap = gap
elif gap[0] > merged_gap[2]:
# next gap starts after current gap ends
merged_gaps.append(merged_gap)
merged_gap = gap
elif gap[0] <= merged_gap[2]:
# next gap starts at or before next data
if gap[1] > merged_gap[1]:
# next gap ends after current gap ends, extend current
merged_gap[1] = gap[1]
merged_gap[2] = gap[2]
if merged_gap is not None:
merged_gaps.append(merged_gap)
return merged_gaps
#! /usr/bin/env python
from nose.tools import assert_equals
from StreamConverter_test import __create_trace
import numpy
import TimeseriesUtility
from obspy.core import Stream, UTCDateTime
def test_get_stream_gaps():
"""geomag.TimeseriesUtility_test.test_get_stream_gaps
confirms that gaps are found in a stream
"""
stream = Stream([
__create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]),
__create_trace('Z', [0, 0, 0, 1, 1, 1])
])
for trace in stream:
# set time of first sample
trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z')
# set sample rate to 1 second
trace.stats.delta = 1
# find gaps
gaps = TimeseriesUtility.get_stream_gaps(stream)
assert_equals(len(gaps['H']), 2)
# gap at start of H
gap = gaps['H'][0]
assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:00Z'))
assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:00Z'))
# gap at end of H
gap = gaps['H'][1]
assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:03Z'))
assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:04Z'))
# no gaps in Z channel
assert_equals(len(gaps['Z']), 0)
def test_get_trace_gaps():
"""geomag.TimeseriesUtility_test.test_get_trace_gaps
confirm that gaps are found in a trace
"""
trace = __create_trace('H', [1, 1, numpy.nan, numpy.nan, 0, 1])
# set time of first sample
trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z')
# set sample rate to 1 minute
trace.stats.delta = 60
# find gap
gaps = TimeseriesUtility.get_trace_gaps(trace)
assert_equals(len(gaps), 1)
gap = gaps[0]
assert_equals(gap[0], UTCDateTime('2015-01-01T00:02:00Z'))
assert_equals(gap[1], UTCDateTime('2015-01-01T00:03:00Z'))
def test_get_merged_gaps():
"""geomag.TimeseriesUtility_test.test_get_merged_gaps
confirm that gaps are merged
"""
merged = TimeseriesUtility.get_merged_gaps({
'H': [
# gap for 2 seconds, that starts after next gap
[
UTCDateTime('2015-01-01T00:00:01Z'),
UTCDateTime('2015-01-01T00:00:03Z'),
UTCDateTime('2015-01-01T00:00:04Z')
]
],
# gap for 1 second, that occurs before previous gap
'Z': [
[
UTCDateTime('2015-01-01T00:00:00Z'),
UTCDateTime('2015-01-01T00:00:00Z'),
UTCDateTime('2015-01-01T00:00:01Z')
],
[
UTCDateTime('2015-01-01T00:00:05Z'),
UTCDateTime('2015-01-01T00:00:07Z'),
UTCDateTime('2015-01-01T00:00:08Z')
],
]
})
assert_equals(len(merged), 2)
# first gap combines H and Z gaps
gap = merged[0]
assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:00Z'))
assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:03Z'))
# second gap is second Z gap
gap = merged[1]
assert_equals(gap[0], UTCDateTime('2015-01-01T00:00:05Z'))
assert_equals(gap[1], UTCDateTime('2015-01-01T00:00:07Z'))
...@@ -9,8 +9,8 @@ from AlgorithmException import AlgorithmException ...@@ -9,8 +9,8 @@ from AlgorithmException import AlgorithmException
from Controller import Controller from Controller import Controller
from TimeseriesFactory import TimeseriesFactory from TimeseriesFactory import TimeseriesFactory
from TimeseriesFactoryException import TimeseriesFactoryException from TimeseriesFactoryException import TimeseriesFactoryException
import TimeseriesUtility
from XYZAlgorithm import XYZAlgorithm from XYZAlgorithm import XYZAlgorithm
import TimeseriesUtilities
__all__ = [ __all__ = [
'Algorithm', 'Algorithm',
...@@ -20,6 +20,6 @@ __all__ = [ ...@@ -20,6 +20,6 @@ __all__ = [
'StreamConverter', 'StreamConverter',
'TimeseriesFactory', 'TimeseriesFactory',
'TimeseriesFactoryException', 'TimeseriesFactoryException',
'XYZAlgorithm', 'TimeseriesUtility',
'TimeseriesUtilities' 'XYZAlgorithm'
] ]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment