From 6b773910195b7b5a0447ffd3300571485744e6a5 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 20 Nov 2024 12:10:48 -0700 Subject: [PATCH 01/24] Reduce MAXINPUTSIZE in RawInputClient.py --- geomagio/edge/RawInputClient.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 55eae89c..fb642a6a 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -14,13 +14,16 @@ from time import sleep """ MAXINPUTSIZE: Edge uses a short for the data count, so the max input size - for a single data packet to edge is 32767 + for a single data packet to edge is *supposed* to be 32767. However, this + was never tested thoroughly until we added support for 10Hz data, when + it was quickly discovered that setting this value caused Edge to hang. + Therefore, MAXINPUTSIZE was reduced to 10_000, which seems to work well. HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size for sending seconds data. DAYMINUTES: The numbers of minutes in a day. Used as the size for sending minute data, since Edge stores by the day. """ -MAXINPUTSIZE = 32767 +MAXINPUTSIZE = 10_000 HOURSECONDS = 3600 DAYMINUTES = 1440 -- GitLab From 65ee3c2c90f9fa28a4b024167d77dcaa06b568e3 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 20 Nov 2024 12:16:01 -0700 Subject: [PATCH 02/24] Allow 10 Hz ('tenhertz') samples in RawInputClient.py --- geomagio/edge/RawInputClient.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index fb642a6a..9e09e0ed 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -164,7 +164,7 @@ class RawInputClient: PARAMETERS ---------- - interval: {'day', 'hour', 'minute', 'second'} + interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} data interval. trace: obspy.core.trace @@ -177,7 +177,11 @@ class RawInputClient: totalsamps = len(trace.data) starttime = trace.stats.starttime - if interval == "second": + if interval == "tenhertz": + nsamp = MAXINPUTSIZE + timeoffset = 1 / 10.0 + samplerate = 10.0 + elif interval == "second": nsamp = HOURSECONDS timeoffset = 1 samplerate = 1.0 @@ -363,7 +367,7 @@ class RawInputClient: secs, usecs, self.sequence, - *samples + *samples, ) return buf -- GitLab From 7d0d4eb797ccac6a43837ab4dd23c901e06c344b Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 20 Nov 2024 16:18:10 -0700 Subject: [PATCH 03/24] Handle "STEIM" encoding in TimeseriesUtility.encode_stream() --- geomagio/TimeseriesUtility.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 9550187c..932ec783 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -82,7 +82,10 @@ def encode_stream(stream: Stream, encoding: str) -> Stream: for trace in stream: trace_out = trace.copy() if trace_out.data.dtype != encoding: - trace_out.data = trace_out.data.astype(encoding) + if "STEIM" in encoding.upper(): + trace_out.data = trace_out.data.astype(numpy.int32) + else: + trace_out.data = trace_out.data.astype(encoding) if "mseed" in trace_out.stats: trace_out.stats.mseed.encoding = encoding.upper() out_stream += trace_out -- GitLab From 43ae1bf80a43b85a670bfc5eedb4a29da152f8de Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 20 Nov 2024 16:40:20 -0700 Subject: [PATCH 04/24] Incorporate MiniSeedFactory functionality into EdgeFactory - mostly get_timeseries related stuff --- geomagio/edge/EdgeFactory.py | 341 +++++++++++++++++++++++++++-------- 1 file changed, 261 insertions(+), 80 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index f4611939..dffc2b42 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -16,11 +16,12 @@ from typing import List, Optional import numpy import numpy.ma -from obspy import Stream, Trace, UTCDateTime +from obspy.core import Stats, Stream, Trace, UTCDateTime from obspy.clients import earthworm, neic from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType +from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata @@ -62,12 +63,15 @@ class EdgeFactory(TimeseriesFactory): locationCode: str the location code for the given edge server, overrides type in get_timeseries/put_timeseries - scaleFactor: int - all data written to edge (via raw input client) will be scaled - by this integer prior to write; all data read from edge will be - will be divided by this integer after read; default = 1000 - snclMapper: {'geomag','legacy'} - a mapper of common channel names to SEED SNCL codes (that is, + convert_channels: array + list of channels to convert from volt/bin to nT + scale_factor: float + override default scalings when reading/writing miniseed blocks; + (reading integer blocks divides by 1000; reading floating point + blocks divides by 1; writing all data multiplies by 1000) + default = None + sncl_mode: {'geomag','legacy'} + force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy See Also @@ -85,8 +89,10 @@ class EdgeFactory(TimeseriesFactory): def __init__( self, host: str = "edgecwb.usgs.gov", - port: int = 2060, - write_port: int = 7981, + port: Optional[int] = None, + client: Optional[str] = None, + write_port: Optional[int] = None, + write_client: Optional[str] = None, tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -95,31 +101,34 @@ class EdgeFactory(TimeseriesFactory): interval: Optional[DataInterval] = None, observatoryMetadata: Optional[ObservatoryMetadata] = None, locationCode: Optional[str] = None, - scaleFactor: int = 1000, - snclMapper: str = "legacy", + convert_channels: Optional[List[str]] = None, + scale_factor: Optional[int] = None, + sncl_mode: Optional[str] = "legacy", + write_mode: Optional[str] = "raw", ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - if port == 2060: - # earthworm (phasing this out gradually) - self.client = earthworm.Client(host, port) - else: - # CWBQuery/miniseed protocol (preferred) - self.client = neic.Client(host, port) self.host = host self.port = port + self.client = client self.write_port = write_port + self.write_client = write_client self.tag = tag self.forceout = forceout self.interval = interval self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.locationCode = locationCode - self.scaleFactor = scaleFactor - if snclMapper == "legacy": + + self.convert_channels = convert_channels or [] + self.scale_factor = scale_factor + self.sncl_mode = sncl_mode + self.write_mode = write_mode + + if sncl_mode == "legacy": self.get_sncl = LegacySNCL.get_sncl - elif snclMapper == "geomag": + elif sncl_mode == "geomag": self.get_sncl = SNCL.get_sncl else: - raise TimeseriesFactoryException("Unrecognized SNCL mapper") + raise TimeseriesFactoryException("Unrecognized SNCL mode") def get_timeseries( self, @@ -179,17 +188,22 @@ class EdgeFactory(TimeseriesFactory): # get the timeseries timeseries = Stream() for channel in channels: - data = self._get_timeseries( - starttime, - endtime, - observatory, - channel, - type, - interval, - add_empty_channels, - ) - if len(data) == 0: - continue + if channel in self.convert_channels: + data = self._convert_timeseries( + starttime, endtime, observatory, channel, type, interval + ) + else: + data = self._get_timeseries( + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, + ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -250,43 +264,75 @@ class EdgeFactory(TimeseriesFactory): timeseries, observatory, channel, type, interval, starttime, endtime ) - def _convert_timeseries_to_decimal(self, stream: Stream): - """convert geomag edge timeseries data stored as ints, to decimal by - dividing by 1000.00 - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - Notes - ----- - This routine changes the values in the timeseries. The user should - make a copy before calling if they don't want that side effect. - """ - for trace in stream: - trace.data = numpy.divide(trace.data, self.scaleFactor) - - def _convert_trace_to_int(self, trace_in: Trace) -> Trace: - """convert geomag edge traces stored as decimal, to ints by multiplying - by 1000 + def get_calculated_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + components: List[dict], + ) -> Trace: + """Calculate a single channel using multiple component channels. Parameters ---------- - trace: Trace - a trace retrieved from a geomag edge representing one channel + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory: str + observatory code + channel: str + single character channel {H, E, D, Z, F} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + components: list + each component is a dictionary with the following keys: + channel: str + offset: float + scale: float + Returns ------- - trace: Trace - a trace converted to ints - Notes - ----- - this doesn't work on ndarray with nan's in it. - the trace must be a masked array. + out: Trace + timeseries trace of the converted channel data """ - trace = trace_in.copy() - trace.data = numpy.multiply(trace.data, self.scaleFactor) - trace.data = trace.data.astype(int) - - return trace + # sum channels + stats = None + converted = None + for component in components: + # load component + data = self._get_timeseries( + starttime, endtime, observatory, component["channel"], type, interval + )[0] + # convert to nT + nt = data.data * component["scale"] + component["offset"] + # add to converted + if converted is None: + converted = nt + stats = Stats(data.stats) + else: + converted += nt + # set channel parameter to U, V, or W + stats.channel = channel + # create empty trace with adapted stats + out = TimeseriesUtility.create_empty_trace( + stats.starttime, + stats.endtime, + stats.station, + stats.channel, + stats.data_type, + stats.data_interval, + stats.network, + stats.station, + stats.location, + ) + out.data = converted + return out def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: """convert geomag edge traces in a timeseries stream to a MaskedArray @@ -331,7 +377,7 @@ class EdgeFactory(TimeseriesFactory): single character channel {H, E, D, Z, F} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -352,23 +398,83 @@ class EdgeFactory(TimeseriesFactory): # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 - try: - data = self.client.get_waveforms( - sncl.network, - sncl.station, - sncl.location, - sncl.channel, - starttime, - endtime + half_delta, - ) - except TypeError: - # get_waveforms() fails if no data is returned from Edge - data = Stream() - # make sure data is 32bit int + if self.port: + ports = [self.port] + starttimes = [starttime] + endtimes = [endtime] + else: + midnight = UTCDateTime(UTCDateTime.now().date) + if starttime < endtime: + if endtime < midnight: + # cwbquery port gives access to complete miniseed packets + ports = [22061] + starttimes = [starttime] + endtimes = [endtime] + elif starttime >= midnight: + # earthworm port gives access to really realtime data + ports = [2060] + starttimes = [starttime] + endtimes = [endtime] + else: + # starttime-endtime straddle midnight + # port=22061 before today; port=2060 for today + ports = [22061, 2060] + starttimes = [starttime, midnight] + endtimes = [midnight - half_delta, endtime] + + data = Stream() + for i, port in enumerate(ports): + + if port == 2060: + # earthworm for really realtime data (slower) + client = earthworm.Client(self.host, port) + else: + # neic for complete miniseed packets (faster) + client = neic.Client(self.host, port) + + try: + data += client.get_waveforms( + sncl.network, + sncl.station, + sncl.location, + sncl.channel, + starttimes[i], + endtimes[i] + half_delta, + ) + except Exception as e: + print("Ingnoring error: ", e) + for trace in data: - trace.data = trace.data.astype("i4") - data.merge() + if trace.data.dtype.kind == "i": + # convert all integer traces to rescaled 64-bit floats; + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace.data = trace.data.astype(float) / (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace.data = trace.data.astype(float) / (self.scale_factor or 1e3) + elif trace.data.dtype.kind == "f": + # convert all float traces to 64-bit floats; + trace.data = trace.data.astype(float) / (self.scale_factor or 1.0) + + # when Traces with identical NSCL codes overlap, prioritize samples + # that come "later" in Stream; this depends on Edge returning miniseed + # packets in the order written + # NOTE: this is not possible with single calls to Stream.merge() + st_tmp = Stream() + for tr in data: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + # point `data` to the new stream and continue processing + data = st_tmp + if data.count() == 0 and add_empty_channels: data += self._get_empty_trace( starttime=starttime, @@ -380,9 +486,85 @@ class EdgeFactory(TimeseriesFactory): network=sncl.network, location=sncl.location, ) + if data.count() != 0: + TimeseriesUtility.pad_and_trim_trace( + trace=data[0], starttime=starttime, endtime=endtime + ) self._set_metadata(data, observatory, channel, type, interval) return data + def _convert_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + ) -> Stream: + """Generate a single channel using multiple components. + + Finds metadata, then calls _get_converted_timeseries for actual + conversion. + + Parameters + ---------- + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval : {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + + Returns + ------- + out: Stream + timeseries with single trace of the requested channel data + NOTE: this originally returned a Trace, but was modified + (probably due to misunderstanding) at some point to + return a Stream. This Stream is, however, expected + to contain just a single Trace. + """ + out = Stream() + metadata = get_instrument_calibrations(observatory, starttime, endtime) + # loop in case request spans different configurations + for entry in metadata: + entry_endtime = entry["end_time"] + entry_starttime = entry["start_time"] + instrument = entry["instrument"] + instrument_channels = instrument["channels"] + if channel not in instrument_channels: + # no idea how to convert + continue + # determine metadata overlap with request + start = ( + starttime + if entry_starttime is None or entry_starttime < starttime + else entry_starttime + ) + end = ( + endtime + if entry_endtime is None or entry_endtime > endtime + else entry_endtime + ) + # now convert + out += self.get_calculated_timeseries( + start, + end, + observatory, + channel, + type, + interval, + instrument_channels[channel], + ) + return out + def _post_process( self, timeseries: Stream, @@ -409,7 +591,6 @@ class EdgeFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ - self._convert_timeseries_to_decimal(timeseries) for trace in timeseries: if isinstance(trace.data, numpy.ma.MaskedArray): trace.data.set_fill_value(numpy.nan) -- GitLab From bf6f659e0d98c1324e90322e7867bea47e3d4617 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 20 Nov 2024 16:42:50 -0700 Subject: [PATCH 05/24] Incorporate MiniSeedFactory functionality into EdgeFactory - mostly put_timeseries related stuff --- geomagio/edge/EdgeFactory.py | 71 +++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index dffc2b42..f6ee9c25 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -26,6 +26,7 @@ from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata from .RawInputClient import RawInputClient +from .MiniSeedInputClient import MiniSeedInputClient from .SNCL import SNCL from .LegacySNCL import LegacySNCL @@ -73,6 +74,11 @@ class EdgeFactory(TimeseriesFactory): sncl_mode: {'geomag','legacy'} force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy + write_mode: {'raw', 'miniseed'} + raw writes to Edge's RAM buffer, so better for real time interaction; + miniseed writes complete miniseed blocks, so better for large uploads + becaue it reduces processing required by Edge; default = raw + See Also -------- @@ -641,34 +647,65 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - ric = RawInputClient( - self.tag, - self.host, - self.write_port, - sncl.station, - sncl.channel, - sncl.location, - sncl.network, - ) - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) + stream = stream.split() # Make certain there's actually data - if not numpy.ma.any(stream.select(channel=channel)[0].data): + if not numpy.ma.any(stream[0].data): return - for trace in stream.select(channel=channel).split(): + for trace in stream: trace_send = trace.copy() trace_send.trim(starttime, endtime) if channel == "D": trace_send.data = ChannelConverter.get_minutes_from_radians( trace_send.data ) - trace_send = self._convert_trace_to_int(trace_send) - ric.send_trace(interval, trace_send) - if self.forceout: - ric.forceout() - ric.close() + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace_send.data = trace_send.data * (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace_send.data = trace_send.data * (self.scale_factor or 1e3) + + if self.write_mode.lower() in ["raw"]: + # FIXME: input clients should be rewritten for consistency, + # allowing us to clean up stuff like this if-else-block + ric = RawInputClient( + self.tag, + self.host, + self.write_port, + sncl.station, + sncl.channel, + sncl.location, + sncl.network, + ) + trace_send.data = trace_send.data.astype(int) # ric requires ints + ric.send_trace(interval, trace_send) + + elif self.write_mode.lower() in ["miniseed"]: + # FIXME: input clients should be rewritten for consistency, + # allowing us to clean up stuff like this if-else-block + msic = MiniSeedInputClient( + self.host, self.write_port, encoding="STEIM2" + ) + trace_send.stats.station = sncl.station + trace_send.stats.location = sncl.location + trace_send.stats.network = sncl.network + trace_send.stats.channel = sncl.channel + stream_send = Stream(trace_send) + msic.send(stream_send) + else: + raise TimeseriesFactoryException( + "Unrecognized write_mode specified: ", self.write_mode + ) + + if self.write_mode.lower() in ["raw"]: + if self.forceout: + ric.forceout() + ric.close() + elif self.write_mode.lower() in ["miniseed"]: + msic.close() def _set_metadata( self, -- GitLab From 5883b5faf328befcfacb1b56ddeca308cc27c89b Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Sat, 30 Nov 2024 15:03:25 -0700 Subject: [PATCH 06/24] Prune bad trace before merging stream in `_get_timeseries()` --- geomagio/edge/EdgeFactory.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index f6ee9c25..ce6063b6 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -470,14 +470,20 @@ class EdgeFactory(TimeseriesFactory): # NOTE: this is not possible with single calls to Stream.merge() st_tmp = Stream() for tr in data: - # add tr to temporary stream - st_tmp += tr - # replace time overlaps with gaps - st_tmp.merge(0) - # add tr to temporary stream again - st_tmp += tr - # replace gaps with tr's data - st_tmp.merge(0) + try: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + except Exception as e: + tr = st_tmp.pop() # remove bad trace + print("Dropped trace: ", tr) + print("Ignoring error: ", e) + # point `data` to the new stream and continue processing data = st_tmp -- GitLab From 5d950c09c22c3063391c62941e277cfaf948dcbc Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Mon, 16 Dec 2024 15:57:31 -0700 Subject: [PATCH 07/24] Remove unnecessary (and flawed...removed all zeros, which might be valid) "sanity check" in _put_channel() --- geomagio/edge/EdgeFactory.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index ce6063b6..2b157979 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -656,10 +656,6 @@ class EdgeFactory(TimeseriesFactory): stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) stream = stream.split() - # Make certain there's actually data - if not numpy.ma.any(stream[0].data): - return - for trace in stream: trace_send = trace.copy() trace_send.trim(starttime, endtime) -- GitLab From 8731bceaf51e5b2e34a3ff7881f58e4c8e4a99a1 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Fri, 27 Dec 2024 14:08:31 -0700 Subject: [PATCH 08/24] Move write factory closes inside write_mode blocks. --- geomagio/edge/EdgeFactory.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 2b157979..234b2f65 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -684,6 +684,9 @@ class EdgeFactory(TimeseriesFactory): ) trace_send.data = trace_send.data.astype(int) # ric requires ints ric.send_trace(interval, trace_send) + if self.forceout: + ric.forceout() + ric.close() elif self.write_mode.lower() in ["miniseed"]: # FIXME: input clients should be rewritten for consistency, @@ -697,18 +700,12 @@ class EdgeFactory(TimeseriesFactory): trace_send.stats.channel = sncl.channel stream_send = Stream(trace_send) msic.send(stream_send) + msic.close() else: raise TimeseriesFactoryException( "Unrecognized write_mode specified: ", self.write_mode ) - if self.write_mode.lower() in ["raw"]: - if self.forceout: - ric.forceout() - ric.close() - elif self.write_mode.lower() in ["miniseed"]: - msic.close() - def _set_metadata( self, stream: Stream, -- GitLab From 167845f172a87c50c81681f2809169283dcfdeee Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Sat, 28 Dec 2024 10:36:19 -0700 Subject: [PATCH 09/24] Clean up abandoned `__init__` options for specifying read/write clients --- geomagio/edge/EdgeFactory.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 234b2f65..b395b2c1 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -96,9 +96,7 @@ class EdgeFactory(TimeseriesFactory): self, host: str = "edgecwb.usgs.gov", port: Optional[int] = None, - client: Optional[str] = None, write_port: Optional[int] = None, - write_client: Optional[str] = None, tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -115,9 +113,7 @@ class EdgeFactory(TimeseriesFactory): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.host = host self.port = port - self.client = client self.write_port = write_port - self.write_client = write_client self.tag = tag self.forceout = forceout self.interval = interval -- GitLab From 256f10d43aa988eea07091a21d7f5719ae06f4a5 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Fri, 3 Jan 2025 11:57:31 -0700 Subject: [PATCH 10/24] Made input/output port configuration is more flexible --- geomagio/edge/EdgeFactory.py | 108 +++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 35 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index b395b2c1..a0a6683d 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -39,9 +39,15 @@ class EdgeFactory(TimeseriesFactory): host: str a string representing the IP number of the host to connect to. port: integer - the port number the waveserver is listening on. + the port number to retrieve data from; if not set, use `ws_port` + for today's data and `qs_port` for older data; if only one of + those ports is set, use it for all data; if no ports are set, + default to 2060 (a "standard" waveserver port) write_port: integer - the port number the client is writing to. + the port number to store data to; if not set, use `ris_port` + for today's data and `mss_port` for older data; if only one of + those ports is set, use it for all data; if no ports are set, + there are not "standard" ris or mss ports, so fail on writes tag: str A tag used by edge to log and associate a socket with a given data source @@ -74,11 +80,23 @@ class EdgeFactory(TimeseriesFactory): sncl_mode: {'geomag','legacy'} force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy - write_mode: {'raw', 'miniseed'} - raw writes to Edge's RAM buffer, so better for real time interaction; - miniseed writes complete miniseed blocks, so better for large uploads - becaue it reduces processing required by Edge; default = raw - + ws_port: int + port for retrieving data using waveserver protocol; this allows + access to RAM buffer, and potentially more up-to-date real time + data; not valid if stored data are not integers + qs_port: int + port for retrieving data using queryserver protocol; this uses + miniseed binary, and is more efficient for larger transfers; this + does not access Edge's RAM buffer, limiting real time access to + low sample rate data, but can be used to pull non-integer data + ris_port: int + port for storing data via Edge's RawInputServer (RIS); this places + data sample-by-sample into a RAM buffer, where it accumulates until + a full miniseed block can be written to disk + mss_port: int + port for storing data via Edge's MiniSeedServer (MSS); this creates + a miniseed block client-side, then sends this to Edge to write to + disk immediately See Also -------- @@ -86,10 +104,11 @@ class EdgeFactory(TimeseriesFactory): Notes ----- - This is designed to read from any earthworm style waveserver, but it - currently only writes to an edge. Edge mimics an earthworm style - waveserver close enough that we hope to maintain that compatibility - for reading. + Initial versions of this factory were limited in their ability to + interface with EdgeCWB. Capabilities have been expanded, espeically + with regard to reading/writing miniseed blocks directly. However, to + retain backward compatibility, especially with regard to "defaults", + some extraneous logic was necessary. """ def __init__( @@ -108,7 +127,10 @@ class EdgeFactory(TimeseriesFactory): convert_channels: Optional[List[str]] = None, scale_factor: Optional[int] = None, sncl_mode: Optional[str] = "legacy", - write_mode: Optional[str] = "raw", + ws_port: Optional[int] = None, + qs_port: Optional[int] = None, + ris_port: Optional[int] = None, + mss_port: Optional[int] = None, ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.host = host @@ -123,8 +145,6 @@ class EdgeFactory(TimeseriesFactory): self.convert_channels = convert_channels or [] self.scale_factor = scale_factor self.sncl_mode = sncl_mode - self.write_mode = write_mode - if sncl_mode == "legacy": self.get_sncl = LegacySNCL.get_sncl elif sncl_mode == "geomag": @@ -132,6 +152,14 @@ class EdgeFactory(TimeseriesFactory): else: raise TimeseriesFactoryException("Unrecognized SNCL mode") + if port is ws_port is qs_port is None: + # for backward compatibility + ws_port = 2060 + self.ws_port = ws_port + self.qs_port = qs_port + self.ris_port = ris_port + self.mss_port = mss_port + def get_timeseries( self, starttime: UTCDateTime, @@ -263,7 +291,13 @@ class EdgeFactory(TimeseriesFactory): ) for channel in channels: self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + timeseries.select(channel=channel), + observatory, + channel, + type, + interval, + starttime, + endtime, ) def get_calculated_timeseries( @@ -410,31 +444,35 @@ class EdgeFactory(TimeseriesFactory): if starttime < endtime: if endtime < midnight: # cwbquery port gives access to complete miniseed packets - ports = [22061] + ports = [self.qs_port or self.ws_port] starttimes = [starttime] endtimes = [endtime] elif starttime >= midnight: # earthworm port gives access to really realtime data - ports = [2060] + ports = [self.ws_port or self.qs_port] starttimes = [starttime] endtimes = [endtime] else: # starttime-endtime straddle midnight # port=22061 before today; port=2060 for today - ports = [22061, 2060] + ports = [ + (self.qs_port or self.ws_port), + (self.ws_port or self.qs_port), + ] starttimes = [starttime, midnight] endtimes = [midnight - half_delta, endtime] data = Stream() for i, port in enumerate(ports): - if port == 2060: - # earthworm for really realtime data (slower) - client = earthworm.Client(self.host, port) - else: + if port and port == self.qs_port: # neic for complete miniseed packets (faster) client = neic.Client(self.host, port) + if port and (port == self.port or port == self.ws_port): + # earthworm for really realtime data (slower) + client = earthworm.Client(self.host, port) + try: data += client.get_waveforms( sncl.network, @@ -632,7 +670,7 @@ class EdgeFactory(TimeseriesFactory): channel to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval starttime: UTCDateTime endtime: UTCDateTime @@ -649,10 +687,12 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - stream = stream.split() + stream_masked = self._convert_stream_to_masked( + timeseries=timeseries, channel=channel + ) + stream_split = stream_masked.split() - for trace in stream: + for trace in stream_split: trace_send = trace.copy() trace_send.trim(starttime, endtime) if channel == "D": @@ -666,13 +706,14 @@ class EdgeFactory(TimeseriesFactory): # everything else (mostly magnetics stored as picoteslas) trace_send.data = trace_send.data * (self.scale_factor or 1e3) - if self.write_mode.lower() in ["raw"]: + # if self.write_mode.lower() in ["raw"]: + if self.write_port or self.ris_port: # FIXME: input clients should be rewritten for consistency, # allowing us to clean up stuff like this if-else-block ric = RawInputClient( self.tag, self.host, - self.write_port, + self.write_port or self.ris_port, sncl.station, sncl.channel, sncl.location, @@ -684,12 +725,11 @@ class EdgeFactory(TimeseriesFactory): ric.forceout() ric.close() - elif self.write_mode.lower() in ["miniseed"]: + # elif self.write_mode.lower() in ["miniseed"]: + elif self.mss_port: # FIXME: input clients should be rewritten for consistency, # allowing us to clean up stuff like this if-else-block - msic = MiniSeedInputClient( - self.host, self.write_port, encoding="STEIM2" - ) + msic = MiniSeedInputClient(self.host, self.mss_port, encoding="STEIM2") trace_send.stats.station = sncl.station trace_send.stats.location = sncl.location trace_send.stats.network = sncl.network @@ -698,9 +738,7 @@ class EdgeFactory(TimeseriesFactory): msic.send(stream_send) msic.close() else: - raise TimeseriesFactoryException( - "Unrecognized write_mode specified: ", self.write_mode - ) + raise TimeseriesFactoryException("Valid write port was not specified.") def _set_metadata( self, -- GitLab From a77065b2763bd59b141f0438a83d94846ca09d5e Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Fri, 3 Jan 2025 11:59:19 -0700 Subject: [PATCH 11/24] Add `tenhertz` to interval options --- geomagio/edge/LegacySNCL.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/geomagio/edge/LegacySNCL.py b/geomagio/edge/LegacySNCL.py index f80b438a..b0fdb39e 100644 --- a/geomagio/edge/LegacySNCL.py +++ b/geomagio/edge/LegacySNCL.py @@ -75,7 +75,9 @@ def _check_predefined_element(channel: str) -> Optional[str]: def _get_channel_start(interval: str) -> str: - if interval == "second": + if interval == "tenhertz": + return "T" + elif interval == "second": return "S" elif interval == "minute": return "M" -- GitLab From 08708ab170761b5a9c6c005b8f286df313a20685 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Fri, 3 Jan 2025 12:04:26 -0700 Subject: [PATCH 12/24] Changes needed for `geomag-filter realtime` to work with new EdgeFactory (and no longer use MiniSeedFactory). --- geomagio/processing/filters.py | 211 ++++++++++++++++++++++++++------- 1 file changed, 169 insertions(+), 42 deletions(-) diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py index e69d762e..6c7ffe27 100644 --- a/geomagio/processing/filters.py +++ b/geomagio/processing/filters.py @@ -7,7 +7,7 @@ from ..algorithm import Algorithm, FilterAlgorithm from ..Controller import Controller, get_realtime_interval from ..geomag_types import DataInterval from ..TimeseriesFactory import TimeseriesFactory -from .factory import get_edge_factory, get_miniseed_factory +from ..edge import EdgeFactory class DataFormat(str, Enum): @@ -104,13 +104,14 @@ def hour_command( help=""" ObsRIO: - Filters 10Hz U,V,W miniseed to 1 second miniseed + Filters 10Hz U,V,W to 1 second U,V,W - Filters 1 second U,V,W,F miniseed to 1 minute miniseed + Filters 1 second U,V,W,F to 1 minute U,V,W,F - Filters 1 second T1-4 miniseed to 1 minute UK1-4 legacy + Filters 1 second T1-4 to 1 minute UK1-4 - Copies 1 second and 1 minute U,V,W,F miniseed to H,E,Z,F earthworm + Copies 1 second and 1 minute U,V,W,F miniseed to legacy + H,E,Z,F PCDCP: @@ -124,25 +125,71 @@ def realtime_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"), realtime_interval: int = Option(600, help="length of update window (in seconds)"), update_limit: int = Option(10, help="number of update windows"), ): if data_format == DataFormat.OBSRIO: + # ObsRIO data logger processing + + _copy_channels( + # + # copy and compress (non-tenhertz) float32 channels + # TEMPORARY UNTIL INPUTS CONVERTED TO INT32 + # + observatory=observatory, + channels=( + ("LFF", "LFF"), + ("LK1", "LK1"), + ("LK2", "LK2"), + ("LK3", "LK3"), + ("LK4", "LK4"), + ), + interval="second", + input_factory=EdgeFactory( + host=input_host, + qs_port=2061, # hard-code query server port + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=2060, # earthworm required for realtime + write_port=7981, # hard-code raw input port + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + second_filter( observatory=observatory, - input_factory=get_miniseed_factory( - host=input_host, convert_channels=("U", "V", "W"), input_port=input_port + channels=("U", "V", "W"), + input_factory=EdgeFactory( + host=input_host, + qs_port=2061, # force query server port until inputs are integers + type="variation", + convert_channels=("U", "V", "W"), + sncl_mode="geomag", ), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) + _copy_channels( + # duplicate channels using legacy SNCLs + # TEMPORARY UNTIL SNCL MIGRATION COMPLETED observatory=observatory, channels=( ("U", "H"), @@ -151,27 +198,73 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm required for realtime + write_port=7981, # hard-code rawinput until migration, then remove + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) - temperature_filter( + + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + + _copy_channels( + # duplicate channels using legacy SNCLs + # TEMPORARY UNTIL SNCL MIGRATION COMPLETED observatory=observatory, channels=( - ("LK1", "UK1"), - ("LK2", "UK2"), - ("LK3", "UK3"), - ("LK4", "UK4"), + ("U", "H"), + ("V", "E"), + ("W", "Z"), + ("F", "F"), + ), + interval="minute", + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", ), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - input_interval="second", - output_factory=get_edge_factory(host=output_host, output_port=output_port), - output_interval="minute", realtime_interval=realtime_interval, update_limit=update_limit, ) + else: + # legacy PCDCP processing _copy_channels( observatory=observatory, channels=( @@ -181,22 +274,43 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_edge_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + input_factory=EdgeFactory( + host=input_host, + input_port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="legacy", + ), + output_factory=EdgeFactory( + host=output_host, + output_port=output_read_port, # earthworm port required for realtime + write_port=output_port, + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) - minute_filter( - observatory=observatory, - channels=("U", "V", "W", "F"), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - if data_format == DataFormat.OBSRIO: + + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + _copy_channels( observatory=observatory, channels=( @@ -206,8 +320,19 @@ def realtime_command( ("F", "F"), ), interval="minute", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, + input_port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -342,9 +467,9 @@ def minute_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="second", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="minute", ) for channel in channels: @@ -368,6 +493,7 @@ def minute_filter( def second_filter( observatory: str, + channels: List[str] = ["U", "V", "W"], input_factory: Optional[TimeseriesFactory] = None, output_factory: Optional[TimeseriesFactory] = None, realtime_interval: int = 600, @@ -379,6 +505,8 @@ def second_filter( ----------- observatory: str observatory id + channels: array + list of channels to filter input_factory: TimeseriesFactory factory to request data output_factory: TimeseriesFactory @@ -390,13 +518,12 @@ def second_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_miniseed_factory(convert_channels=("U", "V", "W")), + inputFactory=input_factory, inputInterval="tenhertz", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="second", ) - for channel in ("U", "V", "W"): + for channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( input_sample_period=0.1, -- GitLab From c0793a59dc934ae7b5c11761a0c22f32a3cb47ac Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Fri, 3 Jan 2025 14:37:02 -0700 Subject: [PATCH 13/24] Remove "default" ports...probably this entire module should be removed, but that requires additional work elsewhere. --- geomagio/processing/factory.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/geomagio/processing/factory.py b/geomagio/processing/factory.py index b857d68c..17ff3aa7 100644 --- a/geomagio/processing/factory.py +++ b/geomagio/processing/factory.py @@ -11,10 +11,6 @@ def get_edge_factory( output_port=os.getenv("OUTPUT_EDGE_PORT", None), **kwargs ) -> EdgeFactory: - if input_port is None: - input_port = 2060 - if output_port is None: - output_port = 7981 return EdgeFactory( host=host, interval=interval, @@ -33,10 +29,6 @@ def get_miniseed_factory( output_port=os.getenv("OUTPUT_MINISEED_PORT", None), **kwargs ) -> MiniSeedFactory: - if input_port is None: - input_port = 2061 - if output_port is None: - output_port = 7974 return MiniSeedFactory( host=host, interval=interval, -- GitLab From c943b4d4727bdb2597efac7db2ec964a7a36e5ab Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Sat, 4 Jan 2025 10:18:02 -0700 Subject: [PATCH 14/24] Long-time bug that wrote bad ratemantissa/ratedivisor values to Edge for sample periods longer than 1 minute --- geomagio/edge/RawInputClient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 9e09e0ed..722cc340 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -388,7 +388,7 @@ class RawInputClient: if rate > 0.9999: ratemantissa = int(rate * 100 + 0.001) ratedivisor = -100 - elif rate * 60.0 - 1.0 < 0.00000001: # one minute data + elif abs(rate * 60.0 - 1.0) < 0.00000001: # one minute data ratemantissa = -60 ratedivisor = 1 else: -- GitLab From ff3dd02b6b5fbcb4a89bf7eb7a9f88ed0f4dff18 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 8 Jan 2025 16:57:06 -0700 Subject: [PATCH 15/24] Remove redundant `pad_and_trim` (this is handled by _post_process()) --- geomagio/edge/EdgeFactory.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index a0a6683d..9a61c502 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -532,10 +532,7 @@ class EdgeFactory(TimeseriesFactory): network=sncl.network, location=sncl.location, ) - if data.count() != 0: - TimeseriesUtility.pad_and_trim_trace( - trace=data[0], starttime=starttime, endtime=endtime - ) + self._set_metadata(data, observatory, channel, type, interval) return data -- GitLab From a33e87f17573cfdf83ffa00cc0268d8248a0156e Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Wed, 8 Jan 2025 16:58:56 -0700 Subject: [PATCH 16/24] Additional fixes to handle periods longer than 1 minute --- geomagio/edge/RawInputClient.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 722cc340..1ddf0001 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -384,16 +384,30 @@ class RawInputClient: tuple: (ratemantissa, ratedivosor) ratemantissa: int ratedivosor: int + + NOTE that `ratemantissa` and `ratedivisor` map directly to the SEED + standard's "sample rate factor" and "sample rate multiplier", which + are stored in a miniseed block's fixed data header as 2-byte words; + earlier versions of this method did not handle sample rates lower + than 1/second properly (they did handle 1/minute as a special-case); """ if rate > 0.9999: + # sample rates greater or equal to 1/second + # (this will not handle >327 Hz, but for consistency + # with earlier versions, we leave it unchanged) ratemantissa = int(rate * 100 + 0.001) ratedivisor = -100 - elif abs(rate * 60.0 - 1.0) < 0.00000001: # one minute data - ratemantissa = -60 + elif rate * 21600 > 1.0001: + # sample periods between 1 second and 6 hours + # (this should handle 1-minute data identically to + # earlier versions) + ratemantissa = -int(1 / rate + 0.001) ratedivisor = 1 else: - ratemantissa = int(rate * 10000.0 + 0.001) - ratedivisor = -10000 + # sample periods 6 hours and longer + # (this will not handle periods longer than ~22 years) + ratemantissa = -int(1 / rate / 21600) + ratedivisor = 21600 return (ratemantissa, ratedivisor) -- GitLab From 1cddbb570fcf28fbc76be486ea2d10abcc9cca70 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 14:52:02 -0700 Subject: [PATCH 17/24] test_raw_input_client used an obsolete EdgeFactory helper function. --- test/edge_test/RawInputClient_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/edge_test/RawInputClient_test.py b/test/edge_test/RawInputClient_test.py index ef21290a..876bc569 100644 --- a/test/edge_test/RawInputClient_test.py +++ b/test/edge_test/RawInputClient_test.py @@ -51,7 +51,8 @@ def test_raw_input_client(): location=location, network=network, ) - trace_send = EdgeFactory()._convert_trace_to_int(trace.copy()) + trace_send = trace.copy() + trace_send.data = numpy.int32(trace_send.data * 1000) client.send_trace("minute", trace_send) # verify data was sent assert_equal(len(client.last_send), 1) -- GitLab From 5ae598b2f7e798f27dbc4bf60962d2827bc908ca Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 14:55:57 -0700 Subject: [PATCH 18/24] No need to check if starttime is less than endtime in _get_timeseries(), since this is done elsewhere --- geomagio/edge/EdgeFactory.py | 39 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 9a61c502..13441922 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -441,26 +441,25 @@ class EdgeFactory(TimeseriesFactory): endtimes = [endtime] else: midnight = UTCDateTime(UTCDateTime.now().date) - if starttime < endtime: - if endtime < midnight: - # cwbquery port gives access to complete miniseed packets - ports = [self.qs_port or self.ws_port] - starttimes = [starttime] - endtimes = [endtime] - elif starttime >= midnight: - # earthworm port gives access to really realtime data - ports = [self.ws_port or self.qs_port] - starttimes = [starttime] - endtimes = [endtime] - else: - # starttime-endtime straddle midnight - # port=22061 before today; port=2060 for today - ports = [ - (self.qs_port or self.ws_port), - (self.ws_port or self.qs_port), - ] - starttimes = [starttime, midnight] - endtimes = [midnight - half_delta, endtime] + if endtime < midnight: + # cwbquery port gives access to complete miniseed packets + ports = [self.qs_port or self.ws_port] + starttimes = [starttime] + endtimes = [endtime] + elif starttime >= midnight: + # earthworm port gives access to really realtime data + ports = [self.ws_port or self.qs_port] + starttimes = [starttime] + endtimes = [endtime] + else: + # starttime-endtime straddle midnight + # port=22061 before today; port=2060 for today + ports = [ + (self.qs_port or self.ws_port), + (self.ws_port or self.qs_port), + ] + starttimes = [starttime, midnight] + endtimes = [midnight - half_delta, endtime] data = Stream() for i, port in enumerate(ports): -- GitLab From aa0f71aa0f6a1b8d7b6869d3ecb6d7c7b942d1fc Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 14:57:34 -0700 Subject: [PATCH 19/24] Additional fixes to handle periods longer than 6 hours --- geomagio/edge/RawInputClient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 1ddf0001..649fa933 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -402,12 +402,12 @@ class RawInputClient: # (this should handle 1-minute data identically to # earlier versions) ratemantissa = -int(1 / rate + 0.001) - ratedivisor = 1 + ratedivisor = -1 else: # sample periods 6 hours and longer # (this will not handle periods longer than ~22 years) ratemantissa = -int(1 / rate / 21600) - ratedivisor = 21600 + ratedivisor = -21600 return (ratemantissa, ratedivisor) -- GitLab From 471811b6388a19bc890cb238b627c214c7134790 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 15:00:47 -0700 Subject: [PATCH 20/24] Controller "update" logic fixed to handle single-sample update intervals --- geomagio/Controller.py | 60 ++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 65b148d8..a17078dd 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -393,41 +393,39 @@ class Controller(object): channels=output_channels, interval=output_interval, ) - if len(output_timeseries) > 0: - # force starttime & endtime to be consistent with output_interval - # FIXME: this is lazy; a better solution would be to use something - # like FilterAlgorithm's get_nearest_time() -EJR - starttime = output_timeseries[0].stats.starttime - endtime = output_timeseries[0].stats.endtime - print( - "checking gaps", - starttime, - endtime, - output_observatory, - output_channels, - file=sys.stderr, - ) - # find gaps in output, so they can be updated - output_gaps = TimeseriesUtility.get_merged_gaps( - TimeseriesUtility.get_stream_gaps(output_timeseries) - ) - else: - output_gaps = [ - [ - starttime, - endtime, - # next sample time not used - None, - ] - ] + # - output_timeseries can never be empty; + # - output_timeseries' endtime must be greater than + # or equal to its starttime + starttime = output_timeseries[0].stats.starttime + endtime = output_timeseries[0].stats.endtime + print( + "checking gaps", + starttime, + endtime, + output_observatory, + output_channels, + file=sys.stderr, + ) + # find gaps in output, so they can be updated + output_gaps = TimeseriesUtility.get_merged_gaps( + TimeseriesUtility.get_stream_gaps(output_timeseries) + ) for output_gap in output_gaps: # check for fillable gap at start if output_gap[0] == starttime: # found fillable gap at start, recurse to previous interval - recurse_starttime, recurse_endtime = get_previous_interval( - start=starttime, - end=endtime, - ) + update_interval = round(endtime - starttime) # round to whole second + if update_interval == 0: + recurse_starttime = ( + starttime + - TimeseriesUtility.get_delta_from_interval(output_interval) + ) + recurse_endtime = recurse_starttime + else: + recurse_starttime = starttime - update_interval + recurse_endtime = ( + starttime - 1e-3 + ) # excludes final sample in update iterations self.run_as_update( algorithm=algorithm, observatory=observatory, -- GitLab From e8a8962ece6d83cf944a53c0f45acfc8536686b7 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 15:05:45 -0700 Subject: [PATCH 21/24] create_empty_trace() assigned the wrong dattime to a single sample interval when "interval" was longer than 1 minute. --- geomagio/TimeseriesUtility.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 932ec783..415a9877 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -49,17 +49,23 @@ def create_empty_trace( # Calculate first valid sample time based on interval trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta) if delta > 60.0: - trace_starttime += (delta - 60) / 2 - if trace_starttime > endtime: - sys.stderr.write( - "Starttime greater than endtime, adjusting request to one sample" + # hourly and daily average have mid-interval timestamps, minus 30 seconds + if trace_starttime - starttime < (delta + 60) / 2: + trace_starttime += (delta - 60) / 2 + else: + trace_starttime += (delta - 60) / 2 - delta + + if starttime > trace_starttime or endtime < trace_starttime: + sys.stdout.write( + 'No allowed "%s" timestamps between %s and %s, ' + % (interval, starttime, endtime) + + "returning empty trace\n", ) - endtime = trace_starttime stats.starttime = trace_starttime stats.delta = delta # Calculate number of valid samples up to or before endtime length = int((endtime - trace_starttime) / delta) - stats.npts = length + 1 + stats.npts = max(0, length + 1) data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64) return Trace(data, stats) @@ -570,8 +576,9 @@ def pad_and_trim_trace(trace, starttime, endtime): if trace_endtime > endtime: # trim to endtime, at least 1 sample to remove cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6))) - trace.data = trace.data[:-cnt] - trace.stats.npts = len(trace.data) + if cnt > 0: + trace.data = trace.data[:-cnt] + trace.stats.npts = len(trace.data) elif trace_endtime < endtime: # pad to endtime cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6)) -- GitLab From d2a0dfc7a47d1ba8cc697009c770f0582c876155 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 15:13:27 -0700 Subject: [PATCH 22/24] Update geomag-filter's hour and day commands to work with recent changes to EdgeFactory. --- geomagio/processing/filters.py | 76 ++++++++++++++++------------------ 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py index 6c7ffe27..8c67d99b 100644 --- a/geomagio/processing/filters.py +++ b/geomagio/processing/filters.py @@ -29,33 +29,31 @@ def main(): def day_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), output_host: str = Option("127.0.0.1", help="host to write data to"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option( 604800, help="length of update window (in seconds)" ), - update_limit: int = Option(7, help="number of update windows"), + update_limit: int = Option(4, help="number of update windows"), ): day_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - temperature_filter( - observatory=observatory, - channels=( - ("UK1", "PK1"), - ("UK2", "PK2"), - ("UK3", "PK3"), - ("UK4", "PK4"), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="day", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -69,30 +67,28 @@ def hour_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option(86400, help="length of update window (in seconds)"), - update_limit: int = Option(24, help="number of update windows"), + update_limit: int = Option(7, help="number of update windows"), ): hour_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - temperature_filter( - observatory=observatory, - channels=( - ("UK1", "RK1"), - ("UK2", "RK2"), - ("UK3", "RK3"), - ("UK4", "RK4"), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="hour", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -416,9 +412,9 @@ def hour_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="hour", ) for channel in channels: -- GitLab From 3b88d500269ba2d8f50f5b472e3183c815ca18c1 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 15:17:35 -0700 Subject: [PATCH 23/24] Eliminate any kind of default options for *_filter functions in filters.py. Also, eliminate all references to get_*_factory, and rely entirely on EdgeFactory. --- geomagio/processing/filters.py | 54 +++++++++++++++++----------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py index 8c67d99b..8be2af20 100644 --- a/geomagio/processing/filters.py +++ b/geomagio/processing/filters.py @@ -336,11 +336,11 @@ def realtime_command( def day_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 86400, - update_limit: int = 7, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 second miniseed channels to 1 day @@ -361,9 +361,9 @@ def day_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="day", ) for channel in channels: @@ -387,11 +387,11 @@ def day_filter( def hour_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 600, - update_limit: int = 10, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 minute miniseed channels to 1 hour @@ -438,9 +438,9 @@ def hour_filter( def minute_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, realtime_interval: int = 600, update_limit: int = 10, ): @@ -541,26 +541,26 @@ def second_filter( def temperature_filter( observatory: str, channels: List[List[str]], - input_factory: Optional[TimeseriesFactory] = None, - input_interval: int = "second", - output_factory: Optional[TimeseriesFactory] = None, - output_interval: int = "minute", - realtime_interval: int = 600, - update_limit: int = 10, + input_factory: TimeseriesFactory, + input_interval: int, + output_factory: TimeseriesFactory, + output_interval: int, + realtime_interval: int, + update_limit: int, ): """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4).""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=input_interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=output_interval, ) for input_channel, output_channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( - input_sample_period=1, - output_sample_period=60, + input_sample_period=input_interval, + output_sample_period=output_interval, inchannels=(input_channel,), outchannels=(output_channel,), ), @@ -607,9 +607,9 @@ def _copy_channels( """ starttime, endtime = get_realtime_interval(interval_seconds=realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=interval, ) for input_channel, output_channel in channels: -- GitLab From a3b62d4c696b30c657348156b12d87969b6671f7 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Thu, 9 Jan 2025 16:04:01 -0700 Subject: [PATCH 24/24] Update docstring for create_empty_trace --- geomagio/TimeseriesUtility.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 415a9877..35e7ffdd 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -28,7 +28,7 @@ def create_empty_trace( type : str data type {definitive, quasi-definitive, variation} interval : str - interval length {minute, second} + interval length {day, hour, minute, second, tenhertz} network: str the network code station: str -- GitLab