From 2fbc669393e62eca250079eb10effeb870533790 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Fri, 27 Dec 2024 15:09:12 -0800 Subject: [PATCH 1/7] poc for CovJSON read/write --- geomagio/Controller.py | 9 + geomagio/api/covjson/CovJSONFactory.py | 322 +++++++++++++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 geomagio/api/covjson/CovJSONFactory.py diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 64138ed7..486ae145 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -7,6 +7,9 @@ from typing import List, Optional, Tuple, Union from obspy.core import Stream, UTCDateTime +from geomagio.api.covjson.CovJSONFactory import CovJSONFactory + + from .algorithm import Algorithm, algorithms, AlgorithmException, FilterAlgorithm from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .PlotTimeseriesFactory import PlotTimeseriesFactory @@ -562,6 +565,8 @@ def get_input_factory(args): ) elif input_type == "xml": input_factory = xml.XMLFactory(**input_factory_args) + elif input_type == "covjson": + input_factory = CovJSONFactory(**input_factory_args) # wrap stream if input_stream is not None: input_factory = StreamTimeseriesFactory( @@ -631,6 +636,8 @@ def get_output_factory(args): output_factory = iaga2002.IAGA2002Factory(**output_factory_args) elif output_type == "imfjson": output_factory = imfjson.IMFJSONFactory(**output_factory_args) + elif output_type == "covjson": + output_factory = CovJSONFactory(**output_factory_args) elif output_type == "pcdcp": output_factory = pcdcp.PCDCPFactory(**output_factory_args) elif output_type == "temperature": @@ -820,6 +827,7 @@ def parse_args(args): "miniseed", "pcdcp", "xml", + "covjson", ), default="edge", help='Input format (Default "edge")', @@ -1025,6 +1033,7 @@ def parse_args(args): "temperature", "vbf", "xml", + 'covjson', ), # TODO: set default to 'iaga2002' help="Output format", diff --git a/geomagio/api/covjson/CovJSONFactory.py b/geomagio/api/covjson/CovJSONFactory.py new file mode 100644 index 00000000..df26ca10 --- /dev/null +++ b/geomagio/api/covjson/CovJSONFactory.py @@ -0,0 +1,322 @@ +import json +import math +from collections import OrderedDict + +import numpy as np +from obspy import Stream, Trace, UTCDateTime + +from geomagio.TimeseriesFactory import TimeseriesFactory +from geomagio.api.ws.Element import ELEMENT_INDEX + + +class CovJSONFactory(TimeseriesFactory): + """Factory for reading/writing CovJSON format Geomagnetic Data.""" + + def __init__( + self, + time_format="iso8601", # could be "iso8601" or "numeric" + time_axis_mode="expanded", # could be "expanded" or "start-stop-num" + **kwargs + ): + """ + Parameters + ---------- + time_format : {"iso8601", "numeric"} + - "iso8601": each time value is an ISO8601 string + - "numeric": each time value is numeric (epoch milliseconds) + time_axis_mode : {"expanded", "start-stop-num"} + - "expanded": store domain axes as a list of all time values + - "start-stop-num": store domain axes as start, stop, and num + """ + super().__init__(**kwargs) + self.time_format = time_format + self.time_axis_mode = time_axis_mode + + def parse_string(self, data: str, **kwargs): + """ + Parse a CovJSON string into an ObsPy Stream. + + Supports both "expanded" (t->values) and "start-stop-num" domain axes: + - If domain->axes->t->values is found, each entry is a time sample. + - Otherwise, if domain->axes->t has start, stop, and num, + generate times using linear increments (assuming uniform sampling). + + For the time format: + - If times are numeric, they are interpreted as epoch milliseconds. + - If times are strings, they are interpreted as ISO8601 (parsed by UTCDateTime). + + Returns an empty Stream if no valid time axis can be found. + """ + covjson = json.loads(data) + domain = covjson.get("domain", {}) + ranges = covjson.get("ranges", {}) + info = covjson.get("geomag:info", {}) + + # domain->axes->(x, y, z, t) + axes = domain.get("axes", {}) + x_axis = axes.get("x", {}).get("values", []) + y_axis = axes.get("y", {}).get("values", []) + z_axis = axes.get("z", {}).get("values", []) + t_axis = axes.get("t", {}) + + # Determine the times from t_axis: + times = [] + if "values" in t_axis: + # "expanded" approach + raw_values = t_axis["values"] + for val in raw_values: + # If numeric, interpret as epoch ms; if string, parse ISO8601 + if isinstance(val, (int, float)): + # interpret as epoch ms + times.append(UTCDateTime(val / 1000.0)) + else: + # interpret as ISO8601 + times.append(UTCDateTime(val)) + elif {"start", "stop", "num"}.issubset(t_axis): + # "start-stop-num" approach + start_raw = t_axis["start"] + stop_raw = t_axis["stop"] + num = t_axis["num"] + + # parse start, stop + if isinstance(start_raw, (int, float)): + start_dt = UTCDateTime(start_raw / 1000.0) + else: + start_dt = UTCDateTime(start_raw) + + if isinstance(stop_raw, (int, float)): + stop_dt = UTCDateTime(stop_raw / 1000.0) + else: + stop_dt = UTCDateTime(stop_raw) + + if num <= 1: + # trivial case: single sample + times = [start_dt] + else: + # create uniform times from start to stop + step = (stop_dt.timestamp - start_dt.timestamp) / (num - 1) + for i in range(num): + times.append(UTCDateTime(start_dt.timestamp + i * step)) + else: + # no recognized time axis => return empty stream + return Stream() + + # If still no times, return empty + if not times: + return Stream() + + lon = float(x_axis[0] if x_axis else info.get("longitude", 0.0)) + lat = float(y_axis[0] if y_axis else info.get("latitude", 0.0)) + alt = float(z_axis[0] if z_axis else info.get("altitude", 0.0)) + + station = info.get("iaga_code", "") + data_type = info.get("data_type", "") + data_interval_type = info.get("data_interval_type", "") + station_name = info.get("station_name", station) + + stream = Stream() + + # Build traces from ranges + for channel_name, channel_range in ranges.items(): + values = channel_range.get("values", []) + if not values: + continue + + data_array = np.array(values, dtype=float) + stats = { + "network": "", + "station": station, + "channel": channel_name, + "starttime": times[0], + "npts": len(data_array), + } + + # Compute sampling_rate if multiple times + if len(times) > 1: + dt = times[1].timestamp - times[0].timestamp + stats["sampling_rate"] = 1.0 / dt if dt != 0 else 1.0 + else: + stats["sampling_rate"] = 1.0 + + # Additional metadata + stats["geodetic_longitude"] = lon + stats["geodetic_latitude"] = lat + stats["elevation"] = alt + stats["station_name"] = station_name + stats["data_type"] = data_type + stats["data_interval_type"] = data_interval_type + + trace = Trace(data=data_array, header=stats) + stream += trace + + return stream + + def write_file(self, fh, timeseries: Stream, channels): + """ + Write a CovJSON coverage. Currently uses "expanded" time axis and iso8601 times. + """ + if not timeseries or len(timeseries) == 0: + fh.write(json.dumps({}, indent=2).encode("utf-8")) + return + + timeseries.merge() + + # Filter channels + if channels: + new_stream = Stream() + for ch in channels: + new_stream += timeseries.select(channel=ch) + timeseries = new_stream + + timeseries.sort(keys=["starttime"]) + tr = timeseries[0] + stats = tr.stats + + station = stats.station or "" + lat = float(getattr(stats, "geodetic_latitude", 0.0)) + lon = float(getattr(stats, "geodetic_longitude", 0.0)) + alt = float(getattr(stats, "elevation", 0.0)) + data_type = getattr(stats, "data_type", str(self.type)) + data_interval_type = getattr(stats, "data_interval_type", str(self.interval)) + station_name = getattr(stats, "station_name", station) + + reported_orientation = "".join(channels) + sensor_orientation = getattr(stats, "sensor_orientation", "") + digital_sampling_rate = getattr(stats, "digital_sampling_rate", 0.0) + sampling_period = getattr(stats, "sampling_period", 0.0) + + npts = tr.stats.npts + delta = tr.stats.delta + starttime = tr.stats.starttime + + if self.time_axis_mode == "expanded": + time_values = [] + for i in range(npts): + current_time = starttime + i * delta + if self.time_format == "numeric": + # e.g. convert to epoch milliseconds + time_values.append(int(current_time.timestamp * 1000)) + else: + # default iso8601 + time_values.append(current_time.isoformat()) + time_axis_dict = {"values": time_values} + else: + # for "start-stop-num" approach + # numeric epoch example + if self.time_format == "numeric": + start_val = int(starttime.timestamp * 1000) + end_val = int((starttime + (npts - 1) * delta).timestamp * 1000) + else: + # if iso8601, you might store e.g. start/end as iso strings + start_val = starttime.isoformat() + end_val = (starttime + (npts - 1) * delta).isoformat() + + time_axis_dict = { + "start": start_val, + "stop": end_val, + "num": npts, + } + + domain = OrderedDict() + domain["type"] = "Domain" + domain["domainType"] = "PointSeries" + domain["axes"] = { + "x": {"values": [lon]}, + "y": {"values": [lat]}, + "z": {"values": [alt]}, + "t": time_axis_dict, # either start-stop-num or expanded + } + domain["referencing"] = [ + { + "coordinates": ["x", "y", "z"], + "system": { + "type": "GeographicCRS", + "id": "http://www.opengis.net/def/crs/OGC/1.3/CRS84", + }, + }, + { + "coordinates": ["t"], + "system": { + "type": "TemporalRS", + "id": "Gregorian", + "calendar": "Gregorian", + }, + }, + ] + + parameters = OrderedDict() + ranges_obj = OrderedDict() + + for trace in timeseries: + ch_name = trace.stats.channel + element_info = ELEMENT_INDEX.get(ch_name) + if element_info: + param_description = element_info.name + param_unit = element_info.units + param_label = element_info.abbreviation or ch_name + else: + param_description = f"Data for channel {ch_name}" + param_unit = "nT" + param_label = ch_name + + param_meta = { + "type": "Parameter", + "description": {"en": param_description}, + "unit": { + "label": {"en": param_unit}, + "symbol": { + "type": "https://intermagnet.org/faq/10.geomagnetic-comp.html", + "value": param_unit, + }, + }, + "observedProperty": { + "id": "https://intermagnet.org/faq/10.geomagnetic-comp.html", + "label": {"en": param_label}, + }, + } + parameters[ch_name] = param_meta + + arr = trace.data + cov_range = OrderedDict() + cov_range["type"] = "NdArray" + cov_range["dataType"] = "float" + cov_range["axisNames"] = ["t"] + cov_range["shape"] = [len(arr)] + cov_range["values"] = [float(v) for v in arr] + ranges_obj[ch_name] = cov_range + + coverage = OrderedDict() + coverage["type"] = "Coverage" + coverage["domain"] = domain + coverage["parameters"] = parameters + coverage["ranges"] = ranges_obj + + coverage["geomag:info"] = { + "institute": getattr(stats, "agency_name", ""), + "latitude": lat, + "longitude": lon, + "altitude": alt, + "station_name": station_name, + "iaga_code": station, + "reported_orientation": reported_orientation, + "sensor_orientation": sensor_orientation, + "digital_sampling_rate": digital_sampling_rate, + "data_interval_type": data_interval_type, + "data_type": data_type, + "sampling_period": sampling_period, + } + # Optional: Possibly unsupported + optional_fields = { + "network": getattr(stats, "network", None), + "location": getattr(stats, "location", None), + "comments": getattr(stats, "filter_comments", None), + "declination_base":getattr(stats, "declincation_base", None), + "terms_of_use": getattr(stats, "conditions_of_use", None) + } + for key, value in optional_fields.items(): + if value is not None: + coverage["geomag:info"][key] = value + + + covjson_str = json.dumps(coverage, indent=2) + fh.write(covjson_str.encode("utf-8")) -- GitLab From b7893bf6b962f229b02d188de2c06a36cf613dec Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Mon, 30 Dec 2024 13:28:51 -0800 Subject: [PATCH 2/7] linting --- geomagio/api/covjson/CovJSONFactory.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/geomagio/api/covjson/CovJSONFactory.py b/geomagio/api/covjson/CovJSONFactory.py index df26ca10..02088e52 100644 --- a/geomagio/api/covjson/CovJSONFactory.py +++ b/geomagio/api/covjson/CovJSONFactory.py @@ -14,9 +14,9 @@ class CovJSONFactory(TimeseriesFactory): def __init__( self, - time_format="iso8601", # could be "iso8601" or "numeric" - time_axis_mode="expanded", # could be "expanded" or "start-stop-num" - **kwargs + time_format="iso8601", # could be "iso8601" or "numeric" + time_axis_mode="expanded", # could be "expanded" or "start-stop-num" + **kwargs, ): """ Parameters @@ -307,16 +307,15 @@ class CovJSONFactory(TimeseriesFactory): } # Optional: Possibly unsupported optional_fields = { - "network": getattr(stats, "network", None), + "network": getattr(stats, "network", None), "location": getattr(stats, "location", None), - "comments": getattr(stats, "filter_comments", None), - "declination_base":getattr(stats, "declincation_base", None), - "terms_of_use": getattr(stats, "conditions_of_use", None) + "comments": getattr(stats, "filter_comments", None), + "declination_base": getattr(stats, "declincation_base", None), + "terms_of_use": getattr(stats, "conditions_of_use", None), } for key, value in optional_fields.items(): if value is not None: coverage["geomag:info"][key] = value - covjson_str = json.dumps(coverage, indent=2) fh.write(covjson_str.encode("utf-8")) -- GitLab From 1feab71aea201b33e28891b4934660f583a9bab8 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Wed, 15 Jan 2025 13:41:22 -0800 Subject: [PATCH 3/7] covjson folder placed into geomagio, sibling to other factory folders. Import module syntax used i.e __init__.py --- geomagio/{api => }/covjson/CovJSONFactory.py | 0 geomagio/covjson/__init__.py | 7 +++++++ 2 files changed, 7 insertions(+) rename geomagio/{api => }/covjson/CovJSONFactory.py (100%) create mode 100644 geomagio/covjson/__init__.py diff --git a/geomagio/api/covjson/CovJSONFactory.py b/geomagio/covjson/CovJSONFactory.py similarity index 100% rename from geomagio/api/covjson/CovJSONFactory.py rename to geomagio/covjson/CovJSONFactory.py diff --git a/geomagio/covjson/__init__.py b/geomagio/covjson/__init__.py new file mode 100644 index 00000000..11211bd3 --- /dev/null +++ b/geomagio/covjson/__init__.py @@ -0,0 +1,7 @@ +"""IO Module for CovJSON Format""" + +from __future__ import absolute_import + +from .CovJSONFactory import CovJSONFactory + +__all__ = ["CovJSONFactory"] -- GitLab From 3467503de52104aa72a1ca7b738e97a2d6d8d3b9 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Wed, 15 Jan 2025 13:44:30 -0800 Subject: [PATCH 4/7] skip writing timeseries if channel not in request channels --- geomagio/covjson/CovJSONFactory.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/geomagio/covjson/CovJSONFactory.py b/geomagio/covjson/CovJSONFactory.py index 02088e52..401ca653 100644 --- a/geomagio/covjson/CovJSONFactory.py +++ b/geomagio/covjson/CovJSONFactory.py @@ -249,6 +249,8 @@ class CovJSONFactory(TimeseriesFactory): for trace in timeseries: ch_name = trace.stats.channel + if channels and ch_name not in channels: + continue element_info = ELEMENT_INDEX.get(ch_name) if element_info: param_description = element_info.name -- GitLab From b0da191033bedd855e14778f358d6ed40a9bcbfa Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Wed, 15 Jan 2025 13:50:13 -0800 Subject: [PATCH 5/7] other half of __init__.py work... --- geomagio/Controller.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 486ae145..1b7fe683 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -7,9 +7,6 @@ from typing import List, Optional, Tuple, Union from obspy.core import Stream, UTCDateTime -from geomagio.api.covjson.CovJSONFactory import CovJSONFactory - - from .algorithm import Algorithm, algorithms, AlgorithmException, FilterAlgorithm from .DerivedTimeseriesFactory import DerivedTimeseriesFactory from .PlotTimeseriesFactory import PlotTimeseriesFactory @@ -27,7 +24,7 @@ from . import imfv283 from . import temperature from . import vbf from . import xml - +from . import covjson class Controller(object): """Controller for geomag algorithms. @@ -566,7 +563,7 @@ def get_input_factory(args): elif input_type == "xml": input_factory = xml.XMLFactory(**input_factory_args) elif input_type == "covjson": - input_factory = CovJSONFactory(**input_factory_args) + input_factory = covjson.CovJSONFactory(**input_factory_args) # wrap stream if input_stream is not None: input_factory = StreamTimeseriesFactory( @@ -637,7 +634,7 @@ def get_output_factory(args): elif output_type == "imfjson": output_factory = imfjson.IMFJSONFactory(**output_factory_args) elif output_type == "covjson": - output_factory = CovJSONFactory(**output_factory_args) + output_factory = covjson.CovJSONFactory(**output_factory_args) elif output_type == "pcdcp": output_factory = pcdcp.PCDCPFactory(**output_factory_args) elif output_type == "temperature": -- GitLab From 4767347595ea5649385963010fe972c97f6a98f5 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Wed, 15 Jan 2025 14:41:15 -0800 Subject: [PATCH 6/7] lint --- geomagio/Controller.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 1b7fe683..ce39685f 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -26,6 +26,7 @@ from . import vbf from . import xml from . import covjson + class Controller(object): """Controller for geomag algorithms. -- GitLab From 716488ca7164aa0b22fb3304ef1807115bb57c03 Mon Sep 17 00:00:00 2001 From: Nicholas Shavers <nshavers@contractor.usgs.gov> Date: Fri, 17 Jan 2025 10:18:14 -0800 Subject: [PATCH 7/7] lint... --- geomagio/Controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index ce39685f..dce6b919 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -1031,7 +1031,7 @@ def parse_args(args): "temperature", "vbf", "xml", - 'covjson', + "covjson", ), # TODO: set default to 'iaga2002' help="Output format", -- GitLab