Newer
Older
"""Timeseries Utilities"""
from builtins import range
from datetime import datetime
import math
import numpy
import obspy.core
def create_empty_trace(
starttime, endtime, observatory, channel, type, interval, network, station, location
):
"""create an empty trace filled with nans.
Parameters
----------
starttime: obspy.core.UTCDateTime
the starttime of the requested data
endtime: obspy.core.UTCDateTime
the endtime of the requested data
observatory : str
observatory code
channel : str
single character channel {H, E, D, Z, F}
type : str
data type {definitive, quasi-definitive, variation}
interval : str
interval length {minute, second}
network: str
the network code
station: str
the observatory station code
location: str
the location code
Returns
-------
obspy.core.Trace
trace for the requested channel
"""
delta = get_delta_from_interval(interval)
stats = obspy.core.Stats()
stats.network = network
stats.station = station
stats.location = location
stats.channel = channel
# Calculate first valid sample time based on interval
trace_starttime = obspy.core.UTCDateTime(
numpy.ceil(starttime.timestamp / delta) * delta
)
if delta > 60.0:
trace_starttime += (delta - 60) / 2
if trace_starttime > endtime:
sys.stderr.write("Starttime greater than endtime, shifting endtime to next sample")
endtime = trace_starttime + delta
stats.starttime = trace_starttime
stats.delta = delta
# Calculate number of valid samples up to or before endtime
length = int((endtime - trace_starttime) / delta)
stats.npts = length + 1
data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64)
return obspy.core.Trace(data, stats)
def get_delta_from_interval(data_interval):
"""Convert interval name to number of seconds
Parameters
----------
interval : str
interval length {day, hour, minute, second, tenhertz}
Returns
-------
int
number of seconds for interval, or None if unknown
"""
elif data_interval == "second":
delta = 1.0
elif data_interval == "minute":
delta = 60.0
elif data_interval == "hour":
delta = 3600.0
elif data_interval == "day":
delta = 86400.0
else:
delta = None
return delta
def get_interval_from_delta(delta):
"""Convert delta to an interval name
Parameters
----------
number of seconds for interval, or None if unknown
Returns
-------
interval : str
interval length {day, hour, minute, second, tenhertz}
"""
data_interval = "tenhertz"
elif delta == 1:
def get_stream_start_end_times(timeseries, without_gaps=False):
"""get start and end times from a stream.
Traverses all traces, and find the earliest starttime, and
the latest endtime.
Parameters
----------
timeseries: obspy.core.stream
The timeseries stream
Returns
-------
tuple: (starttime, endtime)
starttime: obspy.core.UTCDateTime
endtime: obspy.core.UTCDateTime
NOTE: when the entire timeseries is a gap, and without_gaps is True,
the returned endtime will be one delta earlier than starttime.
"""
starttime = obspy.core.UTCDateTime(datetime.now())
endtime = obspy.core.UTCDateTime(0)
for trace in timeseries:
if trace.stats.starttime < starttime:
starttime = trace.stats.starttime
if trace.stats.endtime > endtime:
endtime = trace.stats.endtime
if without_gaps:
gaps = get_merged_gaps(get_stream_gaps(timeseries))
for gap in gaps:
if gap[0] == starttime and gap[1] != endtime:
# gap at start of timeseries, move to first data point
starttime = gap[2]
elif gap[1] == endtime:
# gap at end of timeseries
endtime = gap[0] - timeseries[0].stats.delta
return (starttime, endtime)
def get_stream_gaps(stream, channels=None):
"""Get gaps in a given stream
Parameters
----------
stream: obspy.core.Stream
the stream to check for gaps
channels: array_like
list of channels to check for gaps
Default is None (check all channels).
Returns
-------
dictionary of channel gaps arrays
Notes
-----
Returns a dictionary with channel: gaps array pairs. Where the gaps array
consists of arrays of starttime/endtime pairs representing each gap.
"""
gaps = {}
for trace in stream:
channel = trace.stats.channel
if channels is not None and channel not in channels:
continue
gaps[channel] = get_trace_gaps(trace)
return gaps
def get_trace_gaps(trace):
"""Gets gaps in a trace representing a single channel
Parameters
----------
trace: obspy.core.Trace
a stream containing a single channel of data.
Returns
-------
array of gaps, which is empty when there are no gaps.
each gap is an array [start of gap, end of gap, next sample]
"""
gaps = []
gap = None
data = trace.data
stats = trace.stats
starttime = stats.starttime
length = len(data)
delta = stats.delta
for i in range(0, length):
if numpy.isnan(data[i]):
if gap is None:
# start of a gap
gap = [starttime + i * delta]
else:
if gap is not None:
# end of a gap
gap.extend([starttime + (i - 1) * delta, starttime + i * delta])
gaps.append(gap)
gap = None
# check for gap at end
if gap is not None:
gap.extend([starttime + (length - 1) * delta, starttime + length * delta])
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
gaps.append(gap)
return gaps
def get_merged_gaps(gaps):
"""Get gaps merged across channels/streams
Parameters
----------
gaps: dictionary
contains channel/gap array pairs
Returns
-------
array_like
an array of startime/endtime arrays representing gaps.
Notes
-----
Takes an dictionary of gaps, and merges those gaps across channels,
returning an array of the merged gaps.
"""
merged_gaps = []
for key in gaps:
merged_gaps.extend(gaps[key])
# sort gaps so earlier gaps are before later gaps
sorted_gaps = sorted(merged_gaps, key=lambda gap: gap[0])
# merge gaps that overlap
merged_gaps = []
merged_gap = None
for gap in sorted_gaps:
if merged_gap is None:
# start of gap
merged_gap = gap
elif gap[0] > merged_gap[2]:
# next gap starts after current gap ends
merged_gaps.append(merged_gap)
merged_gap = gap
elif gap[0] <= merged_gap[2]:
# next gap starts at or before next data
if gap[1] > merged_gap[1]:
# next gap ends after current gap ends, extend current
merged_gap[1] = gap[1]
merged_gap[2] = gap[2]
if merged_gap is not None:
merged_gaps.append(merged_gap)
return merged_gaps
def get_channels(stream):
"""Get a list of channels in a stream.
Parameters
----------
stream : obspy.core.Stream
Returns
-------
channels : array_like
"""
channels = {}
for trace in stream:
channel = trace.stats.channel
if channel:
channels[channel] = True
return [ch for ch in channels]
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def get_trace_value(traces, time, default=None):
"""Get a value at a specific time.
Parameters
----------
trace : obspy.core.Trace
time : obspy.core.UTCDateTime
Returns
-------
nearest time in trace
value from trace at nearest time, or None
"""
# array of UTCDateTime values corresponding
for trace in traces:
times = trace.times("utcdatetime")
index = times.searchsorted(time)
trace_time = times[index]
trace_value = trace.data[index]
if trace_time == time:
if numpy.isnan(trace_value):
return default
else:
return trace_value
return default
def has_all_channels(stream, channels, starttime, endtime):
"""Check whether all channels have any data within time range.
Parameters
----------
stream: obspy.core.Stream
The input stream we want to make certain has data
channels: array_like
The list of channels that we want to have concurrent data
starttime: UTCDateTime
start time of requested output
end : UTCDateTime
end time of requested output
Returns
-------
bool: True if data found across all channels between starttime/endtime
"""
input_gaps = get_merged_gaps(get_stream_gaps(stream=stream, channels=channels))
for input_gap in input_gaps:
# Check for gaps that include the entire range
if (
starttime >= input_gap[0]
and starttime <= input_gap[1]
and endtime < input_gap[2]
):
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
return False
return True
def has_any_channels(stream, channels, starttime, endtime):
"""Check whether any channel has data within time range.
Parameters
----------
stream: obspy.core.Stream
The input stream we want to make certain has data
channels: array_like
The list of channels that we want to have concurrent data
starttime: UTCDateTime
start time of requested output
end : UTCDateTime
end time of requested output
Returns
-------
bool: True if any data found between starttime/endtime
"""
# process if any channels have data not covering the time range
input_gaps = get_stream_gaps(stream=stream, channels=channels)
for channel in channels:
if channel not in input_gaps:
continue
channel_gaps = input_gaps[channel]
if len(channel_gaps) == 0:
# no gaps in channel
return True
for gap in channel_gaps:
if not (starttime >= gap[0] and starttime <= gap[1] and endtime < gap[2]):
# gap doesn't span channel
return True
# didn't find any data
return False
def mask_stream(stream):
"""Convert stream traces to masked arrays.
Parameters
----------
stream : obspy.core.Stream
stream to mask
Returns
-------
obspy.core.Stream
stream with new Trace objects with numpy masked array data.
"""
masked = obspy.core.Stream()
for trace in stream:
masked += obspy.core.Trace(numpy.ma.masked_invalid(trace.data), trace.stats)
return masked
def unmask_stream(stream):
"""Convert stream traces to unmasked arrays.
Parameters
----------
stream : obspy.core.Stream
stream to unmask
Returns
-------
obspy.core.Stream
stream with new Trace objects with numpy array data, with numpy.nan
as a fill value in a filled array.
"""
unmasked = obspy.core.Stream()
for trace in stream:
unmasked += obspy.core.Trace(
trace.data.filled(fill_value=numpy.nan)
if isinstance(trace.data, numpy.ma.MaskedArray)
else trace.data,
trace.stats,
)
return unmasked
def merge_streams(*streams):
"""Merge one or more streams.
Parameters
----------
*streams : obspy.core.Stream
one or more streams to merge
Returns
-------
obspy.core.Stream
stream with contiguous traces merged, and gaps filled with numpy.nan
"""
merged = obspy.core.Stream()
arigdon-usgs
committed
# sort out empty
arigdon-usgs
committed
merged += stream
arigdon-usgs
committed
arigdon-usgs
committed
split = mask_stream(merged)
# split traces that contain gaps
arigdon-usgs
committed
split = split.split()
# Re-add any empty traces that were removed by split()
readd = obspy.core.Stream()
for trace in merged:
stats = trace.stats
split_stream = split.select(
channel=stats.channel,
station=stats.station,
network=stats.network,
location=stats.location,
)
arigdon-usgs
committed
if len(split_stream) == 0:
readd += trace
split += readd
arigdon-usgs
committed
arigdon-usgs
committed
split.merge(
# 1 = do not interpolate
interpolation_samples=0,
# 1 = when there is overlap, use data from trace with last endtime
method=1,
# np.nan = work-around for (problematic) intermediate masked arrays
fill_value=numpy.nan,
)
# convert back to NaN filled array
arigdon-usgs
committed
merged = unmask_stream(split)
def pad_timeseries(timeseries, starttime, endtime):
"""Calls pad_and_trim_trace for each trace in a stream.
Traces should be merged before calling this method.
Parameters
----------
timeseries: obspy.core.stream
The timeseries stream as returned by the call to getWaveform
starttime: obspy.core.UTCDateTime
the starttime of the requested data
endtime: obspy.core.UTCDateTime
the endtime of the requested data
Notes: the original timeseries object is changed.
"""
for trace in timeseries:
pad_and_trim_trace(trace, starttime, endtime)
def pad_and_trim_trace(trace, starttime, endtime):
"""Pads and trims trace data so it is in the range [starttime, endtime].
Uses trace stats to compute start/end times that are consistent with
other trace data. (starttime and endtime are not checked).
Parameters
----------
trace: obspy.core.Trace
One trace to be processed
starttime: obspy.core.UTCDateTime
the starttime of the requested data
endtime: obspy.core.UTCDateTime
the endtime of the requested data
Notes: the original timeseries object is changed.
"""
trace_starttime = obspy.core.UTCDateTime(trace.stats.starttime)
trace_endtime = obspy.core.UTCDateTime(trace.stats.endtime)
trace_delta = trace.stats.delta
if trace_starttime < starttime:
# trim to starttime
cnt = int(math.ceil(round((starttime - trace_starttime) / trace_delta, 6)))
if cnt > 0:
trace.data = trace.data[cnt:]
trace_starttime = trace_starttime + trace_delta * cnt
trace.stats.starttime = trace_starttime
elif trace_starttime > starttime:
# pad to starttime
cnt = int(round((trace_starttime - starttime) / trace_delta, 6))
# cnt = int((trace_starttime - starttime) / trace_delta)
if cnt > 0:
trace.data = numpy.concatenate(
[numpy.full(cnt, numpy.nan, dtype=numpy.float64), trace.data]
)
trace_starttime = trace_starttime - trace_delta * cnt
trace.stats.starttime = trace_starttime
if trace_endtime > endtime:
# trim to endtime, at least 1 sample to remove
cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6)))
trace.data = trace.data[:-cnt]
trace.stats.npts = len(trace.data)
elif trace_endtime < endtime:
# pad to endtime
cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6))
# cnt = int((endtime - trace_endtime) / trace.stats.delta)
if cnt > 0:
trace.data = numpy.concatenate(
[trace.data, numpy.full(cnt, numpy.nan, dtype=numpy.float64)]
)
def round_usecs(time):
"""Rounds residual microseconds to milliseconds.
Parameters
----------
time: UTCDateTime
time containing microsecond values
Returns
----------
time: UTCDateTime
time containing rounded(or non-rounded) microsecond values
"""
# round microseconds to nearest millisecond
rounded_usecs = int(round(usecs / 1000, 0)) * 1000
# reset microseconds to 0 at top of second, add second to input time
if rounded_usecs > 999000:
rounded_usecs = 0
time += 1
if rounded_usecs != usecs:
time = time.replace(microsecond=rounded_usecs)
return time