VYPR
High severity 7.5 · NVD Advisory · Published Jun 10, 2026 · Updated Jun 10, 2026

Acknowledgement extension out of memory

CVE-2025-53114

Description

CometD server versions prior to 5.0.x, 6.0.x, and 8.0.x are vulnerable to an OutOfMemoryError due to a flawed acknowledgement extension.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Vulnerability

CometD server versions prior to the patched 5.0.x, 6.0.x, and 8.0.x releases are susceptible to an OutOfMemoryError. It occurs when a malicious client repeatedly sends connect requests carrying a fixed acknowledgement value, which prevents the server from clearing its unacknowledged message queue [3], [4].

Exploitation

An attacker can exploit this vulnerability by continuously sending /meta/connect messages that carry the extension field ext: { "ack": 1 } to a CometD server. Because the acknowledged value never changes, the server never clears its unacknowledged queue and keeps accumulating messages indefinitely [3], [4]. The attacker needs only network access to the server.
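
The accumulation can be illustrated with a small in-memory model. The sketch below is not CometD code: `AckQueueModel`, its methods, and the batch-numbering scheme are hypothetical stand-ins. It only assumes that the server tags queued messages with a batch number and drops the messages up to the batch that the client acknowledges. Under that assumption, a client that always acknowledges batch 1 leaves every later batch in the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of an unacknowledged message queue.
// It is NOT the CometD implementation; names and behaviour are assumptions.
class AckQueueModel {
    // Each entry stores the batch number under which a message was queued.
    private final Deque<Long> unackedBatches = new ArrayDeque<>();
    private long currentBatch = 1;

    // The server queues one message under the current batch.
    void publish() {
        unackedBatches.addLast(currentBatch);
    }

    // Models a /meta/connect carrying ext { "ack": ackedBatch }:
    // drop every message up to that batch, then start a new batch.
    // Returns the number of the batch that was just closed.
    long connect(long ackedBatch) {
        while (!unackedBatches.isEmpty() && unackedBatches.peekFirst() <= ackedBatch) {
            unackedBatches.removeFirst();
        }
        return currentBatch++;
    }

    int size() {
        return unackedBatches.size();
    }

    // Runs `rounds` connect cycles with `perRound` messages queued in each,
    // and returns how many messages remain unacknowledged at the end.
    static int simulate(boolean fixedAck, int rounds, int perRound) {
        AckQueueModel server = new AckQueueModel();
        long lastDelivered = 0;
        for (int r = 0; r < rounds; r++) {
            for (int m = 0; m < perRound; m++) {
                server.publish();
            }
            // A well-behaved client acknowledges the last batch it received;
            // the misbehaving client always acknowledges batch 1.
            lastDelivered = server.connect(fixedAck ? 1 : lastDelivered);
        }
        return server.size();
    }

    public static void main(String[] args) {
        System.out.println("well-behaved client: " + simulate(false, 100, 10));
        System.out.println("fixed-ack client:    " + simulate(true, 100, 10));
    }
}
```

In this model, 100 rounds of 10 messages leave 10 messages queued for the well-behaved client and 990 for the fixed-ack client, and the second figure keeps growing linearly with the number of rounds.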

Impact

Successful exploitation leads to a denial-of-service condition. The unacknowledged message queue on the server grows without bound, eventually exhausting all available heap memory and causing an OutOfMemoryError, resulting in a complete server outage [3], [4]. A single malicious client can trigger this condition.

Mitigation

Patches are available for CometD versions 5.0.x, 6.0.x, and 8.0.x, released via pull requests [1], [2]. A workaround is to disable the acknowledgement extension entirely. No information regarding End-of-Life status or KEV listing is available in the provided references.
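
The patches listed below add `AcknowledgedMessagesExtension.setMaxQueueSize(int)` and a `Listener.onBatchQueueMaxed(...)` callback; the limit defaults to -1, so the check stays disabled until an application sets a positive value. The following is a minimal sketch of that threshold check combined with a "clear and disconnect" reaction. To keep it self-contained it uses hypothetical stand-in types (`Session`, `QueueMaxedListener`, `BoundedUnackedQueue`, `ClearAndDisconnectDemo`) rather than the CometD classes, so treat it as an illustration of the idea and not as the library's API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; these are not CometD types.
interface Session {
    void disconnect();

    boolean isConnected();
}

interface QueueMaxedListener {
    void onQueueMaxed(Session session, Queue<String> queue);
}

class BoundedUnackedQueue {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Session session;
    private final int maxQueueSize;
    private final QueueMaxedListener listener;

    BoundedUnackedQueue(Session session, int maxQueueSize, QueueMaxedListener listener) {
        this.session = session;
        this.maxQueueSize = maxQueueSize;
        this.listener = listener;
    }

    void queued(String message) {
        queue.add(message);
        // Same shape as the check in the patch: a non-positive limit disables it.
        if (maxQueueSize > 0 && queue.size() > maxQueueSize) {
            listener.onQueueMaxed(session, queue);
        }
    }

    int size() {
        return queue.size();
    }
}

class ClearAndDisconnectDemo {
    // Simple in-memory session used only by this sketch.
    static class DemoSession implements Session {
        private boolean connected = true;

        @Override
        public void disconnect() {
            connected = false;
        }

        @Override
        public boolean isConnected() {
            return connected;
        }
    }

    // Queues up to `count` messages and returns the final queue size.
    static int run(int maxQueueSize, int count, Session session) {
        BoundedUnackedQueue unacked = new BoundedUnackedQueue(session, maxQueueSize, (s, q) -> {
            // When the queue overflows, clear it and disconnect the session.
            q.clear();
            s.disconnect();
        });
        for (int i = 0; i < count && session.isConnected(); i++) {
            unacked.queued("message-" + i);
        }
        return unacked.size();
    }

    public static void main(String[] args) {
        DemoSession limited = new DemoSession();
        System.out.println("limit 128: size=" + run(128, 1000, limited) + " connected=" + limited.isConnected());
        DemoSession unlimited = new DemoSession();
        System.out.println("limit -1:  size=" + run(-1, 1000, unlimited) + " connected=" + unlimited.isConnected());
    }
}
```

The second run shows why upgrading alone may not be enough in this model: with the default of -1 the queue still grows, so a positive limit and a listener that reacts to it both have to be configured.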

AI Insight generated on Jun 10, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected products

1
  • CometD/CometD · llm-fuzzy
    Range: before 5.0.x, before 6.0.x, before 8.0.x

Patches

3
08fa2f2690b6

Backported improvements from #2117. (#2168)

https://github.com/cometd/cometd · Simone Bordet · Aug 19, 2025 · via body-scan-shorthand
2 files changed · +47 -3
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesExtension.java · +21 -0 · modified
    @@ -17,6 +17,7 @@
     
     import java.util.List;
     import java.util.Map;
    +import java.util.Queue;
     import java.util.concurrent.CopyOnWriteArrayList;
     import org.cometd.bayeux.Channel;
     import org.cometd.bayeux.server.BayeuxServer;
    @@ -38,6 +39,7 @@
     public class AcknowledgedMessagesExtension implements Extension {
         private final Logger _logger = LoggerFactory.getLogger(getClass().getName());
         private final List<Listener> _listeners = new CopyOnWriteArrayList<>();
    +    private int _maxQueueSize = -1;
     
         public void addListener(Listener listener) {
             _listeners.add(listener);
    @@ -47,6 +49,14 @@ public void removeListener(Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcvMeta(ServerSession remote, Mutable message) {
             if (Channel.META_HANDSHAKE.equals(message.getChannel())) {
    @@ -60,6 +70,7 @@ public boolean rcvMeta(ServerSession remote, Mutable message) {
     
                     AcknowledgedMessagesSessionExtension extension = newSessionExtension(remote);
                     extension.addListeners(_listeners);
    +                extension.setMaxQueueSize(getMaxQueueSize());
     
                     // Make sure that adding the extension and importing the queue is atomic.
                     ServerSessionImpl session = (ServerSessionImpl)remote;
    @@ -105,5 +116,15 @@ default void onBatchSend(ServerSession session, List<ServerMessage> messages, lo
              */
             default void onBatchReceive(ServerSession session, long batch) {
             }
    +
    +        /**
    +         * <p>Callback method invoked when the unacknowledged message queue
    +         * size exceeds the value returned by {@link #getMaxQueueSize()}.</p>
    +         *
    +         * @param session the session
    +         * @param queue   the unacknowledged message queue
    +         */
    +        default void onBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        }
         }
     }
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesSessionExtension.java · +26 -3 · modified
    @@ -43,6 +43,7 @@ public class AcknowledgedMessagesSessionExtension implements Extension, ServerSe
         private final ServerSessionImpl _session;
         private final BatchArrayQueue<ServerMessage> _queue;
         private long _lastBatch;
    +    private int _maxQueueSize;
     
         public AcknowledgedMessagesSessionExtension(ServerSession session) {
             _session = (ServerSessionImpl)session;
    @@ -63,6 +64,14 @@ public void removeListener(AcknowledgedMessagesExtension.Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcv(ServerSession from, Mutable message) {
             return true;
    @@ -124,6 +133,10 @@ public void queued(ServerSession sender, ServerMessage message) {
                 if (_logger.isDebugEnabled()) {
                     _logger.debug("Stored at batch {} {} for {}", _queue.getBatch(), message, _session);
                 }
    +            int maxQueueSize = getMaxQueueSize();
    +            if (maxQueueSize > 0 && _queue.size() > maxQueueSize) {
    +                notifyBatchQueueMaxed(_session, _queue);
    +            }
             }
         }
     
    @@ -174,8 +187,8 @@ public void deQueue(ServerSession session, Queue<ServerMessage> queue, List<Muta
                 }
             }
             if (reply != null) {
    -            long batch = _batches.remove(reply.getId());
                 synchronized (_session.getLock()) {
    +                long batch = _batches.remove(reply.getId());
                     if (_logger.isDebugEnabled()) {
                         _logger.debug("Dequeuing {}/{} messages until batch {} for {} on {}", queue.size(), _queue.size(), batch, reply, _session);
                     }
    @@ -205,7 +218,7 @@ private void notifyBatchSend(ServerSession session, Queue<ServerMessage> queue,
                 try {
                     listener.onBatchSend(session, messages, batch);
                 } catch (Throwable x) {
    -                _logger.info("Exception while invoking listener " + listener, x);
    +                _logger.info("Exception while invoking listener {}", listener, x);
                 }
             }
         }
    @@ -215,7 +228,17 @@ private void notifyBatchReceive(ServerSession session, long batch) {
                 try {
                     listener.onBatchReceive(session, batch);
                 } catch (Throwable x) {
    -                _logger.info("Exception while invoking listener " + listener, x);
    +                _logger.info("Exception while invoking listener {}", listener, x);
    +            }
    +        }
    +    }
    +
    +    private void notifyBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        for (AcknowledgedMessagesExtension.Listener listener : _listeners) {
    +            try {
    +                listener.onBatchQueueMaxed(session, queue);
    +            } catch (Throwable x) {
    +                _logger.info("Exception while invoking listener {}", listener, x);
                 }
             }
         }
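
The `notifyBatchQueueMaxed` method in the diff above wraps each listener call in its own `try`/`catch (Throwable x)`, so one failing listener does not prevent the remaining listeners from running. A minimal sketch of that isolation pattern follows; `MaxedListener` and `ListenerNotifier` are hypothetical names used only for this example, and failures are collected in a list here instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener type for this sketch; not a CometD interface.
interface MaxedListener {
    void onMaxed(String sessionId);
}

class ListenerNotifier {
    // Copy-on-write list: safe to iterate while listeners are added or removed.
    private final List<MaxedListener> listeners = new CopyOnWriteArrayList<>();
    final List<String> failures = new ArrayList<>();

    void addListener(MaxedListener listener) {
        listeners.add(listener);
    }

    // Invokes every listener; a failure in one is recorded and does not stop the others.
    void notifyMaxed(String sessionId) {
        for (MaxedListener listener : listeners) {
            try {
                listener.onMaxed(sessionId);
            } catch (Throwable x) {
                failures.add(String.valueOf(x.getMessage()));
            }
        }
    }

    // Registers three listeners, the second of which throws, and returns
    // the calls that the well-behaved listeners recorded.
    static List<String> demo() {
        ListenerNotifier notifier = new ListenerNotifier();
        List<String> calls = new ArrayList<>();
        notifier.addListener(id -> calls.add("first:" + id));
        notifier.addListener(id -> {
            throw new IllegalStateException("boom");
        });
        notifier.addListener(id -> calls.add("third:" + id));
        notifier.notifyMaxed("s1");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```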
    
f28d4fb4a04d

Backported improvements from #2117.

https://github.com/cometd/cometd · Simone Bordet · Aug 19, 2025 · via body-scan-shorthand
2 files changed · +47 -3
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesExtension.java · +21 -0 · modified
    @@ -17,6 +17,7 @@
     
     import java.util.List;
     import java.util.Map;
    +import java.util.Queue;
     import java.util.concurrent.CopyOnWriteArrayList;
     import org.cometd.bayeux.Channel;
     import org.cometd.bayeux.server.BayeuxServer;
    @@ -38,6 +39,7 @@
     public class AcknowledgedMessagesExtension implements Extension {
         private final Logger _logger = LoggerFactory.getLogger(getClass().getName());
         private final List<Listener> _listeners = new CopyOnWriteArrayList<>();
    +    private int _maxQueueSize = -1;
     
         public void addListener(Listener listener) {
             _listeners.add(listener);
    @@ -47,6 +49,14 @@ public void removeListener(Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcvMeta(ServerSession remote, Mutable message) {
             if (Channel.META_HANDSHAKE.equals(message.getChannel())) {
    @@ -60,6 +70,7 @@ public boolean rcvMeta(ServerSession remote, Mutable message) {
     
                     AcknowledgedMessagesSessionExtension extension = newSessionExtension(remote);
                     extension.addListeners(_listeners);
    +                extension.setMaxQueueSize(getMaxQueueSize());
     
                     // Make sure that adding the extension and importing the queue is atomic.
                     ServerSessionImpl session = (ServerSessionImpl)remote;
    @@ -108,5 +119,15 @@ default void onBatchSend(ServerSession session, List<ServerMessage> messages, lo
              */
             default void onBatchReceive(ServerSession session, long batch) {
             }
    +
    +        /**
    +         * <p>Callback method invoked when the unacknowledged message queue
    +         * size exceeds the value returned by {@link #getMaxQueueSize()}.</p>
    +         *
    +         * @param session the session
    +         * @param queue   the unacknowledged message queue
    +         */
    +        default void onBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        }
         }
     }
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesSessionExtension.java · +26 -3 · modified
    @@ -41,6 +41,7 @@ public class AcknowledgedMessagesSessionExtension implements Extension, ServerSe
         private final ServerSessionImpl _session;
         private final BatchArrayQueue<ServerMessage> _queue;
         private long _lastBatch;
    +    private int _maxQueueSize;
     
         public AcknowledgedMessagesSessionExtension(ServerSession session) {
             _session = (ServerSessionImpl)session;
    @@ -61,6 +62,14 @@ public void removeListener(AcknowledgedMessagesExtension.Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcv(ServerSession from, Mutable message) {
             return true;
    @@ -129,6 +138,10 @@ public void queued(ServerSession sender, ServerMessage message) {
                 if (_logger.isDebugEnabled()) {
                     _logger.debug("Stored at batch {} {} for {}", _queue.getBatch(), message, _session);
                 }
    +            int maxQueueSize = getMaxQueueSize();
    +            if (maxQueueSize > 0 && _queue.size() > maxQueueSize) {
    +                notifyBatchQueueMaxed(_session, _queue);
    +            }
             } finally {
                 _session.getLock().unlock();
             }
    @@ -184,9 +197,9 @@ public void deQueue(ServerSession session, Queue<ServerMessage> queue, List<Muta
                 }
             }
             if (reply != null) {
    -            long batch = _batches.remove(reply.getId());
                 _session.getLock().lock();
                 try {
    +                long batch = _batches.remove(reply.getId());
                     if (_logger.isDebugEnabled()) {
                         _logger.debug("Dequeuing {}/{} messages until batch {} for {} on {}", queue.size(), _queue.size(), batch, reply, _session);
                     }
    @@ -221,7 +234,7 @@ private void notifyBatchSend(ServerSession session, Queue<ServerMessage> queue,
                 try {
                     listener.onBatchSend(session, messages, batch);
                 } catch (Throwable x) {
    -                _logger.info("Exception while invoking listener " + listener, x);
    +                _logger.info("Exception while invoking listener {}", listener, x);
                 }
             }
         }
    @@ -231,7 +244,17 @@ private void notifyBatchReceive(ServerSession session, long batch) {
                 try {
                     listener.onBatchReceive(session, batch);
                 } catch (Throwable x) {
    -                _logger.info("Exception while invoking listener " + listener, x);
    +                _logger.info("Exception while invoking listener {}", listener, x);
    +            }
    +        }
    +    }
    +
    +    private void notifyBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        for (AcknowledgedMessagesExtension.Listener listener : _listeners) {
    +            try {
    +                listener.onBatchQueueMaxed(session, queue);
    +            } catch (Throwable x) {
    +                _logger.info("Exception while invoking listener {}", listener, x);
                 }
             }
         }
    
1df70a2563fc

Fixes #2117 - Improve ack extension.

https://github.com/cometd/cometd · Simone Bordet · Jul 1, 2025 · via body-scan-shorthand
8 files changed · +380 -39
  • cometd-documentation/src/main/asciidoc/security.adoc · +28 -0 · modified
    @@ -148,3 +148,31 @@ However, it will have `+Origin: https://evilbob.com+` and not the expected `+Ori
     As with the CSRF attack, the application at `cometd-chat.com` can install the cross-origin filter and configure it to allow requests only from the `cometd-chat.com` origin, effectively blocking Bob's CSWSH attack.
     
     In this case, the cross-origin filter must be installed _before_ the WebSocket upgrade mechanism takes place, or the WebSocket upgrade mechanism must have a way to test against a configured list of allowed origins and reject the WebSocket connection attempt if the origin is not allowed.
    +
    +[[_security_memory]]
    +=== Limiting the Server Memory Usage
    +
    +Server-side CometD applications create a xref:concepts.adoc#_concepts_sessions[server session] for each remote client, and each server session has a message queue, where the server enqueues messages destined to that particular remote client.
    +
    +If the communication between the remote client and the server is faulty, for example due to network issues, the server maintains the server session around for a short period of time (configurable via the `maxInterval` parameter, see xref:java_server_configuration.adoc#_java_server_configuration_transports[this section] for more details), in case the communication can be restored.
    +If the communication cannot be restored within `maxInterval`, the server expires the server session, which is discarded along with its queue of messages.
    +
    +During these short communication outages, messages are still enqueued in the message queue of the server sessions, but they likely cannot be sent, so the message queue continues to grow.
    +If this situation happens for a large number of sessions, the server memory usage may grow beyond limits and possibly cause ``OutOfMemoryError``s or other server-wide outages.
    +
    +Server-side CometD applications can register a `ServerSession.QueueMaxedListener` to be notified when the server session queue size exceeds the value configured with the `maxQueue` parameter (see xref:java_server_configuration.adoc#_java_server_configuration[this section] for more details).
    +
    +Similarly, Server-side CometD applications that use the xref:extensions_acknowledge.adoc[acknowledgement extension] can register a `AcknowledgedMessagesExtension.Listener` to be notified when the unacknowledged message queue exceeds the value configured via `AcknowledgedMessagesExtension.setMaxQueueSize(int)`.
    +
    +These "queue maxed" listeners can implement different strategies to cope with the growth of the queue, for example:
    +
    +* Clear the queue and disconnect the server session.
    +* Allow a dynamic queue size, perhaps depending on the current load, or on a particular server session, by allowing few more messages to be queued.
    +* Drain the queue and save the messages in an external storage, if the messages cannot be lost.
    +
    +Below you can find a code example for the first case, clear and disconnect:
    +
    +[source,java,indent=0]
    +----
    +include::{doc_code}/SecurityDocs.java[tags=queueMaxed]
    +----
    
  • cometd-documentation/src/main/java/org/cometd/documentation/SecurityDocs.java · +54 -0 · added
    @@ -0,0 +1,54 @@
    +/*
    + * Copyright (c) 2008 the original author or authors.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.cometd.documentation;
    +
    +import java.util.Queue;
    +import org.cometd.bayeux.server.BayeuxServer;
    +import org.cometd.bayeux.server.ServerMessage;
    +import org.cometd.bayeux.server.ServerSession;
    +import org.cometd.server.BayeuxServerImpl;
    +
    +@SuppressWarnings("unused")
    +public class SecurityDocs {
    +    public void queueMaxed() {
    +        BayeuxServer bayeuxServer = new BayeuxServerImpl();
    +        // tag::queueMaxed[]
    +        // Configure CometDServlet init-param maxQueue=128,
    +        // or CometDHandler.setOptions(Map.of("maxQueue", "128"))
    +
    +        // Add a SessionListener to be notified every time a new
    +        // ServerSession is created, to add a QueueMaxedListener that
    +        // will be notified when the ServerSession queue size exceeds 128.
    +        bayeuxServer.addListener(new BayeuxServer.SessionListener() {
    +            @Override
    +            public void sessionAdded(ServerSession serverSession, ServerMessage serverMessage) {
    +                // Add a QueueMaxedListener that will be
    +                // notified when the queue size exceeds 128.
    +                serverSession.addListener(new ServerSession.QueueMaxedListener() {
    +                    @Override
    +                    public boolean queueMaxed(ServerSession session, Queue<ServerMessage> queue, ServerSession sender, ServerMessage message) {
    +                        // When the queue overflows, clear and disconnect.
    +                        queue.clear();
    +                        session.disconnect();
    +                        return false;
    +                    }
    +                });
    +            }
    +        });
    +        // end::queueMaxed[]
    +    }
    +}
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesExtension.java · +21 -0 · modified
    @@ -18,6 +18,7 @@
     
     import java.util.List;
     import java.util.Map;
    +import java.util.Queue;
     import java.util.concurrent.CopyOnWriteArrayList;
     import org.cometd.bayeux.Channel;
     import org.cometd.bayeux.server.BayeuxServer;
    @@ -39,6 +40,7 @@
     public class AcknowledgedMessagesExtension implements Extension {
         private final Logger _logger = LoggerFactory.getLogger(getClass().getName());
         private final List<Listener> _listeners = new CopyOnWriteArrayList<>();
    +    private int _maxQueueSize = -1;
     
         public void addListener(Listener listener) {
             _listeners.add(listener);
    @@ -48,6 +50,14 @@ public void removeListener(Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcvMeta(ServerSession remote, Mutable message) {
             if (Channel.META_HANDSHAKE.equals(message.getChannel())) {
    @@ -61,6 +71,7 @@ public boolean rcvMeta(ServerSession remote, Mutable message) {
     
                     AcknowledgedMessagesSessionExtension extension = newSessionExtension(remote);
                     extension.addListeners(_listeners);
    +                extension.setMaxQueueSize(getMaxQueueSize());
     
                     // Make sure that adding the extension and importing the queue is atomic.
                     ServerSessionImpl session = (ServerSessionImpl)remote;
    @@ -109,5 +120,15 @@ default void onBatchSend(ServerSession session, List<ServerMessage> messages, lo
              */
             default void onBatchReceive(ServerSession session, long batch) {
             }
    +
    +        /**
    +         * <p>Callback method invoked when the unacknowledged message queue
    +         * size exceeds the value returned by {@link #getMaxQueueSize()}.</p>
    +         *
    +         * @param session the session
    +         * @param queue   the unacknowledged message queue
    +         */
    +        default void onBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        }
         }
     }
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/AcknowledgedMessagesSessionExtension.java · +23 -0 · modified
    @@ -42,6 +42,7 @@ public class AcknowledgedMessagesSessionExtension implements Extension, ServerSe
         private final ServerSessionImpl _session;
         private final BatchArrayQueue<ServerMessage> _queue;
         private long _lastBatch;
    +    private int _maxQueueSize;
     
         public AcknowledgedMessagesSessionExtension(ServerSession session) {
             _session = (ServerSessionImpl)session;
    @@ -62,6 +63,14 @@ public void removeListener(AcknowledgedMessagesExtension.Listener listener) {
             _listeners.remove(listener);
         }
     
    +    public int getMaxQueueSize() {
    +        return _maxQueueSize;
    +    }
    +
    +    public void setMaxQueueSize(int maxQueueSize) {
    +        _maxQueueSize = maxQueueSize;
    +    }
    +
         @Override
         public boolean rcv(ServerSession from, Mutable message) {
             return true;
    @@ -130,6 +139,10 @@ public void queued(ServerSession sender, ServerMessage message) {
                 if (_logger.isDebugEnabled()) {
                     _logger.debug("Stored at batch {} {} for {}", _queue.getBatch(), message, _session);
                 }
    +            int maxQueueSize = getMaxQueueSize();
    +            if (maxQueueSize > 0 && _queue.size() > maxQueueSize) {
    +                notifyBatchQueueMaxed(_session, _queue);
    +            }
             } finally {
                 _session.getLock().unlock();
             }
    @@ -237,6 +250,16 @@ private void notifyBatchReceive(ServerSession session, long batch) {
             }
         }
     
    +    private void notifyBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +        for (AcknowledgedMessagesExtension.Listener listener : _listeners) {
    +            try {
    +                listener.onBatchQueueMaxed(session, queue);
    +            } catch (Throwable x) {
    +                _logger.info("Exception while invoking listener " + listener, x);
    +            }
    +        }
    +    }
    +
         // Used only in tests.
         public BatchArrayQueue<ServerMessage> getBatchArrayQueue() {
             return _queue;
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/main/java/org/cometd/server/ext/BatchArrayQueue.java · +30 -0 · modified
    @@ -401,6 +401,36 @@ public void clearToBatch(long batch) {
             }
         }
     
    +    /**
    +     * <p>Copies the elements of this queue into the given queue,
    +     * up to the given batch number.</p>
    +     * <p>For example, given:</p>
    +     * <pre>{@code
    +     * this queue:
    +     *           head             tail
    +     *             |               |
    +     * elements: [E1, E2, E3, E4, E5]
    +     * batches : [ 1,  1,  2,  3,  3]
    +     *
    +     * target queue: []
    +     * }</pre>
    +     * <p>then calling {@code exportMessagesToBatch(2)} would copy
    +     * the elements belonging to batches up to {@code 2} into the
    +     * target queue, leaving this queue unchanged:</p>
    +     * <pre>{@code
    +     * this queue:
    +     *           head             tail
    +     *             |               |
    +     * elements: [E1, E2, E3, E4, E5]
    +     * batches : [ 1,  1,  2,  3,  3]
    +     *
    +     * target queue: [E1, E2, E3]
    +     * }</pre>
    +     *
    +     * @param target the target queue
    +     * @param batch  the batch number
    +     * @see #clearToBatch(long)
    +     */
         public void exportMessagesToBatch(Queue<T> target, long batch) {
             lock.lock();
             try {
    
  • cometd-java/cometd-java-server/cometd-java-server-common/src/test/java/org/cometd/server/ext/BatchArrayQueueTest.java · +43 -21 · modified
    @@ -19,12 +19,15 @@
     import java.util.ArrayDeque;
     import java.util.Queue;
     import java.util.concurrent.locks.ReentrantLock;
    -import org.junit.jupiter.api.Assertions;
     import org.junit.jupiter.api.Test;
     
    +import static org.junit.jupiter.api.Assertions.assertEquals;
    +import static org.junit.jupiter.api.Assertions.assertNotNull;
    +import static org.junit.jupiter.api.Assertions.assertTrue;
    +
     public class BatchArrayQueueTest {
         @Test
    -    public void test_Offer_Next_Offer_Export_Clear() {
    +    public void testOfferNextOfferExportClear() {
             BatchArrayQueue<String> queue = new BatchArrayQueue<>(16, new ReentrantLock());
     
             queue.offer("A");
    @@ -36,21 +39,21 @@ public void test_Offer_Next_Offer_Export_Clear() {
             Queue<String> target = new ArrayDeque<>();
             queue.exportMessagesToBatch(target, batch);
     
    -        Assertions.assertEquals(1, target.size());
    +        assertEquals(1, target.size());
             String targetItem = target.peek();
    -        Assertions.assertNotNull(targetItem);
    -        Assertions.assertTrue(targetItem.startsWith("A"));
    +        assertNotNull(targetItem);
    +        assertTrue(targetItem.startsWith("A"));
     
             queue.clearToBatch(batch);
     
    -        Assertions.assertEquals(1, queue.size());
    +        assertEquals(1, queue.size());
             String queueItem = queue.peek();
    -        Assertions.assertNotNull(queueItem);
    -        Assertions.assertTrue(queueItem.startsWith("B"));
    +        assertNotNull(queueItem);
    +        assertTrue(queueItem.startsWith("B"));
         }
     
         @Test
    -    public void test_Offer_Grow_Poll_Offer() {
    +    public void testOfferGrowPollOffer() {
             BatchArrayQueue<String> queue = new BatchArrayQueue<>(2, new ReentrantLock());
     
             queue.offer("A1");
    @@ -63,22 +66,22 @@ public void test_Offer_Grow_Poll_Offer() {
     
             queue.offer("B1");
     
    -        Assertions.assertEquals(batch, queue.batchOf(0));
    -        Assertions.assertEquals(batch, queue.batchOf(1));
    -        Assertions.assertEquals(batch, queue.batchOf(2));
    -        Assertions.assertEquals(nextBatch, queue.batchOf(3));
    +        assertEquals(batch, queue.batchOf(0));
    +        assertEquals(batch, queue.batchOf(1));
    +        assertEquals(batch, queue.batchOf(2));
    +        assertEquals(nextBatch, queue.batchOf(3));
     
             queue.poll();
             queue.offer("B2");
     
    -        Assertions.assertEquals(batch, queue.batchOf(0));
    -        Assertions.assertEquals(batch, queue.batchOf(1));
    -        Assertions.assertEquals(nextBatch, queue.batchOf(2));
    -        Assertions.assertEquals(nextBatch, queue.batchOf(3));
    +        assertEquals(batch, queue.batchOf(0));
    +        assertEquals(batch, queue.batchOf(1));
    +        assertEquals(nextBatch, queue.batchOf(2));
    +        assertEquals(nextBatch, queue.batchOf(3));
         }
     
         @Test
    -    public void test_Offer_Grow_Next_Offer_Grow_Export_Clear() {
    +    public void testOfferGrowNextOfferGrowExportClear() {
             BatchArrayQueue<String> queue = new BatchArrayQueue<>(2, new ReentrantLock());
     
             queue.offer("A1");
    @@ -94,15 +97,34 @@ public void test_Offer_Grow_Next_Offer_Grow_Export_Clear() {
             Queue<String> target = new ArrayDeque<>();
             queue.exportMessagesToBatch(target, batch);
     
    -        Assertions.assertEquals(3, target.size());
    +        assertEquals(3, target.size());
             for (String element : target) {
    -            Assertions.assertTrue(element.startsWith("A"));
    +            assertTrue(element.startsWith("A"));
             }
     
             queue.clearToBatch(batch);
     
             for (String element : queue) {
    -            Assertions.assertTrue(element.startsWith("B"));
    +            assertTrue(element.startsWith("B"));
             }
         }
    +
    +    @Test
    +    public void testOfferNextOfferClearToCurrent() {
    +        BatchArrayQueue<String> queue = new BatchArrayQueue<>(16, new ReentrantLock());
    +
    +        queue.offer("A");
    +        queue.nextBatch();
    +        queue.offer("B");
    +        long batch = queue.getBatch();
    +        queue.clearToBatch(batch);
    +        assertEquals(0, queue.size());
    +        assertEquals(batch, queue.getBatch());
    +
    +        queue.nextBatch();
    +        queue.offer("C");
    +        queue.clear();
    +        assertEquals(0, queue.size());
    +        assertEquals(1, queue.getBatch());
    +    }
     }
    
  • cometd-java/cometd-java-server/cometd-java-server-http/cometd-java-server-http-tests/src/test/java/org/cometd/server/http/MaxQueuedTest.java · +111 -15 · modified
    @@ -17,54 +17,70 @@
     package org.cometd.server.http;
     
     import java.util.HashMap;
    +import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +import org.cometd.bayeux.Channel;
    +import org.cometd.bayeux.Message;
     import org.cometd.bayeux.Promise;
    +import org.cometd.bayeux.server.ServerMessage;
     import org.cometd.bayeux.server.ServerSession;
    +import org.cometd.common.JettyJSONContextClient;
     import org.cometd.server.AbstractServerTransport;
    +import org.eclipse.jetty.client.CompletableResponseListener;
     import org.eclipse.jetty.client.ContentResponse;
     import org.eclipse.jetty.client.Request;
     import org.junit.jupiter.api.Assertions;
     import org.junit.jupiter.params.ParameterizedTest;
     import org.junit.jupiter.params.provider.MethodSource;
     
    +import static org.junit.jupiter.api.Assertions.assertNotNull;
    +import static org.junit.jupiter.api.Assertions.assertTrue;
    +
     public class MaxQueuedTest extends AbstractBayeuxClientServerTest {
         @ParameterizedTest
         @MethodSource("transports")
    -    public void testMaxQueued(Transport transport) throws Exception {
    +    public void testMaxQueuedThenServerSideRemoveSession(Transport transport) throws Exception {
             int maxQueue = 2;
             Map<String, String> options = new HashMap<>();
             options.put(AbstractServerTransport.MAX_QUEUE_OPTION, String.valueOf(maxQueue));
             // Makes the test simpler: publishes are only sent via /meta/connect.
             options.put(AbstractServerTransport.META_CONNECT_DELIVERY_OPTION, String.valueOf(true));
             startServer(transport, options);
     
    -        Request handshake = newBayeuxRequest("[{" +
    -                                             "\"channel\": \"/meta/handshake\"," +
    -                                             "\"version\": \"1.0\"," +
    -                                             "\"minimumVersion\": \"1.0\"," +
    -                                             "\"supportedConnectionTypes\": [\"long-polling\"]" +
    -                                             "}]");
    +        Request handshake = newBayeuxRequest("""
    +                [{
    +                "channel": "/meta/handshake",
    +                "version": "1.0",
    +                "minimumVersion": "1.0",
    +                "supportedConnectionTypes": ["long-polling"]
    +                }]""");
             ContentResponse response = handshake.send();
             Assertions.assertEquals(200, response.getStatus());
     
             String clientId = extractClientId(response);
     
    -        Request connect1 = newBayeuxRequest("[{" +
    -                                            "\"channel\": \"/meta/connect\"," +
    -                                            "\"clientId\": \"" + clientId + "\"," +
    -                                            "\"connectionType\": \"long-polling\"" +
    -                                            "}]");
    +        Request connect1 = newBayeuxRequest("""
    +                [{
    +                "channel": "/meta/connect",
    +                "clientId": "%s",
    +                "connectionType": "long-polling"
    +                }]""".formatted(clientId));
             response = connect1.send();
             Assertions.assertEquals(200, response.getStatus());
     
             ServerSession serverSession = bayeux.getSession(clientId);
    -        Assertions.assertNotNull(serverSession);
    +        assertNotNull(serverSession);
     
             serverSession.addListener((ServerSession.QueueMaxedListener)(session, queue, sender, message) -> {
                 // Cannot use session.disconnect(), because it will queue the
    -            // disconnect message and invoke this method again, causing a loop.
    +            // disconnect message and invoke this method again, because
    +            // the queue has not been cleared, causing a loop.
                 bayeux.removeSession(session);
    -            return false;
    +            // For this test, accept the overflowing message.
    +            return true;
             });
     
             // Overflow the message queue.
    @@ -75,4 +91,84 @@ public void testMaxQueued(Transport transport) throws Exception {
             // Session should be gone.
             Assertions.assertNull(bayeux.getSession(clientId));
         }
    +
    +    @ParameterizedTest
    +    @MethodSource("transports")
    +    public void testMaxQueuedThenServerSideDisconnect(Transport transport) throws Exception {
    +        int maxQueue = 2;
    +        Map<String, String> options = new HashMap<>();
    +        options.put(AbstractServerTransport.MAX_QUEUE_OPTION, String.valueOf(maxQueue));
    +        // Makes the test simpler: publishes are only sent via /meta/connect.
    +        options.put(AbstractServerTransport.META_CONNECT_DELIVERY_OPTION, String.valueOf(true));
    +        startServer(transport, options);
    +
    +        Request handshake = newBayeuxRequest("""
    +                [{
    +                "channel": "/meta/handshake",
    +                "version": "1.0",
    +                "minimumVersion": "1.0",
    +                "supportedConnectionTypes": ["long-polling"]
    +                }]""");
    +        ContentResponse response = handshake.send();
    +        Assertions.assertEquals(200, response.getStatus());
    +
    +        String clientId = extractClientId(response);
    +
    +        ServerSession serverSession = bayeux.getSession(clientId);
    +        assertNotNull(serverSession);
    +        CountDownLatch suspendedLatch = new CountDownLatch(1);
    +        serverSession.addListener(new ServerSession.HeartBeatListener() {
    +            @Override
    +            public void onSuspended(ServerSession session, ServerMessage message, long timeout) {
    +                suspendedLatch.countDown();
    +            }
    +        });
    +
    +        Request connect1 = newBayeuxRequest("""
    +                [{
    +                "channel": "/meta/connect",
    +                "clientId": "%s",
    +                "connectionType": "long-polling",
    +                "advice": { "timeout": 0 }
    +                }]""".formatted(clientId));
    +        response = connect1.send();
    +        Assertions.assertEquals(200, response.getStatus());
    +
    +        // This /meta/connect should be suspended.
    +        Request connect2 = newBayeuxRequest("""
    +                [{
    +                "channel": "/meta/connect",
    +                "clientId": "%s",
    +                "connectionType": "long-polling"
    +                }]""".formatted(clientId));
    +        CompletableFuture<ContentResponse> response2Completable = new CompletableResponseListener(connect2).send();
    +        assertTrue(suspendedLatch.await(5, TimeUnit.SECONDS));
    +
    +        serverSession.addListener((ServerSession.QueueMaxedListener)(session, queue, sender, message) -> {
    +            // Clear the queue to allow the disconnect
    +            // message to be queued without maxing.
    +            queue.clear();
    +            session.disconnect();
    +            return false;
    +        });
    +
    +        // Overflow the message queue in a batch,
    +        // so that the suspended /meta/connect is not resumed
    +        // when the first message is delivered.
    +        serverSession.batch(() -> {
    +            for (int i = 0; i < maxQueue + 1; ++i) {
    +                serverSession.deliver(null, "/max_queue", "message_" + i, Promise.noop());
    +            }
    +        });
    +
    +        ContentResponse response2 = response2Completable.get(5, TimeUnit.SECONDS);
    +        Assertions.assertEquals(200, response2.getStatus());
    +
    +        // Must have received the /meta/disconnect from the server.
    +        List<Message.Mutable> messages = new JettyJSONContextClient().parse(response2.getContentAsString());
    +        assertTrue(messages.stream().anyMatch(m -> Channel.META_DISCONNECT.equals(m.getChannel())));
    +
    +        // Session should be gone on server.
    +        Assertions.assertNull(bayeux.getSession(clientId));
    +    }
     }
    
  • cometd-java/cometd-java-tests/cometd-java-tests-common/src/test/java/org/cometd/tests/AckExtensionTest.java+70 3 modified
    @@ -18,20 +18,28 @@
     
     import java.util.List;
     import java.util.Map;
    +import java.util.Queue;
     import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.CountDownLatch;
     import java.util.concurrent.TimeUnit;
    +import org.cometd.bayeux.Channel;
    +import org.cometd.bayeux.Message;
     import org.cometd.bayeux.Promise;
    +import org.cometd.bayeux.client.ClientSession;
     import org.cometd.bayeux.client.ClientSessionChannel;
     import org.cometd.bayeux.server.ServerMessage;
     import org.cometd.bayeux.server.ServerSession;
     import org.cometd.client.BayeuxClient;
     import org.cometd.client.ext.AckExtension;
    +import org.cometd.server.AbstractServerTransport;
     import org.cometd.server.ext.AcknowledgedMessagesExtension;
     import org.junit.jupiter.api.Assertions;
     import org.junit.jupiter.params.ParameterizedTest;
     import org.junit.jupiter.params.provider.MethodSource;
     
    +import static org.junit.jupiter.api.Assertions.assertNotNull;
    +import static org.junit.jupiter.api.Assertions.assertTrue;
    +
     public class AckExtensionTest extends AbstractClientServerTest {
         @ParameterizedTest
         @MethodSource("transports")
    @@ -71,16 +79,75 @@ public void onBatchReceive(ServerSession session, long batch) {
                 }
             });
     
    -        Assertions.assertTrue(readyLatch.await(5, TimeUnit.SECONDS));
    +        assertTrue(readyLatch.await(5, TimeUnit.SECONDS));
             String sessionId = client.getId();
             ServerSession serverSession = bayeuxServer.getSession(sessionId);
             Assertions.assertNotNull(serverSession);
     
             // Send a message directly to the client.
             serverSession.deliver(null, channelName, "data", Promise.noop());
     
    -        Assertions.assertTrue(messageLatch.await(5, TimeUnit.SECONDS));
    -        Assertions.assertTrue(batchReceiveLatch.await(5, TimeUnit.SECONDS));
    +        assertTrue(messageLatch.await(5, TimeUnit.SECONDS));
    +        assertTrue(batchReceiveLatch.await(5, TimeUnit.SECONDS));
    +
    +        disconnectBayeuxClient(client);
    +    }
    +
    +    @ParameterizedTest
    +    @MethodSource("transports")
    +    public void testUnacknowledgedMessagesLimit(Transport transport) throws Exception {
    +        int maxQueue = 5;
    +        Map<String, String> options = serverOptions(transport);
    +        options.put(AbstractServerTransport.MAX_QUEUE_OPTION, String.valueOf(maxQueue));
    +        start(transport, options);
    +
    +        CountDownLatch batchQueueMaxedLatch = new CountDownLatch(1);
    +        AcknowledgedMessagesExtension ackExt = new AcknowledgedMessagesExtension();
    +        ackExt.setMaxQueueSize(maxQueue * 2);
    +        ackExt.addListener(new AcknowledgedMessagesExtension.Listener() {
    +            @Override
    +            public void onBatchQueueMaxed(ServerSession session, Queue<ServerMessage> queue) {
    +                batchQueueMaxedLatch.countDown();
    +            }
    +        });
    +        bayeuxServer.addExtension(ackExt);
    +
    +        String channelName = "/unacked-messages-limit";
    +
    +        // Do not add the client-side ack extension, we will handle it manually.
    +        BayeuxClient client = newBayeuxClient(transport);
    +
    +        client.addExtension(new ClientSession.Extension() {
    +            @Override
    +            public boolean sendMeta(ClientSession session, Message.Mutable message) {
    +                if (Channel.META_CONNECT.equals(message.getChannel())) {
    +                    message.getExt(true).put("ack", 1L);
    +                }
    +                return true;
    +            }
    +        });
    +
    +        CountDownLatch readyLatch = new CountDownLatch(1);
    +        client.handshake(Map.of("ext", Map.of("ack", true)), hsReply -> {
    +            if (hsReply.isSuccessful()) {
    +                ClientSessionChannel clientChannel = client.getChannel(channelName);
    +                clientChannel.subscribe((channel, message) -> {
    +                    // Not interested in receiving messages.
    +                }, sbReply -> readyLatch.countDown());
    +            }
    +        });
    +
    +        assertTrue(readyLatch.await(5, TimeUnit.SECONDS));
    +        ServerSession serverSession = bayeuxServer.getSession(client.getId());
    +        assertNotNull(serverSession);
    +
    +        client.batch(() -> {
    +            for (int i = 0; i < maxQueue * 2 + 1; ++i) {
    +                client.getChannel(channelName).publish("data-" + i);
    +            }
    +        });
    +
    +        assertTrue(batchQueueMaxedLatch.await(5, TimeUnit.SECONDS));
     
             disconnectBayeuxClient(client);
         }
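
The last test above pins the client's `/meta/connect` acknowledgement to `1`, caps the extension's queue with `setMaxQueueSize`, and waits for `onBatchQueueMaxed`. The behavior it exercises can be illustrated with a toy model. This is a minimal sketch only: `UnackedQueueSketch`, `Entry`, `offer`, `onConnect` and `simulate` are hypothetical names invented for illustration, and the logic is a simplification, not CometD's `BatchArrayQueue` or `AcknowledgedMessagesExtension` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a per-session unacknowledged-message queue.
// It is NOT CometD's implementation; it only illustrates the growth pattern.
final class UnackedQueueSketch {
    // A queued message tagged with the batch number it was sent in.
    private static final class Entry {
        final long batch;
        final String payload;

        Entry(long batch, String payload) {
            this.batch = batch;
            this.payload = payload;
        }
    }

    private final Deque<Entry> unacked = new ArrayDeque<>();
    private final int maxQueueSize; // a value <= 0 means "no limit" in this sketch
    private long batch = 1;
    private boolean maxed;

    UnackedQueueSketch(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Queues a message in the current batch; refuses it once the cap is reached.
    boolean offer(String payload) {
        if (maxQueueSize > 0 && unacked.size() >= maxQueueSize) {
            maxed = true; // stands in for a "queue maxed" listener callback
            return false;
        }
        unacked.add(new Entry(batch, payload));
        return true;
    }

    // Models one /meta/connect round: drop every entry whose batch is <= ack,
    // then start a new batch for the messages that follow.
    void onConnect(long ack) {
        while (!unacked.isEmpty() && unacked.peek().batch <= ack) {
            unacked.poll();
        }
        batch++;
    }

    int size() {
        return unacked.size();
    }

    boolean isMaxed() {
        return maxed;
    }

    // Runs `rounds` publish/connect cycles against a client that always acks 1.
    static UnackedQueueSketch simulate(int maxQueueSize, int rounds) {
        UnackedQueueSketch queue = new UnackedQueueSketch(maxQueueSize);
        for (int i = 0; i < rounds; i++) {
            queue.offer("data-" + i);
            queue.onConnect(1L);
        }
        return queue;
    }

    public static void main(String[] args) {
        System.out.println("unbounded: " + simulate(0, 1000).size());
        System.out.println("capped:    " + simulate(10, 1000).size());
    }
}
```

In this model only the first batch is ever cleared, so the unbounded run retains one entry per later round, while the capped run stops growing at the limit and flags the condition. That is the role `setMaxQueueSize` and `onBatchQueueMaxed` appear to play in the test above.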
    

Vulnerability mechanics

No source-code context is available for this CVE. Mechanics are only generated when the actual fix diff can be read; without it, the four sections (root cause, attack vector, affected code, fix) would be speculation rather than analysis.

References

6
