Skip to content
Snippets Groups Projects

Overhaul EdgeFactory and MiniSeedFactory for Edge data migration

1 file
+ 301
121
Compare changes
  • Side-by-side
  • Inline
  • 2702d9f4
    EdgeFactory Upgrades: · 2702d9f4
    Erin (Josh) Rigler authored
        - configurable scale_factor and float vs. int conversion logic
        - configurable sncl_mode for different channel mapping
        - multiple read ports allowed for fail-over logic
        - generally more (if not perfectly) compatible with MiniSeedFactory
+ 301
121
@@ -16,11 +16,12 @@ from typing import List, Optional
@@ -16,11 +16,12 @@ from typing import List, Optional
import numpy
import numpy
import numpy.ma
import numpy.ma
from obspy import Stream, Trace, UTCDateTime
from obspy.core import Stats, Stream, Trace, UTCDateTime
from obspy.clients import earthworm, neic
from obspy.clients import earthworm
from .. import ChannelConverter, TimeseriesUtility
from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..geomag_types import DataInterval, DataType
 
from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from ..ObservatoryMetadata import ObservatoryMetadata
@@ -37,9 +38,11 @@ class EdgeFactory(TimeseriesFactory):
@@ -37,9 +38,11 @@ class EdgeFactory(TimeseriesFactory):
host: str
host: str
a string representing the IP number of the host to connect to.
a string representing the IP number of the host to connect to.
port: integer
port: integer
the port number the waveserver is listening on.
port number on which EdgeCWB's CWB waveserver (CWBWS, obspy's
 
"earthworm" client) listens for requests to retrieve timeseries data.
write_port: integer
write_port: integer
the port number the client is writing to.
the port number on which EdgeCWB's RawInputServer is listening for
 
requests to write timeseries data.
tag: str
tag: str
A tag used by edge to log and associate a socket with a given data
A tag used by edge to log and associate a socket with a given data
source
source
@@ -54,7 +57,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -54,7 +57,7 @@ class EdgeFactory(TimeseriesFactory):
others are passed through, see #_get_edge_channel().
others are passed through, see #_get_edge_channel().
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
observatoryMetadata: ObservatoryMetadata object
observatoryMetadata: ObservatoryMetadata object
an ObservatoryMetadata object used to replace the default
an ObservatoryMetadata object used to replace the default
@@ -62,12 +65,15 @@ class EdgeFactory(TimeseriesFactory):
@@ -62,12 +65,15 @@ class EdgeFactory(TimeseriesFactory):
locationCode: str
locationCode: str
the location code for the given edge server, overrides type
the location code for the given edge server, overrides type
in get_timeseries/put_timeseries
in get_timeseries/put_timeseries
scaleFactor: int
convert_channels: array
all data written to edge (via raw input client) will be scaled
list of channels to convert from volt/bin to nT
by this integer prior to write; all data read from edge will be
scale_factor: float
will be divided by this integer after read; default = 1000
override default scalings when reading/writing miniseed blocks;
snclMapper: {'geomag','legacy'}
(reading integer blocks divides by 1000; reading floating point
a mapper of common channel names to SEED SNCL codes (that is,
blocks divides by 1; writing all data multiplies by 1000)
 
default = None
 
sncl_mode: {'geomag','legacy'}
 
force mode to convert common names to SEED SNCL codes (that is,
station, network, channel, location codes); default = legacy
station, network, channel, location codes); default = legacy
See Also
See Also
@@ -76,17 +82,19 @@ class EdgeFactory(TimeseriesFactory):
@@ -76,17 +82,19 @@ class EdgeFactory(TimeseriesFactory):
Notes
Notes
-----
-----
This is designed to read from any earthworm style waveserver, but it
The EdgeFactory reads so-called trace-bufs and writes integer data
currently only writes to an edge. Edge mimics an earthworm style
to an EdgeCWB RawInputServer, which places these data, sample-by-sample,
waveserver close enough that we hope to maintain that compatibility
into a RAM buffer where it can be immediately retrieved, even before
for reading.
full miniseed blocks can be constructed and written to disk. The
 
EdgeFactory cannot handle non-integer data, and is inefficient for
 
