Newer
Older
"""ImagCDFFactory Implementation Using cdflib
This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.
References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html]
"""
from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone, tzinfo
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from sqlalchemy import true
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
Shavers, Nicholas H
committed
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactoryException import TimeseriesFactoryException
from .. import TimeseriesUtility
from .. import Util
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
import tempfile
class IMCDFPublicationLevel:
"""Handles publication levels and mapping between data types and levels.
The ImagCDF format uses publication levels to describe the processing
level of the data. This class maps data types (e.g., 'variation', 'definitive')
to their corresponding publication levels as defined in the ImagCDF documentation.
Publication Levels:
1: Raw data with no processing.
2: Edited data with preliminary baselines.
3: Data suitable for initial bulletins or quasi-definitive publication.
4: Definitive data with no further changes expected.
Reference:
- ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""
class PublicationLevel:
LEVEL_1 = "1"
LEVEL_2 = "2"
LEVEL_3 = "3"
LEVEL_4 = "4"
TYPE_TO_LEVEL = {
"none": PublicationLevel.LEVEL_1,
"variation": PublicationLevel.LEVEL_1,
"reported": PublicationLevel.LEVEL_1,
"provisional": PublicationLevel.LEVEL_2,
"adjusted": PublicationLevel.LEVEL_2,
"quasi-definitive": PublicationLevel.LEVEL_3,
"quasidefinitive": PublicationLevel.LEVEL_3,
"definitive": PublicationLevel.LEVEL_4,
}
def __init__(self, data_type: Optional[str] = None):
"""Initialize with a data type to determine the publication level."""
if data_type:
self.level = self.TYPE_TO_LEVEL.get(data_type.lower())
else:
raise ValueError("data_type must be provided.")
if not self.level:
raise ValueError(f"Unsupported data_type: {data_type}")
def to_string(self) -> str:
"""Return the publication level as a string."""
return self.level
class ImagCDFFactory(TimeseriesFactory):
"""Factory for creating and writing ImagCDF formatted CDF files.
This class extends the TimeseriesFactory to support writing geomagnetic
time series data to files in the ImagCDF format using the cdflib library.
"""
isUniqueTimes = True # used to determine depend_0 and CDF Time Variable Name
Shavers, Nicholas H
committed
NONSTANDARD_ELEMENTS = TEMPERATURE_ELEMENTS_ID
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def __init__(
self,
observatory: Optional[str] = None,
channels: List[str] = ("H", "D", "Z", "F"),
type: DataType = "variation",
interval: DataInterval = "minute",
urlTemplate="file://{obs}_{dt}_{t}.cdf",
urlInterval: int = -1,
):
"""
Initialize the ImagCDFFactory with default parameters.
Parameters:
- observatory: IAGA code of the observatory (e.g., 'BOU').
- channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
- type: Data type indicating the processing level (e.g., 'variation', 'definitive').
- interval: Data interval (e.g., 'minute', 'second').
- urlTemplate: Template for generating file URLs or paths.
- urlInterval: Interval size for splitting data into multiple files.
"""
super().__init__(
observatory=observatory,
channels=channels,
type=type,
interval=interval,
urlTemplate=urlTemplate,
urlInterval=urlInterval,
)
def write_file(self, fh, timeseries: Stream, channels: List[str]):
# Create a temporary file to write the CDF data
with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
"Compressed": 9, # Enable compression (0-9)
"Majority": CDFWriter.ROW_MAJOR, # Data layout - gets set automatically
"Encoding": CDFWriter.IBMPC_ENCODING, # gets set automatically
"Checksum": True, # Disable checksum for faster writes (optional)
"rDim_sizes": [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
global_attrs = self._create_global_attributes(timeseries, channels)
cdf_writer.write_globalattrs(global_attrs)
time_vars = self._create_time_stamp_variables(
timeseries
) # modifies self.isUniqueTimes
"Variable": ts_name,
"Data_Type": CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000
"Num_Elements": 1,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
Shavers, Nicholas H
committed
"Sparse": "no_sparse", # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series)
# Define time variable attributes
var_attrs = self._create_time_var_attrs(ts_name)
# Write time variable
cdf_writer.write_var(var_spec, var_attrs, ts_data)
temperature_index = 0
Shavers, Nicholas H
committed
var_name = f"GeomagneticField{channel}"
# if channel in REAL_TEMPERATURE:
# temperature_index += 1 # MUST INCREMENT INDEX BEFORE USING
# var_name = f"Temperature{temperature_index}"
data_type = self._get_cdf_data_type(trace)
num_elements = 1
if data_type in [
CDFWriter.CDF_CHAR,
CDFWriter.CDF_UCHAR,
]: # Handle string types
num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
"Variable": var_name,
"Data_Type": data_type,
"Num_Elements": num_elements,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
"Sparse": "no_sparse",
"Compress": 9,
"Pad": None,
var_attrs = self._create_var_attrs(
trace, temperature_index, self.isUniqueTimes
)
# Write data variable
cdf_writer.write_var(var_spec, var_attrs, trace.data)
# Close the CDF writer
cdf_writer.close()
# Copy the temporary CDF file to the final file handle
with open(tmp_file_path, "rb") as tmp:
cdf_data = tmp.read()
fh.write(cdf_data)
finally:
# Cleanup the temporary file
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def put_timeseries(
self,
timeseries: Stream,
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
):
"""
Store timeseries data in ImagCDF format using cdflib.
This method writes the timeseries data to one or more files, depending
on the specified urlInterval.
Parameters:
- timeseries: ObsPy Stream containing the geomagnetic data.
- starttime: Start time of the data to be written.
- endtime: End time of the data to be written.
- channels: List of channels to include in the output file.
- type: Data type indicating the processing level.
- interval: Data interval of the data.
"""
if len(timeseries) == 0:
# No data to store
return
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
# Extract metadata from the first trace
stats = timeseries[0].stats
delta = stats.delta # Sample rate
observatory = stats.station
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
# Split data into intervals if necessary
Shavers, Nicholas H
committed
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
interval_start = urlInterval["start"]
interval_end = urlInterval["end"]
if interval_start != interval_end:
interval_end = interval_end - delta
url = self._get_url(
observatory=observatory,
date=interval_start,
type=type,
interval=interval,
channels=channels,
)
# Handle 'stdout' output
# Write directly to stdout
fh = sys.stdout.buffer
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
self.write_file(fh, url_data, channels)
continue # Proceed to next interval if any
# Handle 'file://' output
# Get the file path from the URL
url_file = Util.get_file_from_url(url, createParentDirectory=False)
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
# Check if the file already exists to merge data
if os.path.isfile(url_file):
try:
# Read existing data to merge with new data
# existing_cdf.close() #no close method?
existing_data = existing_stream
# Merge existing data with new data
for trace in existing_data:
new_trace = url_data.select(
network=trace.stats.network,
station=trace.stats.station,
channel=trace.stats.channel,
)
if new_trace:
trace.data = np.concatenate(
(trace.data, new_trace[0].data)
)
url_data = existing_data + url_data
except Exception as e:
# Log the exception if needed
print(
f"Warning: Could not read existing CDF file '{url_file}': {e}",
file=sys.stderr,
)
Shavers, Nicholas H
committed
# Pad the data to ensure it fits the interval
url_data.trim(
starttime=interval_start,
endtime=interval_end,
nearest_sample=False,
pad=True,
Shavers, Nicholas H
committed
fill_value=99_999, # FILLVAL
)
# Write the data to the CDF file
with open(url_file, "wb") as fh:
self.write_file(fh, url_data, channels)
else:
# Unsupported URL scheme encountered
raise TimeseriesFactoryException(
"Unsupported URL scheme in urlTemplate"
)
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
add_empty_channels: bool = True,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
) -> Stream:
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
timeseries = Stream()
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
url = self._get_url(
observatory=observatory,
date=urlInterval["start"],
type=type,
interval=interval,
channels=channels,
)
continue # stdout is not a valid input source
if not url.startswith("file://"):
raise TimeseriesFactoryException(
"Only file urls are supported for reading ImagCDF"
)
url_file = Util.get_file_from_url(url, createParentDirectory=False)
if not os.path.isfile(url_file):
# If file doesn't exist, skip
continue
try:
# Read CDF data and merge
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
file_stream = self._read_cdf(cdf)
# Attempt to select only requested channels
selected = Stream()
for ch in channels:
selected += file_stream.select(channel=ch)
timeseries += selected
except Exception as e:
print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
# After reading all intervals, merge and trim
timeseries.merge(fill_value=np.nan)
timeseries.trim(
starttime=starttime,
endtime=endtime,
nearest_sample=False,
pad=True,
fill_value=np.nan,
)
# If requested, add empty channels not present in data
if add_empty_channels:
present_channels = {tr.stats.channel for tr in timeseries}
missing_channels = set(channels) - present_channels
for ch in missing_channels:
empty_trace = self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=ch,
data_type=type,
interval=interval,
)
timeseries += empty_trace
timeseries.sort()
return timeseries
Shavers, Nicholas H
committed
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
def parse_string(self, data: str, **kwargs):
"""
Parse ImagCDF binary data into an ObsPy Stream.
This method writes the provided binary data to a temporary file,
reads the file using `cdflib`, and converts the data into an ObsPy
Stream.
Parameters
----------
data : bytes
Binary data containing ImagCDF content.
Returns
-------
Stream
An ObsPy Stream object with the parsed geomagnetic time series data.
Raises
------
TimeseriesFactoryException
If an error occurs while parsing the ImagCDF data.
"""
# Create a temporary file to store the CDF data
with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
tmp_file_name = tmp_file.name
tmp_file.write(data)
try:
# Read the CDF from the temporary file
cdf = CDFReader(tmp_file_name)
stream = self._read_cdf(cdf)
# no cdf.close() method required
except Exception as e:
raise TimeseriesFactoryException(f"Error parsing ImagCDF data: {e}")
finally:
# Clean up the temporary file
os.remove(tmp_file_name)
return stream
def _create_global_attributes(
self, timeseries: Stream, channels: List[str]
) -> dict:
"""
Create a dictionary of global attributes for the ImagCDF file.
These attributes apply to all the data in the file and include metadata
such as observatory information, data publication level, and format
descriptions.
References:
- ImagCDF Technical Documentation: ImagCDF Global Attributes
"""
stats = timeseries[0].stats if len(timeseries) > 0 else None
# Extract metadata from stats or fallback to defaults
observatory_name = (
getattr(stats, "station_name", None) or self.observatory or ""
)
station = getattr(stats, "station", None) or ""
institution = getattr(stats, "agency_name", None) or ""
latitude = getattr(stats, "geodetic_latitude", None) or 0.0
longitude = getattr(stats, "geodetic_longitude", None) or 0.0
elevation = getattr(stats, "elevation", None) or 99_999.0
conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
vector_orientation = getattr(stats, "sensor_orientation", None) or ""
data_interval_type = getattr(stats, "data_interval_type", None) or self.interval
data_type = getattr(stats, "data_type", None) or "variation"
sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
comments = getattr(stats, "filter_comments", None) or [""]
declination_base = getattr(stats, "declination_base", None) or 0.0
publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
global_attrs = {
"FormatDescription": {0: "INTERMAGNET CDF Format"},
"FormatVersion": {0: "1.2"},
"Title": {0: "Geomagnetic time series data"},
"IagaCode": {0: station},
"ElementsRecorded": {0: "".join(channels)},
"PublicationLevel": {0: publication_level},
"PublicationDate": {
0: [
cdflib.cdfepoch.timestamp_to_tt2000(
datetime.timestamp(datetime.now(timezone.utc))
),
"cdf_time_tt2000",
]
},
"ObservatoryName": {0: observatory_name},
"Latitude": {0: np.array([latitude], dtype=np.float64)},
"Longitude": {0: np.array([longitude], dtype=np.float64)},
"Elevation": {0: np.array([elevation], dtype=np.float64)},
"Institution": {0: institution},
"VectorSensOrient": {
0: vector_orientation
}, # remove F - because its a calculation, not an element?
"StandardLevel": {0: "None"}, # Set to 'None'
# Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
"Source": {
0: "institute"
}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
"TermsOfUse": {0: conditions_of_use},
# 'UniqueIdentifier': {0: ''},
# 'ParentIdentifiers': {0: ''},
# 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
"SensorSamplingRate": {0: sensor_sampling_rate}, # Optional
"DataType": {0: data_type}, # Optional
"Comments": {0: comments}, # Optional
"DeclinationBase": {0: declination_base}, # Optional
}
return global_attrs
def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
vector_times = None
scalar_times = None
Shavers, Nicholas H
committed
nonstandard_times = None
temperature_times = {}
temperature_index = 1
for trace in timeseries:
channel = trace.stats.channel
times = [
(trace.stats.starttime + trace.stats.delta * i).datetime.replace(
tzinfo=timezone.utc
)
for i in range(trace.stats.npts)
]
# Convert timestamps to TT2000 format required by CDF
tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(
[time.timestamp() for time in times]
)
if channel in self._get_vector_elements():
if vector_times is None:
vector_times = tt2000_times
else:
if not np.array_equal(vector_times, tt2000_times):
raise ValueError(
"Time stamps for vector channels are not the same."
)
elif channel in self._get_scalar_elements():
if scalar_times is None:
scalar_times = tt2000_times
else:
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError(
"Time stamps for scalar channels are not the same."
)
Shavers, Nicholas H
committed
# elif channel in REAL_TEMPERATURES:
# ts_key = f"Temperature{temperature_index}Times"
# if ts_key not in temperature_times:
# temperature_times[ts_key] = tt2000_times
# temperature_index += 1
# else:
# temperature_times[ts_key] = tt2000_times
elif channel in self.NONSTANDARD_ELEMENTS:
if scalar_times is None:
nonstandard_times = tt2000_times
else:
Shavers, Nicholas H
committed
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError(
"Time stamps for nonstandard channels are not the same."
)
time_vars["GeomagneticVectorTimes"] = vector_times
time_vars["GeomagneticScalarTimes"] = scalar_times
if temperature_times:
time_vars.update(temperature_times)
Shavers, Nicholas H
committed
isNonStandard = nonstandard_times is not None
if isNonStandard:
time_vars["NonstandardTimes"] = nonstandard_times
last_times = []
Shavers, Nicholas H
committed
self.isUniqueTimes = (
len(time_vars) == 1
Shavers, Nicholas H
committed
) # (true if is a single time stamp variable in the file)
for index, times in enumerate(time_vars.values()):
if index > 0:
self.isUniqueTimes = not np.array_equal(last_times, times)
last_times = times
Shavers, Nicholas H
committed
if self.isUniqueTimes and isNonStandard and len(time_vars) > 2:
raise ValueError(
f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}"
)
return (
{"DataTimes": last_times}
if isNonStandard or not self.isUniqueTimes
else time_vars
)
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
def _create_var_spec(
self,
var_name: str,
data_type: str,
num_elements: int,
var_type: str,
dim_sizes: List[int],
sparse: bool,
compress: int,
pad: Optional[Union[str, np.ndarray]],
) -> dict:
"""
Create a variable specification dictionary for cdflib.
This is used to define the properties of a variable when writing it to
the CDF file.
Parameters:
- var_name: Name of the variable.
- data_type: CDF data type.
- num_elements: Number of elements per record.
- var_type: Variable type ('zVariable' or 'rVariable').
- dim_sizes: Dimensions of the variable (empty list for 0D).
- sparse: Whether the variable uses sparse records.
- compress: Compression level.
- pad: Pad value for sparse records.
Reference:
- CDF User's Guide: Variable Specification
"""
var_spec = {
"Variable": var_name,
"Data_Type": data_type,
"Num_Elements": num_elements,
"Rec_Vary": True,
"Var_Type": var_type,
"Dim_Sizes": dim_sizes,
"Sparse": "no_sparse" if not sparse else "pad_sparse",
"Compress": compress,
"Pad": pad,
def _create_var_attrs(
self,
trace: Trace,
temperature_index: Optional[int] = None,
isUniqueTimes: Optional[bool] = True,
) -> dict:
channel = trace.stats.channel.upper()
fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
units = "" # Must be one of “nT”, “Degrees of arc” or “Celsius”
if channel == "D":
units = "Degrees of arc"
validmin = -360.0
validmax = 360.0 # A full circle representation
elif channel == "I":
units = "Degrees of arc"
validmin = -90.0
validmax = 90.0 # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
elif channel in TEMPERATURE_ELEMENTS_ID:
units = "Celsius"
validmin = -273.15 # absolute zero
validmax = 79_999
Shavers, Nicholas H
committed
# elif channel in [REAL_TEMPERATURES]:
# units = "Celsius"
# fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
# validmin = -273.15 # absolute zero
# validmax = 79_999
elif channel in ["F", "S"]:
units = "nT"
validmin = (
0.0 # negative magnetic field intestity not physically meaningful.
)
elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
units = "nT"
# Determine DEPEND_0 based on channel type
Shavers, Nicholas H
committed
if not isUniqueTimes:
depend_0 = "DataTimes"
elif channel in self._get_vector_elements():
depend_0 = "GeomagneticVectorTimes"
depend_0 = "GeomagneticScalarTimes"
Shavers, Nicholas H
committed
# elif channel in REAL_TEMPERATURES:
# depend_0 = f"Temperature{temperature_index}Times"
"FIELDNAM": fieldnam,
"UNITS": units,
"FILLVAL": 99999.0,
"VALIDMIN": validmin,
"VALIDMAX": validmax,
Shavers, Nicholas H
committed
"DEPEND_0": depend_0,
"DISPLAY_TYPE": "time_series",
"LABLAXIS": channel,
Shavers, Nicholas H
committed
"DATA_INTERVAL_TYPE": trace.stats.data_interval_type, # optional
}
return var_attrs
def _create_time_var_attrs(self, ts_name: str) -> dict:
"""
Create a dictionary of time variable attributes.
These attributes provide metadata for time variables.
Note: None of these attributes are required for the time stamp variables.
# 'UNITS': 'TT2000',
# 'DISPLAY_TYPE': 'time_series',
# 'LABLAXIS': 'Time',
# }
# return var_attrs
return {}
def _get_cdf_data_type(self, trace: Trace) -> int:
"""
Map ObsPy trace data type to CDF data type.
Determines the appropriate CDF data type based on the NumPy data type
of the trace data.
Returns:
- CDF_DOUBLE (45) for floating-point data.
- CDF_INT4 (41) for integer data.
Reference:
"""
Read CDF data into an ObsPy Stream.
This method reads the data variables and their corresponding time
variables from a CDF file and constructs an ObsPy Stream.
Parameters:
- cdf: cdflib CDF object representing the open CDF file.
Returns:
- An ObsPy Stream containing the data from the CDF file.
"""
stream = Stream()
# Extract global attributes
global_attrs = cdf.globalattsget()
# Map global attributes to Stream-level metadata
observatory = global_attrs.get("IagaCode", [""])[0]
station_name = global_attrs.get("ObservatoryName", [""])[0]
institution = global_attrs.get("Institution", [""])[0]
latitude = global_attrs.get("Latitude", [0.0])[0]
longitude = global_attrs.get("Longitude", [0.0])[0]
elevation = global_attrs.get("Elevation", [99_999.0])[
0
] # default to 99_999 per technical documents.
sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [0.0])[0]
sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
data_type = global_attrs.get("DataType", ["variation"])[0]
publication_level = global_attrs.get("PublicationLevel", ["1"])[0]
comments = global_attrs.get("Comments", [""])
terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
declination_base = global_attrs.get("DeclinationBase", [0.0])[0]
# Identify all time variables
for var in cdf.cdf_info().zVariables:
unix_times = cdflib.cdfepoch.unixtime(time_data)
utc_times = [UTCDateTime(t) for t in unix_times]
# Read data variables and associate them with time variables
for var in cdf.cdf_info().zVariables:
# Skip time variables
continue
data = cdf.varget(var)
attrs = cdf.varattsget(var)
# Determine DEPEND_0 (the time variable name) and validate
if not ts_name:
# If no DEPEND_0, skip this variable as we cannot map times
continue
Shavers, Nicholas H
committed
# The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc.
matched_time_key = None
for tkey in time_vars.keys():
if tkey == ts_name:
matched_time_key = tkey
break
if matched_time_key not in time_vars:
# If we cannot find the matching time variable, skip this variable
continue
times = time_vars[matched_time_key]
# Determine delta (sample interval)
if len(times) > 1:
# delta as a float of seconds
delta = times[1].timestamp - times[0].timestamp
else:
# if only one sample, use default based on interval
# convert minute, second, etc. to numeric delta
delta = 60.0
delta = 1.0
delta = 3600.0
else:
# fallback, set delta to 60
delta = 60.0
# Map the variable name back to a standard channel code
# Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc.
# Temperatures are named like Temperature1, Temperature2, ...
# Extract channel name by removing known prefixes
if var.startswith("GeomagneticField"):
channel = var.replace("GeomagneticField", "")
elif var.startswith("Temperature"):
# Temperature variables may not map directly to a geomagnetic channel
# but to temperature sensors. We can just use the label from LABLAXIS if needed
channel = attrs.get("LABLAXIS", var)
else:
# fallback if naming doesn't match expected patterns
channel = var
time_attrs = cdf.varattsget(var)
data_interval = time_attrs.get("DATA_INTERVAL_TYPE", [""])
# Create a trace
trace = Trace(
data=data,
header={
"station": observatory,
"channel": channel,
"starttime": times[0],
"delta": delta,
"geodetic_latitude": latitude,
"geodetic_longitude": longitude,
"elevation": elevation,
"sensor_orientation": "".join(sensor_orientation),
"data_type": data_type,
"station_name": station_name,
"agency_name": institution,
"conditions_of_use": terms_of_use,
"sensor_sampling_rate": sensor_sampling_rate,
"data_interval_type": data_interval,
"declination_base": declination_base,
"filter_comments": comments,
},
)
stream += trace
return stream
def _get_url(
self,
observatory: str,
date: UTCDateTime,
type: DataType = "variation",
interval: DataInterval = "minute",
channels: Optional[List[str]] = None,
) -> str:
"""
Generate the file URL specific to ImagCDF conventions.
This method constructs the filename based on the ImagCDF naming
conventions, which include the observatory code, date-time formatted
according to the data interval, and the publication level.
[iaga-code]_[date-time]_[publication-level].cdf
Parameters:
- observatory: IAGA code of the observatory.
- date: Start date for the file.
- type: Data type indicating the processing level.
- interval: Data interval (e.g., 'minute', 'second').
- channels: List of channels (optional). Not Implemented
Returns:
- The formatted file URL or path.
Reference:
- ImagCDF Technical Documentation: ImagCDF File Names
"""
# Get the publication level for the type
publication_level = IMCDFPublicationLevel(data_type=type).to_string()
# Format of Date/Time Portion of Filename based on interval see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#example-data-file:~:text=Format%20of%20Date,%EF%83%81
if interval == "year":
date_format = date.strftime("%Y")
elif interval == "month":
date_format = date.strftime("%Y%m")
elif interval == "day":
date_format = date.strftime("%Y%m%d")
elif interval == "hour":
date_format = date.strftime("%Y%m%d_%H")
elif interval == "minute":
date_format = date.strftime("%Y%m%d_%H%M")
elif interval == "second":
date_format = date.strftime("%Y%m%d_%H%M%S")
else:
raise ValueError(
f"Unsupported interval: {interval}"
) # tenhertz currently not supported
# Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
# If the urlTemplate explicitly specifies 'stdout', return 'stdout'
if self.urlTemplate.lower() == "stdout":
return "stdout"
# Prepare parameters for templating
params = {
"date": date.datetime,
"i": self._get_interval_abbreviation(interval),
"interval": self._get_interval_name(interval),
"minute": date.hour * 60 + date.minute,
"month": date.strftime("%b").lower(),
"MONTH": date.strftime("%b").upper(),
"obs": observatory.lower(),
"OBS": observatory.upper(),
"t": publication_level,
"type": self._get_type_name(type),
"julian": date.strftime("%j"),
"year": date.strftime("%Y"),
"ymd": date.strftime("%Y%m%d"),
"dt": date_format,
}
# Attempt to use the template provided in urlTemplate
if "{" in self.urlTemplate and "}" in self.urlTemplate:
try:
return self.urlTemplate.format(**params)
except KeyError as e:
raise TimeseriesFactoryException(
f"Invalid placeholder in urlTemplate: {e}"
)
# If the urlTemplate doesn't support placeholders, assume 'file://' scheme
if self.urlTemplate.startswith("file://"):
Shavers, Nicholas H
committed
base_path = self.urlTemplate[7:]
if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
base_path = os.getcwd() # Default to current working directory
Shavers, Nicholas H
committed
return os.path.join(base_path, filename)
return os.path.join(self.urlTemplate, filename)
# Unsupported URL scheme
raise TimeseriesFactoryException(
f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
)
def _get_vector_elements(self):
return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
def _get_scalar_elements(self):
return {"S", "G"}