diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index f46119396e5e6948a4de2612fe55c1d60f79bb3f..569dc2c3a7707ba4ce5a10031d16862461eb3ca1 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -16,11 +16,12 @@ from typing import List, Optional import numpy import numpy.ma -from obspy import Stream, Trace, UTCDateTime -from obspy.clients import earthworm, neic +from obspy.core import Stats, Stream, Trace, UTCDateTime +from obspy.clients import earthworm from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType +from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata @@ -37,9 +38,11 @@ class EdgeFactory(TimeseriesFactory): host: str a string representing the IP number of the host to connect to. port: integer - the port number the waveserver is listening on. + port number on which EdgeCWB's CWB waveserver (CWBWS, obspy's + "earthworm" client) listens for requests to retrieve timeseries data. write_port: integer - the port number the client is writing to. + the port number on which EdgeCWB's RawInputServer is listening for + requests to write timeseries data. tag: str A tag used by edge to log and associate a socket with a given data source @@ -54,7 +57,7 @@ class EdgeFactory(TimeseriesFactory): others are passed through, see #_get_edge_channel(). type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default @@ -62,12 +65,15 @@ class EdgeFactory(TimeseriesFactory): locationCode: str the location code for the given edge server, overrides type in get_timeseries/put_timeseries - scaleFactor: int - all data written to edge (via raw input client) will be scaled - by this integer prior to write; all data read from edge will be - will be divided by this integer after read; default = 1000 - snclMapper: {'geomag','legacy'} - a mapper of common channel names to SEED SNCL codes (that is, + convert_channels: array + list of channels to convert from volt/bin to nT + scale_factor: float + override default scalings when reading/writing miniseed blocks; + (reading integer blocks divides by 1000; reading floating point + blocks divides by 1; writing all data multiplies by 1000) + default = None + sncl_mode: {'geomag','legacy'} + force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy See Also @@ -76,17 +82,19 @@ class EdgeFactory(TimeseriesFactory): Notes ----- - This is designed to read from any earthworm style waveserver, but it - currently only writes to an edge. Edge mimics an earthworm style - waveserver close enough that we hope to maintain that compatibility - for reading. + The EdgeFactory reads so-called trace-bufs and writes integer data + to an EdgeCWB RawInputServer, which places these data, sample-by-sample, + into a RAM buffer where it can be immediately retrieved, even before + full miniseed blocks can be constructed and written to disk. The + EdgeFactory cannot handle non-integer data, and is inefficient for + larger data transfers. In those cases, consider using MiniSeedFactory. """ def __init__( self, - host: str = "edgecwb.usgs.gov", - port: int = 2060, - write_port: int = 7981, + host: Optional[str] = None, + port: Optional[int] = None, + write_port: Optional[int] = None, tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -95,31 +103,27 @@ class EdgeFactory(TimeseriesFactory): interval: Optional[DataInterval] = None, observatoryMetadata: Optional[ObservatoryMetadata] = None, locationCode: Optional[str] = None, - scaleFactor: int = 1000, - snclMapper: str = "legacy", + convert_channels: Optional[List[str]] = None, + scale_factor: Optional[int] = None, + sncl_mode: Optional[str] = "legacy", ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - if port == 2060: - # earthworm (phasing this out gradually) - self.client = earthworm.Client(host, port) - else: - # CWBQuery/miniseed protocol (preferred) - self.client = neic.Client(host, port) - self.host = host - self.port = port - self.write_port = write_port + self.host = host or "edgecwb.usgs.gov" + self.port = port or [2060] + self.write_port = write_port # no default write port self.tag = tag self.forceout = forceout - self.interval = interval self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.locationCode = locationCode - self.scaleFactor = scaleFactor - if snclMapper == "legacy": + self.convert_channels = convert_channels or [] + self.scale_factor = scale_factor + self.sncl_mode = sncl_mode + if sncl_mode == "legacy": self.get_sncl = LegacySNCL.get_sncl - elif snclMapper == "geomag": + elif sncl_mode == "geomag": self.get_sncl = SNCL.get_sncl else: - raise TimeseriesFactoryException("Unrecognized SNCL mapper") + raise TimeseriesFactoryException("Unrecognized SNCL mode") def get_timeseries( self, @@ -145,7 +149,7 @@ class EdgeFactory(TimeseriesFactory): list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -179,17 +183,22 @@ class EdgeFactory(TimeseriesFactory): # get the timeseries timeseries = Stream() for channel in channels: - data = self._get_timeseries( - starttime, - endtime, - observatory, - channel, - type, - interval, - add_empty_channels, - ) - if len(data) == 0: - continue + if channel in self.convert_channels: + data = self._convert_timeseries( + starttime, endtime, observatory, channel, type, interval + ) + else: + data = self._get_timeseries( + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, + ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -220,7 +229,7 @@ class EdgeFactory(TimeseriesFactory): list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval Notes @@ -247,46 +256,84 @@ class EdgeFactory(TimeseriesFactory): ) for channel in channels: self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + timeseries.select(channel=channel), + observatory, + channel, + type, + interval, + starttime, + endtime, ) - def _convert_timeseries_to_decimal(self, stream: Stream): - """convert geomag edge timeseries data stored as ints, to decimal by - dividing by 1000.00 - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - Notes - ----- - This routine changes the values in the timeseries. The user should - make a copy before calling if they don't want that side effect. - """ - for trace in stream: - trace.data = numpy.divide(trace.data, self.scaleFactor) - - def _convert_trace_to_int(self, trace_in: Trace) -> Trace: - """convert geomag edge traces stored as decimal, to ints by multiplying - by 1000 + def get_calculated_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + components: List[dict], + ) -> Trace: + """Calculate a single channel using multiple component channels. Parameters ---------- - trace: Trace - a trace retrieved from a geomag edge representing one channel + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory: str + observatory code + channel: str + single character channel {H, E, D, Z, F} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + components: list + each component is a dictionary with the following keys: + channel: str + offset: float + scale: float + Returns ------- - trace: Trace - a trace converted to ints - Notes - ----- - this doesn't work on ndarray with nan's in it. - the trace must be a masked array. + out: Trace + timeseries trace of the converted channel data """ - trace = trace_in.copy() - trace.data = numpy.multiply(trace.data, self.scaleFactor) - trace.data = trace.data.astype(int) - - return trace + # sum channels + stats = None + converted = None + for component in components: + # load component + data = self._get_timeseries( + starttime, endtime, observatory, component["channel"], type, interval + )[0] + # convert to nT + nt = data.data * component["scale"] + component["offset"] + # add to converted + if converted is None: + converted = nt + stats = Stats(data.stats) + else: + converted += nt + # set channel parameter to U, V, or W + stats.channel = channel + # create empty trace with adapted stats + out = TimeseriesUtility.create_empty_trace( + stats.starttime, + stats.endtime, + stats.station, + stats.channel, + stats.data_type, + stats.data_interval, + stats.network, + stats.station, + stats.location, + ) + out.data = converted + return out def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: """convert geomag edge traces in a timeseries stream to a MaskedArray @@ -331,7 +378,7 @@ class EdgeFactory(TimeseriesFactory): single character channel {H, E, D, Z, F} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -352,23 +399,75 @@ class EdgeFactory(TimeseriesFactory): # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 + + # list-type ports variable needed for fail-back logic try: - data = self.client.get_waveforms( - sncl.network, - sncl.station, - sncl.location, - sncl.channel, - starttime, - endtime + half_delta, - ) + ports = list(self.port) except TypeError: - # get_waveforms() fails if no data is returned from Edge - data = Stream() + ports = [self.port] + + data = Stream() + for port in ports: + try: + client = earthworm.Client(self.host, port, timeout=10) + + data += client.get_waveforms( + sncl.network, + sncl.station, + sncl.location, + sncl.channel, + starttime, + endtime + half_delta, + ) + + if data: + # if data was returned, force this port in subsequent calls + # to get_timeseries() that use this instance of EdgeFactory + self.port = [port] + break + print("No data returned from ", self.host, " on port ", port) + # try alternate port(s) if provided + except Exception as e: + print("Failed to get data from ", self.host, " on port ", port) + print("Ignoring error: ", e) + # try alternate port(s) if provided + continue - # make sure data is 32bit int for trace in data: - trace.data = trace.data.astype("i4") - data.merge() + if trace.data.dtype.kind == "i": + # convert all integer traces to rescaled 64-bit floats; + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace.data = trace.data.astype(float) / (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace.data = trace.data.astype(float) / (self.scale_factor or 1e3) + elif trace.data.dtype.kind == "f": + # convert all float traces to 64-bit floats; + trace.data = trace.data.astype(float) / (self.scale_factor or 1.0) + + # when Traces with identical NSCL codes overlap, prioritize samples + # that come "later" in Stream; this depends on Edge returning miniseed + # packets in the order written + # NOTE: this is not possible with single calls to Stream.merge() + st_tmp = Stream() + for tr in data: + try: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + except Exception as e: + tr = st_tmp.pop() # remove bad trace + print("Dropped trace: ", tr) + print("Ignoring error: ", e) + + # point `data` to the new stream and continue processing + data = st_tmp if data.count() == 0 and add_empty_channels: data += self._get_empty_trace( starttime=starttime, @@ -383,6 +482,84 @@ class EdgeFactory(TimeseriesFactory): self._set_metadata(data, observatory, channel, type, interval) return data + def _convert_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + ) -> Stream: + """Generate a single channel using multiple components. + + Finds metadata, then calls _get_converted_timeseries for actual + conversion. + + Parameters + ---------- + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval : {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + + Returns + ------- + out: Stream + timeseries with single trace of the requested channel data + NOTE: this originally returned a Trace, but was modified + (probably due to misunderstanding) at some point to + return a Stream. This Stream is, however, expected + to contain just a single Trace. + """ + out = Stream() + metadata = get_instrument_calibrations(observatory, starttime, endtime) + # loop in case request spans different configurations + for entry in metadata: + entry_endtime = entry["end_time"] + entry_starttime = entry["start_time"] + instrument = entry["instrument"] + instrument_channels = instrument["channels"] + if channel not in instrument_channels: + # no idea how to convert + continue + # determine metadata overlap with request + start = ( + starttime + if entry_starttime is None or entry_starttime < starttime + else entry_starttime + ) + end = ( + endtime + if entry_endtime is None or entry_endtime > endtime + else entry_endtime + ) + # now convert + out += self.get_calculated_timeseries( + start, + end, + observatory, + channel, + type, + interval, + instrument_channels[channel], + ) + # force to match the first trace's datatype + for trace in out: + trace.data = trace.data.astype(out[0].data.dtype) + # merge to force a single trace + # (precedence given to later inteval when metadata overlap) + out.merge(1) + return out + def _post_process( self, timeseries: Stream, @@ -391,7 +568,7 @@ class EdgeFactory(TimeseriesFactory): channels: List[str], ): """Post process a timeseries stream after the raw data is - is fetched from a waveserver. Specifically changes + is fetched from waveserver. Specifically changes any MaskedArray to a ndarray with nans representing gaps. Then calls pad_timeseries to deal with gaps at the beggining or end of the streams. @@ -409,7 +586,6 @@ class EdgeFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ - self._convert_timeseries_to_decimal(timeseries) for trace in timeseries: if isinstance(trace.data, numpy.ma.MaskedArray): trace.data.set_fill_value(numpy.nan) @@ -443,14 +619,10 @@ class EdgeFactory(TimeseriesFactory): channel to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval starttime: UTCDateTime endtime: UTCDateTime - - Notes - ----- - RawInputClient seems to only work when sockets are """ sncl = self.get_sncl( station=observatory, @@ -460,34 +632,42 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - ric = RawInputClient( - self.tag, - self.host, - self.write_port, - sncl.station, - sncl.channel, - sncl.location, - sncl.network, + stream_masked = self._convert_stream_to_masked( + timeseries=timeseries, channel=channel ) + stream_split = stream_masked.split() - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - - # Make certain there's actually data - if not numpy.ma.any(stream.select(channel=channel)[0].data): - return - - for trace in stream.select(channel=channel).split(): + for trace in stream_split: trace_send = trace.copy() trace_send.trim(starttime, endtime) if channel == "D": trace_send.data = ChannelConverter.get_minutes_from_radians( trace_send.data ) - trace_send = self._convert_trace_to_int(trace_send) - ric.send_trace(interval, trace_send) - if self.forceout: - ric.forceout() - ric.close() + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace_send.data = trace_send.data * (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace_send.data = trace_send.data * (self.scale_factor or 1e3) + + if self.write_port: + ric = RawInputClient( + self.tag, + self.host, + self.write_port, + sncl.station, + sncl.channel, + sncl.location, + sncl.network, + ) + trace_send.data = trace_send.data.astype(int) # ric requires ints + ric.send_trace(interval, trace_send) + if self.forceout: + ric.forceout() + ric.close() + else: + raise TimeseriesFactoryException("Valid write port was not specified.") def _set_metadata( self, @@ -506,7 +686,7 @@ class EdgeFactory(TimeseriesFactory): edge channel code {MVH, MVE, MVD, ...} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval """ for trace in stream: