diff --git a/code.json b/code.json index 196e8e82ce516a13a5c61b66f9a590a8560d87ba..c13b31f7dec4420d90a72245aa81d36ca2330dd1 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "geomag-algorithms", "organization": "U.S. Geological Survey", "description": "Library for processing Geomagnetic timeseries data.", - "version": "1.12.3", + "version": "1.13.1", "status": "Development", "permissions": { "usageType": "openSource", @@ -35,7 +35,7 @@ "email": "gs-haz_dev_team_group@usgs.gov" }, "date": { - "metadataLastUpdated": "2025-02-13" + "metadataLastUpdated": "2025-02-18" } } ] \ No newline at end of file diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 236618176bbaeed9d334b090ce2a0100fb0363d6..7895fc5400dfb930b42c62ba3db0c248063e07c6 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -397,40 +397,35 @@ class Controller(object): channels=output_channels, interval=output_interval, ) - if len(output_timeseries) > 0: - # force starttime & endtime to be consistent with output_interval - # FIXME: this is lazy; a better solution would be to use something - # like FilterAlgorithm's get_nearest_time() -EJR - starttime = output_timeseries[0].stats.starttime - endtime = output_timeseries[0].stats.endtime - print( - "checking gaps", - starttime, - endtime, - output_observatory, - output_channels, - file=sys.stderr, - ) - # find gaps in output, so they can be updated - output_gaps = TimeseriesUtility.get_merged_gaps( - TimeseriesUtility.get_stream_gaps(output_timeseries) - ) - else: - output_gaps = [ - [ - starttime, - endtime, - # next sample time not used - None, - ] - ] + # - output_timeseries can never be empty; + # - output_timeseries' endtime must be greater than + # or equal to its starttime + starttime = output_timeseries[0].stats.starttime + endtime = output_timeseries[0].stats.endtime + print( + "checking gaps", + starttime, + endtime, + output_observatory, + output_channels, + file=sys.stderr, + ) + # find gaps in output, so they can be updated + output_gaps = TimeseriesUtility.get_merged_gaps( + TimeseriesUtility.get_stream_gaps(output_timeseries) + ) for output_gap in output_gaps: # check for fillable gap at start if output_gap[0] == starttime: - # found fillable gap at start, recurse to previous interval - recurse_starttime, recurse_endtime = get_previous_interval( - start=starttime, - end=endtime, + delta = TimeseriesUtility.get_delta_from_interval(output_interval) + update_interval = endtime - starttime + recurse_endtime = starttime - delta + recurse_starttime = ( + # ensure starttime and endtime are inclusive on first update, + # then exclude previous starttime with subsequent updates + starttime + - update_interval + - (update_count and delta) ) self.run_as_update( algorithm=algorithm, @@ -700,12 +695,16 @@ def get_previous_interval( return (start - interval_size, start - 1e-3) -def get_realtime_interval(interval_seconds: int) -> Tuple[UTCDateTime, UTCDateTime]: +def get_realtime_interval( + interval_seconds: int, + interval_str: Optional[str] = "second", # default to second for compatibility +) -> Tuple[UTCDateTime, UTCDateTime]: # calculate endtime/starttime now = UTCDateTime() - endtime = UTCDateTime( - now.year, now.month, now.day, now.hour, now.minute, now.second - ) + delta = TimeseriesUtility.get_delta_from_interval(interval_str) + endtime = now - now.timestamp % delta + if delta > 60: # handle hour and day intervals + endtime += delta / 2 - 30 starttime = endtime - interval_seconds return starttime, endtime @@ -757,7 +756,9 @@ def main(args: Optional[List[str]] = None): else: args.realtime = 600 # calculate endtime/starttime - args.starttime, args.endtime = get_realtime_interval(args.realtime) + args.starttime, args.endtime = get_realtime_interval( + args.realtime, args.interval + ) if args.observatory_foreach: observatory = args.observatory diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index 9550187c29eaea8239a5daa775a220c642bf20ba..35e7ffdd7cab3bdf60e700138568330ce82b2e66 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -28,7 +28,7 @@ def create_empty_trace( type : str data type {definitive, quasi-definitive, variation} interval : str - interval length {minute, second} + interval length {day, hour, minute, second, tenhertz} network: str the network code station: str @@ -49,17 +49,23 @@ def create_empty_trace( # Calculate first valid sample time based on interval trace_starttime = UTCDateTime(numpy.ceil(starttime.timestamp / delta) * delta) if delta > 60.0: - trace_starttime += (delta - 60) / 2 - if trace_starttime > endtime: - sys.stderr.write( - "Starttime greater than endtime, adjusting request to one sample" + # hourly and daily average have mid-interval timestamps, minus 30 seconds + if trace_starttime - starttime < (delta + 60) / 2: + trace_starttime += (delta - 60) / 2 + else: + trace_starttime += (delta - 60) / 2 - delta + + if starttime > trace_starttime or endtime < trace_starttime: + sys.stdout.write( + 'No allowed "%s" timestamps between %s and %s, ' + % (interval, starttime, endtime) + + "returning empty trace\n", ) - endtime = trace_starttime stats.starttime = trace_starttime stats.delta = delta # Calculate number of valid samples up to or before endtime length = int((endtime - trace_starttime) / delta) - stats.npts = length + 1 + stats.npts = max(0, length + 1) data = numpy.full(stats.npts, numpy.nan, dtype=numpy.float64) return Trace(data, stats) @@ -82,7 +88,10 @@ def encode_stream(stream: Stream, encoding: str) -> Stream: for trace in stream: trace_out = trace.copy() if trace_out.data.dtype != encoding: - trace_out.data = trace_out.data.astype(encoding) + if "STEIM" in encoding.upper(): + trace_out.data = trace_out.data.astype(numpy.int32) + else: + trace_out.data = trace_out.data.astype(encoding) if "mseed" in trace_out.stats: trace_out.stats.mseed.encoding = encoding.upper() out_stream += trace_out @@ -567,8 +576,9 @@ def pad_and_trim_trace(trace, starttime, endtime): if trace_endtime > endtime: # trim to endtime, at least 1 sample to remove cnt = int(math.ceil(round((trace_endtime - endtime) / trace_delta, 6))) - trace.data = trace.data[:-cnt] - trace.stats.npts = len(trace.data) + if cnt > 0: + trace.data = trace.data[:-cnt] + trace.stats.npts = len(trace.data) elif trace_endtime < endtime: # pad to endtime cnt = int(round((endtime - trace_endtime) / trace.stats.delta, 6)) diff --git a/geomagio/api/ws/data.py b/geomagio/api/ws/data.py index c2bc92274c2fca1a9df5b914f81f63fb362ec65f..47971b277fa5fc801622678cd2547c0dedba3e30 100644 --- a/geomagio/api/ws/data.py +++ b/geomagio/api/ws/data.py @@ -40,16 +40,17 @@ def get_data_factory( factory = FDSNFactory(network=observatory.network, locationCode="40") elif sampling_period in [ SamplingPeriod.TEN_HERTZ, - SamplingPeriod.HOUR, - SamplingPeriod.DAY, ]: + # MiniSeedFactory required for floating point data; + # MiniSeedFactory advised for 10 Hz sampling in general factory = MiniSeedFactory( - host=host, port=int(os.getenv("DATA_MINISEED_PORT", "2061")) - ) - elif sampling_period in [SamplingPeriod.SECOND, SamplingPeriod.MINUTE]: - factory = EdgeFactory( - host=host, port=int(os.getenv("DATA_EARTHWORM_PORT", "2060")) + host=host, + port=os.getenv("DATA_MINISEED_PORT", None), + convert_channels=["U", "V", "W"], # no channel mapping (e.g., "H"->"U") ) + elif sampling_period in list(SamplingPeriod): + # EdgeFactory required for real time data with long sample periods + factory = EdgeFactory(host=host, port=os.getenv("DATA_EARTHWORM_PORT", None)) else: return None return DerivedTimeseriesFactory(factory) diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index f46119396e5e6948a4de2612fe55c1d60f79bb3f..16235edb573703a67c4f2d4fc39a4e2317c04f04 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -16,11 +16,12 @@ from typing import List, Optional import numpy import numpy.ma -from obspy import Stream, Trace, UTCDateTime -from obspy.clients import earthworm, neic +from obspy.core import Stats, Stream, Trace, UTCDateTime +from obspy.clients import earthworm from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType +from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata @@ -37,9 +38,11 @@ class EdgeFactory(TimeseriesFactory): host: str a string representing the IP number of the host to connect to. port: integer - the port number the waveserver is listening on. + port number on which EdgeCWB's CWB waveserver (CWBWS, obspy's + "earthworm" client) listens for requests to retrieve timeseries data. write_port: integer - the port number the client is writing to. + the port number on which EdgeCWB's RawInputServer is listening for + requests to write timeseries data. tag: str A tag used by edge to log and associate a socket with a given data source @@ -54,7 +57,7 @@ class EdgeFactory(TimeseriesFactory): others are passed through, see #_get_edge_channel(). type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default @@ -62,13 +65,18 @@ class EdgeFactory(TimeseriesFactory): locationCode: str the location code for the given edge server, overrides type in get_timeseries/put_timeseries - scaleFactor: int - all data written to edge (via raw input client) will be scaled - by this integer prior to write; all data read from edge will be - will be divided by this integer after read; default = 1000 - snclMapper: {'geomag','legacy'} - a mapper of common channel names to SEED SNCL codes (that is, + convert_channels: array + list of channels to convert from volt/bin to nT + scale_factor: float + override default scalings when reading/writing miniseed blocks; + (reading integer blocks divides by 1000; reading floating point + blocks divides by 1; writing all data multiplies by 1000) + default = None + sncl_mode: {'geomag','legacy'} + force mode to convert common names to SEED SNCL codes (that is, station, network, channel, location codes); default = legacy + timeout: float + timeout for Earthworm client; default=10 See Also -------- @@ -76,17 +84,19 @@ class EdgeFactory(TimeseriesFactory): Notes ----- - This is designed to read from any earthworm style waveserver, but it - currently only writes to an edge. Edge mimics an earthworm style - waveserver close enough that we hope to maintain that compatibility - for reading. + The EdgeFactory reads so-called trace-bufs and writes integer data + to an EdgeCWB RawInputServer, which places these data, sample-by-sample, + into a RAM buffer where it can be immediately retrieved, even before + full miniseed blocks can be constructed and written to disk. The + EdgeFactory cannot handle non-integer data, and is inefficient for + larger data transfers. In those cases, consider using MiniSeedFactory. """ def __init__( self, - host: str = "edgecwb.usgs.gov", - port: int = 2060, - write_port: int = 7981, + host: Optional[str] = None, + port: Optional[int] = None, + write_port: Optional[int] = None, tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -95,31 +105,29 @@ class EdgeFactory(TimeseriesFactory): interval: Optional[DataInterval] = None, observatoryMetadata: Optional[ObservatoryMetadata] = None, locationCode: Optional[str] = None, - scaleFactor: int = 1000, - snclMapper: str = "legacy", + convert_channels: Optional[List[str]] = None, + scale_factor: Optional[int] = None, + sncl_mode: Optional[str] = "legacy", + timeout: Optional[float] = None, ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - if port == 2060: - # earthworm (phasing this out gradually) - self.client = earthworm.Client(host, port) - else: - # CWBQuery/miniseed protocol (preferred) - self.client = neic.Client(host, port) - self.host = host - self.port = port - self.write_port = write_port + self.host = host or "edgecwb.usgs.gov" + self.port = port or [2060] + self.write_port = write_port # no default write port self.tag = tag self.forceout = forceout - self.interval = interval self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.locationCode = locationCode - self.scaleFactor = scaleFactor - if snclMapper == "legacy": + self.convert_channels = convert_channels or [] + self.scale_factor = scale_factor + self.sncl_mode = sncl_mode + self.timeout = timeout or 10 + if sncl_mode == "legacy": self.get_sncl = LegacySNCL.get_sncl - elif snclMapper == "geomag": + elif sncl_mode == "geomag": self.get_sncl = SNCL.get_sncl else: - raise TimeseriesFactoryException("Unrecognized SNCL mapper") + raise TimeseriesFactoryException("Unrecognized SNCL mode") def get_timeseries( self, @@ -145,7 +153,7 @@ class EdgeFactory(TimeseriesFactory): list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -179,17 +187,22 @@ class EdgeFactory(TimeseriesFactory): # get the timeseries timeseries = Stream() for channel in channels: - data = self._get_timeseries( - starttime, - endtime, - observatory, - channel, - type, - interval, - add_empty_channels, - ) - if len(data) == 0: - continue + if channel in self.convert_channels: + data = self._convert_timeseries( + starttime, endtime, observatory, channel, type, interval + ) + else: + data = self._get_timeseries( + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, + ) + if len(data) == 0: + continue timeseries += data finally: # restore stdout @@ -220,7 +233,7 @@ class EdgeFactory(TimeseriesFactory): list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval Notes @@ -247,46 +260,84 @@ class EdgeFactory(TimeseriesFactory): ) for channel in channels: self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + timeseries.select(channel=channel), + observatory, + channel, + type, + interval, + starttime, + endtime, ) - def _convert_timeseries_to_decimal(self, stream: Stream): - """convert geomag edge timeseries data stored as ints, to decimal by - dividing by 1000.00 - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - Notes - ----- - This routine changes the values in the timeseries. The user should - make a copy before calling if they don't want that side effect. - """ - for trace in stream: - trace.data = numpy.divide(trace.data, self.scaleFactor) - - def _convert_trace_to_int(self, trace_in: Trace) -> Trace: - """convert geomag edge traces stored as decimal, to ints by multiplying - by 1000 + def get_calculated_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + components: List[dict], + ) -> Trace: + """Calculate a single channel using multiple component channels. Parameters ---------- - trace: Trace - a trace retrieved from a geomag edge representing one channel + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory: str + observatory code + channel: str + single character channel {H, E, D, Z, F} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + components: list + each component is a dictionary with the following keys: + channel: str + offset: float + scale: float + Returns ------- - trace: Trace - a trace converted to ints - Notes - ----- - this doesn't work on ndarray with nan's in it. - the trace must be a masked array. + out: Trace + timeseries trace of the converted channel data """ - trace = trace_in.copy() - trace.data = numpy.multiply(trace.data, self.scaleFactor) - trace.data = trace.data.astype(int) - - return trace + # sum channels + stats = None + converted = None + for component in components: + # load component + data = self._get_timeseries( + starttime, endtime, observatory, component["channel"], type, interval + )[0] + # convert to nT + nt = data.data * component["scale"] + component["offset"] + # add to converted + if converted is None: + converted = nt + stats = Stats(data.stats) + else: + converted += nt + # set channel parameter to U, V, or W + stats.channel = channel + # create empty trace with adapted stats + out = TimeseriesUtility.create_empty_trace( + stats.starttime, + stats.endtime, + stats.station, + stats.channel, + stats.data_type, + stats.data_interval, + stats.network, + stats.station, + stats.location, + ) + out.data = converted + return out def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: """convert geomag edge traces in a timeseries stream to a MaskedArray @@ -331,7 +382,7 @@ class EdgeFactory(TimeseriesFactory): single character channel {H, E, D, Z, F} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -352,23 +403,75 @@ class EdgeFactory(TimeseriesFactory): # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 + + # list-type ports variable needed for fail-back logic try: - data = self.client.get_waveforms( - sncl.network, - sncl.station, - sncl.location, - sncl.channel, - starttime, - endtime + half_delta, - ) + ports = list(self.port) except TypeError: - # get_waveforms() fails if no data is returned from Edge - data = Stream() + ports = [self.port] + + data = Stream() + for port in ports: + try: + client = earthworm.Client(self.host, port, timeout=self.timeout) + + data += client.get_waveforms( + sncl.network, + sncl.station, + sncl.location, + sncl.channel, + starttime, + endtime + half_delta, + ) + + if data: + # if data was returned, force this port in subsequent calls + # to get_timeseries() that use this instance of EdgeFactory + self.port = [port] + break + print("No data returned from ", self.host, " on port ", port) + # try alternate port(s) if provided + except Exception as e: + print("Failed to get data from ", self.host, " on port ", port) + print("Ignoring error: ", e) + # try alternate port(s) if provided + continue - # make sure data is 32bit int for trace in data: - trace.data = trace.data.astype("i4") - data.merge() + if trace.data.dtype.kind == "i": + # convert all integer traces to rescaled 64-bit floats; + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace.data = trace.data.astype(float) / (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace.data = trace.data.astype(float) / (self.scale_factor or 1e3) + elif trace.data.dtype.kind == "f": + # convert all float traces to 64-bit floats; + trace.data = trace.data.astype(float) / (self.scale_factor or 1.0) + + # when Traces with identical NSCL codes overlap, prioritize samples + # that come "later" in Stream; this depends on Edge returning miniseed + # packets in the order written + # NOTE: this is not possible with single calls to Stream.merge() + st_tmp = Stream() + for tr in data: + try: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + except Exception as e: + tr = st_tmp.pop() # remove bad trace + print("Dropped trace: ", tr) + print("Ignoring error: ", e) + + # point `data` to the new stream and continue processing + data = st_tmp if data.count() == 0 and add_empty_channels: data += self._get_empty_trace( starttime=starttime, @@ -383,6 +486,84 @@ class EdgeFactory(TimeseriesFactory): self._set_metadata(data, observatory, channel, type, interval) return data + def _convert_timeseries( + self, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + ) -> Stream: + """Generate a single channel using multiple components. + + Finds metadata, then calls _get_converted_timeseries for actual + conversion. + + Parameters + ---------- + starttime: UTCDateTime + the starttime of the requested data + endtime: UTCDateTime + the endtime of the requested data + observatory : str + observatory code + channel : str + single character channel {H, E, D, Z, F} + type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval : {'tenhertz', 'second', 'minute', 'hour', 'day'} + data interval + + Returns + ------- + out: Stream + timeseries with single trace of the requested channel data + NOTE: this originally returned a Trace, but was modified + (probably due to misunderstanding) at some point to + return a Stream. This Stream is, however, expected + to contain just a single Trace. + """ + out = Stream() + metadata = get_instrument_calibrations(observatory, starttime, endtime) + # loop in case request spans different configurations + for entry in metadata: + entry_endtime = entry["end_time"] + entry_starttime = entry["start_time"] + instrument = entry["instrument"] + instrument_channels = instrument["channels"] + if channel not in instrument_channels: + # no idea how to convert + continue + # determine metadata overlap with request + start = ( + starttime + if entry_starttime is None or entry_starttime < starttime + else entry_starttime + ) + end = ( + endtime + if entry_endtime is None or entry_endtime > endtime + else entry_endtime + ) + # now convert + out += self.get_calculated_timeseries( + start, + end, + observatory, + channel, + type, + interval, + instrument_channels[channel], + ) + # force to match the first trace's datatype + for trace in out: + trace.data = trace.data.astype(out[0].data.dtype) + # merge to force a single trace + # (precedence given to later inteval when metadata overlap) + out.merge(1) + return out + def _post_process( self, timeseries: Stream, @@ -391,7 +572,7 @@ class EdgeFactory(TimeseriesFactory): channels: List[str], ): """Post process a timeseries stream after the raw data is - is fetched from a waveserver. Specifically changes + is fetched from waveserver. Specifically changes any MaskedArray to a ndarray with nans representing gaps. Then calls pad_timeseries to deal with gaps at the beggining or end of the streams. @@ -409,7 +590,6 @@ class EdgeFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ - self._convert_timeseries_to_decimal(timeseries) for trace in timeseries: if isinstance(trace.data, numpy.ma.MaskedArray): trace.data.set_fill_value(numpy.nan) @@ -443,14 +623,10 @@ class EdgeFactory(TimeseriesFactory): channel to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval starttime: UTCDateTime endtime: UTCDateTime - - Notes - ----- - RawInputClient seems to only work when sockets are """ sncl = self.get_sncl( station=observatory, @@ -460,34 +636,39 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - ric = RawInputClient( - self.tag, - self.host, - self.write_port, - sncl.station, - sncl.channel, - sncl.location, - sncl.network, + stream_masked = self._convert_stream_to_masked( + timeseries=timeseries, channel=channel ) + stream_split = stream_masked.split() - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - - # Make certain there's actually data - if not numpy.ma.any(stream.select(channel=channel)[0].data): - return - - for trace in stream.select(channel=channel).split(): + for trace in stream_split: trace_send = trace.copy() trace_send.trim(starttime, endtime) if channel == "D": trace_send.data = ChannelConverter.get_minutes_from_radians( trace_send.data ) - trace_send = self._convert_trace_to_int(trace_send) + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace_send.data = trace_send.data * (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace_send.data = trace_send.data * (self.scale_factor or 1e3) + + ric = RawInputClient( + self.tag, + self.host, + self.write_port, + sncl.station, + sncl.channel, + sncl.location, + sncl.network, + ) + trace_send.data = trace_send.data.astype(int) # ric requires ints ric.send_trace(interval, trace_send) - if self.forceout: - ric.forceout() - ric.close() + if self.forceout: + ric.forceout() + ric.close() def _set_metadata( self, @@ -506,7 +687,7 @@ class EdgeFactory(TimeseriesFactory): edge channel code {MVH, MVE, MVD, ...} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type - interval: {'second', 'minute', 'hour', 'day'} + interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval """ for trace in stream: diff --git a/geomagio/edge/LegacySNCL.py b/geomagio/edge/LegacySNCL.py index f80b438a602ce15ec0dc552ff10c7345701c0222..b0fdb39e82b3b2b7673f6d167e887c71890bdf0b 100644 --- a/geomagio/edge/LegacySNCL.py +++ b/geomagio/edge/LegacySNCL.py @@ -75,7 +75,9 @@ def _check_predefined_element(channel: str) -> Optional[str]: def _get_channel_start(interval: str) -> str: - if interval == "second": + if interval == "tenhertz": + return "T" + elif interval == "second": return "S" elif interval == "minute": return "M" diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py index 94acf5407f8c02b604c4309961a74ef8dda4c8b2..862087efd6d99c1f83213d8cddca6d943eb18d85 100644 --- a/geomagio/edge/MiniSeedFactory.py +++ b/geomagio/edge/MiniSeedFactory.py @@ -17,7 +17,7 @@ from typing import List, Optional import numpy import numpy.ma from obspy import read -from obspy.clients.neic import client as miniseed +from obspy.clients import neic from obspy.core import Stats, Stream, Trace, UTCDateTime from .. import ChannelConverter, TimeseriesUtility @@ -28,6 +28,7 @@ from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata from .MiniSeedInputClient import MiniSeedInputClient from .SNCL import SNCL +from .LegacySNCL import LegacySNCL class MiniSeedFactory(TimeseriesFactory): @@ -38,7 +39,11 @@ class MiniSeedFactory(TimeseriesFactory): host: str a string representing the IP number of the host to connect to. port: integer - the port number the miniseed query server is listening on. + port number on which EdgeCWB's queryserver (QS, obspy's "neic" client) + listens for requests to retrieve timeseries data. + write_port: integer + port number on which EdgeCWB's MiniSeedServer is listening for + requests to write timeseries data. observatory: str the observatory code for the desired observatory. channels: array @@ -57,6 +62,16 @@ class MiniSeedFactory(TimeseriesFactory): in get_timeseries/put_timeseries convert_channels: array list of channels to convert from volt/bin to nT + scale_factor: float + override default scalings when reading/writing miniseed blocks; + (reading integer blocks divides by 1000; reading floating point + blocks divides by 1; writing all data multiplies by 1000) + default = None + sncl_mode: {'geomag','legacy'} + force mode to convert common names to SEED SNCL codes (that is, + station, network, channel, location codes); default = legacy + timeout: float + timeout for NEIC client; default=10 See Also -------- @@ -64,17 +79,25 @@ class MiniSeedFactory(TimeseriesFactory): Notes ----- - This is designed to read from any earthworm style waveserver, but it - currently only writes to an edge. Edge mimics an earthworm style - waveserver close enough that we hope to maintain that compatibility - for reading. + The MiniSeedFactory reads and writes miniseed blocks. This allows it + to be used to transfer non-integer data. That said, this factory cannot + read from or write to EdgeCWB's RAM buffer, and so is not able to access + eally "real time" data. MiniSeedFactory should be used for large data + transfers, or to transfer data that cannot be handled by the default + EdgeFactory (that is, non-integer data, or data that has not yet been + written to the EdgeCWB disk store). + + Also, this factory allows one to write non-full miniseed blocks. This + is occasionally useful, but more often it results in wasted disk space + on the EdgeCWB server. """ def __init__( self, - host: str = "edgecwb.usgs.gov", - port: int = 2061, - write_port: int = 7974, + host: Optional[str] = None, + port: Optional[int] = None, + write_port: Optional[int] = None, + write_encoding: Optional[str] = None, observatory: Optional[str] = None, channels: Optional[List[str]] = None, type: Optional[DataType] = None, @@ -82,18 +105,31 @@ class MiniSeedFactory(TimeseriesFactory): observatoryMetadata: Optional[ObservatoryMetadata] = None, locationCode: Optional[str] = None, convert_channels: Optional[List[str]] = None, + scale_factor: Optional[int] = None, + sncl_mode: Optional[str] = "geomag", + timeout: Optional[float] = None, ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - - self.client = miniseed.Client(host, port) + self.host = host or "edgecwb.usgs.gov" + self.port = port or [22061, 2061] + self.write_port = write_port # no default write port + self.write_encoding = write_encoding or "float32" self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.locationCode = locationCode - self.interval = interval - self.host = host - self.port = port - self.write_port = write_port self.convert_channels = convert_channels or [] - self.write_client = MiniSeedInputClient(self.host, self.write_port) + self.scale_factor = scale_factor + self.sncl_mode = sncl_mode + self.timeout = timeout or 10 + if sncl_mode == "legacy": + self.get_sncl = LegacySNCL.get_sncl + elif sncl_mode == "geomag": + self.get_sncl = SNCL.get_sncl + else: + raise TimeseriesFactoryException("Unrecognized SNCL mode") + + # instantiate clients immediately; allows mock clients if/when needed + self.client = neic.Client(self.host) + self.write_client = MiniSeedInputClient(self.host) def get_timeseries( self, @@ -110,19 +146,19 @@ class MiniSeedFactory(TimeseriesFactory): Parameters ---------- starttime: UTCDateTime - time of first sample + time of first sample. endtime: UTCDateTime - time of last sample - add_empty_channels: bool - if True, returns channels without data as empty traces + time of last sample. observatory: str - observatory code + observatory code. channels: array list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} data interval + add_empty_channels: bool + if True, returns channels without data as empty traces Returns ------- @@ -226,7 +262,13 @@ class MiniSeedFactory(TimeseriesFactory): ) for channel in channels: self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + timeseries.select(channel=channel), + observatory, + channel, + type, + interval, + starttime, + endtime, ) # close socket self.write_client.close() @@ -345,7 +387,7 @@ class MiniSeedFactory(TimeseriesFactory): type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'tenhertz', 'second', 'minute', 'hour', 'day'} - interval length + data interval add_empty_channels: bool if True, returns channels without data as empty traces @@ -365,31 +407,75 @@ class MiniSeedFactory(TimeseriesFactory): # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 - data = self.client.get_waveforms( - sncl.network, - sncl.station, - sncl.location, - sncl.channel, - starttime, - endtime + half_delta, - ) - for trace in data: - trace.data = trace.data.astype(data[0].data.dtype) - # handle overlapping samples in Traces with identical NSCL codes by - # prioritizing samples from those Traces in the Stream `data` that come - # later in sequence; these will most likely be the most recent versions - # (this is not possible with single calls to Stream.merge()) + # list-type ports variable needed for fail-back logic + try: + ports = list(self.port) + except TypeError: + ports = [self.port] + + data = Stream() + for port in ports: + try: + + self.client.port = port + self.client.timeout = self.timeout + + data += self.client.get_waveforms( + sncl.network, + sncl.station, + sncl.location, + sncl.channel, + starttime, + endtime + half_delta, + ) + + if data: + # if data was returned, force this port in subsequent calls + # to get_timeseries() that use this instance of EdgeFactory + self.port = [port] + break + print("No data returned from ", self.host, " on port ", port) + # try alternate port(s) if provided + except Exception as e: + print("Failed to get data from ", self.host, " on port ", port) + print("Ignoring error: ", e) + # try alternate port(s) if provided + continue + + for trace in data: + if trace.data.dtype.kind == "i": + # convert all integer traces to rescaled 64-bit floats; + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace.data = trace.data.astype(float) / (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace.data = trace.data.astype(float) / (self.scale_factor or 1e3) + elif trace.data.dtype.kind == "f": + # convert all float traces to 64-bit floats + trace.data = trace.data.astype(float) + + # when Traces with identical NSCL codes overlap, prioritize samples + # that come "later" in Stream; this depends on Edge returning miniseed + # packets in the order written + # NOTE: this is not possible with single calls to Stream.merge() st_tmp = Stream() for tr in data: - # add tr to temporary stream - st_tmp += tr - # replace time overlaps with gaps - st_tmp.merge(0) - # add tr to temporary stream again - st_tmp += tr - # replace gaps with tr's data - st_tmp.merge(0) + try: + # add tr to temporary stream + st_tmp += tr + # replace time overlaps with gaps + st_tmp.merge(0) + # add tr to temporary stream again + st_tmp += tr + # replace gaps with tr's data + st_tmp.merge(0) + except Exception as e: + tr = st_tmp.pop() # remove bad trace + print("Dropped trace: ", tr) + print("Ignoring error: ", e) + # point `data` to the new stream and continue processing data = st_tmp @@ -497,7 +583,7 @@ class MiniSeedFactory(TimeseriesFactory): channels: List[str], ): """Post process a timeseries stream after the raw data is - is fetched from querymom. Specifically changes + is fetched from queryserver. Specifically changes any MaskedArray to a ndarray with nans representing gaps. Then calls pad_timeseries to deal with gaps at the beggining or end of the streams. @@ -553,26 +639,47 @@ class MiniSeedFactory(TimeseriesFactory): starttime: UTCDateTime endtime: UTCDateTime """ - # use separate traces when there are gaps - to_write = timeseries.select(channel=channel) - to_write = TimeseriesUtility.mask_stream(to_write) - to_write = to_write.split() - to_write = TimeseriesUtility.unmask_stream(to_write) - # relabel channels from internal to edge conventions - sncl = SNCL.get_sncl( + sncl = self.get_sncl( station=observatory, data_type=type, interval=interval, element=channel, location=self.locationCode, ) - for trace in to_write: - trace.stats.station = sncl.station - trace.stats.location = sncl.location - trace.stats.network = sncl.network - trace.stats.channel = sncl.channel - # finally, send to edge - self.write_client.send(to_write) + + stream_masked = self._convert_stream_to_masked( + timeseries=timeseries, channel=channel + ) + stream_send = Stream() + for trace in stream_masked.split(): + trace_send = trace.copy() + trace_send.trim(starttime, endtime) + if channel == "D": + trace_send.data = ChannelConverter.get_minutes_from_radians( + trace_send.data + ) + + if "float" not in self.write_encoding: + # if write_encoding is a float, ignore scale_factor + if sncl.channel[1] == "E": + # instrumental voltages are stored as 1/10 microvolts + trace_send.data = trace_send.data * (self.scale_factor or 1e7) + else: + # everything else (mostly magnetics stored as picoteslas) + trace_send.data = trace_send.data * (self.scale_factor or 1e3) + + trace_send.stats.station = sncl.station + trace_send.stats.location = sncl.location + trace_send.stats.network = sncl.network + trace_send.stats.channel = sncl.channel + stream_send += trace_send + # FIXME: MiniSeedInputClient sends a stream, while + # RawInputClient sends a trace; these write + # clients should be re-written for consistency + self.write_client.host = self.host + self.write_client.port = self.write_port + self.write_client.encoding = self.write_encoding + self.write_client.send(stream_send) def _set_metadata( self, diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index 55eae89c95affb323dad9951a32e7c128401b26d..649fa933f536d50610718f707429bf1fec418f8f 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -14,13 +14,16 @@ from time import sleep """ MAXINPUTSIZE: Edge uses a short for the data count, so the max input size - for a single data packet to edge is 32767 + for a single data packet to edge is *supposed* to be 32767. However, this + was never tested thoroughly until we added support for 10Hz data, when + it was quickly discovered that setting this value caused Edge to hang. + Therefore, MAXINPUTSIZE was reduced to 10_000, which seems to work well. HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size for sending seconds data. DAYMINUTES: The numbers of minutes in a day. Used as the size for sending minute data, since Edge stores by the day. """ -MAXINPUTSIZE = 32767 +MAXINPUTSIZE = 10_000 HOURSECONDS = 3600 DAYMINUTES = 1440 @@ -161,7 +164,7 @@ class RawInputClient: PARAMETERS ---------- - interval: {'day', 'hour', 'minute', 'second'} + interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} data interval. trace: obspy.core.trace @@ -174,7 +177,11 @@ class RawInputClient: totalsamps = len(trace.data) starttime = trace.stats.starttime - if interval == "second": + if interval == "tenhertz": + nsamp = MAXINPUTSIZE + timeoffset = 1 / 10.0 + samplerate = 10.0 + elif interval == "second": nsamp = HOURSECONDS timeoffset = 1 samplerate = 1.0 @@ -360,7 +367,7 @@ class RawInputClient: secs, usecs, self.sequence, - *samples + *samples, ) return buf @@ -377,16 +384,30 @@ class RawInputClient: tuple: (ratemantissa, ratedivosor) ratemantissa: int ratedivosor: int + + NOTE that `ratemantissa` and `ratedivisor` map directly to the SEED + standard's "sample rate factor" and "sample rate multiplier", which + are stored in a miniseed block's fixed data header as 2-byte words; + earlier versions of this method did not handle sample rates lower + than 1/second properly (they did handle 1/minute as a special-case); """ if rate > 0.9999: + # sample rates greater or equal to 1/second + # (this will not handle >327 Hz, but for consistency + # with earlier versions, we leave it unchanged) ratemantissa = int(rate * 100 + 0.001) ratedivisor = -100 - elif rate * 60.0 - 1.0 < 0.00000001: # one minute data - ratemantissa = -60 - ratedivisor = 1 + elif rate * 21600 > 1.0001: + # sample periods between 1 second and 6 hours + # (this should handle 1-minute data identically to + # earlier versions) + ratemantissa = -int(1 / rate + 0.001) + ratedivisor = -1 else: - ratemantissa = int(rate * 10000.0 + 0.001) - ratedivisor = -10000 + # sample periods 6 hours and longer + # (this will not handle periods longer than ~22 years) + ratemantissa = -int(1 / rate / 21600) + ratedivisor = -21600 return (ratemantissa, ratedivisor) diff --git a/geomagio/processing/filters.py b/geomagio/processing/filters.py index e69d762e62ebd4abb95cf4fc7965d8a5886d36ef..2408ffa1cce73f3bbc27e990742d20d37e80d467 100644 --- a/geomagio/processing/filters.py +++ b/geomagio/processing/filters.py @@ -7,7 +7,7 @@ from ..algorithm import Algorithm, FilterAlgorithm from ..Controller import Controller, get_realtime_interval from ..geomag_types import DataInterval from ..TimeseriesFactory import TimeseriesFactory -from .factory import get_edge_factory, get_miniseed_factory +from ..edge import EdgeFactory, MiniSeedFactory class DataFormat(str, Enum): @@ -29,33 +29,57 @@ def main(): def day_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), output_host: str = Option("127.0.0.1", help="host to write data to"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option( 604800, help="length of update window (in seconds)" ), - update_limit: int = Option(7, help="number of update windows"), + update_limit: int = Option(4, help="number of update windows"), ): day_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) - temperature_filter( + # remove the following after data migration is complete + _copy_channels( observatory=observatory, channels=( - ("UK1", "PK1"), - ("UK2", "PK2"), - ("UK3", "PK3"), - ("UK4", "PK4"), + ("U", "H"), + ("V", "E"), + ("W", "Z"), + ("F", "F"), + ), + interval="day", + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="day", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -69,30 +93,54 @@ def hour_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), realtime_interval: int = Option(86400, help="length of update window (in seconds)"), - update_limit: int = Option(24, help="number of update windows"), + update_limit: int = Option(7, help="number of update windows"), ): hour_filter( observatory=observatory, - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) - temperature_filter( + # remove the following after data migration is complete + _copy_channels( observatory=observatory, channels=( - ("UK1", "RK1"), - ("UK2", "RK2"), - ("UK3", "RK3"), - ("UK4", "RK4"), + ("U", "H"), + ("V", "E"), + ("W", "Z"), + ("F", "F"), + ), + interval="hour", + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", ), - input_factory=get_edge_factory(host=input_host, input_port=input_port), - input_interval="minute", - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - output_interval="hour", realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -104,13 +152,14 @@ def hour_command( help=""" ObsRIO: - Filters 10Hz U,V,W miniseed to 1 second miniseed + Filters 10Hz U,V,W to 1 second U,V,W - Filters 1 second U,V,W,F miniseed to 1 minute miniseed + Filters 1 second U,V,W,F to 1 minute U,V,W,F - Filters 1 second T1-4 miniseed to 1 minute UK1-4 legacy + Filters 1 second T1-4 to 1 minute UK1-4 - Copies 1 second and 1 minute U,V,W,F miniseed to H,E,Z,F earthworm + Copies 1 second and 1 minute U,V,W,F miniseed to legacy + H,E,Z,F PCDCP: @@ -124,8 +173,9 @@ def realtime_command( observatory: str = Argument(None, help="observatory id"), input_host: str = Option("127.0.0.1", help="host to request data from"), output_host: str = Option("127.0.0.1", help="host to write data to"), - input_port: int = Option(None, help="port to retrieve data through"), - output_port: int = Option(None, help="port to write data through"), + input_port: int = Option(None, help="port to pull inputs from on input host"), + output_read_port: int = Option(None, help="port to check for gaps on output host"), + output_port: int = Option(None, help="port to write to on output host"), data_format: DataFormat = Option(DataFormat.PCDCP, help="Data acquisition system"), realtime_interval: int = Option(600, help="length of update window (in seconds)"), update_limit: int = Option(10, help="number of update windows"), @@ -133,15 +183,56 @@ def realtime_command( if data_format == DataFormat.OBSRIO: second_filter( observatory=observatory, - input_factory=get_miniseed_factory( - host=input_host, convert_channels=("U", "V", "W"), input_port=input_port + input_factory=MiniSeedFactory( + host=input_host, + port=None, # use MiniSeedFactory default + convert_channels=("U", "V", "W"), + type="variation", + sncl_mode="geomag", ), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) + _copy_channels( + # copy 1-sec ObsRIO channels + # NOTE: yes, this creates redundant data; however... + # - it is compressed + # - it is integer, so readable by EdgeFactory + # - it provides more "permanent" data, since the + # ObsRIO raw data *may* eventually get pruned + observatory=observatory, + channels=( + ("LFF", "LFF"), + ("LK1", "LK1"), + ("LK2", "LK2"), + ("LK3", "LK3"), + ("LK4", "LK4"), + ), + interval="second", + input_factory=MiniSeedFactory( + host=input_host, + port=None, # use MiniSeedFactory default + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + # remove the following after data migration is complete _copy_channels( observatory=observatory, channels=( @@ -151,27 +242,68 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, port=input_port, type="variation", sncl_mode="geomag" + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=7981, # hard-code port for legacy sncls + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) - temperature_filter( + + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F", "T1", "T2", "T3", "T4"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, # earthworm port required for realtime + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, # earthworm port required for realtime + write_port=output_port, # rawinput port required for realtime + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + # remove the following after data migration is complete + _copy_channels( observatory=observatory, channels=( - ("LK1", "UK1"), - ("LK2", "UK2"), - ("LK3", "UK3"), - ("LK4", "UK4"), + ("U", "H"), + ("V", "E"), + ("W", "Z"), + ("F", "F"), + ), + interval="minute", + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=7981, # hard-coded rawinput until migration, then remove + type="variation", + sncl_mode="legacy", ), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - input_interval="second", - output_factory=get_edge_factory(host=output_host, output_port=output_port), - output_interval="minute", realtime_interval=realtime_interval, update_limit=update_limit, ) + else: + # legacy PCDCP processing _copy_channels( observatory=observatory, channels=( @@ -181,22 +313,42 @@ def realtime_command( ("F", "F"), ), interval="second", - input_factory=get_edge_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory( - host=output_host, output_port=output_port + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="legacy", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", ), realtime_interval=realtime_interval, update_limit=update_limit, ) - minute_filter( - observatory=observatory, - channels=("U", "V", "W", "F"), - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_miniseed_factory(host=output_host, output_port=output_port), - realtime_interval=realtime_interval, - update_limit=update_limit, - ) - if data_format == DataFormat.OBSRIO: + minute_filter( + observatory=observatory, + channels=("U", "V", "W", "F"), + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=output_port, + type="variation", + sncl_mode="geomag", + ), + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + # remove the following after data migration is complete _copy_channels( observatory=observatory, channels=( @@ -206,8 +358,19 @@ def realtime_command( ("F", "F"), ), interval="minute", - input_factory=get_miniseed_factory(host=input_host, input_port=input_port), - output_factory=get_edge_factory(host=output_host, output_port=output_port), + input_factory=EdgeFactory( + host=input_host, + port=input_port, + type="variation", + sncl_mode="geomag", + ), + output_factory=EdgeFactory( + host=output_host, + port=output_read_port, + write_port=7981, + type="variation", + sncl_mode="legacy", + ), realtime_interval=realtime_interval, update_limit=update_limit, ) @@ -215,11 +378,11 @@ def realtime_command( def day_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 86400, - update_limit: int = 7, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 second miniseed channels to 1 day @@ -238,11 +401,11 @@ def day_filter( update_limit: int number of update windows """ - starttime, endtime = get_realtime_interval(realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, "day") controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="day", ) for channel in channels: @@ -266,11 +429,11 @@ def day_filter( def hour_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, - realtime_interval: int = 600, - update_limit: int = 10, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, + realtime_interval: int, + update_limit: int, ): """Filter 1 minute miniseed channels to 1 hour @@ -289,11 +452,11 @@ def hour_filter( update_limit: int number of update windows """ - starttime, endtime = get_realtime_interval(realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, "hour") controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="minute", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="hour", ) for channel in channels: @@ -317,9 +480,9 @@ def hour_filter( def minute_filter( observatory: str, - channels: List[str] = ["U", "V", "W", "F"], - input_factory: Optional[TimeseriesFactory] = None, - output_factory: Optional[TimeseriesFactory] = None, + channels: List[str], + input_factory: TimeseriesFactory, + output_factory: TimeseriesFactory, realtime_interval: int = 600, update_limit: int = 10, ): @@ -331,6 +494,8 @@ def minute_filter( observatory id channels: array list of channels to filter + type: str + data type (e.g., "variation", "adjusted", or "R0", "A0") input_factory: TimeseriesFactory factory to request data output_factory: TimeseriesFactory @@ -340,11 +505,11 @@ def minute_filter( update_limit: int number of update windows """ - starttime, endtime = get_realtime_interval(realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, "minute") controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval="second", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="minute", ) for channel in channels: @@ -368,6 +533,7 @@ def minute_filter( def second_filter( observatory: str, + channels: List[str] = ["U", "V", "W"], input_factory: Optional[TimeseriesFactory] = None, output_factory: Optional[TimeseriesFactory] = None, realtime_interval: int = 600, @@ -379,6 +545,8 @@ def second_filter( ----------- observatory: str observatory id + channels: array + list of channels to filter input_factory: TimeseriesFactory factory to request data output_factory: TimeseriesFactory @@ -388,15 +556,14 @@ def second_filter( update_limit: int number of update windows """ - starttime, endtime = get_realtime_interval(realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, "second") controller = Controller( - inputFactory=input_factory - or get_miniseed_factory(convert_channels=("U", "V", "W")), + inputFactory=input_factory, inputInterval="tenhertz", - outputFactory=output_factory or get_miniseed_factory(), + outputFactory=output_factory, outputInterval="second", ) - for channel in ("U", "V", "W"): + for channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( input_sample_period=0.1, @@ -418,26 +585,26 @@ def second_filter( def temperature_filter( observatory: str, channels: List[List[str]], - input_factory: Optional[TimeseriesFactory] = None, - input_interval: int = "second", - output_factory: Optional[TimeseriesFactory] = None, - output_interval: int = "minute", - realtime_interval: int = 600, - update_limit: int = 10, + input_factory: TimeseriesFactory, + input_interval: int, + output_factory: TimeseriesFactory, + output_interval: int, + realtime_interval: int, + update_limit: int, ): """Filter temperatures 1Hz miniseed (LK1-4) to 1 minute legacy (UK1-4).""" - starttime, endtime = get_realtime_interval(realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, output_interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=input_interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=output_interval, ) for input_channel, output_channel in channels: controller.run_as_update( algorithm=FilterAlgorithm( - input_sample_period=1, - output_sample_period=60, + input_sample_period=input_interval, + output_sample_period=output_interval, inchannels=(input_channel,), outchannels=(output_channel,), ), @@ -482,11 +649,11 @@ def _copy_channels( update_limit: int number of update windows """ - starttime, endtime = get_realtime_interval(interval_seconds=realtime_interval) + starttime, endtime = get_realtime_interval(realtime_interval, interval) controller = Controller( - inputFactory=input_factory or get_miniseed_factory(), + inputFactory=input_factory, inputInterval=interval, - outputFactory=output_factory or get_edge_factory(), + outputFactory=output_factory, outputInterval=interval, ) for input_channel, output_channel in channels: diff --git a/pyproject.toml b/pyproject.toml index 3e1d96aeba773e5cbd027adc5930caddaa0e8066..35b255a9aced14f689cb259b98fb82de35908a4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ packages = [ {include = "geomagio" } ] repository="https://code.usgs.gov/ghsc/geomag/geomag-algorithms" -version = "1.12.3" +version = "1.13.1" [tool.poetry.dependencies] diff --git a/test/edge_test/RawInputClient_test.py b/test/edge_test/RawInputClient_test.py index ef21290aca7b15bb3784628999673fe8a30431bb..876bc5694d781cff4029853c05082dceb00997bd 100644 --- a/test/edge_test/RawInputClient_test.py +++ b/test/edge_test/RawInputClient_test.py @@ -51,7 +51,8 @@ def test_raw_input_client(): location=location, network=network, ) - trace_send = EdgeFactory()._convert_trace_to_int(trace.copy()) + trace_send = trace.copy() + trace_send.data = numpy.int32(trace_send.data * 1000) client.send_trace("minute", trace_send) # verify data was sent assert_equal(len(client.last_send), 1)