From 9b93a82d31442c454c2e0cf41d1c97856ad16c4a Mon Sep 17 00:00:00 2001 From: Hal Simpson <hasimpson@usgs.gov> Date: Thu, 25 Jun 2015 23:36:50 -0600 Subject: [PATCH] Refactored to reflect the change to write out an entire channel before closing socket. Also divided out buffer creation, from sending buffer. --- geomagio/edge/RawInputClient.py | 309 +++++++++++++++++++++----------- 1 file changed, 202 insertions(+), 107 deletions(-) diff --git a/geomagio/edge/RawInputClient.py b/geomagio/edge/RawInputClient.py index d3e4b5005..b9a67f572 100644 --- a/geomagio/edge/RawInputClient.py +++ b/geomagio/edge/RawInputClient.py @@ -29,13 +29,25 @@ MAXINPUTSIZE = 32767 HOURSECONDS = 3600 DAYMINUTES = 1440 +""" +PACKSTR, TAGSTR: String's used by pack.struct, to indicate the data format + for that packet. +PACKEHEAD: The code that leads a packet being sent to Edge. +""" +PACKSTR = '!1H1h12s4h4B3i' +TAGSTR = '!1H1h12s6i' +PACKETHEAD = 0xa1b2 + +""" +TAG, FORCEOUT: Flags that indicate to edge that a "data" packet has a specific + function. Goes in the nsamp position of the packet header. +""" TAG = -1 FORCEOUT = -2 class RawInputClient(): - Sequence = 0 - Seedname = '' + """RawInputClient for direct to edge data. Parameters ---------- @@ -46,12 +58,22 @@ class RawInputClient(): The IP address of the target host RawInputServer port: int The port on the IP of the RawInputServer - cwbhost: str - The host to use for data out of the 10 day window - cwbport: int - The port to use for data out of the 10 day window - - Note: cwbhost/cwbport are not currently used for geomag data + station: str + station code. + channel: str + channel to be written + location: str + location code + network: str + network code + activity: int + The activity flags per the SEED manual + ioClock: int + The IO/CLOCK flags per the SEED manual + quality: int + The data Quality flags per the SEED manual + timingQuality: int [0-100] + The overall timing quality Raises ------ @@ -62,13 +84,23 @@ class RawInputClient(): Uses sockets to send data to an edge. See send method for packet encoding """ - def __init__(self, tag='', host='', port=0): + def __init__(self, tag='', host='', port=0, station='', channel='', + location='', network='', activity=0, ioclock=0, quality=0, + timingquality=0): self.tag = tag self.host = host self.port = port + self.activity = activity + self.ioclock = ioclock + self.quality = quality + self.timingquality = timingquality + self.socket = None self.buf = None - self.seedname = '' + self.sequence = 0 + + self.seedname = self.create_seedname(station, channel, + location, network) if len(self.tag) > 10: raise TimeseriesFactoryException( @@ -81,24 +113,6 @@ class RawInputClient(): self.socket.close() self.socket = None - def forceout(self, seedname): - """ force edge to recognize data - - PARAMETERS - ---------- - seedname: str - The seedname of the data - NOTES - ----- - When sending data to edge it hangs on to the data, until either - enough data has accumulated, or enough time has passed. At that - point, it makes the new data available for reading. - Fourceout tells edge that we're done sending data for now, and - to go ahead and make it available - """ - self.send(seedname, [], UTCDateTime(datetime.utcnow()), - 0., 0, 0, 0, 0, True) - def create_seedname(self, observatory, channel, location='R0', network='NT'): """create a seedname for communication with edge. @@ -128,13 +142,23 @@ class RawInputClient(): """ return network + observatory.ljust(5) + channel + location - def send_trace(self, seedname, interval, trace): + def forceout(self): + """ force edge to recognize data + NOTES + ----- + When sending data to edge it hangs on to the data, until either + enough data has accumulated, or enough time has passed. At that + point, it makes the new data available for reading. + Fourceout tells edge that we're done sending data for now, and + to go ahead and make it available + """ + self._send([], UTCDateTime(datetime.utcnow()), 0., True) + + def send_trace(self, interval, trace): """send an obspy trace using send. PARAMETERS ---------- - seedname: str - the seedname for the trace interval: {'daily', 'hourly', 'minute', 'second'} data interval. trace: obspy.core.trace @@ -176,40 +200,103 @@ class RawInputClient(): nsamp = endsample - i endtime = starttime + (nsamp - 1) * timeoffset trace_send = trace.slice(starttime, endtime) - self.send(seedname, trace_send.data, starttime, - samplerate, 0, 0, 0, 0) + self._send(trace_send.data, starttime, samplerate) starttime += nsamp * timeoffset - def send(self, seedname, samples, time, rate, - activity=0, ioclock=0, quality=0, timingquality=0, - forceout=False): + def _send(self, samples, time, rate, forceout=False): """ Send a block of data to the Edge/CWB combination. PARAMETERS ---------- - seedname: str - The 12 character seedname of the channel NNSSSSSCCCLL fixed format samples: array like An int array with the samples time: UTCDateTime time of the first sample rate: int The data rate in Hertz - activity: int - The activity flags per the SEED manual - ioClock: int - The IO/CLOCK flags per the SEED manual - quality: int - The data Quality flags per the SEED manual - timingQuality: int [0-100] - The overall timing quality forceout: boolean - indicates the packet to be sent will have a nsamp value of -1, + indicates the packet to be sent will have a nsamp value of -2, to tell edge to force the data to be written Raises ------ TimeseriesFactoryException - if the socket will not open + """ + if forceout: + buf = self._get_forceout(time, rate) + else: + buf = self._get_data(samples, time, rate) + + # Try and send the packet, if the socket doesn't exist open it. + try: + if self.socket is None: + self._open_socket() + self.socket.sendall(buf) + self.sequence += 1 + except socket.error, v: + error = 'Socket error %d' % v[0] + sys.stderr.write(error) + raise TimeseriesFactoryException(error) + + def _get_forceout(self, time, rate): + """ + PARAMETERS + ---------- + time: UTCDateTime + time of the first sample + rate: int + The data rate in Hertz + + RETURNS + ------- + str + + NOTES + ----- + Data is encoded into a C style structure using struct.pack with the + following variables and type. + 0xa1b2 (short) + nsamp (short) + seedname (12 char) + yr (short) + doy (short) + ratemantissa (short) + ratedivisor (short) + activity (byte) + ioClock (byte) + quality (byte) + timeingQuality (byte) + secs (int) + seq (int) Seems to be the miniseed sequence, but not certain. + basically we increment it for every new packet we send + + The nsamp parameter is signed. + -2 is for a force out packet + + """ + yr, doy, secs, usecs = self._get_time_values(time) + ratemantissa, ratedivisor = self._get_mantissa_divisor(rate) + + buf = struct.pack(PACKSTR, PACKETHEAD, FORCEOUT, self.seedname, yr, + doy, ratemantissa, ratedivisor, self.activity, self.ioclock, + self.quality, self.timingquality, secs, usecs, + self.sequence) + return buf + + def _get_data(self, samples, time, rate): + """ + PARAMETERS + ---------- + samples: array like + An int array with the samples + time: UTCDateTime + time of the first sample + rate: int + The data rate in Hertz + + RETURNS + ------- + str NOTES ----- @@ -233,27 +320,37 @@ class RawInputClient(): Notice that we expect the data to already be ints. The nsamp parameter is signed. If it's positive we send a data packet. - -1 is for a tag packet (see _get_next_tag method) - -2 is for a force out packet (see forceout method) + """ nsamp = len(samples) if nsamp > 32767: raise TimeseriesFactoryException( 'Edge input limited to 32767 integers per packet.') - # If this is a new channel, reset sequence. - if RawInputClient.Seedname != seedname: - RawInputClient.Seedname = seedname - RawInputClient.Sequence = 0 + yr, doy, secs, usecs = self._get_time_values(time) + ratemantissa, ratedivisor = self._get_mantissa_divisor(rate) - # Get time parameters for packet - yr = time.year - doy = time.datetime.timetuple().tm_yday - secs = time.hour * 3600 + time.minute * 60 + time.second - usecs = time.microsecond + packStr = '%s%d%s' % (PACKSTR, nsamp, 'i') + buf = struct.pack(packStr, PACKETHEAD, nsamp, self.seedname, yr, doy, + ratemantissa, ratedivisor, self.activity, self.ioclock, + self.quality, self.timingquality, secs, usecs, self.sequence, + *samples) - # Calculate ratemantissa and ratedivisor for edge. - # force one minute data to be -60 and 1 + return buf + + def _get_mantissa_divisor(self, rate): + """ + PARAMETERS + ---------- + rate: int + The data rate in Hertz + + RETURNS + ------- + tuple: (ratemantissa, ratedivosor) + ratemantissa: int + ratedivosor: int + """ if rate > 0.9999: ratemantissa = int(rate * 100 + 0.001) ratedivisor = -100 @@ -264,32 +361,51 @@ class RawInputClient(): ratemantissa = int(rate * 10000. + 0.001) ratedivisor = -10000 - # Construct the packet to be sent. - packStr = '!1H1h12s4h4B3i' - if forceout: - buf = struct.pack(packStr, 0xa1b2, FORCEOUT, seedname, yr, - doy, ratemantissa, ratedivisor, activity, ioclock, - quality, timingquality, secs, usecs, - RawInputClient.Sequence) - else: - packStr = '%s%d%s' % (packStr, nsamp, 'i') - buf = struct.pack(packStr, 0xa1b2, nsamp, seedname, yr, doy, - ratemantissa, ratedivisor, activity, ioclock, quality, - timingquality, secs, usecs, RawInputClient.Sequence, - *samples) + return (ratemantissa, ratedivisor) - RawInputClient.Sequence +=1 + def _get_tag(self): + """Get tag struct - # Try and send the packet, if the socket doesn't exist open it. - try: - if self.socket is None: - self.socket = self._open_socket() - self.socket.sendall(self._get_tag()) - self.socket.sendall(buf) - except socket.error, v: - error = 'Socket error %d' % v[0] - sys.stderr.write(error) - raise TimeseriesFactoryException(error) + RETURNS + ------- + str + + NOTES + ----- + The tag packet is used to by the edge server to log/determine a new + "user" has connected to the edge, not one who's connection dropped, + and is trying to continue sending data. + The packet uses -1 in the nsamp position to indicate it's a tag packet + The tag is right padded with spaces. + The Packet is right padded with zeros + The Packet must be 40 Bytes long. + """ + tg = self.tag + ' ' + tb = struct.pack(TAGSTR, PACKETHEAD, TAG, tg[:12], + 0, 0, 0, 0, 0, 0) + return tb + + def _get_time_values(self, time): + """ + PARAMETERS + ---------- + time: UTCDateTime + time of the first sample + + RETURNS + ------- + tuple: (yr, doy, secs, usecs) + yr: int + doy: int + secs: int + usecs: int + """ + yr = time.year + doy = time.datetime.timetuple().tm_yday + secs = time.hour * 3600 + time.minute * 60 + time.second + usecs = time.microsecond + + return (yr, doy, secs, usecs) def _open_socket(self): """Open a socket @@ -317,26 +433,5 @@ class RawInputClient(): if trys > 2: raise TimeseriesFactoryException('Could not open socket') trys += 1 - return newsocket - - def _get_tag(self): - """Get tag struct - - RETURNS - ------- - str - - NOTES - ----- - The tag packet is used to by the edge server to log/determine a new - "user" has connected to the edge, not one who's connection dropped, - and is trying to continue sending data. - The packet uses -1 in the nsamp position to indicate it's a tag packet - The tag is right padded with spaces. - The Packet is right padded with zeros - The Packet must be 40 Bytes long. - """ - tg = self.tag + ' ' - tb = struct.pack('!1H1h12s6i', 0xa1b2, TAG, tg[:12], - 0, 0, 0, 0, 0, 0) - return tb + self.socket = newsocket + self.socket.sendall(self._get_tag()) -- GitLab