diff --git a/geomagio/Controller.py b/geomagio/Controller.py
index 40e6b173d28336d1fda7ec53473e9b859e435c53..96ebb28c2f5c76274d90418b7a9892c76ab52ed5 100644
--- a/geomagio/Controller.py
+++ b/geomagio/Controller.py
@@ -547,6 +547,8 @@ def get_input_factory(args):
             locationCode=args.locationcode,
             **input_factory_args,
         )
+    elif input_type == "imagcdf":
+        input_factory = ImagCDFFactory(**input_factory_args)
     else:
         # stream compatible factories
         if input_type == "iaga2002":
@@ -837,6 +839,7 @@ def parse_args(args):
             "xml",
             "covjson",
             "netcdf",
+            "imagcdf",
         ),
         default="edge",
         help='Input format (Default "edge")',
diff --git a/geomagio/ImagCDFFactory.py b/geomagio/ImagCDFFactory.py
index 0acf6eceb9abe4345941a994422781e2461659db..6ba44f3e44aa1c5f674024a948bdd266915c976c 100644
--- a/geomagio/ImagCDFFactory.py
+++ b/geomagio/ImagCDFFactory.py
@@ -8,7 +8,8 @@ to store geomagnetic data with high precision.
 References:
 - ImagCDF Technical Note: https://intermagnet.org/docs/technical/im_tn_8_ImagCDF.pdf
 - ImagCDF Official Documentation: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdfv1-2-intermagnet-exchange-format
-- CDF: http://cdf.gsfc.nasa.gov/
+- CDF User Guide: https://spdf.gsfc.nasa.gov/pub/software/cdf/doc/cdf_User_Guide.pdf
+- CDFLib Docs: https://cdflib.readthedocs.io/en/latest/# and https://cdflib.readthedocs.io/en/stable/modules/index.html
 """
 
 from __future__ import absolute_import, print_function
@@ -16,7 +17,7 @@ from io import BytesIO
 import os
 import sys
 from typing import List, Optional, Union
-from datetime import datetime, timezone
+from datetime import datetime, timezone, tzinfo
 import numpy as np
 from obspy import Stream, Trace, UTCDateTime
 from sqlalchemy import true
@@ -330,6 +331,84 @@ class ImagCDFFactory(TimeseriesFactory):
             # Unsupported URL scheme encountered
             raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate")
 
+    def get_timeseries(
+        self,
+        starttime: UTCDateTime,
+        endtime: UTCDateTime,
+        add_empty_channels: bool = True,
+        observatory: Optional[str] = None,
+        channels: Optional[List[str]] = None,
+        type: Optional[DataType] = None,
+        interval: Optional[DataInterval] = None,
+    ) -> Stream:
+        observatory = observatory or self.observatory
+        channels = channels or self.channels
+        type = type or self.type
+        interval = interval or self.interval
+
+        timeseries = Stream()
+        urlIntervals = Util.get_intervals(
+            starttime=starttime, endtime=endtime, size=self.urlInterval
+        )
+
+        for urlInterval in urlIntervals:
+            url = self._get_url(
+                observatory=observatory,
+                date=urlInterval["start"],
+                type=type,
+                interval=interval,
+                channels=channels,
+            )
+            if url == "stdout":
+                continue  # stdout is not a valid input source
+            if not url.startswith("file://"):
+                raise TimeseriesFactoryException("Only file urls are supported for reading ImagCDF")
+
+            url_file = Util.get_file_from_url(url, createParentDirectory=False)
+            if not os.path.isfile(url_file):
+                # If the file doesn't exist, skip this interval
+                continue
+
+            try:
+                # Read CDF data and merge
+                cdf = cdflib.cdfread.CDF(url_file)
+                file_stream = self._read_cdf(cdf)
+                # Select only the requested channels
+                selected = Stream()
+                for ch in channels:
+                    selected += file_stream.select(channel=ch)
+                timeseries += selected
+            except Exception as e:
+                print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr)
+
+        # After reading all intervals, merge and trim
+        timeseries.merge(fill_value=np.nan)
+        timeseries.trim(
+            starttime=starttime,
+            endtime=endtime,
+            nearest_sample=False,
+            pad=True,
+            fill_value=np.nan,
+        )
+
+        # If requested, add empty channels not present in the data
+        if add_empty_channels:
+            present_channels = {tr.stats.channel for tr in timeseries}
+            missing_channels = set(channels) - present_channels
+            for ch in missing_channels:
+                empty_trace = self._get_empty_trace(
+                    starttime=starttime,
+                    endtime=endtime,
+                    observatory=observatory,
+                    channel=ch,
+                    data_type=type,
+                    interval=interval,
+                )
+                timeseries += empty_trace
+
+        timeseries.sort()
+        return timeseries
+
     def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict:
         """
         Create a dictionary of global attributes for the ImagCDF file.
@@ -344,14 +423,19 @@ class ImagCDFFactory(TimeseriesFactory):
         stats = timeseries[0].stats if len(timeseries) > 0 else None
 
         # Extract metadata from stats or fallback to defaults
-        observatory_name = getattr(stats, 'station_name', None) or self.observatory or "Unknown Observatory"
-        station = getattr(stats, 'station', None) or "Unknown Iaga Code"
-        institution = getattr(stats, 'agency_name', None) or "Unknown Institution"
+        observatory_name = getattr(stats, 'station_name', None) or self.observatory or ""
+        station = getattr(stats, 'station', None) or ""
+        institution = getattr(stats, 'agency_name', None) or ""
         latitude = getattr(stats, 'geodetic_latitude', None) or 0.0
         longitude = getattr(stats, 'geodetic_longitude', None) or 0.0
         elevation = getattr(stats, 'elevation', None) or 99_999.0
+        conditions_of_use = getattr(stats, 'conditions_of_use', None) or ""
         vector_orientation = getattr(stats, 'sensor_orientation', None) or ""
         data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval
+        data_type = getattr(stats, 'data_type', None) or "variation"
+        sensor_sampling_rate = getattr(stats, 'sensor_sampling_rate', None) or 0.0
+        comments = getattr(stats, 'filter_comments', None) or ['']
+        declination_base = getattr(stats, 'declination_base', None) or 0.0
         publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
         global_attrs = {
             'FormatDescription': {0: 'INTERMAGNET CDF Format'},
@@ -370,10 +454,14 @@ class ImagCDFFactory(TimeseriesFactory):
             'StandardLevel': {0: 'None'},  # Set to 'None'
             # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
             'Source': {0: 'institute'},  # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
-            # 'TermsOfUse': {0: self.getINTERMAGNETTermsOfUse()},
+            'TermsOfUse': {0: conditions_of_use},
             # 'UniqueIdentifier': {0: ''},
             # 'ParentIdentifiers': {0: ''},
             # 'ReferenceLinks': {0: ''},  # links to /ws, plots, USGS.gov
+            'SensorSamplingRate': {0: sensor_sampling_rate},
+            'DataType': {0: data_type},
+            'Comments': {0: comments},
+            'DeclinationBase': {0: declination_base},
         }
         return global_attrs

@@ -382,10 +470,11 @@ class ImagCDFFactory(TimeseriesFactory):
         scalar_times = None
         temperature_times = {}
         temperature_index = 1
+
         for trace in timeseries:
             channel = trace.stats.channel
             times = [
-                (trace.stats.starttime + trace.stats.delta * i).datetime
+                (trace.stats.starttime + trace.stats.delta * i).datetime.replace(tzinfo=timezone.utc)
                 for i in range(trace.stats.npts)
             ]
             # Convert timestamps to TT2000 format required by CDF
@@ -473,7 +563,7 @@ class ImagCDFFactory(TimeseriesFactory):
         return var_spec

     def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict:
-        channel = trace.stats.channel
+        channel = trace.stats.channel.upper()
         fieldnam = f"Geomagnetic Field Element {channel}"  # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
         units = ''  # Must be one of “nT”, “Degrees of arc” or “Celsius”
         if channel == 'D':
@@ -516,6 +606,7 @@ class ImagCDFFactory(TimeseriesFactory):
             'DEPEND_0': depend_0 if isUniqueTimes else "DataTimes",
             'DISPLAY_TYPE': 'time_series',
             'LABLAXIS': channel,
+            'DATA_INTERVAL_TYPE': getattr(trace.stats, 'data_interval_type', self.interval),
         }
         return var_attrs

@@ -576,83 +667,126 @@ class ImagCDFFactory(TimeseriesFactory):
         - An ObsPy Stream containing the data from the CDF file.
         """
         stream = Stream()
-        # Read time variables
+
+        # Extract global attributes
+        global_attrs = cdf.globalattsget()
+
+        # Map global attributes to Stream-level metadata
+        observatory = global_attrs.get('IagaCode', [''])[0]
+        station_name = global_attrs.get('ObservatoryName', [''])[0]
+        institution = global_attrs.get('Institution', [''])[0]
+        latitude = global_attrs.get('Latitude', [0.0])[0]
+        longitude = global_attrs.get('Longitude', [0.0])[0]
+        elevation = global_attrs.get('Elevation', [99_999.0])[0]
+        sensor_sampling_rate = global_attrs.get('SensorSamplingRate', [0.0])[0]
+        sensor_orientation = global_attrs.get('VectorSensOrient', [''])[0]
+        data_type = global_attrs.get('DataType', ['variation'])[0]
+        publication_level = global_attrs.get('PublicationLevel', ['1'])[0]
+        comments = global_attrs.get('Comments', [''])
+        terms_of_use = global_attrs.get('TermsOfUse', [''])[0]
+        declination_base = global_attrs.get('DeclinationBase', [0.0])[0]
+
+        # Identify all time variables
         time_vars = {}
         for var in cdf.cdf_info().zVariables:
-            if var.endswith('Time'):
+            if var.lower().endswith('times'):
                 time_data = cdf.varget(var)
-                # Convert TT2000 to UTCDateTime
-                utc_times = [UTCDateTime(t) for t in cdflib.cdfepoch.to_datetime(time_data)]
+                unix_times = cdflib.cdfepoch.unixtime(time_data)
+                utc_times = [UTCDateTime(t) for t in unix_times]
                 time_vars[var] = utc_times

-        # Read data variables
+        # Read data variables and associate them with time variables
         for var in cdf.cdf_info().zVariables:
-            if not var.endswith('Time'):
-                data = cdf.varget(var)
-                attrs = cdf.varattsget(var)
-                if 'DEPEND_0' in attrs:
-                    ts_name = attrs['DEPEND_0']
-                    if ts_name in time_vars:
-                        times = time_vars[ts_name]
-                        if len(times) > 1:
-                            delta = times[1] - times[0]  # Calculate sample interval
-                        else:
-                            delta = 60 if self.interval == 'minute' else 1
-                        trace = Trace(
-                            data=data,
-                            header={
-                                'station': self.observatory,
-                                'channel': var,
-                                'starttime': times[0],
-                                'delta': delta,
-                            }
-                        )
-                        stream += trace
-        return stream
-
-    @staticmethod
-    def getINTERMAGNETTermsOfUse() -> str:
-        """
-        Return the INTERMAGNET Terms of Use.
-
-        These terms should be included in the 'TermsOfUse' global attribute
-        as per the ImagCDF specification.
+            # Skip time variables
+            if var.lower().endswith('times'):
+                continue
+
+            data = cdf.varget(var)
+            attrs = cdf.varattsget(var)
+
+            # Determine DEPEND_0 (the time variable name) and validate it
+            ts_name = attrs.get('DEPEND_0')
+            if not ts_name:
+                # Without DEPEND_0 the variable cannot be mapped to timestamps; skip it
+                continue
+
+            # A DataTimes fallback or similar may have been used, so handle the
+            # DEPEND_0 lookup against time_vars case-insensitively.
+            # ImagCDF files can use DataTimes, GeomagneticVectorTimes, etc., so try
+            # an exact match first and fall back to a case-insensitive match.
+            matched_time_key = None
+            if ts_name in time_vars:
+                matched_time_key = ts_name
+            else:
+                for tkey in time_vars:
+                    if tkey.lower() == ts_name.lower():
+                        matched_time_key = tkey
+                        break
+
+            if matched_time_key is None:
+                # No matching time variable was found; skip this variable
+                continue
+
+            times = time_vars[matched_time_key]
+
+            # Determine delta (sample interval)
+            if len(times) > 1:
+                # delta as a float of seconds
+                delta = times[1].timestamp - times[0].timestamp
+            else:
+                # With only one sample, fall back to a default delta in seconds
+                # based on the requested interval
+                if self.interval == 'minute':
+                    delta = 60.0
+                elif self.interval == 'second':
+                    delta = 1.0
+                elif self.interval == 'hour':
+                    delta = 3600.0
+                else:
+                    # fallback, set delta to 60
+                    delta = 60.0
+
+            # Map the variable name back to a standard channel code.
+            # Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc.
+            # Temperatures are named like Temperature1, Temperature2, ...
+            # Extract the channel name by removing the known prefixes.
+            if var.startswith("GeomagneticField"):
+                channel = var.replace("GeomagneticField", "")
+            elif var.startswith("Temperature"):
+                # Temperature variables map to temperature sensors rather than a
+                # geomagnetic channel, so use the LABLAXIS label when available
+                channel = attrs.get('LABLAXIS', var)
+            else:
+                # fallback if the naming doesn't match expected patterns
+                channel = var
+
+            time_attrs = cdf.varattsget(var)
+            data_interval = time_attrs.get('DATA_INTERVAL_TYPE', '')
+            # Create a trace
+            trace = Trace(
+                data=data,
+                header={
+                    'station': observatory,
+                    'channel': channel,
+                    'starttime': times[0],
+                    'delta': delta,
+                    'geodetic_latitude': latitude,
+                    'geodetic_longitude': longitude,
+                    'elevation': elevation,
+                    'sensor_orientation': "".join(sensor_orientation),
+                    'data_type': data_type,
+                    'station_name': station_name,
+                    'agency_name': institution,
+                    'conditions_of_use': terms_of_use,
+                    'sensor_sampling_rate': sensor_sampling_rate,
+                    'data_interval_type': data_interval,
+                    'declination_base': declination_base,
+                    'filter_comments': comments,
+                }
+            )
+            stream += trace
-
-        Reference:
-        - ImagCDF Documentation Section 4.5: Attributes that Relate to Publication of the Data
-        """
-        return (
-            "CONDITIONS OF USE FOR DATA PROVIDED THROUGH INTERMAGNET:\n"
-            "The data made available through INTERMAGNET are provided for\n"
-            "your use and are not for commercial use or sale or distribution\n"
-            "to third parties without the written permission of the institute\n"
-            "(http://www.intermagnet.org/Institutes_e.html) operating\n"
-            "the observatory. Publications making use of the data\n"
-            "should include an acknowledgment statement of the form given below.\n"
-            "A citation reference should be sent to the INTERMAGNET Secretary\n"
-            "(secretary@intermagnet.org) for inclusion in a publications list\n"
-            "on the INTERMAGNET website.\n"
-            "\n"
-            "    ACKNOWLEDGEMENT OF DATA FROM OBSERVATORIES\n"
-            "    PARTICIPATING IN INTERMAGNET\n"
-            "We offer two acknowledgement templates. The first is for cases\n"
-            "where data from many observatories have been used and it is not\n"
-            "practical to list them all, or each of their operating institutes.\n"
-            "The second is for cases where research results have been produced\n"
-            "using a smaller set of observatories.\n"
-            "\n"
-            "    Suggested Acknowledgement Text (template 1)\n"
-            "The results presented in this paper rely on data collected\n"
-            "at magnetic observatories. We thank the national institutes that\n"
-            "support them and INTERMAGNET for promoting high standards of\n"
-            "magnetic observatory practice (www.intermagnet.org).\n"
-            "\n"
-            "    Suggested Acknowledgement Text (template 2)\n"
-            "The results presented in this paper rely on the data\n"
-            "collected at <observatory name>. We thank <institute name>,\n"
-            "for supporting its operation and INTERMAGNET for promoting high\n"
-            "standards of magnetic observatory practice (www.intermagnet.org).\n"
-        )
+        return stream

     def _get_url(
         self,
@@ -725,7 +859,7 @@ class ImagCDFFactory(TimeseriesFactory):
             "julian": date.strftime("%j"),
             "year": date.strftime("%Y"),
             "ymd": date.strftime("%Y%m%d"),
-            "dt": date_format,  # Add the date-time formatted string
+            "dt": date_format,
         }

         # Attempt to use the template provided in urlTemplate
@@ -751,4 +885,4 @@ class ImagCDFFactory(TimeseriesFactory):
         return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'}

     def _get_scalar_elements(self):
-        return {'S', 'G'}
\ No newline at end of file
+        return {'S', 'G'}
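
Usage sketch (not applied by this patch): reading ImagCDF files back through the new factory. The observatory code, channel list, and urlTemplate placeholders below are illustrative assumptions, not values taken from this change; the factory is assumed to accept the usual TimeseriesFactory keyword arguments and a file:// urlTemplate, as get_timeseries above requires.

    from obspy import UTCDateTime
    from geomagio.ImagCDFFactory import ImagCDFFactory

    # "BOU", the channel list, and the template placeholders are assumptions
    # for illustration only; adjust them to the local file layout.
    factory = ImagCDFFactory(
        observatory="BOU",
        channels=("H", "E", "Z", "F"),
        type="variation",
        interval="minute",
        urlTemplate="file://./{obs}_{dt}.cdf",
    )
    stream = factory.get_timeseries(
        starttime=UTCDateTime("2024-01-01T00:00:00Z"),
        endtime=UTCDateTime("2024-01-02T00:00:00Z"),
    )
    print(stream)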
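For reference, a short standalone sketch of decoding the ImagCDF time variables with cdflib, using only calls that appear in _read_cdf above; the file name "example.cdf" is a placeholder.

    import cdflib
    from obspy import UTCDateTime

    cdf = cdflib.cdfread.CDF("example.cdf")  # placeholder path
    print(cdf.globalattsget().get("IagaCode"))
    for var in cdf.cdf_info().zVariables:
        # Time variables end in "Times" (e.g. GeomagneticVectorTimes, DataTimes)
        if var.lower().endswith("times"):
            tt2000 = cdf.varget(var)  # TT2000 epoch values
            stamps = cdflib.cdfepoch.unixtime(tt2000)  # convert to Unix timestamps
            utc = [UTCDateTime(t) for t in stamps]
            if utc:
                print(var, utc[0], "->", utc[-1])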