diff --git a/etc/adjusted/BOU201601adj.min b/etc/adjusted/BOU201601adj.min index 7d9262b9ad3cd6ace9916009e5be409dad16a35d..14a73fb7de70ae4dcc28e24f91a7dba8daa97fbe 100644 --- a/etc/adjusted/BOU201601adj.min +++ b/etc/adjusted/BOU201601adj.min @@ -9,7 +9,7 @@ Sensor Orientation HDZF | Digital Sampling 100.0 second | Data Interval Type filtered 1-minute (00:15-01:45) | - Data Type variation | + Data Type adjusted | # DECBAS 5527 (Baseline declination value in | # tenths of minutes East (0-216,000)). | # Vector 1-minute values are computed from 1-second values using | diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 432f63f9f4d54d3451b7495d0c88e7b5625bde6c..f985c4fae44d9ebbb3224f4b7398854fdb3bb231 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -8,6 +8,7 @@ from typing import List, Optional, Tuple, Union from obspy.core import Stream, UTCDateTime from .algorithm import Algorithm, algorithms, AlgorithmException +from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .PlotTimeseriesFactory import PlotTimeseriesFactory from .StreamTimeseriesFactory import StreamTimeseriesFactory from . import TimeseriesUtility, Util @@ -543,7 +544,7 @@ def get_input_factory(args): input_factory = StreamTimeseriesFactory( factory=input_factory, stream=input_stream ) - return input_factory + return DerivedTimeseriesFactory(input_factory) def get_output_factory(args): diff --git a/geomagio/DerivedTimeseriesFactory.py b/geomagio/DerivedTimeseriesFactory.py new file mode 100644 index 0000000000000000000000000000000000000000..cc4a0ff7a7b826405eb6ee8d6dde68636dde1321 --- /dev/null +++ b/geomagio/DerivedTimeseriesFactory.py @@ -0,0 +1,185 @@ +from typing import List, Optional + +from obspy import Stream, UTCDateTime + +from .algorithm import Algorithm, DeltaFAlgorithm, XYZAlgorithm +from .TimeseriesFactory import TimeseriesFactory, TimeseriesUtility + + +class DerivedTimeseriesFactory(TimeseriesFactory): + factory: TimeseriesFactory + + def __init__(self, factory: TimeseriesFactory): + self.factory = factory + super().__init__( + observatory=factory.observatory, + channels=factory.channels, + type=factory.type, + interval=factory.interval, + urlTemplate=factory.urlTemplate, + urlInterval=factory.urlInterval, + ) + + def get_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + interval: str, + add_empty_channels: bool = True, + derive_missing: bool = True, + data_type: Optional[str] = None, + ) -> Stream: + data_type = data_type or self.type + timeseries = self.factory.get_timeseries( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + type=data_type, + interval=interval, + add_empty_channels=False, + ) + missing = get_missing(timeseries, channels) + if missing and derive_missing: + timeseries += self._get_derived_channels( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + data_type=data_type, + interval=interval, + timeseries=timeseries, + ) + missing = get_missing(timeseries, channels) + if missing and add_empty_channels: + timeseries += self._get_empty_channels( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=channels, + data_type=data_type, + interval=interval, + location=timeseries[0].stats.location, + ) + # file-based factories return all channels found in file + timeseries = Stream([t for t in timeseries if t.stats.channel in channels]) + return timeseries + + def _get_empty_channels( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + data_type: str, + interval: str, + location: str, + ) -> Stream: + """create empty channels""" + output_stream = Stream() + for channel in channels: + output_stream += TimeseriesUtility.create_empty_trace( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channel=channel, + type=data_type, + interval=interval, + network="NT", + station=observatory, + location=location, + ) + return output_stream + + def _get_derived_channels( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channels: List[str], + data_type: str, + interval: str, + timeseries: Stream, + ): + """calculate derived channels""" + input_timeseries = timeseries.copy() + input_channels = [] + for channel in channels: + input_channels += self._get_derived_input_channels(channel, data_type) + missing_inputs = get_missing(input_timeseries, list(set(input_channels))) + if missing_inputs: + input_timeseries += self.factory.get_timeseries( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channels=missing_inputs, + type=data_type, + interval=interval, + add_empty_channels=False, + ) + output_timeseries = Stream() + for channel in channels: + if channel in get_missing(output_timeseries, channels): + derived = self._derive_trace( + input_timeseries=input_timeseries, + channel=channel, + data_type=data_type, + ) + for channel in get_missing( + output_timeseries, TimeseriesUtility.get_channels(stream=derived) + ): + output_timeseries += derived.select(channel=channel) + return output_timeseries + + def _get_derived_input_channels(self, channel: str, data_type: str) -> List[str]: + """get channels required to calculate desired channel""" + if data_type == "variation": + if channel == "G": + return ["H", "E", "Z", "F"] + elif channel in ["X", "Y", "D"]: + return ["H", "E"] + else: + if channel == "G": + return ["X", "Y", "Z", "F"] + elif channel in ["H", "D"]: + return ["X", "Y"] + return [] + + def _derive_trace( + self, input_timeseries: Stream, channel: str, data_type: str + ) -> Stream: + """Process timeseries based on desired channel + + Note: All derived channels are returned + """ + if data_type == "variation": + if channel == "G": + return DeltaFAlgorithm(informat="obs").process( + timeseries=input_timeseries + ) + elif channel in ["X", "Y"]: + return XYZAlgorithm(informat="obs", outformat="geo").process( + timeseries=input_timeseries + ) + elif channel == "D": + return XYZAlgorithm(informat="obs", outformat="obsd").process( + timeseries=input_timeseries + ) + else: + if channel == "G": + return DeltaFAlgorithm(informat="geo").process( + timeseries=input_timeseries + ) + elif channel in ["H", "D"]: + return XYZAlgorithm(informat="geo", outformat="mag").process( + timeseries=input_timeseries + ) + return Stream() + + +def get_missing(input: Stream, desired: List[str]) -> List[str]: + """Return missing channels from input""" + present = TimeseriesUtility.get_channels(stream=input) + return list(set(desired).difference(set(present))) diff --git a/geomagio/TimeseriesFactory.py b/geomagio/TimeseriesFactory.py index 7446f81ee6cc4877dde540044c53c742e9a3564f..064b868d6618b2d8cfedf2a8b12df342169cfe88 100644 --- a/geomagio/TimeseriesFactory.py +++ b/geomagio/TimeseriesFactory.py @@ -67,6 +67,7 @@ class TimeseriesFactory(object): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Get timeseries data. @@ -93,6 +94,8 @@ class TimeseriesFactory(object): interval : {'day', 'hour', 'minute', 'month', 'second'} data interval, optional. uses default if unspecified. + add_empty_channels + if True, returns channels without data as empty traces Returns ------- diff --git a/geomagio/__init__.py b/geomagio/__init__.py index 05fcd276c5d8542e00e34daaa4e6f71b9a3567e1..c9698e9e15f0ad6cd06c63083a013a1f8c961297 100644 --- a/geomagio/__init__.py +++ b/geomagio/__init__.py @@ -9,6 +9,7 @@ from . import TimeseriesUtility from . import Util from .Controller import Controller +from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .ObservatoryMetadata import ObservatoryMetadata from .PlotTimeseriesFactory import PlotTimeseriesFactory from .TimeseriesFactory import TimeseriesFactory @@ -18,6 +19,7 @@ __all__ = [ "ChannelConverter", "Controller", "DeltaFAlgorithm", + "DerivedTimeseriesFactory", "ObservatoryMetadata", "PlotTimeseriesFactory", "StreamConverter", diff --git a/geomagio/api/ws/data.py b/geomagio/api/ws/data.py index bec7e546e39d1b3452597d7e428d6afac3f3db2b..24f16b226acc8aa1e44913c622a202cf63b8c55b 100644 --- a/geomagio/api/ws/data.py +++ b/geomagio/api/ws/data.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, Query from obspy import UTCDateTime, Stream from starlette.responses import Response -from ... import TimeseriesFactory, TimeseriesUtility +from ... import DerivedTimeseriesFactory, TimeseriesFactory, TimeseriesUtility from ...edge import EdgeFactory, MiniSeedFactory from ...iaga2002 import IAGA2002Writer from ...imfjson import IMFJSONWriter @@ -35,15 +35,16 @@ def get_data_factory( SamplingPeriod.HOUR, SamplingPeriod.DAY, ]: - return MiniSeedFactory( + factory = MiniSeedFactory( host=host, port=int(os.getenv("DATA_MINISEED_PORT", "2061")) ) elif sampling_period in [SamplingPeriod.SECOND, SamplingPeriod.MINUTE]: - return EdgeFactory( + factory = EdgeFactory( host=host, port=int(os.getenv("DATA_EARTHWORM_PORT", "2060")) ) else: return None + return DerivedTimeseriesFactory(factory) def get_data_query( diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 682d03f733ed2b24fa04269925859f3279f179ed..5dbde248fe229a42b9b68072c97337bdeb62ba94 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -91,7 +91,6 @@ class EdgeFactory(TimeseriesFactory): ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.client = earthworm.Client(host, port) - self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.tag = tag self.locationCode = locationCode @@ -111,6 +110,7 @@ class EdgeFactory(TimeseriesFactory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Get timeseries data @@ -128,6 +128,8 @@ class EdgeFactory(TimeseriesFactory): data type. interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} data interval. + add_empty_channels + if True, returns channels without data as empty traces Returns ------- @@ -159,8 +161,16 @@ class EdgeFactory(TimeseriesFactory): timeseries = obspy.core.Stream() for channel in channels: data = self._get_timeseries( - starttime, endtime, observatory, channel, type, interval + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -278,7 +288,16 @@ class EdgeFactory(TimeseriesFactory): trace.data = numpy.ma.masked_invalid(trace.data) return stream - def _get_timeseries(self, starttime, endtime, observatory, channel, type, interval): + def _get_timeseries( + self, + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels: bool = True, + ): """get timeseries data for a single channel. Parameters @@ -295,6 +314,8 @@ class EdgeFactory(TimeseriesFactory): data type {definitive, quasi-definitive, variation} interval : str interval length {minute, second} + add_empty_channels + if True, returns channels without data as empty traces Returns ------- @@ -325,7 +346,7 @@ class EdgeFactory(TimeseriesFactory): for trace in data: trace.data = trace.data.astype("i4") data.merge() - if data.count() == 0: + if data.count() == 0 and add_empty_channels: data += TimeseriesUtility.create_empty_trace( starttime, endtime, diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py index abcf8b261a501132f9e210a76c2efb670287eaaf..d708b30f47b7c21907b415b6e817c0e2b15f5024 100644 --- a/geomagio/edge/MiniSeedFactory.py +++ b/geomagio/edge/MiniSeedFactory.py @@ -97,6 +97,7 @@ class MiniSeedFactory(TimeseriesFactory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Get timeseries data @@ -114,6 +115,8 @@ class MiniSeedFactory(TimeseriesFactory): data type. interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} data interval. + add_empty_channels + if True, returns channels without data as empty traces Returns ------- @@ -150,8 +153,16 @@ class MiniSeedFactory(TimeseriesFactory): ) else: data = self._get_timeseries( - starttime, endtime, observatory, channel, type, interval + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -296,7 +307,16 @@ class MiniSeedFactory(TimeseriesFactory): trace.data = numpy.ma.masked_invalid(trace.data) return stream - def _get_timeseries(self, starttime, endtime, observatory, channel, type, interval): + def _get_timeseries( + self, + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels: bool = True, + ): """get timeseries data for a single channel. Parameters @@ -313,6 +333,8 @@ class MiniSeedFactory(TimeseriesFactory): data type {definitive, quasi-definitive, variation} interval : str interval length {'day', 'hour', 'minute', 'second', 'tenhertz'} + add_empty_channels + if True, returns channels without data as empty traces Returns ------- @@ -330,7 +352,7 @@ class MiniSeedFactory(TimeseriesFactory): sncl.network, sncl.station, sncl.location, sncl.channel, starttime, endtime ) data.merge() - if data.count() == 0: + if data.count() == 0 and add_empty_channels: data += TimeseriesUtility.create_empty_trace( starttime, endtime, diff --git a/geomagio/iaga2002/StreamIAGA2002Factory.py b/geomagio/iaga2002/StreamIAGA2002Factory.py index 62f6f871c99b61aad9a4ce903f63a99cdbce0d66..eb1625f62c57b309c0f5063e520cb09a437b44d6 100644 --- a/geomagio/iaga2002/StreamIAGA2002Factory.py +++ b/geomagio/iaga2002/StreamIAGA2002Factory.py @@ -31,6 +31,7 @@ class StreamIAGA2002Factory(IAGA2002Factory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Implements get_timeseries diff --git a/geomagio/imfv122/StreamIMFV122Factory.py b/geomagio/imfv122/StreamIMFV122Factory.py index b868d8dd20318e1587403dda00b6f751876ad1cd..4a9575fc9dba04c986b54e76014477c0e6a0c7ad 100644 --- a/geomagio/imfv122/StreamIMFV122Factory.py +++ b/geomagio/imfv122/StreamIMFV122Factory.py @@ -31,6 +31,7 @@ class StreamIMFV122Factory(IMFV122Factory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Implements get_timeseries diff --git a/geomagio/imfv283/GOESIMFV283Factory.py b/geomagio/imfv283/GOESIMFV283Factory.py index 29bda1d9b9f912b481ba3b0f8ad9cc0bc5569d4f..195e8710505ea21bd631ec8fdacbb252dfd63947 100644 --- a/geomagio/imfv283/GOESIMFV283Factory.py +++ b/geomagio/imfv283/GOESIMFV283Factory.py @@ -67,6 +67,7 @@ class GOESIMFV283Factory(IMFV283Factory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Implements get_timeseries diff --git a/geomagio/imfv283/StreamIMFV283Factory.py b/geomagio/imfv283/StreamIMFV283Factory.py index aff52877f6bbe3948acb82fddd31326148fb6854..402f0e2c3199ceb58ddcb9223815e045f582edf8 100644 --- a/geomagio/imfv283/StreamIMFV283Factory.py +++ b/geomagio/imfv283/StreamIMFV283Factory.py @@ -31,6 +31,7 @@ class StreamIMFV283Factory(IMFV283Factory): channels=None, type=None, interval=None, + add_empty_channels: bool = True, ): """Implements get_timeseries diff --git a/test/DerivedTimeseriesFactory_test.py b/test/DerivedTimeseriesFactory_test.py new file mode 100644 index 0000000000000000000000000000000000000000..4861521f97d1c456704716bdda1c8d5b405cbcf5 --- /dev/null +++ b/test/DerivedTimeseriesFactory_test.py @@ -0,0 +1,109 @@ +from typing import List + +from obspy import Stream + +from geomagio import DerivedTimeseriesFactory, TimeseriesUtility +from geomagio.algorithm import Algorithm, DeltaFAlgorithm, XYZAlgorithm +from geomagio.iaga2002 import StreamIAGA2002Factory +from geomagio.edge import EdgeFactory + + +def test_derive_trace(): + """test.DerivedTimeseriesFactory_test.test_derive_trace()""" + timeseries = get_derived_timeseries( + "etc/filter/BOU20200101vsec.sec", ["H", "E", "Z", "F"], "variation", "second" + ) + factory = DerivedTimeseriesFactory(EdgeFactory()) + assert factory._derive_trace( + input_timeseries=timeseries, channel="G", data_type="variation" + ) == DeltaFAlgorithm(informat="obs").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="X", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="Y", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="D", data_type="variation" + ) == XYZAlgorithm(informat="obs", outformat="obsd").process(timeseries=timeseries) + timeseries = get_derived_timeseries( + "etc/adjusted/BOU201601adj.min", ["X", "Y", "Z", "F"], "adjusted", "minute" + ) + assert factory._derive_trace( + input_timeseries=timeseries, channel="G", data_type="adjusted" + ) == DeltaFAlgorithm(informat="geo").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="H", data_type="adjusted" + ) == XYZAlgorithm(informat="geo", outformat="mag").process(timeseries=timeseries) + assert factory._derive_trace( + input_timeseries=timeseries, channel="D", data_type="adjusted" + ) == XYZAlgorithm(informat="geo", outformat="mag").process(timeseries=timeseries) + + +def test_get_derived_input_channels(): + """test.DerivedTimeseriesFactory_test.test_get_derived_input_channels()""" + factory = DerivedTimeseriesFactory(EdgeFactory(host=None, port=None)) + assert factory._get_derived_input_channels(channel="G", data_type="variation") == [ + "H", + "E", + "Z", + "F", + ] + assert factory._get_derived_input_channels(channel="G", data_type="adjusted") == [ + "X", + "Y", + "Z", + "F", + ] + assert factory._get_derived_input_channels(channel="X", data_type="variation") == [ + "H", + "E", + ] + assert factory._get_derived_input_channels(channel="H", data_type="adjusted") == [ + "X", + "Y", + ] + # invalid channel, should return empty list + assert factory._get_derived_input_channels(channel="Q", data_type="variation") == [] + + +def test_get_timeseries(): + """test.DerivedTimeseriesFactory_test.test_get_timeseries()""" + variation_url = "etc/filter/BOU20200101vsec.sec" + timeseries = get_derived_timeseries( + variation_url, ["H", "E", "Z", "F"], "variation", "second" + ) + assert TimeseriesUtility.get_channels(timeseries) == ["H", "E", "Z", "F"] + timeseries = get_derived_timeseries(variation_url, ["G"], "variation", "second") + assert TimeseriesUtility.get_channels(timeseries) == ["G"] + timeseries = get_derived_timeseries( + variation_url, ["X", "Y"], "variation", "second" + ) + assert set(TimeseriesUtility.get_channels(timeseries)) == set(["X", "Y"]) + adjusted_url = "etc/adjusted/BOU201601adj.min" + timeseries = get_derived_timeseries( + adjusted_url, ["X", "Y", "Z", "F"], "adjusted", "minute" + ) + assert TimeseriesUtility.get_channels(timeseries) == ["X", "Y", "Z", "F"] + timeseries = get_derived_timeseries(adjusted_url, ["G"], "adjusted", "minute") + assert TimeseriesUtility.get_channels(timeseries) == ["G"] + timeseries = get_derived_timeseries(adjusted_url, ["H", "D"], "adjusted", "minute") + assert set(TimeseriesUtility.get_channels(timeseries)) == set(["H", "D"]) + + +def get_derived_timeseries( + url: str, channels: List[str], data_type: str, interval: str +) -> Stream: + with open(url, "r") as file: + return DerivedTimeseriesFactory( + StreamIAGA2002Factory(stream=file) + ).get_timeseries( + starttime=None, + endtime=None, + observatory="BOU", + channels=channels, + interval=interval, + add_empty_channels=False, + derive_missing=True, + data_type=data_type, + ) diff --git a/test/edge_test/EdgeFactory_test.py b/test/edge_test/EdgeFactory_test.py index 6b524fb318b3fad25afdee0613255d4551ff8aba..55e94f1266fef85c9f6b0cdafa15b781420d656e 100644 --- a/test/edge_test/EdgeFactory_test.py +++ b/test/edge_test/EdgeFactory_test.py @@ -29,3 +29,28 @@ def dont_get_timeseries(): "H", "Expect timeseries stats channel to be equal to H", ) + + +def test_add_empty_channels(): + """edge_test.EdgeFactory_test.test_add_empty_channels()""" + edge_factory = EdgeFactory(host="TODO", port="TODO") + timeseries = edge_factory.get_timeseries( + UTCDateTime(2015, 3, 1, 0, 0, 0), + UTCDateTime(2015, 3, 1, 1, 0, 0), + "BOU", + ("H"), + "variation", + "minute", + add_empty_channels=False, + ) + assert len(timeseries) == 0 + timeseries = edge_factory.get_timeseries( + UTCDateTime(2015, 3, 1, 0, 0, 0), + UTCDateTime(2015, 3, 1, 1, 0, 0), + "BOU", + ("H"), + "variation", + "minute", + add_empty_channels=True, # default + ) + assert len(timeseries) == 1