From 16fa4a2d194214050edd2ccd32d9b3669a60f1a2 Mon Sep 17 00:00:00 2001 From: pcain-usgs <pcain@usgs.gov> Date: Tue, 28 Jan 2020 14:09:49 -0700 Subject: [PATCH] Tests 10Hz to 1s, 10Hz to 1min, 1min to 1hr, and 10Hz to 1s custom filter --- geomagio/algorithm/FilterAlgorithm.py | 2 +- test/algorithm_test/FilterAlgorithm_test.py | 178 +++++++++++--------- 2 files changed, 97 insertions(+), 83 deletions(-) diff --git a/geomagio/algorithm/FilterAlgorithm.py b/geomagio/algorithm/FilterAlgorithm.py index 5a88dbe44..74b9478d0 100644 --- a/geomagio/algorithm/FilterAlgorithm.py +++ b/geomagio/algorithm/FilterAlgorithm.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from geomagio.algorithm import Algorithm +from .Algorithm import Algorithm import numpy as np from numpy.lib import stride_tricks as npls import scipy.signal as sps diff --git a/test/algorithm_test/FilterAlgorithm_test.py b/test/algorithm_test/FilterAlgorithm_test.py index 79f6f2f4f..a6f016be7 100644 --- a/test/algorithm_test/FilterAlgorithm_test.py +++ b/test/algorithm_test/FilterAlgorithm_test.py @@ -1,6 +1,5 @@ from geomagio.algorithm import FilterAlgorithm -from obspy.core import UTCDateTime, read -from geomagio.edge import MiniSeedFactory +from obspy.core import read import geomagio.iaga2002 as i2 from nose.tools import assert_almost_equals @@ -14,95 +13,109 @@ def test_process(): f = FilterAlgorithm(filtertype='default', input_sample_period=0.1, output_sample_period=1) - # generation of etc/filter/10HZ_filter.mseed + # generation of etc/filter/10HZ_filter_sec.mseed # starttime = UTCDateTime('2020-01-06T00:00:00Z') - # endtime = UTCDateTime('2020-01-07T00:00:00Z') + # endtime = UTCDateTime('2020-01-07T04:00:00Z') # starttime, endtime = f.get_input_interval(starttime, endtime) # m = MiniSeedFactory(port=2061, host='...') # llo_for_filter = m.get_timeseries(observatory='LLO', - # channels=['U_Bin', 'U_Volt', 'V_Bin', 'V_Volt', 'W_Bin', 'W_Volt'], + # channels=['U_Bin', 'U_Volt', 'V_Bin', + # 'V_Volt', 'W_Bin', 'W_Volt'], # type= 'variation', # interval= 'tenhertz', # starttime=starttime, # endtime=endtime) - llo_for_filter = read('etc/filter/10HZ_filter.mseed') - filtered_sec = f.process(llo_for_filter) + # filtered_sec = f.process(llo_for_filter) + llo_sec = read('etc/filter/10HZ_filter_sec.mseed') + filtered_sec = f.process(llo_sec) - # initialize 10Hz to 1min filter + # # generation of etc/filter/10HZ_filter_min.mseed # f = FilterAlgorithm(filtertype='default', # input_sample_period=0.1, # output_sample_period=60) # starttime = UTCDateTime('2020-01-06T00:00:00Z') - # endtime = UTCDateTime('2020-01-07T00:00:00Z') + # endtime = UTCDateTime('2020-01-07T04:00:00Z') # starttime, endtime = f.get_input_interval(starttime, endtime) # m = MiniSeedFactory(port=2061, host='...') # llo_for_filter = m.get_timeseries(observatory='LLO', - # channels=['U_Bin', 'U_Volt', 'V_Bin', 'V_Volt', 'W_Bin', 'W_Volt'], + # channels=['U_Bin', 'U_Volt', 'V_Bin', + # 'V_Volt', 'W_Bin', 'W_Volt'], # type='variation', # interval='tenhertz', # starttime=starttime, # endtime=endtime) # filtered_minute = f.process(llo_for_filter) - # initialize 10Hz to 1hour filter + llo_min = read('etc/filter/10HZ_filter_min.mseed') + f = FilterAlgorithm(filtertype='default', + input_sample_period=0.1, + output_sample_period=60) + filtered_min = f.process(llo_min) + + # # generation of etc/filter/10HZ_filter_min.mseed # f = FilterAlgorithm(filtertype='default', # input_sample_period=0.1, # output_sample_period=3600) # starttime = UTCDateTime('2020-01-06T00:00:00Z') - # endtime = UTCDateTime('2020-01-07T00:00:00Z') + # endtime = UTCDateTime('2020-01-07T04:00:00Z') # starttime, endtime = f.get_input_interval(starttime, endtime) # m = MiniSeedFactory(port=2061, host='...') # llo_for_filter = m.get_timeseries(observatory='LLO', - # channels=['U_Bin', 'U_Volt', 'V_Bin', 'V_Volt', 'W_Bin', 'W_Volt'], + # channels=['U_Bin', 'U_Volt', 'V_Bin', + # 'V_Volt', 'W_Bin', 'W_Volt'], # type='variation', # interval='tenhertz', # starttime=starttime, # endtime=endtime) # filtered_hour = f.process(llo_for_filter) - # read iaga files for sec, min, and hour - sec_iaga2002_file = open('etc/filter/LLO20200107vsec.sec') - sec_iaga2002_string = sec_iaga2002_file.read() - sec_iaga2002_file.close() - - min_iaga2002_file = open('etc/filter/LLO20200107vmin.min') - min_iaga2002_string = min_iaga2002_file.read() - min_iaga2002_file.close() - - hor_iaga2002_file = open('etc/filter/LLO20200107vhor.hor') - hor_iaga2002_string = hor_iaga2002_file.read() - hor_iaga2002_file.close() - - factory = i2.IAGA2002Factory() - _sec_ = factory.parse_string(sec_iaga2002_string) - _min_ = factory.parse_string(min_iaga2002_string) - _hor_ = factory.parse_string(hor_iaga2002_string) - - # unpack channels from loaded minutes data file - u_sec = _sec_.select(channel='U')[0] - v_sec = _sec_.select(channel='V')[0] - w_sec = _sec_.select(channel='W')[0] - - u_min = _min_.select(channel='U')[0] - v_min = _min_.select(channel='V')[0] - w_min = _min_.select(channel='W')[0] - - u_hor = _hor_.select(channel='U')[0] - v_hor = _hor_.select(channel='V')[0] - w_hor = _hor_.select(channel='W')[0] + llo_hor = read('etc/filter/10HZ_filter_hor.mseed') + f = FilterAlgorithm(filtertype='default', + input_sample_period=60, + output_sample_period=3600) + filtered_hor = f.process(llo_hor) + + # read iaga files for sec, min, and hor + with open('etc/filter/LLO20200106vsec.sec', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO_sec = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') + + with open('etc/filter/LLO20200106vmin.min', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO_min = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') + + with open('etc/filter/BOU20200106vhor.hor', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + BOU_hor = iaga.get_timeseries(starttime=None, + endtime=None, observatory='BOU') + + # unpack channels from loaded sec, min, and hor data files + u_sec = LLO_sec.select(channel='U')[0] + v_sec = LLO_sec.select(channel='V')[0] + w_sec = LLO_sec.select(channel='W')[0] + + u_min = LLO_min.select(channel='U')[0] + v_min = LLO_min.select(channel='V')[0] + w_min = LLO_min.select(channel='W')[0] + + h_hor = BOU_hor.select(channel='H')[0] + e_hor = BOU_hor.select(channel='E')[0] + z_hor = BOU_hor.select(channel='Z')[0] # unpack channels from filtered data u_filt_sec = filtered_sec.select(channel='U')[0] v_filt_sec = filtered_sec.select(channel='V')[0] w_filt_sec = filtered_sec.select(channel='W')[0] - # u_filt_min = filtered_minute.select(channel='U')[0] - # v_filt_min = filtered_minute.select(channel='V')[0] - # w_filt_min = filtered_minute.select(channel='W')[0] + u_filt_min = filtered_min.select(channel='U')[0] + v_filt_min = filtered_min.select(channel='V')[0] + w_filt_min = filtered_min.select(channel='W')[0] - # u_filt_hor = filtered_hour.select(channel='U')[0] - # v_filt_hor = filtered_hour.select(channel='V')[0] - # w_filt_hor = filtered_hour.select(channel='W')[0] + h_filt_hor = filtered_hor.select(channel='H')[0] + e_filt_hor = filtered_hor.select(channel='E')[0] + z_filt_hor = filtered_hor.select(channel='Z')[0] # compare filtered output to iaga files' output for r in range(u_sec.data.size): @@ -110,40 +123,41 @@ def test_process(): assert_almost_equals(v_filt_sec.data[r], v_sec.data[r], 1) assert_almost_equals(w_filt_sec.data[r], w_sec.data[r], 1) - # for r in range(u_min.data.size): - # assert_almost_equals(u_filt_min.data[r], u_min.data[r], 1) - # assert_almost_equals(v_filt_min.data[r], v_min.data[r], 1) - # assert_almost_equals(w_filt_min.data[r], w_min.data[r], 1) + for r in range(u_min.data.size): + assert_almost_equals(u_filt_min.data[r], u_min.data[r], 1) + assert_almost_equals(v_filt_min.data[r], v_min.data[r], 1) + assert_almost_equals(w_filt_min.data[r], w_min.data[r], 1) - # for r in range(u_hor.data.size): - # assert_almost_equals(u_filt_hor.data[r], u_hor.data[r], 1) - # assert_almost_equals(v_filt_hor.data[r], v_hor.data[r], 1) - # assert_almost_equals(w_filt_hor.data[r], w_hor.data[r], 1) + for r in range(h_hor.data.size): + assert_almost_equals(h_filt_hor.data[r], h_hor.data[r], 1) + assert_almost_equals(e_filt_hor.data[r], e_hor.data[r], 1) + assert_almost_equals(z_filt_hor.data[r], z_hor.data[r], 1) - # initialize custom filter - # f = FilterAlgorithm(filtertype='custom', - # coeff_filename='etc/filter/coeffs.json', - # input_sample_period=0.1, - # output_sample_period=1) - # starttime = UTCDateTime('2020-01-06T00:00:00Z') - # endtime = UTCDateTime('2020-01-07T00:00:00Z') - # starttime, endtime = f.get_input_interval(starttime, endtime) - # m = MiniSeedFactory(port=2061, host='...') - # llo_for_filter = m.get_timeseries(observatory='LLO', - # channels=['U_Bin', 'U_Volt', 'V_Bin', 'V_Volt', 'W_Bin', 'W_Volt'], - # type='variation', - # interval='tenhertz', - # starttime=starttime, - # endtime=endtime) - # filtered_hour = f.process(llo_for_filter) + # read iaga file for custom filter(10Hz-1s) + with open('etc/filter/LLO20200106_custom_vsec.sec', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO_sec_custom = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') + + f = FilterAlgorithm(filtertype='custom', + input_sample_period=0.1, + output_sample_period=1, + coeff_filename='etc/filter/coeffs.json') + + filtered_sec_custom = f.process(llo_sec) + + # unpack channels from loaded min, sec, and hor data files + u_sec = LLO_sec_custom.select(channel='U')[0] + v_sec = LLO_sec_custom.select(channel='V')[0] + w_sec = LLO_sec_custom.select(channel='W')[0] # unpack channels from filtered data - # u_filt_sec = filtered_sec.select(channel='U')[0] - # v_filt_sec = filtered_sec.select(channel='V')[0] - # w_filt_sec = filtered_sec.select(channel='W')[0] - - # # compare filter output with iaga file - # for r in range(u_sec.data.size): - # assert_almost_equals(u_filt_sec.data[r], u_sec.data[r], 1) - # assert_almost_equals(v_filt_sec.data[r], v_sec.data[r], 1) - # assert_almost_equals(w_filt_sec.data[r], w_sec.data[r], 1) + u_filt_sec = filtered_sec_custom.select(channel='U')[0] + v_filt_sec = filtered_sec_custom.select(channel='V')[0] + w_filt_sec = filtered_sec_custom.select(channel='W')[0] + + # compare filter output with iaga file + for r in range(u_sec.data.size): + assert_almost_equals(u_filt_sec.data[r], u_sec.data[r], 1) + assert_almost_equals(v_filt_sec.data[r], v_sec.data[r], 1) + assert_almost_equals(w_filt_sec.data[r], w_sec.data[r], 1) -- GitLab