CVE-2020-12691
Description
An issue was discovered in OpenStack Keystone before 15.0.1, and 16.0.0. Any authenticated user can create an EC2 credential for themselves for a project that they have a specified role on, and then perform an update to the credential user and project, allowing them to masquerade as another user. This potentially allows a malicious user to act as the admin on a project another user has the admin role on, which can effectively grant that user global admin privileges.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
In OpenStack Keystone, an authenticated user can create an EC2 credential and modify its owner to impersonate another user, potentially gaining global admin privileges.
Vulnerability
CVE-2020-12691 is an authorization bypass in OpenStack Keystone's EC2 credentials API. An authenticated user can create an EC2 credential for a project they have a role on, then perform an update to change the credential's user and project fields, effectively masquerading as another user [1][2]. The root cause is insufficient validation of credential owner modification, allowing the attacker to reassign the credential to a different user after creation.
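The root cause can be illustrated with a small in-memory model. This is a minimal sketch, not Keystone's code: `owner_allows`, `update_unpatched`, and `update_patched` are hypothetical names, and the rule "the caller must be the credential's `user_id`" is an assumed simplification of the `identity:update_credential` policy. The patched variant mirrors the idea in the stable/stein patch, which re-checks the policy against the stored credential merged with the requested changes.

```python
class Forbidden(Exception):
    """Raised when the caller is not allowed to perform the update."""


def owner_allows(caller_id, credential):
    # Assumed, simplified rule: only the credential's owner may act on it.
    return credential["user_id"] == caller_id


def update_unpatched(caller_id, stored, changes):
    # Only the credential as it exists *before* the update is checked.
    if not owner_allows(caller_id, stored):
        raise Forbidden("caller does not own the credential")
    return dict(stored, **changes)


def update_patched(caller_id, stored, changes):
    if not owner_allows(caller_id, stored):
        raise Forbidden("caller does not own the credential")
    # Second check against the post-update view, analogous to
    # target = {'credential': dict(current, **credential)} in the patch.
    target = dict(stored, **changes)
    if not owner_allows(caller_id, target):
        raise Forbidden("update would change the credential owner")
    return target


# Hypothetical users and projects, for illustration only.
stored = {"id": "c1", "type": "ec2", "user_id": "mallory", "project_id": "p-low"}
changes = {"user_id": "alice", "project_id": "p-admin"}

print(update_unpatched("mallory", stored, changes)["user_id"])
try:
    update_patched("mallory", stored, changes)
except Forbidden as exc:
    print("rejected:", exc)
```

In this model the unpatched path hands back a credential owned by another user, while the patched path rejects the same request because the merged target no longer satisfies the owner rule.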
Exploitation
Exploitation requires only an authenticated user who holds some role on at least one project. The attacker first creates an EC2 credential for themselves on that project, then sends an update request to the credential endpoint that changes the user and project fields to those of another user (e.g., an admin on a different project). No privileges or tokens beyond the attacker's initial project-scoped token are needed [1]. The vulnerability affects Keystone versions before 15.0.1, and version 16.0.0.
Impact
A successful attack allows the malicious user to impersonate any other user, potentially gaining that user's roles and permissions. If the impersonated target is an admin on a project, the attacker can effectively gain global admin privileges across the OpenStack deployment [1][2]. This undermines the entire trust model of Keystone's identity service.
Mitigation
The vulnerability is fixed in Keystone 15.0.1 and 16.0.1. Patches were merged into the stable/stein and stable/train branches (commits 95b2bbe and 40cbb7b, respectively) [3][4]. Operators should upgrade to a patched release or apply the relevant patch immediately.
- [1] security - Re: [OSSA-2020-004] Keystone: Keystone credential endpoints allow owner modification and are not protected from a scoped context (CVE PENDING)
- [2] NVD - CVE-2020-12691
- [3] Merge "Fix security issues with EC2 credentials" into stable/stein · openstack/keystone@95b2bbe
- [4] Merge "Fix security issues with EC2 credentials" into stable/train · openstack/keystone@40cbb7b
AI Insight generated on May 21, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| keystone (PyPI) | < 15.0.1 | 15.0.1 |
| keystone (PyPI) | >= 16.0.0, < 16.0.1 | 16.0.1 |
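
The affected ranges in the table can be checked programmatically. The following is a minimal sketch; `parse_version` and `is_affected` are hypothetical helper names, and the parser assumes a plain dotted numeric version string (pre-release suffixes such as `rc1` are not handled).

```python
def parse_version(text):
    # Assumes a plain dotted numeric version such as "15.0.1".
    return tuple(int(part) for part in text.split("."))


def is_affected(version_text):
    version = parse_version(version_text)
    # Ranges copied from the table: < 15.0.1, or >= 16.0.0 and < 16.0.1.
    return version < (15, 0, 1) or (16, 0, 0) <= version < (16, 0, 1)


for candidate in ("14.2.0", "15.0.0", "15.0.1", "16.0.0", "16.0.1"):
    print(candidate, "affected" if is_affected(candidate) else "not affected")
```

Tuple comparison is used so that, for example, 15.0.10 sorts after 15.0.1, which a plain string comparison would get wrong.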
Affected products
- OpenStack/Keystone
Patches
95b2bbeab113 Merge "Fix security issues with EC2 credentials" into stable/stein
8 files changed · +608 −62
keystone/api/credentials.py · +56 −20 · modified
@@ -13,6 +13,7 @@
 # This file handles all flask-restful resources for /v3/credentials

 import hashlib
+import six

 import flask
 from oslo_serialization import jsonutils
@@ -60,30 +61,41 @@ def _blob_to_json(ref):
             ref['blob'] = jsonutils.dumps(blob)
         return ref

-    def _assign_unique_id(self, ref, trust_id=None):
+    def _validate_blob_json(self, ref):
+        try:
+            blob = jsonutils.loads(ref.get('blob'))
+        except (ValueError, TabError):
+            raise exception.ValidationError(
+                message=_('Invalid blob in credential'))
+        if not blob or not isinstance(blob, dict):
+            raise exception.ValidationError(attribute='blob',
+                                            target='credential')
+        if blob.get('access') is None:
+            raise exception.ValidationError(attribute='access',
+                                            target='credential')
+        return blob
+
+    def _assign_unique_id(
+            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
         # Generates an assigns a unique identifier to a credential reference.
         if ref.get('type', '').lower() == 'ec2':
-            try:
-                blob = jsonutils.loads(ref.get('blob'))
-            except (ValueError, TabError):
-                raise exception.ValidationError(
-                    message=_('Invalid blob in credential'))
-            if not blob or not isinstance(blob, dict):
-                raise exception.ValidationError(attribute='blob',
-                                                target='credential')
-            if blob.get('access') is None:
-                raise exception.ValidationError(attribute='access',
-                                                target='credential')
-
+            blob = self._validate_blob_json(ref)
             ref = ref.copy()
             ref['id'] = hashlib.sha256(
                 blob['access'].encode('utf8')).hexdigest()
-            # update the blob with the trust_id, so credentials created with
-            # a trust scoped token will result in trust scoped tokens when
-            # authentication via ec2tokens happens
+            # update the blob with the trust_id or app_cred_id, so credentials
+            # created with a trust- or app cred-scoped token will result in
+            # trust- or app cred-scoped tokens when authentication via
+            # ec2tokens happens
             if trust_id is not None:
                 blob['trust_id'] = trust_id
                 ref['blob'] = jsonutils.dumps(blob)
+            if app_cred_id is not None:
+                blob['app_cred_id'] = app_cred_id
+                ref['blob'] = jsonutils.dumps(blob)
+            if access_token_id is not None:
+                blob['access_token_id'] = access_token_id
+                ref['blob'] = jsonutils.dumps(blob)
             return ref
         else:
             return super(CredentialResource, self)._assign_unique_id(ref)
@@ -146,23 +158,47 @@ def post(self):
         )
         validation.lazy_validate(schema.credential_create, credential)
         trust_id = getattr(self.oslo_context, 'trust_id', None)
+        app_cred_id = getattr(
+            self.auth_context['token'], 'application_credential_id', None)
+        access_token_id = getattr(
+            self.auth_context['token'], 'access_token_id', None)
         ref = self._assign_unique_id(
-            self._normalize_dict(credential), trust_id=trust_id)
-        ref = PROVIDERS.credential_api.create_credential(ref['id'], ref,
-            initiator=self.audit_initiator)
+            self._normalize_dict(credential),
+            trust_id=trust_id, app_cred_id=app_cred_id,
+            access_token_id=access_token_id)
+        ref = PROVIDERS.credential_api.create_credential(
+            ref['id'], ref, initiator=self.audit_initiator)
         return self.wrap_member(ref), http_client.CREATED

+    def _validate_blob_update_keys(self, credential, ref):
+        if credential.get('type', '').lower() == 'ec2':
+            new_blob = self._validate_blob_json(ref)
+            old_blob = credential.get('blob')
+            if isinstance(old_blob, six.string_types):
+                old_blob = jsonutils.loads(old_blob)
+            # if there was a scope set, prevent changing it or unsetting it
+            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
+                if old_blob.get(key) != new_blob.get(key):
+                    message = _('%s can not be updated for credential') % key
+                    raise exception.ValidationError(message=message)
+
     def patch(self, credential_id):
         # Update Credential
         ENFORCER.enforce_call(
             action='identity:update_credential',
             build_target=_build_target_enforcement
         )
-        PROVIDERS.credential_api.get_credential(credential_id)
+        current = PROVIDERS.credential_api.get_credential(credential_id)

         credential = self.request_body_json.get('credential', {})
         validation.lazy_validate(schema.credential_update, credential)
+        self._validate_blob_update_keys(current.copy(), credential.copy())
         self._require_matching_id(credential)
+        # Check that the user hasn't illegally modified the owner or scope
+        target = {'credential': dict(current, **credential)}
+        ENFORCER.enforce_call(
+            action='identity:update_credential', target_attr=target
+        )
         ref = PROVIDERS.credential_api.update_credential(
             credential_id, credential)
         return self.wrap_member(ref)
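
The scope-key check that the patch adds in `_validate_blob_update_keys` can be sketched outside Keystone. This is an illustrative stand-in under stated assumptions: `check_scope_keys_unchanged` and the local `ValidationError` class are hypothetical, plain `json` replaces `jsonutils`, and both blobs are accepted either as dicts or as JSON strings.

```python
import json

SCOPE_KEYS = ("trust_id", "app_cred_id", "access_token_id")


class ValidationError(Exception):
    """Stand-in for keystone.exception.ValidationError."""


def check_scope_keys_unchanged(old_blob, new_blob):
    # Blobs may arrive as JSON strings; decode them before comparing.
    if isinstance(old_blob, str):
        old_blob = json.loads(old_blob)
    if isinstance(new_blob, str):
        new_blob = json.loads(new_blob)
    # A scope key may be neither changed nor removed once it is set.
    for key in SCOPE_KEYS:
        if old_blob.get(key) != new_blob.get(key):
            raise ValidationError("%s can not be updated for credential" % key)


old = json.dumps({"access": "a", "secret": "s", "trust_id": "t1"})
check_scope_keys_unchanged(old, {"access": "a", "secret": "s2", "trust_id": "t1"})
try:
    check_scope_keys_unchanged(old, {"access": "a", "secret": "s"})
except ValidationError as exc:
    print("rejected:", exc)
```

Comparing with `.get()` treats a missing key as `None`, so dropping `trust_id` from the blob is rejected in the same way as replacing it with a different value.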
keystone/api/_shared/EC2_S3_Resource.py · +40 −5 · modified
@@ -115,7 +115,9 @@ def handle_authenticate(self):
             project_id=cred.get('project_id'),
             access=loaded.get('access'),
             secret=loaded.get('secret'),
-            trust_id=loaded.get('trust_id')
+            trust_id=loaded.get('trust_id'),
+            app_cred_id=loaded.get('app_cred_id'),
+            access_token_id=loaded.get('access_token_id')
         )

         # validate the signature
@@ -137,8 +139,34 @@ def handle_authenticate(self):
                 sys.exc_info()[2])

         self._check_timestamp(credentials)
-        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
-            user_ref['id'], project_ref['id'])
+
+        trustee_user_id = None
+        auth_context = None
+        if cred_data['trust_id']:
+            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
+            roles = [r['id'] for r in trust['roles']]
+            # NOTE(cmurphy): if this credential was created using a
+            # trust-scoped token with impersonation, the user_id will be for
+            # the trustor, not the trustee. In this case, issuing a
+            # trust-scoped token to the trustor will fail. In order to get a
+            # trust-scoped token, use the user ID of the trustee. With
+            # impersonation, the resulting token will still be for the trustor.
+            # Without impersonation, the token will be for the trustee.
+            if trust['impersonation'] is True:
+                trustee_user_id = trust['trustee_user_id']
+        elif cred_data['app_cred_id']:
+            ac_client = PROVIDERS.application_credential_api
+            app_cred = ac_client.get_application_credential(
+                cred_data['app_cred_id'])
+            roles = [r['id'] for r in app_cred['roles']]
+        elif cred_data['access_token_id']:
+            access_token = PROVIDERS.oauth_api.get_access_token(
+                cred_data['access_token_id'])
+            roles = jsonutils.loads(access_token['role_ids'])
+            auth_context = {'access_token_id': cred_data['access_token_id']}
+        else:
+            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
+                user_ref['id'], project_ref['id'])

         if not roles:
             raise ks_exceptions.Unauthorized(_('User not valid for project.'))
@@ -149,7 +177,14 @@ def handle_authenticate(self):

         method_names = ['ec2credential']

+        if trustee_user_id:
+            user_id = trustee_user_id
+        else:
+            user_id = user_ref['id']
         token = PROVIDERS.token_provider_api.issue_token(
-            user_id=user_ref['id'], method_names=method_names,
-            project_id=project_ref['id'])
+            user_id=user_id, method_names=method_names,
+            project_id=project_ref['id'],
+            trust_id=cred_data['trust_id'],
+            app_cred_id=cred_data['app_cred_id'],
+            auth_context=auth_context)
         return token
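
The role-resolution order that this patch introduces in `handle_authenticate` (trust, then application credential, then OAuth1 access token, then direct role assignments) can be sketched with plain dictionaries. This is a simplified model: `resolve_role_ids` and the `backends` layout are hypothetical stand-ins for Keystone's provider APIs, and the sample IDs are invented for illustration.

```python
import json


def resolve_role_ids(cred_data, backends, user_id, project_id):
    """Return the role IDs a token derived from an EC2 credential may carry."""
    if cred_data.get("trust_id"):
        trust = backends["trusts"][cred_data["trust_id"]]
        return [role["id"] for role in trust["roles"]]
    if cred_data.get("app_cred_id"):
        app_cred = backends["app_creds"][cred_data["app_cred_id"]]
        return [role["id"] for role in app_cred["roles"]]
    if cred_data.get("access_token_id"):
        access_token = backends["access_tokens"][cred_data["access_token_id"]]
        # As in the patch, role_ids is read as a JSON-encoded list.
        return json.loads(access_token["role_ids"])
    # Unscoped credential: fall back to the user's assignments on the project.
    return backends["assignments"].get((user_id, project_id), [])


backends = {
    "trusts": {"t1": {"roles": [{"id": "reader"}]}},
    "app_creds": {"ac1": {"roles": [{"id": "member"}]}},
    "access_tokens": {"at1": {"role_ids": json.dumps(["reader"])}},
    "assignments": {("u1", "p1"): ["admin", "member", "reader"]},
}

print(resolve_role_ids({"trust_id": "t1"}, backends, "u1", "p1"))
print(resolve_role_ids({}, backends, "u1", "p1"))
```

The point of the ordering is that a credential created through a delegation only ever yields the delegated roles, even when the user holds additional roles on the project.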
keystone/api/users.py · +20 −2 · modified
@@ -568,6 +568,25 @@ def _normalize_role_list(app_cred_roles):
                     role['name']))
         return roles

+    def _get_roles(self, app_cred_data, token):
+        if app_cred_data.get('roles'):
+            roles = self._normalize_role_list(app_cred_data['roles'])
+            # NOTE(cmurphy): The user is not allowed to add a role that is not
+            # in their token. This is to prevent trustees or application
+            # credential users from escallating their privileges to include
+            # additional roles that the trustor or application credential
+            # creator has assigned on the project.
+            token_roles = [r['id'] for r in token.roles]
+            for role in roles:
+                if role['id'] not in token_roles:
+                    detail = _('Cannot create an application credential with '
+                               'unassigned role')
+                    raise ks_exception.ApplicationCredentialValidationError(
+                        detail=detail)
+        else:
+            roles = token.roles
+        return roles
+
     def get(self, user_id):
         """List application credentials for user.

@@ -603,8 +622,7 @@ def post(self, user_id):
         app_cred_data['secret'] = self._generate_secret()
         app_cred_data['user_id'] = user_id
         app_cred_data['project_id'] = project_id
-        app_cred_data['roles'] = self._normalize_role_list(
-            app_cred_data.get('roles', token.roles))
+        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
         if app_cred_data.get('expires_at'):
             app_cred_data['expires_at'] = utils.parse_expiration_date(
                 app_cred_data['expires_at'])
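
The subset rule enforced by `_get_roles` can be sketched in isolation. This is a minimal model under stated assumptions: `select_roles` is a hypothetical name, roles are plain `{'id': ...}` dicts that are already normalized, and the local exception class stands in for Keystone's `ApplicationCredentialValidationError`.

```python
class ApplicationCredentialValidationError(Exception):
    """Stand-in for the Keystone exception of the same name."""


def select_roles(requested_roles, token_roles):
    # No explicit request: inherit exactly the roles carried by the token.
    if not requested_roles:
        return list(token_roles)
    token_role_ids = [role["id"] for role in token_roles]
    # Every requested role must already be present in the caller's token.
    for role in requested_roles:
        if role["id"] not in token_role_ids:
            raise ApplicationCredentialValidationError(
                "Cannot create an application credential with unassigned role")
    return requested_roles


token_roles = [{"id": "reader"}]
print(select_roles([], token_roles))
try:
    select_roles([{"id": "admin"}], token_roles)
except ApplicationCredentialValidationError as exc:
    print("rejected:", exc)
```

Checking against the token's roles rather than the user's project assignments is what stops a trustee or application credential user from requesting roles that were never delegated to them.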
keystone/tests/unit/test_v3_application_credential.py · +31 −0 · modified
@@ -166,6 +166,37 @@ def test_create_application_credential_with_application_credential(self):
                    expected_status_code=http_client.FORBIDDEN,
                    headers={'X-Auth-Token': token})

+    def test_create_application_credential_with_trust(self):
+        second_role = unit.new_role_ref(name='reader')
+        PROVIDERS.role_api.create_role(second_role['id'], second_role)
+        PROVIDERS.assignment_api.add_role_to_user_and_project(
+            self.user_id, self.project_id, second_role['id'])
+        with self.test_client() as c:
+            pw_token = self.get_scoped_token()
+            # create a self-trust - only the roles are important for this test
+            trust_ref = unit.new_trust_ref(
+                trustor_user_id=self.user_id,
+                trustee_user_id=self.user_id,
+                project_id=self.project_id,
+                role_ids=[second_role['id']])
+            resp = c.post('/v3/OS-TRUST/trusts',
+                          headers={'X-Auth-Token': pw_token},
+                          json={'trust': trust_ref})
+            trust_id = resp.json['trust']['id']
+            trust_auth = self.build_authentication_request(
+                user_id=self.user_id,
+                password=self.user['password'],
+                trust_id=trust_id)
+            trust_token = self.v3_create_token(
+                trust_auth).headers['X-Subject-Token']
+            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
+            # only the roles from the trust token should be allowed, even if
+            # the user has the role assigned on the project
+            c.post('/v3/users/%s/application_credentials' % self.user_id,
+                   headers={'X-Auth-Token': trust_token},
+                   json=app_cred,
+                   expected_status_code=http_client.BAD_REQUEST)
+
     def test_create_application_credential_allow_recursion(self):
         with self.test_client() as c:
             roles = [{'id': self.role_id}]
keystone/tests/unit/test_v3_credential.py+395 −35 modified@@ -19,14 +19,15 @@ from keystoneclient.contrib.ec2 import utils as ec2_utils import mock from oslo_db import exception as oslo_db_exception -from six.moves import http_client +from six.moves import http_client, urllib from testtools import matchers from keystone.api import ec2tokens from keystone.common import provider_api from keystone.common import utils from keystone.credential.providers import fernet as credential_fernet from keystone import exception +from keystone import oauth1 from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 @@ -63,6 +64,33 @@ def _create_dict_blob_credential(self): return json.dumps(blob), credential_id + def _test_get_token(self, access, secret): + """Test signature validation with the access/secret provided.""" + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + # Now make a request to validate the signed dummy request via the + # ec2tokens API. This proves the v3 ec2 credentials actually work. 
+ sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + r = self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + expected_status=http_client.OK) + self.assertValidTokenResponse(r) + return r.result['token'] + class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD.""" @@ -258,6 +286,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self): 'credential_id': credential_id}, body={'credential': update_ref}) + def test_update_credential_non_owner(self): + """Call ``PATCH /credentials/{credential_id}``.""" + alt_user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id) + alt_user_id = alt_user['id'] + alt_project = unit.new_project_ref(domain_id=self.domain_id) + alt_project_id = alt_project['id'] + PROVIDERS.resource_api.create_project( + alt_project['id'], alt_project) + alt_role = unit.new_role_ref(name='reader') + alt_role_id = alt_role['id'] + PROVIDERS.role_api.create_role(alt_role_id, alt_role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + alt_user_id, alt_project_id, alt_role_id) + auth = self.build_authentication_request( + user_id=alt_user_id, + password=alt_user['password'], + project_id=alt_project_id) + ref = unit.new_credential_ref(user_id=alt_user_id, + project_id=alt_project_id) + r = self.post( + '/credentials', + auth=auth, + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + + # Cannot change the credential to be owned by another user + update_ref = {'user_id': self.user_id, 'project_id': self.project_id} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + expected_status=403, + auth=auth, + body={'credential': update_ref}) + + def test_update_ec2_credential_change_trust_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['trust_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different trust + blob['trust_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the trust + del blob['trust_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_ec2_credential_change_app_cred_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['app_cred_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different app cred + blob['app_cred_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the app cred + del blob['app_cred_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_ec2_credential_change_access_token_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['access_token_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different access token + blob['access_token_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the access token + del blob['access_token_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + def test_delete_credential(self): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( @@ -393,7 +541,7 @@ def test_create_credential_with_admin_token(self): self.assertValidCredentialResponse(r, ref) -class TestCredentialTrustScoped(test_v3.RestfulTestCase): +class TestCredentialTrustScoped(CredentialBaseTestCase): """Test credential with trust scoped token.""" def setUp(self): @@ -446,7 +594,7 @@ def test_trust_scoped_ec2_credential(self): token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + blob, ref = unit.new_ec2_credential(user_id=self.user_id, project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}, token=token_id) @@ -463,6 +611,97 @@ def test_trust_scoped_ec2_credential(self): self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) + # Create a role assignment to ensure that it is ignored and only the + # trust-delegated roles are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + 
PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + # Create second ec2 credential with the same access key id and check + # for conflict. + self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http_client.CONFLICT) + + +class TestCredentialAppCreds(CredentialBaseTestCase): + """Test credential with application credential token.""" + + def setUp(self): + super(TestCredentialAppCreds, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) + + def test_app_cred_ec2_credential(self): + """Test creating ec2 credential from an application credential. + + Call ``POST /credentials``. 
+ """ + # Create the app cred + ref = { + 'name': uuid.uuid4().hex, + 'roles': [{'id': self.role_id}] + } + r = self.post('/users/%s/application_credentials' % self.user_id, + body={'application_credential': ref}) + app_cred = r.result['application_credential'] + + # Get an application credential token + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], + secret=app_cred['secret']) + r = self.v3_create_token(auth_data) + token_id = r.headers.get('X-Subject-Token') + + # Create the credential with the app cred token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the app_cred_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['app_cred_id'] = app_cred['id'] + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the app cred are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + # Create second ec2 credential with the same access key id and check # for conflict. 
self.post( @@ -472,34 +711,155 @@ def test_trust_scoped_ec2_credential(self): expected_status=http_client.CONFLICT) -class TestCredentialEc2(CredentialBaseTestCase): - """Test v3 credential compatibility with ec2tokens.""" +class TestCredentialAccessToken(CredentialBaseTestCase): + """Test credential with access token.""" - def _validate_signature(self, access, secret): - """Test signature validation with the access/secret provided.""" - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) + def setUp(self): + super(TestCredentialAccessToken, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) + self.base_url = 'http://localhost/v3' + + def _urllib_parse_qs_text_keys(self, content): + results = urllib.parse.parse_qs(content) + return {key.decode('utf-8'): value for key, value in results.items()} + + def _create_single_consumer(self): + endpoint = '/OS-OAUTH1/consumers' + + ref = {'description': uuid.uuid4().hex} + resp = self.post(endpoint, body={'consumer': ref}) + return resp.result['consumer'] + + def _create_request_token(self, consumer, project_id, base_url=None): + endpoint = '/OS-OAUTH1/request_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + signature_method=oauth1.SIG_HMAC, + callback_uri="oob") + headers = {'requested_project_id': project_id} + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST', + headers=headers) + return endpoint, headers + + def _create_access_token(self, consumer, token, base_url=None): + endpoint = '/OS-OAUTH1/access_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + 
resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC, + verifier=token.verifier) + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + return endpoint, headers + + def _get_oauth_token(self, consumer, token): + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC) + endpoint = '/auth/tokens' + url, headers, body = client.sign(self.base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}} + return endpoint, headers, ref + + def _authorize_request_token(self, request_id): + if isinstance(request_id, bytes): + request_id = request_id.decode() + return '/OS-OAUTH1/authorize/%s' % (request_id) + + def _get_access_token(self): + consumer = self._create_single_consumer() + consumer_id = consumer['id'] + consumer_secret = consumer['secret'] + consumer = {'key': consumer_id, 'secret': consumer_secret} + + url, headers = self._create_request_token(consumer, self.project_id) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + request_key = credentials['oauth_token'][0] + request_secret = credentials['oauth_token_secret'][0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + body = {'roles': [{'id': self.role_id}]} + resp = self.put(url, body=body, expected_status=http_client.OK) + verifier = resp.result['token']['oauth_verifier'] + + request_token.set_verifier(verifier) + url, headers = self._create_access_token(consumer, request_token) + content = self.post( + url, headers=headers, + 
response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + access_key = credentials['oauth_token'][0] + access_secret = credentials['oauth_token_secret'][0] + access_token = oauth1.Token(access_key, access_secret) + + url, headers, body = self._get_oauth_token(consumer, access_token) + content = self.post(url, headers=headers, body=body) + return access_key, content.headers['X-Subject-Token'] + + def test_access_token_ec2_credential(self): + """Test creating ec2 credential from an oauth access token. - # Now make a request to validate the signed dummy request via the - # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - r = self.post( - '/ec2tokens', - body={'ec2Credentials': sig_ref}, - expected_status=http_client.OK) - self.assertValidTokenResponse(r) + Call ``POST /credentials``. 
+ """ + access_key, token_id = self._get_access_token() + + # Create the credential with the access token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the access_token_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['access_token_id'] = access_key.decode('utf-8') + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the access token are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + +class TestCredentialEc2(CredentialBaseTestCase): + """Test v3 credential compatibility with ec2tokens.""" def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" @@ -514,15 +874,15 @@ def test_ec2_credential_signature_validate(self): cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a 
legacy v3 ec2 credential.""" cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def _get_ec2_cred_uri(self): return '/users/%s/credentials/OS-EC2' % self.user_id @@ -538,8 +898,8 @@ def test_ec2_create_credential(self): self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertIsNone(ec2_cred['trust_id']) - self._validate_signature(access=ec2_cred['access'], - secret=ec2_cred['secret']) + self._test_get_token(access=ec2_cred['access'], + secret=ec2_cred['secret']) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri))
releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml · +16 −0 · added
@@ -0,0 +1,16 @@
+---
+critical:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+security:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+fixes:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml · +31 −0 · added
@@ -0,0 +1,31 @@
+---
+critical:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+security:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+fixes:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml +19 −0 added
@@ -0,0 +1,19 @@
+---
+security:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
+fixes:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
40cbb7bebd50 Merge "Fix security issues with EC2 credentials" into stable/train
8 files changed · +606 −62
keystone/api/credentials.py +56 −20 modified
@@ -13,6 +13,7 @@
 # This file handles all flask-restful resources for /v3/credentials
 
 import hashlib
+import six
 
 import flask
 from oslo_serialization import jsonutils
@@ -60,30 +61,41 @@ def _blob_to_json(ref):
             ref['blob'] = jsonutils.dumps(blob)
         return ref
 
-    def _assign_unique_id(self, ref, trust_id=None):
+    def _validate_blob_json(self, ref):
+        try:
+            blob = jsonutils.loads(ref.get('blob'))
+        except (ValueError, TabError):
+            raise exception.ValidationError(
+                message=_('Invalid blob in credential'))
+        if not blob or not isinstance(blob, dict):
+            raise exception.ValidationError(attribute='blob',
+                                            target='credential')
+        if blob.get('access') is None:
+            raise exception.ValidationError(attribute='access',
+                                            target='credential')
+        return blob
+
+    def _assign_unique_id(
+            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
         # Generates an assigns a unique identifier to a credential reference.
         if ref.get('type', '').lower() == 'ec2':
-            try:
-                blob = jsonutils.loads(ref.get('blob'))
-            except (ValueError, TabError):
-                raise exception.ValidationError(
-                    message=_('Invalid blob in credential'))
-            if not blob or not isinstance(blob, dict):
-                raise exception.ValidationError(attribute='blob',
-                                                target='credential')
-            if blob.get('access') is None:
-                raise exception.ValidationError(attribute='access',
-                                                target='credential')
-
+            blob = self._validate_blob_json(ref)
             ref = ref.copy()
             ref['id'] = hashlib.sha256(
                 blob['access'].encode('utf8')).hexdigest()
-            # update the blob with the trust_id, so credentials created with
-            # a trust scoped token will result in trust scoped tokens when
-            # authentication via ec2tokens happens
+            # update the blob with the trust_id or app_cred_id, so credentials
+            # created with a trust- or app cred-scoped token will result in
+            # trust- or app cred-scoped tokens when authentication via
+            # ec2tokens happens
             if trust_id is not None:
                 blob['trust_id'] = trust_id
                 ref['blob'] = jsonutils.dumps(blob)
+            if app_cred_id is not None:
+                blob['app_cred_id'] = app_cred_id
+                ref['blob'] = jsonutils.dumps(blob)
+            if access_token_id is not None:
+                blob['access_token_id'] = access_token_id
+                ref['blob'] = jsonutils.dumps(blob)
             return ref
         else:
             return super(CredentialResource, self)._assign_unique_id(ref)
@@ -146,23 +158,47 @@ def post(self):
         )
         validation.lazy_validate(schema.credential_create, credential)
         trust_id = getattr(self.oslo_context, 'trust_id', None)
+        app_cred_id = getattr(
+            self.auth_context['token'], 'application_credential_id', None)
+        access_token_id = getattr(
+            self.auth_context['token'], 'access_token_id', None)
         ref = self._assign_unique_id(
-            self._normalize_dict(credential), trust_id=trust_id)
-        ref = PROVIDERS.credential_api.create_credential(ref['id'], ref,
-            initiator=self.audit_initiator)
+            self._normalize_dict(credential),
+            trust_id=trust_id, app_cred_id=app_cred_id,
+            access_token_id=access_token_id)
+        ref = PROVIDERS.credential_api.create_credential(
+            ref['id'], ref, initiator=self.audit_initiator)
         return self.wrap_member(ref), http_client.CREATED
 
+    def _validate_blob_update_keys(self, credential, ref):
+        if credential.get('type', '').lower() == 'ec2':
+            new_blob = self._validate_blob_json(ref)
+            old_blob = credential.get('blob')
+            if isinstance(old_blob, six.string_types):
+                old_blob = jsonutils.loads(old_blob)
+            # if there was a scope set, prevent changing it or unsetting it
+            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
+                if old_blob.get(key) != new_blob.get(key):
+                    message = _('%s can not be updated for credential') % key
+                    raise exception.ValidationError(message=message)
+
     def patch(self, credential_id):
         # Update Credential
         ENFORCER.enforce_call(
             action='identity:update_credential',
             build_target=_build_target_enforcement
         )
-        PROVIDERS.credential_api.get_credential(credential_id)
+        current = PROVIDERS.credential_api.get_credential(credential_id)
 
         credential = self.request_body_json.get('credential', {})
+        validation.lazy_validate(schema.credential_update, credential)
+        self._validate_blob_update_keys(current.copy(), credential.copy())
         self._require_matching_id(credential)
+        # Check that the user hasn't illegally modified the owner or scope
+        target = {'credential': dict(current, **credential)}
+        ENFORCER.enforce_call(
+            action='identity:update_credential', target_attr=target
+        )
         ref = PROVIDERS.credential_api.update_credential(
             credential_id, credential)
         return self.wrap_member(ref)
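The second `ENFORCER.enforce_call` added to `patch()` evaluates the update policy against the credential as it would look after the requested changes are applied, so a caller cannot hand ownership to another user. A minimal sketch of that merge-then-check idea follows. `PolicyError`, `enforce_owner`, and `check_update` are hypothetical names, and the owner-only rule is an assumed simplification of keystone's real policy engine.

```python
class PolicyError(Exception):
    """Hypothetical stand-in for a 403 Forbidden policy failure."""


def enforce_owner(caller_user_id, target):
    # Assumed simplified rule: the caller must own the credential in `target`.
    if target['credential'].get('user_id') != caller_user_id:
        raise PolicyError('caller does not own the credential')


def check_update(caller_user_id, current, update):
    # First check: the caller must own the credential as currently stored.
    enforce_owner(caller_user_id, {'credential': current})
    # Second check: overlay the requested changes on the stored credential
    # and evaluate the same rule against the merged result.
    merged = dict(current, **update)
    enforce_owner(caller_user_id, {'credential': merged})
    return merged
```

With only the first check, a request that rewrites `user_id` passes because the stored credential still belongs to the caller. The merged target is what exposes the ownership change.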
keystone/api/_shared/EC2_S3_Resource.py +40 −5 modified
@@ -115,7 +115,9 @@ def handle_authenticate(self):
             project_id=cred.get('project_id'),
             access=loaded.get('access'),
             secret=loaded.get('secret'),
-            trust_id=loaded.get('trust_id')
+            trust_id=loaded.get('trust_id'),
+            app_cred_id=loaded.get('app_cred_id'),
+            access_token_id=loaded.get('access_token_id')
         )
 
         # validate the signature
@@ -137,8 +139,34 @@ def handle_authenticate(self):
                 sys.exc_info()[2])
 
         self._check_timestamp(credentials)
-        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
-            user_ref['id'], project_ref['id'])
+
+        trustee_user_id = None
+        auth_context = None
+        if cred_data['trust_id']:
+            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
+            roles = [r['id'] for r in trust['roles']]
+            # NOTE(cmurphy): if this credential was created using a
+            # trust-scoped token with impersonation, the user_id will be for
+            # the trustor, not the trustee. In this case, issuing a
+            # trust-scoped token to the trustor will fail. In order to get a
+            # trust-scoped token, use the user ID of the trustee. With
+            # impersonation, the resulting token will still be for the trustor.
+            # Without impersonation, the token will be for the trustee.
+            if trust['impersonation'] is True:
+                trustee_user_id = trust['trustee_user_id']
+        elif cred_data['app_cred_id']:
+            ac_client = PROVIDERS.application_credential_api
+            app_cred = ac_client.get_application_credential(
+                cred_data['app_cred_id'])
+            roles = [r['id'] for r in app_cred['roles']]
+        elif cred_data['access_token_id']:
+            access_token = PROVIDERS.oauth_api.get_access_token(
+                cred_data['access_token_id'])
+            roles = jsonutils.loads(access_token['role_ids'])
+            auth_context = {'access_token_id': cred_data['access_token_id']}
+        else:
+            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
+                user_ref['id'], project_ref['id'])
 
         if not roles:
             raise ks_exceptions.Unauthorized(_('User not valid for project.'))
@@ -149,7 +177,14 @@ def handle_authenticate(self):
 
         method_names = ['ec2credential']
 
+        if trustee_user_id:
+            user_id = trustee_user_id
+        else:
+            user_id = user_ref['id']
         token = PROVIDERS.token_provider_api.issue_token(
-            user_id=user_ref['id'], method_names=method_names,
-            project_id=project_ref['id'])
+            user_id=user_id, method_names=method_names,
+            project_id=project_ref['id'],
+            trust_id=cred_data['trust_id'],
+            app_cred_id=cred_data['app_cred_id'],
+            auth_context=auth_context)
         return token
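The branch added to `handle_authenticate` takes the role list from whichever delegation created the credential and falls back to the user's direct role assignments only when none is recorded. The sketch below mirrors that precedence with plain dictionaries. `resolve_roles` and its lookup-table arguments are hypothetical stand-ins for the trust, application credential, OAuth, and assignment backends, not keystone APIs.

```python
import json


def resolve_roles(cred_data, trusts, app_creds, access_tokens, assignments):
    """Return the role IDs an EC2 credential may carry (illustrative only)."""
    if cred_data.get('trust_id'):
        # Trust-scoped credential: only the roles delegated by the trust.
        trust = trusts[cred_data['trust_id']]
        return [r['id'] for r in trust['roles']]
    if cred_data.get('app_cred_id'):
        # Application-credential-scoped: only the app cred's roles.
        app_cred = app_creds[cred_data['app_cred_id']]
        return [r['id'] for r in app_cred['roles']]
    if cred_data.get('access_token_id'):
        # As in the patch, the access token stores role IDs as a JSON string.
        token = access_tokens[cred_data['access_token_id']]
        return json.loads(token['role_ids'])
    # No delegation recorded: use the user's direct assignments.
    key = (cred_data['user_id'], cred_data['project_id'])
    return assignments.get(key, [])
```

Because the delegation branches come first, a role assigned directly to the user after the credential was created does not leak into a trust- or app-cred-scoped token, which is the behavior the new unit tests assert.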
keystone/api/users.py +20 −2 modified
@@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
                     role['name']))
         return roles
 
+    def _get_roles(self, app_cred_data, token):
+        if app_cred_data.get('roles'):
+            roles = self._normalize_role_list(app_cred_data['roles'])
+            # NOTE(cmurphy): The user is not allowed to add a role that is not
+            # in their token. This is to prevent trustees or application
+            # credential users from escallating their privileges to include
+            # additional roles that the trustor or application credential
+            # creator has assigned on the project.
+            token_roles = [r['id'] for r in token.roles]
+            for role in roles:
+                if role['id'] not in token_roles:
+                    detail = _('Cannot create an application credential with '
+                               'unassigned role')
+                    raise ks_exception.ApplicationCredentialValidationError(
+                        detail=detail)
+        else:
+            roles = token.roles
+        return roles
+
     def get(self, user_id):
         """List application credentials for user.
 
@@ -594,8 +613,7 @@ def post(self, user_id):
         app_cred_data['secret'] = self._generate_secret()
         app_cred_data['user_id'] = user_id
         app_cred_data['project_id'] = project_id
-        app_cred_data['roles'] = self._normalize_role_list(
-            app_cred_data.get('roles', token.roles))
+        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
         if app_cred_data.get('expires_at'):
             app_cred_data['expires_at'] = utils.parse_expiration_date(
                 app_cred_data['expires_at'])
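The new `_get_roles` helper limits an application credential to roles already present in the caller's token. A minimal standalone sketch of that subset check, assuming roles are plain dictionaries with an `id` key, is shown below. `RoleNotInToken` and `select_roles` are hypothetical names used only for illustration.

```python
class RoleNotInToken(Exception):
    """Hypothetical error for a requested role missing from the token."""


def select_roles(requested_roles, token_roles):
    # No explicit request: inherit exactly the roles carried by the token.
    if not requested_roles:
        return list(token_roles)
    token_role_ids = {r['id'] for r in token_roles}
    for role in requested_roles:
        # Reject any role the token does not already carry, even if the
        # underlying user holds it on the project.
        if role['id'] not in token_role_ids:
            raise RoleNotInToken(role['id'])
    return list(requested_roles)
```

Checking against the token rather than the user's assignments is the point: a trust-scoped token may carry fewer roles than the trustor holds on the project.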
keystone/tests/unit/test_v3_application_credential.py +31 −0 modified
@@ -169,6 +169,37 @@ def test_create_application_credential_with_application_credential(self):
                    expected_status_code=http_client.FORBIDDEN,
                    headers={'X-Auth-Token': token})
 
+    def test_create_application_credential_with_trust(self):
+        second_role = unit.new_role_ref(name='reader')
+        PROVIDERS.role_api.create_role(second_role['id'], second_role)
+        PROVIDERS.assignment_api.add_role_to_user_and_project(
+            self.user_id, self.project_id, second_role['id'])
+        with self.test_client() as c:
+            pw_token = self.get_scoped_token()
+            # create a self-trust - only the roles are important for this test
+            trust_ref = unit.new_trust_ref(
+                trustor_user_id=self.user_id,
+                trustee_user_id=self.user_id,
+                project_id=self.project_id,
+                role_ids=[second_role['id']])
+            resp = c.post('/v3/OS-TRUST/trusts',
+                          headers={'X-Auth-Token': pw_token},
+                          json={'trust': trust_ref})
+            trust_id = resp.json['trust']['id']
+            trust_auth = self.build_authentication_request(
+                user_id=self.user_id,
+                password=self.user['password'],
+                trust_id=trust_id)
+            trust_token = self.v3_create_token(
+                trust_auth).headers['X-Subject-Token']
+            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
+            # only the roles from the trust token should be allowed, even if
+            # the user has the role assigned on the project
+            c.post('/v3/users/%s/application_credentials' % self.user_id,
+                   headers={'X-Auth-Token': trust_token},
+                   json=app_cred,
+                   expected_status_code=http_client.BAD_REQUEST)
+
     def test_create_application_credential_allow_recursion(self):
         with self.test_client() as c:
             roles = [{'id': self.role_id}]
keystone/tests/unit/test_v3_credential.py+393 −35 modified@@ -19,14 +19,15 @@ from keystoneclient.contrib.ec2 import utils as ec2_utils import mock from oslo_db import exception as oslo_db_exception -from six.moves import http_client +from six.moves import http_client, urllib from testtools import matchers from keystone.api import ec2tokens from keystone.common import provider_api from keystone.common import utils from keystone.credential.providers import fernet as credential_fernet from keystone import exception +from keystone import oauth1 from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 @@ -63,6 +64,33 @@ def _create_dict_blob_credential(self): return json.dumps(blob), credential_id + def _test_get_token(self, access, secret): + """Test signature validation with the access/secret provided.""" + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + # Now make a request to validate the signed dummy request via the + # ec2tokens API. This proves the v3 ec2 credentials actually work. 
+ sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + r = self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + expected_status=http_client.OK) + self.assertValidTokenResponse(r) + return r.result['token'] + class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD.""" @@ -258,6 +286,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self): 'credential_id': credential_id}, body={'credential': update_ref}) + def test_update_credential_non_owner(self): + """Call ``PATCH /credentials/{credential_id}``.""" + alt_user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id) + alt_user_id = alt_user['id'] + alt_project = unit.new_project_ref(domain_id=self.domain_id) + alt_project_id = alt_project['id'] + PROVIDERS.resource_api.create_project( + alt_project['id'], alt_project) + alt_role = unit.new_role_ref(name='reader') + alt_role_id = alt_role['id'] + PROVIDERS.role_api.create_role(alt_role_id, alt_role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + alt_user_id, alt_project_id, alt_role_id) + auth = self.build_authentication_request( + user_id=alt_user_id, + password=alt_user['password'], + project_id=alt_project_id) + ref = unit.new_credential_ref(user_id=alt_user_id, + project_id=alt_project_id) + r = self.post( + '/credentials', + auth=auth, + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + + # Cannot change the credential to be owned by another user + update_ref = {'user_id': self.user_id, 'project_id': self.project_id} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + expected_status=403, + auth=auth, + body={'credential': update_ref}) + + def test_update_ec2_credential_change_trust_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['trust_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different trust + blob['trust_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the trust + del blob['trust_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_ec2_credential_change_app_cred_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['app_cred_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different app cred + blob['app_cred_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the app cred + del blob['app_cred_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_ec2_credential_change_access_token_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['access_token_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different access token + blob['access_token_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + # Try removing the access token + del blob['access_token_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http_client.BAD_REQUEST) + def test_delete_credential(self): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( @@ -393,7 +541,7 @@ def test_create_credential_with_admin_token(self): self.assertValidCredentialResponse(r, ref) -class TestCredentialTrustScoped(test_v3.RestfulTestCase): +class TestCredentialTrustScoped(CredentialBaseTestCase): """Test credential with trust scoped token.""" def setUp(self): @@ -446,7 +594,7 @@ def test_trust_scoped_ec2_credential(self): token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + blob, ref = unit.new_ec2_credential(user_id=self.user_id, project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}, token=token_id) @@ -463,6 +611,21 @@ def test_trust_scoped_ec2_credential(self): self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) + # Create a role assignment to ensure that it is ignored and only the + # trust-delegated roles are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + 
PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + # Create second ec2 credential with the same access key id and check # for conflict. self.post( @@ -472,34 +635,229 @@ def test_trust_scoped_ec2_credential(self): expected_status=http_client.CONFLICT) -class TestCredentialEc2(CredentialBaseTestCase): - """Test v3 credential compatibility with ec2tokens.""" +class TestCredentialAppCreds(CredentialBaseTestCase): + """Test credential with application credential token.""" - def _validate_signature(self, access, secret): - """Test signature validation with the access/secret provided.""" - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) + def setUp(self): + super(TestCredentialAppCreds, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) - # Now make a request to validate the signed dummy request via the - # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - r = self.post( - '/ec2tokens', - body={'ec2Credentials': sig_ref}, - expected_status=http_client.OK) - self.assertValidTokenResponse(r) + def test_app_cred_ec2_credential(self): + """Test creating ec2 credential from an application credential. + + Call ``POST /credentials``. 
+ """ + # Create the app cred + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + r = self.post('/users/%s/application_credentials' % self.user_id, + body={'application_credential': ref}) + app_cred = r.result['application_credential'] + + # Get an application credential token + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], + secret=app_cred['secret']) + r = self.v3_create_token(auth_data) + token_id = r.headers.get('X-Subject-Token') + + # Create the credential with the app cred token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the app_cred_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['app_cred_id'] = app_cred['id'] + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the app cred are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + # Create second ec2 credential with the same access key id and check + # for conflict. 
+ self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http_client.CONFLICT) + + +class TestCredentialAccessToken(CredentialBaseTestCase): + """Test credential with access token.""" + + def setUp(self): + super(TestCredentialAccessToken, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) + self.base_url = 'http://localhost/v3' + + def _urllib_parse_qs_text_keys(self, content): + results = urllib.parse.parse_qs(content) + return {key.decode('utf-8'): value for key, value in results.items()} + + def _create_single_consumer(self): + endpoint = '/OS-OAUTH1/consumers' + + ref = {'description': uuid.uuid4().hex} + resp = self.post(endpoint, body={'consumer': ref}) + return resp.result['consumer'] + + def _create_request_token(self, consumer, project_id, base_url=None): + endpoint = '/OS-OAUTH1/request_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + signature_method=oauth1.SIG_HMAC, + callback_uri="oob") + headers = {'requested_project_id': project_id} + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST', + headers=headers) + return endpoint, headers + + def _create_access_token(self, consumer, token, base_url=None): + endpoint = '/OS-OAUTH1/access_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC, + verifier=token.verifier) + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + return endpoint, headers + + def _get_oauth_token(self, consumer, token): + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + 
resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC) + endpoint = '/auth/tokens' + url, headers, body = client.sign(self.base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}} + return endpoint, headers, ref + + def _authorize_request_token(self, request_id): + if isinstance(request_id, bytes): + request_id = request_id.decode() + return '/OS-OAUTH1/authorize/%s' % (request_id) + + def _get_access_token(self): + consumer = self._create_single_consumer() + consumer_id = consumer['id'] + consumer_secret = consumer['secret'] + consumer = {'key': consumer_id, 'secret': consumer_secret} + + url, headers = self._create_request_token(consumer, self.project_id) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + request_key = credentials['oauth_token'][0] + request_secret = credentials['oauth_token_secret'][0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + body = {'roles': [{'id': self.role_id}]} + resp = self.put(url, body=body, expected_status=http_client.OK) + verifier = resp.result['token']['oauth_verifier'] + + request_token.set_verifier(verifier) + url, headers = self._create_access_token(consumer, request_token) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + access_key = credentials['oauth_token'][0] + access_secret = credentials['oauth_token_secret'][0] + access_token = oauth1.Token(access_key, access_secret) + + url, headers, body = self._get_oauth_token(consumer, access_token) + content = self.post(url, headers=headers, body=body) + return access_key, content.headers['X-Subject-Token'] + + def 
test_access_token_ec2_credential(self): + """Test creating ec2 credential from an oauth access token. + + Call ``POST /credentials``. + """ + access_key, token_id = self._get_access_token() + + # Create the credential with the access token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the access_token_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['access_token_id'] = access_key.decode('utf-8') + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the access token are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + +class TestCredentialEc2(CredentialBaseTestCase): + """Test v3 credential compatibility with ec2tokens.""" def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" @@ -514,15 +872,15 @@ def test_ec2_credential_signature_validate(self): cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + 
self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a legacy v3 ec2 credential.""" cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def _get_ec2_cred_uri(self): return '/users/%s/credentials/OS-EC2' % self.user_id @@ -538,8 +896,8 @@ def test_ec2_create_credential(self): self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertIsNone(ec2_cred['trust_id']) - self._validate_signature(access=ec2_cred['access'], - secret=ec2_cred['secret']) + self._test_get_token(access=ec2_cred['access'], + secret=ec2_cred['secret']) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri))
releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml +16 −0 added
@@ -0,0 +1,16 @@
+---
+critical:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+security:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+fixes:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml +31 −0 added
@@ -0,0 +1,31 @@
+---
+critical:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+security:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+fixes:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml +19 −0 added
@@ -0,0 +1,19 @@
+---
+security:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
+fixes:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
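The release note for bug 1872755 names `trust_id` and `app_cred_id`; the `_validate_blob_update_keys` method in the patch also guards `access_token_id`. A minimal sketch of that comparison, assuming blobs arrive either as JSON strings or as dictionaries, might look like the following. `SCOPE_KEYS`, `_as_dict`, and `changed_scope_keys` are hypothetical names, not keystone identifiers.

```python
import json

# Scope labels that keystone sets automatically and that must stay fixed.
SCOPE_KEYS = ('trust_id', 'app_cred_id', 'access_token_id')


def _as_dict(blob):
    # Stored blobs may be JSON strings; decode them before comparing.
    return json.loads(blob) if isinstance(blob, str) else blob


def changed_scope_keys(old_blob, new_blob):
    """List the scope labels that differ between two EC2 blobs."""
    old, new = _as_dict(old_blob), _as_dict(new_blob)
    # A missing key reads as None, so removing a label counts as a change.
    return [k for k in SCOPE_KEYS if old.get(k) != new.get(k)]
```

An update handler built on this sketch would reject the request whenever the returned list is non-empty, covering both replacement and removal of a scope label.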
37e9907a176d Fix security issues with EC2 credentials
8 files changed · +603 −59
keystone/api/credentials.py +53 −18 modified
@@ -60,30 +60,41 @@ def _blob_to_json(ref):
             ref['blob'] = jsonutils.dumps(blob)
         return ref
 
-    def _assign_unique_id(self, ref, trust_id=None):
+    def _validate_blob_json(self, ref):
+        try:
+            blob = jsonutils.loads(ref.get('blob'))
+        except (ValueError, TabError):
+            raise exception.ValidationError(
+                message=_('Invalid blob in credential'))
+        if not blob or not isinstance(blob, dict):
+            raise exception.ValidationError(attribute='blob',
+                                            target='credential')
+        if blob.get('access') is None:
+            raise exception.ValidationError(attribute='access',
+                                            target='credential')
+        return blob
+
+    def _assign_unique_id(
+            self, ref, trust_id=None, app_cred_id=None, access_token_id=None):
         # Generates an assigns a unique identifier to a credential reference.
         if ref.get('type', '').lower() == 'ec2':
-            try:
-                blob = jsonutils.loads(ref.get('blob'))
-            except (ValueError, TabError):
-                raise exception.ValidationError(
-                    message=_('Invalid blob in credential'))
-            if not blob or not isinstance(blob, dict):
-                raise exception.ValidationError(attribute='blob',
-                                                target='credential')
-            if blob.get('access') is None:
-                raise exception.ValidationError(attribute='access',
-                                                target='credential')
-
+            blob = self._validate_blob_json(ref)
             ref = ref.copy()
             ref['id'] = hashlib.sha256(
                 blob['access'].encode('utf8')).hexdigest()
-            # update the blob with the trust_id, so credentials created with
-            # a trust scoped token will result in trust scoped tokens when
-            # authentication via ec2tokens happens
+            # update the blob with the trust_id or app_cred_id, so credentials
+            # created with a trust- or app cred-scoped token will result in
+            # trust- or app cred-scoped tokens when authentication via
+            # ec2tokens happens
             if trust_id is not None:
                 blob['trust_id'] = trust_id
                 ref['blob'] = jsonutils.dumps(blob)
+            if app_cred_id is not None:
+                blob['app_cred_id'] = app_cred_id
+                ref['blob'] = jsonutils.dumps(blob)
+            if access_token_id is not None:
+                blob['access_token_id'] = access_token_id
+                ref['blob'] = jsonutils.dumps(blob)
             return ref
         else:
             return super(CredentialResource, self)._assign_unique_id(ref)
@@ -146,23 +157,47 @@ def post(self):
         )
         validation.lazy_validate(schema.credential_create, credential)
         trust_id = getattr(self.oslo_context, 'trust_id', None)
+        app_cred_id = getattr(
+            self.auth_context['token'], 'application_credential_id', None)
+        access_token_id = getattr(
+            self.auth_context['token'], 'access_token_id', None)
         ref = self._assign_unique_id(
-            self._normalize_dict(credential), trust_id=trust_id)
+            self._normalize_dict(credential),
+            trust_id=trust_id, app_cred_id=app_cred_id,
+            access_token_id=access_token_id)
         ref = PROVIDERS.credential_api.create_credential(
             ref['id'], ref, initiator=self.audit_initiator)
         return self.wrap_member(ref), http.client.CREATED
 
+    def _validate_blob_update_keys(self, credential, ref):
+        if credential.get('type', '').lower() == 'ec2':
+            new_blob = self._validate_blob_json(ref)
+            old_blob = credential.get('blob')
+            if isinstance(old_blob, str):
+                old_blob = jsonutils.loads(old_blob)
+            # if there was a scope set, prevent changing it or unsetting it
+            for key in ['trust_id', 'app_cred_id', 'access_token_id']:
+                if old_blob.get(key) != new_blob.get(key):
+                    message = _('%s can not be updated for credential') % key
+                    raise exception.ValidationError(message=message)
+
     def patch(self, credential_id):
         # Update Credential
         ENFORCER.enforce_call(
             action='identity:update_credential',
             build_target=_build_target_enforcement
         )
-        PROVIDERS.credential_api.get_credential(credential_id)
+        current = PROVIDERS.credential_api.get_credential(credential_id)
 
         credential = self.request_body_json.get('credential', {})
         validation.lazy_validate(schema.credential_update, credential)
+        self._validate_blob_update_keys(current.copy(), credential.copy())
         self._require_matching_id(credential)
+        # Check that the user hasn't illegally modified the owner or scope
+        target = {'credential': dict(current, **credential)}
+        ENFORCER.enforce_call(
+            action='identity:update_credential', target_attr=target
+        )
         ref = PROVIDERS.credential_api.update_credential(
             credential_id, credential)
         return self.wrap_member(ref)
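The two checks added to `patch` in the diff above can be sketched outside Keystone as follows. This is a minimal illustration, not the project's implementation: `check_scope_labels`, `check_owner`, and both exception classes are hypothetical stand-ins, the owner rule is a simplified assumption about what the `identity:update_credential` policy enforces, and skipping updates that carry no blob is a simplification of this sketch.

```python
import json

# Scope labels that Keystone sets automatically; per the patch they must not
# be changed or removed by an update.
PROTECTED_KEYS = ("trust_id", "app_cred_id", "access_token_id")


class ValidationError(Exception):
    """Hypothetical stand-in for keystone.exception.ValidationError."""


class Forbidden(Exception):
    """Hypothetical stand-in for a policy enforcement failure."""


def _as_dict(blob):
    # Blobs may be stored as JSON strings; normalise to a dict.
    return json.loads(blob) if isinstance(blob, str) else blob


def check_scope_labels(current, update):
    """Reject an update that changes or drops a scope label in an EC2 blob."""
    if current.get("type", "").lower() != "ec2" or "blob" not in update:
        return  # simplification: updates without a blob are not inspected
    old_blob = _as_dict(current["blob"])
    new_blob = _as_dict(update["blob"])
    for key in PROTECTED_KEYS:
        if old_blob.get(key) != new_blob.get(key):
            raise ValidationError(f"{key} can not be updated for credential")


def check_owner(caller_user_id, current, update):
    """Apply an assumed owner rule to the credential as it would look after the update."""
    target = dict(current, **update)  # same merge the patch performs
    if target.get("user_id") != caller_user_id:
        raise Forbidden("credential would no longer belong to the caller")
```

Evaluating the rule against the merged dictionary is the important design choice: a check against only the stored credential passes for the legitimate owner even when the request body reassigns `user_id` or `project_id`.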
keystone/api/_shared/EC2_S3_Resource.py +40 −5 modified
@@ -113,7 +113,9 @@ def handle_authenticate(self):
             project_id=cred.get('project_id'),
             access=loaded.get('access'),
             secret=loaded.get('secret'),
-            trust_id=loaded.get('trust_id')
+            trust_id=loaded.get('trust_id'),
+            app_cred_id=loaded.get('app_cred_id'),
+            access_token_id=loaded.get('access_token_id')
         )
 
         # validate the signature
@@ -132,8 +134,34 @@ def handle_authenticate(self):
             raise ks_exceptions.Unauthorized from e
 
         self._check_timestamp(credentials)
-        roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
-            user_ref['id'], project_ref['id'])
+
+        trustee_user_id = None
+        auth_context = None
+        if cred_data['trust_id']:
+            trust = PROVIDERS.trust_api.get_trust(cred_data['trust_id'])
+            roles = [r['id'] for r in trust['roles']]
+            # NOTE(cmurphy): if this credential was created using a
+            # trust-scoped token with impersonation, the user_id will be for
+            # the trustor, not the trustee. In this case, issuing a
+            # trust-scoped token to the trustor will fail. In order to get a
+            # trust-scoped token, use the user ID of the trustee. With
+            # impersonation, the resulting token will still be for the trustor.
+            # Without impersonation, the token will be for the trustee.
+            if trust['impersonation'] is True:
+                trustee_user_id = trust['trustee_user_id']
+        elif cred_data['app_cred_id']:
+            ac_client = PROVIDERS.application_credential_api
+            app_cred = ac_client.get_application_credential(
+                cred_data['app_cred_id'])
+            roles = [r['id'] for r in app_cred['roles']]
+        elif cred_data['access_token_id']:
+            access_token = PROVIDERS.oauth_api.get_access_token(
+                cred_data['access_token_id'])
+            roles = jsonutils.loads(access_token['role_ids'])
+            auth_context = {'access_token_id': cred_data['access_token_id']}
+        else:
+            roles = PROVIDERS.assignment_api.get_roles_for_user_and_project(
+                user_ref['id'], project_ref['id'])
 
         if not roles:
             raise ks_exceptions.Unauthorized(_('User not valid for project.'))
@@ -144,7 +172,14 @@ def handle_authenticate(self):
 
         method_names = ['ec2credential']
 
+        if trustee_user_id:
+            user_id = trustee_user_id
+        else:
+            user_id = user_ref['id']
         token = PROVIDERS.token_provider_api.issue_token(
-            user_id=user_ref['id'], method_names=method_names,
-            project_id=project_ref['id'])
+            user_id=user_id, method_names=method_names,
+            project_id=project_ref['id'],
+            trust_id=cred_data['trust_id'],
+            app_cred_id=cred_data['app_cred_id'],
+            auth_context=auth_context)
         return token
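The role lookup order introduced in `handle_authenticate` above (trust, then application credential, then OAuth access token, then plain role assignments) can be sketched with in-memory dictionaries. This is an illustration under stated assumptions: `resolve_roles` and the four keyword-only stores are hypothetical replacements for the Keystone provider APIs, and `PermissionError` stands in for `ks_exceptions.Unauthorized`.

```python
import json


def resolve_roles(cred_data, user_id, project_id, *,
                  trusts, app_creds, access_tokens, assignments):
    """Return (role_ids, token_user_id) for an EC2 credential.

    The delegation recorded in the credential blob takes precedence over the
    user's direct role assignments on the project.
    """
    token_user_id = user_id
    if cred_data.get("trust_id"):
        trust = trusts[cred_data["trust_id"]]
        roles = [r["id"] for r in trust["roles"]]
        # With impersonation the credential's user is the trustor, so the
        # token is requested for the trustee instead.
        if trust["impersonation"] is True:
            token_user_id = trust["trustee_user_id"]
    elif cred_data.get("app_cred_id"):
        app_cred = app_creds[cred_data["app_cred_id"]]
        roles = [r["id"] for r in app_cred["roles"]]
    elif cred_data.get("access_token_id"):
        access_token = access_tokens[cred_data["access_token_id"]]
        roles = json.loads(access_token["role_ids"])
    else:
        roles = assignments.get((user_id, project_id), [])
    if not roles:
        raise PermissionError("User not valid for project.")
    return roles, token_user_id
```

Before the patch only the final branch existed, so a credential created through a narrow trust or application credential still yielded every role the user held on the project.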
keystone/api/users.py +20 −2 modified
@@ -559,6 +559,25 @@ def _normalize_role_list(app_cred_roles):
                     role['name']))
         return roles
 
+    def _get_roles(self, app_cred_data, token):
+        if app_cred_data.get('roles'):
+            roles = self._normalize_role_list(app_cred_data['roles'])
+            # NOTE(cmurphy): The user is not allowed to add a role that is not
+            # in their token. This is to prevent trustees or application
+            # credential users from escallating their privileges to include
+            # additional roles that the trustor or application credential
+            # creator has assigned on the project.
+            token_roles = [r['id'] for r in token.roles]
+            for role in roles:
+                if role['id'] not in token_roles:
+                    detail = _('Cannot create an application credential with '
+                               'unassigned role')
+                    raise ks_exception.ApplicationCredentialValidationError(
+                        detail=detail)
+        else:
+            roles = token.roles
+        return roles
+
     def get(self, user_id):
         """List application credentials for user.
 
@@ -594,8 +613,7 @@ def post(self, user_id):
         app_cred_data['secret'] = self._generate_secret()
         app_cred_data['user_id'] = user_id
         app_cred_data['project_id'] = project_id
-        app_cred_data['roles'] = self._normalize_role_list(
-            app_cred_data.get('roles', token.roles))
+        app_cred_data['roles'] = self._get_roles(app_cred_data, token)
         if app_cred_data.get('expires_at'):
             app_cred_data['expires_at'] = utils.parse_expiration_date(
                 app_cred_data['expires_at'])
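The rule enforced by `_get_roles` above is a subset check: every requested role must already be present in the caller's token. A minimal standalone sketch, in which `get_roles` and the exception class are hypothetical stand-ins and roles are plain dictionaries with an `id` key:

```python
class ApplicationCredentialValidationError(Exception):
    """Hypothetical stand-in for the Keystone exception of the same name."""


def get_roles(requested_roles, token_roles):
    """Return the roles for a new application credential.

    requested_roles: list of {'id': ...} dicts from the request, or empty/None.
    token_roles:     list of {'id': ...} dicts carried by the caller's token.
    """
    if not requested_roles:
        # Nothing requested: inherit exactly what the token carries.
        return list(token_roles)
    token_role_ids = {r["id"] for r in token_roles}
    for role in requested_roles:
        if role["id"] not in token_role_ids:
            raise ApplicationCredentialValidationError(
                "Cannot create an application credential with unassigned role")
    return list(requested_roles)
```

Comparing against the token rather than the project's role assignments is what closes the gap for trustees: a trust-scoped token carries only the delegated roles, even if the underlying user holds more on the project.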
keystone/tests/unit/test_v3_application_credential.py +31 −0 modified
@@ -174,6 +174,37 @@ def test_create_application_credential_with_application_credential(self):
                    expected_status_code=http.client.FORBIDDEN,
                    headers={'X-Auth-Token': token})
 
+    def test_create_application_credential_with_trust(self):
+        second_role = unit.new_role_ref(name='reader')
+        PROVIDERS.role_api.create_role(second_role['id'], second_role)
+        PROVIDERS.assignment_api.add_role_to_user_and_project(
+            self.user_id, self.project_id, second_role['id'])
+        with self.test_client() as c:
+            pw_token = self.get_scoped_token()
+            # create a self-trust - only the roles are important for this test
+            trust_ref = unit.new_trust_ref(
+                trustor_user_id=self.user_id,
+                trustee_user_id=self.user_id,
+                project_id=self.project_id,
+                role_ids=[second_role['id']])
+            resp = c.post('/v3/OS-TRUST/trusts',
+                          headers={'X-Auth-Token': pw_token},
+                          json={'trust': trust_ref})
+            trust_id = resp.json['trust']['id']
+            trust_auth = self.build_authentication_request(
+                user_id=self.user_id,
+                password=self.user['password'],
+                trust_id=trust_id)
+            trust_token = self.v3_create_token(
+                trust_auth).headers['X-Subject-Token']
+            app_cred = self._app_cred_body(roles=[{'id': self.role_id}])
+            # only the roles from the trust token should be allowed, even if
+            # the user has the role assigned on the project
+            c.post('/v3/users/%s/application_credentials' % self.user_id,
+                   headers={'X-Auth-Token': trust_token},
+                   json=app_cred,
+                   expected_status_code=http.client.BAD_REQUEST)
+
     def test_create_application_credential_allow_recursion(self):
         with self.test_client() as c:
             roles = [{'id': self.role_id}]
keystone/tests/unit/test_v3_credential.py+393 −34 modified@@ -21,12 +21,14 @@ from keystoneclient.contrib.ec2 import utils as ec2_utils from oslo_db import exception as oslo_db_exception from testtools import matchers +import urllib from keystone.api import ec2tokens from keystone.common import provider_api from keystone.common import utils from keystone.credential.providers import fernet as credential_fernet from keystone import exception +from keystone import oauth1 from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 @@ -63,6 +65,33 @@ def _create_dict_blob_credential(self): return json.dumps(blob), credential_id + def _test_get_token(self, access, secret): + """Test signature validation with the access/secret provided.""" + signer = ec2_utils.Ec2Signer(secret) + params = {'SignatureMethod': 'HmacSHA256', + 'SignatureVersion': '2', + 'AWSAccessKeyId': access} + request = {'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + signature = signer.generate(request) + + # Now make a request to validate the signed dummy request via the + # ec2tokens API. This proves the v3 ec2 credentials actually work. 
+ sig_ref = {'access': access, + 'signature': signature, + 'host': 'foo', + 'verb': 'GET', + 'path': '/bar', + 'params': params} + r = self.post( + '/ec2tokens', + body={'ec2Credentials': sig_ref}, + expected_status=http.client.OK) + self.assertValidTokenResponse(r) + return r.result['token'] + class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD.""" @@ -258,6 +287,126 @@ def test_update_credential_to_ec2_with_previously_set_project_id(self): 'credential_id': credential_id}, body={'credential': update_ref}) + def test_update_credential_non_owner(self): + """Call ``PATCH /credentials/{credential_id}``.""" + alt_user = unit.create_user( + PROVIDERS.identity_api, domain_id=self.domain_id) + alt_user_id = alt_user['id'] + alt_project = unit.new_project_ref(domain_id=self.domain_id) + alt_project_id = alt_project['id'] + PROVIDERS.resource_api.create_project( + alt_project['id'], alt_project) + alt_role = unit.new_role_ref(name='reader') + alt_role_id = alt_role['id'] + PROVIDERS.role_api.create_role(alt_role_id, alt_role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + alt_user_id, alt_project_id, alt_role_id) + auth = self.build_authentication_request( + user_id=alt_user_id, + password=alt_user['password'], + project_id=alt_project_id) + ref = unit.new_credential_ref(user_id=alt_user_id, + project_id=alt_project_id) + r = self.post( + '/credentials', + auth=auth, + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + + # Cannot change the credential to be owned by another user + update_ref = {'user_id': self.user_id, 'project_id': self.project_id} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + expected_status=403, + auth=auth, + body={'credential': update_ref}) + + def test_update_ec2_credential_change_trust_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['trust_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different trust + blob['trust_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the trust + del blob['trust_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + + def test_update_ec2_credential_change_app_cred_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['app_cred_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different app cred + blob['app_cred_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the app cred + del blob['app_cred_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + + def test_update_ec2_credential_change_access_token_id(self): + """Call ``PATCH /credentials/{credential_id}``.""" + blob, ref = 
unit.new_ec2_credential(user_id=self.user['id'], + project_id=self.project_id) + blob['access_token_id'] = uuid.uuid4().hex + ref['blob'] = json.dumps(blob) + r = self.post( + '/credentials', + body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) + credential_id = r.result.get('credential')['id'] + # Try changing to a different access token + blob['access_token_id'] = uuid.uuid4().hex + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + # Try removing the access token + del blob['access_token_id'] + update_ref = {'blob': json.dumps(blob)} + self.patch( + '/credentials/%(credential_id)s' % { + 'credential_id': credential_id}, + body={'credential': update_ref}, + expected_status=http.client.BAD_REQUEST) + def test_delete_credential(self): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( @@ -393,7 +542,7 @@ def test_create_credential_with_admin_token(self): self.assertValidCredentialResponse(r, ref) -class TestCredentialTrustScoped(test_v3.RestfulTestCase): +class TestCredentialTrustScoped(CredentialBaseTestCase): """Test credential with trust scoped token.""" def setUp(self): @@ -446,7 +595,7 @@ def test_trust_scoped_ec2_credential(self): token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token - blob, ref = unit.new_ec2_credential(user_id=self.user['id'], + blob, ref = unit.new_ec2_credential(user_id=self.user_id, project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}, token=token_id) @@ -463,6 +612,21 @@ def test_trust_scoped_ec2_credential(self): self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) + # Create a role assignment to ensure that it is ignored and only the + # trust-delegated roles are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + 
PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + # Create second ec2 credential with the same access key id and check # for conflict. self.post( @@ -472,34 +636,229 @@ def test_trust_scoped_ec2_credential(self): expected_status=http.client.CONFLICT) -class TestCredentialEc2(CredentialBaseTestCase): - """Test v3 credential compatibility with ec2tokens.""" +class TestCredentialAppCreds(CredentialBaseTestCase): + """Test credential with application credential token.""" - def _validate_signature(self, access, secret): - """Test signature validation with the access/secret provided.""" - signer = ec2_utils.Ec2Signer(secret) - params = {'SignatureMethod': 'HmacSHA256', - 'SignatureVersion': '2', - 'AWSAccessKeyId': access} - request = {'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - signature = signer.generate(request) + def setUp(self): + super(TestCredentialAppCreds, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) - # Now make a request to validate the signed dummy request via the - # ec2tokens API. This proves the v3 ec2 credentials actually work. - sig_ref = {'access': access, - 'signature': signature, - 'host': 'foo', - 'verb': 'GET', - 'path': '/bar', - 'params': params} - r = self.post( - '/ec2tokens', - body={'ec2Credentials': sig_ref}, - expected_status=http.client.OK) - self.assertValidTokenResponse(r) + def test_app_cred_ec2_credential(self): + """Test creating ec2 credential from an application credential. + + Call ``POST /credentials``. 
+ """ + # Create the app cred + ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}]) + del ref['id'] + r = self.post('/users/%s/application_credentials' % self.user_id, + body={'application_credential': ref}) + app_cred = r.result['application_credential'] + + # Get an application credential token + auth_data = self.build_authentication_request( + app_cred_id=app_cred['id'], + secret=app_cred['secret']) + r = self.v3_create_token(auth_data) + token_id = r.headers.get('X-Subject-Token') + + # Create the credential with the app cred token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the app_cred_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['app_cred_id'] = app_cred['id'] + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the app cred are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + # Create second ec2 credential with the same access key id and check + # for conflict. 
+ self.post( + '/credentials', + body={'credential': ref}, + token=token_id, + expected_status=http.client.CONFLICT) + + +class TestCredentialAccessToken(CredentialBaseTestCase): + """Test credential with access token.""" + + def setUp(self): + super(TestCredentialAccessToken, self).setUp() + self.useFixture( + ksfixtures.KeyRepository( + self.config_fixture, + 'credential', + credential_fernet.MAX_ACTIVE_KEYS + ) + ) + self.base_url = 'http://localhost/v3' + + def _urllib_parse_qs_text_keys(self, content): + results = urllib.parse.parse_qs(content) + return {key.decode('utf-8'): value for key, value in results.items()} + + def _create_single_consumer(self): + endpoint = '/OS-OAUTH1/consumers' + + ref = {'description': uuid.uuid4().hex} + resp = self.post(endpoint, body={'consumer': ref}) + return resp.result['consumer'] + + def _create_request_token(self, consumer, project_id, base_url=None): + endpoint = '/OS-OAUTH1/request_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + signature_method=oauth1.SIG_HMAC, + callback_uri="oob") + headers = {'requested_project_id': project_id} + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST', + headers=headers) + return endpoint, headers + + def _create_access_token(self, consumer, token, base_url=None): + endpoint = '/OS-OAUTH1/access_token' + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC, + verifier=token.verifier) + if not base_url: + base_url = self.base_url + url, headers, body = client.sign(base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + return endpoint, headers + + def _get_oauth_token(self, consumer, token): + client = oauth1.Client(consumer['key'], + client_secret=consumer['secret'], + resource_owner_key=token.key, + 
resource_owner_secret=token.secret, + signature_method=oauth1.SIG_HMAC) + endpoint = '/auth/tokens' + url, headers, body = client.sign(self.base_url + endpoint, + http_method='POST') + headers.update({'Content-Type': 'application/json'}) + ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}} + return endpoint, headers, ref + + def _authorize_request_token(self, request_id): + if isinstance(request_id, bytes): + request_id = request_id.decode() + return '/OS-OAUTH1/authorize/%s' % (request_id) + + def _get_access_token(self): + consumer = self._create_single_consumer() + consumer_id = consumer['id'] + consumer_secret = consumer['secret'] + consumer = {'key': consumer_id, 'secret': consumer_secret} + + url, headers = self._create_request_token(consumer, self.project_id) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + request_key = credentials['oauth_token'][0] + request_secret = credentials['oauth_token_secret'][0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + body = {'roles': [{'id': self.role_id}]} + resp = self.put(url, body=body, expected_status=http.client.OK) + verifier = resp.result['token']['oauth_verifier'] + + request_token.set_verifier(verifier) + url, headers = self._create_access_token(consumer, request_token) + content = self.post( + url, headers=headers, + response_content_type='application/x-www-form-urlencoded') + credentials = self._urllib_parse_qs_text_keys(content.result) + access_key = credentials['oauth_token'][0] + access_secret = credentials['oauth_token_secret'][0] + access_token = oauth1.Token(access_key, access_secret) + + url, headers, body = self._get_oauth_token(consumer, access_token) + content = self.post(url, headers=headers, body=body) + return access_key, content.headers['X-Subject-Token'] + + def 
test_access_token_ec2_credential(self): + """Test creating ec2 credential from an oauth access token. + + Call ``POST /credentials``. + """ + access_key, token_id = self._get_access_token() + + # Create the credential with the access token + blob, ref = unit.new_ec2_credential(user_id=self.user_id, + project_id=self.project_id) + r = self.post('/credentials', body={'credential': ref}, token=token_id) + + # We expect the response blob to contain the access_token_id + ret_ref = ref.copy() + ret_blob = blob.copy() + ret_blob['access_token_id'] = access_key.decode('utf-8') + ret_ref['blob'] = json.dumps(ret_blob) + self.assertValidCredentialResponse(r, ref=ret_ref) + + # Assert credential id is same as hash of access key id for + # ec2 credentials + access = blob['access'].encode('utf-8') + self.assertEqual(hashlib.sha256(access).hexdigest(), + r.result['credential']['id']) + + # Create a role assignment to ensure that it is ignored and only the + # roles in the access token are used + role = unit.new_role_ref(name='reader') + role_id = role['id'] + PROVIDERS.role_api.create_role(role_id, role) + PROVIDERS.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, role_id) + + ret_blob = json.loads(r.result['credential']['blob']) + ec2token = self._test_get_token( + access=ret_blob['access'], secret=ret_blob['secret']) + ec2_roles = [role['id'] for role in ec2token['roles']] + self.assertIn(self.role_id, ec2_roles) + self.assertNotIn(role_id, ec2_roles) + + +class TestCredentialEc2(CredentialBaseTestCase): + """Test v3 credential compatibility with ec2tokens.""" def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" @@ -514,15 +873,15 @@ def test_ec2_credential_signature_validate(self): cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + 
self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a legacy v3 ec2 credential.""" cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) - self._validate_signature(access=cred_blob['access'], - secret=cred_blob['secret']) + self._test_get_token(access=cred_blob['access'], + secret=cred_blob['secret']) def _get_ec2_cred_uri(self): return '/users/%s/credentials/OS-EC2' % self.user_id @@ -538,8 +897,8 @@ def test_ec2_create_credential(self): self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertIsNone(ec2_cred['trust_id']) - self._validate_signature(access=ec2_cred['access'], - secret=ec2_cred['secret']) + self._test_get_token(access=ec2_cred['access'], + secret=ec2_cred['secret']) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri))
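The tests above assert two properties of a newly created EC2 credential: its ID is the SHA-256 hex digest of the access key, and the blob returned by the API carries the scope label of the token used to create it. A minimal sketch of that behaviour, using only the standard library; `assign_ec2_id` is a hypothetical name modelled on `_assign_unique_id` in the patch, and it omits the blob validation Keystone performs:

```python
import hashlib
import json


def assign_ec2_id(ref, trust_id=None, app_cred_id=None, access_token_id=None):
    """Return a copy of ref with a derived ID and any scope labels in its blob."""
    blob = json.loads(ref["blob"])
    out = dict(ref)
    # The ID depends only on the access key, so reusing an access key
    # produces the same ID (the tests expect a CONFLICT in that case).
    out["id"] = hashlib.sha256(blob["access"].encode("utf8")).hexdigest()
    labels = {
        "trust_id": trust_id,
        "app_cred_id": app_cred_id,
        "access_token_id": access_token_id,
    }
    for key, value in labels.items():
        if value is not None:
            blob[key] = value
    out["blob"] = json.dumps(blob)
    return out
```

Because the label lives inside the user-visible blob, it only constrains anything if updates cannot rewrite it, which is why the same commit also adds the update-time validation.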
releasenotes/notes/bug-1872733-2377f456a57ad32c.yaml +16 −0 added
@@ -0,0 +1,16 @@
+---
+critical:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+security:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
+fixes:
+  - |
+    [`bug 1872733 <https://bugs.launchpad.net/keystone/+bug/1872733>`_]
+    Fixed a critical security issue in which an authenticated user could
+    escalate their privileges by altering a valid EC2 credential.
releasenotes/notes/bug-1872735-0989e51d2248ce1e.yaml +31 −0 added
@@ -0,0 +1,31 @@
+---
+critical:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+security:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
+fixes:
+  - |
+    [`bug 1872735 <https://bugs.launchpad.net/keystone/+bug/1872735>`_]
+    Fixed a security issue in which a trustee or an application credential user
+    could create an EC2 credential or an application credential that would
+    permit them to get a token that elevated their role assignments beyond the
+    subset delegated to them in the trust or application credential. A new
+    attribute ``app_cred_id`` is now automatically added to the access blob of
+    an EC2 credential and the role list in the trust or application credential
+    is respected.
releasenotes/notes/bug-1872755-2c81d3267b89f124.yaml +19 −0 added
@@ -0,0 +1,19 @@
+---
+security:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
+fixes:
+  - |
+    [`bug 1872755 <https://bugs.launchpad.net/keystone/+bug/1872755>`_]
+    Added validation to the EC2 credentials update API to ensure the metadata
+    labels 'trust_id' and 'app_cred_id' are not altered by the user. These
+    labels are used by keystone to determine the scope allowed by the
+    credential, and altering these automatic labels could enable an EC2
+    credential holder to elevate their access beyond what is permitted by the
+    application credential or trust that was used to create the EC2 credential.
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
16
- github.com/advisories/GHSA-4427-7f3w-mqv6 (ghsa, ADVISORY)
- nvd.nist.gov/vuln/detail/CVE-2020-12691 (ghsa, ADVISORY)
- usn.ubuntu.com/4480-1/ (mitre, vendor-advisory, x_refsource_UBUNTU)
- www.openwall.com/lists/oss-security/2020/05/07/2 (ghsa, mailing-list, x_refsource_MLIST, WEB)
- bugs.launchpad.net/keystone/+bug/1872733 (ghsa, x_refsource_MISC, WEB)
- github.com/openstack/keystone/commit/37e9907a176dad6843819b1bec4946c3aecc4548 (ghsa, WEB)
- github.com/openstack/keystone/commit/40cbb7bebd50276412daa1981ff5a7c7b3b899a5 (ghsa, WEB)
- github.com/openstack/keystone/commit/95b2bbeab113d9f04d1c81f7f1b48bf692bce979 (ghsa, WEB)
- github.com/pypa/advisory-database/tree/main/vulns/keystone/PYSEC-2020-55.yaml (ghsa, WEB)
- lists.apache.org/thread.html/re237267da268c690df5e1c6ea6a38a7fc11617725e8049490f58a6fa%40%3Ccommits.druid.apache.org%3E (mitre, mailing-list, x_refsource_MLIST)
- lists.apache.org/thread.html/re237267da268c690df5e1c6ea6a38a7fc11617725e8049490f58a6fa@%3Ccommits.druid.apache.org%3E (ghsa, WEB)
- lists.apache.org/thread.html/re4ffc55cd2f1b55a26e07c83b3c22c3fe4bae6054d000a57fb48d8c2%40%3Ccommits.druid.apache.org%3E (mitre, mailing-list, x_refsource_MLIST)
- lists.apache.org/thread.html/re4ffc55cd2f1b55a26e07c83b3c22c3fe4bae6054d000a57fb48d8c2@%3Ccommits.druid.apache.org%3E (ghsa, WEB)
- security.openstack.org/ossa/OSSA-2020-004.html (ghsa, x_refsource_CONFIRM, WEB)
- usn.ubuntu.com/4480-1 (ghsa, WEB)
- www.openwall.com/lists/oss-security/2020/05/06/5 (ghsa, x_refsource_MISC, WEB)