Newer
Older
Wilbur, Spencer Franklin
committed
"""Factory that loads data from FDSN to edge
"""
Wilbur, Spencer Franklin
committed
from __future__ import absolute_import
from datetime import datetime
from typing import List, Optional, Tuple
Wilbur, Spencer Franklin
committed
import numpy
import sys
import numpy.ma
from obspy import Stream, UTCDateTime, Trace, read, read_inventory
Wilbur, Spencer Franklin
committed
from obspy.clients.fdsn.header import FDSNNoDataException
Wilbur, Spencer Franklin
committed
from obspy.clients.iris import Client
from obspy.clients.fdsn import Client as FDSNClient
from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from ..VariometerMetadata import VariometerMetadata
from .RawInputClient import RawInputClient
from .FDSNSNCL import FDSNSNCL
Wilbur, Spencer Franklin
committed
from .SNCL import SNCL
Wilbur, Spencer Franklin
committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class FDSNFactory(TimeseriesFactory):
"""TimeseriesFactory for Edge related data.
Parameters
----------
host: str
a string representing the IP number of the host to connect to.
port: integer
the port number the waveserver is listening on.
write_port: integer
the port number the client is writing to.
cwbport: int
the port number of the cwb host to connect to.
tag: str
A tag used by edge to log and associate a socket with a given data
source
forceout: bool
Tells edge to forceout a packet to miniseed. Generally used when
the user knows no more data is coming.
observatory: str
the observatory code for the desired observatory.
channels: array
an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
Known since channel names are mapped based on interval and type,
others are passed through, see #_get_edge_channel().
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
observatoryMetadata: ObservatoryMetadata object
an ObservatoryMetadata object used to replace the default
ObservatoryMetadata.
locationCode: str
the location code for the given edge server, overrides type
in get_timeseries/put_timeseries
cwbhost: str
a string represeting the IP number of the cwb host to connect to.
See Also
--------
TimeseriesFactory
Notes
-----
This is designed to read from any earthworm style waveserver, but it
currently only writes to an edge. Edge mimics an earthworm style
waveserver close enough that we hope to maintain that compatibility
for reading.
"""
def __init__(
self,
Wilbur, Spencer Franklin
committed
base_url: str = "http://service.iris.edu",
Wilbur, Spencer Franklin
committed
tag: str = "GeomagAlg",
forceout: bool = False,
observatory: Optional[str] = None,
network: str = "NT",
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
variometerMetadata: Optional[VariometerMetadata] = None,
locationCode: Optional[str] = None,
Wilbur, Spencer Franklin
committed
snclMapper: str = "geomag",
Wilbur, Spencer Franklin
committed
):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
Wilbur, Spencer Franklin
committed
self.Client = FDSNClient(base_url)
Wilbur, Spencer Franklin
committed
self.tag = tag
self.forceout = forceout
self.interval = interval
self.observatoryMetadata = variometerMetadata or VariometerMetadata()
self.network = network
self.locationCode = locationCode
Wilbur, Spencer Franklin
committed
if snclMapper == "geomag":
self.get_sncl = FDSNSNCL.get_sncl
else:
raise TimeseriesFactoryException("Unrecognized SNCL mapper")
Wilbur, Spencer Franklin
committed
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
Wilbur, Spencer Franklin
committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
) -> Stream:
"""Get timeseries data
Parameters
----------
starttime: UTCDateTime
time of first sample.
endtime: UTCDateTime
time of last sample.
observatory: str
observatory code.
channels: array
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
timeseries: Stream
timeseries object with requested data
Raises
------
TimeseriesFactoryException
if invalid values are requested, or errors occur while
retrieving timeseries.
"""
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
if starttime > endtime:
raise TimeseriesFactoryException(
'Starttime before endtime "%s" "%s"' % (starttime, endtime)
)
Wilbur, Spencer Franklin
committed
######################
# I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish
Wilbur, Spencer Franklin
committed
# so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of
# post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that
# the IRIS DMC will return empty data but not NaN values.
Wilbur, Spencer Franklin
committed
######################
# # obspy factories sometimes write to stdout, instead of stderr
# original_stdout = sys.stdout
# try:
# # send stdout to stderr
# sys.stdout = sys.stderr
# # get the timeseries
timeseries = Stream()
Wilbur, Spencer Franklin
committed
for channel in channels:
Wilbur, Spencer Franklin
committed
data = self._get_timeseries(
starttime,
endtime,
observatory,
channel,
type,
interval,
add_empty_channels,
Wilbur, Spencer Franklin
committed
)
Wilbur, Spencer Franklin
committed
if len(data) == 0:
continue
timeseries += data
# finally:
# # restore stdout
# sys.stdout = original_stdout
self._post_process(timeseries, starttime, endtime, channels)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
return timeseries
Wilbur, Spencer Franklin
committed
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def _get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
add_empty_channels: bool = True,
) -> Trace:
"""get timeseries data for a single channel.
Parameters
----------
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
observatory: str
observatory code
channel: str
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
data: Trace
timeseries trace of the requested channel data
"""
Wilbur, Spencer Franklin
committed
sncl = self.get_sncl(
Wilbur, Spencer Franklin
committed
station=observatory,
data_type=type,
interval=interval,
element=channel,
network=self.network,
location=self.locationCode,
)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
# geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
Wilbur, Spencer Franklin
committed
# Rotate the trace into a right handed coordinate frame.
# This will worrk assuming the metadata is reported correctly.
# Channel that require rotations
channel_rotations = ["X", "Y", "Z"]
Wilbur, Spencer Franklin
committed
try:
data = self.Client.get_waveforms(
network=sncl.network,
station=sncl.station,
location=sncl.location,
channel=sncl.channel,
starttime=starttime,
endtime=endtime + half_delta,
attach_response=True,
)
Wilbur, Spencer Franklin
committed
except FDSNNoDataException:
Wilbur, Spencer Franklin
committed
data = Stream()
data.merge()
Wilbur, Spencer Franklin
committed
if channel in channel_rotations:
data = self._rotate_and_return_requested_channel(
data, sncl, starttime, endtime, channel
)
Wilbur, Spencer Franklin
committed
if data.count() == 0 and add_empty_channels:
data += self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=channel,
data_type=type,
interval=interval,
network=sncl.network,
location=sncl.location,
)
if data.count() != 0:
TimeseriesUtility.pad_and_trim_trace(
trace=data[0], starttime=starttime, endtime=endtime
)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
self._set_metadata(data, observatory, channel, type, interval)
Wilbur, Spencer Franklin
committed
return data
def _post_process(
self,
timeseries: Stream,
starttime: UTCDateTime,
endtime: UTCDateTime,
channels: List[str],
):
"""Post process a timeseries stream after the raw data is
Wilbur, Spencer Franklin
committed
is fetched from querymom. Specifically changes
Wilbur, Spencer Franklin
committed
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
Parameters
----------
timeseries: Stream
The timeseries stream as returned by the call to get_waveforms
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
channels: array
list of channels to load
Notes: the original timeseries object is changed.
"""
TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
Wilbur, Spencer Franklin
committed
# Retrieve and set metadata for each trace
Wilbur, Spencer Franklin
committed
disclaimer_texts = [
"DISCLAIMER",
"This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
]
Wilbur, Spencer Franklin
committed
for trace in timeseries:
Wilbur, Spencer Franklin
committed
if "comments" not in trace.stats:
trace.stats.comments = []
# Check if the disclaimer is already in the comments to avoid duplication
if not any("DISCLAIMER" in comment for comment in trace.stats.comments):
trace.stats.comments.extend(disclaimer_texts)
Wilbur, Spencer Franklin
committed
def _set_metadata(
self,
stream: Stream,
observatory: str,
channel: str,
Wilbur, Spencer Franklin
committed
type: str,
interval: str,
Wilbur, Spencer Franklin
committed
):
"""set metadata for a given stream/channel
Parameters
----------
observatory: str
observatory code
channel: str
edge channel code {MVH, MVE, MVD, ...}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
"""
for trace in stream:
self.observatoryMetadata.set_metadata(
trace.stats, observatory, channel, type, interval
)
Wilbur, Spencer Franklin
committed
def _rotate_and_return_requested_channel(
Wilbur, Spencer Franklin
committed
self,
data: Stream,
sncl,
starttime: UTCDateTime,
endtime: UTCDateTime,
requested_channel: str,
Wilbur, Spencer Franklin
committed
) -> Stream:
"""
Wilbur, Spencer Franklin
committed
Retrieves the necessary *F channels, rotates them to ZNE, and returns the requested channel.
Channels are mapped as follows: 'X' -> '*F2', 'Y' -> '*F1', 'Z' -> '*FZ'.
Wilbur, Spencer Franklin
committed
"""
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
# Initialize FDSN client and get the necessary data
FDSN = self.Client
# Determine if the requested channel is X, Y, or Z
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
inv = FDSN.get_stations(
network=sncl.network,
station=sncl.station,
location=sncl.location, # Use location if necessary
channel=sncl.channel, # Request *F1, *F2, and *FZ
Wilbur, Spencer Franklin
committed
starttime=starttime,
endtime=endtime,
level="response",
)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
# Rotate the stream to ZNE
data.rotate(method="->ZNE", inventory=inv)
Wilbur, Spencer Franklin
committed
# After rotation, extract the correct channel based on requested_channel
if requested_channel == "X":
selected_channel = "*FN"
elif requested_channel == "Y":
selected_channel = "*FE"
elif requested_channel == "Z":
selected_channel = "*FZ"
else:
raise ValueError(f"Invalid channel {requested_channel}")
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
# Filter the rotated stream to return only the trace for the selected channel
rotated_stream = data.select(channel=selected_channel)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
# If no data found for the selected channel, return an empty stream
Wilbur, Spencer Franklin
committed
return Stream()