larger data transfers. In those cases, consider using MiniSeedFactory.
"""
"""
def __init__(
def __init__(
self,
self,
host: str = "edgecwb.usgs.gov",
host: Optional[str] = None,
port: int = 2060,
port: Optional[int] = None,
write_port: int = 7981,
write_port: Optional[int] = None,
tag: str = "GeomagAlg",
tag: str = "GeomagAlg",
forceout: bool = False,
forceout: bool = False,
observatory: Optional[str] = None,
observatory: Optional[str] = None,
@@ -95,31 +103,27 @@ class EdgeFactory(TimeseriesFactory):
@@ -95,31 +103,27 @@ class EdgeFactory(TimeseriesFactory):
interval: Optional[DataInterval] = None,
interval: Optional[DataInterval] = None,
observatoryMetadata: Optional[ObservatoryMetadata] = None,
observatoryMetadata: Optional[ObservatoryMetadata] = None,
locationCode: Optional[str] = None,
locationCode: Optional[str] = None,
scaleFactor: int = 1000,
convert_channels: Optional[List[str]] = None,
snclMapper: str = "legacy",
scale_factor: Optional[int] = None,
 
sncl_mode: Optional[str] = "legacy",
):
):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
if port == 2060:
self.host = host or "edgecwb.usgs.gov"
# earthworm (phasing this out gradually)
self.port = port or [2060]
self.client = earthworm.Client(host, port)
self.write_port = write_port # no default write port
else:
# CWBQuery/miniseed protocol (preferred)
self.client = neic.Client(host, port)
self.host = host
self.port = port
self.write_port = write_port
self.tag = tag
self.tag = tag
self.forceout = forceout
self.forceout = forceout
self.interval = interval
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.locationCode = locationCode
self.locationCode = locationCode
self.scaleFactor = scaleFactor
self.convert_channels = convert_channels or []
if snclMapper == "legacy":
self.scale_factor = scale_factor
 
self.sncl_mode = sncl_mode
 
