From e3387de2c4c11016628413518dce8a053f962fba Mon Sep 17 00:00:00 2001
From: "E. Joshua Rigler" <erigler@usgs.gov>
Date: Tue, 4 Feb 2025 15:30:43 -0700
Subject: [PATCH] Update filters.py (code for geomag-filter typer app): - stop
 using factory.py; just instantiate factories in filters.py...it's only a
 little extra code, and a lot less confusing - copy 1-second ObsRIO float data
 (e.g., F, UK*) to RawInput so it is available using EdgeFactory, and also so
 we have the option in the future to prune 10Hz data from EdgeCWB that has
 already been archived at IRIS. - set up calls to _copy_channels that write
 legacy sncls; these calls are to be removed once Edge data migration is
 complete - make all *_filter functions require an instantiated io factory,
 and set no default options - other than the *temporary* write ports for
 copies of legacy sncl data, allow no default ports in filters.py; either
 accept the factory defaults, or force user to specify ports with call to
 geomag-filter app.

---
 geomagio/processing/filters.py | 377 ++++++++++++++++++++++++---------
 1 file changed, 272 insertions(+), 105 deletions(-)

diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py
index e69d762e..2408ffa1 100644
--- a/geomagio/processing/filters.py
+++ b/geomagio/processing/filters.py
@@ -7,7 +7,7 @@ from ..algorithm import Algorithm, FilterAlgorithm
 from ..Controller import Controller, get_realtime_interval
 from ..geomag_types import DataInterval
 from ..TimeseriesFactory import TimeseriesFactory
-from .factory import get_edge_factory, get_miniseed_factory
+from ..edge import EdgeFactory, MiniSeedFactory
 
 
 class DataFormat(str, Enum):
