VYPR
Moderate severityNVD Advisory· Published Jan 18, 2023· Updated Apr 4, 2025

CVE-2022-47950

CVE-2022-47950

Description

An issue was discovered in OpenStack Swift before 2.28.1, 2.29.x before 2.29.2, and 2.30.0. By supplying crafted XML files, an authenticated user may coerce the S3 API into returning arbitrary file contents from the host server, resulting in unauthorized read access to potentially sensitive data. This impacts both s3api deployments (Rocky or later), and swift3 deployments (Queens and earlier, no longer actively developed).

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

CVE-2022-47950: An XML External Entity (XXE) injection in OpenStack Swift's S3 API allows authenticated users to read arbitrary host files.

Vulnerability

Description

CVE-2022-47950 is an XML External Entity (XXE) injection vulnerability in the S3-compatible API of OpenStack Swift, affecting s3api deployments from Rocky onward and swift3 deployments on Queens and earlier (no longer actively developed). The issue arises from insufficient sanitization of crafted XML files processed by the S3 API, allowing an attacker to define external entities that reference local or remote resources [1].

Exploitation

An attacker must be authenticated to the Swift S3 API and can supply a malicious XML file to a supported S3 operation (e.g., bucket ACL or object tagging). The XML parser processes the external entity, causing the server to include arbitrary file contents in the response [2][3][4]. No further authentication or special privileges beyond the valid S3 credentials are required.

Impact

Successful exploitation leads to unauthorized read access to any file readable by the Swift server process, including configuration files, credentials, and other sensitive data. Information disclosure can compromise the entire cloud infrastructure, enabling further attacks [1].

Mitigation

The vulnerability is patched in OpenStack Swift versions 2.28.1, 2.29.2, and 2.30.0. Users of older versions should upgrade immediately. For swift3 deployments, migration to the supported s3api is recommended. No workaround is available other than upgrading or disabling the S3 API [1][2][3][4].

AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
swiftPyPI
< 2.28.12.28.1
swiftPyPI
>= 2.29.0, < 2.29.22.29.2
swiftPyPI
>= 2.30.0, < 2.30.12.30.1

Affected products

18

Patches

