diff --git a/docs/elements.md b/docs/elements.md index d4340c308c55d41c0caed68645d75983e2d853dc..ff68b4c9aaab31d8ec85be936ef15f0947801157 100644 --- a/docs/elements.md +++ b/docs/elements.md @@ -19,7 +19,7 @@ Elements generated by observatories running the Obsrio data acquisition system. - U, V, W - uses instrument metadata from `geomagio.Metadata.get_instrument` to convert volt+bin to engineering units. + uses instrument metadata from `geomagio.metadata.instrument.InstrumentCalibrations.get_instrument_calibrations` to convert volt+bin to engineering units. uses `geomagio.algorithm.FilterAlgorithm` to filter from 10Hz to 1Hz. diff --git a/geomagio/Util.py b/geomagio/Util.py index 4f02a749d94c593535f67d34653c5b924ed3ab5a..dff464a09bf60fe9c1c38d03a448f1984687d3ed 100644 --- a/geomagio/Util.py +++ b/geomagio/Util.py @@ -2,6 +2,8 @@ import numpy import os from obspy.core import Stats, Trace from io import BytesIO +import json +import fcntl class ObjectView(object): @@ -199,3 +201,97 @@ def create_empty_trace(trace, channel): count = len(trace.data) numpy_data = numpy.full((count), numpy.nan) return Trace(numpy_data, stats) + + +def write_state_file(filename, data, directory=None): + """ + Writes data to a state file in a thread-safe manner. + + Parameters: + ----------- + filename: String + The name of the file to write to. + data: + The data to write to the file. This should be a Python object that can be serialized with json. + directory: String + The directory to write the file to. If not provided, the file will be written to the .cache directory in the current user's home directory. + + Returns: + -------- + None + + Raises: + ------- + IOError: If an I/O error occurs. + TypeError: If the data cannot be serialized to JSON. + """ + if directory is None: + directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms") + + # Create the directory if it doesn't exist + try: + os.makedirs(directory, exist_ok=True) + except OSError as e: + print(f"Error creating directory: {e}") + raise + + filepath = os.path.join(directory, filename) + + try: + with open(filepath, "w") as f: + try: + fcntl.flock(f, fcntl.LOCK_EX) + json.dump(data, f) + fcntl.flock(f, fcntl.LOCK_UN) + except IOError as e: + print(f"Error locking or writing to file: {e}") + raise + except TypeError as e: + print(f"Error serializing data to JSON: {e}") + raise + except IOError as e: + print(f"Error opening file: {e}") + raise + + +def read_state_file(filename, directory=None): + """ + Reads data from a state file in a thread-safe manner. + + Parameters: + filename: String + The name of the file to read from. + directory: String + The directory to read the file from. If not provided, the file will be read from the .cache directory in the current user's home directory. + + Returns: + -------- + data: Object + Python object that was deserialized from the json state file. + + Raises: + ------- + IOError: If an I/O error occurs. + json.JSONDecodeError: If the data cannot be deserialized from JSON. + """ + if directory is None: + directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms") + + filepath = os.path.join(directory, filename) + + try: + with open(filepath, "r") as f: + try: + fcntl.flock(f, fcntl.LOCK_SH) + data = json.load(f) + fcntl.flock(f, fcntl.LOCK_UN) + return data + except IOError as e: + print(f"Error locking or reading from file: {e}") + raise + except json.JSONDecodeError as e: + print(f"Error deserializing data from JSON: {e}") + raise + except IOError as e: + print(f"Error opening file: {e}") + raise diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py index 5de50c1ab52ce5920152a9d8e9283e69a912fa8f..63bbffe860fb2034a13fc216320a0f6bd22d61d0 100644 --- a/geomagio/edge/MiniSeedFactory.py +++ b/geomagio/edge/MiniSeedFactory.py @@ -20,7 +20,7 @@ from obspy.core import Stats, Stream, Trace, UTCDateTime from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType -from ..Metadata import get_instrument +from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata @@ -448,7 +448,7 @@ class MiniSeedFactory(TimeseriesFactory): to contain just a single Trace. """ out = Stream() - metadata = get_instrument(observatory, starttime, endtime) + metadata = get_instrument_calibrations(observatory, starttime, endtime) # loop in case request spans different configurations for entry in metadata: entry_endtime = entry["end_time"] diff --git a/geomagio/metadata/instrument/InstrumentCalibrations.py b/geomagio/metadata/instrument/InstrumentCalibrations.py new file mode 100644 index 0000000000000000000000000000000000000000..a78f2db1bbf935952368a3925d45c6066231c788 --- /dev/null +++ b/geomagio/metadata/instrument/InstrumentCalibrations.py @@ -0,0 +1,365 @@ +from obspy import UTCDateTime + +from requests.exceptions import JSONDecodeError, ConnectTimeout + +from geomagio.metadata import Metadata, MetadataFactory, MetadataCategory +from geomagio.Util import write_state_file, read_state_file + + +class InstrumentCalibrations: + """ + A class used to get calibrations from instrument metadata. + + ... + + Attributes + ---------- + metadata_list : list + a list of metadata objects to be processed + previous_calibration : dict + a dictionary to store the previous calibration values for each axis and key + station : str + the station code, checked for consistency across all metadata objects + location : str + the location code, checked for consistency across all metadata objects + network : str + the network code, checked for consistency across all metadata objects + + Methods + ------- + get_calibrations(): + Main method to gather applicable calibrations from the metadata list. + create_overlap_interval(current_metadata): + Creates an overlap interval from the current metadata. + update_overlap_interval(overlap_interval, current_metadata): + Updates the overlap interval with the current metadata. + update_overlap_interval_with_same_starttime(i, sorted_metadata_list, overlap_interval): + Updates the overlap interval with metadata that have the same starttime. + set_endtime(i, sorted_metadata_list, overlap_interval, current_metadata): + Sets the endtime for the overlap interval. + convert_to_calibration(overlap_metadata): + Converts overlapping metadata information to an applicable calibration. + get_channels(overlap_metadata): + Gets the channels for the applicable calibration. + """ + + def __init__(self, metadata_list): + """ + Constructs necessary attributes for the InstrumentCalibrations object and verify consistency of metadata_list. + + Parameters + ---------- + metadata_list : list + a list of metadata objects to be processed + """ + self.metadata_list = metadata_list + self.previous_calibration = { + f"{axis}_{key}": None + for axis in ["u", "v", "w"] + for key in ["constant", "bin", "offset"] + } + self.previous_calibration.update( + {"resistance": None, "temperature_scale": None, "type": None} + ) + + # Check for consistency in "station", "location", and "network" codes + stations = {metadata.station for metadata in metadata_list} + locations = {metadata.location for metadata in metadata_list} + networks = {metadata.network for metadata in metadata_list} + + if len(stations) > 1: # or len(locations) > 1: + raise ValueError( + f"Inconsistent 'station' codes in metadata_list: {stations}" + ) + else: + self.station = stations.pop() + self.location = locations.pop() + self.network = networks.pop() + + def get_calibrations(self): + """ + Main method to compile applicable calibrations from the metadata list. + + Parameters + ---------- + query_starttime : UTCDateTime + the query starttime + + Returns + ------- + list + a list of dictionaries, each representing an applicable calibration + """ + sorted_metadata_list = sorted( + self.metadata_list, + key=lambda x: x.starttime if x.starttime is not None else UTCDateTime(0), + ) + i = 0 + calibrations = [] + while i < len(sorted_metadata_list): + current_metadata = sorted_metadata_list[i] + overlap_interval = self.create_overlap_interval(current_metadata) + i = self.update_overlap_interval_with_same_starttime( + i, sorted_metadata_list, overlap_interval + ) + self.set_endtime( + i, sorted_metadata_list, overlap_interval, current_metadata + ) + calibration = self.convert_to_calibration(overlap_interval) + # if ( + # calibration["end_time"] >= query_starttime + # or calibration["end_time"] == None + # ): + calibrations.append(calibration) + i += 1 + return calibrations + + def create_overlap_interval(self, current_metadata): + """ + Creates an overlap interval from the current metadata. + + Parameters + ---------- + current_metadata : Metadata + the current metadata object + + Returns + ------- + dict + a dictionary representing an overlap interval + """ + overlap_interval = {"starttime": current_metadata.starttime} + self.update_overlap_interval(overlap_interval, current_metadata) + return overlap_interval + + def update_overlap_interval(self, overlap_interval, current_metadata): + """ + Updates the overlap interval with the current metadata. + + Parameters + ---------- + overlap_interval : dict + the overlap interval to be updated + current_metadata : Metadata + the current metadata object + """ + for axis in ["u", "v", "w"]: + for key in ["constant", "bin", "offset"]: + current_key = f"{axis}_{key}" + if current_key in current_metadata.metadata: + overlap_interval[current_key] = current_metadata.metadata[ + current_key + ] + self.previous_calibration[current_key] = current_metadata.metadata[ + current_key + ] + elif current_key in self.previous_calibration: + overlap_interval[current_key] = self.previous_calibration[ + current_key + ] + for key in ["resistance", "temperature_scale", "type"]: + if key in current_metadata.metadata: + overlap_interval[key] = current_metadata.metadata[key] + self.previous_calibration[key] = current_metadata.metadata[key] + elif key in self.previous_calibration: + overlap_interval[key] = self.previous_calibration[key] + + def update_overlap_interval_with_same_starttime( + self, i, sorted_metadata_list, overlap_interval + ): + """ + Updates the overlap interval with metadata that have the same starttime. + + Parameters + ---------- + i : int + the current index in the sorted metadata list + sorted_metadata_list : list + the sorted list of metadata objects + overlap_interval : dict + the overlap interval to be updated + + Returns + ------- + int + the updated index in the sorted metadata list + """ + while ( + i + 1 < len(sorted_metadata_list) + and sorted_metadata_list[i + 1].starttime + == sorted_metadata_list[i].starttime + ): + i += 1 + next_metadata = sorted_metadata_list[i] + self.update_overlap_interval(overlap_interval, next_metadata) + return i + + def set_endtime(self, i, sorted_metadata_list, overlap_interval, current_metadata): + """ + Sets the endtime for the overlap interval. + + Parameters + ---------- + i : int + the current index in the sorted metadata list + sorted_metadata_list : list + the sorted list of metadata objects + overlap_interval : dict + the overlap interval to be updated + current_metadata : Metadata + the current metadata object + """ + if ( + i + 1 < len(sorted_metadata_list) + and sorted_metadata_list[i + 1].starttime != current_metadata.starttime + ): + overlap_interval["endtime"] = sorted_metadata_list[i + 1].starttime + else: + overlap_interval["endtime"] = current_metadata.endtime + + def convert_to_calibration(self, overlap_metadata): + """ + Converts an overlap_metadata object to an applicable calibration. + + Parameters + ---------- + overlap_metadata : dict + the metadata overlap data to be converted + + Returns + ------- + dict + a dictionary representing an applicable calibration + """ + calibration = { + "network": self.network, + "station": self.station, + "location": self.location, + "start_time": overlap_metadata["starttime"], + "end_time": overlap_metadata["endtime"], + "instrument": { + "type": overlap_metadata["type"], + "channels": self.get_channels(overlap_metadata), + }, + } + return calibration + + def get_channels(self, overlap_metadata): + """ + Gets the channels for the applicable calibration. + + Parameters + ---------- + overlap_metadata : dict + the metadata overlap data from which to get the channels + + Returns + ------- + dict + a dictionary representing the channels for the applicable calibration + """ + channels = {} + for axis in ["U", "V", "W"]: + if ( + f"{axis.lower()}_bin" in overlap_metadata + and overlap_metadata[f"{axis.lower()}_bin"] is not None + ): + channels[axis] = [ + { + "channel": f"{axis}_Volt", + "offset": ( + overlap_metadata[f"{axis.lower()}_offset"] + if overlap_metadata[f"{axis.lower()}_offset"] is not None + else 0 + ), + "scale": overlap_metadata[f"{axis.lower()}_constant"], + }, + { + "channel": f"{axis}_Bin", + "offset": 0, + "scale": overlap_metadata[f"{axis.lower()}_bin"], + }, + ] + elif ( + "resistance" in overlap_metadata + and overlap_metadata["resistance"] is not None + ): + channels[axis] = [ + { + "channel": f"{axis}_Volt", + "offset": ( + overlap_metadata[f"{axis.lower()}_offset"] + if overlap_metadata[f"{axis.lower()}_offset"] is not None + else 0 + ), + "scale": overlap_metadata[f"{axis.lower()}_constant"] + / overlap_metadata["resistance"], + } + ] + return channels + + +def get_instrument_calibrations( + observatory, start_time=None, end_time=None, calibrations=None, metadata_url=None +): + """Get instrument metadata + + Args: + observatory: observatory code + start_time: start time to match, or None to match any. + end_time: end time to match, or None to match any. + calibrations: use custom list, defaults to pulling and converting instrument metadata + metadata_url: metadata database url + Returns: + list of applicable instrument calibrations + """ + + if not calibrations: + state_filename = f"{observatory}_instrument_cals.json" + metadata = [] + + factory = MetadataFactory( + url=metadata_url or "https://staging-geomag.cr.usgs.gov/ws/secure/metadata", + ) + query = Metadata( + category=MetadataCategory.INSTRUMENT, + starttime=start_time, + endtime=end_time, + station=observatory, + data_valid=True, + ) + try: + metadata = factory.get_metadata(query=query) + except: + print( + "Warning: An error occurred while trying to pull metadata from the metadata server!" + ) + + if not metadata: + print( + f"Warning: No valid metadata returned for {observatory} for time interval: {start_time} - {end_time}" + ) + else: + try: + instrumentCalibrations = InstrumentCalibrations(metadata) + calibrations = instrumentCalibrations.get_calibrations() + except ValueError as e: + print(e) + + if not calibrations: + print( + f"Warning: No valid calibrations generated for {observatory} for time interval: {start_time} - {end_time}" + ) + print(f"Pulling calibrations for {observatory} from state file.") + + calibrations = read_state_file(filename=state_filename) + else: + write_state_file(state_filename, calibrations) + + return [ + c + for c in calibrations + if c["station"] == observatory + and (end_time is None or c["start_time"] is None or c["start_time"] < end_time) + and (start_time is None or c["end_time"] is None or c["end_time"] > start_time) + ] diff --git a/geomagio/metadata/instrument/__init__.py b/geomagio/metadata/instrument/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c470473d1dcbd0aab8e8985c965267b10585d82f --- /dev/null +++ b/geomagio/metadata/instrument/__init__.py @@ -0,0 +1,6 @@ +from .InstrumentCalibrations import InstrumentCalibrations + + +__all__ = [ + "InstrumentCalibrations", +] diff --git a/test/Metadata_test.py b/test/Metadata_test.py index 01e5e417144c6b3ac4c1a9557dcea7b07934a8ae..141a657c16e8046f9e4df22274891c538d45df26 100644 --- a/test/Metadata_test.py +++ b/test/Metadata_test.py @@ -1,5 +1,7 @@ from obspy import UTCDateTime -from geomagio.Metadata import get_instrument +from geomagio.metadata.instrument.InstrumentCalibrations import ( + get_instrument_calibrations, +) from numpy.testing import assert_equal @@ -26,7 +28,7 @@ TEST_METADATA = [METADATA1, METADATA2, METADATA3] def test_get_instrument_after(): """Request an interval after the last entry, that has start_time None""" - matches = get_instrument( + matches = get_instrument_calibrations( "TST", UTCDateTime("2021-02-02T00:00:00Z"), UTCDateTime("2022-01-02T00:00:00Z"), @@ -37,7 +39,7 @@ def test_get_instrument_after(): def test_get_instrument_before(): """Request an interval before the first entry, that has start_time None""" - matches = get_instrument( + matches = get_instrument_calibrations( "TST", UTCDateTime("2019-02-02T00:00:00Z"), UTCDateTime("2020-01-02T00:00:00Z"), @@ -48,7 +50,7 @@ def test_get_instrument_before(): def test_get_instrument_inside(): """Request an interval that is wholly contained by one entry""" - matches = get_instrument( + matches = get_instrument_calibrations( "TST", UTCDateTime("2020-02-02T01:00:00Z"), UTCDateTime("2020-02-02T02:00:00Z"), @@ -59,7 +61,7 @@ def test_get_instrument_inside(): def test_get_instrument_span(): """Request a time interval that spans multiple entries""" - matches = get_instrument( + matches = get_instrument_calibrations( "TST", UTCDateTime("2020-01-02T00:00:00Z"), UTCDateTime("2020-02-02T01:00:00Z"), @@ -70,7 +72,7 @@ def test_get_instrument_span(): def test_get_instrument_unknown(): """Request an unknown observatory""" - matches = get_instrument( + matches = get_instrument_calibrations( "OTHER", UTCDateTime("2020-01-02T00:00:00Z"), UTCDateTime("2020-02-02T01:00:00Z"), diff --git a/test/edge_test/MiniSeedFactory_test.py b/test/edge_test/MiniSeedFactory_test.py index 3b8ced3b0418d7454da62d0530f83290dd31f890..75f895e5c8a7a2bb83d0bdbe9215076a52095635 100644 --- a/test/edge_test/MiniSeedFactory_test.py +++ b/test/edge_test/MiniSeedFactory_test.py @@ -10,7 +10,9 @@ import pytest from geomagio import TimeseriesUtility from geomagio.edge import MiniSeedFactory, MiniSeedInputClient -from geomagio.Metadata import get_instrument +from geomagio.metadata.instrument.InstrumentCalibrations import ( + get_instrument_calibrations, +) from .mseed_test_clients import MockMiniSeedClient, MisalignedMiniSeedClient @@ -44,7 +46,7 @@ def misaligned_miniseed_factory() -> MiniSeedFactory: @pytest.fixture() def bou_u_metadata(): - metadata = get_instrument(observatory="BOU") + metadata = get_instrument_calibrations(observatory="BOU") instrument = metadata[0]["instrument"] channels = instrument["channels"] yield channels["U"]