Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""WSGI implementation of Intermagnet Web Service
"""
from __future__ import print_function
from builtins import bytes, str
from geomagio.edge import EdgeFactory
from geomagio.iaga2002 import IAGA2002Writer
from obspy.core import UTCDateTime
class WebService(object):
def __init__(self, factory):
self.factory = factory
def __call__(self, environ, start_response):
"""Implement WSGI interface"""
# parse params
# fetch data
# send response
start_response('200 OK',
[
("Content-Type", "text/plain")
])
data = self.factory.get_timeseries(
observatory='BOU',
channels=('H', 'E', 'Z', 'F'),
starttime=UTCDateTime('2017-07-14T00:00:00Z'),
endtime=UTCDateTime('2017-07-15T00:00:00Z'),
type='variation',
interval='minute')
data = IAGA2002Writer.format(data, ('H', 'E', 'Z', 'F'))
if isinstance(data, str):
data = data.encode('utf8')
return [data]
class WebServiceQuery(object):
"""Query parameters for a web service request.
Parameters
----------
id : str
observatory
starttime : obspy.core.UTCDateTime
time of first requested sample
endtime : obspy.core.UTCDateTime
time of last requested sample
elements : array_like
list of requested elements
sampling_period : int
period between samples in seconds
default 60.
type : {'variation', 'adjusted', 'quasi-definitive', 'definitive'}
data type
default 'variation'.
format : {'iaga2002', 'json'}
output format.
default 'iaga2002'.
"""
def __init__(self, id=None, starttime=None, endtime=None, elements=None,
sampling_period=60, type='variation', format='iaga2002'):
self.id = id
self.starttime = starttime
self.endtime = endtime
self.elements = elements
self.sampling_period = sampling_period
self.type = type
self.format = format
@classmethod
def parse(params):
"""Parse query parameters from a dictionary.
Parameters
----------
params : dict
Returns
-------
WebServiceQuery
parsed query object.
"""
pass
if __name__ == '__main__':
from wsgiref.simple_server import make_server
app = WebService(EdgeFactory())
httpd = make_server('', 8080, app)
httpd.serve_forever()