Moderate severity · NVD Advisory · Published Sep 8, 2021 · Updated Aug 3, 2024

URL Redirection to Untrusted Site ('Open Redirect') in Flask-AppBuilder

CVE-2021-32805

Description

Flask-AppBuilder is an application development framework built on top of Flask. In affected versions, when Flask-AppBuilder OAuth is in use, an attacker can share a carefully crafted URL on a trusted domain for an application built with Flask-AppBuilder; this URL can then redirect a user to a malicious site. This is an open redirect vulnerability. To resolve the issue, upgrade to Flask-AppBuilder 3.3.2 or above. If upgrading is infeasible, users may, as a workaround, filter HTTP traffic containing ?next={next-site} where the next-site domain differs from that of the application being protected.
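
The workaround described above can be sketched as a small predicate that a proxy or middleware layer might call on each incoming URL. This is a minimal illustration using only the standard library, not part of Flask-AppBuilder: the function name `has_offsite_next` and the hosts `app.example.com` and `evil.example.net` are hypothetical, and a production filter would likely need to handle further cases (ports, backslash tricks, multiple encodings).

```python
from urllib.parse import parse_qs, urlparse


def has_offsite_next(request_url: str, app_host: str) -> bool:
    """Return True if any `next` query value names a host other than app_host.

    Hypothetical helper for illustration; not a Flask-AppBuilder API.
    """
    query = urlparse(request_url).query
    # parse_qs percent-decodes values and collects repeated keys into a list.
    for candidate in parse_qs(query).get("next", []):
        target_host = urlparse(candidate).netloc.lower()
        # A relative target such as "/users/list/" has an empty netloc and
        # stays on the application, so only a non-empty, different host is flagged.
        if target_host and target_host != app_host.lower():
            return True
    return False


# Example: the first URL would be filtered, the second would pass.
blocked = has_offsite_next(
    "https://app.example.com/login/?next=https://evil.example.net/", "app.example.com"
)
allowed = has_offsite_next(
    "https://app.example.com/login/?next=https://app.example.com/users/list/",
    "app.example.com",
)
```

Comparing hosts rather than matching substrings avoids flagging same-site targets that merely contain another domain name in their path.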

Affected packages

Versions sourced from the GitHub Security Advisory.

Package                  Affected versions  Patched versions
Flask-AppBuilder (PyPI)  < 3.3.2            3.3.2

Patches

6af285215895

fix: improve next URL on OAuth (#1668)

https://github.com/dpgaspar/Flask-AppBuilder · Daniel Vaz Gaspar · Jul 13, 2021 · via ghsa
5 files changed · +166 -58
  • flask_appbuilder/security/views.py +30 -56 modified
    @@ -1,8 +1,20 @@
     import datetime
     import logging
     import re
    -
    -from flask import abort, current_app, flash, g, redirect, request, session, url_for
    +from typing import Optional
    +from urllib.parse import urlparse
    +
    +from flask import (
    +    abort,
    +    current_app,
    +    flash,
    +    g,
    +    redirect,
    +    request,
    +    Response,
    +    session,
    +    url_for,
    +)
     from flask_babel import lazy_gettext
     from flask_login import login_user, logout_user
     import jwt
    @@ -537,53 +549,6 @@ def login(self):
                 self.login_template, title=self.title, form=form, appbuilder=self.appbuilder
             )
     
    -    """
    -        For Future Use, API Auth, must check howto keep REST stateless
    -    """
    -
    -    """
    -    @expose_api(name='auth',url='/api/auth')
    -    def auth(self):
    -        if g.user is not None and g.user.is_authenticated:
    -            http_return_code = 401
    -            response = make_response(
    -                jsonify(
    -                    {
    -                        'message': 'Login Failed already authenticated',
    -                        'severity': 'critical'
    -                    }
    -                ),
    -                http_return_code
    -            )
    -        username = str(request.args.get('username'))
    -        password = str(request.args.get('password'))
    -        user = self.appbuilder.sm.auth_user_ldap(username, password)
    -        if not user:
    -            http_return_code = 401
    -            response = make_response(
    -                jsonify(
    -                    {
    -                        'message': 'Login Failed',
    -                        'severity': 'critical'
    -                    }
    -                ),
    -                http_return_code
    -            )
    -        else:
    -            login_user(user, remember=False)
    -            http_return_code = 201
    -            response = make_response(
    -                jsonify(
    -                    {
    -                        'message': 'Login Success',
    -                         'severity': 'info'
    -                    }
    -                ),
    -                http_return_code
    -            )
    -        return response
    -    """
    -
     
     class AuthOIDView(AuthView):
         login_template = "appbuilder/general/security/login_oid.html"
    @@ -641,7 +606,9 @@ class AuthOAuthView(AuthView):
         @expose("/login/")
         @expose("/login/<provider>")
         @expose("/login/<provider>/<register>")
    -    def login(self, provider=None, register=None):
    +    def login(
    +        self, provider: Optional[str] = None, register: Optional[str] = None
    +    ) -> Response:
             log.debug("Provider: {0}".format(provider))
             if g.user is not None and g.user.is_authenticated:
                 log.debug("Already authenticated {0}".format(g.user))
    @@ -690,8 +657,12 @@ def login(self, provider=None, register=None):
                 return redirect(self.appbuilder.get_url_for_index)
     
         @expose("/oauth-authorized/<provider>")
    -    def oauth_authorized(self, provider):
    +    def oauth_authorized(self, provider: str) -> Response:
             log.debug("Authorized init")
    +        if provider not in self.appbuilder.sm.oauth_remotes:
    +            flash(u"Provider not supported.", "warning")
    +            log.warning("OAuth authorized got an unknown provider %s", provider)
    +            return redirect(self.appbuilder.get_url_for_login)
             resp = self.appbuilder.sm.oauth_remotes[provider].authorize_access_token()
             if resp is None:
                 flash(u"You denied the request to sign in.", "warning")
    @@ -735,11 +706,14 @@ def oauth_authorized(self, provider):
                 except jwt.InvalidTokenError:
                     raise Exception("State signature is not valid!")
     
    -            try:
    -                next_url = state["next"][0] or self.appbuilder.get_url_for_index
    -            except (KeyError, IndexError):
    -                next_url = self.appbuilder.get_url_for_index
    -
    +            next_url = self.appbuilder.get_url_for_index
    +            # Check if there is a next url on state
    +            if "next" in state and len(state["next"]) > 0:
    +                parsed_uri = urlparse(state["next"][0])
    +                if parsed_uri.netloc != request.host:
    +                    log.warning("Got an invalid next URL: %s", parsed_uri.netloc)
    +                else:
    +                    next_url = state["next"][0]
                 return redirect(next_url)
     
     
    
  • flask_appbuilder/tests/config_oauth.py +36 -0 added
    @@ -0,0 +1,36 @@
    +import os
    +
    +from flask_appbuilder.security.manager import AUTH_OAUTH
    +
    +basedir = os.path.abspath(os.path.dirname(__file__))
    +
    +SQLALCHEMY_DATABASE_URI = os.environ.get(
    +    "SQLALCHEMY_DATABASE_URI"
    +) or "sqlite:///" + os.path.join(basedir, "app.db")
    +
    +SECRET_KEY = "thisismyscretkey"
    +
    +AUTH_TYPE = AUTH_OAUTH
    +
    +OAUTH_PROVIDERS = [
    +    {
    +        "name": "google",
    +        "icon": "fa-google",
    +        "token_key": "access_token",
    +        "remote_app": {
    +            "client_id": "CLIENT_ID",
    +            "client_secret": "CLIENT_SECRET",
    +            "api_base_url": "https://www.googleapis.com/oauth2/v2/",
    +            "client_kwargs": {"scope": "email profile"},
    +            "request_token_url": None,
    +            "access_token_url": "https://accounts.google.com/o/oauth2/token",
    +            "authorize_url": "https://accounts.google.com/o/oauth2/auth",
    +        },
    +    }
    +]
    +
    +# Will allow user self registration
    +AUTH_USER_REGISTRATION = True
    +
    +# The default user self registration role for all users
    +AUTH_USER_REGISTRATION_ROLE = "Admin"
    
  • flask_appbuilder/tests/test_mvc_oauth.py +95 -0 added
    @@ -0,0 +1,95 @@
    +from flask_appbuilder import SQLA
    +from flask_appbuilder.tests.base import FABTestCase
    +import jwt
    +
    +
    +class UserInfoReponseMock:
    +    def json(self):
    +        return {
    +            "id": "1",
    +            "given_name": "first-name",
    +            "family_name": "last-name",
    +            "email": "user1@fab.org",
    +        }
    +
    +
    +class OAuthRemoteMock:
    +    def authorize_access_token(self):
    +        return {"access_token": "some-key"}
    +
    +    def get(self, item):
    +        if item == "userinfo":
    +            return UserInfoReponseMock()
    +
    +
    +class APICSRFTestCase(FABTestCase):
    +    def setUp(self):
    +        from flask import Flask
    +        from flask_wtf import CSRFProtect
    +        from flask_appbuilder import AppBuilder
    +
    +        self.app = Flask(__name__)
    +        self.app.config.from_object("flask_appbuilder.tests.config_oauth")
    +        self.app.config["WTF_CSRF_ENABLED"] = True
    +
    +        self.csrf = CSRFProtect(self.app)
    +        self.db = SQLA(self.app)
    +        self.appbuilder = AppBuilder(self.app, self.db.session)
    +
    +    def test_oauth_login(self):
    +        """
    +        OAuth: Test login
    +        """
    +        client = self.app.test_client()
    +
    +        self.appbuilder.sm.oauth_remotes = {"google": OAuthRemoteMock()}
    +
    +        raw_state = {}
    +        state = jwt.encode(raw_state, self.app.config["SECRET_KEY"], algorithm="HS256")
    +
    +        response = client.get(f"/oauth-authorized/google?state={state.decode('utf-8')}")
    +        self.assertEqual(response.location, "http://localhost/")
    +
    +    def test_oauth_login_unknown_provider(self):
    +        """
    +        OAuth: Test login with unknown provider
    +        """
    +        client = self.app.test_client()
    +
    +        self.appbuilder.sm.oauth_remotes = {"google": OAuthRemoteMock()}
    +
    +        raw_state = {}
    +        state = jwt.encode(raw_state, self.app.config["SECRET_KEY"], algorithm="HS256")
    +
    +        response = client.get(
    +            f"/oauth-authorized/unknown_provider?state={state.decode('utf-8')}"
    +        )
    +        self.assertEqual(response.location, "http://localhost/login/")
    +
    +    def test_oauth_login_next(self):
    +        """
    +        OAuth: Test login next
    +        """
    +        client = self.app.test_client()
    +
    +        self.appbuilder.sm.oauth_remotes = {"google": OAuthRemoteMock()}
    +
    +        raw_state = {"next": ["http://localhost/users/list/"]}
    +        state = jwt.encode(raw_state, self.app.config["SECRET_KEY"], algorithm="HS256")
    +
    +        response = client.get(f"/oauth-authorized/google?state={state.decode('utf-8')}")
    +        self.assertEqual(response.location, "http://localhost/users/list/")
    +
    +    def test_oauth_login_next_check(self):
    +        """
    +        OAuth: Test login next check
    +        """
    +        client = self.app.test_client()
    +
    +        self.appbuilder.sm.oauth_remotes = {"google": OAuthRemoteMock()}
    +
    +        raw_state = {"next": ["http://www.google.com"]}
    +        state = jwt.encode(raw_state, self.app.config["SECRET_KEY"], algorithm="HS256")
    +
    +        response = client.get(f"/oauth-authorized/google?state={state.decode('utf-8')}")
    +        self.assertEqual(response.location, "http://localhost/")
    
  • requirements-extra.txt +1 -1 modified
    @@ -7,5 +7,5 @@ mysqlclient==2.0.1
     psycopg2-binary==2.8.6
     pyodbc==4.0.30
     requests==2.25.0
    -Authlib==0.15.2
    +Authlib==0.15.4
     python-ldap==3.3.1
    
  • setup.py +4 -1 modified
    @@ -67,7 +67,10 @@ def desc():
             "PyJWT>=1.7.1, <2.0.0",
             "sqlalchemy-utils>=0.32.21, <1",
         ],
    -    extras_require={"jmespath": ["jmespath>=0.9.5"]},
    +    extras_require={
    +        "jmespath": ["jmespath>=0.9.5"],
    +        "oauth": ["Authlib>=0.14, <1.0.0"],
    +    },
         tests_require=["nose>=1.0", "mockldap>=0.3.0"],
         classifiers=[
             "Development Status :: 5 - Production/Stable",
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
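
The core of the fix in flask_appbuilder/security/views.py is the host comparison applied to the `next` value carried in the decoded OAuth state. The sketch below isolates that logic as a standalone function so it can be read and run without Flask. The name `resolve_next_url` and its parameters are hypothetical; in the patch the same steps run inline inside `oauth_authorized`, with `request.host` and `self.appbuilder.get_url_for_index` in place of the arguments used here.

```python
from typing import Dict, List
from urllib.parse import urlparse


def resolve_next_url(
    state: Dict[str, List[str]], request_host: str, index_url: str
) -> str:
    """Choose the post-login redirect target, mirroring the patched branch.

    Hypothetical standalone helper; the patch performs these steps inline.
    """
    # Default to the application's index page.
    next_url = index_url
    # Only honour a `next` value whose host matches the host being served.
    if "next" in state and len(state["next"]) > 0:
        parsed_uri = urlparse(state["next"][0])
        if parsed_uri.netloc == request_host:
            next_url = state["next"][0]
    return next_url


# Inputs taken from the commit's tests, with "/" standing in for the index URL.
same_host = resolve_next_url({"next": ["http://localhost/users/list/"]}, "localhost", "/")
other_host = resolve_next_url({"next": ["http://www.google.com"]}, "localhost", "/")
no_next = resolve_next_url({}, "localhost", "/")
```

With these inputs, `same_host` keeps the requested URL while `other_host` and `no_next` fall back to the index. One consequence of comparing `netloc` directly is that a relative target such as `/users/list/` has an empty `netloc`, so in this sketch it also falls back to the index rather than being followed.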
