diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 459287dbe69add0b6dc98da98cd56d8395e18bd2..4fe0e9b18fde6797763ad75a11a0d2fe43d8d7ad 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -568,3 +568,48 @@ def round_usecs(time): if rounded_usecs != usecs: time = time.replace(microsecond=rounded_usecs) return time + + +def split_streams_by_interval(stream, interval): + """Splits streams by interval + + Parameters: + ----------- + stream: obspy.core.Stream + stream of input data + interval: int + interval streams will be split by + + Returns: + -------- + out_streams: List[obspy.core.Stream] + list of streams split by interval + + """ + delta = stream[0].stats.delta + interval_start = stream[0].stats.starttime.timestamp + times = stream[0].times("timestamp") + interval_ends = times[times % interval == 0] - delta + if not interval_ends.any(): + return [stream] + out_streams = [] + for interval_end in interval_ends: + # should only occur with stream starttime occuring at midnight + if interval_end == interval_start - delta: + continue + stream_copy = stream.copy() + pad_timeseries( + timeseries=stream_copy, + starttime=obspy.core.UTCDateTime(interval_start), + endtime=obspy.core.UTCDateTime(interval_end), + ) + out_streams.append(stream_copy) + interval_start = interval_end + delta + # last trim will extend from interval_start to the end of the original input stream + pad_timeseries( + stream, + starttime=obspy.core.UTCDateTime(interval_start), + endtime=stream[0].stats.endtime, + ) + out_streams.append(stream) + return out_streams diff --git a/geomagio/edge/MiniSeedInputClient.py b/geomagio/edge/MiniSeedInputClient.py index f89f336b3345eb01cf6585d68c3defb17b9e88b2..b273604a63fe0ce386f4851216982b63d25544ff 100644 --- a/geomagio/edge/MiniSeedInputClient.py +++ b/geomagio/edge/MiniSeedInputClient.py @@ -3,6 +3,8 @@ import io import socket import sys +from ..TimeseriesUtility import split_streams_by_interval + class MiniSeedInputClient(object): """Client to write MiniSeed formatted data to Edge. @@ -74,12 +76,43 @@ class MiniSeedInputClient(object): self.connect() # convert stream to miniseed buf = io.BytesIO() - stream = self._pre_process(stream) - stream.write(buf, encoding=self.encoding, format="MSEED", reclen=512) + streams = self._pre_process(stream) + for stream in streams: + stream.write(buf, format="MSEED", reclen=512) # send data self.socket.sendall(buf.getvalue()) def _pre_process(self, stream): + """Encodes and splits streams at daily intervals + + Paramters: + ---------- + stream: obspy.core.stream + stream of input data + + Returns: + -------- + streams: List[obspy.core.stream] + list of encoded streams split at daily intervals + """ + stream = self.__encode_stream(stream) + streams = split_streams_by_interval(stream, interval=86400) + return streams + + def __encode_stream(self, stream): + """Ensures that factory encoding matches output data encoding + + Parameters: + ----------- + stream: obspy.core.Stream + stream of input data + + Returns: + -------- + stream: obspy.core.Stream + stream with matching data encoding to factory specification + + """ for trace in stream: if trace.data.dtype != self.encoding: trace.data = trace.data.astype(self.encoding)