From 3820fc2cb9ff893bdcabd59ea4227e86b57a3b46 Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Mon, 6 Jun 2016 16:20:04 -0600 Subject: [PATCH] Refactor stream factories into separate class --- geomagio/Controller.py | 109 ++++++++++------------------ geomagio/StreamTimeseriesFactory.py | 43 +++++++++++ 2 files changed, 80 insertions(+), 72 deletions(-) create mode 100644 geomagio/StreamTimeseriesFactory.py diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 370170a9..0f04652c 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -6,18 +6,18 @@ import sys from obspy.core import Stream, UTCDateTime from algorithm import algorithms from PlotTimeseriesFactory import PlotTimeseriesFactory +from StreamTimeseriesFactory import StreamTimeseriesFactory import TimeseriesUtility +# factory packages +import binlog import edge import iaga2002 import pcdcp import imfv122 import imfv283 - -# factories for new filetypes import temperature import vbf -import binlog class Controller(object): @@ -243,16 +243,16 @@ class Controller(object): if output_gap[0] == options.starttime: # found fillable gap at start, recurse to previous interval interval = options.endtime - options.starttime - starttime = options.starttime - interval - 1 + starttime = options.starttime - interval endtime = options.starttime - 1 options.starttime = starttime options.endtime = endtime self.run_as_update(options, update_count + 1) # fill gap - print >> sys.stderr, 'processing', \ - options.starttime, options.endtime options.starttime = output_gap[0] options.endtime = output_gap[1] + print >> sys.stderr, 'processing', \ + options.starttime, options.endtime self.run(options) @@ -272,7 +272,6 @@ def get_input_factory(args): input_factory = None input_factory_args = None input_stream = None - input_url = None # standard arguments input_factory_args = {} @@ -282,14 +281,11 @@ def get_input_factory(args): # stream/url arguments if args.input_file is not None: input_stream = open(args.input_file, 'r') - input_factory_args['stream'] = input_stream elif args.input_stdin: input_stream = sys.stdin - input_factory_args['stream'] = input_stream elif args.input_url is not None: - input_url = args.input_url input_factory_args['urlInterval'] = args.input_url_interval - input_factory_args['urlTemplate'] = input_url + input_factory_args['urlTemplate'] = args.input_url input_type = args.input if input_type == 'edge': @@ -306,34 +302,21 @@ def get_input_factory(args): server=args.input_goes_server, user=args.input_goes_user, **input_factory_args) - elif input_type == 'iaga2002': - if input_stream is not None: - input_factory = iaga2002.StreamIAGA2002Factory( - **input_factory_args) - elif input_url is not None: - input_factory = iaga2002.IAGA2002Factory( - **input_factory_args) - elif input_type == 'imfv122': - if input_stream is not None: - input_factory = imfv122.StreamIMFV122Factory( - **input_factory_args) - elif input_url is not None: - input_factory = imfv122.IMFV122Factory( - **input_factory_args) - elif input_type == 'imfv283': - if input_stream is not None: - input_factory = imfv283.StreamIMFV283Factory( - **input_factory_args) - elif input_url is not None: - input_factory = imfv283.IMFV283Factory( - **input_factory_args) - elif input_type == 'pcdcp': + else: + # stream compatible factories + if input_type == 'iaga2002': + input_factory = iaga2002.IAGA2002Factory(**input_factory_args) + elif input_type == 'imfv122': + input_factory = imfv122.IMFV122Factory(**input_factory_args) + elif input_type == 'imfv283': + input_factory = imfv283.IMFV283Factory(**input_factory_args) + elif input_type == 'pcdcp': + input_factory = pcdcp.PCDCPFactory(**input_factory_args) + # wrap stream if input_stream is not None: - input_factory = pcdcp.StreamPCDCPFactory( - **input_factory_args) - elif input_url is not None: - input_factory = pcdcp.PCDCPFactory( - **input_factory_args) + input_factory = StreamTimeseriesFactory( + factory=input_factory, + stream=input_stream) return input_factory @@ -363,10 +346,8 @@ def get_output_factory(args): # stream/url arguments if args.output_file is not None: output_stream = open(args.output_file, 'wb') - output_factory_args['stream'] = output_stream elif args.output_stdout: output_stream = sys.stdout - output_factory_args['stream'] = output_stream elif args.output_url is not None: output_url = args.output_url output_factory_args['urlInterval'] = args.output_url_interval @@ -384,41 +365,25 @@ def get_output_factory(args): tag=args.output_edge_tag, forceout=args.output_edge_forceout, **output_factory_args) - elif output_type == 'iaga2002': - if output_stream is not None: - output_factory = iaga2002.StreamIAGA2002Factory( - **output_factory_args) - elif output_url is not None: - output_factory = iaga2002.IAGA2002Factory( - **output_factory_args) - elif output_type == 'pcdcp': - if output_stream is not None: - output_factory = pcdcp.StreamPCDCPFactory( - **output_factory_args) - elif output_url is not None: - output_factory = pcdcp.PCDCPFactory( - **output_factory_args) elif output_type == 'plot': output_factory = PlotTimeseriesFactory() - elif output_type == 'temperature': - if output_stream is not None: - output_factory = temperature.StreamTEMPFactory( - **output_factory_args) - elif output_url is not None: - output_factory = temperature.TEMPFactory( - **output_factory_args) - elif output_type == 'vbf': - if output_stream is not None: - output_factory = vbf.StreamVBFFactory( - **output_factory_args) - elif output_type == 'binlog': + else: + # stream compatible factories + if output_type == 'binlog': + output_factory = binlog.BinLogFactory(**output_factory_args) + elif output_type == 'iaga2002': + output_factory = iaga2002.IAGA2002Factory(**output_factory_args) + elif output_type == 'pcdcp': + output_factory = pcdcp.PCDCPFactory(**output_factory_args) + elif output_type == 'temperature': + output_factory = temperature.TEMPFactory(**output_factory_args) + elif output_type == 'vbf': + output_factory = vbf.VBFFactory(**output_factory_args) + # wrap stream if output_stream is not None: - output_factory = binlog.StreamBinLogFactory( - **output_factory_args) - elif output_url is not None: - output_factory = binlog.BinLogFactory( - output=output_type, - **output_factory_args) + output_factory = StreamTimeseriesFactory( + factory=output_factory, + stream=output_stream) return output_factory diff --git a/geomagio/StreamTimeseriesFactory.py b/geomagio/StreamTimeseriesFactory.py new file mode 100644 index 00000000..fb7f00ae --- /dev/null +++ b/geomagio/StreamTimeseriesFactory.py @@ -0,0 +1,43 @@ +"""Stream wrapper for TimeseriesFactory.""" + +from TimeseriesFactory import TimeseriesFactory + + +class StreamTimeseriesFactory(TimeseriesFactory): + """Timeseries Factory for streams. + normally stdio. + + Parameters + ---------- + factory: geomagio.TimeseriesFactory + wrapped factory. + stream: file object + io stream, normally either a file, or stdio + + See Also + -------- + Timeseriesfactory + """ + def __init__(self, factory, stream): + self.factory = factory + self.stream = stream + self.stream_data = None + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Get timeseries using stream as input. + """ + if self.stream_data is None: + # only read stream once + self.stream_data = self.stream.read() + return self.factory.parse_string( + data=self.stream_data, + starttime=starttime, + endtime=endtime, + observatory=observatory) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Put timeseries using stream as output. + """ + self.factory.write_file(self.stream, timeseries, channels) -- GitLab