VYPR
High severityNVD Advisory· Published Dec 2, 2020· Updated Aug 4, 2024

Cryptographic issues in Python oic

CVE-2020-26244

Description

Python oic is a Python OpenID Connect implementation. In Python oic before version 1.2.1, there are several related cryptographic issues affecting client implementations that use the library. The issues are: 1) The IdToken signature algorithm was not checked automatically, but only if the expected algorithm was passed in as a kwarg. 2) JWA none algorithm was allowed in all flows. 3) oic.consumer.Consumer.parse_authz returns an unverified IdToken. The verification of the token was left to the discretion of the implementator. 4) iat claim was not checked for sanity (i.e. it could be in the future). These issues are patched in version 1.2.1.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
oicPyPI
< 1.2.11.2.1

Affected products

1

Patches

1
62f8d753fa17

Merge pull request from GHSA-4fjv-pmhg-3rfg

https://github.com/OpenIDC/pyoidctpazderkaDec 1, 2020via ghsa
7 files changed · +392 30
  • src/oic/oic/consumer.py+29 7 modified
    @@ -3,6 +3,8 @@
     import warnings
     from typing import Dict
     from typing import Optional
    +from typing import Tuple
    +from typing import Union
     
     from oic import rndstr
     from oic.exception import AuthzError
    @@ -22,6 +24,7 @@
     from oic.oic.message import BackChannelLogoutRequest
     from oic.oic.message import Claims
     from oic.oic.message import ClaimsRequest
    +from oic.oic.message import IdToken
     from oic.utils import http_util
     from oic.utils.sanitize import sanitize
     from oic.utils.sdb import DictSessionBackend
    @@ -340,6 +343,7 @@ def begin(self, scope="", response_type="", use_nonce=False, path="", **kwargs):
             if self.debug:
                 _log_info("Redirecting to: %s" % location)
     
    +        self.authz_req[areq["state"]] = areq
             return sid, location
     
         def _parse_authz(self, query="", **kwargs):
    @@ -364,7 +368,16 @@ def _parse_authz(self, query="", **kwargs):
             self.redirect_uris = [self.sdb[_state]["redirect_uris"]]
             return aresp, _state
     
    -    def parse_authz(self, query="", **kwargs):
    +    def parse_authz(
    +        self, query="", **kwargs
    +    ) -> Union[
    +        http_util.BadRequest,
    +        Tuple[
    +            Optional[AuthorizationResponse],
    +            Optional[AccessTokenResponse],
    +            Optional[IdToken],
    +        ],
    +    ]:
             """
             Parse authorization response from server.
     
    @@ -375,17 +388,20 @@ def parse_authz(self, query="", **kwargs):
             ["id_token"]
             ["id_token", "token"]
             ["token"]
    -
    -        :return: A AccessTokenResponse instance
             """
             _log_info = logger.info
             logger.debug("- authorization -")
     
    +        # FIXME: This shouldn't be here... We should rather raise a sepcific Client error
    +        # That would simplify the return value of this function
    +        # and drop bunch of assertions from tests added in this commit.
             if not query:
                 return http_util.BadRequest("Missing query")
     
             _log_info("response: %s" % sanitize(query))
     
    +        if "algs" not in kwargs:
    +            kwargs["algs"] = self.sign_enc_algs("id_token")
             if "code" in self.consumer_config["response_type"]:
                 aresp, _state = self._parse_authz(query, **kwargs)
     
    @@ -410,9 +426,10 @@ def parse_authz(self, query="", **kwargs):
                     except KeyError:
                         pass
     
    -            return aresp, atr, idt
             elif "token" in self.consumer_config["response_type"]:  # implicit flow
                 _log_info("Expect Access Token Response")
    +            aresp = None
    +            _state = None
                 atr = self.parse_response(
                     AccessTokenResponse,
                     info=query,
    @@ -423,8 +440,8 @@ def parse_authz(self, query="", **kwargs):
                 if isinstance(atr, ErrorResponse):
                     raise TokenError(atr.get("error"), atr)
     
    -            idt = None
    -            return None, atr, idt
    +            idt = atr.get("id_token")
    +
             else:  # only id_token
                 aresp, _state = self._parse_authz(query, **kwargs)
     
    @@ -437,8 +454,13 @@ def parse_authz(self, query="", **kwargs):
                         session_update(self.sso_db, _state, "smid", idt["sid"])
                     except KeyError:
                         pass
    +            # Null the aresp as only id_token should be returned
    +            aresp = atr = None
     
    -            return None, None, idt
    +        # Verify the IdToken if it was present
    +        if idt is not None:
    +            self.verify_id_token(idt, self.authz_req.get(_state or atr["state"]))
    +        return aresp, atr, idt
     
         def complete(self, state):
             """
    
  • src/oic/oic/__init__.py+14 2 modified
    @@ -44,6 +44,7 @@
     from oic.oauth2.message import ErrorResponse
     from oic.oauth2.message import Message
     from oic.oauth2.message import MessageFactory
    +from oic.oauth2.message import WrongSigningAlgorithm
     from oic.oauth2.util import get_or_post
     from oic.oic.message import SCOPE2CLAIMS
     from oic.oic.message import AccessTokenResponse
    @@ -1432,7 +1433,13 @@ def sign_enc_algs(self, typ):
             return resp
     
         def _verify_id_token(
    -        self, id_token, nonce="", acr_values=None, auth_time=0, max_age=0
    +        self,
    +        id_token,
    +        nonce="",
    +        acr_values=None,
    +        auth_time=0,
    +        max_age=0,
    +        response_type="",
         ):
             """
             Verify IdToken.
    @@ -1465,6 +1472,11 @@ def _verify_id_token(
             if _now > id_token["exp"]:
                 raise OtherError("Passed best before date")
     
    +        if response_type != ["code"] and id_token.jws_header["alg"] == "none":
    +            raise WrongSigningAlgorithm(
    +                "none is not allowed outside Authorization Flow."
    +            )
    +
             if (
                 self.id_token_max_age
                 and _now > int(id_token["iat"]) + self.id_token_max_age
    @@ -1491,7 +1503,7 @@ def verify_id_token(self, id_token, authn_req):
             except KeyError:
                 pass
     
    -        for param in ["acr_values", "max_age"]:
    +        for param in ["acr_values", "max_age", "response_type"]:
                 try:
                     kwa[param] = authn_req[param]
                 except KeyError:
    
  • src/oic/oic/message.py+12 5 modified
    @@ -313,17 +313,19 @@ def verify_id_token(instance, check_hash=False, **kwargs):
     
         if check_hash:
             _alg = idt.jws_header["alg"]
    -        # What if _alg == 'none'
    -
    -        hfunc = "HS" + _alg[-3:]
    +        if _alg != "none":
    +            hfunc = "HS" + _alg[-3:]
    +        else:
    +            # This is allowed only for `code` and it needs to be checked by a Client
    +            hfunc = None
     
    -        if "access_token" in instance:
    +        if "access_token" in instance and hfunc is not None:
                 if "at_hash" not in idt:
                     raise MissingRequiredAttribute("Missing at_hash property", idt)
                 if idt["at_hash"] != jws.left_hash(instance["access_token"], hfunc):
                     raise AtHashError("Failed to verify access_token hash", idt)
     
    -        if "code" in instance:
    +        if "code" in instance and hfunc is not None:
                 if "c_hash" not in idt:
                     raise MissingRequiredAttribute("Missing c_hash property", idt)
                 if idt["c_hash"] != jws.left_hash(instance["code"], hfunc):
    @@ -780,6 +782,11 @@ def verify(self, **kwargs):
             else:
                 if (_iat + _storage_time) < (_now - _skew):
                     raise IATError("Issued too long ago")
    +            if _now < (_iat - _skew):
    +                raise IATError("Issued in the future")
    +
    +        if _exp < _iat:
    +            raise EXPError("Invalid expiration time")
     
             return True
     
    
  • tests/test_oic_consumer_logout.py+15 6 modified
    @@ -159,13 +159,16 @@ def test_logout_with_sub(self):
                 "openid", "code", path="https://example.com"
             )
             resp = self.provider.authorization_endpoint(request=request_location)
    -        aresp = self.consumer.parse_authz(resp.message)
    +        part = self.consumer.parse_authz(resp.message)
    +        assert isinstance(part, tuple)
    +        aresp = part[0]
    +        assert aresp
     
             assert self.consumer.sdb[sid]["issuer"] == self.provider.baseurl
     
             # Simulate an accesstoken request
             areq = AccessTokenRequest(
    -            code=aresp[0]["code"],
    +            code=aresp["code"],
                 client_id=CLIENT_ID,
                 redirect_uri="http://example.com/authz",
                 client_secret=self.consumer.client_secret,
    @@ -228,13 +231,16 @@ def test_logout_without_sub(self):
                 "openid", "code", path="https://example.com"
             )
             resp = self.provider.authorization_endpoint(request=request_location)
    -        aresp = self.consumer.parse_authz(resp.message)
    +        part = self.consumer.parse_authz(resp.message)
    +        assert isinstance(part, tuple)
    +        aresp = part[0]
    +        assert aresp
     
             assert self.consumer.sdb[sid]["issuer"] == self.provider.baseurl
     
             # Simulate an accesstoken request
             areq = AccessTokenRequest(
    -            code=aresp[0]["code"],
    +            code=aresp["code"],
                 client_id=CLIENT_ID,
                 redirect_uri="http://example.com/authz",
                 client_secret=self.consumer.client_secret,
    @@ -308,13 +314,16 @@ def test_sso_db_dict(self):
                 "openid", "code", path="https://example.com"
             )
             resp = self.provider.authorization_endpoint(request=request_location)
    -        aresp = _consumer.parse_authz(resp.message)
    +        part = _consumer.parse_authz(resp.message)
    +        assert isinstance(part, tuple)
    +        aresp = part[0]
    +        assert aresp
     
             assert _consumer.sdb[sid]["issuer"] == self.provider.baseurl
     
             # Simulate an accesstoken request
             areq = AccessTokenRequest(
    -            code=aresp[0]["code"],
    +            code=aresp["code"],
                 client_id=CLIENT_ID,
                 redirect_uri="http://example.com/authz",
                 client_secret=_consumer.client_secret,
    
  • tests/test_oic_consumer.py+249 8 modified
    @@ -10,12 +10,14 @@
     from jwkest.jwk import SYMKey
     
     from oic.oauth2.message import MissingSigningKey
    +from oic.oauth2.message import WrongSigningAlgorithm
     from oic.oic import DEF_SIGN_ALG
     from oic.oic import Server
     from oic.oic import response_types_to_grant_types
     from oic.oic.consumer import IGNORE
     from oic.oic.consumer import Consumer
     from oic.oic.consumer import clean_response
    +from oic.oic.message import AccessTokenRequest
     from oic.oic.message import AccessTokenResponse
     from oic.oic.message import AuthorizationResponse
     from oic.oic.message import IdToken
    @@ -147,6 +149,9 @@ def setup_consumer(self, session_db_factory):
             self.consumer.userinfo_endpoint = "https://example.com/userinfo"  # type: ignore
             self.consumer.client_secret = "hemlig"
             self.consumer.secret_type = "basic"
    +        self.consumer.provider_info = ProviderConfigurationResponse(
    +            issuer="https://example.com"
    +        )  # abs min
     
         def test_backup_keys(self):
             keys = self.consumer.__dict__.keys()
    @@ -326,6 +331,7 @@ def test_parse_authz(self):
             self.consumer._backup(_state)
     
             part = self.consumer.parse_authz(query=result.headers["location"])
    +        assert isinstance(part, tuple)
             atr = part[0]
             assert part[1] is None
             assert part[2] is None
    @@ -359,6 +365,7 @@ def test_parse_authz_implicit(self):
                 )
     
             part = self.consumer.parse_authz(query=result.headers["location"])
    +        assert isinstance(part, tuple)
             assert part[0] is None
             atr = part[1]
             assert part[2] is None
    @@ -440,6 +447,7 @@ def test_complete_auth_token(self):
     
             parsed = urlparse(result.headers["location"])
             part = self.consumer.parse_authz(query=parsed.query)
    +        assert isinstance(part, tuple)
             auth = part[0]
             acc = part[1]
             assert part[2] is None
    @@ -456,8 +464,67 @@ def test_complete_auth_token_idtoken(self):
             _state = "state0"
             self.consumer.consumer_config["response_type"] = ["id_token", "token"]
             self.consumer.registration_response = RegistrationResponse(
    -            id_token_signed_response_alg="RS256"
    +            id_token_signed_response_alg="HS256"
    +        )
    +        self.consumer.authz_req = {}  # Store AuthzReq with state as key
    +
    +        args = {
    +            "client_id": self.consumer.client_id,
    +            "response_type": self.consumer.consumer_config["response_type"],
    +            "scope": ["openid"],
    +            "nonce": "nonce",
    +        }
    +        token = IdToken(
    +            iss="https://example.com",
    +            aud="client_1",
    +            sub="some_sub",
    +            exp=1565348600,
    +            iat=1565348300,
    +            nonce="nonce",
    +        )
    +        location = (
    +            "https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
    +            "scope=openid&id_token={}".format(
    +                token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
    +            )
    +        )
    +        with responses.RequestsMock() as rsps:
    +            rsps.add(
    +                responses.GET,
    +                "https://example.com/authorization",
    +                status=302,
    +                headers={"location": location},
    +            )
    +            result = self.consumer.do_authorization_request(
    +                state=_state, request_args=args
    +            )
    +            query = parse_qs(urlparse(result.request.url).query)
    +            assert query["client_id"] == ["client_1"]
    +            assert query["scope"] == ["openid"]
    +            assert query["response_type"] == ["id_token token"]
    +            assert query["state"] == ["state0"]
    +            assert query["nonce"] == ["nonce"]
    +            assert query["redirect_uri"] == ["https://example.com/cb"]
    +
    +        parsed = urlparse(result.headers["location"])
    +
    +        with freeze_time("2019-08-09 11:00:00"):
    +            part = self.consumer.parse_authz(query=parsed.query)
    +        assert isinstance(part, tuple)
    +        auth = part[0]
    +        atr = part[1]
    +        idt = part[2]
    +
    +        assert auth is None
    +        assert isinstance(atr, AccessTokenResponse)
    +        assert _eq(
    +            atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
             )
    +        assert isinstance(idt, IdToken)
    +
    +    def test_complete_auth_token_idtoken_no_alg_config(self):
    +        _state = "state0"
    +        self.consumer.consumer_config["response_type"] = ["id_token", "token"]
             self.consumer.provider_info = ProviderConfigurationResponse(
                 issuer="https://example.com"
             )  # abs min
    @@ -480,7 +547,7 @@ def test_complete_auth_token_idtoken(self):
             location = (
                 "https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
                 "scope=openid&id_token={}".format(
    -                token.to_jwt(key=KC_RSA.keys(), algorithm="RS256")
    +                token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256")
                 )
             )
             with responses.RequestsMock() as rsps:
    @@ -504,23 +571,196 @@ def test_complete_auth_token_idtoken(self):
             parsed = urlparse(result.headers["location"])
     
             with freeze_time("2019-08-09 11:00:00"):
    -            part = self.consumer.parse_authz(
    -                query=parsed.query, algs=self.consumer.sign_enc_algs("id_token")
    -            )
    +            part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"})
    +        assert isinstance(part, tuple)
             auth = part[0]
             atr = part[1]
    -        assert part[2] is None
    +        idt = part[2]
     
             assert auth is None
             assert isinstance(atr, AccessTokenResponse)
             assert _eq(
                 atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
             )
    +        assert isinstance(idt, IdToken)
    +
    +    def test_complete_auth_token_idtoken_none_cipher_code(self):
    +        _state = "state0"
    +        self.consumer.consumer_config["response_type"] = ["code"]
    +        self.consumer.registration_response = RegistrationResponse(
    +            id_token_signed_response_alg="none"
    +        )
    +        self.consumer.provider_info = ProviderConfigurationResponse(
    +            issuer="https://example.com"
    +        )  # abs min
    +        self.consumer.authz_req = {}  # Store AuthzReq with state as key
    +        self.consumer.sdb[_state] = {"redirect_uris": []}
    +
    +        args = {
    +            "client_id": self.consumer.client_id,
    +            "response_type": self.consumer.consumer_config["response_type"],
    +            "scope": ["openid"],
    +            "nonce": "nonce",
    +        }
    +        token = IdToken(
    +            iss="https://example.com",
    +            aud="client_1",
    +            sub="some_sub",
    +            exp=1565348600,
    +            iat=1565348300,
    +            nonce="nonce",
    +            at_hash="aaa",
    +        )
    +        # Downgrade the algorithm to `none`
    +        location = (
    +            "https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
    +            "scope=openid&id_token={}".format(
    +                token.to_jwt(key=KC_RSA.keys(), algorithm="none")
    +            )
    +        )
    +        with responses.RequestsMock() as rsps:
    +            rsps.add(
    +                responses.GET,
    +                "https://example.com/authorization",
    +                status=302,
    +                headers={"location": location},
    +            )
    +            result = self.consumer.do_authorization_request(
    +                state=_state, request_args=args
    +            )
    +            query = parse_qs(urlparse(result.request.url).query)
    +            assert query["client_id"] == ["client_1"]
    +            assert query["scope"] == ["openid"]
    +            assert query["response_type"] == ["code"]
    +            assert query["state"] == ["state0"]
    +            assert query["nonce"] == ["nonce"]
    +            assert query["redirect_uri"] == ["https://example.com/cb"]
    +
    +        parsed = urlparse(result.headers["location"])
    +
    +        with freeze_time("2019-08-09 11:00:00"):
    +            part = self.consumer.parse_authz(query=parsed.query)
    +        assert isinstance(part, tuple)
    +        auth = part[0]
    +        atr = part[1]
    +        idt = part[2]
    +
    +        assert isinstance(auth, AuthorizationResponse)
    +        assert isinstance(atr, AccessTokenResponse)
    +        assert _eq(
    +            atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"]
    +        )
    +        assert isinstance(idt, IdToken)
    +
    +    def test_complete_auth_token_idtoken_none_cipher_token(self):
    +        _state = "state0"
    +        self.consumer.consumer_config["response_type"] = ["token"]
    +        self.consumer.registration_response = RegistrationResponse(
    +            id_token_signed_response_alg="none"
    +        )
    +        self.consumer.provider_info = ProviderConfigurationResponse(
    +            issuer="https://example.com"
    +        )  # abs min
    +        self.consumer.authz_req = {}  # Store AuthzReq with state as key
    +        self.consumer.sdb[_state] = {"redirect_uris": []}
    +
    +        args = {
    +            "client_id": self.consumer.client_id,
    +            "response_type": self.consumer.consumer_config["response_type"],
    +            "scope": ["openid"],
    +            "nonce": "nonce",
    +        }
    +        token = IdToken(
    +            iss="https://example.com",
    +            aud="client_1",
    +            sub="some_sub",
    +            exp=1565348600,
    +            iat=1565348300,
    +            nonce="nonce",
    +        )
    +        # Downgrade the algorithm to `none`
    +        location = (
    +            "https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
    +            "scope=openid&id_token={}".format(
    +                token.to_jwt(key=KC_RSA.keys(), algorithm="none")
    +            )
    +        )
    +        with responses.RequestsMock() as rsps:
    +            rsps.add(
    +                responses.GET,
    +                "https://example.com/authorization",
    +                status=302,
    +                headers={"location": location},
    +            )
    +            result = self.consumer.do_authorization_request(
    +                state=_state, request_args=args
    +            )
    +            query = parse_qs(urlparse(result.request.url).query)
    +            assert query["client_id"] == ["client_1"]
    +            assert query["scope"] == ["openid"]
    +            assert query["response_type"] == ["token"]
    +            assert query["state"] == ["state0"]
    +            assert query["nonce"] == ["nonce"]
    +            assert query["redirect_uri"] == ["https://example.com/cb"]
    +
    +        parsed = urlparse(result.headers["location"])
     
             with freeze_time("2019-08-09 11:00:00"):
    -            self.consumer.verify_id_token(
    -                atr["id_token"], self.consumer.authz_req[atr["state"]]
    +            with pytest.raises(WrongSigningAlgorithm):
    +                self.consumer.parse_authz(query=parsed.query)
    +
    +    def test_complete_auth_token_idtoken_cipher_downgrade(self):
    +        _state = "state0"
    +        self.consumer.consumer_config["response_type"] = ["id_token", "token"]
    +        self.consumer.provider_info = ProviderConfigurationResponse(
    +            issuer="https://example.com"
    +        )  # abs min
    +        self.consumer.authz_req = {}  # Store AuthzReq with state as key
    +
    +        args = {
    +            "client_id": self.consumer.client_id,
    +            "response_type": self.consumer.consumer_config["response_type"],
    +            "scope": ["openid"],
    +            "nonce": "nonce",
    +        }
    +        token = IdToken(
    +            iss="https://example.com",
    +            aud="client_1",
    +            sub="some_sub",
    +            exp=1565348600,
    +            iat=1565348300,
    +            nonce="nonce",
    +        )
    +        # Downgrade the algorithm to `none`
    +        location = (
    +            "https://example.com/cb?state=state0&access_token=token&token_type=bearer&"
    +            "scope=openid&id_token={}".format(
    +                token.to_jwt(key=KC_RSA.keys(), algorithm="none")
    +            )
    +        )
    +        with responses.RequestsMock() as rsps:
    +            rsps.add(
    +                responses.GET,
    +                "https://example.com/authorization",
    +                status=302,
    +                headers={"location": location},
    +            )
    +            result = self.consumer.do_authorization_request(
    +                state=_state, request_args=args
                 )
    +            query = parse_qs(urlparse(result.request.url).query)
    +            assert query["client_id"] == ["client_1"]
    +            assert query["scope"] == ["openid"]
    +            assert query["response_type"] == ["id_token token"]
    +            assert query["state"] == ["state0"]
    +            assert query["nonce"] == ["nonce"]
    +            assert query["redirect_uri"] == ["https://example.com/cb"]
    +
    +        parsed = urlparse(result.headers["location"])
    +
    +        with freeze_time("2019-08-09 11:00:00"):
    +            with pytest.raises(WrongSigningAlgorithm):
    +                self.consumer.parse_authz(query=parsed.query)
     
         def test_userinfo(self):
             _state = "state0"
    @@ -920,6 +1160,7 @@ def test_get_session_management_id(self):
             self.consumer.sdb[_state] = {"redirect_uris": ["https://example.org/cb"]}
             resp = AuthorizationResponse(id_token=_signed_jwt, state=_state)
             self.consumer.consumer_config["response_type"] = ["id_token"]
    +        self.consumer.authz_req[_state] = AccessTokenRequest(nonce="KUEYfRM2VzKDaaKD")
             self.consumer.parse_authz(resp.to_urlencoded())
             assert self.consumer.sso_db["state"]["smid"] == smid
             assert session_get(self.consumer.sso_db, "smid", smid) == [_state]
    
  • tests/test_oic_message.py+66 0 modified
    @@ -5,6 +5,7 @@
     from urllib.parse import urlencode
     
     import pytest
    +from freezegun import freeze_time
     from jwkest import BadSignature
     from jwkest.jwk import SYMKey
     from jwkest.jws import left_hash
    @@ -24,7 +25,9 @@
     from oic.oic.message import BackChannelLogoutRequest
     from oic.oic.message import CHashError
     from oic.oic.message import Claims
    +from oic.oic.message import EXPError
     from oic.oic.message import FrontChannelLogoutRequest
    +from oic.oic.message import IATError
     from oic.oic.message import IdToken
     from oic.oic.message import LogoutToken
     from oic.oic.message import OpenIDSchema
    @@ -609,6 +612,69 @@ def test_token_type(self):
                 at.verify()
     
     
    +class TestIdToken(object):
    +    """Unittests for IdToken class."""
    +
    +    @freeze_time("2020-01-01 11:00:00")
    +    def test_verify_iat_in_future(self):
    +        now = time_util.utc_time_sans_frac()
    +
    +        idt = IdToken(
    +            **{
    +                "sub": "553df2bcf909104751cfd8b2",
    +                "aud": ["5542958437706128204e0000", "554295ce3770612820620000"],
    +                "auth_time": 1441364872,
    +                "azp": "554295ce3770612820620000",
    +                "at_hash": "L4Ign7TCAD_EppRbHAuCyw",
    +                "iat": now + 7200,
    +                "exp": now + 3600,
    +                "iss": "https://sso.qa.7pass.ctf.prosiebensat1.com",
    +            }
    +        )
    +
    +        with pytest.raises(IATError):
    +            idt.verify()
    +
    +    @freeze_time("2020-01-01 11:00:00")
    +    def test_verify_iat_in_future_expired(self):
    +        now = time_util.utc_time_sans_frac()
    +
    +        idt = IdToken(
    +            **{
    +                "sub": "553df2bcf909104751cfd8b2",
    +                "aud": ["5542958437706128204e0000", "554295ce3770612820620000"],
    +                "auth_time": 1441364872,
    +                "azp": "554295ce3770612820620000",
    +                "at_hash": "L4Ign7TCAD_EppRbHAuCyw",
    +                "iat": now + 3600,
    +                "exp": now,
    +                "iss": "https://sso.qa.7pass.ctf.prosiebensat1.com",
    +            }
    +        )
    +
    +        with pytest.raises(EXPError):
    +            idt.verify(skew=7200)
    +
    +    @freeze_time("2020-01-01 11:00:00")
    +    def test_verify_iat_in_future_skew(self):
    +        now = time_util.utc_time_sans_frac()
    +
    +        idt = IdToken(
    +            **{
    +                "sub": "553df2bcf909104751cfd8b2",
    +                "aud": ["5542958437706128204e0000", "554295ce3770612820620000"],
    +                "auth_time": 1441364872,
    +                "azp": "554295ce3770612820620000",
    +                "at_hash": "L4Ign7TCAD_EppRbHAuCyw",
    +                "iat": now + 7200,
    +                "exp": now + 7600,
    +                "iss": "https://sso.qa.7pass.ctf.prosiebensat1.com",
    +            }
    +        )
    +
    +        idt.verify(skew=7200)
    +
    +
     def test_id_token():
         _now = time_util.utc_time_sans_frac()
     
    
  • tests/test_oic_provider.py+7 2 modified
    @@ -38,6 +38,7 @@
     from oic.oic.message import IdToken
     from oic.oic.message import Message
     from oic.oic.message import OpenIDSchema
    +from oic.oic.message import ProviderConfigurationResponse
     from oic.oic.message import RefreshAccessTokenRequest
     from oic.oic.message import RegistrationRequest
     from oic.oic.message import RegistrationResponse
    @@ -224,6 +225,9 @@ def create_provider(self, session_db_factory):
             self.cons.keyjar.import_jwks(
                 self.provider.keyjar.export_jwks(), self.cons.issuer
             )
    +        self.cons.provider_info = ProviderConfigurationResponse(
    +            issuer=SERVER_INFO["issuer"]
    +        )
     
             self.cons2 = Consumer(
                 {}, CONSUMER_CONFIG.copy(), CLIENT_CONFIG_2, server_info=SERVER_INFO
    @@ -373,6 +377,7 @@ def test_authenticated(self):
     
             part = self.cons.parse_authz(query=resp.message)
     
    +        assert isinstance(part, tuple)
             aresp = part[0]
             assert part[1] is None
             assert part[2] is None
    @@ -410,9 +415,10 @@ def test_authenticated_hybrid(self):
     
             part = self.cons.parse_authz(resp.message)
     
    +        assert isinstance(part, tuple)
             aresp = part[0]
             assert part[1] is None
    -        assert part[2] is not None
    +        id_token = part[2]
     
             assert isinstance(aresp, AuthorizationResponse)
             assert _eq(aresp.keys(), ["scope", "state", "id_token", "client_id", "code"])
    @@ -421,7 +427,6 @@ def test_authenticated_hybrid(self):
                 self.cons.grant[_state].keys(),
                 ["code", "id_token", "tokens", "exp_in", "grant_expiration_time", "seed"],
             )
    -        id_token = part[2]
             assert isinstance(id_token, IdToken)
             assert _eq(
                 id_token.keys(),
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

8

News mentions

0

No linked articles in our index yet.