diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 65b148d81f925e1218072af34117ac0405e91fbe..a17078ddcb034ce814758ce84fe47aaa69d49ed5 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -393,41 +393,39 @@ class Controller(object): channels=output_channels, interval=output_interval, ) - if len(output_timeseries) > 0: - # force starttime & endtime to be consistent with output_interval - # FIXME: this is lazy; a better solution would be to use something - # like FilterAlgorithm's get_nearest_time() -EJR - starttime = output_timeseries[0].stats.starttime - endtime = output_timeseries[0].stats.endtime - print( - "checking gaps", - starttime, - endtime, - output_observatory, - output_channels, - file=sys.stderr, - ) - # find gaps in output, so they can be updated - output_gaps = TimeseriesUtility.get_merged_gaps( - TimeseriesUtility.get_stream_gaps(output_timeseries) - ) - else: - output_gaps = [ - [ - starttime, - endtime, - # next sample time not used - None, - ] - ] + # - output_timeseries can never be empty; + # - output_timeseries' endtime must be greater than + # or equal to its starttime + starttime = output_timeseries[0].stats.starttime + endtime = output_timeseries[0].stats.endtime + print( + "checking gaps", + starttime, + endtime, + output_observatory, + output_channels, + file=sys.stderr, + ) + # find gaps in output, so they can be updated + output_gaps = TimeseriesUtility.get_merged_gaps( + TimeseriesUtility.get_stream_gaps(output_timeseries) + ) for output_gap in output_gaps: # check for fillable gap at start if output_gap[0] == starttime: # found fillable gap at start, recurse to previous interval - recurse_starttime, recurse_endtime = get_previous_interval( - start=starttime, - end=endtime, - ) + update_interval = round(endtime - starttime) # round to whole second + if update_interval == 0: + recurse_starttime = ( + starttime + - TimeseriesUtility.get_delta_from_interval(output_interval) + ) + recurse_endtime = recurse_starttime + else: + recurse_starttime = starttime - update_interval + recurse_endtime = ( + starttime - 1e-3 + ) # excludes final sample in update iterations self.run_as_update( algorithm=algorithm, observatory=observatory, diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 9550187c29eaea8239a5daa775a220c642bf20ba..35e7ffdd7cab3bdf60e700138568330ce82b2e66 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -28,7 +28,7 @@ def create_empty_trace( type : str data type {definitive, quasi-definitive, variation} interval : str - interval length {minute, second} + interval length {day, hour, minute, second, tenhertz} network: str the network code station: str @@ -49,17 +49,23 @@ def create_empty_trace( # Calculate first valid sample time based on interval trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta) if delta > 60.0: - trace_starttime += (delta - 60) / 2 - if trace_starttime > endtime: - sys.stderr.write( - "Starttime greater than endtime, adjusting request to one sample" + # hourly and daily average have mid-interval timestamps, minus 30 seconds + if trace_starttime - starttime < (delta + 60) / 2: + trace_starttime += (delta - 60) / 2 + else: + trace_starttime += (delta - 60) / 2 - delta + + if starttime > trace_starttime or endtime < trace_starttime: + sys.stdout.write( + 'No allowed "%s" timestamps between %s and %s, ' + % (interval, starttime, endtime) + + "returning empty trace\n", ) - endtime = trace_starttime stats.starttime = trace_starttime stats.delta = delta # Calculate number of valid samples up to or before endtime length = int((endtime - trace_starttime) / delta) - stats.npts = length + 1 + stats.npts = max(0, length + 1) data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64) return Trace(data, stats) @@ -82,7 +88,10 @@ def encode_stream(stream: Stream, encoding: str) -> Stream: for trace in stream: trace_out = trace.copy() if trace_out.data.dtype != encoding: - trace_out.data = trace_out.data.astype(encoding) + if "STEIM" in encoding.upper(): + trace_out.data = trace_out.data.astype(numpy.int32) + else: + trace_out.data = trace_out.data.astype(encoding) if "mseed" in trace_out.stats: trace_out.stats.mseed.encoding = encoding.upper() out_stream += trace_out @@ -567,8 +576,9 @@ def pad_and_trim_trace(trace, starttime, endtime): if trace_endtime > endtime: # trim to endtime, at least 1 sample to remove cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6))) - trace.data = trace.data[:-cnt] - trace.stats.npts = len(trace.data) + if cnt > 0: + trace.data = trace.data[:-cnt] + trace.stats.npts = len(trace.data) elif trace_endtime < endtime: # pad to endtime cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6)) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index f46119396e5e6948a4de2612fe55c1d60f79bb3f..1344192235601dddd88ccc31493cdf9100643648 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -16,15 +16,17 @@ from typing import List, Optional import numpy import numpy.ma -from obspy import Stream, Trace, UTCDateTime +from obspy.core import Stats, Stream, Trace, UTCDateTime from obspy.clients import earthworm, neic from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType +from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata from .RawInputClient import RawInputClient +from .MiniSeedInputClient import MiniSeedInputClient from .SNCL import SNCL from .LegacySNCL import LegacySNCL @@ -37,9 +39,15 @@ class EdgeFactory(TimeseriesFactory): host: str a string representing the IP number of the host to connect to. port: integer - the port number the waveserver is listening on. + the port number to retrieve data from; if not set, use `ws_port` + for today's data and `qs_port` for older data; if only one of + those ports is set, use it for all data; if no ports are set, + default to 2060 (a "standard" waveserver port) write_port: integer - the port number the client is writing to. + the port number to store data to; if not set, use `ris_port` + for today's data and `mss_port` for older data; if only one of + those ports is set, use it for all data; if no ports are set, + there are not "standard" ris or mss ports, so fail on writes tag: str A tag used by edge to log and associate a socket with a given data source @@ -62,13 +70,33 @@ class EdgeFactory(TimeseriesFactory): locationCode: str the location code for the given edge server, overrides type in get_timeseries/put_timeseries - scaleFactor: int - all data written to edge (via raw input client) will be scaled - by this integer prior to write; all data read from edge will be - will be divided by this integer after read; default = 1000 - snclMapper: {'geomag','legacy'} - a mapper of common channel names to SEED SNCL codes (that is, + convert_channels: array + list of channels to convert from volt/bin to nT + scale_factor: float + override default scalings when reading/writing miniseed blocks; + (reading integer blocks divides by 1000; reading floating point + blocks divides by 1; writing all data multiplies by 1000) + default = None + sncl_mode: {'geomag','legacy'} + force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy + ws_port: int + port for retrieving data using waveserver protocol; this allows + access to RAM buffer, and potentially more up-to-date real time + data; not valid if stored data are not integers + qs_port: int + port for retrieving data using queryserver protocol; this uses + miniseed binary, and is more efficient for larger transfers; this + does not access Edge's RAM buffer, limiting real time access to + low sample rate data, but can be used to pull non-integer data + ris_port: int + port for storing data via Edge's RawInputServer (RIS); this places + data sample-by-sample into a RAM buffer, where it accumulates until + a full miniseed block can be written to disk + mss_port: int + port for storing data via Edge's MiniSeedServer (MSS); this creates + a miniseed block client-side, then sends this to Edge to write to + disk immediately See Also -------- @@ -76,17 +104,18 @@ class EdgeFactory(TimeseriesFactory): Notes ----- - This is designed to read from any earthworm style waveserver, but it - currently only writes to an edge. Edge mimics an earthworm style - waveserver close enough that we hope to maintain that compatibility - for reading. + Initial versions of this factory were limited in their ability to + interface with EdgeCWB. Capabilities have been expanded, espeically + with regard to reading/writing miniseed blocks directly. However, to + retain backward compatibility, especially with regard to "defaults", + some extraneous logic was necessary. """ def __init__( self, host: str = "edgecwb.usgs.gov", - port: int = 2060, - write_port: int = 7981, + port: Optional[int] = None, + write_port: Optional[int] = None, tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -95,16 +124,15 @@ class EdgeFactory(TimeseriesFactory): interval: Optional[DataInterval] = None, observatoryMetadata: Optional[ObservatoryMetadata] = None, locationCode: Optional[str] = None, - scaleFactor: int = 1000, - snclMapper: str = "legacy", + convert_channels: Optional[List[str]] = None, + scale_factor: Optional[int] = None, + sncl_mode: Optional[str] = "legacy", + ws_port: Optional[int] = None, + qs_port: Optional[int] = None, + ris_port: Optional[int] = None, + mss_port: Optional[int] = None, ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - if port == 2060: - # earthworm (phasing this out gradually) - self.client = earthworm.Client(host, port) - else: - # CWBQuery/miniseed protocol (preferred) - self.client = neic.Client(host, port) self.host = host self.port = port self.write_port = write_port @@ -113,13 +141,24 @@ class EdgeFactory(TimeseriesFactory): self.interval = interval self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.locationCode = locationCode - self.scaleFactor = scaleFactor - if snclMapper == "legacy": + + self.convert_channels = convert_channels or [] + self.scale_factor = scale_factor + self.sncl_mode = sncl_mode + if sncl_mode == "legacy": self.get_sncl = LegacySNCL.get_sncl - elif snclMapper == "geomag": + elif sncl_mode == "geomag": self.get_sncl = SNCL.get_sncl else: - raise TimeseriesFactoryException("Unrecognized SNCL mapper") + raise TimeseriesFactoryException("Unrecognized SNCL mode") + + if port is ws_port is qs_port is None: + # for backward compatibility + ws_port = 2060 + self.ws_port = ws_port + self.qs_port = qs_port + self.ris_port = ris_port + self.mss_port = mss_port def get_timeseries( self, @@ -179,17 +218,22 @@ class EdgeFactory(TimeseriesFactory): # get the timeseries timeseries = Stream() for channel in channels: - data = self._get_timeseries( - starttime, - endtime, - observatory, - channel, - type, - interval, - add_empty_channels, - ) - if len(data) == 0: - continue + if channel in self.convert_channels: + data = self._convert_timeseries( + starttime, endtime, observatory, channel, type, interval + ) + else: + data = self._get_timeseries( + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, + ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -247,46 +291,84 @@ class EdgeFactory(TimeseriesFactory): ) for channel in channels: self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + timeseries.select(channel=channel), + observatory, + channel, + type, + interval, + starttime, + endtime, ) - def _convert_timeseries_to_decimal(self, stream: Stream): - """convert geomag edge timeseries data stored as ints, to decimal by - dividing by 1000.00 - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - Notes - ----- - This routine changes the values in the timeseries. The user should - make a copy before calling if they don't want that side effect. - """ - for trace in stream: - trace.data = numpy.divide(trace.data, self.scaleFactor) - - def _convert_trace_to_int(self, trace_in: Trace) -> Trace: - """convert geomag edge traces stored as decimal, to ints by multiplying - by 1000 + def get_calculated_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + components: List[dict], + ) -> Trace: + """Calculate a single channel using multiple component channels. Parameters ---------- - trace: Trace - a trace retrieved from a geomag edge representing one channel + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory: str + observatory code + channel: str + single character channel {H, E, D, Z, F} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + components: list + each component is a dictionary with the following keys: + channel: str + offset: float + scale: float + Returns ------- - trace: Trace - a trace converted to ints - Notes - ----- - this doesn't work on ndarray with nan's in it. - the trace must be a masked array. + out: Trace + timeseries trace of the converted channel data """ - trace = trace_in.copy() - trace.data = numpy.multiply(trace.data, self.scaleFactor) - trace.data = trace.data.astype(int) - - return trace + # sum channels + stats = None + converted = None + for component in components: + # load component + data = self._get_timeseries( + starttime, endtime, observatory, component["channel"], type, interval + )[0] + # convert to nT + nt = data.data * component["scale"] + component["offset"] + # add to converted + if converted is None: + converted = nt + stats = Stats(data.stats) + else: + converted += nt + # set channel parameter to U, V, or W + stats.channel = channel + # create empty trace with adapted stats + out = TimeseriesUtility.create_empty_trace( + stats.starttime, + stats.endtime, + stats.station, + stats.channel, + stats.data_type, + stats.data_interval, + stats.network, + stats.station, + stats.location, + ) + out.data = converted + return out def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: """convert geomag edge traces in a timeseries stream to a MaskedArray @@ -331,7 +413,7 @@ class EdgeFactory(TimeseriesFactory): single character channel {H, E, D, Z, F} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -352,23 +434,92 @@ class EdgeFactory(TimeseriesFactory): # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 - try: - data = self.client.get_waveforms( - sncl.network, - sncl.station, - sncl.location, - sncl.channel, - starttime, - endtime + half_delta, - ) - except TypeError: - # get_waveforms() fails if no data is returned from Edge - data = Stream() - # make sure data is 32bit int + if self.port: + ports = [self.port] + starttimes = [starttime] + endtimes = [endtime] + else: + midnight = UTCDateTime(UTCDateTime.now().date) + if endtime < midnight: + # cwbquery port gives access to complete miniseed packets + ports = [self.qs_port or self.ws_port] + starttimes = [starttime] + endtimes = [endtime] + elif starttime >= midnight: + # earthworm port gives access to really realtime data + ports = [self.ws_port or self.qs_port] + starttimes = [starttime] + endtimes = [endtime] + else: + # starttime-endtime straddle midnight + # port=22061 before today; port=2060 for today + ports = [ + (self.qs_port or self.ws_port), + (self.ws_port or self.qs_port), + ] + starttimes = [starttime, midnight] + endtimes = [midnight - half_delta, endtime] + + data = Stream() + for i, port in enumerate(ports): + + if port and port == self.qs_port: + # neic for complete miniseed packets (faster) + client = neic.Client(self.host, port) + + if port and (port == self.port or port == self.ws_port): + # earthworm for really realtime data (slower) + client = earthworm.Client(self.host, port) + + try: + data += client.get_waveforms( + sncl.network, + sncl.station, + sncl.location, + sncl.channel, + starttimes[i], + endtimes[i] + half_delta, + ) + except Exception as e: + print("Ingnoring error: ", e) + for trace in data: - trace.data = trace.data.astype("i4") - data.merge() + if trace.data.dtype.kind == "i": + # convert all integer traces to rescaled 64-bit floats; + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace.data = trace.data.astype(float) / (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace.data = trace.data.astype(float) / (self.scale_factor or 1e3) + elif trace.data.dtype.kind == "f": + # convert all float traces to 64-bit floats; + trace.data = trace.data.astype(float) / (self.scale_factor or 1.0) + + # when Traces with identical NSCL codes overlap, prioritize samples + # that come "later" in Stream; this depends on Edge returning miniseed + # packets in the order written + # NOTE: this is not possible with single calls to Stream.merge() + st_tmp = Stream() + for tr in data: + try: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + except Exception as e: + tr = st_tmp.pop() # remove bad trace + print("Dropped trace: ", tr) + print("Ignoring error: ", e) + + # point `data` to the new stream and continue processing + data = st_tmp + if data.count() == 0 and add_empty_channels: data += self._get_empty_trace( starttime=starttime, @@ -380,9 +531,82 @@ class EdgeFactory(TimeseriesFactory): network=sncl.network, location=sncl.location, ) + self._set_metadata(data, observatory, channel, type, interval) return data + def _convert_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + ) -> Stream: + """Generate a single channel using multiple components. + + Finds metadata, then calls _get_converted_timeseries for actual + conversion. + + Parameters + ---------- + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval : {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + + Returns + ------- + out: Stream + timeseries with single trace of the requested channel data + NOTE: this originally returned a Trace, but was modified + (probably due to misunderstanding) at some point to + return a Stream. This Stream is, however, expected + to contain just a single Trace. + """ + out = Stream() + metadata = get_instrument_calibrations(observatory, starttime, endtime) + # loop in case request spans different configurations + for entry in metadata: + entry_endtime = entry["end_time"] + entry_starttime = entry["start_time"] + instrument = entry["instrument"] + instrument_channels = instrument["channels"] + if channel not in instrument_channels: + # no idea how to convert + continue + # determine metadata overlap with request + start = ( + starttime + if entry_starttime is None or entry_starttime < starttime + else entry_starttime + ) + end = ( + endtime + if entry_endtime is None or entry_endtime > endtime + else entry_endtime + ) + # now convert + out += self.get_calculated_timeseries( + start, + end, + observatory, + channel, + type, + interval, + instrument_channels[channel], + ) + return out + def _post_process( self, timeseries: Stream, @@ -409,7 +633,6 @@ class EdgeFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ - self._convert_timeseries_to_decimal(timeseries) for trace in timeseries: if isinstance(trace.data, numpy.ma.MaskedArray): trace.data.set_fill_value(numpy.nan) @@ -443,7 +666,7 @@ class EdgeFactory(TimeseriesFactory): channel to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval starttime: UTCDateTime endtime: UTCDateTime @@ -460,34 +683,58 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - ric = RawInputClient( - self.tag, - self.host, - self.write_port, - sncl.station, - sncl.channel, - sncl.location, - sncl.network, + stream_masked = self._convert_stream_to_masked( + timeseries=timeseries, channel=channel ) + stream_split = stream_masked.split() - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - - # Make certain there's actually data - if not numpy.ma.any(stream.select(channel=channel)[0].data): - return - - for trace in stream.select(channel=channel).split(): + for trace in stream_split: trace_send = trace.copy() trace_send.trim(starttime, endtime) if channel == "D": trace_send.data = ChannelConverter.get_minutes_from_radians( trace_send.data ) - trace_send = self._convert_trace_to_int(trace_send) - ric.send_trace(interval, trace_send) - if self.forceout: - ric.forceout() - ric.close() + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace_send.data = trace_send.data * (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace_send.data = trace_send.data * (self.scale_factor or 1e3) + + # if self.write_mode.lower() in ["raw"]: + if self.write_port or self.ris_port: + # FIXME: input clients should be rewritten for consistency, + # allowing us to clean up stuff like this if-else-block + ric = RawInputClient( + self.tag, + self.host, + self.write_port or self.ris_port, + sncl.station, + sncl.channel, + sncl.location, + sncl.network, + ) + trace_send.data = trace_send.data.astype(int) # ric requires ints + ric.send_trace(interval, trace_send) + if self.forceout: + ric.forceout() + ric.close() + + # elif self.write_mode.lower() in ["miniseed"]: + elif self.mss_port: + # FIXME: input clients should be rewritten for consistency, + # allowing us to clean up stuff like this if-else-block + msic = MiniSeedInputClient(self.host, self.mss_port, encoding="STEIM2") + trace_send.stats.station = sncl.station + trace_send.stats.location = sncl.location + trace_send.stats.network = sncl.network + trace_send.stats.channel = sncl.channel + stream_send = Stream(trace_send) + msic.send(stream_send) + msic.close() + else: + raise TimeseriesFactoryException("Valid write port was not specified.") def _set_metadata( self, diff --git a/geomagio/edge/LegacySNCL.py b/geomagio/edge/LegacySNCL.py index f80b438a602ce15ec0dc552ff10c7345701c0222..b0fdb39e82b3b2b7673f6d167e887c71890bdf0b 100644 --- a/geomagio/edge/LegacySNCL.py +++ b/geomagio/edge/LegacySNCL.py @@ -75,7 +75,9 @@ def _check_predefined_element(channel: str) -> Optional[str]: def _get_channel_start(interval: str) -> str: - if interval == "second": + if interval == "tenhertz": + return "T" + elif interval == "second": return "S" elif interval == "minute": return "M" diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 55eae89c95affb323dad9951a32e7c128401b26d..649fa933f536d50610718f707429bf1fec418f8f 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -14,13 +14,16 @@ from time import sleep """ MAXINPUTSIZE: Edge uses a short for the data count, so the max input size - for a single data packet to edge is 32767 + for a single data packet to edge is *supposed* to be 32767. However, this + was never tested thoroughly until we added support for 10Hz data, when + it was quickly discovered that setting this value caused Edge to hang. + Therefore, MAXINPUTSIZE was reduced to 10_000, which seems to work well. HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size for sending seconds data. DAYMINUTES: The numbers of minutes in a day. Used as the size for sending minute data, since Edge stores by the day. """ -MAXINPUTSIZE = 32767 +MAXINPUTSIZE = 10_000 HOURSECONDS = 3600 DAYMINUTES = 1440 @@ -161,7 +164,7 @@ class RawInputClient: PARAMETERS ---------- - interval: {'day', 'hour', 'minute', 'second'} + interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} data interval. trace: obspy.core.trace @@ -174,7 +177,11 @@ class RawInputClient: totalsamps = len(trace.data) starttime = trace.stats.starttime - if interval == "second": + if interval == "tenhertz": + nsamp = MAXINPUTSIZE + timeoffset = 1 / 10.0 + samplerate = 10.0 + elif interval == "second": nsamp = HOURSECONDS timeoffset = 1 samplerate = 1.0 @@ -360,7 +367,7 @@ class RawInputClient: secs, usecs, self.sequence, - *samples + *samples, ) return buf @@ -377,16 +384,30 @@ class RawInputClient: tuple: (ratemantissa, ratedivosor) ratemantissa: int ratedivosor: int + + NOTE that `ratemantissa` and `ratedivisor` map directly to the SEED + standard's "sample rate factor" and "sample rate multiplier", which + are stored in a miniseed block's fixed data header as 2-byte words; + earlier versions of this method did not handle sample rates lower + than 1/second properly (they did handle 1/minute as a special-case); """ if rate > 0.9999: + # sample rates greater or equal to 1/second + # (this will not handle >327 Hz, but for consistency + # with earlier versions, we leave it unchanged) ratemantissa = int(rate * 100 + 0.001) ratedivisor = -100 - elif rate * 60.0 - 1.0 < 0.00000001: # one minute data - ratemantissa = -60 - ratedivisor = 1 + elif rate * 21600 > 1.0001: + # sample periods between 1 second and 6 hours + # (this should handle 1-minute data identically to + # earlier versions) + ratemantissa = -int(1 / rate + 0.001) + ratedivisor = -1 else: - ratemantissa = int(rate * 10000.0 + 0.001) - ratedivisor = -10000 + # sample periods 6 hours and longer + # (this will not handle periods longer than ~22 years) + ratemantissa = -int(1 / rate / 21600) + ratedivisor = -21600 return (ratemantissa, ratedivisor) diff --git a/geomagio/processing/factory.py b/geomagio/processing/factory.py index b857d68cec6786b76c26c6693f3333f5511e7a0c..17ff3aa7d5124f4807cecb1e7053be7a3ac729bf 100644 --- a/geomagio/processing/factory.py +++ b/geomagio/processing/factory.py @@ -11,10 +11,6 @@ def get_edge_factory( output_port=os.getenv("OUTPUT_EDGE_PORT", None), **kwargs ) -> EdgeFactory: - if input_port is None: - input_port = 2060 - if output_port is None: - output_port = 7981 return EdgeFactory( host=host, interval=interval, @@ -33,10 +29,6 @@ def get_miniseed_factory( output_port=os.getenv("OUTPUT_MINISEED_PORT", None), **kwargs ) -> MiniSeedFactory: - if input_port is None: - input_port = 2061 - if output_port is None: - output_port = 7974 return MiniSeedFactory( host=host, interval=interval, diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py index e69d762e62ebd4abb95cf4fc7965d8a5886d36ef..8be2af2044a2c6aed3d870d1061b086d179bf1d7 100644 --- a/geomagio/processing/filters.py +++ b/geomagio/processing/filters.py @@ -7,7 +7,7 @@ from ..algorithm import Algorithm, FilterAlgorithm from ..Controller import Controller, get_realtime_interval from ..geomag_types import DataInterval from ..TimeseriesFactory import TimeseriesFactory -from .factory import get_edge_factory, get_miniseed_factory +from ..edge import EdgeFactory class DataFormat(str, Enum): @@ -29,33 +29,31 @@ def main(): def day_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), output_host: str = Option("127.0.0.1", help="host to write data to"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option( 604800, help="length of update window (in seconds)" ), - update_limit: int = Option(7, help="number of update windows"), + update_limit: int = Option(4, help="number of update windows"), ): day_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - temperature_filter( - observatory=observatory, - channels=( - ("UK1", "PK1"), - ("UK2", "PK2"), - ("UK3", "PK3"), - ("UK4", "PK4"), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="day", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -69,30 +67,28 @@ def hour_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option(86400, help="length of update window (in seconds)"), - update_limit: int = Option(24, help="number of update windows"), + update_limit: int = Option(7, help="number of update windows"), ): hour_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - temperature_filter( - observatory=observatory, - channels=( - ("UK1", "RK1"), - ("UK2", "RK2"), - ("UK3", "RK3"), - ("UK4", "RK4"), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="hour", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -104,13 +100,14 @@ def hour_command( help=""" ObsRIO: - Filters 10Hz U,V,W miniseed to 1 second miniseed + Filters 10Hz U,V,W to 1 second U,V,W - Filters 1 second U,V,W,F miniseed to 1 minute miniseed + Filters 1 second U,V,W,F to 1 minute U,V,W,F - Filters 1 second T1-4 miniseed to 1 minute UK1-4 legacy + Filters 1 second T1-4 to 1 minute UK1-4 - Copies 1 second and 1 minute U,V,W,F miniseed to H,E,Z,F earthworm + Copies 1 second and 1 minute U,V,W,F miniseed to legacy + H,E,Z,F PCDCP: @@ -124,25 +121,71 @@ def realtime_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"), realtime_interval: int = Option(600, help="length of update window (in seconds)"), update_limit: int = Option(10, help="number of update windows"), ): if data_format == DataFormat.OBSRIO: + # ObsRIO data logger processing + + _copy_channels( + # + # copy and compress (non-tenhertz) float32 channels + # TEMPORARY UNTIL INPUTS CONVERTED TO INT32 + # + observatory=observatory, + channels=( + ("LFF", "LFF"), + ("LK1", "LK1"), + ("LK2", "LK2"), + ("LK3", "LK3"), + ("LK4", "LK4"), + ), + interval="second", + input_factory=EdgeFactory( + host=input_host, + qs_port=2061, # hard-code query server port + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=2060, # earthworm required for realtime + write_port=7981, # hard-code raw input port + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + second_filter( observatory=observatory, - input_factory=get_miniseed_factory( - host=input_host, convert_channels=("U", "V", "W"), input_port=input_port + channels=("U", "V", "W"), + input_factory=EdgeFactory( + host=input_host, + qs_port=2061, # force query server port until inputs are integers + type="variation", + convert_channels=("U", "V", "W"), + sncl_mode="geomag", ), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) + _copy_channels( + # duplicate channels using legacy SNCLs + # TEMPORARY UNTIL SNCL MIGRATION COMPLETED observatory=observatory, channels=( ("U", "H"), @@ -151,27 +194,73 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm required for realtime + write_port=7981, # hard-code rawinput until migration, then remove + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) - temperature_filter( + + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + + _copy_channels( + # duplicate channels using legacy SNCLs + # TEMPORARY UNTIL SNCL MIGRATION COMPLETED observatory=observatory, channels=( - ("LK1", "UK1"), - ("LK2", "UK2"), - ("LK3", "UK3"), - ("LK4", "UK4"), + ("U", "H"), + ("V", "E"), + ("W", "Z"), + ("F", "F"), + ), + interval="minute", + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", ), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - input_interval="second", - output_factory=get_edge_factory(host=output_host, output_port=output_port), - output_interval="minute", realtime_interval=realtime_interval, update_limit=update_limit, ) + else: + # legacy PCDCP processing _copy_channels( observatory=observatory, channels=( @@ -181,22 +270,43 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_edge_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + input_factory=EdgeFactory( + host=input_host, + input_port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="legacy", + ), + output_factory=EdgeFactory( + host=output_host, + output_port=output_read_port, # earthworm port required for realtime + write_port=output_port, + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) - minute_filter( - observatory=observatory, - channels=("U", "V", "W", "F"), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - if data_format == DataFormat.OBSRIO: + + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + _copy_channels( observatory=observatory, channels=( @@ -206,8 +316,19 @@ def realtime_command( ("F", "F"), ), interval="minute", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, + input_port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -215,11 +336,11 @@ def realtime_command( def day_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 86400, - update_limit: int = 7, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 second miniseed channels to 1 day @@ -240,9 +361,9 @@ def day_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="day", ) for channel in channels: @@ -266,11 +387,11 @@ def day_filter( def hour_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 600, - update_limit: int = 10, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 minute miniseed channels to 1 hour @@ -291,9 +412,9 @@ def hour_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="hour", ) for channel in channels: @@ -317,9 +438,9 @@ def hour_filter( def minute_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, realtime_interval: int = 600, update_limit: int = 10, ): @@ -342,9 +463,9 @@ def minute_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="second", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="minute", ) for channel in channels: @@ -368,6 +489,7 @@ def minute_filter( def second_filter( observatory: str, + channels: List[str] = ["U", "V", "W"], input_factory: Optional[TimeseriesFactory] = None, output_factory: Optional[TimeseriesFactory] = None, realtime_interval: int = 600, @@ -379,6 +501,8 @@ def second_filter( ----------- observatory: str observatory id + channels: array + list of channels to filter input_factory: TimeseriesFactory factory to request data output_factory: TimeseriesFactory @@ -390,13 +514,12 @@ def second_filter( """ starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory - or get_miniseed_factory(convert_channels=("U", "V", "W")), + inputFactory=input_factory, inputInterval="tenhertz", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="second", ) - for channel in ("U", "V", "W"): + for channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( input_sample_period=0.1, @@ -418,26 +541,26 @@ def second_filter( def temperature_filter( observatory: str, channels: List[List[str]], - input_factory: Optional[TimeseriesFactory] = None, - input_interval: int = "second", - output_factory: Optional[TimeseriesFactory] = None, - output_interval: int = "minute", - realtime_interval: int = 600, - update_limit: int = 10, + input_factory: TimeseriesFactory, + input_interval: int, + output_factory: TimeseriesFactory, + output_interval: int, + realtime_interval: int, + update_limit: int, ): """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4).""" starttime, endtime = get_realtime_interval(realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=input_interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=output_interval, ) for input_channel, output_channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( - input_sample_period=1, - output_sample_period=60, + input_sample_period=input_interval, + output_sample_period=output_interval, inchannels=(input_channel,), outchannels=(output_channel,), ), @@ -484,9 +607,9 @@ def _copy_channels( """ starttime, endtime = get_realtime_interval(interval_seconds=realtime_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=interval, ) for input_channel, output_channel in channels: diff --git a/test/edge_test/RawInputClient_test.py b/test/edge_test/RawInputClient_test.py index ef21290aca7b15bb3784628999673fe8a30431bb..876bc5694d781cff4029853c05082dceb00997bd 100644 --- a/test/edge_test/RawInputClient_test.py +++ b/test/edge_test/RawInputClient_test.py @@ -51,7 +51,8 @@ def test_raw_input_client(): location=location, network=network, ) - trace_send = EdgeFactory()._convert_trace_to_int(trace.copy()) + trace_send = trace.copy() + trace_send.data = numpy.int32(trace_send.data * 1000) client.send_trace("minute", trace_send) # verify data was sent assert_equal(len(client.last_send), 1)