# ImagCDFFactory.py
"""ImagCDFFactory Implementation Using cdflib

This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.

References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF: http://cdf.gsfc.nasa.gov/
"""

from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from sqlalchemy import true

from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID

from .geomag_types import DataInterval, DataType
from .TimeseriesFactoryException import TimeseriesFactoryException
from . import TimeseriesUtility
from . import Util

import cdflib
import tempfile


class IMCDFPublicationLevel:
    """Resolve a data type name to its ImagCDF publication level.

    ImagCDF describes how far data has been processed with a publication
    level. This class looks up the level that corresponds to a data type
    name such as 'variation' or 'definitive'.

    Publication Levels:
        1: Raw data with no processing.
        2: Edited data with preliminary baselines.
        3: Data suitable for initial bulletins or quasi-definitive publication.
        4: Definitive data with no further changes expected.

    Reference:
    - ImagCDF Documentation Section 4.2: Attributes that Uniquely Identify the Data
    """

    class PublicationLevel:
        # String codes as they appear in file names and global attributes.
        LEVEL_1 = "1"
        LEVEL_2 = "2"
        LEVEL_3 = "3"
        LEVEL_4 = "4"

    # Lower-case data type name -> publication level code.
    TYPE_TO_LEVEL = {
        "none": PublicationLevel.LEVEL_1,
        "variation": PublicationLevel.LEVEL_1,
        "reported": PublicationLevel.LEVEL_1,
        "provisional": PublicationLevel.LEVEL_2,
        "adjusted": PublicationLevel.LEVEL_2,
        "quasi-definitive": PublicationLevel.LEVEL_3,
        "quasidefinitive": PublicationLevel.LEVEL_3,
        "definitive": PublicationLevel.LEVEL_4,
    }

    def __init__(self, data_type: Optional[str] = None):
        """Look up the publication level for ``data_type`` (case-insensitive).

        Raises:
        - ValueError: if ``data_type`` is missing/empty or not a known type.
        """
        if not data_type:
            raise ValueError("data_type must be provided.")

        resolved_level = self.TYPE_TO_LEVEL.get(data_type.lower())
        if not resolved_level:
            raise ValueError(f"Unsupported data_type: {data_type}")

        self.level = resolved_level

    def to_string(self) -> str:
        """Return the publication level code ("1" through "4")."""
        return self.level


