"""ImagCDFFactory Implementation Using cdflib
This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.
References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: https://cdflib.readthedocs.io/en/latest/
- CDFLib Modules: https://cdflib.readthedocs.io/en/stable/modules/index.html
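Example (illustrative sketch; assumes an ObsPy Stream ``stream`` holding
H, D, Z, and F traces for observatory 'BOU'):

    factory = ImagCDFFactory(observatory="BOU", channels=["H", "D", "Z", "F"])
    factory.put_timeseries(timeseries=stream, channels=["H", "D", "Z", "F"])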
"""
from __future__ import absolute_import, print_function
from io import BytesIO
import os
import sys
from typing import List, Optional, Union
from datetime import datetime, timezone, tzinfo
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
from .TimeseriesFactory import TimeseriesFactory
from .geomag_types import DataInterval, DataType
from .TimeseriesFactoryException import TimeseriesFactoryException
from . import TimeseriesUtility
from . import Util
import cdflib
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
import tempfile
class IMCDFPublicationLevel:
"""Handles publication levels and mapping between data types and levels.
The ImagCDF format uses publication levels to describe the processing
level of the data. This class maps data types (e.g., 'variation', 'definitive')
to their corresponding publication levels as defined in the ImagCDF documentation.
Publication Levels:
1: Raw data with no processing.
2: Edited data with preliminary baselines.
3: Data suitable for initial bulletins or quasi-definitive publication.
4: Definitive data with no further changes expected.
Reference:
- ImagCDF Technical Documentation: Attributes that Uniquely Identify the Data
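    Example (doctest-style, illustrative):
        >>> IMCDFPublicationLevel("quasi-definitive").to_string()
        '3'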
"""
class PublicationLevel:
LEVEL_1 = "1"
LEVEL_2 = "2"
LEVEL_3 = "3"
LEVEL_4 = "4"
TYPE_TO_LEVEL = {
"none": PublicationLevel.LEVEL_1,
"variation": PublicationLevel.LEVEL_1,
"reported": PublicationLevel.LEVEL_1,
"provisional": PublicationLevel.LEVEL_2,
"adjusted": PublicationLevel.LEVEL_2,
"quasi-definitive": PublicationLevel.LEVEL_3,
"quasidefinitive": PublicationLevel.LEVEL_3,
"definitive": PublicationLevel.LEVEL_4,
}
def __init__(self, data_type: Optional[str] = None):
"""Initialize with a data type to determine the publication level."""
if data_type:
self.level = self.TYPE_TO_LEVEL.get(data_type.lower())
else:
raise ValueError("data_type must be provided.")
if not self.level:
raise ValueError(f"Unsupported data_type: {data_type}")
def to_string(self) -> str:
"""Return the publication level as a string."""
return self.level
class ImagCDFFactory(TimeseriesFactory):
"""Factory for creating and writing ImagCDF formatted CDF files.
This class extends the TimeseriesFactory to support writing geomagnetic
time series data to files in the ImagCDF format using the cdflib library.
"""
    isUniqueTimes = True  # used to determine DEPEND_0 and the CDF time variable name
def __init__(
self,
observatory: Optional[str] = None,
channels: List[str] = ("H", "D", "Z", "F"),
type: DataType = "variation",
interval: DataInterval = "minute",
urlTemplate="file://{obs}_{dt}_{t}.cdf",
urlInterval: int = -1,
):
"""
Initialize the ImagCDFFactory with default parameters.
Parameters:
- observatory: IAGA code of the observatory (e.g., 'BOU').
- channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
- type: Data type indicating the processing level (e.g., 'variation', 'definitive').
- interval: Data interval (e.g., 'minute', 'second').
- urlTemplate: Template for generating file URLs or paths.
- urlInterval: Interval size for splitting data into multiple files.
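        Example (illustrative): with the default urlTemplate, observatory 'BOU',
        minute interval, and a start time of 2024-01-01T00:00Z, _get_url
        produces 'file://bou_20240101_0000_1.cdf' ({obs}=IAGA code lowercased,
        {dt}=interval-dependent date format, {t}=publication level).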
"""
super().__init__(
observatory=observatory,
channels=channels,
type=type,
interval=interval,
urlTemplate=urlTemplate,
urlInterval=urlInterval,
)
def parse_string(self, data: str, **kwargs):
"""Parse ImagCDF formatted string data into a Stream.
Note: Parsing from strings is not implemented in this factory.
"""
raise NotImplementedError('"parse_string" not implemented')
def write_file(self, fh, timeseries: Stream, channels: List[str]):
# Create a temporary file to write the CDF data
with tempfile.NamedTemporaryFile(delete=False, suffix='.cdf') as tmp_file:
tmp_file_path = tmp_file.name
        try:
            # Initialize the CDF writer
            cdf_spec = {
                'Compressed': 9,  # Enable gzip compression (levels 0-9)
                'Majority': CDFWriter.ROW_MAJOR,  # Data layout - gets set automatically
                'Encoding': CDFWriter.IBMPC_ENCODING,  # gets set automatically
                'Checksum': True,  # Enable checksum for data integrity (optional)
                'rDim_sizes': [],  # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
            }
            cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
            # Write global attributes
            global_attrs = self._create_global_attributes(timeseries, channels)
            cdf_writer.write_globalattrs(global_attrs)
            # Write time variables (modifies self.isUniqueTimes)
            time_vars = self._create_time_stamp_variables(timeseries)
            for ts_name, ts_data in time_vars.items():
                # Define time variable specification
                var_spec = {
                    'Variable': ts_name,
                    'Data_Type': CDFWriter.CDF_TIME_TT2000,
                    'Num_Elements': 1,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
                }
                # Define time variable attributes
                var_attrs = self._create_time_var_attrs(ts_name)
                # Write time variable
                cdf_writer.write_var(var_spec, var_attrs, ts_data)
            # Write data variables, mapping channels to ImagCDF variable names
            temperature_index = 0
            for trace in timeseries:
                channel = trace.stats.channel
                if channel in TEMPERATURE_ELEMENTS_ID:
                    temperature_index += 1  # MUST INCREMENT INDEX BEFORE USING
                    var_name = f"Temperature{temperature_index}"
                else:
                    var_name = f"GeomagneticField{channel}"
                data_type = self._get_cdf_data_type(trace)
                num_elements = 1
                if data_type in [CDFWriter.CDF_CHAR, CDFWriter.CDF_UCHAR]:  # Handle string types
                    num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
                var_spec = {
                    'Variable': var_name,
                    'Data_Type': data_type,
                    'Num_Elements': num_elements,
                    'Rec_Vary': True,
                    'Var_Type': 'zVariable',
                    'Dim_Sizes': [],
                    'Sparse': 'no_sparse',
                }
                var_attrs = self._create_var_attrs(trace, temperature_index, self.isUniqueTimes)
                # Write data variable
                cdf_writer.write_var(var_spec, var_attrs, trace.data)
            # Close the CDF writer
            cdf_writer.close()
            # Copy the temporary CDF file to the final file handle
            with open(tmp_file_path, "rb") as tmp:
                cdf_data = tmp.read()
                fh.write(cdf_data)
        finally:
            # Cleanup the temporary file
            os.remove(tmp_file_path)
def put_timeseries(
self,
timeseries: Stream,
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
):
"""
Store timeseries data in ImagCDF format using cdflib.
This method writes the timeseries data to one or more files, depending
on the specified urlInterval.
Parameters:
- timeseries: ObsPy Stream containing the geomagnetic data.
- starttime: Start time of the data to be written.
- endtime: End time of the data to be written.
- channels: List of channels to include in the output file.
- type: Data type indicating the processing level.
- interval: Data interval of the data.
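        Example (illustrative; assumes `factory` and `stream` as in the module
        docstring): one file per UTC day can be written by setting
        urlInterval to 86400 seconds before calling this method:

            factory.urlInterval = 86400
            factory.put_timeseries(timeseries=stream)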
"""
if len(timeseries) == 0:
# No data to store
return
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
# Extract metadata from the first trace
stats = timeseries[0].stats
delta = stats.delta # Sample rate
observatory = stats.station
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
# Split data into intervals if necessary
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
interval_start = urlInterval["start"]
interval_end = urlInterval["end"]
if interval_start != interval_end:
interval_end = interval_end - delta
url = self._get_url(
observatory=observatory,
date=interval_start,
type=type,
interval=interval,
channels=channels,
)
# Handle 'stdout' output
if url == 'stdout':
# Write directly to stdout
fh = sys.stdout.buffer
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
self.write_file(fh, url_data, channels)
continue # Proceed to next interval if any
# Handle 'file://' output
elif url.startswith('file://'):
# Get the file path from the URL
url_file = Util.get_file_from_url(url, createParentDirectory=False)
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
# Check if the file already exists to merge data
if os.path.isfile(url_file):
                    try:
                        # Read existing data to merge with new data
                        existing_cdf = CDFReader(url_file)
                        existing_stream = self._read_cdf(existing_cdf)
                        # existing_cdf.close()  # no close method?
existing_data = existing_stream
# Merge existing data with new data
for trace in existing_data:
new_trace = url_data.select(
network=trace.stats.network,
station=trace.stats.station,
channel=trace.stats.channel,
)
if new_trace:
trace.data = np.concatenate((trace.data, new_trace[0].data))
url_data = existing_data + url_data
except Exception as e:
# Log the exception if needed
print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr)
# Proceed with new data
# Pad the data with NaNs to ensure it fits the interval
url_data.trim(
starttime=interval_start,
endtime=interval_end,
nearest_sample=False,
pad=True,
fill_value=np.nan,
)
# Write the data to the CDF file
with open(url_file, "wb") as fh:
self.write_file(fh, url_data, channels)
else:
# Unsupported URL scheme encountered
raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate")
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
add_empty_channels: bool = True,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
) -> Stream:
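        """
        Retrieve timeseries data from ImagCDF files.

        A sketch of the read path: for each urlInterval this method resolves a
        file:// URL, reads the CDF with cdflib, selects the requested channels,
        then merges, trims, pads, and optionally adds empty channels.
        """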
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
timeseries = Stream()
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
url = self._get_url(
observatory=observatory,
date=urlInterval["start"],
type=type,
interval=interval,
channels=channels,
)
if url == 'stdout':
continue # stdout is not a valid input source
if not url.startswith("file://"):
raise TimeseriesFactoryException("Only file urls are supported for reading ImagCDF")
url_file = Util.get_file_from_url(url, createParentDirectory=False)
if not os.path.isfile(url_file):
# If file doesn't exist, skip
continue
try:
                # Read CDF data and merge
                cdf = CDFReader(url_file)
file_stream = self._read_cdf(cdf)
# Attempt to select only requested channels
selected = Stream()
for ch in channels:
selected += file_stream.select(channel=ch)
timeseries += selected
except Exception as e:
print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
# After reading all intervals, merge and trim
timeseries.merge(fill_value=np.nan)
timeseries.trim(
starttime=starttime,
endtime=endtime,
nearest_sample=False,
pad=True,
fill_value=np.nan,
)
# If requested, add empty channels not present in data
if add_empty_channels:
present_channels = {tr.stats.channel for tr in timeseries}
missing_channels = set(channels) - present_channels
for ch in missing_channels:
empty_trace = self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=ch,
data_type=type,
interval=interval,
)
timeseries += empty_trace
timeseries.sort()
return timeseries
def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict:
"""
Create a dictionary of global attributes for the ImagCDF file.
These attributes apply to all the data in the file and include metadata
such as observatory information, data publication level, and format
descriptions.
References:
- ImagCDF Technical Documentation: ImagCDF Global Attributes
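        Note: cdflib's write_globalattrs expects each global attribute as a
        dict of {entry_index: value}, which is why every value below is
        wrapped as {0: ...}.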
"""
stats = timeseries[0].stats if len(timeseries) > 0 else None
# Extract metadata from stats or fallback to defaults
observatory_name = getattr(stats, 'station_name', None) or self.observatory or ""
station = getattr(stats, 'station', None) or ""
institution = getattr(stats, 'agency_name', None) or ""
latitude = getattr(stats, 'geodetic_latitude', None) or 0.0
longitude = getattr(stats, 'geodetic_longitude', None) or 0.0
elevation = getattr(stats, 'elevation', None) or 99_999.0
conditions_of_use = getattr(stats, 'conditions_of_use', None) or ""
vector_orientation = getattr(stats, 'sensor_orientation', None) or ""
data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval
data_type = getattr(stats, 'data_type', None) or "variation"
sensor_sampling_rate = getattr(stats, 'sensor_sampling_rate', None) or 0.0
comments = getattr(stats, 'filter_comments', None) or ['']
declination_base = getattr(stats, 'declination_base', None) or 0.0
publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
global_attrs = {
'FormatDescription': {0: 'INTERMAGNET CDF Format'},
'FormatVersion': {0: '1.2'},
'Title': {0: 'Geomagnetic time series data'},
'IagaCode': {0: station},
'ElementsRecorded': {0: ''.join(channels)},
'PublicationLevel': {0: publication_level},
'PublicationDate': {0: [cdflib.cdfepoch.timestamp_to_tt2000(datetime.timestamp(datetime.now(timezone.utc))), "cdf_time_tt2000"]},
'Latitude': {0: np.array([latitude], dtype=np.float64)},
'Longitude': {0: np.array([longitude], dtype=np.float64)},
'Elevation': {0: np.array([elevation], dtype=np.float64)},
'Institution': {0: institution},
            'VectorSensOrient': {0: vector_orientation},  # remove F? - it is a calculation, not a measured element
            'StandardLevel': {0: 'None'},  # Set to 'None'
            # Temporarily omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
            'Source': {0: 'institute'},  # "institute" if the named institution provided the data, "INTERMAGNET" if INTERMAGNET created the file from another data source, "WDC" if the World Data Centre created the file from another data source
'TermsOfUse': {0: conditions_of_use},
# 'UniqueIdentifier': {0: ''},
# 'ParentIdentifiers': {0: ''},
# 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
'SensorSamplingRate': {0: sensor_sampling_rate}, #Optional
'DataType': {0: data_type},#Optional
'Comments': {0: comments}, #Optional
'DeclinationBase': {0: declination_base}, #Optional
}
return global_attrs
def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
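        """
        Build TT2000 time stamp arrays for each group of channels.

        Vector channels must share one set of time stamps, scalar channels
        another, and each temperature channel gets its own. Sets
        self.isUniqueTimes; when the groups' time stamps are identical, a
        single shared 'DataTimes' variable is returned instead.
        """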
vector_times = None
scalar_times = None
temperature_times = {}
temperature_index = 1
for trace in timeseries:
channel = trace.stats.channel
times = [
(trace.stats.starttime + trace.stats.delta * i).datetime.replace(tzinfo=timezone.utc)
for i in range(trace.stats.npts)
]
# Convert timestamps to TT2000 format required by CDF
tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000([time.timestamp() for time in times])
if channel in self._get_vector_elements():
if vector_times is None:
vector_times = tt2000_times
else:
if not np.array_equal(vector_times, tt2000_times):
raise ValueError("Time stamps for vector channels are not the same.")
elif channel in self._get_scalar_elements():
if scalar_times is None:
scalar_times = tt2000_times
else:
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError("Time stamps for scalar channels are not the same.")
elif channel in TEMPERATURE_ELEMENTS_ID:
ts_key = f"Temperature{temperature_index}Times"
if ts_key not in temperature_times:
temperature_times[ts_key] = tt2000_times
temperature_index += 1
else:
temperature_times[ts_key] = tt2000_times
time_vars = {}
if vector_times is not None:
time_vars['GeomagneticVectorTimes'] = vector_times
if scalar_times is not None:
time_vars['GeomagneticScalarTimes'] = scalar_times
if temperature_times:
time_vars.update(temperature_times)
last_times = []
        self.isUniqueTimes = len(time_vars) == 1  # True if only one set of times; otherwise decided by the comparisons below
for index, times in enumerate(time_vars.values()):
if index > 0:
self.isUniqueTimes = not np.array_equal(last_times, times)
last_times = times
return time_vars if self.isUniqueTimes else {"DataTimes": last_times}
def _create_var_spec(
self,
var_name: str,
data_type: str,
num_elements: int,
var_type: str,
dim_sizes: List[int],
sparse: bool,
compress: int,
pad: Optional[Union[str, np.ndarray]],
) -> dict:
"""
Create a variable specification dictionary for cdflib.
This is used to define the properties of a variable when writing it to
the CDF file.
Parameters:
- var_name: Name of the variable.
- data_type: CDF data type.
- num_elements: Number of elements per record.
- var_type: Variable type ('zVariable' or 'rVariable').
- dim_sizes: Dimensions of the variable (empty list for 0D).
- sparse: Whether the variable uses sparse records.
- compress: Compression level.
- pad: Pad value for sparse records.
Reference:
- CDF User's Guide: Variable Specification
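        Example (illustrative) return value for a double-precision variable:
            {'Variable': 'GeomagneticFieldH', 'Data_Type': CDFWriter.CDF_DOUBLE,
             'Num_Elements': 1, 'Rec_Vary': True, 'Var_Type': 'zVariable',
             'Dim_Sizes': [], 'Sparse': 'no_sparse', 'Compress': 9, 'Pad': None}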
"""
var_spec = {
'Variable': var_name,
'Data_Type': data_type,
'Num_Elements': num_elements,
'Rec_Vary': True,
'Var_Type': var_type,
'Dim_Sizes': dim_sizes,
'Sparse': 'no_sparse' if not sparse else 'pad_sparse',
'Compress': compress,
'Pad': pad,
}
return var_spec
def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict:
channel = trace.stats.channel.upper()
fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
units = '' # Must be one of “nT”, “Degrees of arc” or “Celsius”
if channel == 'D':
units = 'Degrees of arc'
validmin = -360.0
validmax = 360.0 # A full circle representation
elif channel == 'I':
units = 'Degrees of arc'
validmin = -90.0
validmax = 90.0 #The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
        elif channel in TEMPERATURE_ELEMENTS_ID:
            units = 'Celsius'
            fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
            validmin = -273.15  # absolute zero
            validmax = 79_999
        elif channel in ['F', 'S']:
            units = 'nT'
            validmin = 0.0  # negative magnetic field intensity is not physically meaningful
            validmax = 79_999.0
elif channel in ['X', 'Y', 'Z', 'H', 'E', 'V', 'G']:
units = 'nT'
validmin = -79_999.0
validmax = 79_999.0
# Determine DEPEND_0 based on channel type
if channel in self._get_vector_elements():
depend_0 = 'GeomagneticVectorTimes'
elif channel in self._get_scalar_elements():
depend_0 = 'GeomagneticScalarTimes'
elif channel in TEMPERATURE_ELEMENTS_ID:
depend_0 = f"Temperature{temperature_index}Times"
var_attrs = {
'FIELDNAM': fieldnam,
'UNITS': units,
'FILLVAL': 99999.0,
'VALIDMIN': validmin,
'VALIDMAX': validmax,
'DEPEND_0': depend_0 if isUniqueTimes else "DataTimes",
'DATA_INTERVAL_TYPE': trace.stats.data_interval_type
}
return var_attrs
def _create_time_var_attrs(self, ts_name: str) -> dict:
"""
Create a dictionary of time variable attributes.
These attributes provide metadata for time variables.
Note: None of these attributes are required for the time stamp variables.
"""
# var_attrs = {
# 'UNITS': 'TT2000',
# 'DISPLAY_TYPE': 'time_series',
# 'LABLAXIS': 'Time',
# }
# return var_attrs
return {}
def _get_cdf_data_type(self, trace: Trace) -> int:
"""
Map ObsPy trace data type to CDF data type.
Determines the appropriate CDF data type based on the NumPy data type
of the trace data.
Returns:
- CDF_DOUBLE (45) for floating-point data.
        - CDF_INT4 (4) for integer data.
        Reference:
        - CDF User's Guide: CDF Data Types
        """
        if trace.data.dtype in [np.float32, np.float64]:
            return CDFWriter.CDF_DOUBLE
        elif trace.data.dtype in [np.int32, np.int64]:
            return CDFWriter.CDF_INT4
        else:
            raise ValueError(f"Unsupported data type: {trace.data.dtype}")

    def _read_cdf(self, cdf: CDFReader) -> Stream:
        """
        Read CDF data into an ObsPy Stream.
This method reads the data variables and their corresponding time
variables from a CDF file and constructs an ObsPy Stream.
Parameters:
- cdf: cdflib CDF object representing the open CDF file.
Returns:
- An ObsPy Stream containing the data from the CDF file.
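        Note (mirrors the write path above): data variables are expected to be
        named like 'GeomagneticFieldH' or 'Temperature1', with DEPEND_0 naming
        a time variable such as 'GeomagneticVectorTimes' or 'DataTimes'.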
"""
stream = Stream()
# Extract global attributes
global_attrs = cdf.globalattsget()
# Map global attributes to Stream-level metadata
observatory = global_attrs.get('IagaCode', [''])[0]
station_name = global_attrs.get('ObservatoryName', [''])[0]
institution = global_attrs.get('Institution', [''])[0]
latitude = global_attrs.get('Latitude', [0.0])[0]
longitude = global_attrs.get('Longitude', [0.0])[0]
elevation = global_attrs.get('Elevation', [99_999.0])[0] #default to 99_999 per technical documents.
sensor_sampling_rate = global_attrs.get('SensorSamplingRate', [0.0])[0]
sensor_orientation = global_attrs.get('VectorSensOrient', [''])[0]
data_type = global_attrs.get('DataType', ['variation'])[0]
publication_level = global_attrs.get('PublicationLevel', ['1'])[0]
comments = global_attrs.get('Comments', [''])
terms_of_use = global_attrs.get('TermsOfUse', [''])[0]
declination_base = global_attrs.get('DeclinationBase', [0.0])[0]
        # Identify all time variables
        time_vars = {}
        for var in cdf.cdf_info().zVariables:
            if var.lower().endswith('times'):
                time_data = cdf.varget(var)
                unix_times = cdflib.cdfepoch.unixtime(time_data)
                utc_times = [UTCDateTime(t) for t in unix_times]
                time_vars[var] = utc_times
# Read data variables and associate them with time variables
for var in cdf.cdf_info().zVariables:
# Skip time variables
if var.lower().endswith('times'):
continue
data = cdf.varget(var)
attrs = cdf.varattsget(var)
# Determine DEPEND_0 (the time variable name) and validate
ts_name = attrs.get('DEPEND_0')
if not ts_name:
# If no DEPEND_0, skip this variable as we cannot map times
continue
# If we used a DataTimes fallback or similar, ensure we handle it case-insensitively
# and also confirm that time_vars keys are checked properly.
# The ImagCDF can have DataTimes, GeomagneticVectorTimes, etc.
# So try exact match first, if not found, attempt case-insensitive.
matched_time_key = None
for tkey in time_vars.keys():
if tkey == ts_name:
matched_time_key = tkey
break
if tkey.lower() == ts_name.lower():
matched_time_key = tkey
break
if matched_time_key not in time_vars:
# If we cannot find the matching time variable, skip this variable
continue
times = time_vars[matched_time_key]
# Determine delta (sample interval)
if len(times) > 1:
# delta as a float of seconds
delta = (times[1].timestamp - times[0].timestamp)
else:
# if only one sample, use default based on interval
# convert minute, second, etc. to numeric delta
if self.interval == 'minute':
delta = 60.0
elif self.interval == 'second':
delta = 1.0
elif self.interval == 'hour':
delta = 3600.0
else:
# fallback, set delta to 60
delta = 60.0
# Map the variable name back to a standard channel code
# Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc.
# Temperatures are named like Temperature1, Temperature2, ...
# Extract channel name by removing known prefixes
if var.startswith("GeomagneticField"):
channel = var.replace("GeomagneticField", "")
elif var.startswith("Temperature"):
# Temperature variables may not map directly to a geomagnetic channel
# but to temperature sensors. We can just use the label from LABLAXIS if needed
channel = attrs.get('LABLAXIS', var)
else:
# fallback if naming doesn't match expected patterns
channel = var
            data_interval = attrs.get('DATA_INTERVAL_TYPE', '')
# Create a trace
trace = Trace(
data=data,
header={
'station': observatory,
'channel': channel,
'starttime': times[0],
'delta': delta,
'geodetic_latitude': latitude,
'geodetic_longitude': longitude,
'elevation': elevation,
'sensor_orientation': "".join(sensor_orientation),
'data_type': data_type,
'station_name': station_name,
'agency_name': institution,
'conditions_of_use': terms_of_use,
'sensor_sampling_rate': sensor_sampling_rate,
'data_interval_type': data_interval,
'declination_base': declination_base,
'filter_comments': comments
}
)
stream += trace
return stream
def _get_url(
self,
observatory: str,
date: UTCDateTime,
type: DataType = "variation",
interval: DataInterval = "minute",
channels: Optional[List[str]] = None,
) -> str:
"""
Generate the file URL specific to ImagCDF conventions.
This method constructs the filename based on the ImagCDF naming
conventions, which include the observatory code, date-time formatted
according to the data interval, and the publication level.
[iaga-code]_[date-time]_[publication-level].cdf
Parameters:
- observatory: IAGA code of the observatory.
- date: Start date for the file.
- type: Data type indicating the processing level.
- interval: Data interval (e.g., 'minute', 'second').
- channels: List of channels (optional). Not Implemented
Returns:
- The formatted file URL or path.
Reference:
- ImagCDF Technical Documentation: ImagCDF File Names
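        Example (illustrative): observatory 'BOU', minute interval, type
        'variation', start 2024-01-01T00:00Z -> 'bou_20240101_0000_1.cdf'.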
"""
# Get the publication level for the type
publication_level = IMCDFPublicationLevel(data_type=type).to_string()
# Format of Date/Time Portion of Filename based on interval see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#example-data-file:~:text=Format%20of%20Date,%EF%83%81
if interval == "year":
date_format = date.strftime("%Y")
elif interval == "month":
date_format = date.strftime("%Y%m")
elif interval == "day":
date_format = date.strftime("%Y%m%d")
elif interval == "hour":
date_format = date.strftime("%Y%m%d_%H")
elif interval == "minute":
date_format = date.strftime("%Y%m%d_%H%M")
elif interval == "second":
date_format = date.strftime("%Y%m%d_%H%M%S")
else:
raise ValueError(f"Unsupported interval: {interval}") #tenhertz currently not supported
# Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
# If the urlTemplate explicitly specifies 'stdout', return 'stdout'
if self.urlTemplate.lower() == "stdout":
return "stdout"
# Prepare parameters for templating
params = {
"date": date.datetime,
"i": self._get_interval_abbreviation(interval),
"interval": self._get_interval_name(interval),
"minute": date.hour * 60 + date.minute,
"month": date.strftime("%b").lower(),
"MONTH": date.strftime("%b").upper(),
"obs": observatory.lower(),
"OBS": observatory.upper(),
"t": publication_level,
"type": self._get_type_name(type),
"julian": date.strftime("%j"),
"year": date.strftime("%Y"),
"ymd": date.strftime("%Y%m%d"),
"dt": date_format,
}
# Attempt to use the template provided in urlTemplate
if "{" in self.urlTemplate and "}" in self.urlTemplate:
try:
return self.urlTemplate.format(**params)
except KeyError as e:
raise TimeseriesFactoryException(f"Invalid placeholder in urlTemplate: {e}")
# If the urlTemplate doesn't support placeholders, assume 'file://' scheme
if self.urlTemplate.startswith("file://"):
base_path = self.urlTemplate[7:] # Strip "file://"
if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
base_path = os.getcwd() # Default to current working directory
return os.path.join(base_path, filename)
# Unsupported URL scheme
raise TimeseriesFactoryException(
f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
)
def _get_vector_elements(self):
return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'}
def _get_scalar_elements(self):
return {'S', 'G'}