Skip to content
Snippets Groups Projects
Commit 8ea11f0d authored by Hal Simpson's avatar Hal Simpson
Browse files

Refactored code to put one channel at a time, instead of one trace at a time,...

Refactored code to put one channel at a time, instead of one trace at a time, to fix a problem with writing to edge. Added logic to use cwbhost/cwbport if data is older than 10 days and the cwbhost/cwbport are provided. Added starttime/endtime to put_timeseries to match algorithm base class. Other cleanup as discovered.
parent e8b72a4f
No related branches found
No related tags found
No related merge requests found
......@@ -16,10 +16,8 @@ from RawInputClient import RawInputClient
from geomagio import TimeseriesFactory, TimeseriesFactoryException
from obspy import earthworm
from ObservatoryMetadata import ObservatoryMetadata
import numpy.ma as ma
HOURSECONDS = 3600
DAYMINUTES = 1440
from datetime import datetime
import numpy.ma
class EdgeFactory(TimeseriesFactory):
......@@ -75,6 +73,7 @@ class EdgeFactory(TimeseriesFactory):
self.observatoryMetadata = observatoryMetadata or ObservatoryMetadata()
self.tag = tag
self.locationCode = locationCode
self.interval = interval
self.host = host
self.port = port
self.cwbhost = cwbhost or ''
......@@ -128,12 +127,10 @@ class EdgeFactory(TimeseriesFactory):
self._post_process(timeseries, starttime, endtime, channels)
print timeseries
return timeseries
def put_timeseries(self, timeseries, observatory=None,
channels=None, type=None, interval=None):
def put_timeseries(self, timeseries, starttime=None, endtime=None,
observatory=None, channels=None, type=None, interval=None):
"""Put timeseries data
Parameters
......@@ -155,10 +152,14 @@ class EdgeFactory(TimeseriesFactory):
channel and that trace should have an ndarray, with nan's
representing gaps.
"""
observatory = observatory or self.observatory
stats = timeseries[0].stats
observatory = observatory or self.observatory or stats.station
channels = channels or self.channels
type = type or self.type
interval = interval or self.interval
type = type or self.type or stats.data_type
interval = interval or self.interval or stats.data_interval
if (starttime == None or endtime == None):
starttime, endtime = self._get_stream_start_end_times(timeseries)
for channel in channels:
if timeseries.select(channel=channel).count() == 0:
......@@ -168,8 +169,8 @@ class EdgeFactory(TimeseriesFactory):
for channel in channels:
self._convert_stream_to_masked(timeseries=timeseries,
channel=channel)
for trace in timeseries.select(channel=channel).split():
self._put_trace(trace, observatory, channel, type, interval)
self._put_channel(timeseries, observatory, channel, type,
interval, starttime, endtime)
def _clean_timeseries(self, timeseries, starttime, endtime):
"""Realigns timeseries data so the start and endtimes are the same
......@@ -243,7 +244,7 @@ class EdgeFactory(TimeseriesFactory):
the channel to be masked.
"""
for trace in timeseries.select(channel=channel):
trace.data = ma.masked_invalid(trace.data)
trace.data = numpy.ma.masked_invalid(trace.data)
def _create_missing_channel(self, starttime, endtime, observatory,
channel, type, interval, network, station, location):
......@@ -484,6 +485,30 @@ class EdgeFactory(TimeseriesFactory):
observatory, channel, type, interval)
return data
def _get_stream_start_end_times(self, timeseries):
"""get start and end times from a stream.
Traverses all traces, and find the earliest starttime, and
the latest endtime.
Parameters
----------
timeseries: obspy.core.stream
The timeseries stream
Returns
-------
tuple: (starttime, endtime)
starttime: UTCDateTime
endtime: UTCDateTime
"""
starttime = UTCDateTime(datetime.now())
endtime = UTCDateTime(0)
for trace in timeseries:
if trace.stats.starttime < starttime:
starttime = trace.stats.starttime
if trace.stats.endtime > endtime:
endtime = trace.stats.endtime
return (starttime, endtime)
def _post_process(self, timeseries, starttime, endtime, channels):
"""Post process a timeseries stream after the raw data is
is fetched from a waveserver. Specifically changes
......@@ -512,21 +537,28 @@ class EdgeFactory(TimeseriesFactory):
self._clean_timeseries(timeseries, starttime, endtime)
def _put_trace(self, trace, observatory, channel, type, interval):
"""Put trace
def _put_channel(self, timeseries, observatory, channel, type, interval,
starttime, endtime):
"""Put a channel worth of data
Parameters
----------
trace: obspy.core.Stream.Trace
trace object with data to be written
timeseries: obspy.core.Stream
timeseries object with data to be written
observatory: str
observatory code.
channel: str
channel to be written
channel to load
type: {'variation', 'quasi-definitive', 'definitive'}
data type.
interval: {'daily', 'hourly', 'minute', 'second'}
data interval.
starttime: UTCDateTime
endtime: UTCDateTime
Notes
-----
RawInputClient seems to only work when sockets are
"""
station = self._get_edge_station(observatory, channel,
type, interval)
......@@ -537,35 +569,24 @@ class EdgeFactory(TimeseriesFactory):
edge_channel = self._get_edge_channel(observatory, channel,
type, interval)
self.ric = RawInputClient(self.tag, self.host, self.port,
self.cwbhost, self.cwbport)
totalsamps = len(trace.data)
starttime = trace.stats.starttime
if self.interval == 'second':
nsamp = HOURSECONDS
timeoffset = 1
samplerate = 1.
elif self.interval == 'minute':
nsamp = DAYMINUTES
timeoffset = 60
samplerate = 1. / 60
for i in xrange(0, totalsamps, nsamp):
location = self.locationCode or location
seedname = self.ric.create_seedname(station,
edge_channel, location, network)
if totalsamps - i < nsamp:
endsample = totalsamps
else:
endsample = i + nsamp
nsamp = endsample - i
endtime = starttime + (nsamp - 1) * timeoffset
trace_send = trace.slice(starttime, endtime)
now = UTCDateTime(datetime.utcnow())
if ((now - endtime) > 864000) and (self.cwbport > 0):
host = self.cwbhost
port = self.cwbport
else:
host = self.host
port = self.port
self.ric = RawInputClient(self.tag, host, port)
seedname = self.ric.create_seedname(station,
edge_channel, location, network)
for trace in timeseries.select(channel=channel).split():
trace.trim(starttime, endtime)
trace_send = trace.copy()
self._convert_trace_to_int(trace_send)
self.ric.send(seedname, nsamp, trace_send.data, starttime,
samplerate, 0, 0, 0, 0)
starttime += nsamp * timeoffset
self.ric.send_trace(seedname, interval, trace_send)
self.ric.forceout(seedname)
self.ric.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment