Newer
Older
from typing import List, Optional
from ..adjusted import AdjustedMatrix
from ..algorithm import (
AdjustedAlgorithm,
SqDistAlgorithm,
)
from ..Controller import Controller, get_realtime_interval
from ..TimeseriesFactory import TimeseriesFactory
from .factory import get_edge_factory, get_miniseed_factory
def adjusted(
observatory: str,
input_factory: Optional[TimeseriesFactory] = None,
input_channels: List[str] = ["H", "E", "Z", "F"],
interval: str = "second",
output_factory: Optional[TimeseriesFactory] = None,
output_channels: List[str] = ["X", "Y", "Z", "F"],
matrix: AdjustedMatrix = None,
statefile: Optional[str] = None,
realtime_interval: int = 600,
update_limit: int = 10,
):
"""Run Adjusted algorithm.
Parameters
----------
observatory: observatory to calculate
input_factory: where to read, should be configured with data_type
input_channels: adjusted algorithm input channels
interval: data interval
output_factory: where to write, should be configured with data_type
output_channels: adjusted algorithm output channels
matrix: adjusted matrix
statefile: adjusted statefile
realtime_interval: window in seconds
update_limit: maximum number of windows to backfill
if not statefile and not matrix:
raise ValueError("Either statefile or matrix are required.")
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=AdjustedAlgorithm(
matrix=matrix,
statefile=statefile,
data_type="adjusted",
location="A0",
inchannels=input_channels,
outchannels=output_channels,
),
inputFactory=input_factory or get_edge_factory(data_type="variation"),
inputInterval=interval,
outputFactory=output_factory or get_edge_factory(data_type="adjusted"),
outputInterval=interval,
)
controller.run_as_update(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=input_channels,
output_channels=output_channels,
realtime=realtime_interval,
update_limit=update_limit,
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def average(
observatories: List[str],
input_channel: str,
input_factory: Optional[TimeseriesFactory] = None,
interval: str = "second",
output_channel: str = None,
output_factory: Optional[TimeseriesFactory] = None,
output_observatory: str = "USGS",
realtime_interval: int = 600,
):
"""Run Average algorithm.
Parameters
----------
observatories: input observatories to calculate
input_channel: channel from multiple observatories to average
input_factory: where to read, should be configured with data_type and interval
interval: data interval
output_channel: channel to write (defaults to input_channel).
output_factory: where to write, should be configured with data_type and interval
output_observatory: observatory where output is written
realtime_interval: window in seconds
Uses update_limit=10.
"""
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=AverageAlgorithm(observatories=observatories, channel=output_channel),
inputFactory=input_factory or get_edge_factory(),
inputInterval=interval,
outputFactory=output_factory or get_edge_factory(),
outputInterval=interval,
)
controller.run_as_update(
observatory=observatories,
output_observatory=(output_observatory,),
starttime=starttime,
endtime=endtime,
output_channels=(output_channel or input_channel,),
realtime=realtime_interval,
update_limit=10,
)
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def sqdist_minute(
observatory: str,
statefile: str,
input_factory: Optional[TimeseriesFactory] = None,
output_factory: Optional[TimeseriesFactory] = None,
realtime_interval: int = 1800,
):
"""Run SqDist algorithm.
Only supports "minute" interval.
Parameters
----------
observatory: observatory to calculate
statefile: sqdist statefile must already exist
input_factory: where to read, should be configured with data_type and interval
output_factory: where to write, should be configured with data_type and interval
realtime_interval: window in seconds
"""
if not statefile:
raise ValueError("Statefile is required.")
starttime, endtime = get_realtime_interval(realtime_interval)
controller = Controller(
algorithm=SqDistAlgorithm(
alpha=2.3148e-5,
gamma=3.3333e-2,
m=1440,
mag=True,
smooth=180,
statefile=statefile,
),
inputFactory=input_factory or get_edge_factory(interval="minute"),
inputInterval="minute",
outputFactory=output_factory or get_edge_factory(interval="minute"),
outputInterval="minute",
)
# sqdist is stateful, use run
controller.run(
observatory=(observatory,),
output_observatory=(observatory,),
starttime=starttime,
endtime=endtime,
input_channels=("X", "Y", "Z", "F"),
output_channels=("MDT", "MSQ", "MSV"),
realtime=realtime_interval,
rename_output_channel=(("H_Dist", "MDT"), ("H_SQ", "MSQ"), ("H_SV", "MSV")),
update_limit=10,
)