Skip to content
Snippets Groups Projects
Commit 970705c1 authored by Cain, Payton David's avatar Cain, Payton David
Browse files

_get_header, get_metadata_by_id, allow exception for no data, move query

parent f0da49c6
No related branches found
No related tags found
2 merge requests!146Release CMO metadata to production,!65Metadata client
from geomagio.apiclient.metadata import create
create(station="BOU", category="reading")
......@@ -19,11 +19,10 @@ from typing import List
from fastapi import APIRouter, Body, Depends, Request, Response
from obspy import UTCDateTime
from ...metadata import Metadata, MetadataCategory
from ...metadata import Metadata, MetadataCategory, MetadataQuery
from ... import pydantic_utcdatetime
from ..db import metadata_table
from .login import require_user, User
from .MetadataQuery import MetadataQuery
from ... import pydantic_utcdatetime
# routes for login/logout
router = APIRouter()
......
......@@ -3,8 +3,7 @@ from typing import List
from fastapi import APIRouter
from obspy import UTCDateTime
from ...metadata import Metadata, MetadataCategory
from ..secure.MetadataQuery import MetadataQuery
from ...metadata import Metadata, MetadataCategory, MetadataQuery
from ..db import metadata_table
router = APIRouter()
......
......@@ -5,8 +5,7 @@ from typing import List, Union
from obspy import UTCDateTime
from pydantic import parse_obj_as
from ..api.secure.MetadataQuery import MetadataQuery
from ..metadata import Metadata
from ..metadata import Metadata, MetadataQuery
class MetadataFactory(object):
......@@ -19,7 +18,10 @@ class MetadataFactory(object):
):
self.url = url
self.token = token
self.header = {"Authorization": self.token} if token else None
self.header = self._get_headers()
def _get_headers(self):
return {"Authorization": self.token} if self.token else None
def delete_metadata(self, metadata: Metadata) -> bool:
response = requests.delete(url=f"{self.url}/{metadata.id}", headers=self.header)
......@@ -28,19 +30,19 @@ class MetadataFactory(object):
return False
def get_metadata(self, query: MetadataQuery) -> List[Metadata]:
args = parse_params(query=query)
if "id" in args:
self.url = f"{self.url}/{args['id']}"
args = {}
response = requests.get(url=self.url, params=args, headers=self.header)
try:
metadata = parse_obj_as(Union[List[Metadata], Metadata], response.json())
except:
return []
if query.id:
response = self.get_metadata_by_id(id=query.id)
else:
args = parse_params(query=query)
response = requests.get(url=self.url, params=args, headers=self.header)
metadata = parse_obj_as(Union[List[Metadata], Metadata], response.json())
if isinstance(metadata, Metadata):
metadata = [metadata]
return metadata
def get_metadata_by_id(self, id: int) -> requests.Response:
return requests.get(f"{self.url}/{id}", headers=self.header)
def create_metadata(self, metadata: Metadata) -> Metadata:
response = requests.post(
url=self.url, data=metadata.json(), headers=self.header
......@@ -55,19 +57,15 @@ class MetadataFactory(object):
def parse_params(query: MetadataQuery) -> str:
query = query.dict()
query = query.dict(exclude_none=True)
args = {}
for key in query.keys():
element = query[key]
if element is not None:
# convert times to strings
if type(element) == UTCDateTime:
element = element.isoformat()
# get string value of metadata category
if key == "category":
element = element.value
elif key == "id":
return {"id": element}
args[key] = element
# convert times to strings
if isinstance(element, UTCDateTime):
element = element.isoformat()
# get string value of metadata category
if key == "category":
element = element.value
args[key] = element
return args
......@@ -6,8 +6,7 @@ from typing import Dict, Optional
from obspy import UTCDateTime
import typer
from ..api.secure.MetadataQuery import MetadataQuery
from ..metadata import Metadata, MetadataCategory
from ..metadata import Metadata, MetadataCategory, MetadataQuery
from .MetadataFactory import MetadataFactory
......@@ -58,7 +57,7 @@ def create(
endtime=UTCDateTime(endtime) if endtime else None,
id=id,
location=location,
metadata=input_metadata["metadata"],
metadata=input_metadata,
metadata_valid=metadata_valid,
network=network,
starttime=UTCDateTime(starttime) if starttime else None,
......@@ -78,7 +77,8 @@ def delete(
metadata_dict = load_metadata(input_file=input_file)
metadata = Metadata(**metadata_dict)
deleted = MetadataFactory(url=url).delete_metadata(metadata=metadata)
print(deleted)
if not deleted:
sys.exit(1)
@app.command()
......@@ -124,7 +124,7 @@ def get(
raise ValueError("More than one matching record")
print(metadata[0].json())
return
print([m.json() for m in metadata])
print("[" + ",".join([m.json() for m in metadata]) + "]")
@app.command()
......
......@@ -4,8 +4,8 @@ from obspy import UTCDateTime
from pydantic import BaseModel
from typing import Optional
from ...metadata import MetadataCategory
from ... import pydantic_utcdatetime
from .. import pydantic_utcdatetime
from .MetadataCategory import MetadataCategory
class MetadataQuery(BaseModel):
......
from .Metadata import Metadata
from .MetadataCategory import MetadataCategory
from .MetadataQuery import MetadataQuery
__all__ = ["Metadata", "MetadataCategory"]
__all__ = ["Metadata", "MetadataCategory", "MetadataQuery"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment