# IAGA2002Writer.py
# (Repository-viewer page chrome removed; blame history credits Jeremy M Fee.)

from cStringIO import StringIO
from datetime import datetime
# (repository-viewer blame marker removed: Jeremy M Fee)
import numpy
import textwrap
from .. import ChannelConverter, TimeseriesUtility
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..Util import create_empty_trace
import IAGA2002Parser
# (repository-viewer blame marker removed: Jeremy M Fee)


class IAGA2002Writer(object):
    """Serialize an obspy Stream as IAGA2002 formatted text.

    The output consists of header records, comment records, one channel
    heading line and one data line per sample.  The header, comment,
    channel heading and data formatters below all build records that
    are 70 characters wide (excluding the newline).

    Parameters
    ----------
    empty_value : number
        value written in place of NaN samples.
    empty_channel : str
        channel name used to pad the output to four channels.
    """

    def __init__(self, empty_value=IAGA2002Parser.NINES,
            empty_channel=IAGA2002Parser.EMPTY_CHANNEL):
        self.empty_value = empty_value
        self.empty_channel = empty_channel

    def write(self, out, timeseries, channels):
        """Write timeseries to an IAGA2002 file.

        Parameters
        ----------
        out : file object
            file object to be written to. could be stdout
        timeseries : obspy.core.Stream
            timeseries object with data to be written.
            NOTE: when fewer than four channels are requested, empty
            padding traces are appended to this stream.
        channels : array_like
            channels to be written from timeseries object

        Raises
        ------
        TimeseriesFactoryException
            if a requested channel is not present in timeseries.
        """
        for channel in channels:
            if timeseries.select(channel=channel).count() == 0:
                raise TimeseriesFactoryException(
                    'Missing channel "%s" for output, available channels %s' %
                    (channel, str(TimeseriesUtility.get_channels(timeseries))))
        stats = timeseries[0].stats
        # the channel heading and data lines are fixed at four columns
        if len(channels) != 4:
            channels = self._pad_to_four_channels(timeseries, channels)
        out.write(self._format_headers(stats, channels))
        out.write(self._format_comments(stats))
        out.write(self._format_channels(channels, stats.station))
        out.write(self._format_data(timeseries, channels))

    def _format_headers(self, stats, channels):
        """Format the header records for an IAGA2002 file.

        Optional headers are only emitted when the matching key is
        present in stats.

        Parameters
        ----------
        stats : obspy.core.trace.stats
            holds the observatory metadata
        channels : array_like
            channels to be reported.

        Returns
        -------
        str
            all header lines, concatenated.
        """
        buf = [self._format_header('Format', 'IAGA-2002')]
        if 'agency_name' in stats:
            buf.append(self._format_header('Source of Data',
                    stats.agency_name))
        if 'station_name' in stats:
            buf.append(self._format_header('Station Name', stats.station_name))
        buf.append(self._format_header('IAGA CODE', stats.station))
        if 'geodetic_latitude' in stats:
            buf.append(self._format_header('Geodetic Latitude',
                    str(stats.geodetic_latitude)))
        if 'geodetic_longitude' in stats:
            buf.append(self._format_header('Geodetic Longitude',
                    str(stats.geodetic_longitude)))
        if 'elevation' in stats:
            # convert like latitude/longitude; _format_header needs a str
            buf.append(self._format_header('Elevation',
                    str(stats.elevation)))
        buf.append(self._format_header('Reported', ''.join(channels)))
        if 'sensor_orientation' in stats:
            buf.append(self._format_header('Sensor Orientation',
                    stats.sensor_orientation))
        if 'sensor_sampling_rate' in stats:
            # 1.0 forces true division; "1 / rate" truncates to 0 for an
            # integer rate under Python 2
            buf.append(self._format_header('Digital Sampling',
                    str(1.0 / stats.sensor_sampling_rate) + ' second'))
        if 'data_interval_type' in stats:
            buf.append(self._format_header('Data Interval Type',
                    stats.data_interval_type))
        if 'data_type' in stats:
            buf.append(self._format_header('Data Type', stats.data_type))
        return ''.join(buf)

    def _format_comments(self, stats):
        """Format the comment records for an IAGA2002 file.

        Parameters
        ----------
        stats : obspy.core.trace.stats
            holds the observatory metadata

        Returns
        -------
        str
            all comment lines, concatenated.
        """
        comments = []
        if 'declination_base' in stats and stats.declination_base is not None:
            comments.append('DECBAS               {:<8d}'
                    '(Baseline declination value in tenths of minutes East'
                    ' (0-216,000)).'.format(stats.declination_base))
        if 'filter_comments' in stats:
            comments.extend(stats.filter_comments)
        if 'comments' in stats:
            comments.extend(stats.comments)
        if 'is_gin' in stats and stats.is_gin:
            # NOTE(review): the end of this sentence was lost from the
            # source; 'GIN.' is the minimal completion - confirm wording.
            comments.append('This data file was constructed by the Golden ' +
                    'GIN.')
        if 'is_intermagnet' in stats and stats.is_intermagnet:
            comments.append('Final data will be available on the' +
                    ' INTERMAGNET DVD.')
            comments.append('Go to www.intermagnet.org for details on' +
                    ' obtaining this product.')
        if 'conditions_of_use' in stats and \
                stats.conditions_of_use is not None:
            comments.append('CONDITIONS OF USE: ' + stats.conditions_of_use)
        # generate comment output
        return ''.join([self._format_comment(c) for c in comments])

    def _format_header(self, name, value):
        """Format a single header record for an IAGA2002 file.

        Parameters
        ----------
        name : str
            the name to be written, left justified to 23 characters.
        value : str
            the value to be written, left justified to 44 characters.

        Returns
        -------
        str
            a string formatted to be a single header line in an IAGA2002 file
        """
        prefix = ' '
        suffix = ' |\n'
        return ''.join((prefix, name.ljust(23), value.ljust(44), suffix))

    def _format_comment(self, comment):
        """Format a comment for an IAGA2002 file.

        Parameters
        ----------
        comment : str
            a single comment, wrapped onto several lines when it is
            longer than 65 characters.

        Returns
        -------
        str
            one or more comment lines for an IAGA2002 file.
        """
        prefix = ' # '
        suffix = ' |\n'
        buf = []
        for line in textwrap.wrap(comment, 65):
            buf.extend((prefix, line.ljust(65), suffix))
        return ''.join(buf)

    def _format_channels(self, channels, iaga_code):
        """Format channel header line.

        Parameters
        ----------
        channels : sequence
            list and order of channel values to output.
        iaga_code : str
            observatory code, which is prefixed to channel name in output.

        Returns
        -------
        str
            Channel header line as a string (including newline)

        Raises
        ------
        TimeseriesFactoryException
            if iaga_code is not 3 or 4 characters, if there are not
            exactly 4 channels, or if a channel is not 1 to 3 characters.
        """
        iaga_code_len = len(iaga_code)
        if iaga_code_len != 3 and iaga_code_len != 4:
            raise TimeseriesFactoryException(
                    'iaga_code "{}" is not 3 or 4 characters'.format(
                    iaga_code))
        if len(channels) != 4:
            raise TimeseriesFactoryException(
                    'expected 4 channels, got {}'.format(channels))
        # NOTE(review): the initial value of buf and the closing '|' were
        # lost from the source.  They are reconstructed so the heading is
        # 70 characters wide and its columns line up with the data lines
        # produced by _format_values - confirm against a reference file.
        buf = ['DATE       TIME         DOY  ']
        for channel in channels:
            channel_len = len(channel)
            if channel_len < 1 or channel_len > 3:
                raise TimeseriesFactoryException(
                        'channel "{}" is not 1 to 3 characters'.format(
                        channel))
            buf.append('   {:<7s}'.format(iaga_code + channel))
        buf.append('|\n')
        return ''.join(buf)

    def _format_data(self, timeseries, channels):
        """Format all data lines.

        Values of the "D" trace are passed through
        ChannelConverter.get_minutes_from_radians before output.  The
        converted values are kept in a local column rather than assigned
        back to the trace, so writing the same stream twice does not
        convert the data twice.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream containing traces with channel listed in channels
        channels : sequence
            list and order of channel values to output.

        Returns
        -------
        str
            all data lines, concatenated.
        """
        d_stream = timeseries.select(channel='D')
        d_trace = d_stream[0] if d_stream.count() > 0 else None
        traces = [timeseries.select(channel=c)[0] for c in channels]
        columns = []
        for trace in traces:
            if trace is d_trace:
                columns.append(
                        ChannelConverter.get_minutes_from_radians(trace.data))
            else:
                columns.append(trace.data)
        # sample times are derived from the first trace
        starttime = float(traces[0].stats.starttime)
        delta = traces[0].stats.delta
        buf = []
        for i in xrange(len(traces[0].data)):
            buf.append(self._format_values(
                datetime.utcfromtimestamp(starttime + i * delta),
                (column[i] for column in columns)))
        return ''.join(buf)

    def _format_values(self, time, values):
        """Format one line of data values.

        Parameters
        ----------
        time : datetime
            timestamp for values
        values : sequence
            list and order of channel values to output.
            if value is NaN, self.empty_value is output in its place.

        Returns
        -------
        unicode
            Formatted line containing values.
        """
        tt = time.timetuple()
        return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \
                '{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \
                '{0.tm_yday:0>3d}   ' \
                '{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format(
                tt, int(time.microsecond / 1000),
                *[self.empty_value if numpy.isnan(val) else val
                        for val in values])

    def _pad_to_four_channels(self, timeseries, channels):
        """Pad the channel list to four entries using self.empty_channel.

        NOTE(review): only the two statements inside the loop survived in
        the source; the loop, the list copy and the return value are
        reconstructed from how write() uses the result - confirm.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream to which one empty trace is added per padded channel,
            built by create_empty_trace from the first trace.
        channels : sequence
            channels requested for output.

        Returns
        -------
        list
            copy of channels, extended with self.empty_channel until it
            holds four entries (returned unchanged in length when it
            already holds four or more).
        """
        padded = list(channels)
        for _ in xrange(len(channels), 4):
            channel = self.empty_channel
            padded.append(channel)
            timeseries += create_empty_trace(timeseries[0], channel)
        return padded

    @classmethod
    def format(cls, timeseries, channels):
        """Get an IAGA2002 formatted string.

        Calls write() with a StringIO, and returns the output.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream containing traces with channel listed in channels
        channels : sequence
            list and order of channel values to output.

        Returns
        -------
        unicode
          IAGA2002 formatted string.
        """
        out = StringIO()
        # cls() rather than the hard-coded class name, so subclasses
        # format with their own overrides
        writer = cls()
        writer.write(out, timeseries, channels)
        return out.getvalue()