Newer
Older
from __future__ import unicode_literals
from builtins import range, str
Hal Simpson
committed
import socket # noqa
Hal Simpson
committed
import sys
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..TimeseriesUtility import round_usecs
from time import sleep
Hal Simpson
committed
Hal Simpson
committed
"""
MAXINPUTSIZE: Edge uses a short for the data count, so the max input size
for a single data packet to edge is *supposed* to be 32767. However, this
was never tested thoroughly until we added support for 10Hz data, when
it was quickly discovered that setting this value caused Edge to hang.
Therefore, MAXINPUTSIZE was reduced to 10_000, which seems to work well.
Hal Simpson
committed
HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size
for sending seconds data.
DAYMINUTES: The numbers of minutes in a day. Used as the size for sending
minute data, since Edge stores by the day.
"""
Hal Simpson
committed
HOURSECONDS = 3600
DAYMINUTES = 1440
Hal Simpson
committed
Hal Simpson
committed
"""
PACKSTR, TAGSTR: String's used by pack.struct, to indicate the data format
for that packet.
PACKEHEAD: The code that leads a packet being sent to Edge.
"""
PACKSTR = "!1H1h12s4h4B3i"
TAGSTR = "!1H1h12s6i"
PACKETHEAD = 0xA1B2
Hal Simpson
committed
"""
TAG, FORCEOUT: Flags that indicate to edge that a "data" packet has a specific
function. Goes in the nsamp position of the packet header.
"""
TAG = -1
FORCEOUT = -2
Hal Simpson
committed
"""RawInputClient for direct to edge data.
Parameters
----------
tag: str
A string used by edge to make certain a socket hasn't been
opened by a different user, and to log transactions.
host: str
The IP address of the target host RawInputServer
port: int
The port on the IP of the RawInputServer
Hal Simpson
committed
station: str
station code.
channel: str
channel to be written
location: str
location code
network: str
network code
activity: int
The activity flags per the SEED manual
ioClock: int
The IO/CLOCK flags per the SEED manual
quality: int
The data Quality flags per the SEED manual
timingQuality: int [0-100]
The overall timing quality
Raises
------
TimeseriesFactoryException
NOTES
-----
Uses sockets to send data to an edge. See send method for packet encoding
Hal Simpson
committed
"""
def __init__(
self,
tag="",
host="",
port=0,
station="",
channel="",
location="",
network="",
activity=0,
ioclock=0,
quality=0,
timingquality=0,
):
Hal Simpson
committed
self.tag = tag
self.host = host
self.port = port
Hal Simpson
committed
self.activity = activity
self.ioclock = ioclock
self.quality = quality
self.timingquality = timingquality
Hal Simpson
committed
self.socket = None
self.buf = None
Hal Simpson
committed
self.sequence = 0
self.seedname = self.create_seedname(station, channel, location, network)
Hal Simpson
committed
if len(self.tag) > 10:
raise TimeseriesFactoryException("Tag limited to 10 characters")
Hal Simpson
committed
def close(self):
"""close the open sockets"""
Hal Simpson
committed
if self.socket is not None:
self.socket.close()
self.socket = None
Hal Simpson
committed
def create_seedname(self, observatory, channel, location="R0", network="NT"):
"""create a seedname for communication with edge.
Hal Simpson
committed
PARAMETERS
----------
observatory: str
observatory code.
channel: str
channel to be written
location: str
location code
network: str
network code
RETURNS
-------
str
the seedname
NOTES
-----
The seedname is in the form NNSSSSSCCCLL if a parameter is not
of the correct length, it should be padded with spaces to be of
the correct length. We only expect observatory to ever be of an
incorrect length.
Hal Simpson
committed
"""
return str(network + observatory.ljust(5) + channel + location).encode()
Hal Simpson
committed
def forceout(self):
"""force edge to recognize data
Hal Simpson
committed
NOTES
-----
When sending data to edge it hangs on to the data, until either
enough data has accumulated, or enough time has passed. At that
point, it makes the new data available for reading.
Fourceout tells edge that we're done sending data for now, and
to go ahead and make it available
"""
buf = self._get_forceout(UTCDateTime(datetime.utcnow()), 0.0)
self._send(buf)
Hal Simpson
committed
def send_trace(self, interval, trace):
Hal Simpson
committed
"""send an obspy trace using send.
PARAMETERS
----------
interval: {'day', 'hour', 'minute', 'second', 'tenhertz'}
Hal Simpson
committed
data interval.
trace: obspy.core.trace
NOTES
-----
Edge only takes a short as the max number of samples it takes at one
time. For ease of calculation, we break a trace into managable chunks
according to interval type.
"""
totalsamps = len(trace.data)
starttime = trace.stats.starttime
if interval == "tenhertz":
nsamp = MAXINPUTSIZE
timeoffset = 1 / 10.0
samplerate = 10.0
elif interval == "second":
Hal Simpson
committed
nsamp = HOURSECONDS
timeoffset = 1
samplerate = 1.0
elif interval == "minute":
Hal Simpson
committed
nsamp = DAYMINUTES
timeoffset = 60
elif interval == "hour":
Hal Simpson
committed
nsamp = MAXINPUTSIZE
timeoffset = 3600
elif interval == "day":
Hal Simpson
committed
nsamp = MAXINPUTSIZE
timeoffset = 86400
Hal Simpson
committed
else:
raise TimeseriesFactoryException("Unsupported interval for RawInputClient")
Hal Simpson
committed
for i in range(0, totalsamps, nsamp):
Hal Simpson
committed
if totalsamps - i < nsamp:
endsample = totalsamps
else:
endsample = i + nsamp
nsamp = endsample - i
endtime = starttime + (nsamp - 1) * timeoffset
trace_send = trace.slice(starttime, endtime)
buf = self._get_data(trace_send.data, starttime, samplerate)
self._send(buf)
Hal Simpson
committed
starttime += nsamp * timeoffset
def _send(self, buf):
"""Send a block of data to the Edge/CWB combination.
Hal Simpson
committed
PARAMETERS
----------
Hal Simpson
committed
samples: array like
An int array with the samples
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
forceout: boolean
Hal Simpson
committed
indicates the packet to be sent will have a nsamp value of -2,
Hal Simpson
committed
to tell edge to force the data to be written
Raises
------
Hal Simpson
committed
TimeseriesFactoryException - if the socket will not open
Hal Simpson
committed
"""
# Try and send the packet, if the socket doesn't exist open it.
try:
if self.socket is None:
self._open_socket()
self.socket.sendall(buf)
self.sequence += 1
Hal Simpson
committed
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
sys.stderr.write(error)
raise TimeseriesFactoryException(error)
def _get_forceout(self, time, rate):
"""
PARAMETERS
----------
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
RETURNS
-------
str
NOTES
-----
Data is encoded into a C style structure using struct.pack with the
following variables and type.
0xa1b2 (short)
nsamp (short)
seedname (12 char)
yr (short)
doy (short)
ratemantissa (short)
ratedivisor (short)
activity (byte)
ioClock (byte)
quality (byte)
timeingQuality (byte)
secs (int)
seq (int) Seems to be the miniseed sequence, but not certain.
basically we increment it for every new packet we send
The nsamp parameter is signed.
-2 is for a force out packet
"""
yr, doy, secs, usecs = self._get_time_values(time)
ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)
buf = struct.pack(
PACKSTR,
PACKETHEAD,
FORCEOUT,
self.seedname,
yr,
doy,
ratemantissa,
ratedivisor,
self.activity,
self.ioclock,
self.quality,
self.timingquality,
secs,
usecs,
self.sequence,
)
Hal Simpson
committed
return buf
def _get_data(self, samples, time, rate):
"""
PARAMETERS
----------
samples: array like
An int array with the samples
time: UTCDateTime
time of the first sample
rate: int
The data rate in Hertz
RETURNS
-------
str
NOTES
-----
Data is encoded into a C style structure using struct.pack with the
following variables and type.
0xa1b2 (short)
nsamp (short)
seedname (12 char)
yr (short)
doy (short)
ratemantissa (short)
ratedivisor (short)
activity (byte)
ioClock (byte)
quality (byte)
timeingQuality (byte)
secs (int)
seq (int) Seems to be the miniseed sequence, but not certain.
basically we increment it for every new set we send
data [int]
Notice that we expect the data to already be ints.
The nsamp parameter is signed. If it's positive we send a data packet.
Hal Simpson
committed
Hal Simpson
committed
"""
Hal Simpson
committed
nsamp = len(samples)
Hal Simpson
committed
if nsamp > 32767:
raise TimeseriesFactoryException(
"Edge input limited to 32767 integers per packet."
)
Hal Simpson
committed
Hal Simpson
committed
yr, doy, secs, usecs = self._get_time_values(time)
ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)
Hal Simpson
committed
packStr = "%s%d%s" % (PACKSTR, nsamp, "i")
buf = struct.pack(
bpackStr,
PACKETHEAD,
nsamp,
self.seedname,
yr,
doy,
ratemantissa,
ratedivisor,
self.activity,
self.ioclock,
self.quality,
self.timingquality,
secs,
usecs,
self.sequence,
Hal Simpson
committed
Hal Simpson
committed
return buf
def _get_mantissa_divisor(self, rate):
"""
PARAMETERS
----------
rate: int
The data rate in Hertz
RETURNS
-------
tuple: (ratemantissa, ratedivosor)
ratemantissa: int
ratedivosor: int
NOTE that `ratemantissa` and `ratedivisor` map directly to the SEED
standard's "sample rate factor" and "sample rate multiplier", which
are stored in a miniseed block's fixed data header as 2-byte words;
earlier versions of this method did not handle sample rates lower
than 1/second properly (they did handle 1/minute as a special-case);
Hal Simpson
committed
"""
Hal Simpson
committed
if rate > 0.9999:
# sample rates greater or equal to 1/second
# (this will not handle >327 Hz, but for consistency
# with earlier versions, we leave it unchanged)
Hal Simpson
committed
ratemantissa = int(rate * 100 + 0.001)
ratedivisor = -100
elif rate * 21600 > 1.0001:
# sample periods between 1 second and 6 hours
# (this should handle 1-minute data identically to
# earlier versions)
ratemantissa = -int(1 / rate + 0.001)
ratedivisor = -1
Hal Simpson
committed
else:
# sample periods 6 hours and longer
# (this will not handle periods longer than ~22 years)
ratemantissa = -int(1 / rate / 21600)
ratedivisor = -21600
Hal Simpson
committed
Hal Simpson
committed
return (ratemantissa, ratedivisor)
Hal Simpson
committed
Hal Simpson
committed
def _get_tag(self):
"""Get tag struct
Hal Simpson
committed
Hal Simpson
committed
RETURNS
-------
str
NOTES
-----
The tag packet is used to by the edge server to log/determine a new
"user" has connected to the edge, not one who's connection dropped,
and is trying to continue sending data.
The packet uses -1 in the nsamp position to indicate it's a tag packet
The tag is right padded with spaces.
The Packet is right padded with zeros
The Packet must be 40 Bytes long.
"""
tg = str(self.tag + " ").encode()
tb = struct.pack(TAGSTR, PACKETHEAD, TAG, tg[:12], 0, 0, 0, 0, 0, 0)
Hal Simpson
committed
return tb
def _get_time_values(self, time):
"""
PARAMETERS
----------
input_time: UTCDateTime
Hal Simpson
committed
time of the first sample
RETURNS
-------
tuple: (yr, doy, secs, usecs)
yr: int
doy: int
secs: int
usecs: int
"""
# check for presence of residual microseconds
if (time.microsecond / 1000).is_integer() == False:
# create warning message when rounding is necessary
"residual microsecond values encountered, rounding to nearest millisecond"
# round residual microsecond values
time = round_usecs(time)
Hal Simpson
committed
yr = time.year
doy = time.datetime.timetuple().tm_yday
secs = time.hour * 3600 + time.minute * 60 + time.second
usecs = time.microsecond
return (yr, doy, secs, usecs)
def _open_socket(self):
"""Open a socket
NOTES
-----
Loops until a socket is opened, with a 1 second wait between attempts
"""
done = False
newsocket = None
Hal Simpson
committed
trys = 0
while not done:
try:
newsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
newsocket.connect((self.host, self.port))
done = True
sys.stderr.write("Could not connect to socket, trying again")
sys.stderr.write("sockect error %d" % v[0])
sleep(1)
Hal Simpson
committed
if trys > 2:
raise TimeseriesFactoryException("Could not open socket")
Hal Simpson
committed
trys += 1
Hal Simpson
committed
self.socket = newsocket
self.socket.sendall(self._get_tag())