Skip to content
Snippets Groups Projects
login.py 4.77 KiB
Newer Older
  • Learn to ignore specific revisions
  • """Module for authentication.
    
    Requires sessions to store user information (recommend ..db.SessionMiddleware)
    
    
    Configuration:
        uses environment variables:
    
        OPENID_CLIENT_ID      - application id, assigned by OIDC provider
        OPENID_CLIENT_SECRET  - application secret, assigned by OIDC provider
        OPENID_METADATA_URL   - url for OIDC provider information
    
    
    Usage:
    
        current_user() - get user information if logged in
    
            def login_optional(user: Optional[User] = Depends(current_user))
    
        require_user() - require user login and group membership
                         NOTE: this is a function generator that must be called.
    
            def any_logged_in_user(user: User = Depends(require_user()))
    
            def admin_users_only(user: User = Depends(require_user(allowed_groups=["admin"])))
    
        router - APIRouter to be registered with FastAPI application:
            app.include_router(login.router)
    
            creates routes:
                /authorize  - callback from OpenIDConnect provider after authentication
                /login      - redirect to OpenIDConnect provider to authenticate
                /logout     - logout current user
                /user       - access current user information as json
    """
    import logging
    import os
    from typing import Callable, List, Optional
    
    from authlib.integrations.starlette_client import OAuth
    from fastapi import APIRouter, Depends, HTTPException
    from pydantic import BaseModel
    from starlette.requests import Request
    from starlette.responses import RedirectResponse
    
    
    class User(BaseModel):
        """Information about a logged in user.
        """
    
        email: str
        sub: str  # unique outh id
        groups: List[str] = []
        name: str = None
        nickname: str = None
        picture: str = None
    
    
    async def current_user(request: Request) -> Optional[User]:
        """Get logged in user, or None if not logged in.
    
        Usage example:
            user: Optional[User] = Depends(current_user)
    
        """
        if "user" in request.session:
            return User(**request.session["user"])
        return None
    
    
    def require_user(allowed_groups: List[str] = None,) -> Callable[[Request, User], User]:
        """Create function to verifies user in allowed_groups
    
        Usage example:
            user: User = Depends(require_user(["admin"]))
    
        Parameters
        ----------
        allowed_groups: require user to be member of any group in list.
        """
    
        async def verify_groups(
            request: Request, user: Optional[User] = Depends(current_user)
        ) -> User:
            if not user:
    
                # not logged in
                raise HTTPException(status_code=401, detail=request.url_for("login"))
    
            if allowed_groups is not None and not any(
                g in user.groups for g in allowed_groups
            ):
                logging.info(
                    f"user ({user.email}, sub={user.sub})"
                    f" not member of any allowed group ({allowed_groups})"
                )
    
                raise HTTPException(403, detail="Forbidden")
    
            return user
    
        return verify_groups
    
    
    oauth = OAuth()
    # creates provider "oauth.openid"
    oauth.register(
        name="openid",
        client_id=os.getenv("OPENID_CLIENT_ID"),
        client_secret=os.getenv("OPENID_CLIENT_SECRET"),
        server_metadata_url=os.getenv("OPENID_METADATA_URL"),
        client_kwargs={"scope": "openid email profile"},
    )
    # routes for login/logout
    router = APIRouter()
    
    
    @router.get("/authorize")
    async def authorize(request: Request):
        """Authorize callback after authenticating using OpenID
        """
    
        # finish login
        token = await oauth.openid.authorize_access_token(request)
    
        request.session["token"] = token
        # add user to session
        userinfo = await oauth.openid.userinfo(token=token)
        request.session["user"] = dict(userinfo)
    
        # redirect
        return RedirectResponse(
            url=request.session.pop(
                "after_authorize_redirect",
                # fall back to index
                request.url_for("index"),
            )
    
        )
    
    
    @router.get("/login")
    async def login(request: Request):
        """Redirect to OpenID provider.
        """
        redirect_uri = request.url_for("authorize")
        # save original location
        if "Referer" in request.headers:
            request.session["after_authorize_redirect"] = request.headers["Referer"]
        # redirect to openid login
        return await oauth.openid.authorize_redirect(request, redirect_uri)
    
    
    @router.get("/logout")
    async def logout(request: Request):
        """Clear session and redirect to index page.
        """
        request.session.pop("token", None)
        request.session.pop("user", None)
    
        return RedirectResponse(
            # referrer when set
            "Referer" in request.headers
            and request.headers["Referer"]
            # otherwise index
            or request.url_for("index")
        )
    
    
    
    @router.get("/user")
    async def user(request: Request, user: User = Depends(require_user())) -> User:
        """Get currently logged in user.
        """
        return user