"""Factory for reading/writing geomagnetic timeseries as CovJSON.

Reconstructed from patch 2fbc6693 ("poc for CovJSON read/write").  The
accompanying Controller.py hunks (not part of this module) register
"covjson" as an ``--input`` and ``--output`` format and construct this
factory from the shared factory arguments.
"""
import json
import math
from collections import OrderedDict

import numpy as np
from obspy import Stream, Trace, UTCDateTime

from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import ELEMENT_INDEX


class CovJSONFactory(TimeseriesFactory):
    """Factory for reading/writing CovJSON format Geomagnetic Data."""

    def __init__(
        self,
        time_format="iso8601",  # "iso8601" or "numeric" (epoch milliseconds)
        time_axis_mode="expanded",  # "expanded" or "start-stop-num"
        **kwargs,
    ):
        """
        Parameters
        ----------
        time_format : {"iso8601", "numeric"}
            - "iso8601": each time value is an ISO8601 string
            - "numeric": each time value is numeric (epoch milliseconds)
        time_axis_mode : {"expanded", "start-stop-num"}
            - "expanded": store domain axes as a list of all time values
            - "start-stop-num": store domain axes as start, stop, and num
        """
        super().__init__(**kwargs)
        self.time_format = time_format
        self.time_axis_mode = time_axis_mode

    @staticmethod
    def _parse_time(value):
        """Parse one CovJSON time value.

        Numeric values are interpreted as epoch milliseconds; strings are
        interpreted as ISO8601 (parsed by UTCDateTime).
        """
        if isinstance(value, (int, float)):
            return UTCDateTime(value / 1000.0)
        return UTCDateTime(value)

    @classmethod
    def _parse_time_axis(cls, t_axis):
        """Return a list of UTCDateTime for a CovJSON "t" axis.

        Supports both the "expanded" form (``values``) and the
        "start-stop-num" form (uniform sampling between start and stop).
        Returns an empty list if the axis is unrecognized.
        """
        if "values" in t_axis:
            return [cls._parse_time(value) for value in t_axis["values"]]
        if {"start", "stop", "num"}.issubset(t_axis):
            start = cls._parse_time(t_axis["start"])
            stop = cls._parse_time(t_axis["stop"])
            num = t_axis["num"]
            if num <= 1:
                # trivial case: single sample
                return [start]
            # uniform spacing from start to stop, inclusive
            step = (stop.timestamp - start.timestamp) / (num - 1)
            return [UTCDateTime(start.timestamp + i * step) for i in range(num)]
        return []

    def parse_string(self, data: str, **kwargs):
        """
        Parse a CovJSON string into an ObsPy Stream.

        Supports both "expanded" (t->values) and "start-stop-num" time axes
        (see ``_parse_time_axis``).  Station/location metadata is read from
        the domain axes and the non-standard "geomag:info" extension block.

        Returns an empty Stream if no valid time axis can be found.
        """
        covjson = json.loads(data)
        domain = covjson.get("domain", {})
        ranges = covjson.get("ranges", {})
        info = covjson.get("geomag:info", {})

        # domain->axes->(x, y, z, t)
        axes = domain.get("axes", {})
        x_axis = axes.get("x", {}).get("values", [])
        y_axis = axes.get("y", {}).get("values", [])
        z_axis = axes.get("z", {}).get("values", [])

        times = self._parse_time_axis(axes.get("t", {}))
        if not times:
            # no recognized/non-empty time axis => return empty stream
            return Stream()

        # position: prefer domain axes, fall back to geomag:info
        lon = float(x_axis[0] if x_axis else info.get("longitude", 0.0))
        lat = float(y_axis[0] if y_axis else info.get("latitude", 0.0))
        alt = float(z_axis[0] if z_axis else info.get("altitude", 0.0))

        station = info.get("iaga_code", "")

        # sampling rate derived from the first interval only;
        # assumes a uniform time axis — TODO confirm for "expanded" inputs
        if len(times) > 1:
            dt = times[1].timestamp - times[0].timestamp
            sampling_rate = 1.0 / dt if dt != 0 else 1.0
        else:
            sampling_rate = 1.0

        stream = Stream()
        # one trace per CovJSON range (channel)
        for channel_name, channel_range in ranges.items():
            values = channel_range.get("values", [])
            if not values:
                continue
            data_array = np.array(values, dtype=float)
            header = {
                "network": "",
                "station": station,
                "channel": channel_name,
                "starttime": times[0],
                "npts": len(data_array),
                "sampling_rate": sampling_rate,
                # additional geomag metadata
                "geodetic_longitude": lon,
                "geodetic_latitude": lat,
                "elevation": alt,
                "station_name": info.get("station_name", station),
                "data_type": info.get("data_type", ""),
                "data_interval_type": info.get("data_interval_type", ""),
            }
            stream += Trace(data=data_array, header=header)
        return stream

    def _encode_time(self, time):
        """Encode one UTCDateTime per ``self.time_format``."""
        if self.time_format == "numeric":
            # epoch milliseconds
            return int(time.timestamp * 1000)
        # default iso8601
        return time.isoformat()

    def _build_time_axis(self, starttime, delta, npts):
        """Build the CovJSON "t" axis dict per ``self.time_axis_mode``."""
        if self.time_axis_mode == "expanded":
            return {
                "values": [
                    self._encode_time(starttime + i * delta) for i in range(npts)
                ]
            }
        # "start-stop-num" approach
        return {
            "start": self._encode_time(starttime),
            "stop": self._encode_time(starttime + (npts - 1) * delta),
            "num": npts,
        }

    @staticmethod
    def _parameter_metadata(channel):
        """Build CovJSON Parameter metadata for one channel.

        Uses ELEMENT_INDEX when the channel is a known geomagnetic element;
        otherwise falls back to a generic nT parameter.
        """
        element = ELEMENT_INDEX.get(channel)
        if element:
            description = element.name
            unit = element.units
            label = element.abbreviation or channel
        else:
            description = f"Data for channel {channel}"
            unit = "nT"
            label = channel
        return {
            "type": "Parameter",
            "description": {"en": description},
            "unit": {
                "label": {"en": unit},
                "symbol": {
                    "type": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                    "value": unit,
                },
            },
            "observedProperty": {
                "id": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                "label": {"en": label},
            },
        }

    def write_file(self, fh, timeseries: Stream, channels):
        """
        Write a CovJSON "Coverage" for the given timeseries.

        Parameters
        ----------
        fh : file-like
            binary file handle; receives UTF-8 encoded JSON.
        timeseries : obspy.Stream
            traces to serialize; merged, filtered to ``channels``, and
            sorted by starttime.  An empty stream writes ``{}``.
        channels : list of str or None
            channels to include; None/empty includes all traces.
        """
        if not timeseries or len(timeseries) == 0:
            fh.write(json.dumps({}, indent=2).encode("utf-8"))
            return

        timeseries.merge()

        # filter to requested channels
        if channels:
            selected = Stream()
            for channel in channels:
                selected += timeseries.select(channel=channel)
            timeseries = selected

        timeseries.sort(keys=["starttime"])
        stats = timeseries[0].stats

        station = stats.station or ""
        lat = float(getattr(stats, "geodetic_latitude", 0.0))
        lon = float(getattr(stats, "geodetic_longitude", 0.0))
        alt = float(getattr(stats, "elevation", 0.0))

        # domain is taken from the first (earliest) trace
        npts = stats.npts
        delta = stats.delta
        starttime = stats.starttime

        domain = OrderedDict()
        domain["type"] = "Domain"
        domain["domainType"] = "PointSeries"
        domain["axes"] = {
            "x": {"values": [lon]},
            "y": {"values": [lat]},
            "z": {"values": [alt]},
            # either start-stop-num or expanded
            "t": self._build_time_axis(starttime, delta, npts),
        }
        domain["referencing"] = [
            {
                "coordinates": ["x", "y", "z"],
                "system": {
                    "type": "GeographicCRS",
                    "id": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                },
            },
            {
                "coordinates": ["t"],
                "system": {
                    "type": "TemporalRS",
                    "id": "Gregorian",
                    "calendar": "Gregorian",
                },
            },
        ]

        parameters = OrderedDict()
        ranges_obj = OrderedDict()
        for trace in timeseries:
            channel = trace.stats.channel
            parameters[channel] = self._parameter_metadata(channel)
            cov_range = OrderedDict()
            cov_range["type"] = "NdArray"
            cov_range["dataType"] = "float"
            cov_range["axisNames"] = ["t"]
            cov_range["shape"] = [len(trace.data)]
            # gaps (NaN) must be encoded as null: bare NaN is not valid JSON
            cov_range["values"] = [
                None if math.isnan(value) else float(value) for value in trace.data
            ]
            ranges_obj[channel] = cov_range

        coverage = OrderedDict()
        coverage["type"] = "Coverage"
        coverage["domain"] = domain
        coverage["parameters"] = parameters
        coverage["ranges"] = ranges_obj
        coverage["geomag:info"] = {
            "institute": getattr(stats, "agency_name", ""),
            "latitude": lat,
            "longitude": lon,
            "altitude": alt,
            "station_name": getattr(stats, "station_name", station),
            "iaga_code": station,
            # channels may be None when the caller does not filter
            "reported_orientation": "".join(channels or []),
            "sensor_orientation": getattr(stats, "sensor_orientation", ""),
            "digital_sampling_rate": getattr(stats, "digital_sampling_rate", 0.0),
            "data_interval_type": getattr(
                stats, "data_interval_type", str(self.interval)
            ),
            "data_type": getattr(stats, "data_type", str(self.type)),
            "sampling_period": getattr(stats, "sampling_period", 0.0),
        }
        # optional metadata: only emitted when present on the trace stats
        optional_fields = {
            "network": getattr(stats, "network", None),
            "location": getattr(stats, "location", None),
            "comments": getattr(stats, "filter_comments", None),
            # fixed typo: was getattr(..., "declincation_base"), always None
            "declination_base": getattr(stats, "declination_base", None),
            "terms_of_use": getattr(stats, "conditions_of_use", None),
        }
        for key, value in optional_fields.items():
            if value is not None:
                coverage["geomag:info"][key] = value

        fh.write(json.dumps(coverage, indent=2).encode("utf-8"))