Newer
Older
from obspy.core import Stats, Trace
Hal Simpson
committed
class ObjectView(object):
    """
    Expose the entries of a dictionary as attributes of an object.

    The dictionary is adopted as the instance namespace rather than copied,
    so attribute assignments on the view are visible in the dictionary and
    vice versa.

    Parameters
    ----------
    d : dictionary
        The dictionary to wrap.
    """

    def __init__(self, d):
        # use the caller's dictionary itself as the attribute storage
        self.__dict__ = d

    def __str__(self):
        """
        Return the string form of the wrapped dictionary.
        """
        return str(vars(self))
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
def get_file_from_url(url, createParentDirectory=False):
    """Convert a file:// url to a local file path.

    Optionally ensures the parent directory of the path exists.

    Parameters
    ----------
    url : str
        url of file, must start with file://
    createParentDirectory : bool
        whether to create parent directory if it does not exist.
        useful when preparing to write to the returned file.

    Returns
    -------
    str
        path to file without file:// prefix

    Raises
    ------
    Exception
        if url does not start with file://
    """
    prefix = 'file://'
    if not url.startswith(prefix):
        raise Exception('Only file urls are supported by get_file_from_url')
    # strip only the leading scheme; str.replace would also remove any
    # later occurrence of "file://" inside the path itself
    filename = url[len(prefix):]
    if createParentDirectory:
        parent = os.path.dirname(filename)
        # dirname is '' for a bare file name, and os.makedirs('') raises,
        # so there is nothing to create in that case
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
    return filename
def get_intervals(starttime, endtime, size=86400, align=True, trim=False):
    """Divide an interval into smaller intervals.

    Divides the interval [starttime, endtime] into chunks.

    Parameters
    ----------
    starttime : obspy.core.UTCDateTime
        start of time interval to divide
    endtime : obspy.core.UTCDateTime
        end of time interval to divide
    size : int
        size of each interval in seconds, must be positive.
    align : bool
        align intervals to unix epoch.
        (works best when size evenly divides a day)
    trim : bool
        whether to trim first/last interval to starttime and endtime.

    Returns
    -------
    list<dict>
        each dictionary has the keys "start" and "end"
        which represent [intervalstart, intervalend).

    Raises
    ------
    ValueError
        if size is not positive.
    """
    if size <= 0:
        # a non-positive step would never advance past endtime
        raise ValueError('size must be a positive number of seconds')
    if align:
        # align based on size
        time = starttime - (starttime.timestamp % size)
    else:
        time = starttime
    intervals = []
    while time < endtime:
        start = time
        time = time + size
        end = time
        if trim:
            # clamp the outermost intervals to the requested range
            if start < starttime:
                start = starttime
            if end > endtime:
                end = endtime
        intervals.append({
            'start': start,
            'end': end
        })
    return intervals
def read_file(filepath):
    """Read the entire contents of a file opened in text mode.

    Parameters
    ----------
    filepath : str
        path to a file

    Returns
    -------
    str
        contents of file

    Raises
    ------
    IOError
        if file does not exist
    """
    with open(filepath, 'r') as handle:
        return handle.read()
def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
    """Open and read url contents.

    Parameters
    ----------
    url : str
        A url to read, such as http:// or file://.
    connect_timeout : int
        value passed to curl as CONNECTTIMEOUT.
    max_redirects : int
        value passed to curl as MAXREDIRS.
    timeout : int
        value passed to curl as TIMEOUT.

    Returns
    -------
    str
        contents returned by url.

    Raises
    ------
    IOError
        if a file url cannot be read, or if curl reports an error.
    """
    from io import BytesIO

    try:
        # short circuit file urls
        filepath = get_file_from_url(url)
        return read_file(filepath)
    except IOError:
        raise
    except Exception:
        # get_file_from_url rejects anything that is not a file:// url;
        # fall through and let curl fetch it
        pass
    out = BytesIO()
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.FOLLOWLOCATION, 1)
        curl.setopt(pycurl.MAXREDIRS, max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
        curl.setopt(pycurl.TIMEOUT, timeout)
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.WRITEFUNCTION, out.write)
        curl.perform()
        content = out.getvalue()
    except pycurl.error as e:
        raise IOError(e.args)
    finally:
        # always release the curl handle, even when the transfer fails
        curl.close()
    # NOTE(review): curl delivers bytes; decoding as UTF-8 is assumed here so
    # the result matches the documented str return - confirm the encoding
    return content.decode('utf-8')
def create_empty_trace(trace, channel):
    """
    Utility to create an empty trace, similar to another trace.

    Parameters
    ----------
    trace: obspy.core.Trace
        Trace that is source of most metadata, including array length.
    channel: String
        Channel name for created Trace.

    Returns
    -------
    obspy.core.Trace
        a Trace object, filled with numpy.nan.
    """
    # build new stats from the source trace's stats, then rename the channel
    stats = Stats(trace.stats)
    stats.channel = channel
    count = len(trace.data)
    numpy_data = numpy.full(count, numpy.nan)
    return Trace(numpy_data, stats)