Newer
Older
from __future__ import absolute_import
from collections import OrderedDict
from datetime import datetime
import json
import numpy as np
from .. import ChannelConverter, TimeseriesUtility
from ..TimeseriesFactoryException import TimeseriesFactoryException
class IMFJSONWriter(object):
def write(self, out, timeseries, channels, url=None):
Parameters
----------
out: file object
file object to be written to. could be stdout
timeseries: obspy.core.stream
timeseries object with data to be written
channels: array_like
channels to be written from timeseries object
url: str
string with the requested url
Raises
------
TimeseriesFactoryException
if there is a missing channel.
for channel in channels:
if timeseries.select(channel=channel).count() == 0:
raise TimeseriesFactoryException(
'Missing channel "%s" for output, available channels %s' %
(channel, str(TimeseriesUtility.get_channels(timeseries))))
stats = timeseries[0].stats
file_dict['type'] = 'Timeseries'
file_dict['metadata'] = self._format_metadata(stats, channels)
file_dict['metadata']['url'] = url
file_dict['times'] = self._format_times(timeseries, channels)
file_dict['values'] = self._format_data(timeseries, channels, stats)
formatted_timeseries = json.dumps(file_dict,
ensure_ascii=True, separators=(',', ':')).encode('utf8')
out.write(str(formatted_timeseries))
def _format_data(self, timeseries, channels, stats):
"""Format all data lines.
Parameters
----------
timeseries : obspy.core.Stream
stream containing traces with channel listed in channels
channels : sequence
list and order of channel values to output.
stats: obspy.core.trace.stats
holds the observatory metadata
Returns
-------
array_like
an array containing dictionaries of data.
"""
values = []
for c in channels:
value_dict = OrderedDict()
trace = timeseries.select(channel=c)[0]
value_dict['id'] = c
value_dict['metadata'] = OrderedDict()
metadata = value_dict['metadata']
metadata['element'] = c
metadata['network'] = stats.network
metadata['station'] = stats.station
edge_channel = trace.stats.channel
metadata['channel'] = edge_channel
if stats.location == "":
if (stats.data_type == 'variation' or
stats.location = 'R0'
elif (stats.data_type == 'adjusted' or
stats.location = 'A0'
elif stats.data_type == 'quasi-definitive':
stats.location = 'Q0'
elif stats.data_type == 'definitive':
stats.location = 'D0'
metadata['location'] = stats.location
series = np.copy(trace.data)
if c == 'D':
series = ChannelConverter.get_minutes_from_radians(series)
# Converting numpy array to list required for JSON serialization
series = series.tolist()
series = [None if str(x) == 'nan' else x for x in series]
value_dict['values'] = series
# TODO: Add flag metadata
def _format_metadata(self, stats, channels):
"""Format metadata for json file and update dictionary
Parameters
----------
stats: obspy.core.trace.stats
holds the observatory metadata
channels: array_like
channels to be reported.
Returns
-------
dictionary
a dictionary containing metadata.
metadata_dict = OrderedDict()
intermag = OrderedDict()
imo = OrderedDict()
imo['iaga_code'] = stats.station
imo['name'] = stats.station_name
coords = [None] * 3
if 'geodetic_longitude' in stats:
coords[0] = float(stats.geodetic_longitude)
coords[1] = float(stats.geodetic_latitude)
try:
if 'elevation' in stats:
coords[2] = float(stats.elevation)
except (KeyError, ValueError, TypeError):
pass
imo['coordinates'] = coords
intermag['imo'] = imo
intermag['reported_orientation'] = ''.join(channels)
intermag['sensor_orientation'] = stats.sensor_orientation
intermag['data_type'] = stats.data_type
if 'sampling_rate' in stats:
if stats.sampling_rate == 1. / 60.:
rate = 60
elif stats.sampling_rate == 1. / 3600.:
rate = 3600
elif stats.sampling_rate == 1. / 86400.:
rate = 86400
else:
rate = 1
intermag['sampling_period'] = rate
# 1/sampling_rate to output in seconds rather than hertz
sampling = 1 / stats.sensor_sampling_rate
intermag['digital_sampling_rate'] = sampling
metadata_dict['intermagnet'] = intermag
metadata_dict['status'] = 200
generated = datetime.utcnow()
metadata_dict['generated'] = generated.strftime("%Y-%m-%dT%H:%M:%SZ")
return metadata_dict
def _format_times(self, timeseries, channels):
"""Format times for json file and update dictionary
timeseries : obspy.core.Stream
stream containing traces with channel listed in channels
channels: array_like
channels to be reported.
Returns
-------
array_like
an array containing formatted strings of time data.
"""
times = []
traces = [timeseries.select(channel=c)[0] for c in channels]
starttime = float(traces[0].stats.starttime)
delta = traces[0].stats.delta
for i in range(len(traces[0].data)):
times.append(self._format_time_string(
datetime.utcfromtimestamp(starttime + i * delta)))
return times
def _format_time_string(self, time):
Parameters
----------
time : datetime
timestamp for values
Returns
-------
unicode
formatted time.
"""
tt = time.timetuple()
return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d}T' \
'{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d}Z' \
''.format(tt, int(time.microsecond / 1000))
def format(self, timeseries, channels, url=None):
"""Get a json formatted string.
Calls write() with a BytesIO, and returns the output.
Parameters
----------
timeseries : obspy.core.Stream
stream containing traces with channel listed in channels
channels: array_like
channels to be written from timeseries
url: str
string with the requested url
Returns
-------
unicode
json formatted string.
"""
out = BytesIO()
writer.write(out, timeseries, channels, url=url)