Skip to content
Snippets Groups Projects
Commit 8b4f9b06 authored by Hal Simpson's avatar Hal Simpson
Browse files

Removed cwbhost/cwbport logic from code, will let EdgeFactory deal with that....

Removed cwbhost/cwbport logic from code, will let EdgeFactory deal with that. Rewrote the get_tag routine to use strings, which only work if the string is spliced to exactly 12 characters.  Rewrote the get_tag routine to remove the sequence code.  Rewrote the sequence code for data packets to use class variables. Other cleanup, especially comments.
parent 8ea11f0d
No related branches found
No related tags found
No related merge requests found
......@@ -8,26 +8,31 @@ Input Client for Edge/CWB/QueryMom.
GNU General Public License (GPLv2)
(http://www.gnu.org/licenses/old-licenses/gpl-2.0.html)
"""
# ensure geomag is on the path before importing
try:
import geomagio # noqa (tells linter to ignor this line.)
except:
from os import path
import sys
script_dir = path.dirname(path.abspath(__file__))
sys.path.append(path.normpath(path.join(script_dir, '../..')))
from obspy.core.utcdatetime import UTCDateTime
from datetime import datetime
from geomagio import TimeseriesFactoryException
import struct
import socket # noqa
import sys
from time import sleep
"""
MAXINPUTSIZE: Edge uses a short for the data count, so the max input size
for a single data packet to edge is 32767
HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size
for sending seconds data.
DAYMINUTES: The numbers of minutes in a day. Used as the size for sending
minute data, since Edge stores by the day.
"""
MAXINPUTSIZE = 32767
HOURSECONDS = 3600
DAYMINUTES = 1440
class RawInputClient():
Sequence = 0
Seedname = ''
"""RawInputClient for direct to edge data.
Parameters
----------
......@@ -54,20 +59,12 @@ class RawInputClient():
Uses sockets to send data to an edge. See send method for packet encoding
"""
def __init__(self, tag='', host='', port=0, cwbhost='', cwbport=0):
def __init__(self, tag='', host='', port=0):
self.tag = tag
self.host = host
self.port = port
self.cwbhost = cwbhost or ''
self.cwbport = cwbport
self.socket = None
self.cwbsocket = None
self.buf = None
self.seq = 0
self.tagseq = 0
self.now = UTCDateTime(datetime.utcnow())
self.dummy = UTCDateTime(datetime.utcnow())
self.timeout = 10
self.seedname = ''
if len(self.tag) > 10:
......@@ -80,9 +77,6 @@ class RawInputClient():
if self.socket is not None:
self.socket.close()
self.socket = None
if self.cwbsocket is not None:
self.cwbsocket.close()
self.cwbsocket = None
def forceout(self, seedname):
""" force edge to recognize data
......@@ -99,7 +93,8 @@ class RawInputClient():
Fourceout tells edge that we're done sending data for now, and
to go ahead and make it available
"""
self.send(seedname, -2, [], self.dummy, 0., 0, 0, 0, 0)
self.send(seedname, [], UTCDateTime(datetime.utcnow()),
0., 0, 0, 0, 0, True)
def create_seedname(self, observatory, channel, location='R0',
network='NT'):
......@@ -130,26 +125,88 @@ class RawInputClient():
"""
return network + observatory.ljust(5) + channel + location
def send(self, seedname, nsamp, samples, time, rate,
activity=0, ioclock=0, quality=0, timingquality=0):
def send_trace(self, seedname, interval, trace):
"""send an obspy trace using send.
PARAMETERS
----------
seedname: str
the seedname for the trace
interval: {'daily', 'hourly', 'minute', 'second'}
data interval.
trace: obspy.core.trace
NOTES
-----
Edge only takes a short as the max number of samples it takes at one
time. For ease of calculation, we break a trace into managable chunks
according to interval type.
"""
totalsamps = len(trace.data)
starttime = trace.stats.starttime
if interval == 'second':
nsamp = HOURSECONDS
timeoffset = 1
samplerate = 1.
elif interval == 'minute':
nsamp = DAYMINUTES
timeoffset = 60
samplerate = 1. / 60
elif interval == 'hourly':
nsamp = MAXINPUTSIZE
timeoffset = 3600
samplerate = 1. / 3600
elif interval == 'daily':
nsamp = MAXINPUTSIZE
timeoffset = 86400
samplerate = 1. / 86400
else:
raise TimeseriesFactoryException(
'Unsupported interval for RawInputClient')
for i in xrange(0, totalsamps, nsamp):
if totalsamps - i < nsamp:
endsample = totalsamps
else:
endsample = i + nsamp
nsamp = endsample - i
endtime = starttime + (nsamp - 1) * timeoffset
trace_send = trace.slice(starttime, endtime)
self.send(seedname, trace_send.data, starttime,
samplerate, 0, 0, 0, 0)
starttime += nsamp * timeoffset
def send(self, seedname, samples, time, rate,
activity=0, ioclock=0, quality=0, timingquality=0,
forceout=False):
""" Send a block of data to the Edge/CWB combination.
PARAMETERS
----------
seedname: The 12 character seedname of the channel
NNSSSSSCCCLL fixed format
nsamp The number of data samples (negative will force buffer clear)
samples: An int array with the samples
time: With the time of the first sample as a UTCDateTime
rate: The data rate in Hertz
activity: The activity flags per the SEED manual
ioClock: The IO/CLOCK flags per the SEED manual
quality: The data Quality flags per the SEED manual
timingQuality: The overall timing quality (must be 0-100)
seedname: str
The 12 character seedname of the channel NNSSSSSCCCLL fixed format
samples: array like
An int array with the samples
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
activity: int
The activity flags per the SEED manual
ioClock: int
The IO/CLOCK flags per the SEED manual
quality: int
The data Quality flags per the SEED manual
timingQuality: int [0-100]
The overall timing quality
forceout: boolean
indicates the packet to be sent will have a nsamp value of -1,
to tell edge to force the data to be written
Raises
------
UnknownHostException - if the socket will not open
TimeseriesFactoryException - if the socket will not open
NOTES
-----
......@@ -176,14 +233,15 @@ class RawInputClient():
-1 is for a tag packet (see _get_next_tag method)
-2 is for a force out packet (see forceout method)
"""
nsamp = len(samples)
if nsamp > 32767:
raise TimeseriesFactoryException(
'Edge input limited to 32767 integers per packet.')
# If this is a new channel, reset sequence.
if self.seedname != seedname:
self.seedname = seedname
self.seq = 0
if RawInputClient.Seedname != seedname:
RawInputClient.Seedname = seedname
RawInputClient.Sequence = 0
# Get time parameters for packet
yr = time.year
......@@ -204,36 +262,30 @@ class RawInputClient():
ratedivisor = -10000
# Construct the packet to be sent.
packStr = '!1H1h12s4h4B3i'
if nsamp > 0:
packStr = '%s%d%s' % (packStr, nsamp, 'i')
buf = struct.pack(packStr, 0xa1b2, nsamp, seedname, yr, doy,
ratemantissa, ratedivisor, activity, ioclock,
quality, timingquality, secs, usecs, self.seq,
*samples)
packStr = '!1H1h12s4h4B3i0i'
if forceout:
buf = struct.pack(packStr, 0xa1b2, -1, seedname, yr, doy,
ratemantissa, ratedivisor, activity, ioclock, quality,
timingquality, secs, usecs, RawInputClient.Sequence)
else:
packStr = '%s%d%s' % (packStr, nsamp, 'i')
buf = struct.pack(packStr, 0xa1b2, nsamp, seedname, yr, doy,
ratemantissa, ratedivisor, activity, ioclock,
quality, timingquality, secs, usecs, self.seq)
self.seq += 1
ratemantissa, ratedivisor, activity, ioclock, quality,
timingquality, secs, usecs, RawInputClient.Sequence,
*samples)
RawInputClient.Sequence +=1
# Try and send the packet, if the socket doesn't exist open it.
try:
# Older then 10 days, send to cwb.
if (abs(self.now.timestamp - time.timestamp) > 864000 and
self.cwbport > 0):
if self.cwbsocket is None:
self.cwbsocket = self._open_socket()
self.cwbsocket.sendall(self._get_next_tag())
self.cwbsocket.sendall(buf)
# Realtime data send to edge.
else:
if self.socket is None:
self.socket = self._open_socket()
self.socket.sendall(self._get_next_tag())
self.socket.sendall(buf)
if self.socket is None:
self.socket = self._open_socket()
self.socket.sendall(self._get_tag())
self.socket.sendall(buf)
except socket.error, v:
sys.stderr.write('sockect error %d' % v[0])
error = 'Socket error %d' % v[0]
sys.stderr.write(error)
raise TimeseriesFactoryException(error)
def _open_socket(self):
"""Open a socket
......@@ -248,6 +300,7 @@ class RawInputClient():
"""
done = False
newsocket = None
trys = 0
while not done:
try:
newsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......@@ -257,10 +310,13 @@ class RawInputClient():
sys.stderr.write('Could not connect to socket, trying again')
sys.stderr.write('sockect error %d' % v[0])
sleep(1)
if trys > 2:
raise TimeseriesFactoryException('Could not open socket')
trys += 1
return newsocket
def _get_next_tag(self):
"""Get the next tag name
def _get_tag(self):
"""Get tag struct
RETURNS
-------
......@@ -271,17 +327,11 @@ class RawInputClient():
The tag packet is used to by the edge server to log/determine a new
"user" has connected to the edge, not one who's connection dropped,
and is trying to continue sending data.
This routine adds a number to the tag provided by the user. It's
probably unneeded.
The packet uses -1 in the nsamp position to indicate it's a tag packet
The tag is right padded with spaces.
The Packet is right padded with zeros
The Packet must be 40 Bytes long.
"""
tmp = []
tmp.append(self.tag)
tmp.append(str(self.tagseq))
tmp.append(' ')
tg = ''.join(tmp)
zeros = [0] * 6
tb = struct.pack('!H1h12s6i', 0xa1b2, -1, tg, *zeros)
if self.tagseq > 99:
self.tagseq = 0
self.tagseq += 1
tg = self.tag + ' '
tb = struct.pack('!1H1h12s6i', 0xa1b2, -1, tg[:12], 0, 0, 0, 0, 0, 0)
return tb
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment