"""ImagCDFFactory Implementation Using cdflib This module provides the ImagCDFFactory class for creating and writing geomagnetic time series data in the ImagCDF format using the cdflib library. The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed to store geomagnetic data with high precision. References: - ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf - ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format - CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf - CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html] """ from __future__ import absolute_import, print_function from io import BytesIO import os import sys from typing import List, Optional, Union from datetime import datetime, timezone, tzinfo import numpy as np from obspy import Stream, Trace, UTCDateTime from sqlalchemy import true from geomagio.TimeseriesFactory import TimeseriesFactory from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID from .geomag_types import DataInterval, DataType from .TimeseriesFactoryException import TimeseriesFactoryException from . import TimeseriesUtility from . import Util import cdflib from cdflib.cdfwrite import CDF as CDFWriter from cdflib.cdfread import CDF as CDFReader import tempfile class IMCDFPublicationLevel: """Handles publication levels and mapping between data types and levels. The ImagCDF format uses publication levels to describe the processing level of the data. This class maps data types (e.g., 'variation', 'definitive') to their corresponding publication levels as defined in the ImagCDF documentation. Publication Levels: 1: Raw data with no processing. 2: Edited data with preliminary baselines. 3: Data suitable for initial bulletins or quasi-definitive publication. 4: Definitive data with no further changes expected. Reference: - ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data """ class PublicationLevel: LEVEL_1 = "1" LEVEL_2 = "2" LEVEL_3 = "3" LEVEL_4 = "4" TYPE_TO_LEVEL = { "none": PublicationLevel.LEVEL_1, "variation": PublicationLevel.LEVEL_1, "reported": PublicationLevel.LEVEL_1, "provisional": PublicationLevel.LEVEL_2, "adjusted": PublicationLevel.LEVEL_2, "quasi-definitive": PublicationLevel.LEVEL_3, "quasidefinitive": PublicationLevel.LEVEL_3, "definitive": PublicationLevel.LEVEL_4, } def __init__(self, data_type: Optional[str] = None): """Initialize with a data type to determine the publication level.""" if data_type: self.level = self.TYPE_TO_LEVEL.get(data_type.lower()) else: raise ValueError("data_type must be provided.") if not self.level: raise ValueError(f"Unsupported data_type: {data_type}") def to_string(self) -> str: """Return the publication level as a string.""" return self.level class ImagCDFFactory(TimeseriesFactory): """Factory for creating and writing ImagCDF formatted CDF files. This class extends the TimeseriesFactory to support writing geomagnetic time series data to files in the ImagCDF format using the cdflib library. """ isUniqueTimes=True #used to determine depend_0 and CDF Time Variable Name def __init__( self, observatory: Optional[str] = None, channels: List[str] = ("H", "D", "Z", "F"), type: DataType = "variation", interval: DataInterval = "minute", urlTemplate="file://{obs}_{dt}_{t}.cdf", urlInterval: int = -1, ): """ Initialize the ImagCDFFactory with default parameters. Parameters: - observatory: IAGA code of the observatory (e.g., 'BOU'). - channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']). - type: Data type indicating the processing level (e.g., 'variation', 'definitive'). - interval: Data interval (e.g., 'minute', 'second'). - urlTemplate: Template for generating file URLs or paths. - urlInterval: Interval size for splitting data into multiple files. """ super().__init__( observatory=observatory, channels=channels, type=type, interval=interval, urlTemplate=urlTemplate, urlInterval=urlInterval, ) def parse_string(self, data: str, **kwargs): """Parse ImagCDF formatted string data into a Stream. Note: Parsing from strings is not implemented in this factory. """ raise NotImplementedError('"parse_string" not implemented') def write_file(self, fh, timeseries: Stream, channels: List[str]): # Create a temporary file to write the CDF data with tempfile.NamedTemporaryFile(delete=False, suffix='.cdf') as tmp_file: tmp_file_path = tmp_file.name try: # Initialize the CDF writer cdf_spec = { 'Compressed': 9, # Enable compression (0-9) 'Majority': CDFWriter.ROW_MAJOR, # Data layout - gets set automatically 'Encoding': CDFWriter.IBMPC_ENCODING, # gets set automatically 'Checksum': True, # Disable checksum for faster writes (optional) 'rDim_sizes': [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables. } cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True) # Write global attributes global_attrs = self._create_global_attributes(timeseries, channels) cdf_writer.write_globalattrs(global_attrs) # Time variables time_vars = self._create_time_stamp_variables(timeseries) #modifies self.isUniqueTimes for ts_name, ts_data in time_vars.items(): # Define time variable specification var_spec = { 'Variable': ts_name, 'Data_Type': CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000 'Num_Elements': 1, 'Rec_Vary': True, 'Var_Type': 'zVariable', 'Dim_Sizes': [], 'Sparse': 'no_sparse', 'Compress': 9, 'Pad': None, } # Define time variable attributes var_attrs = self._create_time_var_attrs(ts_name) # Write time variable cdf_writer.write_var(var_spec, var_attrs, ts_data) # Data variables temperature_index = 0 for trace in timeseries: channel = trace.stats.channel if channel in TEMPERATURE_ELEMENTS_ID: temperature_index += 1 #MUST INCREMENT INDEX BEFORE USING var_name = f"Temperature{temperature_index}" else: var_name = f"GeomagneticField{channel}" data_type = self._get_cdf_data_type(trace) num_elements = 1 if data_type in [CDFWriter.CDF_CHAR, CDFWriter.CDF_UCHAR]: # Handle string types num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1 var_spec = { 'Variable': var_name, 'Data_Type': data_type, 'Num_Elements': num_elements, 'Rec_Vary': True, 'Var_Type': 'zVariable', 'Dim_Sizes': [], 'Sparse': 'no_sparse', 'Compress': 9, 'Pad': None, } var_attrs = self._create_var_attrs(trace, temperature_index, self.isUniqueTimes) # Write data variable cdf_writer.write_var(var_spec, var_attrs, trace.data) # Close the CDF writer cdf_writer.close() # Copy the temporary CDF file to the final file handle with open(tmp_file_path, "rb") as tmp: cdf_data = tmp.read() fh.write(cdf_data) finally: # Cleanup the temporary file os.remove(tmp_file_path) def put_timeseries( self, timeseries: Stream, starttime: Optional[UTCDateTime] = None, endtime: Optional[UTCDateTime] = None, channels: Optional[List[str]] = None, type: Optional[DataType] = None, interval: Optional[DataInterval] = None, ): """ Store timeseries data in ImagCDF format using cdflib. This method writes the timeseries data to one or more files, depending on the specified urlInterval. Parameters: - timeseries: ObsPy Stream containing the geomagnetic data. - starttime: Start time of the data to be written. - endtime: End time of the data to be written. - channels: List of channels to include in the output file. - type: Data type indicating the processing level. - interval: Data interval of the data. """ if len(timeseries) == 0: # No data to store return channels = channels or self.channels type = type or self.type interval = interval or self.interval # Extract metadata from the first trace stats = timeseries[0].stats delta = stats.delta # Sample rate observatory = stats.station starttime = starttime or stats.starttime endtime = endtime or stats.endtime # Split data into intervals if necessary urlIntervals = Util.get_intervals( starttime=starttime, endtime=endtime, size=self.urlInterval ) for urlInterval in urlIntervals: interval_start = urlInterval["start"] interval_end = urlInterval["end"] if interval_start != interval_end: interval_end = interval_end - delta url = self._get_url( observatory=observatory, date=interval_start, type=type, interval=interval, channels=channels, ) # Handle 'stdout' output if url == 'stdout': # Write directly to stdout fh = sys.stdout.buffer url_data = timeseries.slice( starttime=interval_start, endtime=interval_end, ) self.write_file(fh, url_data, channels) continue # Proceed to next interval if any # Handle 'file://' output elif url.startswith('file://'): # Get the file path from the URL url_file = Util.get_file_from_url(url, createParentDirectory=False) url_data = timeseries.slice( starttime=interval_start, endtime=interval_end, ) # Check if the file already exists to merge data if os.path.isfile(url_file): try: # Read existing data to merge with new data existing_cdf = CDFReader(url_file) existing_stream = self._read_cdf(existing_cdf) # existing_cdf.close() #no close method? existing_data = existing_stream # Merge existing data with new data for trace in existing_data: new_trace = url_data.select( network=trace.stats.network, station=trace.stats.station, channel=trace.stats.channel, ) if new_trace: trace.data = np.concatenate((trace.data, new_trace[0].data)) url_data = existing_data + url_data except Exception as e: # Log the exception if needed print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr) # Proceed with new data # Pad the data with NaNs to ensure it fits the interval url_data.trim( starttime=interval_start, endtime=interval_end, nearest_sample=False, pad=True, fill_value=np.nan, ) # Write the data to the CDF file with open(url_file, "wb") as fh: self.write_file(fh, url_data, channels) else: # Unsupported URL scheme encountered raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate") def get_timeseries( self, starttime: UTCDateTime, endtime: UTCDateTime, add_empty_channels: bool = True, observatory: Optional[str] = None, channels: Optional[List[str]] = None, type: Optional[DataType] = None, interval: Optional[DataInterval] = None, ) -> Stream: observatory = observatory or self.observatory channels = channels or self.channels type = type or self.type interval = interval or self.interval timeseries = Stream() urlIntervals = Util.get_intervals( starttime=starttime, endtime=endtime, size=self.urlInterval ) for urlInterval in urlIntervals: url = self._get_url( observatory=observatory, date=urlInterval["start"], type=type, interval=interval, channels=channels, ) if url == 'stdout': continue # stdout is not a valid input source if not url.startswith("file://"): raise TimeseriesFactoryException("Only file urls are supported for reading ImagCDF") url_file = Util.get_file_from_url(url, createParentDirectory=False) if not os.path.isfile(url_file): # If file doesn't exist, skip continue try: # Read CDF data and merge cdf = CDFReader(url_file) file_stream = self._read_cdf(cdf) # Attempt to select only requested channels selected = Stream() for ch in channels: selected += file_stream.select(channel=ch) timeseries += selected except Exception as e: print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr) # After reading all intervals, merge and trim timeseries.merge(fill_value=np.nan) timeseries.trim( starttime=starttime, endtime=endtime, nearest_sample=False, pad=True, fill_value=np.nan, ) # If requested, add empty channels not present in data if add_empty_channels: present_channels = {tr.stats.channel for tr in timeseries} missing_channels = set(channels) - present_channels for ch in missing_channels: empty_trace = self._get_empty_trace( starttime=starttime, endtime=endtime, observatory=observatory, channel=ch, data_type=type, interval=interval, ) timeseries += empty_trace timeseries.sort() return timeseries def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict: """ Create a dictionary of global attributes for the ImagCDF file. These attributes apply to all the data in the file and include metadata such as observatory information, data publication level, and format descriptions. References: - ImagCDF Technical Documentation: ImagCDF Global Attributes """ stats = timeseries[0].stats if len(timeseries) > 0 else None # Extract metadata from stats or fallback to defaults observatory_name = getattr(stats, 'station_name', None) or self.observatory or "" station = getattr(stats, 'station', None) or "" institution = getattr(stats, 'agency_name', None) or "" latitude = getattr(stats, 'geodetic_latitude', None) or 0.0 longitude = getattr(stats, 'geodetic_longitude', None) or 0.0 elevation = getattr(stats, 'elevation', None) or 99_999.0 conditions_of_use = getattr(stats, 'conditions_of_use', None) or "" vector_orientation = getattr(stats, 'sensor_orientation', None) or "" data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval data_type = getattr(stats, 'data_type', None) or "variation" sensor_sampling_rate = getattr(stats, 'sensor_sampling_rate', None) or 0.0 comments = getattr(stats, 'filter_comments', None) or [''] declination_base = getattr(stats, 'declination_base', None) or 0.0 publication_level = IMCDFPublicationLevel(data_type=self.type).to_string() global_attrs = { 'FormatDescription': {0: 'INTERMAGNET CDF Format'}, 'FormatVersion': {0: '1.2'}, 'Title': {0: 'Geomagnetic time series data'}, 'IagaCode': {0: station}, 'ElementsRecorded': {0: ''.join(channels)}, 'PublicationLevel': {0: publication_level}, 'PublicationDate': {0: [cdflib.cdfepoch.timestamp_to_tt2000(datetime.timestamp(datetime.now(timezone.utc))), "cdf_time_tt2000"]}, 'ObservatoryName': {0: observatory_name}, 'Latitude': {0: np.array([latitude], dtype=np.float64)}, 'Longitude': {0: np.array([longitude], dtype=np.float64)}, 'Elevation': {0: np.array([elevation], dtype=np.float64)}, 'Institution': {0: institution}, 'VectorSensOrient': {0: vector_orientation}, #remove F - because its a calculation, not an element? 'StandardLevel': {0: 'None'}, # Set to 'None' # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc' 'Source': {0: 'institute'}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source 'TermsOfUse': {0: conditions_of_use}, # 'UniqueIdentifier': {0: ''}, # 'ParentIdentifiers': {0: ''}, # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov 'SensorSamplingRate': {0: sensor_sampling_rate}, #Optional 'DataType': {0: data_type},#Optional 'Comments': {0: comments}, #Optional 'DeclinationBase': {0: declination_base}, #Optional } return global_attrs def _create_time_stamp_variables(self, timeseries: Stream) -> dict: vector_times = None scalar_times = None temperature_times = {} temperature_index = 1 for trace in timeseries: channel = trace.stats.channel times = [ (trace.stats.starttime + trace.stats.delta * i).datetime.replace(tzinfo=timezone.utc) for i in range(trace.stats.npts) ] # Convert timestamps to TT2000 format required by CDF tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000([time.timestamp() for time in times]) if channel in self._get_vector_elements(): if vector_times is None: vector_times = tt2000_times else: if not np.array_equal(vector_times, tt2000_times): raise ValueError("Time stamps for vector channels are not the same.") elif channel in self._get_scalar_elements(): if scalar_times is None: scalar_times = tt2000_times else: if not np.array_equal(scalar_times, tt2000_times): raise ValueError("Time stamps for scalar channels are not the same.") elif channel in TEMPERATURE_ELEMENTS_ID: ts_key = f"Temperature{temperature_index}Times" if ts_key not in temperature_times: temperature_times[ts_key] = tt2000_times temperature_index += 1 else: temperature_times[ts_key] = tt2000_times time_vars = {} if vector_times is not None: time_vars['GeomagneticVectorTimes'] = vector_times if scalar_times is not None: time_vars['GeomagneticScalarTimes'] = scalar_times if temperature_times: time_vars.update(temperature_times) last_times = [] self.isUniqueTimes = len(time_vars) == 1 #true if only one set of times, else default to false. for index, times in enumerate(time_vars.values()): if index > 0: self.isUniqueTimes = not np.array_equal(last_times, times) last_times = times return time_vars if self.isUniqueTimes else {"DataTimes": last_times} def _create_var_spec( self, var_name: str, data_type: str, num_elements: int, var_type: str, dim_sizes: List[int], sparse: bool, compress: int, pad: Optional[Union[str, np.ndarray]], ) -> dict: """ Create a variable specification dictionary for cdflib. This is used to define the properties of a variable when writing it to the CDF file. Parameters: - var_name: Name of the variable. - data_type: CDF data type. - num_elements: Number of elements per record. - var_type: Variable type ('zVariable' or 'rVariable'). - dim_sizes: Dimensions of the variable (empty list for 0D). - sparse: Whether the variable uses sparse records. - compress: Compression level. - pad: Pad value for sparse records. Reference: - CDF User's Guide: Variable Specification """ var_spec = { 'Variable': var_name, 'Data_Type': data_type, 'Num_Elements': num_elements, 'Rec_Vary': True, 'Var_Type': var_type, 'Dim_Sizes': dim_sizes, 'Sparse': 'no_sparse' if not sparse else 'pad_sparse', 'Compress': compress, 'Pad': pad, } return var_spec def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict: channel = trace.stats.channel.upper() fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded. units = '' # Must be one of “nT”, “Degrees of arc” or “Celsius” if channel == 'D': units = 'Degrees of arc' validmin = -360.0 validmax = 360.0 # A full circle representation elif channel == 'I': units = 'Degrees of arc' validmin = -90.0 validmax = 90.0 #The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°). elif channel in TEMPERATURE_ELEMENTS_ID: units = 'Celsius' fieldnam = f"Temperature {temperature_index} {trace.stats.location}" validmin = -273.15 #absolute zero validmax = 79_999 elif channel in ['F','S']: units = 'nT' validmin = 0.0 # negative magnetic field intestity not physically meaningful. validmax = 79_999.0 elif channel in ['X', 'Y', 'Z', 'H', 'E', 'V', 'G']: units = 'nT' validmin = -79_999.0 validmax = 79_999.0 # Determine DEPEND_0 based on channel type if channel in self._get_vector_elements(): depend_0 = 'GeomagneticVectorTimes' elif channel in self._get_scalar_elements(): depend_0 = 'GeomagneticScalarTimes' elif channel in TEMPERATURE_ELEMENTS_ID: depend_0 = f"Temperature{temperature_index}Times" var_attrs = { 'FIELDNAM': fieldnam, 'UNITS': units, 'FILLVAL': 99999.0, 'VALIDMIN': validmin, 'VALIDMAX': validmax, 'DEPEND_0': depend_0 if isUniqueTimes else "DataTimes", 'DISPLAY_TYPE': 'time_series', 'LABLAXIS': channel, 'DATA_INTERVAL_TYPE': trace.stats.data_interval_type } return var_attrs def _create_time_var_attrs(self, ts_name: str) -> dict: """ Create a dictionary of time variable attributes. These attributes provide metadata for time variables. Note: None of these attributes are required for the time stamp variables. Reference: - ImagCDF Technical Documentation: ImagCDF Data """ # var_attrs = { # 'UNITS': 'TT2000', # 'DISPLAY_TYPE': 'time_series', # 'LABLAXIS': 'Time', # } # return var_attrs return {} def _get_cdf_data_type(self, trace: Trace) -> int: """ Map ObsPy trace data type to CDF data type. Determines the appropriate CDF data type based on the NumPy data type of the trace data. Returns: - CDF_DOUBLE (45) for floating-point data. - CDF_INT4 (41) for integer data. Reference: - See CDF for more data types """ if trace.data.dtype in [np.float32, np.float64]: return CDFWriter.CDF_DOUBLE elif trace.data.dtype in [np.int32, np.int64]: return CDFWriter.CDF_INT4 else: # Default to double precision float return CDFWriter.CDF_DOUBLE def _read_cdf(self, cdf: CDFReader) -> Stream: """ Read CDF data into an ObsPy Stream. This method reads the data variables and their corresponding time variables from a CDF file and constructs an ObsPy Stream. Parameters: - cdf: cdflib CDF object representing the open CDF file. Returns: - An ObsPy Stream containing the data from the CDF file. """ stream = Stream() # Extract global attributes global_attrs = cdf.globalattsget() # Map global attributes to Stream-level metadata observatory = global_attrs.get('IagaCode', [''])[0] station_name = global_attrs.get('ObservatoryName', [''])[0] institution = global_attrs.get('Institution', [''])[0] latitude = global_attrs.get('Latitude', [0.0])[0] longitude = global_attrs.get('Longitude', [0.0])[0] elevation = global_attrs.get('Elevation', [99_999.0])[0] #default to 99_999 per technical documents. sensor_sampling_rate = global_attrs.get('SensorSamplingRate', [0.0])[0] sensor_orientation = global_attrs.get('VectorSensOrient', [''])[0] data_type = global_attrs.get('DataType', ['variation'])[0] publication_level = global_attrs.get('PublicationLevel', ['1'])[0] comments = global_attrs.get('Comments', ['']) terms_of_use = global_attrs.get('TermsOfUse', [''])[0] declination_base = global_attrs.get('DeclinationBase', [0.0])[0] # Identify all time variables time_vars = {} for var in cdf.cdf_info().zVariables: if var.lower().endswith('times'): time_data = cdf.varget(var) unix_times = cdflib.cdfepoch.unixtime(time_data) utc_times = [UTCDateTime(t) for t in unix_times] time_vars[var] = utc_times # Read data variables and associate them with time variables for var in cdf.cdf_info().zVariables: # Skip time variables if var.lower().endswith('times'): continue data = cdf.varget(var) attrs = cdf.varattsget(var) # Determine DEPEND_0 (the time variable name) and validate ts_name = attrs.get('DEPEND_0') if not ts_name: # If no DEPEND_0, skip this variable as we cannot map times continue # If we used a DataTimes fallback or similar, ensure we handle it case-insensitively # and also confirm that time_vars keys are checked properly. # The ImagCDF can have DataTimes, GeomagneticVectorTimes, etc. # So try exact match first, if not found, attempt case-insensitive. matched_time_key = None for tkey in time_vars.keys(): if tkey == ts_name: matched_time_key = tkey break if tkey.lower() == ts_name.lower(): matched_time_key = tkey break if matched_time_key not in time_vars: # If we cannot find the matching time variable, skip this variable continue times = time_vars[matched_time_key] # Determine delta (sample interval) if len(times) > 1: # delta as a float of seconds delta = (times[1].timestamp - times[0].timestamp) else: # if only one sample, use default based on interval # convert minute, second, etc. to numeric delta if self.interval == 'minute': delta = 60.0 elif self.interval == 'second': delta = 1.0 elif self.interval == 'hour': delta = 3600.0 else: # fallback, set delta to 60 delta = 60.0 # Map the variable name back to a standard channel code # Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc. # Temperatures are named like Temperature1, Temperature2, ... # Extract channel name by removing known prefixes if var.startswith("GeomagneticField"): channel = var.replace("GeomagneticField", "") elif var.startswith("Temperature"): # Temperature variables may not map directly to a geomagnetic channel # but to temperature sensors. We can just use the label from LABLAXIS if needed channel = attrs.get('LABLAXIS', var) else: # fallback if naming doesn't match expected patterns channel = var time_attrs =cdf.varattsget(var) data_interval = time_attrs.get('DATA_INTERVAL_TYPE', ['']) # Create a trace trace = Trace( data=data, header={ 'station': observatory, 'channel': channel, 'starttime': times[0], 'delta': delta, 'geodetic_latitude': latitude, 'geodetic_longitude': longitude, 'elevation': elevation, 'sensor_orientation': "".join(sensor_orientation), 'data_type': data_type, 'station_name': station_name, 'agency_name': institution, 'conditions_of_use': terms_of_use, 'sensor_sampling_rate': sensor_sampling_rate, 'data_interval_type': data_interval, 'declination_base': declination_base, 'filter_comments': comments } ) stream += trace return stream def _get_url( self, observatory: str, date: UTCDateTime, type: DataType = "variation", interval: DataInterval = "minute", channels: Optional[List[str]] = None, ) -> str: """ Generate the file URL specific to ImagCDF conventions. This method constructs the filename based on the ImagCDF naming conventions, which include the observatory code, date-time formatted according to the data interval, and the publication level. [iaga-code]_[date-time]_[publication-level].cdf Parameters: - observatory: IAGA code of the observatory. - date: Start date for the file. - type: Data type indicating the processing level. - interval: Data interval (e.g., 'minute', 'second'). - channels: List of channels (optional). Not Implemented Returns: - The formatted file URL or path. Reference: - ImagCDF Technical Documentation: ImagCDF File Names """ # Get the publication level for the type publication_level = IMCDFPublicationLevel(data_type=type).to_string() # Format of Date/Time Portion of Filename based on interval see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#example-data-file:~:text=Format%20of%20Date,%EF%83%81 if interval == "year": date_format = date.strftime("%Y") elif interval == "month": date_format = date.strftime("%Y%m") elif interval == "day": date_format = date.strftime("%Y%m%d") elif interval == "hour": date_format = date.strftime("%Y%m%d_%H") elif interval == "minute": date_format = date.strftime("%Y%m%d_%H%M") elif interval == "second": date_format = date.strftime("%Y%m%d_%H%M%S") else: raise ValueError(f"Unsupported interval: {interval}") #tenhertz currently not supported # Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf" # If the urlTemplate explicitly specifies 'stdout', return 'stdout' if self.urlTemplate.lower() == "stdout": return "stdout" # Prepare parameters for templating params = { "date": date.datetime, "i": self._get_interval_abbreviation(interval), "interval": self._get_interval_name(interval), "minute": date.hour * 60 + date.minute, "month": date.strftime("%b").lower(), "MONTH": date.strftime("%b").upper(), "obs": observatory.lower(), "OBS": observatory.upper(), "t": publication_level, "type": self._get_type_name(type), "julian": date.strftime("%j"), "year": date.strftime("%Y"), "ymd": date.strftime("%Y%m%d"), "dt": date_format, } # Attempt to use the template provided in urlTemplate if "{" in self.urlTemplate and "}" in self.urlTemplate: try: return self.urlTemplate.format(**params) except KeyError as e: raise TimeseriesFactoryException(f"Invalid placeholder in urlTemplate: {e}") # If the urlTemplate doesn't support placeholders, assume 'file://' scheme if self.urlTemplate.startswith("file://"): base_path = self.urlTemplate[7:] # Strip "file://" if not base_path or base_path == "{obs}_{dt}_{t}.cdf": base_path = os.getcwd() # Default to current working directory return os.path.join(base_path, filename) # Unsupported URL scheme raise TimeseriesFactoryException( f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}" ) def _get_vector_elements(self): return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'} def _get_scalar_elements(self): return {'S', 'G'}