diff --git a/geomagio/edge/FDSNFactory.py b/geomagio/edge/FDSNFactory.py index a7f8c061ee9432560772acf2251cb1aa00dfa6d9..b71e14439438f2d65ce6af1dfc0946537a663bdd 100644 --- a/geomagio/edge/FDSNFactory.py +++ b/geomagio/edge/FDSNFactory.py @@ -73,11 +73,7 @@ class FDSNFactory(TimeseriesFactory): def __init__( self, - base_url: str = "http://service.iris.edu/irisws", - host: str = "edgecwb.usgs.gov", - port: int = 2060, - write_port: int = 7981, - cwbport: int = 0, + base_url: str = "http://service.iris.edu", tag: str = "GeomagAlg", forceout: bool = False, observatory: Optional[str] = None, @@ -87,22 +83,20 @@ class FDSNFactory(TimeseriesFactory): interval: Optional[DataInterval] = None, variometerMetadata: Optional[VariometerMetadata] = None, locationCode: Optional[str] = None, - cwbhost: Optional[str] = None, + ): TimeseriesFactory.__init__(self, observatory, channels, type, interval) - # self.client = Client(base_url=base_url) - self.Client = FDSNClient("IRIS") - self.host = host - self.port = port - self.write_port = write_port - self.cwbport = cwbport + + self.Client = FDSNClient(base_url) + + self.tag = tag self.forceout = forceout self.interval = interval self.observatoryMetadata = variometerMetadata or VariometerMetadata() self.network = network self.locationCode = locationCode - self.cwbhost = cwbhost or "" + def get_timeseries( self, @@ -153,142 +147,42 @@ class FDSNFactory(TimeseriesFactory): raise TimeseriesFactoryException( 'Starttime before endtime "%s" "%s"' % (starttime, endtime) ) - - # obspy factories sometimes write to stdout, instead of stderr - original_stdout = sys.stdout - try: - # send stdout to stderr - sys.stdout = sys.stderr - # get the timeseries - timeseries = Stream() - for channel in channels: - data = self._get_timeseries( - starttime, - endtime, - observatory, - channel, - type, - interval, - add_empty_channels, - ) - if len(data) == 0: - continue - timeseries += data - finally: - # restore stdout - sys.stdout = original_stdout - self._post_process(timeseries, starttime, endtime, channels) - - return timeseries - - def put_timeseries( - self, - timeseries: Stream, - starttime: Optional[UTCDateTime] = None, - endtime: Optional[UTCDateTime] = None, - observatory: Optional[str] = None, - channels: Optional[List[str]] = None, - type: Optional[DataType] = None, - interval: Optional[DataInterval] = None, - ): - """Put timeseries data - - Parameters - ---------- - timeseries: Stream - timeseries object with data to be written - observatory: str - observatory code - channels: array - list of channels to load - type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} - data type - interval: {'second', 'minute', 'hour', 'day'} - data interval - - Notes - ----- - Streams sent to timeseries are expected to have a single trace per - channel and that trace should have an ndarray, with nan's - representing gaps. - """ - stats = timeseries[0].stats - observatory = observatory or stats.station or self.observatory - channels = channels or self.channels - type = type or self.type or stats.data_type - interval = interval or self.interval or stats.data_interval - - if starttime is None or endtime is None: - starttime, endtime = TimeseriesUtility.get_stream_start_end_times( - timeseries - ) - for channel in channels: - if timeseries.select(channel=channel).count() == 0: - raise TimeseriesFactoryException( - 'Missing channel "%s" for output, available channels %s' - % (channel, str(TimeseriesUtility.get_channels(timeseries))) - ) + + ###################### + #I do not entirely understand what the pupose of the sys.stdout was tryinig to accomplish + # so I left it out but I think this may be worth a discussion. Apart from this I removed a lot of + # post processing functionality that dealt with removing NaN values from the timeseries. My understadning is that + # the IRIS DMC will return empty data but not NaN values. + ###################### + + # # obspy factories sometimes write to stdout, instead of stderr + # original_stdout = sys.stdout + # try: + # # send stdout to stderr + # sys.stdout = sys.stderr + # # get the timeseries + timeseries = Stream() for channel in channels: - self._put_channel( - timeseries, observatory, channel, type, interval, starttime, endtime + data = self._get_timeseries( + starttime, + endtime, + observatory, + channel, + type, + interval, + add_empty_channels, ) + if len(data) == 0: + continue + timeseries += data + # finally: + # # restore stdout + # sys.stdout = original_stdout + self._post_process(timeseries, starttime, endtime, channels) - def _convert_timeseries_to_decimal(self, stream: Stream): - """convert geomag edge timeseries data stored as ints, to decimal by - dividing by 1000.00 - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - Notes - ----- - This routine changes the values in the timeseries. The user should - make a copy before calling if they don't want that side effect. - """ - for trace in stream: - trace.data = numpy.divide(trace.data, 1000.00) - - def _convert_trace_to_int(self, trace_in: Trace) -> Trace: - """convert geomag edge traces stored as decimal, to ints by multiplying - by 1000 - - Parameters - ---------- - trace: Trace - a trace retrieved from a geomag edge representing one channel - Returns - ------- - trace: Trace - a trace converted to ints - Notes - ----- - this doesn't work on ndarray with nan's in it. - the trace must be a masked array. - """ - trace = trace_in.copy() - trace.data = numpy.multiply(trace.data, 1000.00) - trace.data = trace.data.astype(int) + return timeseries - return trace - def _convert_stream_to_masked(self, timeseries: Stream, channel: str) -> Stream: - """convert geomag edge traces in a timeseries stream to a MaskedArray - This allows for gaps and splitting. - Parameters - ---------- - stream: Stream - a stream retrieved from a geomag edge representing one channel - channel: str - the channel to be masked - Returns - ------- - stream: Stream - a stream with all traces converted to masked arrays - """ - stream = timeseries.copy() - for trace in stream.select(channel=channel): - trace.data = numpy.ma.masked_invalid(trace.data) - return stream def _get_timeseries( self, @@ -403,7 +297,7 @@ class FDSNFactory(TimeseriesFactory): channels: List[str], ): """Post process a timeseries stream after the raw data is - is fetched from a waveserver. Specifically changes + is fetched from querymom. Specifically changes any MaskedArray to a ndarray with nans representing gaps. Then calls pad_timeseries to deal with gaps at the beggining or end of the streams. @@ -421,94 +315,25 @@ class FDSNFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ - self._convert_timeseries_to_decimal(timeseries) - for trace in timeseries: - if isinstance(trace.data, numpy.ma.MaskedArray): - trace.data.set_fill_value(numpy.nan) - trace.data = trace.data.filled() - - if "D" in channels: - for trace in timeseries.select(channel="D"): - trace.data = ChannelConverter.get_radians_from_minutes(trace.data) TimeseriesUtility.pad_timeseries(timeseries, starttime, endtime) - - def _put_channel( - self, - timeseries: Stream, - observatory: str, - channel: str, - type: DataType, - interval: DataInterval, - starttime: UTCDateTime, - endtime: UTCDateTime, - ): - """Put a channel worth of data - - Parameters - ---------- - timeseries: Stream - timeseries object with data to be written - observatory: str - observatory code - channel: str - channel to load - type: {'adjusted', 'definitive', 'quasi-definitive', 'variation'} - data type - interval: {'second', 'minute', 'hour', 'day'} - data interval - starttime: UTCDateTime - endtime: UTCDateTime - - Notes - ----- - RawInputClient seems to only work when sockets are - """ - sncl = FDSNSNCL.get_sncl( - data_type=type, - element=channel, - interval=interval, - station=observatory, - network=self.locationCode, - ) - - now = UTCDateTime(datetime.utcnow()) - if ((now - endtime) > 864000) and (self.cwbport > 0): - host = self.cwbhost - port = self.cwbport - else: - host = self.host - port = self.write_port - - ric = RawInputClient( - self.tag, - host, - port, - sncl.station, - sncl.channel, - sncl.location, - sncl.network, - ) - - stream = self._convert_stream_to_masked(timeseries=timeseries, channel=channel) - - # Make certain there's actually data - if not numpy.ma.any(stream.select(channel=channel)[0].data): - return - - for trace in stream.select(channel=channel).split(): - trace_send = trace.copy() - trace_send.trim(starttime, endtime) - if channel == "D": - trace_send.data = ChannelConverter.get_minutes_from_radians( - trace_send.data - ) - trace_send = self._convert_trace_to_int(trace_send) - ric.send_trace(interval, trace_send) - if self.forceout: - ric.forceout() - ric.close() - + # Retrieve and set metadata for each trace + for trace in timeseries: + if "comments" in trace.stats: + trace.stats.comments.extend( + [ + "DISCLAIMER", + "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24", + "DISCLAIMER", + ] + ) # input string + else: + trace.stats.comments = [ + "DISCLAIMER", + "This is provisional data. Any data secured from the USGS database that are identified as provisional have not received Director's approval and are subject to revision. Source and Authority: U.S. Geological Survey Manual, Section 500.24", + "DISCLAIMER", + ] + def _set_metadata( self, stream: Stream, @@ -538,7 +363,7 @@ class FDSNFactory(TimeseriesFactory): self, trace: Trace, starttime: UTCDateTime ) -> tuple[float, float]: # Retrieve station orientation information using FDSN for each trace - # print(trace.stats.location) + sncl = FDSNSNCL.get_sncl( data_type=trace.stats.location, element=trace.stats.channel,