Skip to content
Snippets Groups Projects
ImagCDFFactory.py 38.2 KiB
Newer Older
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
"""ImagCDFFactory Implementation Using cdflib

This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.

References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html] 
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
"""

from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone, tzinfo
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from sqlalchemy import true
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactoryException import TimeseriesFactoryException
from .. import TimeseriesUtility
from .. import Util
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

import cdflib
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
import tempfile


class IMCDFPublicationLevel:
    """Handles publication levels and mapping between data types and levels.

    The ImagCDF format uses publication levels to describe the processing
    level of the data. This class maps data types (e.g., 'variation', 'definitive')
    to their corresponding publication levels as defined in the ImagCDF documentation.

    Publication Levels:
        1: Raw data with no processing.
        2: Edited data with preliminary baselines.
        3: Data suitable for initial bulletins or quasi-definitive publication.
        4: Definitive data with no further changes expected.

    Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
    - ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    """

    class PublicationLevel:
        LEVEL_1 = "1"
        LEVEL_2 = "2"
        LEVEL_3 = "3"
        LEVEL_4 = "4"

    TYPE_TO_LEVEL = {
        "none": PublicationLevel.LEVEL_1,
        "variation": PublicationLevel.LEVEL_1,
        "reported": PublicationLevel.LEVEL_1,
        "provisional": PublicationLevel.LEVEL_2,
        "adjusted": PublicationLevel.LEVEL_2,
        "quasi-definitive": PublicationLevel.LEVEL_3,
        "quasidefinitive": PublicationLevel.LEVEL_3,
        "definitive": PublicationLevel.LEVEL_4,
    }

    def __init__(self, data_type: Optional[str] = None):
        """Initialize with a data type to determine the publication level."""
        if data_type:
            self.level = self.TYPE_TO_LEVEL.get(data_type.lower())
        else:
            raise ValueError("data_type must be provided.")

        if not self.level:
            raise ValueError(f"Unsupported data_type: {data_type}")

    def to_string(self) -> str:
        """Return the publication level as a string."""
        return self.level


