Newer
Older
"""Factory that loads data from earthworm and writes to Edge.
EdgeFactory uses obspy earthworm class to read data from any
earthworm standard Waveserver using the obspy getWaveform call.
Writing will be implemented with Edge specific capabilities,
to take advantage of it's newer realtime abilities.
Edge is the USGS earthquake hazard centers replacement for earthworm.
"""
Hal Simpson
committed
import numpy
from obspy import Stream, Trace, UTCDateTime

Jeremy M Fee
committed
from obspy.clients import earthworm
from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from .LegacySNCL import LegacySNCL
class EdgeFactory(TimeseriesFactory):
"""TimeseriesFactory for Edge related data.
Parameters
----------
host: str
a string representing the IP number of the host to connect to.
port: integer
the port number the waveserver is listening on.
write_port: integer
the port number the client is writing to.
cwbport: int
the port number of the cwb host to connect to.
tag: str
A tag used by edge to log and associate a socket with a given data
source
forceout: bool
Tells edge to forceout a packet to miniseed. Generally used when
the user knows no more data is coming.
observatory: str
the observatory code for the desired observatory.
channels: array
an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
Known since channel names are mapped based on interval and type,
others are passed through, see #_get_edge_channel().
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
observatoryMetadata: ObservatoryMetadata object
an ObservatoryMetadata object used to replace the default
ObservatoryMetadata.
locationCode: str
the location code for the given edge server, overrides type
in get_timeseries/put_timeseries
cwbhost: str
a string represeting the IP number of the cwb host to connect to.
See Also
--------
TimeseriesFactory
Notes
-----
This is designed to read from any earthworm style waveserver, but it
currently only writes to an edge. Edge mimics an earthworm style
waveserver close enough that we hope to maintain that compatibility
for reading.
host: str = "cwbpub.cr.usgs.gov",
port: int = 2060,
write_port: int = 7981,
cwbport: int = 0,
tag: str = "GeomagAlg",
forceout: bool = False,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
observatoryMetadata: Optional[ObservatoryMetadata] = None,
locationCode: Optional[str] = None,
cwbhost: Optional[str] = None,
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
self.client = earthworm.Client(host, port)
self.host = host
self.port = port
self.cwbport = cwbport
self.forceout = forceout
self.interval = interval
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.locationCode = locationCode
self.cwbhost = cwbhost or ""
Hal Simpson
committed
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
add_empty_channels: bool = True,
"""Get timeseries data
Parameters
----------
time of first sample.
time of last sample.
observatory: str
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
timeseries: Stream
timeseries object with requested data
Raises
------
TimeseriesFactoryException
if invalid values are requested, or errors occur while
retrieving timeseries.
"""
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
Hal Simpson
committed
if starttime > endtime:
raise TimeseriesFactoryException(
'Starttime before endtime "%s" "%s"' % (starttime, endtime)
)
Hal Simpson
committed

Jeremy M Fee
committed
# obspy factories sometimes write to stdout, instead of stderr
original_stdout = sys.stdout
try:

Jeremy M Fee
committed
# send stdout to stderr
sys.stdout = sys.stderr
starttime,
endtime,
observatory,
channel,
type,
interval,
add_empty_channels,
if len(data) == 0:
continue
timeseries += data
finally:

Jeremy M Fee
committed
# restore stdout
sys.stdout = original_stdout
self._post_process(timeseries, starttime, endtime, channels)
Hal Simpson
committed
return timeseries
timeseries: Stream,
starttime: Optional[UTCDateTime] = None,
endtime: Optional[UTCDateTime] = None,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
"""Put timeseries data
Parameters
----------
timeseries object with data to be written
observatory: str
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
Notes
-----
Streams sent to timeseries are expected to have a single trace per
channel and that trace should have an ndarray, with nan's
representing gaps.
"""
Hal Simpson
committed
stats = timeseries[0].stats
geomag_user
committed
observatory = observatory or stats.station or self.observatory
channels = channels or self.channels
Hal Simpson
committed
type = type or self.type or stats.data_type
interval = interval or self.interval or stats.data_interval
if starttime is None or endtime is None:
starttime, endtime = TimeseriesUtility.get_stream_start_end_times(
for channel in channels:
if timeseries.select(channel=channel).count() == 0:
raise TimeseriesFactoryException(
'Missing channel "%s" for output, available channels %s'
% (channel, str(TimeseriesUtility.get_channels(timeseries)))
)
for channel in channels:
self._put_channel(
timeseries, observatory, channel, type, interval, starttime, endtime
)
def _convert_timeseries_to_decimal(self, stream: Stream):
"""convert geomag edge timeseries data stored as ints, to decimal by
dividing by 1000.00
Hal Simpson
committed
Parameters
----------
stream: Stream
a stream retrieved from a geomag edge representing one channel
Notes
-----
This routine changes the values in the timeseries. The user should
make a copy before calling if they don't want that side effect.
Hal Simpson
committed
"""
for trace in stream:
trace.data = numpy.divide(trace.data, 1000.00)
def _convert_trace_to_int(self, trace_in: Trace) -> Trace:
"""convert geomag edge traces stored as decimal, to ints by multiplying
by 1000
Parameters
----------
trace: Trace
a trace retrieved from a geomag edge representing one channel
Returns
-------
a trace converted to ints
Notes
-----
this doesn't work on ndarray with nan's in it.
the trace must be a masked array.
"""
trace = trace_in.copy()
trace.data = numpy.multiply(trace.data, 1000.00)
trace.data = trace.data.astype(int)
return trace
def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream:
"""convert geomag edge traces in a timeseries stream to a MaskedArray
This allows for gaps and splitting.
Parameters
----------
stream: Stream
a stream retrieved from a geomag edge representing one channel
channel: str
the channel to be masked
Returns
-------
stream: Stream
a stream with all traces converted to masked arrays
stream = timeseries.copy()
for trace in stream.select(channel=channel):
Hal Simpson
committed
trace.data = numpy.ma.masked_invalid(trace.data)
return stream
def _get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
add_empty_channels: bool = True,
"""get timeseries data for a single channel.
Parameters
----------
the starttime of the requested data
the endtime of the requested data
observatory code
Hal Simpson
committed
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
timeseries trace of the requested channel data
"""
sncl = LegacySNCL.get_sncl(
station=observatory,
data_type=type,
interval=interval,
element=channel,
# geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
try:
data = self.client.get_waveforms(
sncl.network,
sncl.station,
sncl.channel,
starttime,
)
except TypeError:
# get_waveforms() fails if no data is returned from Edge
# make sure data is 32bit int
for trace in data:
trace.data = trace.data.astype("i4")
Hal Simpson
committed
data.merge()
if data.count() == 0 and add_empty_channels:
Cain, Payton David
committed
data += self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
Cain, Payton David
committed
channel=channel,
data_type=type,
interval=interval,
Cain, Payton David
committed
network=sncl.network,
location=sncl.location,
self._set_metadata(data, observatory, channel, type, interval)
return data
Hal Simpson
committed
def _post_process(
self,
timeseries: Stream,
starttime: UTCDateTime,
endtime: UTCDateTime,
channels: List[str],
):
"""Post process a timeseries stream after the raw data is
is fetched from a waveserver. Specifically changes
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
Parameters
----------
The timeseries stream as returned by the call to get_waveforms
list of channels to load
Notes: the original timeseries object is changed.
"""
self._convert_timeseries_to_decimal(timeseries)
Hal Simpson
committed
if isinstance(trace.data, numpy.ma.MaskedArray):
trace.data.set_fill_value(numpy.nan)
trace.data = trace.data.filled()
if "D" in channels:
for trace in timeseries.select(channel="D"):
trace.data = ChannelConverter.get_radians_from_minutes(trace.data)
TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
Hal Simpson
committed
timeseries: Stream,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
starttime: UTCDateTime,
endtime: UTCDateTime,
Hal Simpson
committed
"""Put a channel worth of data
Parameters
----------
Hal Simpson
committed
timeseries object with data to be written
observatory: str
channel: str
Hal Simpson
committed
channel to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
starttime: UTCDateTime
endtime: UTCDateTime
Hal Simpson
committed
Notes
-----
RawInputClient seems to only work when sockets are
"""
sncl = LegacySNCL.get_sncl(
station=observatory,
data_type=type,
interval=interval,
element=channel,
Hal Simpson
committed
if ((now - endtime) > 864000) and (self.cwbport > 0):
host = self.cwbhost
port = self.cwbport
else:
host = self.host
Hal Simpson
committed
self.tag,
host,
port,
sncl.station,
sncl.channel,
Hal Simpson
committed
stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel)
# Make certain there's actually data
if not numpy.ma.any(stream.select(channel=channel)[0].data):
return
for trace in stream.select(channel=channel).split():
Hal Simpson
committed
trace_send = trace.copy()
trace_send.trim(starttime, endtime)
trace_send.data = ChannelConverter.get_minutes_from_radians(
trace_send = self._convert_trace_to_int(trace_send)
ric.send_trace(interval, trace_send)
if self.forceout:
ric.forceout()
ric.close()
def _set_metadata(
self,
observatory: str,
channel: str,
type: str,
interval: str,
):
"""set metadata for a given stream/channel
Parameters
----------
edge channel code {MVH, MVE, MVD, ...}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
"""
for trace in stream:
self.observatoryMetadata.set_metadata(
trace.stats, observatory, channel, type, interval
)