From 3cdbca174bb2fef40420a0473ea6d11cba3cc570 Mon Sep 17 00:00:00 2001 From: pcain-usgs <pcain@usgs.gov> Date: Tue, 23 Feb 2021 14:59:37 -0700 Subject: [PATCH] Write and request miniseed hours/days --- geomagio/api/db/metadata_table.py | 2 - geomagio/api/db/session_table.py | 2 - geomagio/api/secure/MetadataQuery.py | 2 +- geomagio/api/secure/SessionMiddleware.py | 3 +- geomagio/api/ws/DataApiQuery.py | 12 ++++- geomagio/api/ws/Element.py | 3 ++ geomagio/api/ws/algorithms.py | 2 +- geomagio/api/ws/data.py | 47 +++++++++++-------- geomagio/api/ws/metadata.py | 2 +- geomagio/processing/factory.py | 1 - geomagio/processing/magproc.py | 2 +- geomagio/processing/observatory.py | 3 -- geomagio/processing/obsrio.py | 58 ++++++++++++++++++++++-- test/api_test/ws_test/data_test.py | 2 + 14 files changed, 102 insertions(+), 39 deletions(-) diff --git a/geomagio/api/db/metadata_table.py b/geomagio/api/db/metadata_table.py index 367e59368..211a18044 100644 --- a/geomagio/api/db/metadata_table.py +++ b/geomagio/api/db/metadata_table.py @@ -1,7 +1,5 @@ from datetime import datetime -import enum -from obspy import UTCDateTime from sqlalchemy import or_, Boolean, Column, Index, Integer, JSON, String, Table, Text import sqlalchemy_utc diff --git a/geomagio/api/db/session_table.py b/geomagio/api/db/session_table.py index a62ebb782..49a62fecf 100644 --- a/geomagio/api/db/session_table.py +++ b/geomagio/api/db/session_table.py @@ -1,6 +1,4 @@ from datetime import datetime, timedelta, timezone -import json -from typing import Dict, Optional import sqlalchemy import sqlalchemy_utc diff --git a/geomagio/api/secure/MetadataQuery.py b/geomagio/api/secure/MetadataQuery.py index 1d92c67d8..6b30699d9 100644 --- a/geomagio/api/secure/MetadataQuery.py +++ b/geomagio/api/secure/MetadataQuery.py @@ -1,7 +1,7 @@ from datetime import timezone from obspy import UTCDateTime -from pydantic import BaseModel, validator +from pydantic import BaseModel from ...metadata import MetadataCategory from ... import pydantic_utcdatetime diff --git a/geomagio/api/secure/SessionMiddleware.py b/geomagio/api/secure/SessionMiddleware.py index 04c621c18..3bf0ea8e0 100644 --- a/geomagio/api/secure/SessionMiddleware.py +++ b/geomagio/api/secure/SessionMiddleware.py @@ -1,10 +1,9 @@ -import base64 import json from typing import Callable, Dict, Mapping import uuid from cryptography.fernet import Fernet -from starlette.datastructures import MutableHeaders, Secret +from starlette.datastructures import MutableHeaders from starlette.requests import HTTPConnection from starlette.types import ASGIApp, Message, Receive, Scope, Send diff --git a/geomagio/api/ws/DataApiQuery.py b/geomagio/api/ws/DataApiQuery.py index f52217eac..8e831f658 100644 --- a/geomagio/api/ws/DataApiQuery.py +++ b/geomagio/api/ws/DataApiQuery.py @@ -1,12 +1,13 @@ import datetime import enum -from typing import Any, Dict, List, Optional, Union +import os +from typing import Dict, List, Optional, Union from obspy import UTCDateTime from pydantic import BaseModel, root_validator, validator from ... import pydantic_utcdatetime -from .Element import ELEMENTS, ELEMENT_INDEX +from .Element import ELEMENTS from .Observatory import OBSERVATORY_INDEX @@ -47,6 +48,7 @@ class DataApiQuery(BaseModel): sampling_period: SamplingPeriod = SamplingPeriod.MINUTE data_type: Union[DataType, str] = DataType.VARIATION format: OutputFormat = OutputFormat.IAGA2002 + factory: Optional[str] = "edge" @validator("data_type") def validate_data_type( @@ -73,6 +75,12 @@ class DataApiQuery(BaseModel): ) return elements + @validator("factory") + def validate_factory(cls, factory: str): + if factory not in ["edge", "miniseed"]: + raise ValueError("Unsupported factory") + return factory + @validator("id") def validate_id(cls, id: str) -> str: if id not in OBSERVATORY_INDEX: diff --git a/geomagio/api/ws/Element.py b/geomagio/api/ws/Element.py index 952feae90..b0df40d1d 100644 --- a/geomagio/api/ws/Element.py +++ b/geomagio/api/ws/Element.py @@ -10,6 +10,9 @@ class Element(BaseModel): ELEMENTS = [ + Element(id="U", name="North Component(miniseed)", units="nT"), + Element(id="V", name="East Component(miniseed)", units="nT"), + Element(id="W", name="Vertical Component(miniseed)", units="nT"), Element(id="H", name="North Component", units="nT"), Element(id="E", name="East Component", units="nT"), Element(id="X", name="Geographic North Magnitude", units="nT"), diff --git a/geomagio/api/ws/algorithms.py b/geomagio/api/ws/algorithms.py index 0e2d51cb9..572045ba8 100644 --- a/geomagio/api/ws/algorithms.py +++ b/geomagio/api/ws/algorithms.py @@ -15,7 +15,7 @@ def get_dbdt( query: DataApiQuery = Depends(get_data_query), data_factory: TimeseriesFactory = Depends(get_data_factory), ) -> Response: - dbdt = DbDtAlgorithm() + dbdt = DbDtAlgorithm(period=query.sampling_period) # read data raw = get_timeseries(data_factory, query) # run dbdt diff --git a/geomagio/api/ws/data.py b/geomagio/api/ws/data.py index 053474c1d..360130c84 100644 --- a/geomagio/api/ws/data.py +++ b/geomagio/api/ws/data.py @@ -1,12 +1,12 @@ import os -from typing import Any, Dict, List, Union +from typing import List, Union from fastapi import APIRouter, Depends, Query from obspy import UTCDateTime, Stream from starlette.responses import Response from ... import TimeseriesFactory, TimeseriesUtility -from ...edge import EdgeFactory +from ...edge import EdgeFactory, MiniSeedFactory from ...iaga2002 import IAGA2002Writer from ...imfjson import IMFJSONWriter from .DataApiQuery import ( @@ -18,23 +18,6 @@ from .DataApiQuery import ( ) -def get_data_factory() -> TimeseriesFactory: - """Reads environment variable to determine the factory to be used - - Returns - ------- - data_factory - Edge or miniseed factory object - """ - data_type = os.getenv("DATA_TYPE", "edge") - data_host = os.getenv("DATA_HOST", "cwbpub.cr.usgs.gov") - data_port = int(os.getenv("DATA_PORT", "2060")) - if data_type == "edge": - return EdgeFactory(host=data_host, port=data_port) - else: - return None - - def get_data_query( id: str = Query(..., title="Observatory code"), starttime: UTCDateTime = Query( @@ -66,6 +49,11 @@ def get_data_query( " For example: R0 is 'internet variation'", ), format: OutputFormat = Query(OutputFormat.IAGA2002), + factory: str = Query( + "edge", + title="data factory", + description="Data source. NOTE: Only edge and miniseed factories are supported", + ), ) -> DataApiQuery: """Define query parameters used for webservice requests. @@ -99,10 +87,31 @@ def get_data_query( sampling_period=sampling_period, data_type=data_type, format=format, + factory=factory, ) return query +def get_data_factory( + query: DataApiQuery = Depends(get_data_query), +) -> TimeseriesFactory: + """Reads environment variable to determine the factory to be used + + Returns + ------- + data_factory + Edge or miniseed factory object + """ + host = os.getenv("DATA_HOST", "cwbpub.cr.usgs.gov") + port = int(os.getenv("DATA_PORT", "2060")) + if query.factory == "edge": + return EdgeFactory(host=host, port=port) + elif query.factory == "miniseed": + return MiniSeedFactory(host=host, port=port) + else: + return None + + def format_timeseries( timeseries: Stream, format: OutputFormat, elements: List[str] ) -> Response: diff --git a/geomagio/api/ws/metadata.py b/geomagio/api/ws/metadata.py index 7c95aaa36..9f4febeb9 100644 --- a/geomagio/api/ws/metadata.py +++ b/geomagio/api/ws/metadata.py @@ -1,6 +1,6 @@ from typing import List -from fastapi import APIRouter, Body, Response +from fastapi import APIRouter from obspy import UTCDateTime from ...metadata import Metadata, MetadataCategory diff --git a/geomagio/processing/factory.py b/geomagio/processing/factory.py index 47576109e..d9a065497 100644 --- a/geomagio/processing/factory.py +++ b/geomagio/processing/factory.py @@ -1,5 +1,4 @@ import os -from typing import Callable from ..TimeseriesFactory import TimeseriesFactory from ..edge import EdgeFactory, MiniSeedFactory diff --git a/geomagio/processing/magproc.py b/geomagio/processing/magproc.py index 428d9288a..748dc91a5 100644 --- a/geomagio/processing/magproc.py +++ b/geomagio/processing/magproc.py @@ -1,7 +1,7 @@ from datetime import datetime import os import sys -from typing import List, Tuple +from typing import List from dateutil.relativedelta import relativedelta from obspy.core import UTCDateTime, Stream diff --git a/geomagio/processing/observatory.py b/geomagio/processing/observatory.py index 14f744538..85a9cfd7f 100644 --- a/geomagio/processing/observatory.py +++ b/geomagio/processing/observatory.py @@ -1,17 +1,14 @@ -import os from typing import List, Optional import numpy from ..algorithm import ( - Algorithm, AdjustedAlgorithm, AverageAlgorithm, DeltaFAlgorithm, SqDistAlgorithm, XYZAlgorithm, ) -from ..edge import EdgeFactory, MiniSeedFactory from ..Controller import Controller, get_realtime_interval from ..TimeseriesFactory import TimeseriesFactory from .factory import get_edge_factory diff --git a/geomagio/processing/obsrio.py b/geomagio/processing/obsrio.py index 6c6d435d3..3765231cf 100644 --- a/geomagio/processing/obsrio.py +++ b/geomagio/processing/obsrio.py @@ -1,4 +1,3 @@ -import os from typing import Optional import typer @@ -14,7 +13,60 @@ from .factory import get_edge_factory, get_miniseed_factory def main(): - typer.run(filter_realtime) + typer.run(filter) + + +def filter( + interval: str, + observatory: str, + input_factory: Optional[str] = None, + host: str = "127.0.0.1", + port: str = 2061, + output_factory: Optional[str] = None, + output_port: int = typer.Option( + 2061, help="Port where output factory writes data." + ), + output_read_port: int = typer.Option( + 2061, help="Port where output factory reads data" + ), + realtime_interval: int = 600, + update_limit: int = 10, +): + if interval == "realtime": + filter_realtime( + observatory=observatory, + input_factory=input_factory, + host=host, + port=port, + output_factory=output_factory, + output_port=output_port, + output_read_port=output_read_port, + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + elif interval in ["hour", "day"]: + input_factory = EdgeFactory(host=host, port=port) + output_factory = MiniSeedFactory( + host=host, port=output_read_port, write_port=output_port + ) + if interval == "hour": + obsrio_hour( + observatory=observatory, + input_factory=input_factory, + output_factory=output_factory, + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + elif interval == "days": + obsrio_day( + observatory=observatory, + input_factory=input_factory, + output_factory=output_factory, + realtime_interval=realtime_interval, + update_limit=update_limit, + ) + else: + raise ValueError("Invalid interval") def filter_realtime( @@ -34,12 +86,10 @@ def filter_realtime( ): """Filter 10Hz miniseed, 1 second, one minute, and temperature data. Defaults set for realtime processing; can also be implemented to update legacy data""" - if input_factory == "miniseed": input_factory = MiniSeedFactory(host=host, port=port) elif input_factory == "edge": input_factory = EdgeFactory(host=host, port=port) - if output_factory == "miniseed": output_factory = MiniSeedFactory( host=host, port=output_read_port, write_port=output_port diff --git a/test/api_test/ws_test/data_test.py b/test/api_test/ws_test/data_test.py index 800b0cd78..e862a2953 100644 --- a/test/api_test/ws_test/data_test.py +++ b/test/api_test/ws_test/data_test.py @@ -14,6 +14,7 @@ def test_get_data_query(): data_type="R1", sampling_period=60, format="iaga2002", + factory="edge", ) assert_equal(query.id, "BOU") assert_equal(query.starttime, UTCDateTime("2020-09-01T00:00:01")) @@ -22,3 +23,4 @@ def test_get_data_query(): assert_equal(query.sampling_period, SamplingPeriod.MINUTE) assert_equal(query.format, OutputFormat.IAGA2002) assert_equal(query.data_type, "R1") + assert_equal(query.factory, "edge") -- GitLab