Java Deserialization vulnerability in Spring-Kafka When Improperly Configured
Description
Spring for Apache Kafka vulnerable to deserialization attack via malicious headers when unusual configuration is applied.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Spring for Apache Kafka vulnerable to deserialization attack via malicious headers when unusual configuration is applied.
CVE-2023-34040 is a deserialization vulnerability in Spring for Apache Kafka versions 3.0.9 and earlier, and 2.9.10 and earlier. The root cause is that when an application does not configure an ErrorHandlingDeserializer for the key or value of a record, and explicitly sets the container properties checkDeserExWhenKeyNull and/or checkDeserExWhenValueNull to true, the container will attempt to deserialize exception headers that may contain a malicious serialized object [1]. By default, these properties are false, and the container only processes headers if an ErrorHandlingDeserializer is configured, which removes such malicious headers [1].
To exploit this vulnerability, an attacker must have the ability to publish messages to a Kafka topic that the vulnerable application consumes. The attacker then crafts a malicious serialized object and places it in one of the deserialization exception record headers. The attack requires the unusual configuration described above, meaning the application must have explicitly enabled the checkDeserExWhenKeyNull or checkDeserExWhenValueNull properties and omitted the ErrorHandlingDeserializer [1].
Successful exploitation could allow an attacker to achieve arbitrary code execution through deserialization of the malicious object, as the container processes the header without proper sanitization [1]. The impact is limited to applications that meet all three preconditions: no ErrorHandlingDeserializer, explicit enabling of the check properties, and allowing untrusted sources to publish to the topic [1].
Mitigation is straightforward: users should either configure an ErrorHandlingDeserializer for the key and/or value of the record, or leave the checkDeserExWhenKeyNull and checkDeserExWhenValueNull properties at their default false values. The Spring for Apache Kafka project has released patches that introduce a private header type for deserialization exceptions, preventing external manipulation of these headers [3][4]. Users are advised to upgrade to versions 3.0.10 or later, or 2.9.11 or later [1].
AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
org.springframework.kafka:spring-kafkaMaven | >= 2.8.1, < 2.9.11 | 2.9.11 |
org.springframework.kafka:spring-kafkaMaven | >= 3.0.0, < 3.0.10 | 3.0.10 |
Affected products
2- Range: 2.8.x
Patches
225ac793a7872Private Header Type for DeserializationExceptions
12 files changed · +268 −46
spring-kafka-docs/src/main/asciidoc/kafka.adoc+12 −7 modified@@ -4655,10 +4655,15 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M Thing thing = in.get(i); if (thing == null && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) { - DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger, - (byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); - if (deserEx != null) { - logger.error(deserEx, "Record at index " + i + " could not be deserialized"); + try { + DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger, + headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); + if (deserEx != null) { + logger.error(deserEx, "Record at index " + i + " could not be deserialized"); + } + } + catch (Exception ex) { + logger.error(ex, "Record at index " + i + " could not be deserialized"); } throw new BatchListenerFailedException("Deserialization", deserEx, i); } @@ -4668,9 +4673,9 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M ---- ==== -`ListenerUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`. +`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`. -When consuming `List<ConsumerRecord<?, ?>`, `ListenerUtils.getExceptionFromHeader()` is used instead: +When consuming `List<ConsumerRecord<?, ?>`, `SerializationUtils.getExceptionFromHeader()` is used instead: ==== [source, java] @@ -4680,7 +4685,7 @@ void listen(List<ConsumerRecord<String, Thing>> in) { for (int i = 0; i < in.size(); i++) { ConsumerRecord<String, Thing> rec = in.get(i); if (rec.value() == null) { - DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec, + DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger); if (deserEx != null) { logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java+2 −2 modified@@ -486,9 +486,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume if (consumer != null && this.verifyPartition) { tp = checkPartition(tp, consumer); } - DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record, + DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger); - DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record, + DeserializationException kDeserEx = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger); Headers headers = new RecordHeaders(record.headers().toArray()); addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java+3 −2 modified@@ -2929,8 +2929,9 @@ private void fixStackTrace(Exception ex, Exception toHandle) { } } - public void checkDeser(final ConsumerRecord<K, V> record, String headerName) { - DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger); + public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) { + DeserializationException exception = SerializationUtils.getExceptionFromHeader(cRecord, headerName, + this.logger); if (exception != null) { /* * Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java+9 −15 modified@@ -26,13 +26,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; @@ -96,23 +94,15 @@ else if (listener instanceof GenericMessageListener) { * @param logger the logger for logging errors. * @return the exception or null. * @since 2.3 + * @deprecated in favor of + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}. */ + @Deprecated @Nullable public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record, String headerName, LogAccessor logger) { - Header header = record.headers().lastHeader(headerName); - if (header != null) { - byte[] value = header.value(); - DeserializationException exception = byteArrayToDeserializationException(logger, value); - if (exception != null) { - Headers headers = new RecordHeaders(record.headers().toArray()); - headers.remove(headerName); - exception.setHeaders(headers); - } - return exception; - } - return null; + return SerializationUtils.getExceptionFromHeader(record, headerName, logger); } /** @@ -122,7 +112,11 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco * @param value the bytes. * @return the exception or null if deserialization fails. * @since 2.8.1 + * @deprecated in favor of + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} or + * {@link SerializationUtils#byteArrayToDeserializationException(LogAccessor, org.apache.kafka.common.header.Header)}. */ + @Deprecated @Nullable public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) { try {
spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java+4 −5 modified@@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,6 @@ import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.GenericMessageListenerContainer; -import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicPartitionOffset; @@ -531,7 +530,7 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) { * Return a {@link DeserializationException} if either the key or value failed * deserialization; null otherwise. If you need to determine whether it was the key or * value, call - * {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} * with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and * {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead. * @param record the record. @@ -541,14 +540,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) { */ @Nullable public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) { - DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, + DeserializationException exception = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger); if (exception != null) { logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset()); return exception; } - exception = ListenerUtils.getExceptionFromHeader(record, + exception = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger); if (exception != null) { logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DeserializationExceptionHeader.java+40 −0 added@@ -0,0 +1,40 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.apache.kafka.common.header.internals.RecordHeader; + +/** + * A package-protected header used to contain serialized + * {@link DeserializationException}s. Only headers of this type will be examined for + * deserialization. + * + * @author Gary Russell + * @since 2.9.11 + */ +class DeserializationExceptionHeader extends RecordHeader { + + /** + * Construct an instance with the provided properties. + * @param key the key. + * @param value the value; + */ + DeserializationExceptionHeader(String key, byte[] value) { + super(key, value); + } + +}
spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java+83 −3 modified@@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,24 @@ package org.springframework.kafka.support.serializer; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.function.BiFunction; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -166,10 +174,82 @@ data, isForKeyArg, new RuntimeException("Could not serialize type " } } headers.add( - new RecordHeader(isForKeyArg + new DeserializationExceptionHeader(isForKeyArg ? KEY_DESERIALIZER_EXCEPTION_HEADER : VALUE_DESERIALIZER_EXCEPTION_HEADER, stream.toByteArray())); } + /** + * Extract a {@link DeserializationException} from the supplied header name, if + * present. + * @param record the consumer record. + * @param headerName the header name. + * @param logger the logger for logging errors. + * @return the exception or null. + * @since 2.9.11 + */ + @Nullable + public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record, + String headerName, LogAccessor logger) { + + Header header = record.headers().lastHeader(headerName); + if (!(header instanceof DeserializationExceptionHeader)) { + logger.warn( + () -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?", + KafkaUtils.format(record))); + return null; + } + if (header != null) { + byte[] value = header.value(); + DeserializationException exception = byteArrayToDeserializationException(logger, header); + if (exception != null) { + Headers headers = new RecordHeaders(record.headers().toArray()); + headers.remove(headerName); + exception.setHeaders(headers); + } + return exception; + } + return null; + } + + /** + * Convert a byte array containing a serialized {@link DeserializationException} to the + * {@link DeserializationException}. + * @param logger a log accessor to log errors. + * @param header the header. + * @return the exception or null if deserialization fails. + * @since 2.9.11 + */ + @Nullable + public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, Header header) { + + if (!(header instanceof DeserializationExceptionHeader)) { + throw new IllegalStateException("Foreign deserialization exception header ignored; possible attack?"); + } + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value())) { + + boolean first = true; + + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + if (this.first) { + this.first = false; + Assert.state(desc.getName().equals(DeserializationException.class.getName()), + "Header does not contain a DeserializationException"); + } + return super.resolveClass(desc); + } + + + }; + return (DeserializationException) ois.readObject(); + } + catch (IOException | ClassNotFoundException | ClassCastException e) { + logger.error(e, "Failed to deserialize a deserialization exception"); + return null; + } + } + }
spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java+1 −2 modified@@ -37,7 +37,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.BatchListenerFailedException; import org.springframework.kafka.listener.ListenerExecutionFailedException; -import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.ConversionException; @@ -74,7 +73,7 @@ void testNullInList(@Autowired KafkaListenerEndpointRegistry registry, @Autowire .extracting("index") .isEqualTo(1); assertThat(listener.values).containsExactly(new Foo("baz"), null, new Foo("qux")); - DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(junkRecord, + DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(junkRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, null); assertThat(vDeserEx).isNotNull(); assertThat(vDeserEx.getData()).isEqualTo("JUNK".getBytes());
spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java+13 −7 modified@@ -71,6 +71,7 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.SerializationTestUtils; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.concurrent.ListenableFuture; @@ -172,8 +173,10 @@ void valueHeaderStripped() { KafkaOperations<?, ?> template = mock(KafkaOperations.class); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false))); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, + header(false))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); Headers custom = new RecordHeaders(); custom.add(new RecordHeader("foo", "bar".getBytes())); recoverer.setHeadersFunction((rec, ex) -> custom); @@ -202,7 +205,8 @@ void keyHeaderStripped() { KafkaOperations<?, ?> template = mock(KafkaOperations.class); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); SettableListenableFuture future = new SettableListenableFuture(); future.set(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -222,8 +226,8 @@ void keyDeserOnly() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); DeserializationException deserEx = createDeserEx(true); - headers.add( - new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true, deserEx))); SettableListenableFuture future = new SettableListenableFuture(); future.set(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -245,8 +249,10 @@ void headersNotStripped() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); recoverer.setRetainExceptionHeader(true); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false))); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, + header(false))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); SettableListenableFuture future = new SettableListenableFuture(); future.set(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class));
spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java+3 −3 modified@@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -128,8 +128,8 @@ public String deserialize(String topic, Headers headers, byte[] data) { ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new MyDes()); Headers headers = new RecordHeaders(); ehd.deserialize("foo", headers, new byte[1]); - DeserializationException dex = ListenerUtils.byteArrayToDeserializationException(null, - headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER).value()); + DeserializationException dex = SerializationUtils.byteArrayToDeserializationException(null, + headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); assertThat(dex.getMessage()) .contains("Could not serialize") .contains("original exception message");
spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java+35 −0 added@@ -0,0 +1,35 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.apache.kafka.common.header.Header; + +/** + * @author Gary Russell + * @since 2.9.11 + * + */ +public final class SerializationTestUtils { + + private SerializationTestUtils() { + } + + public static Header deserializationHeader(String key, byte[] value) { + return new DeserializationExceptionHeader(key, value); + } + +}
spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationUtilsTests.java+63 −0 added@@ -0,0 +1,63 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.util.List; +import java.util.function.Supplier; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import org.springframework.core.log.LogAccessor; + +/** + * @author Gary Russell + * @since 2.9.11 + * + */ +public class SerializationUtilsTests { + + @Test + void foreignDeserEx() { + RecordHeaders headers = new RecordHeaders( + List.of(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, "junk".getBytes()))); + ConsumerRecord<String, String> rec = mock(ConsumerRecord.class); + willReturn(headers).given(rec).headers(); + given(rec.topic()).willReturn("foo"); + given(rec.partition()).willReturn(1); + given(rec.offset()).willReturn(0L); + LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass()))); + ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class); + willAnswer(inv -> null).given(logger).warn(captor.capture()); + assertThat(SerializationUtils.getExceptionFromHeader(rec, + SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger)).isNull(); + assertThat(captor.getValue().get()) + .isEqualTo("Foreign deserialization exception header in (foo-1@0) ignored; possible attack?"); + } + +}
eb779679812fPrivate Header Type for DeserializationExceptions
12 files changed · +267 −45
spring-kafka-docs/src/main/asciidoc/kafka.adoc+12 −7 modified@@ -4671,10 +4671,15 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M Thing thing = in.get(i); if (thing == null && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) { - DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger, - (byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); - if (deserEx != null) { - logger.error(deserEx, "Record at index " + i + " could not be deserialized"); + try { + DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger, + headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); + if (deserEx != null) { + logger.error(deserEx, "Record at index " + i + " could not be deserialized"); + } + } + catch (Exception ex) { + logger.error(ex, "Record at index " + i + " could not be deserialized"); } throw new BatchListenerFailedException("Deserialization", deserEx, i); } @@ -4684,9 +4689,9 @@ void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<M ---- ==== -`ListenerUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`. +`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`. -When consuming `List<ConsumerRecord<?, ?>`, `ListenerUtils.getExceptionFromHeader()` is used instead: +When consuming `List<ConsumerRecord<?, ?>`, `SerializationUtils.getExceptionFromHeader()` is used instead: ==== [source, java] @@ -4696,7 +4701,7 @@ void listen(List<ConsumerRecord<String, Thing>> in) { for (int i = 0; i < in.size(); i++) { ConsumerRecord<String, Thing> rec = in.get(i); if (rec.value() == null) { - DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec, + DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger); if (deserEx != null) { logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java+2 −2 modified@@ -506,9 +506,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume if (consumer != null && this.verifyPartition) { tp = checkPartition(tp, consumer); } - DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(record, + DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger); - DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record, + DeserializationException kDeserEx = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger); Headers headers = new RecordHeaders(record.headers().toArray()); addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java+2 −1 modified@@ -2985,7 +2985,8 @@ private void fixStackTrace(Exception ex, Exception toHandle) { } public void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) { - DeserializationException exception = ListenerUtils.getExceptionFromHeader(cRecord, headerName, this.logger); + DeserializationException exception = SerializationUtils.getExceptionFromHeader(cRecord, headerName, + this.logger); if (exception != null) { /* * Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java+9 −15 modified@@ -24,12 +24,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; @@ -92,23 +90,15 @@ else if (listener instanceof GenericMessageListener) { * @param logger the logger for logging errors. * @return the exception or null. * @since 2.3 + * @deprecated in favor of + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}. */ + @Deprecated @Nullable public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record, String headerName, LogAccessor logger) { - Header header = record.headers().lastHeader(headerName); - if (header != null) { - byte[] value = header.value(); - DeserializationException exception = byteArrayToDeserializationException(logger, value); - if (exception != null) { - Headers headers = new RecordHeaders(record.headers().toArray()); - headers.remove(headerName); - exception.setHeaders(headers); - } - return exception; - } - return null; + return SerializationUtils.getExceptionFromHeader(record, headerName, logger); } /** @@ -118,7 +108,11 @@ public static DeserializationException getExceptionFromHeader(final ConsumerReco * @param value the bytes. * @return the exception or null if deserialization fails. * @since 2.8.1 + * @deprecated in favor of + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} or + * {@link SerializationUtils#byteArrayToDeserializationException(LogAccessor, org.apache.kafka.common.header.Header)}. */ + @Deprecated @Nullable public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) { try {
spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java+4 −5 modified@@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,6 @@ import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.GenericMessageListenerContainer; -import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicPartitionOffset; @@ -558,7 +557,7 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) { * Return a {@link DeserializationException} if either the key or value failed * deserialization; null otherwise. If you need to determine whether it was the key or * value, call - * {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} + * {@link SerializationUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)} * with {@link SerializationUtils#KEY_DESERIALIZER_EXCEPTION_HEADER} and * {@link SerializationUtils#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead. * @param record the record. @@ -568,14 +567,14 @@ protected Exception checkForErrors(ConsumerRecord<K, R> record) { */ @Nullable public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) { - DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, + DeserializationException exception = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger); if (exception != null) { logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset()); return exception; } - exception = ListenerUtils.getExceptionFromHeader(record, + exception = SerializationUtils.getExceptionFromHeader(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, logger); if (exception != null) { logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DeserializationExceptionHeader.java+40 −0 added@@ -0,0 +1,40 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.apache.kafka.common.header.internals.RecordHeader; + +/** + * A package-protected header used to contain serialized + * {@link DeserializationException}s. Only headers of this type will be examined for + * deserialization. + * + * @author Gary Russell + * @since 2.9.11 + */ +class DeserializationExceptionHeader extends RecordHeader { + + /** + * Construct an instance with the provided properties. + * @param key the key. + * @param value the value; + */ + DeserializationExceptionHeader(String key, byte[] value) { + super(key, value); + } + +}
spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java+83 −3 modified@@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,24 @@ package org.springframework.kafka.support.serializer; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.function.BiFunction; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -166,10 +174,82 @@ data, isForKeyArg, new RuntimeException("Could not serialize type " } } headers.add( - new RecordHeader(isForKeyArg + new DeserializationExceptionHeader(isForKeyArg ? KEY_DESERIALIZER_EXCEPTION_HEADER : VALUE_DESERIALIZER_EXCEPTION_HEADER, stream.toByteArray())); } + /** + * Extract a {@link DeserializationException} from the supplied header name, if + * present. + * @param record the consumer record. + * @param headerName the header name. + * @param logger the logger for logging errors. + * @return the exception or null. + * @since 2.9.11 + */ + @Nullable + public static DeserializationException getExceptionFromHeader(final ConsumerRecord<?, ?> record, + String headerName, LogAccessor logger) { + + Header header = record.headers().lastHeader(headerName); + if (!(header instanceof DeserializationExceptionHeader)) { + logger.warn( + () -> String.format("Foreign deserialization exception header in (%s) ignored; possible attack?", + KafkaUtils.format(record))); + return null; + } + if (header != null) { + byte[] value = header.value(); + DeserializationException exception = byteArrayToDeserializationException(logger, header); + if (exception != null) { + Headers headers = new RecordHeaders(record.headers().toArray()); + headers.remove(headerName); + exception.setHeaders(headers); + } + return exception; + } + return null; + } + + /** + * Convert a byte array containing a serialized {@link DeserializationException} to the + * {@link DeserializationException}. + * @param logger a log accessor to log errors. + * @param header the header. + * @return the exception or null if deserialization fails. + * @since 2.9.11 + */ + @Nullable + public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, Header header) { + + if (!(header instanceof DeserializationExceptionHeader)) { + throw new IllegalStateException("Foreign deserialization exception header ignored; possible attack?"); + } + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value())) { + + boolean first = true; + + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + if (this.first) { + this.first = false; + Assert.state(desc.getName().equals(DeserializationException.class.getName()), + "Header does not contain a DeserializationException"); + } + return super.resolveClass(desc); + } + + + }; + return (DeserializationException) ois.readObject(); + } + catch (IOException | ClassNotFoundException | ClassCastException e) { + logger.error(e, "Failed to deserialize a deserialization exception"); + return null; + } + } + }
spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchAdapterConversionErrorsTests.java+1 −2 modified@@ -37,7 +37,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.BatchListenerFailedException; import org.springframework.kafka.listener.ListenerExecutionFailedException; -import org.springframework.kafka.listener.ListenerUtils; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.ConversionException; @@ -74,7 +73,7 @@ void testNullInList(@Autowired KafkaListenerEndpointRegistry registry, @Autowire .extracting("index") .isEqualTo(1); assertThat(listener.values).containsExactly(new Foo("baz"), null, new Foo("qux")); - DeserializationException vDeserEx = ListenerUtils.getExceptionFromHeader(junkRecord, + DeserializationException vDeserEx = SerializationUtils.getExceptionFromHeader(junkRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, null); assertThat(vDeserEx).isNotNull(); assertThat(vDeserEx.getData()).isEqualTo("JUNK".getBytes());
spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java+13 −7 modified@@ -73,6 +73,7 @@ import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.converter.ConversionException; import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.SerializationTestUtils; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -172,8 +173,10 @@ void valueHeaderStripped() { KafkaOperations<?, ?> template = mock(KafkaOperations.class); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false))); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, + header(false))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); Headers custom = new RecordHeaders(); custom.add(new RecordHeader("foo", "bar".getBytes())); recoverer.setHeadersFunction((rec, ex) -> custom); @@ -202,7 +205,8 @@ void keyHeaderStripped() { KafkaOperations<?, ?> template = mock(KafkaOperations.class); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -222,8 +226,8 @@ void keyDeserOnly() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); DeserializationException deserEx = createDeserEx(true); - headers.add( - new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true, deserEx))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -245,8 +249,10 @@ void headersNotStripped() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); recoverer.setRetainExceptionHeader(true); Headers headers = new RecordHeaders(); - headers.add(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, header(false))); - headers.add(new RecordHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, + header(false))); + headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + header(true))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class));
spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java+3 −3 modified@@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -129,8 +129,8 @@ public String deserialize(String topic, Headers headers, byte[] data) { ErrorHandlingDeserializer<String> ehd = new ErrorHandlingDeserializer<>(new MyDes()); Headers headers = new RecordHeaders(); ehd.deserialize("foo", headers, new byte[1]); - DeserializationException dex = ListenerUtils.byteArrayToDeserializationException(null, - headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER).value()); + DeserializationException dex = SerializationUtils.byteArrayToDeserializationException(null, + headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)); assertThat(dex.getCause().getMessage()) .contains("Could not serialize") .contains("original exception message");
spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java+35 −0 added@@ -0,0 +1,35 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import org.apache.kafka.common.header.Header; + +/** + * @author Gary Russell + * @since 2.9.11 + * + */ +public final class SerializationTestUtils { + + private SerializationTestUtils() { + } + + public static Header deserializationHeader(String key, byte[] value) { + return new DeserializationExceptionHeader(key, value); + } + +}
spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationUtilsTests.java+63 −0 added@@ -0,0 +1,63 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support.serializer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.util.List; +import java.util.function.Supplier; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import org.springframework.core.log.LogAccessor; + +/** + * @author Gary Russell + * @since 2.9.11 + * + */ +public class SerializationUtilsTests { + + @Test + void foreignDeserEx() { + RecordHeaders headers = new RecordHeaders( + List.of(new RecordHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, "junk".getBytes()))); + ConsumerRecord<String, String> rec = mock(ConsumerRecord.class); + willReturn(headers).given(rec).headers(); + given(rec.topic()).willReturn("foo"); + given(rec.partition()).willReturn(1); + given(rec.offset()).willReturn(0L); + LogAccessor logger = spy(new LogAccessor(LogFactory.getLog(getClass()))); + ArgumentCaptor<Supplier<String>> captor = ArgumentCaptor.forClass(Supplier.class); + willAnswer(inv -> null).given(logger).warn(captor.capture()); + assertThat(SerializationUtils.getExceptionFromHeader(rec, + SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger)).isNull(); + assertThat(captor.getValue().get()) + .isEqualTo("Foreign deserialization exception header in (foo-1@0) ignored; possible attack?"); + } + +}
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
5- github.com/advisories/GHSA-crqf-q9fp-hwjwghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2023-34040ghsaADVISORY
- github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9dghsaWEB
- github.com/spring-projects/spring-kafka/commit/eb779679812f61a8553ced3d0e4069dca65560edghsaWEB
- spring.io/security/cve-2023-34040ghsaWEB
News mentions
0No linked articles in our index yet.