@@ -29,33 +29,57 @@ def main():
 def day_command(
     observatory: str = Argument(None, help="observatory id"),
     input_host: str = Option("127.0.0.1", help="host to request data from"),
-    input_port: int = Option(None, help="port to retrieve data through"),
-    output_port: int = Option(None, help="port to write data through"),
     output_host: str = Option("127.0.0.1", help="host to write data to"),
+    input_port: int = Option(None, help="port to pull inputs from on input host"),
+    output_read_port: int = Option(None, help="port to check for gaps on output host"),
+    output_port: int = Option(None, help="port to write to on output host"),
     realtime_interval: int = Option(
         604800, help="length of update window (in seconds)"
     ),
-    update_limit: int = Option(7, help="number of update windows"),
+    update_limit: int = Option(4, help="number of update windows"),
 ):
     day_filter(
         observatory=observatory,
-        input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-        output_factory=get_miniseed_factory(host=output_host, output_port=output_port),
+        channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"),
+        input_factory=EdgeFactory(
+            host=input_host,
+            port=input_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
+        output_factory=EdgeFactory(
+            host=output_host,
+            port=output_read_port,
+            write_port=output_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
-    temperature_filter(
+    # remove the following after data migration is complete
+    _copy_channels(
         observatory=observatory,
         channels=(
-            ("UK1", "PK1"),
-            ("UK2", "PK2"),
-            ("UK3", "PK3"),
-            ("UK4", "PK4"),
+            ("U", "H"),
+            ("V", "E"),
+            ("W", "Z"),
+            ("F", "F"),
+        ),
+        interval="day",
+        input_factory=EdgeFactory(
+            host=input_host,
+            port=input_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
+        output_factory=EdgeFactory(
+            host=output_host,
+            port=output_read_port,
+            write_port=7981,  # hard-coded rawinput until migration, then remove
+            type="variation",
+            sncl_mode="legacy",
         ),
-        input_factory=get_edge_factory(host=input_host, input_port=input_port),
-        input_interval="minute",
-        output_factory=get_miniseed_factory(host=output_host, output_port=output_port),
-        output_interval="day",
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
@@ -69,30 +93,54 @@ def hour_command(
     observatory: str = Argument(None, help="observatory id"),
     input_host: str = Option("127.0.0.1", help="host to request data from"),
     output_host: str = Option("127.0.0.1", help="host to write data to"),
-    input_port: int = Option(None, help="port to retrieve data through"),
-    output_port: int = Option(None, help="port to write data through"),
+    input_port: int = Option(None, help="port to pull inputs from on input host"),
+    output_read_port: int = Option(None, help="port to check for gaps on output host"),
+    output_port: int = Option(None, help="port to write to on output host"),
     realtime_interval: int = Option(86400, help="length of update window (in seconds)"),
-    update_limit: int = Option(24, help="number of update windows"),
+    update_limit: int = Option(7, help="number of update windows"),
 ):
     hour_filter(
         observatory=observatory,
-        input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-        output_factory=get_miniseed_factory(host=output_host, output_port=output_port),
+        channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"),
+        input_factory=EdgeFactory(
+            host=input_host,
+            port=input_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
+        output_factory=EdgeFactory(
+            host=output_host,
+            port=output_read_port,
+            write_port=output_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
-    temperature_filter(
+    # remove the following after data migration is complete
+    _copy_channels(
         observatory=observatory,
         channels=(
-            ("UK1", "RK1"),
-            ("UK2", "RK2"),
-            ("UK3", "RK3"),
-            ("UK4", "RK4"),
+            ("U", "H"),
+            ("V", "E"),
+            ("W", "Z"),
+            ("F", "F"),
+        ),
+        interval="hour",
+        input_factory=EdgeFactory(
+            host=input_host,
+            port=input_port,
+            type="variation",
+            sncl_mode="geomag",
+        ),
+        output_factory=EdgeFactory(
+            host=output_host,
+            port=output_read_port,
+            write_port=7981,  # hard-coded rawinput until migration, then remove
+            type="variation",
+            sncl_mode="legacy",
         ),
-        input_factory=get_edge_factory(host=input_host, input_port=input_port),
-        input_interval="minute",
-        output_factory=get_miniseed_factory(host=output_host, output_port=output_port),
-        output_interval="hour",
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
@@ -104,13 +152,14 @@ def hour_command(
     help="""
     ObsRIO:
 
-        Filters 10Hz U,V,W miniseed to 1 second miniseed
+        Filters 10Hz U,V,W to 1 second U,V,W
 
-        Filters 1 second U,V,W,F miniseed to 1 minute miniseed
+        Filters 1 second U,V,W,F to 1 minute U,V,W,F
 
-        Filters 1 second T1-4 miniseed to 1 minute UK1-4 legacy
+        Filters 1 second T1-4 to 1 minute UK1-4
 
-        Copies 1 second and 1 minute U,V,W,F miniseed to H,E,Z,F earthworm
+        Copies 1 second and 1 minute U,V,W,F miniseed to legacy
+        H,E,Z,F 
 
     PCDCP:
 
@@ -124,8 +173,9 @@ def realtime_command(
     observatory: str = Argument(None, help="observatory id"),
     input_host: str = Option("127.0.0.1", help="host to request data from"),
     output_host: str = Option("127.0.0.1", help="host to write data to"),
-    input_port: int = Option(None, help="port to retrieve data through"),
-    output_port: int = Option(None, help="port to write data through"),
+    input_port: int = Option(None, help="port to pull inputs from on input host"),
+    output_read_port: int = Option(None, help="port to check for gaps on output host"),
+    output_port: int = Option(None, help="port to write to on output host"),
     data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"),
     realtime_interval: int = Option(600, help="length of update window (in seconds)"),
     update_limit: int = Option(10, help="number of update windows"),
@@ -133,15 +183,56 @@ def realtime_command(
     if data_format == DataFormat.OBSRIO:
         second_filter(
             observatory=observatory,
-            input_factory=get_miniseed_factory(
-                host=input_host, convert_channels=("U", "V", "W"), input_port=input_port
+            input_factory=MiniSeedFactory(
+                host=input_host,
+                port=None,  # use MiniSeedFactory default
+                convert_channels=("U", "V", "W"),
+                type="variation",
+                sncl_mode="geomag",
             ),
-            output_factory=get_miniseed_factory(
-                host=output_host, output_port=output_port
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=output_port,
+                type="variation",
+                sncl_mode="geomag",
             ),
             realtime_interval=realtime_interval,
             update_limit=update_limit,
         )
+        _copy_channels(
+            # copy 1-sec ObsRIO channels
+            # NOTE: yes, this creates redundant data; however...
+            #       - it is compressed
+            #       - it is integer, so readable by EdgeFactory
+            #       - it provides more "permanent" data, since the
+            #         ObsRIO raw data *may* eventually get pruned
+            observatory=observatory,
+            channels=(
+                ("LFF", "LFF"),
+                ("LK1", "LK1"),
+                ("LK2", "LK2"),
+                ("LK3", "LK3"),
+                ("LK4", "LK4"),
+            ),
+            interval="second",
+            input_factory=MiniSeedFactory(
+                host=input_host,
+                port=None,  # use MiniSeedFactory default
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=output_port,
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        # remove the following after data migration is complete
         _copy_channels(
             observatory=observatory,
             channels=(
@@ -151,27 +242,68 @@ def realtime_command(
                 ("F", "F"),
             ),
             interval="second",
-            input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-            output_factory=get_edge_factory(host=output_host, output_port=output_port),
+            input_factory=EdgeFactory(
+                host=input_host, port=input_port, type="variation", sncl_mode="geomag"
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=7981,  # hard-code port for legacy sncls
+                type="variation",
+                sncl_mode="legacy",
+            ),
             realtime_interval=realtime_interval,
             update_limit=update_limit,
         )
-        temperature_filter(
+
+        minute_filter(
+            observatory=observatory,
+            channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"),
+            input_factory=EdgeFactory(
+                host=input_host,
+                port=input_port,  # earthworm port required for realtime
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,  # earthworm port required for realtime
+                write_port=output_port,  # rawinput port required for realtime
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        # remove the following after data migration is complete
+        _copy_channels(
             observatory=observatory,
             channels=(
-                ("LK1", "UK1"),
-                ("LK2", "UK2"),
-                ("LK3", "UK3"),
-                ("LK4", "UK4"),
+                ("U", "H"),
+                ("V", "E"),
+                ("W", "Z"),
+                ("F", "F"),
+            ),
+            interval="minute",
+            input_factory=EdgeFactory(
+                host=input_host,
+                port=input_port,
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=7981,  # hard-coded rawinput until migration, then remove
+                type="variation",
+                sncl_mode="legacy",
             ),
-            input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-            input_interval="second",
-            output_factory=get_edge_factory(host=output_host, output_port=output_port),
-            output_interval="minute",
             realtime_interval=realtime_interval,
             update_limit=update_limit,
         )
+
     else:
+        # legacy PCDCP processing
         _copy_channels(
             observatory=observatory,
             channels=(
@@ -181,22 +313,42 @@ def realtime_command(
                 ("F", "F"),
             ),
             interval="second",
-            input_factory=get_edge_factory(host=input_host, input_port=input_port),
-            output_factory=get_miniseed_factory(
-                host=output_host, output_port=output_port
+            input_factory=EdgeFactory(
+                host=input_host,
+                port=input_port,
+                type="variation",
+                sncl_mode="legacy",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=output_port,
+                type="variation",
+                sncl_mode="geomag",
             ),
             realtime_interval=realtime_interval,
             update_limit=update_limit,
         )
-    minute_filter(
-        observatory=observatory,
-        channels=("U", "V", "W", "F"),
-        input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-        output_factory=get_miniseed_factory(host=output_host, output_port=output_port),
-        realtime_interval=realtime_interval,
-        update_limit=update_limit,
-    )
-    if data_format == DataFormat.OBSRIO:
+        minute_filter(
+            observatory=observatory,
+            channels=("U", "V", "W", "F"),
+            input_factory=EdgeFactory(
+                host=input_host,
+                port=input_port,
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=output_port,
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        # remove the following after data migration is complete
         _copy_channels(
             observatory=observatory,
             channels=(
@@ -206,8 +358,19 @@ def realtime_command(
                 ("F", "F"),
             ),
             interval="minute",
-            input_factory=get_miniseed_factory(host=input_host, input_port=input_port),
-            output_factory=get_edge_factory(host=output_host, output_port=output_port),
+            input_factory=EdgeFactory(
+                host=input_host,
+                port=input_port,
+                type="variation",
+                sncl_mode="geomag",
+            ),
+            output_factory=EdgeFactory(
+                host=output_host,
+                port=output_read_port,
+                write_port=7981,
+                type="variation",
+                sncl_mode="legacy",
+            ),
             realtime_interval=realtime_interval,
             update_limit=update_limit,
         )
@@ -215,11 +378,11 @@ def realtime_command(
 
 def day_filter(
     observatory: str,
-    channels: List[str] = ["U", "V", "W", "F"],
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 86400,
-    update_limit: int = 7,
+    channels: List[str],
+    input_factory: TimeseriesFactory,
+    output_factory: TimeseriesFactory,
+    realtime_interval: int,
+    update_limit: int,
 ):
     """Filter 1 second miniseed channels to 1 day
 
@@ -238,11 +401,11 @@ def day_filter(
     update_limit: int
         number of update windows
     """
-    starttime, endtime = get_realtime_interval(realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, "day")
     controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
+        inputFactory=input_factory,
         inputInterval="minute",
-        outputFactory=output_factory or get_miniseed_factory(),
+        outputFactory=output_factory,
         outputInterval="day",
     )
     for channel in channels:
@@ -266,11 +429,11 @@ def day_filter(
 
 def hour_filter(
     observatory: str,
-    channels: List[str] = ["U", "V", "W", "F"],
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
+    channels: List[str],
+    input_factory: TimeseriesFactory,
+    output_factory: TimeseriesFactory,
+    realtime_interval: int,
+    update_limit: int,
 ):
     """Filter 1 minute miniseed channels to 1 hour
 
@@ -289,11 +452,11 @@ def hour_filter(
     update_limit: int
         number of update windows
     """
-    starttime, endtime = get_realtime_interval(realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, "hour")
     controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
+        inputFactory=input_factory,
         inputInterval="minute",
-        outputFactory=output_factory or get_miniseed_factory(),
+        outputFactory=output_factory,
         outputInterval="hour",
     )
     for channel in channels:
@@ -317,9 +480,9 @@ def hour_filter(
 
 def minute_filter(
     observatory: str,
-    channels: List[str] = ["U", "V", "W", "F"],
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
+    channels: List[str],
+    input_factory: TimeseriesFactory,
+    output_factory: TimeseriesFactory,
     realtime_interval: int = 600,
     update_limit: int = 10,
 ):
@@ -331,6 +494,8 @@ def minute_filter(
         observatory id
     channels: array
         list of channels to filter
+    (data type, e.g. "variation", is not an argument of this function; it is
+        configured on the input_factory/output_factory instances passed in)
     input_factory: TimeseriesFactory
         factory to request data
     output_factory: TimeseriesFactory
@@ -340,11 +505,11 @@ def minute_filter(
     update_limit: int
         number of update windows
     """
-    starttime, endtime = get_realtime_interval(realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, "minute")
     controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
+        inputFactory=input_factory,
         inputInterval="second",
-        outputFactory=output_factory or get_miniseed_factory(),
+        outputFactory=output_factory,
         outputInterval="minute",
     )
     for channel in channels:
@@ -368,6 +533,7 @@ def minute_filter(
 
 def second_filter(
     observatory: str,
+    channels: List[str] = ["U", "V", "W"],
     input_factory: Optional[TimeseriesFactory] = None,
     output_factory: Optional[TimeseriesFactory] = None,
     realtime_interval: int = 600,
@@ -379,6 +545,8 @@ def second_filter(
     -----------
     observatory: str
         observatory id
+    channels: array
+        list of channels to filter
     input_factory: TimeseriesFactory
         factory to request data
     output_factory: TimeseriesFactory
@@ -388,15 +556,14 @@ def second_filter(
     update_limit: int
         number of update windows
     """
-    starttime, endtime = get_realtime_interval(realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, "second")
     controller = Controller(
-        inputFactory=input_factory
-        or get_miniseed_factory(convert_channels=("U", "V", "W")),
+        inputFactory=input_factory,
         inputInterval="tenhertz",
-        outputFactory=output_factory or get_miniseed_factory(),
+        outputFactory=output_factory,
         outputInterval="second",
     )
-    for channel in ("U", "V", "W"):
+    for channel in channels:
         controller.run_as_update(
             algorithm=FilterAlgorithm(
                 input_sample_period=0.1,
@@ -418,26 +585,26 @@ def second_filter(
 def temperature_filter(
     observatory: str,
     channels: List[List[str]],
-    input_factory: Optional[TimeseriesFactory] = None,
-    input_interval: int = "second",
-    output_factory: Optional[TimeseriesFactory] = None,
-    output_interval: int = "minute",
-    realtime_interval: int = 600,
-    update_limit: int = 10,
+    input_factory: TimeseriesFactory,
+    input_interval: int,
+    output_factory: TimeseriesFactory,
+    output_interval: int,
+    realtime_interval: int,
+    update_limit: int,
 ):
     """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4)."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, output_interval)
     controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
+        inputFactory=input_factory,
         inputInterval=input_interval,
-        outputFactory=output_factory or get_edge_factory(),
+        outputFactory=output_factory,
         outputInterval=output_interval,
     )
     for input_channel, output_channel in channels:
         controller.run_as_update(
             algorithm=FilterAlgorithm(
-                input_sample_period=1,
-                output_sample_period=60,
+                input_sample_period=input_interval,
+                output_sample_period=output_interval,
                 inchannels=(input_channel,),
                 outchannels=(output_channel,),
             ),
@@ -482,11 +649,11 @@ def _copy_channels(
     update_limit: int
         number of update windows
     """
-    starttime, endtime = get_realtime_interval(interval_seconds=realtime_interval)
+    starttime, endtime = get_realtime_interval(realtime_interval, interval)
     controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
+        inputFactory=input_factory,
         inputInterval=interval,
-        outputFactory=output_factory or get_edge_factory(),
+        outputFactory=output_factory,
         outputInterval=interval,
     )
     for input_channel, output_channel in channels:
-- 
GitLab