From 8da741190ebace9d6d66a3fb1c8898d2c28da21f Mon Sep 17 00:00:00 2001 From: pcain-usgs <pcain@usgs.gov> Date: Wed, 17 Feb 2021 15:11:36 -0700 Subject: [PATCH] Gitlab token for delete, post, and update --- geomagio/api/secure/login.py | 46 +++++++++++++- geomagio/api/secure/metadata.py | 2 +- geomagio/geomagioapi/metadata.py | 41 ++++++++++--- geomagio/metadata/MetadataFactory.py | 92 +++++++++++++++------------- 4 files changed, 128 insertions(+), 53 deletions(-) diff --git a/geomagio/api/secure/login.py b/geomagio/api/secure/login.py index c3407d931..92246b504 100644 --- a/geomagio/api/secure/login.py +++ b/geomagio/api/secure/login.py @@ -33,8 +33,10 @@ Usage: /logout - logout current user /user - access current user information as json """ +import json import logging import os +import requests from typing import Callable, List, Optional from authlib.integrations.starlette_client import OAuth @@ -56,17 +58,59 @@ class User(BaseModel): async def current_user(request: Request) -> Optional[User]: - """Get logged in user, or None if not logged in. + """Get user information from gitlab access token or session(if currently logged in). + Returns none if access token is not vald or user is not logged in. Usage example: user: Optional[User] = Depends(current_user) """ + if "Authorization" in request.headers: + return get_api_user(token=request.headers["Authorization"]) if "user" in request.session: return User(**request.session["user"]) return None +def get_api_user(token: str) -> Optional[User]: + url = os.getenv("GITLAB_API_URL") + header = {"PRIVATE-TOKEN": token} + # request user information from gitlab api with access token + user_response = requests.get( + f"{url}/user", + headers=header, + ) + userinfo = json.loads(user_response.content) + try: + user = User( + email=userinfo["email"], + sub=userinfo["id"], + name=userinfo["name"], + nickname=userinfo["username"], + picture=userinfo["avatar_url"], + ) + except KeyError: + logging.info("Invalid token") + return None + # use valid token to retrieve user's groups + user.groups = get_groups_api( + groups_response=requests.get( + f"{url}/groups", + headers=header, + ) + ) + + return user + + +def get_groups_api(groups_response: requests.Response) -> List[str]: + """returns a user's groups via gitlab api""" + groups = json.loads(groups_response.content) + group_urls = [g["web_url"] for g in groups] + group_names = [url[url.find("ghsc") :] for url in group_urls] + return group_names + + def require_user( allowed_groups: List[str] = None, ) -> Callable[[Request, User], User]: diff --git a/geomagio/api/secure/metadata.py b/geomagio/api/secure/metadata.py index 710128d1a..227ce65d0 100644 --- a/geomagio/api/secure/metadata.py +++ b/geomagio/api/secure/metadata.py @@ -41,7 +41,7 @@ async def create_metadata( @router.delete("/metadata/{id}") async def delete_metadata( - id: int, user: User = Depends(require_user(os.getenv("ADMIN_GROUP", "admin"))) + id: int, user: User = Depends(require_user([os.getenv("ADMIN_GROUP", "admin")])) ): await metadata_table.delete_metadata(id) diff --git a/geomagio/geomagioapi/metadata.py b/geomagio/geomagioapi/metadata.py index 9d56037c7..a33317e6f 100644 --- a/geomagio/geomagioapi/metadata.py +++ b/geomagio/geomagioapi/metadata.py @@ -1,5 +1,6 @@ +import json import os -from typing import Literal, Optional +from typing import Dict, Optional from obspy import UTCDateTime import typer @@ -17,6 +18,10 @@ def client( url: str = "http://{}/ws/secure/metadata".format( os.getenv("EDGE_HOST", "127.0.0.1:8000") ), + id: Optional[int] = typer.Option( + None, + help="Database id required for deleting and updating metadata. NOTE: Metadata requests by id ignore additional parameters", + ), category: Optional[MetadataCategory] = None, starttime: Optional[str] = None, endtime: Optional[str] = None, @@ -26,10 +31,18 @@ def client( station: Optional[str] = None, channel: Optional[str] = None, location: Optional[str] = None, - data_valid: Optional[bool] = None, + data_valid: Optional[bool] = True, metadata_valid: Optional[bool] = True, + input_file: Optional[str] = typer.Option( + None, + help="JSON formatted file containing non-shared metadata information", + ), + token: Optional[str] = typer.Option( + os.getenv("GITLAB_API_TOKEN"), help="Gitlab account access token" + ), ): query = MetadataQuery( + id=id, category=category, starttime=UTCDateTime(starttime) if starttime else None, endtime=UTCDateTime(endtime) if endtime else None, @@ -42,13 +55,21 @@ def client( data_valid=data_valid, metadata_valid=metadata_valid, ) - factory = MetadataFactory(url=url) + factory = MetadataFactory(url=url, token=token) if action == "delete": - factory.delete_metadata(query=query) + response = factory.delete_metadata(id=query.id) elif action == "get": - metadata = factory.get_metadata(query=query) - if action == "post": - factory.post_metadata(query=query) - if action == "update": - factory.update_metadata(query=query) - return metadata + response = factory.get_metadata(query=query) + elif action in ["post", "update"]: + try: + with open(input_file, "r") as file: + data = json.load(file) + except (FileNotFoundError, TypeError): + raise ValueError("Input file invalid or not provided") + if action == "post": + response = factory.post_metadata(query=query, data=data) + elif action == "update": + response = factory.update_metadata(id=query.id, query=query, data=data) + else: + raise ValueError("Invalid action") + return response diff --git a/geomagio/metadata/MetadataFactory.py b/geomagio/metadata/MetadataFactory.py index dc3fefe09..21c9e003f 100644 --- a/geomagio/metadata/MetadataFactory.py +++ b/geomagio/metadata/MetadataFactory.py @@ -1,16 +1,14 @@ -import os import json +from json.decoder import JSONDecodeError +import os import requests -import urllib from typing import Dict, List, Optional +import urllib from obspy import UTCDateTime -from pydantic import parse_obj_as -from ..api.secure import MetadataQuery -from ..residual import Reading +from ..api.secure.MetadataQuery import MetadataQuery from .Metadata import Metadata -from .MetadataCategory import MetadataCategory class MetadataFactory(object): @@ -19,50 +17,62 @@ class MetadataFactory(object): url: str = "http://{}/ws/secure/metadata".format( os.getenv("EDGE_HOST", "127.0.0.1:8000") ), + token: str = os.getenv("GITLAB_API_TOKEN"), ): self.url = url + self.token = token + self.header = {"Authorization": self.token} if token else None - def delete_metadata(self, query: MetadataQuery): - raise NotImplementedError - - def format_metadata(self, data: Dict): - # formats responses as Metadata objects - return parse_obj_as(List[Metadata], data) + def delete_metadata(self, id: int) -> Dict: + response = requests.delete(url=f"{self.url}/{id}", headers=self.header) + return response - def get_metadata(self, query: MetadataQuery) -> List[Metadata]: + def get_metadata(self, query: MetadataQuery) -> List[Dict]: args = parse_params(query=query) - response = web_request(url=f"{self.url}?{args}") - metadata = self.format_metadata(data=response) - return metadata + raw_response = requests.get(url=f"{self.url}{args}", headers=self.header) + try: + response = json.loads(raw_response.content) + except JSONDecodeError: + raise ValueError("Data not found") + return response - def post_metadata(self, query: MetadataQuery): - raise NotImplementedError + def post_metadata( + self, query: MetadataQuery, data: Optional[Dict] = {} + ) -> requests.Response: + metadata = parse_metadata(query=query, data=data) + response = requests.post(url=self.url, data=metadata, headers=self.header) + return response - def update_metadata(self, query: MetadataQuery): - raise NotImplementedError + def update_metadata( + self, id: int, query: MetadataQuery, data: Optional[Dict] = {} + ) -> requests.Response: + metadata = parse_metadata(query=query, data=data) + response = requests.put( + url=f"{self.url}/{query.id}", data=metadata, headers=self.header + ) + return response -def web_request(url: str) -> Dict: - client_id = os.getenv("OPENID_CLIENT_ID") - client_secret = os.getenv("OPENID_CLIENT_SECRET") - response = requests.get( - url, data={"grant_type": "client_credentials"}, auth=(client_id, client_secret) - ) - metadata = json.loads(response.text) - return metadata +def parse_metadata(query: MetadataQuery, data: Optional[Dict] = {}) -> str: + metadata = Metadata(**query.dict()) + metadata.metadata = data + return metadata.json() -def parse_params(query: MetadataQuery): - d = query.dict() - data = {} - for key in d.keys(): - if d[key] is None: - continue - # convert times to strings - if type(d[key]) == UTCDateTime: - d[key] = d[key].isoformat() - if key == "category": - d[key] = d[key].value - data[key] = d[key] +def parse_params(query: MetadataQuery) -> str: + query = query.dict() + args = {} + for key in query.keys(): + element = query[key] + if element is not None: + # convert times to strings + if type(element) == UTCDateTime: + element = element.isoformat() + # get string value of metadata category + if key == "category": + element = element.value + elif key == "id": + return f"/{element}" + args[key] = element - return urllib.parse.urlencode(data) + return f"?{urllib.parse.urlencode(args)}" -- GitLab