From 62a163fdffebf222ad986c57095f0927dfe8df3e Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Thu, 20 Feb 2020 10:53:34 -0700 Subject: [PATCH] Add has_all_channels and has_any_channels utility methods and tests --- geomagio/TimeseriesUtility.py | 68 ++++++++++++++++++++++++++++++++++ test/TimeseriesUtility_test.py | 53 +++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 05db4e2cb..0fcf546bb 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -250,6 +250,74 @@ def get_channels(stream): return [ch for ch in channels] +def has_all_channels(stream, channels, starttime, endtime): + """Check whether all channels have any data within time range. + + Parameters + ---------- + stream: obspy.core.Stream + The input stream we want to make certain has data + channels: array_like + The list of channels that we want to have concurrent data + starttime: UTCDateTime + start time of requested output + end : UTCDateTime + end time of requested output + + Returns + ------- + bool: True if data found across all channels between starttime/endtime + """ + input_gaps = get_merged_gaps( + get_stream_gaps(stream=stream, channels=channels)) + print(starttime, endtime) + print(input_gaps) + for input_gap in input_gaps: + # Check for gaps that include the entire range + if (starttime >= input_gap[0] and + starttime <= input_gap[1] and + endtime < input_gap[2]): + return False + return True + + +def has_any_channels(stream, channels, starttime, endtime): + """Check whether any channel has data within time range. + + Parameters + ---------- + stream: obspy.core.Stream + The input stream we want to make certain has data + channels: array_like + The list of channels that we want to have concurrent data + starttime: UTCDateTime + start time of requested output + end : UTCDateTime + end time of requested output + + Returns + ------- + bool: True if any data found between starttime/endtime + """ + # process if any channels have data not covering the time range + input_gaps = get_stream_gaps(stream=stream, channels=channels) + for channel in channels: + if channel not in input_gaps: + continue + channel_gaps = input_gaps[channel] + if len(channel_gaps) == 0: + # no gaps in channel + return True + for gap in channel_gaps: + if not (starttime >= gap[0] and + starttime <= gap[1] and + endtime < gap[2]): + # gap doesn't span channel + return True + # didn't find any data + return False + + def mask_stream(stream): """Convert stream traces to masked arrays. diff --git a/test/TimeseriesUtility_test.py b/test/TimeseriesUtility_test.py index 627393c15..b2eea78cc 100644 --- a/test/TimeseriesUtility_test.py +++ b/test/TimeseriesUtility_test.py @@ -90,7 +90,6 @@ def test_get_stream_gaps_channels(): test that gaps are only checked in specified channels. """ - stream = Stream stream = Stream([ __create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]), __create_trace('Z', [0, 0, 0, 1, 1, 1]) @@ -163,6 +162,58 @@ def test_get_merged_gaps(): assert_equal(gap[1], UTCDateTime('2015-01-01T00:00:07Z')) +def test_has_all_channels(): + """TimeseriesUtility_test.test_has_all_channels(): + """ + nan = numpy.nan + stream = Stream([ + __create_trace('H', [nan, 1, 1, nan, nan]), + __create_trace('Z', [0, 0, 0, 1, 1]), + __create_trace('E', [nan, nan, nan, nan, nan]) + ]) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 second + trace.stats.delta = 1 + trace.stats.npts = len(trace.data) + # check for channels + starttime = stream[0].stats.starttime + endtime = stream[0].stats.endtime + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['H', 'Z'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['H', 'Z', 'E'], starttime, endtime), False) + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['E'], starttime, endtime), False) + + +def test_has_any_channels(): + """TimeseriesUtility_test.test_has_any_channels(): + """ + nan = numpy.nan + stream = Stream([ + __create_trace('H', [nan, 1, 1, nan, nan]), + __create_trace('Z', [0, 0, 0, 1, 1, 1]), + __create_trace('E', [nan, nan, nan, nan, nan]) + ]) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 second + trace.stats.delta = 1 + trace.stats.npts = len(trace.data) + # check for channels + starttime = stream[0].stats.starttime + endtime = stream[0].stats.endtime + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['H', 'Z'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['H', 'Z', 'E'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['E'], starttime, endtime), False) + + def test_merge_streams(): """TimeseriesUtility_test.test_merge_streams() -- GitLab