Skip to content
Snippets Groups Projects
PCDCPWriter.py 3.69 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    from cStringIO import StringIO
    from geomagio import TimeseriesFactoryException, ChannelConverter
    import numpy
    import PCDCPParser
    import textwrap
    from datetime import datetime
    
    
    class PCDCPWriter(object):
        """PCDCP writer.
        """
    
        def __init__(self, empty_value=PCDCPParser.NINES):
            self.empty_value = empty_value
    
        def write(self, out, timeseries, channels):
            """write timeseries to pcdcp file
    
            Parameters
            ----------
                out : file object
                    File object to be written to. Could be stdout.
                timeseries : obspy.core.stream
                    Timeseries object with data to be written.
                channels : array_like
                    Channels to be written from timeseries object
            """
            stats = timeseries[0].stats
            out.write(self._format_header(stats, channels))
            out.write(self._format_data(timeseries, channels))
    
        def _format_header(self, name, value):
            """format headers for PCDCP file
    
            Parameters
            ----------
            name: str
                the name to be written
            value: str
                the value to written.
    
            Returns
            -------
            str
                a string formatted to be a single header line in a PCDCP file
            """
            observatory = 'BOU'
            year = '2015'
            date = '01-Jan-15'
            space = '  '
    
            return ''.join(observatory, space, year, space, '001', space,
                        date, space, 'HEZF  0.01nT  File Version 2.00')
    
        def _format_data(self, timeseries, channels):
            """Format all data lines.
    
            Parameters
            ----------
            timeseries : obspy.core.Stream
                stream containing traces with channel listed in channels
            channels : sequence
                list and order of channel values to output.
            """
            buf = []
            if timeseries.select(channel='D'):
                d = timeseries.select(channel='D')
                d[0].data = ChannelConverter.get_minutes_from_radians(d[0].data)
    
            traces = [timeseries.select(channel=c)[0] for c in channels]
            starttime = float(traces[0].stats.starttime)
            delta = traces[0].stats.delta
    
            for i in xrange(len(traces[0].data)):
                buf.append(self._format_values(
                    datetime.utcfromtimestamp(starttime + i * delta),
                    (t.data[i] for t in traces)))
    
            return ''.join(buf)
    
        def _format_values(self, time, values):
            """Format one line of data values.
    
            Parameters
            ----------
            time : datetime
                timestamp for values
            values : sequence
                list and order of channel values to output.
                if value is NaN, self.empty_value is output in its place.
    
            Returns
            -------
            unicode
                Formatted line containing values.
            """
            tt = time.timetuple()
            return '{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} ' \
                    '{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} ' \
                    '{0.tm_yday:0>3d}   ' \
                    '{2:10.2f}{3:10.2f}{4:10.2f}{5:10.2f}\n'.format(
                    tt, int(time.microsecond / 1000),
                    *[self.empty_value if numpy.isnan(val) else val
                            for val in values])
    
        @classmethod
        def format(self, timeseries, channels):
            """Get an PCDCP formatted string.
    
            Calls write() with a StringIO, and returns the output.
    
            Parameters
            ----------
            timeseries : obspy.core.Stream
    
            Returns
            -------
            unicode
              PCDCP formatted string.
            """
            out = StringIO()
            writer = PCDCPWriter()
            writer.write(out, timeseries, channels)
            return out.getvalue()