diff --git a/.travis.yml b/.travis.yml index c05899dc55d44263589fc0a88dc42bd0ac62020c..040a943540ac27b11b9fe967cd982838c57d653c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ before_install: - conda info -a install: - conda config --add channels conda-forge - - conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION obspy pycurl nose flake8 coverage + - conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION obspy pycurl nose flake8 coverage webtest - source activate test-environment script: - flake8 --config=.flake8 bin/ geomagio/ test/ diff --git a/docs/develop.md b/docs/develop.md index 2a1721be0effec75a6e0ffe8695cc903607c01de..0ec225f47c18fd4b69e8f2f9a5611498d251caeb 100644 --- a/docs/develop.md +++ b/docs/develop.md @@ -8,11 +8,11 @@ code for this project, and may also be used to run a local copy of the code. Begin Developing ---------------- -1. Install `obspy`, `pycurl`, `flake8`, and `nose`. +1. Install `obspy`, `pycurl`, `flake8`, `nose`, `webtest`. > Using Anaconda is recommended ( https://conda.io/miniconda.html ). conda config --add channels conda-forge - conda create --name geomagenv obspy pycurl flake8 nose coverage + conda create --name geomagenv obspy pycurl flake8 nose coverage webtest source activate geomagenv 2. Fork this project on Github ( https://guides.github.com/activities/forking/ ). diff --git a/geomagio/WebService.py b/geomagio/WebService.py index 9a4af1edfe9539a28cc17335d98178f52fd02834..07bfcf059fc6b93d383fa4389751ba835c445c8d 100644 --- a/geomagio/WebService.py +++ b/geomagio/WebService.py @@ -6,7 +6,6 @@ from cgi import escape, parse_qs from collections import OrderedDict from datetime import datetime from json import dumps -import sys from geomagio.edge import EdgeFactory from geomagio.iaga2002 import IAGA2002Writer @@ -87,9 +86,8 @@ class WebService(object): query = self.parse(parse_qs(environ['QUERY_STRING'])) query._verify_parameters() self.output_format = query.output_format - except Exception: - exception = sys.exc_info()[1] - message = exception.args[0] + except Exception as e: + message = str(e) ftype = parse_qs(environ['QUERY_STRING']).get('format', [''])[0] if ftype == 'json': self.output_format = 'json' @@ -105,9 +103,8 @@ class WebService(object): query, timeseries, start_response, environ) if isinstance(timeseries_string, str): timeseries_string = timeseries_string.encode('utf8') - except Exception: - exception = sys.exc_info()[1] - message = exception.args[0] + except Exception as e: + message = "Server error." error_body = self.error(500, message, environ, start_response) return [error_body] return [timeseries_string] diff --git a/test/WebService_test.py b/test/WebService_test.py new file mode 100644 index 0000000000000000000000000000000000000000..b57167bfe387a033c577b57f57d7a583075b1904 --- /dev/null +++ b/test/WebService_test.py @@ -0,0 +1,145 @@ +"""Unit Tests for WebService""" +from cgi import parse_qs +from datetime import datetime +from nose.tools import assert_equals, assert_is_instance, assert_raises +import numpy +from webtest import TestApp + +from geomagio.WebService import _get_param +from geomagio.WebService import WebService +import obspy.core +from obspy.core.stream import Stream +from obspy.core.utcdatetime import UTCDateTime + + +class TestFactory(object): + "Factory to test for 200 and 400 response statuses." + @staticmethod + def get_timeseries(observatory=None, channels=None, + starttime=None, endtime=None, type=None, + interval=None): + stream = obspy.core.Stream() + for channel in channels: + stats = obspy.core.Stats() + stats.channel = channel + stats.starttime = starttime + stats.network = 'Test' + stats.station = observatory + stats.location = observatory + if interval == 'second': + stats.sampling_rate = 1. + elif interval == 'minute': + stats.sampling_rate = 1. / 60. + elif interval == 'hourly': + stats.sampling_rate = 1. / 3600. + elif interval == 'daily': + stats.sampling_rate = 1. / 86400. + length = int((endtime - starttime) * stats.sampling_rate) + stats.npts = length + 1 + data = numpy.full(length, numpy.nan, dtype=numpy.float64) + trace = obspy.core.Trace(data, stats) + stream.append(trace) + return stream + + +class ErrorFactory(object): + "Factory to test for 500 response status." + @staticmethod + def get_timeseries(observatory=None, channels=None, + starttime=None, endtime=None, type=None, + interval=None): + pass + + +def test__get_param(): + """WebService_test.test__get_param() + + Call function _get_param to make certain it gets back + the appropriate values and raises exceptions for invalid values. + """ + params = { + 'id': None, + 'elements': 'H,E,Z,F', + 'sampling_period': ['1', '60'], + } + assert_raises(Exception, _get_param, params, 'id', required=True) + elements = _get_param(params, 'elements') + assert_equals(elements, 'H,E,Z,F') + assert_raises(Exception, _get_param, params, 'sampling_period') + + +def test_fetch(): + """WebService_test.test_fetch()) + + Call function WebService.fetch to confirm tht it returns an + obspy.core.stream object. + """ + service = WebService(TestFactory()) + query = service.parse(parse_qs('id=BOU&starttime=2016-06-06' + '&endtime=2016-06-07&elements=H,E,Z,F&sampling_period=60' + '&format=iaga2002&type=variation')) + timeseries = service.fetch(query) + assert_is_instance(timeseries, Stream) + + +def test_parse(): + """WebService_test.test_parse() + + Create WebService instance and call parse to confirm that query + string values are applied to the correct class attribute. Also + confirm that default values are applied correctly. + """ + service = WebService(TestFactory()) + query = service.parse(parse_qs('id=BOU&starttime=2016-06-06' + '&endtime=2016-06-07&elements=H,E,Z,F&sampling_period=60' + '&format=iaga2002&type=variation')) + assert_equals(query.observatory_id, 'BOU') + assert_equals(query.starttime, UTCDateTime(2016, 6, 6, 0)) + assert_equals(query.endtime, UTCDateTime(2016, 6, 7, 0)) + assert_equals(query.elements, ['H', 'E', 'Z', 'F']) + assert_equals(query.sampling_period, '60') + assert_equals(query.output_format, 'iaga2002') + assert_equals(query.data_type, 'variation') + # Test that defaults are set for unspecified values + now = datetime.now() + today = UTCDateTime(year=now.year, month=now.month, day=now.day, hour=0) + tomorrow = today + (24 * 60 * 60 - 1) + query = service.parse(parse_qs('id=BOU')) + assert_equals(query.observatory_id, 'BOU') + assert_equals(query.starttime, today) + assert_equals(query.endtime, tomorrow) + assert_equals(query.elements, ('X', 'Y', 'Z', 'F')) + assert_equals(query.sampling_period, '60') + assert_equals(query.output_format, 'iaga2002') + assert_equals(query.data_type, 'variation') + assert_raises(Exception, service.parse, parse_qs('/?id=bad')) + + +def test_requests(): + """WebService_test.test_requests() + + Use TestApp to confirm correct response status, status int, + and content-type. + """ + app = TestApp(WebService(TestFactory())) + # Check invalid request (bad values) + response = app.get('/?id=bad', expect_errors=True) + assert_equals(response.status_int, 400) + assert_equals(response.status, '400 Bad Request') + assert_equals(response.content_type, 'text/plain') + # Check invalid request (duplicates) + response = app.get('/?id=BOU&id=BOU', expect_errors=True) + assert_equals(response.status_int, 400) + assert_equals(response.status, '400 Bad Request') + assert_equals(response.content_type, 'text/plain') + # Check valid request (upper and lower case) + response = app.get('/?id=BOU') + assert_equals(response.status_int, 200) + assert_equals(response.status, '200 OK') + assert_equals(response.content_type, 'text/plain') + # Test internal server error (use fake factory) + app = TestApp(WebService(ErrorFactory())) + response = app.get('/?id=BOU', expect_errors=True) + assert_equals(response.status_int, 500) + assert_equals(response.status, '500 Internal Server Error') + assert_equals(response.content_type, 'text/plain')