From cfd231ce361bfdd7f78d30c6745dca0f03c6dc34 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Tue, 3 Dec 2024 13:49:18 -0800 Subject: [PATCH] poc --- geomagio/ImagCDFFactory.py | 753 +++++++++++++++++++++++++++++++++++++ poetry.lock | 19 + pyproject.toml | 1 + 3 files changed, 773 insertions(+) create mode 100644 geomagio/ImagCDFFactory.py diff --git a/geomagio/ImagCDFFactory.py b/geomagio/ImagCDFFactory.py new file mode 100644 index 00000000..5d104acd --- /dev/null +++ b/geomagio/ImagCDFFactory.py @@ -0,0 +1,753 @@ +"""ImagCDFFactory Implementation Using cdflib + +This module provides the ImagCDFFactory class for creating and writing +geomagnetic time series data in the ImagCDF format using the cdflib library. +The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed +to store geomagnetic data with high precision. + +References: +- ImagCDF Format Documentation: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf +- CDF Library: http://cdf.gsfc.nasa.gov/ +""" + +from __future__ import absolute_import, print_function +from io import BytesIO +import os +import sys +from typing import List, Optional, Union + +import numpy as np +from obspy import Stream, Trace, UTCDateTime + +from geomagio.TimeseriesFactory import TimeseriesFactory + +from .geomag_types import DataInterval, DataType +from .TimeseriesFactoryException import TimeseriesFactoryException +from . import TimeseriesUtility +from . import Util + +import cdflib +import tempfile + + +class IMCDFPublicationLevel: + """Handles publication levels and mapping between data types and levels. + + The ImagCDF format uses publication levels to describe the processing + level of the data. This class maps data types (e.g., 'variation', 'definitive') + to their corresponding publication levels as defined in the ImagCDF documentation. + + Publication Levels: + 1: Raw data with no processing. + 2: Edited data with preliminary baselines. + 3: Data suitable for initial bulletins or quasi-definitive publication. + 4: Definitive data with no further changes expected. + + Reference: + - ImagCDF Documentation Section 4.2: Attributes that Uniquely Identify the Data + """ + + class PublicationLevel: + LEVEL_1 = "1" + LEVEL_2 = "2" + LEVEL_3 = "3" + LEVEL_4 = "4" + + TYPE_TO_LEVEL = { + "none": PublicationLevel.LEVEL_1, + "variation": PublicationLevel.LEVEL_1, + "reported": PublicationLevel.LEVEL_1, + "provisional": PublicationLevel.LEVEL_2, + "adjusted": PublicationLevel.LEVEL_2, + "quasi-definitive": PublicationLevel.LEVEL_3, + "quasidefinitive": PublicationLevel.LEVEL_3, + "definitive": PublicationLevel.LEVEL_4, + } + + def __init__(self, data_type: Optional[str] = None): + """Initialize with a data type to determine the publication level.""" + if data_type: + self.level = self.TYPE_TO_LEVEL.get(data_type.lower()) + else: + raise ValueError("data_type must be provided.") + + if not self.level: + raise ValueError(f"Unsupported data_type: {data_type}") + + def to_string(self) -> str: + """Return the publication level as a string.""" + return self.level + + +class ImagCDFFactory(TimeseriesFactory): + """Factory for creating and writing ImagCDF formatted CDF files. + + This class extends the TimeseriesFactory to support writing geomagnetic + time series data to files in the ImagCDF format using the cdflib library. + """ + + def __init__( + self, + observatory: Optional[str] = None, + channels: List[str] = ("H", "D", "Z", "F"), + type: DataType = "variation", + interval: DataInterval = "minute", + urlTemplate="file://{obs}_{dt}_{t}.cdf", + urlInterval: int = -1, + ): + """ + Initialize the ImagCDFFactory with default parameters. + + Parameters: + - observatory: IAGA code of the observatory (e.g., 'BOU'). + - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']). + - type: Data type indicating the processing level (e.g., 'variation', 'definitive'). + - interval: Data interval (e.g., 'minute', 'second'). + - urlTemplate: Template for generating file URLs or paths. + - urlInterval: Interval size for splitting data into multiple files. + """ + super().__init__( + observatory=observatory, + channels=channels, + type=type, + interval=interval, + urlTemplate=urlTemplate, + urlInterval=urlInterval, + ) + + def parse_string(self, data: str, **kwargs): + """Parse ImagCDF formatted string data into a Stream. + + Note: Parsing from strings is not implemented in this factory. + """ + raise NotImplementedError('"parse_string" not implemented') + + def write_file(self, fh, timeseries: Stream, channels: List[str]): + """Write the timeseries data to a file handle in ImagCDF format. + + Parameters: + - fh: File handle to write the data. + - timeseries: ObsPy Stream containing the geomagnetic data. + - channels: List of channels to include in the output file. + """ + # Create a temporary file to write the CDF data + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + tmp_file_path = tmp_file.name + ".cdf" + + try: + # Initialize the CDF writer + cdf_writer = cdflib.cdfwrite.CDF(tmp_file_path, cdf_spec=None) + + # Write global attributes (metadata that applies to the entire file) + global_attrs = self._create_global_attributes(timeseries, channels) + cdf_writer.write_globalattrs(global_attrs) + + # Write time variables for each channel + time_vars = self._create_time_stamp_variables(timeseries) + for ts_name, ts_data in time_vars.items(): + var_spec = { + "Variable": ts_name, + "Data_Type": 33, # CDF TT2000 data type + "Num_Elements": 1, + "Rec_Vary": True, + "Dim_Sizes": [], + "Var_Type": "zVariable", + } + print(f"Writing time variable {ts_name} with data length: {len(ts_data)}") + cdf_writer.write_var( + var_spec=var_spec, + var_attrs=self._create_time_var_attrs(ts_name), + var_data=ts_data, + ) + + # Write geomagnetic data variables + for trace in timeseries: + channel = trace.stats.channel + var_name = f"GeomagneticField{channel}" + var_spec = { + "Variable": var_name, + "Data_Type": self._get_cdf_data_type(trace), + "Num_Elements": 1, + "Rec_Vary": True, + "Dim_Sizes": [], + "Var_Type": "zVariable", + } + print(f"Writing data variable {var_name} with data shape: {trace.data.shape}") + cdf_writer.write_var( + var_spec=var_spec, + var_attrs=self._create_var_attrs(trace), + var_data=trace.data, + ) + + # Copy the temporary CDF file to the final file handle + with open(tmp_file_path, "rb") as tmp: + cdf_data = tmp.read() + fh.write(cdf_data) + + cdf_writer.close() + + finally: + # Cleanup the temporary file + print(tmp_file_path) + + def put_timeseries( + self, + timeseries: Stream, + starttime: Optional[UTCDateTime] = None, + endtime: Optional[UTCDateTime] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, + ): + """ + Store timeseries data in ImagCDF format using cdflib. + + This method writes the timeseries data to one or more files, depending + on the specified urlInterval. + + Parameters: + - timeseries: ObsPy Stream containing the geomagnetic data. + - starttime: Start time of the data to be written. + - endtime: End time of the data to be written. + - channels: List of channels to include in the output file. + - type: Data type indicating the processing level. + - interval: Data interval of the data. + """ + if len(timeseries) == 0: + # No data to store + return + + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + + # Extract metadata from the first trace + stats = timeseries[0].stats + delta = stats.delta # Sample rate + observatory = stats.station + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + + # Split data into intervals if necessary + urlIntervals = Util.get_intervals( + starttime=starttime, endtime=endtime, size=self.urlInterval + ) + for urlInterval in urlIntervals: + interval_start = urlInterval["start"] + interval_end = urlInterval["end"] + if interval_start != interval_end: + interval_end = interval_end - delta + url = self._get_url( + observatory=observatory, + date=interval_start, + type=type, + interval=interval, + channels=channels, + ) + + # Handle 'stdout' output + if url == 'stdout': + # Write directly to stdout + fh = sys.stdout.buffer + url_data = timeseries.slice( + starttime=interval_start, + endtime=interval_end, + ) + self.write_file(fh, url_data, channels) + continue # Proceed to next interval if any + + # Handle 'file://' output + elif url.startswith('file://'): + # Get the file path from the URL + url_file = Util.get_file_from_url(url, createParentDirectory=False) + url_data = timeseries.slice( + starttime=interval_start, + endtime=interval_end, + ) + + # Check if the file already exists to merge data + if os.path.isfile(url_file): + try: + # Read existing data to merge with new data + existing_cdf = cdflib.cdfread.CDF(url_file) + existing_stream = self._read_cdf(existing_cdf) + existing_cdf.close() + existing_data = existing_stream + + # Merge existing data with new data + for trace in existing_data: + new_trace = url_data.select( + network=trace.stats.network, + station=trace.stats.station, + channel=trace.stats.channel, + ) + if new_trace: + trace.data = np.concatenate((trace.data, new_trace[0].data)) + url_data = existing_data + url_data + except Exception as e: + # Log the exception if needed + print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr) + # Proceed with new data + + # Pad the data with NaNs to ensure it fits the interval + url_data.trim( + starttime=interval_start, + endtime=interval_end, + nearest_sample=False, + pad=True, + fill_value=np.nan, + ) + + # Write the data to the CDF file + with open(url_file, "wb") as fh: + self.write_file(fh, url_data, channels) + + else: + # Unsupported URL scheme encountered + raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate") + + def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict: + """ + Create a dictionary of global attributes for the ImagCDF file. + + These attributes apply to all the data in the file and include metadata + such as observatory information, data publication level, and format + descriptions. + + References: + - ImagCDF Documentation Section 4: ImagCDF Global Attributes + """ + stats = timeseries[0].stats if len(timeseries) > 0 else None + + # Extract metadata from stats or fallback to defaults + observatory_name = getattr(stats, 'station_name', None) or self.observatory or "Unknown Observatory" + station = getattr(stats, 'station', None) or "Unknown Iaga Code" + institution = getattr(stats, 'agency_name', None) or "Unknown Institution" + latitude = getattr(stats, 'geodetic_latitude', None) or 0.0 + longitude = getattr(stats, 'geodetic_longitude', None) or 0.0 + elevation = getattr(stats, 'elevation', None) or 99_999.0 + vector_orientation = getattr(stats, 'sensor_orientation', None) or "" + data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval + publication_level = IMCDFPublicationLevel(data_type=self.type).to_string() + global_attrs = { + 'FormatDescription': {0: 'INTERMAGNET CDF Format'}, + 'FormatVersion': {0: '1.2'}, + 'Title': {0: 'Geomagnetic time series data'}, + 'IagaCode': {0: station}, + 'ElementsRecorded': {0: ''.join(channels)}, + 'PublicationLevel': {0: publication_level}, + 'PublicationDate': {0: UTCDateTime.now().strftime("%Y-%m-%dT%H:%M:%SZ")}, + 'ObservatoryName': {0: observatory_name}, + 'Latitude': {0: latitude}, + 'Longitude': {0: longitude}, + 'Elevation': {0: elevation}, + 'Institution': {0: institution}, + 'VectorSensOrient': {0: vector_orientation}, #remove F - because its a calculation, not an element? + 'StandardLevel': {0: 'None'}, # Set to 'None' + # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc' + 'Source': {0: 'institute'}, # "institute" - if the named institution provided the data, “INTERMAGNET†- if the data file has been created by INTERMAGNET from another data source, “WDC†- if the World Data Centre has created the file from another data source + # 'TermsOfUse': {0: self.getINTERMAGNETTermsOfUse()}, + # 'UniqueIdentifier': {0: ''}, + # 'ParentIdentifiers': {0: ''}, + # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov + } + return global_attrs + + def _create_time_stamp_variables(self, timeseries: Stream) -> dict: + vector_times = None + scalar_times = None + + for trace in timeseries: + channel = trace.stats.channel + times = [ + (trace.stats.starttime + trace.stats.delta * i).datetime + for i in range(trace.stats.npts) + ] + # Convert timestamps to TT2000 format required by CDF + tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000([time.timestamp() for time in times]) + # tt2000_times = cdflib.cdfepoch.compute_tt2000(times) #this does not work + + if channel in self._get_vector_elements(): + if vector_times is None: + vector_times = tt2000_times + else: + if not np.array_equal(vector_times, tt2000_times): + raise ValueError("Time stamps for vector channels are not the same.") + elif channel in self._get_scalar_elements(): + if scalar_times is None: + scalar_times = tt2000_times + else: + if not np.array_equal(scalar_times, tt2000_times): + raise ValueError("Time stamps for scalar channels are not the same.") + else: + # Handle other channels if necessary + pass + + time_vars = {} + if vector_times is not None: + time_vars['GeomagneticVectorTimes'] = vector_times + if scalar_times is not None: + time_vars['GeomagneticScalarTimes'] = scalar_times + + return time_vars + + + def _create_var_spec( + self, + var_name: str, + data_type: str, + num_elements: int, + var_type: str, + dim_sizes: List[int], + sparse: bool, + compress: int, + pad: Optional[Union[str, np.ndarray]], + ) -> dict: + """ + Create a variable specification dictionary for cdflib. + + This is used to define the properties of a variable when writing it to + the CDF file. + + Parameters: + - var_name: Name of the variable. + - data_type: CDF data type. + - num_elements: Number of elements per record. + - var_type: Variable type ('zVariable' or 'rVariable'). + - dim_sizes: Dimensions of the variable (empty list for 0D). + - sparse: Whether the variable uses sparse records. + - compress: Compression level. + - pad: Pad value for sparse records. + + Reference: + - CDF User's Guide: Variable Specification + """ + var_spec = { + 'Variable': var_name, + 'Data_Type': data_type, + 'Num_Elements': num_elements, + 'Rec_Vary': True, + 'Var_Type': var_type, + 'Dim_Sizes': dim_sizes, + 'Sparse': 'no_sparse' if not sparse else 'pad_sparse', + 'Compress': compress, + 'Pad': pad, + } + return var_spec + + def _create_var_attrs(self, trace: Trace) -> dict: + print(trace.stats) + channel = trace.stats.channel + fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element †+ the element code or “Temperature †+ the name of the location where the temperature was recorded. + units = '' # Must be one of “nTâ€, “Degrees of arc†or “Celsius†+ if channel == 'D': + units = 'Degrees of arc' + validmin = -360.0 + validmax = 360.0 # A full circle representation + elif channel == 'I': + units = 'Degrees of arc' + validmin = -90.0 + validmax = 90.0 #The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°). + elif 'Temperature' in channel: + units = 'Celsius' + fieldnam = f"Temperature {trace.stats.location}" + elif channel == 'F': + units = 'nT' + validmin = 0.0 # negative magnetic field intestity not physically meaningful. + validmax = 79_999.0 + elif channel in ['X', 'Y', 'Z', 'H', 'E', 'V', 'G']: + units = 'nT' + validmin = -79_999.0 + validmax = 79_999.0 + + if channel in self._get_vector_elements(): + depend_0 = 'GeomagneticVectorTimes' + elif channel in self._get_scalar_elements(): + depend_0 = 'GeomagneticScalarTimes' + else: + depend_0 = None # Handle other cases if necessary + + + var_attrs = { + 'FIELDNAM': fieldnam, + 'UNITS': units, + 'FILLVAL': 99999.0, + 'VALIDMIN': validmin, + 'VALIDMAX': validmax, + 'DEPEND_0': depend_0, + 'DISPLAY_TYPE': 'time_series', + 'LABLAXIS': channel, + } + return var_attrs + + + def _create_time_var_attrs(self, ts_name: str) -> dict: + """ + Create a dictionary of time variable attributes. + + These attributes provide metadata for time variables. + Note: None of these attributes are required for the time stamp variables GeomagneticVectorTimes and GeomagneticScalarTimes. + Reference: + - ImagCDF Documentation Section 3: ImagCDF Data + """ + # var_attrs = { + # 'UNITS': 'TT2000', + # 'DISPLAY_TYPE': 'time_series', + # 'LABLAXIS': 'Time', + # } + # return var_attrs + return {} + + def _get_cdf_data_type(self, trace: Trace) -> int: + """ + Map ObsPy trace data type to CDF data type. + + Determines the appropriate CDF data type based on the NumPy data type + of the trace data. + + Returns: + - CDF_DOUBLE (45) for floating-point data. + - CDF_INT4 (41) for integer data. + + Reference: + - CDF Data Types: http://cdf.gsfc.nasa.gov/html/cdfdatatypes.html + """ + # CDF data type constants + CDF_DOUBLE = 45 # CDF_DOUBLE corresponds to 64-bit float + CDF_INT4 = 41 # CDF_INT4 corresponds to 32-bit int + + if trace.data.dtype in [np.float32, np.float64]: + return CDF_DOUBLE + elif trace.data.dtype in [np.int32, np.int64]: + return CDF_INT4 + else: + # Default to double precision float + return CDF_DOUBLE + + def _read_cdf(self, cdf: cdflib.cdfread.CDF) -> Stream: + """ + Read CDF data into an ObsPy Stream. + + This method reads the data variables and their corresponding time + variables from a CDF file and constructs an ObsPy Stream. + + Parameters: + - cdf: cdflib CDF object representing the open CDF file. + + Returns: + - An ObsPy Stream containing the data from the CDF file. + """ + stream = Stream() + # Read time variables + time_vars = {} + for var in cdf.cdf_info()['zVariables']: + if var.endswith('Time'): + time_data = cdf.varget(var) + # Convert TT2000 to UTCDateTime + utc_times = [UTCDateTime(t) for t in cdflib.cdfepoch.to_datetime(time_data)] + time_vars[var] = utc_times + + # Read data variables + for var in cdf.cdf_info()['zVariables']: + if not var.endswith('Time'): + data = cdf.varget(var) + attrs = cdf.varattsget(var) + if 'DEPEND_0' in attrs: + ts_name = attrs['DEPEND_0'] + if ts_name in time_vars: + times = time_vars[ts_name] + if len(times) > 1: + delta = times[1] - times[0] # Calculate sample interval + else: + delta = 60 if self.interval == 'minute' else 1 + trace = Trace( + data=data, + header={ + 'station': self.observatory, + 'channel': var, + 'starttime': times[0], + 'delta': delta, + } + ) + stream += trace + return stream + + @staticmethod + def getINTERMAGNETTermsOfUse() -> str: + """ + Return the INTERMAGNET Terms of Use. + + These terms should be included in the 'TermsOfUse' global attribute + as per the ImagCDF specification. + + Reference: + - ImagCDF Documentation Section 4.5: Attributes that Relate to Publication of the Data + """ + return ( + "CONDITIONS OF USE FOR DATA PROVIDED THROUGH INTERMAGNET:\n" + "The data made available through INTERMAGNET are provided for\n" + "your use and are not for commercial use or sale or distribution\n" + "to third parties without the written permission of the institute\n" + "(http://www.intermagnet.org/Institutes_e.html) operating\n" + "the observatory. Publications making use of the data\n" + "should include an acknowledgment statement of the form given below.\n" + "A citation reference should be sent to the INTERMAGNET Secretary\n" + "(secretary@intermagnet.org) for inclusion in a publications list\n" + "on the INTERMAGNET website.\n" + "\n" + " ACKNOWLEDGEMENT OF DATA FROM OBSERVATORIES\n" + " PARTICIPATING IN INTERMAGNET\n" + "We offer two acknowledgement templates. The first is for cases\n" + "where data from many observatories have been used and it is not\n" + "practical to list them all, or each of their operating institutes.\n" + "The second is for cases where research results have been produced\n" + "using a smaller set of observatories.\n" + "\n" + " Suggested Acknowledgement Text (template 1)\n" + "The results presented in this paper rely on data collected\n" + "at magnetic observatories. We thank the national institutes that\n" + "support them and INTERMAGNET for promoting high standards of\n" + "magnetic observatory practice (www.intermagnet.org).\n" + "\n" + " Suggested Acknowledgement Text (template 2)\n" + "The results presented in this paper rely on the data\n" + "collected at <observatory name>. We thank <institute name>,\n" + "for supporting its operation and INTERMAGNET for promoting high\n" + "standards of magnetic observatory practice (www.intermagnet.org).\n" + ) + + def _get_url( + self, + observatory: str, + date: UTCDateTime, + type: DataType = "variation", + interval: DataInterval = "minute", + channels: Optional[List[str]] = None, + ) -> str: + """ + Generate the file URL specific to ImagCDF conventions. + + This method constructs the filename based on the ImagCDF naming + conventions, which include the observatory code, date-time formatted + according to the data interval, and the publication level. + + Parameters: + - observatory: IAGA code of the observatory. + - date: Start date for the file. + - type: Data type indicating the processing level. + - interval: Data interval (e.g., 'minute', 'second'). + - channels: List of channels (optional). + + Returns: + - The formatted file URL or path. + + Reference: + - ImagCDF Documentation Section 5: ImagCDF File Names + """ + # Get the publication level for the type + publication_level = IMCDFPublicationLevel(data_type=type).to_string() + + # Determine filename date format based on interval + if interval == "year": + date_format = date.strftime("%Y") + elif interval == "month": + date_format = date.strftime("%Y%m") + elif interval == "day": + date_format = date.strftime("%Y%m%d") + elif interval == "hour": + date_format = date.strftime("%Y%m%d_%H") + elif interval == "minute": + date_format = date.strftime("%Y%m%d_%H%M") + elif interval == "second": + date_format = date.strftime("%Y%m%d_%H%M%S") + else: + raise ValueError(f"Unsupported interval: {interval}") + + # Default filename following ImagCDF convention + # Filename format: [iaga-code]_[date-time]_[publication-level].cdf + filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf" + + # If the urlTemplate explicitly specifies 'stdout', return 'stdout' + if self.urlTemplate.lower() == "stdout": + return "stdout" + + # Prepare parameters for templating + params = { + "date": date.datetime, + "i": self._get_interval_abbreviation(interval), + "interval": self._get_interval_name(interval), + "minute": date.hour * 60 + date.minute, + "month": date.strftime("%b").lower(), + "MONTH": date.strftime("%b").upper(), + "obs": observatory.lower(), + "OBS": observatory.upper(), + "t": publication_level, + "type": self._get_type_name(type), + "julian": date.strftime("%j"), + "year": date.strftime("%Y"), + "ymd": date.strftime("%Y%m%d"), + "dt": date_format, # Add the date-time formatted string + } + + # Attempt to use the template provided in urlTemplate + if "{" in self.urlTemplate and "}" in self.urlTemplate: + try: + return self.urlTemplate.format(**params) + except KeyError as e: + raise TimeseriesFactoryException(f"Invalid placeholder in urlTemplate: {e}") + + # If the urlTemplate doesn't support placeholders, assume 'file://' scheme + if self.urlTemplate.startswith("file://"): + base_path = self.urlTemplate[7:] # Strip "file://" + if not base_path or base_path == "{obs}_{dt}_{t}.cdf": + base_path = os.getcwd() # Default to current working directory + return os.path.join(base_path, filename) + + # Unsupported URL scheme + raise TimeseriesFactoryException( + f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}" + ) + + # Placeholder methods for interval and type names/abbreviations + def _get_interval_abbreviation(self, interval: DataInterval) -> str: + """Get the abbreviation for the data interval.""" + abbreviations = { + "year": "yr", + "month": "mon", + "day": "day", + "hour": "hr", + "minute": "min", + "second": "sec", + } + return abbreviations.get(interval, "min") + + def _get_interval_name(self, interval: DataInterval) -> str: + """Get the full name for the data interval.""" + names = { + "year": "yearly", + "month": "monthly", + "day": "daily", + "hour": "hourly", + "minute": "minute", + "second": "second", + } + return names.get(interval, "minute") + + def _get_type_name(self, type: DataType) -> str: + """Get the full name for the data type.""" + type_names = { + "variation": "variation", + "definitive": "definitive", + "quasi-definitive": "quasi-definitive", + "provisional": "provisional", + "adjusted": "adjusted", + "none": "none", + } + return type_names.get(type, "variation") + + + def _get_vector_elements(self): + return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'} + + def _get_scalar_elements(self): + return {'S', 'G'} \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 3d7add0e..90b76110 100644 --- a/poetry.lock +++ b/poetry.lock @@ -153,6 +153,25 @@ d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "cdflib" +version = "1.3.2" +description = "A python CDF reader toolkit" +optional = false +python-versions = ">=3.8" +files = [ + {file = "cdflib-1.3.2-py3-none-any.whl", hash = "sha256:49af97acc328c586ac5b7c27fd8e67bccf24af82c5bd8c37d8cfe048a0c1752a"}, + {file = "cdflib-1.3.2.tar.gz", hash = "sha256:97f27ac629e4c0ac1367eb8f4edd7a1d184190272ab98a6401e999f3a2e05687"}, +] + +[package.dependencies] +numpy = ">=1.21" + +[package.extras] +dev = ["ipython", "pre-commit"] +docs = ["astropy", "netcdf4", "sphinx", "sphinx-automodapi", "sphinx-copybutton", "sphinx-rtd-theme", "xarray"] +tests = ["astropy", "h5netcdf", "hypothesis", "netcdf4", "pytest (>=3.9)", "pytest-cov", "pytest-remotedata", "xarray"] + [[package]] name = "certifi" version = "2024.12.14" diff --git a/pyproject.toml b/pyproject.toml index c163d9b0..51a5ee31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ SQLAlchemy = "1.4.41" SQLAlchemy-Utc = "^0.14.0" uvicorn = {extras = ["standard"], version = "^0.22.0"} netcdf4 = "^1.7.2" +cdflib = "^1.3.2" [tool.poetry.dev-dependencies] -- GitLab