Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from cStringIO import StringIO
from geomagio import TimeseriesFactoryException, ChannelConverter
import numpy
import PCDCPParser
import textwrap
from datetime import datetime
class PCDCPWriter(object):
"""PCDCP writer.
"""
def __init__(self, empty_value=PCDCPParser.NINES):
self.empty_value = empty_value
def write(self, out, timeseries, channels):
"""write timeseries to pcdcp file
Parameters
----------
out : file object
File object to be written to. Could be stdout.
timeseries : obspy.core.stream
Timeseries object with data to be written.
channels : array_like
Channels to be written from timeseries object
"""
stats = timeseries[0].stats
out.write(self._format_header(stats, channels))
out.write(self._format_data(timeseries, channels))
def _format_header(self, name, value):
"""format headers for PCDCP file
Parameters
----------
name: str
the name to be written
value: str
the value to written.
Returns
-------
str
a string formatted to be a single header line in a PCDCP file
"""
observatory = 'BOU'
year = '2015'
date = '01-Jan-15'
space = ' '
return ''.join(observatory, space, year, space, '001', space,
date, space, 'HEZF 0.01nT File Version 2.00')
def _format_data(self, timeseries, channels):
"""Format all data lines.
Parameters
----------
timeseries : obspy.core.Stream
stream containing traces with channel listed in channels
channels : sequence
list and order of channel values to output.
"""
buf = []
if timeseries.select(channel='D'):
d = timeseries.select(channel='D')
d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data)
traces = [timeseries.select(channel=c)[0] for c in channels]
starttime = float(traces[0].stats.starttime)
delta = traces[0].stats.delta
for i in xrange(len(traces[0].data)):
buf.append(self._format_values(
datetime.utcfromtimestamp(starttime + i * delta),
(t.data[i] for t in traces)))
return ''.join(buf)
def _format_values(self, time, values):
"""Format one line of data values.
Parameters
----------
time : datetime
timestamp for values
values : sequence
list and order of channel values to output.
if value is NaN, self.empty_value is output in its place.
Returns
-------
unicode
Formatted line containing values.
"""
tt = time.timetuple()
return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \
'{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \
'{0.tm_yday:0>3d} ' \
'{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format(
tt, int(time.microsecond / 1000),
*[self.empty_value if numpy.isnan(val) else val
for val in values])
@classmethod
def format(self, timeseries, channels):
"""Get an PCDCP formatted string.
Calls write() with a StringIO, and returns the output.
Parameters
----------
timeseries : obspy.core.Stream
Returns
-------
unicode
PCDCP formatted string.
"""
out = StringIO()
writer = PCDCPWriter()
writer.write(out, timeseries, channels)
return out.getvalue()