From 2a4d71e1a80a6cf1489ff217a5806a2cd3320541 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Mon, 14 Nov 2022 09:07:24 -0700 Subject: [PATCH 1/5] Fix __create_trace() in StreamConverter_test.py A "feature" in obspy is that the stats.npts metadata object is not automatically calculated if a Trace is created by passing in a Stats object (instead of a simple dictionary). The Trace still contained the data array, which is probably why this issue wasn't discovered sooner, but something as simple as trace.times() is broken if npts is not specified. It's amazing this wasn't discovered sooner, given how many times traces get "created" throughout geomag-algorithms. Probably, we should survey the codebase to make sure this isn't a problem elsewhere. --- test/StreamConverter_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/StreamConverter_test.py b/test/StreamConverter_test.py index 0c8b4207a..446a4b553 100644 --- a/test/StreamConverter_test.py +++ b/test/StreamConverter_test.py @@ -362,6 +362,7 @@ def __create_trace(channel, data, decbase=0): stats = obspy.core.Stats() stats.starttime = STARTTIME stats.delta = 60 + stats.npts = len(data) stats.declination_base = decbase stats.channel = channel numpy_data = numpy.array(data, dtype=numpy.float64) -- GitLab From c946f502b68105aa7f6e37fcbbafe85c95273aff Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Mon, 14 Nov 2022 09:50:51 -0700 Subject: [PATCH 2/5] Add start/end options to AverageAlgorithm's min_count The recently added `min_count` option to AverageAlgorithm.py leads to some undesirable behavior when realtime data, with asynchronous inputs, are being processed. By adding the ability to specify an interval over which `min_count` is applied, some of this undesirable behavior can be mitigated. 
In particular, if the `realtime` option is specified via the controller, and `min_count` is defined, the minimum number of inputs will be allowed only for time steps prior to `(UTCDateTime.now() - realtime)`; the full complement of inputs will be required to calculate averages more recent than that. One drawback is that if an input observatory goes offline for an extended period, the Dst index will be calculated with a persistent lag `realtime` seconds long. A user can always override this admittedly ad-hoc default behavior using the `min_count_start` and `min_count_end` options. --- geomagio/algorithm/AverageAlgorithm.py | 70 +++++++++++++++++++- test/algorithm_test/AverageAlgorithm_test.py | 12 ++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/geomagio/algorithm/AverageAlgorithm.py b/geomagio/algorithm/AverageAlgorithm.py index 75ee9170d..3a0879188 100644 --- a/geomagio/algorithm/AverageAlgorithm.py +++ b/geomagio/algorithm/AverageAlgorithm.py @@ -11,10 +11,34 @@ import obspy.core class AverageAlgorithm(Algorithm): - """Algorithm that creates an averaged Dst. + """Algorithm that creates an average of multiple inputs of same channel (i.e., Dst index). Parameters ---------- + observatories: array_like + strings specifying multiple observatories to average + channel: string + specifies the channel to be processed. + location: string + location code for output + scales: array_like + floats that scale the corresponding observatories + min_count: integer + minimum number of observatories required for valid average + min_count_start: UTCDateTime + the beginning of an interval over which to apply min_count + min_count_end: UTCDateTime + the end of an interval over which to apply min_count + + Notes + ----- + The original version of this algorithm allowed no missing inputs. But, + allowing missing inputs is problematic as one gets nearer to realtime, + and input streams have differing latencies. 
So, min_count was added to + require a minimum number of observatories (more than 1), and the related + min_count_start and min_count_end were added to allow min_count to be + applied to a sub-interval (most likely something lagging realtime by + a sufficient interval to prevent frequent DC shifts in the output) """ @@ -25,6 +49,8 @@ class AverageAlgorithm(Algorithm): location=None, scales=None, min_count=None, + min_count_start=None, + min_count_end=None, ): Algorithm.__init__(self) self._npts = -1 @@ -35,6 +61,8 @@ class AverageAlgorithm(Algorithm): self.outchannel = channel self.outlocation = location self.min_count = min_count + self.min_count_start = min_count_start + self.min_count_end = min_count_end self.observatoryMetadata = ObservatoryMetadata() def check_stream(self, timeseries): @@ -95,6 +123,8 @@ class AverageAlgorithm(Algorithm): self.outlocation = self.outlocation or timeseries[0].stats.location self.min_count = self.min_count or len(timeseries) + self.min_count_start = self.min_count_start or timeseries[0].stats.starttime + self.min_count_end = self.min_count_end or timeseries[0].stats.endtime scale_values = self.scales or ([1] * len(timeseries)) lat_corr = {} @@ -132,6 +162,18 @@ class AverageAlgorithm(Algorithm): # apply min_count average_data[count_data < self.min_count] = numpy.nan + # apply min_count_start and min_count_end + # NOTE: the logic here is not very intuitive, but it works as intended + utc_datetimes = numpy.array( + [timeseries[0].stats.starttime + t for t in timeseries[0].times()] + ) + average_data[ + (count_data < timeseries.count()) & (utc_datetimes > self.min_count_end) + ] = numpy.nan + average_data[ + (count_data < timeseries.count()) & (utc_datetimes < self.min_count_start) + ] = numpy.nan + # create first output trace metadata average_stats = obspy.core.Stats() average_stats.station = "USGS" @@ -175,8 +217,22 @@ class AverageAlgorithm(Algorithm): "--average-min-count", default=None, help="Minimum number of inputs required 
to calculate average", - nargs="*", type=int, + metavar="N", + ) + parser.add_argument( + "--average-min-count-start", + default=None, + help="First UTC date time to apply min_count to YYYY-MM-DDTHH:MM:SS", + metavar="ISO8601", + type=obspy.core.UTCDateTime, + ) + parser.add_argument( + "--average-min-count-end", + default=None, + help="Last UTC date time to apply min_count to YYYY-MM-DDTHH:MM:SS", + metavar="ISO8601", + type=obspy.core.UTCDateTime, ) def configure(self, arguments): @@ -204,3 +260,13 @@ class AverageAlgorithm(Algorithm): self.outlocation = arguments.outlocationcode or arguments.locationcode self.min_count = arguments.average_min_count + self.min_count_start = arguments.average_min_count_start + self.min_count_end = arguments.average_min_count_end + + if arguments.realtime and not self.min_count_end: + # if the realtime parameter is defined in arguments (i.e., it is + # set via the controller), but min_count_end is not, then assume + # the most recent interval should NOT allow fewer than the full + # complement of inputs in order to calculate an average (previous + # "update" intervals will allow fewer than the full complement) + self.min_count_end = arguments.starttime diff --git a/test/algorithm_test/AverageAlgorithm_test.py b/test/algorithm_test/AverageAlgorithm_test.py index 1c3a88009..bbe377e70 100644 --- a/test/algorithm_test/AverageAlgorithm_test.py +++ b/test/algorithm_test/AverageAlgorithm_test.py @@ -44,6 +44,18 @@ def test_process(): # Ensure the average of two np.testing.assert_array_equal(outstream[0].data, expected_solution) + # repeat process with min_count_end set + a.min_count_end = timeseries[0].stats.starttime + timeseries[0].stats.delta * 2 + expected_solution[3:] = np.nan + outstream = a.process(timeseries) + np.testing.assert_array_equal(outstream[0].data, expected_solution) + + # repeat process with min_count_start set + a.min_count_start = timeseries[0].stats.starttime + timeseries[0].stats.delta * 2 + expected_solution[:2] = 
np.nan + outstream = a.process(timeseries) + np.testing.assert_array_equal(outstream[0].data, expected_solution) + def test_gaps(): """AverageAlgorithm_test.test_gaps() -- GitLab From 0dcdead5c2a87373586a1036a8c33b6de6075dfa Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Mon, 14 Nov 2022 12:57:00 -0700 Subject: [PATCH 3/5] Use self.observatories to determine number of inputs Previously used the number of traces in the input Stream. This could be fewer than the number of desired observatories if a given input's data was completely missing. --- geomagio/algorithm/AverageAlgorithm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geomagio/algorithm/AverageAlgorithm.py b/geomagio/algorithm/AverageAlgorithm.py index 3a0879188..1787ad199 100644 --- a/geomagio/algorithm/AverageAlgorithm.py +++ b/geomagio/algorithm/AverageAlgorithm.py @@ -122,7 +122,7 @@ class AverageAlgorithm(Algorithm): self.outlocation = self.outlocation or timeseries[0].stats.location - self.min_count = self.min_count or len(timeseries) + self.min_count = self.min_count or len(self.observatories) self.min_count_start = self.min_count_start or timeseries[0].stats.starttime self.min_count_end = self.min_count_end or timeseries[0].stats.endtime -- GitLab From d20c1df913f1b09c60686f365954fd7ad88ae5d3 Mon Sep 17 00:00:00 2001 From: "E. 
Joshua Rigler" <erigler@usgs.gov> Date: Tue, 15 Nov 2022 15:05:14 -0700 Subject: [PATCH 4/5] Better way to create array of UTCDateTimes --- geomagio/algorithm/AverageAlgorithm.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/geomagio/algorithm/AverageAlgorithm.py b/geomagio/algorithm/AverageAlgorithm.py index 1787ad199..b065f51f2 100644 --- a/geomagio/algorithm/AverageAlgorithm.py +++ b/geomagio/algorithm/AverageAlgorithm.py @@ -164,9 +164,7 @@ class AverageAlgorithm(Algorithm): # apply min_count_start and min_count_end # NOTE: the logic here is not very intuitive, but it works as intended - utc_datetimes = numpy.array( - [timeseries[0].stats.starttime + t for t in timeseries[0].times()] - ) + utc_datetimes = timeseries[0].times(type="utcdatetime") average_data[ (count_data < timeseries.count()) & (utc_datetimes > self.min_count_end) ] = numpy.nan -- GitLab From 219be6e56905da8d4d3a2b7346b0e167df7695e0 Mon Sep 17 00:00:00 2001 From: "E. Joshua Rigler" <erigler@usgs.gov> Date: Tue, 15 Nov 2022 18:29:39 -0700 Subject: [PATCH 5/5] Don't modify algorithm state inside process() method --- geomagio/algorithm/AverageAlgorithm.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/geomagio/algorithm/AverageAlgorithm.py b/geomagio/algorithm/AverageAlgorithm.py index b065f51f2..692ff5c8f 100644 --- a/geomagio/algorithm/AverageAlgorithm.py +++ b/geomagio/algorithm/AverageAlgorithm.py @@ -122,9 +122,9 @@ class AverageAlgorithm(Algorithm): self.outlocation = self.outlocation or timeseries[0].stats.location - self.min_count = self.min_count or len(self.observatories) - self.min_count_start = self.min_count_start or timeseries[0].stats.starttime - self.min_count_end = self.min_count_end or timeseries[0].stats.endtime + min_count = self.min_count or len(self.observatories) + min_count_start = self.min_count_start or timeseries[0].stats.starttime + min_count_end = self.min_count_end or timeseries[0].stats.endtime scale_values = 
self.scales or ([1] * len(timeseries)) lat_corr = {} @@ -160,16 +160,16 @@ class AverageAlgorithm(Algorithm): average_data = numpy.nansum(combined, axis=0) / count_data # apply min_count - average_data[count_data < self.min_count] = numpy.nan + average_data[count_data < min_count] = numpy.nan # apply min_count_start and min_count_end # NOTE: the logic here is not very intuitive, but it works as intended utc_datetimes = timeseries[0].times(type="utcdatetime") average_data[ - (count_data < timeseries.count()) & (utc_datetimes > self.min_count_end) + (count_data < timeseries.count()) & (utc_datetimes > min_count_end) ] = numpy.nan average_data[ - (count_data < timeseries.count()) & (utc_datetimes < self.min_count_start) + (count_data < timeseries.count()) & (utc_datetimes < min_count_start) ] = numpy.nan # create first output trace metadata -- GitLab