diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index c25955fe141ab4114ed5c28b4a95d32ff1cfe2ed..525999834e0e0dd383e6a9d33fe310cd83454db7 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -9,15 +9,17 @@ to take advantage of it's newer realtime abilities. Edge is the USGS earthquake hazard centers replacement for earthworm. """ from __future__ import absolute_import - +from datetime import datetime import sys +from typing import List, Optional + import numpy import numpy.ma -import obspy.core -from datetime import datetime +from obspy import Stream, Trace, UTCDateTime from obspy.clients import earthworm from .. import ChannelConverter, TimeseriesUtility +from ..geomag_types import DataInterval, DataType from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException from ..ObservatoryMetadata import ObservatoryMetadata @@ -34,16 +36,26 @@ class EdgeFactory(TimeseriesFactory): a string representing the IP number of the host to connect to. port: integer the port number the waveserver is listening on. + write_port: integer + the port number the client is writing to. + cwbport: int + the port number of the cwb host to connect to. + tag: str + A tag used by edge to log and associate a socket with a given data + source + forceout: bool + Tells edge to forceout a packet to miniseed. Generally used when + the user knows no more data is coming. observatory: str the observatory code for the desired observatory. channels: array an array of channels {H, D, E, F, Z, MGD, MSD, HGD}. Known since channel names are mapped based on interval and type, others are passed through, see #_get_edge_channel(). - type: str - the data type {variation, quasi-definitive, definitive} - interval: str - the data interval {day, hour, minute, second} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval observatoryMetadata: ObservatoryMetadata object an ObservatoryMetadata object used to replace the default ObservatoryMetadata. @@ -52,14 +64,6 @@ class EdgeFactory(TimeseriesFactory): in get_timeseries/put_timeseries cwbhost: str a string represeting the IP number of the cwb host to connect to. - cwbport: int - the port number of the cwb host to connect to. - tag: str - A tag used by edge to log and associate a socket with a given data - source - forceout: bool - Tells edge to forceout a packet to miniseed. Generally used when - the user knows no more data is coming. See Also -------- @@ -75,66 +79,66 @@ class EdgeFactory(TimeseriesFactory): def __init__( self, - host="cwbpub.cr.usgs.gov", - port=2060, - write_port=7981, - observatory=None, - channels=None, - type=None, - interval=None, - observatoryMetadata=None, - locationCode=None, - cwbhost=None, - cwbport=0, - tag="GeomagAlg", - forceout=False, + host: str = "cwbpub.cr.usgs.gov", + port: int = 2060, + write_port: int = 7981, + cwbport: int = 0, + tag: str = "GeomagAlg", + forceout: bool = False, + observatory: Optional[str] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, + observatoryMetadata: Optional[ObservatoryMetadata] = None, + locationCode: Optional[str] = None, + cwbhost: Optional[str] = None, ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) self.client = earthworm.Client(host, port) - self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() - self.tag = tag - self.locationCode = locationCode - self.interval = interval self.host = host self.port = port self.write_port = write_port - self.cwbhost = cwbhost or "" self.cwbport = cwbport + self.tag = tag self.forceout = forceout + self.interval = interval + self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() + self.locationCode = locationCode + self.cwbhost = cwbhost or "" def get_timeseries( self, - starttime, - endtime, - observatory=None, - channels=None, - type=None, - interval=None, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: Optional[str] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, add_empty_channels: bool = True, - ): + ) -> Stream: """Get timeseries data Parameters ---------- - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime time of first sample. - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime time of last sample. observatory: str observatory code. - channels: array_like + channels: array list of channels to load - type: {'variation', 'quasi-definitive', 'definitive'} - data type. - interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} - data interval. - add_empty_channels + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval + add_empty_channels: bool if True, returns channels without data as empty traces Returns ------- - obspy.core.Stream - timeseries object with requested data. + timeseries: Stream + timeseries object with requested data Raises ------ @@ -158,7 +162,7 @@ class EdgeFactory(TimeseriesFactory): # send stdout to stderr sys.stdout = sys.stderr # get the timeseries - timeseries = obspy.core.Stream() + timeseries = Stream() for channel in channels: data = self._get_timeseries( starttime, @@ -181,28 +185,28 @@ class EdgeFactory(TimeseriesFactory): def put_timeseries( self, - timeseries, - starttime=None, - endtime=None, - observatory=None, - channels=None, - type=None, - interval=None, + timeseries: Stream, + starttime: Optional[UTCDateTime] = None, + endtime: Optional[UTCDateTime] = None, + observatory: Optional[str] = None, + channels: Optional[List[str]] = None, + type: Optional[DataType] = None, + interval: Optional[DataInterval] = None, ): """Put timeseries data Parameters ---------- - timeseries: obspy.core.Stream + timeseries: Stream timeseries object with data to be written observatory: str - observatory code. - channels: array_like + observatory code + channels: array list of channels to load - type: {'variation', 'quasi-definitive', 'definitive'} - data type. - interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} - data interval. + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval Notes ----- @@ -231,13 +235,13 @@ class EdgeFactory(TimeseriesFactory): timeseries, observatory, channel, type, interval, starttime, endtime ) - def _convert_timeseries_to_decimal(self, stream): + def _convert_timeseries_to_decimal(self, stream: Stream): """convert geomag edge timeseries data stored as ints, to decimal by dividing by 1000.00 Parameters ---------- - stream : obspy.core.stream - a stream retrieved from a geomag edge representing one channel. + stream: Stream + a stream retrieved from a geomag edge representing one channel Notes ----- This routine changes the values in the timeseries. The user should @@ -246,17 +250,17 @@ class EdgeFactory(TimeseriesFactory): for trace in stream: trace.data = numpy.divide(trace.data, 1000.00) - def _convert_trace_to_int(self, trace_in): + def _convert_trace_to_int(self, trace_in: Trace) -> Trace: """convert geomag edge traces stored as decimal, to ints by multiplying by 1000 Parameters ---------- - trace : obspy.core.trace - a trace retrieved from a geomag edge representing one channel. + trace: Trace + a trace retrieved from a geomag edge representing one channel Returns ------- - obspy.core.trace + trace: Trace a trace converted to ints Notes ----- @@ -269,19 +273,19 @@ class EdgeFactory(TimeseriesFactory): return trace - def _convert_stream_to_masked(self, timeseries, channel): + def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: """convert geomag edge traces in a timeseries stream to a MaskedArray This allows for gaps and splitting. Parameters ---------- - stream : obspy.core.stream - a stream retrieved from a geomag edge representing one channel. - channel: string - the channel to be masked. + stream: Stream + a stream retrieved from a geomag edge representing one channel + channel: str + the channel to be masked Returns ------- - obspy.core.stream - a stream with all traces converted to masked arrays. + stream: Stream + a stream with all traces converted to masked arrays """ stream = timeseries.copy() for trace in stream.select(channel=channel): @@ -290,36 +294,36 @@ class EdgeFactory(TimeseriesFactory): def _get_timeseries( self, - starttime, - endtime, - observatory, - channel, - type, - interval, + starttime: UTCDateTime, + endtime: UTCDateTime, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, add_empty_channels: bool = True, - ): + ) -> Trace: """get timeseries data for a single channel. Parameters ---------- - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime the starttime of the requested data - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime the endtime of the requested data - observatory : str + observatory: str observatory code - channel : str + channel: str single character channel {H, E, D, Z, F} - type : str - data type {definitive, quasi-definitive, variation} - interval : str - interval length {minute, second} - add_empty_channels + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval + add_empty_channels: bool if True, returns channels without data as empty traces Returns ------- - obspy.core.trace + data: Trace timeseries trace of the requested channel data """ sncl = LegacySNCL.get_sncl( @@ -340,7 +344,7 @@ class EdgeFactory(TimeseriesFactory): ) except TypeError: # get_waveforms() fails if no data is returned from Edge - data = obspy.core.Stream() + data = Stream() # make sure data is 32bit int for trace in data: @@ -360,7 +364,13 @@ class EdgeFactory(TimeseriesFactory): self._set_metadata(data, observatory, channel, type, interval) return data - def _post_process(self, timeseries, starttime, endtime, channels): + def _post_process( + self, + timeseries: Stream, + starttime: UTCDateTime, + endtime: UTCDateTime, + channels: List[str], + ): """Post process a timeseries stream after the raw data is is fetched from a waveserver. Specifically changes any MaskedArray to a ndarray with nans representing gaps. @@ -369,13 +379,13 @@ class EdgeFactory(TimeseriesFactory): Parameters ---------- - timeseries: obspy.core.stream + timeseries: Stream The timeseries stream as returned by the call to get_waveforms - starttime: obspy.core.UTCDateTime + starttime: UTCDateTime the starttime of the requested data - endtime: obspy.core.UTCDateTime + endtime: UTCDateTime the endtime of the requested data - channels: array_like + channels: array list of channels to load Notes: the original timeseries object is changed. @@ -394,30 +404,30 @@ class EdgeFactory(TimeseriesFactory): def _put_channel( self, - timeseries, - observatory, - channel, - type, - interval, - starttime, - endtime, + timeseries: Stream, + observatory: str, + channel: str, + type: DataType, + interval: DataInterval, + starttime: UTCDateTime, + endtime: UTCDateTime, ): """Put a channel worth of data Parameters ---------- - timeseries: obspy.core.Stream + timeseries: Stream timeseries object with data to be written observatory: str - observatory code. + observatory code channel: str channel to load - type: {'variation', 'quasi-definitive', 'definitive'} - data type. - interval: {'day', 'hour', 'minute', 'second', 'tenhertz'} - data interval. - starttime: obspy.core.UTCDateTime - endtime: obspy.core.UTCDateTime + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval + starttime: UTCDateTime + endtime: UTCDateTime Notes ----- @@ -431,7 +441,7 @@ class EdgeFactory(TimeseriesFactory): location=self.locationCode, ) - now = obspy.core.UTCDateTime(datetime.utcnow()) + now = UTCDateTime(datetime.utcnow()) if ((now - endtime) > 864000) and (self.cwbport > 0): host = self.cwbhost port = self.cwbport @@ -470,7 +480,7 @@ class EdgeFactory(TimeseriesFactory): def _set_metadata( self, - stream: obspy.core.Stream, + stream: Stream, observatory: str, channel: str, type: str, @@ -479,14 +489,14 @@ class EdgeFactory(TimeseriesFactory): """set metadata for a given stream/channel Parameters ---------- - observatory + observatory: str observatory code - channel + channel: str edge channel code {MVH, MVE, MVD, ...} - type - data type {definitive, quasi-definitive, variation} - interval - interval length {minute, second} + type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} + data type + interval: {'second', 'minute', 'hour', 'day'} + data interval """ for trace in stream: self.observatoryMetadata.set_metadata(