Apache Pulsar WebSocket Proxy: Improper Authentication for WebSocket Proxy Endpoint Allows DoS
Description
Improper Authentication vulnerability in Apache Pulsar WebSocket Proxy allows an attacker to connect to the /pingpong endpoint without authentication.
This issue affects Apache Pulsar WebSocket Proxy: from 2.8.0 through 2.8.*, from 2.9.0 through 2.9.*, from 2.10.0 through 2.10.4, from 2.11.0 through 2.11.1, 3.0.0.
The known risks include a denial of service due to the WebSocket Proxy accepting any connections, and excessive data transfer due to misuse of the WebSocket ping/pong feature.
2.10 Pulsar WebSocket Proxy users should upgrade to at least 2.10.5. 2.11 Pulsar WebSocket Proxy users should upgrade to at least 2.11.2. 3.0 Pulsar WebSocket Proxy users should upgrade to at least 3.0.1. 3.1 Pulsar WebSocket Proxy users are unaffected. Any users running the Pulsar WebSocket Proxy for 2.8, 2.9, and earlier should upgrade to one of the above patched versions.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Apache Pulsar WebSocket Proxy's /pingpong endpoint lacks authentication, allowing unauthenticated connections that can lead to denial of service and excessive data transfer.
Vulnerability Description
CVE-2023-37544 is an improper authentication vulnerability in the Apache Pulsar WebSocket Proxy. The /pingpong endpoint does not require any authentication, allowing an attacker to connect without credentials [1][2]. This issue affects Pulsar WebSocket Proxy versions from 2.8.0 through 2.8.x, 2.9.0 through 2.9.x, 2.10.0 through 2.10.4, 2.11.0 through 2.11.1, and 3.0.0 [2].
Attack Vector and Exploitation
The vulnerable endpoint requires no prior authentication, so any client that can reach the WebSocket Proxy's HTTP port can open WebSocket connections to /pingpong (registered as /ws/pingpong and /ws/v2/pingpong) without any credential check [2]. The fix in the referenced commits [3][4] removes the dedicated ping/pong servlet and handler, so these paths are no longer served; ping/pong frames remain supported on the producer, consumer, and reader endpoints, which are built on AbstractWebSocketHandler.
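One way an operator might check whether their own deployment still exposes the endpoint is to attempt an unauthenticated WebSocket upgrade against the two paths that the removed WebSocketPingPongServlet registered. The sketch below uses the JDK's java.net.http client. The class name PingPongExposureCheck, the default host and port (localhost:8080, borrowed from the test in the patch), and the timeouts are assumptions for illustration and are not part of Pulsar.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch only: probes your own proxy for the pre-patch ping/pong paths. */
final class PingPongExposureCheck {

    /** The two servlet paths taken from the removed WebSocketPingPongServlet. */
    static List<URI> candidateUris(String host, int port) {
        return List.of(
                URI.create("ws://" + host + ":" + port + "/ws/pingpong"),
                URI.create("ws://" + host + ":" + port + "/ws/v2/pingpong"));
    }

    /** Returns true when the server completes a WebSocket handshake although no credentials were sent. */
    static boolean acceptsAnonymousUpgrade(HttpClient client, URI uri) {
        try {
            WebSocket ws = client.newWebSocketBuilder()
                    .connectTimeout(Duration.ofSeconds(5))       // assumed timeout
                    .buildAsync(uri, new WebSocket.Listener() { })
                    .get(10, TimeUnit.SECONDS);                  // assumed timeout
            ws.sendClose(WebSocket.NORMAL_CLOSURE, "probe done"); // close politely, send nothing else
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            // Handshake rejected (for example 401 or 404), connection refused, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8080;
        HttpClient client = HttpClient.newHttpClient();
        for (URI uri : candidateUris(host, port)) {
            System.out.println(uri + " anonymous upgrade accepted: " + acceptsAnonymousUpgrade(client, uri));
        }
    }
}
```

On a patched release the expectation is that both probes report false because the paths are no longer registered. A false result can also mean the host was simply unreachable, so treat the output as a hint rather than proof.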
Impact
Successful exploitation allows an attacker to cause a denial of service by exhausting server resources through unauthenticated connections. Additionally, misuse of the WebSocket ping/pong feature can lead to excessive data transfer, potentially impacting network bandwidth and server performance [2].
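To get a rough feel for the data-transfer risk, recall that the removed PingPongHandler answers every ping with a pong carrying the same payload, that RFC 6455 caps control-frame payloads at 125 bytes, and that an unmasked server frame of that size has a 2-byte header. The sketch below turns this into a back-of-envelope estimate of pong egress. The connection count and ping rate are made-up inputs, and TCP/TLS overhead is ignored.

```java
/** Back-of-envelope estimate of pong egress caused by echoed pings; a sketch, not a measurement. */
final class PongTrafficEstimate {

    /** RFC 6455 limits control-frame payloads (ping, pong, close) to 125 bytes. */
    static final int MAX_CONTROL_PAYLOAD = 125;

    /** An unmasked server-to-client frame with a payload of at most 125 bytes has a 2-byte header. */
    static final int SERVER_FRAME_HEADER = 2;

    /** Bytes of pong frames the server sends per second if it echoes every ping it receives. */
    static long pongBytesPerSecond(long connections, long pingsPerSecondPerConnection, int payloadBytes) {
        if (payloadBytes < 0 || payloadBytes > MAX_CONTROL_PAYLOAD) {
            throw new IllegalArgumentException("control-frame payload must be 0..125 bytes");
        }
        return connections * pingsPerSecondPerConnection * (SERVER_FRAME_HEADER + payloadBytes);
    }

    public static void main(String[] args) {
        // Hypothetical load: 1,000 connections, each sending 100 maximum-size pings per second.
        System.out.println(pongBytesPerSecond(1_000, 100, 125) + " bytes/s of pong frames");
    }
}
```

With these hypothetical inputs the estimate is 1,000 × 100 × 127 = 12,700,000 bytes per second of pong frames alone, before counting the cost of holding the connections open.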
Mitigation
The vulnerability is patched in Apache Pulsar WebSocket Proxy versions 2.10.5, 2.11.2, and 3.0.1. Users running an affected version, including 2.8.x and 2.9.x, should upgrade to one of these releases or later. Versions 3.1.0 and above are unaffected [2]. No workarounds have been provided.
AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| org.apache.pulsar:pulsar-websocket (Maven) | < 2.10.5 | 2.10.5 |
| org.apache.pulsar:pulsar-websocket (Maven) | >= 2.11.0, < 2.11.2 | 2.11.2 |
| org.apache.pulsar:pulsar-websocket (Maven) | >= 3.0.0, < 3.0.1 | 3.0.1 |
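The ranges in this table can be checked mechanically. The following is a minimal sketch that maps a pulsar-websocket version string to the patched release listed for its range. The class and method names are hypothetical, and the parser assumes plain numeric versions such as 2.10.4 (a suffix like -SNAPSHOT would need extra handling).

```java
import java.util.Arrays;
import java.util.Optional;

/** Sketch only: maps a pulsar-websocket version to the patched release from the advisory table. */
final class PulsarWsAdvisoryCheck {

    /** Parses "major.minor.patch" into ints; missing parts default to 0, non-numeric parts are not supported. */
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < out.length && i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    /** True when version a sorts strictly before version b. */
    static boolean before(String a, String b) {
        return Arrays.compare(parse(a), parse(b)) < 0;
    }

    /** Returns the patched version for an affected release, or empty when the release is not listed as affected. */
    static Optional<String> patchedVersionFor(String v) {
        if (before(v, "2.10.5")) {
            return Optional.of("2.10.5");
        }
        if (!before(v, "2.11.0") && before(v, "2.11.2")) {
            return Optional.of("2.11.2");
        }
        if (!before(v, "3.0.0") && before(v, "3.0.1")) {
            return Optional.of("3.0.1");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.9.3", "2.10.5", "2.11.1", "3.0.0", "3.1.0"}) {
            String verdict = patchedVersionFor(v)
                    .map(p -> "upgrade to at least " + p)
                    .orElse("not listed as affected");
            System.out.println(v + " -> " + verdict);
        }
    }
}
```

Returning an Optional keeps the "affected?" question and the "which release fixes it?" question in one call: an empty result means the version falls outside every listed range.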
Affected products
- (no CPE) range: 2.8.0-2.8.*, 2.9.0-2.9.*, 2.10.0-2.10.4, 2.11.0-2.11.1, 3.0.0
- (no CPE) range: 2.8.0
Patches
11ee36d03516 [fix][ws] Remove unnecessary ping/pong implementation (#20733)
7 files changed · +58 −130
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +0 −7 modified
@@ -169,7 +169,6 @@
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -1029,12 +1028,6 @@ private void addWebSocketServiceHandler(WebService webService,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
         }
     }
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +0 −7 modified
@@ -49,7 +49,6 @@
 import org.apache.pulsar.common.util.DirectMemoryUtils;
 import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -307,12 +306,6 @@ public static void addWebServerHandlers(WebServer server,
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet));
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet));
         }
     }
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +0 −12 modified
@@ -76,18 +76,6 @@ private String computeWsBasePath() {
         return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
     }

-    @Test
-    public void testEnableWebSocketServer() throws Exception {
-        HttpClient httpClient = new HttpClient();
-        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
-        webSocketClient.start();
-        MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = computeWsBasePath() + "/pingpong";
-        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
-        System.out.println("uri" + webSocketUri);
-        sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
-        assertTrue(myWebSocket.getResponse().contains("ping"));
-    }

     @Test
     public void testProducer() throws Exception {
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java +0 −50 removed
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener {
-
-    private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class);
-
-    @Override
-    public void onWebSocketPing(ByteBuffer payload) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("PING: {}", BufferUtil.toDetailString(payload));
-            }
-            getRemote().sendPong(payload);
-        } catch (IOException e) {
-            log.warn("Failed to send pong: {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void onWebSocketPong(ByteBuffer payload) {
-
-    }
-
-}
\ No newline at end of file
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +0 −4 modified
@@ -28,7 +28,6 @@
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -90,16 +89,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
                 new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
                 new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                 new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                new WebSocketPingPongServlet(service));

         proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
         proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java +0 −44 removed
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-
-public class WebSocketPingPongServlet extends WebSocketServlet {
-    private static final long serialVersionUID = 1L;
-
-    public static final String SERVLET_PATH = "/ws/pingpong";
-    public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";
-
-    private final transient WebSocketService service;
-
-    public WebSocketPingPongServlet(WebSocketService service) {
-        this.service = service;
-    }
-
-    @Override
-    public void configure(WebSocketServletFactory factory) {
-        factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
-        if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
-            factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
-        }
-        factory.setCreator((request, response) -> new PingPongHandler());
-    }
-}
\ No newline at end of file
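The configure method in the removed servlet installs a creator that returns a new PingPongHandler for every upgrade request without consulting any credentials. The simplified model below contrasts that shape with a creator that only hands out a handler after a credential check. All types and names here (Request, gatedCreator, the handler labels) are hypothetical stand-ins; this is not Pulsar or Jetty API and not how Pulsar's remaining handlers are implemented.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Simplified model of an upgrade "creator"; a sketch under stated assumptions, not Pulsar code. */
final class CreatorGateSketch {

    /** Hypothetical stand-in for an upgrade request: only the presented token matters here. */
    static final class Request {
        final String token; // null when the client sent no credentials

        Request(String token) {
            this.token = token;
        }
    }

    /** Shape of the removed servlet's creator: every request gets a handler. */
    static Function<Request, Optional<String>> ungatedCreator() {
        return request -> Optional.of("PingPongHandler");
    }

    /** Gated shape: a handler is created only when the token passes the supplied check. */
    static Function<Request, Optional<String>> gatedCreator(Predicate<String> tokenIsValid) {
        return request -> request.token != null && tokenIsValid.test(request.token)
                ? Optional.of("AuthenticatedHandler")
                : Optional.empty();
    }

    public static void main(String[] args) {
        Request anonymous = new Request(null);
        Request withToken = new Request("secret");
        Predicate<String> check = "secret"::equals; // toy check for the demo

        System.out.println("ungated, anonymous: " + ungatedCreator().apply(anonymous).isPresent());
        System.out.println("gated, anonymous:   " + gatedCreator(check).apply(anonymous).isPresent());
        System.out.println("gated, with token:  " + gatedCreator(check).apply(withToken).isPresent());
    }
}
```

The patch itself takes the simpler route of deleting the ungated endpoint altogether rather than adding a gate to it.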
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java +58 −6 renamed
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.websocket;

+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Server;
@@ -40,11 +43,18 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

-public class PingPongHandlerTest {
+/**
+ * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
+ */
+public class PingPongSupportTest {

     private static Server server;

@@ -67,9 +77,9 @@ public static void setup() throws Exception {
         when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
         when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

-        ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
+        ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service));
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
+        context.setContextPath("/ws");
         context.addServlet(servletHolder, "/*");
         server.setHandler(context);
         try {
@@ -87,18 +97,60 @@ public static void tearDown() throws Exception {
         executor.stop();
     }

-    @Test
-    public void testPingPong() throws Exception {
+    /**
+     * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
+     * not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
+     */
+    @DataProvider(name = "endpoint")
+    public static Object[][] cacheEnable() {
+        return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
+    }
+
+    @Test(dataProvider = "endpoint")
+    public void testPingPong(String endpoint) throws Exception {
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
         MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic";
         Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
         sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
         assertTrue(myWebSocket.getResponse().contains("test"));
     }

+    public static class GenericWebSocketHandler extends AbstractWebSocketHandler {
+
+        public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        @Override
+        protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+
+    public static class GenericWebSocketServlet extends WebSocketServlet {
+
+        private static final long serialVersionUID = 1L;
+        private final WebSocketService service;
+
+        public GenericWebSocketServlet(WebSocketService service) {
+            this.service = service;
+        }
+
+        @Override
+        public void configure(WebSocketServletFactory factory) {
+            factory.setCreator((request, response) ->
+                    new GenericWebSocketHandler(service, request.getHttpServletRequest(), response));
+        }
+    }
+
     @WebSocket
     public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {
894192fb6542 [fix][ws] Remove unnecessary ping/pong implementation (#20733)
7 files changed · +58 −130
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +0 −7 modified
@@ -173,7 +173,6 @@
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -1072,12 +1071,6 @@ private void addWebSocketServiceHandler(WebService webService,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
         }
     }
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +0 −7 modified
@@ -51,7 +51,6 @@
 import org.apache.pulsar.common.util.ShutdownUtil;
 import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -315,12 +314,6 @@ public static void addWebServerHandlers(WebServer server,
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet));
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet));
         }
     }
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +0 −12 modified
@@ -76,18 +76,6 @@ private String computeWsBasePath() {
         return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
     }

-    @Test
-    public void testEnableWebSocketServer() throws Exception {
-        HttpClient httpClient = new HttpClient();
-        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
-        webSocketClient.start();
-        MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = computeWsBasePath() + "/pingpong";
-        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
-        System.out.println("uri" + webSocketUri);
-        sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
-        assertTrue(myWebSocket.getResponse().contains("ping"));
-    }

     @Test
     public void testProducer() throws Exception {
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java +0 −50 removed
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener {
-
-    private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class);
-
-    @Override
-    public void onWebSocketPing(ByteBuffer payload) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("PING: {}", BufferUtil.toDetailString(payload));
-            }
-            getRemote().sendPong(payload);
-        } catch (IOException e) {
-            log.warn("Failed to send pong: {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void onWebSocketPong(ByteBuffer payload) {
-
-    }
-
-}
\ No newline at end of file
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +0 −4 modified
@@ -29,7 +29,6 @@
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.common.util.ShutdownUtil;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -91,16 +90,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
                 new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
                 new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                 new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                new WebSocketPingPongServlet(service));

         proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
         proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java +0 −44 removed
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-
-public class WebSocketPingPongServlet extends WebSocketServlet {
-    private static final long serialVersionUID = 1L;
-
-    public static final String SERVLET_PATH = "/ws/pingpong";
-    public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";
-
-    private final transient WebSocketService service;
-
-    public WebSocketPingPongServlet(WebSocketService service) {
-        this.service = service;
-    }
-
-    @Override
-    public void configure(WebSocketServletFactory factory) {
-        factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
-        if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
-            factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
-        }
-        factory.setCreator((request, response) -> new PingPongHandler());
-    }
-}
\ No newline at end of file
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java +58 −6 renamed
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.websocket;

+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Server;
@@ -40,11 +43,18 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

-public class PingPongHandlerTest {
+/**
+ * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
+ */
+public class PingPongSupportTest {

     private static Server server;

@@ -67,9 +77,9 @@ public static void setup() throws Exception {
         when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
         when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

-        ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
+        ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service));
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
+        context.setContextPath("/ws");
         context.addServlet(servletHolder, "/*");
         server.setHandler(context);
         try {
@@ -87,18 +97,60 @@ public static void tearDown() throws Exception {
         executor.stop();
     }

-    @Test
-    public void testPingPong() throws Exception {
+    /**
+     * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
+     * not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
+     */
+    @DataProvider(name = "endpoint")
+    public static Object[][] cacheEnable() {
+        return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
+    }
+
+    @Test(dataProvider = "endpoint")
+    public void testPingPong(String endpoint) throws Exception {
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
         MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic";
         Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
         sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
         assertTrue(myWebSocket.getResponse().contains("test"));
     }

+    public static class GenericWebSocketHandler extends AbstractWebSocketHandler {
+
+        public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        @Override
+        protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+
+    public static class GenericWebSocketServlet extends WebSocketServlet {
+
+        private static final long serialVersionUID = 1L;
+        private final WebSocketService service;
+
+        public GenericWebSocketServlet(WebSocketService service) {
+            this.service = service;
+        }
+
+        @Override
+        public void configure(WebSocketServletFactory factory) {
+            factory.setCreator((request, response) ->
+                    new GenericWebSocketHandler(service, request.getHttpServletRequest(), response));
+        }
+    }
+
     @WebSocket
     public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {
eac263e8f2a9 [fix][ws] Remove unnecessary ping/pong implementation (#20733)
7 files changed · +58 −130
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +0 −7 modified
@@ -167,7 +167,6 @@
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -968,12 +967,6 @@ private void addWebSocketServiceHandler(WebService webService,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
             webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet), true, attributeMap);
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
-            webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
         }
     }
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +0 −7 modified
@@ -51,7 +51,6 @@
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.proxy.stats.ProxyStats;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -303,12 +302,6 @@ public static void addWebServerHandlers(WebServer server,
                     new ServletHolder(readerWebSocketServlet));
             server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                     new ServletHolder(readerWebSocketServlet));
-
-            final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
-                    new ServletHolder(pingPongWebSocketServlet));
-            server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                    new ServletHolder(pingPongWebSocketServlet));
         }
     }
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +0 −12 modified
@@ -79,18 +79,6 @@ private String computeWsBasePath() {
         return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
     }

-    @Test
-    public void testEnableWebSocketServer() throws Exception {
-        HttpClient httpClient = new HttpClient();
-        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
-        webSocketClient.start();
-        MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = computeWsBasePath() + "/pingpong";
-        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
-        System.out.println("uri" + webSocketUri);
-        sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
-        assertTrue(myWebSocket.getResponse().contains("ping"));
-    }
     @Test
     public void testProducer() throws Exception {
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java +0 −50 removed
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.eclipse.jetty.util.BufferUtil;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener {
-
-    private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class);
-
-    @Override
-    public void onWebSocketPing(ByteBuffer payload) {
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("PING: {}", BufferUtil.toDetailString(payload));
-            }
-            getRemote().sendPong(payload);
-        } catch (IOException e) {
-            log.warn("Failed to send pong: {}", e.getMessage());
-        }
-    }
-
-    @Override
-    public void onWebSocketPong(ByteBuffer payload) {
-
-    }
-
-}
\ No newline at end of file
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +0 −4 modified
@@ -28,7 +28,6 @@
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
-import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
@@ -90,16 +89,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

         proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
                 new WebSocketProducerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
                 new WebSocketConsumerServlet(service));
         proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
                 new WebSocketReaderServlet(service));
-        proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
-                new WebSocketPingPongServlet(service));

         proxyServer.addRestResources(ADMIN_PATH_V1, WebSocketProxyStatsV1.class.getPackage().getName(),
                 ATTRIBUTE_PROXY_SERVICE_NAME, service);
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java +0 −44 removed
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.websocket;
-
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-
-public class WebSocketPingPongServlet extends WebSocketServlet {
-    private static final long serialVersionUID = 1L;
-
-    public static final String SERVLET_PATH = "/ws/pingpong";
-    public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";
-
-    private final transient WebSocketService service;
-
-    public WebSocketPingPongServlet(WebSocketService service) {
-        this.service = service;
-    }
-
-    @Override
-    public void configure(WebSocketServletFactory factory) {
-        factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
-        if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
-            factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
-        }
-        factory.setCreator((request, response) -> new PingPongHandler());
-    }
-}
\ No newline at end of file
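The creator in the removed servlet returned a `PingPongHandler` directly, with no credential check in front of it. As a rough illustration of why that matters, the sketch below models a WebSocket route table in plain Java: a path registered without an authentication gate accepts every upgrade request, while a gated path refuses requests that carry no valid credentials. `RouteTableSketch`, `register`, `accepts`, and the token comparison are hypothetical names invented for this example; none of this is Pulsar's or Jetty's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of a WebSocket route table; not Pulsar or Jetty code.
final class RouteTableSketch {
    // Maps a servlet path to the check applied before a handler is created.
    private final Map<String, Predicate<String>> gates = new HashMap<>();

    // Registers a path whose upgrade requests must pass the given credential check.
    void register(String path, Predicate<String> credentialCheck) {
        gates.put(path, credentialCheck);
    }

    // Returns true when an upgrade request for the path would be accepted.
    // Paths that were never registered are always refused.
    boolean accepts(String path, String token) {
        Predicate<String> gate = gates.get(path);
        return gate != null && gate.test(token);
    }

    public static void main(String[] args) {
        // Assumed stand-in for a real authentication provider.
        Predicate<String> requireToken = token -> "secret".equals(token);
        // Models the removed servlet: its creator never looked at credentials.
        Predicate<String> noCheck = token -> true;

        RouteTableSketch beforeFix = new RouteTableSketch();
        beforeFix.register("/ws/v2/producer", requireToken);
        beforeFix.register("/ws/pingpong", noCheck);

        RouteTableSketch afterFix = new RouteTableSketch();
        afterFix.register("/ws/v2/producer", requireToken);

        // An anonymous client (no token) tries each table.
        System.out.println(beforeFix.accepts("/ws/pingpong", null));
        System.out.println(afterFix.accepts("/ws/pingpong", null));
        System.out.println(afterFix.accepts("/ws/v2/producer", null));
        System.out.println(afterFix.accepts("/ws/v2/producer", "secret"));
    }
}
```

In this model the fix corresponds to dropping the ungated registration rather than adding a check to it, which matches the shape of the diffs above: the `/ws/pingpong` and `/ws/v2/pingpong` registrations are deleted outright.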
pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java +58 −6 renamed
@@ -18,13 +18,16 @@
  */
 package org.apache.pulsar.websocket;

+import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Future;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.web.WebExecutorThreadPool;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Server;
@@ -40,11 +43,18 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertTrue;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

-public class PingPongHandlerTest {
+/**
+ * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
+ */
+public class PingPongSupportTest {

     private static Server server;

@@ -67,9 +77,9 @@ public static void setup() throws Exception {
         when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
         when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

-        ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
+        ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service));
         ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
+        context.setContextPath("/ws");
         context.addServlet(servletHolder, "/*");
         server.setHandler(context);
         try {
@@ -87,18 +97,60 @@ public static void tearDown() throws Exception {
         executor.stop();
     }

-    @Test
-    public void testPingPong() throws Exception {
+    /**
+     * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
+     * not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
+     */
+    @DataProvider(name = "endpoint")
+    public static Object[][] cacheEnable() {
+        return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
+    }
+
+    @Test(dataProvider = "endpoint")
+    public void testPingPong(String endpoint) throws Exception {
         HttpClient httpClient = new HttpClient();
         WebSocketClient webSocketClient = new WebSocketClient(httpClient);
         webSocketClient.start();
         MyWebSocket myWebSocket = new MyWebSocket();
-        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic";
         Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
         sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
         assertTrue(myWebSocket.getResponse().contains("test"));
     }

+    public static class GenericWebSocketHandler extends AbstractWebSocketHandler {
+
+        public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
+            super(service, request, response);
+        }
+
+        @Override
+        protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+
+    public static class GenericWebSocketServlet extends WebSocketServlet {
+
+        private static final long serialVersionUID = 1L;
+        private final WebSocketService service;
+
+        public GenericWebSocketServlet(WebSocketService service) {
+            this.service = service;
+        }
+
+        @Override
+        public void configure(WebSocketServletFactory factory) {
+            factory.setCreator((request, response) ->
+                    new GenericWebSocketHandler(service, request.getHttpServletRequest(), response));
+        }
+    }
+
     @WebSocket
     public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {
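The renamed test sends a ping with the payload "test" and asserts that the response contains the same bytes. That mirrors the WebSocket protocol rule (RFC 6455, section 5.5.3) that a pong carries the application data of the ping it answers, with control-frame payloads capped at 125 bytes. The sketch below builds such a frame by hand, assuming an unmasked server-to-client frame. `PongFrameSketch`, `encodePong`, and `payloadOf` are names made up for this example and are not part of Jetty or Pulsar, where the framework does this work.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative encoder for a single unmasked WebSocket pong frame (RFC 6455).
final class PongFrameSketch {
    // FIN bit (0x80) combined with the pong opcode (0xA).
    private static final int FIN_AND_PONG_OPCODE = 0x8A;
    // Control frames may carry at most 125 bytes of payload.
    private static final int MAX_CONTROL_PAYLOAD = 125;

    // Builds the pong that answers a ping with the given payload.
    static byte[] encodePong(byte[] pingPayload) {
        if (pingPayload.length > MAX_CONTROL_PAYLOAD) {
            throw new IllegalArgumentException("control frame payload exceeds 125 bytes");
        }
        byte[] frame = new byte[2 + pingPayload.length];
        frame[0] = (byte) FIN_AND_PONG_OPCODE;
        // Mask bit is clear, so the second byte is just the payload length.
        frame[1] = (byte) pingPayload.length;
        // The pong echoes the ping's application data unchanged.
        System.arraycopy(pingPayload, 0, frame, 2, pingPayload.length);
        return frame;
    }

    // Extracts the payload from a frame produced by encodePong.
    static byte[] payloadOf(byte[] frame) {
        return Arrays.copyOfRange(frame, 2, frame.length);
    }

    public static void main(String[] args) {
        byte[] ping = "test".getBytes(StandardCharsets.UTF_8);
        byte[] pong = encodePong(ping);
        System.out.println(pong.length);
        System.out.println(new String(payloadOf(pong), StandardCharsets.UTF_8));
    }
}
```

Because every ping obliges the server to send back a frame of the same payload size, an endpoint that accepts pings from unauthenticated clients gives them a cheap way to generate outbound traffic, which is one plausible reading of the "excessive data transfer" risk named in the advisory.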
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
- github.com/advisories/GHSA-83q5-whqp-r8jr (ghsa, ADVISORY)
- lists.apache.org/thread/od0k9zts1toc9h9snbqq4pjpyx28mv4m (ghsa, vendor-advisory, WEB)
- nvd.nist.gov/vuln/detail/CVE-2023-37544 (ghsa, ADVISORY)
- www.openwall.com/lists/oss-security/2023/12/20/2 (ghsa, WEB)
- github.com/apache/pulsar/commit/11ee36d0351644a006d2a8639bdcc714fb602358 (ghsa, WEB)
- github.com/apache/pulsar/commit/894192fb6542e504be43034a3c33e90f9c6e528a (ghsa, WEB)
- github.com/apache/pulsar/commit/eac263e8f2a93d3b9f707b97c7bbcbc2a826569f (ghsa, WEB)