Newer
Older
"""
from Algorithm import Algorithm
from AlgorithmException import AlgorithmException
import StreamConverter as StreamConverter
# Input channel codes required for each supported orientation key.
#   geo:  geographic north/south orientation.
#   obs:  sensor orientation aligned close to the magnetic orientation.
#   obsd: same as obs, but with D (declination) in place of E (the
#         east/west vector).
CHANNELS = {
    'geo': ['X', 'Y', 'Z', 'F'],
    'obs': ['H', 'E', 'Z', 'F'],
    'obsd': ['H', 'D', 'Z', 'F'],
}
class DeltaFAlgorithm(Algorithm):
    """Algorithm for getting Delta F.

    Parameters
    ----------
    informat: str
        the code that represents the incoming data form that the Algorithm
        will be converting from. Must be one of the keys of CHANNELS
        ('geo', 'obs' or 'obsd').

    Raises
    ------
    KeyError
        if informat is not a key of CHANNELS. This includes the default
        value of None, so a format must always be supplied.
    """

    def __init__(self, informat=None):
        # The CHANNELS lookup doubles as validation of informat: an
        # unknown code fails here, before the base class is initialised.
        Algorithm.__init__(self, inchannels=CHANNELS[informat],
                outchannels=['G'])
        self.informat = informat

    def check_stream(self, timeseries):
        """checks a stream to make certain all the required channels
            exist.

        Parameters
        ----------
        timeseries: obspy.core.Stream
            stream to be checked.

        Raises
        ------
        AlgorithmException
            if any of the required input channels has no trace in the
            stream.
        """
        # NOTE(review): self._inchannels is presumably set by
        # Algorithm.__init__ from the inchannels argument - confirm.
        for channel in self._inchannels:
            if len(timeseries.select(channel=channel)) == 0:
                raise AlgorithmException(
                        'Channel %s not found in input' % channel)

    def process(self, timeseries):
        """computes Delta F from a timeseries stream

        Parameters
        ----------
        timeseries: obspy.core.Stream
            stream containing the input channels for the configured
            informat.

        Returns
        -------
        out_stream: obspy.core.Stream
            new stream object returned by the matching StreamConverter
            get_deltaf_from_* function.

        Raises
        ------
        AlgorithmException
            if a required channel is missing from timeseries, or if
            self.informat is not a supported format code.
        """
        self.check_stream(timeseries)
        if self.informat == 'geo':
            return StreamConverter.get_deltaf_from_geo(timeseries)
        if self.informat in ('obs', 'obsd'):
            # obs and obsd share one conversion routine.
            return StreamConverter.get_deltaf_from_obs(timeseries)
        # Only reachable when informat was changed after construction;
        # fail loudly instead of handing None back to the caller.
        raise AlgorithmException(
                'Unsupported informat %r' % (self.informat,))