From ed7a7bc0d589b81bb813b47f77e96ffa46cbdc23 Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Mon, 28 Nov 2016 12:37:47 -0700 Subject: [PATCH] Add local version of obspy earthworm client and waveserver modules --- geomagio/edge/EdgeFactory.py | 16 +- geomagio/edge/client.py | 234 ++++++++++++++++++++++++++++ geomagio/edge/waveserver.py | 286 +++++++++++++++++++++++++++++++++++ 3 files changed, 530 insertions(+), 6 deletions(-) create mode 100644 geomagio/edge/client.py create mode 100644 geomagio/edge/waveserver.py diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index b904195cf..7f9262c73 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -15,12 +15,16 @@ import numpy import numpy.ma import obspy.core from datetime import datetime -try: - # obspy 1.x - from obspy.clients import earthworm -except: - # obspy 0.x - from obspy import earthworm +# try: +# # obspy 1.x +# from obspy.clients import earthworm +# except: +# # obspy 0.x +# from obspy import earthworm + +# use local version of earthworm client to test memory leak fix +import client as earthworm + from .. import ChannelConverter, TimeseriesUtility from ..TimeseriesFactory import TimeseriesFactory from ..TimeseriesFactoryException import TimeseriesFactoryException diff --git a/geomagio/edge/client.py b/geomagio/edge/client.py new file mode 100644 index 000000000..15dd81592 --- /dev/null +++ b/geomagio/edge/client.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +""" +Earthworm Wave Server client for ObsPy. + +:copyright: + The ObsPy Development Team (devs@obspy.org) & Victor Kress +:license: + GNU Lesser General Public License, Version 3 + (https://www.gnu.org/copyleft/lesser.html) + +.. seealso:: http://www.isti2.com/ew/PROGRAMMER/wsv_protocol.html +""" +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from future.builtins import * # NOQA @UnusedWildImport + +from fnmatch import fnmatch + +from obspy import Stream, UTCDateTime +from .waveserver import get_menu, read_wave_server_v + + +class Client(object): + """ + A Earthworm Wave Server client. + + :type host: str + :param host: Host name of the remote Earthworm Wave Server server. + :type port: int + :param port: Port of the remote Earthworm Wave Server server. + :type timeout: int, optional + :param timeout: Seconds before a connection timeout is raised (default is + ``None``). + :type debug: bool, optional + :param debug: Enables verbose output of the connection handling (default is + ``False``). + """ + def __init__(self, host, port, timeout=None, debug=False): + """ + Initializes a Earthworm Wave Server client. + + See :class:`obspy.clients.earthworm.client.Client` for all parameters. + """ + self.host = host + self.port = port + self.timeout = timeout + self.debug = debug + + def get_waveforms(self, network, station, location, channel, starttime, + endtime, cleanup=True): + """ + Retrieves waveform data from Earthworm Wave Server and returns an ObsPy + Stream object. + + :type filename: str + :param filename: Name of the output file. + :type network: str + :param network: Network code, e.g. ``'UW'``. + :type station: str + :param station: Station code, e.g. ``'TUCA'``. + :type location: str + :param location: Location code, e.g. ``'--'``. + :type channel: str + :param channel: Channel code, e.g. ``'BHZ'``. Last character (i.e. + component) can be a wildcard ('?' or '*') to fetch `Z`, `N` and + `E` component. + :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime` + :param starttime: Start date and time. + :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime` + :param endtime: End date and time. + :return: ObsPy :class:`~obspy.core.stream.Stream` object. + :type cleanup: bool + :param cleanup: Specifies whether perfectly aligned traces should be + merged or not. See :meth:`obspy.core.stream.Stream.merge` for + ``method=-1``. + + .. rubric:: Example + + >>> from obspy.clients.earthworm import Client + >>> client = Client("pubavo1.wr.usgs.gov", 16022) + >>> dt = UTCDateTime() - 2000 # now - 2000 seconds + >>> st = client.get_waveforms('AV', 'ACH', '', 'EHE', dt, dt + 10) + >>> st.plot() # doctest: +SKIP + >>> st = client.get_waveforms('AV', 'ACH', '', 'EH*', dt, dt + 10) + >>> st.plot() # doctest: +SKIP + + .. plot:: + + from obspy.clients.earthworm import Client + from obspy import UTCDateTime + client = Client("pubavo1.wr.usgs.gov", 16022, timeout=5) + dt = UTCDateTime() - 2000 # now - 2000 seconds + st = client.get_waveforms('AV', 'ACH', '', 'EHE', dt, dt + 10) + st.plot() + st = client.get_waveforms('AV', 'ACH', '', 'EH*', dt, dt + 10) + st.plot() + """ + # replace wildcards in last char of channel and fetch all 3 components + if channel[-1] in "?*": + st = Stream() + for comp in ("Z", "N", "E"): + channel_new = channel[:-1] + comp + st += self.get_waveforms(network, station, location, + channel_new, starttime, endtime, + cleanup=cleanup) + return st + if location == '': + location = '--' + scnl = (station, channel, network, location) + # fetch waveform + tbl = read_wave_server_v(self.host, self.port, scnl, starttime, + endtime, timeout=self.timeout) + # create new stream + st = Stream() + for tb in tbl: + st.append(tb.get_obspy_trace()) + if cleanup: + st._cleanup() + st.trim(starttime, endtime) + return st + + def save_waveforms(self, filename, network, station, location, channel, + starttime, endtime, format="MSEED", cleanup=True): + """ + Writes a retrieved waveform directly into a file. + + :type filename: str + :param filename: Name of the output file. + :type network: str + :param network: Network code, e.g. ``'UW'``. + :type station: str + :param station: Station code, e.g. ``'TUCA'``. + :type location: str + :param location: Location code, e.g. ``''``. + :type channel: str + :param channel: Channel code, e.g. ``'BHZ'``. Last character (i.e. + component) can be a wildcard ('?' or '*') to fetch `Z`, `N` and + `E` component. + :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime` + :param starttime: Start date and time. + :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime` + :param endtime: End date and time. + :type format: str, optional + :param format: Output format. One of ``"MSEED"``, ``"GSE2"``, + ``"SAC"``, ``"SACXY"``, ``"Q"``, ``"SH_ASC"``, ``"SEGY"``, + ``"SU"``, ``"WAV"``. See the Supported Formats section in method + :meth:`~obspy.core.stream.Stream.write` for a full list of + supported formats. Defaults to ``'MSEED'``. + :type cleanup: bool + :param cleanup: Specifies whether perfectly aligned traces should be + merged or not. See :meth:`~obspy.core.stream.Stream.merge`, + `method` -1 or :meth:`~obspy.core.stream.Stream._cleanup`. + :return: None + + .. rubric:: Example + + >>> from obspy.clients.earthworm import Client + >>> client = Client("pubavo1.wr.usgs.gov", 16022) + >>> t = UTCDateTime() - 2000 # now - 2000 seconds + >>> client.save_waveforms('AV.ACH.--.EHE.mseed', + ... 'AV', 'ACH', '', 'EHE', + ... t, t + 10, format='MSEED') # doctest: +SKIP + """ + st = self.get_waveforms(network, station, location, channel, starttime, + endtime, cleanup=cleanup) + st.write(filename, format=format) + + def get_availability(self, network="*", station="*", location="*", + channel="*"): + """ + Gets a list of data available on the server. + + This method returns information about what time series data is + available on the server. The query can optionally be restricted to + specific network, station, channel and/or location criteria. + + :type network: str + :param network: Network code, e.g. ``'UW'``, wildcards allowed. + :type station: str + :param station: Station code, e.g. ``'TUCA'``, wildcards allowed. + :type location: str + :param location: Location code, e.g. ``'--'``, wildcards allowed. + :type channel: str + :param channel: Channel code, e.g. ``'BHZ'``, wildcards allowed. + :rtype: list + :return: List of tuples with information on the available data. One + tuple consists of network, station, location, channel + (all strings), start time and end time + (both as :class:`~obspy.core.utcdatetime.UTCDateTime`). + + .. rubric:: Example + + >>> from obspy.clients.earthworm import Client + >>> client = Client("pubavo1.wr.usgs.gov", 16022, timeout=5) + >>> response = client.get_availability( + ... network="AV", station="ACH", channel="EH*") + >>> print(response) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE + [('AV', + 'ACH', + '--', + 'EHE', + UTCDateTime(...), + UTCDateTime(...)), + ('AV', + 'ACH', + '--', + 'EHN', + UTCDateTime(...), + UTCDateTime(...)), + ('AV', + 'ACH', + '--', + 'EHZ', + UTCDateTime(...), + UTCDateTime(...))] + """ + # build up possibly wildcarded trace id pattern for query + if location == '': + location = '--' + pattern = ".".join((network, station, location, channel)) + # get overview of all available data, winston wave servers can not + # restrict the query via network, station etc. so we do that manually + response = get_menu(self.host, self.port, timeout=self.timeout) + # reorder items and convert time info to UTCDateTime + response = [(x[3], x[1], x[4], x[2], UTCDateTime(x[5]), + UTCDateTime(x[6])) for x in response] + # restrict results according to user input + response = [x for x in response if fnmatch(".".join(x[:4]), pattern)] + return response + + +if __name__ == '__main__': + import doctest + doctest.testmod(exclude_empty=True) diff --git a/geomagio/edge/waveserver.py b/geomagio/edge/waveserver.py new file mode 100644 index 000000000..056d9377c --- /dev/null +++ b/geomagio/edge/waveserver.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +""" +Low-level Earthworm Wave Server tools. + +:copyright: + The ObsPy Development Team (devs@obspy.org) & Victor Kress +:license: + GNU Lesser General Public License, Version 3 + (https://www.gnu.org/copyleft/lesser.html) +""" +from __future__ import (absolute_import, division, print_function, + unicode_literals) +from future.builtins import * # NOQA @UnusedWildImport +from future.utils import native_str + +import socket +import struct +import sys + +import numpy as np + +from obspy import Stream, Trace, UTCDateTime +from obspy.core import Stats + + +RETURNFLAG_KEY = { + 'F': 'success', + 'FR': 'requested data right (later) than tank contents', + 'FL': 'requested data left (earlier) than tank contents', + 'FG': 'requested data lie in tank gap', + 'FB': 'syntax error in request', + 'FC': 'data tank corrupt', + 'FN': 'requested tank not found', + 'FU': 'unknown error' +} + +DATATYPE_KEY = { + b't4': '>f4', b't8': '>f8', + b's4': '>i4', b's2': '>i2', + b'f4': '<f4', b'f8': '<f8', + b'i4': '<i4', b'i2': '<i2' +} + + +def get_numpy_type(tpstr): + """ + given a TraceBuf2 type string from header, + return appropriate numpy.dtype object + """ + dtypestr = DATATYPE_KEY[tpstr] + tp = np.dtype(native_str(dtypestr)) + return tp + + +class TraceBuf2(object): + """ + """ + byteswap = False + ndata = 0 # number of samples in instance + inputType = None # NumPy data type + + def read_tb2(self, tb2): + """ + Reads single TraceBuf2 packet from beginning of input byte array tb. + returns number of bytes read or 0 on read fail. + """ + if len(tb2) < 64: + return 0 # not enough array to hold header + head = tb2[:64] + self.parse_header(head) + nbytes = 64 + self.ndata * self.inputType.itemsize + if len(tb2) < nbytes: + return 0 # not enough array to hold data specified in header + dat = tb2[64:nbytes] + self.parse_data(dat) + return nbytes + + def parse_header(self, head): + """ + Parse tracebuf header into class variables + """ + pack_str = b'2i3d7s9s4s3s2s3s2s2s' + dtype = head[-7:-5] + if dtype[0:1] in b'ts': + endian = b'>' + elif dtype[0:1] in b'if': + endian = b'<' + else: + raise ValueError + self.inputType = get_numpy_type(dtype) + (self.pinno, self.ndata, ts, te, self.rate, self.sta, self.net, + self.chan, self.loc, self.version, tp, self.qual, _pad) = \ + struct.unpack(endian + pack_str, head) + if not tp.startswith(dtype): + msg = 'Error parsing header: %s!=%s' + print(msg % (dtype, tp), file=sys.stderr) + self.start = UTCDateTime(ts) + self.end = UTCDateTime(te) + return + + def parse_data(self, dat): + """ + Parse tracebuf char array data into self.data + """ + self.data = np.fromstring(dat, self.inputType) + ndat = len(self.data) + if self.ndata != ndat: + msg = 'data count in header (%d) != data count (%d)' + print(msg % (self.nsamp, ndat), file=sys.stderr) + self.ndata = ndat + return + + def get_obspy_trace(self): + """ + Return class contents as obspy.Trace object + """ + stat = Stats() + stat.network = self.net.split(b'\x00')[0].decode() + stat.station = self.sta.split(b'\x00')[0].decode() + location = self.loc.split(b'\x00')[0].decode() + if location == '--': + stat.location = '' + else: + stat.location = location + stat.channel = self.chan.split(b'\x00')[0].decode() + stat.starttime = UTCDateTime(self.start) + stat.sampling_rate = self.rate + stat.npts = len(self.data) + return Trace(data=self.data, header=stat) + + +def send_sock_req(server, port, req_str, timeout=None): + """ + Sets up socket to server and port, sends req_str + to socket and returns open socket + """ + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(timeout) + s.connect((server, port)) + if req_str[-1:] == b'\n': + s.send(req_str) + else: + s.send(req_str + b'\n') + return s + + +def get_sock_char_line(sock, timeout=10.): + """ + Retrieves one newline terminated string from input open socket + """ + sock.settimeout(timeout) + chunks = [] + indat = b'^' + try: + while indat[-1:] != b'\n': + # see https://github.com/obspy/obspy/issues/383 + # indat = sock.recv(8192) + indat = sock.recv(1) + chunks.append(indat) + except socket.timeout: + print('socket timeout in get_sock_char_line()', file=sys.stderr) + return None + if chunks: + response = b''.join(chunks) + return response + else: + return None + + +def get_sock_bytes(sock, nbytes, timeout=None): + """ + Listens for nbytes from open socket. + Returns byte array as python string or None if timeout + """ + sock.settimeout(timeout) + chunks = [] + btoread = nbytes + try: + while btoread: + indat = sock.recv(min(btoread, 8192)) + btoread -= len(indat) + chunks.append(indat) + except socket.timeout: + print('socket timeout in get_sock_bytes()', file=sys.stderr) + return None + if chunks: + response = b''.join(chunks) + return response + else: + return None + + +def get_menu(server, port, scnl=None, timeout=None): + """ + Return list of tanks on server + """ + rid = 'get_menu' + if scnl: + # only works on regular waveservers (not winston) + getstr = 'MENUSCNL: %s %s %s %s %s\n' % ( + rid, scnl[0], scnl[1], scnl[2], scnl[3]) + else: + # added SCNL not documented but required + getstr = 'MENU: %s SCNL\n' % rid + sock = send_sock_req(server, port, getstr.encode('ascii', 'strict'), + timeout=timeout) + r = get_sock_char_line(sock, timeout=timeout) + sock.close() + if r: + # XXX: we got here from bytes to utf-8 to keep the remaining code + # intact + tokens = str(r.decode()).split() + if tokens[0] == rid: + tokens = tokens[1:] + flag = tokens[-1] + if flag in ['FN', 'FC', 'FU']: + msg = 'request returned %s - %s' + print(msg % (flag, RETURNFLAG_KEY[flag]), file=sys.stderr) + return [] + if tokens[7].encode() in DATATYPE_KEY: + elen = 8 # length of return entry if location included + elif tokens[6].encode() in DATATYPE_KEY: + elen = 7 # length of return entry if location omitted + else: + print('no type token found in get_menu', file=sys.stderr) + return [] + outlist = [] + for p in range(0, len(tokens), elen): + l = tokens[p:p + elen] + if elen == 8: + outlist.append((int(l[0]), l[1], l[2], l[3], l[4], + float(l[5]), float(l[6]), l[7])) + else: + outlist.append((int(l[0]), l[1], l[2], l[3], '--', + float(l[4]), float(l[5]), l[6])) + return outlist + return [] + + +def read_wave_server_v(server, port, scnl, start, end, timeout=None): + """ + Reads data for specified time interval and scnl on specified waveserverV. + + Returns list of TraceBuf2 objects + """ + rid = 'rwserv' + scnlstr = '%s %s %s %s' % scnl + reqstr = 'GETSCNLRAW: %s %s %f %f\n' % (rid, scnlstr, start, end) + sock = send_sock_req(server, port, reqstr.encode('ascii', 'strict'), + timeout=timeout) + r = get_sock_char_line(sock, timeout=timeout) + if not r: + return [] + tokens = str(r.decode()).split() + flag = tokens[6] + if flag != 'F': + msg = 'read_wave_server_v returned flag %s - %s' + print(msg % (flag, RETURNFLAG_KEY[flag]), file=sys.stderr) + return [] + nbytes = int(tokens[-1]) + dat = get_sock_bytes(sock, nbytes, timeout=timeout) + sock.close() + tbl = [] + new = TraceBuf2() # empty..filled below + bytesread = 1 + p = 0 + while bytesread and p < len(dat): + bytesread = new.read_tb2(dat[p:]) + if bytesread: + tbl.append(new) + new = TraceBuf2() # empty..filled on next iteration + p += bytesread + return tbl + + +def trace_bufs2obspy_stream(tbuflist): + """ + Returns obspy.Stream object from input list of TraceBuf2 objects + """ + if not tbuflist: + return None + tlist = [] + for tb in tbuflist: + tlist.append(tb.get_obspy_trace()) + strm = Stream(tlist) + return strm -- GitLab