# IAGA2002Writer.py
from __future__ import absolute_import

from builtins import range
from datetime import datetime
from io import BytesIO
from os import linesep
import textwrap

import numpy

from .. import ChannelConverter, TimeseriesUtility
from ..TimeseriesFactoryException import TimeseriesFactoryException
from ..Util import create_empty_trace
from . import IAGA2002Parser


class IAGA2002Writer(object):
    """IAGA2002 writer."""
    def __init__(
        self,
        empty_value=IAGA2002Parser.NINES,
        empty_channel=IAGA2002Parser.EMPTY_CHANNEL,
    ):
        """Create an IAGA2002 writer.

        Parameters
        ----------
        empty_value
            value written in place of NaN samples on data lines
            (defaults to IAGA2002Parser.NINES).
        empty_channel
            channel name given to the empty traces that pad the output
            to four channels (defaults to IAGA2002Parser.EMPTY_CHANNEL).
        """
        self.empty_value = empty_value
        self.empty_channel = empty_channel

    def write(self, out, timeseries, channels):
        """Write timeseries to an IAGA2002 file.

        Parameters
        ----------
        out: file object
            binary file object to be written to; every section is encoded
            as utf8 bytes before being passed to ``out.write``.
        timeseries: obspy.core.stream
            timeseries object with data to be written
        channels: array_like
            channels to be written from timeseries object

        Raises
        ------
        TimeseriesFactoryException
            if a requested channel has no trace in timeseries.
        """
        for channel in channels:
            if timeseries.select(channel=channel).count() == 0:
                raise TimeseriesFactoryException(
                    'Missing channel "%s" for output, available channels %s'
                    % (channel, str(TimeseriesUtility.get_channels(timeseries)))
                )
        stats = timeseries[0].stats
        # The channel header and the data lines are fixed at four columns,
        # so shorter channel lists are padded with empty channels first.
        if len(channels) < 4:
            channels = self._pad_to_four_channels(timeseries, channels)
        out.write(self._format_headers(stats, channels).encode("utf8"))
        out.write(self._format_comments(stats).encode("utf8"))
        out.write(self._format_channels(channels, stats.station).encode("utf8"))
        out.write(self._format_data(timeseries, channels).encode("utf8"))

    def _format_headers(self, stats, channels):
        """format headers for IAGA2002 file

        Parameters
        ----------
        stats: obspy.core.trace.stats
            holds the observatory metadata
        channels: array_like
            channels to be reported.

        Returns
        -------
        array_like
            an array containing formatted strings of header data.
        """
