High severity · GHSA Advisory · Published Sep 11, 2025 · Updated Apr 15, 2026
CVE-2025-10193
Description
DNS rebinding vulnerability in Neo4j Cypher MCP server allows malicious websites to bypass Same-Origin Policy protections and execute unauthorised tool invocations against locally running Neo4j MCP instances. The attack relies on the user being enticed to visit a malicious website and spend sufficient time there for DNS rebinding to succeed.
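To make the mechanism concrete: once the attacker's hostname has been rebound to 127.0.0.1, the browser treats requests to the local server as same-origin, but it still sends the attacker's hostname in the `Host` header. A server that rejects unexpected `Host` values can therefore refuse the request. The following is a minimal standard-library sketch of that check. The function names are hypothetical and this is not the code used by the patch, which relies on Starlette's `TrustedHostMiddleware`.

```python
from typing import Iterable

# Defaults mirror the patched server's documented default allowed hosts.
DEFAULT_ALLOWED_HOSTS = ("localhost", "127.0.0.1")


def host_without_port(host_header: str) -> str:
    """Return the host part of a Host header value, lower-cased, without the port."""
    host = host_header.strip().lower()
    if host.startswith("["):
        # Bracketed IPv6 literal such as "[::1]:8000"; keep the brackets.
        return host[: host.find("]") + 1]
    return host.split(":", 1)[0]


def is_host_allowed(
    host_header: str, allowed_hosts: Iterable[str] = DEFAULT_ALLOWED_HOSTS
) -> bool:
    """Illustrative exact-match check (hypothetical helper, no wildcard support)."""
    host = host_without_port(host_header)
    return any(host == allowed.lower() for allowed in allowed_hosts)


if __name__ == "__main__":
    for header in ("localhost:8001", "127.0.0.1:8001", "malicious-site.evil"):
        verdict = "accept" if is_host_allowed(header) else "reject"
        print(f"{header}: {verdict}")
```

In this sketch a rebound request carrying `Host: malicious-site.evil` is rejected even though it arrives on the loopback interface, which is the property the fix depends on.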
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| mcp-neo4j-cypher (PyPI) | >= 0.2.2, < 0.4.0 | 0.4.0 |
Affected products
- Range: >= 0.2.2, < 0.4.0
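A quick way to check whether a local installation falls inside the affected range is sketched below. It assumes plain dotted numeric versions such as `0.3.1` (pre-release suffixes are not handled), and the helper names are hypothetical. `importlib.metadata.version` is the standard-library call for reading an installed distribution's version.

```python
from importlib import metadata
from typing import Optional, Tuple

AFFECTED_MIN = (0, 2, 2)  # inclusive lower bound from the advisory
PATCHED = (0, 4, 0)       # first patched release


def parse_version(text: str) -> Tuple[int, ...]:
    """Parse a dotted numeric version; assumes no pre-release suffix."""
    return tuple(int(part) for part in text.strip().split("."))


def is_affected(version: str) -> bool:
    """True if version is in the range >= 0.2.2, < 0.4.0."""
    return AFFECTED_MIN <= parse_version(version) < PATCHED


def installed_version(package: str = "mcp-neo4j-cypher") -> Optional[str]:
    """Return the installed version of the package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    if found is None:
        print("mcp-neo4j-cypher is not installed")
    else:
        print(f"{found}: {'affected' if is_affected(found) else 'not affected'}")
```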
Patches
- 5b9fbdda6401: cypher - add middleware (#165)
12 files changed · +673 −25
servers/mcp-neo4j-cypher/CHANGELOG.md · +3 −0 · modified
@@ -7,6 +7,9 @@
 ### Added
 * Added Cypher result sanitation function from Neo4j GraphRAG that removes embedding values from the result
+* Add env variable `NEO4J_MCP_SERVER_ALLOW_ORIGINS` and cli variable `--allow-origins` to configure CORS Middleware for remote deployments
+* Add env variable `NEO4J_MCP_SERVER_ALLOWED_HOSTS` and cli variable `--allowed-hosts` to configure Trusted Hosts Middleware for remote deployments
+* Update HTTP and SSE transports to use security middleware
 * Added `read_neo4j_cypher` query timeout configuration via `--read-timeout` CLI parameter and `NEO4J_READ_TIMEOUT` environment variable (defaults to 30 seconds)
 * Add response token limit for read Cypher responses
servers/mcp-neo4j-cypher/docker-compose.yml · +2 −0 · modified
@@ -25,6 +25,8 @@ services:
       - NEO4J_MCP_SERVER_PORT=8000
       - NEO4J_MCP_SERVER_PATH=/api/mcp/
       - NEO4J_NAMESPACE=local
+      - NEO4J_MCP_SERVER_ALLOWED_HOSTS=localhost,127.0.0.1
+      - NEO4J_MCP_SERVER_ALLOW_ORIGINS=""
     depends_on:
       - neo4j
servers/mcp-neo4j-cypher/Dockerfile · +3 −1 · modified
@@ -26,10 +26,12 @@ ENV NEO4J_USERNAME="neo4j"
 ENV NEO4J_PASSWORD="password"
 ENV NEO4J_DATABASE="neo4j"
 ENV NEO4J_NAMESPACE=""
-ENV NEO4J_TRANSPORT="stdio"
+ENV NEO4J_TRANSPORT="http"
 ENV NEO4J_MCP_SERVER_HOST="0.0.0.0"
 ENV NEO4J_MCP_SERVER_PORT="8000"
 ENV NEO4J_MCP_SERVER_PATH="/api/mcp/"
+ENV NEO4J_MCP_SERVER_ALLOW_ORIGINS=""
+ENV NEO4J_MCP_SERVER_ALLOWED_HOSTS="localhost,127.0.0.1"
 EXPOSE 8000
servers/mcp-neo4j-cypher/Makefile · +1 −1 · modified
@@ -8,7 +8,7 @@ test-integration:
 	uv run pytest tests/integration/ -v

 test-http:
-	uv run pytest tests/integration/test_http_transport.py -v
+	uv run pytest tests/integration/test_http_transport_IT.py -v

 test-all:
 	uv run pytest tests/ -v
servers/mcp-neo4j-cypher/manifest.json · +18 −0 · modified
@@ -32,6 +32,8 @@
         "NEO4J_MCP_SERVER_HOST": "${user_config.mcp_server_host}",
         "NEO4J_MCP_SERVER_PORT": "${user_config.mcp_server_port}",
         "NEO4J_MCP_SERVER_PATH": "${user_config.mcp_server_path}",
+        "NEO4J_MCP_SERVER_ALLOW_ORIGINS": "${user_config.mcp_server_allow_origins}",
+        "NEO4J_MCP_SERVER_ALLOWED_HOSTS": "${user_config.mcp_server_allowed_hosts}",
         "NEO4J_RESPONSE_TOKEN_LIMIT": "${user_config.token_limit}",
         "NEO4J_READ_TIMEOUT": "${user_config.read_timeout}"
       }
@@ -127,6 +129,14 @@
       "required": false,
       "sensitive": false
     },
+    "mcp_server_allow_origins": {
+      "type": "string",
+      "title": "MCP Server Allow Origins",
+      "description": "The allowed origins for the MCP server as a comma-separated list, if not using stdio. Defaults to no allowed origins",
+      "default": "",
+      "required": false,
+      "sensitive": false
+    },
     "token_limit": {
       "type": "number",
       "title": "Response token limit",
@@ -135,6 +145,14 @@
       "required": false,
       "sensitive": false
     },
+    "mcp_server_allowed_hosts": {
+      "type": "string",
+      "title": "MCP Server Allowed Hosts",
+      "description": "The allowed hosts for the MCP server as a comma-separated list, if not using stdio. Defaults to localhost and 127.0.0.1",
+      "default": "localhost,127.0.0.1",
+      "required": false,
+      "sensitive": false
+    },
     "read_timeout": {
       "type": "number",
       "title": "Read timeout",
servers/mcp-neo4j-cypher/README.md+97 −19 modified@@ -138,6 +138,61 @@ Choose your transport based on use case: - **Remote deployment**: Use `http` - **Legacy web clients**: Use `sse` +## 🔒 Security Protection + +The server includes comprehensive security protection with **secure defaults** that protect against common web-based attacks while preserving full MCP functionality when using HTTP transport. + +### 🛡️ DNS Rebinding Protection + +**TrustedHost Middleware** validates Host headers to prevent DNS rebinding attacks: + +**Secure by Default:** +- Only `localhost` and `127.0.0.1` hosts are allowed by default +- Malicious websites cannot trick browsers into accessing your local server + +**Environment Variable:** +```bash +export NEO4J_MCP_SERVER_ALLOWED_HOSTS="example.com,www.example.com" +``` + +### 🌐 CORS Protection + +**Cross-Origin Resource Sharing (CORS)** protection blocks browser-based requests by default: + +**Environment Variable:** +```bash +export NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://example.com,https://example.com" +``` + +### 🔧 Complete Security Configuration + +**Development Setup:** +```bash +mcp-neo4j-cypher --transport http \ + --allowed-hosts "localhost,127.0.0.1" \ + --allow-origins "http://localhost:3000" +``` + +**Production Setup:** +```bash +mcp-neo4j-cypher --transport http \ + --allowed-hosts "example.com,www.example.com" \ + --allow-origins "https://example.com,https://example.com" +``` + + +### 🚨 Security Best Practices + +**For `allow_origins`:** +- Be specific: `["https://example.com", "https://example.com"]` +- Never use `"*"` in production with credentials +- Use HTTPS origins in production + +**For `allowed_hosts`:** +- Include your actual domain: `["example.com", "www.example.com"]` +- Include localhost only for development +- Never use `"*"` unless you understand the risks + ## 🔧 Usage with Claude Desktop ### Using DXT @@ -148,7 +203,7 @@ Download the latest `.dxt` file from the [releases page](https://github.com/neo4 Can be 
found on PyPi https://pypi.org/project/mcp-neo4j-cypher/ -Add the server to your `claude_desktop_config.json` with the database connection configuration through environment variables. You may also specify the transport method and namespace with cli arguments or environment variables. +Add the server to your `claude_desktop_config.json` with the database connection configuration through environment variables. You may also specify the transport method, namespace and other config variables with cli arguments or environment variables. ```json { @@ -169,17 +224,24 @@ Add the server to your `claude_desktop_config.json` with the database connection ### 🌐 HTTP Transport Configuration -For custom HTTP configurations beyond the defaults: +For custom HTTP configurations with security middleware: ```bash -# Custom HTTP configuration -mcp-neo4j-cypher --transport http --server-host 127.0.0.1 --server-port 8080 --server-path /api/mcp/ - -# Or using environment variables +# Complete HTTP configuration with security +mcp-neo4j-cypher --transport http \ + --server-host 127.0.0.1 \ + --server-port 8080 \ + --server-path /api/mcp/ \ + --allowed-hosts "localhost,127.0.0.1,example.com" \ + --allow-origins "https://yourapp.com" + +# Using environment variables export NEO4J_TRANSPORT=http export NEO4J_MCP_SERVER_HOST=127.0.0.1 export NEO4J_MCP_SERVER_PORT=8080 export NEO4J_MCP_SERVER_PATH=/api/mcp/ +export NEO4J_MCP_SERVER_ALLOWED_HOSTS="localhost,127.0.0.1,example.com" +export NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://yourapp.com" mcp-neo4j-cypher ``` @@ -310,23 +372,39 @@ docker run --rm -p 8000:8000 \ -e NEO4J_MCP_SERVER_PORT="8000" \ -e NEO4J_MCP_SERVER_PATH="/mcp/" \ mcp/neo4j-cypher:latest + +# Run with security middleware for production +docker run --rm -p 8000:8000 \ + -e NEO4J_URI="bolt://host.docker.internal:7687" \ + -e NEO4J_USERNAME="neo4j" \ + -e NEO4J_PASSWORD="password" \ + -e NEO4J_DATABASE="neo4j" \ + -e NEO4J_TRANSPORT="http" \ + -e NEO4J_MCP_SERVER_HOST="0.0.0.0" \ + -e 
NEO4J_MCP_SERVER_PORT="8000" \ + -e NEO4J_MCP_SERVER_PATH="/mcp/" \ + -e NEO4J_MCP_SERVER_ALLOWED_HOSTS="example.com,www.example.com" \ + -e NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://example.com" \ + mcp/neo4j-cypher:latest ``` ### 🔧 Environment Variables -| Variable | Default | Description | -| ----------------------------- | --------------------------------------- | ---------------------------------------------- | -| `NEO4J_URI` | `bolt://localhost:7687` | Neo4j connection URI | -| `NEO4J_USERNAME` | `neo4j` | Neo4j username | -| `NEO4J_PASSWORD` | `password` | Neo4j password | -| `NEO4J_DATABASE` | `neo4j` | Neo4j database name | -| `NEO4J_TRANSPORT` | `stdio` (local), `http` (remote) | Transport protocol (`stdio`, `http`, or `sse`) | -| `NEO4J_NAMESPACE` | _(empty)_ | Tool namespace prefix | -| `NEO4J_MCP_SERVER_HOST` | `127.0.0.1` (local) | Host to bind to | -| `NEO4J_MCP_SERVER_PORT` | `8000` | Port for HTTP/SSE transport | -| `NEO4J_MCP_SERVER_PATH` | `/api/mcp/` | Path for accessing MCP server | -| `NEO4J_RESPONSE_TOKEN_LIMIT` | _(none)_ | Maximum tokens for read query responses | -| `NEO4J_READ_TIMEOUT` | `30` | Timeout in seconds for read queries | +| Variable | Default | Description | +| ---------------------------------- | --------------------------------------- | -------------------------------------------------- | +| `NEO4J_URI` | `bolt://localhost:7687` | Neo4j connection URI | +| `NEO4J_USERNAME` | `neo4j` | Neo4j username | +| `NEO4J_PASSWORD` | `password` | Neo4j password | +| `NEO4J_DATABASE` | `neo4j` | Neo4j database name | +| `NEO4J_TRANSPORT` | `stdio` (local), `http` (remote) | Transport protocol (`stdio`, `http`, or `sse`) | +| `NEO4J_NAMESPACE` | _(empty)_ | Tool namespace prefix | +| `NEO4J_MCP_SERVER_HOST` | `127.0.0.1` (local) | Host to bind to | +| `NEO4J_MCP_SERVER_PORT` | `8000` | Port for HTTP/SSE transport | +| `NEO4J_MCP_SERVER_PATH` | `/api/mcp/` | Path for accessing MCP server | +| `NEO4J_MCP_SERVER_ALLOW_ORIGINS` | _(empty - 
secure by default)_ | Comma-separated list of allowed CORS origins | +| `NEO4J_MCP_SERVER_ALLOWED_HOSTS` | `localhost,127.0.0.1` | Comma-separated list of allowed hosts (DNS rebinding protection) | +| `NEO4J_RESPONSE_TOKEN_LIMIT` | _(none)_ | Maximum tokens for read query responses | +| `NEO4J_READ_TIMEOUT` | `30` | Timeout in seconds for read queries | ### 🌐 SSE Transport for Legacy Web Access
servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/__init__.py · +10 −0 · modified
@@ -21,6 +21,16 @@ def main():
     )
     parser.add_argument("--server-host", default=None, help="Server host")
     parser.add_argument("--server-port", default=None, help="Server port")
+    parser.add_argument(
+        "--allow-origins",
+        default=None,
+        help="Allow origins for remote servers (comma-separated list)",
+    )
+    parser.add_argument(
+        "--allowed-hosts",
+        default=None,
+        help="Allowed hosts for DNS rebinding protection on remote servers(comma-separated list)",
+    )
     parser.add_argument(
         "--read-timeout",
         type=int,
servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/server.py · +22 −3 · modified
@@ -10,6 +10,11 @@
 from neo4j import AsyncDriver, AsyncGraphDatabase, RoutingControl, Query
 from neo4j.exceptions import ClientError, Neo4jError
 from pydantic import Field
+from starlette.middleware import Middleware
+from starlette.middleware.cors import CORSMiddleware
+from starlette.middleware.trustedhost import TrustedHostMiddleware
+
+from .utils import _value_sanitize
 from .utils import _value_sanitize, _truncate_string_to_tokens

 logger = logging.getLogger("mcp_neo4j_cypher")
@@ -263,6 +268,8 @@ async def main(
     host: str = "127.0.0.1",
     port: int = 8000,
     path: str = "/mcp/",
+    allow_origins: list[str] = [],
+    allowed_hosts: list[str] = [],
     read_timeout: int = 30,
     token_limit: Optional[int] = None,
 ) -> None:
@@ -275,7 +282,17 @@ async def main(
             password,
         ),
     )
-
+    custom_middleware = [
+        Middleware(
+            CORSMiddleware,
+            allow_origins=allow_origins,
+            allow_methods=["GET", "POST"],
+            allow_headers=["*"],
+        ),
+        Middleware(TrustedHostMiddleware,
+                   allowed_hosts=allowed_hosts)
+    ]
+
     mcp = create_mcp_server(neo4j_driver, database, namespace, read_timeout, token_limit)

     # Run the server with the specified transport
@@ -284,15 +301,17 @@ async def main(
             logger.info(
                 f"Running Neo4j Cypher MCP Server with HTTP transport on {host}:{port}..."
             )
-            await mcp.run_http_async(host=host, port=port, path=path)
+            await mcp.run_http_async(
+                host=host, port=port, path=path, middleware=custom_middleware
+            )
         case "stdio":
             logger.info("Running Neo4j Cypher MCP Server with stdio transport...")
             await mcp.run_stdio_async()
         case "sse":
             logger.info(
                 f"Running Neo4j Cypher MCP Server with SSE transport on {host}:{port}..."
             )
-            await mcp.run_sse_async(host=host, port=port, path=path)
+            await mcp.run_http_async(host=host, port=port, path=path, middleware=custom_middleware, transport="sse")
         case _:
             logger.error(
                 f"Invalid transport: {transport} | Must be either 'stdio', 'sse', or 'http'"
servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/utils.py · +37 −1 · modified
@@ -170,6 +170,43 @@ def process_config(args: argparse.Namespace) -> dict[str, Union[str, int, None]]
             )
             config["path"] = None

+    # parse allow origins
+    if args.allow_origins is not None:
+        # Handle comma-separated string from CLI
+
+        config["allow_origins"] = [origin.strip() for origin in args.allow_origins.split(",") if origin.strip()]
+
+    else:
+        if os.getenv("NEO4J_MCP_SERVER_ALLOW_ORIGINS") is not None:
+            # split comma-separated string into list
+            config["allow_origins"] = [
+                origin.strip() for origin in os.getenv("NEO4J_MCP_SERVER_ALLOW_ORIGINS", "").split(",")
+                if origin.strip()
+            ]
+        else:
+            logger.info(
+                "Info: No allow origins provided. Defaulting to no allowed origins."
+            )
+            config["allow_origins"] = list()
+
+    # parse allowed hosts for DNS rebinding protection
+    if args.allowed_hosts is not None:
+        # Handle comma-separated string from CLI
+        config["allowed_hosts"] = [host.strip() for host in args.allowed_hosts.split(",") if host.strip()]
+
+    else:
+        if os.getenv("NEO4J_MCP_SERVER_ALLOWED_HOSTS") is not None:
+            # split comma-separated string into list
+            config["allowed_hosts"] = [
+                host.strip() for host in os.getenv("NEO4J_MCP_SERVER_ALLOWED_HOSTS", "").split(",")
+                if host.strip()
+            ]
+        else:
+            logger.info(
+                "Info: No allowed hosts provided. Defaulting to secure mode - only localhost and 127.0.0.1 allowed."
+            )
+            config["allowed_hosts"] = ["localhost", "127.0.0.1"]
+
     # parse token limit
     if args.token_limit is not None:
         config["token_limit"] = args.token_limit
@@ -258,7 +295,6 @@ def _value_sanitize(d: Any, list_limit: int = 128) -> Any:
     else:
         return d

-
 def _truncate_string_to_tokens(
     text: str, token_limit: int, model: str = "gpt-4"
 ) -> str:
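The precedence that this hunk implements (CLI argument first, then environment variable, then a secure default) can be condensed into one helper. The sketch below is an illustrative refactoring, not code from the patch: `split_csv` and `resolve_list` are hypothetical names, and the `env` parameter exists only so the logic can be exercised without touching the real process environment.

```python
import os
from typing import List, Mapping, Optional, Sequence


def split_csv(value: str) -> List[str]:
    """Split a comma-separated string, trimming whitespace and dropping empty items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def resolve_list(
    cli_value: Optional[str],
    env_name: str,
    default: Sequence[str],
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Resolve a list setting: CLI value, then environment variable, then default."""
    env = os.environ if env is None else env
    if cli_value is not None:
        return split_csv(cli_value)
    env_value = env.get(env_name)
    if env_value is not None:
        return split_csv(env_value)
    return list(default)


if __name__ == "__main__":
    hosts = resolve_list(
        cli_value=None,
        env_name="NEO4J_MCP_SERVER_ALLOWED_HOSTS",
        default=["localhost", "127.0.0.1"],
    )
    print("allowed hosts:", hosts)
```

As in the patch, an explicitly supplied empty string resolves to an empty list rather than falling back to the default.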
servers/mcp-neo4j-cypher/tests/integration/conftest.py+98 −0 modified@@ -171,3 +171,101 @@ async def http_server(setup: Neo4jContainer): except asyncio.TimeoutError: process.kill() await process.wait() + + +@pytest_asyncio.fixture +async def http_server_restricted_cors(setup: Neo4jContainer): + """Start the MCP server in HTTP mode with restricted CORS origins.""" + + # Start server process in HTTP mode with restricted CORS + process = await asyncio.create_subprocess_exec( + "uv", + "run", + "mcp-neo4j-cypher", + "--transport", + "http", + "--server-host", + "127.0.0.1", + "--server-port", + "8003", + "--allow-origins", + "http://localhost:3000,https://trusted-site.com", + "--db-url", + setup.get_connection_url(), + "--username", + setup.username, + "--password", + setup.password, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=os.getcwd(), + ) + + # Wait for server to start + await asyncio.sleep(3) + + # Check if process is still running + if process.returncode is not None: + stdout, stderr = await process.communicate() + raise RuntimeError( + f"Restricted CORS server failed to start. 
stdout: {stdout.decode()}, stderr: {stderr.decode()}" + ) + + yield process + + # Cleanup + try: + process.terminate() + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + process.kill() + await process.wait() + + +@pytest_asyncio.fixture +async def http_server_custom_hosts(setup: Neo4jContainer): + """Start the MCP server in HTTP mode with custom allowed hosts.""" + + # Start server process in HTTP mode with custom allowed hosts + process = await asyncio.create_subprocess_exec( + "uv", + "run", + "mcp-neo4j-cypher", + "--transport", + "http", + "--server-host", + "127.0.0.1", + "--server-port", + "8004", + "--allowed-hosts", + "example.com,test.local", + "--db-url", + setup.get_connection_url(), + "--username", + setup.username, + "--password", + setup.password, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=os.getcwd(), + ) + + # Wait for server to start + await asyncio.sleep(3) + + # Check if process is still running + if process.returncode is not None: + stdout, stderr = await process.communicate() + raise RuntimeError( + f"Custom hosts server failed to start. stdout: {stdout.decode()}, stderr: {stderr.decode()}" + ) + + yield process + + # Cleanup + try: + process.terminate() + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + process.kill() + await process.wait()
servers/mcp-neo4j-cypher/tests/integration/test_http_transport_IT.py+299 −0 modified@@ -241,3 +241,302 @@ async def test_http_full_workflow(http_server): result = await parse_sse_response(response) assert response.status == 200 assert "result" in result + + +# CORS Middleware Tests + + +@pytest.mark.asyncio +async def test_cors_preflight_empty_default_origins(http_server): + """Test CORS preflight request with empty default allowed origins.""" + async with aiohttp.ClientSession() as session: + async with session.options( + "http://127.0.0.1:8001/mcp/", + headers={ + "Origin": "http://localhost:3000", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response: + print(f"CORS preflight response status: {response.status}") + print(f"CORS preflight response headers: {dict(response.headers)}") + + # Should return 400 when origin is not in allow_origins (empty list blocks all) + assert response.status == 400 + # Should NOT allow any origin with empty default + cors_origin = response.headers.get("Access-Control-Allow-Origin") + assert cors_origin is None + + +@pytest.mark.asyncio +async def test_cors_preflight_any_origin_blocked(http_server): + """Test CORS preflight request - all origins should be blocked with empty default.""" + async with aiohttp.ClientSession() as session: + async with session.options( + "http://127.0.0.1:8001/mcp/", + headers={ + "Origin": "http://127.0.0.1:3000", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response: + # Should return 400 when origin is blocked by empty allow_origins + assert response.status == 400 + # Should not include CORS allow origin header for any origin + cors_origin = response.headers.get("Access-Control-Allow-Origin") + assert cors_origin is None + + +@pytest.mark.asyncio +async def test_cors_preflight_malicious_origin_blocked(http_server): + """Test CORS preflight request with malicious origin (should be 
blocked).""" + async with aiohttp.ClientSession() as session: + async with session.options( + "http://127.0.0.1:8001/mcp/", + headers={ + "Origin": "http://malicious-site.com", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response: + print(f"Malicious origin response status: {response.status}") + print(f"Malicious origin response headers: {dict(response.headers)}") + + # Should return 400 when malicious origin is blocked + assert response.status == 400 + # Should not include CORS headers for any origins (empty default) + cors_origin = response.headers.get("Access-Control-Allow-Origin") + assert cors_origin is None + + +@pytest.mark.asyncio +async def test_cors_actual_request_no_cors_headers(http_server): + """Test actual request without Origin header (should work - not CORS).""" + session_id = str(uuid.uuid4()) + async with aiohttp.ClientSession() as session: + async with session.post( + "http://127.0.0.1:8001/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "mcp-session-id": session_id, + }, + ) as response: + assert response.status == 200 + # No Origin header means no CORS, request should work + result = await parse_sse_response(response) + assert "result" in result + assert "tools" in result["result"] + + +@pytest.mark.asyncio +async def test_cors_actual_request_with_origin_blocked(http_server): + """Test actual CORS request with origin header (should work but no CORS headers).""" + session_id = str(uuid.uuid4()) + async with aiohttp.ClientSession() as session: + async with session.post( + "http://127.0.0.1:8001/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Origin": "http://localhost:3000", + "mcp-session-id": session_id, + }, + ) as response: + # Request 
should still work (server processes it) but no CORS headers + assert response.status == 200 + cors_origin = response.headers.get("Access-Control-Allow-Origin") + assert cors_origin is None + + result = await parse_sse_response(response) + assert "result" in result + assert "tools" in result["result"] + + +@pytest.mark.asyncio +async def test_cors_restricted_server_allowed_origin(http_server_restricted_cors): + """Test CORS with restricted server and allowed origin.""" + async with aiohttp.ClientSession() as session: + async with session.options( + "http://127.0.0.1:8003/mcp/", + headers={ + "Origin": "http://localhost:3000", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response: + print( + f"Restricted server allowed origin response status: {response.status}" + ) + print( + f"Restricted server allowed origin response headers: {dict(response.headers)}" + ) + + assert response.status == 200 + assert "Access-Control-Allow-Origin" in response.headers + assert ( + response.headers["Access-Control-Allow-Origin"] + == "http://localhost:3000" + ) + + +@pytest.mark.asyncio +async def test_cors_restricted_server_disallowed_origin(http_server_restricted_cors): + """Test CORS with restricted server and disallowed origin.""" + async with aiohttp.ClientSession() as session: + async with ( + session.options( + "http://127.0.0.1:8003/mcp/", + headers={ + "Origin": "http://127.0.0.1:3000", # This should be disallowed on restricted server + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response + ): + print( + f"Restricted server disallowed origin response status: {response.status}" + ) + print( + f"Restricted server disallowed origin response headers: {dict(response.headers)}" + ) + + # Should return 400 when origin is not in restricted allow_origins list + assert response.status == 400 + # Should not allow 127.0.0.1 on restricted server + cors_origin = 
response.headers.get("Access-Control-Allow-Origin") + assert cors_origin is None + + +@pytest.mark.asyncio +async def test_cors_restricted_server_trusted_site(http_server_restricted_cors): + """Test CORS with restricted server and trusted site origin.""" + async with aiohttp.ClientSession() as session: + async with session.options( + "http://127.0.0.1:8003/mcp/", + headers={ + "Origin": "https://trusted-site.com", + "Access-Control-Request-Method": "POST", + "Access-Control-Request-Headers": "content-type", + }, + ) as response: + assert response.status == 200 + assert "Access-Control-Allow-Origin" in response.headers + assert ( + response.headers["Access-Control-Allow-Origin"] + == "https://trusted-site.com" + ) + + +@pytest.mark.asyncio +async def test_dns_rebinding_protection_trusted_hosts(http_server): + """Test DNS rebinding protection with TrustedHostMiddleware - allowed hosts.""" + session_id = str(uuid.uuid4()) + async with aiohttp.ClientSession() as session: + # Test with localhost - should be allowed (in default allowed_hosts) + async with session.post( + "http://127.0.0.1:8001/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Host": "localhost:8001", + "mcp-session-id": session_id, + }, + ) as response: + print(f"Trusted host (localhost) response status: {response.status}") + print(f"Trusted host (localhost) response headers: {dict(response.headers)}") + + # Should work with trusted host + assert response.status == 200 + result = await parse_sse_response(response) + assert "result" in result + assert "tools" in result["result"] + + +@pytest.mark.asyncio +async def test_dns_rebinding_protection_untrusted_hosts(http_server): + """Test DNS rebinding protection with TrustedHostMiddleware - untrusted hosts.""" + session_id = str(uuid.uuid4()) + async with aiohttp.ClientSession() as session: + # Test with malicious host - should be blocked + 
async with session.post( + "http://127.0.0.1:8001/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Host": "malicious-site.evil", + "mcp-session-id": session_id, + }, + ) as response: + print(f"Untrusted host response status: {response.status}") + print(f"Untrusted host response headers: {dict(response.headers)}") + + # Should block untrusted host (DNS rebinding protection) + assert response.status == 400 + + +@pytest.mark.asyncio +async def test_dns_rebinding_custom_allowed_hosts(http_server_custom_hosts): + """Test DNS rebinding protection with custom allowed hosts configuration.""" + session_id = str(uuid.uuid4()) + async with aiohttp.ClientSession() as session: + # Test with custom allowed host - should work + async with session.post( + "http://127.0.0.1:8004/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Host": "example.com", + "mcp-session-id": session_id, + }, + ) as response: + print(f"Custom allowed host (example.com) response status: {response.status}") + print(f"Custom allowed host response headers: {dict(response.headers)}") + + # Should work with custom allowed host + assert response.status == 200 + result = await parse_sse_response(response) + assert "result" in result + assert "tools" in result["result"] + + # Test with another custom allowed host with port - should work + async with session.post( + "http://127.0.0.1:8004/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Host": "test.local:8004", + "mcp-session-id": session_id, + }, + ) as response: + print(f"Custom allowed host (test.local:8004) response status: {response.status}") + print(f"Custom allowed host response headers: 
{dict(response.headers)}") + + # Should work with custom allowed host + assert response.status == 200 + result = await parse_sse_response(response) + assert "result" in result + assert "tools" in result["result"] + + # Test with localhost (not in custom allowed list) - should be blocked + async with session.post( + "http://127.0.0.1:8004/mcp/", + json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"}, + headers={ + "Accept": "application/json, text/event-stream", + "Content-Type": "application/json", + "Host": "localhost:8004", + "mcp-session-id": session_id, + }, + ) as response: + print(f"Localhost (not in custom allowed) response status: {response.status}") + print(f"Localhost response headers: {dict(response.headers)}") + + # Should block localhost when not in custom allowed hosts + assert response.status == 400
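The preflight behaviour that these integration tests assert can be summarised as a small decision function. This is only a model of the outcomes checked in the tests (status 400 and no `Access-Control-Allow-Origin` header for an unlisted origin, status 200 with the origin echoed back for a listed one). It is not Starlette's implementation, the function name is hypothetical, and wildcard origins are deliberately left out.

```python
from typing import Dict, List, Tuple


def preflight_response(origin: str, allow_origins: List[str]) -> Tuple[int, Dict[str, str]]:
    """Model of the tested preflight outcomes for an exact-match origin list."""
    if origin in allow_origins:
        # Listed origin: the tests expect 200 and the origin echoed back.
        return 200, {"Access-Control-Allow-Origin": origin}
    # Unlisted origin (including the empty default list): the tests expect 400.
    return 400, {}


if __name__ == "__main__":
    restricted = ["http://localhost:3000", "https://trusted-site.com"]
    for origin in ("http://localhost:3000", "http://127.0.0.1:3000"):
        status, headers = preflight_response(origin, restricted)
        print(origin, status, headers)
```

Note that, per `test_cors_actual_request_with_origin_blocked`, a non-preflight POST carrying an unlisted `Origin` is still processed and returns 200 without CORS headers. CORS only governs what the browser exposes to the calling page, so the `Host` header check is what rejects a rebound request outright.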
servers/mcp-neo4j-cypher/tests/unit/test_utils.py+83 −0 modified@@ -21,6 +21,8 @@ def clean_env(): "NEO4J_MCP_SERVER_HOST", "NEO4J_MCP_SERVER_PORT", "NEO4J_MCP_SERVER_PATH", + "NEO4J_MCP_SERVER_ALLOW_ORIGINS", + "NEO4J_MCP_SERVER_ALLOWED_HOSTS", "NEO4J_NAMESPACE", "NEO4J_READ_TIMEOUT", "NEO4J_RESPONSE_TOKEN_LIMIT", @@ -54,6 +56,8 @@ def _create_args(**kwargs): "server_host": None, "server_port": None, "server_path": None, + "allow_origins": None, + "allowed_hosts": None, "read_timeout": None, "token_limit": None, } @@ -348,6 +352,85 @@ def test_info_logging_stdio_transport(clean_env, args_factory, mock_logger): assert len(stdio_info) == 3 # host, port, path info messages +# CORS allow_origins tests + + +def test_allow_origins_cli_args(clean_env, args_factory): + """Test allow_origins configuration from CLI arguments.""" + origins = "http://localhost:3000,https://trusted-site.com" + expected_origins = ["http://localhost:3000", "https://trusted-site.com"] + args = args_factory(allow_origins=origins) + config = process_config(args) + + assert config["allow_origins"] == expected_origins + + +def test_allow_origins_env_var(clean_env, args_factory): + """Test allow_origins configuration from environment variable.""" + origins_str = "http://localhost:3000,https://trusted-site.com" + expected_origins = ["http://localhost:3000", "https://trusted-site.com"] + os.environ["NEO4J_MCP_SERVER_ALLOW_ORIGINS"] = origins_str + + args = args_factory() + config = process_config(args) + + assert config["allow_origins"] == expected_origins + + +def test_allow_origins_defaults(clean_env, args_factory, mock_logger): + """Test allow_origins uses empty list as default when not provided.""" + args = args_factory() + config = process_config(args) + + assert config["allow_origins"] == [] + + # Check that info message was logged about using defaults + info_calls = [call.args[0] for call in mock_logger.info.call_args_list] + allow_origins_info = [ + msg + for msg in info_calls + if "allow 
origins" in msg and "Defaulting to no" in msg + ] + assert len(allow_origins_info) == 1 + + +def test_allow_origins_cli_overrides_env(clean_env, args_factory): + """Test that CLI allow_origins takes precedence over environment variable.""" + os.environ["NEO4J_MCP_SERVER_ALLOW_ORIGINS"] = "http://env-site.com" + + cli_origins = "http://cli-site.com,https://cli-secure.com" + expected_origins = ["http://cli-site.com", "https://cli-secure.com"] + args = args_factory(allow_origins=cli_origins) + config = process_config(args) + + assert config["allow_origins"] == expected_origins + + +def test_allow_origins_empty_list(clean_env, args_factory): + """Test allow_origins with empty list from CLI.""" + args = args_factory(allow_origins="") + config = process_config(args) + + assert config["allow_origins"] == [] + + +def test_allow_origins_single_origin(clean_env, args_factory): + """Test allow_origins with single origin.""" + single_origin = "https://single-site.com" + args = args_factory(allow_origins=single_origin) + config = process_config(args) + + assert config["allow_origins"] == [single_origin] + + +def test_allow_origins_wildcard(clean_env, args_factory): + """Test allow_origins with wildcard.""" + wildcard_origins = "*" + args = args_factory(allow_origins=wildcard_origins) + config = process_config(args) + + assert config["allow_origins"] == [wildcard_origins] + def test_read_timeout_cli_arg(clean_env, args_factory): """Test that read_timeout CLI argument is properly processed.""" args = args_factory(read_timeout=60)
References
- github.com/advisories/GHSA-vcqx-v2mg-7chx (source: GHSA; type: advisory)
- nvd.nist.gov/vuln/detail/CVE-2025-10193 (source: GHSA; type: advisory)
- github.com/neo4j-contrib/mcp-neo4j/commit/5b9fbdda6401668d7aa006daf7e644805c067c15 (source: GHSA; type: web)
- github.com/neo4j-contrib/mcp-neo4j/pull/165 (source: GHSA; type: web)
- github.com/neo4j-contrib/mcp-neo4j/releases/tag/mcp-neo4j-cypher-v0.4.0 (source: NVD; type: web)
- github.com/neo4j-contrib/mcp-neo4j/security/advisories/GHSA-vcqx-v2mg-7chx (source: NVD; type: web)
- neo4j.com/security/cve-2025-10193 (source: NVD; type: web)