"""Timeseries Utilities."""
import math
import sys
from builtins import range
from datetime import datetime

import numpy
from obspy.core import Stats, Stream, Trace, UTCDateTime

from .Util import get_intervals
def create_empty_trace(
    starttime, endtime, observatory, channel, type, interval, network, station, location
):
    """create an empty trace filled with nans.

    Parameters
    ----------
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data
    observatory : str
        observatory code
    channel : str
        single character channel {H, E, D, Z, F}
    type : str
        data type {definitive, quasi-definitive, variation}
    interval : str
        interval length {day, hour, minute, second, tenhertz}
    network: str
        the network code
    station: str
        the observatory station code
    location: str
        the location code

    Returns
    -------
    obspy.core.Trace
        trace for the requested channel, filled with numpy.nan
    """
    delta = get_delta_from_interval(interval)
    stats = Stats()
    stats.network = network
    stats.station = station
    stats.location = location
    stats.channel = channel
    # Calculate first valid sample time based on interval
    trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta)
    if interval in ("hour", "day"):
        # hourly and daily average have mid-interval timestamps, minus 30 seconds
        if trace_starttime - starttime < (delta + 60) / 2:
            trace_starttime += (delta - 60) / 2
        else:
            trace_starttime += (delta - 60) / 2 - delta
        if starttime > trace_starttime or endtime < trace_starttime:
            # requested range is too narrow to contain a valid sample time
            sys.stdout.write(
                'No allowed "%s" timestamps between %s and %s, '
                % (interval, starttime, endtime)
                + "returning empty trace\n",
            )
    stats.starttime = trace_starttime
    stats.delta = delta
    # Calculate number of valid samples up to or before endtime
    length = int((endtime - trace_starttime) / delta)
    stats.npts = max(0, length + 1)
    data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64)
    return Trace(data, stats)


def encode_stream(stream: Stream, encoding: str) -> Stream:
    """Ensures that factory encoding matches output data encoding

    Parameters:
    -----------
    stream: Stream
        stream of input data
    encoding: str
        target dtype/miniseed encoding name (e.g. "float32", "STEIM2")

    Returns:
    --------
    out_stream: Stream
        stream with matching data encoding to factory specification

    """
    out_stream = Stream()
    for source_trace in stream:
        converted = source_trace.copy()
        if converted.data.dtype != encoding:
            # STEIM compression operates on 32-bit integer samples
            if "STEIM" in encoding.upper():
                converted.data = converted.data.astype(numpy.int32)
            else:
                converted.data = converted.data.astype(encoding)
            # keep miniseed metadata consistent with the new dtype
            if "mseed" in converted.stats:
                converted.stats.mseed.encoding = encoding.upper()
        out_stream += converted
    return out_stream
def get_delta_from_interval(data_interval):
    """Convert interval name to number of seconds

    Parameters
    ----------
    data_interval : str
        interval length {day, hour, minute, second, tenhertz}

    Returns
    -------
    float
        number of seconds for interval, or None if unknown
    """
    if data_interval == "tenhertz":
        delta = 0.1
    elif data_interval == "second":
        delta = 1.0
    elif data_interval == "minute":
        delta = 60.0
    elif data_interval == "hour":
        delta = 3600.0
    elif data_interval == "day":
        delta = 86400.0
    else:
        # unknown interval name
        delta = None
    return delta
def get_interval_from_delta(delta):
    """Convert delta to an interval name

    Parameters
    ----------
    delta: float
        number of seconds between samples

    Returns
    -------
    interval : str
        interval length {day, hour, minute, second, tenhertz},
        or the input delta when it matches no known interval
    """
    if delta == 0.1:
        data_interval = "tenhertz"
    elif delta == 1:
        data_interval = "second"
    elif delta == 60:
        data_interval = "minute"
    elif delta == 3600:
        data_interval = "hour"
    elif delta == 86400:
        data_interval = "day"
    else:
        # pass unknown deltas through unchanged
        data_interval = delta
    return data_interval

def get_stream_start_end_times(timeseries, without_gaps=False):
    """get start and end times from a stream.
            Traverses all traces, and find the earliest starttime, and
            the latest endtime.

    Parameters
    ----------
    timeseries: Stream
        The timeseries stream
    without_gaps: bool
        when True, move the returned times past any gap at the very
        start or end of the stream

    Returns
    -------
    tuple: (starttime, endtime)
        starttime: UTCDateTime
        endtime: UTCDateTime

    NOTE: when the entire timeseries is a gap, and without_gaps is True,
    the returned endtime will be one delta earlier than starttime.
    """
    # start with sentinels that any real trace will beat
    starttime = UTCDateTime(datetime.now())
    endtime = UTCDateTime(0)
    for trace in timeseries:
        if trace.stats.starttime < starttime:
            starttime = trace.stats.starttime
        if trace.stats.endtime > endtime:
            endtime = trace.stats.endtime
    if without_gaps:
        gaps = get_merged_gaps(get_stream_gaps(timeseries))
        for gap in gaps:
            if gap[0] == starttime and gap[1] != endtime:
                # gap at start of timeseries, move to first data point
                starttime = gap[2]
            elif gap[1] == endtime:
                # gap at end of timeseries
                endtime = gap[0] - timeseries[0].stats.delta
    return (starttime, endtime)
def get_stream_gaps(stream, channels=None):
    """Get gaps in a given stream

    Parameters
    ----------
    stream: Stream
        the stream to check for gaps
    channels: array_like
        list of channels to check for gaps
        Default is None (check all channels).

    Returns
    -------
    dictionary of channel gaps arrays

    Notes
    -----
    Returns a dictionary with channel: gaps array pairs. Where the gaps array
        consists of arrays of starttime/endtime pairs representing each gap.
    """
    gaps = {}
    for trace in stream:
        name = trace.stats.channel
        # skip channels the caller did not ask about
        if channels is not None and name not in channels:
            continue
        trace_gaps = get_trace_gaps(trace)
        if name in gaps:
            # multiple traces for the same channel: accumulate
            gaps[name].extend(trace_gaps)
        else:
            gaps[name] = trace_gaps
    return gaps


def get_trace_gaps(trace):
    """Gets gaps in a trace representing a single channel

    Parameters
    ----------
    trace: Trace
        a trace containing a single channel of data.

    Returns
    -------
    array of gaps, which is empty when there are no gaps.
    each gap is an array [start of gap, end of gap, next sample]
    """
    gaps = []
    gap = None
    data = trace.data
    stats = trace.stats
    starttime = stats.starttime
    length = len(data)
    delta = stats.delta
    for i in range(length):
        if numpy.isnan(data[i]):
            if gap is None:
                # start of a gap
                gap = [starttime + i * delta]
        else:
            if gap is not None:
                # end of a gap
                gap.extend([starttime + (i - 1) * delta, starttime + i * delta])
                gaps.append(gap)
                gap = None
    # check for gap at end
    if gap is not None:
        gap.extend([starttime + (length - 1) * delta, starttime + length * delta])
        gaps.append(gap)
    return gaps


def get_merged_gaps(gaps):
    """Get gaps merged across channels/streams

    Parameters
    ----------
    gaps: dictionary
        contains channel/gap array pairs

    Returns
    -------
    array_like
        an array of startime/endtime arrays representing gaps.

    Notes
    -----
    Takes an dictionary of gaps, and merges those gaps across channels,
        returning an array of the merged gaps.
    """
    # flatten the per-channel gap lists into one list
    all_gaps = []
    for channel_gaps in gaps.values():
        all_gaps.extend(channel_gaps)
    # order gaps by start time so a single pass can merge them
    all_gaps.sort(key=lambda g: g[0])
    merged = []
    current = None
    for candidate in all_gaps:
        if current is None:
            # first gap
            current = candidate
        elif candidate[0] > current[2]:
            # candidate starts after current gap's next-sample time
            merged.append(current)
            current = candidate
        else:
            # candidate starts at or before next data
            if candidate[1] > current[1]:
                # candidate ends later, extend the current gap
                current[1] = candidate[1]
                current[2] = candidate[2]
    if current is not None:
        merged.append(current)
    return merged


def get_channels(stream):
    """Get a list of channels in a stream.

    Parameters
    ----------
    stream : Stream
        stream whose traces are inspected

    Returns
    -------
    channels : array_like
        unique channel codes, in first-seen order
    """
    # dict preserves insertion order and deduplicates
    seen = {}
    for trace in stream:
        name = trace.stats.channel
        if name:
            seen[name] = True
    return list(seen)
def get_trace_value(traces, time, default=None):
    """Get a value at a specific time.

    Parameters
    ----------
    traces : list of Trace
        traces to search, in priority order
    time : UTCDateTime
        time to find
    default :
        value returned when no trace has a non-NaN sample exactly at ``time``

    Returns
    -------
    value from first trace with a sample at ``time``,
    or ``default`` if no such (non-NaN) sample exists
    """
    for trace in traces:
        # array of UTCDateTime values corresponding to trace samples
        times = trace.times("utcdatetime")
        index = times.searchsorted(time)
        if index >= len(times):
            # time is after the last sample of this trace
            continue
        trace_time = times[index]
        trace_value = trace.data[index]
        if trace_time == time:
            if numpy.isnan(trace_value):
                return default
            else:
                return trace_value
    return default


