Skip to content
Snippets Groups Projects
Commit 4e39c8a7 authored by Bucknell, Mary S.'s avatar Bucknell, Mary S.
Browse files

Start of change to use asynchronous services. need to update to Flask 2.0 and use python 3.8 first.

parent 8b4a0ab2
No related branches found
No related tags found
1 merge request!31Wdfn 618 - Change data loading to be asynchronous
{
"name": "waterdataui-assets",
"version": "0.49.0dev",
"version": "0.50.0dev",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
......
......@@ -3,117 +3,105 @@ Class and functions for calling NWIS services and working with
the returned data.
"""
from requests import exceptions as request_exceptions, Session
from ..utils import parse_rdb
from .. import app
class SiteService:
async def get(session, params):
# pylint: disable=no-member
"""
Provides access to the NWIS site service
Returns a tuple containing the request status code and a list of dictionaries that represent the contents of
RDB file
:param session - instance aiohttp.ClientSession
:param dict params:
:yields
- status_code - status code returned from the service request
- reason - string
- site_data - list of dictionaries
"""
def __init__(self, endpoint):
"""
Constructor method.
:param str endpoint: the scheme, host and path to the NWIS site service
"""
self.endpoint = endpoint
self.session = Session()
def get(self, params):
# pylint: disable=no-member
"""
Returns a tuple containing the request status code and a list of dictionaries that represent the contents of
RDB file
:param dict params:
:returns
- status_code - status code returned from the service request
- reason - string
- site_data - list of dictionaries
"""
app.logger.debug(f'Requesting data from {self.endpoint}')
default_params = {
'format': 'rdb'
}
default_params.update(params)
try:
response = self.session.get(self.endpoint, params=default_params)
except (request_exceptions.Timeout, request_exceptions.ConnectionError) as err:
app.logger.error(repr(err))
return 500, repr(err), None
endpoint = app.config["SITE_DATA_ENDPOINT"]
app.logger.debug(f'Requesting data from {endpoint}')
default_params = {
'format': 'rdb'
}
default_params.update(params)
async with session.get(endpoint, params=default_params) as response:
if response.status_code == 200:
return 200, response.reason, list(parse_rdb(response.iter_lines(decode_unicode=True)))
async with response.iter_lines(decode_unicode=True) as lines:
return 200, response.reason, list(parse_rdb(lines))
return response.status_code, response.reason, []
def get_site_data(self, site_no, agency_cd=''):
"""
Get the metadata for site_no, agency_cd (which may be blank) using the additional query parameters, param
Note that more than one dictionary can be returned if agency_cd is empty.
:param str site_no: site identifier
:param str agency_cd: identifier for the agency that owns the site
:returns:
- status - status code from response
- reason - string
- site_metadata - list of dict representing the data returned in the rdb file
"""
params = {
'sites': site_no,
'siteOutput': 'expanded'
}
if agency_cd:
params['agencyCd'] = agency_cd
return self.get(params)
def get_period_of_record(self, site_no, agency_cd=''):
"""
Get the parameters measured at the site(s).
async def get_site_data(session, site_no, agency_cd=''):
"""
Get the metadata for site_no, agency_cd (which may be blank) using the additional query parameters, param
Note that more than one dictionary can be returned if agency_cd is empty.
:param session: instance of aiohttp.ClientSession
:param str site_no: site identifier
:param str agency_cd: identifier for the agency that owns the site
:yields:
- status - status code from response
- reason - string
- site_metadata - list of dict representing the data returned in the rdb file
"""
params = {
'sites': site_no,
'siteOutput': 'expanded'
}
if agency_cd:
params['agencyCd'] = agency_cd
return await get(session, params)
:param str site_no: site identifier
:param str agency_cd: identifier for the agency that owns the site:
:returns:
- status - status code from response
- reason - string
- periodOfRecord - list of dict representing the period of record for the data available at the site
"""
params = {
'sites': site_no,
'seriesCatalogOutput': True,
'siteStatus': 'all'
}
if agency_cd:
params['agencyCd'] = agency_cd
return self.get(params)
async def get_period_of_record(session, site_no, agency_cd=''):
"""
Get the parameters measured at the site(s).
:param session: instance of aiohttp.ClientSession
:param str site_no: site identifier
:param str agency_cd: identifier for the agency that owns the site:
:returns:
- status - status code from response
- reason - string
- periodOfRecord - list of dict representing the period of record for the data available at the site
"""
params = {
'sites': site_no,
'seriesCatalogOutput': True,
'siteStatus': 'all'
}
if agency_cd:
params['agencyCd'] = agency_cd
return await get(session, params)
def get_huc_sites(self, huc_cd):
"""
Get all sites within a hydrologic unit as identified by its
hydrologic unit code (HUC).
:param str huc_cd: hydrologic unit code
:returns: all sites in the specified HUC
- status - status code from response
- reason - string
- sites - list of dict representing the sites in huc_cd
"""
return self.get({
'huc': huc_cd
})
async def get_huc_sites(session, huc_cd):
"""
Get all sites within a hydrologic unit as identified by its
hydrologic unit code (HUC).
:param session: instance of aiohttp.ClientSession
:param str huc_cd: hydrologic unit code
:returns: all sites in the specified HUC
- status - status code from response
- reason - string
- sites - list of dict representing the sites in huc_cd
"""
return await get(session, {
'huc': huc_cd
})
def get_county_sites(self, state_county_cd):
"""
Get all sites within a county.
:param str state_county_cd: FIPS ID for a statecounty
:returns: all sites in the specified county
- status - status code from response
- reason - string
- sites - list of dict representing the site in state_county_cd
"""
return self.get({
'countyCd': state_county_cd
})
def get_county_sites(session, state_county_cd):
"""
Get all sites within a county.
:param session: instance of aiohttp.ClientSession
:param str state_county_cd: FIPS ID for a statecounty
:returns: all sites in the specified county
- status - status code from response
- reason - string
- sites - list of dict representing the site in state_county_cd
"""
return get(session, {
'countyCd': state_county_cd
})
"""
Helpers to retrieve SIFTA cooperator data.
"""
from requests import exceptions as request_exceptions, Session
from .. import app
class SiftaService:
# pylint: disable=too-few-public-methods, no-member
async def get_cooperators(session, site_no):
"""
Provide access to a service that returns cooperator data
"""
def __init__(self, endpoint):
self.endpoint = endpoint
self.session = Session()
Gets the cooperator data from the SIFTA service
def get_cooperators(self, site_no):
"""
Gets the cooperator data from the SIFTA service
:param session - instance aiohttp.ClientSession
:param str site_no: USGS site number
:yields Array of dict
"""
url = f'{app.config["COOPERATOR_SERVICE_ENDPOINT"]}{site_no}'
app.logger.debug(f'Requesting data from {url}') # pylint: disable=no-member
:param site_no: USGS site number
:return Array of dict
"""
url = f'{self.endpoint}{site_no}'
try:
response = self.session.get(url)
except (request_exceptions.Timeout, request_exceptions.ConnectionError) as err:
app.logger.error(repr(err))
return []
async with session.get(url) as response:
# TODO: Add exception handling
app.logger.debug(f'Retrieved data from {url}') # pylint: disable=no-member
if response.status_code != 200:
return []
......
"""
Main application views.
"""
import asyncio
import datetime
import json
import smtplib
......@@ -16,17 +17,15 @@ from .utils import defined_when, set_cookie_for_banner_message, create_message
from .services.camera import get_monitoring_location_camera_details
from .services.nwissite import SiteService
from .services.ogc import MonitoringLocationNetworkService
from .services.sifta import SiftaService
from .services.sifta import get_cooperators
from .services.timezone import TimeZoneService
# Station Fields Mapping to Descriptions
from .constants import STATION_FIELDS_D
site_service = SiteService(app.config['SITE_DATA_ENDPOINT'])
monitoring_location_network_service = \
MonitoringLocationNetworkService(app.config['MONITORING_LOCATIONS_OBSERVATIONS_ENDPOINT'])
time_zone_service = TimeZoneService(app.config['WEATHER_SERVICE_ENDPOINT'])
sifta_service = SiftaService(app.config['COOPERATOR_SERVICE_ENDPOINT'])
@app.context_processor
......@@ -98,18 +97,21 @@ def iv_data_availability():
"""Render the IV data availability statement page."""
return render_template('iv_data_availability_statement.html')
@app.route('/monitoring-location/<site_no>/', methods=['GET'])
def monitoring_location(site_no):
async def monitoring_location(site_no):
# pylint: disable=too-many-locals
# TODO: refactor to use more utility functions so that this doesn't have too many locals # pylint: disable=fixme
"""
Monitoring Location view
:param site_no: USGS site number
"""
agency_cd = request.args.get('agency_cd', '')
async with aiohttp.ClientSession()
loop = asyncio.get_event_loop()
coroutines = [get_cooperators(sifta_endpoint, site_no)]
results = loop.run_until_complete(asyncio.gather(*coroutines))
site_status, site_status_reason, site_data = site_service.get_site_data(site_no, agency_cd)
json_ld = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment