reflex icon indicating copy to clipboard operation
reflex copied to clipboard

Okta Authentication Integration

Open amd-srijaroy opened this issue 1 year ago • 1 comment

Can you please add Okta authentication, like Google OAuth?

https://developer.okta.com/code/

Python SDK : https://github.com/okta/okta-sdk-python. Example : https://github.com/okta/samples-python-flask

React SDK : https://github.com/okta/okta-react Example : https://github.com/okta-samples/okta-react-sample

amd-srijaroy avatar Jul 16 '24 11:07 amd-srijaroy

This can be a part of #3622 plugins

Manas1820 avatar Jul 16 '24 16:07 Manas1820

Solution:

1. File : client_secrets.json

Download the template from https://github.com/okta/samples-python-flask/blob/master/okta-hosted-login/client_secrets.json.dist, save it as client_secrets.json, and fill in your credentials.

The redirect URI in that file must end with "/authorization-code/callback". Refer to the "callback_page" function below for the endpoint route.


2. File : reflex_okta_auth/__init__.py

from .okta_auth import Login, CallbackState, require_login, is_authorized

# Public names re-exported by the package.
__all__ = [
    "Login",
    "CallbackState",
    "require_login",
    "is_authorized",
]


3. File : reflex_okta_auth/helpers.py

import asyncio
import json

from okta_jwt_verifier import AccessTokenVerifier, IDTokenVerifier
import nest_asyncio
# nest_asyncio patches asyncio so that run_until_complete() can be nested.
# NOTE(review): presumably required because the validators below are invoked
# from code that is already running inside an event loop — confirm.
nest_asyncio.apply()

# Module-wide event loop; the token validators below use it to run the
# verifiers' verify() calls to completion.
loop = asyncio.get_event_loop()

def is_access_token_valid(token, issuer, audience='api://default'):
    """Report whether *token* verifies as an access token for *issuer*.

    Args:
        token: The access token string to verify.
        issuer: Issuer URL passed to ``AccessTokenVerifier``.
        audience: Expected audience of the token. Defaults to
            ``'api://default'``, the value this helper used to hard-code, so
            existing two-argument calls behave exactly as before.

    Returns:
        True if verification finishes without raising, False otherwise.
    """
    jwt_verifier = AccessTokenVerifier(issuer=issuer, audience=audience)
    try:
        # verify() is awaitable; drive it to completion on the shared loop.
        loop.run_until_complete(jwt_verifier.verify(token))
    except Exception as e:
        # Deliberate catch-all: any verification failure means "not valid".
        print(str(e))
        return False
    return True


def is_id_token_valid(token, issuer, client_id, nonce, audience='api://default'):
    """Report whether *token* verifies as an ID token for this client.

    Args:
        token: The ID token string to verify.
        issuer: Issuer URL passed to ``IDTokenVerifier``.
        client_id: OAuth client id passed to ``IDTokenVerifier``.
        nonce: Nonce value forwarded to the verifier's ``verify`` call.
        audience: Audience passed to ``IDTokenVerifier``. Defaults to
            ``'api://default'``, the value this helper used to hard-code, so
            existing four-argument calls behave exactly as before.

    Returns:
        True if verification finishes without raising, False otherwise.
    """
    jwt_verifier = IDTokenVerifier(issuer=issuer, client_id=client_id, audience=audience)
    try:
        # verify() is awaitable; drive it to completion on the shared loop.
        loop.run_until_complete(jwt_verifier.verify(token, nonce=nonce))
    except Exception as e:
        # Deliberate catch-all: any verification failure means "not valid".
        print(str(e))
        return False
    return True


def load_config(fname='./client_secrets.json'):
    """Load the Okta client settings from a JSON file.

    Args:
        fname: Path of the JSON file; defaults to ``./client_secrets.json``,
            resolved against the current working directory.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(fname, encoding="utf-8") as f:
        return json.load(f)


# Loaded once at import time from the default path, so ./client_secrets.json
# must exist relative to the working directory when this module is imported.
config = load_config()

Comments: Adapted from https://github.com/okta/samples-python-flask/blob/master/okta-hosted-login/helpers.py


4. File : reflex_okta_auth/okta_auth.py

import reflex as rx
import requests
from reflex_okta_auth.helpers import is_access_token_valid, is_id_token_valid, config
import json


class Login(rx.State):
    """State that decides whether a visitor is signed in and, if not, sends
    them to the Okta authorization endpoint."""

    # Local-storage backed fields; CallbackState declares the same key names.
    okta_user_details: str = rx.LocalStorage(name="okta_user_details", sync=True)
    okta_access_token: str = rx.LocalStorage(name="okta_access_token", sync=True)
    okta_id_token: str = rx.LocalStorage(name="okta_id_token", sync=True)

    def redirect(self) -> rx.event.EventSpec | None:
        """Send the browser to "/" when already signed in, else to Okta.

        Returns:
            A redirect event: to "/" if the stored details contain
            "unique_id" and the stored access token is authorized, otherwise
            to the configured ``auth_uri`` with the authorization-code query.
        """
        signed_in = (
            "unique_id" in self.okta_user_details
            and is_authorized(self.okta_access_token)
        )
        if signed_in:
            return rx.redirect("/")

        # NOTE(review): state and nonce are the same constants for every
        # login; CallbackState.get_code checks the ID token against the same
        # nonce literal. Consider per-request random values — needs a
        # coordinated change in both classes.
        app_state = 'ApplicationState'
        nonce = 'SampleNonce'

        authorize_query = {
            "client_id": config["client_id"],
            "redirect_uri": config["redirect_uri"],
            "scope": "openid email profile",
            "state": app_state,
            "nonce": nonce,
            "response_type": "code",
            "response_mode": "query",
        }
        base_url = config["auth_uri"]
        encoded_query = requests.compat.urlencode(authorize_query)
        return rx.redirect(f"{base_url}?{encoded_query}")

    @rx.cached_var
    def is_authenticated(self) -> bool:
        """Whether the current browser session counts as signed in.

        Returns:
            True if the stored user details contain "unique_id" and the
            stored access token passes ``is_authorized``; False otherwise.
        """
        return (
            "unique_id" in self.okta_user_details
            and is_authorized(self.okta_access_token)
        )

def require_login(page: rx.app.ComponentCallable) -> rx.app.ComponentCallable:
    """Decorate *page* so its content is shown only to signed-in users.

    Args:
        page: Zero-argument callable that builds the page component.

    Returns:
        A replacement callable, carrying the same ``__name__`` as *page*,
        that renders *page* when the ``Login`` state is hydrated and
        authenticated, and a "Loading..." placeholder otherwise.
    """

    def protected_page():
        signed_in = Login.is_hydrated & Login.is_authenticated  # type: ignore
        content = page()
        # Mounting the placeholder text fires Login.redirect, which takes the
        # visitor to the login page.
        placeholder = rx.center(
            rx.text("Loading...", on_mount=Login.redirect),
        )
        return rx.fragment(rx.cond(signed_in, content, placeholder))

    protected_page.__name__ = page.__name__
    return protected_page



def is_authorized(token):
    """Check *token* against the configured issuer.

    Returns:
        The result of ``is_access_token_valid`` for the issuer in ``config``,
        or False if that check (or the config lookup) raises.
    """
    try:
        verdict = is_access_token_valid(token, config["issuer"])
    except Exception as err:
        print(err)
        return False
    return verdict
    
    
    


class CallbackState(rx.State):
    """Finish the Okta authorization-code flow on the callback page.

    ``get_code`` exchanges the ``code`` query parameter for tokens, validates
    them, fetches the user's profile and stores the result in the
    local-storage fields below.
    """

    # Local-storage backed fields; the key names match those on ``Login``.
    okta_user_details: str = rx.LocalStorage(name="okta_user_details",sync=True)
    okta_access_token: str = rx.LocalStorage(name="okta_access_token",sync=True)
    okta_id_token: str = rx.LocalStorage(name="okta_id_token",sync=True)
    # Authorization code read from the callback URL's query string.
    code: str = ""

    def get_code(self) -> rx.event.EventSpec | None:
        """Exchange the authorization code for tokens and sign the user in.

        Failures are reported to the browser with ``rx.window_alert`` instead
        of Flask-style ``(message, status)`` tuples, which are not a valid
        return value for a Reflex event handler. Nothing is persisted unless
        every check passes, so a half-validated login never leaves a token
        behind in local storage.

        Returns:
            A redirect to "/" on success, or an alert event describing the
            failure.
        """
        # NOTE(review): must equal the nonce literal sent by Login.redirect.
        NONCE = 'SampleNonce'
        # Seconds to wait for Okta before giving up; requests has no default
        # timeout and would otherwise block the handler indefinitely.
        timeout = 10

        self.code = self.router.page.params.get("code", "")
        if not self.code:
            return rx.window_alert("The code was not returned or is not accessible")

        query_params = requests.compat.urlencode({
            'grant_type': 'authorization_code',
            'code': self.code,
            'redirect_uri': config["redirect_uri"]
        })
        try:
            exchange = requests.post(
                config["token_uri"],
                headers={'Content-Type': 'application/x-www-form-urlencoded'},
                data=query_params,
                auth=(config["client_id"], config["client_secret"]),
                timeout=timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            # Network failure or a non-JSON body from the token endpoint.
            print(e)
            return rx.window_alert("Could not exchange the authorization code for tokens")

        # Actually compare the type (case-insensitively) rather than only
        # checking that the field is present, so the message matches the test.
        if str(exchange.get("token_type", "")).lower() != "bearer":
            return rx.window_alert("Unsupported token type. Should be 'Bearer'.")

        access_token = exchange.get("access_token")
        id_token = exchange.get("id_token")

        if not access_token or not is_access_token_valid(access_token, config["issuer"]):
            return rx.window_alert("Access token is invalid")

        if not id_token or not is_id_token_valid(
            id_token, config["issuer"], config["client_id"], NONCE
        ):
            return rx.window_alert("ID token is invalid")

        try:
            userinfo_response = requests.get(
                config["userinfo_uri"],
                headers={'Authorization': f'Bearer {access_token}'},
                timeout=timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            print(e)
            return rx.window_alert("Could not fetch the user profile")

        unique_id = userinfo_response.get("sub")
        if not unique_id:
            return rx.window_alert("User profile did not include a subject identifier")

        # All checks passed: persist the session in one place.
        # "email" and "given_name" may be absent from the profile, so fall
        # back to an empty string instead of raising KeyError.
        self.okta_user_details = json.dumps({
            "unique_id": unique_id,
            "user_email": userinfo_response.get("email", ""),
            "user_name": userinfo_response.get("given_name", ""),
        })
        self.okta_access_token = str(access_token)
        self.okta_id_token = str(id_token)
        return rx.redirect("/")
        

5. File : main_app.py

In your main app

import reflex_okta_auth 

# Page served at the Okta redirect URI; its on_load handler runs
# CallbackState.get_code while this placeholder text is displayed.
@rx.page(
    route="/authorization-code/callback",
    on_load=reflex_okta_auth.CallbackState.get_code,
)
def callback_page():
    status_text = rx.text("Authenticating...")
    return rx.vstack(
        rx.center(status_text),
    )

@reflex_okta_auth.require_login
def index() -> rx.Component:
    """The main app."""

     ... ... ..


# Add state and page to the app.
# Create the app and mount the login-protected index page at the root route.
app = rx.App()
app.add_page(index, route="/")

amd-srijaroy avatar Sep 16 '24 13:09 amd-srijaroy