Newer
Older
"""Parsing methods for the IAGA2002 Format."""
# values that represent missing data points in IAGA2002
EIGHTS = numpy.float64('88888.88')
NINES = numpy.float64('99999.99')
class IAGA2002Parser(object):
"""IAGA2002 parser.
Based on documentation at:
http://www.ngdc.noaa.gov/IAGA/vdat/iagaformat.html
Attributes
----------
headers : dict
parsed IAGA headers.
comments : array
parsed comments.
channels : array
parsed channel names.
times : array
parsed timeseries times.
data : dict
keys are channel names (order listed in ``self.channels``).
values are ``numpy.array`` of timeseries values, array values are
``numpy.nan`` when values are missing.
"""Create a new IAGA2002 parser."""
# header fields
self.headers = {}
self.metadata = {
'network': 'NT'
}
# header comments
self.comments = []
# array of channel names
self.channels = []
self.times = []
# dictionary of data (channel : numpy.array<float64>)
self.data = {}
# temporary storage for data being parsed
self._parsedata = None
"""Parse a string containing IAGA2002 formatted data.
Parameters
----------
data : str
IAGA 2002 formatted file contents.
"""
parsing_headers = True
lines = data.splitlines()
for line in lines:
if parsing_headers:
if line.startswith(' ') and line.endswith('|'):
# still in headers
if line.startswith(' #'):
self._parse_comment(line)
else:
self._parse_header(line)
else:
parsing_headers = False
self._parse_channels(line)
else:
self._parse_data(line)
self._post_process()
def _parse_header(self, line):
"""Parse header line.
Adds value to ``self.headers``.
"""
key = line[1:24].strip()
value = line[24:69].strip()
self.headers[key] = value
key_upper = key.upper()
if key_upper == 'SOURCE OF DATA':
key = 'agency_name'
elif key_upper == 'STATION NAME':
key = 'station_name'
elif key_upper == 'IAGA CODE':
key = 'station'
elif key_upper == 'GEODETIC LATITUDE':
key = 'geodetic_latitude'
elif key_upper == 'GEODETIC LONGITUDE':
key = 'geodetic_longitude'
elif key_upper == 'ELEVATION':
key = 'elevation'
elif key_upper == 'SENSOR ORIENTATION':
key = 'sensor_orientation'
elif key_upper == 'DIGITAL SAMPLING':
key = 'sensor_sampling_rate'
value = 1 / float(value.replace('second', '').strip())
elif key_upper == 'DATA INTERVAL TYPE':
key = 'data_interval_type'
elif key_upper == 'DATA TYPE':
key = 'data_type'
else:
# not a required header
return
self.metadata[key] = value
def _parse_comment(self, line):
"""Parse comment line.
Adds line to ``self.comments``.
"""
self.comments.append(line[2:69].strip())
def _parse_channels(self, line):
"""Parse data header that contains channel names.
Adds channel names to ``self.channels``.
Creates empty values arrays in ``self.data``.
iaga_code = self.metadata['station']
self.channels.append(line[30:40].strip().replace(iaga_code, ''))
self.channels.append(line[40:50].strip().replace(iaga_code, ''))
self.channels.append(line[50:60].strip().replace(iaga_code, ''))
self.channels.append(line[60:69].strip().replace(iaga_code, ''))
# create parsing data arrays
self._parsedata = ([], [], [], [], [])
"""Parse one data point in the timeseries.
Adds time to ``self.times``.
Adds channel values to ``self.data``.
# parsing time components is much faster
time = datetime(
# date
int(line[0:4]), int(line[5:7]), int(line[8:10]),
# time
int(line[11:13]), int(line[14:16]), int(line[17:19]),
# microseconds
int(line[20:23]) * 1000)
t, d1, d2, d3, d4 = self._parsedata
t.append(time)
d1.append(line[31:40])
d2.append(line[41:50])
d3.append(line[51:60])
d4.append(line[61:70])
"""Post processing after data is parsed.
Merges comment lines.
Parses additional comment-based header values.
Converts data to numpy arrays.
Replaces empty values with ``numpy.nan``.
self.comments = self._merge_comments(self.comments)
self.times = self._parsedata[0]
for channel, data in zip(self.channels, self._parsedata[1:]):
data = numpy.array(data, dtype=numpy.float64)
# filter empty values
data[data == EIGHTS] = numpy.nan
data[data == NINES] = numpy.nan
self.data[channel] = data
"""Parse header values embedded in comments."""
comments = []
filter_comments = []
conditions_of_use = None
declination_base = None
is_intermagnet = False
for comment in self.comments:
if comment.startswith('DECBAS'):
# parse DECBAS
decbas = comment.replace('DECBAS', '').strip()
declination_base = int(decbas[:decbas.find(' ')])
elif comment.startswith('CONDITIONS OF USE:'):
conditions_of_use = comment.replace(
'CONDITIONS OF USE:', '').strip()
else:
if 'FILTER' in comment_upper:
filter_comments.append(comment)
elif 'INTERMAGNET DVD' in comment_upper or \
'WWW.INTERMAGNET.ORG' in comment_upper:
is_intermagnet = True
else:
comments.append(comment)
self.metadata['comments'] = tuple(comments)
self.metadata['filter_comments'] = tuple(filter_comments)
self.metadata['conditions_of_use'] = conditions_of_use
self.metadata['declination_base'] = declination_base
self.metadata['is_intermagnet'] = is_intermagnet
def _merge_comments(self, comments):
"""Combine multi-line, period-delimited comments.
Parameters
----------
comments : array_like
array of comment strings.
Returns
-------
array_like
merged comment strings.
"""
merged = []
partial = None
for comment in comments:
if partial is None:
partial = comment
else:
partial = partial + ' ' + comment
# comments end with period
if partial.endswith('.'):
merged.append(partial)
partial = None
# comment that doesn't end in a period
if partial is not None:
merged.append(partial)
return merged