8
baa98848451b

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +274 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+233 0 added
    @@ -0,0 +1,233 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import requests
    +
    +import botocore
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +def setUpModule():
    +    tf.setup_package()
    +
    +
    +def tearDownModule():
    +    tf.teardown_package()
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        if 'Content-MD5' in request.headers:
    +            del request.headers['Content-MD5']
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegexp(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        resp = requests.post(url, data=body)
    +        self.assertEqual(400, resp.status_code, resp.content)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -445,6 +445,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -500,6 +501,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = base64.b64encode(md5(string_to_md5).digest()).strip()
    
d8d04ef43c90

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +272 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+231 0 added
    @@ -0,0 +1,231 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import requests
    +
    +import botocore
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +def setUpModule():
    +    tf.setup_package()
    +
    +
    +def tearDownModule():
    +    tf.teardown_package()
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegexp(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        resp = requests.post(url, data=body)
    +        self.assertEqual(400, resp.status_code, resp.content)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -521,6 +521,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -578,6 +579,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = (base64.b64encode(
    
12e54391861e

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
2 files changed · +41 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -128,7 +128,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -13,6 +13,7 @@
     # See the License for the specific language governing permissions and
     # limitations under the License.
     
    +import base64
     import json
     import unittest
     from datetime import datetime
    @@ -373,6 +374,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=''):
             content_md5 = md5(string_to_md5).digest().encode('base64').strip()
    
7d13d1a82e1f

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +274 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+233 0 added
    @@ -0,0 +1,233 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import requests
    +
    +import botocore
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +def setUpModule():
    +    tf.setup_package()
    +
    +
    +def tearDownModule():
    +    tf.teardown_package()
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        if 'Content-MD5' in request.headers:
    +            del request.headers['Content-MD5']
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegexp(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        resp = requests.post(url, data=body)
    +        self.assertEqual(400, resp.status_code, resp.content)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -501,6 +501,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -558,6 +559,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = (base64.b64encode(
    
c834e7a53d5a

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +274 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+233 0 added
    @@ -0,0 +1,233 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import requests
    +
    +import botocore
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +def setUpModule():
    +    tf.setup_package()
    +
    +
    +def tearDownModule():
    +    tf.teardown_package()
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        if 'Content-MD5' in request.headers:
    +            del request.headers['Content-MD5']
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegexp(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        resp = requests.post(url, data=body)
    +        self.assertEqual(400, resp.status_code, resp.content)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -445,6 +445,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -500,6 +501,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = base64.b64encode(md5(string_to_md5).digest()).strip()
    
f10672514217

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
2 files changed · +41 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -127,7 +127,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -13,6 +13,7 @@
     # See the License for the specific language governing permissions and
     # limitations under the License.
     
    +import base64
     import json
     import unittest
     from datetime import datetime
    @@ -284,6 +285,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=''):
             content_md5 = md5(string_to_md5).digest().encode('base64').strip()
    
b8467e190f6f

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +270 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+229 0 added
    @@ -0,0 +1,229 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import base64
    +import requests
    +
    +import botocore
    +
    +from swift.common.utils import md5
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegex(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest()))
    +        resp = requests.post(
    +            url, headers={'Content-MD5': content_md5}, data=body)
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -523,6 +523,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -580,6 +581,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = (base64.b64encode(
    
8dd96470a859

s3api: Prevent XXE injections

https://github.com/openstack/swiftAymeric DucroquetzOct 25, 2022via ghsa
3 files changed · +270 1
  • swift/common/middleware/s3api/etree.py+1 1 modified
    @@ -130,7 +130,7 @@ def text(self, value):
     
     
     parser_lookup = lxml.etree.ElementDefaultClassLookup(element=_Element)
    -parser = lxml.etree.XMLParser()
    +parser = lxml.etree.XMLParser(resolve_entities=False, no_network=True)
     parser.set_element_class_lookup(parser_lookup)
     
     Element = parser.makeelement
    
  • test/functional/s3api/test_xxe_injection.py+229 0 added
    @@ -0,0 +1,229 @@
    +#!/usr/bin/env python
    +# Copyright (c) 2022 OpenStack Foundation
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    +# implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +import base64
    +import requests
    +
    +import botocore
    +
    +from swift.common.utils import md5
    +
    +import test.functional as tf
    +from test.functional.s3api import S3ApiBaseBoto3
    +
    +
    +class TestS3ApiXxeInjection(S3ApiBaseBoto3):
    +
    +    def setUp(self):
    +        super(TestS3ApiXxeInjection, self).setUp()
    +        self.bucket = 'test-s3api-xxe-injection'
    +
    +    def _create_bucket(self, **kwargs):
    +        resp = self.conn.create_bucket(Bucket=self.bucket, **kwargs)
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +
    +    @staticmethod
    +    def _clear_data(request, **_kwargs):
    +        request.data = b''
    +
    +    def _presign_url(self, method, key=None, **kwargs):
    +        params = {
    +            'Bucket': self.bucket
    +        }
    +        if key:
    +            params['Key'] = key
    +        params.update(kwargs)
    +        try:
    +            # https://github.com/boto/boto3/issues/2192
    +            self.conn.meta.events.register(
    +                'before-sign.s3.*', self._clear_data)
    +            return self.conn.generate_presigned_url(
    +                method, Params=params, ExpiresIn=60)
    +        finally:
    +            self.conn.meta.events.unregister(
    +                'before-sign.s3.*', self._clear_data)
    +
    +    def test_put_bucket_acl(self):
    +        if not tf.cluster_info['s3api'].get('s3_acl'):
    +            self.skipTest('s3_acl must be enabled')
    +
    +        self._create_bucket()
    +
    +        url = self._presign_url('put_bucket_acl')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<AccessControlPolicy xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +<Owner>
    +    <DisplayName>test:tester</DisplayName>
    +    <ID>test:tester</ID>
    +</Owner>
    +<AccessControlList>
    +    <Grant>
    +        <Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:type="CanonicalUser">
    +            <DisplayName>name&xxe;</DisplayName>
    +            <ID>id&xxe;</ID>
    +        </Grantee>
    +        <Permission>WRITE</Permission>
    +    </Grant>
    +</AccessControlList>
    +</AccessControlPolicy>
    +""")  # noqa: E501
    +        self.assertEqual(200, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        acl = self.conn.get_bucket_acl(Bucket=self.bucket)
    +        response_metadata = acl.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({
    +            'Owner': {
    +                'DisplayName': 'test:tester',
    +                'ID': 'test:tester'
    +            },
    +            'Grants': [
    +                {
    +                    'Grantee': {
    +                        'DisplayName': 'id',
    +                        'ID': 'id',
    +                        'Type': 'CanonicalUser'
    +                    },
    +                    'Permission': 'WRITE'
    +                }
    +            ]
    +        }, acl)
    +
    +    def test_create_bucket(self):
    +        url = self._presign_url('create_bucket')
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <LocationConstraint>&xxe;</LocationConstraint>
    +</CreateBucketConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        self.assertRaisesRegex(
    +            botocore.exceptions.ClientError, 'Not Found',
    +            self.conn.head_bucket, Bucket=self.bucket)
    +
    +    def test_delete_objects(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'delete_objects',
    +            Delete={
    +                'Objects': [
    +                    {
    +                        'Key': 'string',
    +                        'VersionId': 'string'
    +                    }
    +                ]
    +            })
    +        body = """
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Object>
    +        <Key>&xxe;</Key>
    +    </Object>
    +</Delete>
    +"""
    +        body = body.encode('utf-8')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest()))
    +        resp = requests.post(
    +            url, headers={'Content-MD5': content_md5}, data=body)
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +    def test_complete_multipart_upload(self):
    +        self._create_bucket()
    +
    +        resp = self.conn.create_multipart_upload(
    +            Bucket=self.bucket, Key='test')
    +        response_metadata = resp.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        uploadid = resp.get('UploadId')
    +
    +        try:
    +            url = self._presign_url(
    +                'complete_multipart_upload',
    +                Key='key',
    +                MultipartUpload={
    +                    'Parts': [
    +                        {
    +                            'ETag': 'string',
    +                            'PartNumber': 1
    +                        }
    +                    ],
    +                },
    +                UploadId=uploadid)
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"{uploadid}"</ETag>
    +      <PartNumber>&xxe;</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +            resp = requests.post(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<CompleteMultipartUpload xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +   <Part>
    +      <ETag>"&xxe;"</ETag>
    +      <PartNumber>1</PartNumber>
    +   </Part>
    +</CompleteMultipartUpload>
    +""")  # noqa: E501
    +            self.assertEqual(404, resp.status_code)
    +            self.assertNotIn(b'xxe', resp.content)
    +            self.assertNotIn(b'[swift-hash]', resp.content)
    +        finally:
    +            resp = self.conn.abort_multipart_upload(
    +                Bucket=self.bucket, Key='test', UploadId=uploadid)
    +            response_metadata = resp.pop('ResponseMetadata', {})
    +            self.assertEqual(204, response_metadata.get('HTTPStatusCode'))
    +
    +    def test_put_bucket_versioning(self):
    +        self._create_bucket()
    +
    +        url = self._presign_url(
    +            'put_bucket_versioning',
    +            VersioningConfiguration={
    +                'Status': 'Enabled'
    +            })
    +        resp = requests.put(url, data="""
    +<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/swift/swift.conf"> ]>
    +<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    +    <Status>&xxe;</Status>
    +</VersioningConfiguration>
    +""")  # noqa: E501
    +        self.assertEqual(400, resp.status_code)
    +        self.assertNotIn(b'xxe', resp.content)
    +        self.assertNotIn(b'[swift-hash]', resp.content)
    +
    +        versioning = self.conn.get_bucket_versioning(Bucket=self.bucket)
    +        response_metadata = versioning.pop('ResponseMetadata', {})
    +        self.assertEqual(200, response_metadata.get('HTTPStatusCode'))
    +        self.assertDictEqual({}, versioning)
    
  • test/unit/common/middleware/s3api/test_multi_delete.py+40 0 modified
    @@ -523,6 +523,7 @@ def test_object_multi_DELETE_unhandled_exception(self):
                                 body=body)
             status, headers, body = self.call_s3api(req)
             self.assertEqual(status.split()[0], '200')
    +        self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
     
         def _test_object_multi_DELETE(self, account):
             self.keys = ['Key1', 'Key2']
    @@ -580,6 +581,45 @@ def test_object_multi_DELETE_with_fullcontrol_permission(self):
             elem = fromstring(body)
             self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
     
    +    def test_object_multi_DELETE_with_system_entity(self):
    +        self.keys = ['Key1', 'Key2']
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
    +            swob.HTTPNotFound, {}, None)
    +        self.swift.register(
    +            'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
    +            swob.HTTPNoContent, {}, None)
    +
    +        elem = Element('Delete')
    +        for key in self.keys:
    +            obj = SubElement(elem, 'Object')
    +            SubElement(obj, 'Key').text = key
    +        body = tostring(elem, use_s3ns=False)
    +        body = body.replace(
    +            b'?>\n',
    +            b'?>\n<!DOCTYPE foo '
    +            b'[<!ENTITY ent SYSTEM "file:///etc/passwd"> ]>\n',
    +        ).replace(b'>Key1<', b'>Key1&ent;<')
    +        content_md5 = (
    +            base64.b64encode(md5(body, usedforsecurity=False).digest())
    +            .strip())
    +
    +        req = Request.blank('/bucket?delete',
    +                            environ={'REQUEST_METHOD': 'POST'},
    +                            headers={
    +                                'Authorization': 'AWS test:full_control:hmac',
    +                                'Date': self.get_date_header(),
    +                                'Content-MD5': content_md5},
    +                            body=body)
    +        req.date = datetime.now()
    +        req.content_type = 'text/plain'
    +
    +        status, headers, body = self.call_s3api(req)
    +        self.assertEqual(status, '200 OK', body)
    +        self.assertIn(b'<Deleted><Key>Key2</Key></Deleted>', body)
    +        self.assertNotIn(b'root:/root', body)
    +        self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
    +
         def _test_no_body(self, use_content_length=False,
                           use_transfer_encoding=False, string_to_md5=b''):
             content_md5 = (base64.b64encode(
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

14

News mentions

0

No linked articles in our index yet.