Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Module for authentication.
Requires sessions to store user information (recommend ..db.SessionMiddleware)
Configuration:
uses environment variables:
OPENID_CLIENT_ID - application id, assigned by OIDC provider
OPENID_CLIENT_SECRET - application secret, assigned by OIDC provider
OPENID_METADATA_URL - url for OIDC provider information
Usage:
current_user() - get user information if logged in
def login_optional(user: Optional[User] = Depends(current_user))
require_user() - require user login and group membership
NOTE: this is a function generator that must be called.
def any_logged_in_user(user: User = Depends(require_user()))
def admin_users_only(user: User = Depends(require_user(allowed_groups=["admin"])))
router - APIRouter to be registered with FastAPI application:
app.include_router(login.router)
creates routes:
/authorize - callback from OpenIDConnect provider after authentication
/login - redirect to OpenIDConnect provider to authenticate
/logout - logout current user
/user - access current user information as json
"""
import logging
import os
from typing import Callable, List, Optional
from authlib.integrations.starlette_client import OAuth
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from starlette.requests import Request
from starlette.responses import RedirectResponse
class User(BaseModel):
"""Information about a logged in user.
"""
email: str
sub: str # unique outh id
groups: List[str] = []
name: str = None
nickname: str = None
picture: str = None
async def current_user(request: Request) -> Optional[User]:
"""Get logged in user, or None if not logged in.
Usage example:
user: Optional[User] = Depends(current_user)
"""
if "user" in request.session:
return User(**request.session["user"])
return None
def require_user(allowed_groups: List[str] = None,) -> Callable[[Request, User], User]:
"""Create function to verifies user in allowed_groups
Usage example:
user: User = Depends(require_user(["admin"]))
Parameters
----------
allowed_groups: require user to be member of any group in list.
"""
async def verify_groups(
request: Request, user: Optional[User] = Depends(current_user)
) -> User:
if not user:
# not logged in, redirect
return RedirectResponse(request.url_for("login"))
if allowed_groups is not None and not any(
g in user.groups for g in allowed_groups
):
logging.info(
f"user ({user.email}, sub={user.sub})"
f" not member of any allowed group ({allowed_groups})"
)
raise HTTPException(401, "Not Authorized")
return user
return verify_groups
oauth = OAuth()
# creates provider "oauth.openid"
oauth.register(
name="openid",
client_id=os.getenv("OPENID_CLIENT_ID"),
client_secret=os.getenv("OPENID_CLIENT_SECRET"),
server_metadata_url=os.getenv("OPENID_METADATA_URL"),
client_kwargs={"scope": "openid email profile"},
)
# routes for login/logout
router = APIRouter()
@router.get("/authorize")
async def authorize(request: Request):
"""Authorize callback after authenticating using OpenID
"""
# finish login
token = await oauth.openid.authorize_access_token(request)
request.session["token"] = token
# add user to session
userinfo = await oauth.openid.userinfo(token=token)
request.session["user"] = dict(userinfo)
# redirect to original location
url = request.session.pop(
"after_authorize_redirect",
# or fall back to index
request.url_for("index"),
)
return RedirectResponse(url=url)
@router.get("/login")
async def login(request: Request):
"""Redirect to OpenID provider.
"""
redirect_uri = request.url_for("authorize")
# save original location
if "Referer" in request.headers:
request.session["after_authorize_redirect"] = request.headers["Referer"]
# redirect to openid login
return await oauth.openid.authorize_redirect(request, redirect_uri)
@router.get("/logout")
async def logout(request: Request):
"""Clear session and redirect to index page.
"""
request.session.pop("token", None)
request.session.pop("user", None)
return RedirectResponse(request.url_for("index"))
@router.get("/user")
async def user(request: Request, user: User = Depends(require_user())) -> User:
"""Get currently logged in user.
"""
return user