From 8e1b765508468add8ac7585da74d6a3df4f8edc3 Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Wed, 11 May 2016 10:25:30 -0600 Subject: [PATCH] Reorganize input/output factory arguments to be less redundant --- geomagio/Controller.py | 462 +++++++++++++++++++++++++---------------- 1 file changed, 288 insertions(+), 174 deletions(-) diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 146207d6a..cc4743bda 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -243,6 +243,162 @@ class Controller(object): self.run(options) +def get_input_factory(args): + """Parse input factory arguments. + + Parameters + ---------- + args : argparse.Namespace + arguments + + Returns + ------- + TimeseriesFactory + input timeseries factory + """ + input_factory = None + input_factory_args = None + input_stream = None + input_url = None + + # standard arguments + input_factory_args = {} + input_factory_args['interval'] = args.interval + input_factory_args['observatory'] = args.observatory + input_factory_args['type'] = args.type + # stream/url arguments + if args.input_file is not None: + input_stream = open(args.input_file, 'r') + input_factory_args['stream'] = input_stream + elif args.input_stdin: + input_stream = sys.stdin + input_factory_args['stream'] = input_stream + elif args.input_url is not None: + input_url = args.input_url + input_factory_args['urlInterval'] = args.input_url_interval + input_factory_args['urlTemplate'] = input_url + + input_type = args.input + if input_type == 'edge': + input_factory = edge.EdgeFactory( + host=args.input_host, + port=args.input_port, + locationCode=args.locationcode, + **input_factory_args) + elif input_type == 'goes': + # TODO: deal with other goes arguments + input_factory = imfv283.GOESIMFV283Factory( + directory=args.input_goes_directory, + getdcpmessages=args.input_goes_getdcpmessages, + server=args.input_goes_server, + user=args.input_goes_user, + **input_factory_args) + elif input_type == 'iaga2002': + if input_stream is not None: + input_factory = iaga2002.StreamIAGA2002Factory( + **input_factory_args) + elif input_url is not None: + input_factory = iaga2002.IAGA2002Factory( + **input_factory_args) + elif input_type == 'imfv283': + if input_stream is not None: + input_factory = imfv283.StreamIMFV283Factory( + **input_factory_args) + elif input_url is not None: + input_factory = imfv283.IMFV283Factory( + **input_factory_args) + elif input_type == 'pcdcp': + if input_stream is not None: + input_factory = pcdcp.StreamPCDCPFactory( + **input_factory_args) + elif input_url is not None: + input_factory = pcdcp.PCDCPFactory( + **input_factory_args) + return input_factory + + +def get_output_factory(args): + """Parse output factory arguments. + + Parameters + ---------- + args : argparse.Namespace + arguments + + Returns + ------- + TimeseriesFactory + output timeseries factory + """ + output_factory = None + output_factory_args = None + output_stream = None + output_url = None + + # standard arguments + output_factory_args = {} + output_factory_args['interval'] = args.interval + output_factory_args['observatory'] = args.observatory + output_factory_args['type'] = args.type + # stream/url arguments + if args.output_file is not None: + output_stream = open(args.output_file, 'wb') + output_factory_args['stream'] = output_stream + elif args.output_stdin: + output_stream = sys.stdout + output_factory_args['stream'] = output_stream + elif args.output_url is not None: + output_url = args.output_url + output_factory_args['urlInterval'] = args.output_url_interval + output_factory_args['urlTemplate'] = output_url + + output_type = args.output + if output_type == 'edge': + # TODO: deal with other edge arguments + locationcode = args.outlocationcode or args.locationcode or None + output_factory = edge.EdgeFactory( + host=args.output_host, + port=args.output_edge_read_port, + write_port=args.output_port, + locationCode=locationcode, + tag=args.output_edge_tag, + forceout=args.output_edge_forceout, + **output_factory_args) + elif output_type == 'iaga2002': + if output_stream is not None: + output_factory = iaga2002.StreamIAGA2002Factory( + **output_factory_args) + elif output_url is not None: + output_factory = iaga2002.IAGA2002Factory( + **output_factory_args) + elif output_type == 'pcdcp': + if output_stream is not None: + output_factory = pcdcp.StreamPCDCPFactory( + **output_factory_args) + elif output_url is not None: + output_factory = pcdcp.PCDCPFactory( + **output_factory_args) + elif output_type == 'plot': + output_factory = PlotTimeseriesFactory() + elif output_type == 'temperature': + if output_stream is not None: + output_factory = temperature.StreamTEMPFactory( + **output_factory_args) + elif output_url is not None: + output_factory = temperature.TEMPFactory( + **output_factory_args) + elif output_type == 'vbf' or output_type == 'binlog': + if output_stream is not None: + output_factory = vbf.StreamVBFFactory( + output=output_type, + **output_factory_args) + elif output_url is not None: + output_factory = vbf.VBFFactory( + output=output_type, + **output_factory_args) + return output_factory + + def main(args): """command line factory for geomag algorithms @@ -256,161 +412,68 @@ def main(args): with instantiated I/O factories, and algorithm(s) """ - # Input Factory + # TODO: remove argument mapping in future version + # map legacy input arguments if args.input_edge is not None: - inputfactory = edge.EdgeFactory( - host=args.input_edge, - port=args.input_edge_port, - observatory=args.observatory, - type=args.type, - interval=args.interval, - locationCode=args.locationcode) + args.input = 'edge' + args.input_host = args.input_edge + args.input_port = args.input_edge_port elif args.input_iaga_file is not None: - inputfactory = iaga2002.StreamIAGA2002Factory( - stream=open(args.input_iaga_file, 'r'), - observatory=args.observatory, - type=args.type, - interval=args.interval) - # TODO remove magweb option - elif args.input_iaga_magweb: - inputfactory = iaga2002.MagWebFactory( - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.input = 'iaga2002' + args.input_file = args.input_iaga_file elif args.input_iaga_stdin: - inputfactory = iaga2002.StreamIAGA2002Factory( - stream=sys.stdin, - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.input = 'iaga2002' + args.input_stdin = True elif args.input_iaga_url is not None: - inputfactory = iaga2002.IAGA2002Factory( - urlTemplate=args.input_iaga_url, - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.input = 'iaga2002' + args.input_url = args.input_iaga_url elif args.input_imfv283_file is not None: - inputfactory = imfv283.StreamIMFV283Factory( - stream=open(args.input_imfv283_file, 'r'), - observatory=args.observatory) - elif args.input_imfv283_goes: - inputfactory = imfv283.GOESIMFV283Factory( - directory=args.input_goes_directory, - getdcpmessages=args.input_goes_getdcpmessages, - observatory=args.observatory, - server=args.input_goes_server, - user=args.input_goes_user) - elif args.input_imfv283_stdin: - inputfactory = imfv283.StreamIMFV283Factory( - stream=sys.stdin, - observatory=args.observatory) + args.input = 'imfv283' + args.input_file = args.input_imfv283_file elif args.input_imfv283_url is not None: - inputfactory = imfv283.IMFV283Factory( - urlTemplate=args.input_imfv283_url, - observatory=args.observatory) - elif args.input_pcdcp_file is not None: - inputfactory = pcdcp.StreamPCDCPFactory( - stream=open(args.input_pcdcp_file, 'r'), - observatory=args.observatory, - type=args.type, - interval=args.interval) - elif args.input_pcdcp_stdin: - inputfactory = pcdcp.StreamPCDCPFactory( - stream=sys.stdin, - observatory=args.observatory, - type=args.type, - interval=args.interval) - elif args.input_pcdcp_url is not None: - inputfactory = pcdcp.PCDCPFactory( - urlTemplate=args.input_pcdcp_url, - observatory=args.observatory, - type=args.type, - interval=args.interval) - else: - print >> sys.stderr, 'Missing required input directive.' - - # Output Factory - if args.output_iaga_file is not None: - outputfactory = iaga2002.StreamIAGA2002Factory( - stream=open(args.output_iaga_file, 'wb'), - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.input = 'imfv283' + args.input_url = args.input_imfv283_url + elif args.input_imfv283_goes: + args.input = 'goes' + # map legacy output arguments + if args.output_edge is not None: + args.output = 'edge' + args.output_host = args.output_edge + args.output_port = args.edge_write_port + elif args.output_iaga_file is not None: + args.output = 'iaga2002' + args.output_file = args.output_iaga_file elif args.output_iaga_stdout: - outputfactory = iaga2002.StreamIAGA2002Factory( - stream=sys.stdout, - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.output = 'iaga2002' + args.output_stdout = True elif args.output_iaga_url is not None: - outputfactory = iaga2002.IAGA2002Factory( - urlTemplate=args.output_iaga_url, - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.output = 'iaga2002' + args.output_url = args.output_iaga_url elif args.output_pcdcp_file is not None: - outputfactory = pcdcp.StreamPCDCPFactory( - stream=open(args.output_pcdcp_file, 'wb'), - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.output = 'pcdcp' + args.output_file = args.output_pcdcp_file elif args.output_pcdcp_stdout: - outputfactory = pcdcp.StreamPCDCPFactory( - stream=sys.stdout, - observatory=args.observatory, - type=args.type, - interval=args.interval) + args.output = 'pcdcp' + args.output_stdout = True elif args.output_pcdcp_url is not None: - outputfactory = pcdcp.PCDCPFactory( - urlTemplate=args.output_pcdcp_url, - observatory=args.observatory, - type=args.type, - interval=args.interval) - elif args.output_edge is not None: - locationcode = args.outlocationcode or args.locationcode or None - outputfactory = edge.EdgeFactory( - host=args.output_edge, - port=args.output_edge_read_port, - write_port=args.edge_write_port, - observatory=args.observatory, - type=args.type, - interval=args.interval, - locationCode=locationcode, - tag=args.output_edge_tag, - forceout=args.output_edge_forceout) + args.output = 'pcdcp' + args.output_url = args.output_pcdcp_url + elif args.output_plot: + args.output = 'plot' elif args.output_temperature_file is not None: - outputfactory = temperature.StreamTEMPFactory( - stream=open(args.output_temperature_file, 'wb'), - observatory=args.observatory, - type=args.type, - interval=args.interval) - # VBF includes flag for writing vbf file vs bin-change log - elif args.output_vbf_file is not None: - outputfactory = vbf.StreamVBFFactory( - stream=open(args.output_vbf_file, 'wb'), - observatory=args.observatory, - type=args.type, - interval=args.interval, - output='vbf') - elif args.output_binlog_file is not None: - outputfactory = vbf.StreamVBFFactory( - stream=open(args.output_binlog_file, 'wb'), - observatory=args.observatory, - type=args.type, - interval=args.interval, - output='binlog') + args.output = 'temperature' + args.output_file = args.output_temperature_file - elif args.output_plot: - outputfactory = PlotTimeseriesFactory() - else: - print >> sys.stderr, "Missing required output directive" + # TODO check for unused arguments. + # create controller + input_factory = get_input_factory(args) + output_factory = get_output_factory(args) algorithm = algorithms[args.algorithm]() algorithm.configure(args) + controller = Controller(input_factory, output_factory, algorithm) - # TODO check for unused arguments. - - if (args.realtime): + if args.realtime: now = UTCDateTime() args.endtime = UTCDateTime(now.year, now.month, now.day, now.hour, now.minute) @@ -420,8 +483,6 @@ def main(args): args.starttime = args.endtime - 600 print args.starttime, args.endtime - controller = Controller(inputfactory, outputfactory, algorithm) - if args.update: controller.run_as_update(args) else: @@ -501,21 +562,25 @@ def parse_args(args): parser.add_argument('--input-edge-port', type=int, default=2060, - help='Input port # for edge input, defaults to 2060') + help='deprecated. \ + Input port # for edge input, defaults to 2060') parser.add_argument('--output-edge-port', type=int, dest='edge_write_port', default=7981, - help='Edge port for writing realtime data, defaults to 7981') + help='deprecated. \ + Edge port for writing realtime data, defaults to 7981') parser.add_argument('--output-edge-cwb-port', type=int, dest='edge_write_port', default='7981', - help='Edge port for writing older data. Not used by geomag.') + help='deprecated. \ + Edge port for writing older data. Not used by geomag.') parser.add_argument('--output-edge-read-port', type=int, default=2060, - help='Edge port for reading output data, defaults to 2060') + help='deprecated. \ + Edge port for reading output data, defaults to 2060') parser.add_argument('--output-edge-tag', default='GEOMAG', help='ID Tag for edge connections, defaults to GEOMAG') @@ -544,71 +609,120 @@ def parse_args(args): # Input group input_group = parser.add_mutually_exclusive_group(required=True) + input_group.add_argument('--input', + help='Input format', + choices=( + 'edge', + 'goes', + 'iaga2002', + 'imfv283', + 'pcdcp')) + + parser.add_argument('--input-file', + help='Read from specified file') + parser.add_argument('--input-host', + default='cwbpub.cr.usgs.gov', + help='Hostname or IP address') + parser.add_argument('--input-port', + default=2060, + help='Port number', + type=int) + parser.add_argument('--input-url', + help='Read from a url pattern') + parser.add_argument('--input-url-interval', + default=86400, + help='Read url interval in seconds', + type=int) + input_group.add_argument('--input-edge', - help='Host IP #, see --input-edge-port for optional args') + help='deprecated. \ + Host IP #, see --input-edge-port for optional args') input_group.add_argument('--input-iaga-file', - help='Reads from the specified file.') - input_group.add_argument('--input-iaga-magweb', - action='store_true', - default=False, - help='Indicates iaga2002 files will be read from \ - http://magweb.cr.usgs.gov/data/magnetometer/') + help='deprecated. Reads from the specified file.') input_group.add_argument('--input-iaga-stdin', action='store_true', default=False, - help='Pass in an iaga file using redirection from stdin.') + help='deprecated. \ + Pass in an iaga file using redirection from stdin.') input_group.add_argument('--input-iaga-url', - help='Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s') + help='deprecated. \ + Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s') input_group.add_argument('--input-imfv283-file', - help='Reads from the specified file.') + help='deprecated. Reads from the specified file.') input_group.add_argument('--input-imfv283-stdin', action='store_true', default=False, - help='Pass in a file using redirection from stdin') + help='deprecated. \ + Pass in a file using redirection from stdin') input_group.add_argument('--input-imfv283-url', - help='Example file://./') + help='deprecated. Example file://./') input_group.add_argument('--input-imfv283-goes', action='store_true', default=False, - help='Retrieves data directly from a goes server to read') + help='deprecated. \ + Retrieves data directly from a goes server to read') input_group.add_argument('--input-pcdcp-file', - help='Reads from the specified file.') + help='deprecated. Reads from the specified file.') input_group.add_argument('--input-pcdcp-stdin', action='store_true', default=False, - help='Pass in an pcdcp file using redirection from stdin.') + help='deprecated. \ + Pass in an pcdcp file using redirection from stdin.') input_group.add_argument('--input-pcdcp-url', - help='Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s') + help='deprecated. Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s') # Output group output_group = parser.add_mutually_exclusive_group(required=True) - output_group.add_argument('--output-temperature-file', - help='Write to a single temperature/battery file.') - output_group.add_argument('--output-vbf-file', - help='Write to a single voltage-bin file.') - output_group.add_argument('--output-binlog-file', - help='Write to a single bin-change log file.') output_group.add_argument('--output-iaga-file', - help='Write to a single iaga file.') + help='deprecated. Write to a single iaga file.') output_group.add_argument('--output-iaga-stdout', action='store_true', default=False, - help='Write to stdout.') + help='deprecated. Write to stdout.') output_group.add_argument('--output-iaga-url', - help='Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s') + help='deprecated. \ + Example: file://./%%(obs)s%%(ymd)s%%(t)s%%(i)s.%%(i)s') output_group.add_argument('--output-pcdcp-file', - help='Write to a single pcdcp file.') + help='deprecated. Write to a single pcdcp file.') output_group.add_argument('--output-pcdcp-stdout', action='store_true', default=False, - help='Write to stdout.') + help='deprecated. Write to stdout.') output_group.add_argument('--output-pcdcp-url', - help='Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s') + help='deprecated. Example: file://./%%(obs)s%%(Y)s%%(j)s.%%(i)s') output_group.add_argument('--output-edge', - help='Edge IP #. See --output-edge-* for other optional arguments') + help='deprecated. \ + Edge IP #. See --output-edge-* for other optional arguments') output_group.add_argument('--output-plot', action='store_true', default=False, - help='Plot the algorithm output using matplotlib') + help='deprecated. Plot the algorithm output using matplotlib') + + # output arguments + output_group.add_argument('--output', + choices=( + 'binlog', + 'edge', + 'iaga2002', + 'pcdcp', + 'plot', + 'temperature', + 'vbf' + ), + # TODO: set default to 'iaga2002' + help='Output format') + + parser.add_argument('--output-file', + help='Write to specified file') + parser.add_argument('--output-stdout', + action='store_true', + default=False, + help='Write to standard output') + parser.add_argument('--output-url', + help='Write to a file:// url pattern') + parser.add_argument('--output-url-interval', + default=86400, + help='Output interval in seconds', + type=int) # Algorithms group parser.add_argument('--algorithm', -- GitLab