Moderate severityNVD Advisory· Published Oct 24, 2023· Updated Sep 11, 2024
RabbitMQ Java client's lack of message size limitation leads to remote DoS attack
CVE-2023-46120
Description
The RabbitMQ Java client library allows Java and JVM-based applications to connect to and interact with RabbitMQ nodes. maxBodyLebgth was not used when receiving Message objects. Attackers could send a very large Message causing a memory overflow and triggering an OOM Error. Users of RabbitMQ may suffer from DoS attacks from RabbitMQ Java client which will ultimately exhaust the memory of the consumer. This vulnerability was patched in version 5.18.0.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
com.rabbitmq:amqp-clientMaven | < 5.18.0 | 5.18.0 |
Affected products
1- Range: < 5.18.0
Patches
1714aae602dcaAdd max inbound message size to ConnectionFactory
17 files changed · +282 −48
src/main/java/com/rabbitmq/client/ConnectionFactory.java+30 −3 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -205,6 +205,13 @@ public class ConnectionFactory implements Cloneable { private CredentialsRefreshService credentialsRefreshService; + /** + * Maximum body size of inbound (received) messages in bytes. + * + * <p>Default value is 67,108,864 (64 MiB). + */ + private int maxInboundMessageBodySize = 1_048_576 * 64; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -970,11 +977,15 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) { this.nioParams.setThreadFactory(getThreadFactory()); } - this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory); + this.frameHandlerFactory = new SocketChannelFrameHandlerFactory( + connectionTimeout, nioParams, isSSL(), sslContextFactory, + this.maxInboundMessageBodySize); } return this.frameHandlerFactory; } else { - return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory); + return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, + socketConf, isSSL(), this.shutdownExecutor, sslContextFactory, + this.maxInboundMessageBodySize); } } @@ -1273,6 +1284,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier); result.setTrafficListener(trafficListener); result.setCredentialsRefreshService(credentialsRefreshService); + result.setMaxInboundMessageBodySize(maxInboundMessageBodySize); return result; } @@ -1556,6 +1568,21 @@ public int getChannelRpcTimeout() { return channelRpcTimeout; } + /** + * Maximum body size of inbound (received) messages in bytes. + * + * <p>Default value is 67,108,864 (64 MiB). + * + * @param maxInboundMessageBodySize the maximum size of inbound messages + */ + public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) { + if (maxInboundMessageBodySize <= 0) { + throw new IllegalArgumentException("Max inbound message body size must be greater than 0: " + + maxInboundMessageBodySize); + } + this.maxInboundMessageBodySize = maxInboundMessageBodySize; + } + /** * The factory to create SSL contexts. * This provides more flexibility to create {@link SSLContext}s
src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java+19 −1 modified@@ -1,3 +1,18 @@ +// Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + package com.rabbitmq.client.impl; import com.rabbitmq.client.SocketConfigurator; @@ -10,10 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory protected final int connectionTimeout; protected final SocketConfigurator configurator; protected final boolean ssl; + protected final int maxInboundMessageBodySize; - protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) { + protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, + boolean ssl, int maxInboundMessageBodySize) { this.connectionTimeout = connectionTimeout; this.configurator = configurator; this.ssl = ssl; + this.maxInboundMessageBodySize = maxInboundMessageBodySize; } }
src/main/java/com/rabbitmq/client/impl/AMQChannel.java+6 −3 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -62,7 +62,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { private final int _channelNumber; /** Command being assembled */ - private AMQCommand _command = new AMQCommand(); + private AMQCommand _command; /** The current outstanding RPC request, if any. (Could become a queue in future.) */ private RpcWrapper _activeRpc = null; @@ -76,6 +76,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { private final boolean _checkRpcResponseType; private final TrafficListener _trafficListener; + private final int maxInboundMessageBodySize; /** * Construct a channel on the given connection, with the given channel number. @@ -91,6 +92,8 @@ public AMQChannel(AMQConnection connection, int channelNumber) { this._rpcTimeout = connection.getChannelRpcTimeout(); this._checkRpcResponseType = connection.willCheckRpcResponseType(); this._trafficListener = connection.getTrafficListener(); + this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize(); + this._command = new AMQCommand(this.maxInboundMessageBodySize); } /** @@ -110,7 +113,7 @@ public int getChannelNumber() { public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line - _command = new AMQCommand(); // prepare for the next one + _command = new AMQCommand(this.maxInboundMessageBodySize); // prepare for the next one handleCompleteInboundCommand(command); } }
src/main/java/com/rabbitmq/client/impl/AMQCommand.java+20 −4 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -44,17 +44,21 @@ public class AMQCommand implements Command { /** The assembler for this command - synchronised on - contains all the state */ private final CommandAssembler assembler; + AMQCommand(int maxBodyLength) { + this(null, null, null, maxBodyLength); + } + /** Construct a command ready to fill in by reading frames */ public AMQCommand() { - this(null, null, null); + this(null, null, null, Integer.MAX_VALUE); } /** * Construct a command with just a method, and without header or body. * @param method the wrapped method */ public AMQCommand(com.rabbitmq.client.Method method) { - this(method, null, null); + this(method, null, null, Integer.MAX_VALUE); } /** @@ -64,7 +68,19 @@ public AMQCommand(com.rabbitmq.client.Method method) { * @param body the message body data */ public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) { - this.assembler = new CommandAssembler((Method) method, contentHeader, body); + this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE); + } + + /** + * Construct a command with a specified method, header and body. + * @param method the wrapped method + * @param contentHeader the wrapped content header + * @param body the message body data + * @param maxBodyLength the maximum size for an inbound message body + */ + public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body, + int maxBodyLength) { + this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength); } /** Public API - {@inheritDoc} */
src/main/java/com/rabbitmq/client/impl/AMQConnection.java+7 −0 modified@@ -157,6 +157,7 @@ public static Map<String, Object> defaultClientProperties() { private volatile ChannelManager _channelManager; /** Saved server properties field from connection.start */ private volatile Map<String, Object> _serverProperties; + private final int maxInboundMessageBodySize; /** * Protected API - respond, in the driver thread, to a ShutdownSignal. @@ -244,6 +245,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics this.credentialsRefreshService = params.getCredentialsRefreshService(); + this._channel0 = createChannel0(); this._channelManager = null; @@ -257,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() : (connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections this.workPoolTimeout = params.getWorkPoolTimeout(); + this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize(); } AMQChannel createChannel0() { @@ -1202,4 +1205,8 @@ public boolean willCheckRpcResponseType() { public TrafficListener getTrafficListener() { return trafficListener; } + + int getMaxInboundMessageBodySize() { + return maxInboundMessageBodySize; + } }
src/main/java/com/rabbitmq/client/impl/CommandAssembler.java+16 −4 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -21,6 +21,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.UnexpectedFrameError; +import static java.lang.String.format; /** * Class responsible for piecing together a command from a series of {@link Frame}s. @@ -52,12 +53,16 @@ private enum CAState { /** No bytes of content body not yet accumulated */ private long remainingBodyBytes; - public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) { + private final int maxBodyLength; + + public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body, + int maxBodyLength) { this.method = method; this.contentHeader = contentHeader; - this.bodyN = new ArrayList<byte[]>(2); + this.bodyN = new ArrayList<>(2); this.bodyLength = 0; this.remainingBodyBytes = 0; + this.maxBodyLength = maxBodyLength; appendBodyFragment(body); if (method == null) { this.state = CAState.EXPECTING_METHOD; @@ -99,7 +104,14 @@ private void consumeMethodFrame(Frame f) throws IOException { private void consumeHeaderFrame(Frame f) throws IOException { if (f.type == AMQP.FRAME_HEADER) { this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream()); - this.remainingBodyBytes = this.contentHeader.getBodySize(); + long bodySize = this.contentHeader.getBodySize(); + if (bodySize >= this.maxBodyLength) { + throw new IllegalStateException(format( + "Message body is too large (%d), maximum size is %d", + bodySize, this.maxBodyLength + )); + } + this.remainingBodyBytes = bodySize; updateContentBodyState(); } else { throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
src/main/java/com/rabbitmq/client/impl/ConnectionParams.java+11 −1 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -64,6 +64,8 @@ public class ConnectionParams { private CredentialsRefreshService credentialsRefreshService; + private int maxInboundMessageBodySize; + public ConnectionParams() {} public CredentialsProvider getCredentialsProvider() { @@ -297,4 +299,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe public CredentialsRefreshService getCredentialsRefreshService() { return credentialsRefreshService; } + + public int getMaxInboundMessageBodySize() { + return maxInboundMessageBodySize; + } + + public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) { + this.maxInboundMessageBodySize = maxInboundMessageBodySize; + } }
src/main/java/com/rabbitmq/client/impl/Frame.java+9 −2 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -25,6 +25,7 @@ import java.util.Date; import java.util.List; import java.util.Map; +import static java.lang.String.format; /** * Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes. @@ -82,7 +83,7 @@ public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset, * * @return a new Frame if we read a frame successfully, otherwise null */ - public static Frame readFrom(DataInputStream is) throws IOException { + public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException { int type; int channel; @@ -108,6 +109,12 @@ public static Frame readFrom(DataInputStream is) throws IOException { channel = is.readUnsignedShort(); int payloadSize = is.readInt(); + if (payloadSize >= maxPayloadSize) { + throw new IllegalStateException(format( + "Frame body is too large (%d), maximum size is %d", + payloadSize, maxPayloadSize + )); + } byte[] payload = new byte[payloadSize]; is.readFully(payload);
src/main/java/com/rabbitmq/client/impl/nio/FrameBuilder.java+12 −3 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import static java.lang.String.format; /** * Class to create AMQP frames from a {@link ReadableByteChannel}. @@ -43,6 +44,7 @@ public class FrameBuilder { protected final ReadableByteChannel channel; protected final ByteBuffer applicationBuffer; + private final int maxPayloadSize; // to store the bytes of the outstanding data // 3 byte-long because the longest we read is an unsigned int // (not need to store the latest byte) @@ -52,9 +54,10 @@ public class FrameBuilder { private byte[] framePayload; private int bytesRead = 0; - public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) { + public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer, int maxPayloadSize) { this.channel = channel; this.applicationBuffer = buffer; + this.maxPayloadSize = maxPayloadSize; } /** @@ -65,7 +68,7 @@ public FrameBuilder(ReadableByteChannel channel, ByteBuffer buffer) { * * @return a complete frame or null if a frame couldn't have been fully built * @throws IOException - * @see Frame#readFrom(DataInputStream) + * @see Frame#readFrom(DataInputStream, int) */ public Frame readFrame() throws IOException { while (somethingToRead()) { @@ -93,6 +96,12 @@ public Frame readFrame() throws IOException { } else if (bytesRead == 6) { // payload size 4/4 int framePayloadSize = (frameBuffer[0] << 24) + (frameBuffer[1] << 16) + (frameBuffer[2] << 8) + readFromBuffer(); + if (framePayloadSize >= maxPayloadSize) { + throw new IllegalStateException(format( + "Frame body is too large (%d), maximum size is %d", + framePayloadSize, maxPayloadSize + )); + } framePayload = new byte[framePayloadSize]; } else if (bytesRead >= PAYLOAD_OFFSET && bytesRead < framePayload.length + PAYLOAD_OFFSET) { framePayload[bytesRead - PAYLOAD_OFFSET] = (byte) readFromBuffer();
src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerFactory.java+7 −4 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -58,8 +58,10 @@ public class SocketChannelFrameHandlerFactory extends AbstractFrameHandlerFactor private final List<NioLoopContext> nioLoopContexts; - public SocketChannelFrameHandlerFactory(int connectionTimeout, NioParams nioParams, boolean ssl, SslContextFactory sslContextFactory) { - super(connectionTimeout, null, ssl); + public SocketChannelFrameHandlerFactory(int connectionTimeout, NioParams nioParams, boolean ssl, + SslContextFactory sslContextFactory, + int maxInboundMessageBodySize) { + super(connectionTimeout, null, ssl, maxInboundMessageBodySize); this.nioParams = new NioParams(nioParams); this.sslContextFactory = sslContextFactory; this.nioLoopContexts = new ArrayList<>(this.nioParams.getNbIoThreads()); @@ -134,7 +136,8 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti channel, nioLoopContext, nioParams, - sslEngine + sslEngine, + this.maxInboundMessageBodySize ); state.startReading(); SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java+7 −4 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -71,7 +71,9 @@ public class SocketChannelFrameHandlerState { final FrameBuilder frameBuilder; - public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, NioParams nioParams, SSLEngine sslEngine) { + public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, + NioParams nioParams, SSLEngine sslEngine, + int maxFramePayloadSize) { this.channel = channel; this.readSelectorState = nioLoopsState.readSelectorState; this.writeSelectorState = nioLoopsState.writeSelectorState; @@ -94,7 +96,7 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL new ByteBufferOutputStream(channel, plainOut) ); - this.frameBuilder = new FrameBuilder(channel, plainIn); + this.frameBuilder = new FrameBuilder(channel, plainIn, maxFramePayloadSize); } else { this.ssl = true; @@ -106,7 +108,8 @@ public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioL this.outputStream = new DataOutputStream( new SslEngineByteBufferOutputStream(sslEngine, plainOut, cipherOut, channel) ); - this.frameBuilder = new SslEngineFrameBuilder(sslEngine, plainIn, cipherIn, channel); + this.frameBuilder = new SslEngineFrameBuilder(sslEngine, plainIn, + cipherIn, channel, maxFramePayloadSize); } }
src/main/java/com/rabbitmq/client/impl/nio/SslEngineFrameBuilder.java+5 −3 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -35,8 +35,10 @@ public class SslEngineFrameBuilder extends FrameBuilder { private boolean isUnderflowHandlingEnabled = false; - public SslEngineFrameBuilder(SSLEngine sslEngine, ByteBuffer plainIn, ByteBuffer cipherIn, ReadableByteChannel channel) { - super(channel, plainIn); + public SslEngineFrameBuilder(SSLEngine sslEngine, ByteBuffer plainIn, + ByteBuffer cipherIn, ReadableByteChannel channel, + int maxPayloadSize) { + super(channel, plainIn, maxPayloadSize); this.sslEngine = sslEngine; this.cipherBuffer = cipherIn; }
src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java+7 −5 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -38,12 +38,14 @@ public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFact public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFactory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) { - this(connectionTimeout, socketFactory, configurator, ssl, shutdownExecutor, null); + this(connectionTimeout, socketFactory, configurator, ssl, shutdownExecutor, null, + Integer.MAX_VALUE); } public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFactory, SocketConfigurator configurator, - boolean ssl, ExecutorService shutdownExecutor, SslContextFactory sslContextFactory) { - super(connectionTimeout, configurator, ssl); + boolean ssl, ExecutorService shutdownExecutor, SslContextFactory sslContextFactory, + int maxInboundMessageBodySize) { + super(connectionTimeout, configurator, ssl, maxInboundMessageBodySize); this.socketFactory = socketFactory; this.shutdownExecutor = shutdownExecutor; this.sslContextFactory = sslContextFactory; @@ -79,7 +81,7 @@ protected Socket createSocket(String connectionName) throws IOException { public FrameHandler create(Socket sock) throws IOException { - return new SocketFrameHandler(sock, this.shutdownExecutor); + return new SocketFrameHandler(sock, this.shutdownExecutor, this.maxInboundMessageBodySize); } private static void quietTrySocketClose(Socket socket) {
src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java+8 −4 modified@@ -1,4 +1,4 @@ -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -52,22 +52,26 @@ public class SocketFrameHandler implements FrameHandler { /** Socket's outputstream - data to the broker - synchronized on */ private final DataOutputStream _outputStream; + private final int maxInboundMessageBodySize; + /** Time to linger before closing the socket forcefully. */ public static final int SOCKET_CLOSING_TIMEOUT = 1; /** * @param socket the socket to use */ public SocketFrameHandler(Socket socket) throws IOException { - this(socket, null); + this(socket, null, Integer.MAX_VALUE); } /** * @param socket the socket to use */ - public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException { + public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor, + int maxInboundMessageBodySize) throws IOException { _socket = socket; _shutdownExecutor = shutdownExecutor; + this.maxInboundMessageBodySize = maxInboundMessageBodySize; _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); @@ -181,7 +185,7 @@ public void initialize(AMQConnection connection) { @Override public Frame readFrame() throws IOException { synchronized (_inputStream) { - return Frame.readFrom(_inputStream); + return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize); } }
src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java+4 −4 modified@@ -59,7 +59,7 @@ void tearDown() throws Exception { @Test public void buildFrameInOneGo() throws IOException { buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2, 3, end() }); - builder = new FrameBuilder(channel, buffer); + builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); Frame frame = builder.readFrame(); assertThat(frame).isNotNull(); assertThat(frame.type).isEqualTo(1); @@ -78,7 +78,7 @@ public void buildFramesInOneGo() throws IOException { } } buffer = ByteBuffer.wrap(frames); - builder = new FrameBuilder(channel, buffer); + builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); int frameCount = 0; Frame frame; while ((frame = builder.readFrame()) != null) { @@ -94,7 +94,7 @@ public void buildFramesInOneGo() throws IOException { @Test public void buildFrameInSeveralCalls() throws IOException { buffer = ByteBuffer.wrap(new byte[] { 1, 0, 0, 0, 0, 0, 3, 1, 2 }); - builder = new FrameBuilder(channel, buffer); + builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE); Frame frame = builder.readFrame(); assertThat(frame).isNull(); @@ -131,7 +131,7 @@ public void protocolMismatchHeader() throws IOException { }; for (int i = 0; i < buffers.length; i++) { - builder = new FrameBuilder(channel, buffers[i]); + builder = new FrameBuilder(channel, buffers[i], Integer.MAX_VALUE); try { builder.readFrame(); fail("protocol header not correct, exception should have been thrown");
src/test/java/com/rabbitmq/client/test/MaxInboundMessageSizeTest.java+97 −0 added@@ -0,0 +1,97 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.client.*; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +public class MaxInboundMessageSizeTest extends BrokerTestCase { + + String q; + + private static void safeClose(Connection c) { + try { + c.close(); + } catch (Exception e) { + // OK + } + } + + @Override + protected void createResources() throws IOException, TimeoutException { + q = generateQueueName(); + declareTransientQueue(q); + super.createResources(); + } + + @CsvSource({ + "20000,5000,true", + "20000,100000,true", + "20000,5000,false", + "20000,100000,false", + }) + @ParameterizedTest + void maxInboundMessageSizeMustBeEnforced(int maxMessageSize, int frameMax, boolean basicGet) + throws Exception { + ConnectionFactory cf = newConnectionFactory(); + cf.setMaxInboundMessageBodySize(maxMessageSize); + cf.setRequestedFrameMax(frameMax); + Connection c = cf.newConnection(); + try { + Channel ch = c.createChannel(); + ch.confirmSelect(); + byte[] body = new byte[maxMessageSize * 2]; + ch.basicPublish("", q, null, body); + ch.waitForConfirmsOrDie(); + AtomicReference<Throwable> exception = new AtomicReference<>(); + CountDownLatch errorLatch = new CountDownLatch(1); + ch.addShutdownListener( + cause -> { + exception.set(cause.getCause()); + errorLatch.countDown(); + }); + if (basicGet) { + try { + ch.basicGet(q, true); + } catch (Exception e) { + // OK for basicGet + } + } else { + ch.basicConsume(q, new DefaultConsumer(ch)); + } + assertThat(errorLatch).is(completed()); + assertThat(exception.get()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Message body is too large"); + } finally { + safeClose(c); + } + } + + @Override + protected void releaseResources() throws IOException { + deleteQueue(q); + super.releaseResources(); + } +}
src/test/java/com/rabbitmq/client/test/TestUtils.java+17 −3 modified@@ -26,11 +26,11 @@ import java.lang.annotation.Target; import java.util.function.Function; import org.assertj.core.api.Assertions; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.extension.ConditionEvaluationResult; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext.Namespace; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -47,8 +47,6 @@ public class TestUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class); - public static final boolean USE_NIO = System.getProperty("use.nio") != null; public static ConnectionFactory connectionFactory() { @@ -303,6 +301,22 @@ public interface CallableFunction<T, R> { } + public static class LatchConditions { + + static Condition<CountDownLatch> completed() { + return new Condition<>( + countDownLatch-> { + try { + return countDownLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + "Latch did not complete in 10 seconds"); + } + + } + public static boolean basicGetBasicConsume(Connection connection, String queue, final CountDownLatch latch, int msgSize) throws Exception { Channel channel = connection.createChannel();
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
6- github.com/advisories/GHSA-mm8h-8587-p46hghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2023-46120ghsaADVISORY
- github.com/rabbitmq/rabbitmq-java-client/commit/714aae602dcae6cb4b53cadf009323ebac313cc8ghsax_refsource_MISCWEB
- github.com/rabbitmq/rabbitmq-java-client/issues/1062ghsax_refsource_MISCWEB
- github.com/rabbitmq/rabbitmq-java-client/releases/tag/v5.18.0ghsax_refsource_MISCWEB
- github.com/rabbitmq/rabbitmq-java-client/security/advisories/GHSA-mm8h-8587-p46hghsax_refsource_CONFIRMWEB
News mentions
0No linked articles in our index yet.