if sncl_mode == "legacy":
self.get_sncl = LegacySNCL.get_sncl
self.get_sncl = LegacySNCL.get_sncl
elif snclMapper == "geomag":
elif sncl_mode == "geomag":
self.get_sncl = SNCL.get_sncl
self.get_sncl = SNCL.get_sncl
else:
else:
raise TimeseriesFactoryException("Unrecognized SNCL mapper")
raise TimeseriesFactoryException("Unrecognized SNCL mode")
def get_timeseries(
def get_timeseries(
self,
self,
@@ -145,7 +149,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -145,7 +149,7 @@ class EdgeFactory(TimeseriesFactory):
list of channels to load
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
add_empty_channels: bool
add_empty_channels: bool
if True, returns channels without data as empty traces
if True, returns channels without data as empty traces
@@ -179,17 +183,22 @@ class EdgeFactory(TimeseriesFactory):
@@ -179,17 +183,22 @@ class EdgeFactory(TimeseriesFactory):
# get the timeseries
# get the timeseries
timeseries = Stream()
timeseries = Stream()
for channel in channels:
for channel in channels:
data = self._get_timeseries(
if channel in self.convert_channels:
starttime,
data = self._convert_timeseries(
endtime,
starttime, endtime, observatory, channel, type, interval
observatory,
)
channel,
else:
type,
data = self._get_timeseries(
interval,
starttime,
add_empty_channels,
endtime,
)
observatory,
if len(data) == 0:
channel,
continue
type,
 
interval,
 
add_empty_channels,
 
)
 
if len(data) == 0:
 
continue
timeseries += data
timeseries += data
finally:
finally:
# restore stdout
# restore stdout
@@ -220,7 +229,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -220,7 +229,7 @@ class EdgeFactory(TimeseriesFactory):
list of channels to load
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
Notes
Notes
@@ -247,46 +256,84 @@ class EdgeFactory(TimeseriesFactory):
@@ -247,46 +256,84 @@ class EdgeFactory(TimeseriesFactory):
)
)
for channel in channels:
for channel in channels:
self._put_channel(
self._put_channel(
timeseries, observatory, channel, type, interval, starttime, endtime
timeseries.select(channel=channel),
 
observatory,
 
channel,
 
type,
 
interval,
 
starttime,
 
endtime,
)
)
def _convert_timeseries_to_decimal(self, stream: Stream):
def get_calculated_timeseries(
"""convert geomag edge timeseries data stored as ints, to decimal by
self,
dividing by 1000.00
starttime: UTCDateTime,
Parameters
endtime: UTCDateTime,
----------
observatory: str,
stream: Stream
channel: str,
a stream retrieved from a geomag edge representing one channel
type: DataType,
Notes
interval: DataInterval,
-----
components: List[dict],
This routine changes the values in the timeseries. The user should
) -> Trace:
make a copy before calling if they don't want that side effect.
"""Calculate a single channel using multiple component channels.
"""
for trace in stream:
trace.data = numpy.divide(trace.data, self.scaleFactor)
def _convert_trace_to_int(self, trace_in: Trace) -> Trace:
"""convert geomag edge traces stored as decimal, to ints by multiplying
by 1000
Parameters
Parameters
----------
----------
trace: Trace
starttime: UTCDateTime
a trace retrieved from a geomag edge representing one channel
the starttime of the requested data
 
endtime: UTCDateTime
 
the endtime of the requested data
 
observatory: str
 
observatory code
 
channel: str
 
single character channel {H, E, D, Z, F}
 
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
 
data type
 
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
 
data interval
 
components: list
 
each component is a dictionary with the following keys:
 
channel: str
 
offset: float
 
scale: float
 
Returns
Returns
-------
-------
trace: Trace
out: Trace
a trace converted to ints
timeseries trace of the converted channel data
Notes
-----
this doesn't work on ndarray with nan's in it.
the trace must be a masked array.
"""
"""
trace = trace_in.copy()
# sum channels
trace.data = numpy.multiply(trace.data, self.scaleFactor)
stats = None
trace.data = trace.data.astype(int)
converted = None
for component in components:
return trace
# load component
 
data = self._get_timeseries(
 
starttime, endtime, observatory, component["channel"], type, interval
 
)[0]
 
# convert to nT
 
nt = data.data * component["scale"] + component["offset"]
 
# add to converted
 
if converted is None:
 
converted = nt
 
stats = Stats(data.stats)
 
else:
 
converted += nt
 
# set channel parameter to U, V, or W
 
stats.channel = channel
 
# create empty trace with adapted stats
 
out = TimeseriesUtility.create_empty_trace(
 
stats.starttime,
 
stats.endtime,
 
stats.station,
 
stats.channel,
 
stats.data_type,
 
stats.data_interval,
 
stats.network,
 
stats.station,
 
stats.location,
 
)
 
out.data = converted
 
return out
def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream:
def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream:
"""convert geomag edge traces in a timeseries stream to a MaskedArray
"""convert geomag edge traces in a timeseries stream to a MaskedArray
@@ -331,7 +378,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -331,7 +378,7 @@ class EdgeFactory(TimeseriesFactory):
single character channel {H, E, D, Z, F}
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
add_empty_channels: bool
add_empty_channels: bool
if True, returns channels without data as empty traces
if True, returns channels without data as empty traces
@@ -352,23 +399,75 @@ class EdgeFactory(TimeseriesFactory):
@@ -352,23 +399,75 @@ class EdgeFactory(TimeseriesFactory):
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
 
 
# list-type ports variable needed for fail-back logic
try:
try:
data = self.client.get_waveforms(
ports = list(self.port)
sncl.network,
sncl.station,
sncl.location,
sncl.channel,
starttime,
endtime + half_delta,
)
except TypeError:
except TypeError:
# get_waveforms() fails if no data is returned from Edge
ports = [self.port]
data = Stream()
 
data = Stream()
 
for port in ports:
 
try:
 
client = earthworm.Client(self.host, port, timeout=10)
 
 
data += client.get_waveforms(
 
sncl.network,
 
sncl.station,
 
sncl.location,
 
sncl.channel,
 
starttime,
 
endtime + half_delta,
 
)
 
 
if data:
 
# if data was returned, force this port in subsequent calls
 
# to get_timeseries() that use this instance of EdgeFactory
 
self.port = [port]
 
break
 
print("No data returned from ", self.host, " on port ", port)
 
# try alternate port(s) if provided
 
except Exception as e:
 
print("Failed to get data from ", self.host, " on port ", port)
 
print("Ignoring error: ", e)
 
# try alternate port(s) if provided
 
continue
# make sure data is 32bit int
for trace in data:
for trace in data:
trace.data = trace.data.astype("i4")
if trace.data.dtype.kind == "i":
data.merge()
# convert all integer traces to rescaled 64-bit floats;
 
if sncl.channel[1] == "E":
 
# instrumental voltages are stored as 1/10 microvolts
 
trace.data = trace.data.astype(float) / (self.scale_factor or 1e7)
 
else:
 
# everything else (mostly magnetics stored as picoteslas)
 
trace.data = trace.data.astype(float) / (self.scale_factor or 1e3)
 
elif trace.data.dtype.kind == "f":
 
# convert all float traces to 64-bit floats;
 
trace.data = trace.data.astype(float) / (self.scale_factor or 1.0)
 
 
# when Traces with identical NSCL codes overlap, prioritize samples
 
# that come "later" in Stream; this depends on Edge returning miniseed
 
# packets in the order written
 
# NOTE: this is not possible with single calls to Stream.merge()
 
st_tmp = Stream()
 
for tr in data:
 
try:
 
# add tr to temporary stream
 
st_tmp += tr
 
# replace time overlaps with gaps
 
st_tmp.merge(0)
 
# add tr to temporary stream again
 
st_tmp += tr
 
# replace gaps with tr's data
 
st_tmp.merge(0)
 
except Exception as e:
 
tr = st_tmp.pop() # remove bad trace
 
print("Dropped trace: ", tr)
 
print("Ignoring error: ", e)
 
 
# point `data` to the new stream and continue processing
 
data = st_tmp
if data.count() == 0 and add_empty_channels:
if data.count() == 0 and add_empty_channels:
data += self._get_empty_trace(
data += self._get_empty_trace(
starttime=starttime,
starttime=starttime,
@@ -383,6 +482,84 @@ class EdgeFactory(TimeseriesFactory):
@@ -383,6 +482,84 @@ class EdgeFactory(TimeseriesFactory):
self._set_metadata(data, observatory, channel, type, interval)
self._set_metadata(data, observatory, channel, type, interval)
return data
return data
 
def _convert_timeseries(
 
self,
 
starttime: UTCDateTime,
 
endtime: UTCDateTime,
 
observatory: str,
 
channel: str,
 
type: DataType,
 
interval: DataInterval,
 
) -> Stream:
 
"""Generate a single channel using multiple components.
 
 
Finds metadata, then calls _get_converted_timeseries for actual
 
conversion.
 
 
Parameters
 
----------
 
starttime: UTCDateTime
 
the starttime of the requested data
 
endtime: UTCDateTime
 
the endtime of the requested data
 
observatory : str
 
observatory code
 
channel : str
 
single character channel {H, E, D, Z, F}
 
type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
 
data type
 
interval : {'tenhertz', 'second', 'minute', 'hour', 'day'}
 
data interval
 
 
Returns
 
-------
 
out: Stream
 
timeseries with single trace of the requested channel data
 
NOTE: this originally returned a Trace, but was modified
 
(probably due to misunderstanding) at some point to
 
return a Stream. This Stream is, however, expected
 
to contain just a single Trace.
 
"""
 
out = Stream()
 
metadata = get_instrument_calibrations(observatory, starttime, endtime)
 
# loop in case request spans different configurations
 
for entry in metadata:
 
entry_endtime = entry["end_time"]
 
entry_starttime = entry["start_time"]
 
instrument = entry["instrument"]
 
instrument_channels = instrument["channels"]
 
if channel not in instrument_channels:
 
# no idea how to convert
 
continue
 
# determine metadata overlap with request
 
start = (
 
starttime
 
if entry_starttime is None or entry_starttime < starttime
 
else entry_starttime
 
)
 
end = (
 
endtime
 
if entry_endtime is None or entry_endtime > endtime
 
else entry_endtime
 
)
 
# now convert
 
out += self.get_calculated_timeseries(
 
start,
 
end,
 
observatory,
 
channel,
 
type,
 
interval,
 
instrument_channels[channel],
 
)
 
# force to match the first trace's datatype
 
for trace in out:
 
trace.data = trace.data.astype(out[0].data.dtype)
 
# merge to force a single trace
 
# (precedence given to later inteval when metadata overlap)
 
out.merge(1)
 
return out
 
def _post_process(
def _post_process(
self,
self,
timeseries: Stream,
timeseries: Stream,
@@ -391,7 +568,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -391,7 +568,7 @@ class EdgeFactory(TimeseriesFactory):
channels: List[str],
channels: List[str],
):
):
"""Post process a timeseries stream after the raw data is
"""Post process a timeseries stream after the raw data is
is fetched from a waveserver. Specifically changes
is fetched from waveserver. Specifically changes
any MaskedArray to a ndarray with nans representing gaps.
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
beggining or end of the streams.
@@ -409,7 +586,6 @@ class EdgeFactory(TimeseriesFactory):
@@ -409,7 +586,6 @@ class EdgeFactory(TimeseriesFactory):
Notes: the original timeseries object is changed.
Notes: the original timeseries object is changed.
"""
"""
self._convert_timeseries_to_decimal(timeseries)
for trace in timeseries:
for trace in timeseries:
if isinstance(trace.data, numpy.ma.MaskedArray):
if isinstance(trace.data, numpy.ma.MaskedArray):
trace.data.set_fill_value(numpy.nan)
trace.data.set_fill_value(numpy.nan)
@@ -443,14 +619,10 @@ class EdgeFactory(TimeseriesFactory):
@@ -443,14 +619,10 @@ class EdgeFactory(TimeseriesFactory):
channel to load
channel to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
starttime: UTCDateTime
starttime: UTCDateTime
endtime: UTCDateTime
endtime: UTCDateTime
Notes
-----
RawInputClient seems to only work when sockets are
"""
"""
sncl = self.get_sncl(
sncl = self.get_sncl(
station=observatory,
station=observatory,
@@ -460,34 +632,42 @@ class EdgeFactory(TimeseriesFactory):
@@ -460,34 +632,42 @@ class EdgeFactory(TimeseriesFactory):
location=self.locationCode,
location=self.locationCode,
)
)
ric = RawInputClient(
stream_masked = self._convert_stream_to_masked(
self.tag,
timeseries=timeseries, channel=channel
self.host,
self.write_port,
sncl.station,
sncl.channel,
sncl.location,
sncl.network,
)
)
 
stream_split = stream_masked.split()
stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel)
for trace in stream_split:
# Make certain there's actually data
if not numpy.ma.any(stream.select(channel=channel)[0].data):
return
for trace in stream.select(channel=channel).split():
trace_send = trace.copy()
trace_send = trace.copy()
trace_send.trim(starttime, endtime)
trace_send.trim(starttime, endtime)
if channel == "D":
if channel == "D":
trace_send.data = ChannelConverter.get_minutes_from_radians(
trace_send.data = ChannelConverter.get_minutes_from_radians(
trace_send.data
trace_send.data
)
)
trace_send = self._convert_trace_to_int(trace_send)
if sncl.channel[1] == "E":
ric.send_trace(interval, trace_send)
# instrumental voltages are stored as 1/10 microvolts
if self.forceout:
trace_send.data = trace_send.data * (self.scale_factor or 1e7)
ric.forceout()
else:
ric.close()
# everything else (mostly magnetics stored as picoteslas)
 
trace_send.data = trace_send.data * (self.scale_factor or 1e3)
 
 
if self.write_port:
 
ric = RawInputClient(
 
self.tag,
 
self.host,
 
self.write_port,
 
sncl.station,
 
sncl.channel,
 
sncl.location,
 
sncl.network,
 
)
 
trace_send.data = trace_send.data.astype(int) # ric requires ints
 
ric.send_trace(interval, trace_send)
 
if self.forceout:
 
ric.forceout()
 
ric.close()
 
else:
 
raise TimeseriesFactoryException("Valid write port was not specified.")
def _set_metadata(
def _set_metadata(
self,
self,
@@ -506,7 +686,7 @@ class EdgeFactory(TimeseriesFactory):
@@ -506,7 +686,7 @@ class EdgeFactory(TimeseriesFactory):
edge channel code {MVH, MVE, MVD, ...}
edge channel code {MVH, MVE, MVD, ...}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
data interval
"""
"""
for trace in stream:
for trace in stream:
Loading