From 3bd6d631cb6a8d92ac156948e74047ef6275fe6d Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Wed, 29 Apr 2020 10:04:25 -0600 Subject: [PATCH] Add methods to read measurement data from a factory --- geomagio/TimeseriesUtility.py | 27 +++++++++++++++ geomagio/residual/Reading.py | 60 ++++++++++++++++++++++++++++++++++ test/TimeseriesUtility_test.py | 42 ++++++++++++++++++++++++ 3 files changed, 129 insertions(+) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 8b0d3ee49..34358454c 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -276,6 +276,33 @@ def get_channels(stream): return [ch for ch in channels] +def get_trace_value(traces, time, default=None): + """Get a value at a specific time. + + Parameters + ---------- + trace : obspy.core.Trace + time : obspy.core.UTCDateTime + + Returns + ------- + nearest time in trace + value from trace at nearest time, or None + """ + # array of UTCDateTime values corresponding + for trace in traces: + times = trace.times("utcdatetime") + index = times.searchsorted(time) + trace_time = times[index] + trace_value = trace.data[index] + if trace_time == time: + if numpy.isnan(trace_value): + return default + else: + return trace_value + return default + + def has_all_channels(stream, channels, starttime, endtime): """Check whether all channels have any data within time range. diff --git a/geomagio/residual/Reading.py b/geomagio/residual/Reading.py index 19041231f..70c99dd4d 100644 --- a/geomagio/residual/Reading.py +++ b/geomagio/residual/Reading.py @@ -2,8 +2,11 @@ import collections from typing import Dict, List, Optional from typing_extensions import Literal +from obspy import Stream from pydantic import BaseModel +from .. import TimeseriesUtility +from ..TimeseriesFactory import TimeseriesFactory from .Absolute import Absolute from .Measurement import AverageMeasurement, Measurement, average_measurement from .MeasurementType import MeasurementType @@ -36,3 +39,60 @@ class Reading(BaseModel): Example: reading[MeasurementType.WEST_DOWN] """ return [m for m in self.measurements if m.measurement_type == measurement_type] + + def load_ordinates( + self, + observatory: str, + timeseries_factory: TimeseriesFactory, + default_existing: bool = True, + ): + """Load ordinates from a timeseries factory. + + Parameters + ---------- + observatory: the observatory to load. + timeseries_factory: source of data. + default_existing: keep existing values if data not found. + """ + mean = average_measurement(self.measurements) + data = timeseries_factory.get_timeseries( + observatory=observatory, + channels=("H", "E", "Z", "F"), + interval="second", + type="variation", + starttime=mean.time, + endtime=mean.endtime, + ) + self.update_measurement_ordinates(data, default_existing) + + def update_measurement_ordinates(self, data: Stream, default_existing: bool = True): + """Update ordinates. + + Parameters + ---------- + data: source of data. + default_existing: keep existing values if data not found. + """ + for measurement in self.measurements: + if not measurement.time: + continue + measurement.h = TimeseriesUtility.get_trace_value( + traces=data.select(channel="H"), + time=measurement.time, + default=default_existing and measurement.h or None, + ) + measurement.e = TimeseriesUtility.get_trace_value( + traces=data.select(channel="E"), + time=measurement.time, + default=default_existing and measurement.e or None, + ) + measurement.z = TimeseriesUtility.get_trace_value( + traces=data.select(channel="Z"), + time=measurement.time, + default=default_existing and measurement.z or None, + ) + measurement.f = TimeseriesUtility.get_trace_value( + traces=data.select(channel="F"), + time=measurement.time, + default=default_existing and measurement.f or None, + ) diff --git a/test/TimeseriesUtility_test.py b/test/TimeseriesUtility_test.py index 25a93f404..8c036f775 100644 --- a/test/TimeseriesUtility_test.py +++ b/test/TimeseriesUtility_test.py @@ -170,6 +170,48 @@ def test_get_merged_gaps(): assert_equal(gap[1], UTCDateTime("2015-01-01T00:00:07Z")) +def test_get_trace_values(): + """TimeseriesUtility_test.test_get_trace_values() + """ + stream = Stream( + [ + __create_trace("H", [numpy.nan, 1, 1, numpy.nan, numpy.nan]), + __create_trace("Z", [0, 0, 0, 1, 1, 1]), + ] + ) + for trace in stream: + # set time of first sample + trace.stats.starttime = UTCDateTime("2015-01-01T00:00:00Z") + # set sample rate to 1 second + trace.stats.delta = 1 + trace.stats.npts = len(trace.data) + print(stream) + print(stream.select(channel="H")[0].times("utcdatetime")) + # value that doesn't exist + assert_equal( + TimeseriesUtility.get_trace_value( + traces=stream.select(channel="H"), time=UTCDateTime("2015-01-01T00:00:00Z") + ), + None, + ) + # value that exists + assert_equal( + TimeseriesUtility.get_trace_value( + traces=stream.select(channel="Z"), time=UTCDateTime("2015-01-01T00:00:00Z") + ), + 0, + ) + # default for value that doesn't exist + assert_equal( + TimeseriesUtility.get_trace_value( + traces=stream.select(channel="H"), + time=UTCDateTime("2015-01-01T00:00:03Z"), + default=4, + ), + 4, + ) + + def test_has_all_channels(): """TimeseriesUtility_test.test_has_all_channels(): """ -- GitLab