"""Factory that loads data from FDSN to edge """ from __future__ import absolute_import from datetime import datetime from typing import List, Optional, Tuple import numpy import sys import numpy.ma from obspy import Stream, UTCDateTime, Trace, read, read_inventory from obspy.clients.fdsn.header import FDSNNoDataException from obspy.clients.iris import Client from obspy.clients.fdsn import Client as FDSNClient from .. import ChannelConverter, TimeseriesUtility from ..geomag_types import DataInterval, DataType from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata from ..VariometerMetadata import VariometerMetadata from .RawInputClient import RawInputClient from .FDSNSNCL import FDSNSNCL from .SNCL import SNCL class FDSNFactory(TimeseriesFactory): """TimeseriesFactory for Edge related data. Parameters ---------- host: str a string representing the IP number of the host to connect to. port: integer the port number the waveserver is listening on. write_port: integer the port number the client is writing to. cwbport: int the port number of the cwb host to connect to. tag: str A tag used by edge to log and associate a socket with a given data source forceout: bool Tells edge to forceout a packet to miniseed. Generally used when the user knows no more data is coming. observatory: str the observatory code for the desired observatory. channels: array an array of channels {H, D, E, F, Z, MGD, MSD, HGD}. Known since channel names are mapped based on interval and type, others are passed through, see #_get_edge_channel(). type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'second', 'minute', 'hour', 'day'} data interval observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default ObservatoryMetadata. locationCode: str the location code for the given edge server, overrides type in get_timeseries/put_timeseries cwbhost: str a string represeting the IP number of the cwb host to connect to. See Also -------- TimeseriesFactory Notes ----- This is designed to read from any earthworm style waveserver, but it currently only writes to an edge. Edge mimics an earthworm style waveserver close enough that we hope to maintain that compatibility for reading. """ def __init__( self, base_url: str = "http://service.iris.edu", tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, network: str = "NT", channels: Optional[List[str]] = None, type: Optional[DataType] = None, interval: Optional[DataInterval] = None, variometerMetadata: Optional[VariometerMetadata] = None, locationCode: Optional[str] = None, snclMapper: str = "geomag", ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.Client = FDSNClient(base_url) self.tag = tag self.forceout = forceout self.interval = interval self.observatoryMetadata = variometerMetadata or VariometerMetadata() self.network = network self.locationCode = locationCode if snclMapper == "geomag": self.get_sncl = FDSNSNCL.get_sncl else: raise TimeseriesFactoryException("Unrecognized SNCL mapper") def get_timeseries( self, starttime: UTCDateTime, endtime: UTCDateTime, observatory: Optional[str] = None, channels: Optional[List[str]] = None, type: Optional[DataType] = None, interval: Optional[DataInterval] = None, add_empty_channels: bool = True, ) -> Stream: """Get timeseries data Parameters ---------- starttime: UTCDateTime time of first sample. endtime: UTCDateTime time of last sample. observatory: str observatory code. channels: array list of channels to load type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces Returns ------- timeseries: Stream timeseries object with requested data Raises ------ TimeseriesFactoryException if invalid values are requested, or errors occur while retrieving timeseries. """ observatory = observatory or self.observatory channels = channels or self.channels type = type or self.type interval = interval or self.interval if starttime > endtime: raise TimeseriesFactoryException( 'Starttime before endtime "%s" "%s"' % (starttime, endtime) ) ###################### # I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish # so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of # post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that # the IRIS DMC will return empty data but not NaN values. ###################### # # obspy factories sometimes write to stdout, instead of stderr # original_stdout = sys.stdout # try: # # send stdout to stderr # sys.stdout = sys.stderr # # get the timeseries timeseries = Stream() for channel in channels: data = self._get_timeseries( starttime, endtime, observatory, channel, type, interval, add_empty_channels, ) if len(data) == 0: continue timeseries += data # finally: # # restore stdout # sys.stdout = original_stdout self._post_process(timeseries, starttime, endtime, channels) return timeseries def _get_timeseries( self, starttime: UTCDateTime, endtime: UTCDateTime, observatory: str, channel: str, type: DataType, interval: DataInterval, add_empty_channels: bool = True, ) -> Trace: """get timeseries data for a single channel. Parameters ---------- starttime: UTCDateTime the starttime of the requested data endtime: UTCDateTime the endtime of the requested data observatory: str observatory code channel: str single character channel {H, E, D, Z, F} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'second', 'minute', 'hour', 'day'} data interval add_empty_channels: bool if True, returns channels without data as empty traces Returns ------- data: Trace timeseries trace of the requested channel data """ sncl = self.get_sncl( station=observatory, data_type=type, interval=interval, element=channel, network=self.network, location=self.locationCode, ) # geomag-algorithms *should* treat starttime/endtime as inclusive everywhere; # according to its author, EdgeCWB is inclusive of starttime, but exclusive of # endtime, to satisfy seismic standards/requirements, to precision delta/2; half_delta = TimeseriesUtility.get_delta_from_interval(interval) / 2 # Rotate the trace into a right handed coordinate frame. # This will worrk assuming the metadata is reported correctly. # Channel that require rotations channel_rotations = ["X", "Y", "Z"] try: data = self.Client.get_waveforms( network=sncl.network, station=sncl.station, location=sncl.location, channel=sncl.channel, starttime=starttime, endtime=endtime + half_delta, attach_response=True, ) except FDSNNoDataException: data = Stream() data.merge() if channel in channel_rotations: data = self._rotate_and_return_requested_channel( data, sncl, starttime, endtime, channel ) if data.count() == 0 and add_empty_channels: data += self._get_empty_trace( starttime=starttime, endtime=endtime, observatory=observatory, channel=channel, data_type=type, interval=interval, network=sncl.network, location=sncl.location, ) if data.count() != 0: TimeseriesUtility.pad_and_trim_trace( trace=data[0], starttime=starttime, endtime=endtime ) self._set_metadata(data, observatory, channel, type, interval) return data def _post_process( self, timeseries: Stream, starttime: UTCDateTime, endtime: UTCDateTime, channels: List[str], ): """Post process a timeseries stream after the raw data is is fetched from querymom. Specifically changes any MaskedArray to a ndarray with nans representing gaps. Then calls pad_timeseries to deal with gaps at the beggining or end of the streams. Parameters ---------- timeseries: Stream The timeseries stream as returned by the call to get_waveforms starttime: UTCDateTime the starttime of the requested data endtime: UTCDateTime the endtime of the requested data channels: array list of channels to load Notes: the original timeseries object is changed. """ TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime) # Retrieve and set metadata for each trace disclaimer_texts = [ "DISCLAIMER", "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24", ] for trace in timeseries: if "comments" not in trace.stats: trace.stats.comments = [] # Check if the disclaimer is already in the comments to avoid duplication if not any("DISCLAIMER" in comment for comment in trace.stats.comments): trace.stats.comments.extend(disclaimer_texts) def _set_metadata( self, stream: Stream, observatory: str, channel: str, type: str, interval: str, ): """set metadata for a given stream/channel Parameters ---------- observatory: str observatory code channel: str edge channel code {MVH, MVE, MVD, ...} type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} data type interval: {'second', 'minute', 'hour', 'day'} data interval """ for trace in stream: self.observatoryMetadata.set_metadata( trace.stats, observatory, channel, type, interval ) def _rotate_and_return_requested_channel( self, data: Stream, sncl, starttime: UTCDateTime, endtime: UTCDateTime, requested_channel: str, ) -> Stream: """ Retrieves the necessary *F channels, rotates them to ZNE, and returns the requested channel. Channels are mapped as follows: 'X' -> '*F2', 'Y' -> '*F1', 'Z' -> '*FZ'. """ # Initialize FDSN client and get the necessary data FDSN = self.Client # Determine if the requested channel is X, Y, or Z inv = FDSN.get_stations( network=sncl.network, station=sncl.station, location=sncl.location, # Use location if necessary channel=sncl.channel, # Request *F1, *F2, and *FZ starttime=starttime, endtime=endtime, level="response", ) # Rotate the stream to ZNE data.rotate(method="->ZNE", inventory=inv) # After rotation, extract the correct channel based on requested_channel if requested_channel == "X": selected_channel = "*FN" elif requested_channel == "Y": selected_channel = "*FE" elif requested_channel == "Z": selected_channel = "*FZ" else: raise ValueError(f"Invalid channel {requested_channel}") # Filter the rotated stream to return only the trace for the selected channel rotated_stream = data.select(channel=selected_channel) # If no data found for the selected channel, return an empty stream if len(rotated_stream) == 0: return Stream() return rotated_stream