Skip to content
Snippets Groups Projects
Commit d5b774d0 authored by Hal Simpson's avatar Hal Simpson
Browse files

Completed comments. Changed _send_trace to _put_trace, and moved...

Completed comments.  Changed _send_trace to _put_trace, and moved RawInputClient initialization and close into _put_trace.  Added routines to change a trace to a masked array, and change channels into int.
parent 400264fb
No related branches found
No related tags found
No related merge requests found
...@@ -17,10 +17,12 @@ from geomagio import TimeseriesFactory, TimeseriesFactoryException ...@@ -17,10 +17,12 @@ from geomagio import TimeseriesFactory, TimeseriesFactoryException
from obspy import earthworm from obspy import earthworm
from ObservatoryMetadata import ObservatoryMetadata from ObservatoryMetadata import ObservatoryMetadata
import numpy.ma as ma import numpy.ma as ma
import sys
HOURSECONDS = 3600 HOURSECONDS = 3600
DAYMINUTES = 1440 DAYMINUTES = 1440
class EdgeFactory(TimeseriesFactory): class EdgeFactory(TimeseriesFactory):
"""TimeseriesFactory for Edge related data. """TimeseriesFactory for Edge related data.
...@@ -30,28 +32,55 @@ class EdgeFactory(TimeseriesFactory): ...@@ -30,28 +32,55 @@ class EdgeFactory(TimeseriesFactory):
a string representing the IP number of the host to connect to. a string representing the IP number of the host to connect to.
port: integer port: integer
the port number the waveserver is listening on. the port number the waveserver is listening on.
observatory: str
the observatory code for the desired observatory.
channels: array
an array of channels {H, D, E, F, Z}
type: str
the data type {variation, quasi-definitive, definitive}
interval: str
the data interval {daily, hourly, minute, second}
observatoryMetadata: ObservatoryMetadata object observatoryMetadata: ObservatoryMetadata object
an ObservatoryMetadata object used to replace the default an ObservatoryMetadata object used to replace the default
ObservatoryMetadata. ObservatoryMetadata.
locationCode: str locationCode: str
the location code for the given edge server, overrides type the location code for the given edge server, overrides type
in get_timeseries/put_timeseries. in get_timeseries/put_timeseries
cwbhost: str
a string represeting the IP number of the cwb host to connect to.
cwbport: int
the port number of the cwb host to connect to.
tag: str
A tag used by edge to log and associate a socket with a given data
source
See Also See Also
-------- --------
TimeseriesFactory TimeseriesFactory
Notes
-----
This is designed to read from any earthworm style waveserver, but it
currently only writes to an edge. Edge mimics an earthworm style
waveserver close enough that we hope to maintain that compatibility
for reading.
""" """
def __init__(self, host=None, port=None, observatory=None, def __init__(self, host=None, port=None, observatory=None,
channels=None, type=None, interval=None, channels=None, type=None, interval=None,
observatoryMetadata=None, locationCode=None, observatoryMetadata=None, locationCode=None,
cwbhost='', cwbport = 0, tag='EdgeFactory'): cwbhost=None, cwbport=0, tag='GeomagAlg'):
TimeseriesFactory.__init__(self, observatory, channels, type, interval) TimeseriesFactory.__init__(self, observatory, channels, type, interval)
self.client = earthworm.Client(host, port) self.client = earthworm.Client(host, port)
self.ric = ric = RawInputClient(tag, host, port, cwbhost, cwbport)
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata() self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.locationCode=locationCode self.tag = tag
self.locationCode = locationCode
self.host = host
self.port = port
self.cwbhost = cwbhost or ''
self.cwbport = cwbport
self.ric = None
def get_timeseries(self, starttime, endtime, observatory=None, def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None): channels=None, type=None, interval=None):
...@@ -59,17 +88,17 @@ class EdgeFactory(TimeseriesFactory): ...@@ -59,17 +88,17 @@ class EdgeFactory(TimeseriesFactory):
Parameters Parameters
---------- ----------
starttime : obspy.core.UTCDateTime starttime: obspy.core.UTCDateTime
time of first sample. time of first sample.
endtime : obspy.core.UTCDateTime endtime: obspy.core.UTCDateTime
time of last sample. time of last sample.
observatory : str observatory: str
observatory code. observatory code.
channels : array_like channels: array_like
list of channels to load list of channels to load
type : {'variation', 'quasi-definitive'} type: {'variation', 'quasi-definitive', 'definitive'}
data type. data type.
interval : {'minute', 'second'} interval: {'daily', 'hourly', 'minute', 'second'}
data interval. data interval.
Returns Returns
...@@ -102,60 +131,64 @@ class EdgeFactory(TimeseriesFactory): ...@@ -102,60 +131,64 @@ class EdgeFactory(TimeseriesFactory):
return timeseries return timeseries
def put_timeseries(self, timeseries, starttime=None, endtime=None, def put_timeseries(self, timeseries, observatory=None,
observatory = None, channels=None, type=None, interval=None): channels=None, type=None, interval=None):
"""Put timeseries data """Put timeseries data
Parameters Parameters
---------- ----------
observatory : str timeseries: obspy.core.Stream
timeseries object with data to be written
observatory: str
observatory code. observatory code.
starttime : obspy.core.UTCDateTime channels: array_like
time of first sample.
endtime : obspy.core.UTCDateTime
time of last sample.
channels : array_like
list of channels to load list of channels to load
type : {'variation', 'quasi-definitive'} type: {'variation', 'quasi-definitive', 'definitive'}
data type. data type.
interval : {'minute', 'second'} interval: {'daily', 'hourly', 'minute', 'second'}
data interval. data interval.
Returns Notes
------- -----
obspy.core.Stream Streams sent to timeseries are expected to have a single trace per
timeseries object with requested data. channel and that trace should have an ndarray, with nan's
representing gaps.
Raises
------
TimeseriesFactoryException
if invalid values are requested, or errors occur while
retrieving timeseries.
""" """
observatory = observatory or self.observatory observatory = observatory or self.observatory
channels = channels or self.channels channels = channels or self.channels
type = type or self.type type = type or self.type
interval = interval or self.interval interval = interval or self.interval
stats = timeseries[0].stats
starttime = starttime or stats.starttime
endtime = endtime or stats.endtime
if starttime > endtime:
raise TimeseriesFactoryException(
'Starttime before endtime "%s" "%s"' % (starttime, endtime))
for c in channels: self.ric = RawInputClient(self.tag, self.host, self.port,
timeseries.select(channel=c)[0].data = ma.masked_invalid( self.cwbhost, self.cwbport)
timeseries.select(channel=c)[0].data)
for tr in timeseries.select(channel=c).split():
# print tr.data
self._send_trace(tr, observatory, c, type, interval)
# tr.write(tr.id + ".asc", format='SH_ASC')
for channel in channels:
if timeseries.select(channel=channel).count() == 0:
sys.stderr.write('Missing channel %s for output, continuing \n'
% channel)
continue
self._convert_stream_to_masked(timeseries=timeseries,
channel=channel)
for trace in timeseries.select(channel=channel).split():
self._put_trace(trace, observatory, channel, type, interval)
self.ric.close() self.ric.close()
def _send_trace(self, trace, observatory, channel, type, interval): def _put_trace(self, trace, observatory, channel, type, interval):
"""Put trace
Parameters
----------
trace: obspy.core.Stream.Trace
trace object with data to be written
observatory: str
observatory code.
channel: str
channel to be written
type: {'variation', 'quasi-definitive', 'definitive'}
data type.
interval: {'daily', 'hourly', 'minute', 'second'}
data interval.
"""
station = self._get_edge_station(observatory, channel, station = self._get_edge_station(observatory, channel,
type, interval) type, interval)
location = self._get_edge_location(observatory, channel, location = self._get_edge_location(observatory, channel,
...@@ -164,41 +197,34 @@ class EdgeFactory(TimeseriesFactory): ...@@ -164,41 +197,34 @@ class EdgeFactory(TimeseriesFactory):
type, interval) type, interval)
edge_channel = self._get_edge_channel(observatory, channel, edge_channel = self._get_edge_channel(observatory, channel,
type, interval) type, interval)
totalsamps = len(trace.data) totalsamps = len(trace.data)
starttime = trace.stats.starttime starttime = trace.stats.starttime
if self.interval == 'second': if self.interval == 'second':
nsamp = HOURSECONDS nsamp = HOURSECONDS
timeoffset = 1 timeoffset = 1
samplerate = 1.
elif self.interval == 'minute': elif self.interval == 'minute':
nsamp = DAYMINUTES nsamp = DAYMINUTES
timeoffset = 60 timeoffset = 60
samplerate = 1. / 60
print trace.stats.npts
print totalsamps
for i in xrange(0, totalsamps, nsamp): for i in xrange(0, totalsamps, nsamp):
location = self.locationCode or location location = self.locationCode or location
seedname = self.ric.create_seedname(station, seedname = self.ric.create_seedname(station,
edge_channel, location, network) edge_channel, location, network)
if i - totalsamps < nsamp: if totalsamps - i < nsamp:
endsample = totalsamps endsample = totalsamps
else: else:
endsample = i + nsamp endsample = i + nsamp
nsamp = endsample - i nsamp = endsample - i
endtime = starttime + nsamp * timeoffset endtime = starttime + (nsamp - 1) * timeoffset
trace_send = trace.slice(starttime, endtime) trace_send = trace.slice(starttime, endtime)
self._convert_trace_to_int(trace_send) self._convert_trace_to_int(trace_send)
samplerate = 1. / 60 self.ric.send(seedname, nsamp, trace_send.data, starttime,
print seedname samplerate, 0, 0, 0, 0)
print nsamp
print starttime
print endtime
print samplerate
print ' '
self.ric.send(seedname, nsamp, trace_send, starttime,
samplerate, 0, 0, 0, 0)
starttime += nsamp * timeoffset starttime += nsamp * timeoffset
self.ric.forceout(seedname) self.ric.forceout(seedname)
def _convert_trace_to_decimal(self, stream): def _convert_trace_to_decimal(self, stream):
"""convert geomag edge traces stored as ints, to decimal by dividing """convert geomag edge traces stored as ints, to decimal by dividing
...@@ -211,12 +237,36 @@ class EdgeFactory(TimeseriesFactory): ...@@ -211,12 +237,36 @@ class EdgeFactory(TimeseriesFactory):
for trace in stream: for trace in stream:
trace.data = numpy.divide(trace.data, 1000.00) trace.data = numpy.divide(trace.data, 1000.00)
# this doesn't work on ndarray with nan's in it.
# the trace must be a masked array.
def _convert_trace_to_int(self, trace): def _convert_trace_to_int(self, trace):
"""convert geomag edge traces stored as decimal, to ints by multiplying
by 1000
Parameters
----------
stream : obspy.core.stream
a stream retrieved from a geomag edge representing one channel.
Notes
-----
this doesn't work on ndarray with nan's in it.
the trace must be a masked array.
"""
trace.data = numpy.multiply(trace.data, 1000.00) trace.data = numpy.multiply(trace.data, 1000.00)
trace.data = trace.data.astype(int) trace.data = trace.data.astype(int)
def _convert_stream_to_masked(self, timeseries, channel):
"""convert geomag edge traces in a timeseries stream to a MaskedArray
This allows for gaps and splitting.
Parameters
----------
stream : obspy.core.stream
a stream retrieved from a geomag edge representing one channel.
channel: string
the channel to be masked.
"""
for trace in timeseries.select(channel=channel):
trace.data = ma.masked_invalid(trace.data)
def _get_edge_network(self, observatory, channel, type, interval): def _get_edge_network(self, observatory, channel, type, interval):
"""get edge network code. """get edge network code.
...@@ -463,7 +513,6 @@ class EdgeFactory(TimeseriesFactory): ...@@ -463,7 +513,6 @@ class EdgeFactory(TimeseriesFactory):
trace.data = trace.data.filled() trace.data = trace.data.filled()
self._clean_timeseries(timeseries, starttime, endtime) self._clean_timeseries(timeseries, starttime, endtime)
# TODO add in test for missing channel, if so, make it all nans?
def _set_metadata(self, stream, observatory, channel, type, interval): def _set_metadata(self, stream, observatory, channel, type, interval):
"""set metadata for a given stream/channel """set metadata for a given stream/channel
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment