# NOTE(review): removed stray "Newer"/"Older" diff-viewer artifacts that were not valid code
"""Timeseries Utilities"""
from builtins import range
from datetime import datetime
import math
import sys

import numpy
from obspy.core import Stats, Stream, Trace, UTCDateTime

from .Util import get_intervals
def create_empty_trace(
    starttime, endtime, observatory, channel, type, interval, network, station, location
):
    """create an empty trace filled with nans.

    Parameters
    ----------
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data
    observatory : str
        observatory code
    channel : str
        single character channel {H, E, D, Z, F}
    type : str
        data type {definitive, quasi-definitive, variation}
    interval : str
        interval length {day, hour, minute, second, tenhertz}
    network: str
        the network code
    station: str
        the observatory station code
    location: str
        the location code

    Returns
    -------
    obspy.core.Trace
        trace for the requested channel, filled with numpy.nan
    """
    delta = get_delta_from_interval(interval)
    stats = Stats()
    stats.network = network
    stats.station = station
    stats.location = location
    stats.channel = channel
    # Calculate first valid sample time based on interval
    trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta)
    if delta > 60.0:
        # hourly and daily average have mid-interval timestamps, minus 30 seconds
        if trace_starttime - starttime < (delta + 60) / 2:
            trace_starttime += (delta - 60) / 2
        else:
            trace_starttime += (delta - 60) / 2 - delta
        if starttime > trace_starttime or endtime < trace_starttime:
            # warn, but still return an empty trace below
            sys.stderr.write(
                'No allowed "%s" timestamps between %s and %s, '
                % (interval, starttime, endtime)
                + "returning empty trace\n"
            )
    stats.starttime = trace_starttime
    stats.delta = delta
    # Calculate number of valid samples up to or before endtime
    length = int((endtime - trace_starttime) / delta)
    stats.npts = length + 1
    data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64)
    return Trace(data, stats)
def encode_stream(stream: Stream, encoding: str) -> Stream:
    """Ensures that factory encoding matches output data encoding

    Parameters:
    -----------
    stream: Stream
        stream of input data
    encoding: str
        requested output data encoding

    Returns:
    --------
    out_stream: Stream
        stream with matching data encoding to factory specification
    """
    encoded = Stream()
    for source in stream:
        target = source.copy()
        if target.data.dtype != encoding:
            if "STEIM" in encoding.upper():
                # STEIM compression operates on 32-bit integer samples
                target.data = target.data.astype(numpy.int32)
            else:
                target.data = target.data.astype(encoding)
            if "mseed" in target.stats:
                # keep miniSEED metadata in sync with the converted data
                target.stats.mseed.encoding = encoding.upper()
        encoded += target
    return encoded
def get_delta_from_interval(data_interval):
    """Convert interval name to number of seconds

    Parameters
    ----------
    data_interval : str
        interval length {day, hour, minute, second, tenhertz}

    Returns
    -------
    float
        number of seconds for interval, or None if unknown
    """
    if data_interval == "tenhertz":
        delta = 0.1
    elif data_interval == "second":
        delta = 1.0
    elif data_interval == "minute":
        delta = 60.0
    elif data_interval == "hour":
        delta = 3600.0
    elif data_interval == "day":
        delta = 86400.0
    else:
        delta = None
    return delta
def get_interval_from_delta(delta):
    """Convert delta to an interval name

    Parameters
    ----------
    delta : float
        number of seconds between samples

    Returns
    -------
    interval : str
        interval length {day, hour, minute, second, tenhertz},
        or None if the delta is not recognized
    """
    if delta == 0.1:
        data_interval = "tenhertz"
    elif delta == 1:
        data_interval = "second"
    elif delta == 60:
        data_interval = "minute"
    elif delta == 3600:
        data_interval = "hour"
    elif delta == 86400:
        data_interval = "day"
    else:
        data_interval = None
    return data_interval
def get_stream_start_end_times(timeseries, without_gaps=False):
    """get start and end times from a stream.

    Traverses all traces, and find the earliest starttime, and
    the latest endtime.

    Parameters
    ----------
    timeseries: obspy.core.Stream
        The timeseries stream
    without_gaps: bool
        when True, exclude leading/trailing gaps from the returned range

    Returns
    -------
    tuple: (starttime, endtime)
        starttime: UTCDateTime
        endtime: UTCDateTime

    NOTE: when the entire timeseries is a gap, and without_gaps is True,
    the returned endtime will be one delta earlier than starttime.
    """
    starttime = UTCDateTime(datetime.now())
    endtime = UTCDateTime(0)
    for trace in timeseries:
        if trace.stats.starttime < starttime:
            starttime = trace.stats.starttime
        if trace.stats.endtime > endtime:
            endtime = trace.stats.endtime
    if without_gaps:
        gaps = get_merged_gaps(get_stream_gaps(timeseries))
        for gap in gaps:
            if gap[0] == starttime and gap[1] != endtime:
                # gap at start of timeseries, move to first data point
                starttime = gap[2]
            elif gap[1] == endtime:
                # gap at end of timeseries
                endtime = gap[0] - timeseries[0].stats.delta
    return (starttime, endtime)
def get_stream_gaps(stream, channels=None):
    """Get gaps in a given stream

    Parameters
    ----------
    stream: obspy.core.Stream
        the stream to check for gaps
    channels: array_like
        list of channels to check for gaps
        Default is None (check all channels).

    Returns
    -------
    dict
        channel: gaps array pairs, where each gaps array holds
        [gap start, gap end, next sample] entries for each gap.
    """
    gaps_by_channel = {}
    for trace in stream:
        code = trace.stats.channel
        if channels is not None and code not in channels:
            # caller restricted which channels to inspect
            continue
        trace_gaps = get_trace_gaps(trace)
        if code in gaps_by_channel:
            gaps_by_channel[code].extend(trace_gaps)
        else:
            gaps_by_channel[code] = trace_gaps
    return gaps_by_channel
def get_trace_gaps(trace):
    """Gets gaps in a trace representing a single channel

    Parameters
    ----------
    trace : obspy.core.Trace
        a trace containing a single channel of data, where missing
        samples are numpy.nan.

    Returns
    -------
    list
        array of gaps, which is empty when there are no gaps.
        each gap is an array [start of gap, end of gap, next sample]
    """
    gaps = []
    gap = None
    data = trace.data
    stats = trace.stats
    starttime = stats.starttime
    length = len(data)
    delta = stats.delta
    for i in range(0, length):
        if numpy.isnan(data[i]):
            if gap is None:
                # start of a gap
                gap = [starttime + i * delta]
        else:
            if gap is not None:
                # end of a gap
                gap.extend([starttime + (i - 1) * delta, starttime + i * delta])
                gaps.append(gap)
                gap = None
    # check for gap at end
    if gap is not None:
        gap.extend([starttime + (length - 1) * delta, starttime + length * delta])
        gaps.append(gap)
    return gaps
def get_merged_gaps(gaps):
    """Get gaps merged across channels/streams

    Parameters
    ----------
    gaps: dictionary
        contains channel/gap array pairs

    Returns
    -------
    array_like
        an array of starttime/endtime arrays representing gaps.

    Notes
    -----
    Takes a dictionary of gaps, and merges those gaps across channels,
    returning an array of the merged gaps.
    """
    # flatten the per-channel gap lists into a single list
    all_gaps = []
    for channel_gaps in gaps.values():
        all_gaps.extend(channel_gaps)
    # earlier gaps first
    all_gaps.sort(key=lambda g: g[0])
    merged = []
    current = None
    for candidate in all_gaps:
        if current is None:
            # first gap seen
            current = candidate
        elif candidate[0] > current[2]:
            # candidate starts after current gap's next sample; close current
            merged.append(current)
            current = candidate
        elif candidate[1] > current[1]:
            # overlapping gap that ends later; extend current in place
            current[1] = candidate[1]
            current[2] = candidate[2]
    if current is not None:
        merged.append(current)
    return merged
def get_channels(stream):
    """Get a list of channels in a stream.

    Parameters
    ----------
    stream : obspy.core.Stream
        stream to inspect

    Returns
    -------
    channels : array_like
        distinct channel codes in first-seen order; empty codes skipped.
    """
    # dict preserves insertion order and de-duplicates
    seen = {}
    for trace in stream:
        code = trace.stats.channel
        if code:
            seen[code] = True
    return list(seen)
def get_trace_value(traces, time, default=None):
    """Get a value at a specific time.

    Parameters
    ----------
    traces : iterable of Trace
        traces to search
    time : UTCDateTime
        sample time to look up

    Returns
    -------
    value from the first trace with a sample exactly at ``time``,
    or ``default`` when that sample is missing or NaN.
    """
    for trace in traces:
        # array of UTCDateTime values corresponding to trace samples
        sample_times = trace.times("utcdatetime")
        idx = sample_times.searchsorted(time)
        if sample_times[idx] != time:
            # this trace has no sample exactly at the requested time
            continue
        value = trace.data[idx]
        return default if numpy.isnan(value) else value
    return default
def has_all_channels(stream, channels, starttime, endtime):
    """Check whether all channels have any data within time range.

    Parameters
    ----------
    stream: obspy.core.Stream
        The input stream we want to make certain has data
    channels: array_like
        The list of channels that we want to have concurrent data
    starttime: UTCDateTime
        start time of requested output
    endtime: UTCDateTime
        end time of requested output

    Returns
    -------
    bool: True if data found across all channels between starttime/endtime
    """
    merged = get_merged_gaps(get_stream_gaps(stream=stream, channels=channels))
    # fail when any merged gap swallows the whole requested interval
    for gap in merged:
        if gap[0] <= starttime <= gap[1] and endtime < gap[2]:
            return False
    return True
def has_any_channels(stream, channels, starttime, endtime):
    """Check whether any channel has data within time range.

    Parameters
    ----------
    stream: obspy.core.Stream
        The input stream we want to make certain has data
    channels: array_like
        The list of channels that we want to have concurrent data
    starttime: UTCDateTime
        start time of requested output
    endtime: UTCDateTime
        end time of requested output

    Returns
    -------
    bool: True if any data found between starttime/endtime
    """
    for trace in stream:
        if trace.stats.channel not in channels:
            continue
        gaps = get_trace_gaps(trace)
        if not gaps:
            # gap-free trace; "any" criterion achieved for stream
            return True
        if not any(
            gap[0] <= starttime <= gap[1] and endtime < gap[2] for gap in gaps
        ):
            # no single gap spans the whole request; some data exists
            return True
    # didn't find any data
    return False
def mask_stream(stream):
    """Convert stream traces to masked arrays.

    Parameters
    ----------
    stream : obspy.core.Stream
        stream to mask

    Returns
    -------
    obspy.core.Stream
        stream with new Trace objects with numpy masked array data.
    """
    masked = Stream()
    for trace in stream:
        # masked_invalid masks NaN/inf samples
        masked += Trace(numpy.ma.masked_invalid(trace.data), trace.stats)
    return masked
def unmask_stream(stream):
    """Convert stream traces to unmasked arrays.

    Parameters
    ----------
    stream : obspy.core.Stream
        stream to unmask

    Returns
    -------
    obspy.core.Stream
        stream with new Trace objects with numpy array data, with numpy.nan
        as a fill value in a filled array.
    """
    unmasked = Stream()
    for trace in stream:
        unmasked += Trace(
            (
                trace.data.filled(fill_value=numpy.nan)
                if isinstance(trace.data, numpy.ma.MaskedArray)
                else trace.data
            ),
            trace.stats,
        )
    return unmasked
def merge_streams(*streams):
    """Merge one or more streams.

    Parameters
    ----------
    *streams : obspy.core.Stream
        one or more streams to merge

    Returns
    -------
    obspy.core.Stream
        stream with contiguous traces merged, and gaps filled with numpy.nan
    """
    merged = Stream()
    # combine the input streams into one
    for stream in streams:
        merged += stream
    # mask NaNs so gap samples are not treated as data
    split = mask_stream(merged)
    # split traces that contain gaps
    split = split.split()
    # Re-add any empty traces that were removed by split()
    readd = Stream()
    for trace in merged:
        stats = trace.stats
        split_stream = split.select(
            channel=stats.channel,
            station=stats.station,
            network=stats.network,
            location=stats.location,
        )
        if len(split_stream) == 0:
            readd += trace
    split += readd
    # merge data
    split.merge(
        # 0 = do not interpolate
        interpolation_samples=0,
        # 1 = when there is overlap, use data from trace with last endtime
        method=1,
        # np.nan = work-around for (problematic) intermediate masked arrays
        fill_value=numpy.nan,
    )
    # convert back to NaN filled array
    merged = unmask_stream(split)
    return merged
def pad_timeseries(timeseries, starttime, endtime):
    """Calls pad_and_trim_trace for each trace in a stream.

    Traces should be merged before calling this method.

    Parameters
    ----------
    timeseries: obspy.core.Stream
        The timeseries stream as returned by the call to getWaveform
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data

    Notes: the original timeseries object is changed.
    """
    for component in timeseries:
        pad_and_trim_trace(trace=component, starttime=starttime, endtime=endtime)
def pad_and_trim_trace(trace, starttime, endtime):
    """Pads and trims trace data so it is in the range [starttime, endtime].

    Uses trace stats to compute start/end times that are consistent with
    other trace data. (starttime and endtime are not checked).

    Parameters
    ----------
    trace: obspy.core.Trace
        One trace to be processed
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data

    Notes: the original timeseries object is changed.
    """
    trace_starttime = UTCDateTime(trace.stats.starttime)
    trace_endtime = UTCDateTime(trace.stats.endtime)
    trace_delta = trace.stats.delta
    if trace_starttime < starttime:
        # trim to starttime
        # round(..., 6) guards against floating point error before ceil()
        cnt = int(math.ceil(round((starttime - trace_starttime) / trace_delta, 6)))
        if cnt > 0:
            trace.data = trace.data[cnt:]
            trace_starttime = trace_starttime + trace_delta * cnt
            trace.stats.starttime = trace_starttime
    elif trace_starttime > starttime:
        # pad to starttime with NaN samples
        cnt = int(round((trace_starttime - starttime) / trace_delta, 6))
        # cnt = int((trace_starttime - starttime) / trace_delta)
        if cnt > 0:
            trace.data = numpy.concatenate(
                [numpy.full(cnt, numpy.nan, dtype=numpy.float64), trace.data]
            )
            trace_starttime = trace_starttime - trace_delta * cnt
            trace.stats.starttime = trace_starttime
    if trace_endtime > endtime:
        # trim to endtime, at least 1 sample to remove
        cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6)))
        if cnt > 0:
            trace.data = trace.data[:-cnt]
            trace.stats.npts = len(trace.data)
    elif trace_endtime < endtime:
        # pad to endtime with NaN samples
        cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6))
        # cnt = int((endtime - trace_endtime) / trace.stats.delta)
        if cnt > 0:
            trace.data = numpy.concatenate(
                [trace.data, numpy.full(cnt, numpy.nan, dtype=numpy.float64)]
            )
def round_usecs(time):
    """Rounds residual microseconds to milliseconds.

    Parameters
    ----------
    time: UTCDateTime
        time containing microsecond values

    Returns
    ----------
    time: UTCDateTime
        time containing rounded(or non-rounded) microsecond values
    """
    usecs = time.microsecond
    # round microseconds to nearest millisecond
    rounded_usecs = int(round(usecs / 1000, 0)) * 1000
    # reset microseconds to 0 at top of second, add second to input time
    if rounded_usecs > 999000:
        rounded_usecs = 0
        time += 1
    if rounded_usecs != usecs:
        time = time.replace(microsecond=rounded_usecs)
    return time
def split_stream(stream: Stream, size: int = 86400) -> Stream:
    """Split every trace in a stream into intervals of ``size`` seconds.

    Parameters
    ----------
    stream: Stream
        stream of traces to split
    size: int
        interval length in seconds (default 86400 = one day)

    Returns
    -------
    Stream
        stream containing the split traces
    """
    result = Stream()
    for original in stream:
        result += split_trace(original, size)
    return result
def split_trace(trace: Trace, size: int = 86400) -> Stream:
# copy in case original trace changes later
stream = Stream()
out_trace = trace.copy()
for interval in get_intervals(
starttime=out_trace.stats.starttime,
endtime=out_trace.stats.endtime,
size=size,
trim=True,
):
interval_start = interval["start"]
interval_end = interval["end"]
delta = out_trace.stats.delta
if interval_end - delta < interval_start:
# trace contains one sample
if interval_end.timestamp % size:
# trace does NOT contain first sample in next interval
stream += out_trace.slice(
starttime=interval_start, endtime=interval_end, nearest_sample=False
)
else:
# trace DOES contain first sample in next interval
stream += out_trace.slice(
starttime=interval_start,
endtime=interval_end - delta,
nearest_sample=False,
)
if interval_end == out_trace.stats.endtime:
# ONLY if it is the last interval
stream += out_trace.slice(
starttime=interval_end, endtime=interval_end, nearest_sample=False
)