Skip to content
Snippets Groups Projects
Commit ea6760ae authored by Jeremy M Fee's avatar Jeremy M Fee
Browse files

Merge pull request #4 from hasimpson-usgs/StreamConverter

Stream converter
parents 2e681c2e 4d59cab7
No related branches found
No related tags found
No related merge requests found
...@@ -83,7 +83,7 @@ def get_geo_x_from_mag(h, d): ...@@ -83,7 +83,7 @@ def get_geo_x_from_mag(h, d):
array_like array_like
x component x component
""" """
return h * numpy.cos(d) return numpy.multiply(h, numpy.cos(d))
def get_geo_y_from_mag(h, d): def get_geo_y_from_mag(h, d):
...@@ -101,7 +101,7 @@ def get_geo_y_from_mag(h, d): ...@@ -101,7 +101,7 @@ def get_geo_y_from_mag(h, d):
array_like array_like
y component y component
""" """
return h * numpy.sin(d) return numpy.multiply(h, numpy.sin(d))
# ### # ###
...@@ -168,7 +168,7 @@ def get_mag_d_from_obs(h, e, d0=0): ...@@ -168,7 +168,7 @@ def get_mag_d_from_obs(h, e, d0=0):
array_like array_like
the total magnetic declination the total magnetic declination
""" """
return d0 + get_obs_d_from_obs(h, e) return numpy.add(d0, get_obs_d_from_obs(h, e))
def get_mag_d_from_geo(x, y): def get_mag_d_from_geo(x, y):
...@@ -310,7 +310,7 @@ def get_obs_d_from_mag_d(d, d0=0): ...@@ -310,7 +310,7 @@ def get_obs_d_from_mag_d(d, d0=0):
array_like array_like
the observatory d declination the observatory d declination
""" """
return d - d0 return numpy.subtract(d, d0)
def get_obs_e_from_mag(h, d, d0=0): def get_obs_e_from_mag(h, d, d0=0):
...@@ -331,7 +331,7 @@ def get_obs_e_from_mag(h, d, d0=0): ...@@ -331,7 +331,7 @@ def get_obs_e_from_mag(h, d, d0=0):
the observatory e component the observatory e component
""" """
obs_d = get_obs_d_from_mag_d(d, d0) obs_d = get_obs_d_from_mag_d(d, d0)
return h * numpy.sin(obs_d) return numpy.multiply(h, numpy.sin(obs_d))
def get_obs_e_from_obs(h, d): def get_obs_e_from_obs(h, d):
...@@ -349,7 +349,7 @@ def get_obs_e_from_obs(h, d): ...@@ -349,7 +349,7 @@ def get_obs_e_from_obs(h, d):
array_like array_like
the observatory e component the observatory e component
""" """
return h * numpy.tan(d) return numpy.multiply(h, numpy.tan(d))
def get_obs_h_from_mag(h, d, d0=0): def get_obs_h_from_mag(h, d, d0=0):
...@@ -370,7 +370,7 @@ def get_obs_h_from_mag(h, d, d0=0): ...@@ -370,7 +370,7 @@ def get_obs_h_from_mag(h, d, d0=0):
the observatory h component the observatory h component
""" """
obs_d = get_obs_d_from_mag_d(d, d0) obs_d = get_obs_d_from_mag_d(d, d0)
return h * numpy.cos(obs_d) return numpy.multiply(h, numpy.cos(obs_d))
def get_radians_from_minutes(m): def get_radians_from_minutes(m):
...@@ -380,7 +380,7 @@ def get_radians_from_minutes(m): ...@@ -380,7 +380,7 @@ def get_radians_from_minutes(m):
d: array_like d: array_like
the decimal value to be converted the decimal value to be converted
""" """
return m * M2R return numpy.multiply(m, M2R)
def get_minutes_from_radians(r): def get_minutes_from_radians(r):
...@@ -391,4 +391,4 @@ def get_minutes_from_radians(r): ...@@ -391,4 +391,4 @@ def get_minutes_from_radians(r):
r: float r: float
the radian value to be converted the radian value to be converted
""" """
return r * R2M return numpy.multiply(r, R2M)
...@@ -40,6 +40,10 @@ class ChannelConverterTest: ...@@ -40,6 +40,10 @@ class ChannelConverterTest:
geographic north. geographic north.
Y: the component corresponding to the field strength along Y: the component corresponding to the field strength along
geographic east. geographic east.
We are using triangle identities for variables to test with. Specifically
the hypotenuse is normally equal to 1, causing the adjacent angle
length to be cos(angle) and the opposite length to be sin(angle)
""" """
def test_get_geo_from_obs(self): def test_get_geo_from_obs(self):
......
...@@ -13,7 +13,29 @@ import ChannelConverter ...@@ -13,7 +13,29 @@ import ChannelConverter
def get_geo_from_mag(mag): def get_geo_from_mag(mag):
pass """Convert a stream to geographic coordinate system.
Parameters
----------
stream : obspy.core.Stream
stream containing observatory components H, D, Z, and F.
Returns
-------
obspy.core.Stream
new stream object containing geographic components X, Y, Z, and F.
"""
h = mag.select(channel='H')[0]
d = mag.select(channel='D')[0]
z = mag.select(channel='Z')[0]
f = mag.select(channel='F')[0]
mag_h = h.data
mag_d = d.data
(geo_x, geo_y) = ChannelConverter.get_geo_from_mag(mag_h, mag_d)
return obspy.core.Stream((
__get_trace('X', h.stats, geo_x),
__get_trace('Y', d.stats, geo_y),
z, f))
def get_geo_from_obs(obs): def get_geo_from_obs(obs):
...@@ -33,7 +55,29 @@ def get_geo_from_obs(obs): ...@@ -33,7 +55,29 @@ def get_geo_from_obs(obs):
def get_mag_from_geo(geo): def get_mag_from_geo(geo):
pass """Convert a stream to magnetic coordinate system.
Parameters
----------
geo: obspy.core.Stream
stream containing observatory components X, Y, Z, and F.
Returns
-------
obspy.core.Stream
new stream object containing magnetic components H, D, Z, and F.
"""
x = geo.select(channel='X')[0]
y = geo.select(channel='Y')[0]
z = geo.select(channel='Z')[0]
f = geo.select(channel='F')[0]
geo_x = x.data
geo_y = y.data
(mag_h, mag_d) = ChannelConverter.get_mag_from_geo(geo_x, geo_y)
return obspy.core.Stream((
__get_trace('H', x.stats, mag_h),
__get_trace('D', y.stats, mag_d),
z, f))
def get_mag_from_obs(obs): def get_mag_from_obs(obs):
...@@ -50,22 +94,22 @@ def get_mag_from_obs(obs): ...@@ -50,22 +94,22 @@ def get_mag_from_obs(obs):
new stream object containing magnetic components H, D, Z, and F. new stream object containing magnetic components H, D, Z, and F.
""" """
h = obs.select(channel='H')[0] h = obs.select(channel='H')[0]
d = __get_obs_d_from_obs(obs) e = __get_obs_e_from_obs(obs)
z = obs.select(channel='Z')[0] z = obs.select(channel='Z')[0]
f = obs.select(channel='F')[0] f = obs.select(channel='F')[0]
obs_h = h.data obs_h = h.data
obs_d = d.data obs_e = e.data
d0 = ChannelConverter.get_radians_from_minutes( d0 = ChannelConverter.get_radians_from_minutes(
numpy.float64(d.stats['DECBAS']) / 10) numpy.float64(e.stats['DECBAS']) / 10)
(mag_h, mag_d) = ChannelConverter.get_mag_from_obs(obs_h, obs_d, d0) (mag_h, mag_d) = ChannelConverter.get_mag_from_obs(obs_h, obs_e, d0)
return obspy.core.Stream(( return obspy.core.Stream((
__get_trace('H', h.stats, mag_h), __get_trace('H', h.stats, mag_h),
__get_trace('D', d.stats, mag_d), __get_trace('D', e.stats, mag_d),
z, f)) z, f))
def get_obs_from_geo(geo, include_e=False): def get_obs_from_geo(geo, include_d=False):
"""Convert a stream to geographic coordinate system. """Convert a stream to observatory coordinate system.
Parameters Parameters
---------- ----------
...@@ -79,15 +123,72 @@ def get_obs_from_geo(geo, include_e=False): ...@@ -79,15 +123,72 @@ def get_obs_from_geo(geo, include_e=False):
obspy.core.Stream obspy.core.Stream
new stream object containing observatory components H, D, E, Z, and F. new stream object containing observatory components H, D, E, Z, and F.
""" """
return get_obs_from_mag(get_mag_from_geo(geo)) return get_obs_from_mag(get_mag_from_geo(geo), include_d)
def get_obs_from_mag(mag, include_d=False):
"""Convert a stream to magnetic observatory coordinate system.
Parameters
----------
stream: obspy.core.Stream
stream containing magnetic components H, D, Z, and F.
include_e: boolean
whether to also include the observatory E component
Returns
-------
obspy.core.Stream
new stream object containing observatory components H, D, E, Z, and F
"""
h = mag.select(channel='H')[0]
d = mag.select(channel='D')[0]
z = mag.select(channel='Z')[0]
f = mag.select(channel='F')[0]
mag_h = h.data
mag_d = d.data
d0 = ChannelConverter.get_radians_from_minutes(
numpy.float64(d.stats['DECBAS']) / 10)
(obs_h, obs_e) = ChannelConverter.get_obs_from_mag(mag_h, mag_d, d0)
traces = (
__get_trace('H', h.stats, obs_h),
__get_trace('E', d.stats, obs_e),
z, f)
if include_d:
obs_d = ChannelConverter.get_obs_d_from_obs(obs_h, obs_e)
traces = traces + (__get_trace('D', d.stats, obs_d),)
return obspy.core.Stream(traces)
def get_obs_from_mag(mag, include_e=False):
# TODO def get_obs_from_obs(obs, include_e=False, include_d=False):
obs = obspy.core.Stream() """Fill in the observatory parameters as requested
Parameters
----------
stream: obspy.core.Stream
stream containing the observatory components H, D of E, Z, and F.
include_e: boolean
whether to include the e component
include_d: boolean
whether to include the d component
Returns
-------
obspy.core.Stream
new stream object containing observatory components H, D, E, Z, and F
"""
h = obs.select(channel='H')[0]
z = obs.select(channel='Z')[0]
f = obs.select(channel='F')[0]
traces = (h, z, f)
if include_d:
d = __get_obs_d_from_obs(obs)
traces = traces + (d, )
if include_e: if include_e:
obs += __get_obs_e_from_obs(obs) e = __get_obs_e_from_obs(obs)
return obs traces = traces + (e, )
return obspy.core.Stream(traces)
def __get_trace(channel, stats, data): def __get_trace(channel, stats, data):
......
"""Unit Tests for StreamConverter
Notes
-----
We are using triangle identities for variables to test with. Specifically
the hypotenuse is normally equal to 1, causing the adjacent angle length
to be cos(angle) and the opposite length to be sin(angle)
For more info on the components see the Notes in ChannelConverterTest.py.
"""
import obspy.core
import numpy
import StreamConverter
import ChannelConverter
assert_equals = numpy.testing.assert_equal
assert_almost_equals = numpy.testing.assert_almost_equal
cos = numpy.cos
sin = numpy.sin
D2R = numpy.pi / 180
D2I = 60 * 10 # Degrees to Iaga Decbas
STARTTIME = obspy.core.UTCDateTime('2014-11-01')
def test_get_geo_from_mag():
"""geomag.StreamConverter_test.test_get_geo_from_mag()
The magnetic north stream containing the traces ''h'', ''d'', ''z'', and
''f'' converts to the geographics stream containing the traces ''x'',
''y'', ''z'' and ''f''
"""
mag = obspy.core.Stream()
# Call get_geo_from_magusing a decbas of 15 degrees, and streams with
# H = [1, 1], and D = [15 degrees, 30 degrees], expect streams of
# X = [cos(15), cos(30)] and Y = [sin(15), sin(30)]
# stats.DECBAS = 15 * D2I
mag += __create_trace('H', [1, 1])
mag += __create_trace('D', [15 * D2R, 30 * D2R])
mag += __create_trace('Z', [1, 1])
mag += __create_trace('F', [1, 1])
geo = StreamConverter.get_geo_from_mag(mag)
X = geo.select(channel='X')[0].data
Y = geo.select(channel='Y')[0].data
assert_equals(X, [cos(15 * D2R), cos(30 * D2R)],
'Expect X to equal [cos(15), cos(30)]', True)
assert_equals(Y, [sin(15 * D2R), sin(30 * D2R)],
'Expect Y to equal [sin(15), sin(30)]', True)
def test_get_geo_from_obs():
"""geomag.StreamConverter_test.test_get_geo_from_obs()
The observatory stream containing the observatory traces
''h'', ''d'' or ''e'', ''z'', and ''f'' converts to the geographic
stream containing the traces ''x'', ''y'', ''z'', and ''f''
"""
obs = obspy.core.Stream()
# 1) Call get_geo_from_obs using equal h, e streams with a decbas of 0
# the geographic stream values X, Y will be the same.
obs += __create_trace('H', [1])
obs += __create_trace('E', [1])
obs += __create_trace('Z', [1])
obs += __create_trace('F', [1])
geo = StreamConverter.get_geo_from_obs(obs)
X = geo.select(channel='X')[0].data
Y = geo.select(channel='Y')[0].data
assert_almost_equals(X[0], 1, 9,
'Expect X to almost equal 1', True)
assert_almost_equals(Y[0], 1, 9,
'Expect Y to almost equal 1', True)
# 2) Call get_geo_from_obs using a decbas of 15 degrees, and streams
# with H = [cos(15), cos(30)], and E = [sin(15), sin(30)].
# Expect streams of X = [cos(30), cos(45)] and Y = sin(30), sin(45)
obs = obspy.core.Stream()
DECBAS = 15 * D2I
obs += __create_trace('H', [cos(15 * D2R), cos(30 * D2R)], DECBAS)
obs += __create_trace('E', [sin(15 * D2R), sin(30 * D2R)], DECBAS)
obs += __create_trace('Z', [1, 1], DECBAS)
obs += __create_trace('F', [1, 1], DECBAS)
geo = StreamConverter.get_geo_from_obs(obs)
X = geo.select(channel='X')[0].data
Y = geo.select(channel='Y')[0].data
assert_equals(X, [cos(30 * D2R), cos(45 * D2R)],
'Expect X to equal [cos(30), cos(45)]', True)
assert_equals(Y, [sin(30 * D2R), sin(45 * D2R)],
'Expect Y to equal [sin(30), sin(45)]', True)
def test_get_mag_from_geo():
"""geomag.StreamConverter_test.test_get_mag_from_geo()
The geographic stream containing the traces ''x'', ''y'', ''z'', and
''f'' converts to the magnetic stream containing the traces
''h'', ''d'', ''z'' and ''f''.
"""
geo = obspy.core.Stream()
# Call get_mag_from_geo using a decbas of 15, a X stream of
# [cos(15), cos(30)], and a Y stream of [sin(15), sin(30)].
# Expect a H stream of [1,1] and a D strem of [15 degrees, 30 degrees]
DECBAS = 15 * D2I
geo += __create_trace('X', [cos(15 * D2R), cos(30 * D2R)], DECBAS)
geo += __create_trace('Y', [sin(15 * D2R), sin(30 * D2R)], DECBAS)
geo += __create_trace('Z', [1, 1], DECBAS)
geo += __create_trace('F', [1, 1], DECBAS)
mag = StreamConverter.get_mag_from_geo(geo)
H = mag.select(channel='H')[0].data
D = mag.select(channel='D')[0].data
assert_equals(H, [1, 1],
'Expect H to equal [1,1]', True)
assert_equals(D, [15 * D2R, 30 * D2R],
'Expect D to equal [15 degrees, 30 degrees]', True)
def test_get_mag_from_obs():
"""geomag.StreamConverter_test.test_get_mag_from_obs()
The observatory stream containing the traces ''h'', ''e'' or ''d'',
''z'' and ''f''
"""
obs = obspy.core.Stream()
# Call get_mag_from_obs using a DECBAS of 15 degrees, a H stream of
# [cos(15), cos(30)] and a E stream of [sin(15), sin(30)].
# Expect a H stream of [1, 1] and a D stream of [30 degrees, 45 degrees]
DECBAS = 15 * D2I
obs += __create_trace('H', [cos(15 * D2R), cos(30 * D2R)], DECBAS)
obs += __create_trace('E', [sin(15 * D2R), sin(30 * D2R)], DECBAS)
obs += __create_trace('Z', [1, 1], DECBAS)
obs += __create_trace('F', [1, 1], DECBAS)
mag = StreamConverter.get_mag_from_obs(obs)
H = mag.select(channel='H')[0].data
D = mag.select(channel='D')[0].data
assert_equals(H, [1, 1],
'Expect H to equal [1,1]', True)
assert_equals(D, [30 * D2R, 45 * D2R],
'Expect D to equal [30 degrees, 45 degrees]', True)
def test_get_obs_from_geo():
"""geomag.StreamConverter_test.test_get_obs_from_geo()
The geographic stream containing the traces ''x'', ''y'', ''z'', and
''f'' converts to the observatory stream containing the traces
''h'', ''d'' or ''e'', ''z'', and ''f''.
"""
geo = obspy.core.Stream()
# Call get_geo_from_obs using a decbas of 15, a X stream of
# [cos(30), cos(45)], and a Y stream of [sin(30), sin(45)].
# Expect a H stream of [cos(15), cos(30)] and a
# E stream of [sin(15), sin(30)]
DECBAS = 15 * D2I
geo += __create_trace('X', [cos(30 * D2R), cos(45 * D2R)], DECBAS)
geo += __create_trace('Y', [sin(30 * D2R), sin(45 * D2R)], DECBAS)
geo += __create_trace('Z', [1, 1], DECBAS)
geo += __create_trace('F', [1, 1], DECBAS)
obs = StreamConverter.get_obs_from_geo(geo, True)
H = obs.select(channel='H')[0].data
E = obs.select(channel='E')[0].data
D = obs.select(channel='D')[0].data
assert_almost_equals(H, [cos(15 * D2R), cos(30 * D2R)], 9,
'Expect H to equal [cos(15), cos(30)]', True)
assert_almost_equals(E, [sin(15 * D2R), sin(30 * D2R)], 9,
'Expect E to equal [sin(15), sin(30)', True)
assert_almost_equals(D, [15 * D2R, 30 * D2R], 9,
'Expect D to equal [15 degress, 30 degrees]', True)
def test_get_obs_from_mag():
"""geomag.StreamConverter_test.test_get_obs_from_mag()
The magnetic stream containing the traces ''h'', ''d'', ''z'', and ''f''
converts to the observatory stream containing the traces
''h'', ''e'' and/or ''d'', ''z'', and ''f''
"""
mag = obspy.core.Stream()
# Call get_obs_from_mag using a decbas of 15, a H stream of [1,1],
# and a D stream of [30 degrees, 45 degrees]. Expect a H stream
# of [cos(15), cos(30)], a D stream of [30 degrees, 45 degrees],
# and a E stream of [sin(15), sin(30)]
DECBAS = 15 * D2I
mag += __create_trace('H', [1, 1], DECBAS)
mag += __create_trace('D', [30 * D2R, 45 * D2R], DECBAS)
mag += __create_trace('Z', [1, 1], DECBAS)
mag += __create_trace('F', [1, 1], DECBAS)
obs = StreamConverter.get_obs_from_mag(mag, True)
H = obs.select(channel='H')[0].data
D = obs.select(channel='D')[0].data
E = obs.select(channel='E')[0].data
assert_almost_equals(H, [cos(15 * D2R), cos(30 * D2R)], 9,
'Expect H to equal [cos(15), cos(30)', True)
assert_almost_equals(D, [15 * D2R, 30 * D2R], 9,
'Expect D to equal [15 degrees, 30 degrees', True)
assert_almost_equals(E, [sin(15 * D2R), sin(30 * D2R)], 9,
'Expect E to equal [sin(15), sin(30)', True)
def test_get_obs_from_obs():
"""geomag.StreamConverter_test.test_get_obs_from_obs()
The observatory stream can contain either ''d'' or ''e'' depending
on it's source. get_obs_from_obs will return either or both as part
of the obs Stream.
"""
# 1) Call get_obs_from_obs using a decbas of 15, a H stream of
# [cos(15), cos(30)], and a E stream of [sin(15), sin(30)].
# Expect a D stream of [15 degrees, 30 degrees]
obs_e = obspy.core.Stream()
DECBAS = 15 * D2I
obs_e += __create_trace('H', [cos(15 * D2R), cos(30 * D2R)], DECBAS)
obs_e += __create_trace('E', [sin(15 * D2R), sin(30 * D2R)], DECBAS)
obs_e += __create_trace('Z', [1, 1], DECBAS)
obs_e += __create_trace('F', [1, 1], DECBAS)
obs_D = StreamConverter.get_obs_from_obs(obs_e, False, True)
d = obs_D.select(channel='D')[0].data
assert_almost_equals(d, [15 * D2R, 30 * D2R], 9,
'Expect D to equal [15 degrees, 30 degrees]', True)
# 2) Call get_obs_from_obs using a decbase of 15 degrees, a H stream of
# [cos(15), cos(30)], and a D stream of [15, 30].
# Expect a D stream of [sin(15), sin(30)]
obs_d = obspy.core.Stream()
obs_d += __create_trace('H', [cos(15 * D2R), cos(30 * D2R)], DECBAS)
obs_d += __create_trace('D', [15 * D2R, 30 * D2R], DECBAS)
obs_d += __create_trace('Z', [1, 1], DECBAS)
obs_d += __create_trace('F', [1, 1], DECBAS)
obs_E = StreamConverter.get_obs_from_obs(obs_d, True, False)
e = obs_E.select(channel='E')[0].data
assert_almost_equals(e, [sin(15 * D2R), sin(30 * D2R)], 9,
'Expect E to equal [sin(15), sin(30)', True)
def test_verification_data():
"""
This is a verification test of data done with different
converters, to see if the same result is returned.
Since the small angle approximation was used in the other
converters, AND round off was done differently, we can't
get the exact results.
Change the precision in assert_almost_equals to larger precision
(ie 2 to 8) to see how off the data is. Most are well within
expectations.
"""
DECBAS = 552.7
obs_v = obspy.core.Stream()
obs_v += __create_trace('H',
[20889.55, 20889.57, 20889.74, 20889.86, 20889.91, 20889.81], DECBAS)
obs_v += __create_trace('E',
[-21.10, -20.89, -20.72, -20.57, -20.39, -20.12], DECBAS)
obs_v += __create_trace('Z',
[47565.29, 47565.34, 47565.39, 47565.45, 47565.51, 47565.54], DECBAS)
obs_v += __create_trace('F',
[52485.77, 52485.84, 52485.94, 52486.06, 52486.11, 52486.10], DECBAS)
obs_V = StreamConverter.get_obs_from_obs(obs_v, True, True)
d = obs_V.select(channel='D')[0].data
d = ChannelConverter.get_minutes_from_radians(d)
# Using d values calculated using small angle approximation.
assert_almost_equals(d,
[-3.47, -3.43, -3.40, -3.38, -3.35, -3.31], 2,
'Expect d to equal [-3.47, -3.43, -3.40, -3.38, -3.35, -3.31]', True)
mag = obspy.core.Stream()
DECBAS = 552.7
mag += __create_trace('H',
[20884.04, 20883.45, 20883.38, 20883.43, 20883.07, 20882.76], DECBAS)
d = ChannelConverter.get_radians_from_minutes(
[556.51, 556.52, 556.56, 556.61, 556.65, 556.64])
mag += __create_trace('D', d, DECBAS)
mag += __create_trace('Z',
[48546.90, 48546.80, 48546.80, 48546.70, 48546.80, 48546.90], DECBAS)
mag += __create_trace('F',
[0.10, 0.00, 0.10, 0.00, 0.00, 0.00, 0.00], DECBAS)
geo = StreamConverter.get_geo_from_mag(mag)
X = geo.select(channel='X')[0].data
Y = geo.select(channel='Y')[0].data
assert_almost_equals(X,
[20611.00, 20610.40, 20610.30, 20610.30, 20609.90, 20609.60], 2)
assert_almost_equals(Y,
[3366.00, 3366.00, 3366.20, 3366.50, 3366.70, 3366.60], 1)
def __create_trace(channel, data, decbase=0):
"""
Utility to create a trace containing the given numpy array.
Parameters
----------
channel: string
The name of the trace being created.
data: array
The array to be inserted into the trace.
Returns
-------
obspy.core.Stream
Stream containing the channel.
"""
stats = obspy.core.Stats()
stats.starttime = STARTTIME
stats.sampling_rate = 0.0166666666667
stats.DECBAS = decbase
stats.channel = channel
numpy_data = numpy.array(data, dtype=numpy.float64)
return obspy.core.Trace(numpy_data, stats)
...@@ -6,6 +6,7 @@ import os ...@@ -6,6 +6,7 @@ import os
from geomagio import TimeseriesFactory, TimeseriesFactoryException from geomagio import TimeseriesFactory, TimeseriesFactoryException
from IAGA2002Parser import IAGA2002Parser from IAGA2002Parser import IAGA2002Parser
from IAGA2002Writer import IAGA2002Writer from IAGA2002Writer import IAGA2002Writer
from geomagio import ChannelConverter
# pattern for iaga 2002 file names # pattern for iaga 2002 file names
...@@ -103,14 +104,15 @@ class IAGA2002Factory(TimeseriesFactory): ...@@ -103,14 +104,15 @@ class IAGA2002Factory(TimeseriesFactory):
timeseries = obspy.core.Stream() timeseries = obspy.core.Stream()
for day in days: for day in days:
url = self._get_url(observatory, day, type, interval) url = self._get_url(observatory, day, type, interval)
timeseries += self._parse_url(url) iagaFile = read_url(url)
timeseries += self.parse_file(iagaFile)
# merge channel traces for multiple days # merge channel traces for multiple days
timeseries.merge() timeseries.merge()
# trim to requested start/end time # trim to requested start/end time
timeseries.trim(starttime, endtime) timeseries.trim(starttime, endtime)
return timeseries return timeseries
def _parse_url(self, url): def parse_file(self, iagaFile):
"""Parse the contents of a url to an IAGA2002 file. """Parse the contents of a url to an IAGA2002 file.
Parameters Parameters
...@@ -124,7 +126,7 @@ class IAGA2002Factory(TimeseriesFactory): ...@@ -124,7 +126,7 @@ class IAGA2002Factory(TimeseriesFactory):
parsed data. parsed data.
""" """
parser = IAGA2002Parser() parser = IAGA2002Parser()
parser.parse(read_url(url)) parser.parse(iagaFile)
headers = parser.headers headers = parser.headers
station = headers['IAGA CODE'] station = headers['IAGA CODE']
comments = tuple(parser.comments) comments = tuple(parser.comments)
...@@ -143,6 +145,9 @@ class IAGA2002Factory(TimeseriesFactory): ...@@ -143,6 +145,9 @@ class IAGA2002Factory(TimeseriesFactory):
stats.network = 'IAGA' stats.network = 'IAGA'
stats.station = station stats.station = station
stats.channel = channel stats.channel = channel
if stats.channel == 'D':
data[channel] = ChannelConverter.get_radians_from_minutes(
data[channel])
stream += obspy.core.Trace(data[channel], stats) stream += obspy.core.Trace(data[channel], stats)
return stream return stream
......
from cStringIO import StringIO from cStringIO import StringIO
from geomagio import TimeseriesFactoryException from geomagio import TimeseriesFactoryException, ChannelConverter
import numpy import numpy
import IAGA2002Parser import IAGA2002Parser
import textwrap import textwrap
...@@ -105,6 +105,9 @@ class IAGA2002Writer(object): ...@@ -105,6 +105,9 @@ class IAGA2002Writer(object):
list and order of channel values to output. list and order of channel values to output.
""" """
buf = [] buf = []
if timeseries.select(channel='D'):
d = timeseries.select(channel='D')
d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data)
traces = [timeseries.select(channel=c)[0] for c in channels] traces = [timeseries.select(channel=c)[0] for c in channels]
starttime = float(traces[0].stats.starttime) starttime = float(traces[0].stats.starttime)
delta = traces[0].stats.delta delta = traces[0].stats.delta
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment