Newer
Older
import os
from typing import Optional
from ..algorithm import Algorithm, FilterAlgorithm
from ..edge import EdgeFactory, MiniSeedFactory
from ..Controller import Controller, get_realtime_interval
from ..TimeseriesFactory import TimeseriesFactory
from .factory import get_edge_factory, get_miniseed_factory
def main():
typer.run(filter_realtime)
def filter_realtime(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
):
"""Filter 10Hz miniseed, 1 second, one minute, and temperature data.
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
Defaults set for realtime processing; can also be implemented to update legacy data"""
obsrio_tenhertz(
observatory, realtime_interval, input_factory, output_factory, update_limit
)
obsrio_second(
observatory, realtime_interval, input_factory, output_factory, update_limit
)
obsrio_minute(
observatory, realtime_interval, input_factory, output_factory, update_limit
)
obsrio_temperatures(
observatory, realtime_interval, input_factory, output_factory, update_limit
)
def obsrio_day(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 86400,
update_limit: int = 7,
):
"""Filter 1 second edge H,E,Z,F to 1 day U,V,W,F."""
starttime, endtime = get_realtime_interval(realtime_interval)
# filter 10Hz U,V,W to H,E,Z
controller = Controller(
algorithm=FilterAlgorithm(
input_sample_period=60.0, output_sample_period=86400.0
),
inputFactory=input_factory or get_edge_factory(data_type="variation"),
inputInterval="minute",
outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
outputInterval="day",
)
renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
for input_channel in renames.keys():
output_channel = renames[input_channel]
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=(input_channel,),
output_channels=(output_channel,),
realtime=realtime_interval,
rename_output_channel=((input_channel, output_channel),),
update_limit=update_limit,
)
def obsrio_hour(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
):
"""Filter 1 second edge H,E,Z,F to 1 hour U,V,W,F."""
starttime, endtime = get_realtime_interval(realtime_interval)
# filter 10Hz U,V,W to H,E,Z
controller = Controller(
algorithm=FilterAlgorithm(
input_sample_period=60.0, output_sample_period=3600.0
),
inputFactory=input_factory or get_edge_factory(data_type="variation"),
inputInterval="minute",
outputFactory=output_factory or get_miniseed_factory(data_type="variation"),
outputInterval="hour",
)
renames = {"H": "U", "E": "V", "Z": "W", "F": "F"}
for input_channel in renames.keys():
output_channel = renames[input_channel]
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=(input_channel,),
output_channels=(output_channel,),
realtime=realtime_interval,
rename_output_channel=((input_channel, output_channel),),
update_limit=update_limit,
)
def obsrio_minute(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
):
"""Filter 1Hz legacy H,E,Z,F to 1 minute legacy.
Should be called after obsrio_second() and obsrio_tenhertz(),
which populate 1Hz legacy H,E,Z,F.
"""
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=FilterAlgorithm(input_sample_period=1, output_sample_period=60),
inputFactory=input_factory or get_edge_factory(data_type="variation"),
inputInterval="second",
outputFactory=output_factory or get_edge_factory(data_type="variation"),
outputInterval="minute",
)
for channel in ["H", "E", "Z", "F"]:
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=(channel,),
output_channels=(channel,),
realtime=realtime_interval,
update_limit=update_limit,
)
def obsrio_second(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
"""Copy 1Hz miniseed F to 1Hz legacy F."""
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=Algorithm(),
inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
inputInterval="second",
outputFactory=output_factory or get_edge_factory(data_type="variation"),
outputInterval="second",
)
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=("F",),
output_channels=("F",),
realtime=realtime_interval,
update_limit=update_limit,
)
def obsrio_temperatures(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
"""Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4)."""
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=FilterAlgorithm(input_sample_period=1, output_sample_period=60),
inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
inputInterval="second",
outputFactory=output_factory or get_edge_factory(data_type="variation"),
outputInterval="minute",
)
renames = {"LK1": "UK1", "LK2": "UK2", "LK3": "UK3", "LK4": "UK4"}
for input_channel in renames.keys():
output_channel = renames[input_channel]
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=(input_channel,),
output_channels=(output_channel,),
realtime=realtime_interval,
rename_output_channel=((input_channel, output_channel),),
update_limit=update_limit,
)
def obsrio_tenhertz(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 600,
update_limit: int = 10,
"""Filter 10Hz miniseed U,V,W to 1Hz legacy H,E,Z."""
starttime, endtime = get_realtime_interval(realtime_interval)
# filter 10Hz U,V,W to H,E,Z
controller = Controller(
algorithm=FilterAlgorithm(input_sample_period=0.1, output_sample_period=1),
inputFactory=input_factory or get_miniseed_factory(data_type="variation"),
inputInterval="tenhertz",
outputFactory=output_factory or get_edge_factory(data_type="variation"),
outputInterval="second",
)
renames = {"U": "H", "V": "E", "W": "Z"}
for input_channel in renames.keys():
output_channel = renames[input_channel]
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=(input_channel,),
output_channels=(output_channel,),
realtime=realtime_interval,
rename_output_channel=((input_channel, output_channel),),
update_limit=update_limit,
)