diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 370170a9b4eb8a9809d3577c8e900f26958d0dc2..0f04652cbf40a36c6ec7748e004ecb7b47ca9f3f 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -6,18 +6,18 @@ import sys from obspy.core import Stream, UTCDateTime from algorithm import algorithms from PlotTimeseriesFactory import PlotTimeseriesFactory +from StreamTimeseriesFactory import StreamTimeseriesFactory import TimeseriesUtility +# factory packages +import binlog import edge import iaga2002 import pcdcp import imfv122 import imfv283 - -# factories for new filetypes import temperature import vbf -import binlog class Controller(object): @@ -243,16 +243,16 @@ class Controller(object): if output_gap[0] == options.starttime: # found fillable gap at start, recurse to previous interval interval = options.endtime - options.starttime - starttime = options.starttime - interval - 1 + starttime = options.starttime - interval endtime = options.starttime - 1 options.starttime = starttime options.endtime = endtime self.run_as_update(options, update_count + 1) # fill gap - print >> sys.stderr, 'processing', \ - options.starttime, options.endtime options.starttime = output_gap[0] options.endtime = output_gap[1] + print >> sys.stderr, 'processing', \ + options.starttime, options.endtime self.run(options) @@ -272,7 +272,6 @@ def get_input_factory(args): input_factory = None input_factory_args = None input_stream = None - input_url = None # standard arguments input_factory_args = {} @@ -282,14 +281,11 @@ def get_input_factory(args): # stream/url arguments if args.input_file is not None: input_stream = open(args.input_file, 'r') - input_factory_args['stream'] = input_stream elif args.input_stdin: input_stream = sys.stdin - input_factory_args['stream'] = input_stream elif args.input_url is not None: - input_url = args.input_url input_factory_args['urlInterval'] = args.input_url_interval - input_factory_args['urlTemplate'] = input_url + input_factory_args['urlTemplate'] = args.input_url input_type = args.input if input_type == 'edge': @@ -306,34 +302,21 @@ def get_input_factory(args): server=args.input_goes_server, user=args.input_goes_user, **input_factory_args) - elif input_type == 'iaga2002': - if input_stream is not None: - input_factory = iaga2002.StreamIAGA2002Factory( - **input_factory_args) - elif input_url is not None: - input_factory = iaga2002.IAGA2002Factory( - **input_factory_args) - elif input_type == 'imfv122': - if input_stream is not None: - input_factory = imfv122.StreamIMFV122Factory( - **input_factory_args) - elif input_url is not None: - input_factory = imfv122.IMFV122Factory( - **input_factory_args) - elif input_type == 'imfv283': - if input_stream is not None: - input_factory = imfv283.StreamIMFV283Factory( - **input_factory_args) - elif input_url is not None: - input_factory = imfv283.IMFV283Factory( - **input_factory_args) - elif input_type == 'pcdcp': + else: + # stream compatible factories + if input_type == 'iaga2002': + input_factory = iaga2002.IAGA2002Factory(**input_factory_args) + elif input_type == 'imfv122': + input_factory = imfv122.IMFV122Factory(**input_factory_args) + elif input_type == 'imfv283': + input_factory = imfv283.IMFV283Factory(**input_factory_args) + elif input_type == 'pcdcp': + input_factory = pcdcp.PCDCPFactory(**input_factory_args) + # wrap stream if input_stream is not None: - input_factory = pcdcp.StreamPCDCPFactory( - **input_factory_args) - elif input_url is not None: - input_factory = pcdcp.PCDCPFactory( - **input_factory_args) + input_factory = StreamTimeseriesFactory( + factory=input_factory, + stream=input_stream) return input_factory @@ -363,10 +346,8 @@ def get_output_factory(args): # stream/url arguments if args.output_file is not None: output_stream = open(args.output_file, 'wb') - output_factory_args['stream'] = output_stream elif args.output_stdout: output_stream = sys.stdout - output_factory_args['stream'] = output_stream elif args.output_url is not None: output_url = args.output_url output_factory_args['urlInterval'] = args.output_url_interval @@ -384,41 +365,25 @@ def get_output_factory(args): tag=args.output_edge_tag, forceout=args.output_edge_forceout, **output_factory_args) - elif output_type == 'iaga2002': - if output_stream is not None: - output_factory = iaga2002.StreamIAGA2002Factory( - **output_factory_args) - elif output_url is not None: - output_factory = iaga2002.IAGA2002Factory( - **output_factory_args) - elif output_type == 'pcdcp': - if output_stream is not None: - output_factory = pcdcp.StreamPCDCPFactory( - **output_factory_args) - elif output_url is not None: - output_factory = pcdcp.PCDCPFactory( - **output_factory_args) elif output_type == 'plot': output_factory = PlotTimeseriesFactory() - elif output_type == 'temperature': - if output_stream is not None: - output_factory = temperature.StreamTEMPFactory( - **output_factory_args) - elif output_url is not None: - output_factory = temperature.TEMPFactory( - **output_factory_args) - elif output_type == 'vbf': - if output_stream is not None: - output_factory = vbf.StreamVBFFactory( - **output_factory_args) - elif output_type == 'binlog': + else: + # stream compatible factories + if output_type == 'binlog': + output_factory = binlog.BinLogFactory(**output_factory_args) + elif output_type == 'iaga2002': + output_factory = iaga2002.IAGA2002Factory(**output_factory_args) + elif output_type == 'pcdcp': + output_factory = pcdcp.PCDCPFactory(**output_factory_args) + elif output_type == 'temperature': + output_factory = temperature.TEMPFactory(**output_factory_args) + elif output_type == 'vbf': + output_factory = vbf.VBFFactory(**output_factory_args) + # wrap stream if output_stream is not None: - output_factory = binlog.StreamBinLogFactory( - **output_factory_args) - elif output_url is not None: - output_factory = binlog.BinLogFactory( - output=output_type, - **output_factory_args) + output_factory = StreamTimeseriesFactory( + factory=output_factory, + stream=output_stream) return output_factory diff --git a/geomagio/StreamTimeseriesFactory.py b/geomagio/StreamTimeseriesFactory.py new file mode 100644 index 0000000000000000000000000000000000000000..fb7f00ae1124831773e4fbd18497251b34732550 --- /dev/null +++ b/geomagio/StreamTimeseriesFactory.py @@ -0,0 +1,43 @@ +"""Stream wrapper for TimeseriesFactory.""" + +from TimeseriesFactory import TimeseriesFactory + + +class StreamTimeseriesFactory(TimeseriesFactory): + """Timeseries Factory for streams. + normally stdio. + + Parameters + ---------- + factory: geomagio.TimeseriesFactory + wrapped factory. + stream: file object + io stream, normally either a file, or stdio + + See Also + -------- + Timeseriesfactory + """ + def __init__(self, factory, stream): + self.factory = factory + self.stream = stream + self.stream_data = None + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Get timeseries using stream as input. + """ + if self.stream_data is None: + # only read stream once + self.stream_data = self.stream.read() + return self.factory.parse_string( + data=self.stream_data, + starttime=starttime, + endtime=endtime, + observatory=observatory) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Put timeseries using stream as output. + """ + self.factory.write_file(self.stream, timeseries, channels) diff --git a/geomagio/TimeseriesFactory.py b/geomagio/TimeseriesFactory.py index 0b5ed6020f705cb71e004d7587e6fea8674de7f6..abef4d15b25d516e5e61a01a7049a24fd4beb398 100644 --- a/geomagio/TimeseriesFactory.py +++ b/geomagio/TimeseriesFactory.py @@ -1,4 +1,5 @@ """Abstract Timeseries Factory Interface.""" +import numpy import obspy.core import os import sys @@ -119,8 +120,18 @@ class TimeseriesFactory(object): except Exception as e: print >> sys.stderr, "Error parsing data: " + str(e) print >> sys.stderr, data + if channels is not None: + filtered = obspy.core.Stream() + for channel in channels: + filtered += timeseries.select(channel=channel) + timeseries = filtered timeseries.merge() - timeseries.trim(starttime, endtime) + timeseries.trim( + starttime=starttime, + endtime=endtime, + nearest_sample=False, + pad=True, + fill_value=numpy.nan) return timeseries def parse_string(self, data, **kwargs): @@ -166,6 +177,9 @@ class TimeseriesFactory(object): TimeseriesFactoryException if any errors occur. """ + if len(timeseries) == 0: + # no data to put + return if not self.urlTemplate.startswith('file://'): raise TimeseriesFactoryException('Only file urls are supported') channels = channels or self.channels @@ -218,12 +232,6 @@ class TimeseriesFactory(object): except NotImplementedError: # factory only supports output pass - except Exception as e: - print >> sys.stderr, \ - 'Unable to merge with existing data.' + \ - '\nfile={}' + \ - '\nerror={}'.format(url_file, str(e)) - raise e with open(url_file, 'wb') as fh: try: self.write_file(fh, url_data, channels) diff --git a/geomagio/Util.py b/geomagio/Util.py index e5404ae45b5ef556e894e1d7309b51b15aac77c0..e9d8ca5c3364125b920f54fd6696c38a2919bc1f 100644 --- a/geomagio/Util.py +++ b/geomagio/Util.py @@ -143,7 +143,7 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300): Raises ------ - urllib2.URLError + IOError if any occurs """ try: @@ -167,6 +167,8 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300): curl.setopt(pycurl.WRITEFUNCTION, out.write) curl.perform() content = out.getvalue() + except pycurl.error as e: + raise IOError(e.args) finally: curl.close() return content diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index e9e43c274abc66a75973e81c58de512b33abe740..2198c6062f7b02a1b36e5a28d65a08a81d35feb4 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -15,8 +15,12 @@ import numpy import numpy.ma import obspy.core from datetime import datetime -from obspy import earthworm -from obspy.core import UTCDateTime +try: + # obspy 1.x + from obspy.clients import earthworm +except: + # obspy 0.x + from obspy import earthworm from .. import ChannelConverter, TimeseriesUtility from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException @@ -209,8 +213,8 @@ class EdgeFactory(TimeseriesFactory): Notes: the original timeseries object is changed. """ for trace in timeseries: - trace_starttime = UTCDateTime(trace.stats.starttime) - trace_endtime = UTCDateTime(trace.stats.endtime) + trace_starttime = obspy.core.UTCDateTime(trace.stats.starttime) + trace_endtime = obspy.core.UTCDateTime(trace.stats.endtime) if trace.stats.starttime > starttime: cnt = int((trace_starttime - starttime) / trace.stats.delta) @@ -530,7 +534,7 @@ class EdgeFactory(TimeseriesFactory): type, interval) edge_channel = self._get_edge_channel(observatory, channel, type, interval) - data = self.client.getWaveform(network, station, location, + data = self.client.get_waveforms(network, station, location, edge_channel, starttime, endtime) data.merge() if data.count() == 0: @@ -553,11 +557,11 @@ class EdgeFactory(TimeseriesFactory): Returns ------- tuple: (starttime, endtime) - starttime: UTCDateTime - endtime: UTCDateTime + starttime: obspy.core.UTCDateTime + endtime: obspy.core.UTCDateTime """ - starttime = UTCDateTime(datetime.now()) - endtime = UTCDateTime(0) + starttime = obspy.core.UTCDateTime(datetime.now()) + endtime = obspy.core.UTCDateTime(0) for trace in timeseries: if trace.stats.starttime < starttime: starttime = trace.stats.starttime @@ -575,7 +579,7 @@ class EdgeFactory(TimeseriesFactory): Parameters ---------- timeseries: obspy.core.stream - The timeseries stream as returned by the call to getWaveform + The timeseries stream as returned by the call to get_waveforms starttime: obspy.core.UTCDateTime the starttime of the requested data endtime: obspy.core.UTCDateTime @@ -614,8 +618,8 @@ class EdgeFactory(TimeseriesFactory): data type. interval: {'daily', 'hourly', 'minute', 'second'} data interval. - starttime: UTCDateTime - endtime: UTCDateTime + starttime: obspy.core.UTCDateTime + endtime: obspy.core.UTCDateTime Notes ----- @@ -630,7 +634,7 @@ class EdgeFactory(TimeseriesFactory): edge_channel = self._get_edge_channel(observatory, channel, type, interval) - now = UTCDateTime(datetime.utcnow()) + now = obspy.core.UTCDateTime(datetime.utcnow()) if ((now - endtime) > 864000) and (self.cwbport > 0): host = self.cwbhost port = self.cwbport diff --git a/geomagio/imfv122/IMFV122Parser.py b/geomagio/imfv122/IMFV122Parser.py index 1c92647226e0b05b575b4b4694878b2907c76b24..2a496728db8a66822f630b305838d52fe23a6699 100644 --- a/geomagio/imfv122/IMFV122Parser.py +++ b/geomagio/imfv122/IMFV122Parser.py @@ -100,7 +100,7 @@ class IMFV122Parser(object): dayminute = int(start) hour = int(dayminute / 60) minute = dayminute % 60 - self._delta = 1 + self._delta = 60 self._nexttime = UTCDateTime( year=year, julday=julday, diff --git a/setup.py b/setup.py index 3d803c6d97f332977defc8809e6433ef742e459c..796e3e841f5bdaf5a1b5a264596fe829291650d4 100644 --- a/setup.py +++ b/setup.py @@ -2,16 +2,20 @@ from distutils.core import setup setup( name='geomag-algorithms', - version='0.0.0', + version='0.2.0', description='USGS Geomag IO Library', url='https://github.com/usgs/geomag-algorithms', packages=[ 'geomagio', 'geomagio.algorithm', + 'geomagio.binlog', + 'geomagio.edge', 'geomagio.iaga2002', + 'geomagio.imfv122', 'geomagio.imfv283', - 'geomagio.edge', - 'geomagio.pcdcp' + 'geomagio.pcdcp', + 'geomagio.temperature', + 'geomagio.vbf' ], install_requires=[ 'numpy', diff --git a/test/imfv122_test/IMFV122Parser_test.py b/test/imfv122_test/IMFV122Parser_test.py index 573a0f4a8623a972fe92b8f2657fa9616f686132..ad5a50f2932d98569b841731c20094cefe155013 100644 --- a/test/imfv122_test/IMFV122Parser_test.py +++ b/test/imfv122_test/IMFV122Parser_test.py @@ -5,7 +5,7 @@ from geomagio.imfv122 import IMFV122Parser from obspy.core import UTCDateTime -def test_imfv122_parse_header__minutes(): +def test_imfv122_parse_header__hour_of_day(): """imfv122_test.test_imfv122_parse_header__minutes. """ parser = IMFV122Parser() @@ -21,7 +21,7 @@ def test_imfv122_parse_header__minutes(): assert_equals(parser._nexttime, UTCDateTime('2016-05-02T03:00:00Z')) -def test_imfv122_parse_header__seconds(): +def test_imfv122_parse_header__minute_of_day(): """imfv122_test.test_imfv122_parse_header__seconds. """ parser = IMFV122Parser() @@ -33,7 +33,7 @@ def test_imfv122_parse_header__seconds(): assert_equals(metadata['geodetic_latitude'], 124.4) assert_equals(metadata['geodetic_longitude'], 19.2) assert_equals(metadata['station'], 'HER') - assert_equals(parser._delta, 1) + assert_equals(parser._delta, 60) assert_equals(parser._nexttime, UTCDateTime('2016-01-01T02:03:00Z')) @@ -44,12 +44,14 @@ def test_imfv122_parse_data(): parser._parse_header( 'HER JAN0116 001 0123 HDZF R EDI 12440192 -14161 DRRRRRRRRRRRRRRR') parser._parse_data('1234 5678 9101 1121 3141 5161 7181 9202') + import pprint + pprint.pprint(parser._parsedata) assert_equals(parser._parsedata[0][0], UTCDateTime('2016-01-01T02:03:00Z')) assert_equals(parser._parsedata[1][0], '1234') assert_equals(parser._parsedata[2][0], '5678') assert_equals(parser._parsedata[3][0], '9101') assert_equals(parser._parsedata[4][0], '1121') - assert_equals(parser._parsedata[0][1], UTCDateTime('2016-01-01T02:03:01Z')) + assert_equals(parser._parsedata[0][1], UTCDateTime('2016-01-01T02:04:00Z')) assert_equals(parser._parsedata[1][1], '3141') assert_equals(parser._parsedata[2][1], '5161') assert_equals(parser._parsedata[3][1], '7181') @@ -69,7 +71,7 @@ def test_imfv122_post_process(): assert_equals(parser.data['D'][0], 56.78) assert_equals(parser.data['Z'][0], 910.1) assert_equals(parser.data['F'][0], 112.1) - assert_equals(parser.times[1], UTCDateTime('2016-01-01T02:03:01Z')) + assert_equals(parser.times[1], UTCDateTime('2016-01-01T02:04:00Z')) assert_equals(parser.data['H'][1], 314.1) assert_equals(parser.data['D'][1], 51.61) assert_equals(parser.data['Z'][1], 718.1)