Jeremy M Fee's avatar
Jeremy M Fee committed
        buf = []
        buf.append(self._format_header("Format", "IAGA-2002"))
        if "agency_name" in stats:
            buf.append(self._format_header("Source of Data", stats.agency_name))
        if "station_name" in stats:
            buf.append(self._format_header("Station Name", stats.station_name))
        buf.append(self._format_header("IAGA CODE", stats.station))
        if "geodetic_latitude" in stats:
            buf.append(
                self._format_header("Geodetic Latitude", str(stats.geodetic_latitude))
            )
        if "geodetic_longitude" in stats:
            buf.append(
                self._format_header("Geodetic Longitude", str(stats.geodetic_longitude))
            )
        if "elevation" in stats:
            buf.append(self._format_header("Elevation", str(stats.elevation)))
        buf.append(self._format_header("Reported", "".join(channels)))
        if "sensor_orientation" in stats:
            buf.append(
                self._format_header("Sensor Orientation", stats.sensor_orientation)
            )
        if "sensor_sampling_rate" in stats:
            try:
                buf.append(
                    self._format_header(
                        "Digital Sampling",
                        str(1 / stats.sensor_sampling_rate) + " second",
                    )
            except TypeError:
                buf.append(self._format_header("Digital Sampling", ""))
        if "data_interval_type" in stats:
            buf.append(
                self._format_header("Data Interval Type", stats.data_interval_type)
            )
        if "data_type" in stats:
            buf.append(self._format_header("Data Type", stats.data_type))
        return "".join(buf)
    def _format_comments(self, stats):
        """format comments for IAGA2002 file

        Parameters
        ----------
        stats: obspy.core.trace.stats
            holds the observatory metadata

        Returns
        -------
        array_like
            an array containing formatted strings of header data.
        """
        comments = []
        if (
            "declination_base" in stats
            and stats.declination_base is not None
            and (
                stats.data_type == "variation"
                or stats.data_type == "reported"
                or stats.data_type[0] == "R"
            )
        ):
            comments.append(
                "DECBAS               {:<8d}"
                "(Baseline declination value in tenths of minutes East"
                " (0-216,000)).".format(stats.declination_base)
            )
        if "filter_comments" in stats:
            comments.extend(stats.filter_comments)
        if "comments" in stats:
            comments.extend(stats.comments)
        if "is_gin" in stats and stats.is_gin:
            comments.append("This data file was constructed by the Golden " + "GIN.")
        if "is_intermagnet" in stats and stats.is_intermagnet:
            comments.append("Final data will be available on the" + " INTERMAGNET DVD.")
            comments.append(
                "Go to www.intermagnet.org for details on" + " obtaining this product."
            )
        if "conditions_of_use" in stats and stats.conditions_of_use is not None:
            comments.append("CONDITIONS OF USE: " + stats.conditions_of_use)
        # generate comment output
Jeremy M Fee's avatar
Jeremy M Fee committed
        buf = []
        for comment in comments:
            buf.append(self._format_comment(comment))
        return "".join(buf)
Jeremy M Fee's avatar
Jeremy M Fee committed

    def _format_header(self, name, value):
        """format headers for IAGA2002 file

        Parameters
        ----------
        name: str
            the name to be written
        value: str
            the value to written.

        Returns
        -------
        str
            a string formatted to be a single header line in an IAGA2002 file
        """
        prefix = " "
        suffix = " |" + linesep
        return "".join((prefix, name.ljust(23), value.ljust(44), suffix))
Jeremy M Fee's avatar
Jeremy M Fee committed

    def _format_comment(self, comment):
        """format header for IAGA2002 file

        Parameters
        ----------
        comment: str
            a single comment to be broken formatted if needed.
        Returns
        -------
        str
            a string formatted to be a single comment in an IAGA2002 file.
        """
Jeremy M Fee's avatar
Jeremy M Fee committed
        buf = []
        prefix = " # "
        suffix = " |" + linesep
Jeremy M Fee's avatar
Jeremy M Fee committed
        lines = textwrap.wrap(comment, 65)
        for line in lines:
            buf.extend((prefix, line.ljust(65), suffix))
        return "".join(buf)
Jeremy M Fee's avatar
Jeremy M Fee committed

    def _format_channels(self, channels, iaga_code):
        """Format channel header line.

        Parameters
        ----------
        channels : sequence
            list and order of channel values to output.
        iaga_code : str
            observatory code, which is prefixed to channel name in output.

        Returns
        -------
        str
            Channel header line as a string (including newline)
        """
        iaga_code_len = len(iaga_code)
        if iaga_code_len != 3 and iaga_code_len != 4:
Jeremy M Fee's avatar
Jeremy M Fee committed
            raise TimeseriesFactoryException(
                'iaga_code "{}" is not 3 characters'.format(iaga_code)
            )
Jeremy M Fee's avatar
Jeremy M Fee committed
        if len(channels) != 4:
            raise TimeseriesFactoryException("more than 4 channels {}".format(channels))
        buf = ["DATE       TIME         DOY  "]
Jeremy M Fee's avatar
Jeremy M Fee committed
        for channel in channels:
            if channel_len < 1 or channel_len > 4:
Jeremy M Fee's avatar
Jeremy M Fee committed
                raise TimeseriesFactoryException(
                    'channel "{}" is not 1 character'.format(channel)
                )
            buf.append("   {:<7s}".format(iaga_code + channel))
        buf.append("|" + linesep)
        return "".join(buf)
Jeremy M Fee's avatar
Jeremy M Fee committed

    def _format_data(self, timeseries, channels):
        """Format all data lines.

        Timestamps are derived from the first channel's starttime and delta,
        so all traces are assumed to be aligned and of equal length.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream containing traces with channel listed in channels
        channels : sequence
            list and order of channel values to output.

        Returns
        -------
        str
            one formatted line per sample, concatenated.
        """
        traces = [timeseries.select(channel=c)[0] for c in channels]
        # D is written in minutes. The conversion is applied to a local
        # column only, so the caller's trace data is not overwritten and
        # writing the same stream twice does not convert it twice.
        # NOTE(review): assumes get_minutes_from_radians returns a new array
        # rather than modifying its argument -- confirm in ChannelConverter.
        columns = [
            ChannelConverter.get_minutes_from_radians(trace.data)
            if channel == "D"
            else trace.data
            for channel, trace in zip(channels, traces)
        ]
        starttime = float(traces[0].stats.starttime)
        delta = traces[0].stats.delta
        return "".join(
            self._format_values(
                datetime.utcfromtimestamp(starttime + i * delta), row
            )
            for i, row in enumerate(zip(*columns))
        )

    def _format_values(self, time, values):
        """Format one line of data values.

        Parameters
        ----------
        time : datetime
Jeremy M Fee's avatar
Jeremy M Fee committed
            timestamp for values
        values : sequence
            list and order of channel values to output.
            if value is NaN, self.empty_value is output in its place.

        Returns
        -------
        unicode
            Formatted line containing values.
        """
        tt = time.timetuple()
        return (
            "{0.tm_year:0>4d}-{0.tm_mon:0>2d}-{0.tm_mday:0>2d} "
            "{0.tm_hour:0>2d}:{0.tm_min:0>2d}:{0.tm_sec:0>2d}.{1:0>3d} "
            "{0.tm_yday:0>3d}   "
            " {2:9.2f} {3:9.2f} {4:9.2f} {5:9.2f}".format(
                tt,
                int(time.microsecond / 1000),
                *[self.empty_value if numpy.isnan(val) else val for val in values]
            )
            + linesep
        )
    def _pad_to_four_channels(self, timeseries, channels):
            channel = self.empty_channel
            timeseries += create_empty_trace(timeseries[0], channel)
Jeremy M Fee's avatar
Jeremy M Fee committed
    @classmethod
    def format(cls, timeseries, channels):
        """Get IAGA2002 formatted output for a timeseries.

        Calls write() on a default-constructed writer with a BytesIO, and
        returns the output.

        Parameters
        ----------
        timeseries : obspy.core.Stream
            stream containing traces with channel listed in channels
        channels : sequence
            list and order of channel values to output.

        Returns
        -------
        bytes
            IAGA2002 formatted output, utf8 encoded (the raw contents of
            the BytesIO buffer).
        """
        out = BytesIO()
        writer = cls()
        writer.write(out, timeseries, channels)
        return out.getvalue()