# Util.py
import json
import os
import warnings
from io import BytesIO

import numpy
from obspy import UTCDateTime
from obspy.core import Stats, Trace

# fcntl is only available on POSIX platforms. When it cannot be imported the
# name stays undefined, and the state-file helpers detect that via NameError.
try:
    import fcntl
except ImportError:
    pass
    """
    Wrap a dictionary so its properties can be accessed as an object.

    Parameters
    ----------
    d : dictionary
        The dictionary to wrap.
    """
    # NOTE(review): no enclosing `class` statement is visible above this method
    # in this view; confirm the class header is present in the real file.
    def __init__(self, d):
        """Use d directly as this instance's attribute dictionary."""
        self.__dict__ = d

    def __str__(self):
        """
        Override string representation to output wrapped dictionary.
        """
        # NOTE(review): no return statement is visible here, so as written this
        # method returns None; confirm the body is not truncated.
def get_file_from_url(url, createParentDirectory=False):
    """Get a file for writing.

    Strips the file:// scheme from a url and, optionally, ensures the
    parent directory of the resulting path exists.

    Parameters
    ----------
    url : str
        path to file, prefixed with file://
    createParentDirectory : bool
        whether to create parent directory if it does not exist.
        useful when preparing to write to the returned file.

    Returns
    -------
    str
        path to file without file:// prefix

    Raises
    ------
    Exception
        if url does not start with file://
    """
    prefix = "file://"
    if not url.startswith(prefix):
        raise Exception("Only file urls are supported by get_file_from_url")
    # Remove only the leading scheme; str.replace would also strip any later
    # "file://" that happens to appear inside the path itself.
    filename = url[len(prefix):]
    if createParentDirectory:
        parent = os.path.dirname(filename)
        # dirname is "" for a bare file name; there is nothing to create then
        # and os.makedirs("") would raise.
        if parent and not os.path.exists(parent):
            # exist_ok guards against another process creating it meanwhile
            os.makedirs(parent, exist_ok=True)
    return filename


def get_intervals(starttime, endtime, size=86400, align=True, trim=False):
    """Divide an interval into smaller intervals.

    Divides the interval [starttime, endtime] into chunks.

    Parameters
    ----------
    starttime : obspy.core.UTCDateTime
        start of time interval to divide
    endtime : obspy.core.UTCDateTime
        end of time interval to divide
    size : int
        size of each interval in seconds.
        when <= 0, returns one interval from start to end.
    align : bool
        align intervals to unix epoch.
        (works best when size evenly divides a day)
    trim : bool
        whether to trim first/last interval to starttime and endtime.

    Returns
    -------
    list<dict>
        each dictionary has the keys "start" and "end"
        which represent [intervalstart, intervalend).
    """
    if size <= 0:
        # no chunking requested: the whole range is a single interval
        return [{"start": starttime, "end": endtime}]
    if align:
        # align based on size: step back to the previous multiple of size
        time = starttime - (starttime.timestamp % size)
    else:
        time = starttime
    intervals = []
    while time < endtime:
        start = time
        time = time + size
        end = time
        if trim:
            # clamp the first/last chunk to the requested range
            if start < starttime:
                start = starttime
            if end > endtime:
                end = endtime
        intervals.append({"start": start, "end": end})
    return intervals
def read_file(filepath):
    """Open and read file contents.

    Parameters
    ----------
    filepath : str
        path to a file

    Returns
    -------
    str
        contents of file

    Raises
    ------
    IOError
        if file does not exist
    """
    # text mode, same as before; the context manager closes the handle
    with open(filepath, "r") as f:
        return f.read()
def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
    """Open and read url contents.

    file:// urls are read directly from disk; any other url is fetched
    with pycurl, following redirects.

    Parameters
    ----------
    url : str
        A urllib2 compatible url, such as http:// or file://.
    connect_timeout : int
        value for curl's CONNECTTIMEOUT option (seconds).
    max_redirects : int
        value for curl's MAXREDIRS option.
    timeout : int
        value for curl's TIMEOUT option (seconds).

    Returns
    -------
    str
        contents returned by url.
        content fetched with curl is decoded as utf-8.

    Raises
    ------
    IOError
        if a file url cannot be read, or if curl reports an error.
    """
    try:
        # short circuit file urls
        filepath = get_file_from_url(url)
        return read_file(filepath)
    except IOError:
        # a file url that could not be read is a real error
        raise
    except Exception:
        # not a file url; fall through and fetch it with curl
        pass
    # wait to import pycurl until it is needed
    import pycurl

    out = BytesIO()
    # create the handle before the try block so the finally clause
    # always has something to close
    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.FOLLOWLOCATION, 1)
        curl.setopt(pycurl.MAXREDIRS, max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT, connect_timeout)
        curl.setopt(pycurl.TIMEOUT, timeout)
        curl.setopt(pycurl.NOSIGNAL, 1)
        curl.setopt(pycurl.URL, url)
        curl.setopt(pycurl.WRITEFUNCTION, out.write)
        curl.perform()
        return out.getvalue().decode("utf-8")
    except pycurl.error as e:
        # surface curl failures as IOError, keeping the original as cause
        raise IOError(e.args) from e
    finally:
        curl.close()


