Skip to content
Snippets Groups Projects
Commit 62a163fd authored by Jeremy M Fee's avatar Jeremy M Fee
Browse files

Add has_all_channels and has_any_channels utility methods and tests

parent 1c06af9d
No related branches found
No related tags found
No related merge requests found
...@@ -250,6 +250,74 @@ def get_channels(stream): ...@@ -250,6 +250,74 @@ def get_channels(stream):
return [ch for ch in channels] return [ch for ch in channels]
def has_all_channels(stream, channels, starttime, endtime):
"""Check whether all channels have any data within time range.
Parameters
----------
stream: obspy.core.Stream
The input stream we want to make certain has data
channels: array_like
The list of channels that we want to have concurrent data
starttime: UTCDateTime
start time of requested output
end : UTCDateTime
end time of requested output
Returns
-------
bool: True if data found across all channels between starttime/endtime
"""
input_gaps = get_merged_gaps(
get_stream_gaps(stream=stream, channels=channels))
print(starttime, endtime)
print(input_gaps)
for input_gap in input_gaps:
# Check for gaps that include the entire range
if (starttime >= input_gap[0] and
starttime <= input_gap[1] and
endtime < input_gap[2]):
return False
return True
def has_any_channels(stream, channels, starttime, endtime):
"""Check whether any channel has data within time range.
Parameters
----------
stream: obspy.core.Stream
The input stream we want to make certain has data
channels: array_like
The list of channels that we want to have concurrent data
starttime: UTCDateTime
start time of requested output
end : UTCDateTime
end time of requested output
Returns
-------
bool: True if any data found between starttime/endtime
"""
# process if any channels have data not covering the time range
input_gaps = get_stream_gaps(stream=stream, channels=channels)
for channel in channels:
if channel not in input_gaps:
continue
channel_gaps = input_gaps[channel]
if len(channel_gaps) == 0:
# no gaps in channel
return True
for gap in channel_gaps:
if not (starttime >= gap[0] and
starttime <= gap[1] and
endtime < gap[2]):
# gap doesn't span channel
return True
# didn't find any data
return False
def mask_stream(stream): def mask_stream(stream):
"""Convert stream traces to masked arrays. """Convert stream traces to masked arrays.
......
...@@ -90,7 +90,6 @@ def test_get_stream_gaps_channels(): ...@@ -90,7 +90,6 @@ def test_get_stream_gaps_channels():
test that gaps are only checked in specified channels. test that gaps are only checked in specified channels.
""" """
stream = Stream
stream = Stream([ stream = Stream([
__create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]), __create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]),
__create_trace('Z', [0, 0, 0, 1, 1, 1]) __create_trace('Z', [0, 0, 0, 1, 1, 1])
...@@ -163,6 +162,58 @@ def test_get_merged_gaps(): ...@@ -163,6 +162,58 @@ def test_get_merged_gaps():
assert_equal(gap[1], UTCDateTime('2015-01-01T00:00:07Z')) assert_equal(gap[1], UTCDateTime('2015-01-01T00:00:07Z'))
def test_has_all_channels():
"""TimeseriesUtility_test.test_has_all_channels():
"""
nan = numpy.nan
stream = Stream([
__create_trace('H', [nan, 1, 1, nan, nan]),
__create_trace('Z', [0, 0, 0, 1, 1]),
__create_trace('E', [nan, nan, nan, nan, nan])
])
for trace in stream:
# set time of first sample
trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z')
# set sample rate to 1 second
trace.stats.delta = 1
trace.stats.npts = len(trace.data)
# check for channels
starttime = stream[0].stats.starttime
endtime = stream[0].stats.endtime
assert_equal(TimeseriesUtility.has_all_channels(
stream, ['H', 'Z'], starttime, endtime), True)
assert_equal(TimeseriesUtility.has_all_channels(
stream, ['H', 'Z', 'E'], starttime, endtime), False)
assert_equal(TimeseriesUtility.has_all_channels(
stream, ['E'], starttime, endtime), False)
def test_has_any_channels():
"""TimeseriesUtility_test.test_has_any_channels():
"""
nan = numpy.nan
stream = Stream([
__create_trace('H', [nan, 1, 1, nan, nan]),
__create_trace('Z', [0, 0, 0, 1, 1, 1]),
__create_trace('E', [nan, nan, nan, nan, nan])
])
for trace in stream:
# set time of first sample
trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z')
# set sample rate to 1 second
trace.stats.delta = 1
trace.stats.npts = len(trace.data)
# check for channels
starttime = stream[0].stats.starttime
endtime = stream[0].stats.endtime
assert_equal(TimeseriesUtility.has_any_channels(
stream, ['H', 'Z'], starttime, endtime), True)
assert_equal(TimeseriesUtility.has_any_channels(
stream, ['H', 'Z', 'E'], starttime, endtime), True)
assert_equal(TimeseriesUtility.has_any_channels(
stream, ['E'], starttime, endtime), False)
def test_merge_streams(): def test_merge_streams():
"""TimeseriesUtility_test.test_merge_streams() """TimeseriesUtility_test.test_merge_streams()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment