Skip to content
Snippets Groups Projects
Commit 3820fc2c authored by Jeremy M Fee's avatar Jeremy M Fee
Browse files

Refactor stream factories into separate class

parent 4a22ca80
No related branches found
No related tags found
No related merge requests found
...@@ -6,18 +6,18 @@ import sys ...@@ -6,18 +6,18 @@ import sys
from obspy.core import Stream, UTCDateTime from obspy.core import Stream, UTCDateTime
from algorithm import algorithms from algorithm import algorithms
from PlotTimeseriesFactory import PlotTimeseriesFactory from PlotTimeseriesFactory import PlotTimeseriesFactory
from StreamTimeseriesFactory import StreamTimeseriesFactory
import TimeseriesUtility import TimeseriesUtility
# factory packages
import binlog
import edge import edge
import iaga2002 import iaga2002
import pcdcp import pcdcp
import imfv122 import imfv122
import imfv283 import imfv283
# factories for new filetypes
import temperature import temperature
import vbf import vbf
import binlog
class Controller(object): class Controller(object):
...@@ -243,16 +243,16 @@ class Controller(object): ...@@ -243,16 +243,16 @@ class Controller(object):
if output_gap[0] == options.starttime: if output_gap[0] == options.starttime:
# found fillable gap at start, recurse to previous interval # found fillable gap at start, recurse to previous interval
interval = options.endtime - options.starttime interval = options.endtime - options.starttime
starttime = options.starttime - interval - 1 starttime = options.starttime - interval
endtime = options.starttime - 1 endtime = options.starttime - 1
options.starttime = starttime options.starttime = starttime
options.endtime = endtime options.endtime = endtime
self.run_as_update(options, update_count + 1) self.run_as_update(options, update_count + 1)
# fill gap # fill gap
print >> sys.stderr, 'processing', \
options.starttime, options.endtime
options.starttime = output_gap[0] options.starttime = output_gap[0]
options.endtime = output_gap[1] options.endtime = output_gap[1]
print >> sys.stderr, 'processing', \
options.starttime, options.endtime
self.run(options) self.run(options)
...@@ -272,7 +272,6 @@ def get_input_factory(args): ...@@ -272,7 +272,6 @@ def get_input_factory(args):
input_factory = None input_factory = None
input_factory_args = None input_factory_args = None
input_stream = None input_stream = None
input_url = None
# standard arguments # standard arguments
input_factory_args = {} input_factory_args = {}
...@@ -282,14 +281,11 @@ def get_input_factory(args): ...@@ -282,14 +281,11 @@ def get_input_factory(args):
# stream/url arguments # stream/url arguments
if args.input_file is not None: if args.input_file is not None:
input_stream = open(args.input_file, 'r') input_stream = open(args.input_file, 'r')
input_factory_args['stream'] = input_stream
elif args.input_stdin: elif args.input_stdin:
input_stream = sys.stdin input_stream = sys.stdin
input_factory_args['stream'] = input_stream
elif args.input_url is not None: elif args.input_url is not None:
input_url = args.input_url
input_factory_args['urlInterval'] = args.input_url_interval input_factory_args['urlInterval'] = args.input_url_interval
input_factory_args['urlTemplate'] = input_url input_factory_args['urlTemplate'] = args.input_url
input_type = args.input input_type = args.input
if input_type == 'edge': if input_type == 'edge':
...@@ -306,34 +302,21 @@ def get_input_factory(args): ...@@ -306,34 +302,21 @@ def get_input_factory(args):
server=args.input_goes_server, server=args.input_goes_server,
user=args.input_goes_user, user=args.input_goes_user,
**input_factory_args) **input_factory_args)
elif input_type == 'iaga2002': else:
if input_stream is not None: # stream compatible factories
input_factory = iaga2002.StreamIAGA2002Factory( if input_type == 'iaga2002':
**input_factory_args) input_factory = iaga2002.IAGA2002Factory(**input_factory_args)
elif input_url is not None: elif input_type == 'imfv122':
input_factory = iaga2002.IAGA2002Factory( input_factory = imfv122.IMFV122Factory(**input_factory_args)
**input_factory_args) elif input_type == 'imfv283':
elif input_type == 'imfv122': input_factory = imfv283.IMFV283Factory(**input_factory_args)
if input_stream is not None: elif input_type == 'pcdcp':
input_factory = imfv122.StreamIMFV122Factory( input_factory = pcdcp.PCDCPFactory(**input_factory_args)
**input_factory_args) # wrap stream
elif input_url is not None:
input_factory = imfv122.IMFV122Factory(
**input_factory_args)
elif input_type == 'imfv283':
if input_stream is not None:
input_factory = imfv283.StreamIMFV283Factory(
**input_factory_args)
elif input_url is not None:
input_factory = imfv283.IMFV283Factory(
**input_factory_args)
elif input_type == 'pcdcp':
if input_stream is not None: if input_stream is not None:
input_factory = pcdcp.StreamPCDCPFactory( input_factory = StreamTimeseriesFactory(
**input_factory_args) factory=input_factory,
elif input_url is not None: stream=input_stream)
input_factory = pcdcp.PCDCPFactory(
**input_factory_args)
return input_factory return input_factory
...@@ -363,10 +346,8 @@ def get_output_factory(args): ...@@ -363,10 +346,8 @@ def get_output_factory(args):
# stream/url arguments # stream/url arguments
if args.output_file is not None: if args.output_file is not None:
output_stream = open(args.output_file, 'wb') output_stream = open(args.output_file, 'wb')
output_factory_args['stream'] = output_stream
elif args.output_stdout: elif args.output_stdout:
output_stream = sys.stdout output_stream = sys.stdout
output_factory_args['stream'] = output_stream
elif args.output_url is not None: elif args.output_url is not None:
output_url = args.output_url output_url = args.output_url
output_factory_args['urlInterval'] = args.output_url_interval output_factory_args['urlInterval'] = args.output_url_interval
...@@ -384,41 +365,25 @@ def get_output_factory(args): ...@@ -384,41 +365,25 @@ def get_output_factory(args):
tag=args.output_edge_tag, tag=args.output_edge_tag,
forceout=args.output_edge_forceout, forceout=args.output_edge_forceout,
**output_factory_args) **output_factory_args)
elif output_type == 'iaga2002':
if output_stream is not None:
output_factory = iaga2002.StreamIAGA2002Factory(
**output_factory_args)
elif output_url is not None:
output_factory = iaga2002.IAGA2002Factory(
**output_factory_args)
elif output_type == 'pcdcp':
if output_stream is not None:
output_factory = pcdcp.StreamPCDCPFactory(
**output_factory_args)
elif output_url is not None:
output_factory = pcdcp.PCDCPFactory(
**output_factory_args)
elif output_type == 'plot': elif output_type == 'plot':
output_factory = PlotTimeseriesFactory() output_factory = PlotTimeseriesFactory()
elif output_type == 'temperature': else:
if output_stream is not None: # stream compatible factories
output_factory = temperature.StreamTEMPFactory( if output_type == 'binlog':
**output_factory_args) output_factory = binlog.BinLogFactory(**output_factory_args)
elif output_url is not None: elif output_type == 'iaga2002':
output_factory = temperature.TEMPFactory( output_factory = iaga2002.IAGA2002Factory(**output_factory_args)
**output_factory_args) elif output_type == 'pcdcp':
elif output_type == 'vbf': output_factory = pcdcp.PCDCPFactory(**output_factory_args)
if output_stream is not None: elif output_type == 'temperature':
output_factory = vbf.StreamVBFFactory( output_factory = temperature.TEMPFactory(**output_factory_args)
**output_factory_args) elif output_type == 'vbf':
elif output_type == 'binlog': output_factory = vbf.VBFFactory(**output_factory_args)
# wrap stream
if output_stream is not None: if output_stream is not None:
output_factory = binlog.StreamBinLogFactory( output_factory = StreamTimeseriesFactory(
**output_factory_args) factory=output_factory,
elif output_url is not None: stream=output_stream)
output_factory = binlog.BinLogFactory(
output=output_type,
**output_factory_args)
return output_factory return output_factory
......
"""Stream wrapper for TimeseriesFactory."""
from TimeseriesFactory import TimeseriesFactory
class StreamTimeseriesFactory(TimeseriesFactory):
"""Timeseries Factory for streams.
normally stdio.
Parameters
----------
factory: geomagio.TimeseriesFactory
wrapped factory.
stream: file object
io stream, normally either a file, or stdio
See Also
--------
Timeseriesfactory
"""
def __init__(self, factory, stream):
self.factory = factory
self.stream = stream
self.stream_data = None
def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None):
"""Get timeseries using stream as input.
"""
if self.stream_data is None:
# only read stream once
self.stream_data = self.stream.read()
return self.factory.parse_string(
data=self.stream_data,
starttime=starttime,
endtime=endtime,
observatory=observatory)
def put_timeseries(self, timeseries, starttime=None, endtime=None,
channels=None, type=None, interval=None):
"""Put timeseries using stream as output.
"""
self.factory.write_file(self.stream, timeseries, channels)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment