CVE-2026-35554
Description
A race condition in the Apache Kafka Java producer client’s buffer pool management can cause messages to be silently delivered to incorrect topics.
When a produce batch expires due to delivery.timeout.ms while a network request containing that batch is still in flight, the batch’s ByteBuffer is prematurely deallocated and returned to the buffer pool. If a subsequent producer batch—potentially destined for a different topic—reuses this freed buffer before the original network request completes, the buffer contents may become corrupted. This can result in messages being delivered to unintended topics without any error being reported to the producer.
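The hazard is generic to pooled-buffer designs. The following self-contained Java sketch is illustrative only (the tiny free-list pool, the sendLater helper, and the timings are invented for the example and are not Kafka code); it shows how returning a pooled ByteBuffer to the free list while an asynchronous send still holds a reference lets a later writer overwrite the bytes the in-flight send ultimately transmits:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the pooled-buffer reuse race; not Apache Kafka source code.
public class PooledBufferReuseRace {
    // A trivial "buffer pool": freed buffers are handed straight back out to the next caller.
    private static final ArrayDeque<ByteBuffer> FREE_LIST = new ArrayDeque<>();

    static ByteBuffer allocate() {
        ByteBuffer b = FREE_LIST.poll();
        return b != null ? b : ByteBuffer.allocate(64);
    }

    static void deallocate(ByteBuffer b) {
        b.clear();
        FREE_LIST.add(b); // the buffer is immediately visible to the next "batch"
    }

    // Simulates a network send that only reads the buffer after a delay (the request is "in flight").
    static CompletableFuture<String> sendLater(ByteBuffer payload) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200); // request still in flight
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ByteBuffer view = payload.duplicate();
            view.flip();
            byte[] bytes = new byte[view.limit()];
            view.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        });
    }

    public static void main(String[] args) throws Exception {
        ByteBuffer batchForTopicA = allocate();
        batchForTopicA.put("record for topic-A".getBytes(StandardCharsets.UTF_8));
        CompletableFuture<String> inFlight = sendLater(batchForTopicA);

        // Bug: the batch "expires" and its buffer is returned to the pool while the send is in flight.
        deallocate(batchForTopicA);

        // A new batch destined for a different topic reuses the same backing buffer.
        ByteBuffer batchForTopicB = allocate();
        batchForTopicB.put("record for topic-B".getBytes(StandardCharsets.UTF_8));

        // The in-flight send now transmits topic-B's bytes under topic-A's request.
        System.out.println("Bytes actually sent for topic-A: " + inFlight.get());
    }
}
```

No error surfaces to either writer, which mirrors the silent misdelivery described above.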
- Data Confidentiality: Messages intended for one topic may be delivered to a different topic, potentially exposing sensitive data to consumers who have access to the destination topic but not the intended source topic.
- Data Integrity: Consumers on the receiving topic may encounter unexpected or incompatible messages, leading to deserialization failures, processing errors, and corrupted downstream data.
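On the consuming side, the most visible symptom of such cross-topic delivery is a record that fails to deserialize. A hedged sketch of isolating those records with the client's RecordDeserializationException follows; the bootstrap address, group id, topic name, and skip-on-error policy are assumptions for illustration, not part of the advisory:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SkipPoisonRecords {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "example-group");           // assumption: illustrative group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));        // assumption: illustrative topic
            while (true) {
                try {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);
                    }
                } catch (RecordDeserializationException e) {
                    // A record that cannot be deserialized (for example, one written for another topic's
                    // schema) stalls its partition; log it and seek past the bad offset to keep consuming.
                    System.err.printf("Skipping undecodable record at %s offset %d%n",
                            e.topicPartition(), e.offset());
                    consumer.seek(e.topicPartition(), e.offset() + 1);
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
    }
}
```

Whether to skip, dead-letter, or halt on such records is an application-level policy decision; the sketch only shows where the failure becomes observable.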
This issue affects Apache Kafka versions ≤ 3.9.1, ≤ 4.0.1, and ≤ 4.1.1.
Kafka users are advised to upgrade to 3.9.2, 4.0.2, 4.1.2, 4.2.0, or later to address this vulnerability.
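To verify which kafka-clients build a producer application actually loaded (transitive dependencies sometimes pin an older client), the version string can be read at runtime. A minimal sketch using the client's AppInfoParser utility:

```java
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientsVersionCheck {
    public static void main(String[] args) {
        // Reports the kafka-clients version on the classpath, e.g. "3.9.2" once a fixed artifact is in use.
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        System.out.println("kafka-clients commit:  " + AppInfoParser.getCommitId());
    }
}
```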
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| org.apache.kafka:kafka-clients (Maven) | >= 2.8.0, < 3.9.2 | 3.9.2 |
| org.apache.kafka:kafka-clients (Maven) | >= 4.0.0, < 4.0.2 | 4.0.2 |
| org.apache.kafka:kafka-clients (Maven) | >= 4.1.0, < 4.1.2 | 4.1.2 |
Patches
1df2ac5b2ba4: KAFKA-19012 Fix rare producer message corruption, don't reuse buffers on the client in certain error cases (#21065)
5 files changed · +166 −38
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java (modified, +19 −0)

```diff
@@ -72,6 +72,9 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private boolean bufferDeallocated = false;
+    // Tracks if the batch has been sent to the NetworkClient
+    private boolean inflight = false;
     int recordCount;
     int maxRecordSize;
@@ -581,6 +584,22 @@ public boolean sequenceHasBeenReset() {
         return reopened;
     }
 
+    public boolean isBufferDeallocated() {
+        return bufferDeallocated;
+    }
+
+    public void markBufferDeallocated() {
+        bufferDeallocated = true;
+    }
+
+    public boolean isInflight() {
+        return inflight;
+    }
+
+    public void setInflight(boolean inflight) {
+        this.inflight = inflight;
+    }
+
     // VisibleForTesting
     OptionalInt currentLeaderEpoch() {
         return currentLeaderEpoch;
```
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java (modified, +38 −6)

```diff
@@ -1027,14 +1027,39 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
     }
 
     /**
-     * Deallocate the record batch
+     * Complete and deallocate the record batch
+     */
+    public void completeAndDeallocateBatch(ProducerBatch batch) {
+        completeBatch(batch);
+        deallocate(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from the incomplete set)
      */
     public void deallocate(ProducerBatch batch) {
-        incomplete.remove(batch);
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
-            free.deallocate(batch.buffer(), batch.initialCapacity());
+        if (!batch.isSplitBatch()) {
+            if (batch.isBufferDeallocated()) {
+                log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+            } else {
+                batch.markBufferDeallocated();
+                if (batch.isInflight()) {
+                    // Create a fresh ByteBuffer to give to BufferPool to reuse since we can't safely call deallocate with the ProduceBatch's buffer
+                    free.deallocate(ByteBuffer.allocate(batch.initialCapacity()));
+                    throw new IllegalStateException("Attempting to deallocate a batch that is inflight. Batch is " + batch);
+                }
+                free.deallocate(batch.buffer(), batch.initialCapacity());
+            }
+        }
+    }
+
+    /**
+     * Remove from the incomplete list but do not free memory yet
+     */
+    public void completeBatch(ProducerBatch batch) {
+        incomplete.remove(batch);
     }
 
     /**
@@ -1132,7 +1157,14 @@ void abortBatches(final RuntimeException reason) {
                 dq.remove(batch);
             }
             batch.abort(reason);
-            deallocate(batch);
+            if (batch.isInflight()) {
+                // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+                // We skip deallocating it now. When the request in network client completes with a response, either Sender.completeBatch() or
+                // Sender.failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.
+                completeBatch(batch);
+            } else {
+                completeAndDeallocateBatch(batch);
+            }
         }
     }
 
@@ -1152,7 +1184,7 @@ void abortUndrainedBatches(RuntimeException reason) {
             }
             if (aborted) {
                 batch.abort(reason);
-                deallocate(batch);
+                completeAndDeallocateBatch(batch);
             }
         }
     }
```
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (modified, +57 −25)

```diff
@@ -171,7 +171,12 @@ private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
 
     private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
         maybeRemoveFromInflightBatches(batch);
-        this.accumulator.deallocate(batch);
+        this.accumulator.completeAndDeallocateBatch(batch);
+    }
+
+    private void maybeRemoveAndDeallocateBatchLater(ProducerBatch batch) {
+        maybeRemoveFromInflightBatches(batch);
+        this.accumulator.completeBatch(batch);
     }
 
     /**
@@ -354,6 +359,24 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
         return false;
     }
 
+    private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateBuffer) {
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+        // we need to reset the producer id here.
+        if (!expiredBatches.isEmpty())
+            log.trace("Expired {} batches in accumulator", expiredBatches.size());
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation. "
+                + "The request has not been sent, or no server response has been received yet.";
+            failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateBuffer);
+            if (transactionManager != null && expiredBatch.inRetry()) {
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch);
+            }
+        }
+    }
+
     private long sendProducerData(long now) {
         MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
         // get the list of partitions with data ready to send
@@ -405,23 +428,10 @@ private long sendProducerData(long now) {
         accumulator.resetNextBatchExpiryTime();
         List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-        expiredBatches.addAll(expiredInflightBatches);
-        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
-        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
-        // we need to reset the producer id here.
-        if (!expiredBatches.isEmpty())
-            log.trace("Expired {} batches in accumulator", expiredBatches.size());
-        for (ProducerBatch expiredBatch : expiredBatches) {
-            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
-                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation. "
-                + "The request has not been sent, or no server response has been received yet.";
-            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
-            if (transactionManager != null && expiredBatch.inRetry()) {
-                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
-                transactionManager.markSequenceUnresolved(expiredBatch);
-            }
-        }
+        failExpiredBatches(expiredBatches, now, true);
+        failExpiredBatches(expiredInflightBatches, now, false);
+
         sensors.updateProduceRequestMetrics(batches);
 
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -524,6 +534,7 @@ private void maybeAbortBatches(RuntimeException exception) {
         if (accumulator.hasIncomplete()) {
             log.error("Aborting producer batches due to fatal error", exception);
             accumulator.abortBatches(exception);
+            inFlightBatches.clear();
         }
     }
 
@@ -659,6 +670,7 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
      */
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
+        batch.setInflight(false);
         Errors error = response.error;
 
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
@@ -696,7 +708,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
                 // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
                 // thus it is not safe to reassign the sequence.
-                failBatch(batch, response, batch.attempts() < this.retries);
+                failBatch(batch, response, batch.attempts() < this.retries, true);
             }
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException) {
@@ -749,12 +761,16 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
 
         if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
+        } else {
+            // Always safe to call deallocate because the batch keeps track of whether or not it was deallocated yet
+            this.accumulator.deallocate(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           boolean adjustSequenceNumbers) {
+                           boolean adjustSequenceNumbers,
+                           boolean deallocateBatch) {
         final RuntimeException topLevelException;
         if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
             topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
@@ -764,7 +780,7 @@ else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
             topLevelException = response.error.exception(response.errorMessage);
 
         if (response.recordErrors == null || response.recordErrors.isEmpty()) {
-            failBatch(batch, topLevelException, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, adjustSequenceNumbers, deallocateBatch);
         } else {
             Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
             for (ProduceResponse.RecordError recordError : response.recordErrors) {
@@ -803,23 +819,25 @@ else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                 }
             };
 
-            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers, deallocateBatch);
         }
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
     ) {
-        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers);
+        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers, deallocateBatch);
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
         Function<Integer, RuntimeException> recordExceptions,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
     ) {
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
@@ -833,7 +851,20 @@ private void failBatch(
                     log.debug("Encountered error when transaction manager was handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            if (deallocateBatch) {
+                maybeRemoveAndDeallocateBatch(batch);
+            } else {
+                // Fix for KAFKA-19012
+                // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+                // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client
+                // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+                // The buffer associated with the batch will be deallocated then.
+                maybeRemoveAndDeallocateBatchLater(batch);
+            }
+        } else {
+            if (deallocateBatch) {
+                this.accumulator.deallocate(batch);
+            }
         }
     }
 
@@ -886,6 +917,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
                     .setIndex(tp.partition())
                     .setRecords(records));
             recordsByPartition.put(tp, batch);
+            batch.setInflight(true);
         }
 
         String transactionalId = null;
```
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java (modified, +4 −4)

```diff
@@ -405,7 +405,7 @@ public void testStressfulSituation() throws Exception {
                 for (ProducerBatch batch : batches) {
                     for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
                         read++;
-                    accum.deallocate(batch);
+                    accum.completeAndDeallocateBatch(batch);
                 }
             }
         }
@@ -669,7 +669,7 @@ public void testFlush() throws Exception {
 
         for (List<ProducerBatch> batches: results.values())
             for (ProducerBatch batch: batches)
-                accum.deallocate(batch);
+                accum.completeAndDeallocateBatch(batch);
 
         // should be complete with no unsent records.
         accum.awaitFlushCompletion();
@@ -1575,7 +1575,7 @@ private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSi
         assertEquals(1, batches.values().iterator().next().size());
         ProducerBatch batch = batches.values().iterator().next().get(0);
         int numSplitBatches = accum.splitAndReenqueue(batch);
-        accum.deallocate(batch);
+        accum.completeAndDeallocateBatch(batch);
 
         return numSplitBatches;
     }
@@ -1599,7 +1599,7 @@ private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int b
                 } else {
                     batch.complete(0L, 0L);
                 }
-                accum.deallocate(batch);
+                accum.completeAndDeallocateBatch(batch);
             }
         }
     } while (batchDrained);
```
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java (modified, +48 −3)

```diff
@@ -126,6 +126,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -2183,7 +2184,10 @@ public void testClusterAuthorizationExceptionInProduceRequest() throws Exception
     public void testCancelInFlightRequestAfterFatalError() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = createTransactionManager();
-        setupWithTransactionState(transactionManager);
+        long totalSize = 1024 * 1024;
+        String metricGrpName = "producer-custom-metrics";
+        MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName);
+        setupWithTransactionState(transactionManager, false, pool);
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
@@ -2195,6 +2199,8 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         Future<RecordMetadata> future2 = appendToAccumulator(tp1);
         sender.runOnce();
 
+        assertFalse(pool.allMatch());
+
         client.respond(
             body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
             produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
@@ -2205,12 +2211,14 @@ public void testCancelInFlightRequestAfterFatalError() throws Exception {
         sender.runOnce();
         assertFutureFailure(future2, ClusterAuthorizationException.class);
 
+        assertFalse(pool.allMatch(), "Batch should not be deallocated before the response is received");
+
         // Should be fine if the second response eventually returns
         client.respond(
             body -> body instanceof ProduceRequest && RequestTestUtils.hasIdempotentRecords((ProduceRequest) body),
             produceResponse(tp1, 0, Errors.NONE, 0));
         sender.runOnce();
+        assertTrue(pool.allMatch(), "The batch should have been de-allocated");
     }
 
     @Test
@@ -2436,12 +2444,15 @@ private void testSplitBatchAndSend(TransactionManager txnManager,
         assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
         Node node = new Node(Integer.parseInt(id), "localhost", 0);
         assertEquals(1, client.inFlightRequestCount());
+        ProducerBatch inflightBatch = sender.inFlightBatches(tpId.topicPartition()).get(0);
+        assertTrue(inflightBatch.isInflight(), "Batch should be marked inflight after being sent");
         assertTrue(client.isReady(node, time.milliseconds()), "Client ready status should be true");
 
         Map<TopicIdPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
         responseMap.put(tpId, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
         client.respond(new ProduceResponse(responseMap));
         sender.runOnce(); // split and reenqueue
+        assertFalse(inflightBatch.isInflight(), "Batch should be marked as not inflight after being split and re-enqueued");
         assertEquals(2, txnManager.sequenceNumber(tpId.topicPartition()), "The next sequence should be 2");
         // The compression ratio should have been improved once.
         assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
@@ -2499,14 +2510,16 @@ public void testNoDoubleDeallocation() throws Exception {
         sender.runOnce();  // send request
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(1, sender.inFlightBatches(tp0).size());
+        assertFalse(sender.inFlightBatches(tp0).get(0).isBufferDeallocated(), "Buffer not deallocated yet");
+        ProducerBatch inflightBatch = sender.inFlightBatches(tp0).get(0);
 
         time.sleep(REQUEST_TIMEOUT);
         assertFalse(pool.allMatch());
 
-        sender.runOnce();  // expire the batch
+        sender.runOnce();  // times out the request
         assertTrue(request1.isDone());
+        assertTrue(inflightBatch.isBufferDeallocated(), "Buffer should be deallocated after request timeout");
         assertTrue(pool.allMatch(), "The batch should have been de-allocated");
-        assertTrue(pool.allMatch());
 
         sender.runOnce();
         assertTrue(pool.allMatch(), "The batch should have been de-allocated");
@@ -3591,6 +3604,38 @@ public void testWhenProduceResponseReturnsWithALeaderShipChangeErrorAndNewLeader
         }
     }
 
+    @Test
+    public void testNoBufferReuseWhenBatchExpires() throws Exception {
+        long totalSize = 1024 * 1024;
+        try (Metrics m = new Metrics()) {
+            BufferPool pool = new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics");
+
+            // Allocate and store a poolable buffer, then return it to the pool so the Sender can pick it up
+            ByteBuffer buffer = pool.allocate(batchSize, 0);
+            pool.deallocate(buffer);
+
+            setupWithTransactionState(null, false, pool);
+            appendToAccumulator(tp0, 0L, "key", "value");
+            sender.runOnce(); // connect
+            sender.runOnce(); // send produce request
+
+            assertEquals(1, client.inFlightRequestCount());
+            assertEquals(1, sender.inFlightBatches(tp0).size());
+
+            ProducerBatch batch = sender.inFlightBatches(tp0).get(0);
+            // Validate the backing array of the buffer is the same as the pooled one from the start
+            assertSame(buffer.array(), batch.records().buffer().array(), "Sender should have allocated the same buffer we created");
+
+            time.sleep(DELIVERY_TIMEOUT_MS + 100);
+            sender.runOnce();
+
+            ByteBuffer newBuffer = pool.allocate(batchSize, 0);
+
+            // TEST buffer should not be reused
+            assertNotSame(buffer.array(), newBuffer.array(), "Buffer should not be reused");
+        }
+    }
+
     private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
         Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
```
Vulnerability mechanics
Before the fix, RecordAccumulator.deallocate() both removed a batch from the incomplete set and returned its ByteBuffer to the producer's BufferPool. The Sender invoked this path when a batch expired after delivery.timeout.ms or was aborted on a fatal error, even if that batch had already been serialized into an in-flight ProduceRequest held by the NetworkClient. A subsequent batch could then obtain the same buffer from the pool and write records for a different topic into memory the network layer was still transmitting, so the broker could persist those bytes under the original request's topic and partition while the producer reported no error. The patch adds inflight and bufferDeallocated tracking to ProducerBatch, separates removal from the incomplete set (completeBatch) from freeing the buffer (deallocate / completeAndDeallocateBatch) in RecordAccumulator, and changes Sender.completeBatch()/failBatch() to defer freeing an in-flight batch's buffer until its produce request actually completes with a response.
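In outline, the patch makes buffer release conditional on the batch no longer being referenced by the network layer. A condensed sketch of that deferred-release pattern follows; the field and method names echo the patch, but the classes are simplified stand-ins rather than the actual Kafka implementation:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Simplified sketch of the deferred-release pattern introduced by the patch; not the actual Kafka classes.
public class DeferredReleaseSketch {

    static class Batch {
        final ByteBuffer buffer;
        boolean inflight;          // set when the batch is handed to the network client
        boolean bufferDeallocated; // guards against double-free and against reuse while in flight

        Batch(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    }

    interface Pool {
        void deallocate(ByteBuffer buffer); // stand-in for the producer's buffer pool
    }

    static class Accumulator {
        private final Pool pool;

        Accumulator(Pool pool) {
            this.pool = pool;
        }

        /** Called when a batch expires or is aborted. */
        void release(Batch batch) {
            if (batch.bufferDeallocated) {
                return; // already freed once; never hand the same buffer back twice
            }
            if (batch.inflight) {
                // The network client may still read this buffer, so it must not be returned to the
                // pool yet; it is freed when the in-flight request completes or definitively fails.
                return;
            }
            batch.bufferDeallocated = true;
            pool.deallocate(batch.buffer);
        }

        /** Called once the broker response (or a definitive failure) for the request arrives. */
        void onRequestComplete(Batch batch) {
            batch.inflight = false;
            release(batch);
        }
    }

    public static void main(String[] args) {
        ArrayDeque<ByteBuffer> freeList = new ArrayDeque<>();
        Accumulator accumulator = new Accumulator(freeList::add);

        Batch batch = new Batch(ByteBuffer.allocate(64));
        batch.inflight = true;                // batch has been sent as part of a produce request

        accumulator.release(batch);           // delivery timeout fires while the request is in flight
        System.out.println("free buffers after expiry:   " + freeList.size()); // 0 -> no unsafe reuse

        accumulator.onRequestComplete(batch); // response arrives; now the buffer can be pooled again
        System.out.println("free buffers after response: " + freeList.size()); // 1
    }
}
```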
References
- github.com/advisories/GHSA-5qcv-4rpc-jp93
- nvd.nist.gov/vuln/detail/CVE-2026-35554
- www.openwall.com/lists/oss-security/2026/04/07/6
- github.com/apache/kafka/commit/1df2ac5b2ba4d1b5ed54b895ff6fb9539303ccb5
- github.com/apache/kafka/pull/21065
- github.com/apache/kafka/pull/21285
- github.com/apache/kafka/pull/21286
- github.com/apache/kafka/pull/21287
- github.com/apache/kafka/pull/21288
- issues.apache.org/jira/browse/KAFKA-19012
- lists.apache.org/thread/f07x7j8ovyqhjd1to25jsnqbm6wj01d6
News mentions
No linked articles in our index yet.