import numpy
import os
from obspy.core import Stats, Trace
from obspy import UTCDateTime
from io import BytesIO
import json
import warnings

try:
    import fcntl
except ImportError:
    # fcntl is unavailable on some platforms (e.g. Windows)
    pass


class ObjectView(object):
    """
    Wrap a dictionary so its properties can be accessed as an object.

    Parameters
    ----------
    d : dictionary
        The dictionary to wrap.
    """

    def __init__(self, d):
        self.__dict__ = d

    def __str__(self):
        """
        Override string representation to output wrapped dictionary.
        """
        return str(self.__dict__)


def get_file_from_url(url, createParentDirectory=False):
    """Get a file for writing.

    Ensures parent directory exists.

    Parameters
    ----------
    url : str
        path to file
    createParentDirectory : bool
        whether to create parent directory if it does not exist.
        useful when preparing to write to the returned file.

    Returns
    -------
    str
        path to file without file:// prefix

    Raises
    ------
    Exception
        if url does not start with file://
    """
    if not url.startswith("file://"):
        raise Exception("Only file urls are supported by get_file_from_url")
    filename = url.replace("file://", "")
    if createParentDirectory:
        parent = os.path.dirname(filename)
        if not os.path.exists(parent):
            os.makedirs(parent)
    return filename


def get_intervals(starttime, endtime, size=86400, align=True, trim=False):
    """Divide an interval into smaller intervals.

    Divides the interval [starttime, endtime] into chunks.

    Parameters
    ----------
    starttime : obspy.core.UTCDateTime
        start of time interval to divide
    endtime : obspy.core.UTCDateTime
        end of time interval to divide
    size : int
        size of each interval in seconds.
        when <= 0, returns one interval from start to end.
    align : bool
        align intervals to unix epoch.
        (works best when size evenly divides a day)
    trim : bool
        whether to trim first/last interval to starttime and endtime.

    Returns
    -------
    list<dict>
        each dictionary has the keys "start" and "end",
        which represent [intervalstart, intervalend).
    """
    if size <= 0:
        return [{"start": starttime, "end": endtime}]
    if align:
        # align based on size
        time = starttime - (starttime.timestamp % size)
    else:
        time = starttime
    intervals = []
    while time < endtime:
        start = time
        time = time + size
        end = time
        if trim:
            if start < starttime:
                start = starttime
            if end > endtime:
                end = endtime
        intervals.append({"start": start, "end": end})
    return intervals


def read_file(filepath):
    """Open and read file contents.

    Parameters
    ----------
    filepath : str
        path to a file

    Returns
    -------
    str
        contents of file

    Raises
    ------
    IOError
        if file does not exist
    """
    file_data = None
    with open(filepath, "r") as f:
        file_data = f.read()
    return file_data


def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
    """Open and read url contents.

    Parameters
    ----------
    url : str
        A urllib2 compatible url, such as http:// or file://.
    connect_timeout : int
        maximum time to wait for a connection, in seconds.
    max_redirects : int
        maximum number of redirects to follow.
    timeout : int
        maximum time the request may take, in seconds.

    Returns
    -------
    str
        contents returned by url.

    Raises
    ------
    IOError
        if an error occurs while reading the url.
    """
    try:
        # short circuit file urls
        filepath = get_file_from_url(url)
        return read_file(filepath)
    except IOError as e:
        raise e
    except Exception:
        pass
    # wait to import pycurl until it is needed
    import pycurl

    content = None
    out = BytesIO()
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.FOLLOWLOCATION, 1)
        curl.setopt(pycurl.MAXREDIRS, max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
        curl.setopt(pycurl.TIMEOUT, timeout)
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.WRITEFUNCTION, out.write)
        curl.perform()
        content = out.getvalue()
        content = content.decode("utf-8")
    except pycurl.error as e:
        raise IOError(e.args)
    finally:
        curl.close()
    return content


def create_empty_trace(trace, channel):
    """
    Utility to create an empty trace, similar to another trace.

    Parameters
    ----------
    trace : obspy.core.Trace
        Trace that is source of most metadata, including array length.
    channel : str
        Channel name for created Trace.

    Returns
    -------
    obspy.core.Trace
        a Trace object, filled with numpy.nan.
    """
    stats = Stats(trace.stats)
    stats.channel = channel
    count = len(trace.data)
    numpy_data = numpy.full((count), numpy.nan)
    return Trace(numpy_data, stats)


def encode_utcdatetime(obj):
    """
    Custom JSON encoder for dealing with UTCDateTime objects.
    """
    if isinstance(obj, UTCDateTime):
        return str(obj)
    raise TypeError(
        f"Object of type '{obj.__class__.__name__}' is not JSON serializable"
    )


def decode_utcdatetime(dct):
    """
    Custom JSON decoder for converting time fields back to UTCDateTime objects.
    """
    for key in ["start_time", "end_time", "starttime", "endtime"]:
        if key in dct:
            dct[key] = UTCDateTime(dct[key]) if dct[key] else None
    return dct


def write_state_file(filename, data, directory=None, encoder=None):
    """
    Writes data to a state file in a thread-safe manner.

    Parameters
    ----------
    filename : str
        The name of the file to write to.
    data : object
        The data to write to the file. This should be a Python object
        that can be serialized with json.
    directory : str
        The directory to write the file to. If not provided, the file
        will be written to the .cache directory in the current user's
        home directory.
    encoder : function
        Function to be given to json.dump's 'default' parameter.
        If not provided it will use a simple encoder that handles
        UTCDateTime objects.

    Returns
    -------
    None

    Raises
    ------
    IOError
        If an I/O error occurs.
    TypeError
        If the data cannot be serialized to JSON.
    """
    if directory is None:
        directory = os.path.join(
            os.path.expanduser("~"), ".cache", "geomag-algorithms"
        )
    if encoder is None:
        encoder = encode_utcdatetime
    # Create the directory if it doesn't exist
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        print(f"Error creating directory: {e}")
        raise
    filepath = os.path.join(directory, filename)
    try:
        with open(filepath, "w") as f:
            try:
                fcntl.flock(f, fcntl.LOCK_EX)
                json.dump(data, f, default=encoder)
                fcntl.flock(f, fcntl.LOCK_UN)
            except NameError as e:
                print(
                    f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
                )
            except IOError as e:
                print(f"Error locking or writing to file: {e}")
                raise
            except TypeError as e:
                print(f"Error serializing data to JSON: {e}")
                raise
    except IOError as e:
        print(f"Error opening file: {e}")
        raise


def read_state_file(filename, directory=None, decoder=None):
    """
    Reads data from a state file in a thread-safe manner.

    Parameters
    ----------
    filename : str
        The name of the file to read from.
    directory : str
        The directory to read the file from. If not provided, the file
        will be read from the .cache directory in the current user's
        home directory.
    decoder : function
        Object hook function to be given to json.load.
        If not provided it will use a simple decoder that handles
        common start/end time fields.

    Returns
    -------
    data : object
        Python object that was deserialized from the json state file.

    Raises
    ------
    IOError
        If an I/O error occurs.
    json.JSONDecodeError
        If the data cannot be deserialized from JSON.
    """
    if directory is None:
        directory = os.path.join(
            os.path.expanduser("~"), ".cache", "geomag-algorithms"
        )
    if decoder is None:
        decoder = decode_utcdatetime
    filepath = os.path.join(directory, filename)
    try:
        with open(filepath, "r") as f:
            try:
                fcntl.flock(f, fcntl.LOCK_SH)
                data = json.load(f, object_hook=decoder)
                fcntl.flock(f, fcntl.LOCK_UN)
                return data
            except NameError as e:
                print(
                    f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
                )
            except IOError as e:
                print(f"Error locking or reading from file: {e}")
                raise
            except json.JSONDecodeError as e:
                print(f"Error deserializing data from JSON: {e}")
                raise
    except IOError as e:
        print(f"Error opening file: {e}")
        raise
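

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the module's API): runs only when this
# file is executed directly. It shows get_intervals dividing a day into
# aligned chunks and a JSON state-file round trip. The file name
# "example_state.json" and the temporary directory are illustrative
# assumptions, chosen so nothing is written to ~/.cache.
if __name__ == "__main__":
    import tempfile

    # Divide one UTC day into six-hour chunks aligned to the unix epoch.
    intervals = get_intervals(
        starttime=UTCDateTime("2024-01-01T00:00:00Z"),
        endtime=UTCDateTime("2024-01-02T00:00:00Z"),
        size=21600,
    )
    for interval in intervals:
        print(interval["start"], interval["end"])

    # Round-trip a small state dictionary through the state-file helpers;
    # encode_utcdatetime/decode_utcdatetime handle the UTCDateTime field.
    state_directory = tempfile.mkdtemp()
    state = {"starttime": UTCDateTime("2024-01-01T00:00:00Z"), "count": 4}
    write_state_file("example_state.json", state, directory=state_directory)
    print(read_state_file("example_state.json", directory=state_directory))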