diff --git a/geomagio/Controller.py b/geomagio/Controller.py index a698279eab185901b23ae8f823d33e0c112cf925..df3b841951608a4da5310954bbf88b82b57b0851 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -71,7 +71,7 @@ class Controller(object): timeseries : obspy.core.Stream """ timeseries = Stream() - for obs in list(observatory): + for obs in observatory: # get input interval for observatory # do this per observatory in case an # algorithm needs different amounts of data @@ -130,7 +130,7 @@ class Controller(object): timeseries : obspy.core.Stream """ timeseries = Stream() - for obs in list(observatory): + for obs in observatory: timeseries += self._outputFactory.get_timeseries( observatory=obs, starttime=starttime, @@ -203,6 +203,8 @@ class Controller(object): if options.update_limit != 0: if update_count >= options.update_limit: return + print >> sys.stderr, 'checking gaps', \ + options.starttime, options.endtime algorithm = self._algorithm input_channels = options.inchannels or \ algorithm.get_input_channels() @@ -214,10 +216,17 @@ class Controller(object): starttime=options.starttime, endtime=options.endtime, channels=output_channels) - delta = output_timeseries[0].stats.delta - # find gaps in output, so they can be updated - output_gaps = TimeseriesUtility.get_merged_gaps( - TimeseriesUtility.get_stream_gaps(output_timeseries)) + if len(output_timeseries) > 0: + # find gaps in output, so they can be updated + output_gaps = TimeseriesUtility.get_merged_gaps( + TimeseriesUtility.get_stream_gaps(output_timeseries)) + else: + output_gaps = [[ + options.starttime, + options.endtime, + # next sample time not used + None + ]] for output_gap in output_gaps: input_timeseries = self._get_input_timeseries( observatory=options.observatory, @@ -233,12 +242,14 @@ class Controller(object): if output_gap[0] == options.starttime: # found fillable gap at start, recurse to previous interval interval = options.endtime - options.starttime - starttime = options.starttime - interval - delta - endtime = options.starttime - delta + starttime = options.starttime - interval - 1 + endtime = options.starttime - 1 options.starttime = starttime options.endtime = endtime self.run_as_update(options, update_count + 1) # fill gap + print >> sys.stderr, 'processing', \ + options.starttime, options.endtime options.starttime = output_gap[0] options.endtime = output_gap[1] self.run(options) @@ -490,6 +501,27 @@ def main(args): ' please update your usage' # TODO check for unused arguments. + # make sure observatory is a tuple + if isinstance(args.observatory, (str, unicode)): + args.observatory = (args.observatory,) + + if args.observatory_foreach: + observatory = args.observatory + for obs in observatory: + args.observatory = (obs,) + _main(args) + else: + _main(args) + + +def _main(args): + """Actual main method logic, called by main + + Parameters + ---------- + args : argparse.Namespace + command line arguments + """ # create controller input_factory = get_input_factory(args) output_factory = get_output_factory(args) @@ -505,7 +537,6 @@ def main(args): args.starttime = args.endtime - 3600 else: args.starttime = args.endtime - 600 - print args.starttime, args.endtime if args.update: controller.run_as_update(args) @@ -539,12 +570,18 @@ def parse_args(args): help='UTC date YYYY-MM-DD HH:MM:SS') parser.add_argument('--observatory', + default=(None,), help='Observatory code ie BOU, CMO, etc.' + ' CAUTION: Using multiple observatories is not' + ' recommended in most cases; especially with' + ' single observatory formats like IAGA and PCDCP.', nargs='*', type=str) + parser.add_argument('--observatory-foreach', + action='store_true', + default=False, + help='When specifying multiple observatories, process' + ' each observatory separately') parser.add_argument('--inchannels', nargs='*', help='Channels H, E, Z, etc') diff --git a/geomagio/TimeseriesFactory.py b/geomagio/TimeseriesFactory.py index fb0ba2f6404ecf8ac21eea7b68828c77b67931a9..0b5ed6020f705cb71e004d7587e6fea8674de7f6 100644 --- a/geomagio/TimeseriesFactory.py +++ b/geomagio/TimeseriesFactory.py @@ -3,6 +3,7 @@ import obspy.core import os import sys from TimeseriesFactoryException import TimeseriesFactoryException +import TimeseriesUtility import Util @@ -103,7 +104,10 @@ class TimeseriesFactory(object): type=type, interval=interval, channels=channels) - data = Util.read_url(url) + try: + data = Util.read_url(url) + except IOError as e: + continue try: timeseries += self.parse_string(data, observatory=observatory, @@ -189,6 +193,37 @@ class TimeseriesFactory(object): # subtract delta to omit the sample at end: `[start, end)` endtime=(urlInterval['end'] - delta)) url_file = Util.get_file_from_url(url, createParentDirectory=True) + # existing data file, merge new data into existing + if os.path.isfile(url_file): + try: + existing_data = Util.read_file(url_file) + existing_data = self.parse_string(existing_data, + observatory=url_data[0].stats.station, + type=type, + interval=interval, + channels=channels) + # TODO: make parse_string return the correct location code + for trace in existing_data: + # make location codes match, just in case + new_trace = url_data.select( + network=trace.stats.network, + station=trace.stats.station, + channel=trace.stats.channel)[0] + trace.stats.location = new_trace.stats.location + url_data = TimeseriesUtility.merge_streams( + existing_data, url_data) + except IOError: + # no data yet + pass + except NotImplementedError: + # factory only supports output + pass + except Exception as e: + print >> sys.stderr, \ + 'Unable to merge with existing data.' + \ + '\nfile={}' + \ + '\nerror={}'.format(url_file, str(e)) + raise e with open(url_file, 'wb') as fh: try: self.write_file(fh, url_data, channels) diff --git a/geomagio/TimeseriesUtility.py b/geomagio/TimeseriesUtility.py index e547e63c9578b2ff053ad45ddb5e12841dddd3fa..1e9061c0b8b3108271a09468612534a6f3238379 100644 --- a/geomagio/TimeseriesUtility.py +++ b/geomagio/TimeseriesUtility.py @@ -1,5 +1,6 @@ """Timeseries Utilities""" import numpy +import obspy.core def get_stream_gaps(stream): @@ -129,3 +130,78 @@ def get_channels(stream): if channel: channels[channel] = True return [ch for ch in channels] + + +def mask_stream(stream): + """Convert stream traces to masked arrays. + + Parameters + ---------- + stream : obspy.core.Stream + stream to mask + + Returns + ------- + obspy.core.Stream + stream with new Trace objects with numpy masked array data. + """ + masked = obspy.core.Stream() + for trace in stream: + masked += obspy.core.Trace( + numpy.ma.masked_invalid(trace.data), + trace.stats) + return masked + + +def unmask_stream(stream): + """Convert stream traces to unmasked arrays. + + Parameters + ---------- + stream : obspy.core.Stream + stream to unmask + + Returns + ------- + obspy.core.Stream + stream with new Trace objects with numpy array data, with numpy.nan + as a fill value in a filled array. + """ + unmasked = obspy.core.Stream() + for trace in stream: + unmasked += obspy.core.Trace( + trace.data.filled(fill_value=numpy.nan) + if isinstance(trace.data, numpy.ma.MaskedArray) + else trace.data, + trace.stats) + return unmasked + + +def merge_streams(*streams): + """Merge one or more streams. + + Parameters + ---------- + *streams : obspy.core.Stream + one or more streams to merge + + Returns + ------- + obspy.core.Stream + stream with contiguous traces merged, and gaps filled with numpy.nan + """ + merged = obspy.core.Stream() + # add unmasked, split traces to be merged + for stream in streams: + merged += mask_stream(stream) + # split traces that contain gaps + merged = merged.split() + # merge data + merged.merge( + # 1 = do not interpolate + interpolation_samples=1, + # 1 = when there is overlap, use data from trace with last endtime + method=1) + # convert back to NaN filled array + merged = unmask_stream(merged) + return merged diff --git a/geomagio/Util.py b/geomagio/Util.py index cd92c247278bf34c53ee36bf523c929be33720a5..e5404ae45b5ef556e894e1d7309b51b15aac77c0 100644 --- a/geomagio/Util.py +++ b/geomagio/Util.py @@ -3,7 +3,6 @@ import numpy import os from obspy.core import Stats, Trace from StringIO import StringIO -import sys class ObjectView(object): @@ -105,6 +104,30 @@ def get_intervals(starttime, endtime, size=86400, align=True, trim=False): return intervals +def read_file(filepath): + """Open and read file contents. + + Parameters + ---------- + filepath : str + path to a file + + Returns + ------- + str + contents of file + + Raises + ------ + IOError + if file does not exist + """ + file_data = None + with open(filepath, 'r') as f: + file_data = f.read() + return file_data + + def read_url(url, connect_timeout=15, max_redirects=5, timeout=300): """Open and read url contents. @@ -123,9 +146,17 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300): urllib2.URLError if any occurs """ + try: + # short circuit file urls + filepath = get_file_from_url(url) + return read_file(filepath) + except IOError as e: + raise e + except Exception: + pass content = None - curl = pycurl.Curl() out = StringIO() + curl = pycurl.Curl() try: curl.setopt(pycurl.FOLLOWLOCATION, 1) curl.setopt(pycurl.MAXREDIRS, max_redirects) @@ -136,9 +167,6 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300): curl.setopt(pycurl.WRITEFUNCTION, out.write) curl.perform() content = out.getvalue() - except Exception as e: - print >> sys.stderr, "Error reading url: " + str(e) - print >> sys.stderr, url finally: curl.close() return content diff --git a/geomagio/iaga2002/IAGA2002Factory.py b/geomagio/iaga2002/IAGA2002Factory.py index 0faadc7e7c7e2d6aa9ad3ad620ee5b9e44e9f148..cc575782de5a7f59825dc6d11c795628a3f04c0d 100644 --- a/geomagio/iaga2002/IAGA2002Factory.py +++ b/geomagio/iaga2002/IAGA2002Factory.py @@ -34,7 +34,8 @@ class IAGA2002Factory(TimeseriesFactory): def __init__(self, **kwargs): TimeseriesFactory.__init__(self, **kwargs) - def parse_string(self, data, observatory=None, **kwargs): + def parse_string(self, data, observatory=None, interval='minute', + **kwargs): """Parse the contents of a string in the format of an IAGA2002 file. Parameters @@ -55,9 +56,18 @@ class IAGA2002Factory(TimeseriesFactory): starttime = obspy.core.UTCDateTime(parser.times[0]) endtime = obspy.core.UTCDateTime(parser.times[-1]) data = parser.data - length = len(data[data.keys()[0]]) - rate = (length - 1) / (endtime - starttime) stream = obspy.core.Stream() + length = len(data[data.keys()[0]]) + if starttime != endtime: + rate = (length - 1) / (endtime - starttime) + else: + # guess based on args + if interval == 'minute': + rate = 1 / 60 + elif interval == 'second': + rate = 1 + else: + raise Exception('one sample, and unable to guess rate') for channel in data.keys(): stats = obspy.core.Stats(metadata) stats.starttime = starttime diff --git a/geomagio/iaga2002/IAGA2002Writer.py b/geomagio/iaga2002/IAGA2002Writer.py index 10cec07de7005b003065ed5a4301bc5c350a0619..b74bcc2be109e199a57a99194639ccacc7ec3c8a 100644 --- a/geomagio/iaga2002/IAGA2002Writer.py +++ b/geomagio/iaga2002/IAGA2002Writer.py @@ -37,7 +37,7 @@ class IAGA2002Writer(object): (channel, str(TimeseriesUtility.get_channels(timeseries)))) stats = timeseries[0].stats if len(channels) != 4: - self._pad_to_four_channels(timeseries, channels) + channels = self._pad_to_four_channels(timeseries, channels) out.write(self._format_headers(stats, channels)) out.write(self._format_comments(stats)) out.write(self._format_channels(channels, stats.station)) @@ -247,10 +247,12 @@ class IAGA2002Writer(object): for val in values]) def _pad_to_four_channels(self, timeseries, channels): + padded = channels[:] for x in range(len(channels), 4): channel = self.empty_channel - channels.append(channel) + padded.append(channel) timeseries += create_empty_trace(timeseries[0], channel) + return padded @classmethod def format(self, timeseries, channels):