"""Filter routines (obsrio_*) and a command-line entry point for realtime data processing."""
import os
from typing import Optional
from argparse import Namespace
from ..algorithm import Algorithm, FilterAlgorithm
from ..edge import EdgeFactory, MiniSeedFactory
from ..Controller import (
Controller,
get_input_factory,
get_output_factory,
get_realtime_interval,
)
from ..TimeseriesFactory import TimeseriesFactory
from .factory import get_edge_factory, get_miniseed_factory
def main():
    """Command-line entry point: expose ``filter_realtime`` through typer."""
    # Imported here because no module-level ``import typer`` is visible in
    # this file; without an import the call below raises NameError.
    import typer

    typer.run(filter_realtime)
def filter_realtime(
    observatory: str,
    input_factory: str = "miniseed",
    host: str = "127.0.0.1",
    port: int = 2061,
    output_factory: str = "edge",
    write_port: int = 2061,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 10Hz miniseed, 1 second, one minute, and temperature data.

    Defaults are set for realtime processing; the same command can also be
    used to update legacy data.

    Args:
        observatory: observatory code handed to every filter step.
        input_factory: input factory type name, stored as ``args.input``.
        host: input host, stored as ``args.input_host``.
        port: input port, stored as ``args.input_port``.
        output_factory: output factory type name, stored as ``args.output``.
        write_port: output port, stored as ``args.output_port``.
        realtime_interval: forwarded to each step, which passes it to
            ``get_realtime_interval`` (presumably seconds; confirm there).
        update_limit: forwarded to each step's ``run_as_update`` calls.
    """
    # NOTE(review): only these five attributes are populated; confirm that
    # get_input_factory/get_output_factory do not read any others.
    args = Namespace(
        input=input_factory,
        input_host=host,
        input_port=port,
        output=output_factory,
        output_port=write_port,
    )
    source = get_input_factory(args)
    sink = get_output_factory(args)
    # Order matters: obsrio_minute reads the 1Hz legacy channels written by
    # obsrio_tenhertz and obsrio_second.
    steps = (obsrio_tenhertz, obsrio_second, obsrio_minute, obsrio_temperatures)
    for step in steps:
        # Keyword arguments are required here: the steps declare
        # (observatory, input_factory, output_factory, realtime_interval,
        # update_limit), so the previous positional call order passed the
        # interval as the input factory and shifted the factories.
        step(
            observatory=observatory,
            input_factory=source,
            output_factory=sink,
            realtime_interval=realtime_interval,
            update_limit=update_limit,
        )
def obsrio_day(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 86400,
    update_limit: int = 7,
):
    """Filter 1 minute edge H,E,Z,F to 1 day miniseed U,V,W,F.

    Args:
        observatory: observatory code; used for both input and output.
        input_factory: factory to read from; when omitted,
            ``get_edge_factory(data_type="variation")`` is used.
        output_factory: factory to write to; when omitted,
            ``get_miniseed_factory(data_type="variation")`` is used.
        realtime_interval: passed to ``get_realtime_interval`` to compute
            the processing window, and to ``run_as_update`` as ``realtime``.
        update_limit: passed through to ``run_as_update``.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter minute H,E,Z,F to day U,V,W,F
    controller = Controller(
        algorithm=FilterAlgorithm(
            input_sample_period=60.0, output_sample_period=86400.0
        ),
        inputFactory=input_factory or get_edge_factory(data_type="variation"),
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
        outputInterval="day",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    # each channel is processed in its own run, renamed on output
    for input_channel, output_channel in renames.items():
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )
def obsrio_hour(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 1 minute edge H,E,Z,F to 1 hour miniseed U,V,W,F.

    Args:
        observatory: observatory code; used for both input and output.
        input_factory: factory to read from; when omitted,
            ``get_edge_factory(data_type="variation")`` is used.
        output_factory: factory to write to; when omitted,
            ``get_miniseed_factory(data_type="variation")`` is used.
        realtime_interval: passed to ``get_realtime_interval`` to compute
            the processing window, and to ``run_as_update`` as ``realtime``.
        update_limit: passed through to ``run_as_update``.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter minute H,E,Z,F to hour U,V,W,F
    controller = Controller(
        algorithm=FilterAlgorithm(
            input_sample_period=60.0, output_sample_period=3600.0
        ),
        inputFactory=input_factory or get_edge_factory(data_type="variation"),
        inputInterval="minute",
        outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
        outputInterval="hour",
    )
    renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
    # each channel is processed in its own run, renamed on output
    for input_channel, output_channel in renames.items():
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )
def obsrio_minute(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Downsample 1Hz legacy H,E,Z,F to 1 minute legacy H,E,Z,F.

    Run this after obsrio_second() and obsrio_tenhertz(); those steps
    populate the 1Hz legacy H,E,Z,F channels that are read here.
    """
    window_start, window_end = get_realtime_interval(realtime_interval)
    second_to_minute = FilterAlgorithm(input_sample_period=1, output_sample_period=60)
    # both sides default to the edge "variation" factory
    source = input_factory or get_edge_factory(data_type="variation")
    sink = output_factory or get_edge_factory(data_type="variation")
    runner = Controller(
        algorithm=second_to_minute,
        inputFactory=source,
        inputInterval="second",
        outputFactory=sink,
        outputInterval="minute",
    )
    # one update run per channel, in H, E, Z, F order; names are unchanged
    for name in ("H", "E", "Z", "F"):
        single = (name,)
        runner.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=window_start,
            endtime=window_end,
            input_channels=single,
            output_channels=single,
            realtime=realtime_interval,
            update_limit=update_limit,
        )
def obsrio_second(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Copy 1Hz miniseed F to 1Hz legacy F.

    Args:
        observatory: observatory code; used for both input and output.
        input_factory: factory to read from; when omitted,
            ``get_miniseed_factory(data_type="variation")`` is used.
        output_factory: factory to write to; when omitted,
            ``get_edge_factory(data_type="variation")`` is used.
        realtime_interval: passed to ``get_realtime_interval`` to compute
            the processing window, and to ``run_as_update`` as ``realtime``.
        update_limit: passed through to ``run_as_update``.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    # The base Algorithm is used rather than FilterAlgorithm: input and
    # output are both "second" interval and the channel name is unchanged.
    controller = Controller(
        algorithm=Algorithm(),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="second",
    )
    controller.run_as_update(
        observatory=(observatory,),
        output_observatory=(observatory,),
        starttime=starttime,
        endtime=endtime,
        input_channels=("F",),
        output_channels=("F",),
        realtime=realtime_interval,
        update_limit=update_limit,
    )
def obsrio_temperatures(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4).

    Args:
        observatory: observatory code; used for both input and output.
        input_factory: factory to read from; when omitted,
            ``get_miniseed_factory(data_type="variation")`` is used.
        output_factory: factory to write to; when omitted,
            ``get_edge_factory(data_type="variation")`` is used.
        realtime_interval: passed to ``get_realtime_interval`` to compute
            the processing window, and to ``run_as_update`` as ``realtime``.
        update_limit: passed through to ``run_as_update``.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    controller = Controller(
        algorithm=FilterAlgorithm(input_sample_period=1, output_sample_period=60),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="second",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="minute",
    )
    renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"}
    # each temperature channel is processed in its own run, renamed on output
    for input_channel, output_channel in renames.items():
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )
def obsrio_tenhertz(
    observatory: str,
    input_factory: Optional[TimeseriesFactory] = None,
    output_factory: Optional[TimeseriesFactory] = None,
    realtime_interval: int = 600,
    update_limit: int = 10,
):
    """Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z.

    Args:
        observatory: observatory code; used for both input and output.
        input_factory: factory to read from; when omitted,
            ``get_miniseed_factory(data_type="variation")`` is used.
        output_factory: factory to write to; when omitted,
            ``get_edge_factory(data_type="variation")`` is used.
        realtime_interval: passed to ``get_realtime_interval`` to compute
            the processing window, and to ``run_as_update`` as ``realtime``.
        update_limit: passed through to ``run_as_update``.
    """
    starttime, endtime = get_realtime_interval(realtime_interval)
    # filter 10Hz U,V,W to H,E,Z
    controller = Controller(
        algorithm=FilterAlgorithm(input_sample_period=0.1, output_sample_period=1),
        inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
        inputInterval="tenhertz",
        outputFactory=output_factory or get_edge_factory(data_type="variation"),
        outputInterval="second",
    )
    renames = {"U": "H", "V": "E", "W": "Z"}
    # each channel is processed in its own run, renamed on output
    for input_channel, output_channel in renames.items():
        controller.run_as_update(
            observatory=(observatory,),
            output_observatory=(observatory,),
            starttime=starttime,
            endtime=endtime,
            input_channels=(input_channel,),
            output_channels=(output_channel,),
            realtime=realtime_interval,
            rename_output_channel=((input_channel, output_channel),),
            update_limit=update_limit,
        )