From 2bb0ec2344f1e7aad81468b037db3f9409b4b146 Mon Sep 17 00:00:00 2001
From: "E. Joshua Rigler" <erigler@usgs.gov>
Date: Tue, 4 Feb 2025 14:53:27 -0700
Subject: [PATCH] MiniSeedFactory Upgrades: - configurable scale_factor and
 float vs. int conversion logic - configurable sncl_mode for different channel
 mapping - multiple read ports allowed for fail-over logic - configurable
 write_encoding for setting miniseed data type - generally more (if not
 perfectly) compatible with EdgeFactory

---
 geomagio/edge/MiniSeedFactory.py | 223 ++++++++++++++++++++++---------
 1 file changed, 161 insertions(+), 62 deletions(-)

diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py
index 94acf540..127bf476 100644
--- a/geomagio/edge/MiniSeedFactory.py
+++ b/geomagio/edge/MiniSeedFactory.py
@@ -17,7 +17,7 @@ from typing import List, Optional
 import numpy
 import numpy.ma
 from obspy import read
-from obspy.clients.neic import client as miniseed
+from obspy.clients import neic
 from obspy.core import Stats, Stream, Trace, UTCDateTime
 
 from .. import ChannelConverter, TimeseriesUtility
@@ -28,6 +28,7 @@ from ..TimeseriesFactoryException import TimeseriesFactoryException
 from ..ObservatoryMetadata import ObservatoryMetadata
 from .MiniSeedInputClient import MiniSeedInputClient
 from .SNCL import SNCL
+from .LegacySNCL import LegacySNCL
 
 
 class MiniSeedFactory(TimeseriesFactory):
@@ -38,7 +39,11 @@ class MiniSeedFactory(TimeseriesFactory):
     host: str
         a string representing the IP number of the host to connect to.
     port: integer
-        the port number the miniseed query server is listening on.
+        port number on which EdgeCWB's queryserver (QS, obspy's "neic" client)
+        listens for requests to retrieve timeseries data.
+    write_port: integer
+        port number on which EdgeCWB's MiniSeedServer is listening for
+        requests to write timeseries data.
     observatory: str
         the observatory code for the desired observatory.
     channels: array
@@ -57,6 +62,14 @@ class MiniSeedFactory(TimeseriesFactory):
         in get_timeseries/put_timeseries
     convert_channels: array
         list of channels to convert from volt/bin to nT
+    scale_factor: float
+        override default scalings when reading/writing miniseed blocks;
+        (reading integer blocks divides by 1000; reading floating point
+         blocks divides by 1; writing all data multiplies by 1000)
+        default = None
+    sncl_mode: {'geomag','legacy'}
+        force mode to convert common names to SEED SNCL codes (that is,
+        station, network, channel, location codes); default = legacy
 
     See Also
     --------
@@ -64,17 +77,25 @@ class MiniSeedFactory(TimeseriesFactory):
 
     Notes
     -----
-    This is designed to read from any earthworm style waveserver, but it
-        currently only writes to an edge. Edge mimics an earthworm style
-        waveserver close enough that we hope to maintain that compatibility
-        for reading.
+    The MiniSeedFactory reads and writes miniseed blocks. This allows it
+    to be used to transfer non-integer data. That said, this factory cannot
+    read from or write to EdgeCWB's RAM buffer, and so is not able to access
+    eally "real time" data. MiniSeedFactory should be used for large data
+    transfers, or to transfer data that cannot be handled by the default
+    EdgeFactory (that is, non-integer data, or data that has not yet been
+    written to the EdgeCWB disk store).
+
+    Also, this factory allows one to write non-full miniseed blocks. This
+    is occasionally useful, but more often it results in wasted disk space
+    on the EdgeCWB server.
     """
 
     def __init__(
         self,
-        host: str = "edgecwb.usgs.gov",
-        port: int = 2061,
-        write_port: int = 7974,
+        host: Optional[str] = None,
+        port: Optional[int] = None,
+        write_port: Optional[int] = None,
+        write_encoding: Optional[str] = None,
         observatory: Optional[str] = None,
         channels: Optional[List[str]] = None,
         type: Optional[DataType] = None,
@@ -82,18 +103,25 @@ class MiniSeedFactory(TimeseriesFactory):
         observatoryMetadata: Optional[ObservatoryMetadata] = None,
         locationCode: Optional[str] = None,
         convert_channels: Optional[List[str]] = None,
+        scale_factor: Optional[int] = None,
+        sncl_mode: Optional[str] = "geomag",
     ):
         TimeseriesFactory.__init__(self, observatory, channels, type, interval)
-
-        self.client = miniseed.Client(host, port)
+        self.host = host or "edgecwb.usgs.gov"
+        self.port = port or [22061, 2061]
+        self.write_port = write_port  # no default write port
+        self.write_encoding = write_encoding or "float32"
         self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
         self.locationCode = locationCode
-        self.interval = interval
-        self.host = host
-        self.port = port
-        self.write_port = write_port
         self.convert_channels = convert_channels or []
-        self.write_client = MiniSeedInputClient(self.host, self.write_port)
+        self.scale_factor = scale_factor
+        self.sncl_mode = sncl_mode
+        if sncl_mode == "legacy":
+            self.get_sncl = LegacySNCL.get_sncl
+        elif sncl_mode == "geomag":
+            self.get_sncl = SNCL.get_sncl
+        else:
+            raise TimeseriesFactoryException("Unrecognized SNCL mode")
 
     def get_timeseries(
         self,
@@ -110,19 +138,19 @@ class MiniSeedFactory(TimeseriesFactory):
         Parameters
         ----------
         starttime: UTCDateTime
-            time of first sample
+            time of first sample.
         endtime: UTCDateTime
-            time of last sample
-        add_empty_channels: bool
-            if True, returns channels without data as empty traces
+            time of last sample.
         observatory: str
-            observatory code
+            observatory code.
         channels: array
             list of channels to load
         type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
             data type
         interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
             data interval
+        add_empty_channels: bool
+            if True, returns channels without data as empty traces
 
         Returns
         -------
@@ -226,10 +254,14 @@ class MiniSeedFactory(TimeseriesFactory):
                 )
         for channel in channels:
             self._put_channel(
-                timeseries, observatory, channel, type, interval, starttime, endtime
+                timeseries.select(channel=channel),
+                observatory,
+                channel,
+                type,
+                interval,
+                starttime,
+                endtime,
             )
-        # close socket
-        self.write_client.close()
 
     def get_calculated_timeseries(
         self,
@@ -345,7 +377,7 @@ class MiniSeedFactory(TimeseriesFactory):
         type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
             data type
         interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
-            interval length
+            data interval
         add_empty_channels: bool
             if True, returns channels without data as empty traces
 
@@ -365,31 +397,73 @@ class MiniSeedFactory(TimeseriesFactory):
         # according to its author, EdgeCWB is inclusive of starttime, but exclusive of
         # endtime, to satisfy seismic standards/requirements, to precision delta/2;
         half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
-        data = self.client.get_waveforms(
-            sncl.network,
-            sncl.station,
-            sncl.location,
-            sncl.channel,
-            starttime,
-            endtime + half_delta,
-        )
-        for trace in data:
-            trace.data = trace.data.astype(data[0].data.dtype)
 
-        # handle overlapping samples in Traces with identical NSCL codes by
-        # prioritizing samples from those Traces in the Stream `data` that come
-        # later in sequence; these will most likely be the most recent versions
-        # (this is not possible with single calls to Stream.merge())
+        # list-type ports variable needed for fail-back logic
+        try:
+            ports = list(self.port)
+        except TypeError:
+            ports = [self.port]
+
+        data = Stream()
+        for port in ports:
+            try:
+                client = neic.Client(self.host, port, timeout=10)
+
+                data += client.get_waveforms(
+                    sncl.network,
+                    sncl.station,
+                    sncl.location,
+                    sncl.channel,
+                    starttime,
+                    endtime + half_delta,
+                )
+
+                if data:
+                    # if data was returned, force this port in subsequent calls
+                    # to get_timeseries() that use this instance of EdgeFactory
+                    self.port = [port]
+                    break
+                print("No data returned from ", self.host, " on port ", port)
+                # try alternate port(s) if provided
+            except Exception as e:
+                print("Failed to get data from ", self.host, " on port ", port)
+                print("Ignoring error: ", e)
+                # try alternate port(s) if provided
+                continue
+
+        for trace in data:
+            if trace.data.dtype.kind == "i":
+                # convert all integer traces to rescaled 64-bit floats;
+                if sncl.channel[1] == "E":
+                    # instrumental voltages are stored as 1/10 microvolts
+                    trace.data = trace.data.astype(float) / (self.scale_factor or 1e7)
+                else:
+                    # everything else (mostly magnetics stored as picoteslas)
+                    trace.data = trace.data.astype(float) / (self.scale_factor or 1e3)
+            elif trace.data.dtype.kind == "f":
+                # convert all float traces to 64-bit floats
+                trace.data = trace.data.astype(float)
+
+        # when Traces with identical NSCL codes overlap, prioritize samples
+        # that come  "later" in Stream; this depends on Edge returning miniseed
+        # packets in the order written
+        # NOTE: this is not possible with single calls to Stream.merge()
         st_tmp = Stream()
         for tr in data:
-            # add tr to temporary stream
-            st_tmp += tr
-            # replace time overlaps with gaps
-            st_tmp.merge(0)
-            # add tr to temporary stream again
-            st_tmp += tr
-            # replace gaps with tr's data
-            st_tmp.merge(0)
+            try:
+                # add tr to temporary stream
+                st_tmp += tr
+                # replace time overlaps with gaps
+                st_tmp.merge(0)
+                # add tr to temporary stream again
+                st_tmp += tr
+                # replace gaps with tr's data
+                st_tmp.merge(0)
+            except Exception as e:
+                tr = st_tmp.pop()  # remove bad trace
+                print("Dropped trace: ", tr)
+                print("Ignoring error: ", e)
+
         # point `data` to the new stream and continue processing
         data = st_tmp
 
@@ -497,7 +571,7 @@ class MiniSeedFactory(TimeseriesFactory):
         channels: List[str],
     ):
         """Post process a timeseries stream after the raw data is