class ImagCDFFactory(TimeseriesFactory):
    """Factory for creating and writing ImagCDF formatted CDF files.

    This class extends the TimeseriesFactory to support writing geomagnetic
    time series data to files in the ImagCDF format using the cdflib library.
    """
    
    # Recomputed by _create_time_stamp_variables on every write. When True,
    # each data variable's DEPEND_0 names its own time variable (e.g.
    # GeomagneticVectorTimes); when False, all variables share "DataTimes".
    # NOTE(review): declared at class level but assigned through self, so the
    # updated value lives on the instance.
    isUniqueTimes=True

    def __init__(
        self,
        observatory: Optional[str] = None,
        channels: List[str] = ("H", "D", "Z", "F"),
        type: DataType = "variation",
        interval: DataInterval = "minute",
        urlTemplate: str = "file://{obs}_{dt}_{t}.cdf",
        urlInterval: int = -1,
    ):
        """
        Initialize the ImagCDFFactory with default parameters.

        All arguments are forwarded unchanged to TimeseriesFactory.__init__.

        Parameters:
        - observatory: IAGA code of the observatory (e.g., 'BOU').
        - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
        - type: Data type indicating the processing level (e.g., 'variation', 'definitive').
        - interval: Data interval (e.g., 'minute', 'second').
        - urlTemplate: Template for generating file URLs or paths; the
          default uses the {obs}, {dt} and {t} placeholders filled in by
          _get_url.
        - urlInterval: Interval size for splitting data into multiple files.
        """
        super().__init__(
            observatory=observatory,
            channels=channels,
            type=type,
            interval=interval,
            urlTemplate=urlTemplate,
            urlInterval=urlInterval,
        )

    def parse_string(self, data: str, **kwargs):
        """Reject string parsing, which this factory does not support.

        Raises:
        - NotImplementedError: always.
        """
        message = '"parse_string" not implemented'
        raise NotImplementedError(message)
    
    def write_file(self, fh, timeseries: Stream, channels: List[str]):
        """Serialize a Stream as an ImagCDF file and copy it into ``fh``.

        cdflib writes to a filesystem path, so the CDF is built in a
        temporary file whose bytes are then written to ``fh``. Missing
        samples (NaN) are stored as the variable's FILLVAL so the data agrees
        with the FILLVAL attribute written next to it.

        Parameters:
        - fh: Writable binary file-like object that receives the CDF bytes.
        - timeseries: ObsPy Stream; every trace becomes one data variable.
        - channels: Channel codes forwarded to the global attributes.

        Side effects:
        - Updates self.isUniqueTimes (via _create_time_stamp_variables).
        """
        # CDF data type codes for character data (variable-width elements).
        CDF_CHAR = 51
        CDF_UCHAR = 52

        # Reserve a temporary path; cdflib opens the path itself.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.cdf') as tmp_file:
            tmp_file_path = tmp_file.name

        try:
            cdf_spec = {
                'Compressed': 9,   # File-level compression (0-9)
                # 'Majority' and 'Encoding' are left to cdflib; the original
                # author noted that forcing 'Encoding': 'ibmpc' corrupts the CDF.
                'Checksum': True,  # Enable the CDF checksum option
                'rDim_sizes': [],  # Only applies to rVariables; this writer uses zVariables only
            }

            # delete=True lets cdflib replace the placeholder file created above.
            cdf_writer = cdflib.cdfwrite.CDF(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)

            # Global attributes
            global_attrs = self._create_global_attributes(timeseries, channels)
            cdf_writer.write_globalattrs(global_attrs)

            # Time variables (this call also sets self.isUniqueTimes)
            time_vars = self._create_time_stamp_variables(timeseries)
            for ts_name, ts_data in time_vars.items():
                var_spec = {
                    'Variable': ts_name,
                    'Data_Type': 33,  # CDF_TIME_TT2000
                    'Num_Elements': 1,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
                    'Compress': 6,
                    'Pad': None,
                }
                var_attrs = self._create_time_var_attrs(ts_name)
                cdf_writer.write_var(var_spec, var_attrs, ts_data)

            # Data variables
            temperature_index = 0
            for trace in timeseries:
                channel = trace.stats.channel
                if channel in TEMPERATURE_ELEMENTS_ID:
                    # Temperature variables are numbered from 1, so the
                    # index must be incremented before it is used.
                    temperature_index += 1
                    var_name = f"Temperature{temperature_index}"
                else:
                    var_name = f"GeomagneticField{channel}"

                data_type = self._get_cdf_data_type(trace)
                num_elements = 1
                if data_type in (CDF_CHAR, CDF_UCHAR):  # string data
                    num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1

                var_spec = {
                    'Variable': var_name,
                    'Data_Type': data_type,
                    'Num_Elements': num_elements,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
                    'Compress': 6,
                    'Pad': None,
                }
                var_attrs = self._create_var_attrs(trace, temperature_index, self.isUniqueTimes)

                # Gaps are NaN in the Stream but the variable advertises a
                # FILLVAL; substitute it on a copy so the caller's trace data
                # is left untouched.
                data = trace.data
                fill_value = var_attrs.get('FILLVAL')
                if fill_value is not None and np.issubdtype(data.dtype, np.floating):
                    data = np.where(np.isnan(data), fill_value, data)

                cdf_writer.write_var(var_spec, var_attrs, data)

            cdf_writer.close()

            # Copy the finished CDF into the caller's file handle
            with open(tmp_file_path, "rb") as tmp:
                fh.write(tmp.read())

        finally:
            # Best-effort cleanup: a missing temp file must not mask the
            # exception (if any) that is already propagating.
            try:
                os.remove(tmp_file_path)
            except FileNotFoundError:
                pass

    def put_timeseries(
        self,
        timeseries: Stream,
        starttime: Optional[UTCDateTime] = None,
        endtime: Optional[UTCDateTime] = None,
        channels: Optional[List[str]] = None,
        type: Optional[DataType] = None,
        interval: Optional[DataInterval] = None,
    ):
        """
        Store timeseries data in ImagCDF format using cdflib.

        The data is split into one file per urlInterval. For 'file://' URLs
        the data is padded with NaN to cover the whole interval and, when the
        target file already exists, gaps in the new data are filled from the
        samples already stored in that file (new samples always win).

        Parameters:
        - timeseries: ObsPy Stream containing the geomagnetic data.
        - starttime: Start time of the data to be written.
        - endtime: End time of the data to be written.
        - channels: List of channels to include in the output file.
        - type: Data type indicating the processing level.
        - interval: Data interval of the data.

        Raises:
        - TimeseriesFactoryException: if the generated URL is neither
          'stdout' nor a 'file://' URL.
        """
        if len(timeseries) == 0:
            # No data to store
            return

        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        # Metadata comes from the first trace
        stats = timeseries[0].stats
        delta = stats.delta  # sample spacing
        observatory = stats.station
        starttime = starttime or stats.starttime
        endtime = endtime or stats.endtime

        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )
        for urlInterval in urlIntervals:
            interval_start = urlInterval["start"]
            interval_end = urlInterval["end"]
            if interval_start != interval_end:
                # Step back one sample so the end sample is not repeated in
                # the next interval's file.
                interval_end = interval_end - delta
            url = self._get_url(
                observatory=observatory,
                date=interval_start,
                type=type,
                interval=interval,
                channels=channels,
            )

            if url == 'stdout':
                # Write directly to stdout (no padding, no merging)
                url_data = timeseries.slice(
                    starttime=interval_start,
                    endtime=interval_end,
                )
                self.write_file(sys.stdout.buffer, url_data, channels)
                continue

            if not url.startswith('file://'):
                raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate")

            url_file = Util.get_file_from_url(url, createParentDirectory=False)
            url_data = timeseries.slice(
                starttime=interval_start,
                endtime=interval_end,
            )

            # Pad with NaN so every trace covers the whole interval
            pad_options = dict(
                starttime=interval_start,
                endtime=interval_end,
                nearest_sample=False,
                pad=True,
                fill_value=np.nan,
            )
            url_data.trim(**pad_options)

            if os.path.isfile(url_file):
                # Best effort: an unreadable existing file must not prevent
                # the new data from being written.
                try:
                    existing_data = self._read_cdf(cdflib.cdfread.CDF(url_file))
                    reference = url_data[0] if len(url_data) > 0 else None
                    for old_trace in existing_data:
                        old_trace.trim(**pad_options)
                        # Traces read from a CDF carry no network code, so
                        # channels are matched by channel code only.
                        matches = url_data.select(channel=old_trace.stats.channel)
                        target = matches[0] if len(matches) > 0 else reference
                        aligned = target is None or (
                            old_trace.stats.npts == target.stats.npts
                            and old_trace.stats.starttime == target.stats.starttime
                        )
                        if not aligned:
                            # Different sampling grid: keep the new data only
                            continue
                        if len(matches) == 0:
                            # Channel exists only in the file; carry it over
                            url_data.append(old_trace)
                            continue
                        new_trace = matches[0]
                        if np.issubdtype(new_trace.data.dtype, np.floating):
                            gaps = np.isnan(new_trace.data)
                            new_trace.data = np.where(
                                gaps, old_trace.data, new_trace.data
                            )
                except Exception as e:
                    print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr)

            with open(url_file, "wb") as fh:
                self.write_file(fh, url_data, channels)

    def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict:
        """
        Create a dictionary of global attributes for the ImagCDF file.

        These attributes apply to all the data in the file and include metadata
        such as observatory information, data publication level, and format
        descriptions. Values are taken from the stats of the first trace and
        fall back to placeholder defaults when a value is absent.

        Parameters:
        - timeseries: Stream whose first trace supplies the metadata.
        - channels: Channel codes joined into 'ElementsRecorded'.

        Returns:
        - dict mapping attribute name to {entry number: value}, as passed to
          cdflib's write_globalattrs.

        Raises:
        - ValueError: if self.type has no publication level.

        References:
        - ImagCDF Documentation Section 4: ImagCDF Global Attributes
        """
        stats = timeseries[0].stats if len(timeseries) > 0 else None

        def stat(name):
            # getattr also yields the default when stats itself is None
            return getattr(stats, name, None)

        observatory_name = stat('station_name') or self.observatory or "Unknown Observatory"
        station = stat('station') or "Unknown Iaga Code"
        institution = stat('agency_name') or "Unknown Institution"
        latitude = stat('geodetic_latitude') or 0.0
        longitude = stat('geodetic_longitude') or 0.0
        vector_orientation = stat('sensor_orientation') or ""
        data_interval_type = stat('data_interval_type') or self.interval  # currently unused

        # An elevation of 0 (sea level) is a real value; only a missing
        # (None/empty) elevation is replaced by the 99999 sentinel.
        elevation = stat('elevation')
        if elevation is None or (not elevation and elevation != 0):
            elevation = 99_999.0

        publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
        publication_date = cdflib.cdfepoch.timestamp_to_tt2000(
            datetime.timestamp(datetime.now(timezone.utc))
        )

        global_attrs = {
            'FormatDescription': {0: 'INTERMAGNET CDF Format'},
            'FormatVersion': {0: '1.2'},
            'Title': {0: 'Geomagnetic time series data'},
            'IagaCode': {0: station},
            'ElementsRecorded': {0: ''.join(channels)},
            'PublicationLevel': {0: publication_level},
            'PublicationDate': {0: [publication_date, "cdf_time_tt2000"]},
            'ObservatoryName': {0: observatory_name},
            'Latitude': {0: np.array([latitude], dtype=np.float64)},
            'Longitude': {0: np.array([longitude], dtype=np.float64)},
            'Elevation': {0: np.array([elevation], dtype=np.float64)},
            'Institution': {0: institution},
            # NOTE(review): original author asked whether F should be removed
            # here because it is a calculation, not an element.
            'VectorSensOrient': {0: vector_orientation},
            'StandardLevel': {0: 'None'},
            # 'StandardName', 'StandardVersion' and 'PartialStandDesc' are
            # temporarily omitted.
            # Source is "institute" when the named institution provided the
            # data, "INTERMAGNET" or "WDC" when that body created the file
            # from another data source.
            'Source': {0: 'institute'},
            # Not written yet: 'TermsOfUse' (see getINTERMAGNETTermsOfUse),
            # 'UniqueIdentifier', 'ParentIdentifiers', 'ReferenceLinks'.
        }
        return global_attrs

    def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
        """
        Build the TT2000 time stamp variables for the traces in a Stream.

        Vector channels share 'GeomagneticVectorTimes', scalar channels share
        'GeomagneticScalarTimes' and each temperature trace gets its own
        'Temperature<N>Times'. When more than one time variable results and
        they are all identical, they are collapsed into a single 'DataTimes'.

        Parameters:
        - timeseries: Stream whose traces supply start time, delta and npts.

        Returns:
        - dict mapping time variable name to its TT2000 values.

        Raises:
        - ValueError: if vector (or scalar) channels have differing times.

        Side effects:
        - Sets self.isUniqueTimes: True when the named time variables are
          returned, False when 'DataTimes' is returned.
        """
        vector_elements = self._get_vector_elements()
        scalar_elements = self._get_scalar_elements()

        vector_times = None
        scalar_times = None
        temperature_times = {}
        for trace in timeseries:
            channel = trace.stats.channel
            start = trace.stats.starttime
            delta = trace.stats.delta
            # UTCDateTime.timestamp is POSIX seconds in UTC. Converting via a
            # naive datetime's .timestamp() would instead interpret the UTC
            # wall-clock values in the machine's local timezone.
            timestamps = [
                (start + delta * i).timestamp for i in range(trace.stats.npts)
            ]
            # Convert timestamps to TT2000 format required by CDF
            tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(timestamps)

            if channel in vector_elements:
                if vector_times is None:
                    vector_times = tt2000_times
                elif not np.array_equal(vector_times, tt2000_times):
                    raise ValueError("Time stamps for vector channels are not the same.")
            elif channel in scalar_elements:
                if scalar_times is None:
                    scalar_times = tt2000_times
                elif not np.array_equal(scalar_times, tt2000_times):
                    raise ValueError("Time stamps for scalar channels are not the same.")
            elif channel in TEMPERATURE_ELEMENTS_ID:
                # Temperature time variables are numbered from 1
                ts_key = f"Temperature{len(temperature_times) + 1}Times"
                temperature_times[ts_key] = tt2000_times

        time_vars = {}
        if vector_times is not None:
            time_vars['GeomagneticVectorTimes'] = vector_times
        if scalar_times is not None:
            time_vars['GeomagneticScalarTimes'] = scalar_times
        time_vars.update(temperature_times)

        all_times = list(time_vars.values())
        if len(all_times) == 1:
            self.isUniqueTimes = True
        else:
            # Every array is compared with the first one, so a single
            # differing array anywhere keeps the separate time variables.
            self.isUniqueTimes = any(
                not np.array_equal(all_times[0], times) for times in all_times[1:]
            )

        if self.isUniqueTimes:
            return time_vars
        return {"DataTimes": all_times[0] if all_times else []}


    def _create_var_spec(
        self,
        var_name: str,
        data_type: str,
        num_elements: int,
        var_type: str,
        dim_sizes: List[int],
        sparse: bool,
        compress: int,
        pad: Optional[Union[str, np.ndarray]],
    ) -> dict:
        """
        Create a variable specification dictionary for cdflib.

        This is used to define the properties of a variable when writing it to
        the CDF file.

        Parameters:
        - var_name: Name of the variable.
        - data_type: CDF data type.
        - num_elements: Number of elements per record.
        - var_type: Variable type ('zVariable' or 'rVariable').
        - dim_sizes: Dimensions of the variable (empty list for 0D).
        - sparse: Whether the variable uses sparse records.
        - compress: Compression level.
        - pad: Pad value for sparse records.

        Reference:
        - CDF User's Guide: Variable Specification
        """
        var_spec = {
            'Variable': var_name,
            'Data_Type': data_type,
            'Num_Elements': num_elements,
            'Rec_Vary': True,
            'Var_Type': var_type,
            'Dim_Sizes': dim_sizes,
            'Sparse': 'no_sparse' if not sparse else 'pad_sparse',
            'Compress': compress,
            'Pad': pad,
        }
        return var_spec

    def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict:
        """
        Create the attribute dictionary for one data variable.

        Parameters:
        - trace: Trace whose stats.channel (and, for temperatures,
          stats.location) determine the attributes.
        - temperature_index: 1-based number of the temperature variable; only
          used when the channel is a temperature element.
        - isUniqueTimes: When False, DEPEND_0 is always 'DataTimes'; otherwise
          it names the time variable for the channel's group.

        Returns:
        - dict with FIELDNAM, UNITS, FILLVAL, VALIDMIN, VALIDMAX, DEPEND_0,
          DISPLAY_TYPE and LABLAXIS.

        Raises:
        - ValueError: if the channel is not a supported element (previously
          this surfaced as an UnboundLocalError).
        """
        channel = trace.stats.channel
        # FIELDNAM is "Geomagnetic Field Element " + element code, or
        # "Temperature " + a description of where it was recorded.
        fieldnam = f"Geomagnetic Field Element {channel}"

        # UNITS must be one of "nT", "Degrees of arc" or "Celsius".
        if channel == 'D':
            # Full circle in either direction
            units, validmin, validmax = 'Degrees of arc', -360.0, 360.0
        elif channel == 'I':
            # Straight up (-90) through horizontal (0) to straight down (+90)
            units, validmin, validmax = 'Degrees of arc', -90.0, 90.0
        elif channel in TEMPERATURE_ELEMENTS_ID:
            # Lower bound is absolute zero; the upper bound is a float so it
            # has the same type as VALIDMIN and FILLVAL.
            units, validmin, validmax = 'Celsius', -273.15, 79_999.0
            fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
        elif channel in ('F', 'S'):
            # Negative total field intensity is not physically meaningful
            units, validmin, validmax = 'nT', 0.0, 79_999.0
        elif channel in ('X', 'Y', 'Z', 'H', 'E', 'V', 'G'):
            units, validmin, validmax = 'nT', -79_999.0, 79_999.0
        else:
            raise ValueError(f"Unsupported channel for ImagCDF variable attributes: {channel}")

        # DEPEND_0 names the time variable this data variable is indexed by
        if not isUniqueTimes:
            depend_0 = "DataTimes"
        elif channel in self._get_vector_elements():
            depend_0 = 'GeomagneticVectorTimes'
        elif channel in self._get_scalar_elements():
            depend_0 = 'GeomagneticScalarTimes'
        else:
            # Only temperature elements remain after the validation above
            depend_0 = f"Temperature{temperature_index}Times"

        var_attrs = {
            'FIELDNAM': fieldnam,
            'UNITS': units,
            'FILLVAL': 99999.0,
            'VALIDMIN': validmin,
            'VALIDMAX': validmax,
            'DEPEND_0': depend_0,
            'DISPLAY_TYPE': 'time_series',
            'LABLAXIS': channel,
        }
        return var_attrs

    def _create_time_var_attrs(self, ts_name: str) -> dict:
        """
        Create a dictionary of time variable attributes.

        These attributes provide metadata for time variables.
        Note: None of these attributes are required for the time stamp variables GeomagneticVectorTimes and GeomagneticScalarTimes.
        Reference:
        - ImagCDF Documentation Section 3: ImagCDF Data
        """
        # var_attrs = {
            # 'UNITS': 'TT2000',
            # 'DISPLAY_TYPE': 'time_series',
            # 'LABLAXIS': 'Time',
        # }
        # return var_attrs
        return {}

    def _get_cdf_data_type(self, trace: Trace) -> int:
        """
        Map ObsPy trace data type to CDF data type.

        Determines the appropriate CDF data type based on the NumPy data type
        of the trace data.

        Returns:
        - CDF_DOUBLE (45) for floating-point data.
        - CDF_INT4 (41) for integer data.

        Reference:
        - CDF Data Types: http://cdf.gsfc.nasa.gov/html/cdfdatatypes.html
        """
        # CDF data type constants
        CDF_DOUBLE = 45  # CDF_DOUBLE corresponds to 64-bit float
        CDF_INT4 = 41    # CDF_INT4 corresponds to 32-bit int

        if trace.data.dtype in [np.float32, np.float64]:
            return CDF_DOUBLE
        elif trace.data.dtype in [np.int32, np.int64]:
            return CDF_INT4
        else:
            # Default to double precision float
            return CDF_DOUBLE

    def _read_cdf(self, cdf: cdflib.cdfread.CDF) -> Stream:
        """
        Read CDF data into an ObsPy Stream.

        Time variables are the zVariables whose name ends in 'Times' (the
        names this factory writes: GeomagneticVectorTimes,
        GeomagneticScalarTimes, Temperature<N>Times, DataTimes). Every other
        zVariable whose DEPEND_0 attribute names one of them becomes a Trace.

        Parameters:
        - cdf: cdflib CDF object representing the open CDF file.

        Returns:
        - An ObsPy Stream containing the data from the CDF file. Samples
          equal to the variable's FILLVAL are returned as NaN.
        """
        field_prefix = "GeomagneticField"
        stream = Stream()
        variable_names = cdf.cdf_info().zVariables

        # Read time variables
        time_vars = {}
        for var in variable_names:
            if not var.endswith('Times'):
                continue
            time_data = cdf.varget(var)
            if time_data is None:
                continue
            # TT2000 -> POSIX seconds -> UTCDateTime
            unix_seconds = np.atleast_1d(cdflib.cdfepoch.unixtime(time_data))
            time_vars[var] = [UTCDateTime(float(t)) for t in unix_seconds]

        # Read data variables
        for var in variable_names:
            if var.endswith('Times'):
                continue
            attrs = cdf.varattsget(var)
            if 'DEPEND_0' not in attrs:
                continue
            times = time_vars.get(attrs['DEPEND_0'])
            if not times:
                # Unknown or empty time variable: nothing to anchor the data
                continue

            if len(times) > 1:
                delta = times[1] - times[0]  # sample interval
            else:
                delta = 60 if self.interval == 'minute' else 1

            # Recover the element code: the writer stores it in LABLAXIS and
            # also embeds it in the variable name after 'GeomagneticField'.
            channel = attrs.get('LABLAXIS')
            if not channel:
                channel = var[len(field_prefix):] if var.startswith(field_prefix) else var

            data = np.asarray(cdf.varget(var))
            fill_value = attrs.get('FILLVAL')
            if fill_value is not None and np.issubdtype(data.dtype, np.floating):
                # Gaps are NaN inside a Stream
                fill_value = np.float64(np.ravel(fill_value)[0])
                data = np.where(data == fill_value, np.nan, data)

            stream += Trace(
                data=data,
                header={
                    'station': self.observatory,
                    'channel': channel,
                    'starttime': times[0],
                    'delta': delta,
                },
            )
        return stream

    @staticmethod
    def getINTERMAGNETTermsOfUse() -> str:
        """
        Return the INTERMAGNET Terms of Use.

        These terms should be included in the 'TermsOfUse' global attribute
        as per the ImagCDF specification.

        Reference:
        - ImagCDF Documentation Section 4.5: Attributes that Relate to Publication of the Data
        """
        return (
            "CONDITIONS OF USE FOR DATA PROVIDED THROUGH INTERMAGNET:\n"
            "The data made available through INTERMAGNET are provided for\n"
            "your use and are not for commercial use or sale or distribution\n"
            "to third parties without the written permission of the institute\n"
            "(http://www.intermagnet.org/Institutes_e.html) operating\n"
            "the observatory. Publications making use of the data\n"
            "should include an acknowledgment statement of the form given below.\n"
            "A citation reference should be sent to the INTERMAGNET Secretary\n"
            "(secretary@intermagnet.org) for inclusion in a publications list\n"
            "on the INTERMAGNET website.\n"
            "\n"
            "     ACKNOWLEDGEMENT OF DATA FROM OBSERVATORIES\n"
            "     PARTICIPATING IN INTERMAGNET\n"
            "We offer two acknowledgement templates. The first is for cases\n"
            "where data from many observatories have been used and it is not\n"
            "practical to list them all, or each of their operating institutes.\n"
            "The second is for cases where research results have been produced\n"
            "using a smaller set of observatories.\n"
            "\n"
            "     Suggested Acknowledgement Text (template 1)\n"
            "The results presented in this paper rely on data collected\n"
            "at magnetic observatories. We thank the national institutes that\n"
            "support them and INTERMAGNET for promoting high standards of\n"
            "magnetic observatory practice (www.intermagnet.org).\n"
            "\n"
            "     Suggested Acknowledgement Text (template 2)\n"
            "The results presented in this paper rely on the data\n"
            "collected at <observatory name>. We thank <institute name>,\n"
            "for supporting its operation and INTERMAGNET for promoting high\n"
            "standards of magnetic observatory practice (www.intermagnet.org).\n"
        )

    def _get_url(
        self,
        observatory: str,
        date: UTCDateTime,
        type: DataType = "variation",
        interval: DataInterval = "minute",
        channels: Optional[List[str]] = None,
    ) -> str:
        """
        Generate the file URL specific to ImagCDF conventions.

        The ImagCDF file name is built from the observatory code, a date-time
        string whose precision depends on the data interval, and the
        publication level:

        [iaga-code]_[date-time]_[publication-level].cdf

        If urlTemplate is 'stdout', 'stdout' is returned. If it contains
        placeholders it is formatted with the parameters below. A
        placeholder-less 'file://' template is treated as a directory (the
        current working directory when empty) and the ImagCDF file name is
        appended; the result keeps the 'file://' scheme so put_timeseries
        accepts it.

        Parameters:
        - observatory: IAGA code of the observatory.
        - date: Start date for the file.
        - type: Data type indicating the processing level.
        - interval: Data interval (e.g., 'minute', 'second').
        - channels: List of channels (optional). Not Implemented

        Returns:
        - 'stdout' or the formatted file URL.

        Raises:
        - ValueError: for an unsupported data type or interval.
        - TimeseriesFactoryException: for an invalid placeholder or an
          unsupported URL scheme in urlTemplate.

        Reference:
        - ImagCDF Documentation Section 5: ImagCDF File Names
        """
        # Get the publication level for the type
        publication_level = IMCDFPublicationLevel(data_type=type).to_string()

        # Date/time portion of the file name per interval, see:
        # https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
        date_patterns = {
            "year": "%Y",
            "month": "%Y%m",
            "day": "%Y%m%d",
            "hour": "%Y%m%d_%H",
            "minute": "%Y%m%d_%H%M",
            "second": "%Y%m%d_%H%M%S",
        }
        if interval not in date_patterns:
            # tenhertz is currently not supported
            raise ValueError(f"Unsupported interval: {interval}")
        date_format = date.strftime(date_patterns[interval])

        # File name following the ImagCDF convention
        filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"

        # If the urlTemplate explicitly specifies 'stdout', return 'stdout'
        if self.urlTemplate.lower() == "stdout":
            return "stdout"

        # Use the template when it contains placeholders
        if "{" in self.urlTemplate and "}" in self.urlTemplate:
            params = {
                "date": date.datetime,
                "i": self._get_interval_abbreviation(interval),
                "interval": self._get_interval_name(interval),
                "minute": date.hour * 60 + date.minute,
                "month": date.strftime("%b").lower(),
                "MONTH": date.strftime("%b").upper(),
                "obs": observatory.lower(),
                "OBS": observatory.upper(),
                "t": publication_level,
                "type": self._get_type_name(type),
                "julian": date.strftime("%j"),
                "year": date.strftime("%Y"),
                "ymd": date.strftime("%Y%m%d"),
                "dt": date_format,
            }
            try:
                return self.urlTemplate.format(**params)
            except (KeyError, IndexError) as e:
                # Unknown named placeholder or a positional "{}" placeholder
                raise TimeseriesFactoryException(
                    f"Invalid placeholder in urlTemplate: {e}"
                ) from e

        # Placeholder-less 'file://' template: treat it as a directory
        if self.urlTemplate.startswith("file://"):
            base_path = self.urlTemplate[7:] or os.getcwd()  # strip "file://"
            # Keep the scheme: put_timeseries only accepts 'file://' URLs
            return "file://" + os.path.join(base_path, filename)

        # Unsupported URL scheme
        raise TimeseriesFactoryException(
            f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
        )

    def _get_vector_elements(self):
        return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'}
    
    def _get_scalar_elements(self):
        return {'S', 'G'}