Skip to content
Snippets Groups Projects
RawInputClient.py 14.5 KiB
Newer Older
from __future__ import unicode_literals
from builtins import range, str
Jeremy M Fee's avatar
Jeremy M Fee committed
import struct
Jeremy M Fee's avatar
Jeremy M Fee committed
from datetime import datetime
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..TimeseriesUtility import round_usecs
import logging
Jeremy M Fee's avatar
Jeremy M Fee committed
from obspy.core import UTCDateTime
Jeremy M Fee's avatar
Jeremy M Fee committed

"""
MAXINPUTSIZE: Edge uses a short for the data count, so the max input size
    for a single data packet to edge is *supposed* to be 32767. However, this
    was never tested thoroughly until we added support for 10Hz data, when
    it was quickly discovered that setting this value caused Edge to hang.
    Therefore, MAXINPUTSIZE was reduced to 10_000, which seems to work well.
HOURSECONDS: The number of seconds in an hour. Used as an arbitrary size
    for sending seconds data.
DAYMINUTES: The numbers of minutes in a day. Used as the size for sending
    minute data,  since Edge stores by the day.
"""
MAXINPUTSIZE = 10_000
"""
PACKSTR, TAGSTR: String's used by pack.struct, to indicate the data format
    for that packet.
PACKEHEAD: The code that leads a packet being sent to Edge.
"""
PACKSTR = "!1H1h12s4h4B3i"
TAGSTR = "!1H1h12s6i"
PACKETHEAD = 0xA1B2

"""
TAG, FORCEOUT: Flags that indicate to edge that a "data" packet has a specific
    function. Goes in the nsamp position of the packet header.
"""
class RawInputClient:
    """RawInputClient for direct to edge data.
    Parameters
    ----------
    tag: str
        A string used by edge to make certain a socket hasn't been
        opened by a different user, and to log transactions.
    host: str
        The IP address of the target host RawInputServer
    port: int
        The port on the IP of the RawInputServer
    station: str
        station code.
    channel: str
        channel to be written
    location: str
        location code
    network: str
        network code
    activity: int
        The activity flags per the SEED manual
    ioClock: int
        The IO/CLOCK flags per the SEED manual
    quality: int
        The data Quality flags per the SEED manual
    timingQuality: int [0-100]
        The overall timing quality

    Raises
    ------
    TimeseriesFactoryException

    NOTES
    -----
    Uses sockets to send data to an edge. See send method for packet encoding
    def __init__(
        self,
        tag="",
        host="",
        port=0,
        station="",
        channel="",
        location="",
        network="",
        activity=0,
        ioclock=0,
        quality=0,
        timingquality=0,
    ):
        self.tag = tag
        self.host = host
        self.port = port
        self.activity = activity
        self.ioclock = ioclock
        self.quality = quality
        self.timingquality = timingquality

        self.seedname = self.create_seedname(station, channel, location, network)
            raise TimeseriesFactoryException("Tag limited to 10 characters")
        """close the open sockets"""
        if self.socket is not None:
            self.socket.close()
    def create_seedname(self, observatory, channel, location="R0", network="NT"):
        """create a seedname for communication with edge.
        PARAMETERS
        ----------
        observatory: str
            observatory code.
        channel: str
            channel to be written
        location: str
            location code
        network: str
            network code

        RETURNS
        -------
        str
            the seedname

        NOTES
        -----
        The seedname is in the form NNSSSSSCCCLL if a parameter is not
        of the correct length,  it should be padded with spaces to be of
        the correct length.  We only expect observatory to ever be of an
        incorrect length.
        return str(network + observatory.ljust(5) + channel + location).encode()
        """force edge to recognize data
        NOTES
        -----
        When sending data to edge it hangs on to the data, until either
            enough data has accumulated, or enough time has passed. At that
            point, it makes the new data available for reading.
            Fourceout tells edge that we're done sending data for now, and
            to go ahead and make it available
        """
        buf = self._get_forceout(UTCDateTime(datetime.utcnow()), 0.0)
        """send an obspy trace using send.

        PARAMETERS
        ----------
        interval: {'day', 'hour', 'minute', 'second', 'tenhertz'}
            data interval.
        trace: obspy.core.trace

        NOTES
        -----
        Edge only takes a short as the max number of samples it takes at one
        time. For ease of calculation, we break a trace into managable chunks
        according to interval type.
        """
        totalsamps = len(trace.data)
        starttime = trace.stats.starttime

        if interval == "tenhertz":
            nsamp = MAXINPUTSIZE
            timeoffset = 1 / 10.0
            samplerate = 10.0
        elif interval == "second":
            samplerate = 1.0
        elif interval == "minute":
            samplerate = 1.0 / 60
            samplerate = 1.0 / 3600
            samplerate = 1.0 / 86400
            raise TimeseriesFactoryException("Unsupported interval for RawInputClient")
        for i in range(0, totalsamps, nsamp):
            if totalsamps - i < nsamp:
                endsample = totalsamps
            else:
                endsample = i + nsamp
            nsamp = endsample - i
            endtime = starttime + (nsamp - 1) * timeoffset
            trace_send = trace.slice(starttime, endtime)
            buf = self._get_data(trace_send.data, starttime, samplerate)
            self._send(buf)
        """Send a block of data to the Edge/CWB combination.
        samples: array like
            An int array with the samples
        time: UTCDateTime
            time of the first sample
        rate: int
            The data rate in Hertz
        forceout: boolean
            indicates the packet to be sent will have a nsamp value of -2,
            to tell edge to force the data to be written
        TimeseriesFactoryException - if the socket will not open
        """

        # Try and send the packet, if the socket doesn't exist open it.
        try:
            if self.socket is None:
                self._open_socket()
            self.socket.sendall(buf)
            self.sequence += 1
