diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index e0729c21b81796707a06cff1184765aeab5be29c..c294cebe8dc27aa9fe238f8abceb17e71fdd7adf 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -610,9 +610,16 @@ def split_trace(trace: Trace, size: int = 86400) -> Stream: size=size, trim=True, ): + interval_start = interval["start"] + interval_end = interval["end"] + delta = out_trace.stats.delta + # accounts for trace containing one sample + if interval_end - delta < interval_start: + stream += out_trace + continue stream += out_trace.slice( - starttime=interval["start"], - endtime=interval["end"] - out_trace.stats.delta, + starttime=interval_start, + endtime=interval_end - delta, nearest_sample=False, ) return stream diff --git a/geomagio/algorithm/AdjustedAlgorithm.py b/geomagio/algorithm/AdjustedAlgorithm.py index fcbe2dcebb8e66e2d95e9235a7134e905f52eca6..7b9f0a4d44a2ae2112841a31d3de0bfde50768b5 100644 --- a/geomagio/algorithm/AdjustedAlgorithm.py +++ b/geomagio/algorithm/AdjustedAlgorithm.py @@ -161,6 +161,14 @@ class AdjustedAlgorithm(Algorithm): ): return True + # if E-E and E-N available + if ( + "E-E" in channels + and "E-N" in channels + and super().can_produce_data(starttime, endtime, stream) + ): + return True + # check validity of remaining channels for c in channels: if c != "F" and not ( diff --git a/geomagio/processing/derived.py b/geomagio/processing/derived.py index 04161aaadd37ec687cb423fe56dacda0e236d202..5f45313c36c3eceff07007566572fb28d5571fdf 100644 --- a/geomagio/processing/derived.py +++ b/geomagio/processing/derived.py @@ -2,6 +2,7 @@ from typing import List, Optional import numpy +from ..adjusted import AdjustedMatrix from ..algorithm import ( AdjustedAlgorithm, AverageAlgorithm, @@ -9,18 +10,20 @@ from ..algorithm import ( ) from ..Controller import Controller, get_realtime_interval from ..TimeseriesFactory import TimeseriesFactory -from .factory import get_edge_factory +from .factory import get_edge_factory, get_miniseed_factory def adjusted( observatory: str, input_factory: Optional[TimeseriesFactory] = None, + input_channels: List[str] = ["H", "E", "Z", "F"], interval: str = "second", output_factory: Optional[TimeseriesFactory] = None, - matrix: Optional[numpy.ndarray] = None, - pier_correction: Optional[float] = None, + output_channels: List[str] = ["X", "Y", "Z", "F"], + matrix: AdjustedMatrix = None, statefile: Optional[str] = None, realtime_interval: int = 600, + update_limit: int = 10, ): """Run Adjusted algorithm. @@ -28,25 +31,26 @@ def adjusted( ---------- observatory: observatory to calculate input_factory: where to read, should be configured with data_type + input_channels: adjusted algorithm input channels interval: data interval output_factory: where to write, should be configured with data_type + output_channels: adjusted algorithm output channels matrix: adjusted matrix - pier_correction: adjusted pier correction statefile: adjusted statefile realtime_interval: window in seconds - - Uses update_limit=10. + update_limit: maximum number of windows to backfill """ - if not statefile and (not matrix or not pier_correction): - raise ValueError("Either statefile or matrix and pier_correction are required.") + if not statefile and not matrix: + raise ValueError("Either statefile or matrix are required.") starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( algorithm=AdjustedAlgorithm( matrix=matrix, - pier_correction=pier_correction, statefile=statefile, data_type="adjusted", location="A0", + inchannels=input_channels, + outchannels=output_channels, ), inputFactory=input_factory or get_edge_factory(data_type="variation"), inputInterval=interval, @@ -58,10 +62,10 @@ def adjusted( output_observatory=(observatory,), starttime=starttime, endtime=endtime, - input_channels=("H", "E", "Z", "F"), - output_channels=("X", "Y", "Z", "F"), + input_channels=input_channels, + output_channels=output_channels, realtime=realtime_interval, - update_limit=10, + update_limit=update_limit, ) diff --git a/geomagio/processing/efield.py b/geomagio/processing/efield.py new file mode 100644 index 0000000000000000000000000000000000000000..0bd3e6980d39c68409580853f703e740d54ddca5 --- /dev/null +++ b/geomagio/processing/efield.py @@ -0,0 +1,80 @@ +import typer + +from ..adjusted import AdjustedMatrix +from .derived import adjusted +from .factory import get_edge_factory +from .obsrio import obsrio_minute + +app = typer.Typer() + + +def main(): + app() + + +@app.command(name="realtime") +def efield_realtime( + observatory: str = "BOU", + host: str = "127.0.0.1", + realtime_interval: int = 600, + update_limit: int = 10, +): + """ + inverts polarity of 1Hz E-E/E-N + filters 1Hz inverted/non-inverted E-E/E-N to 1 minute + """ + adjusted( + observatory=observatory, + interval="second", + input_factory=get_edge_factory(host=host, data_type="variation"), + input_channels=["E-E", "E-N"], + output_factory=get_edge_factory(host=host, data_type="adjusted"), + output_channels=["E-E", "E-N"], + matrix=AdjustedMatrix( + matrix=[ + [-1, 0, 0], + [0, -1, 0], + [0, 0, 1], + ], + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + obsrio_minute( + observatory=observatory, + channels=["E-E", "E-N"], + input_factory=get_edge_factory(host=host, data_type="variation"), + output_factory=get_edge_factory(host=host, data_type="variation"), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + obsrio_minute( + observatory=observatory, + channels=["E-E", "E-N"], + input_factory=get_edge_factory(host=host, data_type="adjusted"), + output_factory=get_edge_factory(host=host, data_type="adjusted"), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + + +@app.command(name="hour") +def efield_hour( + observatory: str = "BOU", + host: str = "127.0.0.1", + realtime_interval: int = 600, + update_limit: int = 10, +): + """filters 1 minute inverted/non-inverted E-E/E-N to 1 hour""" + raise NotImplementedError("hour not implemented") + + +@app.command(name="day") +def efield_day( + observatory: str = "BOU", + host: str = "127.0.0.1", + realtime_interval: int = 600, + update_limit: int = 10, +): + """filters 1 minute inverted/non-inverted E-E/E-N to 1 day""" + raise NotImplementedError("day not implemented") diff --git a/geomagio/processing/obsrio.py b/geomagio/processing/obsrio.py index 3a6182037bb2fef08dcda3a60edc3541df96827b..1d8037fdb166a79d0c82e91c6e3072aec29c96c4 100644 --- a/geomagio/processing/obsrio.py +++ b/geomagio/processing/obsrio.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import List, Optional import typer @@ -204,14 +204,15 @@ def obsrio_hour( def obsrio_minute( observatory: str, + channels: List[str] = ["H", "E", "Z", "F"], input_factory: Optional[TimeseriesFactory] = None, output_factory: Optional[TimeseriesFactory] = None, realtime_interval: int = 600, update_limit: int = 10, ): - """Filter 1Hz legacy H,E,Z,F to 1 minute legacy. + """Filter 1Hz legacy channels to 1 minute legacy. - Should be called after obsrio_second() and obsrio_tenhertz(), + For H,E,Z,F: should be called after obsrio_second() and obsrio_tenhertz(), which populate 1Hz legacy H,E,Z,F. """ starttime, endtime = get_realtime_interval(realtime_interval) @@ -221,7 +222,7 @@ def obsrio_minute( outputFactory=output_factory or get_edge_factory(), outputInterval="minute", ) - for channel in ["H", "E", "Z", "F"]: + for channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( input_sample_period=1, diff --git a/pyproject.toml b/pyproject.toml index a3d76cee9f83cbae8cda869070c42776043e40fe..7fe63c7e8afd9ed159329bb5e8c49c62db883ff3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ pycurl = ["pycurl"] [tool.poetry.scripts] generate-matrix = "geomagio.processing.affine_matrix:main" +geomag-efield = "geomagio.processing.efield:main" geomag-metadata = "geomagio.metadata.main:main" geomag-monitor = "geomagio.processing.monitor:main" geomag-py = "geomagio.Controller:main"