Skip to content
Snippets Groups Projects
ImagCDFFactory.py 35.9 KiB
Newer Older
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
"""ImagCDFFactory Implementation Using cdflib

This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.

References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html] 
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
"""

from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone, tzinfo
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from sqlalchemy import true
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

from .geomag_types import DataInterval, DataType
from .TimeseriesFactoryException import TimeseriesFactoryException
from . import TimeseriesUtility
from . import Util

import cdflib
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
import tempfile


class IMCDFPublicationLevel:
    """Handles publication levels and mapping between data types and levels.

    The ImagCDF format uses publication levels to describe the processing
    level of the data. This class maps data types (e.g., 'variation', 'definitive')
    to their corresponding publication levels as defined in the ImagCDF documentation.

    Publication Levels:
        1: Raw data with no processing.
        2: Edited data with preliminary baselines.
        3: Data suitable for initial bulletins or quasi-definitive publication.
        4: Definitive data with no further changes expected.

    Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
    - ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    """

    class PublicationLevel:
        LEVEL_1 = "1"
        LEVEL_2 = "2"
        LEVEL_3 = "3"
        LEVEL_4 = "4"

    TYPE_TO_LEVEL = {
        "none": PublicationLevel.LEVEL_1,
        "variation": PublicationLevel.LEVEL_1,
        "reported": PublicationLevel.LEVEL_1,
        "provisional": PublicationLevel.LEVEL_2,
        "adjusted": PublicationLevel.LEVEL_2,
        "quasi-definitive": PublicationLevel.LEVEL_3,
        "quasidefinitive": PublicationLevel.LEVEL_3,
        "definitive": PublicationLevel.LEVEL_4,
    }

    def __init__(self, data_type: Optional[str] = None):
        """Initialize with a data type to determine the publication level."""
        if data_type:
            self.level = self.TYPE_TO_LEVEL.get(data_type.lower())
        else:
            raise ValueError("data_type must be provided.")

        if not self.level:
            raise ValueError(f"Unsupported data_type: {data_type}")

    def to_string(self) -> str:
        """Return the publication level as a string."""
        return self.level


