Newer
Older
"""Timeseries Utilities"""
from builtins import range
import numpy
import obspy.core
def get_stream_gaps(stream, channels=None):
    """Find the gaps in each channel of a stream.

    Parameters
    ----------
    stream: obspy.core.Stream
        the stream to check for gaps
    channels: array_like
        channels to check for gaps.
        Default is None (check all channels).

    Returns
    -------
    dictionary
        maps each checked channel name to the gaps array produced by
        get_trace_gaps for that channel's trace.

    Notes
    -----
    Each gaps array holds one entry per gap; see get_trace_gaps for the
    entry layout. When several traces share a channel name, the entry for
    that channel comes from the last such trace in the stream.
    """
    return {
        trace.stats.channel: get_trace_gaps(trace)
        for trace in stream
        if channels is None or trace.stats.channel in channels
    }
def get_trace_gaps(trace):
    """Find the gaps (runs of NaN samples) in a single trace.

    Parameters
    ----------
    trace: obspy.core.Trace
        a trace containing a single channel of data.

    Returns
    -------
    array of gaps, which is empty when there are no gaps.
        each gap is an array [start of gap, end of gap, next sample], where
        "start" and "end" are the times of the first and last NaN sample in
        the run and "next sample" is one sample interval after "end".
    """
    stats = trace.stats
    starttime = stats.starttime
    delta = stats.delta
    # Evaluate NaN-ness for the whole array at once instead of per sample.
    # Masked samples are treated as "not NaN", which is how a per-sample
    # truth test of numpy.isnan() on a masked value evaluates.
    missing = numpy.ma.filled(numpy.isnan(trace.data), False)
    # Pad with False on both sides so gaps touching either end of the trace
    # still produce a rising and a falling edge.
    padded = numpy.concatenate(([False], missing, [False])).astype(numpy.int8)
    edges = numpy.diff(padded)
    # +1 edge at index k: sample k is the first NaN of a gap.
    # -1 edge at index k: sample k is the first sample after a gap
    # (k == len(data) when the gap runs to the end of the trace).
    first_missing = numpy.flatnonzero(edges == 1).tolist()
    next_valid = numpy.flatnonzero(edges == -1).tolist()
    return [
        [
            starttime + begin * delta,
            starttime + (after - 1) * delta,
            starttime + after * delta,
        ]
        for begin, after in zip(first_missing, next_valid)
    ]
def get_merged_gaps(gaps):
    """Get gaps merged across channels/streams

    Parameters
    ----------
    gaps: dictionary
        contains channel/gap array pairs, where each gap is an array
        [start of gap, end of gap, next sample].

    Returns
    -------
    array_like
        an array of [start of gap, end of gap, next sample] arrays
        representing the merged gaps, ordered by start of gap.

    Notes
    -----
    Takes a dictionary of gaps, and merges those gaps across channels,
    returning an array of the merged gaps. A gap is merged into the previous
    one when it starts at or before the previous gap's "next sample".
    The input dictionary and its gap arrays are not modified.
    """
    # Flatten all channels into one list, copying every gap so extending a
    # merged gap below never writes into the caller's arrays.
    # Sort so earlier gaps come before later gaps.
    sorted_gaps = sorted(
        (list(gap) for channel_gaps in gaps.values() for gap in channel_gaps),
        key=lambda gap: gap[0],
    )
    merged_gaps = []
    for gap in sorted_gaps:
        if merged_gaps and gap[0] <= merged_gaps[-1][2]:
            # gap starts at or before the next data of the current merged gap
            current = merged_gaps[-1]
            if gap[1] > current[1]:
                # gap ends after current merged gap ends, extend current
                current[1] = gap[1]
                current[2] = gap[2]
        else:
            # first gap, or gap starts after the current merged gap ends
            merged_gaps.append(gap)
    return merged_gaps
def get_channels(stream):
    """List the distinct channel names found in a stream.

    Parameters
    ----------
    stream : obspy.core.Stream

    Returns
    -------
    channels : array_like
        each non-empty channel name once, no matter how many traces use it.
    """
    names = (trace.stats.channel for trace in stream)
    # dictionary keys act as a set that collapses repeated channel names
    return list(dict.fromkeys(name for name in names if name))
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def mask_stream(stream):
    """Build a copy of a stream whose traces hold masked arrays.

    Parameters
    ----------
    stream : obspy.core.Stream
        stream to mask

    Returns
    -------
    obspy.core.Stream
        new stream of new Trace objects; each trace's data is the result of
        numpy.ma.masked_invalid on the original data, paired with the
        original trace's stats.
    """
    return obspy.core.Stream(traces=[
        obspy.core.Trace(numpy.ma.masked_invalid(trace.data), trace.stats)
        for trace in stream
    ])
def unmask_stream(stream):
    """Build a copy of a stream whose traces hold plain (unmasked) arrays.

    Parameters
    ----------
    stream : obspy.core.Stream
        stream to unmask

    Returns
    -------
    obspy.core.Stream
        new stream of new Trace objects; masked array data is replaced by
        a filled array using numpy.nan as the fill value, any other data
        is passed through unchanged.
    """
    unmasked = obspy.core.Stream()
    for trace in stream:
        data = trace.data
        if isinstance(data, numpy.ma.MaskedArray):
            # masked samples become NaN in the filled array
            data = data.filled(fill_value=numpy.nan)
        unmasked.append(obspy.core.Trace(data, trace.stats))
    return unmasked
def merge_streams(*streams):
    """Merge one or more streams.

    Parameters
    ----------
    *streams : obspy.core.Stream
        one or more streams to merge

    Returns
    -------
    obspy.core.Stream
        stream with contiguous traces merged, and gaps filled with numpy.nan
    """
    merged = obspy.core.Stream()
    masked = obspy.core.Stream()
    # sort out empty: traces that are entirely NaN are set aside so they
    # bypass the mask/split step and are added back unchanged afterwards
    for stream in streams:
        for trace in stream:
            if numpy.isnan(trace.data).all():
                masked += trace
            else:
                merged += trace
    merged = mask_stream(merged)
    # split traces that contain gaps
    merged = merged.split()
    # add the all-NaN traces back before merging
    merged += masked
    # merge data
    merged.merge(
        # 1 = do not interpolate
        interpolation_samples=1,
        # numpy.nan rather than numpy.NaN: the NaN alias was removed in
        # NumPy 2.0
        fill_value=numpy.nan,
        # 1 = when there is overlap, use data from trace with last endtime
        method=1)
    # convert back to NaN filled array
    merged = unmask_stream(merged)
    return merged