-                is fetched from querymom. Specifically changes
+                is fetched from queryserver. Specifically changes
                 any MaskedArray to a ndarray with nans representing gaps.
                 Then calls pad_timeseries to deal with gaps at the
                 beggining or end of the streams.
@@ -553,26 +627,51 @@ class MiniSeedFactory(TimeseriesFactory):
         starttime: UTCDateTime
         endtime: UTCDateTime
         """
-        # use separate traces when there are gaps
-        to_write = timeseries.select(channel=channel)
-        to_write = TimeseriesUtility.mask_stream(to_write)
-        to_write = to_write.split()
-        to_write = TimeseriesUtility.unmask_stream(to_write)
-        # relabel channels from internal to edge conventions
-        sncl = SNCL.get_sncl(
+        sncl = self.get_sncl(
             station=observatory,
             data_type=type,
             interval=interval,
             element=channel,
             location=self.locationCode,
         )
-        for trace in to_write:
-            trace.stats.station = sncl.station
-            trace.stats.location = sncl.location
-            trace.stats.network = sncl.network
-            trace.stats.channel = sncl.channel
-        # finally, send to edge
-        self.write_client.send(to_write)
+
+        stream_masked = self._convert_stream_to_masked(
+            timeseries=timeseries, channel=channel
+        )
+        stream_split = stream_masked.split()
+
+        for trace in stream_split:
+            trace_send = trace.copy()
+            trace_send.trim(starttime, endtime)
+            if channel == "D":
+                trace_send.data = ChannelConverter.get_minutes_from_radians(
+                    trace_send.data
+                )
+
+            if "float" not in self.write_encoding:
+                # if write_encoding is a float, ignore scale_factor
+                if sncl.channel[1] == "E":
+                    # instrumental voltages are stored as 1/10 microvolts
+                    trace_send.data = trace_send.data * (self.scale_factor or 1e7)
+                else:
+                    # everything else (mostly magnetics stored as picoteslas)
+                    trace_send.data = trace_send.data * (self.scale_factor or 1e3)
+
+            if self.write_port:
+                # FIXME: input clients should be rewritten for consistency,
+                #        allowing us to clean up stuff like this if-else-block
+                msic = MiniSeedInputClient(
+                    self.host, self.write_port, self.write_encoding
+                )
+                trace_send.stats.station = sncl.station
+                trace_send.stats.location = sncl.location
+                trace_send.stats.network = sncl.network
+                trace_send.stats.channel = sncl.channel
+                stream_send = Stream(trace_send)
+                msic.send(stream_send)
+                msic.close()
+            else:
+                raise TimeseriesFactoryException("Valid write port was not specified.")
 
     def _set_metadata(
         self,
-- 
GitLab