From d8d533339a26772b87fb63a7e86474cf4d238008 Mon Sep 17 00:00:00 2001 From: Duff Stewart <dcstewart@usgs.gov> Date: Wed, 27 Apr 2016 13:57:17 -0600 Subject: [PATCH] added temp, vbf factories and custom chan.loc in edge factory --- geomagio/Controller.py | 37 +++ geomagio/edge/EdgeFactory.py | 14 + geomagio/pcdcp/PCDCPWriter.py | 57 +++- geomagio/temperature/StreamTEMPFactory.py | 44 +++ geomagio/temperature/TEMPFactory.py | 261 ++++++++++++++++ geomagio/temperature/TEMPParser.py | 115 +++++++ geomagio/temperature/TEMPWriter.py | 151 +++++++++ geomagio/temperature/__init__.py | 15 + geomagio/vbf/StreamVBFFactory.py | 47 +++ geomagio/vbf/VBFFactory.py | 269 ++++++++++++++++ geomagio/vbf/VBFParser.py | 115 +++++++ geomagio/vbf/VBFWriter.py | 360 ++++++++++++++++++++++ geomagio/vbf/__init__.py | 15 + 13 files changed, 1490 insertions(+), 10 deletions(-) create mode 100644 geomagio/temperature/StreamTEMPFactory.py create mode 100644 geomagio/temperature/TEMPFactory.py create mode 100644 geomagio/temperature/TEMPParser.py create mode 100644 geomagio/temperature/TEMPWriter.py create mode 100644 geomagio/temperature/__init__.py create mode 100644 geomagio/vbf/StreamVBFFactory.py create mode 100644 geomagio/vbf/VBFFactory.py create mode 100644 geomagio/vbf/VBFParser.py create mode 100644 geomagio/vbf/VBFWriter.py create mode 100644 geomagio/vbf/__init__.py diff --git a/geomagio/Controller.py b/geomagio/Controller.py index 41107d856..177f61b32 100644 --- a/geomagio/Controller.py +++ b/geomagio/Controller.py @@ -13,6 +13,10 @@ import iaga2002 import pcdcp import imfv283 +# DCS 20160326 -- factories for new filetypes +import temperature +import vbf + class Controller(object): """Controller for geomag algorithms. @@ -374,6 +378,30 @@ def main(args): locationCode=locationcode, tag=args.output_edge_tag, forceout=args.output_edge_forceout) + + # DCS 20160326 -- new factories + elif args.output_temperature_file is not None: + outputfactory = temperature.StreamTEMPFactory( + stream=open(args.output_temperature_file, 'wb'), + observatory=args.observatory, + type=args.type, + interval=args.interval) + # DCS 20160401 -- param list for StreamVBF includes flag for vbf vs binlog + elif args.output_vbf_file is not None: + outputfactory = vbf.StreamVBFFactory( + stream=open(args.output_vbf_file, 'wb'), + observatory=args.observatory, + type=args.type, + interval=args.interval, + output = 'vbf') + elif args.output_binlog_file is not None: + outputfactory = vbf.StreamVBFFactory( + stream=open(args.output_binlog_file, 'wb'), + observatory=args.observatory, + type=args.type, + interval=args.interval, + output = 'binlog') + elif args.output_plot: outputfactory = PlotTimeseriesFactory() else: @@ -556,6 +584,15 @@ def parse_args(args): # Output group output_group = parser.add_mutually_exclusive_group(required=True) + + # DCS -- options for new output factories + output_group.add_argument('--output-temperature-file', + help='Write to a single temperature/battery file.') + output_group.add_argument('--output-vbf-file', + help='Write to a single voltage-bin file.') + output_group.add_argument('--output-binlog-file', + help='Write to a single bin-change log file.') + output_group.add_argument('--output-iaga-file', help='Write to a single iaga file.') output_group.add_argument('--output-iaga-stdout', diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 7f9c1cdaa..12f5a5af0 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -353,6 +353,13 @@ class 
EdgeFactory(TimeseriesFactory): """ edge_interval_code = self._get_interval_code(interval) edge_channel = None + + # DCS 20160403 -- if form is chan.loc, return chan portion + # Allows specific chan/loc selection in geomag.py + if channel.find('.') >= 0: + tmplist = channel.split('.') + return tmplist[0].strip() + if channel == 'D': edge_channel = edge_interval_code + 'VD' elif channel == 'E': @@ -396,6 +403,13 @@ class EdgeFactory(TimeseriesFactory): returns an edge location code """ location = None + + # DCS 20160403 -- if form is chan.loc, return loc portion + # Allows specific chan/loc selection in geomag.py + if channel.find('.') >= 0: + tmplist = channel.split('.') + return tmplist[1].strip() + if self.locationCode is not None: location = self.locationCode else: diff --git a/geomagio/pcdcp/PCDCPWriter.py b/geomagio/pcdcp/PCDCPWriter.py index 3042df76e..4080fa50e 100644 --- a/geomagio/pcdcp/PCDCPWriter.py +++ b/geomagio/pcdcp/PCDCPWriter.py @@ -33,8 +33,16 @@ class PCDCPWriter(object): 'Missing channel "%s" for output, available channels %s' % (channel, str(TimeseriesUtility.get_channels(timeseries)))) stats = timeseries[0].stats + + # DCS 20160328 -- set dead val for 1-sec data + # won't work if input is IAGA2002: stats missing data_interval + if stats.data_interval == "second": + self.empty_value = PCDCPParser.NINES_RAW + out.write(self._format_header(stats)) - out.write(self._format_data(timeseries, channels)) + + # DCS 20160325 -- pass stats to _format_data + out.write(self._format_data(timeseries, channels, stats)) def _format_header(self, stats): """format headers for PCDCP file @@ -56,12 +64,20 @@ class PCDCPWriter(object): yearday = str(stats.starttime.julday).zfill(3) date = stats.starttime.strftime("%d-%b-%y") + # DCS 20160325 -- 1-sec vs 1-min headers + resolution = "0.01nT" + # won't work if input is IAGA2002: stats missing data_interval + if stats.data_interval == "second": + resolution = "0.001nT" + buf.append(observatory + ' ' + year + ' ' + yearday + ' ' + - date + ' HEZF 0.01nT File Version 2.00\n') + date + ' HEZF ' + resolution + ' File Version 2.00\n') + return ''.join(buf) - def _format_data(self, timeseries, channels): + # DCS 20160325 -- include stats in parms list + def _format_data(self, timeseries, channels, stats): """Format all data lines. Parameters @@ -98,14 +114,17 @@ class PCDCPWriter(object): starttime = float(traces[0].stats.starttime) delta = traces[0].stats.delta + # DCS 20160325 -- pass stats to _format_values for i in xrange(len(traces[0].data)): buf.append(self._format_values( datetime.utcfromtimestamp(starttime + i * delta), - (t.data[i] for t in traces))) + (t.data[i] for t in traces), stats + )) return ''.join(buf) - def _format_values(self, time, values): + # DCS 20160325 -- include stats in parms list + def _format_values(self, time, values, stats): """Format one line of data values. Parameters @@ -121,14 +140,32 @@ class PCDCPWriter(object): unicode Formatted line containing values. 
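# --- Illustration (not part of the patch): the EdgeFactory change above lets a
# requested channel carry its own location code as 'CHAN.LOC', so geomag.py can
# ask for a specific channel/location pair instead of relying on the factory's
# single locationCode.  A minimal standalone sketch of that split rule; the
# channel and location codes used below are made-up examples.

def split_channel_location(channel):
    """If the requested channel looks like 'CHAN.LOC', split it apart."""
    if '.' in channel:
        chan, loc = channel.split('.')[:2]
        return chan.strip(), loc.strip()
    return channel, None   # otherwise use the normal channel/location lookup

print(split_channel_location('MVH.R0'))   # -> ('MVH', 'R0')
print(split_channel_location('H'))        # -> ('H', None)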
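# --- Illustration (not part of the patch): the PCDCPWriter changes switch the
# time column, field width, and scaling on stats.data_interval (absent when the
# input came from IAGA2002, as the in-line comments warn).  This sketch mirrors
# the width/multiplier choices rather than calling the writer itself; the
# H, E, Z, F values are made up.

def pcdcp_line(hour, minute, second, values, interval):
    # second data: second-of-day, 9-wide fields, 0.001 nT resolution
    # minute data: minute-of-day, 8-wide fields, 0.01 nT resolution
    if interval == 'second':
        time_width, data_width, multiplier = 5, 9, 1000
        t = hour * 3600 + minute * 60 + second
    else:
        time_width, data_width, multiplier = 4, 8, 100
        t = hour * 60 + minute
    fields = ''.join(' {0: >{dw}d}'.format(int(round(v * multiplier)),
                                           dw=data_width) for v in values)
    return '{0:0>{tw}d}{1}'.format(t, fields, tw=time_width)

sample = [20123.456, -123.400, 47500.012, 52000.000]   # made-up H, E, Z, F
print(pcdcp_line(0, 1, 30, sample, 'second'))
print(pcdcp_line(0, 1, 0, sample, 'minute'))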
""" + # DCS 20160325 -- 1-sec and 1-min data have different formats + # won't work if input is IAGA2002: stats missing data_interval + time_width = 4 + data_width = 8 + data_multiplier = 100 + hr_multiplier = 60 + mn_multiplier = 1 + sc_multiplier = 0 + if stats.data_interval == "second": + time_width = 5 + data_width = 9 + data_multiplier = 1000 + hr_multiplier = 3600 + mn_multiplier = 60 + sc_multiplier = 1 + tt = time.timetuple() - totalMinutes = int(tt.tm_hour * 60 + tt.tm_min) - return '{0:0>4d} {2: >8d} {3: >8d} {4: >8d} {5: >8d}\n'.format( - totalMinutes, int(time.microsecond / 1000), + totalMinutes = int(tt.tm_hour * hr_multiplier + \ + tt.tm_min * mn_multiplier + tt.tm_sec * sc_multiplier) + + return '{0:0>{tw}d} {1: >{dw}d} {2: >{dw}d} {3: >{dw}d} {4: >{dw}d}\n'.\ + format( totalMinutes, tw=time_width, *[self.empty_value if numpy.isnan(val) else int(round( - val * 100)) - for val in values]) + val * data_multiplier)) + for val in values], dw=data_width) @classmethod def format(self, timeseries, channels): diff --git a/geomagio/temperature/StreamTEMPFactory.py b/geomagio/temperature/StreamTEMPFactory.py new file mode 100644 index 000000000..c99b0ae9e --- /dev/null +++ b/geomagio/temperature/StreamTEMPFactory.py @@ -0,0 +1,44 @@ +"""Factory to load temp/volt files from an input StreamTEMPFactory.""" + +from TEMPFactory import TEMPFactory + + +class StreamTEMPFactory(TEMPFactory): + """Timeseries Factory for temp/volt formatted files loaded via a stream. + normally either a single file, or stdio. + + Parameters + ---------- + stream: file object + io stream, normally either a file, or stdio + + See Also + -------- + TEMPFactory + Timeseriesfactory + """ + def __init__(self, stream, observatory=None, channels=None, + type=None, interval=None): + TEMPFactory.__init__(self, None, observatory, channels, + type, interval) + self._stream = stream + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Implements get_timeseries + + Notes: Calls TEMPFactory.parse_string in place of + TEMPFactory.get_timeseries. + """ + return TEMPFactory.parse_string(self, self._stream.read()) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Implements put_timeseries + + Notes: Calls TEMPFactory.write_file in place of + TEMPFactory.get_timeseries. This can result in a + non-standard TEMP file, specifically one of longer than + expected length. + """ + TEMPFactory.write_file(self, self._stream, timeseries, channels) diff --git a/geomagio/temperature/TEMPFactory.py b/geomagio/temperature/TEMPFactory.py new file mode 100644 index 000000000..52b4071de --- /dev/null +++ b/geomagio/temperature/TEMPFactory.py @@ -0,0 +1,261 @@ +"""Factory that loads temp/volt Files.""" + +import obspy.core +from .. import ChannelConverter +from ..TimeseriesFactory import TimeseriesFactory +from ..TimeseriesFactoryException import TimeseriesFactoryException +from ..Util import read_url +from TEMPParser import TEMPParser +from TEMPWriter import TEMPWriter + + +# pattern for temp file names +TEMP_FILE_PATTERN = '%(obs)s%(y)s%(j)s.%(i)s' + + +class TEMPFactory(TimeseriesFactory): + """TimeseriesFactory for temp/volt formatted files. 
+ + Parameters + ---------- + urlTemplate : str + A string that contains any of the following replacement patterns: + - '%(i)s' : interval abbreviation + - '%(interval)s' interval name + - '%(julian)s' julian day formatted as JJJ + - '%(obs)s' lowercase observatory code + - '%(OBS)s' uppercase observatory code + - '%(t)s' type abbreviation + - '%(type)s' type name + - '%(year)s' year formatted as YYYY + - '%(ymd)s' time formatted as YYYYMMDD + + See Also + -------- + TEMPParser + """ + + def __init__(self, urlTemplate, observatory=None, channels=None, type=None, + interval=None): + TimeseriesFactory.__init__(self, observatory, channels, type, + interval, urlTemplate) + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Get timeseries data + + Parameters + ---------- + observatory : str + 3-letter observatory code. + starttime : obspy.core.UTCDateTime + Time of first sample. + endtime : obspy.core.UTCDateTime + Time of last sample. + type : {'variation', 'quasi-definitive'} + Data type. + interval : {'minute', 'second'} + Data interval. + + Returns + ------- + obspy.core.Stream + timeseries object with requested data. + + Raises + ------ + TimeseriesFactoryException + If invalid values are requested, or errors occur while + retrieving timeseries. + """ + observatory = observatory or self.observatory + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + days = self._get_days(starttime, endtime) + timeseries = obspy.core.Stream() + for day in days: + url_id = self._get_url(observatory, day, type, interval) + tempFile = read_url(url_id) + timeseries += self.parse_string(tempFile) + + # merge channel traces for multiple days + timeseries.merge() + + # trim to requested start/end time + timeseries.trim(starttime, endtime) + + return timeseries + + def parse_string(self, tempString): + """Parse the contents of a string in the format of a temp/volt file. + + Parameters + ---------- + tempString : str + String containing TEMP content. + + Returns + ------- + obspy.core.Stream + Parsed data. + """ + parser = TEMPParser() + parser.parse(tempString) + + year = parser.header['year'] + yearday = parser.header['yearday'] + + begin = int(parser.times[0]) + startHour = str(int(begin / 60.0)) + startMinute = str(int(begin % 60.0)) + ending = int(parser.times[-1]) + endHour = str(int(ending / 60.0)) + endMinute = str(int(ending % 60.0)) + + start = year + yearday + "T" + startHour + ":" + \ + startMinute + ":" + "00.0" + end = year + yearday + "T" + endHour + ":" + endMinute + ":" + "00.0" + + starttime = obspy.core.UTCDateTime(start) + endtime = obspy.core.UTCDateTime(end) + + data = parser.data + length = len(data[data.keys()[0]]) + rate = (length - 1) / (endtime - starttime) + stream = obspy.core.Stream() + + for channel in data.keys(): + stats = obspy.core.Stats() + stats.network = 'NT' + stats.station = parser.header['station'] + stats.starttime = starttime + stats.sampling_rate = rate + stats.npts = length + stats.channel = channel + + if channel == 'D': + data[channel] = ChannelConverter.get_radians_from_minutes( + data[channel]) + + stream += obspy.core.Trace(data[channel], stats) + + return stream + + def _get_days(self, starttime, endtime): + """Get days between (inclusive) starttime and endtime. 
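# --- Illustration (not part of the patch): behind --output-temperature-file the
# Controller simply hands an open file object to temperature.StreamTEMPFactory
# and calls put_timeseries.  A sketch of that path in isolation, assuming the
# patch is applied; the observatory code, channel names (borrowed from the
# T1..T4/V1 header columns), sample values, and file name are all illustrative.

import numpy
import obspy.core
from geomagio.temperature import StreamTEMPFactory

channels = ['T1', 'T2', 'T3', 'T4', 'V1']
stream = obspy.core.Stream()
for name in channels:
    stats = obspy.core.Stats()
    stats.network = 'NT'
    stats.station = 'BOU'
    stats.channel = name
    stats.starttime = obspy.core.UTCDateTime('2016-04-01T00:00:00Z')
    stats.sampling_rate = 1.0 / 60.0        # one sample per minute
    stream += obspy.core.Trace(numpy.full(60, 21.5), stats)

# equivalent of: geomag.py ... --output-temperature-file bou2016092.tmp
with open('bou2016092.tmp', 'wb') as fh:
    factory = StreamTEMPFactory(stream=fh, observatory='BOU',
                                type='variation', interval='minute')
    factory.put_timeseries(stream, channels=channels)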
+ + Parameters + ---------- + starttime : obspy.core.UTCDateTime + the start time + endtime : obspy.core.UTCDateTime + the end time + + Returns + ------- + array_like + list of times, one per day, for all days between and including + ``starttime`` and ``endtime``. + + Raises + ------ + TimeseriesFactoryException + if starttime is after endtime + """ + if starttime > endtime: + raise TimeseriesFactoryException( + 'starttime must be before endtime.') + + days = [] + day = starttime + lastday = (endtime.year, endtime.month, endtime.day) + while True: + days.append(day) + if lastday == (day.year, day.month, day.day): + break + # move to next day + day = obspy.core.UTCDateTime(day.timestamp + 86400) + return days + + def write_file(self, fh, timeseries, channels): + """writes timeseries data to the given file object. + + Parameters + ---------- + fh: file object + timeseries : obspy.core.Stream + stream containing traces to store. + channels : array_like + list of channels to store + """ + TEMPWriter().write(fh, timeseries, channels) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Store timeseries data. + + Parameters + ---------- + timeseries : obspy.core.Stream + stream containing traces to store. + starttime : UTCDateTime + time of first sample in timeseries to store. + uses first sample if unspecified. + endtime : UTCDateTime + time of last sample in timeseries to store. + uses last sample if unspecified. + channels : array_like + list of channels to store, optional. + uses default if unspecified. + type : {'definitive', 'provisional', 'quasi-definitive', 'variation'} + data type, optional. + uses default if unspecified. + interval : {'daily', 'hourly', 'minute', 'monthly', 'second'} + data interval, optional. + uses default if unspecified. + """ + print self.urlTemplate + if not self.urlTemplate.startswith('file://'): + raise TimeseriesFactoryException('Only file urls are supported') + + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + stats = timeseries[0].stats + observatory = stats.station + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + days = self._get_days(starttime, endtime) + for day in days: + day_filename = self._get_file_from_url( + self._get_url(observatory, day, type, interval)) + + day_timeseries = self._get_slice(timeseries, day, interval) + with open(day_filename, 'wb') as fh: + self.write_file(fh, day_timeseries, channels) + + def _get_slice(self, timeseries, day, interval): + """Get the first and last time for a day + + Parameters + ---------- + timeseries : obspy.core.Stream + timeseries to slice + day : UTCDateTime + time in day to slice + + Returns + ------- + obspy.core.Stream + sliced stream + """ + day = day.datetime + start = obspy.core.UTCDateTime(day.year, day.month, day.day, 0, 0, 0) + + if interval == 'minute': + end = start + 86340.0 + else: + end = start + 86399.999999 + + return timeseries.slice(start, end) diff --git a/geomagio/temperature/TEMPParser.py b/geomagio/temperature/TEMPParser.py new file mode 100644 index 000000000..2e66a87ec --- /dev/null +++ b/geomagio/temperature/TEMPParser.py @@ -0,0 +1,115 @@ +"""Parsing methods for the temp/volt Format.""" + + +import numpy + +# values that represent missing data points in TEMP +NINES = numpy.int('9999999') +NINES_RAW = numpy.int('99999990') +NINES_DEG = numpy.int('9999') + + +class TEMPParser(object): + """TEMP parser. 
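# --- Illustration (not part of the patch): TEMPFactory (and VBFFactory, which
# adds the output flag) work through urlTemplate and read or write one file per
# calendar day via _get_days/_get_slice; put_timeseries only accepts file://
# URLs.  The directory layout and suffix below are assumptions, and the calls
# are left commented out because they need real files on disk.

from geomagio.temperature import TEMPFactory

factory = TEMPFactory(
    urlTemplate='file:///data/temps/%(obs)s%(year)s%(julian)s.%(i)s',
    observatory='BOU', type='variation', interval='minute')

# writes one file per day covered by the stream:
# factory.put_timeseries(stream, channels=['T1', 'T2', 'T3', 'T4', 'V1'])

# reads the per-day files back, then merges and trims them:
# timeseries = factory.get_timeseries(
#     obspy.core.UTCDateTime('2016-04-01'),
#     obspy.core.UTCDateTime('2016-04-02T23:59:00'))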
+ + Attributes + ---------- + header : dict + parsed TEMP header. + channels : array + parsed channel names. + times : array + parsed timeseries times. + data : dict + keys are channel names (order listed in ``self.channels``). + values are ``numpy.array`` of timeseries values, array values are + ``numpy.nan`` when values are missing. + """ + + def __init__(self): + """Create a new TEMP parser.""" + # header fields + self.header = {} + # array of channel names + self.channels = [] + # timestamps of data (datetime.datetime) + self.times = [] + # dictionary of data (channel : numpy.array<float64>) + self.data = {} + # temporary storage for data being parsed + self._parsedata = None + + def parse(self, data): + """Parse a string containing TEMP formatted data. + + Parameters + ---------- + data : str + temp/volt formatted file contents. + """ + self._set_channels() + + parsing_header = True + lines = data.splitlines() + for line in lines: + if parsing_header: + self._parse_header(line) + parsing_header = False + else: + self._parse_data(line) + self._post_process() + + def _parse_header(self, line): + """Parse header line. + + Adds value to ``self.header``. + """ + self.header['header'] = line + self.header['station'] = line[0:3] + self.header['year'] = line[5:9] + self.header['yearday'] = line[11:14] + self.header['date'] = line[16:25] + + return + + def _parse_data(self, line): + """Parse one data point in the timeseries. + + Adds time to ``self.times``. + Adds channel values to ``self.data``. + """ + t, d1, d2, d3, d4 = self._parsedata + + t.append(line[0:4]) + d1.append(int(line[5:13])) + d2.append(int(line[14:22])) + d3.append(int(line[23:31])) + d4.append(int(line[32:40])) + + def _post_process(self): + """Post processing after data is parsed. + + Converts data to numpy arrays. + Replaces empty values with ``numpy.nan``. + """ + self.times = self._parsedata[0] + + for channel, data in zip(self.channels, self._parsedata[1:]): + data = numpy.array(data, dtype=numpy.float64) + # filter empty values + data[data == NINES] = numpy.nan + data = numpy.divide(data, 100) + self.data[channel] = data + + self._parsedata = None + + def _set_channels(self): + """Adds channel names to ``self.channels``. + Creates empty values arrays in ``self.data``. + """ + self.channels.append('H') + self.channels.append('E') + self.channels.append('Z') + self.channels.append('F') + + self._parsedata = ([], [], [], [], []) diff --git a/geomagio/temperature/TEMPWriter.py b/geomagio/temperature/TEMPWriter.py new file mode 100644 index 000000000..f5280f9e9 --- /dev/null +++ b/geomagio/temperature/TEMPWriter.py @@ -0,0 +1,151 @@ + +import numpy +import TEMPParser +from cStringIO import StringIO +from datetime import datetime +from .. import ChannelConverter, TimeseriesUtility +from ..TimeseriesFactoryException import TimeseriesFactoryException +from obspy.core import Stream + + +class TEMPWriter(object): + """TEMP writer. + """ + + def __init__(self, empty_value=TEMPParser.NINES_DEG): + self.empty_value = empty_value + + def write(self, out, timeseries, channels): + """Write timeseries to temp/volt file. + + Parameters + ---------- + out : file object + File object to be written to. Could be stdout. + timeseries : obspy.core.stream + Timeseries object with data to be written. + channels : array_like + Channels to be written from timeseries object. 
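# --- Illustration (not part of the patch): TEMPParser (and the matching
# VBFParser) expect one header line followed by fixed-width data rows: a
# minute-of-day stamp in columns 0-3 and four 8-character fields, with 9999999
# marking a missing sample and a divide-by-100 applied after conversion.  The
# row below is fabricated to sit on exactly those column boundaries.

import numpy

line = '0000  2150000  2210000  2300000  9999999'

minute_of_day = line[0:4]
raw = [int(line[5:13]), int(line[14:22]),
       int(line[23:31]), int(line[32:40])]

data = numpy.array(raw, dtype=numpy.float64)
data[data == 9999999] = numpy.nan      # NINES -> missing sample
data = numpy.divide(data, 100)
print(data)                            # 21500.0, 22100.0, 23000.0, nan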
+ """ + for channel in channels: + if timeseries.select(channel=channel).count() == 0: + raise TimeseriesFactoryException( + 'Missing channel "%s" for output, available channels %s' % + (channel, str(TimeseriesUtility.get_channels(timeseries)))) + stats = timeseries[0].stats + out.write(self._format_header(stats)) + out.write(self._format_data(timeseries, channels)) + + def _format_header(self, stats): + """format headers for temp/volt file + + Parameters + ---------- + stats : List + An object with the header values to be written. + + Returns + ------- + str + A string formatted to be a single header line in a temp/volt file. + """ + buf = [] + + observatory = stats.station + year = str(stats.starttime.year) + yearday = str(stats.starttime.julday).zfill(3) + date = stats.starttime.strftime("%d-%b-%y") + + buf.append(observatory + ' ' + year + ' ' + yearday + ' ' + date + + ' T1 T2 T3 T4 V1 Deg-C*10/volts*10 File Version 1.00\n') + + return ''.join(buf) + + def _format_data(self, timeseries, channels): + """Format all data lines. + + Parameters + ---------- + timeseries : obspy.core.Stream + Stream containing traces with channel listed in channels + channels : sequence + List and order of channel values to output. + + Returns + ------- + str + A string formatted to be the data lines in a temp/volt file. + """ + buf = [] + + # create new stream + timeseriesLocal = Stream() + # Use a copy of the trace so that we don't modify the original. + for trace in timeseries: + traceLocal = trace.copy() + + # TODO - we should look into multiplying the trace all at once + # like this, but this gives an error on Windows at the moment. + # traceLocal.data = \ + # numpy.round(numpy.multiply(traceLocal.data, 100)).astype(int) + + timeseriesLocal.append(traceLocal) + + traces = [timeseriesLocal.select(channel=c)[0] for c in channels] + starttime = float(traces[0].stats.starttime) + delta = traces[0].stats.delta + + for i in xrange(len(traces[0].data)): + buf.append(self._format_values( + datetime.utcfromtimestamp(starttime + i * delta), + (t.data[i] for t in traces))) + + return ''.join(buf) + + def _format_values(self, time, values): + """Format one line of data values. + + Parameters + ---------- + time : datetime + Timestamp for values. + values : sequence + List and order of channel values to output. + If value is NaN, self.empty_value is output in its place. + + Returns + ------- + unicode + Formatted line containing values. + """ + tt = time.timetuple() + totalMinutes = int(tt.tm_hour * 60 + tt.tm_min) + + return '{0:0>4d} {1: >5d} {2: >5d} {3: >5d} {4: >5d} {5: >5d}\n'.format( + totalMinutes, + *[self.empty_value if numpy.isnan(val) else int(round( + val * 10)) + for val in values]) + + @classmethod + def format(self, timeseries, channels): + """Get an temp/volt formatted string. + + Calls write() with a StringIO, and returns the output. + + Parameters + ---------- + timeseries : obspy.core.Stream + Stream containing traces with channel listed in channels + channels : sequence + List and order of channel values to output. + + Returns + ------- + unicode + temp/volt formatted string. 
+ """ + out = StringIO() + writer = TEMPWriter() + writer.write(out, timeseries, channels) + return out.getvalue() diff --git a/geomagio/temperature/__init__.py b/geomagio/temperature/__init__.py new file mode 100644 index 000000000..2d5067b57 --- /dev/null +++ b/geomagio/temperature/__init__.py @@ -0,0 +1,15 @@ +"""IO Module for TEMP Format +""" + +from TEMPFactory import TEMPFactory +from StreamTEMPFactory import StreamTEMPFactory +from TEMPParser import TEMPParser +from TEMPWriter import TEMPWriter + + +__all__ = [ + 'TEMPFactory', + 'StreamTEMPFactory', + 'TEMPParser', + 'TEMPWriter' +] diff --git a/geomagio/vbf/StreamVBFFactory.py b/geomagio/vbf/StreamVBFFactory.py new file mode 100644 index 000000000..ebe4b73a1 --- /dev/null +++ b/geomagio/vbf/StreamVBFFactory.py @@ -0,0 +1,47 @@ +"""Factory to load VBF files from an input StreamVBFFactory.""" + +from VBFFactory import VBFFactory + + +class StreamVBFFactory(VBFFactory): + """Timeseries Factory for VBF formatted files loaded via a stream. + normally either a single file, or stdio. + + Parameters + ---------- + stream: file object + io stream, normally either a file, or stdio + + See Also + -------- + VBFFactory + Timeseriesfactory + """ + + # DCS 20160401 -- 'output' flag added to parm list. + # If 'vbf' then write a vbf file, if 'binlog' then make a bin change log + def __init__(self, stream, observatory=None, channels=None, + type=None, interval=None, output='vbf'): + VBFFactory.__init__(self, None, observatory, channels, + type, interval, output) + self._stream = stream + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Implements get_timeseries + + Notes: Calls VBFFactory.parse_string in place of + VBFFactory.get_timeseries. + """ + return VBFFactory.parse_string(self, self._stream.read()) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Implements put_timeseries + + Notes: Calls VBFFactory.write_file in place of + VBFFactory.get_timeseries. This can result in a + non-standard VBF file, specifically one of longer than + expected length. + """ + VBFFactory.write_file(self, self._stream, timeseries, channels) diff --git a/geomagio/vbf/VBFFactory.py b/geomagio/vbf/VBFFactory.py new file mode 100644 index 000000000..adc0d80f3 --- /dev/null +++ b/geomagio/vbf/VBFFactory.py @@ -0,0 +1,269 @@ +"""Factory that loads VBF Files.""" + +import obspy.core +from .. import ChannelConverter +from ..TimeseriesFactory import TimeseriesFactory +from ..TimeseriesFactoryException import TimeseriesFactoryException +from ..Util import read_url +from VBFParser import VBFParser +from VBFWriter import VBFWriter + + +# pattern for vbf file names +VBF_FILE_PATTERN = '%(obs)s%(y)s%(j)s.%(i)s' + + +class VBFFactory(TimeseriesFactory): + """TimeseriesFactory for VBF formatted files. 
+ + Parameters + ---------- + urlTemplate : str + A string that contains any of the following replacement patterns: + - '%(i)s' : interval abbreviation + - '%(interval)s' interval name + - '%(julian)s' julian day formatted as JJJ + - '%(obs)s' lowercase observatory code + - '%(OBS)s' uppercase observatory code + - '%(t)s' type abbreviation + - '%(type)s' type name + - '%(year)s' year formatted as YYYY + - '%(ymd)s' time formatted as YYYYMMDD + + See Also + -------- + VBFParser + """ + + # DCS -- 20160401 -- output flag added to parm list for vbf vs binlog + def __init__(self, urlTemplate, observatory=None, channels=None, type=None, + interval=None, output='vbf'): + TimeseriesFactory.__init__(self, observatory, channels, type, + interval, urlTemplate) + self.output = output + + def get_timeseries(self, starttime, endtime, observatory=None, + channels=None, type=None, interval=None): + """Get timeseries data + + Parameters + ---------- + observatory : str + 3-letter observatory code. + starttime : obspy.core.UTCDateTime + Time of first sample. + endtime : obspy.core.UTCDateTime + Time of last sample. + type : {'variation', 'quasi-definitive'} + Data type. + interval : {'minute', 'second'} + Data interval. + + Returns + ------- + obspy.core.Stream + timeseries object with requested data. + + Raises + ------ + TimeseriesFactoryException + If invalid values are requested, or errors occur while + retrieving timeseries. + """ + observatory = observatory or self.observatory + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + days = self._get_days(starttime, endtime) + timeseries = obspy.core.Stream() + for day in days: + url_id = self._get_url(observatory, day, type, interval) + vbfFile = read_url(url_id) + timeseries += self.parse_string(vbfFile) + + # merge channel traces for multiple days + timeseries.merge() + + # trim to requested start/end time + timeseries.trim(starttime, endtime) + + return timeseries + + def parse_string(self, vbfString): + """Parse the contents of a string in the format of a vbf file. + + Parameters + ---------- + vbfString : str + String containing VBF content. + + Returns + ------- + obspy.core.Stream + Parsed data. + """ + parser = VBFParser() + parser.parse(vbfString) + + year = parser.header['year'] + yearday = parser.header['yearday'] + + begin = int(parser.times[0]) + startHour = str(int(begin / 60.0)) + startMinute = str(int(begin % 60.0)) + ending = int(parser.times[-1]) + endHour = str(int(ending / 60.0)) + endMinute = str(int(ending % 60.0)) + + start = year + yearday + "T" + startHour + ":" + \ + startMinute + ":" + "00.0" + end = year + yearday + "T" + endHour + ":" + endMinute + ":" + "00.0" + + starttime = obspy.core.UTCDateTime(start) + endtime = obspy.core.UTCDateTime(end) + + data = parser.data + length = len(data[data.keys()[0]]) + rate = (length - 1) / (endtime - starttime) + stream = obspy.core.Stream() + + for channel in data.keys(): + stats = obspy.core.Stats() + stats.network = 'NT' + stats.station = parser.header['station'] + stats.starttime = starttime + stats.sampling_rate = rate + stats.npts = length + stats.channel = channel + + if channel == 'D': + data[channel] = ChannelConverter.get_radians_from_minutes( + data[channel]) + + stream += obspy.core.Trace(data[channel], stats) + + return stream + + def _get_days(self, starttime, endtime): + """Get days between (inclusive) starttime and endtime. 
+ + Parameters + ---------- + starttime : obspy.core.UTCDateTime + the start time + endtime : obspy.core.UTCDateTime + the end time + + Returns + ------- + array_like + list of times, one per day, for all days between and including + ``starttime`` and ``endtime``. + + Raises + ------ + TimeseriesFactoryException + if starttime is after endtime + """ + if starttime > endtime: + raise TimeseriesFactoryException( + 'starttime must be before endtime.') + + days = [] + day = starttime + lastday = (endtime.year, endtime.month, endtime.day) + while True: + days.append(day) + if lastday == (day.year, day.month, day.day): + break + # move to next day + day = obspy.core.UTCDateTime(day.timestamp + 86400) + return days + + def write_file(self, fh, timeseries, channels): + """writes timeseries data to the given file object. + + Parameters + ---------- + fh: file object + timeseries : obspy.core.Stream + stream containing traces to store. + channels : array_like + list of channels to store + """ + + # DCS 20160401 -- if making a bin change log, call the _change_ + # version of 'write'. Otherwise, call the usual 'write' + if self.output == 'binlog': + VBFWriter().write_change_log(fh, timeseries, channels) + else: + VBFWriter().write(fh, timeseries, channels) + + def put_timeseries(self, timeseries, starttime=None, endtime=None, + channels=None, type=None, interval=None): + """Store timeseries data. + + Parameters + ---------- + timeseries : obspy.core.Stream + stream containing traces to store. + starttime : UTCDateTime + time of first sample in timeseries to store. + uses first sample if unspecified. + endtime : UTCDateTime + time of last sample in timeseries to store. + uses last sample if unspecified. + channels : array_like + list of channels to store, optional. + uses default if unspecified. + type : {'definitive', 'provisional', 'quasi-definitive', 'variation'} + data type, optional. + uses default if unspecified. + interval : {'daily', 'hourly', 'minute', 'monthly', 'second'} + data interval, optional. + uses default if unspecified. 
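# --- Illustration (not part of the patch): one class, vbf.StreamVBFFactory,
# backs both --output-vbf-file and --output-binlog-file; only the output
# argument differs, and write_file above picks VBFWriter.write or
# VBFWriter.write_change_log accordingly.  The six channels below are a voltage
# and a bin number for each of H, E, Z, in the order VBFWriter formats them;
# the channel names, values, and file names are placeholders.

import numpy
import obspy.core
from geomagio.vbf import StreamVBFFactory

channels = ['HV', 'HB', 'EV', 'EB', 'ZV', 'ZB']
values = [1234.0, 17.0, 2345.0, 18.0, 3456.0, 19.0]
stream = obspy.core.Stream()
for name, value in zip(channels, values):
    stats = obspy.core.Stats()
    stats.network = 'NT'
    stats.station = 'BOU'
    stats.channel = name
    stats.starttime = obspy.core.UTCDateTime('2016-04-01T00:00:00Z')
    stats.sampling_rate = 1.0              # one sample per second
    stream += obspy.core.Trace(numpy.full(120, value), stats)

# equivalent of --output-vbf-file: one volt/bin line per sample
with open('bou2016092.vbf', 'wb') as fh:
    factory = StreamVBFFactory(stream=fh, observatory='BOU', type='variation',
                               interval='second', output='vbf')
    factory.put_timeseries(stream, channels=channels)

# equivalent of --output-binlog-file: only bin changes are reported
# (none here, since the sketch data holds each bin constant)
with open('bou2016092.bins', 'wb') as fh:
    factory = StreamVBFFactory(stream=fh, observatory='BOU', type='variation',
                               interval='second', output='binlog')
    factory.put_timeseries(stream, channels=channels)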
+ """ + print self.urlTemplate + if not self.urlTemplate.startswith('file://'): + raise TimeseriesFactoryException('Only file urls are supported') + + channels = channels or self.channels + type = type or self.type + interval = interval or self.interval + stats = timeseries[0].stats + observatory = stats.station + starttime = starttime or stats.starttime + endtime = endtime or stats.endtime + days = self._get_days(starttime, endtime) + for day in days: + day_filename = self._get_file_from_url( + self._get_url(observatory, day, type, interval)) + + day_timeseries = self._get_slice(timeseries, day, interval) + with open(day_filename, 'wb') as fh: + self.write_file(fh, day_timeseries, channels) + + def _get_slice(self, timeseries, day, interval): + """Get the first and last time for a day + + Parameters + ---------- + timeseries : obspy.core.Stream + timeseries to slice + day : UTCDateTime + time in day to slice + + Returns + ------- + obspy.core.Stream + sliced stream + """ + day = day.datetime + start = obspy.core.UTCDateTime(day.year, day.month, day.day, 0, 0, 0) + + if interval == 'minute': + end = start + 86340.0 + else: + end = start + 86399.999999 + + return timeseries.slice(start, end) diff --git a/geomagio/vbf/VBFParser.py b/geomagio/vbf/VBFParser.py new file mode 100644 index 000000000..a2954478c --- /dev/null +++ b/geomagio/vbf/VBFParser.py @@ -0,0 +1,115 @@ +"""Parsing methods for the VBF Format.""" + + +import numpy + +# values that represent missing data points in VBF +NINES = numpy.int('9999999') +NINES_RAW = numpy.int('99999990') +NINES_DEG = numpy.int('9999') + + +class VBFParser(object): + """VBF parser. + + Attributes + ---------- + header : dict + parsed VBF header. + channels : array + parsed channel names. + times : array + parsed timeseries times. + data : dict + keys are channel names (order listed in ``self.channels``). + values are ``numpy.array`` of timeseries values, array values are + ``numpy.nan`` when values are missing. + """ + + def __init__(self): + """Create a new VBF parser.""" + # header fields + self.header = {} + # array of channel names + self.channels = [] + # timestamps of data (datetime.datetime) + self.times = [] + # dictionary of data (channel : numpy.array<float64>) + self.data = {} + # temporary storage for data being parsed + self._parsedata = None + + def parse(self, data): + """Parse a string containing VBF formatted data. + + Parameters + ---------- + data : str + VBF formatted file contents. + """ + self._set_channels() + + parsing_header = True + lines = data.splitlines() + for line in lines: + if parsing_header: + self._parse_header(line) + parsing_header = False + else: + self._parse_data(line) + self._post_process() + + def _parse_header(self, line): + """Parse header line. + + Adds value to ``self.header``. + """ + self.header['header'] = line + self.header['station'] = line[0:3] + self.header['year'] = line[5:9] + self.header['yearday'] = line[11:14] + self.header['date'] = line[16:25] + + return + + def _parse_data(self, line): + """Parse one data point in the timeseries. + + Adds time to ``self.times``. + Adds channel values to ``self.data``. + """ + t, d1, d2, d3, d4 = self._parsedata + + t.append(line[0:4]) + d1.append(int(line[5:13])) + d2.append(int(line[14:22])) + d3.append(int(line[23:31])) + d4.append(int(line[32:40])) + + def _post_process(self): + """Post processing after data is parsed. + + Converts data to numpy arrays. + Replaces empty values with ``numpy.nan``. 
+ """ + self.times = self._parsedata[0] + + for channel, data in zip(self.channels, self._parsedata[1:]): + data = numpy.array(data, dtype=numpy.float64) + # filter empty values + data[data == NINES] = numpy.nan + data = numpy.divide(data, 100) + self.data[channel] = data + + self._parsedata = None + + def _set_channels(self): + """Adds channel names to ``self.channels``. + Creates empty values arrays in ``self.data``. + """ + self.channels.append('H') + self.channels.append('E') + self.channels.append('Z') + self.channels.append('F') + + self._parsedata = ([], [], [], [], []) diff --git a/geomagio/vbf/VBFWriter.py b/geomagio/vbf/VBFWriter.py new file mode 100644 index 000000000..b4f185318 --- /dev/null +++ b/geomagio/vbf/VBFWriter.py @@ -0,0 +1,360 @@ + +import numpy +import VBFParser +from cStringIO import StringIO +from datetime import datetime +from .. import ChannelConverter, TimeseriesUtility +from ..TimeseriesFactoryException import TimeseriesFactoryException +from obspy.core import Stream + + +# DCS 20160328 -- for a binlog, need to save previous time and volts +h_prev = [99.999999, 999] +e_prev = [99.999999, 999] +z_prev = [99.999999, 999] +# DCS 20160328 -- use seperate HEZ buffers to group binlog output by component +Hbuf = [] +Ebuf = [] +Zbuf = [] + +class VBFWriter(object): + """VBF writer. + """ + + def __init__(self, empty_value=VBFParser.NINES): + self.empty_value = empty_value + + def write(self, out, timeseries, channels): + """Write timeseries to vbf file. + + Parameters + ---------- + out : file object + File object to be written to. Could be stdout. + timeseries : obspy.core.stream + Timeseries object with data to be written. + channels : array_like + Channels to be written from timeseries object. + """ + for channel in channels: + if timeseries.select(channel=channel).count() == 0: + raise TimeseriesFactoryException( + 'Missing channel "%s" for output, available channels %s' % + (channel, str(TimeseriesUtility.get_channels(timeseries)))) + stats = timeseries[0].stats + + out.write(self._format_header(stats)) + + out.write(self._format_data(timeseries, channels)) + + def _format_header(self, stats): + """format headers for VBF file + + Parameters + ---------- + stats : List + An object with the header values to be written. + + Returns + ------- + str + A string formatted to be a single header line in a VBF file. + """ + buf = [] + + observatory = stats.station + year = str(stats.starttime.year) + yearday = str(stats.starttime.julday).zfill(3) + date = stats.starttime.strftime("%d-%b-%y") + + buf.append(observatory + ' ' + year + ' ' + yearday + ' ' + + date + ' Hvolt Hbin Evolt Ebin Zvolt Zbin Version 1.0\n') + + return ''.join(buf) + + def _format_data(self, timeseries, channels): + """Format all data lines. + + Parameters + ---------- + timeseries : obspy.core.Stream + Stream containing traces with channel listed in channels + channels : sequence + List and order of channel values to output. + + Returns + ------- + str + A string formatted to be the data lines in a VBF file. + """ + buf = [] + + # create new stream + timeseriesLocal = Stream() + # Use a copy of the trace so that we don't modify the original. + for trace in timeseries: + traceLocal = trace.copy() + if traceLocal.stats.channel == 'D': + traceLocal.data = \ + ChannelConverter.get_minutes_from_radians(traceLocal.data) + + # TODO - we should look into multiplying the trace all at once + # like this, but this gives an error on Windows at the moment. 
+ # traceLocal.data = \ + # numpy.round(numpy.multiply(traceLocal.data, 100)).astype(int) + + timeseriesLocal.append(traceLocal) + + traces = [timeseriesLocal.select(channel=c)[0] for c in channels] + starttime = float(traces[0].stats.starttime) + delta = traces[0].stats.delta + + for i in xrange(len(traces[0].data)): + buf.append(self._format_values( + datetime.utcfromtimestamp(starttime + i * delta), + (t.data[i] for t in traces))) + + return ''.join(buf) + + def _format_values(self, time, values): + """Format one line of data values. + + Parameters + ---------- + time : datetime + Timestamp for values. + values : sequence + List and order of channel values to output. + If value is NaN, self.empty_value is output in its place. + + Returns + ------- + unicode + Formatted line containing values. + """ + tt = time.timetuple() + totalMinutes = int(tt.tm_hour * 3600 + tt.tm_min * 60 + tt.tm_sec) + + # DCS 20160328 -- init volt/bin vals to dead + vdead = 99.999999 + bdead = 999 + vblist =[vdead, bdead, vdead, bdead, vdead, bdead] + + # now "un-dead" the good vals, format volts as float, bins as int + for idx, valx in enumerate(values): + if ~numpy.isnan(valx): + if idx == 0 or idx == 2 or idx == 4: + vblist[idx] = valx / 1000. + else: + vblist[idx] = int(valx) + + + return '{0:0>5d} {1: >10.6f} {2: >4d} {3: >10.6f} {4: >4d} ' \ + '{5: >10.6f} {6: >4d}\n'.format(totalMinutes, *vblist) + + + + + #=============================================== + # CODE BELOW IS FOR MAKING A BIN CHANGE LOG. + # VBFFactory calls the '_change_' version of the + # procedures rather than the "usual" procedures + #=============================================== + + def write_change_log(self, out, timeseries, channels): + """Write timeseries to vbf file. + + Parameters + ---------- + out : file object + File object to be written to. Could be stdout. + timeseries : obspy.core.stream + Timeseries object with data to be written. + channels : array_like + Channels to be written from timeseries object. + """ + for channel in channels: + if timeseries.select(channel=channel).count() == 0: + raise TimeseriesFactoryException( + 'Missing channel "%s" for output, available channels %s' % + (channel, str(TimeseriesUtility.get_channels(timeseries)))) + stats = timeseries[0].stats + + + out.write(self._format_change_header(stats)) + + self._format_change_data(timeseries, channels) + + if (len(Hbuf) + len(Ebuf) + len(Zbuf)) > 0: + out.write(' C Date Time DaySec Bin change Voltage change\n') + out.write(''.join(Hbuf)) + out.write('\n') + out.write(''.join(Ebuf)) + out.write('\n') + out.write(''.join(Zbuf)) + else: + out.write('*** No Bin Changes Found ***\n') + + def _format_change_header(self, stats): + """format headers for VBF file + + Parameters + ---------- + stats : List + An object with the header values to be written. + + Returns + ------- + str + A string formatted to be a single header line in a VBF file. + """ + buf = [] + + observatory = stats.station + sttdate = stats.starttime.strftime("%d-%b-%y") + enddate = stats.endtime.strftime("%d-%b-%y") + + buf.append('Bin Change Report: ' + observatory + ' Start Day: ' + + sttdate + ' End Day: ' + enddate + '\n\n') + + return ''.join(buf) + + + def _format_change_data(self, timeseries, channels): + """Format all data lines. + + Parameters + ---------- + timeseries : obspy.core.Stream + Stream containing traces with channel listed in channels + channels : sequence + List and order of channel values to output. 
+ + Returns + ------- + str + A string formatted to be the data lines in a VBF file. + """ + buf = [] + + + # create new stream + timeseriesLocal = Stream() + # Use a copy of the trace so that we don't modify the original. + for trace in timeseries: + traceLocal = trace.copy() + if traceLocal.stats.channel == 'D': + traceLocal.data = \ + ChannelConverter.get_minutes_from_radians(traceLocal.data) + + # TODO - we should look into multiplying the trace all at once + # like this, but this gives an error on Windows at the moment. + # traceLocal.data = \ + # numpy.round(numpy.multiply(traceLocal.data, 100)).astype(int) + + timeseriesLocal.append(traceLocal) + + traces = [timeseriesLocal.select(channel=c)[0] for c in channels] + starttime = float(traces[0].stats.starttime) + delta = traces[0].stats.delta + + for i in xrange(len(traces[0].data)): + self._format_change_values( + datetime.utcfromtimestamp(starttime + i * delta), + (t.data[i] for t in traces)) + + return + + def _format_change_values(self, time, values): + """Format one line of data values. + + Parameters + ---------- + time : datetime + Timestamp for values. + values : sequence + List and order of channel values to output. + If value is NaN, self.empty_value is output in its place. + + Returns + ------- + unicode + Formatted line containing values. + """ + + tt = time.timetuple() + totalMinutes = int(tt.tm_hour * 3600 + tt.tm_min * 60 + tt.tm_sec) + + timestr = '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \ + '{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}' \ + ' ({1:0>5d})'. \ + format(tt, totalMinutes) + + + # init volt/bin vals to dead + vdead = 99.999999 + bdead = 999 + vblist =[vdead, bdead, vdead, bdead, vdead, bdead] + + # now "un-dead" the non-nans, format volts as float, bins as int + for idx, valx in enumerate(values): + if ~numpy.isnan(valx): + if idx == 0 or idx == 2 or idx == 4: + vblist[idx] = valx / 1000. + else: + vblist[idx] = int(valx) + + + if vblist[1] != 999 and h_prev[1] != 999 and vblist[1] != h_prev[1]: + Hbuf.append('{0: >3s} {1:>s} ' \ + '{2: >4d} to {3: >4d} {4: >10.6f} to {5: >10.6f}\n'. \ + format('(H)', timestr, h_prev[1], + vblist[1], h_prev[0], vblist[0])) + + if vblist[3] != 999 and e_prev[1] != 999 and vblist[3] != e_prev[1]: + Ebuf.append('{0: >3s} {1:>s} ' \ + '{2: >4d} to {3: >4d} {4: >10.6f} to {5: >10.6f}\n'. \ + format('(E)', timestr, e_prev[1], + vblist[3], e_prev[0], vblist[2])) + + if vblist[5] != 999 and z_prev[1] != 999 and vblist[5] != z_prev[1]: + Zbuf.append('{0: >3s} {1:>s} ' \ + '{2: >4d} to {3: >4d} {4: >10.6f} to {5: >10.6f}\n'. \ + format('(Z)', timestr, z_prev[1], + vblist[5], z_prev[0], vblist[4])) + + h_prev[0] = vblist[0] + h_prev[1] = vblist[1] + + e_prev[0] = vblist[2] + e_prev[1] = vblist[3] + + z_prev[0] = vblist[4] + z_prev[1] = vblist[5] + + + return + + + @classmethod + def format(self, timeseries, channels): + """Get an VBF formatted string. + + Calls write() with a StringIO, and returns the output. + + Parameters + ---------- + timeseries : obspy.core.Stream + Stream containing traces with channel listed in channels + channels : sequence + List and order of channel values to output. + + Returns + ------- + unicode + VBF formatted string. 
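# --- Illustration (not part of the patch): write_change_log keeps the previous
# volt/bin pair per component and only reports samples where the bin number
# differs from the previous one and neither value is the 999 dead value,
# grouping the report by H, E and Z.  A standalone sketch of that comparison
# with a fabricated bin sequence; as in the code above, a change that happens
# across a dead sample is not reported.

def bin_changes(bins, dead=999):
    """Yield (sample_index, old_bin, new_bin) wherever the bin changes."""
    previous = dead
    for i, value in enumerate(bins):
        if value != dead and previous != dead and value != previous:
            yield i, previous, value
        previous = value

h_bins = [17, 17, 17, 18, 18, 999, 19, 19]     # fabricated H bin numbers
for sample, old, new in bin_changes(h_bins):
    print('(H) sample {0}: bin {1} -> {2}'.format(sample, old, new))
# prints: (H) sample 3: bin 17 -> 18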
+ """ + out = StringIO() + writer = VBFWriter() + writer.write(out, timeseries, channels) + return out.getvalue() diff --git a/geomagio/vbf/__init__.py b/geomagio/vbf/__init__.py new file mode 100644 index 000000000..a4b28ccf2 --- /dev/null +++ b/geomagio/vbf/__init__.py @@ -0,0 +1,15 @@ +"""IO Module for VBF Format +""" + +from VBFFactory import VBFFactory +from StreamVBFFactory import StreamVBFFactory +from VBFParser import VBFParser +from VBFWriter import VBFWriter + + +__all__ = [ + 'VBFFactory', + 'StreamVBFFactory', + 'VBFParser', + 'VBFWriter' +] -- GitLab