diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 4fe0e9b18fde6797763ad75a11a0d2fe43d8d7ad..e0729c21b81796707a06cff1184765aeab5be29c 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -4,7 +4,9 @@ from datetime import datetime import math import sys import numpy -import obspy.core +from obspy.core import Stats, Stream, Trace, UTCDateTime + +from .Util import get_intervals def create_empty_trace( @@ -14,9 +16,9 @@ def create_empty_trace( Parameters ---------- - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime the starttime of the requested data - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime the endtime of the requested data observatory : str observatory code @@ -34,19 +36,17 @@ def create_empty_trace( the location code Returns ------- - obspy.core.Trace + Trace: trace for the requested channel """ delta = get_delta_from_interval(interval) - stats = obspy.core.Stats() + stats = Stats() stats.network = network stats.station = station stats.location = location stats.channel = channel # Calculate first valid sample time based on interval - trace_starttime = obspy.core.UTCDateTime( - numpy.ceil(starttime.timestamp / delta) * delta - ) + trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta) if delta > 60.0: trace_starttime += (delta - 60) / 2 if trace_starttime > endtime: @@ -60,7 +60,30 @@ def create_empty_trace( length = int((endtime - trace_starttime) / delta) stats.npts = length + 1 data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64) - return obspy.core.Trace(data, stats) + return Trace(data, stats) + + +def encode_stream(stream: Stream, encoding: str) -> Stream: + """Ensures that factory encoding matches output data encoding + + Parameters: + ----------- + stream: Stream + stream of input data + + Returns: + -------- + out_stream: Stream + stream with matching data encoding to factory specification + + """ + out_stream = Stream() + for trace in stream: + trace_out = trace.copy() + if trace_out.data.dtype != encoding: + trace_out.data = trace_out.data.astype(encoding) + out_stream += trace_out + return out_stream def get_delta_from_interval(data_interval): @@ -125,20 +148,20 @@ def get_stream_start_end_times(timeseries, without_gaps=False): the latest endtime. Parameters ---------- - timeseries: obspy.core.stream + timeseries: Stream The timeseries stream Returns ------- tuple: (starttime, endtime) - starttime: obspy.core.UTCDateTime - endtime: obspy.core.UTCDateTime + starttime: UTCDateTime + endtime: UTCDateTime NOTE: when the entire timeseries is a gap, and without_gaps is True, the returned endtime will be one delta earlier than starttime. """ - starttime = obspy.core.UTCDateTime(datetime.now()) - endtime = obspy.core.UTCDateTime(0) + starttime = UTCDateTime(datetime.now()) + endtime = UTCDateTime(0) for trace in timeseries: if trace.stats.starttime < starttime: starttime = trace.stats.starttime @@ -160,7 +183,7 @@ def get_stream_gaps(stream, channels=None): """Get gaps in a given stream Parameters ---------- - stream: obspy.core.Stream + stream: Stream the stream to check for gaps channels: array_like list of channels to check for gaps @@ -188,7 +211,7 @@ def get_trace_gaps(trace): """Gets gaps in a trace representing a single channel Parameters ---------- - trace: obspy.core.Trace + trace: Trace a stream containing a single channel of data. Returns @@ -270,7 +293,7 @@ def get_channels(stream): Parameters ---------- - stream : obspy.core.Stream + stream : Stream Returns ------- @@ -289,8 +312,8 @@ def get_trace_value(traces, time, default=None): Parameters ---------- - trace : obspy.core.Trace - time : obspy.core.UTCDateTime + trace : Trace + time : UTCDateTime Returns ------- @@ -316,7 +339,7 @@ def has_all_channels(stream, channels, starttime, endtime): Parameters ---------- - stream: obspy.core.Stream + stream: Stream The input stream we want to make certain has data channels: array_like The list of channels that we want to have concurrent data @@ -346,7 +369,7 @@ def has_any_channels(stream, channels, starttime, endtime): Parameters ---------- - stream: obspy.core.Stream + stream: Stream The input stream we want to make certain has data channels: array_like The list of channels that we want to have concurrent data @@ -381,17 +404,17 @@ def mask_stream(stream): Parameters ---------- - stream : obspy.core.Stream + stream : Stream stream to mask Returns ------- - obspy.core.Stream + Stream stream with new Trace objects with numpy masked array data. """ - masked = obspy.core.Stream() + masked = Stream() for trace in stream: - masked += obspy.core.Trace(numpy.ma.masked_invalid(trace.data), trace.stats) + masked += Trace(numpy.ma.masked_invalid(trace.data), trace.stats) return masked @@ -400,18 +423,18 @@ def unmask_stream(stream): Parameters ---------- - stream : obspy.core.Stream + stream : Stream stream to unmask Returns ------- - obspy.core.Stream + Stream stream with new Trace objects with numpy array data, with numpy.nan as a fill value in a filled array. """ - unmasked = obspy.core.Stream() + unmasked = Stream() for trace in stream: - unmasked += obspy.core.Trace( + unmasked += Trace( trace.data.filled(fill_value=numpy.nan) if isinstance(trace.data, numpy.ma.MaskedArray) else trace.data, @@ -425,15 +448,15 @@ def merge_streams(*streams): Parameters ---------- - *streams : obspy.core.Stream + *streams : Stream one or more streams to merge Returns ------- - obspy.core.Stream + Stream stream with contiguous traces merged, and gaps filled with numpy.nan """ - merged = obspy.core.Stream() + merged = Stream() # sort out empty for stream in streams: @@ -445,7 +468,7 @@ def merge_streams(*streams): split = split.split() # Re-add any empty traces that were removed by split() - readd = obspy.core.Stream() + readd = Stream() for trace in merged: stats = trace.stats split_stream = split.select( @@ -480,11 +503,11 @@ def pad_timeseries(timeseries, starttime, endtime): Parameters ---------- - timeseries: obspy.core.stream + timeseries: Stream The timeseries stream as returned by the call to getWaveform - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime the starttime of the requested data - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime the endtime of the requested data Notes: the original timeseries object is changed. @@ -501,17 +524,17 @@ def pad_and_trim_trace(trace, starttime, endtime): Parameters ---------- - trace: obspy.core.Trace + trace: Trace One trace to be processed - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime the starttime of the requested data - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime the endtime of the requested data Notes: the original timeseries object is changed. """ - trace_starttime = obspy.core.UTCDateTime(trace.stats.starttime) - trace_endtime = obspy.core.UTCDateTime(trace.stats.endtime) + trace_starttime = UTCDateTime(trace.stats.starttime) + trace_endtime = UTCDateTime(trace.stats.endtime) trace_delta = trace.stats.delta if trace_starttime < starttime: # trim to starttime @@ -570,46 +593,26 @@ def round_usecs(time): return time -def split_streams_by_interval(stream, interval): - """Splits streams by interval - - Parameters: - ----------- - stream: obspy.core.Stream - stream of input data - interval: int - interval streams will be split by - - Returns: - -------- - out_streams: List[obspy.core.Stream] - list of streams split by interval - - """ - delta = stream[0].stats.delta - interval_start = stream[0].stats.starttime.timestamp - times = stream[0].times("timestamp") - interval_ends = times[times % interval == 0] - delta - if not interval_ends.any(): - return [stream] - out_streams = [] - for interval_end in interval_ends: - # should only occur with stream starttime occuring at midnight - if interval_end == interval_start - delta: - continue - stream_copy = stream.copy() - pad_timeseries( - timeseries=stream_copy, - starttime=obspy.core.UTCDateTime(interval_start), - endtime=obspy.core.UTCDateTime(interval_end), +def split_stream(stream: Stream, size: int = 86400) -> Stream: + out_stream = Stream() + for trace in stream: + out_stream += split_trace(trace, size) + return out_stream + + +def split_trace(trace: Trace, size: int = 86400) -> Stream: + # copy in case original trace changes later + stream = Stream() + out_trace = trace.copy() + for interval in get_intervals( + starttime=out_trace.stats.starttime, + endtime=out_trace.stats.endtime, + size=size, + trim=True, + ): + stream += out_trace.slice( + starttime=interval["start"], + endtime=interval["end"] - out_trace.stats.delta, + nearest_sample=False, ) - out_streams.append(stream_copy) - interval_start = interval_end + delta - # last trim will extend from interval_start to the end of the original input stream - pad_timeseries( - stream, - starttime=obspy.core.UTCDateTime(interval_start), - endtime=stream[0].stats.endtime, - ) - out_streams.append(stream) - return out_streams + return stream diff --git a/geomagio/edge/MiniSeedInputClient.py b/geomagio/edge/MiniSeedInputClient.py index 0edc200008e25bccfd374f4c3bc10d31f31194f8..4a3db6ee0d41333bf956673f635f885ebf221ad1 100644 --- a/geomagio/edge/MiniSeedInputClient.py +++ b/geomagio/edge/MiniSeedInputClient.py @@ -3,7 +3,9 @@ import io import socket import sys -from ..TimeseriesUtility import split_streams_by_interval +from obspy.core import Stream + +from ..TimeseriesUtility import encode_stream, split_stream class MiniSeedInputClient(object): @@ -74,48 +76,28 @@ class MiniSeedInputClient(object): # connect if needed if self.socket is None: self.connect() - # convert stream to miniseed - buf = io.BytesIO() - streams = self._pre_process(stream) - for stream in streams: - stream.write( - buf, encoding=self.encoding.upper(), format="MSEED", reclen=512 - ) - # send data - self.socket.sendall(buf.getvalue()) - - def _pre_process(self, stream): + processed = self._pre_process(stream=stream) + for trace in processed: + buf = io.BytesIO() + # convert stream to miniseed + trace.write(buf, format="MSEED", reclen=512) + # send data + self.socket.sendall(buf.getvalue()) + buf.close() + + def _pre_process(self, stream: Stream) -> Stream: """Encodes and splits streams at daily intervals Paramters: ---------- - stream: obspy.core.stream + stream: Stream stream of input data Returns: -------- - streams: List[obspy.core.stream] - list of encoded streams split at daily intervals - """ - stream = self.__encode_stream(stream) - streams = split_streams_by_interval(stream, interval=86400) - return streams - - def __encode_stream(self, stream): - """Ensures that factory encoding matches output data encoding - - Parameters: - ----------- - stream: obspy.core.Stream - stream of input data - - Returns: - -------- - stream: obspy.core.Stream - stream with matching data encoding to factory specification - + stream: Stream + list of encoded trace split at daily intervals """ - for trace in stream: - if trace.data.dtype != self.encoding: - trace.data = trace.data.astype(self.encoding) + stream = encode_stream(stream=stream, encoding=self.encoding) + stream = split_stream(stream=stream, size=86400) return stream diff --git a/geomagio/edge/__init__.py b/geomagio/edge/__init__.py index 93888daa3356a5d4f8b5d3fed1034772106e6f13..2a53748a195983a3dc737b87731330584f10c739 100644 --- a/geomagio/edge/__init__.py +++ b/geomagio/edge/__init__.py @@ -5,6 +5,13 @@ from __future__ import absolute_import from .EdgeFactory import EdgeFactory from .LocationCode import LocationCode from .MiniSeedFactory import MiniSeedFactory +from .MiniSeedInputClient import MiniSeedInputClient from .RawInputClient import RawInputClient -__all__ = ["EdgeFactory", "LocationCode", "MiniSeedFactory", "RawInputClient"] +__all__ = [ + "EdgeFactory", + "LocationCode", + "MiniSeedFactory", + "MiniSeedInputClient", + "RawInputClient", +] diff --git a/test/edge_test/MiniSeedFactory_test.py b/test/edge_test/MiniSeedFactory_test.py index 24397dbcbde8e20781418f133a23696609b6ec9e..7f3421311348b37825b4a2e222722be352a43d08 100644 --- a/test/edge_test/MiniSeedFactory_test.py +++ b/test/edge_test/MiniSeedFactory_test.py @@ -1,10 +1,11 @@ """Tests for MiniSeedFactory.py""" +import io import numpy from numpy.testing import assert_equal -from obspy.core import Stats, Stream, Trace, UTCDateTime +from obspy.core import read, Stats, Stream, Trace, UTCDateTime from geomagio import TimeseriesUtility -from geomagio.edge import MiniSeedFactory +from geomagio.edge import MiniSeedFactory, MiniSeedInputClient def test__get_edge_network(): @@ -90,6 +91,28 @@ def test__put_timeseries(): assert_equal(sent[1].stats.endtime, trace1.stats.endtime) +def test__pre_process(): + """edge_test.MiniSeedFactory_test.test__pre_process()""" + # create 2 day long trace with additional midnight sample + trace1 = __create_trace(numpy.arange((86400 * 2) + 1), channel="H") + out_stream = Stream() + processed = MiniSeedInputClient(host=None, encoding="float32")._pre_process( + stream=Stream(trace1) + ) + for trace in processed: + buf = io.BytesIO() + trace.write(buf, format="MSEED", reclen=512) + out_stream += read(buf) + buf.close() + assert len(out_stream) == 2 + for trace in out_stream: + assert trace.data.dtype == "float32" + stats = trace.stats + assert stats.npts == 86400 + assert stats.starttime.timestamp % 86400 == 0 + assert stats.endtime.timestamp % 86400 != 0 + + def test__set_metadata(): """edge_test.MiniSeedFactory_test.test__set_metadata()""" # Call _set_metadata with 2 traces, and make certain the stats get