High severityOSV Advisory· Published Jan 5, 2026· Updated Jan 6, 2026
AIOHTTP's HTTP Parser auto_decompress feature is vulnerable to zip bomb
CVE-2025-69223
Description
AIOHTTP is an asynchronous HTTP client/server framework for asyncio and Python. Versions 3.13.2 and below allow a zip bomb to be used to execute a DoS against the AIOHTTP server. An attacker may be able to send a compressed request that when decompressed by AIOHTTP could exhaust the host's memory. This issue is fixed in version 3.13.3.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
aiohttpPyPI | < 3.13.3 | 3.13.3 |
Affected products
1Patches
12b920c39002cUse decompressor max_length parameter (#11898) (#11918)
12 files changed · +335 −88
aiohttp/compression_utils.py+75 −46 modified@@ -1,6 +1,7 @@ import asyncio import sys import zlib +from abc import ABC, abstractmethod from concurrent.futures import Executor from typing import Any, Final, Optional, Protocol, TypedDict, cast @@ -32,7 +33,12 @@ HAS_ZSTD = False -MAX_SYNC_CHUNK_SIZE = 1024 +MAX_SYNC_CHUNK_SIZE = 4096 +DEFAULT_MAX_DECOMPRESS_SIZE = 2**25 # 32MiB + +# Unlimited decompression constants - different libraries use different conventions +ZLIB_MAX_LENGTH_UNLIMITED = 0 # zlib uses 0 to mean unlimited +ZSTD_MAX_LENGTH_UNLIMITED = -1 # zstd uses -1 to mean unlimited class ZLibCompressObjProtocol(Protocol): @@ -144,19 +150,37 @@ def encoding_to_mode( return -ZLibBackend.MAX_WBITS if suppress_deflate_header else ZLibBackend.MAX_WBITS -class ZlibBaseHandler: +class DecompressionBaseHandler(ABC): def __init__( self, - mode: int, executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): - self._mode = mode + """Base class for decompression handlers.""" self._executor = executor self._max_sync_chunk_size = max_sync_chunk_size + @abstractmethod + def decompress_sync( + self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED + ) -> bytes: + """Decompress the given data.""" + + async def decompress( + self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED + ) -> bytes: + """Decompress the given data.""" + if ( + self._max_sync_chunk_size is not None + and len(data) > self._max_sync_chunk_size + ): + return await asyncio.get_event_loop().run_in_executor( + self._executor, self.decompress_sync, data, max_length + ) + return self.decompress_sync(data, max_length) + -class ZLibCompressor(ZlibBaseHandler): +class ZLibCompressor: def __init__( self, encoding: Optional[str] = None, @@ -167,14 +191,12 @@ def __init__( executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): - super().__init__( - mode=( - encoding_to_mode(encoding, suppress_deflate_header) - if wbits is None - else wbits - ), - executor=executor, - max_sync_chunk_size=max_sync_chunk_size, + self._executor = executor + self._max_sync_chunk_size = max_sync_chunk_size + self._mode = ( + encoding_to_mode(encoding, suppress_deflate_header) + if wbits is None + else wbits ) self._zlib_backend: Final = ZLibBackendWrapper(ZLibBackend._zlib_backend) @@ -233,41 +255,24 @@ def flush(self, mode: Optional[int] = None) -> bytes: ) -class ZLibDecompressor(ZlibBaseHandler): +class ZLibDecompressor(DecompressionBaseHandler): def __init__( self, encoding: Optional[str] = None, suppress_deflate_header: bool = False, executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): - super().__init__( - mode=encoding_to_mode(encoding, suppress_deflate_header), - executor=executor, - max_sync_chunk_size=max_sync_chunk_size, - ) + super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size) + self._mode = encoding_to_mode(encoding, suppress_deflate_header) self._zlib_backend: Final = ZLibBackendWrapper(ZLibBackend._zlib_backend) self._decompressor = self._zlib_backend.decompressobj(wbits=self._mode) - def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes: + def decompress_sync( + self, data: Buffer, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED + ) -> bytes: return self._decompressor.decompress(data, max_length) - async def decompress(self, data: bytes, max_length: int = 0) -> bytes: - """Decompress the data and return the decompressed bytes. - - If the data size is large than the max_sync_chunk_size, the decompression - will be done in the executor. Otherwise, the decompression will be done - in the event loop. - """ - if ( - self._max_sync_chunk_size is not None - and len(data) > self._max_sync_chunk_size - ): - return await asyncio.get_running_loop().run_in_executor( - self._executor, self._decompressor.decompress, data, max_length - ) - return self.decompress_sync(data, max_length) - def flush(self, length: int = 0) -> bytes: return ( self._decompressor.flush(length) @@ -280,40 +285,64 @@ def eof(self) -> bool: return self._decompressor.eof -class BrotliDecompressor: +class BrotliDecompressor(DecompressionBaseHandler): # Supports both 'brotlipy' and 'Brotli' packages # since they share an import name. The top branches # are for 'brotlipy' and bottom branches for 'Brotli' - def __init__(self) -> None: + def __init__( + self, + executor: Optional[Executor] = None, + max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, + ) -> None: + """Decompress data using the Brotli library.""" if not HAS_BROTLI: raise RuntimeError( "The brotli decompression is not available. " "Please install `Brotli` module" ) self._obj = brotli.Decompressor() + super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size) - def decompress_sync(self, data: bytes) -> bytes: + def decompress_sync( + self, data: Buffer, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED + ) -> bytes: + """Decompress the given data.""" if hasattr(self._obj, "decompress"): - return cast(bytes, self._obj.decompress(data)) - return cast(bytes, self._obj.process(data)) + return cast(bytes, self._obj.decompress(data, max_length)) + return cast(bytes, self._obj.process(data, max_length)) def flush(self) -> bytes: + """Flush the decompressor.""" if hasattr(self._obj, "flush"): return cast(bytes, self._obj.flush()) return b"" -class ZSTDDecompressor: - def __init__(self) -> None: +class ZSTDDecompressor(DecompressionBaseHandler): + def __init__( + self, + executor: Optional[Executor] = None, + max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, + ) -> None: if not HAS_ZSTD: raise RuntimeError( "The zstd decompression is not available. " "Please install `backports.zstd` module" ) self._obj = ZstdDecompressor() - - def decompress_sync(self, data: bytes) -> bytes: - return self._obj.decompress(data) + super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size) + + def decompress_sync( + self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED + ) -> bytes: + # zstd uses -1 for unlimited, while zlib uses 0 for unlimited + # Convert the zlib convention (0=unlimited) to zstd convention (-1=unlimited) + zstd_max_length = ( + ZSTD_MAX_LENGTH_UNLIMITED + if max_length == ZLIB_MAX_LENGTH_UNLIMITED + else max_length + ) + return self._obj.decompress(data, zstd_max_length) def flush(self) -> bytes: return b""
aiohttp/http_exceptions.py+4 −0 modified@@ -74,6 +74,10 @@ class ContentLengthError(PayloadEncodingError): """Not enough data to satisfy content length header.""" +class DecompressSizeError(PayloadEncodingError): + """Decompressed size exceeds the configured limit.""" + + class LineTooLong(BadHttpMessage): def __init__( self, line: str, limit: str = "Unknown", actual_size: str = "Unknown"
aiohttp/http_parser.py+21 −2 modified@@ -27,6 +27,7 @@ from . import hdrs from .base_protocol import BaseProtocol from .compression_utils import ( + DEFAULT_MAX_DECOMPRESS_SIZE, HAS_BROTLI, HAS_ZSTD, BrotliDecompressor, @@ -48,6 +49,7 @@ BadStatusLine, ContentEncodingError, ContentLengthError, + DecompressSizeError, InvalidHeader, InvalidURLError, LineTooLong, @@ -963,7 +965,12 @@ class DeflateBuffer: decompressor: Any - def __init__(self, out: StreamReader, encoding: Optional[str]) -> None: + def __init__( + self, + out: StreamReader, + encoding: Optional[str], + max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE, + ) -> None: self.out = out self.size = 0 out.total_compressed_bytes = self.size @@ -988,6 +995,8 @@ def __init__(self, out: StreamReader, encoding: Optional[str]) -> None: else: self.decompressor = ZLibDecompressor(encoding=encoding) + self._max_decompress_size = max_decompress_size + def set_exception( self, exc: BaseException, @@ -1017,14 +1026,24 @@ def feed_data(self, chunk: bytes, size: int) -> None: ) try: - chunk = self.decompressor.decompress_sync(chunk) + # Decompress with limit + 1 so we can detect if output exceeds limit + chunk = self.decompressor.decompress_sync( + chunk, max_length=self._max_decompress_size + 1 + ) except Exception: raise ContentEncodingError( "Can not decode content-encoding: %s" % self.encoding ) self._started_decoding = True + # Check if decompression limit was exceeded + if len(chunk) > self._max_decompress_size: + raise DecompressSizeError( + "Decompressed data exceeds the configured limit of %d bytes" + % self._max_decompress_size + ) + if chunk: self.out.feed_data(chunk, len(chunk))
aiohttp/multipart.py+20 −11 modified@@ -25,7 +25,12 @@ from multidict import CIMultiDict, CIMultiDictProxy -from .compression_utils import ZLibCompressor, ZLibDecompressor +from .abc import AbstractStreamWriter +from .compression_utils import ( + DEFAULT_MAX_DECOMPRESS_SIZE, + ZLibCompressor, + ZLibDecompressor, +) from .hdrs import ( CONTENT_DISPOSITION, CONTENT_ENCODING, @@ -273,6 +278,7 @@ def __init__( *, subtype: str = "mixed", default_charset: Optional[str] = None, + max_decompress_size: int = DEFAULT_MAX_DECOMPRESS_SIZE, ) -> None: self.headers = headers self._boundary = boundary @@ -289,6 +295,7 @@ def __init__( self._prev_chunk: Optional[bytes] = None self._content_eof = 0 self._cache: Dict[str, Any] = {} + self._max_decompress_size = max_decompress_size def __aiter__(self: Self) -> Self: return self @@ -318,7 +325,7 @@ async def read(self, *, decode: bool = False) -> bytes: while not self._at_eof: data.extend(await self.read_chunk(self.chunk_size)) if decode: - return self.decode(data) + return await self.decode(data) return data async def read_chunk(self, size: int = chunk_size) -> bytes: @@ -496,7 +503,7 @@ def at_eof(self) -> bool: """Returns True if the boundary was reached or False otherwise.""" return self._at_eof - def decode(self, data: bytes) -> bytes: + async def decode(self, data: bytes) -> bytes: """Decodes data. Decoding is done according the specified Content-Encoding @@ -506,18 +513,18 @@ def decode(self, data: bytes) -> bytes: data = self._decode_content_transfer(data) # https://datatracker.ietf.org/doc/html/rfc7578#section-4.8 if not self._is_form_data and CONTENT_ENCODING in self.headers: - return self._decode_content(data) + return await self._decode_content(data) return data - def _decode_content(self, data: bytes) -> bytes: + async def _decode_content(self, data: bytes) -> bytes: encoding = self.headers.get(CONTENT_ENCODING, "").lower() if encoding == "identity": return data if encoding in {"deflate", "gzip"}: - return ZLibDecompressor( + return await ZLibDecompressor( encoding=encoding, suppress_deflate_header=True, - ).decompress_sync(data) + ).decompress(data, max_length=self._max_decompress_size) raise RuntimeError(f"unknown content encoding: {encoding}") @@ -588,11 +595,11 @@ async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> byt """ raise TypeError("Unable to read body part as bytes. Use write() to consume.") - async def write(self, writer: Any) -> None: + async def write(self, writer: AbstractStreamWriter) -> None: field = self._value chunk = await field.read_chunk(size=2**16) while chunk: - await writer.write(field.decode(chunk)) + await writer.write(await field.decode(chunk)) chunk = await field.read_chunk(size=2**16) @@ -1032,7 +1039,9 @@ async def as_bytes(self, encoding: str = "utf-8", errors: str = "strict") -> byt return b"".join(parts) - async def write(self, writer: Any, close_boundary: bool = True) -> None: + async def write( + self, writer: AbstractStreamWriter, close_boundary: bool = True + ) -> None: """Write body.""" for part, encoding, te_encoding in self._parts: if self._is_form_data: @@ -1086,7 +1095,7 @@ async def close(self) -> None: class MultipartPayloadWriter: - def __init__(self, writer: Any) -> None: + def __init__(self, writer: AbstractStreamWriter) -> None: self._writer = writer self._encoding: Optional[str] = None self._compress: Optional[ZLibCompressor] = None
aiohttp/web_request.py+1 −1 modified@@ -740,7 +740,7 @@ async def post(self) -> "MultiDictProxy[Union[str, bytes, FileField]]": ) chunk = await field.read_chunk(size=2**16) while chunk: - chunk = field.decode(chunk) + chunk = await field.decode(chunk) await self._loop.run_in_executor(None, tmp.write, chunk) size += len(chunk) if 0 < max_size < size:
CHANGES/11898.breaking.rst+2 −0 added@@ -0,0 +1,2 @@ +``Brotli`` and ``brotlicffi`` minimum version is now 1.2. +Decompression now has a default maximum output size of 32MiB per decompress call -- by :user:`Dreamsorcerer`.
docs/spelling_wordlist.txt+1 −0 modified@@ -189,6 +189,7 @@ lowercased Mako manylinux metadata +MiB microservice middleware middlewares
pyproject.toml+2 −2 modified@@ -50,8 +50,8 @@ dynamic = [ [project.optional-dependencies] speedups = [ "aiodns >= 3.3.0", - "Brotli; platform_python_implementation == 'CPython'", - "brotlicffi; platform_python_implementation != 'CPython'", + "Brotli >= 1.2; platform_python_implementation == 'CPython'", + "brotlicffi >= 1.2; platform_python_implementation != 'CPython'", "backports.zstd; platform_python_implementation == 'CPython' and python_version < '3.14'", ]
requirements/runtime-deps.in+2 −2 modified@@ -6,8 +6,8 @@ aiosignal >= 1.4.0 async-timeout >= 4.0, < 6.0 ; python_version < '3.11' attrs >= 17.3.0 backports.zstd; platform_python_implementation == 'CPython' and python_version < '3.14' -Brotli; platform_python_implementation == 'CPython' -brotlicffi; platform_python_implementation != 'CPython' +Brotli >= 1.2; platform_python_implementation == 'CPython' +brotlicffi >= 1.2; platform_python_implementation != 'CPython' frozenlist >= 1.1.1 multidict >=4.5, < 7.0 propcache >= 0.2.0
tests/test_client_functional.py+112 −2 modified@@ -13,6 +13,7 @@ import tarfile import time import zipfile +import zlib from contextlib import suppress from typing import ( Any, @@ -26,6 +27,19 @@ ) from unittest import mock +try: + try: + import brotlicffi as brotli + except ImportError: + import brotli +except ImportError: + brotli = None # pragma: no cover + +try: + from backports.zstd import ZstdCompressor +except ImportError: + ZstdCompressor = None # type: ignore[assignment,misc] # pragma: no cover + import pytest from multidict import MultiDict from pytest_mock import MockerFixture @@ -45,7 +59,9 @@ TooManyRedirects, ) from aiohttp.client_reqrep import ClientRequest +from aiohttp.compression_utils import DEFAULT_MAX_DECOMPRESS_SIZE from aiohttp.connector import Connection +from aiohttp.http_exceptions import DecompressSizeError from aiohttp.http_writer import StreamWriter from aiohttp.payload import ( AsyncIterablePayload, @@ -2419,8 +2435,102 @@ async def handler(request): resp.close() -async def test_bad_payload_chunked_encoding(aiohttp_client) -> None: - async def handler(request): +async def test_payload_decompress_size_limit(aiohttp_client: AiohttpClient) -> None: + """Test that decompression size limit triggers DecompressSizeError. + + When a compressed payload expands beyond the configured limit, + we raise DecompressSizeError. + """ + # Create a highly compressible payload that exceeds the decompression limit. + # 64MiB of repeated bytes compresses to ~32KB but expands beyond the + # 32MiB per-call limit. + original = b"A" * (64 * 2**20) + compressed = zlib.compress(original) + assert len(original) > DEFAULT_MAX_DECOMPRESS_SIZE + + async def handler(request: web.Request) -> web.Response: + # Send compressed data with Content-Encoding header + resp = web.Response(body=compressed) + resp.headers["Content-Encoding"] = "deflate" + return resp + + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + + async with client.get("/") as resp: + assert resp.status == 200 + + with pytest.raises(aiohttp.ClientPayloadError) as exc_info: + await resp.read() + + assert isinstance(exc_info.value.__cause__, DecompressSizeError) + assert "Decompressed data exceeds" in str(exc_info.value.__cause__) + + +@pytest.mark.skipif(brotli is None, reason="brotli is not installed") +async def test_payload_decompress_size_limit_brotli( + aiohttp_client: AiohttpClient, +) -> None: + """Test that brotli decompression size limit triggers DecompressSizeError.""" + assert brotli is not None + # Create a highly compressible payload that exceeds the decompression limit. + original = b"A" * (64 * 2**20) + compressed = brotli.compress(original) + assert len(original) > DEFAULT_MAX_DECOMPRESS_SIZE + + async def handler(request: web.Request) -> web.Response: + resp = web.Response(body=compressed) + resp.headers["Content-Encoding"] = "br" + return resp + + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + + async with client.get("/") as resp: + assert resp.status == 200 + + with pytest.raises(aiohttp.ClientPayloadError) as exc_info: + await resp.read() + + assert isinstance(exc_info.value.__cause__, DecompressSizeError) + assert "Decompressed data exceeds" in str(exc_info.value.__cause__) + + +@pytest.mark.skipif(ZstdCompressor is None, reason="backports.zstd is not installed") +async def test_payload_decompress_size_limit_zstd( + aiohttp_client: AiohttpClient, +) -> None: + """Test that zstd decompression size limit triggers DecompressSizeError.""" + assert ZstdCompressor is not None + # Create a highly compressible payload that exceeds the decompression limit. + original = b"A" * (64 * 2**20) + compressor = ZstdCompressor() + compressed = compressor.compress(original) + compressor.flush() + assert len(original) > DEFAULT_MAX_DECOMPRESS_SIZE + + async def handler(request: web.Request) -> web.Response: + resp = web.Response(body=compressed) + resp.headers["Content-Encoding"] = "zstd" + return resp + + app = web.Application() + app.router.add_get("/", handler) + client = await aiohttp_client(app) + + async with client.get("/") as resp: + assert resp.status == 200 + + with pytest.raises(aiohttp.ClientPayloadError) as exc_info: + await resp.read() + + assert isinstance(exc_info.value.__cause__, DecompressSizeError) + assert "Decompressed data exceeds" in str(exc_info.value.__cause__) + + +async def test_bad_payload_chunked_encoding(aiohttp_client: AiohttpClient) -> None: + async def handler(request: web.Request) -> web.StreamResponse: resp = web.StreamResponse() resp.force_close() resp._length_check = False
tests/test_http_parser.py+34 −0 modified@@ -3,6 +3,7 @@ import asyncio import re import sys +import zlib from contextlib import nullcontext from typing import Any, Dict, List from unittest import mock @@ -1919,3 +1920,36 @@ async def test_empty_body(self, protocol: BaseProtocol) -> None: dbuf.feed_eof() assert buf.at_eof() + + @pytest.mark.parametrize( + "chunk_size", + [1024, 2**14, 2**16], # 1KB, 16KB, 64KB + ids=["1KB", "16KB", "64KB"], + ) + async def test_streaming_decompress_large_payload( + self, protocol: BaseProtocol, chunk_size: int + ) -> None: + """Test that large payloads decompress correctly when streamed in chunks. + + This simulates real HTTP streaming where compressed data arrives in + small network chunks. Each chunk's decompressed output should be within + the max_decompress_size limit, allowing full recovery of the original data. + """ + # Create a large payload (3MiB) that compresses well + original = b"A" * (3 * 2**20) + compressed = zlib.compress(original) + + buf = aiohttp.StreamReader(protocol, 2**16, loop=asyncio.get_running_loop()) + dbuf = DeflateBuffer(buf, "deflate") + + # Feed compressed data in chunks (simulating network streaming) + for i in range(0, len(compressed), chunk_size): + chunk = compressed[i : i + chunk_size] + dbuf.feed_data(chunk, len(chunk)) + + dbuf.feed_eof() + + # Read all decompressed data + result = b"".join(buf._buffer) + assert len(result) == len(original) + assert result == original
tests/test_multipart.py+61 −22 modified@@ -10,6 +10,7 @@ import aiohttp from aiohttp import payload +from aiohttp.abc import AbstractStreamWriter from aiohttp.compression_utils import ZLibBackend from aiohttp.hdrs import ( CONTENT_DISPOSITION, @@ -37,14 +38,14 @@ def buf(): @pytest.fixture -def stream(buf): - writer = mock.Mock() +def stream(buf: bytearray) -> AbstractStreamWriter: + writer = mock.create_autospec(AbstractStreamWriter, instance=True, spec_set=True) async def write(chunk): buf.extend(chunk) writer.write.side_effect = write - return writer + return writer # type: ignore[no-any-return] @pytest.fixture @@ -416,7 +417,7 @@ async def test_decode_with_content_transfer_encoding_base64(self) -> None: result = b"" while not obj.at_eof(): chunk = await obj.read_chunk(size=6) - result += obj.decode(chunk) + result += await obj.decode(chunk) assert b"Time to Relax!" == result @pytest.mark.parametrize("encoding", ("binary", "8bit", "7bit")) @@ -1129,7 +1130,9 @@ async def test_writer(writer) -> None: assert writer.boundary == ":" -async def test_writer_serialize_io_chunk(buf, stream, writer) -> None: +async def test_writer_serialize_io_chunk( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: with io.BytesIO(b"foobarbaz") as file_handle: writer.append(file_handle) await writer.write(stream) @@ -1139,7 +1142,9 @@ async def test_writer_serialize_io_chunk(buf, stream, writer) -> None: ) -async def test_writer_serialize_json(buf, stream, writer) -> None: +async def test_writer_serialize_json( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: writer.append_json({"привет": "мир"}) await writer.write(stream) assert ( @@ -1148,23 +1153,29 @@ async def test_writer_serialize_json(buf, stream, writer) -> None: ) -async def test_writer_serialize_form(buf, stream, writer) -> None: +async def test_writer_serialize_form( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: data = [("foo", "bar"), ("foo", "baz"), ("boo", "zoo")] writer.append_form(data) await writer.write(stream) assert b"foo=bar&foo=baz&boo=zoo" in buf -async def test_writer_serialize_form_dict(buf, stream, writer) -> None: +async def test_writer_serialize_form_dict( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: data = {"hello": "мир"} writer.append_form(data) await writer.write(stream) assert b"hello=%D0%BC%D0%B8%D1%80" in buf -async def test_writer_write(buf, stream, writer) -> None: +async def test_writer_write( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: writer.append("foo-bar-baz") writer.append_json({"test": "passed"}) writer.append_form({"test": "passed"}) @@ -1210,7 +1221,9 @@ async def test_writer_write(buf, stream, writer) -> None: ) == bytes(buf) -async def test_writer_write_no_close_boundary(buf, stream) -> None: +async def test_writer_write_no_close_boundary( + buf: bytearray, stream: AbstractStreamWriter +) -> None: writer = aiohttp.MultipartWriter(boundary=":") writer.append("foo-bar-baz") writer.append_json({"test": "passed"}) @@ -1242,13 +1255,19 @@ async def test_writer_write_no_close_boundary(buf, stream) -> None: ) == bytes(buf) -async def test_writer_write_no_parts(buf, stream, writer) -> None: +async def test_writer_write_no_parts( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: await writer.write(stream) assert b"--:--\r\n" == bytes(buf) @pytest.mark.usefixtures("parametrize_zlib_backend") -async def test_writer_serialize_with_content_encoding_gzip(buf, stream, writer): +async def test_writer_serialize_with_content_encoding_gzip( + buf: bytearray, + stream: AbstractStreamWriter, + writer: aiohttp.MultipartWriter, +) -> None: writer.append("Time to Relax!", {CONTENT_ENCODING: "gzip"}) await writer.write(stream) headers, message = bytes(buf).split(b"\r\n\r\n", 1) @@ -1264,7 +1283,9 @@ async def test_writer_serialize_with_content_encoding_gzip(buf, stream, writer): assert b"Time to Relax!" == data -async def test_writer_serialize_with_content_encoding_deflate(buf, stream, writer): +async def test_writer_serialize_with_content_encoding_deflate( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: writer.append("Time to Relax!", {CONTENT_ENCODING: "deflate"}) await writer.write(stream) headers, message = bytes(buf).split(b"\r\n\r\n", 1) @@ -1278,7 +1299,9 @@ async def test_writer_serialize_with_content_encoding_deflate(buf, stream, write assert thing == message -async def test_writer_serialize_with_content_encoding_identity(buf, stream, writer): +async def test_writer_serialize_with_content_encoding_identity( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: thing = b"\x0b\xc9\xccMU(\xc9W\x08J\xcdI\xacP\x04\x00" writer.append(thing, {CONTENT_ENCODING: "identity"}) await writer.write(stream) @@ -1293,12 +1316,16 @@ async def test_writer_serialize_with_content_encoding_identity(buf, stream, writ assert thing == message.split(b"\r\n")[0] -def test_writer_serialize_with_content_encoding_unknown(buf, stream, writer): +def test_writer_serialize_with_content_encoding_unknown( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: with pytest.raises(RuntimeError): writer.append("Time to Relax!", {CONTENT_ENCODING: "snappy"}) -async def test_writer_with_content_transfer_encoding_base64(buf, stream, writer): +async def test_writer_with_content_transfer_encoding_base64( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: writer.append("Time to Relax!", {CONTENT_TRANSFER_ENCODING: "base64"}) await writer.write(stream) headers, message = bytes(buf).split(b"\r\n\r\n", 1) @@ -1311,7 +1338,9 @@ async def test_writer_with_content_transfer_encoding_base64(buf, stream, writer) assert b"VGltZSB0byBSZWxheCE=" == message.split(b"\r\n")[0] -async def test_writer_content_transfer_encoding_quote_printable(buf, stream, writer): +async def test_writer_content_transfer_encoding_quote_printable( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: writer.append("Привет, мир!", {CONTENT_TRANSFER_ENCODING: "quoted-printable"}) await writer.write(stream) headers, message = bytes(buf).split(b"\r\n\r\n", 1) @@ -1327,7 +1356,9 @@ async def test_writer_content_transfer_encoding_quote_printable(buf, stream, wri ) -def test_writer_content_transfer_encoding_unknown(buf, stream, writer) -> None: +def test_writer_content_transfer_encoding_unknown( + buf: bytearray, stream: AbstractStreamWriter, writer: aiohttp.MultipartWriter +) -> None: with pytest.raises(RuntimeError): writer.append("Time to Relax!", {CONTENT_TRANSFER_ENCODING: "unknown"}) @@ -1451,7 +1482,9 @@ def test_append_none_not_allowed(self) -> None: with aiohttp.MultipartWriter(boundary=":") as writer: writer.append(None) - async def test_write_preserves_content_disposition(self, buf, stream) -> None: + async def test_write_preserves_content_disposition( + self, buf: bytearray, stream: AbstractStreamWriter + ) -> None: with aiohttp.MultipartWriter(boundary=":") as writer: part = writer.append(b"foo", headers={CONTENT_TYPE: "test/passed"}) part.set_content_disposition("form-data", filename="bug") @@ -1468,7 +1501,9 @@ async def test_write_preserves_content_disposition(self, buf, stream) -> None: ) assert message == b"foo\r\n--:--\r\n" - async def test_preserve_content_disposition_header(self, buf, stream): + async def test_preserve_content_disposition_header( + self, buf: bytearray, stream: AbstractStreamWriter + ) -> None: # https://github.com/aio-libs/aiohttp/pull/3475#issuecomment-451072381 with pathlib.Path(__file__).open("rb") as fobj: with aiohttp.MultipartWriter("form-data", boundary=":") as writer: @@ -1492,7 +1527,9 @@ async def test_preserve_content_disposition_header(self, buf, stream): b'Content-Disposition: attachments; filename="bug.py"' ) - async def test_set_content_disposition_override(self, buf, stream): + async def test_set_content_disposition_override( + self, buf: bytearray, stream: AbstractStreamWriter + ) -> None: # https://github.com/aio-libs/aiohttp/pull/3475#issuecomment-451072381 with pathlib.Path(__file__).open("rb") as fobj: with aiohttp.MultipartWriter("form-data", boundary=":") as writer: @@ -1516,7 +1553,9 @@ async def test_set_content_disposition_override(self, buf, stream): b'Content-Disposition: attachments; filename="bug.py"' ) - async def test_reset_content_disposition_header(self, buf, stream): + async def test_reset_content_disposition_header( + self, buf: bytearray, stream: AbstractStreamWriter + ) -> None: # https://github.com/aio-libs/aiohttp/pull/3475#issuecomment-451072381 with pathlib.Path(__file__).open("rb") as fobj: with aiohttp.MultipartWriter("form-data", boundary=":") as writer:
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
4- github.com/advisories/GHSA-6mq8-rvhq-8wggghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2025-69223ghsaADVISORY
- github.com/aio-libs/aiohttp/commit/2b920c39002cee0ec5b402581779bbaaf7c9138aghsax_refsource_MISCWEB
- github.com/aio-libs/aiohttp/security/advisories/GHSA-6mq8-rvhq-8wggghsax_refsource_CONFIRMWEB
News mentions
0No linked articles in our index yet.