From 7613dfb3f52c73454d9fd5f21185aa892859dd55 Mon Sep 17 00:00:00 2001 From: Jeremy Fee <jmfee@usgs.gov> Date: Mon, 16 Dec 2019 15:47:25 -0700 Subject: [PATCH] Initial implementation for miniseed factory put_timeseries --- geomagio/edge/MiniSeedFactory.py | 24 +++++++++- geomagio/edge/MiniSeedInputClient.py | 70 ++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 geomagio/edge/MiniSeedInputClient.py diff --git a/geomagio/edge/MiniSeedFactory.py b/geomagio/edge/MiniSeedFactory.py index bced26bba..d0f770497 100644 --- a/geomagio/edge/MiniSeedFactory.py +++ b/geomagio/edge/MiniSeedFactory.py @@ -178,6 +178,8 @@ class MiniSeedFactory(TimeseriesFactory): for channel in channels: self._put_channel(timeseries, observatory, channel, type, interval, starttime, endtime) + # close socket + self.write_client.close() def _convert_stream_to_masked(self, timeseries, channel): """convert geomag edge traces in a timeseries stream to a MaskedArray @@ -497,7 +499,27 @@ class MiniSeedFactory(TimeseriesFactory): starttime: obspy.core.UTCDateTime endtime: obspy.core.UTCDateTime """ - raise NotImplementedError('"_put_channel" not implemented') + # use separate traces when there are gaps + to_write = timeseries.select(channel=channel) + to_write = TimeseriesUtility.mask_stream(to_write) + to_write = to_write.split() + to_write = TimeseriesUtility.unmask_stream(to_write) + # relabel channels from internal to edge conventions + station = self._get_edge_station(observatory, channel, + type, interval) + location = self._get_edge_location(observatory, channel, + type, interval) + network = self._get_edge_network(observatory, channel, + type, interval) + edge_channel = self._get_edge_channel(observatory, channel, + type, interval) + for trace in to_write: + trace.stats.station = station + trace.stats.location = location + trace.stats.network = network + trace.stats.channel = edge_channel + # finally, send to edge + self.write_client.send(to_write) def _set_metadata(self, stream, observatory, channel, type, interval): """set metadata for a given stream/channel diff --git a/geomagio/edge/MiniSeedInputClient.py b/geomagio/edge/MiniSeedInputClient.py new file mode 100644 index 000000000..e725c28c6 --- /dev/null +++ b/geomagio/edge/MiniSeedInputClient.py @@ -0,0 +1,70 @@ +from __future__ import absolute_import, print_function +import io +import socket +import sys + + +class MiniSeedInputClient(object): + """Client to write MiniSeed formatted data to Edge. + + Connects on first call to send(). + Use close() to disconnect. + + Parameters + ---------- + host: str + MiniSeedServer hostname + port: int + MiniSeedServer port + """ + def __init__(self, host, port=2061): + self.host = host + self.port = port + self.socket = None + + def close(self): + """Close socket if open. + """ + if self.socket is not None: + try: + self.socket.close() + finally: + self.socket = None + + def connect(self, connect_attempts=2): + """Connect to socket if not already open. + """ + if self.socket is not None: + return + s = None + attempts = 0 + while True: + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(self.host, self.port) + break + except socket.error as e: + if ++attempts >= connect_attempts: + raise + print('Unable to connect (%s), trying again' % e, + file=sys.stderr) + self.socket = s + + def send(self, stream): + """Send traces to EDGE in miniseed format. + + All traces in stream will be converted to MiniSeed, and sent as-is. + + Parameters + ---------- + stream: obspy.core.Stream + stream with trace(s) to send. + """ + # connect if needed + if self.socket is None: + self.connect() + # convert stream to miniseed + buf = io.BytesIO() + stream.write(buf, format='MSEED') + # send data + self.socket.sendall(buf) -- GitLab