diff --git a/test/algorithm_test/FilterAlgorithm_test.py b/test/algorithm_test/FilterAlgorithm_test.py index 9f0b387a40732e5f002e7da7cefebeb27928943a..1bab77193a558dfa70a48bf4b57e730769189794 100644 --- a/test/algorithm_test/FilterAlgorithm_test.py +++ b/test/algorithm_test/FilterAlgorithm_test.py @@ -4,24 +4,21 @@ import geomagio.iaga2002 as i2 from nose.tools import assert_almost_equals -def test_process(): +def test_second(): + """algorithm_test.FilterAlgorithm_test.test_second() + Tests algorithm for 10Hz to second. """ - Check one minute filter data processing versus files generated from - original script - """ - # initialize 10Hz to 1sec filter - f = FilterAlgorithm(filtertype='default', - input_sample_period=0.1, + f = FilterAlgorithm(input_sample_period=0.1, output_sample_period=1) - # generation of etc/filter/10HZ_filter_sec.mseed + + # generation of 10HZ_filter_sec.mseed # starttime = UTCDateTime('2020-01-06T00:00:00Z') # endtime = UTCDateTime('2020-01-06T04:00:00Z') # m = MiniSeedFactory(port=2061, host='...', # convert_channels=['U', 'V', 'W'], # bin_conv=500, volt_conv=100) # f = FilterAlgorithm(input_sample_period=0.1, - # output_sample_period=1.0, - # filtertype='default') + # output_sample_period=1.0) # starttime, endtime = f.get_input_interval(starttime,endtime) # LLO_raw = m.get_timeseries(observatory='LLO', # starttime=starttime,endtime=endtime, @@ -29,136 +26,157 @@ def test_process(): # 'V_Bin', 'W_Volt', 'W_Bin'], # interval='tenhertz', type='variaton') # LLO_raw.write('10HZ_filter_sec.mseed') - llo_sec = read('etc/filter/10HZ_filter_sec.mseed') - filtered_sec = f.process(llo_sec) - # # generation of etc/filter/10HZ_filter_min.mseed + llo = read('etc/filter/10HZ_filter_sec.mseed') + filtered = f.process(llo) + + with open('etc/filter/LLO20200106vsec.sec', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') + + u = LLO.select(channel='U')[0] + v = LLO.select(channel='V')[0] + w = LLO.select(channel='W')[0] + + u_filt = filtered.select(channel='U')[0] + v_filt = filtered.select(channel='V')[0] + w_filt = filtered.select(channel='W')[0] + + for r in range(u.data.size): + assert_almost_equals(u_filt.data[r], u.data[r], 1) + assert_almost_equals(v_filt.data[r], v.data[r], 1) + assert_almost_equals(w_filt.data[r], w.data[r], 1) + + +def test_minute(): + """algorithm_test.FilterAlgorithm_test.test_minute() + Tests algorithm for 10Hz to minute. + """ + f = FilterAlgorithm(input_sample_period=0.1, + output_sample_period=60.0) + + # generation of 10HZ_filter_min.mseed # starttime = UTCDateTime('2020-01-06T00:00:00Z') # endtime = UTCDateTime('2020-01-06T04:00:00Z') # m = MiniSeedFactory(port=2061, host='...', - # convert_channels=['U', 'V', 'W'], - # bin_conv=500, volt_conv=100) + # convert_channels=['U', 'V', 'W']) # f = FilterAlgorithm(input_sample_period=0.1, - # output_sample_period=60.0, - # filtertype='default') + # output_sample_period=60.0) # starttime, endtime = f.get_input_interval(starttime,endtime) - # LLO_raw = m.get_timeseries(observatory='LLO', + # LLO = m.get_timeseries(observatory='LLO', # starttime=starttime,endtime=endtime, # channels=['U_Volt', 'U_Bin', 'V_Volt', # 'V_Bin', 'W_Volt', 'W_Bin'], # interval='tenhertz', type='variaton') - # LLO_raw.write('10HZ_filter_min.mseed') - llo_min = read('etc/filter/10HZ_filter_min.mseed') - f = FilterAlgorithm(filtertype='default', - input_sample_period=0.1, - output_sample_period=60) - filtered_min = f.process(llo_min) - - # # generation of etc/filter/10HZ_filter_hor.mseed + # LLO.write('10HZ_filter_min.mseed') + + llo = read('etc/filter/10HZ_filter_min.mseed') + filtered = f.process(llo) + + with open('etc/filter/LLO20200106vmin.min', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') + + u = LLO.select(channel='U')[0] + v = LLO.select(channel='V')[0] + w = LLO.select(channel='W')[0] + + u_filt = filtered.select(channel='U')[0] + v_filt = filtered.select(channel='V')[0] + w_filt = filtered.select(channel='W')[0] + + for r in range(u.data.size): + assert_almost_equals(u_filt.data[r], u.data[r], 1) + assert_almost_equals(v_filt.data[r], v.data[r], 1) + assert_almost_equals(w_filt.data[r], w.data[r], 1) + + +def test_hour(): + """algorithm_test.FilterAlgorithm_test.test_hour() + Tests algorithm for 10Hz to hour. + """ + f = FilterAlgorithm(input_sample_period=0.1, + output_sample_period=3600.0) + + # generation of 10HZ_filter_hor.mseed # starttime = UTCDateTime('2020-01-06T00:00:00Z') # endtime = UTCDateTime('2020-01-06T04:00:00Z') # m = MiniSeedFactory(port=2061, host='...', - # convert_channels=['U', 'V', 'W'], - # bin_conv=500, volt_conv=100) + # convert_channels=['U', 'V', 'W']) # f = FilterAlgorithm(input_sample_period=0.1, - # output_sample_period=3600.0, - # filtertype='default') + # output_sample_period=3600.0) # starttime, endtime = f.get_input_interval(starttime,endtime) - # LLO_raw = m.get_timeseries(observatory='LLO', + # LLO = m.get_timeseries(observatory='LLO', # starttime=starttime,endtime=endtime, # channels=['U_Volt', 'U_Bin', 'V_Volt', # 'V_Bin', 'W_Volt', 'W_Bin'], # interval='tenhertz', type='variaton') - # LLO_raw.write('10HZ_filter_hor.mseed') - llo_hor = read('etc/filter/10HZ_filter_hor.mseed') - f = FilterAlgorithm(filtertype='default', - input_sample_period=0.1, - output_sample_period=3600) - filtered_hor = f.process(llo_hor) - - # read iaga files for sec, min, and hor - with open('etc/filter/LLO20200106vsec.sec', 'r') as f: - iaga = i2.StreamIAGA2002Factory(stream=f) - LLO_sec = iaga.get_timeseries(starttime=None, - endtime=None, observatory='LLO') + # LLO.write('10HZ_filter_hor.mseed') - with open('etc/filter/LLO20200106vmin.min', 'r') as f: - iaga = i2.StreamIAGA2002Factory(stream=f) - LLO_min = iaga.get_timeseries(starttime=None, - endtime=None, observatory='LLO') + llo = read('etc/filter/10HZ_filter_hor.mseed') + filtered = f.process(llo) with open('etc/filter/LLO20200106vhor.hor', 'r') as f: iaga = i2.StreamIAGA2002Factory(stream=f) - LLO_hor = iaga.get_timeseries(starttime=None, + LLO = iaga.get_timeseries(starttime=None, endtime=None, observatory='LLO') - # unpack channels from loaded sec, min, and hor data files - u_sec = LLO_sec.select(channel='U')[0] - v_sec = LLO_sec.select(channel='V')[0] - w_sec = LLO_sec.select(channel='W')[0] - - u_min = LLO_min.select(channel='U')[0] - v_min = LLO_min.select(channel='V')[0] - w_min = LLO_min.select(channel='W')[0] - - u_hor = LLO_hor.select(channel='U')[0] - v_hor = LLO_hor.select(channel='V')[0] - w_hor = LLO_hor.select(channel='W')[0] - - # unpack channels from filtered data - u_filt_sec = filtered_sec.select(channel='U')[0] - v_filt_sec = filtered_sec.select(channel='V')[0] - w_filt_sec = filtered_sec.select(channel='W')[0] - - u_filt_min = filtered_min.select(channel='U')[0] - v_filt_min = filtered_min.select(channel='V')[0] - w_filt_min = filtered_min.select(channel='W')[0] - - u_filt_hor = filtered_hor.select(channel='U')[0] - v_filt_hor = filtered_hor.select(channel='V')[0] - w_filt_hor = filtered_hor.select(channel='W')[0] - - # compare filtered output to iaga files' output - for r in range(u_sec.data.size): - assert_almost_equals(u_filt_sec.data[r], u_sec.data[r], 1) - assert_almost_equals(v_filt_sec.data[r], v_sec.data[r], 1) - assert_almost_equals(w_filt_sec.data[r], w_sec.data[r], 1) - - for r in range(u_min.data.size): - assert_almost_equals(u_filt_min.data[r], u_min.data[r], 1) - assert_almost_equals(v_filt_min.data[r], v_min.data[r], 1) - assert_almost_equals(w_filt_min.data[r], w_min.data[r], 1) - - for r in range(u_hor.data.size): - assert_almost_equals(u_filt_hor.data[r], u_hor.data[r], 1) - assert_almost_equals(v_filt_hor.data[r], v_hor.data[r], 1) - assert_almost_equals(w_filt_hor.data[r], w_hor.data[r], 1) - - # read iaga file for custom filter(10Hz-1s) - with open('etc/filter/LLO20200106_custom_vsec.sec', 'r') as f: - iaga = i2.StreamIAGA2002Factory(stream=f) - LLO_sec_custom = iaga.get_timeseries(starttime=None, - endtime=None, observatory='LLO') + u = LLO.select(channel='U')[0] + v = LLO.select(channel='V')[0] + w = LLO.select(channel='W')[0] + + u_filt = filtered.select(channel='U')[0] + v_filt = filtered.select(channel='V')[0] + w_filt = filtered.select(channel='W')[0] - f = FilterAlgorithm(filtertype='custom', - input_sample_period=0.1, + for r in range(u.data.size): + assert_almost_equals(u_filt.data[r], u.data[r], 1) + assert_almost_equals(v_filt.data[r], v.data[r], 1) + assert_almost_equals(w_filt.data[r], w.data[r], 1) + + +def test_custom(): + """algorithm_test.FilterAlgorithm_test.test_custom() + Tests algorithm for 10Hz to second with custom filter coefficients. + """ + f = FilterAlgorithm(input_sample_period=0.1, output_sample_period=1.0, coeff_filename='etc/filter/coeffs.json') - filtered_sec_custom = f.process(llo_sec) + # generation of 10HZ_filter_sec.mseed + # starttime = UTCDateTime('2020-01-06T00:00:00Z') + # endtime = UTCDateTime('2020-01-06T04:00:00Z') + # m = MiniSeedFactory(port=2061, host='...', + # convert_channels=['U', 'V', 'W']) + # f = FilterAlgorithm(input_sample_period=0.1, + # output_sample_period=1.0) + # starttime, endtime = f.get_input_interval(starttime,endtime) + # LLO = m.get_timeseries(observatory='LLO', + # starttime=starttime,endtime=endtime, + # channels=['U_Volt', 'U_Bin', 'V_Volt', + # 'V_Bin', 'W_Volt', 'W_Bin'], + # interval='tenhertz', type='variaton') + # LLO.write('10HZ_filter_sec.mseed') + + llo = read('etc/filter/10HZ_filter_sec.mseed') + filtered = f.process(llo) + + with open('etc/filter/LLO20200106_custom_vsec.sec', 'r') as f: + iaga = i2.StreamIAGA2002Factory(stream=f) + LLO = iaga.get_timeseries(starttime=None, + endtime=None, observatory='LLO') - # # unpack channels from loaded min, sec, and hor data files - u_sec = LLO_sec_custom.select(channel='U')[0] - v_sec = LLO_sec_custom.select(channel='V')[0] - w_sec = LLO_sec_custom.select(channel='W')[0] + u = LLO.select(channel='U')[0] + v = LLO.select(channel='V')[0] + w = LLO.select(channel='W')[0] - # # unpack channels from filtered data - u_filt_sec = filtered_sec_custom.select(channel='U')[0] - v_filt_sec = filtered_sec_custom.select(channel='V')[0] - w_filt_sec = filtered_sec_custom.select(channel='W')[0] + u_filt = filtered.select(channel='U')[0] + v_filt = filtered.select(channel='V')[0] + w_filt = filtered.select(channel='W')[0] - # # compare filter output with iaga file - for r in range(u_sec.data.size): - assert_almost_equals(u_filt_sec.data[r], u_sec.data[r], 1) - assert_almost_equals(v_filt_sec.data[r], v_sec.data[r], 1) - assert_almost_equals(w_filt_sec.data[r], w_sec.data[r], 1) + for r in range(u.data.size): + assert_almost_equals(u_filt.data[r], u.data[r], 1) + assert_almost_equals(v_filt.data[r], v.data[r], 1) + assert_almost_equals(w_filt.data[r], w.data[r], 1)