VYPR
High severity7.5NVD Advisory· Published Jun 10, 2026

CVE-2026-10142

CVE-2026-10142

Description

A denial-of-service vulnerability in kafka-python allows crafted frame lengths to cause memory exhaustion or connection hangs.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

A denial-of-service vulnerability in kafka-python allows crafted frame lengths to cause memory exhaustion or connection hangs.

Vulnerability

A denial-of-service vulnerability exists in the protocol parser of kafka-python versions prior to 2.3.2. The issue stems from a lack of bounds validation on the 4-byte frame length value within the receive_bytes() function. This allows a malicious broker or a man-in-the-middle attacker to send a specially crafted frame length [3].

Exploitation

An attacker can exploit this vulnerability by sending a specially crafted 4-byte frame length value through the receive_bytes() function. This can be achieved by a malicious broker or a machine-in-the-middle attacker without requiring any specific authentication or user interaction [3].

Impact

Successful exploitation can lead to either a multi-gigabyte memory allocation, causing denial of service by exhausting system memory, or an uncaught ValueError. The latter leaves the connection in a broken state, resulting in requests hanging and consumers failing to send heartbeats, necessitating a restart [3].

Mitigation

The vulnerability is fixed in kafka-python version 2.3.2. No workarounds are disclosed in the available references. The affected versions are prior to 2.3.2 [3].

AI Insight generated on Jun 10, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected products

1

Patches

2
6e4831444f97

