Skip to content
Snippets Groups Projects
magproc.py 8.19 KiB
Newer Older
  • Learn to ignore specific revisions
  • import os
    import sys
    
    from typing import List
    
    
    from dateutil.relativedelta import relativedelta
    
    from obspy.core import UTCDateTime, Stream
    
    from ..algorithm.FilterAlgorithm import FilterAlgorithm
    
    from ..pcdcp import PCDCPFactory, PCDCP_FILE_PATTERN
    
    from ..metadata import Metadata, MetadataCategory, MetadataFactory
    from ..residual import CalFileFactory, Reading, WebAbsolutesFactory
    
    from ..Util import get_intervals
    
    class ReadingsFactory(str, Enum):
        """Names of the sources that absolute readings can be loaded from.

        Members also subclass ``str``, so each one compares equal to its
        plain string value.
        """

        # selects MetadataFactory as the readings source
        METADATA_FACTORY = "metadatafactory"
        # selects WebAbsolutesFactory as the readings source
        WEB_ABSOLUTES = "webabsolutes"
    
    
    
    # Relative URL template for calibration files; write_cal_file() fills in
    # {OBSERVATORY} and {YEAR} with str.format().
    CAL_TEMPLATE = "{OBSERVATORY}/{OBSERVATORY}{YEAR}PCD.cal"
    # Relative URL template for PCDCP files: PCDCP_FILE_PATTERN nested under a
    # "%(OBS)s" directory.
    # NOTE(review): "%(OBS)s" is presumably expanded by PCDCPFactory when it
    # resolves urlTemplate — confirm against PCDCPFactory.
    PCDCP_TEMPLATE = f"%(OBS)s/{PCDCP_FILE_PATTERN}"
    
        """Entrypoint for magproc-prepfiles command defined in setup.py.
    
        Runs prepfiles() with typer for argument parsing and usage.
        """
    
        observatory: str = typer.Argument(
            ...,
            help="Observatory code",
        ),
        year: int = typer.Option(
            default=datetime.now().year,
            help="Search year, default",
        ),
        month: int = typer.Option(
            default=datetime.now().month,
            help="Search month, default",
        ),
        factory: ReadingsFactory = ReadingsFactory.WEB_ABSOLUTES,
    
        calibration_path: str = typer.Argument(
            os.getenv("CALIBRATION_PATH", "file://c:/Calibrat")
        ),
        second_path: str = typer.Argument(os.getenv("SECOND_PATH", "file://c:/RAW")),
        minute_path: str = typer.Argument(os.getenv("MINUTE_PATH", "file://c:/USGSDCP")),
        temperature_path: str = typer.Argument(
            os.getenv("TEMPERATURE_PATH", "file://c:/DEG")
        ),
        edge_host: str = typer.Argument(os.getenv("EDGE_HOST", "edgecwb.usgs.gov")),
        metadata_token: str = typer.Option(
            default=os.getenv("GITLAB_API_TOKEN"),
            help="Token for metadata web service.",
            metavar="TOKEN",
            show_default="environment variable GITLAB_API_TOKEN",
        ),
        metadata_url: str = typer.Option(
            default="https://geomag.usgs.gov/ws/secure/metadata",
            help="URL to metadata web service",
            metavar="URL",
        ),
        web_absolutes_url: str = typer.Option(
            default="https://geomag.usgs.gov/baselines/observation.json.php",
            help="URL to web absolutes service",
            metavar="URL",
        ),
    
        month_start = datetime(year, month, 1)
        month_end = month_start + relativedelta(months=1)
    
        # Confirm which factory to use
        if factory.value == ReadingsFactory.WEB_ABSOLUTES:
            factory = WebAbsolutesFactory(url=web_absolutes_url)
        else:
            factory = MetadataFactory(token=metadata_token, url=metadata_url)
    
            starttime=UTCDateTime(month_start - relativedelta(months=1)),
            endtime=UTCDateTime(month_end + relativedelta(months=1)),
            observatory=observatory,
    
            template="file://" + os.path.join(calibration_path, CAL_TEMPLATE),
    
    
        intervals = get_intervals(UTCDateTime(month_start), UTCDateTime(month_end))
        for interval in intervals:
            # Variation data
            write_variation_data(
                host=edge_host,
                starttime=interval["start"],
                endtime=interval["end"],
                observatory=observatory,
                second_template="file://" + os.path.join(second_path, PCDCP_TEMPLATE),
                minute_template="file://" + os.path.join(minute_path, PCDCP_TEMPLATE),
            )
            # Temperature data
            write_temperature_data(
                host=edge_host,
                starttime=interval["start"],
                endtime=interval["end"],
                observatory=observatory,
                template="file://" + os.path.join(temperature_path, PCDCP_TEMPLATE),
            )
    
    
    def write_cal_file(
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: str,
        template: str,
        factory,
    ):
        """Load absolute readings for an observatory and write them to a cal file.

        Parameters
        ----------
        starttime
            start of the interval to load readings for; its year is also
            substituted into the output URL.
        endtime
            end of the interval to load readings for.
        observatory
            observatory code, used for the query and for the output URL.
        template
            ``str.format`` template for the output URL, with ``{OBSERVATORY}``
            and ``{YEAR}`` placeholders.
        factory
            readings source. Either an object with a ``get_readings`` method,
            or an object with a ``get_metadata`` method whose results carry
            ``Reading`` keyword arguments in their ``metadata`` attribute.

        NOTE(review): ``factory`` was used by the body but was absent from the
        signature; it is added as the trailing parameter so the existing
        parameter order is untouched. Confirm callers pass it by keyword.
        """
        print(
            f"Loading calibration data for {observatory} [{starttime}, {endtime}]",
            file=sys.stderr,
        )
        url = template.format(OBSERVATORY=observatory, YEAR=starttime.year)

        try:
            readings = factory.get_readings(
                observatory=observatory,
                starttime=starttime,
                endtime=endtime,
                include_measurements=True,
            )
        except Exception:
            # Keep the existing fallback to the metadata query, but no longer
            # intercept KeyboardInterrupt/SystemExit as a bare ``except`` did.
            query = Metadata(
                category=MetadataCategory.READING,
                starttime=starttime,
                endtime=endtime,
                station=observatory,
            )
            metadata = factory.get_metadata(query=query)
            # Convert each returned metadata payload into a Reading. The
            # readings must be collected, otherwise an empty list is written.
            readings = [Reading(**meta.metadata) for meta in metadata]

        # write cal file to specified path
        CalFileFactory().write_file(url=url, readings=readings)
    
    def write_pcdcp_file(
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        timeseries: Stream,
        observatory: str,
        interval: str,
        channels: List[str],
        template: str = PCDCP_FILE_PATTERN,
        temperatures: bool = False,
    ):
        """Write a timeseries to PCDCP files through PCDCPFactory.

        Parameters
        ----------
        starttime
            start of the interval to write.
        endtime
            end of the interval to write.
        timeseries
            stream holding the data to write.
        observatory
            observatory code; accepted for interface compatibility, the body
            does not use it.
        interval
            data interval name passed to ``put_timeseries`` (callers in this
            module use "second", "minute" and "hour").
        channels
            channel names to write.
        template
            URL template handed to PCDCPFactory as ``urlTemplate``.
        temperatures
            forwarded to PCDCPFactory's ``temperatures`` option.
        """
        # NOTE(review): urlInterval=86400 presumably means one file per day
        # (seconds) — confirm against PCDCPFactory.
        pcdcp_factory = PCDCPFactory(
            urlInterval=86400, urlTemplate=template, temperatures=temperatures
        )
        pcdcp_factory.put_timeseries(
            timeseries=timeseries,
            starttime=starttime,
            endtime=endtime,
            channels=channels,
            interval=interval,
            type="variation",
        )
    
    
    
    def write_temperature_data(
        host: str,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: str,
        template: str = PCDCP_FILE_PATTERN,
    ) -> Stream:
        """Load minute temperature data, filter it to hourly, and write PCDCP files.

        Parameters
        ----------
        host
            host passed to EdgeFactory to load the input data.
        starttime
            start of the output interval.
        endtime
            end of the output interval.
        observatory
            observatory code to load and write data for.
        template
            URL template for the hourly temperature files.

        Returns
        -------
        Stream
            the hourly temperature stream that was written.
        """
        algorithm = FilterAlgorithm(input_sample_period=60.0, output_sample_period=3600.0)
        factory = EdgeFactory(host=host)
        # load minute temperature data; the filter decides how much input it
        # needs around the requested output interval
        f_starttime, f_endtime = algorithm.get_input_interval(starttime, endtime)
        print(
            f"Loading minute temperature data for {observatory} [{f_starttime}, {f_endtime}]",
            file=sys.stderr,
        )
        timeseries_temp = factory.get_timeseries(
            starttime=f_starttime,
            endtime=f_endtime,
            observatory=observatory,
            channels=["UK1", "UK2", "UK3", "UK4"],
            type="variation",
            interval="minute",
        )
        # filter to one hour
        print(
            f"Generating hourly temperature data for {observatory} [{starttime}, {endtime}]",
            file=sys.stderr,
        )
        timeseries_temperature = algorithm.process(timeseries_temp)
        # write data
        write_pcdcp_file(
            starttime=starttime,
            endtime=endtime,
            timeseries=timeseries_temperature,
            observatory=observatory,
            interval="hour",
            channels=["UK1", "UK2", "UK3", "UK4"],
            template=template,
            temperatures=True,
        )
        # honour the declared ``-> Stream`` return type
        return timeseries_temperature


    def write_variation_data(
        host: str,
        starttime: UTCDateTime,
        endtime: UTCDateTime,
        observatory: str,
        second_template: str = PCDCP_FILE_PATTERN,
        minute_template: str = PCDCP_FILE_PATTERN,
    ):
        """Load second variation data, derive minute data, and write both as PCDCP files.

        Parameters
        ----------
        host
            host passed to EdgeFactory to load the input data.
        starttime
            start of the output interval.
        endtime
            end of the output interval.
        observatory
            observatory code to load and write data for.
        second_template
            URL template for the one-second files.
        minute_template
            URL template for the one-minute files.
        """
        algorithm = FilterAlgorithm(input_sample_period=1.0, output_sample_period=60.0)
        factory = EdgeFactory(host=host)

        # load second data; the filter decides how much input it needs around
        # the requested output interval
        f_starttime, f_endtime = algorithm.get_input_interval(starttime, endtime)
        print(
            f"Loading second variation data for {observatory} [{f_starttime}, {f_endtime}]",
            file=sys.stderr,
        )
        timeseries_second = factory.get_timeseries(
            starttime=f_starttime,
            endtime=f_endtime,
            observatory=observatory,
            channels=["H", "E", "Z", "F"],
            type="variation",
            interval="second",
        )
        # filter to one minute
        print(
            f"Generating one minute variation data for {observatory} [{starttime}, {endtime}]",
            file=sys.stderr,
        )
        # must run before the trim below, which is applied to the same
        # second-data stream
        timeseries_minute = algorithm.process(timeseries_second)
        # write files
        write_pcdcp_file(
            starttime=starttime,
            endtime=endtime,
            timeseries=timeseries_second.trim(starttime, endtime),
            observatory=observatory,
            interval="second",
            channels=["H", "E", "Z", "F"],
            template=second_template,
        )
        write_pcdcp_file(
            starttime=starttime,
            endtime=endtime,
            timeseries=timeseries_minute,
            observatory=observatory,
            interval="minute",
            channels=["H", "E", "Z", "F"],
            template=minute_template,
        )