Newer
Older
from obspy.core import Stats, Trace
Hal Simpson
committed
class ObjectView(object):
"""
Wrap a dictionary so its properties can be accessed as an object.
Parameters
----------
d : dictionary
The dictionary to wrap.
"""
Hal Simpson
committed
def __init__(self, d):
self.__dict__ = d
def __str__(self):
"""
Override string representation to output wrapped dictionary.
"""
Hal Simpson
committed
return str(self.__dict__)
def get_file_from_url(url, createParentDirectory=False):
"""Get a file for writing.
Ensures parent directory exists.
Parameters
----------
url : str
path to file
createParentDirectory : bool
whether to create parent directory if it does not exist.
useful when preparing to write to the returned file.
Returns
-------
str
path to file without file:// prefix
Raises
------
Exception
if url does not start with file://
"""
if not url.startswith("file://"):
raise Exception("Only file urls are supported by get_file_from_url")
filename = url.replace("file://", "")
if createParentDirectory:
parent = os.path.dirname(filename)
if not os.path.exists(parent):
os.makedirs(parent)
return filename
def get_intervals(starttime, endtime, size=86400, align=True, trim=False):
"""Divide an interval into smaller intervals.
Divides the interval [starttime, endtime] into chunks.
Parameters
----------
starttime : obspy.core.UTCDateTime
start of time interval to divide
endtime : obspy.core.UTCDateTime
end of time interval to divide
size : int
size of each interval in seconds.
when <= 0, returns one interval from start to end.
align : bool
align intervals to unix epoch.
(works best when size evenly divides a day)
trim : bool
whether to trim first/last interval to starttime and endtime.
Returns
-------
list<dict>
each dictionary has the keys "starttime" and "endtime"
which represent [intervalstart, intervalend).
"""
return [{"start": starttime, "end": endtime}]
if align:
# align based on size
time = starttime - (starttime.timestamp % size)
else:
time = starttime
intervals = []
while time < endtime:
start = time
time = time + size
end = time
if trim:
if start < starttime:
start = starttime
if end > endtime:
end = endtime
intervals.append({"start": start, "end": end})
return intervals
def read_file(filepath):
"""Open and read file contents.
Parameters
----------
filepath : str
path to a file
Returns
-------
str
contents of file
Raises
------
IOError
if file does not exist
"""
file_data = None
file_data = f.read()
return file_data
def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
"""Open and read url contents.
Parameters
----------
url : str
A urllib2 compatible url, such as http:// or file://.
Returns
-------
str
contents returned by url.
Raises
------
try:
# short circuit file urls
filepath = get_file_from_url(url)
return read_file(filepath)
except IOError as e:
raise e
except Exception:
pass
# wait to import pycurl until it is needed
import pycurl
curl = pycurl.Curl()
curl.setopt(pycurl.FOLLOWLOCATION, 1)
curl.setopt(pycurl.MAXREDIRS, max_redirects)
curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
curl.setopt(pycurl.TIMEOUT, timeout)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.WRITEFUNCTION, out.write)
curl.perform()
content = out.getvalue()
except pycurl.error as e:
raise IOError(e.args)
def create_empty_trace(trace, channel):
"""
Utility to create an empty trace, similar to another trace.
trace: obspy.core.Trace
Trace that is source of most metadata, including array length.
channel: String
Channel name for created Trace.
a Trace object, filled with numpy.nan.
stats = Stats(trace.stats)
stats.channel = channel
count = len(trace.data)
numpy_data = numpy.full((count), numpy.nan)
return Trace(numpy_data, stats)
def write_state_file(filename, data, directory=None):
Writes data to a state file in a thread-safe manner.
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
Parameters:
-----------
filename: String
The name of the file to write to.
data:
The data to write to the file. This should be a Python object that can be serialized with json.
directory: String
The directory to write the file to. If not provided, the file will be written to the .cache directory in the current user's home directory.
Returns:
--------
None
Raises:
-------
IOError: If an I/O error occurs.
TypeError: If the data cannot be serialized to JSON.
"""
if directory is None:
directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")
# Create the directory if it doesn't exist
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
print(f"Error creating directory: {e}")
raise
filepath = os.path.join(directory, filename)
try:
with open(filepath, "w") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
json.dump(data, f)
fcntl.flock(f, fcntl.LOCK_UN)
except IOError as e:
print(f"Error locking or writing to file: {e}")
raise
except TypeError as e:
print(f"Error serializing data to JSON: {e}")
raise
except IOError as e:
print(f"Error opening file: {e}")
raise
def read_state_file(filename, directory=None):
Reads data from a state file in a thread-safe manner.
Parameters:
filename: String
The name of the file to read from.
directory: String
The directory to read the file from. If not provided, the file will be read from the .cache directory in the current user's home directory.
Returns:
--------
data: Object
Python object that was deserialized from the json state file.
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
Raises:
-------
IOError: If an I/O error occurs.
json.JSONDecodeError: If the data cannot be deserialized from JSON.
"""
if directory is None:
directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")
filepath = os.path.join(directory, filename)
try:
with open(filepath, "r") as f:
try:
fcntl.flock(f, fcntl.LOCK_SH)
data = json.load(f)
fcntl.flock(f, fcntl.LOCK_UN)
return data
except IOError as e:
print(f"Error locking or reading from file: {e}")
raise
except json.JSONDecodeError as e:
print(f"Error deserializing data from JSON: {e}")
raise
except IOError as e:
print(f"Error opening file: {e}")
raise