Skip to content
Snippets Groups Projects
obsrio.py 8.77 KiB
Newer Older
import os
from typing import Optional

from argparse import Namespace
from ..algorithm import Algorithm, FilterAlgorithm
from ..edge import EdgeFactory, MiniSeedFactory
from ..Controller import (
    Controller,
    get_input_factory,
    get_output_factory,
    get_realtime_interval,
)
from ..TimeseriesFactory import TimeseriesFactory
from .factory import get_edge_factory, get_miniseed_factory


def main():
    typer.run(filter_realtime)


Cain, Payton David's avatar
Cain, Payton David committed
def filter_realtime(
    observatory: str,
    input_factory: str = "miniseed",
    host: str = "127.0.0.1",
    port: str = 2061,
    output_factory: str = "edge",
    write_port: int = 2061,
Cain, Payton David's avatar
Cain, Payton David committed
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 10Hz miniseed, 1 second, one minute, and temperature data.
Cain, Payton David's avatar
Cain, Payton David committed
    Defaults set for realtime processing; can also be implemented to update legacy data"""

    args = Namespace()
    args.input = input_factory
    args.input_host = host
    args.input_port = port
    args.output = output_factory
    args.output_port = write_port
    input_factory = get_input_factory(args)
    output_factory = get_output_factory(args)

Cain, Payton David's avatar
Cain, Payton David committed
    obsrio_tenhertz(
        observatory, realtime_interval, input_factory, output_factory, update_limit
    )
    obsrio_second(
        observatory, realtime_interval, input_factory, output_factory, update_limit
    )
    obsrio_minute(
        observatory, realtime_interval, input_factory, output_factory, update_limit
    )
    obsrio_temperatures(
        observatory, realtime_interval, input_factory, output_factory, update_limit
    )


def obsrio_day(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 86400,
    update_limit: int = 7,
):
    """Filter 1 second edge H,E,Z,F to 1 day miniseed U,V,W,F."""
Cain, Payton David's avatar
Cain, Payton David committed
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter 10Hz U,V,W to H,E,Z
    controller = Controller(
        algorithm=FilterAlgorithm(
            input_sample_period=60.0, output_sample_period=86400.0
        ),
        inputFactory=input_factory or get_edge_factory(data_type="variation"),
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
        outputInterval="day",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_hour(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 1 second edge H,E,Z,F to 1 hour miniseed U,V,W,F."""
Cain, Payton David's avatar
Cain, Payton David committed
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter 10Hz U,V,W to H,E,Z
    controller = Controller(
        algorithm=FilterAlgorithm(
            input_sample_period=60.0, output_sample_period=3600.0
        ),
        inputFactory=input_factory or get_edge_factory(data_type="variation"),
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
        outputInterval="hour",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_minute(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 1Hz legacy H,E,Z,F to 1 minute legacy.

    Should be called after obsrio_second() and obsrio_tenhertz(),
    which populate 1Hz legacy H,E,Z,F.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        algorithm=FilterAlgorithm(input_sample_period=1, output_sample_period=60),
        inputFactory=input_factory or get_edge_factory(data_type="variation"),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="minute",
    )
    for channel in ["H", "E", "Z", "F"]:
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(channel,),
            output_channels=(channel,),
            realtime=realtime_interval,
            update_limit=update_limit,
        )


def obsrio_second(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Copy 1Hz miniseed F to 1Hz legacy F."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        algorithm=Algorithm(),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="second",
    )
    controller.run_as_update(
        observatory=(observatory,),
        output_observatory=(observatory,),
        starttime=starttime,
        endtime=endtime,
        input_channels=("F",),
        output_channels=("F",),
        realtime=realtime_interval,
        update_limit=update_limit,
    )


def obsrio_temperatures(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4)."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        algorithm=FilterAlgorithm(input_sample_period=1, output_sample_period=60),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="minute",
    )
    renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_tenhertz(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter 10Hz U,V,W to H,E,Z
    controller = Controller(
        algorithm=FilterAlgorithm(input_sample_period=0.1, output_sample_period=1),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="tenhertz",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="second",
    )
    renames = {"U": "H", "V": "E", "W": "Z"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )