From ad76fa1116d57d8954be2342f922eb3093fc8de3 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:29:09 -0600
Subject: [PATCH 1/8] remove obsrio module

---
 geomagio/processing/obsrio.py | 342 ----------------------------------
 1 file changed, 342 deletions(-)
 delete mode 100644 geomagio/processing/obsrio.py

diff --git a/geomagio/processing/obsrio.py b/geomagio/processing/obsrio.py
deleted file mode 100644
index 1d8037fdb..000000000
--- a/geomagio/processing/obsrio.py
+++ /dev/null
@@ -1,342 +0,0 @@
-from typing import List, Optional
-
-import typer
-
-from ..algorithm import Algorithm, FilterAlgorithm
-from ..Controller import (
-    Controller,
-    get_realtime_interval,
-)
-from ..TimeseriesFactory import TimeseriesFactory
-from .factory import get_edge_factory, get_miniseed_factory
-
-
-def main():
-    typer.run(obsrio_filter)
-
-
-def obsrio_filter(
-    interval: str,
-    observatory: str,
-    input_factory: Optional[str] = None,
-    host: str = "127.0.0.1",
-    port: str = 2061,
-    output_factory: Optional[str] = None,
-    output_port: int = typer.Option(
-        2061, help="Port where output factory writes data."
-    ),
-    output_read_port: int = typer.Option(
-        2061, help="Port where output factory reads data"
-    ),
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    if interval == "realtime":
-        filter_realtime(
-            observatory=observatory,
-            input_factory=input_factory,
-            host=host,
-            port=port,
-            output_factory=output_factory,
-            output_port=output_port,
-            output_read_port=output_read_port,
-            realtime_interval=realtime_interval,
-            update_limit=update_limit,
-        )
-    elif interval in ["hour", "day"]:
-        input_factory = get_edge_factory(host=host, port=port)
-        output_factory = get_miniseed_factory(
-            host=host, port=output_read_port, write_port=output_port
-        )
-        if interval == "hour":
-            obsrio_hour(
-                observatory=observatory,
-                input_factory=input_factory,
-                output_factory=output_factory,
-                realtime_interval=realtime_interval,
-                update_limit=update_limit,
-            )
-        elif interval == "day":
-            obsrio_day(
-                observatory=observatory,
-                input_factory=input_factory,
-                output_factory=output_factory,
-                realtime_interval=realtime_interval,
-                update_limit=update_limit,
-            )
-    else:
-        raise ValueError("Invalid interval")
-
-
-def filter_realtime(
-    observatory: str,
-    input_factory: Optional[str] = None,
-    host: str = "127.0.0.1",
-    port: str = 2061,
-    output_factory: Optional[str] = None,
-    output_port: int = typer.Option(
-        2061, help="Port where output factory writes data."
-    ),
-    output_read_port: int = typer.Option(
-        2061, help="Port where output factory reads data"
-    ),
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Filter 10Hz miniseed, 1 second, one minute, and temperature data.
-    Defaults set for realtime processing; can also be implemented to update legacy data"""
-    if input_factory == "miniseed":
-        input_factory = get_miniseed_factory(host=host, port=port)
-    elif input_factory == "edge":
-        input_factory = get_edge_factory(host=host, port=port)
-    if output_factory == "miniseed":
-        output_factory = get_miniseed_factory(
-            host=host, port=output_read_port, write_port=output_port
-        )
-    elif output_factory == "edge":
-        output_factory = get_edge_factory(
-            host=host, port=output_read_port, write_port=output_port
-        )
-
-    obsrio_tenhertz(
-        observatory=observatory,
-        input_factory=input_factory,
-        output_factory=output_factory,
-        realtime_interval=realtime_interval,
-        update_limit=update_limit,
-    )
-    obsrio_second(
-        observatory=observatory,
-        input_factory=input_factory,
-        output_factory=output_factory,
-        realtime_interval=realtime_interval,
-        update_limit=update_limit,
-    )
-    obsrio_minute(
-        observatory=observatory,
-        input_factory=input_factory,
-        output_factory=output_factory,
-        realtime_interval=realtime_interval,
-        update_limit=update_limit,
-    )
-    obsrio_temperatures(
-        observatory=observatory,
-        input_factory=input_factory,
-        output_factory=output_factory,
-        realtime_interval=realtime_interval,
-        update_limit=update_limit,
-    )
-
-
-def obsrio_day(
-    observatory: str,
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 86400,
-    update_limit: int = 7,
-):
-    """Filter 1 second edge H,E,Z,F to 1 day miniseed U,V,W,F."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        inputFactory=input_factory or get_edge_factory(),
-        inputInterval="minute",
-        outputFactory=output_factory or get_miniseed_factory(),
-        outputInterval="day",
-    )
-    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
-    for input_channel in renames.keys():
-        output_channel = renames[input_channel]
-        controller.run_as_update(
-            algorithm=FilterAlgorithm(
-                input_sample_period=60.0,
-                output_sample_period=86400.0,
-                inchannels=(input_channel,),
-                outchannels=(output_channel,),
-            ),
-            observatory=(observatory,),
-            output_observatory=(observatory,),
-            starttime=starttime,
-            endtime=endtime,
-            input_channels=(input_channel,),
-            output_channels=(output_channel,),
-            realtime=realtime_interval,
-            rename_output_channel=((input_channel, output_channel),),
-            update_limit=update_limit,
-        )
-
-
-def obsrio_hour(
-    observatory: str,
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Filter 1 minute edge H,E,Z,F to 1 hour miniseed U,V,W,F."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        inputFactory=input_factory or get_edge_factory(),
-        inputInterval="minute",
-        outputFactory=output_factory or get_miniseed_factory(),
-        outputInterval="hour",
-    )
-    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
-    for input_channel in renames.keys():
-        output_channel = renames[input_channel]
-        controller.run_as_update(
-            algorithm=FilterAlgorithm(
-                input_sample_period=60.0,
-                output_sample_period=3600.0,
-                inchannels=(input_channel,),
-                outchannels=(output_channel,),
-            ),
-            observatory=(observatory,),
-            output_observatory=(observatory,),
-            starttime=starttime,
-            endtime=endtime,
-            input_channels=(input_channel,),
-            output_channels=(output_channel,),
-            realtime=realtime_interval,
-            rename_output_channel=((input_channel, output_channel),),
-            update_limit=update_limit,
-        )
-
-
-def obsrio_minute(
-    observatory: str,
-    channels: List[str] = ["H", "E", "Z", "F"],
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Filter 1Hz legacy channels to 1 minute legacy.
-
-    For H,E,Z,F: should be called after obsrio_second() and obsrio_tenhertz(),
-    which populate 1Hz legacy H,E,Z,F.
-    """
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        inputFactory=input_factory or get_edge_factory(),
-        inputInterval="second",
-        outputFactory=output_factory or get_edge_factory(),
-        outputInterval="minute",
-    )
-    for channel in channels:
-        controller.run_as_update(
-            algorithm=FilterAlgorithm(
-                input_sample_period=1,
-                output_sample_period=60,
-                inchannels=(channel,),
-                outchannels=(channel,),
-            ),
-            observatory=(observatory,),
-            output_observatory=(observatory,),
-            starttime=starttime,
-            endtime=endtime,
-            input_channels=(channel,),
-            output_channels=(channel,),
-            realtime=realtime_interval,
-            update_limit=update_limit,
-        )
-
-
-def obsrio_second(
-    observatory: str,
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Copy 1Hz miniseed F to 1Hz legacy F."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        algorithm=Algorithm(inchannels=("F",), outchannels=("F",)),
-        inputFactory=input_factory or get_miniseed_factory(),
-        outputFactory=output_factory or get_edge_factory(),
-    )
-    controller.run_as_update(
-        observatory=(observatory,),
-        output_observatory=(observatory,),
-        starttime=starttime,
-        endtime=endtime,
-        input_channels=("F",),
-        output_channels=("F",),
-        realtime=realtime_interval,
-        update_limit=update_limit,
-    )
-
-
-def obsrio_temperatures(
-    observatory: str,
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4)."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        inputFactory=input_factory or get_miniseed_factory(),
-        inputInterval="second",
-        outputFactory=output_factory or get_edge_factory(),
-        outputInterval="minute",
-    )
-    renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"}
-    for input_channel in renames.keys():
-        output_channel = renames[input_channel]
-        controller.run_as_update(
-            algorithm=FilterAlgorithm(
-                input_sample_period=1,
-                output_sample_period=60,
-                inchannels=(input_channel,),
-                outchannels=(output_channel,),
-            ),
-            observatory=(observatory,),
-            output_observatory=(observatory,),
-            starttime=starttime,
-            endtime=endtime,
-            input_channels=(input_channel,),
-            output_channels=(output_channel,),
-            realtime=realtime_interval,
-            rename_output_channel=((input_channel, output_channel),),
-            update_limit=update_limit,
-        )
-
-
-def obsrio_tenhertz(
-    observatory: str,
-    input_factory: Optional[TimeseriesFactory] = None,
-    output_factory: Optional[TimeseriesFactory] = None,
-    realtime_interval: int = 600,
-    update_limit: int = 10,
-):
-    """Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z."""
-    starttime, endtime = get_realtime_interval(realtime_interval)
-    controller = Controller(
-        inputFactory=input_factory
-        or get_miniseed_factory(convert_channels=("U", "V", "W")),
-        inputInterval="tenhertz",
-        outputFactory=output_factory or get_edge_factory(),
-        outputInterval="second",
-    )
-    renames = {"U": "H", "V": "E", "W": "Z"}
-    for input_channel in renames.keys():
-        output_channel = renames[input_channel]
-        controller.run_as_update(
-            algorithm=FilterAlgorithm(
-                input_sample_period=0.1,
-                output_sample_period=1,
-                inchannels=(input_channel,),
-                outchannels=(output_channel,),
-            ),
-            observatory=(observatory,),
-            output_observatory=(observatory,),
-            starttime=starttime,
-            endtime=endtime,
-            input_channels=(input_channel,),
-            output_channels=(output_channel,),
-            realtime=realtime_interval,
-            rename_output_channel=((input_channel, output_channel),),
-            update_limit=update_limit,
-        )
-- 
GitLab


From 9562f70f06d6301063376c9584786b7430422b78 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:29:21 -0600
Subject: [PATCH 2/8] update factory module type hints

---
 geomagio/processing/factory.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/geomagio/processing/factory.py b/geomagio/processing/factory.py
index 6e741d469..481b0459b 100644
--- a/geomagio/processing/factory.py
+++ b/geomagio/processing/factory.py
@@ -1,6 +1,5 @@
 import os
 
-from ..TimeseriesFactory import TimeseriesFactory
 from ..edge import EdgeFactory, MiniSeedFactory
 
 
@@ -9,7 +8,7 @@ def get_edge_factory(
     host=os.getenv("EDGE_HOST", "127.0.0.1"),
     interval="second",
     **kwargs
-) -> TimeseriesFactory:
+) -> EdgeFactory:
     return EdgeFactory(host=host, interval=interval, type=data_type, **kwargs)
 
 
@@ -18,5 +17,5 @@ def get_miniseed_factory(
     host=os.getenv("EDGE_HOST", "127.0.0.1"),
     interval="second",
     **kwargs
-) -> TimeseriesFactory:
+) -> MiniSeedFactory:
     return MiniSeedFactory(host=host, interval=interval, type=data_type, **kwargs)
-- 
GitLab


From b64a887e97e306d4dc885df15611877791dea23c Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:29:34 -0600
Subject: [PATCH 3/8] add filter application

---
 geomagio/processing/filters.py | 435 +++++++++++++++++++++++++++++++++
 1 file changed, 435 insertions(+)
 create mode 100644 geomagio/processing/filters.py

diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py
new file mode 100644
index 000000000..0decbfb59
--- /dev/null
+++ b/geomagio/processing/filters.py
@@ -0,0 +1,435 @@
+from enum import Enum
+from typing import List, Optional
+
+from typer import Argument, Option, Typer
+
+from ..algorithm import Algorithm, FilterAlgorithm
+from ..Controller import Controller, get_realtime_interval
+from ..geomag_types import DataInterval
+from ..TimeseriesFactory import TimeseriesFactory
+from .factory import get_edge_factory, get_miniseed_factory
+
+
+class DataFormat(str, Enum):
+    OBSRIO = "OBSRIO"
+    PCDCP = "PCDCP"
+
+
+app = Typer(help="Filter geomagnetic timeseries data")
+
+
+def main():
+    app()
+
+
+@app.command(
+    name="day",
+    help="Filter 1 day nT/temperature data",
+)
+def day_command(
+    observatory: str = Argument(None, help="observatory id"),
+    input_host: str = Option("127.0.0.1", help="host to request data"),
+    output_host: str = Option("127.0.0.1", help="host to write data"),
+    realtime_interval: int = Option(86400, help="length of update window"),
+    update_limit: int = Option(7, help="number of update windows"),
+):
+    day_filter(
+        observatory=observatory,
+        input_factory=get_miniseed_factory(host=input_host),
+        output_factory=get_miniseed_factory(host=output_host),
+        realtime_interval=realtime_interval,
+        update_limit=update_limit,
+    )
+
+
+@app.command(
+    name="hour",
+    help="Filter 1 hour nT/temperature data",
+)
+def hour_command(
+    observatory: str = Argument(None, help="observatory id"),
+    input_host: str = Option("127.0.0.1", help="host to request data"),
+    output_host: str = Option("127.0.0.1", help="host to write data"),
+    realtime_interval: int = Option(3600, help="length of update window"),
+    update_limit: int = Option(24, help="number of update windows"),
+):
+    hour_filter(
+        observatory=observatory,
+        input_factory=get_miniseed_factory(host=input_host),
+        output_factory=get_miniseed_factory(host=output_host),
+        realtime_interval=realtime_interval,
+        update_limit=update_limit,
+    )
+
+
+@app.command(
+    name="realtime",
+    short_help="Filter 1 second and 1 minute nT/temperature data",
+    help="""
+    ObsRIO:
+        Filters 10Hz U,V,W miniseed to 1 second miniseed
+        Filters 1 second U,V,W,F miniseed to 1 minute miniseed
+        Filters 1 second T1-4 miniseed to 1 minute miniseed
+        Copies 1 second and 1 minute U,V,W,F,T1-4 miniseed to H,E,Z,F,UK1-4 earthworm
+    PCDCP:
+        Copies 1 second H,E,Z,F earthworm to U,V,W,F miniseed
+        Copies 1 minute UK1-4 earthworm to T1-4 miniseed
+        Filters 1 second U,V,W,F miniseed to 1 minute miniseed
+    """,
+)
+def realtime_command(
+    observatory: str = Argument(None, help="observatory id"),
+    input_host: str = Option("127.0.0.1", help="host to request data"),
+    output_host: str = Option("127.0.0.1", help="host to write data"),
+    data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"),
+    realtime_interval: int = Option(600, help="length of update window"),
+    update_limit: int = Option(10, help="number of update windows"),
+):
+    if data_format == DataFormat.OBSRIO:
+        second_filter(
+            observatory=observatory,
+            input_factory=get_miniseed_factory(
+                host=input_host, convert_channels=("U", "V", "W")
+            ),
+            output_factory=get_miniseed_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        _copy_channels(
+            observatory=observatory,
+            channels=(
+                ("U", "H"),
+                ("V", "E"),
+                ("W", "Z"),
+                ("F", "F"),
+                ("T1", "LK1"),
+                ("T2", "LK2"),
+                ("T3", "LK3"),
+                ("T4", "LK4"),
+            ),
+            interval="second",
+            input_factory=get_miniseed_factory(host=input_host),
+            output_factory=get_edge_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        minute_filter(
+            observatory=observatory,
+            channels=("T1", "T2", "T3", "T4"),
+            input_factory=get_miniseed_factory(host=input_host),
+            output_factory=get_miniseed_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+    else:
+        _copy_channels(
+            observatory=observatory,
+            channels=(
+                ("H", "U"),
+                ("E", "V"),
+                ("Z", "W"),
+                ("F", "F"),
+            ),
+            interval="second",
+            input_factory=get_edge_factory(host=input_host),
+            output_factory=get_miniseed_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+        _copy_channels(
+            observatory=observatory,
+            channels=(
+                ("UK1", "T1"),
+                ("UK2", "T2"),
+                ("UK3", "T3"),
+                ("UK4", "T4"),
+            ),
+            interval="minute",
+            input_factory=get_edge_factory(host=input_host),
+            output_factory=get_miniseed_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+    minute_filter(
+        observatory=observatory,
+        channels=("U", "V", "W", "F"),
+        input_factory=get_miniseed_factory(host=input_host),
+        output_factory=get_miniseed_factory(host=output_host),
+        realtime_interval=realtime_interval,
+        update_limit=update_limit,
+    )
+    if data_format == DataFormat.OBSRIO:
+        _copy_channels(
+            observatory=observatory,
+            channels=(
+                ("U", "H"),
+                ("V", "E"),
+                ("W", "Z"),
+                ("F", "F"),
+                ("T1", "UK1"),
+                ("T2", "UK2"),
+                ("T3", "UK3"),
+                ("T4", "UK4"),
+            ),
+            interval="minute",
+            input_factory=get_miniseed_factory(host=input_host),
+            output_factory=get_edge_factory(host=output_host),
+            realtime_interval=realtime_interval,
+            update_limit=update_limit,
+        )
+
+
+def day_filter(
+    observatory: str,
+    channels: List[str] = ["U", "V", "W", "F", "T1", "T2", "T3", "T4"],
+    input_factory: Optional[TimeseriesFactory] = None,
+    output_factory: Optional[TimeseriesFactory] = None,
+    realtime_interval: int = 86400,
+    update_limit: int = 7,
+):
+    """Filter 1 second miniseed channels to 1 day
+
+    Parameters:
+    -----------
+    observatory: str
+        observatory id
+    channels: array
+        list of channels to filter
+    input_factory: TimeseriesFactory
+        factory to request data
+    output_factory: TimeseriesFactory
+        factory to write data
+    realtime_interval: int
+        length of update window
+    update_limit: int
+        number of update windows
+    """
+    starttime, endtime = get_realtime_interval(realtime_interval)
+    controller = Controller(
+        inputFactory=input_factory or get_miniseed_factory(),
+        inputInterval="minute",
+        outputFactory=output_factory or get_miniseed_factory(),
+        outputInterval="day",
+    )
+    for channel in channels:
+        controller.run_as_update(
+            algorithm=FilterAlgorithm(
+                input_sample_period=60.0,
+                output_sample_period=86400.0,
+                inchannels=(channel,),
+                outchannels=(channel,),
+            ),
+            observatory=(observatory,),
+            output_observatory=(observatory,),
+            starttime=starttime,
+            endtime=endtime,
+            input_channels=(channel,),
+            output_channels=(channel,),
+            realtime=realtime_interval,
+            update_limit=update_limit,
+        )
+
+
+def hour_filter(
+    observatory: str,
+    channels: List[str] = ["U", "V", "W", "F", "T1", "T2", "T3", "T4"],
+    input_factory: Optional[TimeseriesFactory] = None,
+    output_factory: Optional[TimeseriesFactory] = None,
+    realtime_interval: int = 600,
+    update_limit: int = 10,
+):
+    """Filter 1 minute miniseed channels to 1 hour
+
+    Parameters:
+    -----------
+    observatory: str
+        observatory id
+    channels: array
+        list of channels to filter
+    input_factory: TimeseriesFactory
+        factory to request data
+    output_factory: TimeseriesFactory
+        factory to write data
+    realtime_interval: int
+        length of update window
+    update_limit: int
+        number of update windows
+    """
+    starttime, endtime = get_realtime_interval(realtime_interval)
+    controller = Controller(
+        inputFactory=input_factory or get_miniseed_factory(),
+        inputInterval="minute",
+        outputFactory=output_factory or get_miniseed_factory(),
+        outputInterval="hour",
+    )
+    for channel in channels:
+        controller.run_as_update(
+            algorithm=FilterAlgorithm(
+                input_sample_period=60.0,
+                output_sample_period=3600.0,
+                inchannels=(channel,),
+                outchannels=(channel,),
+            ),
+            observatory=(observatory,),
+            output_observatory=(observatory,),
+            starttime=starttime,
+            endtime=endtime,
+            input_channels=(channel,),
+            output_channels=(channel,),
+            realtime=realtime_interval,
+            update_limit=update_limit,
+        )
+
+
+def minute_filter(
+    observatory: str,
+    channels: List[str] = ["U", "V", "W", "F"],
+    input_factory: Optional[TimeseriesFactory] = None,
+    output_factory: Optional[TimeseriesFactory] = None,
+    realtime_interval: int = 600,
+    update_limit: int = 10,
+):
+    """Filter 1 second miniseed channels to 1 minute
+
+    Parameters:
+    -----------
+    observatory: str
+        observatory id
+    channels: array
+        list of channels to filter
+    input_factory: TimeseriesFactory
+        factory to request data
+    output_factory: TimeseriesFactory
+        factory to write data
+    realtime_interval: int
+        length of update window
+    update_limit: int
+        number of update windows
+    """
+    starttime, endtime = get_realtime_interval(realtime_interval)
+    controller = Controller(
+        inputFactory=input_factory or get_miniseed_factory(),
+        inputInterval="second",
+        outputFactory=output_factory or get_miniseed_factory(),
+        outputInterval="minute",
+    )
+    for channel in channels:
+        controller.run_as_update(
+            algorithm=FilterAlgorithm(
+                input_sample_period=1,
+                output_sample_period=60,
+                inchannels=(channel,),
+                outchannels=(channel,),
+            ),
+            observatory=(observatory,),
+            output_observatory=(observatory,),
+            starttime=starttime,
+            endtime=endtime,
+            input_channels=(channel,),
+            output_channels=(channel,),
+            realtime=realtime_interval,
+            update_limit=update_limit,
+        )
+
+
+def second_filter(
+    observatory: str,
+    input_factory: Optional[TimeseriesFactory] = None,
+    output_factory: Optional[TimeseriesFactory] = None,
+    realtime_interval: int = 600,
+    update_limit: int = 10,
+):
+    """Filter 10Hz miniseed U,V,W to 1 second
+
+    Parameters:
+    -----------
+    observatory: str
+        observatory id
+    input_factory: TimeseriesFactory
+        factory to request data
+    output_factory: TimeseriesFactory
+        factory to write data
+    realtime_interval: int
+        length of update window
+    update_limit: int
+        number of update windows
+    """
+    starttime, endtime = get_realtime_interval(realtime_interval)
+    controller = Controller(
+        inputFactory=input_factory
+        or get_miniseed_factory(convert_channels=("U", "V", "W")),
+        inputInterval="tenhertz",
+        outputFactory=output_factory or get_miniseed_factory(),
+        outputInterval="second",
+    )
+    for channel in ("U", "V", "W"):
+        controller.run_as_update(
+            algorithm=FilterAlgorithm(
+                input_sample_period=0.1,
+                output_sample_period=1,
+                inchannels=(channel,),
+                outchannels=(channel,),
+            ),
+            observatory=(observatory,),
+            output_observatory=(observatory,),
+            starttime=starttime,
+            endtime=endtime,
+            input_channels=(channel,),
+            output_channels=(channel,),
+            realtime=realtime_interval,
+            update_limit=update_limit,
+        )
+
+
+def _copy_channels(
+    observatory: str,
+    channels: List[List[str]],
+    interval: DataInterval,
+    input_factory: Optional[TimeseriesFactory],
+    output_factory: Optional[TimeseriesFactory],
+    realtime_interval: int = 600,
+    update_limit: int = 10,
+):
+    """copy channels between earthworm and miniseed formats
+
+    Parameters:
+    -----------
+    observatory: str
+        observatory id
+    channels: array
+        list of channel conversions
+        format: ((input_channel_1, output_channel_1), ...)
+    interval: {tenhertz, second, minute, hour, day}
+        data interval
+    input_factory: TimeseriesFactory
+        factory to request data
+    output_factory: TimeseriesFactory
+        factory to write data
+    realtime_interval: int
+        length of update window
+    update_limit: int
+        number of update windows
+    """
+    starttime, endtime = get_realtime_interval(interval_seconds=realtime_interval)
+    controller = Controller(
+        inputFactory=input_factory or get_miniseed_factory(),
+        inputInterval=interval,
+        outputFactory=output_factory or get_edge_factory(),
+        outputInterval=interval,
+    )
+    for input_channel, output_channel in channels:
+        controller.run_as_update(
+            algorithm=Algorithm(
+                inchannels=(input_channel,),
+                outchannels=(output_channel,),
+            ),
+            observatory=(observatory,),
+            output_observatory=(observatory,),
+            starttime=starttime,
+            endtime=endtime,
+            input_channels=(input_channel,),
+            output_channels=(output_channel,),
+            rename_output_channel=((input_channel, output_channel),),
+            realtime=realtime_interval,
+            update_limit=update_limit,
+        )
-- 
GitLab


From defaa8b204aea84422d7a28160dcda1768617687 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:29:50 -0600
Subject: [PATCH 4/8] add filter application to pyproject.toml

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index ffbd9204c..c41865f9c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -71,4 +71,4 @@ geomag-monitor = "geomagio.processing.monitor:main"
 geomag-py = "geomagio.Controller:main"
 magproc-prepfiles = "geomagio.processing.magproc:main"
 make-cal = "geomagio.processing.make_cal:main"
-obsrio-filter = "geomagio.processing.obsrio:main"
+geomag-filter = "geomagio.processing.filters:main"
-- 
GitLab


From f80b3eb9bdf32b43040323b1b7ff507bc52f8b07 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:30:03 -0600
Subject: [PATCH 5/8] update filter methods in processing __init__

---
 geomagio/processing/__init__.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/geomagio/processing/__init__.py b/geomagio/processing/__init__.py
index 77f23254b..573ec1ae8 100644
--- a/geomagio/processing/__init__.py
+++ b/geomagio/processing/__init__.py
@@ -5,7 +5,7 @@ and should be considered less stable than other packages in the library.
 """
 from .factory import get_edge_factory, get_miniseed_factory
 from .derived import adjusted, average, sqdist_minute
-from .obsrio import obsrio_minute, obsrio_second, obsrio_temperatures, obsrio_tenhertz
+from .filters import minute_filter, second_filter
 
 
 __all__ = [
@@ -13,9 +13,7 @@ __all__ = [
     "average",
     "get_edge_factory",
     "get_miniseed_factory",
-    "obsrio_minute",
-    "obsrio_second",
-    "obsrio_temperatures",
-    "obsrio_tenhertz",
+    "minute_filter",
+    "second_filter",
     "sqdist_minute",
 ]
-- 
GitLab


From 2ddb2a25d3c01eee3b22e6d459cc5a621881957d Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Mon, 18 Oct 2021 14:30:19 -0600
Subject: [PATCH 6/8] update filter methods in efield entrypoint

---
 geomagio/processing/efield.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/geomagio/processing/efield.py b/geomagio/processing/efield.py
index 0bd3e6980..df3f345dd 100644
--- a/geomagio/processing/efield.py
+++ b/geomagio/processing/efield.py
@@ -3,7 +3,7 @@ import typer
 from ..adjusted import AdjustedMatrix
 from .derived import adjusted
 from .factory import get_edge_factory
-from .obsrio import obsrio_minute
+from .filters import minute_filter
 
 app = typer.Typer()
 
@@ -40,7 +40,7 @@ def efield_realtime(
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
-    obsrio_minute(
+    minute_filter(
         observatory=observatory,
         channels=["E-E", "E-N"],
         input_factory=get_edge_factory(host=host, data_type="variation"),
@@ -48,7 +48,7 @@ def efield_realtime(
         realtime_interval=realtime_interval,
         update_limit=update_limit,
     )
-    obsrio_minute(
+    minute_filter(
         observatory=observatory,
         channels=["E-E", "E-N"],
         input_factory=get_edge_factory(host=host, data_type="adjusted"),
-- 
GitLab


From 40fb8c66fb3a5476d99ec42006623152176710b0 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Thu, 4 Nov 2021 16:50:23 -0600
Subject: [PATCH 7/8] add spacing to realtime filter docstring

---
 geomagio/processing/filters.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py
index 0decbfb59..0a727c4ac 100644
--- a/geomagio/processing/filters.py
+++ b/geomagio/processing/filters.py
@@ -67,14 +67,23 @@ def hour_command(
     short_help="Filter 1 second and 1 minute nT/temperature data",
     help="""
     ObsRIO:
+
         Filters 10Hz U,V,W miniseed to 1 second miniseed
+
         Filters 1 second U,V,W,F miniseed to 1 minute miniseed
+
         Filters 1 second T1-4 miniseed to 1 minute miniseed
+
         Copies 1 second and 1 minute U,V,W,F,T1-4 miniseed to H,E,Z,F,UK1-4 earthworm
+
     PCDCP:
+
         Copies 1 second H,E,Z,F earthworm to U,V,W,F miniseed
+
         Copies 1 minute UK1-4 earthworm to T1-4 miniseed
+
         Filters 1 second U,V,W,F miniseed to 1 minute miniseed
+
     """,
 )
 def realtime_command(
-- 
GitLab


From edf3aad4971e8bbec5c8ade9dc62195ca73525f1 Mon Sep 17 00:00:00 2001
From: pcain <pcain@usgs.gov>
Date: Fri, 5 Nov 2021 18:10:40 +0000
Subject: [PATCH 8/8] update filter application docstrings

---
 geomagio/processing/filters.py | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py
index 0a727c4ac..b36b07198 100644
--- a/geomagio/processing/filters.py
+++ b/geomagio/processing/filters.py
@@ -28,9 +28,9 @@ def main():
 )
 def day_command(
     observatory: str = Argument(None, help="observatory id"),
-    input_host: str = Option("127.0.0.1", help="host to request data"),
-    output_host: str = Option("127.0.0.1", help="host to write data"),
-    realtime_interval: int = Option(86400, help="length of update window"),
+    input_host: str = Option("127.0.0.1", help="host to request data from"),
+    output_host: str = Option("127.0.0.1", help="host to write data to"),
+    realtime_interval: int = Option(86400, help="length of update window (in seconds)"),
     update_limit: int = Option(7, help="number of update windows"),
 ):
     day_filter(
@@ -48,9 +48,9 @@ def day_command(
 )
 def hour_command(
     observatory: str = Argument(None, help="observatory id"),
-    input_host: str = Option("127.0.0.1", help="host to request data"),
-    output_host: str = Option("127.0.0.1", help="host to write data"),
-    realtime_interval: int = Option(3600, help="length of update window"),
+    input_host: str = Option("127.0.0.1", help="host to request data from"),
+    output_host: str = Option("127.0.0.1", help="host to write data to"),
+    realtime_interval: int = Option(3600, help="length of update window (in seconds)"),
     update_limit: int = Option(24, help="number of update windows"),
 ):
     hour_filter(
@@ -88,10 +88,10 @@ def hour_command(
 )
 def realtime_command(
     observatory: str = Argument(None, help="observatory id"),
-    input_host: str = Option("127.0.0.1", help="host to request data"),
-    output_host: str = Option("127.0.0.1", help="host to write data"),
+    input_host: str = Option("127.0.0.1", help="host to request data from"),
+    output_host: str = Option("127.0.0.1", help="host to write data to"),
     data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"),
-    realtime_interval: int = Option(600, help="length of update window"),
+    realtime_interval: int = Option(600, help="length of update window (in seconds)"),
     update_limit: int = Option(10, help="number of update windows"),
 ):
     if data_format == DataFormat.OBSRIO:
@@ -209,7 +209,7 @@ def day_filter(
     output_factory: TimeseriesFactory
         factory to write data
     realtime_interval: int
-        length of update window
+        length of update window (in seconds)
     update_limit: int
         number of update windows
     """
@@ -260,7 +260,7 @@ def hour_filter(
     output_factory: TimeseriesFactory
         factory to write data
     realtime_interval: int
-        length of update window
+        length of update window (in seconds)
     update_limit: int
         number of update windows
     """
@@ -311,7 +311,7 @@ def minute_filter(
     output_factory: TimeseriesFactory
         factory to write data
     realtime_interval: int
-        length of update window
+        length of update window (in seconds)
     update_limit: int
         number of update windows
     """
@@ -359,7 +359,7 @@ def second_filter(
     output_factory: TimeseriesFactory
         factory to write data
     realtime_interval: int
-        length of update window
+        length of update window (in seconds)
     update_limit: int
         number of update windows
     """
@@ -415,7 +415,7 @@ def _copy_channels(
     output_factory: TimeseriesFactory
         factory to write data
     realtime_interval: int
-        length of update window
+        length of update window (in seconds)
     update_limit: int
         number of update windows
     """
-- 
GitLab