Yash Shah's avatar
Yash Shah committed
        except socket.error as v:
            error = "Socket error %d" % v[0]
            sys.stderr.write(error)
            raise TimeseriesFactoryException(error)

    def _get_forceout(self, time, rate):
        """
        PARAMETERS
        ----------
        time: UTCDateTime
            time of the first sample
        rate: int
            The data rate in Hertz

        RETURNS
        -------
        str

        NOTES
        -----
        Data is encoded into a C style structure using struct.pack with the
        following variables and type.
            0xa1b2 (short)
            nsamp (short)
            seedname (12 char)
            yr (short)
            doy (short)
            ratemantissa (short)
            ratedivisor (short)
            activity (byte)
            ioClock (byte)
            quality (byte)
            timeingQuality (byte)
            secs (int)
            seq  (int)   Seems to be the miniseed sequence, but not certain.
                        basically we increment it for every new packet we send

        The nsamp parameter is signed.
            -2 is for a force out packet

        """
        yr, doy, secs, usecs = self._get_time_values(time)
        ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)

        buf = struct.pack(
            PACKSTR,
            PACKETHEAD,
            FORCEOUT,
            self.seedname,
            yr,
            doy,
            ratemantissa,
            ratedivisor,
            self.activity,
            self.ioclock,
            self.quality,
            self.timingquality,
            secs,
            usecs,
            self.sequence,
        )
        return buf

    def _get_data(self, samples, time, rate):
        """
        PARAMETERS
        ----------
        samples: array like
            An int array with the samples
        time: UTCDateTime
            time of the first sample
        rate: int
            The data rate in Hertz

        RETURNS
        -------
        str

        NOTES
        -----
        Data is encoded into a C style structure using struct.pack with the
        following variables and type.
            0xa1b2 (short)
            nsamp (short)
            seedname (12 char)
            yr (short)
            doy (short)
            ratemantissa (short)
            ratedivisor (short)
            activity (byte)
            ioClock (byte)
            quality (byte)
            timeingQuality (byte)
            secs (int)
            seq  (int)   Seems to be the miniseed sequence, but not certain.
                        basically we increment it for every new set we send
            data [int]

        Notice that we expect the data to already be ints.
        The nsamp parameter is signed. If it's positive we send a data packet.
        if nsamp > 32767:
            raise TimeseriesFactoryException(
                "Edge input limited to 32767 integers per packet."
            )
        yr, doy, secs, usecs = self._get_time_values(time)
        ratemantissa, ratedivisor = self._get_mantissa_divisor(rate)
        packStr = "%s%d%s" % (PACKSTR, nsamp, "i")
        bpackStr = str(packStr).encode()
        buf = struct.pack(
            bpackStr,
            PACKETHEAD,
            nsamp,
            self.seedname,
            yr,
            doy,
            ratemantissa,
            ratedivisor,
            self.activity,
            self.ioclock,
            self.quality,
            self.timingquality,
            secs,
            usecs,
            self.sequence,
            *samples,
        return buf

    def _get_mantissa_divisor(self, rate):
        """
        PARAMETERS
        ----------
        rate: int
            The data rate in Hertz

        RETURNS
        -------
        tuple: (ratemantissa, ratedivosor)
            ratemantissa: int
            ratedivosor: int

        NOTE that `ratemantissa` and `ratedivisor` map directly to the SEED
        standard's "sample rate factor" and "sample rate multiplier", which
        are stored in a miniseed block's fixed data header as 2-byte words;
        earlier versions of this method did not handle sample rates lower
        than 1/second properly (they did handle 1/minute as a special-case);
            # sample rates greater or equal to 1/second
            # (this will not handle >327 Hz, but for consistency
            #  with earlier versions, we leave it unchanged)
            ratemantissa = int(rate * 100 + 0.001)
            ratedivisor = -100
        elif rate * 21600 > 1.0001:
            # sample periods between 1 second and 6 hours
            # (this should handle 1-minute data identically to
            #  earlier versions)
            ratemantissa = -int(1 / rate + 0.001)
            ratedivisor = -1
            # sample periods 6 hours and longer
            # (this will not handle periods longer than ~22 years)
            ratemantissa = -int(1 / rate / 21600)
            ratedivisor = -21600
        RETURNS
        -------
        str

        NOTES
        -----
        The tag packet is used to by the edge server to log/determine a new
        "user" has connected to the edge, not one who's connection dropped,
        and is trying to continue sending data.
        The packet uses -1 in the nsamp position to indicate it's a tag packet
        The tag is right padded with spaces.
        The Packet is right padded with zeros
        The Packet must be 40 Bytes long.
        """
        tg = str(self.tag + "            ").encode()
        tb = struct.pack(TAGSTR, PACKETHEAD, TAG, tg[:12], 0, 0, 0, 0, 0, 0)
        return tb

    def _get_time_values(self, time):
        """
        PARAMETERS
        ----------
            time of the first sample

        RETURNS
        -------
        tuple: (yr, doy, secs, usecs)
            yr: int
            doy: int
            secs: int
            usecs: int
        """

        # check for presence of residual microseconds
        if (time.microsecond / 1000).is_integer() == False:
            # create warning message when rounding is necessary
            logging.warning(
                "residual microsecond values encountered, rounding to nearest millisecond"
            # round residual microsecond values
            time = round_usecs(time)

        yr = time.year
        doy = time.datetime.timetuple().tm_yday
        secs = time.hour * 3600 + time.minute * 60 + time.second
        usecs = time.microsecond
        return (yr, doy, secs, usecs)

    def _open_socket(self):
        """Open a socket

        NOTES
        -----
        Loops until a socket is opened, with a 1 second wait between attempts
Hal Simpson's avatar
Hal Simpson committed
        Sends tag.
        while not done:
            try:
                newsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                newsocket.connect((self.host, self.port))
                done = True
Yash Shah's avatar
Yash Shah committed
            except socket.error as v:
                sys.stderr.write("Could not connect to socket, trying again")
                sys.stderr.write("sockect error %d" % v[0])
                raise TimeseriesFactoryException("Could not open socket")
        self.socket = newsocket
        self.socket.sendall(self._get_tag())