class ImagCDFFactory(TimeseriesFactory):
    """Factory for creating and writing ImagCDF formatted CDF files.

    This class extends the TimeseriesFactory to support writing geomagnetic
    time series data to files in the ImagCDF format using the cdflib library.
    """

    isUniqueTimes = True  # used to determine depend_0 and CDF Time Variable Name
    NONSTANDARD_ELEMENTS = TEMPERATURE_ELEMENTS_ID
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def __init__(
        self,
        observatory: Optional[str] = None,
        channels: List[str] = ("H", "D", "Z", "F"),
        type: DataType = "variation",
        interval: DataInterval = "minute",
        urlTemplate="file://etc/imagcdf/{obs}_{dt}_{t}.cdf",
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        urlInterval: int = -1,
    ):
        """
        Initialize the ImagCDFFactory with default parameters.

        Parameters:
        - observatory: IAGA code of the observatory (e.g., 'BOU').
        - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
        - type: Data type indicating the processing level (e.g., 'variation', 'definitive').
        - interval: Data interval (e.g., 'minute', 'second').
        - urlTemplate: Template for generating file URLs or paths.
        - urlInterval: Interval size for splitting data into multiple files.
        """
        super().__init__(
            observatory=observatory,
            channels=channels,
            type=type,
            interval=interval,
            urlTemplate=urlTemplate,
            urlInterval=urlInterval,
        )

    def write_file(self, fh, timeseries: Stream, channels: List[str]):
        # Create a temporary file to write the CDF data
        with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
            tmp_file_path = tmp_file.name
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        try:
            # Initialize the CDF writer
                "Compressed": 9,  # Enable compression (1-9)
                "Majority": CDFWriter.ROW_MAJOR,
                "Encoding": CDFWriter.NETWORK_ENCODING,  #  XDR Encoding - If a CDF must be portable between two or more different types of computers use network encoded.
                "Checksum": True,  # Disable checksum for faster writes (optional)
                "rDim_sizes": [],  # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

            # Write global attributes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            global_attrs = self._create_global_attributes(timeseries, channels)
            cdf_writer.write_globalattrs(global_attrs)

            # Time variables
            time_vars = self._create_time_stamp_variables(
                timeseries
            )  # modifies self.isUniqueTimes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            for ts_name, ts_data in time_vars.items():
                # Define time variable specification
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                var_spec = {
                    "Variable": ts_name,
                    "Data_Type": CDFWriter.CDF_TIME_TT2000,  # CDF_TIME_TT2000
                    "Num_Elements": 1,
                    "Rec_Vary": True,
                    "Var_Type": "zVariable",
                    "Dim_Sizes": [],
                    "Sparse": "no_sparse",  # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series)
                    "Compress": 9,
                    "Pad": None,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                }
                # Define time variable attributes
                var_attrs = self._create_time_var_attrs(ts_name)

                # Write time variable
                cdf_writer.write_var(var_spec, var_attrs, ts_data)
            # Data variables
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            for trace in timeseries:
                channel = trace.stats.channel
                var_name = f"GeomagneticField{channel}"
                # if channel in REAL_TEMPERATURE:
                #     temperature_index += 1  # MUST INCREMENT INDEX BEFORE USING
                #     var_name = f"Temperature{temperature_index}"
                data_type = self._get_cdf_data_type(trace)
                num_elements = 1
                if data_type in [
                    CDFWriter.CDF_CHAR,
                    CDFWriter.CDF_UCHAR,
                ]:  # Handle string types
                    num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                var_spec = {
                    "Variable": var_name,
                    "Data_Type": data_type,
                    "Num_Elements": num_elements,
                    "Rec_Vary": True,
                    "Var_Type": "zVariable",
                    "Dim_Sizes": [],
                    "Sparse": "no_sparse",
                    "Compress": 9,
                    "Pad": None,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                }
                var_attrs = self._create_var_attrs(
                    trace, temperature_index, self.isUniqueTimes
                )

                # Write data variable
                cdf_writer.write_var(var_spec, var_attrs, trace.data)

            # Close the CDF writer
            cdf_writer.close()
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

            # Copy the temporary CDF file to the final file handle
            with open(tmp_file_path, "rb") as tmp:
                cdf_data = tmp.read()
                fh.write(cdf_data)

        finally:
            # Cleanup the temporary file
            os.remove(tmp_file_path)

Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    def put_timeseries(
        self,
        timeseries: Stream,
        starttime: Optional[UTCDateTime] = None,
        endtime: Optional[UTCDateTime] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
    ):
        """
        Store timeseries data in ImagCDF format using cdflib.

        This method writes the timeseries data to one or more files, depending
        on the specified urlInterval.

        Parameters:
        - timeseries: ObsPy Stream containing the geomagnetic data.
        - starttime: Start time of the data to be written.
        - endtime: End time of the data to be written.
        - channels: List of channels to include in the output file.
        - type: Data type indicating the processing level.
        - interval: Data interval of the data.
        """
        if len(timeseries) == 0:
            # No data to store
            return

        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        # Extract metadata from the first trace
        stats = timeseries[0].stats
        delta = stats.delta  # Sample rate
        observatory = stats.station
        starttime = starttime or stats.starttime
        endtime = endtime or stats.endtime
        # Split data into intervals if necessary
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )
        for urlInterval in urlIntervals:
            interval_start = urlInterval["start"]
            interval_end = urlInterval["end"]
            url = self._get_url(
                observatory=observatory,
                date=interval_start,
                type=type,
                interval=interval,
                channels=channels,
            )

            # Handle 'stdout' output
            if url == "stdout":
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                # Write directly to stdout
                fh = sys.stdout.buffer
                url_data = timeseries.slice(
                    starttime=interval_start,
                    endtime=interval_end,
                )
                self.write_file(fh, url_data, channels)
                continue  # Proceed to next interval if any

            # Handle 'file://' output
            elif url.startswith("file://"):
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                # Get the file path from the URL
                url_file = Util.get_file_from_url(url, createParentDirectory=False)
                url_data = timeseries.slice(
                    starttime=interval_start,
                    endtime=interval_end,
                )
                # Check if the file already exists to merge data
                if os.path.isfile(url_file):
                    os.remove(url_file)
                    print(f"Naming conflict. Deleting exisiting file '{url_file}'.")
                    # raise TimeseriesFactoryException(
                    #     f"Error: File '{url_file}' already exists."
                    # )
                # Pad the data to ensure it fits the interval
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                url_data.trim(
                    starttime=interval_start,
                    endtime=interval_end,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                    pad=True,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                )

                # Write the data to the CDF file
                with open(url_file, "wb") as fh:
                    self.write_file(fh, url_data, channels)

            else:
                # Unsupported URL scheme encountered
                raise TimeseriesFactoryException(
                    "Unsupported URL scheme in urlTemplate"
                )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        add_empty_channels: bool = True,
        observatory: Optional[str] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
    ) -> Stream:
        observatory = observatory or self.observatory
        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        timeseries = Stream()
        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )

        for urlInterval in urlIntervals:
            url = self._get_url(
                observatory=observatory,
                date=urlInterval["start"],
                type=type,
                interval=interval,
                channels=channels,
            )
            if url == "stdout":
                continue  # stdout is not a valid input source
            if not url.startswith("file://"):
                raise TimeseriesFactoryException(
                    "Only file urls are supported for reading ImagCDF"
                )
            url_file = Util.get_file_from_url(url, createParentDirectory=False)
            if not os.path.isfile(url_file):
                # If file doesn't exist, skip
                continue

            try:
                # Read CDF data and merge
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                cdf = CDFReader(url_file)
            except Exception as e:
                print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)

        # After reading all intervals, merge and trim
        timeseries.trim(
            starttime=starttime,
            endtime=endtime,
        )

        # If requested, add empty channels not present in data
        if add_empty_channels:
            present_channels = {tr.stats.channel for tr in timeseries}
            missing_channels = set(channels) - present_channels
            for ch in missing_channels:
                empty_trace = self._get_empty_trace(
                    starttime=starttime,
                    endtime=endtime,
                    observatory=observatory,
                    channel=ch,
                    data_type=type,
                    interval=interval,
                )
                timeseries += empty_trace

        timeseries.sort()
        return timeseries

    def _create_global_attributes(
        self, timeseries: Stream, channels: List[str]
    ) -> dict:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        Create a dictionary of global attributes for the ImagCDF file.

        These attributes apply to all the data in the file and include metadata
        such as observatory information, data publication level, and format
        descriptions.

        References:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF Global Attributes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        stats = timeseries[0].stats if len(timeseries) > 0 else None

        # Extract metadata from stats or fallback to defaults
        observatory_name = (
            getattr(stats, "station_name", None) or self.observatory or ""
        )
        station = getattr(stats, "station", None) or ""
        institution = getattr(stats, "agency_name", None) or ""
        latitude = getattr(stats, "geodetic_latitude", None) or 0.0
        longitude = getattr(stats, "geodetic_longitude", None) or 0.0
        elevation = getattr(stats, "elevation", None) or 99_999.0
        conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
        vector_orientation = getattr(stats, "sensor_orientation", None) or ""
        data_interval_type = getattr(stats, "data_interval_type", None) or self.interval
        data_type = getattr(stats, "data_type", None) or "variation"
        sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
        comments = getattr(stats, "filter_comments", None) or [""]
        declination_base = getattr(stats, "declination_base", None) or 0.0
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
        global_attrs = {
            "FormatDescription": {0: "INTERMAGNET CDF Format"},
            "FormatVersion": {0: "1.2"},
            "Title": {0: "Geomagnetic time series data"},
            "IagaCode": {0: station},
            "ElementsRecorded": {0: "".join(channels)},
            "PublicationLevel": {0: publication_level},
            "PublicationDate": {
                0: [
                    cdflib.cdfepoch.timestamp_to_tt2000(
                        datetime.timestamp(datetime.now(timezone.utc))
                    ),
                    "cdf_time_tt2000",
                ]
            },
            "ObservatoryName": {0: observatory_name},
            "Latitude": {0: np.array([latitude], dtype=np.float64)},
            "Longitude": {0: np.array([longitude], dtype=np.float64)},
            "Elevation": {0: np.array([elevation], dtype=np.float64)},
            "Institution": {0: institution},
            "VectorSensOrient": {
                0: vector_orientation
            },  # remove F - because its a calculation, not an element?
            "StandardLevel": {0: "None"},  # Set to 'None'
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
            "Source": {
                0: "institute"
            },  # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
            "TermsOfUse": {0: conditions_of_use},
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            # 'UniqueIdentifier': {0: ''},
            # 'ParentIdentifiers': {0: ''},
            # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
            "SensorSamplingRate": {0: sensor_sampling_rate},  # Optional
            "DataType": {0: data_type},  # Optional
            "Comments": {0: comments},  # Optional
            "DeclinationBase": {0: declination_base},  # Optional
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }
        return global_attrs

    def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
        vector_times = None
        scalar_times = None
        temperature_times = {}
        temperature_index = 1
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        for trace in timeseries:
            channel = trace.stats.channel
            times = [
                (trace.stats.starttime + trace.stats.delta * i).datetime.replace(
                    tzinfo=timezone.utc
                )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                for i in range(trace.stats.npts)
            ]
            # Convert timestamps to TT2000 format required by CDF
            tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(
                [time.timestamp() for time in times]
            )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

            if channel in self._get_vector_elements():
                if vector_times is None:
                    vector_times = tt2000_times
                else:
                    if not np.array_equal(vector_times, tt2000_times):
                        raise ValueError(
                            "Time stamps for vector channels are not the same."
                        )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            elif channel in self._get_scalar_elements():
                if scalar_times is None:
                    scalar_times = tt2000_times
                else:
                    if not np.array_equal(scalar_times, tt2000_times):
                        raise ValueError(
                            "Time stamps for scalar channels are not the same."
                        )
            # elif channel in REAL_TEMPERATURES:
            #     ts_key = f"Temperature{temperature_index}Times"
            #     if ts_key not in temperature_times:
            #         temperature_times[ts_key] = tt2000_times
            #         temperature_index += 1
            #     else:
            #         temperature_times[ts_key] = tt2000_times
            elif channel in self.NONSTANDARD_ELEMENTS:
                if scalar_times is None:
                    nonstandard_times = tt2000_times
                    if not np.array_equal(scalar_times, tt2000_times):
                        raise ValueError(
                            "Time stamps for nonstandard channels are not the same."
                        )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        time_vars = {}
        if vector_times is not None:
            time_vars["GeomagneticVectorTimes"] = vector_times
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        if scalar_times is not None:
            time_vars["GeomagneticScalarTimes"] = scalar_times
        if temperature_times:
            time_vars.update(temperature_times)
        isNonStandard = nonstandard_times is not None
        if isNonStandard:
            time_vars["NonstandardTimes"] = nonstandard_times

        self.isUniqueTimes = (
            len(time_vars) == 1
        )  # (true if is a single time stamp variable in the file)
        for index, times in enumerate(time_vars.values()):
            if index > 0:
                self.isUniqueTimes = not np.array_equal(last_times, times)
            last_times = times
        if self.isUniqueTimes and isNonStandard and len(time_vars) > 2:
            raise ValueError(
                f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}"
            )
        return (
            {"DataTimes": last_times}
            if isNonStandard or not self.isUniqueTimes
            else time_vars
        )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def _create_var_spec(
        self,
        var_name: str,
        data_type: str,
        num_elements: int,
        var_type: str,
        dim_sizes: List[int],
        sparse: bool,
        compress: int,
        pad: Optional[Union[str, np.ndarray]],
    ) -> dict:
        """
        Create a variable specification dictionary for cdflib.

        This is used to define the properties of a variable when writing it to
        the CDF file.

        Parameters:
        - var_name: Name of the variable.
        - data_type: CDF data type.
        - num_elements: Number of elements per record.
        - var_type: Variable type ('zVariable' or 'rVariable').
        - dim_sizes: Dimensions of the variable (empty list for 0D).
        - sparse: Whether the variable uses sparse records.
        - compress: Compression level.
        - pad: Pad value for sparse records.

        Reference:
        - CDF User's Guide: Variable Specification
        """
        var_spec = {
            "Variable": var_name,
            "Data_Type": data_type,
            "Num_Elements": num_elements,
            "Rec_Vary": True,
            "Var_Type": var_type,
            "Dim_Sizes": dim_sizes,
            "Sparse": "no_sparse" if not sparse else "pad_sparse",
            "Compress": compress,
            "Pad": pad,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }
        return var_spec

    def _create_var_attrs(
        self,
        trace: Trace,
        temperature_index: Optional[int] = None,
        isUniqueTimes: Optional[bool] = True,
    ) -> dict:
        channel = trace.stats.channel.upper()
        fieldnam = f"Geomagnetic Field Element {channel}"  # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
        units = ""  # Must be one of “nT”, “Degrees of arc” or “Celsius”
        if channel == "D":
            units = "Degrees of arc"
            validmin = -360.0
            validmax = 360.0  # A full circle representation
        elif channel == "I":
            units = "Degrees of arc"
            validmin = -90.0
            validmax = 90.0  # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
        elif channel in TEMPERATURE_ELEMENTS_ID:
            units = "Celsius"
            validmin = -273.15  # absolute zero
            depend_0 = "DataTimes"  # can be used for nonstandard element
        # elif channel in [REAL_TEMPERATURES]:
        #     units = "Celsius"
        #     fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
        #     validmin = -273.15  # absolute zero
        #     validmax = 79_999
        elif channel in ["F", "S"]:
            units = "nT"
            validmin = (
                0.0  # negative magnetic field intestity not physically meaningful.
            )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            validmax = 79_999.0
        elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
            units = "nT"
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            validmin = -79_999.0
            validmax = 79_999.0

        # Determine DEPEND_0 based on channel type
        if not isUniqueTimes:
            depend_0 = "DataTimes"
        elif channel in self._get_vector_elements():
            depend_0 = "GeomagneticVectorTimes"
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        elif channel in self._get_scalar_elements():
            depend_0 = "GeomagneticScalarTimes"
        # elif channel in REAL_TEMPERATURES:
        #     depend_0 = f"Temperature{temperature_index}Times"
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        var_attrs = {
            "FIELDNAM": fieldnam,
            "UNITS": units,
            "FILLVAL": 99999.0,
            "VALIDMIN": validmin,
            "VALIDMAX": validmax,
            "DISPLAY_TYPE": "time_series",
            "LABLAXIS": channel,
            "DATA_INTERVAL_TYPE": trace.stats.data_interval_type,  # optional
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }
        return var_attrs

    def _create_time_var_attrs(self, ts_name: str) -> dict:
        """
        Create a dictionary of time variable attributes.

        These attributes provide metadata for time variables.
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        Note: None of these attributes are required for the time stamp variables.
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF Data
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        # var_attrs = {
        # 'UNITS': 'TT2000',
        # 'DISPLAY_TYPE': 'time_series',
        # 'LABLAXIS': 'Time',
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        # }
        # return var_attrs
        return {}

    def _get_cdf_data_type(self, trace: Trace) -> int:
        """
        Map ObsPy trace data type to CDF data type.

        Determines the appropriate CDF data type based on the NumPy data type
        of the trace data.

        Returns:
        - CDF_DOUBLE (45) for floating-point data.
        - CDF_INT4 (41) for integer data.

        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - See CDF for more data types
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """

        if trace.data.dtype in [np.float32, np.float64]:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_DOUBLE
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        elif trace.data.dtype in [np.int32, np.int64]:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_INT4
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        else:
            # Default to double precision float
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_DOUBLE
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        Read CDF data into an ObsPy Stream.

        This method reads the data variables and their corresponding time
        variables from a CDF file and constructs an ObsPy Stream.

        Parameters:
        - cdf: cdflib CDF object representing the open CDF file.

        Returns:
        - An ObsPy Stream containing the data from the CDF file.
        """
        stream = Stream()

        # Extract global attributes
        global_attrs = cdf.globalattsget()
        # Define required global attributes based on ImagCDF standards
        required_global_attrs = [
            "FormatDescription",
            "FormatVersion",
            "Title",
            "IagaCode",
            "ElementsRecorded",
            "PublicationLevel",
            "PublicationDate",
            "ObservatoryName",
            "Latitude",
            "Longitude",
            "Elevation",
            "Institution",
            "StandardLevel",
            "Source",
        ]

        # Check for missing required global attributes per ImagCDF techincal document
        missing_global_attrs = []
        for attr in required_global_attrs:
            if attr not in global_attrs:
                missing_global_attrs.append(attr)

        if missing_global_attrs:
            error_message = (
                f"The following required global attributes are missing from the CDF: "
                f"{', '.join(missing_global_attrs)}"
            )
            raise TimeseriesFactoryException(error_message)
        # Map global attributes to Stream-level metadata
        observatory = global_attrs.get("IagaCode", [""])[0]
        station_name = global_attrs.get("ObservatoryName", [""])[0]
        institution = global_attrs.get("Institution", [""])[0]
        latitude = global_attrs.get("Latitude", [0.0])[0]
        longitude = global_attrs.get("Longitude", [0.0])[0]
        elevation = global_attrs.get("Elevation", [99_999.0])[
            0
        ]  # default to 99_999 per technical documents.
        sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [0.0])[0]
        sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
        data_type = global_attrs.get("DataType", ["variation"])[0]
        publication_level = global_attrs.get("PublicationLevel", ["1"])[0]
        comments = global_attrs.get("Comments", [""])
        terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
        declination_base = global_attrs.get("DeclinationBase", [0.0])[0]
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        time_vars = {}
        for var in cdf.cdf_info().zVariables:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                time_data = cdf.varget(var)
                unix_times = cdflib.cdfepoch.unixtime(time_data)
                utc_times = [UTCDateTime(t) for t in unix_times]
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                time_vars[var] = utc_times

        # Read data variables and associate them with time variables
        for var in cdf.cdf_info().zVariables:
            # Map the variable name back to a standard channel code by removing known prefixes
            # Names are like GeomagneticFieldH, GeomagneticFieldD, Temperature1, Temperature2, ...
            if var.startswith("GeomagneticField"):
                channel = var.replace("GeomagneticField", "")
            # elif var.startswith("Temperature"):
            #     # Temperature variables may not map directly to a geomagnetic channel
            #     # but to temperature sensors. We can just use the label from LABLAXIS if needed
            #     channel = attrs.get("LABLAXIS", var)
            else:
                # fallback if naming doesn't match expected patterns
                channel = var

            if channels and channel not in channels:
                continue
            data = cdf.varget(var)
            attrs = cdf.varattsget(var)

            # Determine DEPEND_0 (the time variable name) and validate
            ts_name = attrs.get("DEPEND_0")
            # if not ts_name:
            #     # If no DEPEND_0, skip this variable as we cannot map times
            #     continue
            # The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc.
            matched_time_key = None
            for tkey in time_vars.keys():
                if tkey == ts_name:
                    matched_time_key = tkey
                    break

            # if matched_time_key not in time_vars:
            #     # If we cannot find the matching time variable, skip this variable
            #     continue
            times = []
            if matched_time_key in time_vars:
                times = time_vars[matched_time_key]

            # Determine delta (sample interval)
            if len(times) > 1:
                # delta as a float of seconds
                delta = times[1].timestamp - times[0].timestamp
            else:
                # if only one sample, use default based on interval
                # convert minute, second, etc. to numeric delta
                if self.interval == "minute":
                elif self.interval == "second":
                elif self.interval == "hour":
                    delta = 3600.0
                else:
                    # fallback, set delta to 60
                    delta = 60.0

            time_attrs = cdf.varattsget(var)
            data_interval = time_attrs.get("DATA_INTERVAL_TYPE", [""])

            # Required variable attributes based on ImagCDF standards
            required_variable_attrs = [
                "FIELDNAM",
                "UNITS",
                "FILLVAL",
                "VALIDMIN",
                "VALIDMAX",
                "DISPLAY_TYPE",
                "LABLAXIS",
            ]
            # Validate presence of required variable attributes
            missing_var_attrs = []
            for var_attr in required_variable_attrs:
                if var_attr not in attrs:
                    missing_var_attrs.append(var_attr)

            if missing_var_attrs or missing_global_attrs:
                error_message = (
                    f"The variable '{var}' is missing required attributes: "
                    f"{', '.join(missing_var_attrs)}"
                )
                raise TimeseriesFactoryException(error_message)

            # Create a trace
            trace = Trace(
                data=data,
                header={
                    "station": observatory,
                    "channel": channel,
                    "starttime": times[0],
                    "delta": delta,
                    "geodetic_latitude": latitude,
                    "geodetic_longitude": longitude,
                    "elevation": elevation,
                    "sensor_orientation": "".join(sensor_orientation),
                    "data_type": data_type,
                    "station_name": station_name,
                    "agency_name": institution,
                    "conditions_of_use": terms_of_use,
                    "sensor_sampling_rate": sensor_sampling_rate,
                    "data_interval_type": data_interval,
                    "declination_base": declination_base,
                    "filter_comments": comments,
                },
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def _get_url(
        self,
        observatory: str,
        date: UTCDateTime,
        type: DataType = "variation",
        interval: DataInterval = "minute",
        channels: Optional[List[str]] = None,
    ) -> str:
        """
        Generate the file URL specific to ImagCDF conventions.

        This method constructs the filename based on the ImagCDF naming
        conventions, which include the observatory code, date-time formatted
        according to the data interval, and the publication level.

        [iaga-code]_[date-time]_[publication-level].cdf
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        Parameters:
        - observatory: IAGA code of the observatory.
        - date: Start date for the file.
        - type: Data type indicating the processing level.
        - interval: Data interval (e.g., 'minute', 'second').
        - channels: List of channels (optional). Not Implemented
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        Returns:
        - The formatted file URL or path.

        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF File Names
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        # Get the publication level for the type
        publication_level = IMCDFPublicationLevel(data_type=type).to_string()

        # Format of Date/Time Portion of Filename based on interval see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#example-data-file:~:text=Format%20of%20Date,%EF%83%81
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        if interval == "year":
            date_format = date.strftime("%Y")
        elif interval == "month":
            date_format = date.strftime("%Y%m")
        elif interval == "day":
            date_format = date.strftime("%Y%m%d")
        elif interval == "hour":
            date_format = date.strftime("%Y%m%d_%H")
        elif interval == "minute":
            date_format = date.strftime("%Y%m%d_%H%M")
        elif interval == "second":
            date_format = date.strftime("%Y%m%d_%H%M%S")
        else:
            raise ValueError(
                f"Unsupported interval: {interval}"
            )  # tenhertz currently not supported
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        # Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"

        # If the urlTemplate explicitly specifies 'stdout', return 'stdout'
        if self.urlTemplate.lower() == "stdout":
            return "stdout"

        # Prepare parameters for templating
        params = {
            "date": date.datetime,
            "i": self._get_interval_abbreviation(interval),
            "interval": self._get_interval_name(interval),
            "minute": date.hour * 60 + date.minute,
            "month": date.strftime("%b").lower(),
            "MONTH": date.strftime("%b").upper(),
            "obs": observatory.lower(),
            "OBS": observatory.upper(),
            "t": publication_level,
            "type": self._get_type_name(type),
            "julian": date.strftime("%j"),
            "year": date.strftime("%Y"),
            "ymd": date.strftime("%Y%m%d"),
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }

        # Attempt to use the template provided in urlTemplate
        if "{" in self.urlTemplate and "}" in self.urlTemplate:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            try:
                return self.urlTemplate.format(**params)
            except KeyError as e:
                raise TimeseriesFactoryException(
                    f"Invalid placeholder in urlTemplate: {e}"
                )
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        # If the urlTemplate doesn't support placeholders, assume 'file://' scheme
        if self.urlTemplate.startswith("file://"):
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
                base_path = os.getcwd()  # Default to current working directory
                return os.path.join(base_path, "etc", "imagcdf", filename)
            return os.path.join(self.urlTemplate, filename)
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        # Unsupported URL scheme
        raise TimeseriesFactoryException(
            f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
        )

    def _get_vector_elements(self):
        return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
    def _get_scalar_elements(self):
        return {"S", "G"}