Skip to content
Snippets Groups Projects
ImagCDFFactory.py 34.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    """ImagCDFFactory Implementation Using cdflib
    
    This module provides the ImagCDFFactory class for creating and writing
    geomagnetic time series data in the ImagCDF format using the cdflib library.
    The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
    to store geomagnetic data with high precision.
    
    References:
    
    - ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
    - ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
    
    - CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
    - CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html] 
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    """
    
    import os
    import sys
    
    from typing import List, Optional
    from datetime import datetime, timezone
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    import numpy as np
    from obspy import Stream, Trace, UTCDateTime
    
    from geomagio.TimeseriesFactory import TimeseriesFactory
    
    from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
    
    from ..geomag_types import DataInterval, DataType
    from ..TimeseriesFactoryException import TimeseriesFactoryException
    from .. import Util
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
    from cdflib.cdfwrite import CDF as CDFWriter
    from cdflib.cdfread import CDF as CDFReader
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    import tempfile
    
    from .IMCDFPublication import IMCDFPublicationLevel
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
    
    class ImagCDFFactory(TimeseriesFactory):
        """Factory for creating and writing ImagCDF formatted CDF files.
    
        This class extends the TimeseriesFactory to support writing geomagnetic
        time series data to files in the ImagCDF format using the cdflib library.
        """
    
    
        isUniqueTimes = True  # used to determine depend_0 and CDF Time Variable Name
    
        NONSTANDARD_ELEMENTS = (
            TEMPERATURE_ELEMENTS_ID  # for using non standard elements such as UK1
        )
        __FILL_VALUE = 99_999
        __MAX_COMPRESSION = 9
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
        def __init__(
            self,
            observatory: Optional[str] = None,
            channels: List[str] = ("H", "D", "Z", "F"),
            type: DataType = "variation",
            interval: DataInterval = "minute",
    
            inputFile: Optional[str] = None,
            output: Optional[str] = "iaga2002",
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
        ):
            """
            Initialize the ImagCDFFactory with default parameters.
    
            Parameters:
            - observatory: IAGA code of the observatory (e.g., 'BOU').
            - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
            - type: Data type indicating the processing level (e.g., 'variation', 'definitive').
            - interval: Data interval (e.g., 'minute', 'second').
            - urlTemplate: Template for generating file URLs or paths.
            - urlInterval: Interval size for splitting data into multiple files.
    
            - inputFile: An ImagCDF file to read data from. If not provided urlTemplate is assumed path for reads.
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            """
            super().__init__(
                observatory=observatory,
                channels=channels,
                type=type,
                interval=interval,
                urlTemplate=urlTemplate,
            )
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            self.output = (
                output if output in ["iaga2002", "imfjson"] else "iaga2002"
            )  # determines mapping of publication level to data type
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
        def write_file(self, fh, timeseries: Stream, channels: List[str]):
            # Create a temporary file to write the CDF data
    
            with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
    
                tmp_file_path = tmp_file.name
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            try:
                # Initialize the CDF writer
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                    "Compressed": self.__MAX_COMPRESSION,  # Max Gzip compression (1-9). Almost always the GZIP is the best choice for all data. (CDF User Guide p.23 1.4.3)
    
                    "Majority": CDFWriter.ROW_MAJOR,  # The first dimension changes the slowest (CDF User Guide p.45 2.3.15 Majority)
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                    "Encoding": CDFWriter.NETWORK_ENCODING,  #  XDR Encoding -Portable between two or more different types of computers.
    
                    "Checksum": True,  # True for Data Integrity. False for faster writes (optional)
    
                    "rDim_sizes": [],  # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
    
                # Write global attributes
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                global_attrs = self._create_global_attributes(timeseries, channels)
                cdf_writer.write_globalattrs(global_attrs)
    
    
                # Time variables
    
                time_vars = self._create_time_stamp_variables(
                    timeseries
                )  # modifies self.isUniqueTimes
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                for ts_name, ts_data in time_vars.items():
    
                    # Define time variable specification
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    var_spec = {
    
                        "Variable": ts_name,
                        "Data_Type": CDFWriter.CDF_TIME_TT2000,  # CDF_TIME_TT2000
                        "Num_Elements": 1,
                        "Rec_Vary": True,
                        "Var_Type": "zVariable",
                        "Dim_Sizes": [],
    
                        "Sparse": "no_sparse",  # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series)
    
                        "Pad": None,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    }
    
                    # Define time variable attributes
    
                    var_attrs = (
                        {}
                    )  # ImagCDF does not require or recommend any 'time' variable attributes.
    
    
                    # Write time variable
                    cdf_writer.write_var(var_spec, var_attrs, ts_data)
    
                # Data variables
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                for trace in timeseries:
                    channel = trace.stats.channel
    
                    var_name = f"GeomagneticField{channel}"
                    # if channel in REAL_TEMPERATURE:
                    #     temperature_index += 1  # MUST INCREMENT INDEX BEFORE USING
                    #     var_name = f"Temperature{temperature_index}"
    
                    data_type = (
                        CDFWriter.CDF_INT4
                        if trace.data.dtype in [np.int32, np.int64]
                        else CDFWriter.CDF_DOUBLE
                    )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    var_spec = {
    
                        "Variable": var_name,
                        "Data_Type": data_type,
    
                        "Rec_Vary": True,
                        "Var_Type": "zVariable",
                        "Dim_Sizes": [],
                        "Sparse": "no_sparse",
    
                        "Pad": None,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    }
    
                    var_attrs = self._create_var_attrs(
                        trace, temperature_index, self.isUniqueTimes
                    )
    
    
                    # Write data variable
                    cdf_writer.write_var(var_spec, var_attrs, trace.data)
    
                # Close the CDF writer
                cdf_writer.close()
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
                # Copy the temporary CDF file to the final file handle
                with open(tmp_file_path, "rb") as tmp:
                    cdf_data = tmp.read()
                    fh.write(cdf_data)
    
            finally:
                # Cleanup the temporary file
    
                os.remove(tmp_file_path)
    
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
        def put_timeseries(
            self,
            timeseries: Stream,
            starttime: Optional[UTCDateTime] = None,
            endtime: Optional[UTCDateTime] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
        ):
            """
            Store timeseries data in ImagCDF format using cdflib.
    
            This method writes the timeseries data to one or more files, depending
            on the specified urlInterval.
    
            Parameters:
            - timeseries: ObsPy Stream containing the geomagnetic data.
            - starttime: Start time of the data to be written.
            - endtime: End time of the data to be written.
            - channels: List of channels to include in the output file.
            - type: Data type indicating the processing level.
            - interval: Data interval of the data.
            """
            if len(timeseries) == 0:
                # No data to store
                return
    
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
    
            # Extract metadata from the first trace
            stats = timeseries[0].stats
            observatory = stats.station
            starttime = starttime or stats.starttime
            endtime = endtime or stats.endtime
            # Split data into intervals if necessary
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            urlIntervals = Util.get_intervals(
                starttime=starttime, endtime=endtime, size=self.urlInterval
            )
            for urlInterval in urlIntervals:
                interval_start = urlInterval["start"]
                interval_end = urlInterval["end"]
                url = self._get_url(
                    observatory=observatory,
                    date=interval_start,
                    type=type,
                    interval=interval,
                    channels=channels,
                )
    
                # Handle 'stdout' output
    
                if url == "stdout":
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    # Write directly to stdout
                    fh = sys.stdout.buffer
                    url_data = timeseries.slice(
                        starttime=interval_start,
                        endtime=interval_end,
                    )
                    self.write_file(fh, url_data, channels)
                    continue  # Proceed to next interval if any
    
                # Handle 'file://' output
    
                elif url.startswith("file://"):
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    # Get the file path from the URL
                    url_file = Util.get_file_from_url(url, createParentDirectory=False)
                    url_data = timeseries.slice(
                        starttime=interval_start,
                        endtime=interval_end,
                    )
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                    # Check if the file already exists
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    if os.path.isfile(url_file):
    
                        raise TimeseriesFactoryException(
                            f"Error: File '{url_file}' already exists."
                        )
    
                    # Pad the data to ensure it fits the interval
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    url_data.trim(
                        starttime=interval_start,
                        endtime=interval_end,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                        pad=True,
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                        fill_value=self.__FILL_VALUE,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    )
    
                    # Write the data to the CDF file
                    with open(url_file, "wb") as fh:
                        self.write_file(fh, url_data, channels)
    
                else:
                    # Unsupported URL scheme encountered
    
                    raise TimeseriesFactoryException(
                        "Unsupported URL scheme in urlTemplate"
                    )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
    
        def get_timeseries(
            self,
            starttime: UTCDateTime,
            endtime: UTCDateTime,
            add_empty_channels: bool = True,
            observatory: Optional[str] = None,
            channels: Optional[List[str]] = None,
            type: Optional[DataType] = None,
            interval: Optional[DataInterval] = None,
        ) -> Stream:
            observatory = observatory or self.observatory
            channels = channels or self.channels
            type = type or self.type
            interval = interval or self.interval
    
            timeseries = Stream()
            urlIntervals = Util.get_intervals(
                starttime=starttime, endtime=endtime, size=self.urlInterval
            )
    
            for urlInterval in urlIntervals:
    
                    url = self._get_url(
                        observatory=observatory,
                        date=urlInterval["start"],
                        type=type,
                        interval=interval,
                        channels=channels,
    
                    if url == "stdout":
                        continue  # stdout is not a valid input source
                    if not url.startswith("file://"):
                        raise TimeseriesFactoryException(
                            "Only file urls are supported for reading ImagCDF"
                        )
                    url_file = Util.get_file_from_url(url, createParentDirectory=False)
                    if not os.path.isfile(url_file):
                        # If file doesn't exist, skip
                        continue
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                    cdf = CDFReader(url_file)
    
                except Exception as e:
                    print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
    
            # After reading all intervals, merge and trim
    
            timeseries.merge(fill_value=self.__FILL_VALUE)
    
            timeseries.trim(
                starttime=starttime,
                endtime=endtime,
    
            )
    
            # If requested, add empty channels not present in data
            if add_empty_channels:
                present_channels = {tr.stats.channel for tr in timeseries}
                missing_channels = set(channels) - present_channels
                for ch in missing_channels:
                    empty_trace = self._get_empty_trace(
                        starttime=starttime,
                        endtime=endtime,
                        observatory=observatory,
                        channel=ch,
                        data_type=type,
                        interval=interval,
                    )
                    timeseries += empty_trace
    
            timeseries.sort()
            return timeseries
    
    
        def _create_global_attributes(
            self, timeseries: Stream, channels: List[str]
        ) -> dict:
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            """
            Create a dictionary of global attributes for the ImagCDF file.
    
            These attributes apply to all the data in the file and include metadata
            such as observatory information, data publication level, and format
            descriptions.
    
            References:
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            - ImagCDF Technical Documentation: ImagCDF Global Attributes
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            """
            stats = timeseries[0].stats if len(timeseries) > 0 else None
    
            # Extract metadata from stats or fallback to defaults
    
            observatory_name = (
                getattr(stats, "station_name", None) or self.observatory or ""
            )
            station = getattr(stats, "station", None) or ""
    
            network = getattr(stats, "network", None) or ""
    
            institution = getattr(stats, "agency_name", None) or ""
            latitude = getattr(stats, "geodetic_latitude", None) or 0.0
            longitude = getattr(stats, "geodetic_longitude", None) or 0.0
    
            elevation = getattr(stats, "elevation", None) or self.__FILL_VALUE
    
            conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
            vector_orientation = getattr(stats, "sensor_orientation", None) or ""
            data_type = getattr(stats, "data_type", None) or "variation"
            sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
            comments = getattr(stats, "filter_comments", None) or [""]
            declination_base = getattr(stats, "declination_base", None) or 0.0
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            publication_level = IMCDFPublicationLevel(value=self.type).get_level()
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            global_attrs = {
    
                "FormatDescription": {0: "INTERMAGNET CDF Format"},
                "FormatVersion": {0: "1.2"},
                "Title": {0: "Geomagnetic time series data"},
                "IagaCode": {0: station},
                "ElementsRecorded": {0: "".join(channels)},
                "PublicationLevel": {0: publication_level},
                "PublicationDate": {
                    0: [
    
                            datetime.timestamp(datetime.now(timezone.utc))
                        ),
                        "cdf_time_tt2000",
                    ]
                },
                "ObservatoryName": {0: observatory_name},
                "Latitude": {0: np.array([latitude], dtype=np.float64)},
                "Longitude": {0: np.array([longitude], dtype=np.float64)},
                "Elevation": {0: np.array([elevation], dtype=np.float64)},
                "Institution": {0: institution},
    
                "VectorSensOrient": {0: vector_orientation},
                "StandardLevel": {0: "None"},
                # Can Omit 'StandardName', 'StandardVersion', 'PartialStandDesc' when StandardLevel=None
    
                "Source": {
                    0: "institute"
                },  # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
                "TermsOfUse": {0: conditions_of_use},
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                # 'UniqueIdentifier': {0: ''},
                # 'ParentIdentifiers': {0: ''},
    
                # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
    
                # [[Custom Attributes]]
                "SensorSamplingRate": {0: sensor_sampling_rate},
                "DataType": {0: data_type},
                "Comments": {0: comments},
                "DeclinationBase": {0: declination_base},
                "Network": {0: network},
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            }
            return global_attrs
    
        def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
            vector_times = None
            scalar_times = None
    
            temperature_times = {}
            temperature_index = 1
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            for trace in timeseries:
                channel = trace.stats.channel
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                times = [
    
                    (trace.stats.starttime + trace.stats.delta * i).datetime.replace(
                        tzinfo=timezone.utc
                    )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    for i in range(trace.stats.npts)
                ]
                # Convert timestamps to TT2000 format required by CDF
    
                    [time.timestamp() for time in times]
                )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
                if channel in self._get_vector_elements():
                    if vector_times is None:
                        vector_times = tt2000_times
                    else:
                        if not np.array_equal(vector_times, tt2000_times):
    
                            raise ValueError(
                                "Time stamps for vector channels are not the same."
                            )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                elif channel in self._get_scalar_elements():
                    if scalar_times is None:
                        scalar_times = tt2000_times
                    else:
                        if not np.array_equal(scalar_times, tt2000_times):
    
                            raise ValueError(
                                "Time stamps for scalar channels are not the same."
                            )
    
                # elif channel in REAL_TEMPERATURES:
                #     ts_key = f"Temperature{temperature_index}Times"
                #     if ts_key not in temperature_times:
                #         temperature_times[ts_key] = tt2000_times
                #         temperature_index += 1
                #     else:
                #         temperature_times[ts_key] = tt2000_times
                elif channel in self.NONSTANDARD_ELEMENTS:
                    if scalar_times is None:
                        nonstandard_times = tt2000_times
    
                        if not np.array_equal(scalar_times, tt2000_times):
                            raise ValueError(
                                "Time stamps for nonstandard channels are not the same."
                            )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            time_vars = {}
            if vector_times is not None:
    
                time_vars["GeomagneticVectorTimes"] = vector_times
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            if scalar_times is not None:
    
                time_vars["GeomagneticScalarTimes"] = scalar_times
    
            if temperature_times:
                time_vars.update(temperature_times)
    
            isNonStandard = nonstandard_times is not None
            if isNonStandard:
                time_vars["NonstandardTimes"] = nonstandard_times
    
    
            self.isUniqueTimes = (
    
                len(channels) == 1
            )  # (If a single channel, then use specific naming i.e GeomagneticVectorTimes)
    
            for index, times in enumerate(time_vars.values()):
                if index > 0:
                    self.isUniqueTimes = not np.array_equal(last_times, times)
                last_times = times
    
            if self.isUniqueTimes and isNonStandard and len(time_vars) > 2:
                raise ValueError(
                    f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}"
                )
            return (
                {"DataTimes": last_times}
                if isNonStandard or not self.isUniqueTimes
                else time_vars
            )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
    
        def _create_var_attrs(
            self,
            trace: Trace,
            temperature_index: Optional[int] = None,
            isUniqueTimes: Optional[bool] = True,
        ) -> dict:
    
            channel = trace.stats.channel.upper()
    
            fieldnam = f"Geomagnetic Field Element {channel}"  # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
            units = ""  # Must be one of “nT”, “Degrees of arc” or “Celsius”
            if channel == "D":
                units = "Degrees of arc"
                validmin = -360.0
                validmax = 360.0  # A full circle representation
            elif channel == "I":
                units = "Degrees of arc"
                validmin = -90.0
                validmax = 90.0  # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
    
            elif channel in TEMPERATURE_ELEMENTS_ID:
    
                units = "Celsius"
                validmin = -273.15  # absolute zero
    
                depend_0 = "DataTimes"  # can be used for nonstandard element
    
            # elif channel in [REAL_TEMPERATURES]:
            #     units = "Celsius"
            #     fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
            #     validmin = -273.15  # absolute zero
            #     validmax = 79_999
    
            elif channel in ["F", "S"]:
                units = "nT"
                validmin = (
                    0.0  # negative magnetic field intestity not physically meaningful.
                )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                validmax = 79_999.0
    
            elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
                units = "nT"
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                validmin = -79_999.0
                validmax = 79_999.0
    
    
            # Determine DEPEND_0 based on channel type
    
            if not isUniqueTimes:
                depend_0 = "DataTimes"
            elif channel in self._get_vector_elements():
    
                depend_0 = "GeomagneticVectorTimes"
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            elif channel in self._get_scalar_elements():
    
                depend_0 = "GeomagneticScalarTimes"
    
            # elif channel in REAL_TEMPERATURES:
            #     depend_0 = f"Temperature{temperature_index}Times"
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            var_attrs = {
    
                "FIELDNAM": fieldnam,
                "UNITS": units,
    
                "VALIDMIN": validmin,
                "VALIDMAX": validmax,
    
                "DISPLAY_TYPE": "time_series",
                "LABLAXIS": channel,
    
                "DATA_INTERVAL_TYPE": trace.stats.data_interval_type,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            }
            return var_attrs
    
    
        def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream:
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            """
            Read CDF data into an ObsPy Stream.
    
            This method reads the data variables and their corresponding time
            variables from a CDF file and constructs an ObsPy Stream.
    
            Parameters:
            - cdf: cdflib CDF object representing the open CDF file.
    
            Returns:
            - An ObsPy Stream containing the data from the CDF file.
            """
            stream = Stream()
    
    
            # Extract global attributes
            global_attrs = cdf.globalattsget()
    
            # Define required global attributes based on ImagCDF standards
            required_global_attrs = [
                "FormatDescription",
                "FormatVersion",
                "Title",
                "IagaCode",
                "ElementsRecorded",
                "PublicationLevel",
                "PublicationDate",
                "ObservatoryName",
                "Latitude",
                "Longitude",
                "Elevation",
                "Institution",
                "StandardLevel",
                "Source",
            ]
    
            # Check for missing required global attributes per ImagCDF techincal document
            missing_global_attrs = []
            for attr in required_global_attrs:
                if attr not in global_attrs:
                    missing_global_attrs.append(attr)
    
            if missing_global_attrs:
                error_message = (
                    f"The following required global attributes are missing from the CDF: "
                    f"{', '.join(missing_global_attrs)}"
                )
                raise TimeseriesFactoryException(error_message)
    
            # Map global attributes to Stream-level metadata
    
            observatory = global_attrs.get("IagaCode", [""])[0]
            station_name = global_attrs.get("ObservatoryName", [""])[0]
            institution = global_attrs.get("Institution", [""])[0]
            latitude = global_attrs.get("Latitude", [0.0])[0]
            longitude = global_attrs.get("Longitude", [0.0])[0]
    
            elevation = global_attrs.get("Elevation", [self.__FILL_VALUE])[
    
                0
            ]  # default to 99_999 per technical documents.
            sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
    
            publication_level = global_attrs.get("PublicationLevel", ["0"])[0]
            publication = IMCDFPublicationLevel(publication_level)
    
            terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
    
    
            # Not expected in ImagCDF
            data_type = (
                publication.get_imf_data_type()
                if "imf" in self.output
                else publication.get_iaga2002_data_type()
            )
    
            sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [None])[0]
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            comments = global_attrs.get("Comments", [])  # keep comments as an array
    
            declination_base = global_attrs.get("DeclinationBase", [None])[0]
            network = global_attrs.get("Network", [None])[0]
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            time_vars = {}
    
            for var in cdf.cdf_info().zVariables:
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    time_data = cdf.varget(var)
    
                    utc_times = [UTCDateTime(t) for t in unix_times]
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                    time_vars[var] = utc_times
    
    
            # Read data variables and associate them with time variables
    
            for var in cdf.cdf_info().zVariables:
    
                # Map the variable name back to a standard channel code by removing known prefixes
                # Names are like GeomagneticFieldH, GeomagneticFieldD, Temperature1, Temperature2, ...
    
                if var.startswith("GeomagneticField"):
                    channel = var.replace("GeomagneticField", "")
    
                # elif var.startswith("Temperature"):
                #     # Temperature variables may not map directly to a geomagnetic channel
                #     # but to temperature sensors. We can just use the label from LABLAXIS if needed
                #     channel = attrs.get("LABLAXIS", var)
    
                else:
                    # fallback if naming doesn't match expected patterns
                    channel = var
    
    
                if channels and channel not in channels:
                    continue
    
                data = cdf.varget(var)
                attrs = cdf.varattsget(var)
    
                # Determine DEPEND_0 (the time variable name) and validate
    
                ts_name = attrs.get("DEPEND_0")
    
                # The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc.
    
                matched_time_key = None
                for tkey in time_vars.keys():
                    if tkey == ts_name:
                        matched_time_key = tkey
                        break
    
    
                    times = time_vars[matched_time_key]
    
    
                # Determine delta (sample interval)
                if len(times) > 1:
                    # delta as a float of seconds
    
                    delta = times[1].timestamp - times[0].timestamp
    
                else:
                    # if only one sample, use default based on interval
                    # convert minute, second, etc. to numeric delta
    
                    if self.interval == "minute":
    
                    elif self.interval == "second":
    
                    elif self.interval == "hour":
    
                        delta = 3600.0
                    else:
                        # fallback, set delta to 60
                        delta = 60.0
    
    
                time_attrs = cdf.varattsget(var)
    
                data_interval = time_attrs.get("DATA_INTERVAL_TYPE", None)
    
    
                # Required variable attributes based on ImagCDF standards
                required_variable_attrs = [
                    "FIELDNAM",
                    "UNITS",
                    "FILLVAL",
                    "VALIDMIN",
                    "VALIDMAX",
                    "DISPLAY_TYPE",
                    "LABLAXIS",
    
                ]
                # Validate presence of required variable attributes
                missing_var_attrs = []
                for var_attr in required_variable_attrs:
                    if var_attr not in attrs:
                        missing_var_attrs.append(var_attr)
    
                if missing_var_attrs or missing_global_attrs:
                    error_message = (
                        f"The variable '{var}' is missing required attributes: "
                        f"{', '.join(missing_var_attrs)}"
                    )
                    raise TimeseriesFactoryException(error_message)
    
    
                header = {
                    "station": observatory,
                    "channel": channel,
                    "starttime": times[0],
                    "delta": delta,
                    "geodetic_latitude": latitude,
                    "geodetic_longitude": longitude,
                    "elevation": elevation,
                    "sensor_orientation": "".join(sensor_orientation),
                    "data_type": data_type,
                    "station_name": station_name,
                    "agency_name": institution,
                    "conditions_of_use": terms_of_use,
                }
                # data points not expected in ImagCDF specs
                if sensor_sampling_rate is not None:
                    header.update({"sensor_sampling_rate": sensor_sampling_rate})
                if data_interval is not None:
                    header.update({"data_interval_type": data_interval})
                if declination_base is not None:
                    header.update({"declination_base": declination_base})
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                if len(comments) > 0:
    
                    header.update({"filter_comments": comments})
                if network is not None:
                    header.update({"network": network})
    
    
                # Create a trace
                trace = Trace(
                    data=data,
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
        def _get_url(
            self,
            observatory: str,
            date: UTCDateTime,
            type: DataType = "variation",
            interval: DataInterval = "minute",
            channels: Optional[List[str]] = None,
        ) -> str:
            """
            Generate the file URL specific to ImagCDF conventions.
    
            This method constructs the filename based on the ImagCDF naming
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            conventions.
    
            [iaga-code]_[date-time]_[publication-level].cdf
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            Parameters:
            - observatory: IAGA code of the observatory.
            - date: Start date for the file.
            - type: Data type indicating the processing level.
            - interval: Data interval (e.g., 'minute', 'second').
    
            - channels: List of channels (optional). Not Implemented
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            Returns:
            - The formatted file URL or path.
    
            Reference:
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            - ImagCDF Technical Documentation: ImagCDF File Names
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            """
            # Get the publication level for the type
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            publication_level = IMCDFPublicationLevel(value=type).get_level()
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            if interval == "year":
                date_format = date.strftime("%Y")
            elif interval == "month":
                date_format = date.strftime("%Y%m")
            elif interval == "day":
                date_format = date.strftime("%Y%m%d")
            elif interval == "hour":
                date_format = date.strftime("%Y%m%d_%H")
            elif interval == "minute":
                date_format = date.strftime("%Y%m%d_%H%M")
            elif interval == "second":
                date_format = date.strftime("%Y%m%d_%H%M%S")
            else:
    
                raise ValueError(
                    f"Unsupported interval: {interval}"
                )  # tenhertz currently not supported
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
    
            # If the urlTemplate explicitly specifies 'stdout', return 'stdout'
            if self.urlTemplate.lower() == "stdout":
                return "stdout"
    
            # Prepare parameters for templating
            params = {
                "date": date.datetime,
                "i": self._get_interval_abbreviation(interval),
                "interval": self._get_interval_name(interval),
                "minute": date.hour * 60 + date.minute,
                "month": date.strftime("%b").lower(),
                "MONTH": date.strftime("%b").upper(),
                "obs": observatory.lower(),
                "OBS": observatory.upper(),
                "t": publication_level,
                "type": self._get_type_name(type),
                "julian": date.strftime("%j"),
                "year": date.strftime("%Y"),
                "ymd": date.strftime("%Y%m%d"),
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
            }
    
            # Attempt to use the template provided in urlTemplate
    
            if "{" in self.urlTemplate and "}" in self.urlTemplate:
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                try:
                    return self.urlTemplate.format(**params)
                except KeyError as e:
    
                    raise TimeseriesFactoryException(
                        f"Invalid placeholder in urlTemplate: {e}"
                    )
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            # If the urlTemplate doesn't support placeholders, assume 'file://' scheme
            if self.urlTemplate.startswith("file://"):
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
                if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
                    base_path = os.getcwd()  # Default to current working directory
    
                return os.path.join(self.urlTemplate, filename)
    
    Shavers, Nicholas H's avatar
    poc
    Shavers, Nicholas H committed
    
            # Unsupported URL scheme
            raise TimeseriesFactoryException(
                f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
            )
    
        def _get_vector_elements(self):
    
            return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
    
        def _get_scalar_elements(self):
            return {"S", "G"}