kafka.net: Validate SASL/SCRAM iterations (#3026)

https://github.com/dpkp/kafka-pythonDana PowersJun 3, 2026via nvd-ref
1 file changed · +6 1
  • kafka/sasl/scram.py+6 1 modified
    @@ -99,7 +99,12 @@ def process_server_first_message(self, server_first_message):
             self.auth_message += b',c=biws,r=' + self.nonce
     
             salt = base64.b64decode(params['s'].encode('utf-8'))
    -        iterations = int(params['i'])
    +        try:
    +            iterations = int(params['i'])
    +            if iterations > 1000000:
    +                raise ValueError('too many iterations')
    +        except (TypeError, ValueError):
    +            raise ValueError('Invalid value (not integer or too large) for Iteration count in server-first-message')
             self.create_salted_password(salt, iterations)
     
             self.client_key = self.hmac(self.salted_password, b'Client Key')
    
bdb46ab1fe4f

KafkaProtocol: validate network frame size; retore log-prefix; fix 0.8.2 quirk check (#3019)

https://github.com/dpkp/kafka-pythonDana PowersMay 30, 2026via nvd-ref
9 files changed · +146 24
  • kafka/admin/client.py+4 2 modified
    @@ -85,6 +85,9 @@ class KafkaAdminClient(
             max_in_flight_requests_per_connection (int): Requests are pipelined
                 to kafka brokers up to this number of maximum requests per
                 broker connection. Default: 5.
    +        receive_message_max_bytes (int): Maximum allowed network frame size.
    +            Used to avoid OOM when decoding malformed network message header.
    +            Default: 1000000.
             receive_buffer_bytes (int): The size of the TCP receive buffer
                 (SO_RCVBUF) to use when reading data. Default: None (relies on
                 system defaults). Java client defaults to 32768.
    @@ -162,11 +165,10 @@ class KafkaAdminClient(
             'reconnect_backoff_ms': 50,
             'reconnect_backoff_max_ms': 30000,
             'max_in_flight_requests_per_connection': 5,
    +        'receive_message_max_bytes': 1000000,
             'receive_buffer_bytes': None,
             'send_buffer_bytes': None,
             'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    -        'sock_chunk_bytes': 4096,  # undocumented experimental option
    -        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
             'retry_backoff_ms': 100,
             'metadata_max_age_ms': 300000,
             'client_dns_lookup': 'use_all_dns_ips',
    
  • kafka/consumer/group.py+4 2 modified
    @@ -176,6 +176,9 @@ class KafkaConsumer:
                 should be set no higher than 1/3 of that value. It can be
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
    +        receive_message_max_bytes (int): Maximum allowed network frame size.
    +            Used to avoid OOM when decoding malformed network message header.
    +            Default: 1000000.
             receive_buffer_bytes (int): The size of the TCP receive buffer
                 (SO_RCVBUF) to use when reading data. Default: None (relies on
                 system defaults). The java client defaults to 32768.
    @@ -319,9 +322,8 @@ class KafkaConsumer:
             'heartbeat_interval_ms': 3000,
             'receive_buffer_bytes': None,
             'send_buffer_bytes': None,
    +        'receive_message_max_bytes': 1000000,
             'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    -        'sock_chunk_bytes': 4096,  # undocumented experimental option
    -        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
             'consumer_timeout_ms': float('inf'),
             'security_protocol': 'PLAINTEXT',
             'ssl_context': None,
    
  • kafka/errors.py+4 0 modified
    @@ -54,6 +54,10 @@ class CorrelationIdError(KafkaProtocolError):
         retriable = True
     
     
    +class InvalidReceiveError(KafkaProtocolError):
    +    pass
    +
    +
     class KafkaTimeoutError(KafkaError):
         retriable = True
     
    
  • kafka/net/connection.py+7 2 modified
    @@ -23,8 +23,9 @@ class KafkaConnection:
             'client_id': 'kafka-python-' + __version__,
             'client_software_name': 'kafka-python',
             'client_software_version': __version__,
    -        'request_timeout_ms': 30000,
             'max_in_flight_requests_per_connection': 5,
    +        'receive_message_max_bytes': 1000000,
    +        'request_timeout_ms': 30000,
             'security_protocol': 'PLAINTEXT',
             'sasl_mechanism': None,
             'sasl_plain_username': None,
    @@ -263,7 +264,11 @@ def connection_made(self, transport):
                 self.transport.set_protocol(self)
             self.initializing = True
             self.transport.resume_reading()
    -        self.parser = KafkaProtocol(client_id=self.config['client_id'])
    +        log_prefix = 'node=%s[%s:%s]' % (self.node_id, *self.transport.getPeer())
    +        self.parser = KafkaProtocol(
    +            client_id=self.config['client_id'],
    +            receive_message_max_bytes=self.config['receive_message_max_bytes'],
    +            ident=log_prefix)
             self.net.call_soon(self._check_version)
     
         def pause(self, v):
    
  • kafka/net/manager.py+1 0 modified
    @@ -26,6 +26,7 @@ class KafkaConnectionManager:
             'client_id': 'kafka-python-' + __version__,
             'client_software_name': 'kafka-python',
             'client_software_version': __version__,
    +        'receive_message_max_bytes': 1000000,
             'reconnect_backoff_ms': 50,
             'reconnect_backoff_max_ms': 30000,
             'request_timeout_ms': 30000,
    
  • kafka/producer/kafka.py+4 2 modified
    @@ -278,6 +278,9 @@ class KafkaProducer:
                 errors. Default: 100.
             request_timeout_ms (int): Client request timeout in milliseconds.
                 Default: 30000.
    +        receive_message_max_bytes (int): Maximum allowed network frame size.
    +            Used to avoid OOM when decoding malformed network message header.
    +            Default: 1000000.
             receive_buffer_bytes (int): The size of the TCP receive buffer
                 (SO_RCVBUF) to use when reading data. Default: None (relies on
                 system defaults). Java client defaults to 32768.
    @@ -409,11 +412,10 @@ class KafkaProducer:
             'client_dns_lookup': 'use_all_dns_ips',
             'retry_backoff_ms': 100,
             'request_timeout_ms': 30000,
    +        'receive_message_max_bytes': 1000000,
             'receive_buffer_bytes': None,
             'send_buffer_bytes': None,
             'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
    -        'sock_chunk_bytes': 4096,  # undocumented experimental option
    -        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
             'reconnect_backoff_ms': 50,
             'reconnect_backoff_max_ms': 30000,
             'max_in_flight_requests_per_connection': 5,
    
  • kafka/protocol/parser.py+16 15 modified
    @@ -19,16 +19,14 @@ class KafkaProtocol:
     
         Arguments:
             client_id (str): identifier string to be included in each request
    -        api_version (tuple): Optional tuple to specify api_version to use.
    -            Currently only used to check for 0.8.2 protocol quirks, but
    -            may be used for more in the future.
    +        ident (str): Optional log-prefix identifier.
    +        receive_message_max_bytes (int): Maximum allowed message frame size.
    +            Default: 100000000 (100MB).
         """
    -    def __init__(self, client_id=None, api_version=None, ident=''):
    -        self._ident = ident
    -        if client_id is None:
    -            client_id = self._gen_client_id()
    -        self._client_id = client_id
    -        self._api_version = api_version
    +    def __init__(self, **config):
    +        self._ident = config.get('ident', '')
    +        self._client_id = config.get('client_id', self._gen_client_id())
    +        self._max_frame_size = config.get('receive_message_max_bytes', 100000000)
             self._correlation_id = 0
             self._header = KafkaBytes(4)
             self._rbuffer = None
    @@ -114,6 +112,7 @@ def receive_bytes(self, data):
                     if self._header.tell() == 4:
                         self._header.seek(0)
                         nbytes = Int32.decode(self._header)
    +                    self._validate_frame_size(nbytes)
                         # reset buffer and switch state to receiving payload bytes
                         self._rbuffer = KafkaBytes(nbytes)
                         self._receiving = True
    @@ -141,6 +140,10 @@ def receive_bytes(self, data):
                     self._reset_buffer()
             return responses
     
    +    def _validate_frame_size(self, nbytes):
    +        if nbytes < 0 or nbytes > self._max_frame_size:
    +            raise Errors.InvalidReceiveError('Invalid frame length: %d' % nbytes)
    +
         def _process_response(self, read_buffer):
             if not self.in_flight_requests:
                 raise Errors.CorrelationIdError('No in-flight-request found for server response')
    @@ -153,12 +156,10 @@ def _process_response(self, read_buffer):
                 raise Errors.KafkaProtocolError('Unable to find response type for api %d v%d' % (header.api_key, header.api_version))
             response_header = response_type.parse_header(read_buffer)
             recv_correlation_id = response_header.correlation_id
    -        # 0.8.2 quirk
    -        if (recv_correlation_id == 0 and
    -            correlation_id != 0 and
    -            response_type is FindCoordinatorResponse[0] and
    -            (self._api_version == (0, 8, 2) or self._api_version is None)):
    -            log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
    +        # ignore correlation id mismatch for 0.8.2 quirk
    +        if (recv_correlation_id == 0 and correlation_id != 0 and
    +            response_type is FindCoordinatorResponse and header.api_version == 0):
    +            log.warning('Kafka 0.8.2 quirk -- FindCoordinatorResponse'
                             ' Correlation ID does not match request. This'
                             ' should go away once at least one topic has been'
                             ' initialized on the broker.')
    
  • test/net/test_connection.py+66 0 modified
    @@ -1,3 +1,5 @@
    +import socket
    +import struct
     import time
     from unittest.mock import MagicMock, patch
     
    @@ -6,6 +8,7 @@
     from kafka.future import Future
     from kafka.net.selector import NetworkSelector
     from kafka.net.connection import KafkaConnection
    +from kafka.net.transport import KafkaTCPTransport
     from kafka.protocol.broker_version_data import BrokerVersionData
     from kafka.protocol.metadata import ApiVersionsRequest
     from kafka.protocol.parser import KafkaProtocol
    @@ -22,6 +25,19 @@ def connection(net):
         return KafkaConnection(net, node_id='test-0')
     
     
    +@pytest.fixture
    +def socketpair():
    +    rsock, wsock = socket.socketpair()
    +    rsock.setblocking(False)
    +    wsock.setblocking(False)
    +    yield rsock, wsock
    +    for sock in (rsock, wsock):
    +        try:
    +            sock.close()
    +        except OSError:
    +            pass
    +
    +
     class TestKafkaConnectionInit:
         def test_default_state(self, connection):
             assert connection.node_id == 'test-0'
    @@ -253,6 +269,7 @@ class TestKafkaConnectionConnectionLifecycle:
         def test_connection_made(self, connection):
             transport = MagicMock()
             transport.get_protocol.return_value = None
    +        transport.getPeer.return_value = ('127.0.0.1', 9092)
             connection.connection_made(transport)
             assert connection.transport is transport
             assert connection.initializing is True
    @@ -577,3 +594,52 @@ def test_sasl_falls_back_to_peer_ip_when_transport_host_unset(self, net):
     
             captured = self._drive_handshake_with_recording_mechanism(net, conn)
             assert captured['host'] == '10.0.0.1'
    +
    +
    +class TestKafkaConnectionInvalidReceive:
    +    """An over-large/negative frame size makes the parser raise
    +    InvalidReceiveError from inside data_received. On a connected
    +    connection that must tear the connection down (the transport catches
    +    the KafkaProtocolError and aborts), rather than leaving a wedged,
    +    half-read connection in place."""
    +
    +    def _make_connected(self, net, transport, max_frame_bytes=1000):
    +        conn = KafkaConnection(net, node_id='test-0')
    +        conn.transport = transport
    +        conn.connected = True
    +        conn.initializing = False
    +        conn._init_future.success(True)
    +        conn.parser = KafkaProtocol(
    +            client_id='test', receive_message_max_bytes=max_frame_bytes)
    +        transport.set_protocol(conn)
    +        return conn
    +
    +    def test_oversized_frame_closes_connection(self, net, socketpair):
    +        rsock, wsock = socketpair
    +        transport = KafkaTCPTransport(net, wsock)
    +        conn = self._make_connected(net, transport, max_frame_bytes=1000)
    +        assert not conn.closed
    +
    +        transport.resume_reading()
    +        # 4-byte length prefix declaring a 2000-byte frame, over the 1000
    +        # byte limit -> parser raises InvalidReceiveError on the header.
    +        rsock.send(struct.pack('>i', 2000))
    +        net.poll(timeout_ms=1000, future=conn.close_future)
    +
    +        assert conn.closed
    +        assert conn.close_future.failed()
    +        assert isinstance(conn.close_future.exception, Errors.InvalidReceiveError)
    +
    +    def test_negative_frame_closes_connection(self, net, socketpair):
    +        rsock, wsock = socketpair
    +        transport = KafkaTCPTransport(net, wsock)
    +        conn = self._make_connected(net, transport)
    +        assert not conn.closed
    +
    +        transport.resume_reading()
    +        rsock.send(struct.pack('>i', -1))
    +        net.poll(timeout_ms=1000, future=conn.close_future)
    +
    +        assert conn.closed
    +        assert conn.close_future.failed()
    +        assert isinstance(conn.close_future.exception, Errors.InvalidReceiveError)
    
  • test/protocol/test_parser.py+40 1 modified
    @@ -1,6 +1,8 @@
    +import struct
    +
     import pytest
     
    -from kafka.errors import KafkaProtocolError, CorrelationIdError
    +from kafka.errors import KafkaProtocolError, CorrelationIdError, InvalidReceiveError
     from kafka.protocol.parser import KafkaProtocol
     from kafka.protocol.metadata import (
         ApiVersionsRequest, ApiVersionsResponse,
    @@ -265,3 +267,40 @@ def test_parser_error():
         bad_bytes = b''.join([resp_bytes[0:3], b'\x0a', resp_bytes[4:]])
         with pytest.raises(KafkaProtocolError):
             responses = parser.receive_bytes(bad_bytes)
    +
    +
    +def test_oversized_frame_raises_invalid_receive():
    +    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
    +    # 4-byte length prefix declaring a frame larger than the configured max.
    +    with pytest.raises(InvalidReceiveError):
    +        parser.receive_bytes(struct.pack('>i', 1001))
    +
    +
    +def test_negative_frame_size_raises_invalid_receive():
    +    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
    +    # A negative length prefix can only come from a malformed/hostile frame.
    +    with pytest.raises(InvalidReceiveError):
    +        parser.receive_bytes(struct.pack('>i', -1))
    +
    +
    +def test_frame_at_max_size_is_accepted():
    +    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
    +    # The bound is inclusive: a frame exactly at the max passes validation
    +    # and the parser transitions to receiving the payload (no exception).
    +    parser.receive_bytes(struct.pack('>i', 1000))
    +    assert parser._receiving
    +
    +
    +@pytest.mark.parametrize('nbytes,valid', [
    +    (-1, False),
    +    (0, True),
    +    (1000, True),
    +    (1001, False),
    +])
    +def test_validate_frame_size_bounds(nbytes, valid):
    +    parser = KafkaProtocol(client_id='test-parser-error', receive_message_max_bytes=1000)
    +    if valid:
    +        parser._validate_frame_size(nbytes)
    +    else:
    +        with pytest.raises(InvalidReceiveError):
    +            parser._validate_frame_size(nbytes)
    

Vulnerability mechanics

No source-code context for this CVE — mechanics is only generated when we can read the actual fix diff. Without that, the four sections (root cause, attack vector, affected code, fix) would be speculation rather than analysis.

References

4

News mentions

0

No linked articles in our index yet.