From 4ffc5407b9d7e73223429046d6f98c736ec08d67 Mon Sep 17 00:00:00 2001 From: pcain-usgs <pcain@usgs.gov> Date: Tue, 8 Sep 2020 18:08:37 -0600 Subject: [PATCH] Implement _validate_step --- geomagio/algorithm/FilterAlgorithm.py | 26 +++++------------- test/algorithm_test/FilterAlgorithm_test.py | 29 +++++++++++---------- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/geomagio/algorithm/FilterAlgorithm.py b/geomagio/algorithm/FilterAlgorithm.py index ba78a759b..a7c2e37bb 100644 --- a/geomagio/algorithm/FilterAlgorithm.py +++ b/geomagio/algorithm/FilterAlgorithm.py @@ -72,10 +72,9 @@ class FilterAlgorithm(Algorithm): self.output_sample_period = output_sample_period self.steps = steps self.load_state() - # ensure correctly aligned coefficients in each step - self.steps = ( - self.steps and [self._prepare_step(step) for step in self.steps] or [] - ) + #  ensure correctly aligned coefficients in each step + self.steps = self.steps or [] + self._validate_steps() def load_state(self): """Load filter coefficients from json file if custom filter is used. @@ -134,21 +133,10 @@ class FilterAlgorithm(Algorithm): steps.append(step) return steps - def _prepare_step(self, step) -> Dict: - window = step["window"] - if step["type"] == "firfilter": - factor = 1 - if step["type"] == "average": - factor = 0 - if len(window) % 2 == factor: - return step - sys.stderr.write("Invalid number of taps. Appending center coefficient.") - new_window = np.array(window) - i = len(window) // 2 - np.insert(new_window, i + 1, np.average(window[i : i + 2])) - new_step = dict(step) - new_step["window"] = new_window - return new_step + def _validate_steps(self): + for step in self.steps: + if step["type"] == "firfilter" and len(step["window"]) % 2 != 1: + raise ValueError("Firfilter requires an odd number of coefficients") def can_produce_data(self, starttime, endtime, stream): """Can Produce data diff --git a/test/algorithm_test/FilterAlgorithm_test.py b/test/algorithm_test/FilterAlgorithm_test.py index f7d43b176..b03a37237 100644 --- a/test/algorithm_test/FilterAlgorithm_test.py +++ b/test/algorithm_test/FilterAlgorithm_test.py @@ -3,6 +3,7 @@ import json from numpy.testing import assert_almost_equal, assert_equal import numpy as np from obspy import read, UTCDateTime +import pytest from geomagio.algorithm import FilterAlgorithm import geomagio.iaga2002 as i2 @@ -259,9 +260,9 @@ def test_starttime_shift(): assert_equal(filtered[0].stats.endtime, UTCDateTime("2020-01-01T00:13:00Z")) -def test_prepare_step(): - """algorithm_test.FilterAlgorithm_test.test_custom() - Tests algorithm for 10Hz to second with custom filter coefficients. +def test_validate_steps(): + """algorithm_test.FilterAlgorithm_test.test_validate_steps() + Validates algorithm steps 10 Hz to second with custom coefficients. """ with open("etc/filter/coeffs.json", "rb") as f: step = json.loads(f.read()) @@ -270,14 +271,14 @@ def test_prepare_step(): half = numtaps // 2 # check initial assumption assert_equal(numtaps % 2, 1) - # expect step to be unchanged when window has odd length - unchanged = f._prepare_step(step) - assert_equal(unchanged, step) - # expect step to be extended when window has event length - even_step = {"window": np.delete(step["window"], numtaps // 2, 0)} - assert_equal(len(even_step["window"]) % 2, 0) - prepared = f._prepare_step(step) - assert_equal(len(prepared["window"]) % 2, 1) - # value is inserted in middle - assert_equal(prepared["window"][: half + 1], step["window"][: half + 1]) - assert_equal(prepared["window"][half:], step["window"][half:]) + f._validate_steps() + # expect step to raise a value error when window has an even length + f.steps = [ + { + "window": np.delete(step["window"], numtaps // 2, 0), + "type": "firfilter", + } + ] + assert_equal(len(f.steps[0]["window"]) % 2, 0) + with pytest.raises(ValueError): + f._validate_steps() -- GitLab