Skip to content
Snippets Groups Projects
IAGA2002Writer.py 9.17 KiB
Newer Older
  • Learn to ignore specific revisions
  • Jeremy M Fee's avatar
    Jeremy M Fee committed
    
    from cStringIO import StringIO
    
    from datetime import datetime
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
    import numpy
    import textwrap
    
    from .. import ChannelConverter
    from ..TimeseriesFactoryException import TimeseriesFactoryException
    import IAGA2002Parser
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
    
    
    class IAGA2002Writer(object):
        """IAGA2002 writer.
        """
    
        def __init__(self, empty_value=IAGA2002Parser.NINES):
            """Create a writer.

            Parameters
            ----------
            empty_value
                value written in place of NaN samples when data lines are
                formatted (see _format_values).
                Defaults to IAGA2002Parser.NINES.
            """
            self.empty_value = empty_value
    
        def write(self, out, timeseries, channels):
            """Write timeseries to an IAGA2002 file.

            Parameters
            ----------
            out: file object
                file object to be written to. could be stdout
            timeseries: obspy.core.stream
                timeseries object with data to be written.
                When fewer than 4 channels are requested, empty padding
                traces are added to it with ``+=`` (presumably in place for
                an obspy Stream -- confirm).
            channels: array_like
                channels to be written from timeseries object.
                The caller's sequence is never modified.

            Raises
            ------
            TimeseriesFactoryException
                raised by _format_channels for invalid channels or station
                code; nothing has been written to `out` at that point.
            """
            stats = timeseries[0].stats

            # Padding appends to the channel list; work on a private copy so
            # the caller's list is left alone and tuples are accepted too.
            channels = list(channels)
            if len(channels) != 4:
                self._pad_to_four_channels(timeseries, channels)

            # Format every section before writing anything, so a validation
            # error cannot leave a partially written file behind.
            sections = (
                self._format_headers(stats, channels),
                self._format_comments(stats),
                self._format_channels(channels, stats.station),
                self._format_data(timeseries, channels),
            )
            for section in sections:
                out.write(section)
    
    
        def _create_empty_trace(self, trace, channel):
            """Create a NaN-filled trace modelled on an existing trace.

            Parameters
            ----------
            trace: obspy.core.Trace
                trace whose stats and sample count are reused.
            channel: str
                channel name assigned to the new trace.

            Returns
            -------
            obspy.core.Trace
                trace whose stats are built from `trace.stats` (with the
                channel replaced) and whose data holds as many NaN samples
                as `trace` has samples.
            """
            # Imported locally: the visible module-level imports do not bring
            # obspy into scope, although this method needs it.
            import obspy.core

            stats = obspy.core.Stats(trace.stats)
            stats.channel = channel
            empty_data = numpy.full(len(trace.data), numpy.nan)
            return obspy.core.Trace(empty_data, stats)
    
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
        def _format_headers(self, stats, channels):
    
            """format headers for IAGA2002 file
    
            Parameters
            ----------
            stats: obspy.core.trace.stats
                holds the observatory metadata
            channels: array_like
                channels to be reported.
    
            Returns
            -------
            array_like
                an array containing formatted strings of header data.
            """
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            buf = []
    
            buf.append(self._format_header('Format', 'IAGA-2002'))
            buf.append(self._format_header('Source of Data', stats.agency_name))
            buf.append(self._format_header('Station Name', stats.station_name))
            buf.append(self._format_header('IAGA CODE', stats.station))
            buf.append(self._format_header('Geodetic Latitude',
                    stats.geodetic_latitude))
            buf.append(self._format_header('Geodetic Longitude',
                    stats.geodetic_longitude))
            buf.append(self._format_header('Elevation', stats.elevation))
            buf.append(self._format_header('Reported', ''.join(channels)))
            buf.append(self._format_header('Sensor Orientation',
                    stats.sensor_orientation))
            buf.append(self._format_header('Digital Sampling',
                    str(1 / stats.sensor_sampling_rate) + ' second'))
            buf.append(self._format_header('Data Interval Type',
                    stats.data_interval_type))
            buf.append(self._format_header('Data Type', stats.data_type))
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            return ''.join(buf)
    
    
        def _format_comments(self, stats):
    
            """format comments for IAGA2002 file
    
            Parameters
            ----------
            stats: obspy.core.trace.stats
                holds the observatory metadata
    
            Returns
            -------
            array_like
                an array containing formatted strings of header data.
            """
    
            comments = []
            if 'declination_base' in stats:
    
                comments.append('DECBAS               {:<8d}'
                        '(Baseline declination value in tenths of minutes East'
    
                        ' (0-216,000)).'.format(stats.declination_base))
            if 'filter_comments' in stats:
                comments.extend(stats.filter_comments)
            if 'comments' in stats:
                comments.extend(stats.comments)
    
            if 'is_gin' in stats and stats.is_gin:
    
                comments.append('This data file was constructed by the Golden ' +
    
            if 'is_intermagnet' in stats and stats.is_intermagnet:
                comments.append('Final data will be available on the' +
                        ' INTERMAGNET DVD.')
                comments.append('Go to www.intermagnet.org for details on' +
                        ' obtaining this product.')
            if 'conditions_of_use' in stats:
    
                comments.append('CONDITIONS OF USE: ' + stats.conditions_of_use)
    
            # generate comment output
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            buf = []
            for comment in comments:
                buf.append(self._format_comment(comment))
            return ''.join(buf)
    
        def _format_header(self, name, value):
    
            """format headers for IAGA2002 file
    
            Parameters
            ----------
            name: str
                the name to be written
            value: str
                the value to written.
    
            Returns
            -------
            str
                a string formatted to be a single header line in an IAGA2002 file
            """
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            prefix = ' '
            suffix = ' |\n'
            return ''.join((prefix, name.ljust(23), value.ljust(44), suffix))
    
        def _format_comment(self, comment):
    
            """format header for IAGA2002 file
    
            Parameters
            ----------
            comment: str
                a single comment to be broken formatted if needed.
            Returns
            -------
            str
                a string formatted to be a single comment in an IAGA2002 file.
            """
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            buf = []
            prefix = ' # '
            suffix = ' |\n'
            lines = textwrap.wrap(comment, 65)
            for line in lines:
                buf.extend((prefix, line.ljust(65), suffix))
            return ''.join(buf)
    
        def _format_channels(self, channels, iaga_code):
            """Format channel header line.
    
            Parameters
            ----------
            channels : sequence
                list and order of channel values to output.
            iaga_code : str
                observatory code, which is prefixed to channel name in output.
    
            Returns
            -------
            str
                Channel header line as a string (including newline)
            """
    
            iaga_code_len = len(iaga_code)
            if iaga_code_len != 3 and iaga_code_len != 4:
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
                raise TimeseriesFactoryException(
                        'iaga_code "{}" is not 3 characters'.format(iaga_code))
            if len(channels) != 4:
                raise TimeseriesFactoryException(
                        'more than 4 channels {}'.format(channels))
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            for channel in channels:
    
                if len(channel) != 1 and len(channel) != 3:
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
                    raise TimeseriesFactoryException(
                            'channel "{}" is not 1 character'.format(channel))
    
                buf.append('   %7s' % (iaga_code + channel) )
            buf.append('|\n')
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            return ''.join(buf)
    
        def _format_data(self, timeseries, channels):
            """Format all data lines.
    
            Parameters
            ----------
            timeseries : obspy.core.Stream
                stream containing traces with channel listed in channels
            channels : sequence
                list and order of channel values to output.
            """
            buf = []
    
            if timeseries.select(channel='D'):
                d = timeseries.select(channel='D')
                d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data)
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            traces = [timeseries.select(channel=c)[0] for c in channels]
    
            starttime = float(traces[0].stats.starttime)
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            delta = traces[0].stats.delta
            for i in xrange(len(traces[0].data)):
                buf.append(self._format_values(
    
                    datetime.utcfromtimestamp(starttime + i * delta),
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
                    (t.data[i] for t in traces)))
            return ''.join(buf)
    
        def _format_values(self, time, values):
            """Format one line of data values.
    
            Parameters
            ----------
    
            time : datetime
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
                timestamp for values
            values : sequence
                list and order of channel values to output.
                if value is NaN, self.empty_value is output in its place.
    
            Returns
            -------
            unicode
                Formatted line containing values.
            """
    
            tt = time.timetuple()
            return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \
                    '{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \
                    '{0.tm_yday:0>3d}   ' \
                    '{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format(
                    tt, int(time.microsecond / 1000),
                    *[self.empty_value if numpy.isnan(val) else val
                            for val in values])
    
        def _pad_to_four_channels(self, timeseries, channels):
            for x in range(len(channels), 4):
                channel = ' - '
                channels.append(channel)
                timeseries += self._create_empty_trace(timeseries[0], channel)
    
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
        @classmethod
        def format(cls, timeseries, channels):
            """Get an IAGA2002 formatted string.

            Creates a default IAGA2002Writer, lets it write() into an
            in-memory StringIO, and returns what was written.

            Parameters
            ----------
            timeseries : obspy.core.Stream
                stream with the data to format.
            channels : array_like
                channels to be written from the timeseries.

            Returns
            -------
            str
              IAGA2002 formatted string.
            """
            sink = StringIO()
            IAGA2002Writer().write(sink, timeseries, channels)
            return sink.getvalue()