import os
from typing import List, Optional, Union

from fastapi import APIRouter, Depends, Query, Request
from obspy import UTCDateTime, Stream
from starlette.responses import Response

from ... import TimeseriesFactory, TimeseriesUtility
from ...DerivedTimeseriesFactory import DerivedTimeseriesFactory
from ...edge import EdgeFactory, FDSNFactory, MiniSeedFactory
from ...iaga2002 import IAGA2002Writer
from ...imfjson import IMFJSONWriter
from .Observatory import ASL_OBSERVATORY_INDEX
from .DataApiQuery import (
    DEFAULT_ELEMENTS,
    DataApiQuery,
    DataHost,
    DataType,
    OutputFormat,
    SamplingPeriod,
)
from ...pydantic_utcdatetime import CustomUTCDateTimeType


def get_data_factory(
    query: DataApiQuery,
) -> TimeseriesFactory:
    """Reads environment variable to determine the factory to be used

    Returns
    -------
    data_factory
        Edge or miniseed factory object
    """
    host = query.data_host or DataHost.DEFAULT
    sampling_period = query.sampling_period
    if query.id in ASL_OBSERVATORY_INDEX:
        observatory = ASL_OBSERVATORY_INDEX[query.id]
        factory = FDSNFactory(network=observatory.network, locationCode="40")
    elif sampling_period in [
        SamplingPeriod.TEN_HERTZ,
    ]:
        # MiniSeedFactory required for floating point data;
        # MiniSeedFactory advised for 10 Hz sampling in general
        factory = MiniSeedFactory(
            host=host,
            port=os.getenv("DATA_MINISEED_PORT", None),
            convert_channels=["U", "V", "W"],  # no channel mapping (e.g., "H"->"U")
        )
    elif sampling_period in list(SamplingPeriod):
        # EdgeFactory required for real time data with long sample periods
        factory = EdgeFactory(host=host, port=os.getenv("DATA_EARTHWORM_PORT", None))
    return DerivedTimeseriesFactory(factory)
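
# Illustrative sketch (commented out; not part of the service): how the selection
# above typically plays out. Assumes DataApiQuery supplies defaults for omitted
# fields and that SamplingPeriod defines a MINUTE (60 s) member.
#
#   query = DataApiQuery(id="BOU", sampling_period=SamplingPeriod.MINUTE)
#   factory = get_data_factory(query)
#   # -> EdgeFactory (port from DATA_EARTHWORM_PORT) wrapped in DerivedTimeseriesFactory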


def get_data_query(
    request: Request,
    id: str = Query(..., title="Observatory code"),
    starttime: Optional[CustomUTCDateTimeType] = Query(
        None,
        title="Start Time",
        description="Time of first requested data. Default is start of current UTC day.",
    ),
    endtime: Optional[CustomUTCDateTimeType] = Query(
        None,
        title="End Time",
        description="Time of last requested data. Default is starttime plus 24 hours.",
    ),
    elements: List[str] = Query(
        DEFAULT_ELEMENTS,
        title="Geomagnetic Elements.",
        description="Either comma separated list of elements, or repeated query parameter"
        " NOTE: when using 'iaga2002' output format, a maximum of 4 elements is allowed",
    ),
    sampling_period: Union[SamplingPeriod, float] = Query(
        title="data rate",
        description="Interval in seconds between values.",
    ),
    data_type: Union[DataType, str] = Query(
        DataType.ADJUSTED,
        alias="type",
        description="Type of data."
        " NOTE: the USGS web service also supports specific EDGE location codes."
        " For example: R0 is 'internet variation'",
    ),
    format: Union[OutputFormat, str] = Query(
        OutputFormat.IAGA2002,
        title="Output Format",
        description="Format of returned time series data.",
    ),
    data_host: Union[DataHost, str] = Query(
        DataHost.DEFAULT,
        title="Data Host",
        description="Edge host to pull data from.",
) -> DataApiQuery:
    """Define query parameters used for webservice requests.

    Uses DataApiQuery for parsing and validation.

    Parameters
    ----------
    id
        observatory iaga code
    starttime
        query start
        default is start of current UTC day.
    endtime
        query end
        default is starttime plus 24 hours.
    elements
        geomagnetic elements, or EDGE channel codes
    sampling_period
        data rate
    data_type
        data processing level
    format
        output format
    data_host
        edge host to pull data from
    """
    default_params = [
        "id",
        "starttime",
        "endtime",
        "elements",
        "sampling_period",
        "type",
        "format",
        "data_host",
    ]
    invalid_params = []
    for param in request.query_params.keys():
        if param not in default_params:
            invalid_params.append(param)
    if len(invalid_params) > 0:
        msg = ", ".join(invalid_params)
        raise ValueError(f"Invalid query parameter(s): {msg}")
    # parse query
    query = DataApiQuery(
        id=id,
        starttime=starttime,
        endtime=endtime,
        elements=elements,
        sampling_period=sampling_period,
        data_type=data_type,
        format=format,
        data_host=data_host,
    )
    return query
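
# Example request (illustrative): these parameters arrive as a query string such as
#   /data/?id=BOU&elements=X,Y,Z,F&sampling_period=60&type=adjusted&format=iaga2002
# (the exact enum string values are assumptions). Any parameter not in default_params,
# e.g. "&foo=1", is rejected above with "Invalid query parameter(s): foo".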


def format_timeseries(
    timeseries: Stream, format: OutputFormat, elements: List[str]
) -> Response:
    """Formats timeseries output

    Parameters
    ----------
    timeseries: obspy.core.Stream
        timeseries object with requested data
    format: OutputFormat
        output format
    elements: List[str]
        requested elements
    """
    if format == OutputFormat.JSON:
        data = IMFJSONWriter.format(timeseries, elements)
        media_type = "application/json"
    else:
        data = IAGA2002Writer.format(timeseries, elements)
        media_type = "text/plain"
    return Response(data, media_type=media_type)
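
# Usage sketch (illustrative, commented out). Assumes the writers can format any
# obspy Stream whose traces match the requested elements; an empty or mismatched
# stream may raise inside the writer.
#
#   response = format_timeseries(stream, OutputFormat.JSON, ["X", "Y", "Z", "F"])
#   response.media_type  # "application/json"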


def get_timeseries(data_factory: TimeseriesFactory, query: DataApiQuery) -> Stream:
    """Get timeseries data

    Parameters
    ----------
    data_factory: where to read data
    query: parameters for the data to read
    """
    # get data
    timeseries = data_factory.get_timeseries(
        starttime=query.starttime,
        endtime=query.endtime,
        observatory=query.id,
        channels=query.elements,
        type=query.data_type,
        interval=TimeseriesUtility.get_interval_from_delta(query.sampling_period),
    )
    return timeseries
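
# Illustrative call (commented out): read data using a factory chosen by
# get_data_factory. "query" is assumed to be a validated DataApiQuery;
# get_interval_from_delta maps sampling_period (seconds) to an interval name.
#
#   factory = get_data_factory(query)
#   stream = get_timeseries(factory, query)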


router = APIRouter()


@router.get(
    "/data/",
    name="Request data",
    description="Returns timeseries depending on query parameters\n\n"
    + "Limited to 345600 data points",
)
def get_data(
    query: DataApiQuery = Depends(get_data_query),
) -> Response:
    data_factory = get_data_factory(query=query)
    # read data
    timeseries = get_timeseries(data_factory, query)
    # output response
    return format_timeseries(
        timeseries=timeseries, format=query.format, elements=query.elements
    )
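
# Wiring sketch (illustrative, commented out): how this router might be mounted in a
# FastAPI application. The "/ws" prefix and app module layout are assumptions, not
# part of this file.
#
#   from fastapi import FastAPI
#
#   app = FastAPI()
#   app.include_router(router, prefix="/ws")
#   # GET /ws/data/?id=BOU&sampling_period=60 then resolves to get_data above.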