def has_all_channels(stream, channels, starttime, endtime):
    """Check whether all channels have any data within time range.

    Parameters
    ----------
    stream: Stream
        The input stream we want to make certain has data
    channels: array_like
        The list of channels that we want to have concurrent data
    starttime: UTCDateTime
        start time of requested output
    endtime : UTCDateTime
        end time of requested output

    Returns
    -------
    bool: True if data found across all channels between starttime/endtime
    """
    merged = get_merged_gaps(get_stream_gaps(stream=stream, channels=channels))
    # a merged gap spanning the whole request means some channel has no data
    return not any(
        gap[0] <= starttime <= gap[1] and endtime < gap[2] for gap in merged
    )


def has_any_channels(stream, channels, starttime, endtime):
    """Check whether any channel has data within time range.

    Parameters
    ----------
    stream: Stream
        The input stream we want to make certain has data
    channels: array_like
        The list of channels that we want to have concurrent data
    starttime: UTCDateTime
        start time of requested output
    endtime : UTCDateTime
        end time of requested output

    Returns
    -------
    bool: True if any data found between starttime/endtime
    """
    # process if any channels have data not covering the time range
    for trace in stream:
        if trace.stats.channel in channels:
            gaps = get_trace_gaps(trace)
            if len(gaps) == 0:
                # no gaps in trace;
                # "any" criterion achieved for stream
                return True
            nodata = [
                (starttime >= gap[0] and starttime <= gap[1] and endtime < gap[2])
                for gap in gaps
            ]
            if not any(nodata):
                # no gaps in trace span interval;
                # "any" criterion achieved for stream
                return True
    return False
def mask_stream(stream):
    """Convert stream traces to masked arrays.

    Parameters
    ----------
    stream : Stream
        stream to convert

    Returns
    -------
    Stream
        stream with new Trace objects with numpy masked array data.
    """
    masked = Stream()
    for trace in stream:
        # mask NaN/inf samples so obspy split/merge treat them as gaps
        masked += Trace(numpy.ma.masked_invalid(trace.data), trace.stats)
    return masked


def unmask_stream(stream):
    """Convert stream traces to unmasked arrays.

    Parameters
    ----------
    stream : Stream
        stream to convert

    Returns
    -------
    Stream
        stream with new Trace objects with numpy array data, with numpy.nan
        as a fill value in a filled array.
    """
    unmasked = Stream()
    for trace in stream:
        unmasked += Trace(
            (
                # fill masked samples with NaN; pass plain arrays through
                trace.data.filled(fill_value=numpy.nan)
                if isinstance(trace.data, numpy.ma.MaskedArray)
                else trace.data
            ),
            trace.stats,
        )
    return unmasked


def merge_streams(*streams):
    """Merge one or more streams.

    Parameters
    ----------
    *streams : Stream
        one or more streams to merge

    Returns
    -------
    Stream
        stream with contiguous traces merged, and gaps filled with numpy.nan
    """
    # combine all input streams into one
    merged = Stream()
    for stream in streams:
        merged += stream
    # mask NaN values so split() recognizes them as gaps
    split = mask_stream(merged)
    # split traces that contain gaps
    split = split.split()
    # Re-add any empty traces that were removed by split()
    readd = Stream()
    for trace in merged:
        stats = trace.stats
        split_stream = split.select(
            channel=stats.channel,
            station=stats.station,
            network=stats.network,
            location=stats.location,
        )
        if len(split_stream) == 0:
            readd += trace
    split += readd
    # merge contiguous traces
    split.merge(
        # 1 = do not interpolate
        interpolation_samples=0,
        # 1 = when there is overlap, use data from trace with last endtime
        method=1,
        # np.nan = work-around for (problematic) intermediate masked arrays
        fill_value=numpy.nan,
    )
    # convert back to NaN filled array
    return unmask_stream(split)


def pad_timeseries(timeseries, starttime, endtime):
    """Calls pad_and_trim_trace for each trace in a stream.

    Traces should be merged before calling this method.

    Parameters
    ----------
    timeseries: Stream
        The timeseries stream as returned by the call to getWaveform
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data

    Notes: the original timeseries object is changed.
    """
    # pad/trim each trace in place
    for current_trace in timeseries:
        pad_and_trim_trace(current_trace, starttime, endtime)


