Skip to content
Snippets Groups Projects

Covjson mvp

2 files
+ 331
0
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 322
0
import json
import math
from collections import OrderedDict
import numpy as np
from obspy import Stream, Trace, UTCDateTime
from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import ELEMENT_INDEX
class CovJSONFactory(TimeseriesFactory):
    """Factory for reading/writing CovJSON format Geomagnetic Data."""

    def __init__(
        self,
        time_format="iso8601",  # could be "iso8601" or "numeric"
        time_axis_mode="expanded",  # could be "expanded" or "start-stop-num"
        **kwargs,
    ):
        """
        Parameters
        ----------
        time_format : {"iso8601", "numeric"}
            - "iso8601": each time value is an ISO8601 string
            - "numeric": each time value is numeric (epoch milliseconds)
        time_axis_mode : {"expanded", "start-stop-num"}
            - "expanded": store domain axes as a list of all time values
            - "start-stop-num": store domain axes as start, stop, and num
        """
        super().__init__(**kwargs)
        self.time_format = time_format
        self.time_axis_mode = time_axis_mode

    @staticmethod
    def _parse_covjson_time(value):
        """Convert one CovJSON time value to UTCDateTime.

        Numeric values are interpreted as epoch milliseconds; strings are
        interpreted as ISO8601 (parsed by UTCDateTime).
        """
        if isinstance(value, (int, float)):
            # interpret as epoch ms
            return UTCDateTime(value / 1000.0)
        # interpret as ISO8601
        return UTCDateTime(value)

    def _parse_time_axis(self, t_axis):
        """Return a list of UTCDateTime samples from a CovJSON "t" axis.

        Supports the "expanded" form (explicit "values" list) and the
        "start-stop-num" form (uniform samples generated by linear
        increments). Returns None when neither representation is present.
        """
        if "values" in t_axis:
            # "expanded" approach
            return [self._parse_covjson_time(v) for v in t_axis["values"]]
        if {"start", "stop", "num"}.issubset(t_axis):
            # "start-stop-num" approach
            start_dt = self._parse_covjson_time(t_axis["start"])
            stop_dt = self._parse_covjson_time(t_axis["stop"])
            # coerce: JSON may carry num as a float (e.g. 5.0), range() needs int
            num = int(t_axis["num"])
            if num <= 1:
                # trivial case: single sample
                return [start_dt]
            # create uniform times from start to stop (inclusive endpoints)
            step = (stop_dt.timestamp - start_dt.timestamp) / (num - 1)
            return [UTCDateTime(start_dt.timestamp + i * step) for i in range(num)]
        # no recognized time axis
        return None

    def parse_string(self, data: str, **kwargs):
        """
        Parse a CovJSON string into an ObsPy Stream.

        Supports both "expanded" (t->values) and "start-stop-num" domain axes:
        - If domain->axes->t->values is found, each entry is a time sample.
        - Otherwise, if domain->axes->t has start, stop, and num,
          generate times using linear increments (assuming uniform sampling).

        For the time format:
        - If times are numeric, they are interpreted as epoch milliseconds.
        - If times are strings, they are interpreted as ISO8601 (parsed by UTCDateTime).

        Returns an empty Stream if no valid time axis can be found.
        """
        covjson = json.loads(data)
        domain = covjson.get("domain", {})
        ranges = covjson.get("ranges", {})
        info = covjson.get("geomag:info", {})
        # domain->axes->(x, y, z, t)
        axes = domain.get("axes", {})
        x_axis = axes.get("x", {}).get("values", [])
        y_axis = axes.get("y", {}).get("values", [])
        z_axis = axes.get("z", {}).get("values", [])
        t_axis = axes.get("t", {})
        times = self._parse_time_axis(t_axis)
        # no recognized time axis, or an empty one => return empty stream
        if not times:
            return Stream()
        # coordinates: prefer explicit axis values, fall back to geomag:info
        lon = float(x_axis[0] if x_axis else info.get("longitude", 0.0))
        lat = float(y_axis[0] if y_axis else info.get("latitude", 0.0))
        alt = float(z_axis[0] if z_axis else info.get("altitude", 0.0))
        station = info.get("iaga_code", "")
        data_type = info.get("data_type", "")
        data_interval_type = info.get("data_interval_type", "")
        station_name = info.get("station_name", station)
        # sampling rate is time-axis-wide; compute once, not per channel.
        # Assumes uniform sampling (only the first interval is inspected).
        if len(times) > 1:
            dt = times[1].timestamp - times[0].timestamp
            sampling_rate = 1.0 / dt if dt != 0 else 1.0
        else:
            sampling_rate = 1.0
        stream = Stream()
        # Build one trace per channel found in ranges
        for channel_name, channel_range in ranges.items():
            values = channel_range.get("values", [])
            if not values:
                continue
            data_array = np.array(values, dtype=float)
            stats = {
                "network": "",
                "station": station,
                "channel": channel_name,
                "starttime": times[0],
                "npts": len(data_array),
                "sampling_rate": sampling_rate,
                # Additional metadata
                "geodetic_longitude": lon,
                "geodetic_latitude": lat,
                "elevation": alt,
                "station_name": station_name,
                "data_type": data_type,
                "data_interval_type": data_interval_type,
            }
            stream += Trace(data=data_array, header=stats)
        return stream

    def _build_time_axis(self, starttime, delta, npts):
        """Build the CovJSON "t" axis dict for write_file.

        Honors self.time_axis_mode ("expanded" vs "start-stop-num") and
        self.time_format ("iso8601" strings vs numeric epoch milliseconds).
        """
        if self.time_axis_mode == "expanded":
            time_values = []
            for i in range(npts):
                current_time = starttime + i * delta
                if self.time_format == "numeric":
                    # convert to epoch milliseconds
                    time_values.append(int(current_time.timestamp * 1000))
                else:
                    # default iso8601
                    time_values.append(current_time.isoformat())
            return {"values": time_values}
        # "start-stop-num" approach
        endtime = starttime + (npts - 1) * delta
        if self.time_format == "numeric":
            start_val = int(starttime.timestamp * 1000)
            end_val = int(endtime.timestamp * 1000)
        else:
            # iso8601: store start/end as iso strings
            start_val = starttime.isoformat()
            end_val = endtime.isoformat()
        return {
            "start": start_val,
            "stop": end_val,
            "num": npts,
        }

    def write_file(self, fh, timeseries: Stream, channels):
        """
        Write a CovJSON coverage for *timeseries* to the binary file handle *fh*.

        Only the listed *channels* are written (all traces when falsy). The
        domain is a PointSeries at the first trace's coordinates; the time
        axis layout follows self.time_axis_mode / self.time_format.
        Writes "{}" when there is no data to serialize.
        """
        if not timeseries or len(timeseries) == 0:
            fh.write(json.dumps({}, indent=2).encode("utf-8"))
            return
        timeseries.merge()
        # Filter channels
        if channels:
            new_stream = Stream()
            for ch in channels:
                new_stream += timeseries.select(channel=ch)
            timeseries = new_stream
        # filtering may have removed everything; avoid IndexError below
        if len(timeseries) == 0:
            fh.write(json.dumps({}, indent=2).encode("utf-8"))
            return
        timeseries.sort(keys=["starttime"])
        tr = timeseries[0]
        stats = tr.stats
        station = stats.station or ""
        lat = float(getattr(stats, "geodetic_latitude", 0.0))
        lon = float(getattr(stats, "geodetic_longitude", 0.0))
        alt = float(getattr(stats, "elevation", 0.0))
        data_type = getattr(stats, "data_type", str(self.type))
        data_interval_type = getattr(stats, "data_interval_type", str(self.interval))
        station_name = getattr(stats, "station_name", station)
        # channels may be None/empty; join would raise TypeError on None
        reported_orientation = "".join(channels) if channels else ""
        sensor_orientation = getattr(stats, "sensor_orientation", "")
        digital_sampling_rate = getattr(stats, "digital_sampling_rate", 0.0)
        sampling_period = getattr(stats, "sampling_period", 0.0)
        time_axis_dict = self._build_time_axis(
            tr.stats.starttime, tr.stats.delta, tr.stats.npts
        )
        domain = OrderedDict()
        domain["type"] = "Domain"
        domain["domainType"] = "PointSeries"
        domain["axes"] = {
            "x": {"values": [lon]},
            "y": {"values": [lat]},
            "z": {"values": [alt]},
            "t": time_axis_dict,  # either start-stop-num or expanded
        }
        domain["referencing"] = [
            {
                "coordinates": ["x", "y", "z"],
                "system": {
                    "type": "GeographicCRS",
                    "id": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                },
            },
            {
                "coordinates": ["t"],
                "system": {
                    "type": "TemporalRS",
                    "id": "Gregorian",
                    "calendar": "Gregorian",
                },
            },
        ]
        parameters = OrderedDict()
        ranges_obj = OrderedDict()
        for trace in timeseries:
            ch_name = trace.stats.channel
            element_info = ELEMENT_INDEX.get(ch_name)
            if element_info:
                param_description = element_info.name
                param_unit = element_info.units
                param_label = element_info.abbreviation or ch_name
            else:
                # unknown channel: fall back to generic nT parameter
                param_description = f"Data for channel {ch_name}"
                param_unit = "nT"
                param_label = ch_name
            parameters[ch_name] = {
                "type": "Parameter",
                "description": {"en": param_description},
                "unit": {
                    "label": {"en": param_unit},
                    "symbol": {
                        "type": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                        "value": param_unit,
                    },
                },
                "observedProperty": {
                    "id": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                    "label": {"en": param_label},
                },
            }
            cov_range = OrderedDict()
            cov_range["type"] = "NdArray"
            cov_range["dataType"] = "float"
            cov_range["axisNames"] = ["t"]
            cov_range["shape"] = [len(trace.data)]
            cov_range["values"] = [float(v) for v in trace.data]
            ranges_obj[ch_name] = cov_range
        coverage = OrderedDict()
        coverage["type"] = "Coverage"
        coverage["domain"] = domain
        coverage["parameters"] = parameters
        coverage["ranges"] = ranges_obj
        coverage["geomag:info"] = {
            "institute": getattr(stats, "agency_name", ""),
            "latitude": lat,
            "longitude": lon,
            "altitude": alt,
            "station_name": station_name,
            "iaga_code": station,
            "reported_orientation": reported_orientation,
            "sensor_orientation": sensor_orientation,
            "digital_sampling_rate": digital_sampling_rate,
            "data_interval_type": data_interval_type,
            "data_type": data_type,
            "sampling_period": sampling_period,
        }
        # Optional fields: emitted only when present on stats.
        # NOTE: original looked up the misspelled attribute "declincation_base",
        # so declination_base could never be written; fixed here.
        optional_fields = {
            "network": getattr(stats, "network", None),
            "location": getattr(stats, "location", None),
            "comments": getattr(stats, "filter_comments", None),
            "declination_base": getattr(stats, "declination_base", None),
            "terms_of_use": getattr(stats, "conditions_of_use", None),
        }
        for key, value in optional_fields.items():
            if value is not None:
                coverage["geomag:info"][key] = value
        covjson_str = json.dumps(coverage, indent=2)
        fh.write(covjson_str.encode("utf-8"))
Loading