"""Abstract Timeseries Factory Interface."""
from __future__ import absolute_import, print_function

import os
import sys

import numpy
import obspy.core

from .TimeseriesFactoryException import TimeseriesFactoryException
from . import TimeseriesUtility
from . import Util


class TimeseriesFactory(object):
    """Base class for timeseries factories.

    Add input support by:
    - implementing `parse_string`
    - or, overriding `get_timeseries`

    Add output support by:
    - implementing `write_file`
    - or, overriding `put_timeseries`

    Attributes
    ----------
    observatory : str
        default observatory code, usually 3 characters.
    channels : array_like
        default list of channels to load, optional.
        default ('H', 'D', 'Z', 'F')
    type : {'definitive', 'provisional', 'quasi-definitive', 'variation'}
        default data type, optional.
        default 'variation'.
    interval : {'day', 'hour', 'minute', 'month', 'second'}
        data interval, optional.
        default 'minute'.
    urlTemplate : str
        A string that contains replacement patterns.
        See https://github.com/usgs/geomag-algorithms/blob/master/docs/io.md
        and/or TimeseriesFactory._get_url()
    urlInterval : int
        Interval in seconds between URLs.
        Intervals begin at the unix epoch (1970-01-01T00:00:00Z)
    """

    def __init__(
        self,
        observatory=None,
        channels=("H", "D", "Z", "F"),
        type="variation",
        interval="minute",
        urlTemplate="",
        urlInterval=-1,
    ):
        self.observatory = observatory
        self.channels = channels
        self.type = type
        self.interval = interval
        # put_timeseries() and _get_url() both read self.urlTemplate,
        # so the constructor argument has to be stored.
        self.urlTemplate = urlTemplate
        self.urlInterval = urlInterval

    def get_timeseries(
        self,
        starttime,
        endtime,
        observatory=None,
        channels=None,
        type=None,
        interval=None,
    ):
        """Get timeseries data.

        Support for specific channels, types, and intervals varies
        between factory and observatory. Subclasses should raise
        TimeseriesFactoryException if the data is not available, or
        if an error occurs accessing data.

        Parameters
        ----------
        starttime : UTCDateTime
            time of first sample in timeseries.
        endtime : UTCDateTime
            time of last sample in timeseries.
        observatory : str
            observatory code, usually 3 characters, optional.
            uses default if unspecified.
        channels : array_like
            list of channels to load, optional.
            uses default if unspecified.
        type : {'definitive', 'provisional', 'quasi-definitive', 'variation'}
            data type, optional.
            uses default if unspecified.
        interval : {'day', 'hour', 'minute', 'month', 'second'}
            data interval, optional.
            uses default if unspecified.

        Returns
        -------
        obspy.core.Stream
            stream containing traces for requested timeseries.

        Raises
        ------
        TimeseriesFactoryException
            if any parameters are unsupported, or errors occur loading data.
        NotImplementedError
            if ``parse_string`` is not implemented by the factory.
        """
        observatory = observatory or self.observatory
        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval

        timeseries = obspy.core.Stream()
        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )
        for urlInterval in urlIntervals:
            url = self._get_url(
                observatory=observatory,
                date=urlInterval["start"],
                type=type,
                interval=interval,
                channels=channels,
            )
            try:
                data = Util.read_url(url)
            except IOError as e:
                # best effort: report the unreadable url, try the next one
                print("Error reading url: %s, continuing" % str(e), file=sys.stderr)
                continue
            try:
                timeseries += self.parse_string(
                    data,
                    observatory=observatory,
                    type=type,
                    interval=interval,
                    channels=channels,
                )
            except NotImplementedError:
                # report the public entry point rather than the parse hook
                raise NotImplementedError('"get_timeseries" not implemented')
            except Exception as e:
                # best effort: dump the unparsable payload and keep going
                print("Error parsing data: " + str(e), file=sys.stderr)
                print(data, file=sys.stderr)

        if channels is not None:
            # keep only the requested channels
            filtered = obspy.core.Stream()
            for channel in channels:
                filtered += timeseries.select(channel=channel)
            timeseries = filtered

        timeseries.merge()
        # pad with NaN so the stream spans the requested time range
        timeseries.trim(
            starttime=starttime,
            endtime=endtime,
            nearest_sample=False,
            pad=True,
            fill_value=numpy.nan,
        )
        return timeseries

    def parse_string(self, data, **kwargs):
        """Parse the contents of a string into a stream.

        The base implementation always raises ``NotImplementedError``.

        Parameters
        ----------
        data : str
            content to parse.

        Raises
        ------
        NotImplementedError
            always.
        """
        raise NotImplementedError('"parse_string" not implemented')

    def put_timeseries(
        self,
        timeseries,
        starttime=None,
        endtime=None,
        channels=None,
        type=None,
        interval=None,
    ):
        """Store timeseries data.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream containing traces to store.
        starttime : UTCDateTime
            time of first sample in timeseries to store.
            uses first sample if unspecified.
        endtime : UTCDateTime
            time of last sample in timeseries to store.
            uses last sample if unspecified.
        channels : array_like
            list of channels to store, optional.
            uses default if unspecified.
        type : {'definitive', 'provisional', 'quasi-definitive', 'variation'}
            data type, optional.
            uses default if unspecified.
        interval : {'day', 'hour', 'minute', 'month', 'second'}
            data interval, optional.
            uses default if unspecified.

        Raises
        ------
        TimeseriesFactoryException
            if any errors occur.
        NotImplementedError
            if ``write_file`` is not implemented by the factory.
        """
        if len(timeseries) == 0:
            # no data to put
            return
        if not self.urlTemplate.startswith("file://"):
            raise TimeseriesFactoryException("Only file urls are supported")

        channels = channels or self.channels
        type = type or self.type
        interval = interval or self.interval
        # time range, sample spacing and station come from the first trace
        stats = timeseries[0].stats
        delta = stats.delta
        observatory = stats.station
        starttime = starttime or stats.starttime
        endtime = endtime or stats.endtime

        urlIntervals = Util.get_intervals(
            starttime=starttime, endtime=endtime, size=self.urlInterval
        )
        for urlInterval in urlIntervals:
            interval_start = urlInterval["start"]
            interval_end = urlInterval["end"]
            if interval_start != interval_end:
                # subtract delta to omit the sample at end: `[start, end)`
                interval_end = interval_end - delta
            url = self._get_url(
                observatory=observatory,
                date=interval_start,
                type=type,
                interval=interval,
                channels=channels,
            )
            url_data = timeseries.slice(
                starttime=interval_start,
                endtime=interval_end,
            )
            url_file = Util.get_file_from_url(url, createParentDirectory=True)

            # existing data file, merge new data into existing
            if os.path.isfile(url_file):
                try:
                    existing_data = Util.read_file(url_file)
                    existing_data = self.parse_string(
                        existing_data,
                        observatory=url_data[0].stats.station,
                        type=type,
                        interval=interval,
                        channels=channels,
                    )
                    # TODO: make parse_string return the correct location code
                    for trace in existing_data:
                        # make location codes match, just in case
                        # NOTE(review): the [0] raises IndexError when the
                        # existing file has a trace absent from the new data.
                        new_trace = url_data.select(
                            network=trace.stats.network,
                            station=trace.stats.station,
                            channel=trace.stats.channel,
                        )[0]
                        trace.stats.location = new_trace.stats.location
                    url_data = TimeseriesUtility.merge_streams(
                        existing_data, url_data
                    )
                except IOError:
                    # no data yet
                    pass
                except NotImplementedError:
                    # factory only supports output
                    pass

            # pad with NaN's out to urlInterval (like get_timeseries());
            # fill_value is only applied when pad=True
            url_data.trim(
                starttime=interval_start,
                endtime=interval_end,
                nearest_sample=False,
                pad=True,
                fill_value=numpy.nan,
            )
            with open(url_file, "wb") as fh:
                try:
                    self.write_file(fh, url_data, channels)
                except NotImplementedError:
                    # report the public entry point rather than the write hook
                    raise NotImplementedError('"put_timeseries" not implemented')

    def write_file(self, fh, timeseries, channels):
        """Write timeseries data to the given file object.

        The base implementation always raises ``NotImplementedError``.

        Parameters
        ----------
        fh : writable
            file handle where data is written.
        timeseries : obspy.core.Stream
            stream containing traces to store.
        channels : list
            list of channels to store.

        Raises
        ------
        NotImplementedError
            always.
        """
        raise NotImplementedError('"write_file" not implemented')

    def _get_file_from_url(self, url):
        """Get a file for writing.

        Ensures parent directory exists.

        Parameters
        ----------
        url : str
            path to file

        Returns
        -------
        str
            path to file without file:// prefix

        Raises
        ------
        TimeseriesFactoryException
            if url does not start with file://
        """
        prefix = "file://"
        if not url.startswith(prefix):
            raise TimeseriesFactoryException("Only file urls are supported for writing")
        # strip only the leading scheme; later occurrences belong to the path
        filename = url[len(prefix) :]
        parent = os.path.dirname(filename)
        # a bare filename has no parent directory to create
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        return filename

    def _get_url(
        self, observatory, date, type="variation", interval="minute", channels=None
    ):
        """Get the url for a specified file.

        Replaces patterns (described in class docstring) with values based on
        parameter values.

        Parameters
        ----------
        observatory : str
            observatory code.
        date : obspy.core.UTCDateTime
            time used to fill the date-based patterns.
        type : {'variation', 'reported', 'provisional', 'adjusted',
                'quasi-definitive', 'definitive'}
        interval : {'minute', 'second', 'hour', 'day'}
        channels : list
            list of data channels being requested; not used when building
            the url here.

        Returns
        -------
        str
            ``urlTemplate`` with its patterns replaced.

        Raises
        ------
        TimeseriesFactoryException
            if type or interval are not supported.
        """
        params = {
            "date": date.datetime,
            "i": self._get_interval_abbreviation(interval),
            "interval": self._get_interval_name(interval),
            "minute": date.hour * 60 + date.minute,
            "month": date.strftime("%b").lower(),
            "MONTH": date.strftime("%b").upper(),
            "obs": observatory.lower(),
            "OBS": observatory.upper(),
            "t": self._get_type_abbreviation(type),
            "type": self._get_type_name(type),
            # LEGACY
            # old date properties, string.format supports any strftime format
            # i.e. '{date:%j}'
            "julian": date.strftime("%j"),
            "year": date.strftime("%Y"),
            "ymd": date.strftime("%Y%m%d"),
        }
        # NOTE(review): the condition selecting the formatting style was
        # missing; a "{" in the template is assumed to mark new style. Confirm.
        if "{" in self.urlTemplate:
            # use new style string formatting
            return self.urlTemplate.format(**params)
        # use old style string interpolation
        return self.urlTemplate % params

    def _get_interval_abbreviation(self, interval):
        """Get abbreviation for a data interval.

        Used by ``_get_url`` to replace ``%(i)s`` in urlTemplate.

        Parameters
        ----------
        interval : {'day', 'hour', 'minute', 'month', 'second'}

        Returns
        -------
        abbreviation for ``interval``.

        Raises
        ------
        TimeseriesFactoryException
            if ``interval`` is not supported.
        """
        interval_abbr = None
        if interval == "day":
            # NOTE(review): this branch was missing; "day" is assumed as the
            # abbreviation for the documented 'day' interval. Confirm.
            interval_abbr = "day"
        elif interval == "hour":
            interval_abbr = "hor"
        elif interval == "minute":
            interval_abbr = "min"
        elif interval == "month":
            interval_abbr = "mon"
        elif interval == "second":
            interval_abbr = "sec"
        else:
            raise TimeseriesFactoryException('Unexpected interval "%s"' % interval)
        return interval_abbr

    def _get_interval_name(self, interval):
        """Get name for a data interval.

        Used by ``_get_url`` to replace ``%(interval)s`` in urlTemplate.

        Parameters
        ----------
        interval : {'hour', 'minute', 'second'}

        Returns
        -------
        name for ``interval``.

        Raises
        ------
        TimeseriesFactoryException
            if ``interval`` is not supported.
        """
        interval_name = None
        if interval == "minute":
            interval_name = "OneMinute"
        elif interval == "second":
            interval_name = "OneSecond"
        elif interval == "hour":
            # NOTE(review): the value for this branch was missing; "OneHour"
            # follows the pattern of the other names. Confirm.
            interval_name = "OneHour"
        else:
            raise TimeseriesFactoryException('Unsupported interval "%s"' % interval)
        return interval_name

    def _get_type_abbreviation(self, type):
        """Get abbreviation for a data type.

        Used by ``_get_url`` to replace ``%(t)s`` in urlTemplate.

        Parameters
        ----------
        type : {'variation', 'reported', 'provisional', 'adjusted',
                'quasi-definitive', 'definitive'}

        Returns
        -------
        abbreviation for ``type``.

        Raises
        ------
        TimeseriesFactoryException
            if ``type`` is not supported.
        """
        type_abbr = None
        if type == "definitive":
            type_abbr = "d"
        elif type == "provisional" or type == "adjusted":
            type_abbr = "p"
        elif type == "quasi-definitive":
            type_abbr = "q"
        elif type == "variation" or type == "reported":
            type_abbr = "v"
        else:
            raise TimeseriesFactoryException('Unexpected type "%s"' % type)
        return type_abbr

    def _get_type_name(self, type):
        """Get name for a data type.

        Used by ``_get_url`` to replace ``%(type)s`` in urlTemplate.

        Parameters
        ----------
        type : {'variation', 'reported', 'provisional', 'adjusted',
                'quasi-definitive', 'quasidefinitive', 'definitive' }

        Returns
        -------
        name for ``type``.

        Raises
        ------
        TimeseriesFactoryException
            if ``type`` is not supported.
        """
        type_name = None
        if type == "variation" or type == "reported":
            type_name = ""
        elif type == "provisional" or type == "adjusted":
            type_name = "Provisional"
        elif type == "quasi-definitive" or type == "quasidefinitive":
            type_name = "QuasiDefinitive"
        elif type == "definitive":
            type_name = "Definitive"
        else:
            raise TimeseriesFactoryException('Unsupported type "%s"' % type)
        return type_name