From 61c75d4a3cfe8f40a195e58c83d0df191634a397 Mon Sep 17 00:00:00 2001 From: pcain-usgs <pcain@usgs.gov> Date: Wed, 11 Aug 2021 13:10:46 -0600 Subject: [PATCH] DerivedTimeseriesFactory/test --- etc/adjusted/BOU201601adj.min | 2 +- geomagio/DerivedTimeseriesFactory.py | 185 ++++++++++++++++++++++++++ geomagio/__init__.py | 2 + test/DerivedTimeseriesFactory_test.py | 109 +++++++++++++++ 4 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 geomagio/DerivedTimeseriesFactory.py create mode 100644 test/DerivedTimeseriesFactory_test.py diff --git a/etc/adjusted/BOU201601adj.min b/etc/adjusted/BOU201601adj.min index 7d9262b9a..14a73fb7d 100644 --- a/etc/adjusted/BOU201601adj.min +++ b/etc/adjusted/BOU201601adj.min @@ -9,7 +9,7 @@ Sensor Orientation HDZF | Digital Sampling 100.0 second | Data Interval Type filtered 1-minute (00:15-01:45) | - Data Type variation | + Data Type adjusted | # DECBAS 5527 (Baseline declination value in | # tenths of minutes East (0-216,000)). | # Vector 1-minute values are computed from 1-second values using | diff --git a/geomagio/DerivedTimeseriesFactory.py b/geomagio/DerivedTimeseriesFactory.py new file mode 100644 index 000000000..cc4a0ff7a --- /dev/null +++ b/geomagio/DerivedTimeseriesFactory.py @@ -0,0 +1,185 @@ +from typing import List, Optional + +from obspy import Stream, UTCDateTime + +from .algorithm import Algorithm, DeltaFAlgorithm, XYZAlgorithm +from .TimeseriesFactory import TimeseriesFactory, TimeseriesUtility + + +class DerivedTimeseriesFactory(TimeseriesFactory): + factory: TimeseriesFactory + + def __init__(self, factory: TimeseriesFactory): + self.factory = factory + super().__init__( + observatory=factory.observatory, + channels=factory.channels, + type=factory.type, + interval=factory.interval, + urlTemplate=factory.urlTemplate, + urlInterval=factory.urlInterval, + ) + + def get_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + interval: str, + add_empty_channels: bool = True, + derive_missing: bool = True, + data_type: Optional[str] = None, + ) -> Stream: + data_type = data_type or self.type + timeseries = self.factory.get_timeseries( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + type=data_type, + interval=interval, + add_empty_channels=False, + ) + missing = get_missing(timeseries, channels) + if missing and derive_missing: + timeseries += self._get_derived_channels( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + data_type=data_type, + interval=interval, + timeseries=timeseries, + ) + missing = get_missing(timeseries, channels) + if missing and add_empty_channels: + timeseries += self._get_empty_channels( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + data_type=data_type, + interval=interval, + location=timeseries[0].stats.location, + ) + # file-based factories return all channels found in file + timeseries = Stream([t for t in timeseries if t.stats.channel in channels]) + return timeseries + + def _get_empty_channels( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + data_type: str, + interval: str, + location: str, + ) -> Stream: + """create empty channels""" + output_stream = Stream() + for channel in channels: + output_stream += TimeseriesUtility.create_empty_trace( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channel=channel, + type=data_type, + interval=interval, + network="NT", + station=observatory, + location=location, + ) + return output_stream + + def _get_derived_channels( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + data_type: str, + interval: str, + timeseries: Stream, + ): + """calculate derived channels""" + input_timeseries = timeseries.copy() + input_channels = [] + for channel in channels: + input_channels += self._get_derived_input_channels(channel, data_type) + missing_inputs = get_missing(input_timeseries, list(set(input_channels))) + if missing_inputs: + input_timeseries += self.factory.get_timeseries( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=missing_inputs, + type=data_type, + interval=interval, + add_empty_channels=False, + ) + output_timeseries = Stream() + for channel in channels: + if channel in get_missing(output_timeseries, channels): + derived = self._derive_trace( + input_timeseries=input_timeseries, + channel=channel, + data_type=data_type, + ) + for channel in get_missing( + output_timeseries, TimeseriesUtility.get_channels(stream=derived) + ): + output_timeseries += derived.select(channel=channel) + return output_timeseries + + def _get_derived_input_channels(self, channel: str, data_type: str) -> List[str]: + """get channels required to calculate desired channel""" + if data_type == "variation": + if channel == "G": + return ["H", "E", "Z", "F"] + elif channel in ["X", "Y", "D"]: + return ["H", "E"] + else: + if channel == "G": + return ["X", "Y", "Z", "F"] + elif channel in ["H", "D"]: + return ["X", "Y"] + return [] + + def _derive_trace( + self, input_timeseries: Stream, channel: str, data_type: str + ) -> Stream: + """Process timeseries based on desired channel + + Note: All derived channels are returned + """ + if data_type == "variation": + if channel == "G": + return DeltaFAlgorithm(informat="obs").process( + timeseries=input_timeseries + ) + elif channel in ["X", "Y"]: + return XYZAlgorithm(informat="obs", outformat="geo").process( + timeseries=input_timeseries + ) + elif channel == "D": + return XYZAlgorithm(informat="obs", outformat="obsd").process( + timeseries=input_timeseries + ) + else: + if channel == "G": + return DeltaFAlgorithm(informat="geo").process( + timeseries=input_timeseries + ) + elif channel in ["H", "D"]: + return XYZAlgorithm(informat="geo", outformat="mag").process( + timeseries=input_timeseries + ) + return Stream() + + +def get_missing(input: Stream, desired: List[str]) -> List[str]: + """Return missing channels from input""" + present = TimeseriesUtility.get_channels(stream=input) + return list(set(desired).difference(set(present))) diff --git a/geomagio/__init__.py b/geomagio/__init__.py index 05fcd276c..c9698e9e1 100644 --- a/geomagio/__init__.py +++ b/geomagio/__init__.py @@ -9,6 +9,7 @@ from . import TimeseriesUtility from . import Util from .Controller import Controller +from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .ObservatoryMetadata import ObservatoryMetadata from .PlotTimeseriesFactory import PlotTimeseriesFactory from .TimeseriesFactory import TimeseriesFactory @@ -18,6 +19,7 @@ __all__ = [ "ChannelConverter", "Controller", "DeltaFAlgorithm", + "DerivedTimeseriesFactory", "ObservatoryMetadata", "PlotTimeseriesFactory", "StreamConverter", diff --git a/test/DerivedTimeseriesFactory_test.py b/test/DerivedTimeseriesFactory_test.py new file mode 100644 index 000000000..4861521f9 --- /dev/null +++ b/test/DerivedTimeseriesFactory_test.py @@ -0,0 +1,109 @@ +from typing import List + +from obspy import Stream + +from geomagio import DerivedTimeseriesFactory, TimeseriesUtility +from geomagio.algorithm import Algorithm, DeltaFAlgorithm, XYZAlgorithm +from geomagio.iaga2002 import StreamIAGA2002Factory +from geomagio.edge import EdgeFactory + + +def test_derive_trace(): + """test.DerivedTimeseriesFactory_test.test_derive_trace()""" + timeseries = get_derived_timeseries( + "etc/filter/BOU20200101vsec.sec", ["H", "E", "Z", "F"], "variation", "second" + ) + factory = DerivedTimeseriesFactory(EdgeFactory()) + assert factory._derive_trace( + input_timeseries=timeseries, channel="G", data_type="variation" + ) == DeltaFAlgorithm(informat="obs").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="X", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="Y", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="D", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="obsd").process(timeseries=timeseries) + timeseries = get_derived_timeseries( + "etc/adjusted/BOU201601adj.min", ["X", "Y", "Z", "F"], "adjusted", "minute" + ) + assert factory._derive_trace( + input_timeseries=timeseries, channel="G", data_type="adjusted" + ) == DeltaFAlgorithm(informat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="H", data_type="adjusted" + ) == XYZAlgorithm(informat="geo", outformat="mag").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="D", data_type="adjusted" + ) == XYZAlgorithm(informat="geo", outformat="mag").process(timeseries=timeseries) + + +def test_get_derived_input_channels(): + """test.DerivedTimeseriesFactory_test.test_get_derived_input_channels()""" + factory = DerivedTimeseriesFactory(EdgeFactory(host=None, port=None)) + assert factory._get_derived_input_channels(channel="G", data_type="variation") == [ + "H", + "E", + "Z", + "F", + ] + assert factory._get_derived_input_channels(channel="G", data_type="adjusted") == [ + "X", + "Y", + "Z", + "F", + ] + assert factory._get_derived_input_channels(channel="X", data_type="variation") == [ + "H", + "E", + ] + assert factory._get_derived_input_channels(channel="H", data_type="adjusted") == [ + "X", + "Y", + ] + # invalid channel, should return empty list + assert factory._get_derived_input_channels(channel="Q", data_type="variation") == [] + + +def test_get_timeseries(): + """test.DerivedTimeseriesFactory_test.test_get_timeseries()""" + variation_url = "etc/filter/BOU20200101vsec.sec" + timeseries = get_derived_timeseries( + variation_url, ["H", "E", "Z", "F"], "variation", "second" + ) + assert TimeseriesUtility.get_channels(timeseries) == ["H", "E", "Z", "F"] + timeseries = get_derived_timeseries(variation_url, ["G"], "variation", "second") + assert TimeseriesUtility.get_channels(timeseries) == ["G"] + timeseries = get_derived_timeseries( + variation_url, ["X", "Y"], "variation", "second" + ) + assert set(TimeseriesUtility.get_channels(timeseries)) == set(["X", "Y"]) + adjusted_url = "etc/adjusted/BOU201601adj.min" + timeseries = get_derived_timeseries( + adjusted_url, ["X", "Y", "Z", "F"], "adjusted", "minute" + ) + assert TimeseriesUtility.get_channels(timeseries) == ["X", "Y", "Z", "F"] + timeseries = get_derived_timeseries(adjusted_url, ["G"], "adjusted", "minute") + assert TimeseriesUtility.get_channels(timeseries) == ["G"] + timeseries = get_derived_timeseries(adjusted_url, ["H", "D"], "adjusted", "minute") + assert set(TimeseriesUtility.get_channels(timeseries)) == set(["H", "D"]) + + +def get_derived_timeseries( + url: str, channels: List[str], data_type: str, interval: str +) -> Stream: + with open(url, "r") as file: + return DerivedTimeseriesFactory( + StreamIAGA2002Factory(stream=file) + ).get_timeseries( + starttime=None, + endtime=None, + observatory="BOU", + channels=channels, + interval=interval, + add_empty_channels=False, + derive_missing=True, + data_type=data_type, + ) -- GitLab