def create_empty_trace(trace, channel):
    """
    Utility to create an empty trace, similar to another trace.

    Parameters
    ----------
    trace: obspy.core.Trace
        Trace that is source of most metadata, including array length.
    channel: String
        Channel name for created Trace.

    Returns
    -------
    obspy.core.Trace
        a Trace object, filled with numpy.nan.
    """
    # build a new Stats from the source trace's stats, then rename the channel
    stats = Stats(trace.stats)
    stats.channel = channel
    count = len(trace.data)
    numpy_data = numpy.full(count, numpy.nan)
    return Trace(numpy_data, stats)
def encode_utcdatetime(obj):
    """
    JSON "default" hook that serializes UTCDateTime objects as strings.

    Parameters
    ----------
    obj: object
        value the json encoder could not serialize on its own.

    Returns
    -------
    str
        str(obj) when obj is a UTCDateTime.

    Raises
    ------
    TypeError
        if obj is any other type.
    """
    if not isinstance(obj, UTCDateTime):
        raise TypeError(
            f"Object of type '{obj.__class__.__name__}' is not JSON serializable"
        )
    return str(obj)


def decode_utcdatetime(dct):
    """
    JSON object hook that turns known time fields back into UTCDateTime.

    The keys "start_time", "end_time", "starttime" and "endtime" are
    converted in place when present; a falsy value becomes None.

    Parameters
    ----------
    dct: dict
        decoded json object.

    Returns
    -------
    dict
        the same dictionary, after conversion.
    """
    time_fields = ("start_time", "end_time", "starttime", "endtime")
    for field in time_fields:
        if field not in dct:
            continue
        if dct[field]:
            dct[field] = UTCDateTime(dct[field])
        else:
            dct[field] = None
    return dct


def write_state_file(filename, data, directory=None, encoder=None):
    """
    Writes data to a state file while holding an exclusive fcntl.flock lock.

    Parameters:
    -----------
    filename: String
        The name of the file to write to.
    data:
        The data to write to the file. This should be a Python object that can be serialized with json.
    directory: String
        The directory to write the file to. If not provided, the file will be written to the .cache directory in the current user's home directory.
    encoder: function
        Function to be given to json's 'default' parameter. If not provided it will use a simple encoder that handles UTCDateTime objects.

    Returns:
    --------
    None

    Raises:
    -------
    IOError: If an I/O error occurs.
    TypeError: If the data cannot be serialized to JSON.
    """
    if directory is None:
        directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")

    if encoder is None:
        encoder = encode_utcdatetime

    # Create the directory if it doesn't exist
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        print(f"Error creating directory: {e}")
        raise

    filepath = os.path.join(directory, filename)

    # Serialize before touching the file, so a serialization failure cannot
    # leave a truncated or partially written state file behind.
    try:
        payload = json.dumps(data, default=encoder)
    except TypeError as e:
        print(f"Error serializing data to JSON: {e}")
        raise

    try:
        # Append mode opens for writing without truncating, so the previous
        # contents stay intact until the exclusive lock is actually held.
        with open(filepath, "a") as f:
            try:
                fcntl.flock(f, fcntl.LOCK_EX)
                try:
                    f.truncate(0)
                    f.write(payload)
                    # push buffered data to the file before the lock is
                    # released, so other lock holders never see a partial file
                    f.flush()
                finally:
                    fcntl.flock(f, fcntl.LOCK_UN)
            except NameError as e:
                # fcntl was never imported (see the guarded import at the
                # top of the module); nothing is written in that case
                print(
                    f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
                )
            except IOError as e:
                print(f"Error locking or writing to file: {e}")
                raise
    except IOError as e:
        print(f"Error opening file: {e}")
        raise


def read_state_file(filename, directory=None, decoder=None):
    """
    Reads data from a state file while holding a shared fcntl.flock lock.

    Parameters:
    -----------
    filename: String
        The name of the file to read from.
    directory: String
        The directory to read the file from. If not provided, the file will be read from the .cache directory in the current user's home directory.
    decoder: function
        Object hook function to be given to json.load. If not provided it will use a simple decoder that handles common start/end time fields.

    Returns:
    --------
    data: Object
        Python object that was deserialized from the json state file.
        None when the fcntl module is unavailable.

    Raises:
    -------
    IOError: If an I/O error occurs.
    json.JSONDecodeError: If the data cannot be deserialized from JSON.
    """
    if directory is None:
        directory = os.path.join(os.path.expanduser("~"), ".cache", "geomag-algorithms")

    if decoder is None:
        decoder = decode_utcdatetime

    filepath = os.path.join(directory, filename)

    try:
        with open(filepath, "r") as f:
            try:
                fcntl.flock(f, fcntl.LOCK_SH)
                try:
                    return json.load(f, object_hook=decoder)
                finally:
                    # release the lock on the error path too
                    fcntl.flock(f, fcntl.LOCK_UN)
            except NameError as e:
                # fcntl was never imported (see the guarded import at the
                # top of the module); nothing is read in that case
                print(
                    f"The fcntl module is not supported in Windows. Reading/writing state files will not work: {e}"
                )
                return None
            except IOError as e:
                print(f"Error locking or reading from file: {e}")
                raise
            except json.JSONDecodeError as e:
                print(f"Error deserializing data from JSON: {e}")
                raise
    except IOError as e:
        print(f"Error opening file: {e}")
        raise