From 066e59b4eca583d786acb5d488324d1a6a5e52ce Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Fri, 13 Dec 2024 09:13:41 -0800 Subject: [PATCH] Nonstand elements implemented. Temperatures un-implemented. no longer extends streamfactory on imagcdf input, parse string removed. basic is valid imagcdf logic. --- geomagio/imagcdf/ImagCDFFactory.py | 256 +++++++++++++++++------------ 1 file changed, 148 insertions(+), 108 deletions(-) diff --git a/geomagio/imagcdf/ImagCDFFactory.py b/geomagio/imagcdf/ImagCDFFactory.py index a7838f14..0c63bb70 100644 --- a/geomagio/imagcdf/ImagCDFFactory.py +++ b/geomagio/imagcdf/ImagCDFFactory.py @@ -101,7 +101,7 @@ class ImagCDFFactory(TimeseriesFactory): channels: List[str] = ("H", "D", "Z", "F"), type: DataType = "variation", interval: DataInterval = "minute", - urlTemplate="file://{obs}_{dt}_{t}.cdf", + urlTemplate="file://etc/imagcdf/{obs}_{dt}_{t}.cdf", urlInterval: int = -1, ): """ @@ -134,7 +134,7 @@ class ImagCDFFactory(TimeseriesFactory): cdf_spec = { "Compressed": 9, # Enable compression (0-9) "Majority": CDFWriter.ROW_MAJOR, # Data layout - gets set automatically - "Encoding": CDFWriter.IBMPC_ENCODING, # gets set automatically + "Encoding": CDFWriter.HOST_ENCODING, # gets set automatically "Checksum": True, # Disable checksum for faster writes (optional) "rDim_sizes": [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables. } @@ -259,8 +259,9 @@ class ImagCDFFactory(TimeseriesFactory): for urlInterval in urlIntervals: interval_start = urlInterval["start"] interval_end = urlInterval["end"] - if interval_start != interval_end: - interval_end = interval_end - delta + # Removes last data point ex: if endtime = 02:00:00, this could return 01:59:00 as last data point. + # if interval_start != interval_end: + # interval_end = interval_end - delta url = self._get_url( observatory=observatory, date=interval_start, @@ -290,38 +291,16 @@ class ImagCDFFactory(TimeseriesFactory): ) # Check if the file already exists to merge data if os.path.isfile(url_file): - try: - # Read existing data to merge with new data - existing_cdf = CDFReader(url_file) - existing_stream = self._read_cdf(existing_cdf) - # existing_cdf.close() #no close method? - existing_data = existing_stream - - # Merge existing data with new data - for trace in existing_data: - new_trace = url_data.select( - network=trace.stats.network, - station=trace.stats.station, - channel=trace.stats.channel, - ) - if new_trace: - trace.data = np.concatenate( - (trace.data, new_trace[0].data) - ) - url_data = existing_data + url_data - except Exception as e: - # Log the exception if needed - print( - f"Warning: Could not read existing CDF file '{url_file}': {e}", - file=sys.stderr, - ) - # Proceed with new data - + os.remove(url_file) + print(f"Naming conflict. Deleting exisiting file '{url_file}'.") + # raise TimeseriesFactoryException( + # f"Error: File '{url_file}' already exists." + # ) # Pad the data to ensure it fits the interval url_data.trim( starttime=interval_start, endtime=interval_end, - nearest_sample=False, + nearest_sample=True, pad=True, fill_value=99_999, # FILLVAL ) @@ -370,7 +349,6 @@ class ImagCDFFactory(TimeseriesFactory): raise TimeseriesFactoryException( "Only file urls are supported for reading ImagCDF" ) - url_file = Util.get_file_from_url(url, createParentDirectory=False) if not os.path.isfile(url_file): # If file doesn't exist, skip @@ -379,23 +357,24 @@ class ImagCDFFactory(TimeseriesFactory): try: # Read CDF data and merge cdf = CDFReader(url_file) - file_stream = self._read_cdf(cdf) - # Attempt to select only requested channels - selected = Stream() - for ch in channels: - selected += file_stream.select(channel=ch) - timeseries += selected + # file_stream = self._read_cdf(cdf, channels) + timeseries = self._read_cdf(cdf, channels) + # Attempt to select only requested channelws (redundant as read_cdf can more efficiently filter) + # selected = Stream() + # for ch in channels: + # selected += file_stream.select(channel=ch) + # timeseries += selected except Exception as e: print(f"Error reading CDF file '{url_file}': {e}", file=sys.stderr) # After reading all intervals, merge and trim - timeseries.merge(fill_value=np.nan) + timeseries.merge(fill_value=99_999) timeseries.trim( starttime=starttime, endtime=endtime, - nearest_sample=False, + nearest_sample=True, pad=True, - fill_value=np.nan, + fill_value=99_999, ) # If requested, add empty channels not present in data @@ -416,46 +395,47 @@ class ImagCDFFactory(TimeseriesFactory): timeseries.sort() return timeseries - def parse_string(self, data: str, **kwargs): - """ - Parse ImagCDF binary data into an ObsPy Stream. - - This method writes the provided binary data to a temporary file, - reads the file using `cdflib`, and converts the data into an ObsPy - Stream. - - Parameters - ---------- - data : bytes - Binary data containing ImagCDF content. - - Returns - ------- - Stream - An ObsPy Stream object with the parsed geomagnetic time series data. - - Raises - ------ - TimeseriesFactoryException - If an error occurs while parsing the ImagCDF data. - """ - # Create a temporary file to store the CDF data - with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file: - tmp_file_name = tmp_file.name - tmp_file.write(data) - - try: - # Read the CDF from the temporary file - cdf = CDFReader(tmp_file_name) - stream = self._read_cdf(cdf) - # no cdf.close() method required - except Exception as e: - raise TimeseriesFactoryException(f"Error parsing ImagCDF data: {e}") - finally: - # Clean up the temporary file - os.remove(tmp_file_name) - - return stream + # Removed - cdflib takes a file path as an input more efficiently than taking in byte data. + # def parse_string(self, data: str, **kwargs): + # """ + # Parse ImagCDF binary data into an ObsPy Stream. + + # This method writes the provided binary data to a temporary file, + # reads the file using `cdflib`, and converts the data into an ObsPy + # Stream. + + # Parameters + # ---------- + # data : bytes + # Binary data containing ImagCDF content. + + # Returns + # ------- + # Stream + # An ObsPy Stream object with the parsed geomagnetic time series data. + + # Raises + # ------ + # TimeseriesFactoryException + # If an error occurs while parsing the ImagCDF data. + # """ + # # Create a temporary file to store the CDF data + # with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file: + # tmp_file_name = tmp_file.name + # tmp_file.write(data) + # channels = kwargs.get('channels', []) + # try: + # # Read the CDF from the temporary file + # cdf = CDFReader(tmp_file_name) + # stream = self._read_cdf(cdf, channels) + # # no cdf.close() method required + # except Exception as e: + # raise TimeseriesFactoryException(f"Error parsing ImagCDF data: {e}") + # finally: + # # Clean up the temporary file + # os.remove(tmp_file_name) + + # return stream def _create_global_attributes( self, timeseries: Stream, channels: List[str] @@ -674,6 +654,7 @@ class ImagCDFFactory(TimeseriesFactory): units = "Celsius" validmin = -273.15 # absolute zero validmax = 79_999 + depend_0 = "DataTimes" #can be used for nonstandard element # elif channel in [REAL_TEMPERATURES]: # units = "Celsius" # fieldnam = f"Temperature {temperature_index} {trace.stats.location}" @@ -753,7 +734,7 @@ class ImagCDFFactory(TimeseriesFactory): # Default to double precision float return CDFWriter.CDF_DOUBLE - def _read_cdf(self, cdf: CDFReader) -> Stream: + def _read_cdf(self, cdf: CDFReader, channels: Optional[List[str]]) -> Stream: """ Read CDF data into an ObsPy Stream. @@ -771,6 +752,37 @@ class ImagCDFFactory(TimeseriesFactory): # Extract global attributes global_attrs = cdf.globalattsget() + # Define required global attributes based on ImagCDF standards + required_global_attrs = [ + "FormatDescription", + "FormatVersion", + "Title", + "IagaCode", + "ElementsRecorded", + "PublicationLevel", + "PublicationDate", + "ObservatoryName", + "Latitude", + "Longitude", + "Elevation", + "Institution", + "StandardLevel", + "Source", + ] + + # Check for missing required global attributes per ImagCDF techincal document + missing_global_attrs = [] + for attr in required_global_attrs: + if attr not in global_attrs: + missing_global_attrs.append(attr) + + if missing_global_attrs: + error_message = ( + f"The following required global attributes are missing from the CDF: " + f"{', '.join(missing_global_attrs)}" + ) + raise TimeseriesFactoryException(error_message) + # Map global attributes to Stream-level metadata observatory = global_attrs.get("IagaCode", [""])[0] station_name = global_attrs.get("ObservatoryName", [""])[0] @@ -791,7 +803,7 @@ class ImagCDFFactory(TimeseriesFactory): # Identify all time variables time_vars = {} for var in cdf.cdf_info().zVariables: - if var.lower().endswith("times"): + if var.endswith("Times"): time_data = cdf.varget(var) unix_times = cdflib.cdfepoch.unixtime(time_data) utc_times = [UTCDateTime(t) for t in unix_times] @@ -799,18 +811,34 @@ class ImagCDFFactory(TimeseriesFactory): # Read data variables and associate them with time variables for var in cdf.cdf_info().zVariables: + # Skip time variables - if var.lower().endswith("times"): + if var.endswith("Times"): continue + # Map the variable name back to a standard channel code + # Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc. + # Temperatures are named like Temperature1, Temperature2, ... + # Extract channel name by removing known prefixes + if var.startswith("GeomagneticField"): + channel = var.replace("GeomagneticField", "") + elif var.startswith("Temperature"): + # Temperature variables may not map directly to a geomagnetic channel + # but to temperature sensors. We can just use the label from LABLAXIS if needed + channel = attrs.get("LABLAXIS", var) + else: + # fallback if naming doesn't match expected patterns + channel = var + + if channels and channel not in channels: continue data = cdf.varget(var) attrs = cdf.varattsget(var) # Determine DEPEND_0 (the time variable name) and validate ts_name = attrs.get("DEPEND_0") - if not ts_name: - # If no DEPEND_0, skip this variable as we cannot map times - continue + # if not ts_name: + # # If no DEPEND_0, skip this variable as we cannot map times + # continue # The ImagCDF can have DataTimes, GeomagneticVectorTimes, GeomagneticScalarTimes, TemperatureNTimes (for N > 0), etc. matched_time_key = None @@ -819,11 +847,12 @@ class ImagCDFFactory(TimeseriesFactory): matched_time_key = tkey break - if matched_time_key not in time_vars: - # If we cannot find the matching time variable, skip this variable - continue - - times = time_vars[matched_time_key] + # if matched_time_key not in time_vars: + # # If we cannot find the matching time variable, skip this variable + # continue + times = [] + if matched_time_key in time_vars: + times = time_vars[matched_time_key] # Determine delta (sample interval) if len(times) > 1: @@ -842,22 +871,33 @@ class ImagCDFFactory(TimeseriesFactory): # fallback, set delta to 60 delta = 60.0 - # Map the variable name back to a standard channel code - # Geomagnetic fields are named like GeomagneticFieldH, GeomagneticFieldD, etc. - # Temperatures are named like Temperature1, Temperature2, ... - # Extract channel name by removing known prefixes - if var.startswith("GeomagneticField"): - channel = var.replace("GeomagneticField", "") - elif var.startswith("Temperature"): - # Temperature variables may not map directly to a geomagnetic channel - # but to temperature sensors. We can just use the label from LABLAXIS if needed - channel = attrs.get("LABLAXIS", var) - else: - # fallback if naming doesn't match expected patterns - channel = var - time_attrs = cdf.varattsget(var) data_interval = time_attrs.get("DATA_INTERVAL_TYPE", [""]) + + # Required variable attributes based on ImagCDF standards + required_variable_attrs = [ + "FIELDNAM", + "UNITS", + "FILLVAL", + "VALIDMIN", + "VALIDMAX", + "DISPLAY_TYPE", + "LABLAXIS", + "DEPEND_0" + ] + # Validate presence of required variable attributes + missing_var_attrs = [] + for var_attr in required_variable_attrs: + if var_attr not in attrs: + missing_var_attrs.append(var_attr) + + if missing_var_attrs or missing_global_attrs: + error_message = ( + f"The variable '{var}' is missing required attributes: " + f"{', '.join(missing_var_attrs)}" + ) + raise TimeseriesFactoryException(error_message) + # Create a trace trace = Trace( data=data, @@ -974,7 +1014,7 @@ class ImagCDFFactory(TimeseriesFactory): base_path = self.urlTemplate[7:] if not base_path or base_path == "{obs}_{dt}_{t}.cdf": base_path = os.getcwd() # Default to current working directory - return os.path.join(base_path, filename) + return os.path.join(base_path, "etc","imagcdf", filename) return os.path.join(self.urlTemplate, filename) # Unsupported URL scheme -- GitLab