Commit 3fcb4e37 authored by hasimpson-usgs, committed by GitHub

Merge pull request #117 from jmfee-usgs/timeseries-factory-refactor

Timeseries factory refactor
parents aa704c01 87e17ba2
@@ -6,18 +6,18 @@ import sys
from obspy.core import Stream, UTCDateTime
from algorithm import algorithms
from PlotTimeseriesFactory import PlotTimeseriesFactory
from StreamTimeseriesFactory import StreamTimeseriesFactory
import TimeseriesUtility
# factory packages
import binlog
import edge
import iaga2002
import pcdcp
import imfv122
import imfv283
# factories for new filetypes
import temperature
import vbf
import binlog
class Controller(object):
@@ -243,16 +243,16 @@ class Controller(object):
if output_gap[0] == options.starttime:
# found fillable gap at start, recurse to previous interval
interval = options.endtime - options.starttime
starttime = options.starttime - interval - 1
starttime = options.starttime - interval
endtime = options.starttime - 1
options.starttime = starttime
options.endtime = endtime
self.run_as_update(options, update_count + 1)
# fill gap
print >> sys.stderr, 'processing', \
options.starttime, options.endtime
options.starttime = output_gap[0]
options.endtime = output_gap[1]
print >> sys.stderr, 'processing', \
options.starttime, options.endtime
self.run(options)
@@ -272,7 +272,6 @@ def get_input_factory(args):
input_factory = None
input_factory_args = None
input_stream = None
input_url = None
# standard arguments
input_factory_args = {}
@@ -282,14 +281,11 @@
# stream/url arguments
if args.input_file is not None:
input_stream = open(args.input_file, 'r')
input_factory_args['stream'] = input_stream
elif args.input_stdin:
input_stream = sys.stdin
input_factory_args['stream'] = input_stream
elif args.input_url is not None:
input_url = args.input_url
input_factory_args['urlInterval'] = args.input_url_interval
input_factory_args['urlTemplate'] = input_url
input_factory_args['urlTemplate'] = args.input_url
input_type = args.input
if input_type == 'edge':
@@ -306,34 +302,21 @@ def get_input_factory(args):
server=args.input_goes_server,
user=args.input_goes_user,
**input_factory_args)
elif input_type == 'iaga2002':
if input_stream is not None:
input_factory = iaga2002.StreamIAGA2002Factory(
**input_factory_args)
elif input_url is not None:
input_factory = iaga2002.IAGA2002Factory(
**input_factory_args)
elif input_type == 'imfv122':
if input_stream is not None:
input_factory = imfv122.StreamIMFV122Factory(
**input_factory_args)
elif input_url is not None:
input_factory = imfv122.IMFV122Factory(
**input_factory_args)
elif input_type == 'imfv283':
if input_stream is not None:
input_factory = imfv283.StreamIMFV283Factory(
**input_factory_args)
elif input_url is not None:
input_factory = imfv283.IMFV283Factory(
**input_factory_args)
elif input_type == 'pcdcp':
else:
# stream compatible factories
if input_type == 'iaga2002':
input_factory = iaga2002.IAGA2002Factory(**input_factory_args)
elif input_type == 'imfv122':
input_factory = imfv122.IMFV122Factory(**input_factory_args)
elif input_type == 'imfv283':
input_factory = imfv283.IMFV283Factory(**input_factory_args)
elif input_type == 'pcdcp':
input_factory = pcdcp.PCDCPFactory(**input_factory_args)
# wrap stream
if input_stream is not None:
input_factory = pcdcp.StreamPCDCPFactory(
**input_factory_args)
elif input_url is not None:
input_factory = pcdcp.PCDCPFactory(
**input_factory_args)
input_factory = StreamTimeseriesFactory(
factory=input_factory,
stream=input_stream)
return input_factory
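For illustration, a minimal usage sketch of the wrapping pattern above: the format factory only parses, while StreamTimeseriesFactory supplies the I/O. The import paths, constructor defaults, and input file name are assumptions, not taken from this commit.

from obspy.core import UTCDateTime
from geomagio.iaga2002 import IAGA2002Factory
from geomagio.StreamTimeseriesFactory import StreamTimeseriesFactory

# open a local IAGA2002 file (hypothetical path) and wrap the parser
input_stream = open('BOU20160101vmin.min', 'r')
input_factory = StreamTimeseriesFactory(
    factory=IAGA2002Factory(),
    stream=input_stream)
# the wrapper reads the stream once, then delegates parsing to parse_string
timeseries = input_factory.get_timeseries(
    starttime=UTCDateTime('2016-01-01T00:00:00Z'),
    endtime=UTCDateTime('2016-01-01T23:59:00Z'),
    observatory='BOU')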
@@ -363,10 +346,8 @@ def get_output_factory(args):
# stream/url arguments
if args.output_file is not None:
output_stream = open(args.output_file, 'wb')
output_factory_args['stream'] = output_stream
elif args.output_stdout:
output_stream = sys.stdout
output_factory_args['stream'] = output_stream
elif args.output_url is not None:
output_url = args.output_url
output_factory_args['urlInterval'] = args.output_url_interval
@@ -384,41 +365,25 @@
tag=args.output_edge_tag,
forceout=args.output_edge_forceout,
**output_factory_args)
elif output_type == 'iaga2002':
if output_stream is not None:
output_factory = iaga2002.StreamIAGA2002Factory(
**output_factory_args)
elif output_url is not None:
output_factory = iaga2002.IAGA2002Factory(
**output_factory_args)
elif output_type == 'pcdcp':
if output_stream is not None:
output_factory = pcdcp.StreamPCDCPFactory(
**output_factory_args)
elif output_url is not None:
output_factory = pcdcp.PCDCPFactory(
**output_factory_args)
elif output_type == 'plot':
output_factory = PlotTimeseriesFactory()
elif output_type == 'temperature':
if output_stream is not None:
output_factory = temperature.StreamTEMPFactory(
**output_factory_args)
elif output_url is not None:
output_factory = temperature.TEMPFactory(
**output_factory_args)
elif output_type == 'vbf':
if output_stream is not None:
output_factory = vbf.StreamVBFFactory(
**output_factory_args)
elif output_type == 'binlog':
else:
# stream compatible factories
if output_type == 'binlog':
output_factory = binlog.BinLogFactory(**output_factory_args)
elif output_type == 'iaga2002':
output_factory = iaga2002.IAGA2002Factory(**output_factory_args)
elif output_type == 'pcdcp':
output_factory = pcdcp.PCDCPFactory(**output_factory_args)
elif output_type == 'temperature':
output_factory = temperature.TEMPFactory(**output_factory_args)
elif output_type == 'vbf':
output_factory = vbf.VBFFactory(**output_factory_args)
# wrap stream
if output_stream is not None:
output_factory = binlog.StreamBinLogFactory(
**output_factory_args)
elif output_url is not None:
output_factory = binlog.BinLogFactory(
output=output_type,
**output_factory_args)
output_factory = StreamTimeseriesFactory(
factory=output_factory,
stream=output_stream)
return output_factory
"""Stream wrapper for TimeseriesFactory."""
from TimeseriesFactory import TimeseriesFactory
class StreamTimeseriesFactory(TimeseriesFactory):
"""Timeseries Factory for streams.
normally stdio.
Parameters
----------
factory: geomagio.TimeseriesFactory
wrapped factory.
stream: file object
io stream, normally either a file, or stdio
See Also
--------
TimeseriesFactory
"""
def __init__(self, factory, stream):
self.factory = factory
self.stream = stream
self.stream_data = None
def get_timeseries(self, starttime, endtime, observatory=None,
channels=None, type=None, interval=None):
"""Get timeseries using stream as input.
"""
if self.stream_data is None:
# only read stream once
self.stream_data = self.stream.read()
return self.factory.parse_string(
data=self.stream_data,
starttime=starttime,
endtime=endtime,
observatory=observatory)
def put_timeseries(self, timeseries, starttime=None, endtime=None,
channels=None, type=None, interval=None):
"""Put timeseries using stream as output.
"""
self.factory.write_file(self.stream, timeseries, channels)
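A corresponding output-side sketch, continuing the input example earlier in this diff and under the same assumed import paths: put_timeseries hands the open stream, the timeseries, and the channel list to the wrapped factory's write_file.

import sys
from geomagio.iaga2002 import IAGA2002Factory
from geomagio.StreamTimeseriesFactory import StreamTimeseriesFactory

# write a previously parsed obspy Stream back out as IAGA2002 on stdout
output_factory = StreamTimeseriesFactory(
    factory=IAGA2002Factory(),
    stream=sys.stdout)
output_factory.put_timeseries(
    timeseries,                        # obspy.core.Stream from the earlier read
    channels=('H', 'D', 'Z', 'F'))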
"""Abstract Timeseries Factory Interface."""
import numpy
import obspy.core
import os
import sys
@@ -119,8 +120,18 @@ class TimeseriesFactory(object):
except Exception as e:
print >> sys.stderr, "Error parsing data: " + str(e)
print >> sys.stderr, data
if channels is not None:
filtered = obspy.core.Stream()
for channel in channels:
filtered += timeseries.select(channel=channel)
timeseries = filtered
timeseries.merge()
timeseries.trim(starttime, endtime)
timeseries.trim(
starttime=starttime,
endtime=endtime,
nearest_sample=False,
pad=True,
fill_value=numpy.nan)
return timeseries
def parse_string(self, data, **kwargs):
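For context, a standalone sketch of the trim behavior used above, with illustrative values: pad=True plus fill_value=numpy.nan extends each trace to the requested window and marks missing samples as NaN instead of silently shortening the result.

import numpy
import obspy.core

# one trace of 10 one-minute samples starting at midnight (illustrative data)
trace = obspy.core.Trace(
    data=numpy.arange(10, dtype=numpy.float64),
    header={'starttime': obspy.core.UTCDateTime('2016-01-01T00:00:00Z'),
            'delta': 60.0})
stream = obspy.core.Stream(traces=[trace])
# request a wider window: the stream is padded with NaN on both sides
stream.trim(
    starttime=obspy.core.UTCDateTime('2015-12-31T23:55:00Z'),
    endtime=obspy.core.UTCDateTime('2016-01-01T00:15:00Z'),
    nearest_sample=False,
    pad=True,
    fill_value=numpy.nan)
print(stream[0].stats.npts)  # 21: 5 NaN samples before and 6 after the original 10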
@@ -166,6 +177,9 @@
TimeseriesFactoryException
if any errors occur.
"""
if len(timeseries) == 0:
# no data to put
return
if not self.urlTemplate.startswith('file://'):
raise TimeseriesFactoryException('Only file urls are supported')
channels = channels or self.channels
@@ -218,12 +232,6 @@
except NotImplementedError:
# factory only supports output
pass
except Exception as e:
print >> sys.stderr, \
'Unable to merge with existing data.' + \
'\nfile={}' + \
'\nerror={}'.format(url_file, str(e))
raise e
with open(url_file, 'wb') as fh:
try:
self.write_file(fh, url_data, channels)
@@ -143,7 +143,7 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
Raises
------
urllib2.URLError
IOError
if any occurs
"""
try:
@@ -167,6 +167,8 @@ def read_url(url, connect_timeout=15, max_redirects=5, timeout=300):
curl.setopt(pycurl.WRITEFUNCTION, out.write)
curl.perform()
content = out.getvalue()
except pycurl.error as e:
raise IOError(e.args)
finally:
curl.close()
return content
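A usage sketch of the tightened error handling, assuming read_url is exposed from geomagio's Util module: pycurl failures now surface as a standard IOError that callers can catch directly.

from geomagio.Util import read_url  # import path assumed

try:
    content = read_url('http://example.com/observatory.iaga2002')  # placeholder URL
except IOError as err:
    # connection, redirect, and timeout problems are reported uniformly
    print('unable to read url: %s' % str(err))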
@@ -15,8 +15,12 @@ import numpy
import numpy.ma
import obspy.core
from datetime import datetime
from obspy import earthworm
from obspy.core import UTCDateTime
try:
# obspy 1.x
from obspy.clients import earthworm
except:
# obspy 0.x
from obspy import earthworm
from .. import ChannelConverter, TimeseriesUtility
from ..TimeseriesFactory import TimeseriesFactory
from ..TimeseriesFactoryException import TimeseriesFactoryException
@@ -209,8 +213,8 @@ class EdgeFactory(TimeseriesFactory):
Notes: the original timeseries object is changed.
"""
for trace in timeseries:
trace_starttime = UTCDateTime(trace.stats.starttime)
trace_endtime = UTCDateTime(trace.stats.endtime)
trace_starttime = obspy.core.UTCDateTime(trace.stats.starttime)
trace_endtime = obspy.core.UTCDateTime(trace.stats.endtime)
if trace.stats.starttime > starttime:
cnt = int((trace_starttime - starttime) / trace.stats.delta)
@@ -530,7 +534,7 @@ class EdgeFactory(TimeseriesFactory):
type, interval)
edge_channel = self._get_edge_channel(observatory, channel,
type, interval)
data = self.client.getWaveform(network, station, location,
data = self.client.get_waveforms(network, station, location,
edge_channel, starttime, endtime)
data.merge()
if data.count() == 0:
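For reference, a minimal sketch of the obspy 1.x earthworm client call adopted above; the host, port, and network/station/location/channel codes are placeholders rather than values from this project.

from obspy.clients.earthworm import Client
from obspy.core import UTCDateTime

client = Client('waveserver.example.com', 2060)    # placeholder waveserver
data = client.get_waveforms(
    'NT', 'BOU', 'R0', 'MVH',                      # placeholder SNCL codes
    UTCDateTime('2016-01-01T00:00:00Z'),
    UTCDateTime('2016-01-01T01:00:00Z'))
data.merge()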
@@ -553,11 +557,11 @@ class EdgeFactory(TimeseriesFactory):
Returns
-------
tuple: (starttime, endtime)
starttime: UTCDateTime
endtime: UTCDateTime
starttime: obspy.core.UTCDateTime
endtime: obspy.core.UTCDateTime
"""
starttime = UTCDateTime(datetime.now())
endtime = UTCDateTime(0)
starttime = obspy.core.UTCDateTime(datetime.now())
endtime = obspy.core.UTCDateTime(0)
for trace in timeseries:
if trace.stats.starttime < starttime:
starttime = trace.stats.starttime
@@ -575,7 +579,7 @@ class EdgeFactory(TimeseriesFactory):
Parameters
----------
timeseries: obspy.core.stream
The timeseries stream as returned by the call to getWaveform
The timeseries stream as returned by the call to get_waveforms
starttime: obspy.core.UTCDateTime
the starttime of the requested data
endtime: obspy.core.UTCDateTime
@@ -614,8 +618,8 @@ class EdgeFactory(TimeseriesFactory):
data type.
interval: {'daily', 'hourly', 'minute', 'second'}
data interval.
starttime: UTCDateTime
endtime: UTCDateTime
starttime: obspy.core.UTCDateTime
endtime: obspy.core.UTCDateTime
Notes
-----
@@ -630,7 +634,7 @@ class EdgeFactory(TimeseriesFactory):
edge_channel = self._get_edge_channel(observatory, channel,
type, interval)
now = UTCDateTime(datetime.utcnow())
now = obspy.core.UTCDateTime(datetime.utcnow())
if ((now - endtime) > 864000) and (self.cwbport > 0):
host = self.cwbhost
port = self.cwbport
@@ -100,7 +100,7 @@ class IMFV122Parser(object):
dayminute = int(start)
hour = int(dayminute / 60)
minute = dayminute % 60
self._delta = 1
self._delta = 60
self._nexttime = UTCDateTime(
year=year,
julday=julday,
@@ -2,16 +2,20 @@ from distutils.core import setup
setup(
name='geomag-algorithms',
version='0.0.0',
version='0.2.0',
description='USGS Geomag IO Library',
url='https://github.com/usgs/geomag-algorithms',
packages=[
'geomagio',
'geomagio.algorithm',
'geomagio.binlog',
'geomagio.edge',
'geomagio.iaga2002',
'geomagio.imfv122',
'geomagio.imfv283',
'geomagio.edge',
'geomagio.pcdcp'
'geomagio.pcdcp',
'geomagio.temperature',
'geomagio.vbf'
],
install_requires=[
'numpy',
@@ -5,7 +5,7 @@ from geomagio.imfv122 import IMFV122Parser
from obspy.core import UTCDateTime
def test_imfv122_parse_header__minutes():
def test_imfv122_parse_header__hour_of_day():
"""imfv122_test.test_imfv122_parse_header__minutes.
"""
parser = IMFV122Parser()
@@ -21,7 +21,7 @@ def test_imfv122_parse_header__minutes():
assert_equals(parser._nexttime, UTCDateTime('2016-05-02T03:00:00Z'))
def test_imfv122_parse_header__seconds():
def test_imfv122_parse_header__minute_of_day():
"""imfv122_test.test_imfv122_parse_header__seconds.
"""
parser = IMFV122Parser()
@@ -33,7 +33,7 @@ def test_imfv122_parse_header__seconds():
assert_equals(metadata['geodetic_latitude'], 124.4)
assert_equals(metadata['geodetic_longitude'], 19.2)
assert_equals(metadata['station'], 'HER')
assert_equals(parser._delta, 1)
assert_equals(parser._delta, 60)
assert_equals(parser._nexttime, UTCDateTime('2016-01-01T02:03:00Z'))
@@ -44,12 +44,14 @@ def test_imfv122_parse_data():
parser._parse_header(
'HER JAN0116 001 0123 HDZF R EDI 12440192 -14161 DRRRRRRRRRRRRRRR')
parser._parse_data('1234 5678 9101 1121 3141 5161 7181 9202')
import pprint
pprint.pprint(parser._parsedata)
assert_equals(parser._parsedata[0][0], UTCDateTime('2016-01-01T02:03:00Z'))
assert_equals(parser._parsedata[1][0], '1234')
assert_equals(parser._parsedata[2][0], '5678')
assert_equals(parser._parsedata[3][0], '9101')
assert_equals(parser._parsedata[4][0], '1121')
assert_equals(parser._parsedata[0][1], UTCDateTime('2016-01-01T02:03:01Z'))
assert_equals(parser._parsedata[0][1], UTCDateTime('2016-01-01T02:04:00Z'))
assert_equals(parser._parsedata[1][1], '3141')
assert_equals(parser._parsedata[2][1], '5161')
assert_equals(parser._parsedata[3][1], '7181')
@@ -69,7 +71,7 @@ def test_imfv122_post_process():
assert_equals(parser.data['D'][0], 56.78)
assert_equals(parser.data['Z'][0], 910.1)
assert_equals(parser.data['F'][0], 112.1)
assert_equals(parser.times[1], UTCDateTime('2016-01-01T02:03:01Z'))
assert_equals(parser.times[1], UTCDateTime('2016-01-01T02:04:00Z'))
assert_equals(parser.data['H'][1], 314.1)
assert_equals(parser.data['D'][1], 51.61)
assert_equals(parser.data['Z'][1], 718.1)