Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from typing import List
from fastapi import APIRouter, Body, Depends, Request, Response
from obspy import UTCDateTime
from ...metadata import Metadata, MetadataCategory
from ..db import metadata_table
from .login import require_user, User
from .MetadataQuery import MetadataQuery
from .MetadataResponse import MetadataResponse
from ... import pydantic_utcdatetime
# routes for creating, reading, updating, and deleting metadata
router = APIRouter()
@router.post("/metadata", response_model=Metadata, status_code=201)
async def create_metadata(
    request: Request,
    metadata: Metadata,
    user: User = Depends(require_user()),
):
    """Create a metadata record and respond with 201 Created.

    Parameters
    ----------
    request
        incoming request; not used by the handler itself.
    metadata
        record parsed from the request body.
    user
        resolved by the ``require_user()`` dependency; not otherwise used
        in the handler body.

    Returns
    -------
    The value returned by ``metadata_table.create_metadata``, serialized
    through the route's ``response_model``.
    """
    # Return the created record itself rather than wrapping it in a raw
    # Response: Response content must already be str/bytes, so handing it the
    # model object fails while the response is rendered. The 201 status code
    # is declared on the route decorator instead.
    return await metadata_table.create_metadata(metadata)
@router.delete("/metadata/{id}")
async def delete_metadata(
    id: int,
    user: User = Depends(require_user()),
):
    """Delete the metadata record identified by the ``id`` path parameter.

    The ``user`` argument is resolved by the ``require_user()`` dependency
    and is not used in the handler body. Nothing is returned.
    """
    await metadata_table.delete_metadata(id)
@router.get("/metadata", response_model=List[Metadata])
async def get_metadata(
    category: MetadataCategory = None,
    starttime: UTCDateTime = None,
    endtime: UTCDateTime = None,
    network: str = None,
    station: str = None,
    channel: str = None,
    location: str = None,
    data_valid: bool = None,
    metadata_valid: bool = True,
):
    """Return the metadata records matching the given query parameters.

    Every parameter defaults to ``None`` except ``metadata_valid``, which
    defaults to ``True``. The parameters are collected into a
    ``MetadataQuery``; its ``datetime_dict`` (with ``id`` excluded) is then
    passed as keyword arguments to ``metadata_table.get_metadata``.
    """
    criteria = {
        "category": category,
        "starttime": starttime,
        "endtime": endtime,
        "network": network,
        "station": station,
        "channel": channel,
        "location": location,
        "data_valid": data_valid,
        "metadata_valid": metadata_valid,
    }
    query = MetadataQuery(**criteria)
    lookup_kwargs = query.datetime_dict(exclude={"id"})
    return await metadata_table.get_metadata(**lookup_kwargs)
@router.get("/metadata/{id}", response_model=Metadata)
async def get_metadata_by_id(id: int):
    """Look up a single metadata record by its id.

    Responds with an empty 404 unless the lookup yields exactly one record;
    otherwise that one record is returned.
    """
    matches = await metadata_table.get_metadata(id=id)
    if len(matches) == 1:
        return matches[0]
    return Response(status_code=404)
@router.put("/metadata/{id}")
async def update_metadata(
    id: int,
    metadata: Metadata = Body(...),
    user: User = Depends(require_user()),
):
    """Update a metadata record from the request body.

    The ``metadata`` body is passed unchanged to
    ``metadata_table.update_metadata``. Nothing is returned.

    NOTE(review): the ``id`` path parameter is not used in the handler body,
    so which record gets updated depends entirely on the body -- confirm
    whether the path id should be checked against it.
    """
    await metadata_table.update_metadata(metadata)