Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
"""Stream wrapper for TimeseriesFactory."""
from TimeseriesFactory import TimeseriesFactory
class StreamTimeseriesFactory(TimeseriesFactory):
"""Timeseries Factory for streams.
normally stdio.
Parameters
----------
factory: geomagio.TimeseriesFactory
wrapped factory.
stream: file object
io stream, normally either a file, or stdio
See Also
--------
Timeseriesfactory
"""
def __init__(self, factory, stream):
self.factory = factory
self.stream = stream
self.stream_data = None
def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None):
"""Get timeseries using stream as input.
"""
if self.stream_data is None:
# only read stream once
self.stream_data = self.stream.read()
return self.factory.parse_string(
data=self.stream_data,
starttime=starttime,
endtime=endtime,
observatory=observatory)
def put_timeseries(self, timeseries, starttime=None, endtime=None,
channels=None, type=None, interval=None):
"""Put timeseries using stream as output.
"""
self.factory.write_file(self.stream, timeseries, channels)