VYPR
High severity · NVD Advisory · Published May 6, 2020 · Updated Aug 4, 2024

CVE-2020-12691

Description

An issue was discovered in OpenStack Keystone before 15.0.1, and 16.0.0. Any authenticated user can create an EC2 credential for themselves for a project that they have a specified role on, and then perform an update to the credential user and project, allowing them to masquerade as another user. This potentially allows a malicious user to act as the admin on a project another user has the admin role on, which can effectively grant that user global admin privileges.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

In OpenStack Keystone, an authenticated user can create an EC2 credential and modify its owner to impersonate another user, potentially gaining global admin privileges.

Vulnerability

CVE-2020-12691 is an authorization bypass in OpenStack Keystone's EC2 credentials API. An authenticated user can create an EC2 credential for a project they have a role on, then perform an update to change the credential's user and project fields, effectively masquerading as another user [1][2]. The root cause is insufficient validation of credential owner modification, allowing the attacker to reassign the credential to a different user after creation.

Exploitation

Exploitation requires only an authenticated user who holds a role on some project. The attacker first creates an EC2 credential for themselves on that project, then sends an update request to the credential endpoint that changes the user and project fields to those of another user (e.g., an admin). No privileges or tokens beyond the initial project-scoped context are needed [1]. The vulnerability is present in Keystone versions before 15.0.1 and in 16.0.0.
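
The update step can be illustrated with a small, self-contained sketch. It is not Keystone's code: `merged_target` and `changes_owner` are hypothetical helper names, and the dictionaries only mimic the `user_id` and `project_id` fields of a credential. The idea it shows is that an ownership check has to look at the credential as it would exist after the update, not only at the stored one.

```python
def merged_target(current, update):
    """Return the credential as it would look after the update is applied."""
    return dict(current, **update)


def changes_owner(current, update):
    """Report whether an update would reassign the credential's user or project."""
    target = merged_target(current, update)
    return any(target.get(key) != current.get(key)
               for key in ("user_id", "project_id"))


# Hypothetical data: an attacker-owned credential and two update requests.
stored = {"id": "abc", "type": "ec2", "user_id": "attacker", "project_id": "p1"}
malicious = {"user_id": "admin", "project_id": "admin-project"}
harmless = {"blob": '{"access": "a", "secret": "s"}'}

print(changes_owner(stored, malicious))  # True
print(changes_owner(stored, harmless))   # False
```

In a real deployment the decision would be made by the policy engine against the merged target rather than by a blanket rejection, since an administrator may legitimately be allowed to reassign a credential.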

Impact

A successful attack allows the malicious user to impersonate any other user, potentially gaining that user's roles and permissions. If the impersonated target is an admin on a project, the attacker can effectively gain global admin privileges across the OpenStack deployment [1][2]. This undermines the entire trust model of Keystone's identity service.

Mitigation

The vulnerability is fixed in Keystone 15.0.1 and 16.0.1. Patches were merged into the stable/stein and stable/train branches (commits 95b2bbe and 40cbb7b) [3][4]. Operators should upgrade to a patched version or apply the relevant patches immediately.
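
As a rough aid for triage, the affected ranges given for this CVE (before 15.0.1, and 16.0.0 up to but not including 16.0.1) can be expressed as a short check. This is a minimal sketch: `parse_version` and `is_affected` are hypothetical names, and it assumes plain dotted numeric version strings such as `15.0.0`, so development or distribution-specific suffixes would need extra handling.

```python
def parse_version(version):
    """Turn a dotted numeric version string into a tuple of integers."""
    return tuple(int(part) for part in version.split("."))


def is_affected(version):
    """Return True if the version falls in a range listed as affected."""
    v = parse_version(version)
    return v < (15, 0, 1) or (16, 0, 0) <= v < (16, 0, 1)


for candidate in ("14.2.0", "15.0.1", "16.0.0", "16.0.1"):
    print(candidate, is_affected(candidate))
```

A distribution may backport the fix without changing the upstream version number, so a result of "affected" from this check is a prompt to inspect the installed package, not proof of exposure.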

AI Insight generated on May 21, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package          Affected versions      Patched versions
keystone (PyPI)  < 15.0.1               15.0.1
keystone (PyPI)  >= 16.0.0, < 16.0.1    16.0.1

Affected products

2

Patches

3
95b2bbeab113

Merge "Fix security issues with EC2 credentials" into stable/stein

8 files changed · +608 -62
  • keystone/api/credentials.py +56 -20 modified
    @@ -13,6 +13,7 @@
     # This file handles all flask-restful resources for /v3/credentials
     
     import hashlib
    +import six
     
     import flask
     from oslo_serialization import jsonutils
    @@ -60,30 +61,41 @@ def _blob_to_json(ref):
                 ref['blob'] = jsonutils.dumps(blob)
             return ref
     
    -    def _assign_unique_id(self, ref, trust_id=None):
    +    def _validate_blob_json(self, ref):
    +        try:
    +            blob = jsonutils.loads(ref.get('blob'))
    +        except (ValueError, TypeError):
    +            raise exception.ValidationError(
    +                message=_('Invalid blob in credential'))
    +        if not blob or not isinstance(blob, dict):
    +            raise exception.ValidationError(attribute='blob',
    +                                            target='credential')
    +        if blob.get('access') is None:
    +            raise exception.ValidationError(attribute='access',
    +                                            target='credential')
    +        return blob
    +
    +    def _assign_unique_id(
    +            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
             # Generates an assigns a unique identifier to a credential reference.
             if ref.get('type', '').lower() == 'ec2':
    -            try:
    -                blob = jsonutils.loads(ref.get('blob'))
    -            except (ValueError, TypeError):
    -                raise exception.ValidationError(
    -                    message=_('Invalid blob in credential'))
    -            if not blob or not isinstance(blob, dict):
    -                raise exception.ValidationError(attribute='blob',
    -                                                target='credential')
    -            if blob.get('access') is None:
    -                raise exception.ValidationError(attribute='access',
    -                                                target='credential')
    -
    +            blob = self._validate_blob_json(ref)
                 ref = ref.copy()
                 ref['id'] = hashlib.sha256(
                     blob['access'].encode('utf8')).hexdigest()
    -            # update the blob with the trust_id, so credentials created with
    -            # a trust scoped token will result in trust scoped tokens when
    -            # authentication via ec2tokens happens
    +            # update the blob with the trust_id or app_cred_id, so credentials
    +            # created with a trust- or app cred-scoped token will result in
    +            # trust- or app cred-scoped tokens when authentication via
    +            # ec2tokens happens
                 if trust_id is not None:
                     blob['trust_id'] = trust_id
                     ref['blob'] = jsonutils.dumps(blob)
    +            if app_cred_id is not None:
    +                blob['app_cred_id'] = app_cred_id
    +                ref['blob'] = jsonutils.dumps(blob)
    +            if access_token_id is not None:
    +                blob['access_token_id'] = access_token_id
    +                ref['blob'] = jsonutils.dumps(blob)
                 return ref
             else:
                 return super(CredentialResource, self)._assign_unique_id(ref)
    @@ -146,23 +158,47 @@ def post(self):
             )
             validation.lazy_validate(schema.credential_create, credential)
             trust_id = getattr(self.oslo_context, 'trust_id', None)
    +        app_cred_id = getattr(
    +            self.auth_context['token'], 'application_credential_id', None)
    +        access_token_id = getattr(
    +            self.auth_context['token'], 'access_token_id', None)
             ref = self._assign_unique_id(
    -            self._normalize_dict(credential), trust_id=trust_id)
    -        ref = PROVIDERS.credential_api.create_credential(ref['id'], ref,
    -                                                         initiator=self.audit_initiator)
    +            self._normalize_dict(credential),
    +            trust_id=trust_id, app_cred_id=app_cred_id,
    +            access_token_id=access_token_id)
    +        ref = PROVIDERS.credential_api.create_credential(
    +            ref['id'], ref, initiator=self.audit_initiator)
             return self.wrap_member(ref), http_client.CREATED
     
    +    def _validate_blob_update_keys(self, credential, ref):
    +        if credential.get('type', '').lower() == 'ec2':
    +            new_blob = self._validate_blob_json(ref)
    +            old_blob = credential.get('blob')
    +            if isinstance(old_blob, six.string_types):
    +                old_blob = jsonutils.loads(old_blob)
    +            # if there was a scope set, prevent changing it or unsetting it
    +            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
    +                if old_blob.get(key) != new_blob.get(key):
    +                    message = _('%s can not be updated for credential') % key
    +                    raise exception.ValidationError(message=message)
    +
         def patch(self, credential_id):
             # Update Credential
             ENFORCER.enforce_call(
                 action='identity:update_credential',
                 build_target=_build_target_enforcement
             )
    -        PROVIDERS.credential_api.get_credential(credential_id)
    +        current = PROVIDERS.credential_api.get_credential(credential_id)
     
             credential = self.request_body_json.get('credential', {})
             validation.lazy_validate(schema.credential_update, credential)
    +        self._validate_blob_update_keys(current.copy(), credential.copy())
             self._require_matching_id(credential)
    +        # Check that the user hasn't illegally modified the owner or scope
    +        target = {'credential': dict(current, **credential)}
    +        ENFORCER.enforce_call(
    +            action='identity:update_credential', target_attr=target
    +        )
             ref = PROVIDERS.credential_api.update_credential(
                 credential_id, credential)
             return self.wrap_member(ref)
    
  • keystone/api/_shared/EC2_S3_Resource.py +40 -5 modified
    @@ -115,7 +115,9 @@ def handle_authenticate(self):
                 project_id=cred.get('project_id'),
                 access=loaded.get('access'),
                 secret=loaded.get('secret'),
    -            trust_id=loaded.get('trust_id')
    +            trust_id=loaded.get('trust_id'),
    +            app_cred_id=loaded.get('app_cred_id'),
    +            access_token_id=loaded.get('access_token_id')
             )
     
             # validate the signature
    @@ -137,8 +139,34 @@ def handle_authenticate(self):
                     sys.exc_info()[2])
     
             self._check_timestamp(credentials)
    -        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    -            user_ref['id'], project_ref['id'])
    +
    +        trustee_user_id = None
    +        auth_context = None
    +        if cred_data['trust_id']:
    +            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
    +            roles = [r['id'] for r in trust['roles']]
    +            # NOTE(cmurphy): if this credential was created using a
    +            # trust-scoped token with impersonation, the user_id will be for
    +            # the trustor, not the trustee. In this case, issuing a
    +            # trust-scoped token to the trustor will fail. In order to get a
    +            # trust-scoped token, use the user ID of the trustee. With
    +            # impersonation, the resulting token will still be for the trustor.
    +            # Without impersonation, the token will be for the trustee.
    +            if trust['impersonation'] is True:
    +                trustee_user_id = trust['trustee_user_id']
    +        elif cred_data['app_cred_id']:
    +            ac_client = PROVIDERS.application_credential_api
    +            app_cred = ac_client.get_application_credential(
    +                cred_data['app_cred_id'])
    +            roles = [r['id'] for r in app_cred['roles']]
    +        elif cred_data['access_token_id']:
    +            access_token = PROVIDERS.oauth_api.get_access_token(
    +                cred_data['access_token_id'])
    +            roles = jsonutils.loads(access_token['role_ids'])
    +            auth_context = {'access_token_id': cred_data['access_token_id']}
    +        else:
    +            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    +                user_ref['id'], project_ref['id'])
     
             if not roles:
                 raise ks_exceptions.Unauthorized(_('User not valid for project.'))
    @@ -149,7 +177,14 @@ def handle_authenticate(self):
     
             method_names = ['ec2credential']
     
    +        if trustee_user_id:
    +            user_id = trustee_user_id
    +        else:
    +            user_id = user_ref['id']
             token = PROVIDERS.token_provider_api.issue_token(
    -            user_id=user_ref['id'], method_names=method_names,
    -            project_id=project_ref['id'])
    +            user_id=user_id, method_names=method_names,
    +            project_id=project_ref['id'],
    +            trust_id=cred_data['trust_id'],
    +            app_cred_id=cred_data['app_cred_id'],
    +            auth_context=auth_context)
             return token
    
  • keystone/api/users.py +20 -2 modified
    @@ -568,6 +568,25 @@ def _normalize_role_list(app_cred_roles):
                         role['name']))
             return roles
     
    +    def _get_roles(self, app_cred_data, token):
    +        if app_cred_data.get('roles'):
    +            roles = self._normalize_role_list(app_cred_data['roles'])
    +            # NOTE(cmurphy): The user is not allowed to add a role that is not
    +            # in their token. This is to prevent trustees or application
    +            # credential users from escallating their privileges to include
    +            # additional roles that the trustor or application credential
    +            # creator has assigned on the project.
    +            token_roles = [r['id'] for r in token.roles]
    +            for role in roles:
    +                if role['id'] not in token_roles:
    +                    detail = _('Cannot create an application credential with '
    +                               'unassigned role')
    +                    raise ks_exception.ApplicationCredentialValidationError(
    +                        detail=detail)
    +        else:
    +            roles = token.roles
    +        return roles
    +
         def get(self, user_id):
             """List application credentials for user.
     
    @@ -603,8 +622,7 @@ def post(self, user_id):
                 app_cred_data['secret'] = self._generate_secret()
             app_cred_data['user_id'] = user_id
             app_cred_data['project_id'] = project_id
    -        app_cred_data['roles'] = self._normalize_role_list(
    -            app_cred_data.get('roles', token.roles))
    +        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
             if app_cred_data.get('expires_at'):
                 app_cred_data['expires_at'] = utils.parse_expiration_date(
                     app_cred_data['expires_at'])
    
  • keystone/tests/unit/test_v3_application_credential.py +31 -0 modified
    @@ -166,6 +166,37 @@ def test_create_application_credential_with_application_credential(self):
                        expected_status_code=http_client.FORBIDDEN,
                        headers={'X-Auth-Token': token})
     
    +    def test_create_application_credential_with_trust(self):
    +        second_role = unit.new_role_ref(name='reader')
    +        PROVIDERS.role_api.create_role(second_role['id'], second_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, second_role['id'])
    +        with self.test_client() as c:
    +            pw_token = self.get_scoped_token()
    +            # create a self-trust - only the roles are important for this test
    +            trust_ref = unit.new_trust_ref(
    +                trustor_user_id=self.user_id,
    +                trustee_user_id=self.user_id,
    +                project_id=self.project_id,
    +                role_ids=[second_role['id']])
    +            resp = c.post('/v3/OS-TRUST/trusts',
    +                          headers={'X-Auth-Token': pw_token},
    +                          json={'trust': trust_ref})
    +            trust_id = resp.json['trust']['id']
    +            trust_auth = self.build_authentication_request(
    +                user_id=self.user_id,
    +                password=self.user['password'],
    +                trust_id=trust_id)
    +            trust_token = self.v3_create_token(
    +                trust_auth).headers['X-Subject-Token']
    +            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
    +            # only the roles from the trust token should be allowed, even if
    +            # the user has the role assigned on the project
    +            c.post('/v3/users/%s/application_credentials' % self.user_id,
    +                   headers={'X-Auth-Token': trust_token},
    +                   json=app_cred,
    +                   expected_status_code=http_client.BAD_REQUEST)
    +
         def test_create_application_credential_allow_recursion(self):
             with self.test_client() as c:
                 roles = [{'id': self.role_id}]
    
  • keystone/tests/unit/test_v3_credential.py +395 -35 modified
    @@ -19,14 +19,15 @@
     from keystoneclient.contrib.ec2 import utils as ec2_utils
     import mock
     from oslo_db import exception as oslo_db_exception
    -from six.moves import http_client
    +from six.moves import http_client, urllib
     from testtools import matchers
     
     from keystone.api import ec2tokens
     from keystone.common import provider_api
     from keystone.common import utils
     from keystone.credential.providers import fernet as credential_fernet
     from keystone import exception
    +from keystone import oauth1
     from keystone.tests import unit
     from keystone.tests.unit import ksfixtures
     from keystone.tests.unit import test_v3
    @@ -63,6 +64,33 @@ def _create_dict_blob_credential(self):
     
             return json.dumps(blob), credential_id
     
    +    def _test_get_token(self, access, secret):
    +        """Test signature validation with the access/secret provided."""
    +        signer = ec2_utils.Ec2Signer(secret)
    +        params = {'SignatureMethod': 'HmacSHA256',
    +                  'SignatureVersion': '2',
    +                  'AWSAccessKeyId': access}
    +        request = {'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        signature = signer.generate(request)
    +
    +        # Now make a request to validate the signed dummy request via the
    +        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    +        sig_ref = {'access': access,
    +                   'signature': signature,
    +                   'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        r = self.post(
    +            '/ec2tokens',
    +            body={'ec2Credentials': sig_ref},
    +            expected_status=http_client.OK)
    +        self.assertValidTokenResponse(r)
    +        return r.result['token']
    +
     
     class CredentialTestCase(CredentialBaseTestCase):
         """Test credential CRUD."""
    @@ -258,6 +286,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self):
                     'credential_id': credential_id},
                 body={'credential': update_ref})
     
    +    def test_update_credential_non_owner(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        alt_user = unit.create_user(
    +            PROVIDERS.identity_api, domain_id=self.domain_id)
    +        alt_user_id = alt_user['id']
    +        alt_project = unit.new_project_ref(domain_id=self.domain_id)
    +        alt_project_id = alt_project['id']
    +        PROVIDERS.resource_api.create_project(
    +            alt_project['id'], alt_project)
    +        alt_role = unit.new_role_ref(name='reader')
    +        alt_role_id = alt_role['id']
    +        PROVIDERS.role_api.create_role(alt_role_id, alt_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            alt_user_id, alt_project_id, alt_role_id)
    +        auth = self.build_authentication_request(
    +            user_id=alt_user_id,
    +            password=alt_user['password'],
    +            project_id=alt_project_id)
    +        ref = unit.new_credential_ref(user_id=alt_user_id,
    +                                      project_id=alt_project_id)
    +        r = self.post(
    +            '/credentials',
    +            auth=auth,
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +
    +        # Cannot change the credential to be owned by another user
    +        update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            expected_status=403,
    +            auth=auth,
    +            body={'credential': update_ref})
    +
    +    def test_update_ec2_credential_change_trust_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['trust_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different trust
    +        blob['trust_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the trust
    +        del blob['trust_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_app_cred_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different app cred
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the app cred
    +        del blob['app_cred_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_access_token_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different access token
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the access token
    +        del blob['access_token_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
         def test_delete_credential(self):
             """Call ``DELETE /credentials/{credential_id}``."""
             self.delete(
    @@ -393,7 +541,7 @@ def test_create_credential_with_admin_token(self):
             self.assertValidCredentialResponse(r, ref)
     
     
    -class TestCredentialTrustScoped(test_v3.RestfulTestCase):
    +class TestCredentialTrustScoped(CredentialBaseTestCase):
         """Test credential with trust scoped token."""
     
         def setUp(self):
    @@ -446,7 +594,7 @@ def test_trust_scoped_ec2_credential(self):
             token_id = r.headers.get('X-Subject-Token')
     
             # Create the credential with the trust scoped token
    -        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
                                                 project_id=self.project_id)
             r = self.post('/credentials', body={'credential': ref}, token=token_id)
     
    @@ -463,6 +611,97 @@ def test_trust_scoped_ec2_credential(self):
             self.assertEqual(hashlib.sha256(access).hexdigest(),
                              r.result['credential']['id'])
     
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # trust-delegated roles are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +        # Create second ec2 credential with the same access key id and check
    +        # for conflict.
    +        self.post(
    +            '/credentials',
    +            body={'credential': ref},
    +            token=token_id,
    +            expected_status=http_client.CONFLICT)
    +
    +
    +class TestCredentialAppCreds(CredentialBaseTestCase):
    +    """Test credential with application credential token."""
    +
    +    def setUp(self):
    +        super(TestCredentialAppCreds, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
    +
    +    def test_app_cred_ec2_credential(self):
    +        """Test creating ec2 credential from an application credential.
    +
    +        Call ``POST /credentials``.
    +        """
    +        # Create the app cred
    +        ref = {
    +            'name': uuid.uuid4().hex,
    +            'roles': [{'id': self.role_id}]
    +        }
    +        r = self.post('/users/%s/application_credentials' % self.user_id,
    +                      body={'application_credential': ref})
    +        app_cred = r.result['application_credential']
    +
    +        # Get an application credential token
    +        auth_data = self.build_authentication_request(
    +            app_cred_id=app_cred['id'],
    +            secret=app_cred['secret'])
    +        r = self.v3_create_token(auth_data)
    +        token_id = r.headers.get('X-Subject-Token')
    +
    +        # Create the credential with the app cred token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the app_cred_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['app_cred_id'] = app_cred['id']
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the app cred are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
             # Create second ec2 credential with the same access key id and check
             # for conflict.
             self.post(
    @@ -472,34 +711,155 @@ def test_trust_scoped_ec2_credential(self):
                 expected_status=http_client.CONFLICT)
     
     
    -class TestCredentialEc2(CredentialBaseTestCase):
    -    """Test v3 credential compatibility with ec2tokens."""
    +class TestCredentialAccessToken(CredentialBaseTestCase):
    +    """Test credential with access token."""
     
    -    def _validate_signature(self, access, secret):
    -        """Test signature validation with the access/secret provided."""
    -        signer = ec2_utils.Ec2Signer(secret)
    -        params = {'SignatureMethod': 'HmacSHA256',
    -                  'SignatureVersion': '2',
    -                  'AWSAccessKeyId': access}
    -        request = {'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        signature = signer.generate(request)
    +    def setUp(self):
    +        super(TestCredentialAccessToken, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
    +        self.base_url = 'http://localhost/v3'
    +
    +    def _urllib_parse_qs_text_keys(self, content):
    +        results = urllib.parse.parse_qs(content)
    +        return {key.decode('utf-8'): value for key, value in results.items()}
    +
    +    def _create_single_consumer(self):
    +        endpoint = '/OS-OAUTH1/consumers'
    +
    +        ref = {'description': uuid.uuid4().hex}
    +        resp = self.post(endpoint, body={'consumer': ref})
    +        return resp.result['consumer']
    +
    +    def _create_request_token(self, consumer, project_id, base_url=None):
    +        endpoint = '/OS-OAUTH1/request_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               signature_method=oauth1.SIG_HMAC,
    +                               callback_uri="oob")
    +        headers = {'requested_project_id': project_id}
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST',
    +                                         headers=headers)
    +        return endpoint, headers
    +
    +    def _create_access_token(self, consumer, token, base_url=None):
    +        endpoint = '/OS-OAUTH1/access_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC,
    +                               verifier=token.verifier)
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        return endpoint, headers
    +
    +    def _get_oauth_token(self, consumer, token):
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC)
    +        endpoint = '/auth/tokens'
    +        url, headers, body = client.sign(self.base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
    +        return endpoint, headers, ref
    +
    +    def _authorize_request_token(self, request_id):
    +        if isinstance(request_id, bytes):
    +            request_id = request_id.decode()
    +        return '/OS-OAUTH1/authorize/%s' % (request_id)
    +
    +    def _get_access_token(self):
    +        consumer = self._create_single_consumer()
    +        consumer_id = consumer['id']
    +        consumer_secret = consumer['secret']
    +        consumer = {'key': consumer_id, 'secret': consumer_secret}
    +
    +        url, headers = self._create_request_token(consumer, self.project_id)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        request_key = credentials['oauth_token'][0]
    +        request_secret = credentials['oauth_token_secret'][0]
    +        request_token = oauth1.Token(request_key, request_secret)
    +
    +        url = self._authorize_request_token(request_key)
    +        body = {'roles': [{'id': self.role_id}]}
    +        resp = self.put(url, body=body, expected_status=http_client.OK)
    +        verifier = resp.result['token']['oauth_verifier']
    +
    +        request_token.set_verifier(verifier)
    +        url, headers = self._create_access_token(consumer, request_token)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        access_key = credentials['oauth_token'][0]
    +        access_secret = credentials['oauth_token_secret'][0]
    +        access_token = oauth1.Token(access_key, access_secret)
    +
    +        url, headers, body = self._get_oauth_token(consumer, access_token)
    +        content = self.post(url, headers=headers, body=body)
    +        return access_key, content.headers['X-Subject-Token']
    +
    +    def test_access_token_ec2_credential(self):
    +        """Test creating ec2 credential from an oauth access token.
     
    -        # Now make a request to validate the signed dummy request via the
    -        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    -        sig_ref = {'access': access,
    -                   'signature': signature,
    -                   'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        r = self.post(
    -            '/ec2tokens',
    -            body={'ec2Credentials': sig_ref},
    -            expected_status=http_client.OK)
    -        self.assertValidTokenResponse(r)
    +        Call ``POST /credentials``.
    +        """
    +        access_key, token_id = self._get_access_token()
    +
    +        # Create the credential with the access token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the access_token_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['access_token_id'] = access_key.decode('utf-8')
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the access token are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +
    +class TestCredentialEc2(CredentialBaseTestCase):
    +    """Test v3 credential compatibility with ec2tokens."""
     
         def test_ec2_credential_signature_validate(self):
             """Test signature validation with a v3 ec2 credential."""
    @@ -514,15 +874,15 @@ def test_ec2_credential_signature_validate(self):
     
             cred_blob = json.loads(r.result['credential']['blob'])
             self.assertEqual(blob, cred_blob)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def test_ec2_credential_signature_validate_legacy(self):
             """Test signature validation with a legacy v3 ec2 credential."""
             cred_json, _ = self._create_dict_blob_credential()
             cred_blob = json.loads(cred_json)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def _get_ec2_cred_uri(self):
             return '/users/%s/credentials/OS-EC2' % self.user_id
    @@ -538,8 +898,8 @@ def test_ec2_create_credential(self):
             self.assertEqual(self.user_id, ec2_cred['user_id'])
             self.assertEqual(self.project_id, ec2_cred['tenant_id'])
             self.assertIsNone(ec2_cred['trust_id'])
    -        self._validate_signature(access=ec2_cred['access'],
    -                                 secret=ec2_cred['secret'])
    +        self._test_get_token(access=ec2_cred['access'],
    +                             secret=ec2_cred['secret'])
             uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
             self.assertThat(ec2_cred['links']['self'],
                             matchers.EndsWith(uri))
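
The tests in this file assert that the ID of an EC2 credential equals the SHA-256 hex digest of its access key. A minimal sketch of that derivation follows. The function name `ec2_credential_id` is hypothetical and is not part of Keystone.

```python
import hashlib


def ec2_credential_id(access_key):
    """Derive a credential ID from an EC2 access key (illustrative helper)."""
    # Same expression the tests compare against: sha256 over the UTF-8 bytes
    # of the access key, rendered as a hex string.
    return hashlib.sha256(access_key.encode('utf8')).hexdigest()
```

Because the ID is a pure function of the access key, posting a second credential with the same access key maps to the same ID, which is consistent with the conflict the tests expect.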
    
  • releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml+16 0 added
    @@ -0,0 +1,16 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +security:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
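
The `patch` handler in `keystone/api/credentials.py` addresses this by enforcing the update policy a second time against the credential as it would look after the update, built with `dict(current, **credential)`. The sketch below illustrates the idea only. `owner_check` is a hypothetical stand-in for the real policy rule, which is not shown in this change set and may be more permissive (for example, for administrators).

```python
def merged_target(current, update):
    """Return the credential as it would look once the update is applied."""
    return dict(current, **update)


def owner_check(token_user_id, credential):
    """Hypothetical policy stand-in: the caller must own the credential."""
    return credential.get('user_id') == token_user_id


def can_update(token_user_id, current, update):
    # Check both the stored credential and the post-update credential, so an
    # update cannot hand the credential to a different user.
    return (owner_check(token_user_id, current)
            and owner_check(token_user_id, merged_target(current, update)))
```

Under these assumptions, an update that only touches the blob passes, while one that sets `user_id` to another user is rejected even though the caller owns the credential before the change.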
    
  • releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml+31 0 added
    @@ -0,0 +1,31 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +security:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +fixes:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    
  • releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml+19 0 added
    @@ -0,0 +1,19 @@
    +---
    +security:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
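
A simplified, standalone sketch of the kind of check this note describes, loosely modelled on `_validate_blob_update_keys` in `keystone/api/credentials.py`. The helper name `changed_scope_keys` is hypothetical. The key list follows the code in this change set, which also protects `access_token_id` in addition to the two labels named in the note.

```python
import json

# Blob keys that record which delegation was used to create the credential.
SCOPE_KEYS = ('trust_id', 'app_cred_id', 'access_token_id')


def changed_scope_keys(old_blob, new_blob):
    """Return the scope keys whose values differ between two EC2 blobs."""
    # Blobs may arrive as JSON strings or as already-decoded dicts.
    if isinstance(old_blob, str):
        old_blob = json.loads(old_blob)
    if isinstance(new_blob, str):
        new_blob = json.loads(new_blob)
    # A missing key reads as None, so removing a key counts as a change.
    return [k for k in SCOPE_KEYS if old_blob.get(k) != new_blob.get(k)]
```

An update would be rejected whenever the returned list is non-empty, which covers both swapping a scope label for another value and dropping it from the blob.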
    
40cbb7bebd50

Merge "Fix security issues with EC2 credentials" into stable/train

8 files changed · +606 62
  • keystone/api/credentials.py+56 20 modified
    @@ -13,6 +13,7 @@
     # This file handles all flask-restful resources for /v3/credentials
     
     import hashlib
    +import six
     
     import flask
     from oslo_serialization import jsonutils
    @@ -60,30 +61,41 @@ def _blob_to_json(ref):
                 ref['blob'] = jsonutils.dumps(blob)
             return ref
     
    -    def _assign_unique_id(self, ref, trust_id=None):
    +    def _validate_blob_json(self, ref):
    +        try:
    +            blob = jsonutils.loads(ref.get('blob'))
    +        except (ValueError, TabError):
    +            raise exception.ValidationError(
    +                message=_('Invalid blob in credential'))
    +        if not blob or not isinstance(blob, dict):
    +            raise exception.ValidationError(attribute='blob',
    +                                            target='credential')
    +        if blob.get('access') is None:
    +            raise exception.ValidationError(attribute='access',
    +                                            target='credential')
    +        return blob
    +
    +    def _assign_unique_id(
    +            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
             # Generates and assigns a unique identifier to a credential reference.
             if ref.get('type', '').lower() == 'ec2':
    -            try:
    -                blob = jsonutils.loads(ref.get('blob'))
    -            except (ValueError, TabError):
    -                raise exception.ValidationError(
    -                    message=_('Invalid blob in credential'))
    -            if not blob or not isinstance(blob, dict):
    -                raise exception.ValidationError(attribute='blob',
    -                                                target='credential')
    -            if blob.get('access') is None:
    -                raise exception.ValidationError(attribute='access',
    -                                                target='credential')
    -
    +            blob = self._validate_blob_json(ref)
                 ref = ref.copy()
                 ref['id'] = hashlib.sha256(
                     blob['access'].encode('utf8')).hexdigest()
    -            # update the blob with the trust_id, so credentials created with
    -            # a trust scoped token will result in trust scoped tokens when
    -            # authentication via ec2tokens happens
    +            # update the blob with the trust_id or app_cred_id, so credentials
    +            # created with a trust- or app cred-scoped token will result in
    +            # trust- or app cred-scoped tokens when authentication via
    +            # ec2tokens happens
                 if trust_id is not None:
                     blob['trust_id'] = trust_id
                     ref['blob'] = jsonutils.dumps(blob)
    +            if app_cred_id is not None:
    +                blob['app_cred_id'] = app_cred_id
    +                ref['blob'] = jsonutils.dumps(blob)
    +            if access_token_id is not None:
    +                blob['access_token_id'] = access_token_id
    +                ref['blob'] = jsonutils.dumps(blob)
                 return ref
             else:
                 return super(CredentialResource, self)._assign_unique_id(ref)
    @@ -146,23 +158,47 @@ def post(self):
             )
             validation.lazy_validate(schema.credential_create, credential)
             trust_id = getattr(self.oslo_context, 'trust_id', None)
    +        app_cred_id = getattr(
    +            self.auth_context['token'], 'application_credential_id', None)
    +        access_token_id = getattr(
    +            self.auth_context['token'], 'access_token_id', None)
             ref = self._assign_unique_id(
    -            self._normalize_dict(credential), trust_id=trust_id)
    -        ref = PROVIDERS.credential_api.create_credential(ref['id'], ref,
    -                                                         initiator=self.audit_initiator)
    +            self._normalize_dict(credential),
    +            trust_id=trust_id, app_cred_id=app_cred_id,
    +            access_token_id=access_token_id)
    +        ref = PROVIDERS.credential_api.create_credential(
    +            ref['id'], ref, initiator=self.audit_initiator)
             return self.wrap_member(ref), http_client.CREATED
     
    +    def _validate_blob_update_keys(self, credential, ref):
    +        if credential.get('type', '').lower() == 'ec2':
    +            new_blob = self._validate_blob_json(ref)
    +            old_blob = credential.get('blob')
    +            if isinstance(old_blob, six.string_types):
    +                old_blob = jsonutils.loads(old_blob)
    +            # if there was a scope set, prevent changing it or unsetting it
    +            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
    +                if old_blob.get(key) != new_blob.get(key):
    +                    message = _('%s can not be updated for credential') % key
    +                    raise exception.ValidationError(message=message)
    +
         def patch(self, credential_id):
             # Update Credential
             ENFORCER.enforce_call(
                 action='identity:update_credential',
                 build_target=_build_target_enforcement
             )
    -        PROVIDERS.credential_api.get_credential(credential_id)
    +        current = PROVIDERS.credential_api.get_credential(credential_id)
     
             credential = self.request_body_json.get('credential', {})
             validation.lazy_validate(schema.credential_update, credential)
    +        self._validate_blob_update_keys(current.copy(), credential.copy())
             self._require_matching_id(credential)
    +        # Check that the user hasn't illegally modified the owner or scope
    +        target = {'credential': dict(current, **credential)}
    +        ENFORCER.enforce_call(
    +            action='identity:update_credential', target_attr=target
    +        )
             ref = PROVIDERS.credential_api.update_credential(
                 credential_id, credential)
             return self.wrap_member(ref)
    
  • keystone/api/_shared/EC2_S3_Resource.py+40 5 modified
    @@ -115,7 +115,9 @@ def handle_authenticate(self):
                 project_id=cred.get('project_id'),
                 access=loaded.get('access'),
                 secret=loaded.get('secret'),
    -            trust_id=loaded.get('trust_id')
    +            trust_id=loaded.get('trust_id'),
    +            app_cred_id=loaded.get('app_cred_id'),
    +            access_token_id=loaded.get('access_token_id')
             )
     
             # validate the signature
    @@ -137,8 +139,34 @@ def handle_authenticate(self):
                     sys.exc_info()[2])
     
             self._check_timestamp(credentials)
    -        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    -            user_ref['id'], project_ref['id'])
    +
    +        trustee_user_id = None
    +        auth_context = None
    +        if cred_data['trust_id']:
    +            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
    +            roles = [r['id'] for r in trust['roles']]
    +            # NOTE(cmurphy): if this credential was created using a
    +            # trust-scoped token with impersonation, the user_id will be for
    +            # the trustor, not the trustee. In this case, issuing a
    +            # trust-scoped token to the trustor will fail. In order to get a
    +            # trust-scoped token, use the user ID of the trustee. With
    +            # impersonation, the resulting token will still be for the trustor.
    +            # Without impersonation, the token will be for the trustee.
    +            if trust['impersonation'] is True:
    +                trustee_user_id = trust['trustee_user_id']
    +        elif cred_data['app_cred_id']:
    +            ac_client = PROVIDERS.application_credential_api
    +            app_cred = ac_client.get_application_credential(
    +                cred_data['app_cred_id'])
    +            roles = [r['id'] for r in app_cred['roles']]
    +        elif cred_data['access_token_id']:
    +            access_token = PROVIDERS.oauth_api.get_access_token(
    +                cred_data['access_token_id'])
    +            roles = jsonutils.loads(access_token['role_ids'])
    +            auth_context = {'access_token_id': cred_data['access_token_id']}
    +        else:
    +            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    +                user_ref['id'], project_ref['id'])
     
             if not roles:
                 raise ks_exceptions.Unauthorized(_('User not valid for project.'))
    @@ -149,7 +177,14 @@ def handle_authenticate(self):
     
             method_names = ['ec2credential']
     
    +        if trustee_user_id:
    +            user_id = trustee_user_id
    +        else:
    +            user_id = user_ref['id']
             token = PROVIDERS.token_provider_api.issue_token(
    -            user_id=user_ref['id'], method_names=method_names,
    -            project_id=project_ref['id'])
    +            user_id=user_id, method_names=method_names,
    +            project_id=project_ref['id'],
    +            trust_id=cred_data['trust_id'],
    +            app_cred_id=cred_data['app_cred_id'],
    +            auth_context=auth_context)
             return token
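
The role selection in `handle_authenticate` can be read as a four-way dispatch on the scope labels stored in the credential blob. The sketch below restates that dispatch with plain dictionaries. `resolve_role_ids` and the `lookups` tables are hypothetical stand-ins for Keystone's trust, application credential, OAuth and assignment backends, not real APIs.

```python
import json


def resolve_role_ids(cred_data, lookups):
    """Pick the role IDs an EC2 credential may carry into a token."""
    if cred_data.get('trust_id'):
        # Trust-scoped credential: only the roles delegated by the trust.
        trust = lookups['trusts'][cred_data['trust_id']]
        return [r['id'] for r in trust['roles']]
    if cred_data.get('app_cred_id'):
        # Application credential: only the roles attached to it.
        app_cred = lookups['app_creds'][cred_data['app_cred_id']]
        return [r['id'] for r in app_cred['roles']]
    if cred_data.get('access_token_id'):
        # OAuth1 access token: role IDs are stored as a JSON-encoded list.
        token = lookups['access_tokens'][cred_data['access_token_id']]
        return json.loads(token['role_ids'])
    # No delegation recorded: fall back to the user's project assignments.
    key = (cred_data['user_id'], cred_data['project_id'])
    return lookups['assignments'].get(key, [])
```

The ordering matters: the full assignment list is consulted only when none of the delegation labels is present, so a delegated credential cannot pick up roles the user holds directly on the project.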
    
  • keystone/api/users.py+20 2 modified
    @@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
                         role['name']))
             return roles
     
    +    def _get_roles(self, app_cred_data, token):
    +        if app_cred_data.get('roles'):
    +            roles = self._normalize_role_list(app_cred_data['roles'])
    +            # NOTE(cmurphy): The user is not allowed to add a role that is not
    +            # in their token. This is to prevent trustees or application
    +            # credential users from escalating their privileges to include
    +            # additional roles that the trustor or application credential
    +            # creator has assigned on the project.
    +            token_roles = [r['id'] for r in token.roles]
    +            for role in roles:
    +                if role['id'] not in token_roles:
    +                    detail = _('Cannot create an application credential with '
    +                               'unassigned role')
    +                    raise ks_exception.ApplicationCredentialValidationError(
    +                        detail=detail)
    +        else:
    +            roles = token.roles
    +        return roles
    +
         def get(self, user_id):
             """List application credentials for user.
     
    @@ -594,8 +613,7 @@ def post(self, user_id):
                 app_cred_data['secret'] = self._generate_secret()
             app_cred_data['user_id'] = user_id
             app_cred_data['project_id'] = project_id
    -        app_cred_data['roles'] = self._normalize_role_list(
    -            app_cred_data.get('roles', token.roles))
    +        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
             if app_cred_data.get('expires_at'):
                 app_cred_data['expires_at'] = utils.parse_expiration_date(
                     app_cred_data['expires_at'])
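
The `_get_roles` helper added in this file restricts a new application credential to roles already present in the caller's token. A standalone sketch of that subset check follows. It assumes roles are dictionaries with an `id` key, omits the name-to-ID normalization done by `_normalize_role_list`, and raises `ValueError` in place of Keystone's `ApplicationCredentialValidationError`.

```python
def select_app_cred_roles(requested_roles, token_roles):
    """Return the roles for a new application credential (illustrative)."""
    if not requested_roles:
        # Nothing requested: inherit exactly the roles in the token.
        return list(token_roles)
    allowed = {r['id'] for r in token_roles}
    for role in requested_roles:
        if role['id'] not in allowed:
            # The role is not in the token, even if the user holds it on
            # the project through a direct assignment.
            raise ValueError(
                'Cannot create an application credential with '
                'unassigned role')
    return list(requested_roles)
```

This matches the scenario in `test_create_application_credential_with_trust`: a trust token carrying only one role cannot be used to mint an application credential with a different role that the user also has on the project.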
    
  • keystone/tests/unit/test_v3_application_credential.py+31 0 modified
    @@ -169,6 +169,37 @@ def test_create_application_credential_with_application_credential(self):
                        expected_status_code=http_client.FORBIDDEN,
                        headers={'X-Auth-Token': token})
     
    +    def test_create_application_credential_with_trust(self):
    +        second_role = unit.new_role_ref(name='reader')
    +        PROVIDERS.role_api.create_role(second_role['id'], second_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, second_role['id'])
    +        with self.test_client() as c:
    +            pw_token = self.get_scoped_token()
    +            # create a self-trust - only the roles are important for this test
    +            trust_ref = unit.new_trust_ref(
    +                trustor_user_id=self.user_id,
    +                trustee_user_id=self.user_id,
    +                project_id=self.project_id,
    +                role_ids=[second_role['id']])
    +            resp = c.post('/v3/OS-TRUST/trusts',
    +                          headers={'X-Auth-Token': pw_token},
    +                          json={'trust': trust_ref})
    +            trust_id = resp.json['trust']['id']
    +            trust_auth = self.build_authentication_request(
    +                user_id=self.user_id,
    +                password=self.user['password'],
    +                trust_id=trust_id)
    +            trust_token = self.v3_create_token(
    +                trust_auth).headers['X-Subject-Token']
    +            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
    +            # only the roles from the trust token should be allowed, even if
    +            # the user has the role assigned on the project
    +            c.post('/v3/users/%s/application_credentials' % self.user_id,
    +                   headers={'X-Auth-Token': trust_token},
    +                   json=app_cred,
    +                   expected_status_code=http_client.BAD_REQUEST)
    +
         def test_create_application_credential_allow_recursion(self):
             with self.test_client() as c:
                 roles = [{'id': self.role_id}]
    
  • keystone/tests/unit/test_v3_credential.py+393 35 modified
    @@ -19,14 +19,15 @@
     from keystoneclient.contrib.ec2 import utils as ec2_utils
     import mock
     from oslo_db import exception as oslo_db_exception
    -from six.moves import http_client
    +from six.moves import http_client, urllib
     from testtools import matchers
     
     from keystone.api import ec2tokens
     from keystone.common import provider_api
     from keystone.common import utils
     from keystone.credential.providers import fernet as credential_fernet
     from keystone import exception
    +from keystone import oauth1
     from keystone.tests import unit
     from keystone.tests.unit import ksfixtures
     from keystone.tests.unit import test_v3
    @@ -63,6 +64,33 @@ def _create_dict_blob_credential(self):
     
             return json.dumps(blob), credential_id
     
    +    def _test_get_token(self, access, secret):
    +        """Test signature validation with the access/secret provided."""
    +        signer = ec2_utils.Ec2Signer(secret)
    +        params = {'SignatureMethod': 'HmacSHA256',
    +                  'SignatureVersion': '2',
    +                  'AWSAccessKeyId': access}
    +        request = {'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        signature = signer.generate(request)
    +
    +        # Now make a request to validate the signed dummy request via the
    +        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    +        sig_ref = {'access': access,
    +                   'signature': signature,
    +                   'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        r = self.post(
    +            '/ec2tokens',
    +            body={'ec2Credentials': sig_ref},
    +            expected_status=http_client.OK)
    +        self.assertValidTokenResponse(r)
    +        return r.result['token']
    +
     
     class CredentialTestCase(CredentialBaseTestCase):
         """Test credential CRUD."""
    @@ -258,6 +286,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self):
                     'credential_id': credential_id},
                 body={'credential': update_ref})
     
    +    def test_update_credential_non_owner(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        alt_user = unit.create_user(
    +            PROVIDERS.identity_api, domain_id=self.domain_id)
    +        alt_user_id = alt_user['id']
    +        alt_project = unit.new_project_ref(domain_id=self.domain_id)
    +        alt_project_id = alt_project['id']
    +        PROVIDERS.resource_api.create_project(
    +            alt_project['id'], alt_project)
    +        alt_role = unit.new_role_ref(name='reader')
    +        alt_role_id = alt_role['id']
    +        PROVIDERS.role_api.create_role(alt_role_id, alt_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            alt_user_id, alt_project_id, alt_role_id)
    +        auth = self.build_authentication_request(
    +            user_id=alt_user_id,
    +            password=alt_user['password'],
    +            project_id=alt_project_id)
    +        ref = unit.new_credential_ref(user_id=alt_user_id,
    +                                      project_id=alt_project_id)
    +        r = self.post(
    +            '/credentials',
    +            auth=auth,
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +
    +        # Cannot change the credential to be owned by another user
    +        update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            expected_status=403,
    +            auth=auth,
    +            body={'credential': update_ref})
    +
    +    def test_update_ec2_credential_change_trust_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['trust_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different trust
    +        blob['trust_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the trust
    +        del blob['trust_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_app_cred_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different app cred
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the app cred
    +        del blob['app_cred_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_access_token_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different access token
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +        # Try removing the access token
    +        del blob['access_token_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http_client.BAD_REQUEST)
    +
         def test_delete_credential(self):
             """Call ``DELETE /credentials/{credential_id}``."""
             self.delete(
    @@ -393,7 +541,7 @@ def test_create_credential_with_admin_token(self):
             self.assertValidCredentialResponse(r, ref)
     
     
    -class TestCredentialTrustScoped(test_v3.RestfulTestCase):
    +class TestCredentialTrustScoped(CredentialBaseTestCase):
         """Test credential with trust scoped token."""
     
         def setUp(self):
    @@ -446,7 +594,7 @@ def test_trust_scoped_ec2_credential(self):
             token_id = r.headers.get('X-Subject-Token')
     
             # Create the credential with the trust scoped token
    -        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
                                                 project_id=self.project_id)
             r = self.post('/credentials', body={'credential': ref}, token=token_id)
     
    @@ -463,6 +611,21 @@ def test_trust_scoped_ec2_credential(self):
             self.assertEqual(hashlib.sha256(access).hexdigest(),
                              r.result['credential']['id'])
     
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # trust-delegated roles are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
             # Create second ec2 credential with the same access key id and check
             # for conflict.
             self.post(
    @@ -472,34 +635,229 @@ def test_trust_scoped_ec2_credential(self):
                 expected_status=http_client.CONFLICT)
     
     
    -class TestCredentialEc2(CredentialBaseTestCase):
    -    """Test v3 credential compatibility with ec2tokens."""
    +class TestCredentialAppCreds(CredentialBaseTestCase):
    +    """Test credential with application credential token."""
     
    -    def _validate_signature(self, access, secret):
    -        """Test signature validation with the access/secret provided."""
    -        signer = ec2_utils.Ec2Signer(secret)
    -        params = {'SignatureMethod': 'HmacSHA256',
    -                  'SignatureVersion': '2',
    -                  'AWSAccessKeyId': access}
    -        request = {'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        signature = signer.generate(request)
    +    def setUp(self):
    +        super(TestCredentialAppCreds, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
     
    -        # Now make a request to validate the signed dummy request via the
    -        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    -        sig_ref = {'access': access,
    -                   'signature': signature,
    -                   'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        r = self.post(
    -            '/ec2tokens',
    -            body={'ec2Credentials': sig_ref},
    -            expected_status=http_client.OK)
    -        self.assertValidTokenResponse(r)
    +    def test_app_cred_ec2_credential(self):
    +        """Test creating ec2 credential from an application credential.
    +
    +        Call ``POST /credentials``.
    +        """
    +        # Create the app cred
    +        ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
    +        del ref['id']
    +        r = self.post('/users/%s/application_credentials' % self.user_id,
    +                      body={'application_credential': ref})
    +        app_cred = r.result['application_credential']
    +
    +        # Get an application credential token
    +        auth_data = self.build_authentication_request(
    +            app_cred_id=app_cred['id'],
    +            secret=app_cred['secret'])
    +        r = self.v3_create_token(auth_data)
    +        token_id = r.headers.get('X-Subject-Token')
    +
    +        # Create the credential with the app cred token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the app_cred_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['app_cred_id'] = app_cred['id']
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the app cred are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +        # Create second ec2 credential with the same access key id and check
    +        # for conflict.
    +        self.post(
    +            '/credentials',
    +            body={'credential': ref},
    +            token=token_id,
    +            expected_status=http_client.CONFLICT)
    +
    +
    +class TestCredentialAccessToken(CredentialBaseTestCase):
    +    """Test credential with access token."""
    +
    +    def setUp(self):
    +        super(TestCredentialAccessToken, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
    +        self.base_url = 'http://localhost/v3'
    +
    +    def _urllib_parse_qs_text_keys(self, content):
    +        results = urllib.parse.parse_qs(content)
    +        return {key.decode('utf-8'): value for key, value in results.items()}
    +
    +    def _create_single_consumer(self):
    +        endpoint = '/OS-OAUTH1/consumers'
    +
    +        ref = {'description': uuid.uuid4().hex}
    +        resp = self.post(endpoint, body={'consumer': ref})
    +        return resp.result['consumer']
    +
    +    def _create_request_token(self, consumer, project_id, base_url=None):
    +        endpoint = '/OS-OAUTH1/request_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               signature_method=oauth1.SIG_HMAC,
    +                               callback_uri="oob")
    +        headers = {'requested_project_id': project_id}
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST',
    +                                         headers=headers)
    +        return endpoint, headers
    +
    +    def _create_access_token(self, consumer, token, base_url=None):
    +        endpoint = '/OS-OAUTH1/access_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC,
    +                               verifier=token.verifier)
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        return endpoint, headers
    +
    +    def _get_oauth_token(self, consumer, token):
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC)
    +        endpoint = '/auth/tokens'
    +        url, headers, body = client.sign(self.base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
    +        return endpoint, headers, ref
    +
    +    def _authorize_request_token(self, request_id):
    +        if isinstance(request_id, bytes):
    +            request_id = request_id.decode()
    +        return '/OS-OAUTH1/authorize/%s' % (request_id)
    +
    +    def _get_access_token(self):
    +        consumer = self._create_single_consumer()
    +        consumer_id = consumer['id']
    +        consumer_secret = consumer['secret']
    +        consumer = {'key': consumer_id, 'secret': consumer_secret}
    +
    +        url, headers = self._create_request_token(consumer, self.project_id)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        request_key = credentials['oauth_token'][0]
    +        request_secret = credentials['oauth_token_secret'][0]
    +        request_token = oauth1.Token(request_key, request_secret)
    +
    +        url = self._authorize_request_token(request_key)
    +        body = {'roles': [{'id': self.role_id}]}
    +        resp = self.put(url, body=body, expected_status=http_client.OK)
    +        verifier = resp.result['token']['oauth_verifier']
    +
    +        request_token.set_verifier(verifier)
    +        url, headers = self._create_access_token(consumer, request_token)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        access_key = credentials['oauth_token'][0]
    +        access_secret = credentials['oauth_token_secret'][0]
    +        access_token = oauth1.Token(access_key, access_secret)
    +
    +        url, headers, body = self._get_oauth_token(consumer, access_token)
    +        content = self.post(url, headers=headers, body=body)
    +        return access_key, content.headers['X-Subject-Token']
    +
    +    def test_access_token_ec2_credential(self):
    +        """Test creating ec2 credential from an oauth access token.
    +
    +        Call ``POST /credentials``.
    +        """
    +        access_key, token_id = self._get_access_token()
    +
    +        # Create the credential with the access token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the access_token_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['access_token_id'] = access_key.decode('utf-8')
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the access token are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +
    +class TestCredentialEc2(CredentialBaseTestCase):
    +    """Test v3 credential compatibility with ec2tokens."""
     
         def test_ec2_credential_signature_validate(self):
             """Test signature validation with a v3 ec2 credential."""
    @@ -514,15 +872,15 @@ def test_ec2_credential_signature_validate(self):
     
             cred_blob = json.loads(r.result['credential']['blob'])
             self.assertEqual(blob, cred_blob)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def test_ec2_credential_signature_validate_legacy(self):
             """Test signature validation with a legacy v3 ec2 credential."""
             cred_json, _ = self._create_dict_blob_credential()
             cred_blob = json.loads(cred_json)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def _get_ec2_cred_uri(self):
             return '/users/%s/credentials/OS-EC2' % self.user_id
    @@ -538,8 +896,8 @@ def test_ec2_create_credential(self):
             self.assertEqual(self.user_id, ec2_cred['user_id'])
             self.assertEqual(self.project_id, ec2_cred['tenant_id'])
             self.assertIsNone(ec2_cred['trust_id'])
    -        self._validate_signature(access=ec2_cred['access'],
    -                                 secret=ec2_cred['secret'])
    +        self._test_get_token(access=ec2_cred['access'],
    +                             secret=ec2_cred['secret'])
             uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
             self.assertThat(ec2_cred['links']['self'],
                             matchers.EndsWith(uri))
    
  • releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml · +16 -0 · added
    @@ -0,0 +1,16 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +security:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
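
The escalation described in this note depends on the update handler authorizing only the stored credential and never the values the request would write. The following is a minimal sketch of checking the merged result as well, assuming a simplified owner-only policy. The names `merged_target` and `may_update` are hypothetical and are not Keystone APIs; only the `dict(current, **credential)` merge is taken from the patch.

```python
def merged_target(current, update):
    """Return the credential as it would look after the update is applied."""
    # Same merge the patch uses: request values override stored values.
    return dict(current, **update)


def may_update(caller_user_id, current, update):
    """Hypothetical owner-only policy, checked before and after the merge."""
    # Check the stored credential first.
    if current.get('user_id') != caller_user_id:
        return False
    # Then check the would-be result, so ownership cannot be reassigned.
    return merged_target(current, update).get('user_id') == caller_user_id


current = {'id': 'c1', 'user_id': 'alice', 'project_id': 'p1', 'type': 'ec2'}
print(may_update('alice', current, {'blob': '{"access": "a"}'}))
print(may_update('alice', current, {'user_id': 'admin', 'project_id': 'p2'}))
```

In this sketch the first call is permitted because the owner is unchanged, and the second is refused because the merged credential would belong to a different user.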
    
  • releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml · +31 -0 · added
    @@ -0,0 +1,31 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +security:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +fixes:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
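
To illustrate how a delegation identifier such as ``app_cred_id`` can be recorded in the access blob at creation time, here is a small standalone sketch. The function name `stamp_ec2_blob` and the sample values are hypothetical. Unlike the patch, this sketch always re-serializes the blob, which is a simplification.

```python
import hashlib
import json


def stamp_ec2_blob(ref, trust_id=None, app_cred_id=None, access_token_id=None):
    """Return a copy of ref with a derived id and delegation ids in its blob."""
    blob = json.loads(ref['blob'])
    ref = ref.copy()
    # The credential id is the SHA-256 hex digest of the access key.
    ref['id'] = hashlib.sha256(blob['access'].encode('utf8')).hexdigest()
    # Record which delegation (if any) scoped the token used for creation.
    for key, value in (('trust_id', trust_id),
                       ('app_cred_id', app_cred_id),
                       ('access_token_id', access_token_id)):
        if value is not None:
            blob[key] = value
    ref['blob'] = json.dumps(blob)
    return ref


ref = {'type': 'ec2', 'blob': json.dumps({'access': 'AKIA', 'secret': 's3cr3t'})}
stamped = stamp_ec2_blob(ref, app_cred_id='ac-123')
print(json.loads(stamped['blob']))
```

Because the identifier is set by the server from the caller's token rather than accepted from the request body, a later authentication through the credential can look up the delegation and limit the roles accordingly.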
    
  • releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml · +19 -0 · added
    @@ -0,0 +1,19 @@
    +---
    +security:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
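
The validation described in this note can be sketched as a comparison of the scope labels in the stored blob and the submitted blob. This is an illustrative standalone version that uses `ValueError` in place of Keystone's validation exception. The name `check_scope_unchanged` is hypothetical, and the key list follows the patch in this commit, which also covers ``access_token_id``.

```python
import json

SCOPE_KEYS = ('trust_id', 'app_cred_id', 'access_token_id')


def check_scope_unchanged(old_blob_json, new_blob_json):
    """Raise ValueError if an update changes, adds or removes a scope key."""
    old_blob = json.loads(old_blob_json)
    new_blob = json.loads(new_blob_json)
    for key in SCOPE_KEYS:
        # .get() returns None for a missing key, so adding, changing and
        # deleting a scope key all register as a difference.
        if old_blob.get(key) != new_blob.get(key):
            raise ValueError('%s can not be updated for credential' % key)


old = json.dumps({'access': 'a', 'secret': 's', 'app_cred_id': 'ac-1'})
rotated = json.dumps({'access': 'a', 'secret': 'new', 'app_cred_id': 'ac-1'})
stripped = json.dumps({'access': 'a', 'secret': 's'})

check_scope_unchanged(old, rotated)  # allowed: scope keys untouched
try:
    check_scope_unchanged(old, stripped)
except ValueError as exc:
    print(exc)
```

Rotating the secret passes, while dropping the label is rejected, which matches the "changing" and "removing" cases exercised by the tests in this commit.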
    
37e9907a176d

Fix security issues with EC2 credentials

https://github.com/openstack/keystone · Colleen Murphy · Apr 14, 2020 · via ghsa
8 files changed · +603 -59
  • keystone/api/credentials.py · +53 -18 · modified
    @@ -60,30 +60,41 @@ def _blob_to_json(ref):
                 ref['blob'] = jsonutils.dumps(blob)
             return ref
     
    -    def _assign_unique_id(self, ref, trust_id=None):
    +    def _validate_blob_json(self, ref):
    +        try:
    +            blob = jsonutils.loads(ref.get('blob'))
    +        except (ValueError, TabError):
    +            raise exception.ValidationError(
    +                message=_('Invalid blob in credential'))
    +        if not blob or not isinstance(blob, dict):
    +            raise exception.ValidationError(attribute='blob',
    +                                            target='credential')
    +        if blob.get('access') is None:
    +            raise exception.ValidationError(attribute='access',
    +                                            target='credential')
    +        return blob
    +
    +    def _assign_unique_id(
    +            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
             # Generates an assigns a unique identifier to a credential reference.
             if ref.get('type', '').lower() == 'ec2':
    -            try:
    -                blob = jsonutils.loads(ref.get('blob'))
    -            except (ValueError, TabError):
    -                raise exception.ValidationError(
    -                    message=_('Invalid blob in credential'))
    -            if not blob or not isinstance(blob, dict):
    -                raise exception.ValidationError(attribute='blob',
    -                                                target='credential')
    -            if blob.get('access') is None:
    -                raise exception.ValidationError(attribute='access',
    -                                                target='credential')
    -
    +            blob = self._validate_blob_json(ref)
                 ref = ref.copy()
                 ref['id'] = hashlib.sha256(
                     blob['access'].encode('utf8')).hexdigest()
    -            # update the blob with the trust_id, so credentials created with
    -            # a trust scoped token will result in trust scoped tokens when
    -            # authentication via ec2tokens happens
    +            # update the blob with the trust_id or app_cred_id, so credentials
    +            # created with a trust- or app cred-scoped token will result in
    +            # trust- or app cred-scoped tokens when authentication via
    +            # ec2tokens happens
                 if trust_id is not None:
                     blob['trust_id'] = trust_id
                     ref['blob'] = jsonutils.dumps(blob)
    +            if app_cred_id is not None:
    +                blob['app_cred_id'] = app_cred_id
    +                ref['blob'] = jsonutils.dumps(blob)
    +            if access_token_id is not None:
    +                blob['access_token_id'] = access_token_id
    +                ref['blob'] = jsonutils.dumps(blob)
                 return ref
             else:
                 return super(CredentialResource, self)._assign_unique_id(ref)
    @@ -146,23 +157,47 @@ def post(self):
             )
             validation.lazy_validate(schema.credential_create, credential)
             trust_id = getattr(self.oslo_context, 'trust_id', None)
    +        app_cred_id = getattr(
    +            self.auth_context['token'], 'application_credential_id', None)
    +        access_token_id = getattr(
    +            self.auth_context['token'], 'access_token_id', None)
             ref = self._assign_unique_id(
    -            self._normalize_dict(credential), trust_id=trust_id)
    +            self._normalize_dict(credential),
    +            trust_id=trust_id, app_cred_id=app_cred_id,
    +            access_token_id=access_token_id)
             ref = PROVIDERS.credential_api.create_credential(
                 ref['id'], ref, initiator=self.audit_initiator)
             return self.wrap_member(ref), http.client.CREATED
     
    +    def _validate_blob_update_keys(self, credential, ref):
    +        if credential.get('type', '').lower() == 'ec2':
    +            new_blob = self._validate_blob_json(ref)
    +            old_blob = credential.get('blob')
    +            if isinstance(old_blob, str):
    +                old_blob = jsonutils.loads(old_blob)
    +            # if there was a scope set, prevent changing it or unsetting it
    +            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
    +                if old_blob.get(key) != new_blob.get(key):
    +                    message = _('%s can not be updated for credential') % key
    +                    raise exception.ValidationError(message=message)
    +
         def patch(self, credential_id):
             # Update Credential
             ENFORCER.enforce_call(
                 action='identity:update_credential',
                 build_target=_build_target_enforcement
             )
    -        PROVIDERS.credential_api.get_credential(credential_id)
    +        current = PROVIDERS.credential_api.get_credential(credential_id)
     
             credential = self.request_body_json.get('credential', {})
             validation.lazy_validate(schema.credential_update, credential)
    +        self._validate_blob_update_keys(current.copy(), credential.copy())
             self._require_matching_id(credential)
    +        # Check that the user hasn't illegally modified the owner or scope
    +        target = {'credential': dict(current, **credential)}
    +        ENFORCER.enforce_call(
    +            action='identity:update_credential', target_attr=target
    +        )
             ref = PROVIDERS.credential_api.update_credential(
                 credential_id, credential)
             return self.wrap_member(ref)
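
The blob checks that this file factors out into `_validate_blob_json` can be tried in isolation with the standard library. The sketch below is an approximation, not the Keystone code: `ValidationError` is a local stand-in class, the messages are invented, and it catches `TypeError` so that a missing blob is reported as a validation failure, whereas the patch itself catches `(ValueError, TabError)`.

```python
import json


class ValidationError(Exception):
    """Local stand-in for Keystone's validation exception (hypothetical)."""


def validate_blob_json(ref):
    """Parse ref['blob'] and require a JSON object with an 'access' key."""
    try:
        blob = json.loads(ref.get('blob'))
    except (ValueError, TypeError):
        # ValueError: malformed JSON; TypeError: blob absent or not a string.
        raise ValidationError('Invalid blob in credential')
    if not blob or not isinstance(blob, dict):
        raise ValidationError('blob must be a non-empty JSON object')
    if blob.get('access') is None:
        raise ValidationError('blob is missing the access key')
    return blob


print(validate_blob_json({'blob': '{"access": "a", "secret": "s"}'}))
for bad in ({'blob': 'not json'}, {'blob': '[]'}, {'blob': '{"secret": "s"}'}, {}):
    try:
        validate_blob_json(bad)
    except ValidationError as exc:
        print('rejected:', exc)
```

Having one helper return the parsed blob lets the create path and the update path apply identical checks before either derives an id or compares scope keys.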
    
  • keystone/api/_shared/EC2_S3_Resource.py · +40 -5 · modified
    @@ -113,7 +113,9 @@ def handle_authenticate(self):
                 project_id=cred.get('project_id'),
                 access=loaded.get('access'),
                 secret=loaded.get('secret'),
    -            trust_id=loaded.get('trust_id')
    +            trust_id=loaded.get('trust_id'),
    +            app_cred_id=loaded.get('app_cred_id'),
    +            access_token_id=loaded.get('access_token_id')
             )
     
             # validate the signature
    @@ -132,8 +134,34 @@ def handle_authenticate(self):
                 raise ks_exceptions.Unauthorized from e
     
             self._check_timestamp(credentials)
    -        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    -            user_ref['id'], project_ref['id'])
    +
    +        trustee_user_id = None
    +        auth_context = None
    +        if cred_data['trust_id']:
    +            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
    +            roles = [r['id'] for r in trust['roles']]
    +            # NOTE(cmurphy): if this credential was created using a
    +            # trust-scoped token with impersonation, the user_id will be for
    +            # the trustor, not the trustee. In this case, issuing a
    +            # trust-scoped token to the trustor will fail. In order to get a
    +            # trust-scoped token, use the user ID of the trustee. With
    +            # impersonation, the resulting token will still be for the trustor.
    +            # Without impersonation, the token will be for the trustee.
    +            if trust['impersonation'] is True:
    +                trustee_user_id = trust['trustee_user_id']
    +        elif cred_data['app_cred_id']:
    +            ac_client = PROVIDERS.application_credential_api
    +            app_cred = ac_client.get_application_credential(
    +                cred_data['app_cred_id'])
    +            roles = [r['id'] for r in app_cred['roles']]
    +        elif cred_data['access_token_id']:
    +            access_token = PROVIDERS.oauth_api.get_access_token(
    +                cred_data['access_token_id'])
    +            roles = jsonutils.loads(access_token['role_ids'])
    +            auth_context = {'access_token_id': cred_data['access_token_id']}
    +        else:
    +            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
    +                user_ref['id'], project_ref['id'])
     
             if not roles:
                 raise ks_exceptions.Unauthorized(_('User not valid for project.'))
    @@ -144,7 +172,14 @@ def handle_authenticate(self):
     
             method_names = ['ec2credential']
     
    +        if trustee_user_id:
    +            user_id = trustee_user_id
    +        else:
    +            user_id = user_ref['id']
             token = PROVIDERS.token_provider_api.issue_token(
    -            user_id=user_ref['id'], method_names=method_names,
    -            project_id=project_ref['id'])
    +            user_id=user_id, method_names=method_names,
    +            project_id=project_ref['id'],
    +            trust_id=cred_data['trust_id'],
    +            app_cred_id=cred_data['app_cred_id'],
    +            auth_context=auth_context)
             return token
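
The hunk above changes where the role list for an EC2-derived token comes from: the delegation recorded in the credential blob (trust, application credential, or OAuth access token) takes precedence over the user's direct role assignments. The precedence can be sketched in isolation as follows. This is a minimal illustration, not Keystone code: `resolve_roles` and the `lookups` dict of callables are hypothetical stand-ins for the `PROVIDERS.*` managers, and `PermissionError` stands in for `ks_exceptions.Unauthorized`.

```python
import json

# Hypothetical stand-ins for Keystone's trust, application credential,
# OAuth and assignment backends; none of this is Keystone's real API.
lookups = {
    "trust": lambda trust_id: {
        "roles": [{"id": "member"}],
        "impersonation": True,
        "trustee_user_id": "bob",
    },
    "app_cred": lambda app_cred_id: {"roles": [{"id": "reader"}]},
    "access_token": lambda token_id: {"role_ids": json.dumps(["member"])},
    "assignment": lambda user_id, project_id: (
        ["admin", "member"] if user_id == "alice" else []
    ),
}


def resolve_roles(cred_data, lookups):
    """Return (role_ids, token_user_id) for a decoded EC2 credential blob."""
    user_id = cred_data["user_id"]
    if cred_data.get("trust_id"):
        trust = lookups["trust"](cred_data["trust_id"])
        roles = [r["id"] for r in trust["roles"]]
        # With impersonation the credential's user_id is the trustor, so
        # the token has to be requested as the trustee instead.
        if trust["impersonation"] is True:
            user_id = trust["trustee_user_id"]
    elif cred_data.get("app_cred_id"):
        app_cred = lookups["app_cred"](cred_data["app_cred_id"])
        roles = [r["id"] for r in app_cred["roles"]]
    elif cred_data.get("access_token_id"):
        token = lookups["access_token"](cred_data["access_token_id"])
        roles = json.loads(token["role_ids"])
    else:
        # No delegation recorded: fall back to direct role assignments.
        roles = lookups["assignment"](
            cred_data["user_id"], cred_data["project_id"])
    if not roles:
        raise PermissionError("User not valid for project.")
    return roles, user_id


# A trust-backed credential only ever yields the trust's roles, even though
# "alice" holds "admin" directly on the project.
print(resolve_roles(
    {"user_id": "alice", "project_id": "p1", "trust_id": "t1"}, lookups))
```

The point of the ordering is that a direct role assignment is consulted only when the blob records no delegation at all, which is what the `assertNotIn(role_id, ec2_roles)` checks in the accompanying tests exercise.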
    
  • keystone/api/users.py +20 -2 modified
    @@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
                         role['name']))
             return roles
     
    +    def _get_roles(self, app_cred_data, token):
    +        if app_cred_data.get('roles'):
    +            roles = self._normalize_role_list(app_cred_data['roles'])
    +            # NOTE(cmurphy): The user is not allowed to add a role that is not
    +            # in their token. This is to prevent trustees or application
    +            # credential users from escalating their privileges to include
    +            # additional roles that the trustor or application credential
    +            # creator has assigned on the project.
    +            token_roles = [r['id'] for r in token.roles]
    +            for role in roles:
    +                if role['id'] not in token_roles:
    +                    detail = _('Cannot create an application credential with '
    +                               'unassigned role')
    +                    raise ks_exception.ApplicationCredentialValidationError(
    +                        detail=detail)
    +        else:
    +            roles = token.roles
    +        return roles
    +
         def get(self, user_id):
             """List application credentials for user.
     
    @@ -594,8 +613,7 @@ def post(self, user_id):
                 app_cred_data['secret'] = self._generate_secret()
             app_cred_data['user_id'] = user_id
             app_cred_data['project_id'] = project_id
    -        app_cred_data['roles'] = self._normalize_role_list(
    -            app_cred_data.get('roles', token.roles))
    +        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
             if app_cred_data.get('expires_at'):
                 app_cred_data['expires_at'] = utils.parse_expiration_date(
                     app_cred_data['expires_at'])
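
The `_get_roles` helper added above enforces that a new application credential can only carry roles already present in the caller's token. A standalone sketch of that subset rule is shown below. It assumes roles are already normalized to `{'id': ...}` dicts (the real helper also resolves role names through `_normalize_role_list`), and `ValidationError` and `roles_for_new_app_cred` are hypothetical names used only for illustration.

```python
class ValidationError(Exception):
    """Stand-in for ks_exception.ApplicationCredentialValidationError."""


def roles_for_new_app_cred(requested_roles, token_roles):
    """Return the roles a new application credential may carry.

    Both arguments are lists of {'id': ...} dicts. If nothing is requested,
    the credential inherits every role in the token.
    """
    if not requested_roles:
        return list(token_roles)
    allowed = {role["id"] for role in token_roles}
    for role in requested_roles:
        # Reject any role the token does not already carry, even if the
        # user holds it directly on the project.
        if role["id"] not in allowed:
            raise ValidationError(
                "Cannot create an application credential with "
                "unassigned role")
    return list(requested_roles)


token_roles = [{"id": "reader"}]
print(roles_for_new_app_cred([], token_roles))
print(roles_for_new_app_cred([{"id": "reader"}], token_roles))
```

Checking against the token rather than the assignment table is the design choice that matters here: a trust-scoped or application-credential token holds a subset of the user's roles, and only that subset may be passed on.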
    
  • keystone/tests/unit/test_v3_application_credential.py +31 -0 modified
    @@ -174,6 +174,37 @@ def test_create_application_credential_with_application_credential(self):
                        expected_status_code=http.client.FORBIDDEN,
                        headers={'X-Auth-Token': token})
     
    +    def test_create_application_credential_with_trust(self):
    +        second_role = unit.new_role_ref(name='reader')
    +        PROVIDERS.role_api.create_role(second_role['id'], second_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, second_role['id'])
    +        with self.test_client() as c:
    +            pw_token = self.get_scoped_token()
    +            # create a self-trust - only the roles are important for this test
    +            trust_ref = unit.new_trust_ref(
    +                trustor_user_id=self.user_id,
    +                trustee_user_id=self.user_id,
    +                project_id=self.project_id,
    +                role_ids=[second_role['id']])
    +            resp = c.post('/v3/OS-TRUST/trusts',
    +                          headers={'X-Auth-Token': pw_token},
    +                          json={'trust': trust_ref})
    +            trust_id = resp.json['trust']['id']
    +            trust_auth = self.build_authentication_request(
    +                user_id=self.user_id,
    +                password=self.user['password'],
    +                trust_id=trust_id)
    +            trust_token = self.v3_create_token(
    +                trust_auth).headers['X-Subject-Token']
    +            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
    +            # only the roles from the trust token should be allowed, even if
    +            # the user has the role assigned on the project
    +            c.post('/v3/users/%s/application_credentials' % self.user_id,
    +                   headers={'X-Auth-Token': trust_token},
    +                   json=app_cred,
    +                   expected_status_code=http.client.BAD_REQUEST)
    +
         def test_create_application_credential_allow_recursion(self):
             with self.test_client() as c:
                 roles = [{'id': self.role_id}]
    
  • keystone/tests/unit/test_v3_credential.py +393 -34 modified
    @@ -21,12 +21,14 @@
     from keystoneclient.contrib.ec2 import utils as ec2_utils
     from oslo_db import exception as oslo_db_exception
     from testtools import matchers
    +import urllib
     
     from keystone.api import ec2tokens
     from keystone.common import provider_api
     from keystone.common import utils
     from keystone.credential.providers import fernet as credential_fernet
     from keystone import exception
    +from keystone import oauth1
     from keystone.tests import unit
     from keystone.tests.unit import ksfixtures
     from keystone.tests.unit import test_v3
    @@ -63,6 +65,33 @@ def _create_dict_blob_credential(self):
     
             return json.dumps(blob), credential_id
     
    +    def _test_get_token(self, access, secret):
    +        """Test signature validation with the access/secret provided."""
    +        signer = ec2_utils.Ec2Signer(secret)
    +        params = {'SignatureMethod': 'HmacSHA256',
    +                  'SignatureVersion': '2',
    +                  'AWSAccessKeyId': access}
    +        request = {'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        signature = signer.generate(request)
    +
    +        # Now make a request to validate the signed dummy request via the
    +        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    +        sig_ref = {'access': access,
    +                   'signature': signature,
    +                   'host': 'foo',
    +                   'verb': 'GET',
    +                   'path': '/bar',
    +                   'params': params}
    +        r = self.post(
    +            '/ec2tokens',
    +            body={'ec2Credentials': sig_ref},
    +            expected_status=http.client.OK)
    +        self.assertValidTokenResponse(r)
    +        return r.result['token']
    +
     
     class CredentialTestCase(CredentialBaseTestCase):
         """Test credential CRUD."""
    @@ -258,6 +287,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self):
                     'credential_id': credential_id},
                 body={'credential': update_ref})
     
    +    def test_update_credential_non_owner(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        alt_user = unit.create_user(
    +            PROVIDERS.identity_api, domain_id=self.domain_id)
    +        alt_user_id = alt_user['id']
    +        alt_project = unit.new_project_ref(domain_id=self.domain_id)
    +        alt_project_id = alt_project['id']
    +        PROVIDERS.resource_api.create_project(
    +            alt_project['id'], alt_project)
    +        alt_role = unit.new_role_ref(name='reader')
    +        alt_role_id = alt_role['id']
    +        PROVIDERS.role_api.create_role(alt_role_id, alt_role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            alt_user_id, alt_project_id, alt_role_id)
    +        auth = self.build_authentication_request(
    +            user_id=alt_user_id,
    +            password=alt_user['password'],
    +            project_id=alt_project_id)
    +        ref = unit.new_credential_ref(user_id=alt_user_id,
    +                                      project_id=alt_project_id)
    +        r = self.post(
    +            '/credentials',
    +            auth=auth,
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +
    +        # Cannot change the credential to be owned by another user
    +        update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            expected_status=403,
    +            auth=auth,
    +            body={'credential': update_ref})
    +
    +    def test_update_ec2_credential_change_trust_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['trust_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different trust
    +        blob['trust_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the trust
    +        del blob['trust_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_app_cred_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different app cred
    +        blob['app_cred_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the app cred
    +        del blob['app_cred_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
    +    def test_update_ec2_credential_change_access_token_id(self):
    +        """Call ``PATCH /credentials/{credential_id}``."""
    +        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +                                            project_id=self.project_id)
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        ref['blob'] = json.dumps(blob)
    +        r = self.post(
    +            '/credentials',
    +            body={'credential': ref})
    +        self.assertValidCredentialResponse(r, ref)
    +        credential_id = r.result.get('credential')['id']
    +        # Try changing to a different access token
    +        blob['access_token_id'] = uuid.uuid4().hex
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +        # Try removing the access token
    +        del blob['access_token_id']
    +        update_ref = {'blob': json.dumps(blob)}
    +        self.patch(
    +            '/credentials/%(credential_id)s' % {
    +                'credential_id': credential_id},
    +            body={'credential': update_ref},
    +            expected_status=http.client.BAD_REQUEST)
    +
         def test_delete_credential(self):
             """Call ``DELETE /credentials/{credential_id}``."""
             self.delete(
    @@ -393,7 +542,7 @@ def test_create_credential_with_admin_token(self):
             self.assertValidCredentialResponse(r, ref)
     
     
    -class TestCredentialTrustScoped(test_v3.RestfulTestCase):
    +class TestCredentialTrustScoped(CredentialBaseTestCase):
         """Test credential with trust scoped token."""
     
         def setUp(self):
    @@ -446,7 +595,7 @@ def test_trust_scoped_ec2_credential(self):
             token_id = r.headers.get('X-Subject-Token')
     
             # Create the credential with the trust scoped token
    -        blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
                                                 project_id=self.project_id)
             r = self.post('/credentials', body={'credential': ref}, token=token_id)
     
    @@ -463,6 +612,21 @@ def test_trust_scoped_ec2_credential(self):
             self.assertEqual(hashlib.sha256(access).hexdigest(),
                              r.result['credential']['id'])
     
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # trust-delegated roles are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
             # Create second ec2 credential with the same access key id and check
             # for conflict.
             self.post(
    @@ -472,34 +636,229 @@ def test_trust_scoped_ec2_credential(self):
                 expected_status=http.client.CONFLICT)
     
     
    -class TestCredentialEc2(CredentialBaseTestCase):
    -    """Test v3 credential compatibility with ec2tokens."""
    +class TestCredentialAppCreds(CredentialBaseTestCase):
    +    """Test credential with application credential token."""
     
    -    def _validate_signature(self, access, secret):
    -        """Test signature validation with the access/secret provided."""
    -        signer = ec2_utils.Ec2Signer(secret)
    -        params = {'SignatureMethod': 'HmacSHA256',
    -                  'SignatureVersion': '2',
    -                  'AWSAccessKeyId': access}
    -        request = {'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        signature = signer.generate(request)
    +    def setUp(self):
    +        super(TestCredentialAppCreds, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
     
    -        # Now make a request to validate the signed dummy request via the
    -        # ec2tokens API.  This proves the v3 ec2 credentials actually work.
    -        sig_ref = {'access': access,
    -                   'signature': signature,
    -                   'host': 'foo',
    -                   'verb': 'GET',
    -                   'path': '/bar',
    -                   'params': params}
    -        r = self.post(
    -            '/ec2tokens',
    -            body={'ec2Credentials': sig_ref},
    -            expected_status=http.client.OK)
    -        self.assertValidTokenResponse(r)
    +    def test_app_cred_ec2_credential(self):
    +        """Test creating ec2 credential from an application credential.
    +
    +        Call ``POST /credentials``.
    +        """
    +        # Create the app cred
    +        ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
    +        del ref['id']
    +        r = self.post('/users/%s/application_credentials' % self.user_id,
    +                      body={'application_credential': ref})
    +        app_cred = r.result['application_credential']
    +
    +        # Get an application credential token
    +        auth_data = self.build_authentication_request(
    +            app_cred_id=app_cred['id'],
    +            secret=app_cred['secret'])
    +        r = self.v3_create_token(auth_data)
    +        token_id = r.headers.get('X-Subject-Token')
    +
    +        # Create the credential with the app cred token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the app_cred_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['app_cred_id'] = app_cred['id']
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the app cred are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +        # Create second ec2 credential with the same access key id and check
    +        # for conflict.
    +        self.post(
    +            '/credentials',
    +            body={'credential': ref},
    +            token=token_id,
    +            expected_status=http.client.CONFLICT)
    +
    +
    +class TestCredentialAccessToken(CredentialBaseTestCase):
    +    """Test credential with access token."""
    +
    +    def setUp(self):
    +        super(TestCredentialAccessToken, self).setUp()
    +        self.useFixture(
    +            ksfixtures.KeyRepository(
    +                self.config_fixture,
    +                'credential',
    +                credential_fernet.MAX_ACTIVE_KEYS
    +            )
    +        )
    +        self.base_url = 'http://localhost/v3'
    +
    +    def _urllib_parse_qs_text_keys(self, content):
    +        results = urllib.parse.parse_qs(content)
    +        return {key.decode('utf-8'): value for key, value in results.items()}
    +
    +    def _create_single_consumer(self):
    +        endpoint = '/OS-OAUTH1/consumers'
    +
    +        ref = {'description': uuid.uuid4().hex}
    +        resp = self.post(endpoint, body={'consumer': ref})
    +        return resp.result['consumer']
    +
    +    def _create_request_token(self, consumer, project_id, base_url=None):
    +        endpoint = '/OS-OAUTH1/request_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               signature_method=oauth1.SIG_HMAC,
    +                               callback_uri="oob")
    +        headers = {'requested_project_id': project_id}
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST',
    +                                         headers=headers)
    +        return endpoint, headers
    +
    +    def _create_access_token(self, consumer, token, base_url=None):
    +        endpoint = '/OS-OAUTH1/access_token'
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC,
    +                               verifier=token.verifier)
    +        if not base_url:
    +            base_url = self.base_url
    +        url, headers, body = client.sign(base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        return endpoint, headers
    +
    +    def _get_oauth_token(self, consumer, token):
    +        client = oauth1.Client(consumer['key'],
    +                               client_secret=consumer['secret'],
    +                               resource_owner_key=token.key,
    +                               resource_owner_secret=token.secret,
    +                               signature_method=oauth1.SIG_HMAC)
    +        endpoint = '/auth/tokens'
    +        url, headers, body = client.sign(self.base_url + endpoint,
    +                                         http_method='POST')
    +        headers.update({'Content-Type': 'application/json'})
    +        ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
    +        return endpoint, headers, ref
    +
    +    def _authorize_request_token(self, request_id):
    +        if isinstance(request_id, bytes):
    +            request_id = request_id.decode()
    +        return '/OS-OAUTH1/authorize/%s' % (request_id)
    +
    +    def _get_access_token(self):
    +        consumer = self._create_single_consumer()
    +        consumer_id = consumer['id']
    +        consumer_secret = consumer['secret']
    +        consumer = {'key': consumer_id, 'secret': consumer_secret}
    +
    +        url, headers = self._create_request_token(consumer, self.project_id)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        request_key = credentials['oauth_token'][0]
    +        request_secret = credentials['oauth_token_secret'][0]
    +        request_token = oauth1.Token(request_key, request_secret)
    +
    +        url = self._authorize_request_token(request_key)
    +        body = {'roles': [{'id': self.role_id}]}
    +        resp = self.put(url, body=body, expected_status=http.client.OK)
    +        verifier = resp.result['token']['oauth_verifier']
    +
    +        request_token.set_verifier(verifier)
    +        url, headers = self._create_access_token(consumer, request_token)
    +        content = self.post(
    +            url, headers=headers,
    +            response_content_type='application/x-www-form-urlencoded')
    +        credentials = self._urllib_parse_qs_text_keys(content.result)
    +        access_key = credentials['oauth_token'][0]
    +        access_secret = credentials['oauth_token_secret'][0]
    +        access_token = oauth1.Token(access_key, access_secret)
    +
    +        url, headers, body = self._get_oauth_token(consumer, access_token)
    +        content = self.post(url, headers=headers, body=body)
    +        return access_key, content.headers['X-Subject-Token']
    +
    +    def test_access_token_ec2_credential(self):
    +        """Test creating ec2 credential from an oauth access token.
    +
    +        Call ``POST /credentials``.
    +        """
    +        access_key, token_id = self._get_access_token()
    +
    +        # Create the credential with the access token
    +        blob, ref = unit.new_ec2_credential(user_id=self.user_id,
    +                                            project_id=self.project_id)
    +        r = self.post('/credentials', body={'credential': ref}, token=token_id)
    +
    +        # We expect the response blob to contain the access_token_id
    +        ret_ref = ref.copy()
    +        ret_blob = blob.copy()
    +        ret_blob['access_token_id'] = access_key.decode('utf-8')
    +        ret_ref['blob'] = json.dumps(ret_blob)
    +        self.assertValidCredentialResponse(r, ref=ret_ref)
    +
    +        # Assert credential id is same as hash of access key id for
    +        # ec2 credentials
    +        access = blob['access'].encode('utf-8')
    +        self.assertEqual(hashlib.sha256(access).hexdigest(),
    +                         r.result['credential']['id'])
    +
    +        # Create a role assignment to ensure that it is ignored and only the
    +        # roles in the access token are used
    +        role = unit.new_role_ref(name='reader')
    +        role_id = role['id']
    +        PROVIDERS.role_api.create_role(role_id, role)
    +        PROVIDERS.assignment_api.add_role_to_user_and_project(
    +            self.user_id, self.project_id, role_id)
    +
    +        ret_blob = json.loads(r.result['credential']['blob'])
    +        ec2token = self._test_get_token(
    +            access=ret_blob['access'], secret=ret_blob['secret'])
    +        ec2_roles = [role['id'] for role in ec2token['roles']]
    +        self.assertIn(self.role_id, ec2_roles)
    +        self.assertNotIn(role_id, ec2_roles)
    +
    +
    +class TestCredentialEc2(CredentialBaseTestCase):
    +    """Test v3 credential compatibility with ec2tokens."""
     
         def test_ec2_credential_signature_validate(self):
             """Test signature validation with a v3 ec2 credential."""
    @@ -514,15 +873,15 @@ def test_ec2_credential_signature_validate(self):
     
             cred_blob = json.loads(r.result['credential']['blob'])
             self.assertEqual(blob, cred_blob)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def test_ec2_credential_signature_validate_legacy(self):
             """Test signature validation with a legacy v3 ec2 credential."""
             cred_json, _ = self._create_dict_blob_credential()
             cred_blob = json.loads(cred_json)
    -        self._validate_signature(access=cred_blob['access'],
    -                                 secret=cred_blob['secret'])
    +        self._test_get_token(access=cred_blob['access'],
    +                             secret=cred_blob['secret'])
     
         def _get_ec2_cred_uri(self):
             return '/users/%s/credentials/OS-EC2' % self.user_id
    @@ -538,8 +897,8 @@ def test_ec2_create_credential(self):
             self.assertEqual(self.user_id, ec2_cred['user_id'])
             self.assertEqual(self.project_id, ec2_cred['tenant_id'])
             self.assertIsNone(ec2_cred['trust_id'])
    -        self._validate_signature(access=ec2_cred['access'],
    -                                 secret=ec2_cred['secret'])
    +        self._test_get_token(access=ec2_cred['access'],
    +                             secret=ec2_cred['secret'])
             uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
             self.assertThat(ec2_cred['links']['self'],
                             matchers.EndsWith(uri))
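
The `_test_get_token` helper in the tests above signs a dummy request with `SignatureVersion` 2 and `HmacSHA256`, and several tests assert that an EC2 credential's ID is the SHA-256 hex digest of its access key. The sketch below illustrates both ideas using only the standard library. It follows the generally documented AWS Signature Version 2 layout (verb, host, path, sorted query string) and is an assumption about the scheme, not a copy of `keystoneclient`'s `Ec2Signer`, whose percent-encoding details may differ. `sign_v2` and `credential_id_for` are hypothetical names.

```python
import base64
import hashlib
import hmac
from urllib.parse import quote


def sign_v2(secret, verb, host, path, params):
    """Compute a Signature Version 2 style HMAC-SHA256 signature."""
    # Canonical query string: parameters sorted by name, percent-encoded.
    canonical_qs = "&".join(
        "%s=%s" % (quote(str(k), safe="-_.~"), quote(str(v), safe="-_.~"))
        for k, v in sorted(params.items())
    )
    string_to_sign = "\n".join([verb, host.lower(), path, canonical_qs])
    digest = hmac.new(
        secret.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return base64.b64encode(digest).decode("ascii")


def credential_id_for(access):
    """EC2 credential ID as asserted in the tests: sha256 of the access key."""
    return hashlib.sha256(access.encode("utf-8")).hexdigest()


params = {
    "SignatureMethod": "HmacSHA256",
    "SignatureVersion": "2",
    "AWSAccessKeyId": "example-access",
}
signature = sign_v2("example-secret", "GET", "foo", "/bar", params)
print(signature)
print(credential_id_for("example-access"))
```

The server recomputes the same signature from the stored secret, so the secret itself never travels with the request; only the access key and the signature do.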
    
  • releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml +16 -0 added
    @@ -0,0 +1,16 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +security:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
    +    Fixed a critical security issue in which an authenticated user could
    +    escalate their privileges by altering a valid EC2 credential.
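
The privilege escalation described in this release note comes from authorizing an update only against the credential as it exists before the change. One way to close that gap is to check ownership twice: once on the stored credential and once on the result of applying the update. The sketch below assumes a simple owner-only rule; Keystone's real policy enforcement is richer (for example, it has admin rules), and `Forbidden`, `is_owner`, and `check_update` are hypothetical names.

```python
class Forbidden(Exception):
    """Stand-in for an HTTP 403 response."""


def is_owner(caller_user_id, credential):
    """Simplified policy: only the owning user may act on a credential."""
    return credential.get("user_id") == caller_user_id


def check_update(caller_user_id, current, update):
    """Authorize an update against both the old and the resulting state."""
    if not is_owner(caller_user_id, current):
        raise Forbidden("caller does not own this credential")
    merged = dict(current, **update)
    # Second check: the caller must still own the credential after the
    # update, so user_id cannot be reassigned to someone else.
    if not is_owner(caller_user_id, merged):
        raise Forbidden("cannot reassign a credential to another user")
    return merged


cred = {"id": "c1", "user_id": "alice", "project_id": "p1", "blob": "{}"}
print(check_update("alice", cred, {"blob": '{"access": "a"}'}))
```

With only the first check, `{"user_id": "admin", "project_id": "admin-project"}` would be accepted, which is the masquerading path that `test_update_credential_non_owner` now expects to fail with 403.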
    
  • releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml +31 -0 added
    @@ -0,0 +1,31 @@
    +---
    +critical:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +security:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    +fixes:
    +  - |
    +    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
    +    Fixed a security issue in which a trustee or an application credential user
    +    could create an EC2 credential or an application credential that would
    +    permit them to get a token that elevated their role assignments beyond the
    +    subset delegated to them in the trust or application credential. A new
    +    attribute ``app_cred_id`` is now automatically added to the access blob of
    +    an EC2 credential and the role list in the trust or application credential
    +    is respected.
    
  • releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml +19 -0 added
    @@ -0,0 +1,19 @@
    +---
    +security:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
    +fixes:
    +  - |
    +    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
    +    Added validation to the EC2 credentials update API to ensure the metadata
    +    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
    +    labels are used by keystone to determine the scope allowed by the
    +    credential, and altering these automatic labels could enable an EC2
    +    credential holder to elevate their access beyond what is permitted by the
    +    application credential or trust that was used to create the EC2 credential.
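
The validation described in this note amounts to treating the delegation labels in the EC2 blob as immutable once the credential exists. A minimal sketch of such a check follows. The key list includes `access_token_id` because the unit tests in this change also reject edits to it, although the note names only `trust_id` and `app_cred_id`. `ValidationError` and `validate_blob_update` are hypothetical names, and the real check lives in Keystone's credentials API rather than in a free function.

```python
import json

# Labels Keystone adds automatically; users must not alter or remove them.
PROTECTED_KEYS = ("trust_id", "app_cred_id", "access_token_id")


class ValidationError(Exception):
    """Stand-in for an HTTP 400 response."""


def validate_blob_update(old_blob_json, new_blob_json):
    """Reject an update that changes, adds, or drops a protected label."""
    old_blob = json.loads(old_blob_json)
    new_blob = json.loads(new_blob_json)
    for key in PROTECTED_KEYS:
        # .get() returns None for a missing key, so removal is caught too.
        if old_blob.get(key) != new_blob.get(key):
            raise ValidationError("%s can not be updated" % key)
    return new_blob


old = json.dumps({"access": "a", "secret": "s", "trust_id": "t1"})
same = json.dumps({"access": "a", "secret": "s2", "trust_id": "t1"})
print(validate_blob_update(old, same))
```

Comparing with `.get()` covers the two cases the tests exercise: swapping the label for a different ID and deleting it from the blob entirely.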
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
