"""ImagCDFFactory Implementation Using cdflib
This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.
References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: https://cdflib.readthedocs.io/en/latest/, https://cdflib.readthedocs.io/en/stable/modules/index.html
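
Example (a minimal usage sketch; assumes an ObsPy Stream `timeseries` holding
H, D, Z, F traces for observatory BOU):

    factory = ImagCDFFactory(observatory="BOU", channels=["H", "D", "Z", "F"])
    factory.put_timeseries(timeseries, channels=["H", "D", "Z", "F"])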
"""
from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone, tzinfo
import numpy as np
from obspy import Stream, Trace, UTCDateTime
import cdflib
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactoryException import TimeseriesFactoryException
from .. import TimeseriesUtility
from .. import Util
from ..TimeseriesFactory import TimeseriesFactory
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
import tempfile
class IMCDFPublicationLevel:
"""Handles publication levels and mapping between data types and levels.
The ImagCDF format uses publication levels to describe the processing
level of the data. This class maps data types (e.g., 'variation', 'definitive')
to their corresponding publication levels as defined in the ImagCDF documentation.
Publication Levels:
1: Raw data with no processing.
2: Edited data with preliminary baselines.
3: Data suitable for initial bulletins or quasi-definitive publication.
4: Definitive data with no further changes expected.
Reference:
- ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data
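
    Example (illustrative):
        IMCDFPublicationLevel("definitive").to_string()  # -> "4"
        IMCDFPublicationLevel("variation").to_string()   # -> "1"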
"""
class PublicationLevel:
LEVEL_1 = "1"
LEVEL_2 = "2"
LEVEL_3 = "3"
LEVEL_4 = "4"
TYPE_TO_LEVEL = {
"none": PublicationLevel.LEVEL_1,
"variation": PublicationLevel.LEVEL_1,
"reported": PublicationLevel.LEVEL_1,
"provisional": PublicationLevel.LEVEL_2,
"adjusted": PublicationLevel.LEVEL_2,
"quasi-definitive": PublicationLevel.LEVEL_3,
"quasidefinitive": PublicationLevel.LEVEL_3,
"definitive": PublicationLevel.LEVEL_4,
}
def __init__(self, data_type: Optional[str] = None):
"""Initialize with a data type to determine the publication level."""
if data_type:
self.level = self.TYPE_TO_LEVEL.get(data_type.lower())
else:
raise ValueError("data_type must be provided.")
if not self.level:
raise ValueError(f"Unsupported data_type: {data_type}")
def to_string(self) -> str:
"""Return the publication level as a string."""
return self.level
class ImagCDFFactory(TimeseriesFactory):
"""Factory for creating and writing ImagCDF formatted CDF files.
This class extends the TimeseriesFactory to support writing geomagnetic
time series data to files in the ImagCDF format using the cdflib library.
"""
isUniqueTimes = True # used to determine depend_0 and CDF Time Variable Name
NONSTANDARD_ELEMENTS = TEMPERATURE_ELEMENTS_ID
def __init__(
self,
observatory: Optional[str] = None,
channels: List[str] = ("H", "D", "Z", "F"),
type: DataType = "variation",
interval: DataInterval = "minute",
urlTemplate="file://etc/imagcdf/{obs}_{dt}_{t}.cdf",
urlInterval: int = -1,
):
"""
Initialize the ImagCDFFactory with default parameters.
Parameters:
- observatory: IAGA code of the observatory (e.g., 'BOU').
- channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
- type: Data type indicating the processing level (e.g., 'variation', 'definitive').
- interval: Data interval (e.g., 'minute', 'second').
- urlTemplate: Template for generating file URLs or paths.
- urlInterval: Interval size for splitting data into multiple files.
"""
super().__init__(
observatory=observatory,
channels=channels,
type=type,
interval=interval,
urlTemplate=urlTemplate,
urlInterval=urlInterval,
)
def write_file(self, fh, timeseries: Stream, channels: List[str]):
# Create a temporary file to write the CDF data
with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
"Compressed": 9, # Enable compression (1-9)
"Majority": CDFWriter.ROW_MAJOR,
"Encoding": CDFWriter.NETWORK_ENCODING, # XDR Encoding - If a CDF must be portable between two or more different types of computers use network encoded.
"Checksum": True, # Disable checksum for faster writes (optional)
"rDim_sizes": [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
global_attrs = self._create_global_attributes(timeseries, channels)
cdf_writer.write_globalattrs(global_attrs)
time_vars = self._create_time_stamp_variables(
timeseries
) # modifies self.isUniqueTimes
"Variable": ts_name,
"Data_Type": CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000
"Num_Elements": 1,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
Shavers, Nicholas H
committed
"Sparse": "no_sparse", # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series)
# Define time variable attributes
var_attrs = self._create_time_var_attrs(ts_name)
# Write time variable
cdf_writer.write_var(var_spec, var_attrs, ts_data)
temperature_index = 0
            for trace in timeseries:
                channel = trace.stats.channel
                var_name = f"GeomagneticField{channel}"
# if channel in REAL_TEMPERATURE:
# temperature_index += 1 # MUST INCREMENT INDEX BEFORE USING
# var_name = f"Temperature{temperature_index}"
data_type = self._get_cdf_data_type(trace)
num_elements = 1
if data_type in [
CDFWriter.CDF_CHAR,
CDFWriter.CDF_UCHAR,
]: # Handle string types
num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
"Variable": var_name,
"Data_Type": data_type,
"Num_Elements": num_elements,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
"Sparse": "no_sparse",
"Compress": 9,
"Pad": None,
var_attrs = self._create_var_attrs(
trace, temperature_index, self.isUniqueTimes
)
# Write data variable
cdf_writer.write_var(var_spec, var_attrs, trace.data)
# Close the CDF writer
cdf_writer.close()
# Copy the temporary CDF file to the final file handle
with open(tmp_file_path, "rb") as tmp:
cdf_data = tmp.read()
fh.write(cdf_data)
        finally:
            # Cleanup the temporary file
            os.remove(tmp_file_path)
def put_timeseries(
self,
timeseries: Stream,
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
):
"""
Store timeseries data in ImagCDF format using cdflib.
This method writes the timeseries data to one or more files, depending
on the specified urlInterval.
Parameters:
- timeseries: ObsPy Stream containing the geomagnetic data.
- starttime: Start time of the data to be written.
- endtime: End time of the data to be written.
- channels: List of channels to include in the output file.
- type: Data type indicating the processing level.
- interval: Data interval of the data.
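
        Example (a sketch; assumes `factory` is an ImagCDFFactory configured
        with a file urlTemplate and `stream` holds one day of minute data):
            factory.put_timeseries(
                timeseries=stream,
                channels=["H", "D", "Z", "F"],
                type="variation",
                interval="minute",
            )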
"""
if len(timeseries) == 0:
# No data to store
return
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
# Extract metadata from the first trace
stats = timeseries[0].stats
delta = stats.delta # Sample rate
observatory = stats.station
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
# Split data into intervals if necessary
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
interval_start = urlInterval["start"]
interval_end = urlInterval["end"]
url = self._get_url(
observatory=observatory,
date=interval_start,
type=type,
interval=interval,
channels=channels,
)
            # Handle 'stdout' output
            if url == "stdout":
                # Write directly to stdout
                fh = sys.stdout.buffer
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
self.write_file(fh, url_data, channels)
continue # Proceed to next interval if any
            # Handle 'file://' output
            elif url.startswith("file://"):
                # Get the file path from the URL
                url_file = Util.get_file_from_url(url, createParentDirectory=False)
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
# Check if the file already exists to merge data
                if os.path.isfile(url_file):
                    print(f"Naming conflict. Deleting existing file '{url_file}'.")
                    os.remove(url_file)
# raise TimeseriesFactoryException(
# f"Error: File '{url_file}' already exists."
# )
# Pad the data to ensure it fits the interval
url_data.trim(
starttime=interval_start,
endtime=interval_end,
                    nearest_sample=True,
                    pad=True,
                    fill_value=99_999,  # FILLVAL
)
# Write the data to the CDF file
with open(url_file, "wb") as fh:
self.write_file(fh, url_data, channels)
else:
# Unsupported URL scheme encountered
raise TimeseriesFactoryException(
"Unsupported URL scheme in urlTemplate"
)
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
add_empty_channels: bool = True,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
) -> Stream:
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
timeseries = Stream()
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
url = self._get_url(
observatory=observatory,
date=urlInterval["start"],
type=type,
interval=interval,
channels=channels,
)
            if url == "stdout":
                continue  # stdout is not a valid input source
if not url.startswith("file://"):
raise TimeseriesFactoryException(
"Only file urls are supported for reading ImagCDF"
)
url_file = Util.get_file_from_url(url, createParentDirectory=False)
if not os.path.isfile(url_file):
# If file doesn't exist, skip
continue
try:
                # Read CDF data and merge
                cdf = CDFReader(url_file)
                timeseries += self._read_cdf(cdf, channels)
except Exception as e:
print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
# After reading all intervals, merge and trim
timeseries.merge(fill_value=99_999)
timeseries.trim(
starttime=starttime,
endtime=endtime,
nearest_sample=True,
pad=True,
fill_value=99_999,
)
# If requested, add empty channels not present in data
if add_empty_channels:
present_channels = {tr.stats.channel for tr in timeseries}
missing_channels = set(channels) - present_channels
for ch in missing_channels:
empty_trace = self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=ch,
data_type=type,
interval=interval,
)
timeseries += empty_trace
timeseries.sort()
return timeseries
def _create_global_attributes(
self, timeseries: Stream, channels: List[str]
) -> dict:
"""
Create a dictionary of global attributes for the ImagCDF file.
These attributes apply to all the data in the file and include metadata
such as observatory information, data publication level, and format
descriptions.
References:
- ImagCDF Technical Documentation: ImagCDF Global Attributes
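
        Example of the returned structure (abridged; values illustrative):
            {
                "FormatDescription": {0: "INTERMAGNET CDF Format"},
                "FormatVersion": {0: "1.2"},
                "IagaCode": {0: "BOU"},
                "PublicationLevel": {0: "1"},
                ...
            }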
"""
stats = timeseries[0].stats if len(timeseries) > 0 else None
# Extract metadata from stats or fallback to defaults
observatory_name = (
getattr(stats, "station_name", None) or self.observatory or ""
)
station = getattr(stats, "station", None) or ""
institution = getattr(stats, "agency_name", None) or ""
latitude = getattr(stats, "geodetic_latitude", None) or 0.0
longitude = getattr(stats, "geodetic_longitude", None) or 0.0
elevation = getattr(stats, "elevation", None) or 99_999.0
conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
vector_orientation = getattr(stats, "sensor_orientation", None) or ""
data_interval_type = getattr(stats, "data_interval_type", None) or self.interval
data_type = getattr(stats, "data_type", None) or "variation"
sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
comments = getattr(stats, "filter_comments", None) or [""]
declination_base = getattr(stats, "declination_base", None) or 0.0
publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
global_attrs = {
"FormatDescription": {0: "INTERMAGNET CDF Format"},
"FormatVersion": {0: "1.2"},
"Title": {0: "Geomagnetic time series data"},
"IagaCode": {0: station},
"ElementsRecorded": {0: "".join(channels)},
"PublicationLevel": {0: publication_level},
"PublicationDate": {
0: [
cdflib.cdfepoch.timestamp_to_tt2000(
datetime.timestamp(datetime.now(timezone.utc))
),
"cdf_time_tt2000",
]
},
"ObservatoryName": {0: observatory_name},
"Latitude": {0: np.array([latitude], dtype=np.float64)},
"Longitude": {0: np.array([longitude], dtype=np.float64)},
"Elevation": {0: np.array([elevation], dtype=np.float64)},
"Institution": {0: institution},
"VectorSensOrient": {
0: vector_orientation
        },  # remove F? it is a calculated value, not a recorded element
"StandardLevel": {0: "None"}, # Set to 'None'
# Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
"Source": {
0: "institute"
}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
"TermsOfUse": {0: conditions_of_use},
# 'UniqueIdentifier': {0: ''},
# 'ParentIdentifiers': {0: ''},
# 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
"SensorSamplingRate": {0: sensor_sampling_rate}, # Optional
"DataType": {0: data_type}, # Optional
"Comments": {0: comments}, # Optional
"DeclinationBase": {0: declination_base}, # Optional
}
return global_attrs
def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
vector_times = None
scalar_times = None
nonstandard_times = None
temperature_times = {}
temperature_index = 1
for trace in timeseries:
channel = trace.stats.channel
times = [
(trace.stats.starttime + trace.stats.delta * i).datetime.replace(
tzinfo=timezone.utc
)
for i in range(trace.stats.npts)
]
# Convert timestamps to TT2000 format required by CDF
tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(
[time.timestamp() for time in times]
)
if channel in self._get_vector_elements():
if vector_times is None:
vector_times = tt2000_times
else:
if not np.array_equal(vector_times, tt2000_times):
raise ValueError(
"Time stamps for vector channels are not the same."
)
elif channel in self._get_scalar_elements():
if scalar_times is None:
scalar_times = tt2000_times
else:
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError(
"Time stamps for scalar channels are not the same."
)
# elif channel in REAL_TEMPERATURES:
# ts_key = f"Temperature{temperature_index}Times"
# if ts_key not in temperature_times:
# temperature_times[ts_key] = tt2000_times
# temperature_index += 1
# else:
# temperature_times[ts_key] = tt2000_times
            elif channel in self.NONSTANDARD_ELEMENTS:
                if nonstandard_times is None:
                    nonstandard_times = tt2000_times
                else:
                    if not np.array_equal(nonstandard_times, tt2000_times):
                        raise ValueError(
                            "Time stamps for nonstandard channels are not the same."
                        )
time_vars["GeomagneticVectorTimes"] = vector_times
time_vars["GeomagneticScalarTimes"] = scalar_times
if temperature_times:
time_vars.update(temperature_times)
isNonStandard = nonstandard_times is not None
if isNonStandard:
time_vars["NonstandardTimes"] = nonstandard_times
        last_times = []
        self.isUniqueTimes = (
            len(time_vars) == 1
        )  # True when the file contains a single time stamp variable
        for index, times in enumerate(time_vars.values()):
            if index > 0:
                self.isUniqueTimes = not np.array_equal(last_times, times)
            last_times = times
if self.isUniqueTimes and isNonStandard and len(time_vars) > 2:
raise ValueError(
f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}"
)
return (
{"DataTimes": last_times}
if isNonStandard or not self.isUniqueTimes
else time_vars
)
def _create_var_spec(
self,
var_name: str,
data_type: str,
num_elements: int,
var_type: str,
dim_sizes: List[int],
sparse: bool,
compress: int,
pad: Optional[Union[str, np.ndarray]],
) -> dict:
"""
Create a variable specification dictionary for cdflib.
This is used to define the properties of a variable when writing it to
the CDF file.
Parameters:
- var_name: Name of the variable.
- data_type: CDF data type.
- num_elements: Number of elements per record.
- var_type: Variable type ('zVariable' or 'rVariable').
- dim_sizes: Dimensions of the variable (empty list for 0D).
- sparse: Whether the variable uses sparse records.
- compress: Compression level.
- pad: Pad value for sparse records.
Reference:
- CDF User's Guide: Variable Specification
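
        Example of a returned specification (illustrative):
            {
                "Variable": "GeomagneticFieldH",
                "Data_Type": CDFWriter.CDF_DOUBLE,
                "Num_Elements": 1,
                "Rec_Vary": True,
                "Var_Type": "zVariable",
                "Dim_Sizes": [],
                "Sparse": "no_sparse",
                "Compress": 9,
                "Pad": None,
            }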
"""
var_spec = {
"Variable": var_name,
"Data_Type": data_type,
"Num_Elements": num_elements,
"Rec_Vary": True,
"Var_Type": var_type,
"Dim_Sizes": dim_sizes,
"Sparse": "no_sparse" if not sparse else "pad_sparse",
"Compress": compress,
"Pad": pad,
def _create_var_attrs(
self,
trace: Trace,
temperature_index: Optional[int] = None,
isUniqueTimes: Optional[bool] = True,
) -> dict:
channel = trace.stats.channel.upper()
fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
units = "" # Must be one of “nT”, “Degrees of arc” or “Celsius”
if channel == "D":
units = "Degrees of arc"
validmin = -360.0
validmax = 360.0 # A full circle representation
elif channel == "I":
units = "Degrees of arc"
validmin = -90.0
validmax = 90.0 # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
elif channel in TEMPERATURE_ELEMENTS_ID:
units = "Celsius"
validmin = -273.15 # absolute zero
validmax = 79_999
depend_0 = "DataTimes" # can be used for nonstandard element
# elif channel in [REAL_TEMPERATURES]:
# units = "Celsius"
# fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
# validmin = -273.15 # absolute zero
# validmax = 79_999
elif channel in ["F", "S"]:
units = "nT"
validmin = (
0.0 # negative magnetic field intestity not physically meaningful.
)
elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
units = "nT"
        # Determine DEPEND_0 based on channel type
        if not isUniqueTimes:
            depend_0 = "DataTimes"
        elif channel in self._get_vector_elements():
            depend_0 = "GeomagneticVectorTimes"
        elif channel in self._get_scalar_elements():
            depend_0 = "GeomagneticScalarTimes"
# elif channel in REAL_TEMPERATURES:
# depend_0 = f"Temperature{temperature_index}Times"
"FIELDNAM": fieldnam,
"UNITS": units,
"FILLVAL": 99999.0,
"VALIDMIN": validmin,
"VALIDMAX": validmax,
"DEPEND_0": depend_0,
"DISPLAY_TYPE": "time_series",
"LABLAXIS": channel,
"DATA_INTERVAL_TYPE": trace.stats.data_interval_type, # optional
}
return var_attrs
def _create_time_var_attrs(self, ts_name: str) -> dict:
"""
Create a dictionary of time variable attributes.
These attributes provide metadata for time variables.
        Note: none of these attributes are required for time stamp variables.
        """
        # var_attrs = {
        #     'UNITS': 'TT2000',
# 'DISPLAY_TYPE': 'time_series',
# 'LABLAXIS': 'Time',
# }
# return var_attrs
return {}
def _get_cdf_data_type(self, trace: Trace) -> int:
"""
Map ObsPy trace data type to CDF data type.
Determines the appropriate CDF data type based on the NumPy data type
of the trace data.
Returns:
- CDF_DOUBLE (45) for floating-point data.
- CDF_INT4 (41) for integer data.
        Reference:
        - CDF User's Guide: CDF Data Types
        """
        if trace.data.dtype in [np.float32, np.float64]:
            return CDFWriter.CDF_DOUBLE
        elif trace.data.dtype in [np.int32, np.int64]:
            return CDFWriter.CDF_INT4
        else:
            # Default to double precision for unrecognized dtypes
            return CDFWriter.CDF_DOUBLE
def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream:
"""
Read CDF data into an ObsPy Stream.
This method reads the data variables and their corresponding time
variables from a CDF file and constructs an ObsPy Stream.
Parameters:
- cdf: cdflib CDF object representing the open CDF file.
Returns:
- An ObsPy Stream containing the data from the CDF file.
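
        Example (a sketch; the file name is hypothetical):
            cdf = CDFReader("bou_20240102_0000_4.cdf")
            stream = factory._read_cdf(cdf, channels=["H", "D", "Z", "F"])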
"""
stream = Stream()
# Extract global attributes
global_attrs = cdf.globalattsget()
# Define required global attributes based on ImagCDF standards
required_global_attrs = [
"FormatDescription",
"FormatVersion",
"Title",
"IagaCode",
"ElementsRecorded",
"PublicationLevel",
"PublicationDate",
"ObservatoryName",
"Latitude",
"Longitude",
"Elevation",
"Institution",
"StandardLevel",
"Source",
]
        # Check for missing required global attributes per the ImagCDF technical document
missing_global_attrs = []
for attr in required_global_attrs:
if attr not in global_attrs:
missing_global_attrs.append(attr)
if missing_global_attrs:
error_message = (
f"The following required global attributes are missing from the CDF: "
f"{', '.join(missing_global_attrs)}"
)
raise TimeseriesFactoryException(error_message)
# Map global attributes to Stream-level metadata
observatory = global_attrs.get("IagaCode", [""])[0]
station_name = global_attrs.get("ObservatoryName", [""])[0]
institution = global_attrs.get("Institution", [""])[0]
latitude = global_attrs.get("Latitude", [0.0])[0]
longitude = global_attrs.get("Longitude", [0.0])[0]
elevation = global_attrs.get("Elevation", [99_999.0])[
0
] # default to 99_999 per technical documents.
sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [0.0])[0]
sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
data_type = global_attrs.get("DataType", ["variation"])[0]
publication_level = global_attrs.get("PublicationLevel", ["1"])[0]
comments = global_attrs.get("Comments", [""])
terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
declination_base = global_attrs.get("DeclinationBase", [0.0])[0]
        # Identify all time variables
        time_vars = {}
        for var in cdf.cdf_info().zVariables:
            if var.endswith("Times"):
                time_data = cdf.varget(var)
                unix_times = cdflib.cdfepoch.unixtime(time_data)
                utc_times = [UTCDateTime(t) for t in unix_times]
                time_vars[var] = utc_times
# Read data variables and associate them with time variables
for var in cdf.cdf_info().zVariables:
# Skip time variables
if var.endswith("Times"):
continue
# Map the variable name back to a standard channel code by removing known prefixes
# Names are like GeomagneticFieldH, GeomagneticFieldD, Temperature1, Temperature2, ...
if var.startswith("GeomagneticField"):
channel = var.replace("GeomagneticField", "")
# elif var.startswith("Temperature"):
# # Temperature variables may not map directly to a geomagnetic channel
# # but to temperature sensors. We can just use the label from LABLAXIS if needed
# channel = attrs.get("LABLAXIS", var)
else:
# fallback if naming doesn't match expected patterns
channel = var
if channels and channel not in channels:
continue
data = cdf.varget(var)
attrs = cdf.varattsget(var)
            # Determine DEPEND_0 (the time variable name) and validate
            ts_name = attrs.get("DEPEND_0", None)
# if not ts_name:
# # If no DEPEND_0, skip this variable as we cannot map times
# continue
# The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc.
matched_time_key = None
for tkey in time_vars.keys():
if tkey == ts_name:
matched_time_key = tkey
break
# if matched_time_key not in time_vars:
# # If we cannot find the matching time variable, skip this variable
# continue
times = []
if matched_time_key in time_vars:
times = time_vars[matched_time_key]
# Determine delta (sample interval)
if len(times) > 1:
# delta as a float of seconds
delta = times[1].timestamp - times[0].timestamp
            else:
                # if only one sample, use a default based on the data interval
                if self.interval == "minute":
                    delta = 60.0
                elif self.interval == "second":
                    delta = 1.0
                elif self.interval == "hour":
                    delta = 3600.0
                else:
                    # fallback, set delta to 60
                    delta = 60.0
time_attrs = cdf.varattsget(var)
data_interval = time_attrs.get("DATA_INTERVAL_TYPE", [""])
# Required variable attributes based on ImagCDF standards
required_variable_attrs = [
"FIELDNAM",
"UNITS",
"FILLVAL",
"VALIDMIN",
"VALIDMAX",
"DISPLAY_TYPE",
"LABLAXIS",
"DEPEND_0",
]
# Validate presence of required variable attributes
missing_var_attrs = []
for var_attr in required_variable_attrs:
if var_attr not in attrs:
missing_var_attrs.append(var_attr)
            if missing_var_attrs:
error_message = (
f"The variable '{var}' is missing required attributes: "
f"{', '.join(missing_var_attrs)}"
)
raise TimeseriesFactoryException(error_message)
# Create a trace
trace = Trace(
data=data,
header={
"station": observatory,
"channel": channel,
"starttime": times[0],
"delta": delta,
"geodetic_latitude": latitude,
"geodetic_longitude": longitude,
"elevation": elevation,
"sensor_orientation": "".join(sensor_orientation),
"data_type": data_type,
"station_name": station_name,
"agency_name": institution,
"conditions_of_use": terms_of_use,
"sensor_sampling_rate": sensor_sampling_rate,
"data_interval_type": data_interval,
"declination_base": declination_base,
"filter_comments": comments,
},
)
stream += trace
return stream
def _get_url(
self,
observatory: str,
date: UTCDateTime,
type: DataType = "variation",
interval: DataInterval = "minute",
channels: Optional[List[str]] = None,
) -> str:
"""
Generate the file URL specific to ImagCDF conventions.
This method constructs the filename based on the ImagCDF naming
conventions, which include the observatory code, date-time formatted
according to the data interval, and the publication level.
[iaga-code]_[date-time]_[publication-level].cdf
Parameters:
- observatory: IAGA code of the observatory.
- date: Start date for the file.
- type: Data type indicating the processing level.
- interval: Data interval (e.g., 'minute', 'second').
- channels: List of channels (optional). Not Implemented
Returns:
- The formatted file URL or path.
Reference:
- ImagCDF Technical Documentation: ImagCDF File Names
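
        Example (illustrative): definitive minute data for BOU starting
        2024-01-02 00:00 yields the filename "bou_20240102_0000_4.cdf".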
"""
# Get the publication level for the type
publication_level = IMCDFPublicationLevel(data_type=type).to_string()
        # Format of the date/time portion of the filename depends on the interval; see:
        # https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
if interval == "year":
date_format = date.strftime("%Y")
elif interval == "month":
date_format = date.strftime("%Y%m")
elif interval == "day":
date_format = date.strftime("%Y%m%d")
elif interval == "hour":
date_format = date.strftime("%Y%m%d_%H")
elif interval == "minute":
date_format = date.strftime("%Y%m%d_%H%M")
elif interval == "second":
date_format = date.strftime("%Y%m%d_%H%M%S")
else:
raise ValueError(
f"Unsupported interval: {interval}"
) # tenhertz currently not supported
# Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
# If the urlTemplate explicitly specifies 'stdout', return 'stdout'
if self.urlTemplate.lower() == "stdout":
return "stdout"
# Prepare parameters for templating
params = {
"date": date.datetime,
"i": self._get_interval_abbreviation(interval),
"interval": self._get_interval_name(interval),
"minute": date.hour * 60 + date.minute,
"month": date.strftime("%b").lower(),
"MONTH": date.strftime("%b").upper(),
"obs": observatory.lower(),
"OBS": observatory.upper(),
"t": publication_level,
"type": self._get_type_name(type),
"julian": date.strftime("%j"),
"year": date.strftime("%Y"),
"ymd": date.strftime("%Y%m%d"),
"dt": date_format,
}
# Attempt to use the template provided in urlTemplate
if "{" in self.urlTemplate and "}" in self.urlTemplate:
try:
return self.urlTemplate.format(**params)
except KeyError as e:
raise TimeseriesFactoryException(
f"Invalid placeholder in urlTemplate: {e}"
)
# If the urlTemplate doesn't support placeholders, assume 'file://' scheme
if self.urlTemplate.startswith("file://"):
base_path = self.urlTemplate[7:]
if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
base_path = os.getcwd() # Default to current working directory
return os.path.join(base_path, "etc", "imagcdf", filename)
return os.path.join(self.urlTemplate, filename)
# Unsupported URL scheme
raise TimeseriesFactoryException(
f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
)
def _get_vector_elements(self):
return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
def _get_scalar_elements(self):
return {"S", "G"}