Skip to content
Snippets Groups Projects
Commit 89bc973e authored by Eddie McWhirter's avatar Eddie McWhirter
Browse files

Begin PCDCP file format and tests.

parent 6c457d13
No related branches found
No related tags found
No related merge requests found
"""Factory that loads PCDCP Files."""
import urllib2
import obspy.core
import os
from geomagio import TimeseriesFactory, TimeseriesFactoryException
from PCDCPParser import PCDCPParser
from PCDCPWriter import PCDCPWriter
from geomagio import ChannelConverter
# pattern for pcdcp file names
PCDCP_FILE_PATTERN = '%(obs)s%(y)s%(j)s.min'
def read_url(url):
"""Open and read url contents.
Parameters
----------
url : str
A urllib2 compatible url, such as http:// or file://.
Returns
-------
str
contents returned by url.
Raises
------
urllib2.URLError
if any occurs
"""
response = urllib2.urlopen(url)
content = None
try:
content = response.read()
except urllib2.URLError, e:
print e.reason
raise
finally:
response.close()
return content
class PCDCPFactory(TimeseriesFactory):
"""TimeseriesFactory for PCDCP formatted files.
Parameters
----------
urlTemplate : str
A string that contains any of the following replacement patterns:
- '%(obs)s' lowercase observatory code
- '%(OBS)s' uppercase observatory code
- '%(y)s' year formatted as YYYY
- '%(j)s' julian day formatted as JJJ
See Also
--------
PCDCPParser
"""
def __init__(self, urlTemplate, observatory=None, channels=None, type=None,
interval=None):
TimeseriesFactory.__init__(self, observatory, channels, type, interval)
self.urlTemplate = urlTemplate
def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None):
"""Get timeseries data
Parameters
----------
observatory : str
observatory code.
starttime : obspy.core.UTCDateTime
time of first sample.
endtime : obspy.core.UTCDateTime
time of last sample.
type : {'variation', 'quasi-definitive'}
data type.
interval : {'minute', 'second'}
data interval.
Returns
-------
obspy.core.Stream
timeseries object with requested data.
Raises
------
TimeseriesFactoryException
if invalid values are requested, or errors occur while
retrieving timeseries.
"""
observatory = observatory or self.observatory
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
days = self._get_days(starttime, endtime)
timeseries = obspy.core.Stream()
for day in days:
url = self._get_url(observatory, day, type, interval)
pcdcpFile = read_url(url)
timeseries += self.parse_string(pcdcpFile)
# merge channel traces for multiple days
timeseries.merge()
# trim to requested start/end time
timeseries.trim(starttime, endtime)
return timeseries
def parse_string(self, pcdcpString):
"""Parse the contents of a string in the format of an pcdcp file.
Parameters
----------
pcdcpString : str
string containing PCDCP content.
Returns
-------
obspy.core.Stream
parsed data.
"""
parser = PCDCPParser()
parser.parse(pcdcpString)
metadata = parser.metadata
starttime = obspy.core.UTCDateTime(parser.times[0])
endtime = obspy.core.UTCDateTime(parser.times[-1])
data = parser.data
length = len(data[data.keys()[0]])
rate = (length - 1) / (endtime - starttime)
stream = obspy.core.Stream()
for channel in data.keys():
stats = obspy.core.Stats(metadata)
stats.starttime = starttime
stats.sampling_rate = rate
stats.npts = length
stats.channel = channel
if channel == 'D':
data[channel] = ChannelConverter.get_radians_from_minutes(
data[channel])
stream += obspy.core.Trace(data[channel], stats)
return stream
def _get_url(self, observatory, date, type='variation', interval='minute'):
"""Get the url for a specified PCDCP file.
Replaces patterns (described in class docstring) with values based on
parameter values.
Parameters
----------
observatory : str
observatory code.
date : obspy.core.UTCDateTime
day to fetch (only year, month, day are used)
type : {'variation', 'quasi-definitive'}
data type.
interval : {'minute', 'second'}
data interval.
Raises
------
TimeseriesFactoryException
if type or interval are not supported.
"""
return self.urlTemplate % {
'i': self._get_interval_abbreviation(interval),
'interval': self._get_interval_name(interval),
'obs': observatory.lower(),
'OBS': observatory.upper(),
't': self._get_type_abbreviation(type),
'type': self._get_type_name(type),
'ymd': date.strftime("%Y%m%d")}
def _get_interval_abbreviation(self, interval):
"""Get abbreviation for a data interval.
Used by ``_get_url`` to replace ``%(i)s`` in urlTemplate.
Parameters
----------
interval : {'daily', 'hourly', 'minute', 'monthly', 'second'}
Returns
-------
abbreviation for ``interval``.
Raises
------
TimeseriesFactoryException
if ``interval`` is not supported.
"""
interval_abbr = None
if interval == 'daily':
interval_abbr = 'day'
elif interval == 'hourly':
interval_abbr = 'hor'
elif interval == 'minute':
interval_abbr = 'min'
elif interval == 'monthly':
interval_abbr = 'mon'
elif interval == 'second':
interval_abbr = 'sec'
else:
raise TimeseriesFactoryException(
'Unexpected interval "%s"' % interval)
return interval_abbr
def _get_interval_name(self, interval):
"""Get name for a data interval.
Used by ``_get_url`` to replace ``%(interval)s`` in urlTemplate.
Parameters
----------
interval : {'minute', 'second'}
Returns
-------
name for ``interval``.
Raises
------
TimeseriesFactoryException
if ``interval`` is not supported.
"""
interval_name = None
if interval == 'minute':
interval_name = 'OneMinute'
elif interval == 'second':
interval_name = 'OneSecond'
else:
raise TimeseriesFactoryException(
'Unsupported interval "%s"' % interval)
return interval_name
def _get_type_abbreviation(self, type):
"""Get abbreviation for a data type.
Used by ``_get_url`` to replace ``%(t)s`` in urlTemplate.
Parameters
----------
type : {'definitive', 'provisional', 'quasi-definitive', 'variation'}
Returns
-------
name for ``type``.
Raises
------
TimeseriesFactoryException
if ``type`` is not supported.
"""
type_abbr = None
if type == 'definitive':
type_abbr = 'd'
elif type == 'provisional':
type_abbr = 'p'
elif type == 'quasi-definitive':
type_abbr = 'q'
elif type == 'variation':
type_abbr = 'v'
else:
raise TimeseriesFactoryException(
'Unexpected type "%s"' % type)
return type_abbr
def _get_type_name(self, type):
"""Get name for a data type.
Used by ``_get_url`` to replace ``%(type)s`` in urlTemplate.
Parameters
----------
type : {'variation', 'quasi-definitive'}
Returns
-------
name for ``type``.
Raises
------
TimeseriesFactoryException
if ``type`` is not supported.
"""
type_name = None
if type == 'variation':
type_name = ''
elif type == 'quasi-definitive':
type_name = 'QuasiDefinitive'
else:
raise TimeseriesFactoryException(
'Unsupported type "%s"' % type)
return type_name
def _get_days(self, starttime, endtime):
"""Get days between (inclusive) starttime and endtime.
Parameters
----------
starttime : obspy.core.UTCDateTime
the start time
endtime : obspy.core.UTCDateTime
the end time
Returns
-------
array_like
list of times, one per day, for all days between and including
``starttime`` and ``endtime``.
Raises
------
TimeseriesFactoryException
if starttime is after endtime
"""
if starttime > endtime:
raise TimeseriesFactoryException('starttime must be before endtime')
days = []
day = starttime
lastday = (endtime.year, endtime.month, endtime.day)
while True:
days.append(day)
if lastday == (day.year, day.month, day.day):
break
# move to next day
day = obspy.core.UTCDateTime(day.timestamp + 86400)
return days
def write_file(self, fh, timeseries, channels):
"""writes timeseries data to the given file object.
Parameters
----------
fh: file object
timeseries : obspy.core.Stream
stream containing traces to store.
channels : array_like
list of channels to store
"""
PCDCPWriter().write(fh, timeseries, channels)
def put_timeseries(self, timeseries, starttime=None, endtime=None,
channels=None, type=None, interval=None):
"""Store timeseries data.
Parameters
----------
timeseries : obspy.core.Stream
stream containing traces to store.
starttime : UTCDateTime
time of first sample in timeseries to store.
uses first sample if unspecified.
endtime : UTCDateTime
time of last sample in timeseries to store.
uses last sample if unspecified.
channels : array_like
list of channels to store, optional.
uses default if unspecified.
type : {'definitive', 'provisional', 'quasi-definitive', 'variation'}
data type, optional.
uses default if unspecified.
interval : {'daily', 'hourly', 'minute', 'monthly', 'second'}
data interval, optional.
uses default if unspecified.
"""
if not self.urlTemplate.startswith('file://'):
raise TimeseriesFactoryException('Only file urls are supported')
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
stats = timeseries[0].stats
observatory = stats.station
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
days = self._get_days(starttime, endtime)
for day in days:
day_filename = self._get_file_from_url(
self._get_url(observatory, day, type, interval))
day_timeseries = self._get_slice(timeseries, day, interval)
with open(day_filename, 'w') as fh:
self.write_file(fh, day_timeseries, channels)
def _get_file_from_url(self, url):
"""Get a file for writing.
Ensures parent directory exists.
Parameters
----------
url : str
Url path to PCDCP
Returns
-------
str
path to file without file:// prefix
Raises
------
TimeseriesFactoryException
if url does not start with file://
"""
if not url.startswith('file://'):
raise TimeseriesFactoryException(
'Only file urls are supported for writing')
filename = url.replace('file://', '')
parent = os.path.dirname(filename)
if not os.path.exists(parent):
os.makedirs(parent)
return filename
def _get_slice(self, timeseries, day, interval):
"""Get the first and last time for a day
Parameters
----------
timeseries : obspy.core.Stream
timeseries to slice
day : UTCDateTime
time in day to slice
Returns
-------
obspy.core.Stream
sliced stream
"""
day = day.datetime
start = obspy.core.UTCDateTime(day.year, day.month, day.day, 0, 0, 0)
if interval == 'minute':
end = start + 86340.0
else:
end = start + 86399.999999
return timeseries.slice(start, end)
"""Tests for PCDCPFactory."""
from PCDCPFactory import PCDCPFactory
from obspy.core.utcdatetime import UTCDateTime
from nose.tools import assert_equals
def test__get_days():
"""geomagio.pcdcp.PCDCPFactory_test.test__get_days()
Call the _get_days method with starttime and endtime separated by more
than one day.
Verify it returns all days between the given starttime and endtime.
"""
starttime = UTCDateTime('2014-01-01')
endtime = UTCDateTime('2014-01-07')
assert_equals(PCDCPFactory('')._get_days(starttime, endtime), [
UTCDateTime('2014-01-01'),
UTCDateTime('2014-01-02'),
UTCDateTime('2014-01-03'),
UTCDateTime('2014-01-04'),
UTCDateTime('2014-01-05'),
UTCDateTime('2014-01-06'),
UTCDateTime('2014-01-07')])
"""Parsing methods for the PCDCP Format."""
import numpy
from datetime import datetime
# values that represent missing data points in PCDCP
NINES = numpy.float64('99999.99')
class PCDCPParser(object):
"""PCDCP parser.
Attributes
----------
header : dict
parsed PCDCP header.
times : array
parsed timeseries times.
data : dict
keys are channel names (order listed in ``self.channels``).
values are ``numpy.array`` of timeseries values, array values are
``numpy.nan`` when values are missing.
"""
def __init__(self):
"""Create a new PCDCP parser."""
# header fields
self.header = {}
# timestamps of data (datetime.datetime)
self.times = []
# dictionary of data (channel : numpy.array<float64>)
self.data = {}
# temporary storage for data being parsed
self._parsedata = None
def parse(self, data):
"""Parse a string containing PCDCP formatted data.
Parameters
----------
data : str
PCDCP formatted file contents.
"""
parsing_header = True
lines = data.splitlines()
for line in lines:
if parsing_header:
self._parse_header(line)
parsing_header = False
else:
self._parse_data(line)
self._post_process()
def _parse_header(self, line):
"""Parse header line.
Adds value to ``self.header``.
"""
print "Header:", line
self.header['header'] = line
self.header['observatory'] = line[0:3]
self.header['year'] = line[6:10]
self.header['date'] = line[16:25]
return
def _parse_data(self, line):
"""Parse one data point in the timeseries.
Adds time to ``self.times``.
Adds channel values to ``self.data``.
"""
print "Parse data!"
t, d1, d2, d3, d4 = self._parsedata
t.append(line[0:4])
d1.append(line[5:13])
d2.append(line[14:21])
d3.append(line[22:30])
d4.append(line[31:39])
def _post_process(self):
"""Post processing after data is parsed.
Converts data to numpy arrays.
Replaces empty values with ``numpy.nan``.
"""
self.times = self._parsedata[0]
for channel, data in zip(self.channels, self._parsedata[1:]):
data = numpy.array(data, dtype=numpy.float64)
# filter empty values
data[data == NINES] = numpy.nan
self.data[channel] = data
self._parsedata = None
"""Tests for the PCDCP Parser class."""
from nose.tools import assert_equals
from PCDCPParser import PCDCPParser
PCDCP_EXAMPLE = \
"""
BOU 2015 001 01-Jan-15 HEZF 0.01nT File Version 2.00
0000 2086167 -5707 4745737 5237768
0001 2086190 -5664 4745737 5237777
0002 2086213 -5638 4745741 5237787
0003 2086239 -5632 4745739 5237796
0004 2086198 -5626 4745743 5237786
0005 2086228 -5600 4745728 5237784
0006 2086242 -5578 4745725 5237787
0007 2086258 -5552 4745726 5237792
0008 2086278 -5571 4745734 5237808
"""
def test__parse_header():
"""geomagio.pcdcp.PCDCPParser_test.test_parse_header()
Call the _parse_header method with a header.
Verify the header name and value are split at the correct column.
"""
parser = PCDCPParser()
parser._parse_header('BOU 2015 001 01-Jan-15 HEZF 0.01nT' +
' File Version 2.00')
assert_equals(parser.headers['observatory'], 'BOU')
assert_equals(parser.headers['year'], '2015')
assert_equals(parser.headers['date'], '01-Jan-15')
from cStringIO import StringIO
from geomagio import TimeseriesFactoryException, ChannelConverter
import numpy
import PCDCPParser
import textwrap
from datetime import datetime
class PCDCPWriter(object):
"""PCDCP writer.
"""
def __init__(self, empty_value=PCDCPParser.NINES):
self.empty_value = empty_value
def write(self, out, timeseries, channels):
"""write timeseries to pcdcp file
Parameters
----------
out : file object
File object to be written to. Could be stdout.
timeseries : obspy.core.stream
Timeseries object with data to be written.
channels : array_like
Channels to be written from timeseries object
"""
stats = timeseries[0].stats
out.write(self._format_header(stats, channels))
out.write(self._format_data(timeseries, channels))
def _format_header(self, name, value):
"""format headers for PCDCP file
Parameters
----------
name: str
the name to be written
value: str
the value to written.
Returns
-------
str
a string formatted to be a single header line in a PCDCP file
"""
observatory = 'BOU'
year = '2015'
date = '01-Jan-15'
space = ' '
return ''.join(observatory, space, year, space, '001', space,
date, space, 'HEZF 0.01nT File Version 2.00')
def _format_data(self, timeseries, channels):
"""Format all data lines.
Parameters
----------
timeseries : obspy.core.Stream
stream containing traces with channel listed in channels
channels : sequence
list and order of channel values to output.
"""
buf = []
if timeseries.select(channel='D'):
d = timeseries.select(channel='D')
d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data)
traces = [timeseries.select(channel=c)[0] for c in channels]
starttime = float(traces[0].stats.starttime)
delta = traces[0].stats.delta
for i in xrange(len(traces[0].data)):
buf.append(self._format_values(
datetime.utcfromtimestamp(starttime + i * delta),
(t.data[i] for t in traces)))
return ''.join(buf)
def _format_values(self, time, values):
"""Format one line of data values.
Parameters
----------
time : datetime
timestamp for values
values : sequence
list and order of channel values to output.
if value is NaN, self.empty_value is output in its place.
Returns
-------
unicode
Formatted line containing values.
"""
tt = time.timetuple()
return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \
'{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \
'{0.tm_yday:0>3d} ' \
'{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format(
tt, int(time.microsecond / 1000),
*[self.empty_value if numpy.isnan(val) else val
for val in values])
@classmethod
def format(self, timeseries, channels):
"""Get an PCDCP formatted string.
Calls write() with a StringIO, and returns the output.
Parameters
----------
timeseries : obspy.core.Stream
Returns
-------
unicode
PCDCP formatted string.
"""
out = StringIO()
writer = PCDCPWriter()
writer.write(out, timeseries, channels)
return out.getvalue()
"""Factory to load PCDCP files from an input StreamPCDCPFactory."""
from PCDCPFactory import PCDCPFactory
class StreamPCDCPFactory(PCDCPFactory):
"""Timeseries Factory for PCDCP formatted files loaded via a stream.
normally either a single file, or stdio.
Parameters
----------
stream: file object
io stream, normally either a file, or stdio
See Also
--------
PCDCPFactory
Timeseriesfactory
"""
def __init__(self, stream, observatory=None, channels=None,
type=None, interval=None):
PCDCPFactory.__init__(self, None, observatory, channels,
type, interval)
self._stream = stream
def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None):
"""Implements get_timeseries
Notes: Calls PCDCPFactory.parse_string in place of
PCDCPFactory.get_timeseries.
"""
return PCDCPFactory.parse_string(self, self._stream.read())
def put_timeseries(self, timeseries, starttime=None, endtime=None,
channels=None, type=None, interval=None):
"""Implements put_timeseries
Notes: Calls PCDCPFactory.write_file in place of
PCDCPFactory.get_timeseries. This can result in a
non-standard PCDCP file, specifically one of longer than
expected length.
"""
PCDCPFactory.write_file(self, self._stream, timeseries, channels)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment