diff --git a/geomagio/edge/EdgeFactory.py b/geomagio/edge/EdgeFactory.py index 9b672df4d95b40abdba97306be47d1f37bdd8d7d..694dec6c3f5ed20db99cd381b16ac6808ceb6921 100644 --- a/geomagio/edge/EdgeFactory.py +++ b/geomagio/edge/EdgeFactory.py @@ -11,20 +11,11 @@ Edge is the USGS earthquake hazard centers replacement for earthworm. from __future__ import absolute_import import sys -from io import BytesIO import numpy import numpy.ma import obspy.core from datetime import datetime -# try: -# # obspy 1.x -# from obspy.clients import earthworm -# except: -# # obspy 0.x -# from obspy import earthworm - -# use local version of earthworm client to test memory leak fix -from . import client as earthworm +from obspy.clients import earthworm from .. import ChannelConverter, TimeseriesUtility from ..TimeseriesFactory import TimeseriesFactory @@ -138,24 +129,19 @@ class EdgeFactory(TimeseriesFactory): raise TimeseriesFactoryException( 'Starttime before endtime "%s" "%s"' % (starttime, endtime)) - # need this until https://github.com/obspy/obspy/pull/1179 - # replace stdout + # obspy factories sometimes write to stdout, instead of stderr original_stdout = sys.stdout - temp_stdout = BytesIO() try: - sys.stdout = temp_stdout + # send stdout to stderr + sys.stdout = sys.stderr # get the timeseries timeseries = obspy.core.Stream() for channel in channels: data = self._get_timeseries(starttime, endtime, observatory, channel, type, interval) timeseries += data - # restore stdout finally: - output = temp_stdout.getvalue() - if output: - sys.stderr.write(str(output)) - temp_stdout.close() + # restore stdout sys.stdout = original_stdout self._post_process(timeseries, starttime, endtime, channels) diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py index aabf673b2d79d5791ab203ced018825659b7c864..9f76b619e41060670c457663f8463103681572bc 100644 --- a/geomagio/edge/MiniSeedFactory.py +++ b/geomagio/edge/MiniSeedFactory.py @@ -11,7 +11,6 @@ Edge is the USGS earthquake hazard centers replacement for earthworm. from __future__ import absolute_import import sys -from io import BytesIO import numpy import numpy.ma @@ -119,25 +118,19 @@ class MiniSeedFactory(TimeseriesFactory): raise TimeseriesFactoryException( 'Starttime before endtime "%s" "%s"' % (starttime, endtime)) - # need this until https://github.com/obspy/obspy/pull/1179 - # replace stdout + # obspy factories sometimes write to stdout, instead of stderr original_stdout = sys.stdout - temp_stdout = BytesIO() try: - sys.stdout = temp_stdout + # send stdout to stderr + sys.stdout = sys.stderr # get the timeseries timeseries = obspy.core.Stream() for channel in channels: data = self._get_timeseries(starttime, endtime, observatory, channel, type, interval) timeseries += data - - # restore stdout finally: - output = temp_stdout.getvalue() - if output: - sys.stderr.write(str(output)) - temp_stdout.close() + # restore stdout sys.stdout = original_stdout if self.convert_channels is not None: diff --git a/geomagio/edge/client.py b/geomagio/edge/client.py deleted file mode 100644 index 15dd81592869fe7a6336fb0be561009ec06f58dd..0000000000000000000000000000000000000000 --- a/geomagio/edge/client.py +++ /dev/null @@ -1,234 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Earthworm Wave Server client for ObsPy. - -:copyright: - The ObsPy Development Team (devs@obspy.org) & Victor Kress -:license: - GNU Lesser General Public License, Version 3 - (https://www.gnu.org/copyleft/lesser.html) - -.. seealso:: http://www.isti2.com/ew/PROGRAMMER/wsv_protocol.html -""" -from __future__ import (absolute_import, division, print_function, - unicode_literals) -from future.builtins import * # NOQA @UnusedWildImport - -from fnmatch import fnmatch - -from obspy import Stream, UTCDateTime -from .waveserver import get_menu, read_wave_server_v - - -class Client(object): - """ - A Earthworm Wave Server client. - - :type host: str - :param host: Host name of the remote Earthworm Wave Server server. - :type port: int - :param port: Port of the remote Earthworm Wave Server server. - :type timeout: int, optional - :param timeout: Seconds before a connection timeout is raised (default is - ``None``). - :type debug: bool, optional - :param debug: Enables verbose output of the connection handling (default is - ``False``). - """ - def __init__(self, host, port, timeout=None, debug=False): - """ - Initializes a Earthworm Wave Server client. - - See :class:`obspy.clients.earthworm.client.Client` for all parameters. - """ - self.host = host - self.port = port - self.timeout = timeout - self.debug = debug - - def get_waveforms(self, network, station, location, channel, starttime, - endtime, cleanup=True): - """ - Retrieves waveform data from Earthworm Wave Server and returns an ObsPy - Stream object. - - :type filename: str - :param filename: Name of the output file. - :type network: str - :param network: Network code, e.g. ``'UW'``. - :type station: str - :param station: Station code, e.g. ``'TUCA'``. - :type location: str - :param location: Location code, e.g. ``'--'``. - :type channel: str - :param channel: Channel code, e.g. ``'BHZ'``. Last character (i.e. - component) can be a wildcard ('?' or '*') to fetch `Z`, `N` and - `E` component. - :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime` - :param starttime: Start date and time. - :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime` - :param endtime: End date and time. - :return: ObsPy :class:`~obspy.core.stream.Stream` object. - :type cleanup: bool - :param cleanup: Specifies whether perfectly aligned traces should be - merged or not. See :meth:`obspy.core.stream.Stream.merge` for - ``method=-1``. - - .. rubric:: Example - - >>> from obspy.clients.earthworm import Client - >>> client = Client("pubavo1.wr.usgs.gov", 16022) - >>> dt = UTCDateTime() - 2000 # now - 2000 seconds - >>> st = client.get_waveforms('AV', 'ACH', '', 'EHE', dt, dt + 10) - >>> st.plot() # doctest: +SKIP - >>> st = client.get_waveforms('AV', 'ACH', '', 'EH*', dt, dt + 10) - >>> st.plot() # doctest: +SKIP - - .. plot:: - - from obspy.clients.earthworm import Client - from obspy import UTCDateTime - client = Client("pubavo1.wr.usgs.gov", 16022, timeout=5) - dt = UTCDateTime() - 2000 # now - 2000 seconds - st = client.get_waveforms('AV', 'ACH', '', 'EHE', dt, dt + 10) - st.plot() - st = client.get_waveforms('AV', 'ACH', '', 'EH*', dt, dt + 10) - st.plot() - """ - # replace wildcards in last char of channel and fetch all 3 components - if channel[-1] in "?*": - st = Stream() - for comp in ("Z", "N", "E"): - channel_new = channel[:-1] + comp - st += self.get_waveforms(network, station, location, - channel_new, starttime, endtime, - cleanup=cleanup) - return st - if location == '': - location = '--' - scnl = (station, channel, network, location) - # fetch waveform - tbl = read_wave_server_v(self.host, self.port, scnl, starttime, - endtime, timeout=self.timeout) - # create new stream - st = Stream() - for tb in tbl: - st.append(tb.get_obspy_trace()) - if cleanup: - st._cleanup() - st.trim(starttime, endtime) - return st - - def save_waveforms(self, filename, network, station, location, channel, - starttime, endtime, format="MSEED", cleanup=True): - """ - Writes a retrieved waveform directly into a file. - - :type filename: str - :param filename: Name of the output file. - :type network: str - :param network: Network code, e.g. ``'UW'``. - :type station: str - :param station: Station code, e.g. ``'TUCA'``. - :type location: str - :param location: Location code, e.g. ``''``. - :type channel: str - :param channel: Channel code, e.g. ``'BHZ'``. Last character (i.e. - component) can be a wildcard ('?' or '*') to fetch `Z`, `N` and - `E` component. - :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime` - :param starttime: Start date and time. - :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime` - :param endtime: End date and time. - :type format: str, optional - :param format: Output format. One of ``"MSEED"``, ``"GSE2"``, - ``"SAC"``, ``"SACXY"``, ``"Q"``, ``"SH_ASC"``, ``"SEGY"``, - ``"SU"``, ``"WAV"``. See the Supported Formats section in method - :meth:`~obspy.core.stream.Stream.write` for a full list of - supported formats. Defaults to ``'MSEED'``. - :type cleanup: bool - :param cleanup: Specifies whether perfectly aligned traces should be - merged or not. See :meth:`~obspy.core.stream.Stream.merge`, - `method` -1 or :meth:`~obspy.core.stream.Stream._cleanup`. - :return: None - - .. rubric:: Example - - >>> from obspy.clients.earthworm import Client - >>> client = Client("pubavo1.wr.usgs.gov", 16022) - >>> t = UTCDateTime() - 2000 # now - 2000 seconds - >>> client.save_waveforms('AV.ACH.--.EHE.mseed', - ... 'AV', 'ACH', '', 'EHE', - ... t, t + 10, format='MSEED') # doctest: +SKIP - """ - st = self.get_waveforms(network, station, location, channel, starttime, - endtime, cleanup=cleanup) - st.write(filename, format=format) - - def get_availability(self, network="*", station="*", location="*", - channel="*"): - """ - Gets a list of data available on the server. - - This method returns information about what time series data is - available on the server. The query can optionally be restricted to - specific network, station, channel and/or location criteria. - - :type network: str - :param network: Network code, e.g. ``'UW'``, wildcards allowed. - :type station: str - :param station: Station code, e.g. ``'TUCA'``, wildcards allowed. - :type location: str - :param location: Location code, e.g. ``'--'``, wildcards allowed. - :type channel: str - :param channel: Channel code, e.g. ``'BHZ'``, wildcards allowed. - :rtype: list - :return: List of tuples with information on the available data. One - tuple consists of network, station, location, channel - (all strings), start time and end time - (both as :class:`~obspy.core.utcdatetime.UTCDateTime`). - - .. rubric:: Example - - >>> from obspy.clients.earthworm import Client - >>> client = Client("pubavo1.wr.usgs.gov", 16022, timeout=5) - >>> response = client.get_availability( - ... network="AV", station="ACH", channel="EH*") - >>> print(response) # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - [('AV', - 'ACH', - '--', - 'EHE', - UTCDateTime(...), - UTCDateTime(...)), - ('AV', - 'ACH', - '--', - 'EHN', - UTCDateTime(...), - UTCDateTime(...)), - ('AV', - 'ACH', - '--', - 'EHZ', - UTCDateTime(...), - UTCDateTime(...))] - """ - # build up possibly wildcarded trace id pattern for query - if location == '': - location = '--' - pattern = ".".join((network, station, location, channel)) - # get overview of all available data, winston wave servers can not - # restrict the query via network, station etc. so we do that manually - response = get_menu(self.host, self.port, timeout=self.timeout) - # reorder items and convert time info to UTCDateTime - response = [(x[3], x[1], x[4], x[2], UTCDateTime(x[5]), - UTCDateTime(x[6])) for x in response] - # restrict results according to user input - response = [x for x in response if fnmatch(".".join(x[:4]), pattern)] - return response - - -if __name__ == '__main__': - import doctest - doctest.testmod(exclude_empty=True) diff --git a/geomagio/edge/waveserver.py b/geomagio/edge/waveserver.py deleted file mode 100644 index 47ac57c5df5eb359e717856eecaf36447de8c943..0000000000000000000000000000000000000000 --- a/geomagio/edge/waveserver.py +++ /dev/null @@ -1,290 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Low-level Earthworm Wave Server tools. - -:copyright: - The ObsPy Development Team (devs@obspy.org) & Victor Kress -:license: - GNU Lesser General Public License, Version 3 - (https://www.gnu.org/copyleft/lesser.html) -""" -from __future__ import (absolute_import, division, print_function, - unicode_literals) -from future.builtins import * # NOQA @UnusedWildImport -from future.utils import native_str - -import socket -import struct -import sys - -import numpy as np - -from obspy import Stream, Trace, UTCDateTime -from obspy.core import Stats - - -RETURNFLAG_KEY = { - 'F': 'success', - 'FR': 'requested data right (later) than tank contents', - 'FL': 'requested data left (earlier) than tank contents', - 'FG': 'requested data lie in tank gap', - 'FB': 'syntax error in request', - 'FC': 'data tank corrupt', - 'FN': 'requested tank not found', - 'FU': 'unknown error' -} - -DATATYPE_KEY = { - b't4': '>f4', b't8': '>f8', - b's4': '>i4', b's2': '>i2', - b'f4': '<f4', b'f8': '<f8', - b'i4': '<i4', b'i2': '<i2' -} - - -def get_numpy_type(tpstr): - """ - given a TraceBuf2 type string from header, - return appropriate numpy.dtype object - """ - dtypestr = DATATYPE_KEY[tpstr] - tp = np.dtype(native_str(dtypestr)) - return tp - - -class TraceBuf2(object): - """ - """ - byteswap = False - ndata = 0 # number of samples in instance - inputType = None # NumPy data type - - def read_tb2(self, tb2): - """ - Reads single TraceBuf2 packet from beginning of input byte array tb. - returns number of bytes read or 0 on read fail. - """ - if len(tb2) < 64: - return 0 # not enough array to hold header - head = tb2[:64] - self.parse_header(head) - nbytes = 64 + self.ndata * self.inputType.itemsize - if len(tb2) < nbytes: - return 0 # not enough array to hold data specified in header - dat = tb2[64:nbytes] - self.parse_data(dat) - return nbytes - - def parse_header(self, head): - """ - Parse tracebuf header into class variables - """ - pack_str = b'2i3d7s9s4s3s2s3s2s2s' - dtype = head[-7:-5] - if dtype[0:1] in b'ts': - endian = b'>' - elif dtype[0:1] in b'if': - endian = b'<' - else: - raise ValueError - self.inputType = get_numpy_type(dtype) - (self.pinno, self.ndata, ts, te, self.rate, self.sta, self.net, - self.chan, self.loc, self.version, tp, self.qual, _pad) = \ - struct.unpack(endian + pack_str, head) - if not tp.startswith(dtype): - msg = 'Error parsing header: %s!=%s' - print(msg % (dtype, tp), file=sys.stderr) - self.start = UTCDateTime(ts) - self.end = UTCDateTime(te) - return - - def parse_data(self, dat): - """ - Parse tracebuf char array data into self.data - """ - self.data = np.fromstring(dat, self.inputType) - ndat = len(self.data) - if self.ndata != ndat: - msg = 'data count in header (%d) != data count (%d)' - print(msg % (self.nsamp, ndat), file=sys.stderr) - self.ndata = ndat - return - - def get_obspy_trace(self): - """ - Return class contents as obspy.Trace object - """ - stat = Stats() - stat.network = self.net.split(b'\x00')[0].decode() - stat.station = self.sta.split(b'\x00')[0].decode() - location = self.loc.split(b'\x00')[0].decode() - if location == '--': - stat.location = '' - else: - stat.location = location - stat.channel = self.chan.split(b'\x00')[0].decode() - stat.starttime = UTCDateTime(self.start) - stat.sampling_rate = self.rate - stat.npts = len(self.data) - return Trace(data=self.data, header=stat) - - -def send_sock_req(server, port, req_str, timeout=None): - """ - Sets up socket to server and port, sends req_str - to socket and returns open socket - """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(timeout) - s.connect((server, port)) - if req_str[-1:] == b'\n': - s.send(req_str) - else: - s.send(req_str + b'\n') - return s - - -def get_sock_char_line(sock, timeout=10.): - """ - Retrieves one newline terminated string from input open socket - """ - sock.settimeout(timeout) - chunks = [] - indat = b'^' - try: - while indat[-1:] != b'\n': - # see https://github.com/obspy/obspy/issues/383 - # indat = sock.recv(8192) - indat = sock.recv(1) - if not indat: - break - chunks.append(indat) - except socket.timeout: - print('socket timeout in get_sock_char_line()', file=sys.stderr) - return None - if chunks: - response = b''.join(chunks) - return response - else: - return None - - -def get_sock_bytes(sock, nbytes, timeout=None): - """ - Listens for nbytes from open socket. - Returns byte array as python string or None if timeout - """ - sock.settimeout(timeout) - chunks = [] - btoread = nbytes - try: - while btoread: - indat = sock.recv(min(btoread, 8192)) - if not indat: - break - btoread -= len(indat) - chunks.append(indat) - except socket.timeout: - print('socket timeout in get_sock_bytes()', file=sys.stderr) - return None - if chunks: - response = b''.join(chunks) - return response - else: - return None - - -def get_menu(server, port, scnl=None, timeout=None): - """ - Return list of tanks on server - """ - rid = 'get_menu' - if scnl: - # only works on regular waveservers (not winston) - getstr = 'MENUSCNL: %s %s %s %s %s\n' % ( - rid, scnl[0], scnl[1], scnl[2], scnl[3]) - else: - # added SCNL not documented but required - getstr = 'MENU: %s SCNL\n' % rid - sock = send_sock_req(server, port, getstr.encode('ascii', 'strict'), - timeout=timeout) - r = get_sock_char_line(sock, timeout=timeout) - sock.close() - if r: - # XXX: we got here from bytes to utf-8 to keep the remaining code - # intact - tokens = str(r.decode()).split() - if tokens[0] == rid: - tokens = tokens[1:] - flag = tokens[-1] - if flag in ['FN', 'FC', 'FU']: - msg = 'request returned %s - %s' - print(msg % (flag, RETURNFLAG_KEY[flag]), file=sys.stderr) - return [] - if tokens[7].encode() in DATATYPE_KEY: - elen = 8 # length of return entry if location included - elif tokens[6].encode() in DATATYPE_KEY: - elen = 7 # length of return entry if location omitted - else: - print('no type token found in get_menu', file=sys.stderr) - return [] - outlist = [] - for p in range(0, len(tokens), elen): - l = tokens[p:p + elen] - if elen == 8: - outlist.append((int(l[0]), l[1], l[2], l[3], l[4], - float(l[5]), float(l[6]), l[7])) - else: - outlist.append((int(l[0]), l[1], l[2], l[3], '--', - float(l[4]), float(l[5]), l[6])) - return outlist - return [] - - -def read_wave_server_v(server, port, scnl, start, end, timeout=None): - """ - Reads data for specified time interval and scnl on specified waveserverV. - - Returns list of TraceBuf2 objects - """ - rid = 'rwserv' - scnlstr = '%s %s %s %s' % scnl - reqstr = 'GETSCNLRAW: %s %s %f %f\n' % (rid, scnlstr, start, end) - sock = send_sock_req(server, port, reqstr.encode('ascii', 'strict'), - timeout=timeout) - r = get_sock_char_line(sock, timeout=timeout) - if not r: - return [] - tokens = str(r.decode()).split() - flag = tokens[6] - if flag != 'F': - msg = 'read_wave_server_v returned flag %s - %s' - print(msg % (flag, RETURNFLAG_KEY[flag]), file=sys.stderr) - return [] - nbytes = int(tokens[-1]) - dat = get_sock_bytes(sock, nbytes, timeout=timeout) - sock.close() - tbl = [] - new = TraceBuf2() # empty..filled below - bytesread = 1 - p = 0 - while bytesread and p < len(dat): - bytesread = new.read_tb2(dat[p:]) - if bytesread: - tbl.append(new) - new = TraceBuf2() # empty..filled on next iteration - p += bytesread - return tbl - - -def trace_bufs2obspy_stream(tbuflist): - """ - Returns obspy.Stream object from input list of TraceBuf2 objects - """ - if not tbuflist: - return None - tlist = [] - for tb in tbuflist: - tlist.append(tb.get_obspy_trace()) - strm = Stream(tlist) - return strm