VYPR
High severity · NVD Advisory · Published May 6, 2020 · Updated Aug 4, 2024

CVE-2020-12689

Description

An issue was discovered in OpenStack Keystone before 15.0.1, and 16.0.0. Any user authenticated within a limited scope (trust/oauth/application credential) can create an EC2 credential with an escalated permission, such as obtaining admin while the user is on a limited viewer role. This potentially allows a malicious user to act as the admin on a project another user has the admin role on, which can effectively grant that user global admin privileges.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Authenticated users with limited-scope tokens in OpenStack Keystone can create EC2 credentials with escalated privileges, potentially gaining global admin.

Vulnerability

Overview

CVE-2020-12689 is an authorization bypass vulnerability in OpenStack Keystone, the identity service used by OpenStack. The issue resides in the EC2 credentials API. A user authenticated with a limited-scope token (one obtained via a trust, OAuth, or an application credential) can create an EC2 credential for a project on which the delegation grants only a minimal role (e.g., viewer). Keystone does not bind that credential to the restricted role set of the delegation, so a token later obtained with the EC2 credential can carry permissions beyond the original scope, such as admin [1][2]. This flaw is present in Keystone versions before 15.0.1 and in 16.0.0 [1].

Exploitation

An attacker must first authenticate to Keystone and obtain a scoped token within a trust, OAuth, or application credential context. No other privileges are required beyond initial authentication. Using that token, the attacker calls the credential creation endpoint (POST /credentials) to create an EC2 credential for the project. In affected versions the credential blob does not record the application credential or OAuth access token it was created under, and when the credential is exchanged for a token through the ec2tokens API, Keystone looks up every role the user holds on the project rather than only the delegated roles, even when a trust_id is stored in the blob [2][3]. The resulting token therefore carries the user's full role set on that project, escalating the attacker beyond the limited scope.
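The role lookup at the heart of the issue can be modelled in a few lines. This is only a sketch based on a reading of the handle_authenticate diff shown under Patches: the dictionaries, user and project names, and both function names are hypothetical stand-ins, not Keystone APIs.

```python
# Illustrative model only: plain dicts stand in for Keystone's backends.
# Every name here is hypothetical and chosen for the example.

# All role assignments the user holds on the project.
USER_PROJECT_ROLES = {("alice", "proj-1"): ["admin", "reader"]}

# Delegations and the subset of roles each one carries.
TRUSTS = {"trust-1": ["reader"]}
APP_CREDS = {"appcred-1": ["reader"]}


def roles_before_fix(user_id, project_id, blob):
    """Model of the removed code path: any scope stored in the blob is ignored."""
    return USER_PROJECT_ROLES.get((user_id, project_id), [])


def roles_after_fix(user_id, project_id, blob):
    """Model of the patched path: a scope recorded in the blob caps the roles."""
    if blob.get("trust_id"):
        return TRUSTS[blob["trust_id"]]
    if blob.get("app_cred_id"):
        return APP_CREDS[blob["app_cred_id"]]
    # Unscoped credentials still fall back to the user's project roles.
    return USER_PROJECT_ROLES.get((user_id, project_id), [])


# An EC2 credential created with a reader-only application credential token.
blob = {"access": "example-access", "secret": "example-secret",
        "app_cred_id": "appcred-1"}
print(roles_before_fix("alice", "proj-1", blob))  # ['admin', 'reader']
print(roles_after_fix("alice", "proj-1", blob))   # ['reader']
```

In this model the pre-fix lookup returns admin even though the delegation only carried reader, which is the escalation the advisory describes.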

Impact

Successful exploitation lets a user holding only a limited-scope token obtain permissions on a project beyond those the delegation granted, up to admin. Since OpenStack projects are often isolated, this could enable unauthorized resource management, data access, or further lateral movement across the cloud infrastructure. Per the CVE description, a malicious user could potentially act as admin on a project where another user holds the admin role, which can effectively amount to global admin privileges [1][2].

Mitigation

The vulnerability is fixed in Keystone 15.0.1 (Stein) and 16.0.1 (Train); 16.0.0 is affected. The fix, introduced in commit 37e9907a176dad6843819b1bec4946c3aecc4548, records the trust, application credential, or OAuth access token ID in the EC2 credential blob at creation, derives the roles of tokens issued through ec2tokens from that delegation rather than from the user's full project assignments, rejects updates that change or remove those scope keys, and refuses application credentials that request roles absent from the creating token [4]. Operators are strongly advised to upgrade to the patched versions. No workaround is documented beyond applying the official patch.
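One piece of the patch, the _validate_blob_update_keys method in the diff under Patches, stops a stored credential's scope from being changed or dropped on update. A minimal standalone sketch of that comparison follows; the function name changed_scope_keys and the sample blobs are made up for illustration, and the real method raises a ValidationError rather than returning a list.

```python
import json

# Blob keys that tie an EC2 credential to the delegation it was created under.
SCOPE_KEYS = ("trust_id", "app_cred_id", "access_token_id")


def changed_scope_keys(old_blob_json, new_blob_json):
    """Return the scope keys whose value differs between two JSON blobs."""
    old = json.loads(old_blob_json)
    new = json.loads(new_blob_json)
    # A key that is missing on one side compares as None, so removing a
    # scope counts as a change just like replacing it.
    return [key for key in SCOPE_KEYS if old.get(key) != new.get(key)]


stored = json.dumps({"access": "a", "secret": "s", "trust_id": "t1"})
swapped = json.dumps({"access": "a", "secret": "s", "trust_id": "t2"})
dropped = json.dumps({"access": "a", "secret": "s"})

print(changed_scope_keys(stored, swapped))  # ['trust_id']
print(changed_scope_keys(stored, dropped))  # ['trust_id']
print(changed_scope_keys(stored, stored))   # []
```

An update would be rejected whenever the returned list is non-empty.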

AI Insight generated on May 21, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package            Affected versions       Patched versions
keystone (PyPI)    < 15.0.1                15.0.1
keystone (PyPI)    >= 16.0.0, < 16.0.1     16.0.1
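The two ranges above can be checked mechanically. The helper below is a rough sketch that assumes plain numeric x.y.z version strings (no release candidates or distribution suffixes); parse_version and is_affected are names invented for this example.

```python
def parse_version(text):
    """Turn '15.0.1' into (15, 0, 1); assumes purely numeric components."""
    return tuple(int(part) for part in text.split("."))


def is_affected(version):
    """Apply the advisory's ranges: < 15.0.1, or >= 16.0.0 and < 16.0.1."""
    v = parse_version(version)
    return v < (15, 0, 1) or (16, 0, 0) <= v < (16, 0, 1)


for candidate in ("14.2.0", "15.0.0", "15.0.1", "16.0.0", "16.0.1"):
    print(candidate, is_affected(candidate))
```

Distribution packages often backport fixes without changing the upstream version number, so a check like this is only a first pass.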

Patches

37e9907a176d

Fix security issues with EC2 credentials

https://github.com/openstack/keystone · Colleen Murphy · Apr 14, 2020 · via ghsa
8 files changed · +603 -59
  • keystone/api/credentials.py +53 -18 modified
    @@ -60,30 +60,41 @@ def _blob_to_json(ref):
                 ref['blob'] = jsonutils.dumps(blob)
             return ref
     
    -    def _assign_unique_id(self, ref, trust_id=None):
    +    def _validate_blob_json(self, ref):
    +        try:
    +            blob = jsonutils.loads(ref.get('blob'))
    +        except (ValueError, TabError):
    +            raise exception.ValidationError(
    +                message=_('Invalid blob in credential'))
    +        if not blob or not isinstance(blob, dict):
    +            raise exception.ValidationError(attribute='blob',
    +                                            target='credential')
    +        if blob.get('access') is None:
    +            raise exception.ValidationError(attribute='access',
    +                                            target='credential')
    +        return blob
    +
    +    def _assign_unique_id(
    +            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
             # Generates an assigns a unique identifier to a credential reference.
             if ref.get('type', '').lower() == 'ec2':
    -            try:
    -                blob = jsonutils.loads(ref.get('blob'))
    -            except (ValueError, TabError):
    -                raise exception.ValidationError(
    -                    message=_('Invalid blob in credential'))
    -            if not blob or not isinstance(blob, dict):
    -                raise exception.ValidationError(attribute='blob',
    -                                                target='credential')
    -            if blob.get('access') is None:
    -                raise exception.ValidationError(attribute='access',
    -                                                target='credential')
    -
    +            blob = self._validate_blob_json(ref)
                 ref = ref.copy()
                 ref['id'] = hashlib.sha256(
                     blob['access'].encode('utf8')).hexdigest()
    -            # update the blob with the trust_id, so credentials created with
    -            # a trust scoped token will result in trust scoped tokens when
    -            # authentication via ec2tokens happens
    +            # update the blob with the trust_id or app_cred_id, so credentials
    +            # created with a trust- or app cred-scoped token will result in
    +            # trust- or app cred-scoped tokens when authentication via
    +            # ec2tokens happens
                 if trust_id is not None:
                     blob['trust_id'] = trust_id
                     ref['blob'] = jsonutils.dumps(blob)
    +            if app_cred_id is not None:
    +                blob['app_cred_id'] = app_cred_id
    +                ref['blob'] = jsonutils.dumps(blob)
    +            if access_token_id is not None:
    +                blob['access_token_id'] = access_token_id
    +                ref['blob'] = jsonutils.dumps(blob)
                 return ref
             else:
                 return super(CredentialResource, self)._assign_unique_id(ref)
    @@ -146,23 +157,47 @@ def post(self):
             )
             validation.lazy_validate(schema.credential_create, credential)
             trust_id = getattr(self.oslo_context, 'trust_id', None)
    +        app_cred_id = getattr(
    +            self.auth_context['token'], 'application_credential_id', None)
    +        access_token_id = getattr(
    +            self.auth_context['token'], 'access_token_id', None)
             ref = self._assign_unique_id(
    -            self._normalize_dict(credential), trust_id=trust_id)
    +            self._normalize_dict(credential),
    +            trust_id=trust_id, app_cred_id=app_cred_id,
    +            access_token_id=access_token_id)
             ref = PROVIDERS.credential_api.create_credential(
                 ref['id'], ref, initiator=self.audit_initiator)
             return self.wrap_member(ref), http.client.CREATED
     
    +    def _validate_blob_update_keys(self, credential, ref):
    +        if credential.get('type', '').lower() == 'ec2':
    +            new_blob = self._validate_blob_json(ref)
    +            old_blob = credential.get('blob')
    +            if isinstance(old_blob, str):
    +                old_blob = jsonutils.loads(old_blob)
    +            # if there was a scope set, prevent changing it or unsetting it
    +            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
    +                if old_blob.get(key) != new_blob.get(key):
    +                    message = _('%s can not be updated for credential') % key
    +                    raise exception.ValidationError(message=message)
    +
         def patch(self, credential_id):
             # Update Credential
             ENFORCER.enforce_call(
                 action='identity:update_credential',
                 build_target=_build_target_enforcement
             )
    -        PROVIDERS.credential_api.get_credential(credential_id)
    +        current = PROVIDERS.credential_api.get_credential(credential_id)
     
             credential = self.request_body_json.get('credential', {})
             validation.lazy_validate(schema.credential_update, credential)
    +        self._validate_blob_update_keys(current.copy(), credential.copy())
             self._require_matching_id(credential)
    +        # Check that the user hasn't illegally modified the owner or scope
    +        target = {'credential': dict(current, **credential)}
    +        ENFORCER.enforce_call(
    +            action='identity:update_credential', target_attr=target
    +        )
             ref = PROVIDERS.credential_api.update_credential(
                 credential_id, credential)
             return self.wrap_member(ref)
    
  • keystone/api/_shared/EC2_S3_Resource.py +40 -5 modified
    @@ -113,7 +113,9 @@ def handle_authenticate(self):
                 project_id=cred.get('project_id'),
                 access=loaded.get('access'),
                 secret=loaded.get('secret'),
    -            trust_id=loaded.get('trust_id')
    +            trust_id=loaded.get('trust_id'),
    +            app_cred_id=loaded.get('app_cred_id'),
    +            access_token_id=loaded.get('access_token_id')
             )
     
             # validate the signature
    @@ -132,8 +134,34 @@ def handle_authenticate(self):
                 raise ks_exceptions.Unauthorized from e
     
             self._check_timestamp(credentials)
    -        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    -            user_ref['id'], project_ref['id'])
    +
    +        trustee_user_id = None
    +        auth_context = None
    +        if cred_data['trust_id']:
    +            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
    +            roles = [r['id'] for r in trust['roles']]
    +            # NOTE(cmurphy): if this credential was created using a
    +            # trust-scoped token with impersonation, the user_id will be for
    +            # the trustor, not the trustee. In this case, issuing a
    +            # trust-scoped token to the trustor will fail. In order to get a
    +            # trust-scoped token, use the user ID of the trustee. With
    +            # impersonation, the resulting token will still be for the trustor.
    +            # Without impersonation, the token will be for the trustee.
    +            if trust['impersonation'] is True:
    +                trustee_user_id = trust['trustee_user_id']
    +        elif cred_data['app_cred_id']:
    +            ac_client = PROVIDERS.application_credential_api
    +            app_cred = ac_client.get_application_credential(
    +                cred_data['app_cred_id'])
    +            roles = [r['id'] for r in app_cred['roles']]
    +        elif cred_data['access_token_id']:
    +            access_token = PROVIDERS.oauth_api.get_access_token(
    +                cred_data['access_token_id'])
    +            roles = jsonutils.loads(access_token['role_ids'])
    +            auth_context = {'access_token_id': cred_data['access_token_id']}
    +        else:
    +            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    +                user_ref['id'], project_ref['id'])
     
             if not roles:
                 raise ks_exceptions.Unauthorized(_('User not valid for project.'))
    @@ -144,7 +172,14 @@ def handle_authenticate(self):
     
             method_names = ['ec2credential']
     
    +        if trustee_user_id:
    +            user_id = trustee_user_id
    +        else:
    +            user_id = user_ref['id']
             token = PROVIDERS.token_provider_api.issue_token(
    -            user_id=user_ref['id'], method_names=method_names,
    -            project_id=project_ref['id'])
    +            user_id=user_id, method_names=method_names,
    +            project_id=project_ref['id'],
    +            trust_id=cred_data['trust_id'],
    +            app_cred_id=cred_data['app_cred_id'],
    +            auth_context=auth_context)
             return token
    
  • keystone/api/users.py +20 -2 modified
    @@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
                         role['name']))
             return roles
     
    +    def _get_roles(self, app_cred_data, token):
    +        if app_cred_data.get('roles'):
    +            roles = self._normalize_role_list(app_cred_data['roles'])
    +            # NOTE(cmurphy): The user is not allowed to add a role that is not
    +            # in their token. This is to prevent trustees or application
    +            # credential users from escallating their privileges to include
    +            # additional roles that the trustor or application credential
    +            # creator has assigned on the project.
    +            token_roles = [r['id'] for r in token.roles]
    +            for role in roles:
    +                if role['id'] not in token_roles:
    +                    detail = _('Cannot create an application credential with '
    +                               'unassigned role')
    +                    raise ks_exception.ApplicationCredentialValidationError(
    +                        detail=detail)
    +        else:
    +            roles = token.roles
    +        return roles
    +
         def get(self, user_id):
             """List application credentials for user.
     
    @@ -594,8 +613,7 @@ def post(self, user_id):
                 app_cred_data['secret'] = self._generate_secret()
             app_cred_data['user_id'] = user_id
             app_cred_data['project_id'] = project_id
    -        app_cred_data['roles'] = self._normalize_role_list(
    -            app_cred_data.get('roles', token.roles))
    +        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
             if app_cred_data.get('expires_at'):
                 app_cred_data['expires_at'] = utils.parse_expiration_date(
                     app_cred_data['expires_at'])
    
  • keystone/tests/unit/test_v3_application_credential.py +31 -0 modified
    @@ -174,6 +174,37 @@ def test_create_application_credential_with_application_credential(self):
                        expected_status_code=http.client.FORBIDDEN,
                        headers={'X-Auth-Token': token})
     
    +    def test_create_application_credential_with_trust(self):
    +        second_role = unit.new_role_ref(name='reader')
    +        PROVIDERS.role_api.create_role(second_role['id'], second_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, second_role['id'])
    +        with self.test_client() as c:
    +            pw_token = self.get_scoped_token()
    +            # create a self-trust - only the roles are important for this test
    +            trust_ref = unit.new_trust_ref(
    +                trustor_user_id=self.user_id,
    +                trustee_user_id=self.user_id,
    +                project_id=self.project_id,
    +                role_ids=[second_role['id']])
    +            resp = c.post('/v3/OS-TRUST/trusts',
    +                          headers={'X-Auth-Token': pw_token},
    +                          json={'trust': trust_ref})
    +            trust_id = resp.json['trust']['id']
    +            trust_auth = self.build_authentication_request(
    +                user_id=self.user_id,
    +                password=self.user['password'],
    +                trust_id=trust_id)
    +            trust_token = self.v3_create_token(
    +                trust_auth).headers['X-Subject-Token']
    +            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
    +            # only the roles from the trust token should be allowed, even if
    +            # the user has the role assigned on the project
    +            c.post('/v3/users/%s/application_credentials' % self.user_id,
    +                   headers={'X-Auth-Token': trust_token},
    +                   json=app_cred,
    +                   expected_status_code=http.client.BAD_REQUEST)
    +
         def test_create_application_credential_allow_recursion(self):
             with self.test_client() as c:
                 roles = [{'id': self.role_id}]
    
  • keystone/tests/unit/test_v3_credential.py +393 -34 modified
    @@ -21,12 +21,14 @@
     from keystoneclient.contrib.ec2 import utils as ec2_utils
     from oslo_db import exception as oslo_db_exception
     from testtools import matchers
    +import urllib
     
     from keystone.api import ec2tokens
     from keystone.common import provider_api
     from keystone.common import utils
     from keystone.credential.providers import fernet as credential_fernet
     from keystone import exception
    +from keystone import oauth1
     from keystone.tests import unit
     from keystone.tests.unit import ksfixtures
     from keystone.tests.unit import test_v3
    @@ -63,6 +65,33 @@ def _create_dict_blob_credential(self):
     
             return json.dumps(blob), credential_id
     
    +    def _test_get_token(self, access, secret):
    +        """Test signature validation with the access/secret provided."""
    +        signer = ec2_utils.Ec2Signer(secret)
    +        params = {'SignatureMethod': 'HmacSHA256',
    +                  'SignatureVersion': '2',
    +                  'AWSAccessKeyId': access}
    +        request = {'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        signature = signer.generate(request)
    +
    +        # Now make a request to validate the signed dummy request via the
    +        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    +        sig_ref = {'access': access,
    +                   'signature': signature,
    +                   'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        r = self.post(
    +            '/ec2tokens',
    +            body={'ec2Credentials': sig_ref},
    +            expected_status=http.client.OK)
    +        self.assertValidTokenResponse(r)
    +        return r.result['token']
    +
     
     class CredentialTestCase(CredentialBaseTestCase):
         """Test credential CRUD."""
    @@ -258,6 +287,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self):
                     'credential_id': credential_id},
                 body={'credential': update_ref})
     
    +    def test_update_credential_non_owner(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        alt_user = unit.create_user(
    +            PROVIDERS.identity_api, domain_id=self.domain_id)
    +        alt_user_id = alt_user['id']
    +        alt_project = unit.new_project_ref(domain_id=self.domain_id)
    +        alt_project_id = alt_project['id']
    +        PROVIDERS.resource_api.create_project(
    +            alt_project['id'], alt_project)
    +        alt_role = unit.new_role_ref(name='reader')
    +        alt_role_id = alt_role['id']
    +        PROVIDERS.role_api.create_role(alt_role_id, alt_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            alt_user_id, alt_project_id, alt_role_id)
    +        auth = self.build_authentication_request(
    +            user_id=alt_user_id,
    +            password=alt_user['password'],
    +            project_id=alt_project_id)
    +        ref = unit.new_credential_ref(user_id=alt_user_id,
    +                                      project_id=alt_project_id)
    +        r = self.post(
    +            '/credentials',
    +            auth=auth,
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +
    +        # Cannot change the credential to be owned by another user
    +        update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            expected_status=403,
    +            auth=auth,
    +            body={'credential': update_ref})
    +
    +    def test_update_ec2_credential_change_trust_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['trust_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different trust
    +        blob['trust_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the trust
    +        del blob['trust_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_app_cred_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different app cred
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the app cred
    +        del blob['app_cred_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_access_token_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different access token
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the access token
    +        del blob['access_token_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
         def test_delete_credential(self):
             """Call ``DELETE /credentials/{credential_id}``."""
             self.delete(
    @@ -393,7 +542,7 @@ def test_create_credential_with_admin_token(self):
             self.assertValidCredentialResponse(r, ref)
     
     
    -class TestCredentialTrustScoped(test_v3.RestfulTestCase):
    +class TestCredentialTrustScoped(CredentialBaseTestCase):
         """Test credential with trust scoped token."""
     
         def setUp(self):
    @@ -446,7 +595,7 @@ def test_trust_scoped_ec2_credential(self):
             token_id = r.headers.get('X-Subject-Token')
     
             # Create the credential with the trust scoped token
    -        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
                                                 project_id=self.project_id)
             r = self.post('/credentials', body={'credential': ref}, token=token_id)
     
    @@ -463,6 +612,21 @@ def test_trust_scoped_ec2_credential(self):
             self.assertEqual(hashlib.sha256(access).hexdigest(),
                              r.result['credential']['id'])
     
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # trust-delegated roles are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
             # Create second ec2 credential with the same access key id and check
             # for conflict.
             self.post(
    @@ -472,34 +636,229 @@ def test_trust_scoped_ec2_credential(self):
                 expected_status=http.client.CONFLICT)
     
     
    -class TestCredentialEc2(CredentialBaseTestCase):
    -    """Test v3 credential compatibility with ec2tokens."""
    +class TestCredentialAppCreds(CredentialBaseTestCase):
    +    """Test credential with application credential token."""
     
    -    def _validate_signature(self, access, secret):
    -        """Test signature validation with the access/secret provided."""
    -        signer = ec2_utils.Ec2Signer(secret)
    -        params = {'SignatureMethod': 'HmacSHA256',
    -                  'SignatureVersion': '2',
    -                  'AWSAccessKeyId': access}
    -        request = {'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        signature = signer.generate(request)
    +    def setUp(self):
    +        super(TestCredentialAppCreds, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
     
    -        # Now make a request to validate the signed dummy request via the
    -        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    -        sig_ref = {'access': access,
    -                   'signature': signature,
    -                   'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        r = self.post(
    -            '/ec2tokens',
    -            body={'ec2Credentials': sig_ref},
    -            expected_status=http.client.OK)
    -        self.assertValidTokenResponse(r)
    +    def test_app_cred_ec2_credential(self):
    +        """Test creating ec2 credential from an application credential.
    +
    +        Call ``POST /credentials``.
    +        """
    +        # Create the app cred
    +        ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
    +        del ref['id']
    +        r = self.post('/users/%s/application_credentials' % self.user_id,
    +                      body={'application_credential': ref})
    +        app_cred = r.result['application_credential']
    +
    +        # Get an application credential token
    +        auth_data = self.build_authentication_request(
    +            app_cred_id=app_cred['id'],
    +            secret=app_cred['secret'])
    +        r = self.v3_create_token(auth_data)
    +        token_id = r.headers.get('X-Subject-Token')
    +
    +        # Create the credential with the app cred token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the app_cred_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['app_cred_id'] = app_cred['id']
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the app cred are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +        # Create second ec2 credential with the same access key id and check
    +        # for conflict.
    +        self.post(
    +            '/credentials',
    +            body={'credential': ref},
    +            token=token_id,
    +            expected_status=http.client.CONFLICT)
    +
    +
    +class TestCredentialAccessToken(CredentialBaseTestCase):
    +    """Test credential with access token."""
    +
    +    def setUp(self):
    +        super(TestCredentialAccessToken, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
    +        self.base_url = 'http://localhost/v3'
    +
    +    def _urllib_parse_qs_text_keys(self, content):
    +        results = urllib.parse.parse_qs(content)
    +        return {key.decode('utf-8'): value for key, value in results.items()}
    +
    +    def _create_single_consumer(self):
    +        endpoint = '/OS-OAUTH1/consumers'
    +
    +        ref = {'description': uuid.uuid4().hex}
    +        resp = self.post(endpoint, body={'consumer': ref})
    +        return resp.result['consumer']
    +
    +    def _create_request_token(self, consumer, project_id, base_url=None):
    +        endpoint = '/OS-OAUTH1/request_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               signature_method=oauth1.SIG_HMAC,
    +                               callback_uri="oob")
    +        headers = {'requested_project_id': project_id}
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST',
    +                                         headers=headers)
    +        return endpoint, headers
    +
    +    def _create_access_token(self, consumer, token, base_url=None):
    +        endpoint = '/OS-OAUTH1/access_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC,
    +                               verifier=token.verifier)
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        return endpoint, headers
    +
    +    def _get_oauth_token(self, consumer, token):
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC)
    +        endpoint = '/auth/tokens'
    +        url, headers, body = client.sign(self.base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
    +        return endpoint, headers, ref
    +
    +    def _authorize_request_token(self, request_id):
    +        if isinstance(request_id, bytes):
    +            request_id = request_id.decode()
    +        return '/OS-OAUTH1/authorize/%s' % (request_id)
    +
    +    def _get_access_token(self):
    +        consumer = self._create_single_consumer()
    +        consumer_id = consumer['id']
    +        consumer_secret = consumer['secret']
    +        consumer = {'key': consumer_id, 'secret': consumer_secret}
    +
    +        url, headers = self._create_request_token(consumer, self.project_id)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        request_key = credentials['oauth_token'][0]
    +        request_secret = credentials['oauth_token_secret'][0]
    +        request_token = oauth1.Token(request_key, request_secret)
    +
    +        url = self._authorize_request_token(request_key)
    +        body = {'roles': [{'id': self.role_id}]}
    +        resp = self.put(url, body=body, expected_status=http.client.OK)
    +        verifier = resp.result['token']['oauth_verifier']
    +
    +        request_token.set_verifier(verifier)
    +        url, headers = self._create_access_token(consumer, request_token)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        access_key = credentials['oauth_token'][0]
    +        access_secret = credentials['oauth_token_secret'][0]
    +        access_token = oauth1.Token(access_key, access_secret)
    +
    +        url, headers, body = self._get_oauth_token(consumer, access_token)
    +        content = self.post(url, headers=headers, body=body)
    +        return access_key, content.headers['X-Subject-Token']
    +
    +    def test_access_token_ec2_credential(self):
    +        """Test creating ec2 credential from an oauth access token.
    +
    +        Call ``POST /credentials``.
    +        """
    +        access_key, token_id = self._get_access_token()
    +
    +        # Create the credential with the access token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the access_token_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['access_token_id'] = access_key.decode('utf-8')
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the access token are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +
    +class TestCredentialEc2(CredentialBaseTestCase):
    +    """Test v3 credential compatibility with ec2tokens."""
     
         def test_ec2_credential_signature_validate(self):
             """Test signature validation with a v3 ec2 credential."""
    @@ -514,15 +873,15 @@ def test_ec2_credential_signature_validate(self):
     
             cred_blob = json.loads(r.result['credential']['blob'])
             self.assertEqual(blob, cred_blob)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def test_ec2_credential_signature_validate_legacy(self):
             """Test signature validation with a legacy v3 ec2 credential."""
             cred_json, _ = self._create_dict_blob_credential()
             cred_blob = json.loads(cred_json)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def _get_ec2_cred_uri(self):
             return '/users/%s/credentials/OS-EC2' % self.user_id
    @@ -538,8 +897,8 @@ def test_ec2_create_credential(self):
             self.assertEqual(self.user_id, ec2_cred['user_id'])
             self.assertEqual(self.project_id, ec2_cred['tenant_id'])
             self.assertIsNone(ec2_cred['trust_id'])
    -        self._validate_signature(access=ec2_cred['access'],
    -                                 secret=ec2_cred['secret'])
    +        self._test_get_token(access=ec2_cred['access'],
    +                             secret=ec2_cred['secret'])
             uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
             self.assertThat(ec2_cred['links']['self'],
                             matchers.EndsWith(uri))
    
  • releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml (+16 -0, added)
    @@ -0,0 +1,16 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +security:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    
  • releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml (+31 -0, added)
    @@ -0,0 +1,31 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +security:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +fixes:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    
  • releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml (+19 -0, added)
    @@ -0,0 +1,19 @@
    +---
    +security:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
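
The update-time validation described in the bug 1872755 note can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not keystone's actual code: the function name `validate_ec2_blob_update` and the exception class `ProtectedLabelChanged` are hypothetical, and the blob is assumed to be stored as a JSON string, as it is in the tests above. Only the two label names come from the release note.

```python
import json

# Labels that keystone sets automatically, as named in the release note.
PROTECTED_LABELS = ('trust_id', 'app_cred_id')


class ProtectedLabelChanged(ValueError):
    """Hypothetical error raised when a protected label is altered."""


def validate_ec2_blob_update(existing_blob_json, new_blob_json):
    """Reject an update that adds, removes, or changes a protected label.

    Both arguments are JSON strings. The parsed new blob is returned
    when the update leaves every protected label untouched.
    """
    existing = json.loads(existing_blob_json)
    new = json.loads(new_blob_json)
    for label in PROTECTED_LABELS:
        # A label missing on both sides compares as None == None and passes.
        if existing.get(label) != new.get(label):
            raise ProtectedLabelChanged(
                '%s cannot be changed on an EC2 credential' % label)
    return new
```

In this sketch a caller may still rotate other blob fields such as the secret, but dropping or rewriting `app_cred_id` or `trust_id` fails, which is the property the note says prevents a credential holder from widening their scope.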
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
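
As a rough illustration of the behavior the tests in the diff assert, the sketch below stamps a delegation identifier into an EC2 credential blob at creation time, derives the credential ID as the SHA-256 hex digest of the access key, and resolves roles from the delegation alone when such an identifier is present. The function names and the `delegation` dictionary are hypothetical and do not reflect keystone's internals. Only the blob keys `app_cred_id`, `trust_id`, and `access_token_id` and the SHA-256 ID rule are taken from the patch and release notes.

```python
import hashlib
import json

# Blob keys that mark a credential as created under a limited scope.
DELEGATION_LABELS = ('app_cred_id', 'trust_id', 'access_token_id')


def build_ec2_credential(blob, delegation=None):
    """Return (credential_id, blob_json) for a new EC2 credential.

    `delegation` is a hypothetical dict such as
    {'kind': 'app_cred_id', 'id': '...'} describing the limited scope
    of the token used to create the credential.
    """
    stamped = dict(blob)
    if delegation is not None:
        # Record which delegation the credential was created under.
        stamped[delegation['kind']] = delegation['id']
    access = blob['access'].encode('utf-8')
    credential_id = hashlib.sha256(access).hexdigest()
    return credential_id, json.dumps(stamped)


def roles_for_ec2_token(blob_json, direct_role_ids, delegated_role_ids):
    """Pick the role IDs a token derived from the credential should carry."""
    blob = json.loads(blob_json)
    if any(blob.get(label) for label in DELEGATION_LABELS):
        # Delegated credential: ignore the user's direct role assignments.
        return list(delegated_role_ids)
    return list(direct_role_ids)
```

This mirrors the assertions in `test_access_token_ec2_credential` and the application credential test: the delegated role appears in the resulting token, while a `reader` role assigned directly to the user afterwards does not.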

References

12
