diff --git a/geomagio/ImagCDFFactory.py b/geomagio/ImagCDFFactory.py
index 3d8ba136623ca5b852971e830b58c6a4f8f09cf4..ea94790320d45247416ca1b4957fc66b522aec67 100644
--- a/geomagio/ImagCDFFactory.py
+++ b/geomagio/ImagCDFFactory.py
@@ -91,8 +91,8 @@ class ImagCDFFactory(TimeseriesFactory):
     This class extends the TimeseriesFactory to support writing geomagnetic
     time series data to files in the ImagCDF format using the cdflib library.
     """
-
-    isUniqueTimes=True #used to determine depend_0 and CDF Time Variable Name
+
+    isUniqueTimes = True  # used to determine depend_0 and CDF Time Variable Name

     def __init__(
         self,
@@ -129,20 +129,20 @@ class ImagCDFFactory(TimeseriesFactory):
         Note: Parsing from strings is not implemented in this factory.
         """
         raise NotImplementedError('"parse_string" not implemented')
-
+
     def write_file(self, fh, timeseries: Stream, channels: List[str]):
         # Create a temporary file to write the CDF data
-        with tempfile.NamedTemporaryFile(delete=False, suffix='.cdf') as tmp_file:
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".cdf") as tmp_file:
             tmp_file_path = tmp_file.name

         try:
             # Initialize the CDF writer
             cdf_spec = {
-                'Compressed': 9, # Enable compression (0-9)
-                'Majority': CDFWriter.ROW_MAJOR, # Data layout - gets set automatically
-                'Encoding': CDFWriter.IBMPC_ENCODING, # gets set automatically
-                'Checksum': True, # Disable checksum for faster writes (optional)
-                'rDim_sizes': [], # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
+                "Compressed": 9,  # Enable compression (0-9)
+                "Majority": CDFWriter.ROW_MAJOR,  # Data layout - gets set automatically
+                "Encoding": CDFWriter.IBMPC_ENCODING,  # gets set automatically
+                "Checksum": True,  # Disable checksum for faster writes (optional)
+                "rDim_sizes": [],  # Applicable only if using rVariables - CDF protocol recommends only using zVariables.
             }

             cdf_writer = CDFWriter(path=tmp_file_path, cdf_spec=cdf_spec, delete=True)
@@ -152,53 +152,59 @@ class ImagCDFFactory(TimeseriesFactory):
             cdf_writer.write_globalattrs(global_attrs)

             # Time variables
-            time_vars = self._create_time_stamp_variables(timeseries) #modifies self.isUniqueTimes
+            time_vars = self._create_time_stamp_variables(
+                timeseries
+            )  # modifies self.isUniqueTimes
             for ts_name, ts_data in time_vars.items():
                 # Define time variable specification
                 var_spec = {
-                    'Variable': ts_name,
-                    'Data_Type': CDFWriter.CDF_TIME_TT2000, # CDF_TIME_TT2000
-                    'Num_Elements': 1,
-                    'Rec_Vary': True,
-                    'Var_Type': 'zVariable',
-                    'Dim_Sizes': [],
-                    'Sparse': 'no_sparse',
-                    'Compress': 9,
-                    'Pad': None,
+                    "Variable": ts_name,
+                    "Data_Type": CDFWriter.CDF_TIME_TT2000,  # CDF_TIME_TT2000
+                    "Num_Elements": 1,
+                    "Rec_Vary": True,
+                    "Var_Type": "zVariable",
+                    "Dim_Sizes": [],
+                    "Sparse": "no_sparse",
+                    "Compress": 9,
+                    "Pad": None,
                 }
                 # Define time variable attributes
                 var_attrs = self._create_time_var_attrs(ts_name)

                 # Write time variable
                 cdf_writer.write_var(var_spec, var_attrs, ts_data)
-
-
+
             # Data variables
             temperature_index = 0
             for trace in timeseries:
                 channel = trace.stats.channel
                 if channel in TEMPERATURE_ELEMENTS_ID:
-                    temperature_index += 1 #MUST INCREMENT INDEX BEFORE USING
+                    temperature_index += 1  # MUST INCREMENT INDEX BEFORE USING
                     var_name = f"Temperature{temperature_index}"
                 else:
                     var_name = f"GeomagneticField{channel}"
                 data_type = self._get_cdf_data_type(trace)
                 num_elements = 1
-                if data_type in [CDFWriter.CDF_CHAR, CDFWriter.CDF_UCHAR]: # Handle string types
+                if data_type in [
+                    CDFWriter.CDF_CHAR,
+                    CDFWriter.CDF_UCHAR,
+                ]:  # Handle string types
                     num_elements = len(trace.data[0]) if len(trace.data) > 0 else 1
-
+
                 var_spec = {
-                    'Variable': var_name,
-                    'Data_Type': data_type,
-                    'Num_Elements': num_elements,
-                    'Rec_Vary': True,
-                    'Var_Type': 'zVariable',
-                    'Dim_Sizes': [],
-                    'Sparse': 'no_sparse',
-                    'Compress': 9,
-                    'Pad': None,
+                    "Variable": var_name,
+                    "Data_Type": data_type,
+                    "Num_Elements": num_elements,
+                    "Rec_Vary": True,
+                    "Var_Type": "zVariable",
+                    "Dim_Sizes": [],
+                    "Sparse": "no_sparse",
+                    "Compress": 9,
+                    "Pad": None,
                 }
-                var_attrs = self._create_var_attrs(trace, temperature_index, self.isUniqueTimes)
+                var_attrs = self._create_var_attrs(
+                    trace, temperature_index, self.isUniqueTimes
+                )

                 # Write data variable
                 cdf_writer.write_var(var_spec, var_attrs, trace.data)
@@ -271,7 +277,7 @@ class ImagCDFFactory(TimeseriesFactory):
             )

             # Handle 'stdout' output
-            if url == 'stdout':
+            if url == "stdout":
                 # Write directly to stdout
                 fh = sys.stdout.buffer
                 url_data = timeseries.slice(
@@ -282,7 +288,7 @@ class ImagCDFFactory(TimeseriesFactory):
                 continue  # Proceed to next interval if any

             # Handle 'file://' output
-            elif url.startswith('file://'):
+            elif url.startswith("file://"):
                 # Get the file path from the URL
                 url_file = Util.get_file_from_url(url, createParentDirectory=False)
                 url_data = timeseries.slice(
@@ -307,11 +313,16 @@ class ImagCDFFactory(TimeseriesFactory):
                                 channel=trace.stats.channel,
                             )
                             if new_trace:
-                                trace.data = np.concatenate((trace.data, new_trace[0].data))
+                                trace.data = np.concatenate(
+                                    (trace.data, new_trace[0].data)
+                                )
                         url_data = existing_data + url_data
                     except Exception as e:
                         # Log the exception if needed
-                        print(f"Warning: Could not read existing CDF file '{url_file}': {e}", file=sys.stderr)
+                        print(
+                            f"Warning: Could not read existing CDF file '{url_file}': {e}",
+                            file=sys.stderr,
+                        )
                         # Proceed with new data

                 # Pad the data with NaNs to ensure it fits the interval
@@ -329,7 +340,9 @@ class ImagCDFFactory(TimeseriesFactory):

             else:
                 # Unsupported URL scheme encountered
-                raise TimeseriesFactoryException("Unsupported URL scheme in urlTemplate")
+                raise TimeseriesFactoryException(
+                    "Unsupported URL scheme in urlTemplate"
+                )

     def get_timeseries(
         self,
@@ -359,10 +372,12 @@ class ImagCDFFactory(TimeseriesFactory):
                 interval=interval,
                 channels=channels,
            )
-            if url == 'stdout':
+            if url == "stdout":
                 continue  # stdout is not a valid input source
             if not url.startswith("file://"):
-                raise TimeseriesFactoryException("Only file urls are supported for reading ImagCDF")
+                raise TimeseriesFactoryException(
+                    "Only file urls are supported for reading ImagCDF"
+                )

             url_file = Util.get_file_from_url(url, createParentDirectory=False)
             if not os.path.isfile(url_file):
@@ -409,7 +424,9 @@ class ImagCDFFactory(TimeseriesFactory):
         timeseries.sort()
         return timeseries

-    def _create_global_attributes(self, timeseries: Stream, channels: List[str]) -> dict:
+    def _create_global_attributes(
+        self, timeseries: Stream, channels: List[str]
+    ) -> dict:
         """
         Create a dictionary of global attributes for the ImagCDF file.

@@ -423,45 +440,58 @@ class ImagCDFFactory(TimeseriesFactory):
         stats = timeseries[0].stats if len(timeseries) > 0 else None

         # Extract metadata from stats or fallback to defaults
-        observatory_name = getattr(stats, 'station_name', None) or self.observatory or ""
-        station = getattr(stats, 'station', None) or ""
-        institution = getattr(stats, 'agency_name', None) or ""
-        latitude = getattr(stats, 'geodetic_latitude', None) or 0.0
-        longitude = getattr(stats, 'geodetic_longitude', None) or 0.0
-        elevation = getattr(stats, 'elevation', None) or 99_999.0
-        conditions_of_use = getattr(stats, 'conditions_of_use', None) or ""
-        vector_orientation = getattr(stats, 'sensor_orientation', None) or ""
-        data_interval_type = getattr(stats, 'data_interval_type', None) or self.interval
-        data_type = getattr(stats, 'data_type', None) or "variation"
-        sensor_sampling_rate = getattr(stats, 'sensor_sampling_rate', None) or 0.0
-        comments = getattr(stats, 'filter_comments', None) or ['']
-        declination_base = getattr(stats, 'declination_base', None) or 0.0
+        observatory_name = (
+            getattr(stats, "station_name", None) or self.observatory or ""
+        )
+        station = getattr(stats, "station", None) or ""
+        institution = getattr(stats, "agency_name", None) or ""
+        latitude = getattr(stats, "geodetic_latitude", None) or 0.0
+        longitude = getattr(stats, "geodetic_longitude", None) or 0.0
+        elevation = getattr(stats, "elevation", None) or 99_999.0
+        conditions_of_use = getattr(stats, "conditions_of_use", None) or ""
+        vector_orientation = getattr(stats, "sensor_orientation", None) or ""
+        data_interval_type = getattr(stats, "data_interval_type", None) or self.interval
+        data_type = getattr(stats, "data_type", None) or "variation"
+        sensor_sampling_rate = getattr(stats, "sensor_sampling_rate", None) or 0.0
+        comments = getattr(stats, "filter_comments", None) or [""]
+        declination_base = getattr(stats, "declination_base", None) or 0.0
         publication_level = IMCDFPublicationLevel(data_type=self.type).to_string()
         global_attrs = {
-            'FormatDescription': {0: 'INTERMAGNET CDF Format'},
-            'FormatVersion': {0: '1.2'},
-            'Title': {0: 'Geomagnetic time series data'},
-            'IagaCode': {0: station},
-            'ElementsRecorded': {0: ''.join(channels)},
-            'PublicationLevel': {0: publication_level},
-            'PublicationDate': {0: [cdflib.cdfepoch.timestamp_to_tt2000(datetime.timestamp(datetime.now(timezone.utc))), "cdf_time_tt2000"]},
-            'ObservatoryName': {0: observatory_name},
-            'Latitude': {0: np.array([latitude], dtype=np.float64)},
-            'Longitude': {0: np.array([longitude], dtype=np.float64)},
-            'Elevation': {0: np.array([elevation], dtype=np.float64)},
-            'Institution': {0: institution},
-            'VectorSensOrient': {0: vector_orientation}, #remove F - because its a calculation, not an element?
-            'StandardLevel': {0: 'None'}, # Set to 'None'
+            "FormatDescription": {0: "INTERMAGNET CDF Format"},
+            "FormatVersion": {0: "1.2"},
+            "Title": {0: "Geomagnetic time series data"},
+            "IagaCode": {0: station},
+            "ElementsRecorded": {0: "".join(channels)},
+            "PublicationLevel": {0: publication_level},
+            "PublicationDate": {
+                0: [
+                    cdflib.cdfepoch.timestamp_to_tt2000(
+                        datetime.timestamp(datetime.now(timezone.utc))
+                    ),
+                    "cdf_time_tt2000",
+                ]
+            },
+            "ObservatoryName": {0: observatory_name},
+            "Latitude": {0: np.array([latitude], dtype=np.float64)},
+            "Longitude": {0: np.array([longitude], dtype=np.float64)},
+            "Elevation": {0: np.array([elevation], dtype=np.float64)},
+            "Institution": {0: institution},
+            "VectorSensOrient": {
+                0: vector_orientation
+            },  # remove F - because it's a calculation, not an element?
+            "StandardLevel": {0: "None"},  # Set to 'None'
             # Temporarily Omit 'StandardName', 'StandardVersion', 'PartialStandDesc'
-            'Source': {0: 'institute'}, # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
-            'TermsOfUse': {0: conditions_of_use},
+            "Source": {
+                0: "institute"
+            },  # "institute" - if the named institution provided the data, “INTERMAGNET” - if the data file has been created by INTERMAGNET from another data source, “WDC” - if the World Data Centre has created the file from another data source
+            "TermsOfUse": {0: conditions_of_use},
             # 'UniqueIdentifier': {0: ''},
             # 'ParentIdentifiers': {0: ''},
-            # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
-            'SensorSamplingRate': {0: sensor_sampling_rate}, #Optional
-            'DataType': {0: data_type},#Optional
-            'Comments': {0: comments}, #Optional
-            'DeclinationBase': {0: declination_base}, #Optional
+            # 'ReferenceLinks': {0: ''}, #links to /ws, plots, USGS.gov
+            "SensorSamplingRate": {0: sensor_sampling_rate},  # Optional
+            "DataType": {0: data_type},  # Optional
+            "Comments": {0: comments},  # Optional
+            "DeclinationBase": {0: declination_base},  # Optional
         }
         return global_attrs

@@ -474,24 +504,32 @@ class ImagCDFFactory(TimeseriesFactory):
         for trace in timeseries:
             channel = trace.stats.channel
             times = [
-                (trace.stats.starttime + trace.stats.delta * i).datetime.replace(tzinfo=timezone.utc)
+                (trace.stats.starttime + trace.stats.delta * i).datetime.replace(
+                    tzinfo=timezone.utc
+                )
                 for i in range(trace.stats.npts)
             ]
             # Convert timestamps to TT2000 format required by CDF
-            tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000([time.timestamp() for time in times])
+            tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(
+                [time.timestamp() for time in times]
+            )

             if channel in self._get_vector_elements():
                 if vector_times is None:
                     vector_times = tt2000_times
                 else:
                     if not np.array_equal(vector_times, tt2000_times):
-                        raise ValueError("Time stamps for vector channels are not the same.")
+                        raise ValueError(
+                            "Time stamps for vector channels are not the same."
+                        )
             elif channel in self._get_scalar_elements():
                 if scalar_times is None:
                     scalar_times = tt2000_times
                 else:
                     if not np.array_equal(scalar_times, tt2000_times):
-                        raise ValueError("Time stamps for scalar channels are not the same.")
+                        raise ValueError(
+                            "Time stamps for scalar channels are not the same."
+                        )
             elif channel in TEMPERATURE_ELEMENTS_ID:
                 ts_key = f"Temperature{temperature_index}Times"
                 if ts_key not in temperature_times:
@@ -502,21 +540,22 @@ class ImagCDFFactory(TimeseriesFactory):

         time_vars = {}
         if vector_times is not None:
-            time_vars['GeomagneticVectorTimes'] = vector_times
+            time_vars["GeomagneticVectorTimes"] = vector_times
         if scalar_times is not None:
-            time_vars['GeomagneticScalarTimes'] = scalar_times
+            time_vars["GeomagneticScalarTimes"] = scalar_times
         if temperature_times:
             time_vars.update(temperature_times)
-
+
         last_times = []
-        self.isUniqueTimes = len(time_vars) == 1 #true if only one set of times, else default to false.
+        self.isUniqueTimes = (
+            len(time_vars) == 1
+        )  # true if only one set of times, else default to false.
         for index, times in enumerate(time_vars.values()):
             if index > 0:
                 self.isUniqueTimes = not np.array_equal(last_times, times)
             last_times = times
-
-        return time_vars if self.isUniqueTimes else {"DataTimes": last_times}
+        return time_vars if self.isUniqueTimes else {"DataTimes": last_times}

     def _create_var_spec(
         self,
@@ -549,63 +588,69 @@ class ImagCDFFactory(TimeseriesFactory):
         - CDF User's Guide: Variable Specification
         """
         var_spec = {
-            'Variable': var_name,
-            'Data_Type': data_type,
-            'Num_Elements': num_elements,
-            'Rec_Vary': True,
-            'Var_Type': var_type,
-            'Dim_Sizes': dim_sizes,
-            'Sparse': 'no_sparse' if not sparse else 'pad_sparse',
-            'Compress': compress,
-            'Pad': pad,
+            "Variable": var_name,
+            "Data_Type": data_type,
+            "Num_Elements": num_elements,
+            "Rec_Vary": True,
+            "Var_Type": var_type,
+            "Dim_Sizes": dim_sizes,
+            "Sparse": "no_sparse" if not sparse else "pad_sparse",
+            "Compress": compress,
+            "Pad": pad,
         }
         return var_spec

-    def _create_var_attrs(self, trace: Trace, temperature_index: Optional[int] = None, isUniqueTimes: Optional[bool] = True) -> dict:
+    def _create_var_attrs(
+        self,
+        trace: Trace,
+        temperature_index: Optional[int] = None,
+        isUniqueTimes: Optional[bool] = True,
+    ) -> dict:
         channel = trace.stats.channel.upper()
-        fieldnam = f"Geomagnetic Field Element {channel}" # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
-        units = '' # Must be one of “nT”, “Degrees of arc” or “Celsius”
-        if channel == 'D':
-            units = 'Degrees of arc'
-            validmin = -360.0
-            validmax = 360.0 # A full circle representation
-        elif channel == 'I':
-            units = 'Degrees of arc'
-            validmin = -90.0
-            validmax = 90.0 #The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
+        fieldnam = f"Geomagnetic Field Element {channel}"  # “Geomagnetic Field Element ” + the element code or “Temperature ” + the name of the location where the temperature was recorded.
+        units = ""  # Must be one of “nT”, “Degrees of arc” or “Celsius”
+        if channel == "D":
+            units = "Degrees of arc"
+            validmin = -360.0
+            validmax = 360.0  # A full circle representation
+        elif channel == "I":
+            units = "Degrees of arc"
+            validmin = -90.0
+            validmax = 90.0  # The magnetic field vector can point straight down (+90°), horizontal (0°), or straight up (-90°).
         elif channel in TEMPERATURE_ELEMENTS_ID:
-            units = 'Celsius'
+            units = "Celsius"
             fieldnam = f"Temperature {temperature_index} {trace.stats.location}"
-            validmin = -273.15 #absolute zero
+            validmin = -273.15  # absolute zero
             validmax = 79_999
-        elif channel in ['F','S']:
-            units = 'nT'
-            validmin = 0.0 # negative magnetic field intestity not physically meaningful.
+        elif channel in ["F", "S"]:
+            units = "nT"
+            validmin = (
+                0.0  # negative magnetic field intensity not physically meaningful.
+            )
             validmax = 79_999.0
-        elif channel in ['X', 'Y', 'Z', 'H', 'E', 'V', 'G']:
-            units = 'nT'
+        elif channel in ["X", "Y", "Z", "H", "E", "V", "G"]:
+            units = "nT"
             validmin = -79_999.0
             validmax = 79_999.0

         # Determine DEPEND_0 based on channel type
         if channel in self._get_vector_elements():
-            depend_0 = 'GeomagneticVectorTimes'
+            depend_0 = "GeomagneticVectorTimes"
         elif channel in self._get_scalar_elements():
-            depend_0 = 'GeomagneticScalarTimes'
+            depend_0 = "GeomagneticScalarTimes"
         elif channel in TEMPERATURE_ELEMENTS_ID:
             depend_0 = f"Temperature{temperature_index}Times"
-
         var_attrs = {
-            'FIELDNAM': fieldnam,
-            'UNITS': units,
-            'FILLVAL': 99999.0,
-            'VALIDMIN': validmin,
-            'VALIDMAX': validmax,
-            'DEPEND_0': depend_0 if isUniqueTimes else "DataTimes",
-            'DISPLAY_TYPE': 'time_series',
-            'LABLAXIS': channel,
-            'DATA_INTERVAL_TYPE': trace.stats.data_interval_type
+            "FIELDNAM": fieldnam,
+            "UNITS": units,
+            "FILLVAL": 99999.0,
+            "VALIDMIN": validmin,
+            "VALIDMAX": validmax,
+            "DEPEND_0": depend_0 if isUniqueTimes else "DataTimes",
+            "DISPLAY_TYPE": "time_series",
+            "LABLAXIS": channel,
+            "DATA_INTERVAL_TYPE": trace.stats.data_interval_type,
         }
         return var_attrs

@@ -619,9 +664,9 @@ class ImagCDFFactory(TimeseriesFactory):
         - ImagCDF Technical Documentation: ImagCDF Data
         """
         # var_attrs = {
-        #     'UNITS': 'TT2000',
-        #     'DISPLAY_TYPE': 'time_series',
-        #     'LABLAXIS': 'Time',
+        #     'UNITS': 'TT2000',
+        #     'DISPLAY_TYPE': 'time_series',
+        #     'LABLAXIS': 'Time',
         # }
         # return var_attrs
         return {}
@@ -666,26 +711,28 @@ class ImagCDFFactory(TimeseriesFactory):

         # Extract global attributes
         global_attrs = cdf.globalattsget()
-
+
         # Map global attributes to Stream-level metadata
-        observatory = global_attrs.get('IagaCode', [''])[0]
-        station_name = global_attrs.get('ObservatoryName', [''])[0]
-        institution = global_attrs.get('Institution', [''])[0]
-        latitude = global_attrs.get('Latitude', [0.0])[0]
-        longitude = global_attrs.get('Longitude', [0.0])[0]
-        elevation = global_attrs.get('Elevation', [99_999.0])[0] #default to 99_999 per technical documents.
-        sensor_sampling_rate = global_attrs.get('SensorSamplingRate', [0.0])[0]
-        sensor_orientation = global_attrs.get('VectorSensOrient', [''])[0]
-        data_type = global_attrs.get('DataType', ['variation'])[0]
-        publication_level = global_attrs.get('PublicationLevel', ['1'])[0]
-        comments = global_attrs.get('Comments', [''])
-        terms_of_use = global_attrs.get('TermsOfUse', [''])[0]
-        declination_base = global_attrs.get('DeclinationBase', [0.0])[0]
+        observatory = global_attrs.get("IagaCode", [""])[0]
+        station_name = global_attrs.get("ObservatoryName", [""])[0]
+        institution = global_attrs.get("Institution", [""])[0]
+        latitude = global_attrs.get("Latitude", [0.0])[0]
+        longitude = global_attrs.get("Longitude", [0.0])[0]
+        elevation = global_attrs.get("Elevation", [99_999.0])[
+            0
+        ]  # default to 99_999 per technical documents.
+        sensor_sampling_rate = global_attrs.get("SensorSamplingRate", [0.0])[0]
+        sensor_orientation = global_attrs.get("VectorSensOrient", [""])[0]
+        data_type = global_attrs.get("DataType", ["variation"])[0]
+        publication_level = global_attrs.get("PublicationLevel", ["1"])[0]
+        comments = global_attrs.get("Comments", [""])
+        terms_of_use = global_attrs.get("TermsOfUse", [""])[0]
+        declination_base = global_attrs.get("DeclinationBase", [0.0])[0]

         # Identify all time variables
         time_vars = {}
         for var in cdf.cdf_info().zVariables:
-            if var.lower().endswith('times'):
+            if var.lower().endswith("times"):
                 time_data = cdf.varget(var)
                 unix_times = cdflib.cdfepoch.unixtime(time_data)
                 utc_times = [UTCDateTime(t) for t in unix_times]
@@ -694,14 +741,14 @@ class ImagCDFFactory(TimeseriesFactory):
         # Read data variables and associate them with time variables
         for var in cdf.cdf_info().zVariables:
             # Skip time variables
-            if var.lower().endswith('times'):
+            if var.lower().endswith("times"):
                 continue

             data = cdf.varget(var)
             attrs = cdf.varattsget(var)

             # Determine DEPEND_0 (the time variable name) and validate
-            ts_name = attrs.get('DEPEND_0')
+            ts_name = attrs.get("DEPEND_0")
             if not ts_name:
                 # If no DEPEND_0, skip this variable as we cannot map times
                 continue
@@ -728,15 +775,15 @@ class ImagCDFFactory(TimeseriesFactory):
             # Determine delta (sample interval)
             if len(times) > 1:
                 # delta as a float of seconds
-                delta = (times[1].timestamp - times[0].timestamp)
+                delta = times[1].timestamp - times[0].timestamp
             else:
                 # if only one sample, use default based on interval
                 # convert minute, second, etc. to numeric delta
-                if self.interval == 'minute':
+                if self.interval == "minute":
                     delta = 60.0
-                elif self.interval == 'second':
+                elif self.interval == "second":
                     delta = 1.0
-                elif self.interval == 'hour':
+                elif self.interval == "hour":
                     delta = 3600.0
                 else:
                     # fallback, set delta to 60
@@ -751,34 +798,34 @@ class ImagCDFFactory(TimeseriesFactory):
             elif var.startswith("Temperature"):
                 # Temperature variables may not map directly to a geomagnetic channel
                 # but to temperature sensors. We can just use the label from LABLAXIS if needed
-                channel = attrs.get('LABLAXIS', var)
+                channel = attrs.get("LABLAXIS", var)
             else:
                 # fallback if naming doesn't match expected patterns
                 channel = var

-            time_attrs =cdf.varattsget(var)
-            data_interval = time_attrs.get('DATA_INTERVAL_TYPE', [''])
+            time_attrs = cdf.varattsget(var)
+            data_interval = time_attrs.get("DATA_INTERVAL_TYPE", [""])

             # Create a trace
             trace = Trace(
                 data=data,
                 header={
-                    'station': observatory,
-                    'channel': channel,
-                    'starttime': times[0],
-                    'delta': delta,
-                    'geodetic_latitude': latitude,
-                    'geodetic_longitude': longitude,
-                    'elevation': elevation,
-                    'sensor_orientation': "".join(sensor_orientation),
-                    'data_type': data_type,
-                    'station_name': station_name,
-                    'agency_name': institution,
-                    'conditions_of_use': terms_of_use,
-                    'sensor_sampling_rate': sensor_sampling_rate,
-                    'data_interval_type': data_interval,
-                    'declination_base': declination_base,
-                    'filter_comments': comments
-                }
+                    "station": observatory,
+                    "channel": channel,
+                    "starttime": times[0],
+                    "delta": delta,
+                    "geodetic_latitude": latitude,
+                    "geodetic_longitude": longitude,
+                    "elevation": elevation,
+                    "sensor_orientation": "".join(sensor_orientation),
+                    "data_type": data_type,
+                    "station_name": station_name,
+                    "agency_name": institution,
+                    "conditions_of_use": terms_of_use,
+                    "sensor_sampling_rate": sensor_sampling_rate,
+                    "data_interval_type": data_interval,
+                    "declination_base": declination_base,
+                    "filter_comments": comments,
+                },
             )
             stream += trace
@@ -797,8 +844,8 @@ class ImagCDFFactory(TimeseriesFactory):

         This method constructs the filename based on the ImagCDF naming
         conventions, which include the observatory code, date-time formatted
-        according to the data interval, and the publication level.
-
+        according to the data interval, and the publication level.
+
         [iaga-code]_[date-time]_[publication-level].cdf

         Parameters:
@@ -831,7 +878,9 @@ class ImagCDFFactory(TimeseriesFactory):
         elif interval == "second":
             date_format = date.strftime("%Y%m%d_%H%M%S")
         else:
-            raise ValueError(f"Unsupported interval: {interval}") #tenhertz currently not supported
+            raise ValueError(
+                f"Unsupported interval: {interval}"
+            )  # tenhertz currently not supported

         # Filename following ImagCDF convention, see reference: https://tech-man.intermagnet.org/latest/appendices/dataformats.html#imagcdf-file-names
         filename = f"{observatory.lower()}_{date_format}_{publication_level}.cdf"
@@ -859,11 +908,13 @@ class ImagCDFFactory(TimeseriesFactory):
         }

         # Attempt to use the template provided in urlTemplate
-        if "{" in self.urlTemplate and "}" in self.urlTemplate:
+        if "{" in self.urlTemplate and "}" in self.urlTemplate:
             try:
                 return self.urlTemplate.format(**params)
             except KeyError as e:
-                raise TimeseriesFactoryException(f"Invalid placeholder in urlTemplate: {e}")
+                raise TimeseriesFactoryException(
+                    f"Invalid placeholder in urlTemplate: {e}"
+                )

         # If the urlTemplate doesn't support placeholders, assume 'file://' scheme
         if self.urlTemplate.startswith("file://"):
@@ -878,9 +929,7 @@ class ImagCDFFactory(TimeseriesFactory):
         )

     def _get_vector_elements(self):
-        return {'X', 'Y', 'Z', 'H', 'D', 'E', 'V', 'I', 'F'}
-
-    def _get_scalar_elements(self):
-        return {'S', 'G'}
-
-
+        return {"X", "Y", "Z", "H", "D", "E", "V", "I", "F"}
+
+    def _get_scalar_elements(self):
+        return {"S", "G"}
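Aside from the black-style reformatting, the functional core of `write_file` is unchanged: build TT2000 time stamps, write them as a time zVariable, then write each channel as a zVariable whose `DEPEND_0` attribute points back at that time variable. The sketch below walks through the same flow with cdflib directly, outside the factory. It is illustrative only: it assumes `cdflib` is installed, that `CDFWriter` is `cdflib.cdfwrite.CDF` (the alias this module imports), and that `CDF_DOUBLE` is one of the writer's numeric type constants; the file name, station code, and sample values are invented.

```python
# Illustrative sketch only (not part of the diff): the ImagCDF write flow that
# write_file() implements, reduced to a stand-alone script. Assumptions: cdflib
# is installed; CDFWriter is cdflib.cdfwrite.CDF (the alias used by this module);
# CDF_DOUBLE is one of its numeric type constants. File name, station code, and
# sample values are invented for the example.
from datetime import datetime, timedelta, timezone

import cdflib
import numpy as np
from cdflib.cdfwrite import CDF as CDFWriter

# One-minute cadence, three samples, converted to the TT2000 stamps CDF expects.
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
timestamps = [(start + timedelta(minutes=i)).timestamp() for i in range(3)]
tt2000_times = cdflib.cdfepoch.timestamp_to_tt2000(timestamps)
h_values = np.array([20840.1, 20840.3, 20839.9])  # fake H component values, in nT

cdf_writer = CDFWriter(
    path="example_imagcdf.cdf", cdf_spec={"Compressed": 9}, delete=True
)
cdf_writer.write_globalattrs(
    {
        "FormatDescription": {0: "INTERMAGNET CDF Format"},
        "FormatVersion": {0: "1.2"},
        "IagaCode": {0: "BOU"},
    }
)

# Time variable: one TT2000 stamp per record (attributes left empty, as
# _create_time_var_attrs does).
cdf_writer.write_var(
    {
        "Variable": "GeomagneticVectorTimes",
        "Data_Type": CDFWriter.CDF_TIME_TT2000,
        "Num_Elements": 1,
        "Rec_Vary": True,
        "Var_Type": "zVariable",
        "Dim_Sizes": [],
        "Sparse": "no_sparse",
        "Compress": 9,
        "Pad": None,
    },
    {},
    tt2000_times,
)

# Data variable: DEPEND_0 ties each record to the time variable written above,
# which is the contract _create_var_attrs encodes for every channel.
cdf_writer.write_var(
    {
        "Variable": "GeomagneticFieldH",
        "Data_Type": CDFWriter.CDF_DOUBLE,
        "Num_Elements": 1,
        "Rec_Vary": True,
        "Var_Type": "zVariable",
        "Dim_Sizes": [],
        "Sparse": "no_sparse",
        "Compress": 9,
        "Pad": None,
    },
    {
        "FIELDNAM": "Geomagnetic Field Element H",
        "UNITS": "nT",
        "FILLVAL": 99999.0,
        "VALIDMIN": -79_999.0,
        "VALIDMAX": 79_999.0,
        "DEPEND_0": "GeomagneticVectorTimes",
        "DISPLAY_TYPE": "time_series",
        "LABLAXIS": "H",
    },
    h_values,
)
cdf_writer.close()
```

On the read side, `_read_cdf` reverses the same contract: every zVariable whose name ends in `Times` is treated as a clock, and each data variable's `DEPEND_0` attribute names the clock whose TT2000 stamps are converted back with `cdflib.cdfepoch.unixtime`.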