diff --git a/README.md b/README.md index 9de81eed3331cd64d729eca0df52c30e79c3af90..aef477d8c713508256cf240e45ca93cc08aad190 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,10 @@ [](https://code.usgs.gov/ghsc/geomag/geomag-algorithms/-/commits/master) Geomag Algorithms is an open source library for processing -Geomagnetic timeseries data. It includes algorithms and input/output factories -used by the [USGS Geomagnetism Program](http://geomag.usgs.gov) to -translate between data formats, -generate derived data and indices in near-realtime, -and research and develop new algorithms. +Geomagnetic timeseries data. It includes algorithms and input/output +factories used by the [USGS Geomagnetism Program](http://geomag.usgs.gov) to +translate between data formats, generate derived data and indices in +near-realtime, and research and develop new algorithms. - Accesses USGS data services. - Built using established open source python libraries diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 564b54c99350baf8a92a7c31fa97c52c777dfbcb..236618176bbaeed9d334b090ce2a0100fb0363d6 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -7,7 +7,6 @@ from typing import List, Optional, Tuple, Union from obspy.core import Stream, UTCDateTime - from .algorithm import Algorithm, algorithms, AlgorithmException, FilterAlgorithm from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .PlotTimeseriesFactory import PlotTimeseriesFactory @@ -27,6 +26,7 @@ from . import vbf from . import xml from . import covjson from . import netcdf +from . import imagcdf class Controller(object): @@ -503,6 +503,10 @@ def get_input_factory(args): if args.input_file is not None: if input_type in ["netcdf", "miniseed"]: input_stream = open(args.input_file, "rb") + elif input_type in ["imagcdf"]: + input_factory_args["inputFile"] = ( + args.input_file + ) # imagcdf file is binary but lib used accepts a file path else: input_stream = open(args.input_file, "r") elif args.input_stdin: @@ -569,6 +573,9 @@ def get_input_factory(args): input_factory = xml.XMLFactory(**input_factory_args) elif input_type == "covjson": input_factory = covjson.CovJSONFactory(**input_factory_args) + elif input_type == "imagcdf": + input_factory_args["output"] = args.output + input_factory = imagcdf.ImagCDFFactory(**input_factory_args) # wrap stream if input_stream is not None: input_factory = StreamTimeseriesFactory( @@ -660,6 +667,8 @@ def get_output_factory(args): ) elif output_type == "xml": output_factory = xml.XMLFactory(**output_factory_args) + elif output_type == "imagcdf": + output_factory = imagcdf.ImagCDFFactory(**output_factory_args) # wrap stream if output_stream is not None: output_factory = StreamTimeseriesFactory( @@ -833,6 +842,7 @@ def parse_args(args): "xml", "covjson", "netcdf", + "imagcdf", ), default="edge", help='Input format (Default "edge")', @@ -1040,6 +1050,7 @@ def parse_args(args): "xml", "covjson", "netcdf", + "imagcdf", ), # TODO: set default to 'iaga2002' help="Output format", diff --git a/geomagio/api/ws/Element.py b/geomagio/api/ws/Element.py index ae5c4ead9ebd8420c7f0abc0af7211c9cc137417..e5ab3e623e3d38ce1615b64b97d7c3703d5b43ba 100644 --- a/geomagio/api/ws/Element.py +++ b/geomagio/api/ws/Element.py @@ -45,3 +45,8 @@ ELEMENTS = [ ] ELEMENT_INDEX = {e.id: e for e in ELEMENTS} + + +TEMPERATURE_ELEMENTS_ID = [ + element.id for element in ELEMENTS if "Temperature" in element.name +] diff --git a/geomagio/api/ws/data.py b/geomagio/api/ws/data.py index 1eb735fd54c39d89f7ec75a38dc8725b5cff09b4..c2bc92274c2fca1a9df5b914f81f63fb362ec65f 100644 --- a/geomagio/api/ws/data.py +++ b/geomagio/api/ws/data.py @@ -5,7 +5,8 @@ from fastapi import APIRouter, Depends, Query, Request from obspy import UTCDateTime, Stream from starlette.responses import Response -from ... import DerivedTimeseriesFactory, TimeseriesFactory, TimeseriesUtility +from ... import TimeseriesFactory, TimeseriesUtility +from ...DerivedTimeseriesFactory import DerivedTimeseriesFactory from ...edge import EdgeFactory, FDSNFactory, MiniSeedFactory from ...iaga2002 import IAGA2002Writer from ...imfjson import IMFJSONWriter diff --git a/geomagio/imagcdf/IMCDFPublication.py b/geomagio/imagcdf/IMCDFPublication.py new file mode 100644 index 0000000000000000000000000000000000000000..36f66be3281f528eb89264f1f51700b349da3a4b --- /dev/null +++ b/geomagio/imagcdf/IMCDFPublication.py @@ -0,0 +1,108 @@ +from typing import Optional, Union + + +class IMCDFPublicationLevel: + """Handles publication levels and mapping between data types and levels. + + The ImagCDF format uses publication levels to describe the processing + level of the data. This class maps data types (e.g., 'variation', 'definitive') + to their corresponding publication levels as defined in the ImagCDF documentation. + + Publication Levels: + 1: Raw data with no processing. + 2: Edited data with preliminary baselines. + 3: Data suitable for initial bulletins or quasi-definitive publication. + 4: Definitive data with no further changes expected. + + Reference: + - ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data + """ + + class PublicationLevel: + LEVEL_1 = "1" + LEVEL_2 = "2" + LEVEL_3 = "3" + LEVEL_4 = "4" + + TYPE_TO_LEVEL = { + "none": PublicationLevel.LEVEL_1, + "variation": PublicationLevel.LEVEL_1, + "reported": PublicationLevel.LEVEL_1, + "provisional": PublicationLevel.LEVEL_2, + "adjusted": PublicationLevel.LEVEL_2, + "quasi-definitive": PublicationLevel.LEVEL_3, + "quasidefinitive": PublicationLevel.LEVEL_3, + "definitive": PublicationLevel.LEVEL_4, + } + + def __init__(self, value: Union[str, int]): + """Initialize with a data type or publication level to determine the publication level. + + Args: + value (Union[str, int]): The data type (str) or publication level (int). + + Raises: + ValueError: If the value is not provided or is unsupported. + """ + if isinstance(value, int) or (isinstance(value, str) and value.isdigit()): + self.level = str(value) + if self.level not in self.PublicationLevel.__dict__.values(): + raise ValueError(f"Unsupported level: {value}") + elif isinstance(value, str): + self.level = self.TYPE_TO_LEVEL.get(value.lower()) + if not self.level: + raise ValueError(f"Unsupported data_type: {value}") + else: + raise ValueError( + "value must be a string or an integer representing data_type or level." + ) + + def get_level(self) -> str: + """Return the publication level as a string. + + Returns: + str: The publication level. + """ + return self.level + + def get_imf_data_type(self, long_form: Optional[bool] = True) -> str: + """Get the IMF data type based on the publication level. + + Args: + long_form (bool): If True, return the full description; otherwise, return the abbreviated form. + + Returns: + str: The IMF data type. + + Reference: + https://tech-man.intermagnet.org/latest/appendices/dataformats.html#intermagnet-satellite-transmission-format-imfv2-83 + """ + if self.level == self.PublicationLevel.LEVEL_4: + return "Definitive" if long_form else "D" + elif self.level == self.PublicationLevel.LEVEL_3: + return "Quasi-definitive" if long_form else "Q" + elif self.level == self.PublicationLevel.LEVEL_2: + return "Adjusted" if long_form else "A" + else: + return "Reported" if long_form else "R" + + def get_iaga2002_data_type(self, long_form: Optional[bool] = True) -> str: + """Get the IAGA-2002 data type based on the publication level. + + Args: + long_form (bool): If True, return the full description; otherwise, return the abbreviated form. + + Returns: + str: The IAGA-2002 data type. + + Reference: + https://tech-man.intermagnet.org/latest/appendices/dataformats.html#iaga2002-intermagnet-exchange-format-spreadsheet-compatible + """ + if self.level == self.PublicationLevel.LEVEL_4: + return "Definitive" if long_form else "D" + elif self.level == self.PublicationLevel.LEVEL_3: + return "Quasi-definitive" if long_form else "Q" + elif self.level == self.PublicationLevel.LEVEL_2: + return "Provisional" if long_form else "P" + else: + return "Variation" if long_form else "V" diff --git a/geomagio/imagcdf/ImagCDFFactory.py b/geomagio/imagcdf/ImagCDFFactory.py new file mode 100644 index 0000000000000000000000000000000000000000..b68f03d5e28beb573dc83d1c9a29f88801c2e76a --- /dev/null +++ b/geomagio/imagcdf/ImagCDFFactory.py @@ -0,0 +1,870 @@ +"""ImagCDFFactory Implementation Using cdflib + +This module provides the ImagCDFFactory class for creating and writing +geomagnetic time series data in the ImagCDF format using the cdflib library. +The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed +to store geomagnetic data with high precision. + +References: +- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf +- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format +- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf +- CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html] +""" + +import os +import sys +from typing import List, Optional +from datetime import datetime, timezone +import numpy as np +from obspy import Stream, Trace, UTCDateTime + +from geomagio.TimeseriesFactory import TimeseriesFactory +from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID +from ..geomag_types import DataInterval, DataType +from ..TimeseriesFactoryException import TimeseriesFactoryException +from .. import Util +from cdflib import cdfepoch +from cdflib.cdfwrite import CDF as CDFWriter +from cdflib.cdfread import CDF as CDFReader +import tempfile +from .IMCDFPublication import IMCDFPublicationLevel + + +class ImagCDFFactory(TimeseriesFactory): + """Factory for creating and writing ImagCDF formatted CDF files. + + This class extends the TimeseriesFactory to support writing geomagnetic + time series data to files in the ImagCDF format using the cdflib library. + """ + + isUniqueTimes = True # used to determine depend_0 and CDF Time Variable Name + NONSTANDARD_ELEMENTS = ( + TEMPERATURE_ELEMENTS_ID # for using non standard elements such as UK1 + ) + __FILL_VALUE = 99_999 + __MAX_COMPRESSION = 9 + + def __init__( + self, + observatory: Optional[str] = None, + channels: List[str] = ("H", "D", "Z", "F"), + type: DataType = "variation", + interval: DataInterval = "minute", + urlTemplate="file://{obs}_{dt}_{t}.cdf", + inputFile: Optional[str] = None, + output: Optional[str] = "iaga2002", + urlInterval: int = -1, + ): + """ + Initialize the ImagCDFFactory with default parameters. + + Parameters: + - observatory: IAGA code of the observatory (e.g., 'BOU'). + - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']). + - type: Data type indicating the processing level (e.g., 'variation', 'definitive'). + - interval: Data interval (e.g., 'minute', 'second'). + - urlTemplate: Template for generating file URLs or paths. + - urlInterval: Interval size for splitting data into multiple files. + - inputFile: An ImagCDF file to read data from. If not provided urlTemplate is assumed path for reads. + """ + super().__init__( + observatory=observatory, + channels=channels, + type=type, + interval=interval, + urlTemplate=urlTemplate, + urlInterval=urlInterval, + ) + self.inputFile = inputFile + self.output = ( + output if output in ["iaga2002", "imfjson"] else "iaga2002" + ) # determines mapping of publication level to data type + + def write_file(self, fh, timeseries: Stream, channels: List[str]): + # Create a temporary file to write the CDF data + with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file: + tmp_file_path = tmp_file.name + + try: + # Initialize the CDF writer + cdf_spec = { + "Compressed": self.__MAX_COMPRESSION, # Max Gzip compression (1-9). Almost always the GZIP is the best choice for all data. (CDF User Guide p.23 1.4.3) + "Majority": CDFWriter.ROW_MAJOR, # The first dimension changes the slowest (CDF User Guide p.45 2.3.15 Majority) + "Encoding": CDFWriter.NETWORK_ENCODING, # XDR Encoding -Portable between two or more different types of computers. + "Checksum": True, # True for Data Integrity. False for faster writes (optional) + "rDim_sizes": [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables. + } + + cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True) + + # Write global attributes + global_attrs = self._create_global_attributes(timeseries, channels) + cdf_writer.write_globalattrs(global_attrs) + + # Time variables + time_vars = self._create_time_stamp_variables( + timeseries + ) # modifies self.isUniqueTimes + for ts_name, ts_data in time_vars.items(): + # Define time variable specification + var_spec = { + "Variable": ts_name, + "Data_Type": CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000 + "Num_Elements": 1, + "Rec_Vary": True, + "Var_Type": "zVariable", + "Dim_Sizes": [], + "Sparse": "no_sparse", # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series) + "Compress": self.__MAX_COMPRESSION, + "Pad": None, + } + # Define time variable attributes + var_attrs = ( + {} + ) # ImagCDF does not require or recommend any 'time' variable attributes. + + # Write time variable + cdf_writer.write_var(var_spec, var_attrs, ts_data) + + # Data variables + temperature_index = 0 + for trace in timeseries: + channel = trace.stats.channel + if channels and channel not in channels: + continue + var_name = f"GeomagneticField{channel}" + # if channel in REAL_TEMPERATURE: + # temperature_index += 1 # MUST INCREMENT INDEX BEFORE USING + # var_name = f"Temperature{temperature_index}" + data_type = ( + CDFWriter.CDF_INT4 + if trace.data.dtype in [np.int32, np.int64] + else CDFWriter.CDF_DOUBLE + ) + var_spec = { + "Variable": var_name, + "Data_Type": data_type, + "Num_Elements": 1, + "Rec_Vary": True, + "Var_Type": "zVariable", + "Dim_Sizes": [], + "Sparse": "no_sparse", + "Compress": self.__MAX_COMPRESSION, + "Pad": None, + } + var_attrs = self._create_var_attrs( + trace, temperature_index, self.isUniqueTimes + ) + + # Write data variable + cdf_writer.write_var(var_spec, var_attrs, trace.data) + + # Close the CDF writer + cdf_writer.close() + + # Copy the temporary CDF file to the final file handle + with open(tmp_file_path, "rb") as tmp: + cdf_data = tmp.read() + fh.write(cdf_data) + + finally: + # Cleanup the temporary file + os.remove(tmp_file_path) + + def put_timeseries( + self, + timeseries: Stream, + starttime: Optional[UTCDateTime] = None, + endtime: Optional[UTCDateTime] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, + ): + """ + Store timeseries data in ImagCDF format using cdflib. + + This method writes the timeseries data to one or more files, depending + on the specified urlInterval. + + Parameters: + - timeseries: ObsPy Stream containing the geomagnetic data. + - starttime: Start time of the data to be written. + - endtime: End time of the data to be written. + - channels: List of channels to include in the output file. + - type: Data type indicating the processing level. + - interval: Data interval of the data. + """ + if len(timeseries) == 0: + # No data to store + return + + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + + # Extract metadata from the first trace + stats = timeseries[0].stats + observatory = stats.station + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + # Split data into intervals if necessary + + urlIntervals = Util.get_intervals( + starttime=starttime, endtime=endtime, size=self.urlInterval + ) + for urlInterval in urlIntervals: + interval_start = urlInterval["start"] + interval_end = urlInterval["end"] + url = self._get_url( + observatory=observatory, + date=interval_start, + type=type, + interval=interval, + channels=channels, + ) + + # Handle 'stdout' output + if url == "stdout": + # Write directly to stdout + fh = sys.stdout.buffer + url_data = timeseries.slice( + starttime=interval_start, + endtime=interval_end, + ) + self.write_file(fh, url_data, channels) + continue # Proceed to next interval if any + + # Handle 'file://' output + elif url.startswith("file://"): + # Get the file path from the URL + url_file = Util.get_file_from_url(url, createParentDirectory=True) + url_data = timeseries.slice( + starttime=interval_start, + endtime=interval_end, + ) + # Check if the file already exists + if os.path.isfile(url_file): + raise TimeseriesFactoryException( + f"Error: File '{url_file}' already exists." + ) + # Pad the data to ensure it fits the interval + url_data.trim( + starttime=interval_start, + endtime=interval_end, + nearest_sample=True, + pad=True, + fill_value=self.__FILL_VALUE, + ) + + # Write the data to the CDF file + with open(url_file, "wb") as fh: + self.write_file(fh, url_data, channels) + + else: + # Unsupported URL scheme encountered + raise TimeseriesFactoryException( + "Unsupported URL scheme in urlTemplate" + ) + + def get_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + add_empty_channels: bool = True, + observatory: Optional[str] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, + ) -> Stream: + observatory = observatory or self.observatory + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + + timeseries = Stream() + urlIntervals = Util.get_intervals( + starttime=starttime, endtime=endtime, size=self.urlInterval + ) + + for urlInterval in urlIntervals: + if self.inputFile is None: + url = self._get_url( + observatory=observatory, + date=urlInterval["start"], + type=type, + interval=interval, + channels=channels, + ) + if url == "stdout": + continue # stdout is not a valid input source + if not url.startswith("file://"): + raise TimeseriesFactoryException( + "Only file urls are supported for reading ImagCDF" + ) + url_file = Util.get_file_from_url(url, createParentDirectory=False) + if not os.path.isfile(url_file): + # If file doesn't exist, skip + continue + else: + url_file = self.inputFile + try: + # Read CDF data and merge + cdf = CDFReader(url_file) + timeseries = self._read_cdf(cdf, channels) + except Exception as e: + print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr) + + # After reading all intervals, merge and trim + timeseries.merge(fill_value=self.__FILL_VALUE) + timeseries.trim( + starttime=starttime, + endtime=endtime, + nearest_sample=True, + pad=True, + fill_value=self.__FILL_VALUE, + ) + + # If requested, add empty channels not present in data + if add_empty_channels: + present_channels = {tr.stats.channel for tr in timeseries} + missing_channels = set(channels) - present_channels + for ch in missing_channels: + empty_trace = self._get_empty_trace( + starttime=starttime, + endtime=endtime, + observatory=observatory, + channel=ch, + data_type=type, + interval=interval, + ) + timeseries += empty_trace + + timeseries.sort() + return timeseries + + def _create_global_attributes( + self, timeseries: Stream, channels: List[str] + ) -> dict: + """ + Create a dictionary of global attributes for the ImagCDF file. + + These attributes apply to all the data in the file and include metadata + such as observatory information, data publication level, and format + descriptions. + + References: + - ImagCDF Technical Documentation: ImagCDF Global Attributes + """ + stats = timeseries[0].stats if len(timeseries) > 0 else None + + # Extract metadata from stats or fallback to defaults + observatory_name = ( + getattr(stats, "station_name", None) or self.observatory or "" + ) + station = getattr(stats, "station", None) or "" + network = getattr(stats, "network", None) or "" + institution = getattr(stats, "agency_name", None) or "" + latitude = getattr(stats, "geodetic_latitude", None) or 0.0 + longitude = getattr(stats, "geodetic_longitude", None) or 0.0 + elevation = getattr(stats, "elevation", None) or self.__FILL_VALUE + conditions_of_use = getattr(stats, "conditions_of_use", None) or "" + vector_orientation = getattr(stats, "sensor_orientation", None) or "" + data_type = getattr(stats, "data_type", None) or "variation" + sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0 + comments = getattr(stats, "filter_comments", None) or [""] + declination_base = getattr(stats, "declination_base", None) or 0.0 + publication_level = IMCDFPublicationLevel(value=self.type).get_level() + global_attrs = { + "FormatDescription": {0: "INTERMAGNET CDF Format"}, + "FormatVersion": {0: "1.2"}, + "Title": {0: "Geomagnetic time series data"}, + "IagaCode": {0: station}, + "ElementsRecorded": {0: "".join(channels)}, + "PublicationLevel": {0: publication_level}, + "PublicationDate": { + 0: [ + cdfepoch.timestamp_to_tt2000( + datetime.timestamp(datetime.now(timezone.utc)) + ), + "cdf_time_tt2000", + ] + }, + "ObservatoryName": {0: observatory_name}, + "Latitude": {0: np.array([latitude], dtype=np.float64)}, + "Longitude": {0: np.array([longitude], dtype=np.float64)}, + "Elevation": {0: np.array([elevation], dtype=np.float64)}, + "Institution": {0: institution}, + "VectorSensOrient": {0: vector_orientation}, + "StandardLevel": {0: "None"}, + # Can Omit 'StandardName', 'StandardVersion', 'PartialStandDesc' when StandardLevel=None + "Source": { + 0: "institute" + }, # "institute" - if the named institution provided the data, “INTERMAGNET†- if the data file has been created by INTERMAGNET from another data source, “WDC†- if the World Data Centre has created the file from another data source + "TermsOfUse": {0: conditions_of_use}, + # [[Optional Attributes]] + # 'UniqueIdentifier': {0: ''}, + # 'ParentIdentifiers': {0: ''}, + # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov + # [[Custom Attributes]] + "SensorSamplingRate": {0: sensor_sampling_rate}, + "DataType": {0: data_type}, + "Comments": {0: comments}, + "DeclinationBase": {0: declination_base}, + "Network": {0: network}, + } + return global_attrs + + def _create_time_stamp_variables(self, timeseries: Stream) -> dict: + vector_times = None + scalar_times = None + nonstandard_times = None + temperature_times = {} + temperature_index = 1 + channels = [] + for trace in timeseries: + channel = trace.stats.channel + channels.append(channel) + times = [ + (trace.stats.starttime + trace.stats.delta * i).datetime.replace( + tzinfo=timezone.utc + ) + for i in range(trace.stats.npts) + ] + # Convert timestamps to TT2000 format required by CDF + tt2000_times = cdfepoch.timestamp_to_tt2000( + [time.timestamp() for time in times] + ) + + if channel in self._get_vector_elements(): + if vector_times is None: + vector_times = tt2000_times + else: + if not np.array_equal(vector_times, tt2000_times): + raise ValueError( + "Time stamps for vector channels are not the same." + ) + elif channel in self._get_scalar_elements(): + if scalar_times is None: + scalar_times = tt2000_times + else: + if not np.array_equal(scalar_times, tt2000_times): + raise ValueError( + "Time stamps for scalar channels are not the same." + ) + # elif channel in REAL_TEMPERATURES: + # ts_key = f"Temperature{temperature_index}Times" + # if ts_key not in temperature_times: + # temperature_times[ts_key] = tt2000_times + # temperature_index += 1 + # else: + # temperature_times[ts_key] = tt2000_times + elif channel in self.NONSTANDARD_ELEMENTS: + if scalar_times is None: + nonstandard_times = tt2000_times + else: + if not np.array_equal(scalar_times, tt2000_times): + raise ValueError( + "Time stamps for nonstandard channels are not the same." + ) + time_vars = {} + if vector_times is not None: + time_vars["GeomagneticVectorTimes"] = vector_times + if scalar_times is not None: + time_vars["GeomagneticScalarTimes"] = scalar_times + if temperature_times: + time_vars.update(temperature_times) + + isNonStandard = nonstandard_times is not None + if isNonStandard: + time_vars["NonstandardTimes"] = nonstandard_times + + last_times = [] + + self.isUniqueTimes = ( + len(channels) == 1 + ) # (If a single channel, then use specific naming i.e GeomagneticVectorTimes) + for index, times in enumerate(time_vars.values()): + if index > 0: + self.isUniqueTimes = not np.array_equal(last_times, times) + last_times = times + if self.isUniqueTimes and isNonStandard and len(time_vars) > 2: + raise ValueError( + f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}" + ) + return ( + {"DataTimes": last_times} + if isNonStandard or not self.isUniqueTimes + else time_vars + ) + + def _create_var_attrs( + self, + trace: Trace, + temperature_index: Optional[int] = None, + isUniqueTimes: Optional[bool] = True, + ) -> dict: + channel = trace.stats.channel.upper() + fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element †+ the element code or “Temperature †+ the name of the location where the temperature was recorded. + units = "" # Should be one of “nTâ€, “Degrees of arc†or “Celsius†to adhere to standard. + depend_0 = "" + if channel == "D": + units = "Degrees of arc" + validmin = -360.0 + validmax = 360.0 # A full circle representation + elif channel == "I": + units = "Degrees of arc" + validmin = -90.0 + validmax = 90.0 # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°). + elif channel in TEMPERATURE_ELEMENTS_ID: + units = "Celsius" + validmin = -273.15 # absolute zero + validmax = 79_999 + depend_0 = "DataTimes" # can be used for nonstandard element + # elif channel in [REAL_TEMPERATURES]: + # units = "Celsius" + # fieldnam = f"Temperature {temperature_index} {trace.stats.location}" + # validmin = -273.15 # absolute zero + # validmax = 79_999 + elif channel in ["F", "S"]: + units = "nT" + validmin = ( + 0.0 # negative magnetic field intestity not physically meaningful. + ) + validmax = 79_999.0 + elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]: + units = "nT" + validmin = -79_999.0 + validmax = 79_999.0 + else: + units = "Unknown" # unknown or possibly assume to be nT + validmin = -79_999 # largest valid min for all other elements + validmax = 79_999 # largest valid max for all other elements + depend_0 = "DataTimes" # can be used for nonstandard element + + # Determine DEPEND_0 based on channel type if necessary + if not depend_0: + if not isUniqueTimes: + depend_0 = "DataTimes" + elif channel in self._get_vector_elements(): + depend_0 = "GeomagneticVectorTimes" + elif channel in self._get_scalar_elements(): + depend_0 = "GeomagneticScalarTimes" + # elif channel in REAL_TEMPERATURES: + # depend_0 = f"Temperature{temperature_index}Times" + + var_attrs = { + "FIELDNAM": fieldnam, + "UNITS": units, + "FILLVAL": self.__FILL_VALUE, + "VALIDMIN": validmin, + "VALIDMAX": validmax, + "DEPEND_0": depend_0, + "DISPLAY_TYPE": "time_series", + "LABLAXIS": channel, + # [[Custom Attributes]] + "DATA_INTERVAL_TYPE": trace.stats.data_interval_type, + } + return var_attrs + + def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream: + """ + Read CDF data into an ObsPy Stream. + + This method reads the data variables and their corresponding time + variables from a CDF file and constructs an ObsPy Stream. + + Parameters: + - cdf: cdflib CDF object representing the open CDF file. + + Returns: + - An ObsPy Stream containing the data from the CDF file. + """ + stream = Stream() + + # Extract global attributes + global_attrs = cdf.globalattsget() + + # Define required global attributes based on ImagCDF standards + required_global_attrs = [ + "FormatDescription", + "FormatVersion", + "Title", + "IagaCode", + "ElementsRecorded", + "PublicationLevel", + "PublicationDate", + "ObservatoryName", + "Latitude", + "Longitude", + "Elevation", + "Institution", + "StandardLevel", + "Source", + ] + + # Check for missing required global attributes per ImagCDF techincal document + missing_global_attrs = [] + for attr in required_global_attrs: + if attr not in global_attrs: + missing_global_attrs.append(attr) + + if missing_global_attrs: + error_message = ( + f"The following required global attributes are missing from the CDF: " + f"{', '.join(missing_global_attrs)}" + ) + raise TimeseriesFactoryException(error_message) + + # Map global attributes to Stream-level metadata + observatory = global_attrs.get("IagaCode", [""])[0] + station_name = global_attrs.get("ObservatoryName", [""])[0] + institution = global_attrs.get("Institution", [""])[0] + latitude = global_attrs.get("Latitude", [0.0])[0] + longitude = global_attrs.get("Longitude", [0.0])[0] + elevation = global_attrs.get("Elevation", [self.__FILL_VALUE])[ + 0 + ] # default to 99_999 per technical documents. + sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0] + publication_level = global_attrs.get("PublicationLevel", ["0"])[0] + publication = IMCDFPublicationLevel(publication_level) + terms_of_use = global_attrs.get("TermsOfUse", [""])[0] + + # Not expected in ImagCDF + data_type = ( + publication.get_imf_data_type() + if "imf" in self.output + else publication.get_iaga2002_data_type() + ) + + sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [None])[0] + comments = global_attrs.get("Comments", []) # keep comments as an array + declination_base = global_attrs.get("DeclinationBase", [None])[0] + network = global_attrs.get("Network", [None])[0] + + # Identify all time variables + time_vars = {} + for var in cdf.cdf_info().zVariables: + if var.endswith("Times"): + time_data = cdf.varget(var) + unix_times = cdfepoch.unixtime(time_data) + utc_times = [UTCDateTime(t) for t in unix_times] + time_vars[var] = utc_times + + # Read data variables and associate them with time variables + for var in cdf.cdf_info().zVariables: + + # Skip time variables + if var.endswith("Times"): + continue + + # Map the variable name back to a standard channel code by removing known prefixes + # Names are like GeomagneticFieldH, GeomagneticFieldD, Temperature1, Temperature2, ... + if var.startswith("GeomagneticField"): + channel = var.replace("GeomagneticField", "") + # elif var.startswith("Temperature"): + # # Temperature variables may not map directly to a geomagnetic channel + # # but to temperature sensors. We can just use the label from LABLAXIS if needed + # channel = attrs.get("LABLAXIS", var) + else: + # fallback if naming doesn't match expected patterns + channel = var + + if channels and channel not in channels: + continue + data = cdf.varget(var) + attrs = cdf.varattsget(var) + + # Determine DEPEND_0 (the time variable name) and validate + ts_name = attrs.get("DEPEND_0") + + # The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc. + matched_time_key = None + for tkey in time_vars.keys(): + if tkey == ts_name: + matched_time_key = tkey + break + + times = [] + if matched_time_key in time_vars: + times = time_vars[matched_time_key] + + # Determine delta (sample interval) + if len(times) > 1: + # delta as a float of seconds + delta = times[1].timestamp - times[0].timestamp + else: + # if only one sample, use default based on interval + # convert minute, second, etc. to numeric delta + if self.interval == "minute": + delta = 60.0 + elif self.interval == "second": + delta = 1.0 + elif self.interval == "hour": + delta = 3600.0 + else: + # fallback, set delta to 60 + delta = 60.0 + + time_attrs = cdf.varattsget(var) + data_interval = time_attrs.get("DATA_INTERVAL_TYPE", None) + + # Required variable attributes based on ImagCDF standards + required_variable_attrs = [ + "FIELDNAM", + "UNITS", + "FILLVAL", + "VALIDMIN", + "VALIDMAX", + "DISPLAY_TYPE", + "LABLAXIS", + "DEPEND_0", + ] + # Validate presence of required variable attributes + missing_var_attrs = [] + for var_attr in required_variable_attrs: + if var_attr not in attrs: + missing_var_attrs.append(var_attr) + + if missing_var_attrs or missing_global_attrs: + error_message = ( + f"The variable '{var}' is missing required attributes: " + f"{', '.join(missing_var_attrs)}" + ) + raise TimeseriesFactoryException(error_message) + + header = { + "station": observatory, + "channel": channel, + "starttime": times[0], + "delta": delta, + "geodetic_latitude": latitude, + "geodetic_longitude": longitude, + "elevation": elevation, + "sensor_orientation": "".join(sensor_orientation), + "data_type": data_type, + "station_name": station_name, + "agency_name": institution, + "conditions_of_use": terms_of_use, + } + # data points not expected in ImagCDF specs + if sensor_sampling_rate is not None: + header.update({"sensor_sampling_rate": sensor_sampling_rate}) + if data_interval is not None: + header.update({"data_interval_type": data_interval}) + if declination_base is not None: + header.update({"declination_base": declination_base}) + if len(comments) > 0: + header.update({"filter_comments": comments}) + if network is not None: + header.update({"network": network}) + + # Create a trace + trace = Trace( + data=data, + header=header, + ) + stream += trace + + return stream + + def _get_url( + self, + observatory: str, + date: UTCDateTime, + type: DataType = "variation", + interval: DataInterval = "minute", + channels: Optional[List[str]] = None, + ) -> str: + """ + Generate the file URL specific to ImagCDF conventions. + + This method constructs the filename based on the ImagCDF naming + conventions. + + [iaga-code]_[date-time]_[publication-level].cdf + + Parameters: + - observatory: IAGA code of the observatory. + - date: Start date for the file. + - type: Data type indicating the processing level. + - interval: Data interval (e.g., 'minute', 'second'). + - channels: List of channels (optional). Not Implemented + + Returns: + - The formatted file URL or path. + + Reference: + - ImagCDF Technical Documentation: ImagCDF File Names + """ + # Get the publication level for the type + publication_level = IMCDFPublicationLevel(value=type).get_level() + + if interval == "year": + date_format = date.strftime("%Y") + elif interval == "month": + date_format = date.strftime("%Y%m") + elif interval == "day": + date_format = date.strftime("%Y%m%d") + elif interval == "hour": + date_format = date.strftime("%Y%m%d_%H") + elif interval == "minute": + date_format = date.strftime("%Y%m%d_%H%M") + elif interval == "second": + date_format = date.strftime("%Y%m%d_%H%M%S") + else: + raise ValueError( + f"Unsupported interval: {interval}" + ) # tenhertz currently not supported + + filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf" + + # If the urlTemplate explicitly specifies 'stdout', return 'stdout' + if self.urlTemplate.lower() == "stdout": + return "stdout" + + # Prepare parameters for templating + params = { + "date": date.datetime, + "i": self._get_interval_abbreviation(interval), + "interval": self._get_interval_name(interval), + "minute": date.hour * 60 + date.minute, + "month": date.strftime("%b").lower(), + "MONTH": date.strftime("%b").upper(), + "obs": observatory.lower(), + "OBS": observatory.upper(), + "t": publication_level, + "type": self._get_type_name(type), + "julian": date.strftime("%j"), + "year": date.strftime("%Y"), + "ymd": date.strftime("%Y%m%d"), + "dt": date_format, + } + + # Attempt to use the template provided in urlTemplate + if "{" in self.urlTemplate and "}" in self.urlTemplate: + try: + return self.urlTemplate.format(**params) + except KeyError as e: + raise TimeseriesFactoryException( + f"Invalid placeholder in urlTemplate: {e}" + ) + + # If the urlTemplate doesn't support placeholders, assume 'file://' scheme + if self.urlTemplate.startswith("file://"): + base_path = self.urlTemplate[7:] + if not base_path or base_path == "{obs}_{dt}_{t}.cdf": + base_path = os.getcwd() # Default to current working directory + return os.path.join(base_path, filename) + return os.path.join(self.urlTemplate, filename) + + # Unsupported URL scheme + raise TimeseriesFactoryException( + f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}" + ) + + def _get_vector_elements(self): + return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"} + + def _get_scalar_elements(self): + return {"S", "G"} diff --git a/geomagio/imagcdf/__init__.py b/geomagio/imagcdf/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e7611ab4cdd29eb6c6eea8426606b749ed3675ec --- /dev/null +++ b/geomagio/imagcdf/__init__.py @@ -0,0 +1,10 @@ +"""IO Module for ImagCDF Format +""" + +from __future__ import absolute_import + +from .ImagCDFFactory import ImagCDFFactory +from .IMCDFPublication import IMCDFPublicationLevel + + +__all__ = ["ImagCDFFactory", "IMCDFPublicationLevel"] diff --git a/poetry.lock b/poetry.lock index 3d7add0ef0705dadcbc212cfb759c2d627cbb19e..05a9e1d10a247928ca8c2971da8b2eddeb607624 100644 --- a/poetry.lock +++ b/poetry.lock @@ -153,6 +153,25 @@ d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "cdflib" +version = "1.3.2" +description = "A python CDF reader toolkit" +optional = false +python-versions = ">=3.8" +files = [ + {file = "cdflib-1.3.2-py3-none-any.whl", hash = "sha256:49af97acc328c586ac5b7c27fd8e67bccf24af82c5bd8c37d8cfe048a0c1752a"}, + {file = "cdflib-1.3.2.tar.gz", hash = "sha256:97f27ac629e4c0ac1367eb8f4edd7a1d184190272ab98a6401e999f3a2e05687"}, +] + +[package.dependencies] +numpy = ">=1.21" + +[package.extras] +dev = ["ipython", "pre-commit"] +docs = ["astropy", "netcdf4", "sphinx", "sphinx-automodapi", "sphinx-copybutton", "sphinx-rtd-theme", "xarray"] +tests = ["astropy", "h5netcdf", "hypothesis", "netcdf4", "pytest (>=3.9)", "pytest-cov", "pytest-remotedata", "xarray"] + [[package]] name = "certifi" version = "2024.12.14" @@ -3047,4 +3066,4 @@ pycurl = ["pycurl"] [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.12" -content-hash = "de2ec42ee0160626455b58e0b8f67e2669ad3ef5e13d58bc4500d0b8a1643cb8" +content-hash = "517a488d608ab52b45699d4605d4f2c4f9dd57533102a42e026049be71b9a3ce" diff --git a/pyproject.toml b/pyproject.toml index c163d9b0a173e3723f429d2c20e77a29a76f0965..51a5ee31cd50c67ca83cbeb0b70e0dacfc7ed970 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ SQLAlchemy = "1.4.41" SQLAlchemy-Utc = "^0.14.0" uvicorn = {extras = ["standard"], version = "^0.22.0"} netcdf4 = "^1.7.2" +cdflib = "^1.3.2" [tool.poetry.dev-dependencies]