Skip to content
Snippets Groups Projects
Commit 9b93a82d authored by Hal Simpson's avatar Hal Simpson
Browse files

Refactored to reflect the change to write out an entire channel before closing...

Refactored to reflect the change to write out an entire channel before closing socket. Also divided out buffer creation, from sending buffer.
parent 875ad62f
No related branches found
No related tags found
No related merge requests found
...@@ -29,13 +29,25 @@ MAXINPUTSIZE = 32767 ...@@ -29,13 +29,25 @@ MAXINPUTSIZE = 32767
HOURSECONDS = 3600 HOURSECONDS = 3600
DAYMINUTES = 1440 DAYMINUTES = 1440
"""
PACKSTR, TAGSTR: String's used by pack.struct, to indicate the data format
for that packet.
PACKEHEAD: The code that leads a packet being sent to Edge.
"""
PACKSTR = '!1H1h12s4h4B3i'
TAGSTR = '!1H1h12s6i'
PACKETHEAD = 0xa1b2
"""
TAG, FORCEOUT: Flags that indicate to edge that a "data" packet has a specific
function. Goes in the nsamp position of the packet header.
"""
TAG = -1 TAG = -1
FORCEOUT = -2 FORCEOUT = -2
class RawInputClient(): class RawInputClient():
Sequence = 0
Seedname = ''
"""RawInputClient for direct to edge data. """RawInputClient for direct to edge data.
Parameters Parameters
---------- ----------
...@@ -46,12 +58,22 @@ class RawInputClient(): ...@@ -46,12 +58,22 @@ class RawInputClient():
The IP address of the target host RawInputServer The IP address of the target host RawInputServer
port: int port: int
The port on the IP of the RawInputServer The port on the IP of the RawInputServer
cwbhost: str station: str
The host to use for data out of the 10 day window station code.
cwbport: int channel: str
The port to use for data out of the 10 day window channel to be written
location: str
Note: cwbhost/cwbport are not currently used for geomag data location code
network: str
network code
activity: int
The activity flags per the SEED manual
ioClock: int
The IO/CLOCK flags per the SEED manual
quality: int
The data Quality flags per the SEED manual
timingQuality: int [0-100]
The overall timing quality
Raises Raises
------ ------
...@@ -62,13 +84,23 @@ class RawInputClient(): ...@@ -62,13 +84,23 @@ class RawInputClient():
Uses sockets to send data to an edge. See send method for packet encoding Uses sockets to send data to an edge. See send method for packet encoding
""" """
def __init__(self, tag='', host='', port=0): def __init__(self, tag='', host='', port=0, station='', channel='',
location='', network='', activity=0, ioclock=0, quality=0,
timingquality=0):
self.tag = tag self.tag = tag
self.host = host self.host = host
self.port = port self.port = port
self.activity = activity
self.ioclock = ioclock
self.quality = quality
self.timingquality = timingquality
self.socket = None self.socket = None
self.buf = None self.buf = None
self.seedname = '' self.sequence = 0
self.seedname = self.create_seedname(station, channel,
location, network)
if len(self.tag) > 10: if len(self.tag) > 10:
raise TimeseriesFactoryException( raise TimeseriesFactoryException(
...@@ -81,24 +113,6 @@ class RawInputClient(): ...@@ -81,24 +113,6 @@ class RawInputClient():
self.socket.close() self.socket.close()
self.socket = None self.socket = None
def forceout(self, seedname):
""" force edge to recognize data
PARAMETERS
----------
seedname: str
The seedname of the data
NOTES
-----
When sending data to edge it hangs on to the data, until either
enough data has accumulated, or enough time has passed. At that
point, it makes the new data available for reading.
Fourceout tells edge that we're done sending data for now, and
to go ahead and make it available
"""
self.send(seedname, [], UTCDateTime(datetime.utcnow()),
0., 0, 0, 0, 0, True)
def create_seedname(self, observatory, channel, location='R0', def create_seedname(self, observatory, channel, location='R0',
network='NT'): network='NT'):
"""create a seedname for communication with edge. """create a seedname for communication with edge.
...@@ -128,13 +142,23 @@ class RawInputClient(): ...@@ -128,13 +142,23 @@ class RawInputClient():
""" """
return network + observatory.ljust(5) + channel + location return network + observatory.ljust(5) + channel + location
def send_trace(self, seedname, interval, trace): def forceout(self):
""" force edge to recognize data
NOTES
-----
When sending data to edge it hangs on to the data, until either
enough data has accumulated, or enough time has passed. At that
point, it makes the new data available for reading.
Fourceout tells edge that we're done sending data for now, and
to go ahead and make it available
"""
self._send([], UTCDateTime(datetime.utcnow()), 0., True)
def send_trace(self, interval, trace):
"""send an obspy trace using send. """send an obspy trace using send.
PARAMETERS PARAMETERS
---------- ----------
seedname: str
the seedname for the trace
interval: {'daily', 'hourly', 'minute', 'second'} interval: {'daily', 'hourly', 'minute', 'second'}
data interval. data interval.
trace: obspy.core.trace trace: obspy.core.trace
...@@ -176,40 +200,103 @@ class RawInputClient(): ...@@ -176,40 +200,103 @@ class RawInputClient():
nsamp = endsample - i nsamp = endsample - i
endtime = starttime + (nsamp - 1) * timeoffset endtime = starttime + (nsamp - 1) * timeoffset
trace_send = trace.slice(starttime, endtime) trace_send = trace.slice(starttime, endtime)
self.send(seedname, trace_send.data, starttime, self._send(trace_send.data, starttime, samplerate)
samplerate, 0, 0, 0, 0)
starttime += nsamp * timeoffset starttime += nsamp * timeoffset
def send(self, seedname, samples, time, rate, def _send(self, samples, time, rate, forceout=False):
activity=0, ioclock=0, quality=0, timingquality=0,
forceout=False):
""" Send a block of data to the Edge/CWB combination. """ Send a block of data to the Edge/CWB combination.
PARAMETERS PARAMETERS
---------- ----------
seedname: str
The 12 character seedname of the channel NNSSSSSCCCLL fixed format
samples: array like samples: array like
An int array with the samples An int array with the samples
time: UTCDateTime time: UTCDateTime
time of the first sample time of the first sample
rate: int rate: int
The data rate in Hertz The data rate in Hertz
activity: int
The activity flags per the SEED manual
ioClock: int
The IO/CLOCK flags per the SEED manual
quality: int
The data Quality flags per the SEED manual
timingQuality: int [0-100]
The overall timing quality
forceout: boolean forceout: boolean
indicates the packet to be sent will have a nsamp value of -1, indicates the packet to be sent will have a nsamp value of -2,
to tell edge to force the data to be written to tell edge to force the data to be written
Raises Raises
------ ------
TimeseriesFactoryException - if the socket will not open TimeseriesFactoryException - if the socket will not open
"""
if forceout:
buf = self._get_forceout(time, rate)
else:
buf = self._get_data(samples, time, rate)
# Try and send the packet, if the socket doesn't exist open it.
try:
if self.socket is None:
self._open_socket()
self.socket.sendall(buf)
self.sequence += 1
except socket.error, v:
error = 'Socket error %d' % v[0]
sys.stderr.write(error)
raise TimeseriesFactoryException(error)
def _get_forceout(self, time, rate):
"""
PARAMETERS
----------
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
RETURNS
-------
str
NOTES
-----
Data is encoded into a C style structure using struct.pack with the
following variables and type.
0xa1b2 (short)
nsamp (short)
seedname (12 char)
yr (short)
doy (short)
ratemantissa (short)
ratedivisor (short)
activity (byte)
ioClock (byte)
quality (byte)
timeingQuality (byte)
secs (int)
seq (int) Seems to be the miniseed sequence, but not certain.
basically we increment it for every new packet we send
The nsamp parameter is signed.
-2 is for a force out packet
"""
yr, doy, secs, usecs = self._get_time_values(time)
ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)
buf = struct.pack(PACKSTR, PACKETHEAD, FORCEOUT, self.seedname, yr,
doy, ratemantissa, ratedivisor, self.activity, self.ioclock,
self.quality, self.timingquality, secs, usecs,
self.sequence)
return buf
def _get_data(self, samples, time, rate):
"""
PARAMETERS
----------
samples: array like
An int array with the samples
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
RETURNS
-------
str
NOTES NOTES
----- -----
...@@ -233,27 +320,37 @@ class RawInputClient(): ...@@ -233,27 +320,37 @@ class RawInputClient():
Notice that we expect the data to already be ints. Notice that we expect the data to already be ints.
The nsamp parameter is signed. If it's positive we send a data packet. The nsamp parameter is signed. If it's positive we send a data packet.
-1 is for a tag packet (see _get_next_tag method)
-2 is for a force out packet (see forceout method)
""" """
nsamp = len(samples) nsamp = len(samples)
if nsamp > 32767: if nsamp > 32767:
raise TimeseriesFactoryException( raise TimeseriesFactoryException(
'Edge input limited to 32767 integers per packet.') 'Edge input limited to 32767 integers per packet.')
# If this is a new channel, reset sequence. yr, doy, secs, usecs = self._get_time_values(time)
if RawInputClient.Seedname != seedname: ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)
RawInputClient.Seedname = seedname
RawInputClient.Sequence = 0
# Get time parameters for packet packStr = '%s%d%s' % (PACKSTR, nsamp, 'i')
yr = time.year buf = struct.pack(packStr, PACKETHEAD, nsamp, self.seedname, yr, doy,
doy = time.datetime.timetuple().tm_yday ratemantissa, ratedivisor, self.activity, self.ioclock,
secs = time.hour * 3600 + time.minute * 60 + time.second self.quality, self.timingquality, secs, usecs, self.sequence,
usecs = time.microsecond *samples)
# Calculate ratemantissa and ratedivisor for edge. return buf
# force one minute data to be -60 and 1
def _get_mantissa_divisor(self, rate):
"""
PARAMETERS
----------
rate: int
The data rate in Hertz
RETURNS
-------
tuple: (ratemantissa, ratedivosor)
ratemantissa: int
ratedivosor: int
"""
if rate > 0.9999: if rate > 0.9999:
ratemantissa = int(rate * 100 + 0.001) ratemantissa = int(rate * 100 + 0.001)
ratedivisor = -100 ratedivisor = -100
...@@ -264,32 +361,51 @@ class RawInputClient(): ...@@ -264,32 +361,51 @@ class RawInputClient():
ratemantissa = int(rate * 10000. + 0.001) ratemantissa = int(rate * 10000. + 0.001)
ratedivisor = -10000 ratedivisor = -10000
# Construct the packet to be sent. return (ratemantissa, ratedivisor)
packStr = '!1H1h12s4h4B3i'
if forceout:
buf = struct.pack(packStr, 0xa1b2, FORCEOUT, seedname, yr,
doy, ratemantissa, ratedivisor, activity, ioclock,
quality, timingquality, secs, usecs,
RawInputClient.Sequence)
else:
packStr = '%s%d%s' % (packStr, nsamp, 'i')
buf = struct.pack(packStr, 0xa1b2, nsamp, seedname, yr, doy,
ratemantissa, ratedivisor, activity, ioclock, quality,
timingquality, secs, usecs, RawInputClient.Sequence,
*samples)
RawInputClient.Sequence +=1 def _get_tag(self):
"""Get tag struct
# Try and send the packet, if the socket doesn't exist open it. RETURNS
try: -------
if self.socket is None: str
self.socket = self._open_socket()
self.socket.sendall(self._get_tag()) NOTES
self.socket.sendall(buf) -----
except socket.error, v: The tag packet is used to by the edge server to log/determine a new
error = 'Socket error %d' % v[0] "user" has connected to the edge, not one who's connection dropped,
sys.stderr.write(error) and is trying to continue sending data.
raise TimeseriesFactoryException(error) The packet uses -1 in the nsamp position to indicate it's a tag packet
The tag is right padded with spaces.
The Packet is right padded with zeros
The Packet must be 40 Bytes long.
"""
tg = self.tag + ' '
tb = struct.pack(TAGSTR, PACKETHEAD, TAG, tg[:12],
0, 0, 0, 0, 0, 0)
return tb
def _get_time_values(self, time):
"""
PARAMETERS
----------
time: UTCDateTime
time of the first sample
RETURNS
-------
tuple: (yr, doy, secs, usecs)
yr: int
doy: int
secs: int
usecs: int
"""
yr = time.year
doy = time.datetime.timetuple().tm_yday
secs = time.hour * 3600 + time.minute * 60 + time.second
usecs = time.microsecond
return (yr, doy, secs, usecs)
def _open_socket(self): def _open_socket(self):
"""Open a socket """Open a socket
...@@ -317,26 +433,5 @@ class RawInputClient(): ...@@ -317,26 +433,5 @@ class RawInputClient():
if trys > 2: if trys > 2:
raise TimeseriesFactoryException('Could not open socket') raise TimeseriesFactoryException('Could not open socket')
trys += 1 trys += 1
return newsocket self.socket = newsocket
self.socket.sendall(self._get_tag())
def _get_tag(self):
"""Get tag struct
RETURNS
-------
str
NOTES
-----
The tag packet is used to by the edge server to log/determine a new
"user" has connected to the edge, not one who's connection dropped,
and is trying to continue sending data.
The packet uses -1 in the nsamp position to indicate it's a tag packet
The tag is right padded with spaces.
The Packet is right padded with zeros
The Packet must be 40 Bytes long.
"""
tg = self.tag + ' '
tb = struct.pack('!1H1h12s6i', 0xa1b2, TAG, tg[:12],
0, 0, 0, 0, 0, 0)
return tb
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment