diff --git a/geomagio/algorithm/AverageAlgorithm.py b/geomagio/algorithm/AverageAlgorithm.py index 75ee9170d6085f57b3409d915a3255f5a80e4fb6..692ff5c8f01fdc59f4cb5943f83e2ba753d0b390 100644 --- a/geomagio/algorithm/AverageAlgorithm.py +++ b/geomagio/algorithm/AverageAlgorithm.py @@ -11,10 +11,34 @@ import obspy.core class AverageAlgorithm(Algorithm): - """Algorithm that creates an averaged Dst. + """Algorithm that creates an average of multiple inputs of same channel (i.e., Dst index). Parameters ---------- + observatories: array_like + strings specifying multiple observatories to average + channel: string + specifies the channel to be processed. + location: string + location code for output + scales: array_like + floats that scale the corresponding observatories + min_count: integer + minimum number of observatories required for valid average + min_count_start: UTCDateTime + the beginning of an interval over which to apply min_count + min_count_end: UTCDateTime + the end of an interval over which to apply min_count + + Notes + ----- + The original version of this algorithm allowed no missing inputs. But, + allowing missing inputs is problematic as one gets nearer to realtime, + and input streams have differing latencies. So, min_count was added to + require a minimum number of observatories (more than 1), and the related + min_count_start and min_count_end were added to allow min_count to be + applied to a sub-interval (most likely something lagging realtime by + a sufficient interval to prevent frequent DC shifts in the output) """ @@ -25,6 +49,8 @@ class AverageAlgorithm(Algorithm): location=None, scales=None, min_count=None, + min_count_start=None, + min_count_end=None, ): Algorithm.__init__(self) self._npts = -1 @@ -35,6 +61,8 @@ class AverageAlgorithm(Algorithm): self.outchannel = channel self.outlocation = location self.min_count = min_count + self.min_count_start = min_count_start + self.min_count_end = min_count_end self.observatoryMetadata = ObservatoryMetadata() def check_stream(self, timeseries): @@ -94,7 +122,9 @@ class AverageAlgorithm(Algorithm): self.outlocation = self.outlocation or timeseries[0].stats.location - self.min_count = self.min_count or len(timeseries) + min_count = self.min_count or len(self.observatories) + min_count_start = self.min_count_start or timeseries[0].stats.starttime + min_count_end = self.min_count_end or timeseries[0].stats.endtime scale_values = self.scales or ([1] * len(timeseries)) lat_corr = {} @@ -130,7 +160,17 @@ class AverageAlgorithm(Algorithm): average_data = numpy.nansum(combined, axis=0) / count_data # apply min_count - average_data[count_data < self.min_count] = numpy.nan + average_data[count_data < min_count] = numpy.nan + + # apply min_count_start and min_count_end + # NOTE: the logic here is not very intuitive, but it works as intended + utc_datetimes = timeseries[0].times(type="utcdatetime") + average_data[ + (count_data < timeseries.count()) & (utc_datetimes > min_count_end) + ] = numpy.nan + average_data[ + (count_data < timeseries.count()) & (utc_datetimes < min_count_start) + ] = numpy.nan # create first output trace metadata average_stats = obspy.core.Stats() @@ -175,8 +215,22 @@ class AverageAlgorithm(Algorithm): "--average-min-count", default=None, help="Minimum number of inputs required to calculate average", - nargs="*", type=int, + metavar="N", + ) + parser.add_argument( + "--average-min-count-start", + default=None, + help="First UTC date time to apply min_count to YYYY-MM-DDTHH:MM:SS", + metavar="ISO8601", + type=obspy.core.UTCDateTime, + ) + parser.add_argument( + "--average-min-count-end", + default=None, + help="Last UTC date time to apply min_count to YYYY-MM-DDTHH:MM:SS", + metavar="ISO8601", + type=obspy.core.UTCDateTime, ) def configure(self, arguments): @@ -204,3 +258,13 @@ class AverageAlgorithm(Algorithm): self.outlocation = arguments.outlocationcode or arguments.locationcode self.min_count = arguments.average_min_count + self.min_count_start = arguments.average_min_count_start + self.min_count_end = arguments.average_min_count_end + + if arguments.realtime and not self.min_count_end: + # if the realtime parameter is defined in arguments (i.e., it is + # set via the controller), but min_count_end is not, then assume + # the most recent interval should NOT allow fewer than the full + # complement of inputs in order to calculate an average (previous + # "update" intervals will allow fewer than the full complement) + self.min_count_end = arguments.starttime diff --git a/test/StreamConverter_test.py b/test/StreamConverter_test.py index 0c8b4207ac380f9d27efb5c526d24f9703dcec78..446a4b553340a1646d9344602cdcf7c478a4a096 100644 --- a/test/StreamConverter_test.py +++ b/test/StreamConverter_test.py @@ -362,6 +362,7 @@ def __create_trace(channel, data, decbase=0): stats = obspy.core.Stats() stats.starttime = STARTTIME stats.delta = 60 + stats.npts = len(data) stats.declination_base = decbase stats.channel = channel numpy_data = numpy.array(data, dtype=numpy.float64) diff --git a/test/algorithm_test/AverageAlgorithm_test.py b/test/algorithm_test/AverageAlgorithm_test.py index 1c3a880095d78db5a296c2fd9d862271c6c33711..bbe377e70fe89abd8e290341c8497597f78a6c23 100644 --- a/test/algorithm_test/AverageAlgorithm_test.py +++ b/test/algorithm_test/AverageAlgorithm_test.py @@ -44,6 +44,18 @@ def test_process(): # Ensure the average of two np.testing.assert_array_equal(outstream[0].data, expected_solution) + # repeat process with min_count_end set + a.min_count_end = timeseries[0].stats.starttime + timeseries[0].stats.delta * 2 + expected_solution[3:] = np.nan + outstream = a.process(timeseries) + np.testing.assert_array_equal(outstream[0].data, expected_solution) + + # repeat process with min_count_start set + a.min_count_start = timeseries[0].stats.starttime + timeseries[0].stats.delta * 2 + expected_solution[:2] = np.nan + outstream = a.process(timeseries) + np.testing.assert_array_equal(outstream[0].data, expected_solution) + def test_gaps(): """AverageAlgorithm_test.test_gaps()