Skip to content
Snippets Groups Projects

Overhaul EdgeFactory and MiniSeedFactory for Edge data migration

1 file
+ 19
18
Compare changes
  • Side-by-side
  • Inline
+ 301
120
@@ -16,11 +16,12 @@ from typing import List, Optional
import numpy
import numpy.ma
from obspy import Stream, Trace, UTCDateTime
from obspy.clients import earthworm, neic
from obspy.core import Stats, Stream, Trace, UTCDateTime
from obspy.clients import earthworm
from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..metadata.instrument.InstrumentCalibrations import get_instrument_calibrations
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
@@ -37,9 +38,11 @@ class EdgeFactory(TimeseriesFactory):
host: str
a string representing the IP number of the host to connect to.
port: integer
the port number the waveserver is listening on.
port number on which EdgeCWB's CWB waveserver (CWBWS, obspy's
"earthworm" client) listens for requests to retrieve timeseries data.
write_port: integer
the port number the client is writing to.
the port number on which EdgeCWB's RawInputServer is listening for
requests to write timeseries data.
tag: str
A tag used by edge to log and associate a socket with a given data
source
@@ -54,7 +57,7 @@ class EdgeFactory(TimeseriesFactory):
others are passed through, see #_get_edge_channel().
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
observatoryMetadata: ObservatoryMetadata object
an ObservatoryMetadata object used to replace the default
@@ -62,13 +65,18 @@ class EdgeFactory(TimeseriesFactory):
locationCode: str
the location code for the given edge server, overrides type
in get_timeseries/put_timeseries
scaleFactor: int
all data written to edge (via raw input client) will be scaled
by this integer prior to write; all data read from edge will be
will be divided by this integer after read; default = 1000
snclMapper: {'geomag','legacy'}
a mapper of common channel names to SEED SNCL codes (that is,
convert_channels: array
list of channels to convert from volt/bin to nT
scale_factor: float
override default scalings when reading/writing miniseed blocks;
(reading integer blocks divides by 1000; reading floating point
blocks divides by 1; writing all data multiplies by 1000)
default = None
sncl_mode: {'geomag','legacy'}
force mode to convert common names to SEED SNCL codes (that is,
station, network, channel, location codes); default = legacy
timeout: float
timeout for Earthworm client; default=10
See Also
--------
@@ -76,17 +84,19 @@ class EdgeFactory(TimeseriesFactory):
Notes
-----
This is designed to read from any earthworm style waveserver, but it
currently only writes to an edge. Edge mimics an earthworm style
waveserver close enough that we hope to maintain that compatibility
for reading.
The EdgeFactory reads so-called trace-bufs and writes integer data
to an EdgeCWB RawInputServer, which places these data, sample-by-sample,
into a RAM buffer where it can be immediately retrieved, even before
full miniseed blocks can be constructed and written to disk. The
EdgeFactory cannot handle non-integer data, and is inefficient for
larger data transfers. In those cases, consider using MiniSeedFactory.
"""
def __init__(
self,
host: str = "edgecwb.usgs.gov",
port: int = 2060,
write_port: int = 7981,
host: Optional[str] = None,
port: Optional[int] = None,
write_port: Optional[int] = None,
tag: str = "GeomagAlg",
forceout: bool = False,
observatory: Optional[str] = None,
@@ -95,31 +105,29 @@ class EdgeFactory(TimeseriesFactory):
interval: Optional[DataInterval] = None,
observatoryMetadata: Optional[ObservatoryMetadata] = None,
locationCode: Optional[str] = None,
scaleFactor: int = 1000,
snclMapper: str = "legacy",
convert_channels: Optional[List[str]] = None,
scale_factor: Optional[int] = None,
sncl_mode: Optional[str] = "legacy",
timeout: Optional[float] = None,
):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
if port == 2060:
# earthworm (phasing this out gradually)
self.client = earthworm.Client(host, port)
else:
# CWBQuery/miniseed protocol (preferred)
self.client = neic.Client(host, port)
self.host = host
self.port = port
self.write_port = write_port
self.host = host or "edgecwb.usgs.gov"
self.port = port or [2060]
self.write_port = write_port # no default write port
self.tag = tag
self.forceout = forceout
self.interval = interval
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.locationCode = locationCode
self.scaleFactor = scaleFactor
if snclMapper == "legacy":
self.convert_channels = convert_channels or []
self.scale_factor = scale_factor
self.sncl_mode = sncl_mode
self.timeout = timeout or 10
if sncl_mode == "legacy":
self.get_sncl = LegacySNCL.get_sncl
elif snclMapper == "geomag":
elif sncl_mode == "geomag":
self.get_sncl = SNCL.get_sncl
else:
raise TimeseriesFactoryException("Unrecognized SNCL mapper")
raise TimeseriesFactoryException("Unrecognized SNCL mode")
def get_timeseries(
self,
@@ -145,7 +153,7 @@ class EdgeFactory(TimeseriesFactory):
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
@@ -179,17 +187,22 @@ class EdgeFactory(TimeseriesFactory):
# get the timeseries
timeseries = Stream()
for channel in channels:
data = self._get_timeseries(
starttime,
endtime,
observatory,
channel,
type,
interval,
add_empty_channels,
)
if len(data) == 0:
continue
if channel in self.convert_channels:
data = self._convert_timeseries(
starttime, endtime, observatory, channel, type, interval
)
else:
data = self._get_timeseries(
starttime,
endtime,
observatory,
channel,
type,
interval,
add_empty_channels,
)
if len(data) == 0:
continue
timeseries += data
finally:
# restore stdout
@@ -220,7 +233,7 @@ class EdgeFactory(TimeseriesFactory):
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
Notes
@@ -247,46 +260,84 @@ class EdgeFactory(TimeseriesFactory):
)
for channel in channels:
self._put_channel(
timeseries, observatory, channel, type, interval, starttime, endtime
timeseries.select(channel=channel),
observatory,
channel,
type,
interval,
starttime,
endtime,
)
def _convert_timeseries_to_decimal(self, stream: Stream):
"""convert geomag edge timeseries data stored as ints, to decimal by
dividing by 1000.00
Parameters
----------
stream: Stream
a stream retrieved from a geomag edge representing one channel
Notes
-----
This routine changes the values in the timeseries. The user should
make a copy before calling if they don't want that side effect.
"""
for trace in stream:
trace.data = numpy.divide(trace.data, self.scaleFactor)
def _convert_trace_to_int(self, trace_in: Trace) -> Trace:
"""convert geomag edge traces stored as decimal, to ints by multiplying
by 1000
def get_calculated_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
components: List[dict],
) -> Trace:
"""Calculate a single channel using multiple component channels.
Parameters
----------
trace: Trace
a trace retrieved from a geomag edge representing one channel
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
observatory: str
observatory code
channel: str
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
components: list
each component is a dictionary with the following keys:
channel: str
offset: float
scale: float
Returns
-------
trace: Trace
a trace converted to ints
Notes
-----
this doesn't work on ndarray with nan's in it.
the trace must be a masked array.
out: Trace
timeseries trace of the converted channel data
"""
trace = trace_in.copy()
trace.data = numpy.multiply(trace.data, self.scaleFactor)
trace.data = trace.data.astype(int)
return trace
# sum channels
stats = None
converted = None
for component in components:
# load component
data = self._get_timeseries(
starttime, endtime, observatory, component["channel"], type, interval
)[0]
# convert to nT
nt = data.data * component["scale"] + component["offset"]
# add to converted
if converted is None:
converted = nt
stats = Stats(data.stats)
else:
converted += nt
# set channel parameter to U, V, or W
stats.channel = channel
# create empty trace with adapted stats
out = TimeseriesUtility.create_empty_trace(
stats.starttime,
stats.endtime,
stats.station,
stats.channel,
stats.data_type,
stats.data_interval,
stats.network,
stats.station,
stats.location,
)
out.data = converted
return out
def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream:
"""convert geomag edge traces in a timeseries stream to a MaskedArray
@@ -331,7 +382,7 @@ class EdgeFactory(TimeseriesFactory):
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
@@ -352,23 +403,75 @@ class EdgeFactory(TimeseriesFactory):
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
# list-type ports variable needed for fail-back logic
try:
data = self.client.get_waveforms(
sncl.network,
sncl.station,
sncl.location,
sncl.channel,
starttime,
endtime + half_delta,
)
ports = list(self.port)
except TypeError:
# get_waveforms() fails if no data is returned from Edge
data = Stream()
ports = [self.port]
data = Stream()
for port in ports:
try:
client = earthworm.Client(self.host, port, timeout=self.timeout)
data += client.get_waveforms(
sncl.network,
sncl.station,
sncl.location,
sncl.channel,
starttime,
endtime + half_delta,
)
if data:
# if data was returned, force this port in subsequent calls
# to get_timeseries() that use this instance of EdgeFactory
self.port = [port]
break
print("No data returned from ", self.host, " on port ", port)
# try alternate port(s) if provided
except Exception as e:
print("Failed to get data from ", self.host, " on port ", port)
print("Ignoring error: ", e)
# try alternate port(s) if provided
continue
# make sure data is 32bit int
for trace in data:
trace.data = trace.data.astype("i4")
data.merge()
if trace.data.dtype.kind == "i":
# convert all integer traces to rescaled 64-bit floats;
if sncl.channel[1] == "E":
# instrumental voltages are stored as 1/10 microvolts
trace.data = trace.data.astype(float) / (self.scale_factor or 1e7)
else:
# everything else (mostly magnetics stored as picoteslas)
trace.data = trace.data.astype(float) / (self.scale_factor or 1e3)
elif trace.data.dtype.kind == "f":
# convert all float traces to 64-bit floats;
trace.data = trace.data.astype(float) / (self.scale_factor or 1.0)
# when Traces with identical NSCL codes overlap, prioritize samples
# that come "later" in Stream; this depends on Edge returning miniseed
# packets in the order written
# NOTE: this is not possible with single calls to Stream.merge()
st_tmp = Stream()
for tr in data:
try:
# add tr to temporary stream
st_tmp += tr
# replace time overlaps with gaps
st_tmp.merge(0)
# add tr to temporary stream again
st_tmp += tr
# replace gaps with tr's data
st_tmp.merge(0)
except Exception as e:
tr = st_tmp.pop() # remove bad trace
print("Dropped trace: ", tr)
print("Ignoring error: ", e)
# point `data` to the new stream and continue processing
data = st_tmp
if data.count() == 0 and add_empty_channels:
data += self._get_empty_trace(
starttime=starttime,
@@ -383,6 +486,84 @@ class EdgeFactory(TimeseriesFactory):
self._set_metadata(data, observatory, channel, type, interval)
return data
def _convert_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
) -> Stream:
"""Generate a single channel using multiple components.
Finds metadata, then calls _get_converted_timeseries for actual
conversion.
Parameters
----------
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
observatory : str
observatory code
channel : str
single character channel {H, E, D, Z, F}
type : {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval : {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
Returns
-------
out: Stream
timeseries with single trace of the requested channel data
NOTE: this originally returned a Trace, but was modified
(probably due to misunderstanding) at some point to
return a Stream. This Stream is, however, expected
to contain just a single Trace.
"""
out = Stream()
metadata = get_instrument_calibrations(observatory, starttime, endtime)
# loop in case request spans different configurations
for entry in metadata:
entry_endtime = entry["end_time"]
entry_starttime = entry["start_time"]
instrument = entry["instrument"]
instrument_channels = instrument["channels"]
if channel not in instrument_channels:
# no idea how to convert
continue
# determine metadata overlap with request
start = (
starttime
if entry_starttime is None or entry_starttime < starttime
else entry_starttime
)
end = (
endtime
if entry_endtime is None or entry_endtime > endtime
else entry_endtime
)
# now convert
out += self.get_calculated_timeseries(
start,
end,
observatory,
channel,
type,
interval,
instrument_channels[channel],
)
# force to match the first trace's datatype
for trace in out:
trace.data = trace.data.astype(out[0].data.dtype)
# merge to force a single trace
# (precedence given to later inteval when metadata overlap)
out.merge(1)
return out
def _post_process(
self,
timeseries: Stream,
@@ -391,7 +572,7 @@ class EdgeFactory(TimeseriesFactory):
channels: List[str],
):
"""Post process a timeseries stream after the raw data is
is fetched from a waveserver. Specifically changes
is fetched from waveserver. Specifically changes
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
@@ -409,7 +590,6 @@ class EdgeFactory(TimeseriesFactory):
Notes: the original timeseries object is changed.
"""
self._convert_timeseries_to_decimal(timeseries)
for trace in timeseries:
if isinstance(trace.data, numpy.ma.MaskedArray):
trace.data.set_fill_value(numpy.nan)
@@ -443,14 +623,10 @@ class EdgeFactory(TimeseriesFactory):
channel to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
starttime: UTCDateTime
endtime: UTCDateTime
Notes
-----
RawInputClient seems to only work when sockets are
"""
sncl = self.get_sncl(
station=observatory,
@@ -460,34 +636,39 @@ class EdgeFactory(TimeseriesFactory):
location=self.locationCode,
)
ric = RawInputClient(
self.tag,
self.host,
self.write_port,
sncl.station,
sncl.channel,
sncl.location,
sncl.network,
stream_masked = self._convert_stream_to_masked(
timeseries=timeseries, channel=channel
)
stream_split = stream_masked.split()
stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel)
# Make certain there's actually data
if not numpy.ma.any(stream.select(channel=channel)[0].data):
return
for trace in stream.select(channel=channel).split():
for trace in stream_split:
trace_send = trace.copy()
trace_send.trim(starttime, endtime)
if channel == "D":
trace_send.data = ChannelConverter.get_minutes_from_radians(
trace_send.data
)
trace_send = self._convert_trace_to_int(trace_send)
if sncl.channel[1] == "E":
# instrumental voltages are stored as 1/10 microvolts
trace_send.data = trace_send.data * (self.scale_factor or 1e7)
else:
# everything else (mostly magnetics stored as picoteslas)
trace_send.data = trace_send.data * (self.scale_factor or 1e3)
ric = RawInputClient(
self.tag,
self.host,
self.write_port,
sncl.station,
sncl.channel,
sncl.location,
sncl.network,
)
trace_send.data = trace_send.data.astype(int) # ric requires ints
ric.send_trace(interval, trace_send)
if self.forceout:
ric.forceout()
ric.close()
if self.forceout:
ric.forceout()
ric.close()
def _set_metadata(
self,
@@ -506,7 +687,7 @@ class EdgeFactory(TimeseriesFactory):
edge channel code {MVH, MVE, MVD, ...}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
"""
for trace in stream:
Loading