diff --git a/geomagio/Util.py b/geomagio/Util.py index ee2cff6a011f58cd7a77d38e43b80ef5650c0b83..a72f6a05bd8fd2fe826e6f1d80a7b7494eb833d5 100644 --- a/geomagio/Util.py +++ b/geomagio/Util.py @@ -1,5 +1,6 @@ import urllib2 import numpy +import os from obspy.core import Stats, Trace @@ -22,6 +23,86 @@ class ObjectView(object): return str(self.__dict__) +def get_file_from_url(url, createParentDirectory=False): + """Get a file for writing. + + Ensures parent directory exists. + + Parameters + ---------- + url : str + path to file + createParentDirectory : bool + whether to create parent directory if it does not exist. + useful when preparing to write to the returned file. + + Returns + ------- + str + path to file without file:// prefix + + Raises + ------ + Exception + if url does not start with file:// + """ + if not url.startswith('file://'): + raise Exception('Only file urls are supported by get_file_from_url') + filename = url.replace('file://', '') + if createParentDirectory: + parent = os.path.dirname(filename) + if not os.path.exists(parent): + os.makedirs(parent) + return filename + + +def get_intervals(starttime, endtime, size=86400, align=True, trim=False): + """Divide an interval into smaller intervals. + + Divides the interval [starttime, endtime] into chunks. + + Parameters + ---------- + starttime : obspy.core.UTCDateTime + start of time interval to divide + endtime : obspy.core.UTCDateTime + end of time interval to divide + size : int + size of each interval in seconds. + align : bool + align intervals to unix epoch. + (works best when size evenly divides a day) + trim : bool + whether to trim first/last interval to starttime and endtime. + + Returns + ------- + list<dict> + each dictionary has the keys "starttime" and "endtime" + which represent [intervalstart, intervalend). + """ + if align: + # align based on size + time = starttime - (starttime.timestamp % size) + else: + time = starttime + intervals = [] + while time < endtime: + start = time + time = time + size + end = time + if trim: + if start < starttime: + start = starttime + if end > endtime: + end = endtime + intervals.append({ + 'start': start, + 'end': end + }) + return intervals + + def read_url(url): """Open and read url contents. diff --git a/test/Util_test.py b/test/Util_test.py new file mode 100644 index 0000000000000000000000000000000000000000..01618de6a0473e0246a3e007c3bc0b5a0b287fe3 --- /dev/null +++ b/test/Util_test.py @@ -0,0 +1,66 @@ +#! /usr/bin/env python +import os.path +import shutil +from nose.tools import assert_equals, assert_false +from geomagio import Util +from obspy.core import UTCDateTime + + +def test_get_file_for_url__throws_exception(): + """Util_test.test_get_file_for_url__throws_exception() + """ + # throws exception for non "file://" urls + try: + Util.get_file_from_url('http://someserver/path') + assert_false('expected exception') + except Exception: + pass + + +def test_get_file_for_url__parses_file_urls(): + """Util_test.test_get_file_for_url__parses_file_urls() + """ + # parses file urls + f = Util.get_file_from_url('file://./somefile') + assert_equals(f, './somefile') + + +def test_get_file_for_url__creates_directories(): + """Util_test.test_get_file_for_url__creates_directories() + """ + # creates directories if requested + if os.path.isdir('/tmp/_geomag_algorithms_test_'): + shutil.rmtree('/tmp/_geomag_algorithms_test_') + f = Util.get_file_from_url('file:///tmp/_geomag_algorithms_test_/somefile', + createParentDirectory=True) + if not os.path.isdir('/tmp/_geomag_algorithms_test_'): + assert_false('directory not created') + shutil.rmtree('/tmp/_geomag_algorithms_test_') + assert_equals(f, '/tmp/_geomag_algorithms_test_/somefile') + + +def test_get_interval__defaults(): + """Util_test.test_get_interval() + """ + starttime = UTCDateTime('2015-01-01T00:00:00Z') + endtime = UTCDateTime('2015-02-01T00:00:00Z') + intervals = Util.get_intervals(starttime, endtime) + assert_equals(len(intervals), 31) + + +def test_get_interval__custom_size(): + """Util_test.test_get_interval__custom_size() + """ + starttime = UTCDateTime('2015-01-01T00:00:00Z') + endtime = UTCDateTime('2015-01-02T00:00:00Z') + intervals = Util.get_intervals(starttime, endtime, size=3600) + assert_equals(len(intervals), 24) + + +def test_get_interval__trim(): + """Util_test.test_get_interval__trim() + """ + starttime = UTCDateTime('2015-01-01T01:00:00Z') + endtime = UTCDateTime('2015-01-02T00:00:00Z') + intervals = Util.get_intervals(starttime, endtime, trim=True) + assert_equals(intervals[0]['start'], starttime)