Skip to content
Snippets Groups Projects
copy_absolutes.py 4.77 KiB
Newer Older
  • Learn to ignore specific revisions
  • # -*- coding: utf-8 -*-
    """
    Created on Thu Jul  7 11:05:19 2022
    
    @author: awernle
    """
    import json
    
    Jeremy M Fee's avatar
    Jeremy M Fee committed
    import os
    
    from datetime import date, datetime, time, timedelta
    from typing import List
    
    import typer
    
    from ..metadata import Metadata, MetadataFactory, MetadataCategory
    from ..residual import Reading, WebAbsolutesFactory, SpreadsheetAbsolutesFactory
    
    class factories(str, Enum):
        """Names of the reading sources selectable from the command line.

        Members subclass ``str``, so each one compares equal to its own
        string value.
        """

        # read absolutes from the web absolutes service
        web_absolutes_factory = "web_absolutes_factory"
        # read absolutes from residual spreadsheets
        spreadsheet_factory = "spreadsheet_factory"
    
    
    
    # Typer command: reads absolutes through the selected factory and uploads
    # each reading to the metadata service via upload_reading.
    # NOTE(review): this view of the function looks incomplete (see the NOTE
    # comments below) - confirm against the repository copy before editing.
    def copy_absolutes(
        observatory: str = typer.Option(..., help="Observatory code"),
        # default search window starts one day before TODAY
        starttime: datetime = typer.Option(
            default=TODAY - timedelta(days=1), help="Search start time, default "
        ),
        endtime: datetime = typer.Option(default=TODAY),
        # the environment variable is read once, when this signature is evaluated;
        # show_default is a label, presumably shown in help instead of the token
        metadata_token: str = typer.Option(
            default=os.getenv("GITLAB_API_TOKEN"),
            help="Token for metadata web service.",
            metavar="TOKEN",
            show_default="environment variable GITLAB_API_TOKEN",
        ),
        metadata_url: str = typer.Option(
            # NOTE(review): no ``default=`` argument is visible for metadata_url
            # in this view - confirm whether a line is missing here

            help="URL to metadata web service",
            metavar="URL",
        ),
        web_absolutes_url: str = typer.Option(
            default="https://geomag.usgs.gov/baselines/observation.json.php",
            help="URL to web absolutes service",
            metavar="URL",
        ),

        spreadsheets_dir: str = typer.Option(
            default="---", help="Directory to residual spreadsheets", metavar="DIR"
        ),
        # TODO: the "---" default for spreadsheets_dir is a placeholder; the
        # author tested with a local directory and was unsure what the default
        # should be
        factory: factories = factories.web_absolutes_factory,
        # NOTE(review): the closing ")" / return annotation of the signature is
        # not visible in this view

        """Copy absolutes from the web absolutes service OR residual spreadsheets into the metadata service."""
        print(f"Using factory: {factory.value}")

        # ``factories`` subclasses str, so comparing the member's string value
        # against the enum member itself is an equality match.
        # ``factory`` is then rebound from the enum selection to the concrete
        # factory instance.
        if factory.value == factories.web_absolutes_factory:
            factory = WebAbsolutesFactory(url=web_absolutes_url)
        else:
            factory = SpreadsheetAbsolutesFactory(base_directory=spreadsheets_dir)

        # NOTE(review): the opening of the call that assigns ``readings`` is not
        # visible here; only its trailing keyword arguments remain - confirm

            observatory=observatory,
            starttime=UTCDateTime(starttime),
            endtime=UTCDateTime(endtime),
        )

    # NOTE(review): the next two lines are blame-view residue, not Python source
    Jeremy M Fee's avatar
    Jeremy M Fee committed
        print(f"Found {len(readings)} absolutes")

        # write readings to metadata service
        metadata_factory = MetadataFactory(token=metadata_token, url=metadata_url)
        # iterate through typer's progress bar so each reading advances it
        with typer.progressbar(
            iterable=readings, label="Uploading to metadata service"
        ) as progressbar:
            for reading in progressbar:
                upload_reading(factory=metadata_factory, reading=reading)
    
    
    def create_reading_metadata(reading: Reading) -> Metadata:
        """Create reading metadata object.

        The metadata time range is taken from the earliest and latest
        measurement times of the reading.

        Parameters
        ----------
        reading:
            reading to convert to metadata.

        Returns
        -------
        metadata object for reading
        """

    # NOTE(review): the next two lines are blame-view residue, not Python source
    Jeremy M Fee's avatar
    Jeremy M Fee committed
        # keep only measurements that have a (truthy) time
        # NOTE(review): if no measurement has a time this list is empty and the
        # max()/min() calls below raise ValueError - confirm that cannot happen
        measurement_times = [m.time for m in reading.measurements if m.time]

        metadata = Metadata(
            category=MetadataCategory.READING,
            # falls back to "copy_absolutes" when the reading has no "observer"
            created_by=reading.metadata.get("observer", "copy_absolutes"),

    # NOTE(review): blame-view residue, not Python source
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            endtime=max(measurement_times),
            # round-trip through JSON so the reading is stored as plain data
            metadata=json.loads(reading.json()),

    # NOTE(review): blame-view residue, not Python source
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            starttime=min(measurement_times),

    # NOTE(review): blame-view residue, not Python source
    Jeremy M Fee's avatar
    Jeremy M Fee committed
            # "reviewed" only when the reading metadata has a truthy "reviewed" entry
            status="reviewed" if reading.metadata.get("reviewed") else "new",
            updated_by=reading.metadata.get("reviewer"),
            # NOTE(review): the end of this Metadata(...) call and the function's
            # return statement are not visible in this view - confirm
    
        # NOTE(review): the ``def`` line that opens this signature is not visible
        # in this view, and neither is the line that closes it - confirm
        factory: factories, observatory: str, starttime: UTCDateTime, endtime: UTCDateTime

        """Get readings from web absolutes or residual spreadsheets.

        Parameters
        ----------
        factory
            configured WebAbsolutesFactory or SpreadsheetAbsolutesFactory
        observatory
            search observatory
        starttime
            search start time
        endtime
            search end time

        Returns
        -------
        list of found readings
        """
        # NOTE(review): ``factory`` is annotated as the ``factories`` enum but
        # is used here as an object with a get_readings method - confirm the
        # annotation matches what callers pass
        readings = factory.get_readings(
            observatory=observatory,
            # the times are wrapped in UTCDateTime again although the signature
            # already annotates them as UTCDateTime
            starttime=UTCDateTime(starttime),
            endtime=UTCDateTime(endtime),
            include_measurements=True,
        )
        return readings
    
    
    
    def main() -> None:
        """Command line entry point that runs ``copy_absolutes``."""
        # a single command needs no application object; typer.run wraps the
        # function directly
        command = copy_absolutes
        typer.run(command)
    
    
    
    def upload_reading(factory: MetadataFactory, reading: Reading) -> Metadata:
        """Upload reading to metadata service.

        The reading is first converted to a metadata object with
        ``create_reading_metadata``.

        Parameters
        ----------
        factory
            factory configured for metadata service
        reading
            reading to upload

        Returns
        -------
        created metadata object
        """
        # convert the reading into a Metadata record before uploading
        metadata = create_reading_metadata(reading=reading)
        # NOTE(review): neither the call that uses ``factory`` nor a return
        # statement is visible after this point in this view - confirm the body
        # continues below
        # TODO: should this check if metadata was already uploaded?
        # TODO: should that check occur before calling this method?