diff --git a/geomagio/pcdcp/PCDCPFactory.py b/geomagio/pcdcp/PCDCPFactory.py new file mode 100644 index 0000000000000000000000000000000000000000..af097ac116a0d2f7dd937e6ff829a69efaae3b85 --- /dev/null +++ b/geomagio/pcdcp/PCDCPFactory.py @@ -0,0 +1,445 @@ +"""Factory that loads PCDCP Files.""" + +import urllib2 +import obspy.core +import os +from geomagio import TimeseriesFactory, TimeseriesFactoryException +from PCDCPParser import PCDCPParser +from PCDCPWriter import PCDCPWriter +from geomagio import ChannelConverter + + +# pattern for pcdcp file names +PCDCP_FILE_PATTERN = '%(obs)s%(y)s%(j)s.min' + + +def read_url(url): + """Open and read url contents. + + Parameters + ---------- + url : str + A urllib2 compatible url, such as http:// or file://. + + Returns + ------- + str + contents returned by url. + + Raises + ------ + urllib2.URLError + if any occurs + """ + response = urllib2.urlopen(url) + content = None + try: + content = response.read() + except urllib2.URLError, e: + print e.reason + raise + finally: + response.close() + return content + + +class PCDCPFactory(TimeseriesFactory): + """TimeseriesFactory for PCDCP formatted files. + + Parameters + ---------- + urlTemplate : str + A string that contains any of the following replacement patterns: + - '%(obs)s' lowercase observatory code + - '%(OBS)s' uppercase observatory code + - '%(y)s' year formatted as YYYY + - '%(j)s' julian day formatted as JJJ + + See Also + -------- + PCDCPParser + """ + + def __init__(self, urlTemplate, observatory=None, channels=None, type=None, + interval=None): + TimeseriesFactory.__init__(self, observatory, channels, type, interval) + self.urlTemplate = urlTemplate + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Get timeseries data + + Parameters + ---------- + observatory : str + observatory code. + starttime : obspy.core.UTCDateTime + time of first sample. + endtime : obspy.core.UTCDateTime + time of last sample. + type : {'variation', 'quasi-definitive'} + data type. + interval : {'minute', 'second'} + data interval. + + Returns + ------- + obspy.core.Stream + timeseries object with requested data. + + Raises + ------ + TimeseriesFactoryException + if invalid values are requested, or errors occur while + retrieving timeseries. + """ + observatory = observatory or self.observatory + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + days = self._get_days(starttime, endtime) + timeseries = obspy.core.Stream() + for day in days: + url = self._get_url(observatory, day, type, interval) + pcdcpFile = read_url(url) + timeseries += self.parse_string(pcdcpFile) + # merge channel traces for multiple days + timeseries.merge() + # trim to requested start/end time + timeseries.trim(starttime, endtime) + return timeseries + + def parse_string(self, pcdcpString): + """Parse the contents of a string in the format of an pcdcp file. + + Parameters + ---------- + pcdcpString : str + string containing PCDCP content. + + Returns + ------- + obspy.core.Stream + parsed data. + """ + parser = PCDCPParser() + parser.parse(pcdcpString) + metadata = parser.metadata + starttime = obspy.core.UTCDateTime(parser.times[0]) + endtime = obspy.core.UTCDateTime(parser.times[-1]) + data = parser.data + length = len(data[data.keys()[0]]) + rate = (length - 1) / (endtime - starttime) + stream = obspy.core.Stream() + for channel in data.keys(): + stats = obspy.core.Stats(metadata) + stats.starttime = starttime + stats.sampling_rate = rate + stats.npts = length + stats.channel = channel + if channel == 'D': + data[channel] = ChannelConverter.get_radians_from_minutes( + data[channel]) + stream += obspy.core.Trace(data[channel], stats) + return stream + + def _get_url(self, observatory, date, type='variation', interval='minute'): + """Get the url for a specified PCDCP file. + + Replaces patterns (described in class docstring) with values based on + parameter values. + + Parameters + ---------- + observatory : str + observatory code. + date : obspy.core.UTCDateTime + day to fetch (only year, month, day are used) + type : {'variation', 'quasi-definitive'} + data type. + interval : {'minute', 'second'} + data interval. + + Raises + ------ + TimeseriesFactoryException + if type or interval are not supported. + """ + return self.urlTemplate % { + 'i': self._get_interval_abbreviation(interval), + 'interval': self._get_interval_name(interval), + 'obs': observatory.lower(), + 'OBS': observatory.upper(), + 't': self._get_type_abbreviation(type), + 'type': self._get_type_name(type), + 'ymd': date.strftime("%Y%m%d")} + + def _get_interval_abbreviation(self, interval): + """Get abbreviation for a data interval. + + Used by ``_get_url`` to replace ``%(i)s`` in urlTemplate. + + Parameters + ---------- + interval : {'daily', 'hourly', 'minute', 'monthly', 'second'} + + Returns + ------- + abbreviation for ``interval``. + + Raises + ------ + TimeseriesFactoryException + if ``interval`` is not supported. + """ + interval_abbr = None + if interval == 'daily': + interval_abbr = 'day' + elif interval == 'hourly': + interval_abbr = 'hor' + elif interval == 'minute': + interval_abbr = 'min' + elif interval == 'monthly': + interval_abbr = 'mon' + elif interval == 'second': + interval_abbr = 'sec' + else: + raise TimeseriesFactoryException( + 'Unexpected interval "%s"' % interval) + return interval_abbr + + def _get_interval_name(self, interval): + """Get name for a data interval. + + Used by ``_get_url`` to replace ``%(interval)s`` in urlTemplate. + + Parameters + ---------- + interval : {'minute', 'second'} + + Returns + ------- + name for ``interval``. + + Raises + ------ + TimeseriesFactoryException + if ``interval`` is not supported. + """ + interval_name = None + if interval == 'minute': + interval_name = 'OneMinute' + elif interval == 'second': + interval_name = 'OneSecond' + else: + raise TimeseriesFactoryException( + 'Unsupported interval "%s"' % interval) + return interval_name + + def _get_type_abbreviation(self, type): + """Get abbreviation for a data type. + + Used by ``_get_url`` to replace ``%(t)s`` in urlTemplate. + + Parameters + ---------- + type : {'definitive', 'provisional', 'quasi-definitive', 'variation'} + + Returns + ------- + name for ``type``. + + Raises + ------ + TimeseriesFactoryException + if ``type`` is not supported. + """ + type_abbr = None + if type == 'definitive': + type_abbr = 'd' + elif type == 'provisional': + type_abbr = 'p' + elif type == 'quasi-definitive': + type_abbr = 'q' + elif type == 'variation': + type_abbr = 'v' + else: + raise TimeseriesFactoryException( + 'Unexpected type "%s"' % type) + return type_abbr + + def _get_type_name(self, type): + """Get name for a data type. + + Used by ``_get_url`` to replace ``%(type)s`` in urlTemplate. + + Parameters + ---------- + type : {'variation', 'quasi-definitive'} + + Returns + ------- + name for ``type``. + + Raises + ------ + TimeseriesFactoryException + if ``type`` is not supported. + """ + type_name = None + if type == 'variation': + type_name = '' + elif type == 'quasi-definitive': + type_name = 'QuasiDefinitive' + else: + raise TimeseriesFactoryException( + 'Unsupported type "%s"' % type) + return type_name + + def _get_days(self, starttime, endtime): + """Get days between (inclusive) starttime and endtime. + + Parameters + ---------- + starttime : obspy.core.UTCDateTime + the start time + endtime : obspy.core.UTCDateTime + the end time + + Returns + ------- + array_like + list of times, one per day, for all days between and including + ``starttime`` and ``endtime``. + + Raises + ------ + TimeseriesFactoryException + if starttime is after endtime + """ + if starttime > endtime: + raise TimeseriesFactoryException('starttime must be before endtime') + + days = [] + day = starttime + lastday = (endtime.year, endtime.month, endtime.day) + while True: + days.append(day) + if lastday == (day.year, day.month, day.day): + break + # move to next day + day = obspy.core.UTCDateTime(day.timestamp + 86400) + return days + + def write_file(self, fh, timeseries, channels): + """writes timeseries data to the given file object. + + Parameters + ---------- + fh: file object + timeseries : obspy.core.Stream + stream containing traces to store. + channels : array_like + list of channels to store + """ + PCDCPWriter().write(fh, timeseries, channels) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Store timeseries data. + + Parameters + ---------- + timeseries : obspy.core.Stream + stream containing traces to store. + starttime : UTCDateTime + time of first sample in timeseries to store. + uses first sample if unspecified. + endtime : UTCDateTime + time of last sample in timeseries to store. + uses last sample if unspecified. + channels : array_like + list of channels to store, optional. + uses default if unspecified. + type : {'definitive', 'provisional', 'quasi-definitive', 'variation'} + data type, optional. + uses default if unspecified. + interval : {'daily', 'hourly', 'minute', 'monthly', 'second'} + data interval, optional. + uses default if unspecified. + """ + if not self.urlTemplate.startswith('file://'): + raise TimeseriesFactoryException('Only file urls are supported') + + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + stats = timeseries[0].stats + observatory = stats.station + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + days = self._get_days(starttime, endtime) + + for day in days: + day_filename = self._get_file_from_url( + self._get_url(observatory, day, type, interval)) + day_timeseries = self._get_slice(timeseries, day, interval) + with open(day_filename, 'w') as fh: + self.write_file(fh, day_timeseries, channels) + + def _get_file_from_url(self, url): + """Get a file for writing. + + Ensures parent directory exists. + + Parameters + ---------- + url : str + Url path to PCDCP + + Returns + ------- + str + path to file without file:// prefix + + Raises + ------ + TimeseriesFactoryException + if url does not start with file:// + """ + if not url.startswith('file://'): + raise TimeseriesFactoryException( + 'Only file urls are supported for writing') + + filename = url.replace('file://', '') + parent = os.path.dirname(filename) + + if not os.path.exists(parent): + os.makedirs(parent) + + return filename + + def _get_slice(self, timeseries, day, interval): + """Get the first and last time for a day + + Parameters + ---------- + timeseries : obspy.core.Stream + timeseries to slice + day : UTCDateTime + time in day to slice + + Returns + ------- + obspy.core.Stream + sliced stream + """ + day = day.datetime + start = obspy.core.UTCDateTime(day.year, day.month, day.day, 0, 0, 0) + + if interval == 'minute': + end = start + 86340.0 + else: + end = start + 86399.999999 + + return timeseries.slice(start, end) diff --git a/geomagio/pcdcp/PCDCPFactory_test.py b/geomagio/pcdcp/PCDCPFactory_test.py new file mode 100644 index 0000000000000000000000000000000000000000..a028c2c8059fc6900990dc85a1dd51c49118c968 --- /dev/null +++ b/geomagio/pcdcp/PCDCPFactory_test.py @@ -0,0 +1,25 @@ +"""Tests for PCDCPFactory.""" + +from PCDCPFactory import PCDCPFactory +from obspy.core.utcdatetime import UTCDateTime +from nose.tools import assert_equals + + +def test__get_days(): + """geomagio.pcdcp.PCDCPFactory_test.test__get_days() + + Call the _get_days method with starttime and endtime separated by more + than one day. + Verify it returns all days between the given starttime and endtime. + """ + starttime = UTCDateTime('2014-01-01') + endtime = UTCDateTime('2014-01-07') + + assert_equals(PCDCPFactory('')._get_days(starttime, endtime), [ + UTCDateTime('2014-01-01'), + UTCDateTime('2014-01-02'), + UTCDateTime('2014-01-03'), + UTCDateTime('2014-01-04'), + UTCDateTime('2014-01-05'), + UTCDateTime('2014-01-06'), + UTCDateTime('2014-01-07')]) diff --git a/geomagio/pcdcp/PCDCPParser.py b/geomagio/pcdcp/PCDCPParser.py new file mode 100644 index 0000000000000000000000000000000000000000..d05fbe077232d7f019cfb1bd231d3b9558e61f76 --- /dev/null +++ b/geomagio/pcdcp/PCDCPParser.py @@ -0,0 +1,93 @@ +"""Parsing methods for the PCDCP Format.""" + + +import numpy +from datetime import datetime + +# values that represent missing data points in PCDCP +NINES = numpy.float64('99999.99') + + +class PCDCPParser(object): + """PCDCP parser. + + Attributes + ---------- + header : dict + parsed PCDCP header. + times : array + parsed timeseries times. + data : dict + keys are channel names (order listed in ``self.channels``). + values are ``numpy.array`` of timeseries values, array values are + ``numpy.nan`` when values are missing. + """ + + def __init__(self): + """Create a new PCDCP parser.""" + # header fields + self.header = {} + # timestamps of data (datetime.datetime) + self.times = [] + # dictionary of data (channel : numpy.array<float64>) + self.data = {} + # temporary storage for data being parsed + self._parsedata = None + + def parse(self, data): + """Parse a string containing PCDCP formatted data. + + Parameters + ---------- + data : str + PCDCP formatted file contents. + """ + parsing_header = True + lines = data.splitlines() + for line in lines: + if parsing_header: + self._parse_header(line) + parsing_header = False + else: + self._parse_data(line) + self._post_process() + + def _parse_header(self, line): + """Parse header line. + + Adds value to ``self.header``. + """ + print "Header:", line + self.header['header'] = line + self.header['observatory'] = line[0:3] + self.header['year'] = line[6:10] + self.header['date'] = line[16:25] + return + + def _parse_data(self, line): + """Parse one data point in the timeseries. + + Adds time to ``self.times``. + Adds channel values to ``self.data``. + """ + print "Parse data!" + t, d1, d2, d3, d4 = self._parsedata + t.append(line[0:4]) + d1.append(line[5:13]) + d2.append(line[14:21]) + d3.append(line[22:30]) + d4.append(line[31:39]) + + def _post_process(self): + """Post processing after data is parsed. + + Converts data to numpy arrays. + Replaces empty values with ``numpy.nan``. + """ + self.times = self._parsedata[0] + for channel, data in zip(self.channels, self._parsedata[1:]): + data = numpy.array(data, dtype=numpy.float64) + # filter empty values + data[data == NINES] = numpy.nan + self.data[channel] = data + self._parsedata = None diff --git a/geomagio/pcdcp/PCDCPParser_test.py b/geomagio/pcdcp/PCDCPParser_test.py new file mode 100644 index 0000000000000000000000000000000000000000..c278443a5657160bb84ba7400fd1aa496e23a2ec --- /dev/null +++ b/geomagio/pcdcp/PCDCPParser_test.py @@ -0,0 +1,34 @@ +"""Tests for the PCDCP Parser class.""" + +from nose.tools import assert_equals +from PCDCPParser import PCDCPParser + + +PCDCP_EXAMPLE = \ +""" +BOU 2015 001 01-Jan-15 HEZF 0.01nT File Version 2.00 +0000 2086167 -5707 4745737 5237768 +0001 2086190 -5664 4745737 5237777 +0002 2086213 -5638 4745741 5237787 +0003 2086239 -5632 4745739 5237796 +0004 2086198 -5626 4745743 5237786 +0005 2086228 -5600 4745728 5237784 +0006 2086242 -5578 4745725 5237787 +0007 2086258 -5552 4745726 5237792 +0008 2086278 -5571 4745734 5237808 +""" + + +def test__parse_header(): + """geomagio.pcdcp.PCDCPParser_test.test_parse_header() + + Call the _parse_header method with a header. + Verify the header name and value are split at the correct column. + """ + parser = PCDCPParser() + parser._parse_header('BOU 2015 001 01-Jan-15 HEZF 0.01nT' + + ' File Version 2.00') + + assert_equals(parser.headers['observatory'], 'BOU') + assert_equals(parser.headers['year'], '2015') + assert_equals(parser.headers['date'], '01-Jan-15') diff --git a/geomagio/pcdcp/PCDCPWriter.py b/geomagio/pcdcp/PCDCPWriter.py new file mode 100644 index 0000000000000000000000000000000000000000..10fa0285582bf2b61afd93ca7f7fa132d4310dd1 --- /dev/null +++ b/geomagio/pcdcp/PCDCPWriter.py @@ -0,0 +1,125 @@ + +from cStringIO import StringIO +from geomagio import TimeseriesFactoryException, ChannelConverter +import numpy +import PCDCPParser +import textwrap +from datetime import datetime + + +class PCDCPWriter(object): + """PCDCP writer. + """ + + def __init__(self, empty_value=PCDCPParser.NINES): + self.empty_value = empty_value + + def write(self, out, timeseries, channels): + """write timeseries to pcdcp file + + Parameters + ---------- + out : file object + File object to be written to. Could be stdout. + timeseries : obspy.core.stream + Timeseries object with data to be written. + channels : array_like + Channels to be written from timeseries object + """ + stats = timeseries[0].stats + out.write(self._format_header(stats, channels)) + out.write(self._format_data(timeseries, channels)) + + def _format_header(self, name, value): + """format headers for PCDCP file + + Parameters + ---------- + name: str + the name to be written + value: str + the value to written. + + Returns + ------- + str + a string formatted to be a single header line in a PCDCP file + """ + observatory = 'BOU' + year = '2015' + date = '01-Jan-15' + space = ' ' + + return ''.join(observatory, space, year, space, '001', space, + date, space, 'HEZF 0.01nT File Version 2.00') + + def _format_data(self, timeseries, channels): + """Format all data lines. + + Parameters + ---------- + timeseries : obspy.core.Stream + stream containing traces with channel listed in channels + channels : sequence + list and order of channel values to output. + """ + buf = [] + if timeseries.select(channel='D'): + d = timeseries.select(channel='D') + d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data) + + traces = [timeseries.select(channel=c)[0] for c in channels] + starttime = float(traces[0].stats.starttime) + delta = traces[0].stats.delta + + for i in xrange(len(traces[0].data)): + buf.append(self._format_values( + datetime.utcfromtimestamp(starttime + i * delta), + (t.data[i] for t in traces))) + + return ''.join(buf) + + def _format_values(self, time, values): + """Format one line of data values. + + Parameters + ---------- + time : datetime + timestamp for values + values : sequence + list and order of channel values to output. + if value is NaN, self.empty_value is output in its place. + + Returns + ------- + unicode + Formatted line containing values. + """ + tt = time.timetuple() + return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \ + '{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \ + '{0.tm_yday:0>3d} ' \ + '{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format( + tt, int(time.microsecond / 1000), + *[self.empty_value if numpy.isnan(val) else val + for val in values]) + + @classmethod + def format(self, timeseries, channels): + """Get an PCDCP formatted string. + + Calls write() with a StringIO, and returns the output. + + Parameters + ---------- + timeseries : obspy.core.Stream + + Returns + ------- + unicode + PCDCP formatted string. + """ + out = StringIO() + writer = PCDCPWriter() + writer.write(out, timeseries, channels) + return out.getvalue() diff --git a/geomagio/pcdcp/StreamPCDCPFactory.py b/geomagio/pcdcp/StreamPCDCPFactory.py new file mode 100644 index 0000000000000000000000000000000000000000..2204fb47303e6dfcc2575f810abeb731b2114446 --- /dev/null +++ b/geomagio/pcdcp/StreamPCDCPFactory.py @@ -0,0 +1,44 @@ +"""Factory to load PCDCP files from an input StreamPCDCPFactory.""" + +from PCDCPFactory import PCDCPFactory + + +class StreamPCDCPFactory(PCDCPFactory): + """Timeseries Factory for PCDCP formatted files loaded via a stream. + normally either a single file, or stdio. + + Parameters + ---------- + stream: file object + io stream, normally either a file, or stdio + + See Also + -------- + PCDCPFactory + Timeseriesfactory + """ + def __init__(self, stream, observatory=None, channels=None, + type=None, interval=None): + PCDCPFactory.__init__(self, None, observatory, channels, + type, interval) + self._stream = stream + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Implements get_timeseries + + Notes: Calls PCDCPFactory.parse_string in place of + PCDCPFactory.get_timeseries. + """ + return PCDCPFactory.parse_string(self, self._stream.read()) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Implements put_timeseries + + Notes: Calls PCDCPFactory.write_file in place of + PCDCPFactory.get_timeseries. This can result in a + non-standard PCDCP file, specifically one of longer than + expected length. + """ + PCDCPFactory.write_file(self, self._stream, timeseries, channels)