Newer
Older
Hobbs, Alexandra (Contractor)
committed
from typing import List, Union, Optional
from fastapi import APIRouter, Depends, Query, Request
from obspy import UTCDateTime, Stream
from starlette.responses import Response
from ... import TimeseriesFactory, TimeseriesUtility
from ...DerivedTimeseriesFactory import DerivedTimeseriesFactory
Wilbur, Spencer Franklin
committed
from ...edge import EdgeFactory, FDSNFactory, MiniSeedFactory
from ...iaga2002 import IAGA2002Writer
from ...imfjson import IMFJSONWriter
from .Observatory import ASL_OBSERVATORY_INDEX
from .DataApiQuery import (
DEFAULT_ELEMENTS,
DataApiQuery,
DataType,
OutputFormat,
SamplingPeriod,
Hobbs, Alexandra (Contractor)
committed
from ...pydantic_utcdatetime import CustomUTCDateTimeType
def get_data_factory(
query: DataApiQuery,
) -> TimeseriesFactory:
"""Reads environment variable to determine the factory to be used
Returns
-------
data_factory
Edge or miniseed factory object
"""
host = query.data_host or DataHost.DEFAULT
Wilbur, Spencer Franklin
committed
observatory = ASL_OBSERVATORY_INDEX[query.id]
factory = FDSNFactory(network=observatory.network, locationCode="40")
# MiniSeedFactory required for floating point data;
# MiniSeedFactory advised for 10 Hz sampling in general
host=host,
port=os.getenv("DATA_MINISEED_PORT", None),
convert_channels=["U", "V", "W"], # no channel mapping (e.g., "H"->"U")
elif sampling_period in list(SamplingPeriod):
# EdgeFactory required for real time data with long sample periods
factory = EdgeFactory(host=host, port=os.getenv("DATA_EARTHWORM_PORT", None))
return DerivedTimeseriesFactory(factory)
id: str = Query(..., title="Observatory code"),
Hobbs, Alexandra (Contractor)
committed
starttime: Optional[CustomUTCDateTimeType] = Query(
None,
title="Start Time",
description="Time of first requested data. Default is start of current UTC day.",
),
Hobbs, Alexandra (Contractor)
committed
endtime: Optional[CustomUTCDateTimeType] = Query(
None,
title="End Time",
description="Time of last requested data. Default is starttime plus 24 hours.",
),
elements: List[str] = Query(
DEFAULT_ELEMENTS,
title="Geomagnetic Elements.",
description="Either comma separated list of elements, or repeated query parameter"
" NOTE: when using 'iaga2002' output format, a maximum of 4 elements is allowed",
),
sampling_period: Union[SamplingPeriod, float] = Query(
Wilbur, Spencer Franklin
committed
None,
title="data rate",
description="Interval in seconds between values.",
),
data_type: Union[DataType, str] = Query(
DataType.ADJUSTED,
alias="type",
description="Type of data."
" NOTE: the USGS web service also supports specific EDGE location codes."
" For example: R0 is 'internet variation'",
),
format: Union[OutputFormat, str] = Query(
OutputFormat.IAGA2002,
title="Output Format",
description="Format of returned time series data.",
),
data_host: Union[DataHost, str] = Query(
DataHost.DEFAULT,
title="Data Host",
description="Edge host to pull data from.",
) -> DataApiQuery:
"""Define query parameters used for webservice requests.
Uses DataApiQuery for parsing and validation.
Parameters
-------
id
observatory iaga code
starttime
query start
default is start of current UTC day.
endtime
query end
default is end of current UTC day.
elements
geomagnetic elements, or EDGE channel codes
sampling_period
data rate
data_type
data processing level
format
output format
"""
Wilbur, Spencer Franklin
committed
Wilbur, Spencer Franklin
committed
default_params = [
"id",
"starttime",
"endtime",
"elements",
"sampling_period",
"type",
"format",
"data_host",
]
invalid_params = []
for param in request.query_params.keys():
if param not in default_params:
invalid_params.append(param)
if len(invalid_params) > 0:
msg = ", ".join(invalid_params)
raise ValueError(f"Invalid query parameter(s): {msg}")
# parse query
query = DataApiQuery(
id=id,
starttime=starttime,
endtime=endtime,
elements=elements,
sampling_period=sampling_period,
data_type=data_type,
format=format,
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def format_timeseries(
timeseries: Stream, format: OutputFormat, elements: List[str]
) -> Response:
"""Formats timeseries output
Parameters
----------
timeseries: data to format
format: output format
obspy.core.Stream
timeseries object with requested data
"""
if format == OutputFormat.JSON:
data = IMFJSONWriter.format(timeseries, elements)
media_type = "application/json"
else:
data = IAGA2002Writer.format(timeseries, elements)
media_type = "text/plain"
return Response(data, media_type=media_type)
def get_timeseries(data_factory: TimeseriesFactory, query: DataApiQuery) -> Stream:
"""Get timeseries data
Parameters
----------
data_factory: where to read data
query: parameters for the data to read
"""
Wilbur, Spencer Franklin
committed
# get data
timeseries = data_factory.get_timeseries(
starttime=query.starttime,
endtime=query.endtime,
observatory=query.id,
channels=query.elements,
type=query.data_type,
interval=TimeseriesUtility.get_interval_from_delta(query.sampling_period),
)
@router.get(
"/data/",
name="Request data",
description="Returns timeseries depending on query parameters\n\n"
+ "Limited to 345600 data points",
)
query: DataApiQuery = Depends(get_data_query),
data_factory = get_data_factory(query=query)
# read data
timeseries = get_timeseries(data_factory, query)
# output response
timeseries=timeseries, format=query.format, elements=query.elements