Newer
Older
"""ImagCDFFactory Implementation Using cdflib
This module provides the ImagCDFFactory class for creating and writing
geomagnetic time series data in the ImagCDF format using the cdflib library.
The ImagCDF format is based on NASA's Common Data Format (CDF) and is designed
to store geomagnetic data with high precision.
References:
- ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
- ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
- CDFLib Docs: [https://cdflib.readthedocs.io/en/latest/#, https://cdflib.readthedocs.io/en/stable/modules/index.html]
Shavers, Nicholas H
committed
from typing import List, Optional
from datetime import datetime, timezone
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import TEMPERATURE_ELEMENTS_ID
Shavers, Nicholas H
committed
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactoryException import TimeseriesFactoryException
from .. import Util
Shavers, Nicholas H
committed
from cdflib import cdfepoch
from cdflib.cdfwrite import CDF as CDFWriter
from cdflib.cdfread import CDF as CDFReader
Shavers, Nicholas H
committed
from .IMCDFPublication import IMCDFPublicationLevel
class ImagCDFFactory(TimeseriesFactory):
"""Factory for creating and writing ImagCDF formatted CDF files.
This class extends the TimeseriesFactory to support writing geomagnetic
time series data to files in the ImagCDF format using the cdflib library.
"""
isUniqueTimes = True # used to determine depend_0 and CDF Time Variable Name
Shavers, Nicholas H
committed
NONSTANDARD_ELEMENTS = (
TEMPERATURE_ELEMENTS_ID # for using non standard elements such as UK1
)
__FILL_VALUE = 99_999
__MAX_COMPRESSION = 9
def __init__(
self,
observatory: Optional[str] = None,
channels: List[str] = ("H", "D", "Z", "F"),
type: DataType = "variation",
interval: DataInterval = "minute",
Shavers, Nicholas H
committed
urlTemplate="file://{obs}_{dt}_{t}.cdf",
Shavers, Nicholas H
committed
inputFile: Optional[str] = None,
output: Optional[str] = "iaga2002",
):
"""
Initialize the ImagCDFFactory with default parameters.
Parameters:
- observatory: IAGA code of the observatory (e.g., 'BOU').
- channels: List of geomagnetic elements (e.g., ['H', 'D', 'Z', 'F']).
- type: Data type indicating the processing level (e.g., 'variation', 'definitive').
- interval: Data interval (e.g., 'minute', 'second').
- urlTemplate: Template for generating file URLs or paths.
- urlInterval: Interval size for splitting data into multiple files.
Shavers, Nicholas H
committed
- inputFile: An ImagCDF file to read data from. If not provided urlTemplate is assumed path for reads.
"""
super().__init__(
observatory=observatory,
channels=channels,
type=type,
interval=interval,
urlTemplate=urlTemplate,
)
Shavers, Nicholas H
committed
self.inputFile = inputFile
self.output = (
output if output in ["iaga2002", "imfjson"] else "iaga2002"
) # determines mapping of publication level to data type
def write_file(self, fh, timeseries: Stream, channels: List[str]):
# Create a temporary file to write the CDF data
with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
"Compressed": self.__MAX_COMPRESSION, # Max Gzip compression (1-9). Almost always the GZIP is the best choice for all data. (CDF User Guide p.23 1.4.3)
Shavers, Nicholas H
committed
"Majority": CDFWriter.ROW_MAJOR, # The first dimension changes the slowest (CDF User Guide p.45 2.3.15 Majority)
"Encoding": CDFWriter.NETWORK_ENCODING, # XDR Encoding -Portable between two or more different types of computers.
Shavers, Nicholas H
committed
"Checksum": True, # True for Data Integrity. False for faster writes (optional)
"rDim_sizes": [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
global_attrs = self._create_global_attributes(timeseries, channels)
cdf_writer.write_globalattrs(global_attrs)
time_vars = self._create_time_stamp_variables(
timeseries
) # modifies self.isUniqueTimes
"Variable": ts_name,
"Data_Type": CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000
"Num_Elements": 1,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
Shavers, Nicholas H
committed
"Sparse": "no_sparse", # no_sparse because there should not be time gaps. (Time stamps must represent a regular time series with no missing values in the series)
Shavers, Nicholas H
committed
"Compress": self.__MAX_COMPRESSION,
Shavers, Nicholas H
committed
var_attrs = (
{}
) # ImagCDF does not require or recommend any 'time' variable attributes.
# Write time variable
cdf_writer.write_var(var_spec, var_attrs, ts_data)
temperature_index = 0
Shavers, Nicholas H
committed
var_name = f"GeomagneticField{channel}"
# if channel in REAL_TEMPERATURE:
# temperature_index += 1 # MUST INCREMENT INDEX BEFORE USING
# var_name = f"Temperature{temperature_index}"
Shavers, Nicholas H
committed
data_type = (
CDFWriter.CDF_INT4
if trace.data.dtype in [np.int32, np.int64]
else CDFWriter.CDF_DOUBLE
)
"Variable": var_name,
"Data_Type": data_type,
Shavers, Nicholas H
committed
"Num_Elements": 1,
"Rec_Vary": True,
"Var_Type": "zVariable",
"Dim_Sizes": [],
"Sparse": "no_sparse",
Shavers, Nicholas H
committed
"Compress": self.__MAX_COMPRESSION,
var_attrs = self._create_var_attrs(
trace, temperature_index, self.isUniqueTimes
)
# Write data variable
cdf_writer.write_var(var_spec, var_attrs, trace.data)
# Close the CDF writer
cdf_writer.close()
# Copy the temporary CDF file to the final file handle
with open(tmp_file_path, "rb") as tmp:
cdf_data = tmp.read()
fh.write(cdf_data)
finally:
# Cleanup the temporary file
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def put_timeseries(
self,
timeseries: Stream,
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
):
"""
Store timeseries data in ImagCDF format using cdflib.
This method writes the timeseries data to one or more files, depending
on the specified urlInterval.
Parameters:
- timeseries: ObsPy Stream containing the geomagnetic data.
- starttime: Start time of the data to be written.
- endtime: End time of the data to be written.
- channels: List of channels to include in the output file.
- type: Data type indicating the processing level.
- interval: Data interval of the data.
"""
if len(timeseries) == 0:
# No data to store
return
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
# Extract metadata from the first trace
stats = timeseries[0].stats
observatory = stats.station
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
# Split data into intervals if necessary
Shavers, Nicholas H
committed
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
interval_start = urlInterval["start"]
interval_end = urlInterval["end"]
url = self._get_url(
observatory=observatory,
date=interval_start,
type=type,
interval=interval,
channels=channels,
)
# Handle 'stdout' output
# Write directly to stdout
fh = sys.stdout.buffer
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
self.write_file(fh, url_data, channels)
continue # Proceed to next interval if any
# Handle 'file://' output
# Get the file path from the URL
url_file = Util.get_file_from_url(url, createParentDirectory=False)
url_data = timeseries.slice(
starttime=interval_start,
endtime=interval_end,
)
Shavers, Nicholas H
committed
raise TimeseriesFactoryException(
f"Error: File '{url_file}' already exists."
)
Shavers, Nicholas H
committed
# Pad the data to ensure it fits the interval
url_data.trim(
starttime=interval_start,
endtime=interval_end,
Shavers, Nicholas H
committed
nearest_sample=True,
)
# Write the data to the CDF file
with open(url_file, "wb") as fh:
self.write_file(fh, url_data, channels)
else:
# Unsupported URL scheme encountered
raise TimeseriesFactoryException(
"Unsupported URL scheme in urlTemplate"
)
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
add_empty_channels: bool = True,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
) -> Stream:
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
timeseries = Stream()
urlIntervals = Util.get_intervals(
starttime=starttime, endtime=endtime, size=self.urlInterval
)
for urlInterval in urlIntervals:
Shavers, Nicholas H
committed
if self.inputFile is None:
Shavers, Nicholas H
committed
url = self._get_url(
observatory=observatory,
date=urlInterval["start"],
type=type,
interval=interval,
channels=channels,
Shavers, Nicholas H
committed
if url == "stdout":
continue # stdout is not a valid input source
if not url.startswith("file://"):
raise TimeseriesFactoryException(
"Only file urls are supported for reading ImagCDF"
)
url_file = Util.get_file_from_url(url, createParentDirectory=False)
if not os.path.isfile(url_file):
# If file doesn't exist, skip
continue
Shavers, Nicholas H
committed
else:
Shavers, Nicholas H
committed
url_file = self.inputFile
try:
# Read CDF data and merge
Shavers, Nicholas H
committed
timeseries = self._read_cdf(cdf, channels)
except Exception as e:
print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
# After reading all intervals, merge and trim
Shavers, Nicholas H
committed
timeseries.merge(fill_value=self.__FILL_VALUE)
timeseries.trim(
starttime=starttime,
endtime=endtime,
Shavers, Nicholas H
committed
nearest_sample=True,
pad=True,
Shavers, Nicholas H
committed
fill_value=self.__FILL_VALUE,
)
# If requested, add empty channels not present in data
if add_empty_channels:
present_channels = {tr.stats.channel for tr in timeseries}
missing_channels = set(channels) - present_channels
for ch in missing_channels:
empty_trace = self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=ch,
data_type=type,
interval=interval,
)
timeseries += empty_trace
timeseries.sort()
return timeseries
def _create_global_attributes(
self, timeseries: Stream, channels: List[str]
) -> dict:
"""
Create a dictionary of global attributes for the ImagCDF file.
These attributes apply to all the data in the file and include metadata
such as observatory information, data publication level, and format
descriptions.
References:
- ImagCDF Technical Documentation: ImagCDF Global Attributes
"""
stats = timeseries[0].stats if len(timeseries) > 0 else None
# Extract metadata from stats or fallback to defaults
observatory_name = (
getattr(stats, "station_name", None) or self.observatory or ""
)
station = getattr(stats, "station", None) or ""
Shavers, Nicholas H
committed
network = getattr(stats, "network", None) or ""
institution = getattr(stats, "agency_name", None) or ""
latitude = getattr(stats, "geodetic_latitude", None) or 0.0
longitude = getattr(stats, "geodetic_longitude", None) or 0.0
Shavers, Nicholas H
committed
elevation = getattr(stats, "elevation", None) or self.__FILL_VALUE
conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
vector_orientation = getattr(stats, "sensor_orientation", None) or ""
data_type = getattr(stats, "data_type", None) or "variation"
sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
comments = getattr(stats, "filter_comments", None) or [""]
declination_base = getattr(stats, "declination_base", None) or 0.0
publication_level = IMCDFPublicationLevel(value=self.type).get_level()
"FormatDescription": {0: "INTERMAGNET CDF Format"},
"FormatVersion": {0: "1.2"},
"Title": {0: "Geomagnetic time series data"},
"IagaCode": {0: station},
"ElementsRecorded": {0: "".join(channels)},
"PublicationLevel": {0: publication_level},
"PublicationDate": {
0: [
Shavers, Nicholas H
committed
cdfepoch.timestamp_to_tt2000(
datetime.timestamp(datetime.now(timezone.utc))
),
"cdf_time_tt2000",
]
},
"ObservatoryName": {0: observatory_name},
"Latitude": {0: np.array([latitude], dtype=np.float64)},
"Longitude": {0: np.array([longitude], dtype=np.float64)},
"Elevation": {0: np.array([elevation], dtype=np.float64)},
"Institution": {0: institution},
Shavers, Nicholas H
committed
"VectorSensOrient": {0: vector_orientation},
"StandardLevel": {0: "None"},
# Can Omit 'StandardName', 'StandardVersion', 'PartialStandDesc' when StandardLevel=None
"Source": {
0: "institute"
}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
"TermsOfUse": {0: conditions_of_use},
Shavers, Nicholas H
committed
# [[Optional Attributes]]
# 'UniqueIdentifier': {0: ''},
# 'ParentIdentifiers': {0: ''},
# 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
Shavers, Nicholas H
committed
# [[Custom Attributes]]
"SensorSamplingRate": {0: sensor_sampling_rate},
"DataType": {0: data_type},
"Comments": {0: comments},
"DeclinationBase": {0: declination_base},
"Network": {0: network},
}
return global_attrs
def _create_time_stamp_variables(self, timeseries: Stream) -> dict:
vector_times = None
scalar_times = None
Shavers, Nicholas H
committed
nonstandard_times = None
temperature_times = {}
temperature_index = 1
Shavers, Nicholas H
committed
channels = []
Shavers, Nicholas H
committed
channels.append(channel)
(trace.stats.starttime + trace.stats.delta * i).datetime.replace(
tzinfo=timezone.utc
)
for i in range(trace.stats.npts)
]
# Convert timestamps to TT2000 format required by CDF
Shavers, Nicholas H
committed
tt2000_times = cdfepoch.timestamp_to_tt2000(
[time.timestamp() for time in times]
)
if channel in self._get_vector_elements():
if vector_times is None:
vector_times = tt2000_times
else:
if not np.array_equal(vector_times, tt2000_times):
raise ValueError(
"Time stamps for vector channels are not the same."
)
elif channel in self._get_scalar_elements():
if scalar_times is None:
scalar_times = tt2000_times
else:
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError(
"Time stamps for scalar channels are not the same."
)
Shavers, Nicholas H
committed
# elif channel in REAL_TEMPERATURES:
# ts_key = f"Temperature{temperature_index}Times"
# if ts_key not in temperature_times:
# temperature_times[ts_key] = tt2000_times
# temperature_index += 1
# else:
# temperature_times[ts_key] = tt2000_times
elif channel in self.NONSTANDARD_ELEMENTS:
if scalar_times is None:
nonstandard_times = tt2000_times
else:
Shavers, Nicholas H
committed
if not np.array_equal(scalar_times, tt2000_times):
raise ValueError(
"Time stamps for nonstandard channels are not the same."
)
time_vars["GeomagneticVectorTimes"] = vector_times
time_vars["GeomagneticScalarTimes"] = scalar_times
if temperature_times:
time_vars.update(temperature_times)
Shavers, Nicholas H
committed
isNonStandard = nonstandard_times is not None
if isNonStandard:
time_vars["NonstandardTimes"] = nonstandard_times
last_times = []
Shavers, Nicholas H
committed
Shavers, Nicholas H
committed
len(channels) == 1
) # (If a single channel, then use specific naming i.e GeomagneticVectorTimes)
for index, times in enumerate(time_vars.values()):
if index > 0:
self.isUniqueTimes = not np.array_equal(last_times, times)
last_times = times
Shavers, Nicholas H
committed
if self.isUniqueTimes and isNonStandard and len(time_vars) > 2:
raise ValueError(
f"Time stamps must be the same for all channels when using nonstandard channels: {','.join(self.NONSTANDARD_ELEMENTS)}"
)
return (
{"DataTimes": last_times}
if isNonStandard or not self.isUniqueTimes
else time_vars
)
def _create_var_attrs(
self,
trace: Trace,
temperature_index: Optional[int] = None,
isUniqueTimes: Optional[bool] = True,
) -> dict:
channel = trace.stats.channel.upper()
fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
units = "" # Must be one of “nT”, “Degrees of arc” or “Celsius”
if channel == "D":
units = "Degrees of arc"
validmin = -360.0
validmax = 360.0 # A full circle representation
elif channel == "I":
units = "Degrees of arc"
validmin = -90.0
validmax = 90.0 # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
elif channel in TEMPERATURE_ELEMENTS_ID:
units = "Celsius"
validmin = -273.15 # absolute zero
validmax = 79_999
depend_0 = "DataTimes" # can be used for nonstandard element
Shavers, Nicholas H
committed
# elif channel in [REAL_TEMPERATURES]:
# units = "Celsius"
# fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
# validmin = -273.15 # absolute zero
# validmax = 79_999
elif channel in ["F", "S"]:
units = "nT"
validmin = (
0.0 # negative magnetic field intestity not physically meaningful.
)
elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
units = "nT"
# Determine DEPEND_0 based on channel type
Shavers, Nicholas H
committed
if not isUniqueTimes:
depend_0 = "DataTimes"
elif channel in self._get_vector_elements():
depend_0 = "GeomagneticVectorTimes"
depend_0 = "GeomagneticScalarTimes"
Shavers, Nicholas H
committed
# elif channel in REAL_TEMPERATURES:
# depend_0 = f"Temperature{temperature_index}Times"
"FIELDNAM": fieldnam,
"UNITS": units,
Shavers, Nicholas H
committed
"FILLVAL": self.__FILL_VALUE,
"VALIDMIN": validmin,
"VALIDMAX": validmax,
Shavers, Nicholas H
committed
"DEPEND_0": depend_0,
"DISPLAY_TYPE": "time_series",
"LABLAXIS": channel,
Shavers, Nicholas H
committed
# [[Custom Attributes]]
Shavers, Nicholas H
committed
"DATA_INTERVAL_TYPE": trace.stats.data_interval_type,
Shavers, Nicholas H
committed
def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream:
"""
Read CDF data into an ObsPy Stream.
This method reads the data variables and their corresponding time
variables from a CDF file and constructs an ObsPy Stream.
Parameters:
- cdf: cdflib CDF object representing the open CDF file.
Returns:
- An ObsPy Stream containing the data from the CDF file.
"""
stream = Stream()
# Extract global attributes
global_attrs = cdf.globalattsget()
Shavers, Nicholas H
committed
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
# Define required global attributes based on ImagCDF standards
required_global_attrs = [
"FormatDescription",
"FormatVersion",
"Title",
"IagaCode",
"ElementsRecorded",
"PublicationLevel",
"PublicationDate",
"ObservatoryName",
"Latitude",
"Longitude",
"Elevation",
"Institution",
"StandardLevel",
"Source",
]
# Check for missing required global attributes per ImagCDF techincal document
missing_global_attrs = []
for attr in required_global_attrs:
if attr not in global_attrs:
missing_global_attrs.append(attr)
if missing_global_attrs:
error_message = (
f"The following required global attributes are missing from the CDF: "
f"{', '.join(missing_global_attrs)}"
)
raise TimeseriesFactoryException(error_message)
# Map global attributes to Stream-level metadata
observatory = global_attrs.get("IagaCode", [""])[0]
station_name = global_attrs.get("ObservatoryName", [""])[0]
institution = global_attrs.get("Institution", [""])[0]
latitude = global_attrs.get("Latitude", [0.0])[0]
longitude = global_attrs.get("Longitude", [0.0])[0]
Shavers, Nicholas H
committed
elevation = global_attrs.get("Elevation", [self.__FILL_VALUE])[
0
] # default to 99_999 per technical documents.
sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
Shavers, Nicholas H
committed
publication_level = global_attrs.get("PublicationLevel", ["0"])[0]
publication = IMCDFPublicationLevel(publication_level)
terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
Shavers, Nicholas H
committed
# Not expected in ImagCDF
data_type = (
publication.get_imf_data_type()
if "imf" in self.output
else publication.get_iaga2002_data_type()
)
sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [None])[0]
comments = global_attrs.get("Comments", []) # keep comments as an array
Shavers, Nicholas H
committed
declination_base = global_attrs.get("DeclinationBase", [None])[0]
network = global_attrs.get("Network", [None])[0]
# Identify all time variables
for var in cdf.cdf_info().zVariables:
Shavers, Nicholas H
committed
if var.endswith("Times"):
Shavers, Nicholas H
committed
unix_times = cdfepoch.unixtime(time_data)
utc_times = [UTCDateTime(t) for t in unix_times]
# Read data variables and associate them with time variables
for var in cdf.cdf_info().zVariables:
# Skip time variables
Shavers, Nicholas H
committed
if var.endswith("Times"):
continue
# Map the variable name back to a standard channel code by removing known prefixes
# Names are like GeomagneticFieldH, GeomagneticFieldD, Temperature1, Temperature2, ...
Shavers, Nicholas H
committed
if var.startswith("GeomagneticField"):
channel = var.replace("GeomagneticField", "")
# elif var.startswith("Temperature"):
# # Temperature variables may not map directly to a geomagnetic channel
# # but to temperature sensors. We can just use the label from LABLAXIS if needed
# channel = attrs.get("LABLAXIS", var)
Shavers, Nicholas H
committed
else:
# fallback if naming doesn't match expected patterns
channel = var
if channels and channel not in channels:
continue
data = cdf.varget(var)
attrs = cdf.varattsget(var)
# Determine DEPEND_0 (the time variable name) and validate
Shavers, Nicholas H
committed
# The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc.
matched_time_key = None
for tkey in time_vars.keys():
if tkey == ts_name:
matched_time_key = tkey
break
Shavers, Nicholas H
committed
times = []
if matched_time_key in time_vars:
times = time_vars[matched_time_key]
# Determine delta (sample interval)
if len(times) > 1:
# delta as a float of seconds
delta = times[1].timestamp - times[0].timestamp
else:
# if only one sample, use default based on interval
# convert minute, second, etc. to numeric delta
delta = 60.0
delta = 1.0
delta = 3600.0
else:
# fallback, set delta to 60
delta = 60.0
Shavers, Nicholas H
committed
data_interval = time_attrs.get("DATA_INTERVAL_TYPE", None)
Shavers, Nicholas H
committed
# Required variable attributes based on ImagCDF standards
required_variable_attrs = [
"FIELDNAM",
"UNITS",
"FILLVAL",
"VALIDMIN",
"VALIDMAX",
"DISPLAY_TYPE",
"LABLAXIS",
"DEPEND_0",
Shavers, Nicholas H
committed
]
# Validate presence of required variable attributes
missing_var_attrs = []
for var_attr in required_variable_attrs:
if var_attr not in attrs:
missing_var_attrs.append(var_attr)
if missing_var_attrs or missing_global_attrs:
error_message = (
f"The variable '{var}' is missing required attributes: "
f"{', '.join(missing_var_attrs)}"
)
raise TimeseriesFactoryException(error_message)
Shavers, Nicholas H
committed
header = {
"station": observatory,
"channel": channel,
"starttime": times[0],
"delta": delta,
"geodetic_latitude": latitude,
"geodetic_longitude": longitude,
"elevation": elevation,
"sensor_orientation": "".join(sensor_orientation),
"data_type": data_type,
"station_name": station_name,
"agency_name": institution,
"conditions_of_use": terms_of_use,
}
# data points not expected in ImagCDF specs
if sensor_sampling_rate is not None:
header.update({"sensor_sampling_rate": sensor_sampling_rate})
if data_interval is not None:
header.update({"data_interval_type": data_interval})
if declination_base is not None:
header.update({"declination_base": declination_base})
Shavers, Nicholas H
committed
header.update({"filter_comments": comments})
if network is not None:
header.update({"network": network})
# Create a trace
trace = Trace(
data=data,
Shavers, Nicholas H
committed
header=header,
)
stream += trace
return stream
def _get_url(
self,
observatory: str,
date: UTCDateTime,
type: DataType = "variation",
interval: DataInterval = "minute",
channels: Optional[List[str]] = None,
) -> str:
"""
Generate the file URL specific to ImagCDF conventions.
This method constructs the filename based on the ImagCDF naming
[iaga-code]_[date-time]_[publication-level].cdf
Parameters:
- observatory: IAGA code of the observatory.
- date: Start date for the file.
- type: Data type indicating the processing level.
- interval: Data interval (e.g., 'minute', 'second').
- channels: List of channels (optional). Not Implemented
Returns:
- The formatted file URL or path.
Reference:
- ImagCDF Technical Documentation: ImagCDF File Names
publication_level = IMCDFPublicationLevel(value=type).get_level()
if interval == "year":
date_format = date.strftime("%Y")
elif interval == "month":
date_format = date.strftime("%Y%m")
elif interval == "day":
date_format = date.strftime("%Y%m%d")
elif interval == "hour":
date_format = date.strftime("%Y%m%d_%H")
elif interval == "minute":
date_format = date.strftime("%Y%m%d_%H%M")
elif interval == "second":
date_format = date.strftime("%Y%m%d_%H%M%S")
else:
raise ValueError(
f"Unsupported interval: {interval}"
) # tenhertz currently not supported
filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
# If the urlTemplate explicitly specifies 'stdout', return 'stdout'
if self.urlTemplate.lower() == "stdout":
return "stdout"
# Prepare parameters for templating
params = {
"date": date.datetime,
"i": self._get_interval_abbreviation(interval),
"interval": self._get_interval_name(interval),
"minute": date.hour * 60 + date.minute,
"month": date.strftime("%b").lower(),
"MONTH": date.strftime("%b").upper(),
"obs": observatory.lower(),
"OBS": observatory.upper(),
"t": publication_level,
"type": self._get_type_name(type),
"julian": date.strftime("%j"),
"year": date.strftime("%Y"),
"ymd": date.strftime("%Y%m%d"),
"dt": date_format,
}
# Attempt to use the template provided in urlTemplate
if "{" in self.urlTemplate and "}" in self.urlTemplate:
try:
return self.urlTemplate.format(**params)
except KeyError as e:
raise TimeseriesFactoryException(
f"Invalid placeholder in urlTemplate: {e}"
)
# If the urlTemplate doesn't support placeholders, assume 'file://' scheme
if self.urlTemplate.startswith("file://"):
Shavers, Nicholas H
committed
base_path = self.urlTemplate[7:]
if not base_path or base_path == "{obs}_{dt}_{t}.cdf":
base_path = os.getcwd() # Default to current working directory
Shavers, Nicholas H
committed
return os.path.join(base_path,filename)
Shavers, Nicholas H
committed
return os.path.join(self.urlTemplate, filename)
# Unsupported URL scheme
raise TimeseriesFactoryException(
f"Unsupported URL scheme in urlTemplate: {self.urlTemplate}"
)
def _get_vector_elements(self):
return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
def _get_scalar_elements(self):
return {"S", "G"}