Skip to content
Snippets Groups Projects
Commit 8da74119 authored by Cain, Payton David's avatar Cain, Payton David
Browse files

Gitlab token for delete, post, and update

parent d10fa1b8
No related branches found
No related tags found
2 merge requests!146Release CMO metadata to production,!65Metadata client
......@@ -33,8 +33,10 @@ Usage:
/logout - logout current user
/user - access current user information as json
"""
import json
import logging
import os
import requests
from typing import Callable, List, Optional
from authlib.integrations.starlette_client import OAuth
......@@ -56,17 +58,59 @@ class User(BaseModel):
async def current_user(request: Request) -> Optional[User]:
"""Get logged in user, or None if not logged in.
"""Get user information from gitlab access token or session(if currently logged in).
Returns none if access token is not vald or user is not logged in.
Usage example:
user: Optional[User] = Depends(current_user)
"""
if "Authorization" in request.headers:
return get_api_user(token=request.headers["Authorization"])
if "user" in request.session:
return User(**request.session["user"])
return None
def get_api_user(token: str) -> Optional[User]:
url = os.getenv("GITLAB_API_URL")
header = {"PRIVATE-TOKEN": token}
# request user information from gitlab api with access token
user_response = requests.get(
f"{url}/user",
headers=header,
)
userinfo = json.loads(user_response.content)
try:
user = User(
email=userinfo["email"],
sub=userinfo["id"],
name=userinfo["name"],
nickname=userinfo["username"],
picture=userinfo["avatar_url"],
)
except KeyError:
logging.info("Invalid token")
return None
# use valid token to retrieve user's groups
user.groups = get_groups_api(
groups_response=requests.get(
f"{url}/groups",
headers=header,
)
)
return user
def get_groups_api(groups_response: requests.Response) -> List[str]:
"""returns a user's groups via gitlab api"""
groups = json.loads(groups_response.content)
group_urls = [g["web_url"] for g in groups]
group_names = [url[url.find("ghsc") :] for url in group_urls]
return group_names
def require_user(
allowed_groups: List[str] = None,
) -> Callable[[Request, User], User]:
......
......@@ -41,7 +41,7 @@ async def create_metadata(
@router.delete("/metadata/{id}")
async def delete_metadata(
id: int, user: User = Depends(require_user(os.getenv("ADMIN_GROUP", "admin")))
id: int, user: User = Depends(require_user([os.getenv("ADMIN_GROUP", "admin")]))
):
await metadata_table.delete_metadata(id)
......
import json
import os
from typing import Literal, Optional
from typing import Dict, Optional
from obspy import UTCDateTime
import typer
......@@ -17,6 +18,10 @@ def client(
url: str = "http://{}/ws/secure/metadata".format(
os.getenv("EDGE_HOST", "127.0.0.1:8000")
),
id: Optional[int] = typer.Option(
None,
help="Database id required for deleting and updating metadata. NOTE: Metadata requests by id ignore additional parameters",
),
category: Optional[MetadataCategory] = None,
starttime: Optional[str] = None,
endtime: Optional[str] = None,
......@@ -26,10 +31,18 @@ def client(
station: Optional[str] = None,
channel: Optional[str] = None,
location: Optional[str] = None,
data_valid: Optional[bool] = None,
data_valid: Optional[bool] = True,
metadata_valid: Optional[bool] = True,
input_file: Optional[str] = typer.Option(
None,
help="JSON formatted file containing non-shared metadata information",
),
token: Optional[str] = typer.Option(
os.getenv("GITLAB_API_TOKEN"), help="Gitlab account access token"
),
):
query = MetadataQuery(
id=id,
category=category,
starttime=UTCDateTime(starttime) if starttime else None,
endtime=UTCDateTime(endtime) if endtime else None,
......@@ -42,13 +55,21 @@ def client(
data_valid=data_valid,
metadata_valid=metadata_valid,
)
factory = MetadataFactory(url=url)
factory = MetadataFactory(url=url, token=token)
if action == "delete":
factory.delete_metadata(query=query)
response = factory.delete_metadata(id=query.id)
elif action == "get":
metadata = factory.get_metadata(query=query)
if action == "post":
factory.post_metadata(query=query)
if action == "update":
factory.update_metadata(query=query)
return metadata
response = factory.get_metadata(query=query)
elif action in ["post", "update"]:
try:
with open(input_file, "r") as file:
data = json.load(file)
except (FileNotFoundError, TypeError):
raise ValueError("Input file invalid or not provided")
if action == "post":
response = factory.post_metadata(query=query, data=data)
elif action == "update":
response = factory.update_metadata(id=query.id, query=query, data=data)
else:
raise ValueError("Invalid action")
return response
import os
import json
from json.decoder import JSONDecodeError
import os
import requests
import urllib
from typing import Dict, List, Optional
import urllib
from obspy import UTCDateTime
from pydantic import parse_obj_as
from ..api.secure import MetadataQuery
from ..residual import Reading
from ..api.secure.MetadataQuery import MetadataQuery
from .Metadata import Metadata
from .MetadataCategory import MetadataCategory
class MetadataFactory(object):
......@@ -19,50 +17,62 @@ class MetadataFactory(object):
url: str = "http://{}/ws/secure/metadata".format(
os.getenv("EDGE_HOST", "127.0.0.1:8000")
),
token: str = os.getenv("GITLAB_API_TOKEN"),
):
self.url = url
self.token = token
self.header = {"Authorization": self.token} if token else None
def delete_metadata(self, query: MetadataQuery):
raise NotImplementedError
def format_metadata(self, data: Dict):
# formats responses as Metadata objects
return parse_obj_as(List[Metadata], data)
def delete_metadata(self, id: int) -> Dict:
response = requests.delete(url=f"{self.url}/{id}", headers=self.header)
return response
def get_metadata(self, query: MetadataQuery) -> List[Metadata]:
def get_metadata(self, query: MetadataQuery) -> List[Dict]:
args = parse_params(query=query)
response = web_request(url=f"{self.url}?{args}")
metadata = self.format_metadata(data=response)
return metadata
raw_response = requests.get(url=f"{self.url}{args}", headers=self.header)
try:
response = json.loads(raw_response.content)
except JSONDecodeError:
raise ValueError("Data not found")
return response
def post_metadata(self, query: MetadataQuery):
raise NotImplementedError
def post_metadata(
self, query: MetadataQuery, data: Optional[Dict] = {}
) -> requests.Response:
metadata = parse_metadata(query=query, data=data)
response = requests.post(url=self.url, data=metadata, headers=self.header)
return response
def update_metadata(self, query: MetadataQuery):
raise NotImplementedError
def update_metadata(
self, id: int, query: MetadataQuery, data: Optional[Dict] = {}
) -> requests.Response:
metadata = parse_metadata(query=query, data=data)
response = requests.put(
url=f"{self.url}/{query.id}", data=metadata, headers=self.header
)
return response
def web_request(url: str) -> Dict:
client_id = os.getenv("OPENID_CLIENT_ID")
client_secret = os.getenv("OPENID_CLIENT_SECRET")
response = requests.get(
url, data={"grant_type": "client_credentials"}, auth=(client_id, client_secret)
)
metadata = json.loads(response.text)
return metadata
def parse_metadata(query: MetadataQuery, data: Optional[Dict] = {}) -> str:
metadata = Metadata(**query.dict())
metadata.metadata = data
return metadata.json()
def parse_params(query: MetadataQuery):
d = query.dict()
data = {}
for key in d.keys():
if d[key] is None:
continue
# convert times to strings
if type(d[key]) == UTCDateTime:
d[key] = d[key].isoformat()
if key == "category":
d[key] = d[key].value
data[key] = d[key]
def parse_params(query: MetadataQuery) -> str:
query = query.dict()
args = {}
for key in query.keys():
element = query[key]
if element is not None:
# convert times to strings
if type(element) == UTCDateTime:
element = element.isoformat()
# get string value of metadata category
if key == "category":
element = element.value
elif key == "id":
return f"/{element}"
args[key] = element
return urllib.parse.urlencode(data)
return f"?{urllib.parse.urlencode(args)}"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment