CVE-2026-10142
Description
A denial-of-service vulnerability in kafka-python allows crafted frame lengths to cause memory exhaustion or connection hangs.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Vulnerability
A denial-of-service vulnerability exists in the protocol parser of kafka-python versions prior to 2.3.2. The parser's receive_bytes() function does not bounds-check the 4-byte frame length that precedes each response, so a malicious broker or a machine-in-the-middle attacker can supply an arbitrary length, including a negative or multi-gigabyte value [3].
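The missing check can be illustrated with a minimal, standalone sketch of length-prefixed framing. This is not kafka-python's code: the function name `read_frame_length` and the constant `MAX_FRAME_BYTES` are hypothetical, and the 1,000,000-byte limit simply mirrors the default that appears in the patch listed under Patches.

```python
import struct

# Hypothetical limit for illustration; mirrors the default shown in the patch.
MAX_FRAME_BYTES = 1_000_000


def read_frame_length(header: bytes, max_bytes: int = MAX_FRAME_BYTES) -> int:
    """Decode a 4-byte big-endian signed length prefix and bounds-check it."""
    if len(header) != 4:
        raise ValueError("length prefix must be exactly 4 bytes")
    (nbytes,) = struct.unpack(">i", header)
    # Reject the value before any buffer is sized from it.
    if nbytes < 0 or nbytes > max_bytes:
        raise ValueError("invalid frame length: %d" % nbytes)
    return nbytes
```

The point of the design is ordering: the length is validated before it is used to size a buffer, so a hostile value never reaches the allocator.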
Exploitation
An attacker exploits the flaw by sending a response whose 4-byte frame length is specially crafted; the client decodes that value in receive_bytes() without validating it. A malicious broker or a machine-in-the-middle attacker can do this without authentication or user interaction [3].
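To make the attacker's input concrete, the sketch below decodes two example prefixes the way a big-endian signed 32-bit length is read. The byte strings are illustrative choices, not payloads taken from the advisory.

```python
import struct

# Two hypothetical hostile length prefixes, as raw bytes on the wire.
huge_prefix = b"\x7f\xff\xff\xff"      # largest positive signed 32-bit value
negative_prefix = b"\xff\xff\xff\xff"  # all bits set

(huge,) = struct.unpack(">i", huge_prefix)
(negative,) = struct.unpack(">i", negative_prefix)

print(huge)          # 2147483647, just under 2 GiB if used as a buffer size
print(negative)      # -1
```

Only four bytes are needed in either case, which is why the attack requires no valid payload after the header.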
Impact
Successful exploitation has one of two outcomes: a multi-gigabyte memory allocation that exhausts system memory, or an uncaught ValueError. The ValueError leaves the connection in a broken state, so requests hang and consumers stop sending heartbeats until the client is restarted [3].
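The ValueError path is consistent with how Python sizes byte buffers. As an assumption, the sketch below uses a plain `bytearray` as a stand-in for the parser's receive buffer (the advisory text does not describe the buffer type), and `allocate_receive_buffer` is a hypothetical helper. Constructing a `bytearray` with a negative size raises ValueError before any payload is read.

```python
def allocate_receive_buffer(nbytes: int) -> bytearray:
    """Hypothetical stand-in for sizing a receive buffer from a decoded length."""
    return bytearray(nbytes)


try:
    allocate_receive_buffer(-1)
except ValueError as exc:
    error = exc
else:
    error = None

print(type(error).__name__)  # ValueError
```

If nothing catches that exception, a parser would plausibly be left partway through a frame, which would be consistent with the hang described above.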
Mitigation
The vulnerability is fixed in kafka-python 2.3.2; all earlier versions are affected. No workarounds are disclosed in the available references [3].
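For clients on a patched release, the patch listed under Patches documents a `receive_message_max_bytes` option on KafkaConsumer, KafkaProducer, and KafkaAdminClient, with a default of 1000000. The sketch below only builds the keyword arguments; the helper name `build_client_config`, the broker address, and the chosen limit are hypothetical, and the option should be checked against the installed version's documentation before use.

```python
def build_client_config(bootstrap_servers, max_frame_bytes=1_000_000):
    """Return keyword arguments for a kafka-python client (illustrative only)."""
    if max_frame_bytes <= 0:
        raise ValueError("max_frame_bytes must be positive")
    return {
        "bootstrap_servers": bootstrap_servers,
        # Option name taken from the patch; larger frames are rejected.
        "receive_message_max_bytes": max_frame_bytes,
    }


config = build_client_config("localhost:9092")
# With a patched kafka-python installed, this could be passed as, for example:
#   consumer = KafkaConsumer("example-topic", **config)
```

A limit set below the largest legitimate response would presumably cause valid frames to be rejected as well, so the value is a trade-off rather than a pure hardening knob.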
AI Insight generated on Jun 10, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected products
1 product
- Range: <2.3.2
Patches
26e4831444f97kafka.net: Validate SASL/SCRAM iterations (#3026)
1 file changed · +6 −1
kafka/sasl/scram.py +6 −1 modified
@@ -99,7 +99,12 @@ def process_server_first_message(self, server_first_message):
         self.auth_message += b',c=biws,r=' + self.nonce
         salt = base64.b64decode(params['s'].encode('utf-8'))
-        iterations = int(params['i'])
+        try:
+            iterations = int(params['i'])
+            if iterations > 1000000:
+                raise ValueError('too many iterations')
+        except (TypeError, ValueError):
+            raise ValueError('Invalid value (not integer or too large) for Iteration count in server-first-message')
         self.create_salted_password(salt, iterations)
         self.client_key = self.hmac(self.salted_password, b'Client Key')
bdb46ab1fe4f KafkaProtocol: validate network frame size; retore log-prefix; fix 0.8.2 quirk check (#3019)
9 files changed · +146 −24
kafka/admin/client.py +4 −2 modified
@@ -85,6 +85,9 @@ class KafkaAdminClient(
         max_in_flight_requests_per_connection (int): Requests are pipelined
             to kafka brokers up to this number of maximum requests per
             broker connection. Default: 5.
+        receive_message_max_bytes (int): Maximum allowed network frame size.
+            Used to avoid OOM when decoding malformed network message header.
+            Default: 1000000.
         receive_buffer_bytes (int): The size of the TCP receive buffer
             (SO_RCVBUF) to use when reading data. Default: None (relies on
             system defaults). Java client defaults to 32768.
@@ -162,11 +165,10 @@ class KafkaAdminClient(
         'reconnect_backoff_ms': 50,
         'reconnect_backoff_max_ms': 30000,
         'max_in_flight_requests_per_connection': 5,
+        'receive_message_max_bytes': 1000000,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
-        'sock_chunk_bytes': 4096,  # undocumented experimental option
-        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
         'retry_backoff_ms': 100,
         'metadata_max_age_ms': 300000,
         'client_dns_lookup': 'use_all_dns_ips',
kafka/consumer/group.py +4 −2 modified
@@ -176,6 +176,9 @@ class KafkaConsumer:
             should be set no higher than 1/3 of that value. It can be adjusted
             even lower to control the expected time for normal rebalances.
             Default: 3000
+        receive_message_max_bytes (int): Maximum allowed network frame size.
+            Used to avoid OOM when decoding malformed network message header.
+            Default: 1000000.
         receive_buffer_bytes (int): The size of the TCP receive buffer
             (SO_RCVBUF) to use when reading data. Default: None (relies on
             system defaults). The java client defaults to 32768.
@@ -319,9 +322,8 @@ class KafkaConsumer:
         'heartbeat_interval_ms': 3000,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
+        'receive_message_max_bytes': 1000000,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
-        'sock_chunk_bytes': 4096,  # undocumented experimental option
-        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
         'consumer_timeout_ms': float('inf'),
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
kafka/errors.py +4 −0 modified
@@ -54,6 +54,10 @@ class CorrelationIdError(KafkaProtocolError):
     retriable = True
+class InvalidReceiveError(KafkaProtocolError):
+    pass
+
+
 class KafkaTimeoutError(KafkaError):
     retriable = True
kafka/net/connection.py +7 −2 modified
@@ -23,8 +23,9 @@ class KafkaConnection:
         'client_id': 'kafka-python-' + __version__,
         'client_software_name': 'kafka-python',
         'client_software_version': __version__,
-        'request_timeout_ms': 30000,
         'max_in_flight_requests_per_connection': 5,
+        'receive_message_max_bytes': 1000000,
+        'request_timeout_ms': 30000,
         'security_protocol': 'PLAINTEXT',
         'sasl_mechanism': None,
         'sasl_plain_username': None,
@@ -263,7 +264,11 @@ def connection_made(self, transport):
         self.transport.set_protocol(self)
         self.initializing = True
         self.transport.resume_reading()
-        self.parser = KafkaProtocol(client_id=self.config['client_id'])
+        log_prefix = 'node=%s[%s:%s]' % (self.node_id, *self.transport.getPeer())
+        self.parser = KafkaProtocol(
+            client_id=self.config['client_id'],
+            receive_message_max_bytes=self.config['receive_message_max_bytes'],
+            ident=log_prefix)
         self.net.call_soon(self._check_version)
     def pause(self, v):
kafka/net/manager.py +1 −0 modified
@@ -26,6 +26,7 @@ class KafkaConnectionManager:
         'client_id': 'kafka-python-' + __version__,
         'client_software_name': 'kafka-python',
         'client_software_version': __version__,
+        'receive_message_max_bytes': 1000000,
         'reconnect_backoff_ms': 50,
         'reconnect_backoff_max_ms': 30000,
         'request_timeout_ms': 30000,
kafka/producer/kafka.py +4 −2 modified
@@ -278,6 +278,9 @@ class KafkaProducer:
             errors. Default: 100.
         request_timeout_ms (int): Client request timeout in milliseconds.
             Default: 30000.
+        receive_message_max_bytes (int): Maximum allowed network frame size.
+            Used to avoid OOM when decoding malformed network message header.
+            Default: 1000000.
         receive_buffer_bytes (int): The size of the TCP receive buffer
             (SO_RCVBUF) to use when reading data. Default: None (relies on
             system defaults). Java client defaults to 32768.
@@ -409,11 +412,10 @@ class KafkaProducer:
         'client_dns_lookup': 'use_all_dns_ips',
         'retry_backoff_ms': 100,
         'request_timeout_ms': 30000,
+        'receive_message_max_bytes': 1000000,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
-        'sock_chunk_bytes': 4096,  # undocumented experimental option
-        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
         'reconnect_backoff_ms': 50,
         'reconnect_backoff_max_ms': 30000,
         'max_in_flight_requests_per_connection': 5,
kafka/protocol/parser.py +16 −15 modified
@@ -19,16 +19,14 @@ class KafkaProtocol:
     Arguments:
         client_id (str): identifier string to be included in each request
-        api_version (tuple): Optional tuple to specify api_version to use.
-            Currently only used to check for 0.8.2 protocol quirks, but
-            may be used for more in the future.
+        ident (str): Optional log-prefix identifier.
+        receive_message_max_bytes (int): Maximum allowed message frame size.
+            Default: 100000000 (100MB).
     """
-    def __init__(self, client_id=None, api_version=None, ident=''):
-        self._ident = ident
-        if client_id is None:
-            client_id = self._gen_client_id()
-        self._client_id = client_id
-        self._api_version = api_version
+    def __init__(self, **config):
+        self._ident = config.get('ident', '')
+        self._client_id = config.get('client_id', self._gen_client_id())
+        self._max_frame_size = config.get('receive_message_max_bytes', 100000000)
         self._correlation_id = 0
         self._header = KafkaBytes(4)
         self._rbuffer = None
@@ -114,6 +112,7 @@ def receive_bytes(self, data):
                 if self._header.tell() == 4:
                     self._header.seek(0)
                     nbytes = Int32.decode(self._header)
+                    self._validate_frame_size(nbytes)
                     # reset buffer and switch state to receiving payload bytes
                     self._rbuffer = KafkaBytes(nbytes)
                     self._receiving = True
@@ -141,6 +140,10 @@ def receive_bytes(self, data):
                     self._reset_buffer()
         return responses
+    def _validate_frame_size(self, nbytes):
+        if nbytes < 0 or nbytes > self._max_frame_size:
+            raise Errors.InvalidReceiveError('Invalid frame length: %d' % nbytes)
+
     def _process_response(self, read_buffer):
         if not self.in_flight_requests:
             raise Errors.CorrelationIdError('No in-flight-request found for server response')
@@ -153,12 +156,10 @@ def _process_response(self, read_buffer):
             raise Errors.KafkaProtocolError('Unable to find response type for api %d v%d' % (header.api_key, header.api_version))
         response_header = response_type.parse_header(read_buffer)
         recv_correlation_id = response_header.correlation_id
-        # 0.8.2 quirk
-        if (recv_correlation_id == 0 and
-            correlation_id != 0 and
-            response_type is FindCoordinatorResponse[0] and
-            (self._api_version == (0, 8, 2) or self._api_version is None)):
-            log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
+        # ignore correlation id mismatch for 0.8.2 quirk
+        if (recv_correlation_id == 0 and correlation_id != 0 and
+            response_type is FindCoordinatorResponse and header.api_version == 0):
+            log.warning('Kafka 0.8.2 quirk -- FindCoordinatorResponse'
                         ' Correlation ID does not match request. This'
                         ' should go away once at least one topic has been'
                         ' initialized on the broker.')
test/net/test_connection.py +66 −0 modified
@@ -1,3 +1,5 @@
+import socket
+import struct
 import time
 from unittest.mock import MagicMock, patch
@@ -6,6 +8,7 @@
 from kafka.future import Future
 from kafka.net.selector import NetworkSelector
 from kafka.net.connection import KafkaConnection
+from kafka.net.transport import KafkaTCPTransport
 from kafka.protocol.broker_version_data import BrokerVersionData
 from kafka.protocol.metadata import ApiVersionsRequest
 from kafka.protocol.parser import KafkaProtocol
@@ -22,6 +25,19 @@ def connection(net):
     return KafkaConnection(net, node_id='test-0')
+@pytest.fixture
+def socketpair():
+    rsock, wsock = socket.socketpair()
+    rsock.setblocking(False)
+    wsock.setblocking(False)
+    yield rsock, wsock
+    for sock in (rsock, wsock):
+        try:
+            sock.close()
+        except OSError:
+            pass
+
+
 class TestKafkaConnectionInit:
     def test_default_state(self, connection):
         assert connection.node_id == 'test-0'
@@ -253,6 +269,7 @@ class TestKafkaConnectionConnectionLifecycle:
     def test_connection_made(self, connection):
         transport = MagicMock()
         transport.get_protocol.return_value = None
+        transport.getPeer.return_value = ('127.0.0.1', 9092)
         connection.connection_made(transport)
         assert connection.transport is transport
         assert connection.initializing is True
@@ -577,3 +594,52 @@ def test_sasl_falls_back_to_peer_ip_when_transport_host_unset(self, net):
         captured = self._drive_handshake_with_recording_mechanism(net, conn)
         assert captured['host'] == '10.0.0.1'
+
+
+class TestKafkaConnectionInvalidReceive:
+    """An over-large/negative frame size makes the parser raise
+    InvalidReceiveError from inside data_received. On a connected
+    connection that must tear the connection down (the transport catches
+    the KafkaProtocolError and aborts), rather than leaving a wedged,
+    half-read connection in place."""
+
+    def _make_connected(self, net, transport, max_frame_bytes=1000):
+        conn = KafkaConnection(net, node_id='test-0')
+        conn.transport = transport
+        conn.connected = True
+        conn.initializing = False
+        conn._init_future.success(True)
+        conn.parser = KafkaProtocol(
+            client_id='test', receive_message_max_bytes=max_frame_bytes)
+        transport.set_protocol(conn)
+        return conn
+
+    def test_oversized_frame_closes_connection(self, net, socketpair):
+        rsock, wsock = socketpair
+        transport = KafkaTCPTransport(net, wsock)
+        conn = self._make_connected(net, transport, max_frame_bytes=1000)
+        assert not conn.closed
+
+        transport.resume_reading()
+        # 4-byte length prefix declaring a 2000-byte frame, over the 1000
+        # byte limit -> parser raises InvalidReceiveError on the header.
+        rsock.send(struct.pack('>i', 2000))
+        net.poll(timeout_ms=1000, future=conn.close_future)
+
+        assert conn.closed
+        assert conn.close_future.failed()
+        assert isinstance(conn.close_future.exception, Errors.InvalidReceiveError)
+
+    def test_negative_frame_closes_connection(self, net, socketpair):
+        rsock, wsock = socketpair
+        transport = KafkaTCPTransport(net, wsock)
+        conn = self._make_connected(net, transport)
+        assert not conn.closed
+
+        transport.resume_reading()
+        rsock.send(struct.pack('>i', -1))
+        net.poll(timeout_ms=1000, future=conn.close_future)
+
+        assert conn.closed
+        assert conn.close_future.failed()
+        assert isinstance(conn.close_future.exception, Errors.InvalidReceiveError)
test/protocol/test_parser.py +40 −1 modified
@@ -1,6 +1,8 @@
+import struct
+
 import pytest
-from kafka.errors import KafkaProtocolError, CorrelationIdError
+from kafka.errors import KafkaProtocolError, CorrelationIdError, InvalidReceiveError
 from kafka.protocol.parser import KafkaProtocol
 from kafka.protocol.metadata import (
     ApiVersionsRequest, ApiVersionsResponse,
@@ -265,3 +267,40 @@ def test_parser_error():
     bad_bytes = b''.join([resp_bytes[0:3], b'\x0a', resp_bytes[4:]])
     with pytest.raises(KafkaProtocolError):
         responses = parser.receive_bytes(bad_bytes)
+
+
+def test_oversized_frame_raises_invalid_receive():
+    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
+    # 4-byte length prefix declaring a frame larger than the configured max.
+    with pytest.raises(InvalidReceiveError):
+        parser.receive_bytes(struct.pack('>i', 1001))
+
+
+def test_negative_frame_size_raises_invalid_receive():
+    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
+    # A negative length prefix can only come from a malformed/hostile frame.
+    with pytest.raises(InvalidReceiveError):
+        parser.receive_bytes(struct.pack('>i', -1))
+
+
+def test_frame_at_max_size_is_accepted():
+    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
+    # The bound is inclusive: a frame exactly at the max passes validation
+    # and the parser transitions to receiving the payload (no exception).
+    parser.receive_bytes(struct.pack('>i', 1000))
+    assert parser._receiving
+
+
+@pytest.mark.parametrize('nbytes,valid', [
+    (-1, False),
+    (0, True),
+    (1000, True),
+    (1001, False),
+])
+def test_validate_frame_size_bounds(nbytes, valid):
+    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
+    if valid:
+        parser._validate_frame_size(nbytes)
+    else:
+        with pytest.raises(InvalidReceiveError):
+            parser._validate_frame_size(nbytes)
References: 4
News mentions: 0 (no linked articles in our index yet)