Newer
Older
import typer
from ..adjusted import AdjustedMatrix
from .derived import adjusted
from .factory import get_edge_factory
from .filters import minute_filter
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
app = typer.Typer()
def main():
app()
@app.command(name="realtime")
def efield_realtime(
observatory: str = "BOU",
host: str = "127.0.0.1",
realtime_interval: int = 600,
update_limit: int = 10,
):
"""
inverts polarity of 1Hz E-E/E-N
filters 1Hz inverted/non-inverted E-E/E-N to 1 minute
"""
adjusted(
observatory=observatory,
interval="second",
input_factory=get_edge_factory(host=host, data_type="variation"),
input_channels=["E-E", "E-N"],
output_factory=get_edge_factory(host=host, data_type="adjusted"),
output_channels=["E-E", "E-N"],
matrix=AdjustedMatrix(
matrix=[
[-1, 0, 0],
[0, -1, 0],
[0, 0, 1],
],
),
realtime_interval=realtime_interval,
update_limit=update_limit,
)
observatory=observatory,
channels=["E-E", "E-N"],
input_factory=get_edge_factory(host=host, data_type="variation"),
output_factory=get_edge_factory(host=host, data_type="variation"),
realtime_interval=realtime_interval,
update_limit=update_limit,
)
observatory=observatory,
channels=["E-E", "E-N"],
input_factory=get_edge_factory(host=host, data_type="adjusted"),
output_factory=get_edge_factory(host=host, data_type="adjusted"),
realtime_interval=realtime_interval,
update_limit=update_limit,
)
@app.command(name="hour")
def efield_hour(
observatory: str = "BOU",
host: str = "127.0.0.1",
realtime_interval: int = 600,
update_limit: int = 10,
):
"""filters 1 minute inverted/non-inverted E-E/E-N to 1 hour"""
raise NotImplementedError("hour not implemented")
@app.command(name="day")
def efield_day(
observatory: str = "BOU",
host: str = "127.0.0.1",
realtime_interval: int = 600,
update_limit: int = 10,
):
"""filters 1 minute inverted/non-inverted E-E/E-N to 1 day"""
raise NotImplementedError("day not implemented")