diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 05db4e2cbd1593b53160cb226dbc5f8a430246ef..0fcf546bb4dc9a602ce75de6f6e640bf2bad33d0 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -250,6 +250,74 @@ def get_channels(stream): return [ch for ch in channels] +def has_all_channels(stream, channels, starttime, endtime): + """Check whether all channels have any data within time range. + + Parameters + ---------- + stream: obspy.core.Stream + The input stream we want to make certain has data + channels: array_like + The list of channels that we want to have concurrent data + starttime: UTCDateTime + start time of requested output + end : UTCDateTime + end time of requested output + + Returns + ------- + bool: True if data found across all channels between starttime/endtime + """ + input_gaps = get_merged_gaps( + get_stream_gaps(stream=stream, channels=channels)) + print(starttime, endtime) + print(input_gaps) + for input_gap in input_gaps: + # Check for gaps that include the entire range + if (starttime >= input_gap[0] and + starttime <= input_gap[1] and + endtime < input_gap[2]): + return False + return True + + +def has_any_channels(stream, channels, starttime, endtime): + """Check whether any channel has data within time range. + + Parameters + ---------- + stream: obspy.core.Stream + The input stream we want to make certain has data + channels: array_like + The list of channels that we want to have concurrent data + starttime: UTCDateTime + start time of requested output + end : UTCDateTime + end time of requested output + + Returns + ------- + bool: True if any data found between starttime/endtime + """ + # process if any channels have data not covering the time range + input_gaps = get_stream_gaps(stream=stream, channels=channels) + for channel in channels: + if channel not in input_gaps: + continue + channel_gaps = input_gaps[channel] + if len(channel_gaps) == 0: + # no gaps in channel + return True + for gap in channel_gaps: + if not (starttime >= gap[0] and + starttime <= gap[1] and + endtime < gap[2]): + # gap doesn't span channel + return True + # didn't find any data + return False + + def mask_stream(stream): """Convert stream traces to masked arrays. diff --git a/test/TimeseriesUtility_test.py b/test/TimeseriesUtility_test.py index 627393c154ef2ae8a670f62f2718918ecffe6ab8..b2eea78ccb39eb35b3277be2ca3cceea066e111b 100644 --- a/test/TimeseriesUtility_test.py +++ b/test/TimeseriesUtility_test.py @@ -90,7 +90,6 @@ def test_get_stream_gaps_channels(): test that gaps are only checked in specified channels. """ - stream = Stream stream = Stream([ __create_trace('H', [numpy.nan, 1, 1, numpy.nan, numpy.nan]), __create_trace('Z', [0, 0, 0, 1, 1, 1]) @@ -163,6 +162,58 @@ def test_get_merged_gaps(): assert_equal(gap[1], UTCDateTime('2015-01-01T00:00:07Z')) +def test_has_all_channels(): + """TimeseriesUtility_test.test_has_all_channels(): + """ + nan = numpy.nan + stream = Stream([ + __create_trace('H', [nan, 1, 1, nan, nan]), + __create_trace('Z', [0, 0, 0, 1, 1]), + __create_trace('E', [nan, nan, nan, nan, nan]) + ]) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 second + trace.stats.delta = 1 + trace.stats.npts = len(trace.data) + # check for channels + starttime = stream[0].stats.starttime + endtime = stream[0].stats.endtime + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['H', 'Z'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['H', 'Z', 'E'], starttime, endtime), False) + assert_equal(TimeseriesUtility.has_all_channels( + stream, ['E'], starttime, endtime), False) + + +def test_has_any_channels(): + """TimeseriesUtility_test.test_has_any_channels(): + """ + nan = numpy.nan + stream = Stream([ + __create_trace('H', [nan, 1, 1, nan, nan]), + __create_trace('Z', [0, 0, 0, 1, 1, 1]), + __create_trace('E', [nan, nan, nan, nan, nan]) + ]) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime('2015-01-01T00:00:00Z') + # set sample rate to 1 second + trace.stats.delta = 1 + trace.stats.npts = len(trace.data) + # check for channels + starttime = stream[0].stats.starttime + endtime = stream[0].stats.endtime + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['H', 'Z'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['H', 'Z', 'E'], starttime, endtime), True) + assert_equal(TimeseriesUtility.has_any_channels( + stream, ['E'], starttime, endtime), False) + + def test_merge_streams(): """TimeseriesUtility_test.test_merge_streams()