Newer
Older
Wilbur, Spencer Franklin
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Factory that loads data from FDSN to edge
"""
from __future__ import absolute_import
from datetime import datetime
from typing import List, Optional
import numpy
import sys
import numpy.ma
from obspy import Stream, UTCDateTime, Trace, read, read_inventory
from obspy.clients.iris import Client
from obspy.clients.fdsn import Client as FDSNClient
from .. import ChannelConverter, TimeseriesUtility
from ..geomag_types import DataInterval, DataType
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..ObservatoryMetadata import ObservatoryMetadata
from ..VariometerMetadata import VariometerMetadata
from .RawInputClient import RawInputClient
from .FDSNSNCL import FDSNSNCL
class FDSNFactory(TimeseriesFactory):
    """TimeseriesFactory that reads timeseries data from an FDSN web service.

    Parameters
    ----------
    base_url: str
        root URL of the FDSN web service to query
        (default "http://service.iris.edu").
    tag: str
        a tag used by edge to log and associate a socket with a given data
        source.
    forceout: bool
        tells edge to forceout a packet to miniseed. Generally used when
        the user knows no more data is coming.
    observatory: str
        the observatory code for the desired observatory.
    network: str
        FDSN network code used in requests (default "NT").
    channels: array
        an array of channels {H, D, E, F, Z, MGD, MSD, HGD}.
        Known since channel names are mapped based on interval and type,
        others are passed through, see #_get_edge_channel().
    type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
        data type
    interval: {'second', 'minute', 'hour', 'day'}
        data interval
    variometerMetadata: VariometerMetadata object
        a VariometerMetadata object used to replace the default
        VariometerMetadata.
    locationCode: str
        the location code used in requests, overrides type
        in get_timeseries/put_timeseries

    See Also
    --------
    TimeseriesFactory

    Notes
    -----
    This factory reads from an FDSN web service (e.g. the IRIS DMC) via
    obspy's FDSN client; writing timeseries is not implemented here.
    """
def __init__(
self,
Wilbur, Spencer Franklin
committed
base_url: str = "http://service.iris.edu",
Wilbur, Spencer Franklin
committed
tag: str = "GeomagAlg",
forceout: bool = False,
observatory: Optional[str] = None,
network: str = "NT",
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
variometerMetadata: Optional[VariometerMetadata] = None,
locationCode: Optional[str] = None,
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
Wilbur, Spencer Franklin
committed
self.Client = FDSNClient(base_url)
Wilbur, Spencer Franklin
committed
self.tag = tag
self.forceout = forceout
self.interval = interval
self.observatoryMetadata = variometerMetadata or VariometerMetadata()
self.network = network
self.locationCode = locationCode
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: Optional[str] = None,
channels: Optional[List[str]] = None,
type: Optional[DataType] = None,
interval: Optional[DataInterval] = None,
add_empty_channels: bool = True,
) -> Stream:
"""Get timeseries data
Parameters
----------
starttime: UTCDateTime
time of first sample.
endtime: UTCDateTime
time of last sample.
observatory: str
observatory code.
channels: array
list of channels to load
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
timeseries: Stream
timeseries object with requested data
Raises
------
TimeseriesFactoryException
if invalid values are requested, or errors occur while
retrieving timeseries.
"""
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
if starttime > endtime:
raise TimeseriesFactoryException(
'Starttime before endtime "%s" "%s"' % (starttime, endtime)
)
Wilbur, Spencer Franklin
committed
######################
#I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish
# so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of
# post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that
# the IRIS DMC will return empty data but not NaN values.
######################
# # obspy factories sometimes write to stdout, instead of stderr
# original_stdout = sys.stdout
# try:
# # send stdout to stderr
# sys.stdout = sys.stderr
# # get the timeseries
timeseries = Stream()
Wilbur, Spencer Franklin
committed
for channel in channels:
Wilbur, Spencer Franklin
committed
data = self._get_timeseries(
starttime,
endtime,
observatory,
channel,
type,
interval,
add_empty_channels,
Wilbur, Spencer Franklin
committed
)
Wilbur, Spencer Franklin
committed
if len(data) == 0:
continue
timeseries += data
# finally:
# # restore stdout
# sys.stdout = original_stdout
self._post_process(timeseries, starttime, endtime, channels)
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
return timeseries
Wilbur, Spencer Franklin
committed
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def _get_timeseries(
self,
starttime: UTCDateTime,
endtime: UTCDateTime,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
add_empty_channels: bool = True,
) -> Trace:
"""get timeseries data for a single channel.
Parameters
----------
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
observatory: str
observatory code
channel: str
single character channel {H, E, D, Z, F}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
add_empty_channels: bool
if True, returns channels without data as empty traces
Returns
-------
data: Trace
timeseries trace of the requested channel data
"""
sncl = FDSNSNCL.get_sncl(
station=observatory,
data_type=type,
interval=interval,
element=channel,
network=self.network,
location=self.locationCode,
)
# geomag-algorithms *should* treat starttime/endtime as inclusive everywhere;
# according to its author, EdgeCWB is inclusive of starttime, but exclusive of
# endtime, to satisfy seismic standards/requirements, to precision delta/2;
half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2
try:
data = self.Client.get_waveforms(
network=sncl.network,
station=sncl.station,
location=sncl.location,
channel=sncl.channel,
starttime=starttime,
endtime=endtime + half_delta,
attach_response=True,
)
except TypeError:
data = Stream()
# make sure data is 32bit int
for trace in data:
trace.data = trace.data.astype("i4")
data.merge()
if data.count() == 0 and add_empty_channels:
data += self._get_empty_trace(
starttime=starttime,
endtime=endtime,
observatory=observatory,
channel=channel,
data_type=type,
interval=interval,
network=sncl.network,
location=sncl.location,
)
if data.count() != 0:
TimeseriesUtility.pad_and_trim_trace(
trace=data[0], starttime=starttime, endtime=endtime
)
# Beneath is necessary code to check the reported azimuth
# for the LF1 (E or Y) and LF2 (H or X) channels and determine if intstrument axes have been reversed.
azi, dip = self._get_orientations(data[0], starttime)
# Checks Azimuth for LF2 channel
if 270 > azi and azi > 90.0 and data[0].stats.channel == "LF2":
data[0].data *= -1
# Checks Azimuth for LF1 channel
elif azi > 180 and azi < 360 and data[0].stats.channel == "LF1":
data[0].data *= -1
# Checks Dip for LFZ channel
elif dip > 0 and dip < 180 and data[0].stats.channel == "LFZ":
data[0].data *= -1
# Remove channel Response:
# The function "remove_response" appears to be doing what we want;
# i.e. convert from counts to NT, but this may be a placeholder
# at least until we see how this function behaves if
# a station has a frequency response.
data.remove_response(output="DEF", zero_mean=False, taper=False)
self._set_metadata(data, observatory, channel, type, interval)
return data
def _post_process(
self,
timeseries: Stream,
starttime: UTCDateTime,
endtime: UTCDateTime,
channels: List[str],
):
"""Post process a timeseries stream after the raw data is
Wilbur, Spencer Franklin
committed
is fetched from querymom. Specifically changes
Wilbur, Spencer Franklin
committed
any MaskedArray to a ndarray with nans representing gaps.
Then calls pad_timeseries to deal with gaps at the
beggining or end of the streams.
Parameters
----------
timeseries: Stream
The timeseries stream as returned by the call to get_waveforms
starttime: UTCDateTime
the starttime of the requested data
endtime: UTCDateTime
the endtime of the requested data
channels: array
list of channels to load
Notes: the original timeseries object is changed.
"""
TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime)
Wilbur, Spencer Franklin
committed
# Retrieve and set metadata for each trace
for trace in timeseries:
if "comments" in trace.stats:
trace.stats.comments.extend(
[
"DISCLAIMER",
"This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
"DISCLAIMER",
]
) # input string
else:
trace.stats.comments = [
"DISCLAIMER",
"This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
"DISCLAIMER",
]
Wilbur, Spencer Franklin
committed
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def _set_metadata(
self,
stream: Stream,
observatory: str,
channel: str,
type: DataType,
interval: DataInterval,
):
"""set metadata for a given stream/channel
Parameters
----------
observatory: str
observatory code
channel: str
edge channel code {MVH, MVE, MVD, ...}
type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'}
data type
interval: {'second', 'minute', 'hour', 'day'}
data interval
"""
for trace in stream:
self.observatoryMetadata.set_metadata(
trace.stats, observatory, channel, type, interval
)
def _get_orientations(
self, trace: Trace, starttime: UTCDateTime
) -> tuple[float, float]:
# Retrieve station orientation information using FDSN for each trace
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
sncl = FDSNSNCL.get_sncl(
data_type=trace.stats.location,
element=trace.stats.channel,
interval=self.interval,
station=trace.stats.station,
network=trace.stats.network,
location=trace.stats.location,
)
FDSN = FDSNClient("IRIS")
inv = FDSN.get_stations(
network=sncl.network,
station=sncl.station,
channel=sncl.channel,
level="channel",
)
# Construct the channel code using the current trace's information
channel_code = f"{sncl.network}.{sncl.station}.{sncl.location}.{sncl.channel}"
# Get orientation for the constructed channel code and time
orient = inv.get_orientation(channel_code, starttime)
azimuth = orient["azimuth"]
dip = orient["dip"]
# Set azimuth in trace.stats.azimuth
trace.stats.azimuth = azimuth
# Add comments to trace.stats
if "comments" in trace.stats:
trace.stats.comments.extend(
[
"DISCLAIMER",
"This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
"DISCLAIMER",
]
)
else:
trace.stats.comments = [
"DISCLAIMER",
"This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24",
"DISCLAIMER",
]
# Return both azimuth and dip
return azimuth, dip