def pad_and_trim_trace(trace, starttime, endtime):
    """Pads and trims trace data so it is in the range [starttime, endtime].

    Uses trace stats to compute start/end times that are consistent with
    other trace data.  (starttime and endtime are not checked).

    Parameters
    ----------
    trace: Trace
        the trace to modify in place
    starttime: UTCDateTime
        the starttime of the requested data
    endtime: UTCDateTime
        the endtime of the requested data

    Notes: the original timeseries object is changed.
    """
    trace_starttime = UTCDateTime(trace.stats.starttime)
    trace_endtime = UTCDateTime(trace.stats.endtime)
    trace_delta = trace.stats.delta
    if trace_starttime < starttime:
        # trim to starttime, at least 1 sample to remove
        # round() before ceil() avoids floating point drift in the division
        cnt = int(math.ceil(round((starttime - trace_starttime) / trace_delta, 6)))
        if cnt > 0:
            trace.data = trace.data[cnt:]
            trace.stats.npts = len(trace.data)
            trace_starttime = trace_starttime + trace_delta * cnt
            trace.stats.starttime = trace_starttime
    elif trace_starttime > starttime:
        # pad to starttime
        cnt = int(round((trace_starttime - starttime) / trace_delta, 6))
        if cnt > 0:
            trace.data = numpy.concatenate(
                [numpy.full(cnt, numpy.nan, dtype=numpy.float64), trace.data]
            )
            trace_starttime = trace_starttime - trace_delta * cnt
            trace.stats.starttime = trace_starttime
    if trace_endtime > endtime:
        # trim to endtime, at least 1 sample to remove
        cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6)))
        if cnt > 0:
            trace.data = trace.data[:-cnt]
            trace.stats.npts = len(trace.data)
    elif trace_endtime < endtime:
        # pad to endtime
        cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6))
        if cnt > 0:
            trace.data = numpy.concatenate(
                [trace.data, numpy.full(cnt, numpy.nan, dtype=numpy.float64)]
            )


def round_usecs(time):
    """Rounds residual microseconds to milliseconds.

    Parameters
    ----------
    time: UTCDateTime
        time containing microsecond values

    Returns
    ----------
    time: UTCDateTime
        time containing rounded(or non-rounded) microsecond values
    """
    usecs = time.microsecond
    # round microseconds to nearest millisecond
    rounded_usecs = int(round(usecs / 1000, 0)) * 1000
    # reset microseconds to 0 at top of second, add second to input time
    if rounded_usecs > 999000:
        rounded_usecs = 0
        time += 1
    if rounded_usecs != usecs:
        time = time.replace(microsecond=rounded_usecs)
    return time
def split_stream(stream: Stream, size: int = 86400) -> Stream:
    """Split every trace in a stream into intervals of at most ``size`` seconds.

    Parameters
    ----------
    stream: Stream
        stream of traces to split
    size: int
        interval size in seconds; default is 86400 (one day)

    Returns
    -------
    Stream
        stream containing the split traces
    """
    result = Stream()
    for current_trace in stream:
        result += split_trace(current_trace, size)
    return result


def split_trace(trace: Trace, size: int = 86400) -> Stream:
    """Split a trace into slices covering consecutive intervals of ``size`` seconds.

    Parameters
    ----------
    trace : Trace
        trace to split
    size : int
        interval size in seconds; default is 86400 (one day)
    """
    # copy in case original trace changes later
    stream = Stream()
    out_trace = trace.copy()
    # get_intervals yields {"start", "end"} pairs covering the trace's span;
    # trim=True clips the first/last interval to the trace's actual extent
    for interval in get_intervals(
        starttime=out_trace.stats.starttime,
        endtime=out_trace.stats.endtime,
        size=size,
        trim=True,
    ):
        interval_start = interval["start"]
        interval_end = interval["end"]
        delta = out_trace.stats.delta
        if interval_end - delta < interval_start:
            # trace contains one sample
            stream += out_trace
            continue
        if interval_end.timestamp % size:
            # trace does NOT contain first sample in next interval
            stream += out_trace.slice(
                starttime=interval_start, endtime=interval_end, nearest_sample=False
            )
        else:
            # trace DOES contain first sample in next interval
            # slice up to (but excluding) the next interval's first sample
            stream += out_trace.slice(
                starttime=interval_start,
                endtime=interval_end - delta,
                nearest_sample=False,
            )
            if interval_end == out_trace.stats.endtime:
                # ONLY if it is the last interval
                # keep the final boundary sample as its own one-sample slice
                stream += out_trace.slice(
                    starttime=interval_end, endtime=interval_end, nearest_sample=False
                )