Skip to content
Snippets Groups Projects

Overhaul EdgeFactory and MiniSeedFactory for Edge data migration

1 file
+ 161
62
Compare changes
  • Side-by-side
  • Inline
  • 2bb0ec23
    MiniSeedFactory Upgrades: · 2bb0ec23
    Erin (Josh) Rigler authored
    - configurable scale_factor and float vs. int conversion logic
    - configurable sncl_mode for different channel mapping
    - multiple read ports allowed for fail-over logic
    - configurable write_encoding for setting miniseed data type
    - generally more (if not perfectly) compatible with EdgeFactory
@@ -17,7 +17,7 @@ from typing import List, Optional
import numpy
import numpy.ma
from obspy import read
from obspy.clients.neic import client as miniseed
from obspy.clients import neic
from obspy.core import Stats, Stream, Trace, UTCDateTime
from .. import ChannelConverter, TimeseriesUtility
@@ -28,6 +28,7 @@ from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from .MiniSeedInputClient import MiniSeedInputClient
from .SNCL import SNCL
from .LegacySNCL import LegacySNCL
class MiniSeedFactory(TimeseriesFactory):
@@ -38,7 +39,11 @@ class MiniSeedFactory(TimeseriesFactory):
host: str
a string representing the IP number of the host to connect to.
port: integer
the port number the miniseed query server is listening on.
port number on which EdgeCWB's queryserver (QS, obspy's "neic" client)
listens for requests to retrieve timeseries data.
write_port: integer
port number on which EdgeCWB's MiniSeedServer is listening for
requests to write timeseries data.
observatory: str
the observatory code for the desired observatory.
channels: array
@@ -57,6 +62,14 @@ class MiniSeedFactory(TimeseriesFactory):
in get_timeseries/put_timeseries
convert_channels: array
list of channels to convert from volt/bin to nT
scale_factor: float
override default scalings when reading/writing miniseed blocks;
(reading integer blocks divides by 1000; reading floating point
blocks divides by 1; writing all data multiplies by 1000)
default = None
sncl_mode: {'geomag','legacy'}
force mode to convert common names to SEED SNCL codes (that is,
station, network, channel, location codes); default = legacy
See Also
--------
@@ -64,17 +77,25 @@ class MiniSeedFactory(TimeseriesFactory):
Notes
-----
This is designed to read from any earthworm style waveserver, but it
currently only writes to an edge. Edge mimics an earthworm style
waveserver close enough that we hope to maintain that compatibility
for reading.
The MiniSeedFactory reads and writes miniseed blocks. This allows it
to be used to transfer non-integer data. That said, this factory cannot
read from or write to EdgeCWB's RAM buffer, and so is not able to access
eally "real time" data. MiniSeedFactory should be used for large data
transfers, or to transfer data that cannot be handled by the default
EdgeFactory (that is, non-integer data, or data that has not yet been
written to the EdgeCWB disk store).
Also, this factory allows one to write non-full miniseed blocks. This
is occasionally useful, but more often it results in wasted disk space
on the EdgeCWB server.
"""
def __init__(
self,
host: str = "edgecwb.usgs.gov",
port: int = 2061,
write_port: int = 7974,
host: Optional[str] = None,
port: Optional[int] = None,
write_port: Optional[int] = None,
write_encoding: Optional[str] = None,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
@@ -82,18 +103,25 @@ class MiniSeedFactory(TimeseriesFactory):
observatoryMetadata: Optional[ObservatoryMetadata] = None,
locationCode: Optional[str] = None,
convert_channels: Optional[List[str]] = None,
scale_factor: Optional[int] = None,
sncl_mode: Optional[str] = "geomag",
):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
self.client = miniseed.Client(host, port)
self.host = host or "edgecwb.usgs.gov"
self.port = port or [22061, 2061]
self.write_port = write_port # no default write port
self.write_encoding = write_encoding or "float32"
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.locationCode = locationCode
self.interval = interval
self.host = host
self.port = port
self.write_port = write_port
self.convert_channels = convert_channels or []
self.write_client = MiniSeedInputClient(self.host, self.write_port)
self.scale_factor = scale_factor
self.sncl_mode = sncl_mode
if sncl_mode == "legacy":
self.get_sncl = LegacySNCL.get_sncl
elif sncl_mode == "geomag":
self.get_sncl = SNCL.get_sncl
else:
raise TimeseriesFactoryException("Unrecognized SNCL mode")
def get_timeseries(
self,
@@ -110,19 +138,19 @@ class MiniSeedFactory(TimeseriesFactory):
Parameters
----------
starttime: UTCDateTime
time of first sample
time of first sample.
endtime: UTCDateTime
time of last sample
add_empty_channels: bool
if True, returns channels without data as empty traces
time of last sample.
observatory: str
observatory code
observatory code.
channels: array
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
@@ -226,10 +254,14 @@ class MiniSeedFactory(TimeseriesFactory):
)
for channel in channels:
self._put_channel(
timeseries, observatory, channel, type, interval, starttime, endtime
timeseries.select(channel=channel),
observatory,
channel,
type,
interval,
starttime,
endtime,
)
# close socket
self.write_client.close()
def get_calculated_timeseries(
self,
@@ -345,7 +377,7 @@ class MiniSeedFactory(TimeseriesFactory):
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'tenhertz', 'second', 'minute', 'hour', 'day'}
interval length
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
@@ -365,31 +397,73 @@ class MiniSeedFactory(TimeseriesFactory):
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
data = self.client.get_waveforms(
sncl.network,
sncl.station,
sncl.location,
sncl.channel,
starttime,
endtime + half_delta,
)
for trace in data:
trace.data = trace.data.astype(data[0].data.dtype)
# handle overlapping samples in Traces with identical NSCL codes by
# prioritizing samples from those Traces in the Stream `data` that come
# later in sequence; these will most likely be the most recent versions
# (this is not possible with single calls to Stream.merge())
# list-type ports variable needed for fail-back logic
try:
ports = list(self.port)
except TypeError:
ports = [self.port]
data = Stream()
for port in ports:
try:
client = neic.Client(self.host, port, timeout=10)
data += client.get_waveforms(
sncl.network,
sncl.station,
sncl.location,
sncl.channel,
starttime,
endtime + half_delta,
)
if data:
# if data was returned, force this port in subsequent calls
# to get_timeseries() that use this instance of EdgeFactory
self.port = [port]
break
print("No data returned from ", self.host, " on port ", port)
# try alternate port(s) if provided
except Exception as e:
print("Failed to get data from ", self.host, " on port ", port)
print("Ignoring error: ", e)
# try alternate port(s) if provided
continue
for trace in data:
if trace.data.dtype.kind == "i":
# convert all integer traces to rescaled 64-bit floats;
if sncl.channel[1] == "E":
# instrumental voltages are stored as 1/10 microvolts
trace.data = trace.data.astype(float) / (self.scale_factor or 1e7)
else:
# everything else (mostly magnetics stored as picoteslas)
trace.data = trace.data.astype(float) / (self.scale_factor or 1e3)
elif trace.data.dtype.kind == "f":
# convert all float traces to 64-bit floats
trace.data = trace.data.astype(float)
# when Traces with identical NSCL codes overlap, prioritize samples
# that come "later" in Stream; this depends on Edge returning miniseed
# packets in the order written
# NOTE: this is not possible with single calls to Stream.merge()
st_tmp = Stream()
for tr in data:
# add tr to temporary stream
st_tmp += tr
# replace time overlaps with gaps
st_tmp.merge(0)
# add tr to temporary stream again
st_tmp += tr
# replace gaps with tr's data
st_tmp.merge(0)
try:
# add tr to temporary stream
st_tmp += tr
# replace time overlaps with gaps
st_tmp.merge(0)
# add tr to temporary stream again
st_tmp += tr
# replace gaps with tr's data
st_tmp.merge(0)
except Exception as e:
tr = st_tmp.pop() # remove bad trace
print("Dropped trace: ", tr)
print("Ignoring error: ", e)
# point `data` to the new stream and continue processing
data = st_tmp
@@ -497,7 +571,7 @@ class MiniSeedFactory(TimeseriesFactory):
channels: List[str],
):
"""Post process a timeseries stream after the raw data is
is fetched from querymom. Specifically changes
is fetched from queryserver. Specifically changes
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
@@ -553,26 +627,51 @@ class MiniSeedFactory(TimeseriesFactory):
starttime: UTCDateTime
endtime: UTCDateTime
"""
# use separate traces when there are gaps
to_write = timeseries.select(channel=channel)
to_write = TimeseriesUtility.mask_stream(to_write)
to_write = to_write.split()
to_write = TimeseriesUtility.unmask_stream(to_write)
# relabel channels from internal to edge conventions
sncl = SNCL.get_sncl(
sncl = self.get_sncl(
station=observatory,
data_type=type,
interval=interval,
element=channel,
location=self.locationCode,
)
for trace in to_write:
trace.stats.station = sncl.station
trace.stats.location = sncl.location
trace.stats.network = sncl.network
trace.stats.channel = sncl.channel
# finally, send to edge
self.write_client.send(to_write)
stream_masked = self._convert_stream_to_masked(
timeseries=timeseries, channel=channel
)
stream_split = stream_masked.split()
for trace in stream_split:
trace_send = trace.copy()
trace_send.trim(starttime, endtime)
if channel == "D":
trace_send.data = ChannelConverter.get_minutes_from_radians(
trace_send.data
)
if "float" not in self.write_encoding:
# if write_encoding is a float, ignore scale_factor
if sncl.channel[1] == "E":
# instrumental voltages are stored as 1/10 microvolts
trace_send.data = trace_send.data * (self.scale_factor or 1e7)
else:
# everything else (mostly magnetics stored as picoteslas)
trace_send.data = trace_send.data * (self.scale_factor or 1e3)
if self.write_port:
# FIXME: input clients should be rewritten for consistency,
# allowing us to clean up stuff like this if-else-block
msic = MiniSeedInputClient(
self.host, self.write_port, self.write_encoding
)
trace_send.stats.station = sncl.station
trace_send.stats.location = sncl.location
trace_send.stats.network = sncl.network
trace_send.stats.channel = sncl.channel
stream_send = Stream(trace_send)
msic.send(stream_send)
msic.close()
else:
raise TimeseriesFactoryException("Valid write port was not specified.")
def _set_metadata(
self,
Loading