class ImagCDFFactory(TimeseriesFactory):
    """Factory for creating and writing ImagCDF formatted CDF files.

    This class extends the TimeseriesFactory to support writing geomagnetic
    time series data to files in the ImagCDF format using the cdflib library.
    """
    
    isUniqueTimes=True #used to determine depend_0 and CDF Time Variable Name
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def __init__(
        self,
        observatory: Optional[str] = None,
        channels: List[str] = ("H", "D", "Z", "F"),
        type: DataType = "variation",
        interval: DataInterval = "minute",
        urlTemplate="file://{obs}_{dt}_{t}.cdf",
        urlInterval: int = -1,
    ):
        """
        Initialize the ImagCDFFactory with default parameters.

        Parameters:
        - observatory: IAGA code of the observatory (e.g., 'BOU').
        - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
        - type: Data type indicating the processing level (e.g., 'variation', 'definitive').
        - interval: Data interval (e.g., 'minute', 'second').
        - urlTemplate: Template for generating file URLs or paths.
        - urlInterval: Interval size for splitting data into multiple files.
        """
        super().__init__(
            observatory=observatory,
            channels=channels,
            type=type,
            interval=interval,
            urlTemplate=urlTemplate,
            urlInterval=urlInterval,
        )

    def parse_string(self, data: str, **kwargs):
        """Parse ImagCDF formatted string data into a Stream.

        Note: Parsing from strings is not implemented in this factory.
        """
        raise NotImplementedError('"parse_string" not implemented')
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    def write_file(self, fh, timeseries: Stream, channels: List[str]):
        # Create a temporary file to write the CDF data
        with tempfile.NamedTemporaryFile(delete=False, suffix='.cdf') as tmp_file:
            tmp_file_path = tmp_file.name
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        try:
            # Initialize the CDF writer
            cdf_spec = {
                'Compressed': 9,          # Enable compression (0-9)
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                'Majority': CDFWriter.ROW_MAJOR,        # Data layout - gets set automatically
                'Encoding': CDFWriter.IBMPC_ENCODING, #  gets set automatically
                'Checksum': True,        # Disable checksum for faster writes (optional)
                'rDim_sizes': [],         # Applicable only if using rVariables - CDF protocol recommends only using zVariables. 
            }

Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

            # Write global attributes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            global_attrs = self._create_global_attributes(timeseries, channels)
            cdf_writer.write_globalattrs(global_attrs)

            # Time variables
            time_vars = self._create_time_stamp_variables(timeseries) #modifies self.isUniqueTimes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            for ts_name, ts_data in time_vars.items():
                # Define time variable specification
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                var_spec = {
                    'Variable': ts_name,
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                    'Data_Type': CDFWriter.CDF_TIME_TT2000,  # CDF_TIME_TT2000
                    'Num_Elements': 1,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                    'Compress': 9,
                    'Pad': None,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                }
                # Define time variable attributes
                var_attrs = self._create_time_var_attrs(ts_name)

                # Write time variable
                cdf_writer.write_var(var_spec, var_attrs, ts_data)
            # Data variables
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            for trace in timeseries:
                channel = trace.stats.channel
                if channel in TEMPERATURE_ELEMENTS_ID:
                    temperature_index += 1 #MUST INCREMENT INDEX BEFORE USING
                    var_name = f"Temperature{temperature_index}"
                else:
                    var_name = f"GeomagneticField{channel}"
                data_type = self._get_cdf_data_type(trace)
                num_elements = 1
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                if data_type in [CDFWriter.CDF_CHAR, CDFWriter.CDF_UCHAR]:  # Handle string types
                    num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                var_spec = {
                    'Variable': var_name,
                    'Data_Type': data_type,
                    'Num_Elements': num_elements,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                    'Compress': 9,
                    'Pad': None,
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                }
                var_attrs = self._create_var_attrs(trace, temperature_index, self.isUniqueTimes)

                # Write data variable
                cdf_writer.write_var(var_spec, var_attrs, trace.data)

            # Close the CDF writer
            cdf_writer.close()
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

            # Copy the temporary CDF file to the final file handle
            with open(tmp_file_path, "rb") as tmp:
                cdf_data = tmp.read()
                fh.write(cdf_data)

        finally:
            # Cleanup the temporary file
            os.remove(tmp_file_path)

Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    def put_timeseries(
        self,
        timeseries: Stream,
        starttime: Optional[UTCDateTime] = None,
        endtime: Optional[UTCDateTime] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
    ):
        """
        Store timeseries data in ImagCDF format using cdflib.

        This method writes the timeseries data to one or more files, depending
        on the specified urlInterval.

        Parameters:
        - timeseries: ObsPy Stream containing the geomagnetic data.
        - starttime: Start time of the data to be written.
        - endtime: End time of the data to be written.
        - channels: List of channels to include in the output file.
        - type: Data type indicating the processing level.
        - interval: Data interval of the data.
        """
        if len(timeseries) == 0:
            # No data to store
            return

        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        # Extract metadata from the first trace
        stats = timeseries[0].stats
        delta = stats.delta  # Sample rate
        observatory = stats.station
        starttime = starttime or stats.starttime
        endtime = endtime or stats.endtime

        # Split data into intervals if necessary
        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )
        for urlInterval in urlIntervals:
            interval_start = urlInterval["start"]
            interval_end = urlInterval["end"]
            if interval_start != interval_end:
                interval_end = interval_end - delta
            url = self._get_url(
                observatory=observatory,
                date=interval_start,
                type=type,
                interval=interval,
                channels=channels,
            )

            # Handle 'stdout' output
            if url == 'stdout':
                # Write directly to stdout
                fh = sys.stdout.buffer
                url_data = timeseries.slice(
                    starttime=interval_start,
                    endtime=interval_end,
                )
                self.write_file(fh, url_data, channels)
                continue  # Proceed to next interval if any

            # Handle 'file://' output
            elif url.startswith('file://'):
                # Get the file path from the URL
                url_file = Util.get_file_from_url(url, createParentDirectory=False)
                url_data = timeseries.slice(
                    starttime=interval_start,
                    endtime=interval_end,
                )

                # Check if the file already exists to merge data
                if os.path.isfile(url_file):
                    try:
                        # Read existing data to merge with new data
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                        existing_cdf = CDFReader(url_file)
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                        existing_stream = self._read_cdf(existing_cdf)
                        # existing_cdf.close() #no close method?
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                        existing_data = existing_stream

                        # Merge existing data with new data
                        for trace in existing_data:
                            new_trace = url_data.select(
                                network=trace.stats.network,
                                station=trace.stats.station,
                                channel=trace.stats.channel,
                            )
                            if new_trace:
                                trace.data = np.concatenate((trace.data, new_trace[0].data))
                        url_data = existing_data + url_data
                    except Exception as e:
                        # Log the exception if needed
                        print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr)
                        # Proceed with new data

                # Pad the data with NaNs to ensure it fits the interval
                url_data.trim(
                    starttime=interval_start,
                    endtime=interval_end,
                    nearest_sample=False,
                    pad=True,
                    fill_value=np.nan,
                )

                # Write the data to the CDF file
                with open(url_file, "wb") as fh:
                    self.write_file(fh, url_data, channels)

            else:
                # Unsupported URL scheme encountered
                raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate")

    def get_timeseries(
        self,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        add_empty_channels: bool = True,
        observatory: Optional[str] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
    ) -> Stream:
        observatory = observatory or self.observatory
        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        timeseries = Stream()
        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )

        for urlInterval in urlIntervals:
            url = self._get_url(
                observatory=observatory,
                date=urlInterval["start"],
                type=type,
                interval=interval,
                channels=channels,
            )
            if url == 'stdout':
                continue  # stdout is not a valid input source
            if not url.startswith("file://"):
                raise TimeseriesFactoryException("Only file urls are supported for reading ImagCDF")

            url_file = Util.get_file_from_url(url, createParentDirectory=False)
            if not os.path.isfile(url_file):
                # If file doesn't exist, skip
                continue

            try:
                # Read CDF data and merge
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
                cdf = CDFReader(url_file)
                file_stream = self._read_cdf(cdf)
                # Attempt to select only requested channels
                selected = Stream()
                for ch in channels:
                    selected += file_stream.select(channel=ch)
                timeseries += selected
            except Exception as e:
                print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)

        # After reading all intervals, merge and trim
        timeseries.merge(fill_value=np.nan)
        timeseries.trim(
            starttime=starttime,
            endtime=endtime,
            nearest_sample=False,
            pad=True,
            fill_value=np.nan,
        )

        # If requested, add empty channels not present in data
        if add_empty_channels:
            present_channels = {tr.stats.channel for tr in timeseries}
            missing_channels = set(channels) - present_channels
            for ch in missing_channels:
                empty_trace = self._get_empty_trace(
                    starttime=starttime,
                    endtime=endtime,
                    observatory=observatory,
                    channel=ch,
                    data_type=type,
                    interval=interval,
                )
                timeseries += empty_trace

        timeseries.sort()
        return timeseries

Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
    def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict:
        """
        Create a dictionary of global attributes for the ImagCDF file.

        These attributes apply to all the data in the file and include metadata
        such as observatory information, data publication level, and format
        descriptions.

        References:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF Global Attributes
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        stats = timeseries[0].stats if len(timeseries) > 0 else None

        # Extract metadata from stats or fallback to defaults
        observatory_name = getattr(stats, 'station_name', None) or self.observatory or ""
        station = getattr(stats, 'station', None) or ""
        institution = getattr(stats, 'agency_name', None) or ""
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        latitude = getattr(stats, 'geodetic_latitude', None) or 0.0
        longitude = getattr(stats, 'geodetic_longitude', None) or 0.0
        elevation = getattr(stats, 'elevation', None) or 99_999.0
        conditions_of_use = getattr(stats, 'conditions_of_use', None) or ""
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        vector_orientation = getattr(stats, 'sensor_orientation', None) or ""
        data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval
        data_type = getattr(stats, 'data_type', None) or "variation"
        sensor_sampling_rate = getattr(stats, 'sensor_sampling_rate', None) or 0.0
        comments = getattr(stats, 'filter_comments', None) or ['']
        declination_base = getattr(stats, 'declination_base', None) or 0.0
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
        global_attrs = {
            'FormatDescription': {0: 'INTERMAGNET CDF Format'},
            'FormatVersion': {0: '1.2'},
            'Title': {0: 'Geomagnetic time series data'},
            'IagaCode': {0: station},
            'ElementsRecorded': {0: ''.join(channels)},
            'PublicationLevel': {0: publication_level},
            'PublicationDate': {0: [cdflib.cdfepoch.timestamp_to_tt2000(datetime.timestamp(datetime.now(timezone.utc))), "cdf_time_tt2000"]},
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            'ObservatoryName': {0: observatory_name},
            'Latitude': {0: np.array([latitude], dtype=np.float64)},
            'Longitude': {0: np.array([longitude], dtype=np.float64)},
            'Elevation': {0: np.array([elevation], dtype=np.float64)},
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            'Institution': {0: institution},
            'VectorSensOrient': {0: vector_orientation}, #remove F - because its a calculation, not an element?
            'StandardLevel': {0: 'None'},  # Set to 'None'
            # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
            'Source': {0: 'institute'}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
            'TermsOfUse': {0: conditions_of_use},
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            # 'UniqueIdentifier': {0: ''},
            # 'ParentIdentifiers': {0: ''},
            # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov 
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            'SensorSamplingRate': {0: sensor_sampling_rate}, #Optional            
            'DataType': {0: data_type},#Optional
            'Comments': {0: comments}, #Optional
            'DeclinationBase': {0: declination_base}, #Optional
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }
        return global_attrs

    def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
        vector_times = None
        scalar_times = None
        temperature_times = {}
        temperature_index = 1
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        for trace in timeseries:
            channel = trace.stats.channel
            times = [
                (trace.stats.starttime + trace.stats.delta * i).datetime.replace(tzinfo=timezone.utc)
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                for i in range(trace.stats.npts)
            ]
            # Convert timestamps to TT2000 format required by CDF
            tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000([time.timestamp() for time in times])

            if channel in self._get_vector_elements():
                if vector_times is None:
                    vector_times = tt2000_times
                else:
                    if not np.array_equal(vector_times, tt2000_times):
                        raise ValueError("Time stamps for vector channels are not the same.")
            elif channel in self._get_scalar_elements():
                if scalar_times is None:
                    scalar_times = tt2000_times
                else:
                    if not np.array_equal(scalar_times, tt2000_times):
                        raise ValueError("Time stamps for scalar channels are not the same.")
            elif channel in TEMPERATURE_ELEMENTS_ID:
                ts_key = f"Temperature{temperature_index}Times"
                if ts_key not in temperature_times:
                    temperature_times[ts_key] = tt2000_times
                    temperature_index += 1
                else:
                    temperature_times[ts_key] = tt2000_times
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        time_vars = {}
        if vector_times is not None:
            time_vars['GeomagneticVectorTimes'] = vector_times
        if scalar_times is not None:
            time_vars['GeomagneticScalarTimes'] = scalar_times
        if temperature_times:
            time_vars.update(temperature_times)
            
        last_times = []
        self.isUniqueTimes = len(time_vars) == 1 #true if only one set of times, else default to false.
        for index, times in enumerate(time_vars.values()):
            if index > 0:
                self.isUniqueTimes = not np.array_equal(last_times, times)
            last_times = times
        
        return time_vars if self.isUniqueTimes else {"DataTimes": last_times}
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed


    def _create_var_spec(
        self,
        var_name: str,
        data_type: str,
        num_elements: int,
        var_type: str,
        dim_sizes: List[int],
        sparse: bool,
        compress: int,
        pad: Optional[Union[str, np.ndarray]],
    ) -> dict:
        """
        Create a variable specification dictionary for cdflib.

        This is used to define the properties of a variable when writing it to
        the CDF file.

        Parameters:
        - var_name: Name of the variable.
        - data_type: CDF data type.
        - num_elements: Number of elements per record.
        - var_type: Variable type ('zVariable' or 'rVariable').
        - dim_sizes: Dimensions of the variable (empty list for 0D).
        - sparse: Whether the variable uses sparse records.
        - compress: Compression level.
        - pad: Pad value for sparse records.

        Reference:
        - CDF User's Guide: Variable Specification
        """
        var_spec = {
            'Variable': var_name,
            'Data_Type': data_type,
            'Num_Elements': num_elements,
            'Rec_Vary': True,
            'Var_Type': var_type,
            'Dim_Sizes': dim_sizes,
            'Sparse': 'no_sparse' if not sparse else 'pad_sparse',
            'Compress': compress,
            'Pad': pad,
        }
        return var_spec

    def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict:
        channel = trace.stats.channel.upper()
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
        units = '' # Must be one of “nT”, “Degrees of arc” or “Celsius”
        if channel == 'D':
            units = 'Degrees of arc'
            validmin = -360.0 
            validmax = 360.0 # A full circle representation
        elif channel == 'I':
            units = 'Degrees of arc'
            validmin = -90.0 
            validmax = 90.0 #The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
        elif channel in TEMPERATURE_ELEMENTS_ID:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            units = 'Celsius'
            fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
            validmin = -273.15 #absolute zero
            validmax = 79_999
        elif channel in ['F','S']:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            units = 'nT'
            validmin = 0.0 # negative magnetic field intestity not physically meaningful.
            validmax = 79_999.0
        elif channel in ['X', 'Y', 'Z', 'H', 'E', 'V', 'G']:
            units = 'nT'
            validmin = -79_999.0
            validmax = 79_999.0

        # Determine DEPEND_0 based on channel type
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        if channel in self._get_vector_elements():
            depend_0 = 'GeomagneticVectorTimes'
        elif channel in self._get_scalar_elements():
            depend_0 = 'GeomagneticScalarTimes'
        elif channel in TEMPERATURE_ELEMENTS_ID:
            depend_0 = f"Temperature{temperature_index}Times"
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    
        var_attrs = {
            'FIELDNAM': fieldnam,
            'UNITS': units,
            'FILLVAL': 99999.0,
            'VALIDMIN': validmin,
            'VALIDMAX': validmax,
            'DEPEND_0': depend_0 if isUniqueTimes else "DataTimes",
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            'DISPLAY_TYPE': 'time_series',
            'LABLAXIS': channel,
            'DATA_INTERVAL_TYPE': trace.stats.data_interval_type 
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }
        return var_attrs

    def _create_time_var_attrs(self, ts_name: str) -> dict:
        """
        Create a dictionary of time variable attributes.

        These attributes provide metadata for time variables.
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        Note: None of these attributes are required for the time stamp variables.
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF Data
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        # var_attrs = {
            # 'UNITS': 'TT2000',
            # 'DISPLAY_TYPE': 'time_series',
            # 'LABLAXIS': 'Time',
        # }
        # return var_attrs
        return {}

    def _get_cdf_data_type(self, trace: Trace) -> int:
        """
        Map ObsPy trace data type to CDF data type.

        Determines the appropriate CDF data type based on the NumPy data type
        of the trace data.

        Returns:
        - CDF_DOUBLE (45) for floating-point data.
        - CDF_INT4 (41) for integer data.

        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - See CDF for more data types
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """

        if trace.data.dtype in [np.float32, np.float64]:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_DOUBLE
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        elif trace.data.dtype in [np.int32, np.int64]:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_INT4
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        else:
            # Default to double precision float
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
            return CDFWriter.CDF_DOUBLE
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
    def _read_cdf(self, cdf: CDFReader) -> Stream:
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        Read CDF data into an ObsPy Stream.

        This method reads the data variables and their corresponding time
        variables from a CDF file and constructs an ObsPy Stream.

        Parameters:
        - cdf: cdflib CDF object representing the open CDF file.

        Returns:
        - An ObsPy Stream containing the data from the CDF file.
        """
        stream = Stream()

        # Extract global attributes
        global_attrs = cdf.globalattsget()
        
        # Map global attributes to Stream-level metadata
        observatory = global_attrs.get('IagaCode', [''])[0]
        station_name = global_attrs.get('ObservatoryName', [''])[0]
        institution = global_attrs.get('Institution', [''])[0]
        latitude = global_attrs.get('Latitude', [0.0])[0]
        longitude = global_attrs.get('Longitude', [0.0])[0]
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        elevation = global_attrs.get('Elevation', [99_999.0])[0] #default to 99_999 per technical documents.
        sensor_sampling_rate = global_attrs.get('SensorSamplingRate', [0.0])[0]
        sensor_orientation = global_attrs.get('VectorSensOrient', [''])[0]
        data_type = global_attrs.get('DataType', ['variation'])[0]
        publication_level = global_attrs.get('PublicationLevel', ['1'])[0]
        comments = global_attrs.get('Comments', [''])
        terms_of_use = global_attrs.get('TermsOfUse', [''])[0]
        declination_base = global_attrs.get('DeclinationBase', [0.0])[0]

        # Identify all time variables
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        time_vars = {}
        for var in cdf.cdf_info().zVariables:
            if var.lower().endswith('times'):
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                time_data = cdf.varget(var)
                unix_times = cdflib.cdfepoch.unixtime(time_data)
                utc_times = [UTCDateTime(t) for t in unix_times]
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
                time_vars[var] = utc_times

        # Read data variables and associate them with time variables
        for var in cdf.cdf_info().zVariables:
            # Skip time variables
            if var.lower().endswith('times'):
                continue

            data = cdf.varget(var)
            attrs = cdf.varattsget(var)

            # Determine DEPEND_0 (the time variable name) and validate
            ts_name = attrs.get('DEPEND_0')
            if not ts_name:
                # If no DEPEND_0, skip this variable as we cannot map times
                continue

            # If we used a DataTimes fallback or similar, ensure we handle it case-insensitively
            # and also confirm that time_vars keys are checked properly.
            # The ImagCDF can have DataTimes, GeomagneticVectorTimes, etc.
            # So try exact match first, if not found, attempt case-insensitive.
            matched_time_key = None
            for tkey in time_vars.keys():
                if tkey == ts_name:
                    matched_time_key = tkey
                    break
                if tkey.lower() == ts_name.lower():
                    matched_time_key = tkey
                    break

            if matched_time_key not in time_vars:
                # If we cannot find the matching time variable, skip this variable
                continue

            times = time_vars[matched_time_key]

            # Determine delta (sample interval)
            if len(times) > 1:
                # delta as a float of seconds
                delta = (times[1].timestamp - times[0].timestamp)
            else:
                # if only one sample, use default based on interval
                # convert minute, second, etc. to numeric delta
                if self.interval == 'minute':
                    delta = 60.0
                elif self.interval == 'second':
                    delta = 1.0
                elif self.interval == 'hour':
                    delta = 3600.0
                else:
                    # fallback, set delta to 60
                    delta = 60.0

            # Map the variable name back to a standard channel code
            # Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc.
            # Temperatures are named like Temperature1, Temperature2, ...
            # Extract channel name by removing known prefixes
            if var.startswith("GeomagneticField"):
                channel = var.replace("GeomagneticField", "")
            elif var.startswith("Temperature"):
                # Temperature variables may not map directly to a geomagnetic channel
                # but to temperature sensors. We can just use the label from LABLAXIS if needed
                channel = attrs.get('LABLAXIS', var)
            else:
                # fallback if naming doesn't match expected patterns
                channel = var

            time_attrs =cdf.varattsget(var)
            data_interval = time_attrs.get('DATA_INTERVAL_TYPE', [''])
            # Create a trace
            trace = Trace(
                data=data,
                header={
                    'station': observatory,
                    'channel': channel,
                    'starttime': times[0],
                    'delta': delta,
                    'geodetic_latitude': latitude,
                    'geodetic_longitude': longitude,
                    'elevation': elevation,
                    'sensor_orientation': "".join(sensor_orientation),
                    'data_type': data_type,
                    'station_name': station_name,
                    'agency_name': institution,
                    'conditions_of_use': terms_of_use,
                    'sensor_sampling_rate': sensor_sampling_rate, 
                    'data_interval_type': data_interval,
                    'declination_base': declination_base,
                    'filter_comments': comments 
                }
            )
            stream += trace
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

    def _get_url(
        self,
        observatory: str,
        date: UTCDateTime,
        type: DataType = "variation",
        interval: DataInterval = "minute",
        channels: Optional[List[str]] = None,
    ) -> str:
        """
        Generate the file URL specific to ImagCDF conventions.

        This method constructs the filename based on the ImagCDF naming
        conventions, which include the observatory code, date-time formatted
        according to the data interval, and the publication level.  
        
        [iaga-code]_[date-time]_[publication-level].cdf
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        Parameters:
        - observatory: IAGA code of the observatory.
        - date: Start date for the file.
        - type: Data type indicating the processing level.
        - interval: Data interval (e.g., 'minute', 'second').
        - channels: List of channels (optional). Not Implemented
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        Returns:
        - The formatted file URL or path.

        Reference:
Shavers, Nicholas H's avatar
Shavers, Nicholas H committed
        - ImagCDF Technical Documentation: ImagCDF File Names
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        """
        # Get the publication level for the type
        publication_level = IMCDFPublicationLevel(data_type=type).to_string()

        # Format of Date/Time Portion of Filename based on interval see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#example-data-file:~:text=Format%20of%20Date,%EF%83%81
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        if interval == "year":
            date_format = date.strftime("%Y")
        elif interval == "month":
            date_format = date.strftime("%Y%m")
        elif interval == "day":
            date_format = date.strftime("%Y%m%d")
        elif interval == "hour":
            date_format = date.strftime("%Y%m%d_%H")
        elif interval == "minute":
            date_format = date.strftime("%Y%m%d_%H%M")
        elif interval == "second":
            date_format = date.strftime("%Y%m%d_%H%M%S")
        else:
            raise ValueError(f"Unsupported interval: {interval}") #tenhertz currently not supported
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed

        # Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"

        # If the urlTemplate explicitly specifies 'stdout', return 'stdout'
        if self.urlTemplate.lower() == "stdout":
            return "stdout"

        # Prepare parameters for templating
        params = {
            "date": date.datetime,
            "i": self._get_interval_abbreviation(interval),
            "interval": self._get_interval_name(interval),
            "minute": date.hour * 60 + date.minute,
            "month": date.strftime("%b").lower(),
            "MONTH": date.strftime("%b").upper(),
            "obs": observatory.lower(),
            "OBS": observatory.upper(),
            "t": publication_level,
            "type": self._get_type_name(type),
            "julian": date.strftime("%j"),
            "year": date.strftime("%Y"),
            "ymd": date.strftime("%Y%m%d"),
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
        }

        # Attempt to use the template provided in urlTemplate
        if "{" in self.urlTemplate and "}" in self.urlTemplate: 
Shavers, Nicholas H's avatar
poc
Shavers, Nicholas H committed
            try:
                return self.urlTemplate.format(**params)
            except KeyError as e:
                raise TimeseriesFactoryException(f"Invalid placeholder in urlTemplate: {e}")

        # If the urlTemplate doesn't support placeholders, assume 'file://' scheme
        if self.urlTemplate.startswith("file://"):
            base_path = self.urlTemplate[7:]  # Strip "file://"
            if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
                base_path = os.getcwd()  # Default to current working directory
            return os.path.join(base_path, filename)

        # Unsupported URL scheme
        raise TimeseriesFactoryException(
            f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
        )

    def _get_vector_elements(self):
        return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'}
    
    def _get_scalar_elements(self):