Skip to content
Snippets Groups Projects
obsrio.py 11.6 KiB
Newer Older
from typing import List, Optional
from ..algorithm import Algorithm, FilterAlgorithm
from ..Controller import (
    Controller,
    get_realtime_interval,
)
from ..TimeseriesFactory import TimeseriesFactory
from .factory import get_edge_factory, get_miniseed_factory


def main():
    typer.run(obsrio_filter)
    interval: str,
    observatory: str,
    input_factory: Optional[str] = None,
    host: str = "127.0.0.1",
    port: str = 2061,
    output_factory: Optional[str] = None,
    output_port: int = typer.Option(
        2061, help="Port where output factory writes data."
    ),
    output_read_port: int = typer.Option(
        2061, help="Port where output factory reads data"
    ),
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    if interval == "realtime":
        filter_realtime(
            observatory=observatory,
            input_factory=input_factory,
            host=host,
            port=port,
            output_factory=output_factory,
            output_port=output_port,
            output_read_port=output_read_port,
            realtime_interval=realtime_interval,
            update_limit=update_limit,
        )
    elif interval in ["hour", "day"]:
        input_factory = get_edge_factory(host=host, port=port)
        output_factory = get_miniseed_factory(
            host=host, port=output_read_port, write_port=output_port
        )
        if interval == "hour":
            obsrio_hour(
                observatory=observatory,
                input_factory=input_factory,
                output_factory=output_factory,
                realtime_interval=realtime_interval,
                update_limit=update_limit,
            )
        elif interval == "day":
            obsrio_day(
                observatory=observatory,
                input_factory=input_factory,
                output_factory=output_factory,
                realtime_interval=realtime_interval,
                update_limit=update_limit,
            )
    else:
        raise ValueError("Invalid interval")
Cain, Payton David's avatar
Cain, Payton David committed
def filter_realtime(
    observatory: str,
    input_factory: Optional[str] = None,
    host: str = "127.0.0.1",
    port: str = 2061,
    output_factory: Optional[str] = None,
    output_port: int = typer.Option(
        2061, help="Port where output factory writes data."
    ),
    output_read_port: int = typer.Option(
        2061, help="Port where output factory reads data"
    ),
Cain, Payton David's avatar
Cain, Payton David committed
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 10Hz miniseed, 1 second, one minute, and temperature data.
Cain, Payton David's avatar
Cain, Payton David committed
    Defaults set for realtime processing; can also be implemented to update legacy data"""
    if input_factory == "miniseed":
        input_factory = get_miniseed_factory(host=host, port=port)
    elif input_factory == "edge":
        input_factory = get_edge_factory(host=host, port=port)
    if output_factory == "miniseed":
        output_factory = get_miniseed_factory(
            host=host, port=output_read_port, write_port=output_port
        )
    elif output_factory == "edge":
        output_factory = get_edge_factory(
            host=host, port=output_read_port, write_port=output_port
        )
Cain, Payton David's avatar
Cain, Payton David committed
    obsrio_tenhertz(
        observatory=observatory,
        input_factory=input_factory,
        output_factory=output_factory,
        realtime_interval=realtime_interval,
        update_limit=update_limit,
Cain, Payton David's avatar
Cain, Payton David committed
    )
    obsrio_second(
        observatory=observatory,
        input_factory=input_factory,
        output_factory=output_factory,
        realtime_interval=realtime_interval,
        update_limit=update_limit,
Cain, Payton David's avatar
Cain, Payton David committed
    )
    obsrio_minute(
        observatory=observatory,
        input_factory=input_factory,
        output_factory=output_factory,
        realtime_interval=realtime_interval,
        update_limit=update_limit,
Cain, Payton David's avatar
Cain, Payton David committed
    )
    obsrio_temperatures(
        observatory=observatory,
        input_factory=input_factory,
        output_factory=output_factory,
        realtime_interval=realtime_interval,
        update_limit=update_limit,
Cain, Payton David's avatar
Cain, Payton David committed
    )


def obsrio_day(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 86400,
    update_limit: int = 7,
):
    """Filter 1 second edge H,E,Z,F to 1 day miniseed U,V,W,F."""
Cain, Payton David's avatar
Cain, Payton David committed
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        inputFactory=input_factory or get_edge_factory(),
Cain, Payton David's avatar
Cain, Payton David committed
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(),
Cain, Payton David's avatar
Cain, Payton David committed
        outputInterval="day",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            algorithm=FilterAlgorithm(
                input_sample_period=60.0,
                output_sample_period=86400.0,
                inchannels=(input_channel,),
                outchannels=(output_channel,),
            ),
Cain, Payton David's avatar
Cain, Payton David committed
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_hour(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 1 minute edge H,E,Z,F to 1 hour miniseed U,V,W,F."""
Cain, Payton David's avatar
Cain, Payton David committed
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        inputFactory=input_factory or get_edge_factory(),
Cain, Payton David's avatar
Cain, Payton David committed
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(),
Cain, Payton David's avatar
Cain, Payton David committed
        outputInterval="hour",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            algorithm=FilterAlgorithm(
                input_sample_period=60.0,
                output_sample_period=3600.0,
                inchannels=(input_channel,),
                outchannels=(output_channel,),
            ),
Cain, Payton David's avatar
Cain, Payton David committed
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_minute(
    observatory: str,
    channels: List[str] = ["H", "E", "Z", "F"],
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Filter 1Hz legacy channels to 1 minute legacy.
    For H,E,Z,F: should be called after obsrio_second() and obsrio_tenhertz(),
    which populate 1Hz legacy H,E,Z,F.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        inputFactory=input_factory or get_edge_factory(),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(),
        outputInterval="minute",
    )
    for channel in channels:
        controller.run_as_update(
            algorithm=FilterAlgorithm(
                input_sample_period=1,
                output_sample_period=60,
                inchannels=(channel,),
                outchannels=(channel,),
            ),
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(channel,),
            output_channels=(channel,),
            realtime=realtime_interval,
            update_limit=update_limit,
        )


def obsrio_second(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Copy 1Hz miniseed F to 1Hz legacy F."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        algorithm=Algorithm(inchannels=("F",), outchannels=("F",)),
        inputFactory=input_factory or get_miniseed_factory(),
        outputFactory=output_factory or get_edge_factory(),
    )
    controller.run_as_update(
        observatory=(observatory,),
        output_observatory=(observatory,),
        starttime=starttime,
        endtime=endtime,
        input_channels=("F",),
        output_channels=("F",),
        realtime=realtime_interval,
        update_limit=update_limit,
    )


def obsrio_temperatures(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4)."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        inputFactory=input_factory or get_miniseed_factory(),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(),
        outputInterval="minute",
    )
    renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            algorithm=FilterAlgorithm(
                input_sample_period=1,
                output_sample_period=60,
                inchannels=(input_channel,),
                outchannels=(output_channel,),
            ),
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )


def obsrio_tenhertz(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
    """Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z."""
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        inputFactory=input_factory
        or get_miniseed_factory(convert_channels=("U", "V", "W")),
        inputInterval="tenhertz",
        outputFactory=output_factory or get_edge_factory(),
        outputInterval="second",
    )
    renames = {"U": "H", "V": "E", "W": "Z"}
    for input_channel in renames.keys():
        output_channel = renames[input_channel]
        controller.run_as_update(
            algorithm=FilterAlgorithm(
                input_sample_period=0.1,
                output_sample_period=1,
                inchannels=(input_channel,),
                outchannels=(output_channel,),
            ),
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )