Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Parsing methods for the IMFV122 Format."""
import numpy
from obspy.core import UTCDateTime
# values that represent missing data points in IAGA2002
EIGHTS = numpy.float64('88888.88')
NINES = numpy.float64('99999.99')
class IMFV122Parser(object):
"""IMFV122 parser.
Based on documentation at:
http://www.intermagnet.org/data-donnee/formats/imfv122-eng.php
Attributes
----------
metadata : dict
parsed IMFV122 metadata.
channels : array
parsed channel names.
times : array
parsed timeseries times.
data : dict
keys are channel names (order listed in ``self.channels``).
values are ``numpy.array`` of timeseries values, array values are
``numpy.nan`` when values are missing.
"""
def __init__(self, observatory=None):
"""Create a new IAGA2002 parser."""
# header fields
self.metadata = {
'network': 'NT',
'station': observatory
}
# array of channel names
self.channels = []
# timestamps of data (datetime.datetime)
self.times = []
# dictionary of data (channel : numpy.array<float64>)
self.data = {}
# temporary storage for data being parsed
self._parsedata = ([], [], [], [], [])
def parse(self, data):
"""Parse a string containing IAGA2002 formatted data.
Parameters
----------
data : str
IAGA 2002 formatted file contents.
"""
station = data[0:3]
lines = data.splitlines()
for line in lines:
if line.startswith(station):
self._parse_header(line)
else:
self._parse_data(line)
self._post_process()
def _parse_header(self, line):
"""Parse header line.
Adds value to ``self.headers``.
"""
(observatory,
date,
doy,
start,
components,
type,
gin,
colalong,
decbas,
reserved) = line.split()
self.channels = list(components)
self.metadata['declination_base'] = int(decbas)
self.metadata['geodetic_latitude'] = float(colalong[:4]) / 10
self.metadata['geodetic_longitude'] = float(colalong[4:]) / 10
self.metadata['station'] = observatory
self.metadata['gin'] = gin
year = 1900 + int(date[-2:])
julday = int(doy)
hour = 0
minute = 0
if year < 1971:
year = year + 100
if len(start) == 2:
# minutes data
hour = int(start)
self._delta = 60
else:
# seconds data
dayminute = int(start)
hour = int(dayminute / 60)
minute = dayminute % 60
self._delta = 1
self._nexttime = UTCDateTime(
year=year,
julday=julday,
hour=hour,
minute=minute)
def _parse_data(self, line):
"""Parse one data point in the timeseries.
Adds time to ``self.times``.
Adds channel values to ``self.data``.
"""
(d11, d21, d31, d41, d12, d22, d32, d42) = line.split()
t, d1, d2, d3, d4 = self._parsedata
t.append(self._nexttime)
d1.append(d11)
d2.append(d21)
d3.append(d31)
d4.append(d41)
self._nexttime = self._nexttime + self._delta
t.append(self._nexttime)
d1.append(d12)
d2.append(d22)
d3.append(d32)
d4.append(d42)
self._nexttime = self._nexttime + self._delta
def _post_process(self):
"""Post processing after data is parsed.
Converts data to numpy arrays.
Replaces empty values with ``numpy.nan``.
"""
self.times = self._parsedata[0]
for channel, data in zip(self.channels, self._parsedata[1:]):
data = numpy.array(data, dtype=numpy.float64)
data[data == int(EIGHTS)] = numpy.nan
data[data == EIGHTS] = numpy.nan
data[data == NINES] = numpy.nan
if channel == 'D':
data = data / 100
else:
data = data / 10
self.data[channel] = data
self._parsedata = None