Newer
Older
from obspy.core import Stats, Trace
from obspy import UTCDateTime
import warnings
try:
import fcntl
except:
pass
Hal Simpson
committed
class ObjectView(object):
"""
Wrap a dictionary so its properties can be accessed as an object.
Parameters
----------
d : dictionary
The dictionary to wrap.
"""
Hal Simpson
committed
def __init__(self, d):
self.__dict__ = d
def __str__(self):
"""
Override string representation to output wrapped dictionary.
"""
Hal Simpson
committed
return str(self.__dict__)
def get_file_from_url(url, createParentDirectory=False):
"""Get a file for writing.
Ensures parent directory exists.
Parameters
----------
url : str
path to file
createParentDirectory : bool
whether to create parent directory if it does not exist.
useful when preparing to write to the returned file.
Returns
-------
str
path to file without file:// prefix
Raises
------
Exception
if url does not start with file://
"""
if not url.startswith("file://"):
raise Exception("Only file urls are supported by get_file_from_url")
filename = url.replace("file://", "")
if createParentDirectory:
parent = os.path.dirname(filename)
if not os.path.exists(parent):
os.makedirs(parent)
return filename
def get_intervals(starttime, endtime, size=86400, align=True, trim=False):
"""Divide an interval into smaller intervals.
Divides the interval [starttime, endtime] into chunks.
Parameters
----------
starttime : obspy.core.UTCDateTime
start of time interval to divide
endtime : obspy.core.UTCDateTime
end of time interval to divide
size : int
size of each interval in seconds.
when <= 0, returns one interval from start to end.
align : bool
align intervals to unix epoch.
(works best when size evenly divides a day)
trim : bool
whether to trim first/last interval to starttime and endtime.
Returns
-------
list<dict>
each dictionary has the keys "starttime" and "endtime"
which represent [intervalstart, intervalend).
"""
return [{"start": starttime, "end": endtime}]
if align:
# align based on size
time = starttime - (starttime.timestamp % size)
else:
time = starttime
intervals = []
while time < endtime:
start = time
time = time + size
end = time
if trim:
if start < starttime:
start = starttime
if end > endtime:
end = endtime
intervals.append({"start": start, "end": end})
return intervals
def read_file(filepath):
"""Open and read file contents.
Parameters
----------
filepath : str
path to a file
Returns
-------
str
contents of file
Raises
------
IOError
if file does not exist
"""
file_data = None
file_data = f.read()
return file_data
def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
"""Open and read url contents.
Parameters
----------
url : str
A urllib2 compatible url, such as http:// or file://.
Returns
-------
str
contents returned by url.
Raises
------
try:
# short circuit file urls
filepath = get_file_from_url(url)
return read_file(filepath)
except IOError as e:
raise e
except Exception:
pass
# wait to import pycurl until it is needed
import pycurl
curl = pycurl.Curl()
curl.setopt(pycurl.FOLLOWLOCATION, 1)
curl.setopt(pycurl.MAXREDIRS, max_redirects)
curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
curl.setopt(pycurl.TIMEOUT, timeout)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.URL, url)
curl.setopt(pycurl.WRITEFUNCTION, out.write)
curl.perform()
content = out.getvalue()
except pycurl.error as e:
raise IOError(e.args)
def create_empty_trace(trace, channel):
"""
Utility to create an empty trace, similar to another trace.
trace: obspy.core.Trace
Trace that is source of most metadata, including array length.
channel: String
Channel name for created Trace.
a Trace object, filled with numpy.nan.
stats = Stats(trace.stats)
stats.channel = channel
count = len(trace.data)
numpy_data = numpy.full((count), numpy.nan)
return Trace(numpy_data, stats)
def encode_utcdatetime(obj):
"""
Custom JSON encoder for dealing with UTCDateTime objects
"""
if isinstance(obj, UTCDateTime):
return str(obj)
raise TypeError(
f"Object of type '{obj.__class__.__name__}' is not JSON serializable"
)
def decode_utcdatetime(dct):
"""
Custom JSON decoder for converting time fields back to UTCDateTime objects
"""
for key in ["start_time", "end_time", "starttime", "endtime"]:
if key in dct:
dct[key] = UTCDateTime(dct[key]) if dct[key] else None
return dct
def write_state_file(filename, data, directory=None, encoder=None):
Writes data to a state file in a thread-safe manner.
Parameters:
-----------
filename: String
The name of the file to write to.
data:
The data to write to the file. This should be a Python object that can be serialized with json.
directory: String
The directory to write the file to. If not provided, the file will be written to the .cache directory in the current user's home directory.
encoder: function
Function to be given to json.dump's 'default' parameter. If not provided it will use a simple encoder that handles UTCDateTime objects.
Returns:
--------
None
Raises:
-------
IOError: If an I/O error occurs.
TypeError: If the data cannot be serialized to JSON.
"""
if directory is None:
directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")
if encoder is None:
encoder = encode_utcdatetime
# Create the directory if it doesn't exist
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
print(f"Error creating directory: {e}")
raise
filepath = os.path.join(directory, filename)
try:
with open(filepath, "w") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX)
json.dump(data, f, default=encoder)
except NameError as e:
print(
f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
)
except IOError as e:
print(f"Error locking or writing to file: {e}")
raise
except TypeError as e:
print(f"Error serializing data to JSON: {e}")
raise
except IOError as e:
print(f"Error opening file: {e}")
raise
def read_state_file(filename, directory=None, decoder=None):
Reads data from a state file in a thread-safe manner.
Parameters:
filename: String
The name of the file to read from.
directory: String
The directory to read the file from. If not provided, the file will be read from the .cache directory in the current user's home directory.
encoder: function
Object hook function to be given to json.load. If not provided it will use a simple decoder that handles common start/end time fields.
Returns:
--------
data: Object
Python object that was deserialized from the json state file.
Raises:
-------
IOError: If an I/O error occurs.
json.JSONDecodeError: If the data cannot be deserialized from JSON.
"""
if directory is None:
directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")
if decoder is None:
decoder = decode_utcdatetime
filepath = os.path.join(directory, filename)
try:
with open(filepath, "r") as f:
try:
fcntl.flock(f, fcntl.LOCK_SH)
data = json.load(f, object_hook=decoder)
fcntl.flock(f, fcntl.LOCK_UN)
return data
except NameError as e:
print(
f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
)
except IOError as e:
print(f"Error locking or reading from file: {e}")
raise
except json.JSONDecodeError as e:
print(f"Error deserializing data from JSON: {e}")
raise
except IOError as e:
print(f"Error opening file: {e}")
raise