# CovJSONFactory.py
import json
import math
from collections import OrderedDict

import numpy as np
from obspy import Stream, Trace, UTCDateTime

from geomagio.TimeseriesFactory import TimeseriesFactory
from geomagio.api.ws.Element import ELEMENT_INDEX
    
    
    class CovJSONFactory(TimeseriesFactory):
        """Factory for reading/writing CovJSON format Geomagnetic Data."""
    
        def __init__(
            self,
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
            time_format="iso8601",  # could be "iso8601" or "numeric"
            time_axis_mode="expanded",  # could be "expanded" or "start-stop-num"
            **kwargs,
    
        ):
            """
            Parameters
            ----------
            time_format : {"iso8601", "numeric"}
                - "iso8601": each time value is an ISO8601 string
                - "numeric": each time value is numeric (epoch milliseconds)
            time_axis_mode : {"expanded", "start-stop-num"}
                - "expanded": store domain axes as a list of all time values
                - "start-stop-num": store domain axes as start, stop, and num
            """
            super().__init__(**kwargs)
            self.time_format = time_format
            self.time_axis_mode = time_axis_mode
    
        def parse_string(self, data: str, **kwargs):
            """
            Parse a CovJSON string into an ObsPy Stream.
    
            Supports both "expanded" (t->values) and "start-stop-num" domain axes:
            - If domain->axes->t->values is found, each entry is a time sample.
            - Otherwise, if domain->axes->t has start, stop, and num,
            generate times using linear increments (assuming uniform sampling).
    
            For the time format:
            - If times are numeric, they are interpreted as epoch milliseconds.
            - If times are strings, they are interpreted as ISO8601 (parsed by UTCDateTime).
    
            Returns an empty Stream if no valid time axis can be found.
            """
            covjson = json.loads(data)
            domain = covjson.get("domain", {})
            ranges = covjson.get("ranges", {})
            info = covjson.get("geomag:info", {})
    
            # domain->axes->(x, y, z, t)
            axes = domain.get("axes", {})
            x_axis = axes.get("x", {}).get("values", [])
            y_axis = axes.get("y", {}).get("values", [])
            z_axis = axes.get("z", {}).get("values", [])
            t_axis = axes.get("t", {})
    
            # Determine the times from t_axis:
            times = []
            if "values" in t_axis:
                # "expanded" approach
                raw_values = t_axis["values"]
                for val in raw_values:
                    # If numeric, interpret as epoch ms; if string, parse ISO8601
                    if isinstance(val, (int, float)):
                        # interpret as epoch ms
                        times.append(UTCDateTime(val / 1000.0))
                    else:
                        # interpret as ISO8601
                        times.append(UTCDateTime(val))
            elif {"start", "stop", "num"}.issubset(t_axis):
                # "start-stop-num" approach
                start_raw = t_axis["start"]
                stop_raw = t_axis["stop"]
                num = t_axis["num"]
    
                # parse start, stop
                if isinstance(start_raw, (int, float)):
                    start_dt = UTCDateTime(start_raw / 1000.0)
                else:
                    start_dt = UTCDateTime(start_raw)
    
                if isinstance(stop_raw, (int, float)):
                    stop_dt = UTCDateTime(stop_raw / 1000.0)
                else:
                    stop_dt = UTCDateTime(stop_raw)
    
                if num <= 1:
                    # trivial case: single sample
                    times = [start_dt]
                else:
                    # create uniform times from start to stop
                    step = (stop_dt.timestamp - start_dt.timestamp) / (num - 1)
                    for i in range(num):
                        times.append(UTCDateTime(start_dt.timestamp + i * step))
            else:
                # no recognized time axis => return empty stream
                return Stream()
    
            # If still no times, return empty
            if not times:
                return Stream()
    
            lon = float(x_axis[0] if x_axis else info.get("longitude", 0.0))
            lat = float(y_axis[0] if y_axis else info.get("latitude", 0.0))
            alt = float(z_axis[0] if z_axis else info.get("altitude", 0.0))
    
            station = info.get("iaga_code", "")
            data_type = info.get("data_type", "")
            data_interval_type = info.get("data_interval_type", "")
            station_name = info.get("station_name", station)
    
            stream = Stream()
    
            # Build traces from ranges
            for channel_name, channel_range in ranges.items():
                values = channel_range.get("values", [])
                if not values:
                    continue
    
                data_array = np.array(values, dtype=float)
                stats = {
                    "network": "",
                    "station": station,
                    "channel": channel_name,
                    "starttime": times[0],
                    "npts": len(data_array),
                }
    
                # Compute sampling_rate if multiple times
                if len(times) > 1:
                    dt = times[1].timestamp - times[0].timestamp
                    stats["sampling_rate"] = 1.0 / dt if dt != 0 else 1.0
                else:
                    stats["sampling_rate"] = 1.0
    
                # Additional metadata
                stats["geodetic_longitude"] = lon
                stats["geodetic_latitude"] = lat
                stats["elevation"] = alt
                stats["station_name"] = station_name
                stats["data_type"] = data_type
                stats["data_interval_type"] = data_interval_type
    
                trace = Trace(data=data_array, header=stats)
                stream += trace
    
            return stream
    
        def write_file(self, fh, timeseries: Stream, channels):
            """
            Write a CovJSON coverage. Currently uses "expanded" time axis and iso8601 times.
            """
            if not timeseries or len(timeseries) == 0:
                fh.write(json.dumps({}, indent=2).encode("utf-8"))
                return
    
            timeseries.merge()
    
            # Filter channels
            if channels:
                new_stream = Stream()
                for ch in channels:
                    new_stream += timeseries.select(channel=ch)
                timeseries = new_stream
    
            timeseries.sort(keys=["starttime"])
            tr = timeseries[0]
            stats = tr.stats
    
            station = stats.station or ""
            lat = float(getattr(stats, "geodetic_latitude", 0.0))
            lon = float(getattr(stats, "geodetic_longitude", 0.0))
            alt = float(getattr(stats, "elevation", 0.0))
            data_type = getattr(stats, "data_type", str(self.type))
            data_interval_type = getattr(stats, "data_interval_type", str(self.interval))
            station_name = getattr(stats, "station_name", station)
    
            reported_orientation = "".join(channels)
            sensor_orientation = getattr(stats, "sensor_orientation", "")
            digital_sampling_rate = getattr(stats, "digital_sampling_rate", 0.0)
            sampling_period = getattr(stats, "sampling_period", 0.0)
    
            npts = tr.stats.npts
            delta = tr.stats.delta
            starttime = tr.stats.starttime
    
            if self.time_axis_mode == "expanded":
                time_values = []
                for i in range(npts):
                    current_time = starttime + i * delta
                    if self.time_format == "numeric":
                        # e.g. convert to epoch milliseconds
                        time_values.append(int(current_time.timestamp * 1000))
                    else:
                        # default iso8601
                        time_values.append(current_time.isoformat())
                time_axis_dict = {"values": time_values}
            else:
                # for "start-stop-num" approach
                # numeric epoch example
                if self.time_format == "numeric":
                    start_val = int(starttime.timestamp * 1000)
                    end_val = int((starttime + (npts - 1) * delta).timestamp * 1000)
                else:
                    # if iso8601, you might store e.g. start/end as iso strings
                    start_val = starttime.isoformat()
                    end_val = (starttime + (npts - 1) * delta).isoformat()
    
                time_axis_dict = {
                    "start": start_val,
                    "stop": end_val,
                    "num": npts,
                }
    
            domain = OrderedDict()
            domain["type"] = "Domain"
            domain["domainType"] = "PointSeries"
            domain["axes"] = {
                "x": {"values": [lon]},
                "y": {"values": [lat]},
                "z": {"values": [alt]},
                "t": time_axis_dict,  # either start-stop-num or expanded
            }
            domain["referencing"] = [
                {
                    "coordinates": ["x", "y", "z"],
                    "system": {
                        "type": "GeographicCRS",
                        "id": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                    },
                },
                {
                    "coordinates": ["t"],
                    "system": {
                        "type": "TemporalRS",
                        "id": "Gregorian",
                        "calendar": "Gregorian",
                    },
                },
            ]
    
            parameters = OrderedDict()
            ranges_obj = OrderedDict()
    
            for trace in timeseries:
                ch_name = trace.stats.channel
    
                if channels and ch_name not in channels:
                    continue
    
                element_info = ELEMENT_INDEX.get(ch_name)
                if element_info:
                    param_description = element_info.name
                    param_unit = element_info.units
                    param_label = element_info.abbreviation or ch_name
                else:
                    param_description = f"Data for channel {ch_name}"
                    param_unit = "nT"
                    param_label = ch_name
    
                param_meta = {
                    "type": "Parameter",
                    "description": {"en": param_description},
                    "unit": {
                        "label": {"en": param_unit},
                        "symbol": {
                            "type": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                            "value": param_unit,
                        },
                    },
                    "observedProperty": {
                        "id": "https://intermagnet.org/faq/10.geomagnetic-comp.html",
                        "label": {"en": param_label},
                    },
                }
                parameters[ch_name] = param_meta
    
                arr = trace.data
                cov_range = OrderedDict()
                cov_range["type"] = "NdArray"
                cov_range["dataType"] = "float"
                cov_range["axisNames"] = ["t"]
                cov_range["shape"] = [len(arr)]
                cov_range["values"] = [float(v) for v in arr]
                ranges_obj[ch_name] = cov_range
    
            coverage = OrderedDict()
            coverage["type"] = "Coverage"
            coverage["domain"] = domain
            coverage["parameters"] = parameters
            coverage["ranges"] = ranges_obj
    
            coverage["geomag:info"] = {
                "institute": getattr(stats, "agency_name", ""),
                "latitude": lat,
                "longitude": lon,
                "altitude": alt,
                "station_name": station_name,
                "iaga_code": station,
                "reported_orientation": reported_orientation,
                "sensor_orientation": sensor_orientation,
                "digital_sampling_rate": digital_sampling_rate,
                "data_interval_type": data_interval_type,
                "data_type": data_type,
                "sampling_period": sampling_period,
            }
            # Optional: Possibly unsupported
            optional_fields = {
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                "network": getattr(stats, "network", None),
    
                "location": getattr(stats, "location", None),
    
    Shavers, Nicholas H's avatar
    Shavers, Nicholas H committed
                "comments": getattr(stats, "filter_comments", None),
                "declination_base": getattr(stats, "declincation_base", None),
                "terms_of_use": getattr(stats, "conditions_of_use", None),
    
            }
            for key, value in optional_fields.items():
                if value is not None:
                    coverage["geomag:info"][key] = value
    
            covjson_str = json.dumps(coverage, indent=2)
            fh.write(covjson_str.encode("utf-8"))