VYPR
High severity · GHSA Advisory · Published Sep 11, 2025 · Updated Apr 15, 2026

CVE-2025-10193

Description

DNS rebinding vulnerability in Neo4j Cypher MCP server allows malicious websites to bypass Same-Origin Policy protections and execute unauthorised tool invocations against locally running Neo4j MCP instances. The attack relies on the user being enticed to visit a malicious website and spend sufficient time there for DNS rebinding to succeed.
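
To illustrate why validating the `Host` header defeats this attack, here is a minimal sketch of such a check. It is not the project's code: the function name `host_is_allowed` and the domain `attacker.example` are hypothetical, and the port handling is deliberately simplified (it does not cover IPv6 literals). The point is that after rebinding, the attacker's domain resolves to `127.0.0.1`, yet the browser still sends the attacker's domain name in the `Host` header.

```python
def host_is_allowed(host_header: str, allowed_hosts: list[str]) -> bool:
    """Return True if the Host header names one of the allowed hosts."""
    # Drop an optional ":port" suffix and normalise case before comparing.
    hostname = host_header.split(":")[0].strip().lower()
    return hostname in {h.lower() for h in allowed_hosts}


# Assumed allow-list for a locally running server.
ALLOWED = ["localhost", "127.0.0.1"]

# A legitimate local client addresses the server by its loopback name.
print(host_is_allowed("localhost:8000", ALLOWED))         # True
# A rebound request still carries the attacker's name, so it is rejected.
print(host_is_allowed("attacker.example:8000", ALLOWED))  # False
```

A server that skips this check only sees a connection arriving on the loopback interface, which is why a rebound request looks local to it.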

Affected packages

Versions sourced from the GitHub Security Advisory.

| Package | Ecosystem | Affected versions | Patched versions |
| --- | --- | --- | --- |
| mcp-neo4j-cypher | PyPI | >= 0.2.2, < 0.4.0 | 0.4.0 |

Patches

5b9fbdda6401

cypher - add middleware (#165)

12 files changed · +673 -25
  • servers/mcp-neo4j-cypher/CHANGELOG.md · +3 -0 · modified
    @@ -7,6 +7,9 @@
     
     ### Added
     * Added Cypher result sanitation function from Neo4j GraphRAG that removes embedding values from the result
    +* Add env variable `NEO4J_MCP_SERVER_ALLOW_ORIGINS` and cli variable `--allow-origins` to configure CORS Middleware for remote deployments
    +* Add env variable `NEO4J_MCP_SERVER_ALLOWED_HOSTS` and cli variable `--allowed-hosts` to configure Trusted Hosts Middleware for remote deployments
    +* Update HTTP and SSE transports to use security middleware
     * Added `read_neo4j_cypher` query timeout configuration via `--read-timeout` CLI parameter and `NEO4J_READ_TIMEOUT` environment variable (defaults to 30 seconds)
     * Add response token limit for read Cypher responses
     
    
  • servers/mcp-neo4j-cypher/docker-compose.yml · +2 -0 · modified
    @@ -25,6 +25,8 @@ services:
           - NEO4J_MCP_SERVER_PORT=8000
           - NEO4J_MCP_SERVER_PATH=/api/mcp/
           - NEO4J_NAMESPACE=local
    +      - NEO4J_MCP_SERVER_ALLOWED_HOSTS=localhost,127.0.0.1
    +      - NEO4J_MCP_SERVER_ALLOW_ORIGINS=""
         depends_on:
           - neo4j
     
    
  • servers/mcp-neo4j-cypher/Dockerfile · +3 -1 · modified
    @@ -26,10 +26,12 @@ ENV NEO4J_USERNAME="neo4j"
     ENV NEO4J_PASSWORD="password"
     ENV NEO4J_DATABASE="neo4j"
     ENV NEO4J_NAMESPACE=""
    -ENV NEO4J_TRANSPORT="stdio"
    +ENV NEO4J_TRANSPORT="http"
     ENV NEO4J_MCP_SERVER_HOST="0.0.0.0"
     ENV NEO4J_MCP_SERVER_PORT="8000"
     ENV NEO4J_MCP_SERVER_PATH="/api/mcp/"
    +ENV NEO4J_MCP_SERVER_ALLOW_ORIGINS=""
    +ENV NEO4J_MCP_SERVER_ALLOWED_HOSTS="localhost,127.0.0.1"
     
     EXPOSE 8000
     
    
  • servers/mcp-neo4j-cypher/Makefile · +1 -1 · modified
    @@ -8,7 +8,7 @@ test-integration:
     	uv run pytest tests/integration/ -v
     
     test-http:
    -	uv run pytest tests/integration/test_http_transport.py -v
    +	uv run pytest tests/integration/test_http_transport_IT.py -v
     
     test-all:
     	uv run pytest tests/ -v
    
  • servers/mcp-neo4j-cypher/manifest.json · +18 -0 · modified
    @@ -32,6 +32,8 @@
             "NEO4J_MCP_SERVER_HOST": "${user_config.mcp_server_host}",
             "NEO4J_MCP_SERVER_PORT": "${user_config.mcp_server_port}",
             "NEO4J_MCP_SERVER_PATH": "${user_config.mcp_server_path}",
    +        "NEO4J_MCP_SERVER_ALLOW_ORIGINS": "${user_config.mcp_server_allow_origins}",
    +        "NEO4J_MCP_SERVER_ALLOWED_HOSTS": "${user_config.mcp_server_allowed_hosts}",
             "NEO4J_RESPONSE_TOKEN_LIMIT": "${user_config.token_limit}",
             "NEO4J_READ_TIMEOUT": "${user_config.read_timeout}"
           }
    @@ -127,6 +129,14 @@
           "required": false,
           "sensitive": false
         },
    +    "mcp_server_allow_origins": {
    +      "type": "string",
    +      "title": "MCP Server Allow Origins",
    +      "description": "The allowed origins for the MCP server as a comma-separated list, if not using stdio. Defaults to no allowed origins",
    +      "default": "",
    +      "required": false,
    +      "sensitive": false
    +    },
         "token_limit": {
           "type": "number",
           "title": "Response token limit",
    @@ -135,6 +145,14 @@
           "required": false,
           "sensitive": false
         },
    +    "mcp_server_allowed_hosts": {
    +      "type": "string",
    +      "title": "MCP Server Allowed Hosts",
    +      "description": "The allowed hosts for the MCP server as a comma-separated list, if not using stdio. Defaults to localhost and 127.0.0.1",
    +      "default": "localhost,127.0.0.1",
    +      "required": false,
    +      "sensitive": false
    +    },
         "read_timeout": {
           "type": "number",
           "title": "Read timeout",
    
  • servers/mcp-neo4j-cypher/README.md · +97 -19 · modified
    @@ -138,6 +138,61 @@ Choose your transport based on use case:
     - **Remote deployment**: Use `http`
     - **Legacy web clients**: Use `sse`
     
    +## 🔒 Security Protection
    +
    +The server includes comprehensive security protection with **secure defaults** that protect against common web-based attacks while preserving full MCP functionality when using HTTP transport.
    +
    +### 🛡️ DNS Rebinding Protection
    +
    +**TrustedHost Middleware** validates Host headers to prevent DNS rebinding attacks:
    +
    +**Secure by Default:**
    +- Only `localhost` and `127.0.0.1` hosts are allowed by default
    +- Malicious websites cannot trick browsers into accessing your local server
    +
    +**Environment Variable:**
    +```bash
    +export NEO4J_MCP_SERVER_ALLOWED_HOSTS="example.com,www.example.com"
    +```
    +
    +### 🌐 CORS Protection
    +
    +**Cross-Origin Resource Sharing (CORS)** protection blocks browser-based requests by default:
    +
    +**Environment Variable:**
    +```bash
    +export NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://example.com,https://example.com"
    +```
    +
    +### 🔧 Complete Security Configuration
    +
    +**Development Setup:**
    +```bash
    +mcp-neo4j-cypher --transport http \
    +  --allowed-hosts "localhost,127.0.0.1" \
    +  --allow-origins "http://localhost:3000"
    +```
    +
    +**Production Setup:**
    +```bash
    +mcp-neo4j-cypher --transport http \
    +  --allowed-hosts "example.com,www.example.com" \
    +  --allow-origins "https://example.com,https://example.com"
    +```
    +
    +
    +### 🚨 Security Best Practices
    +
    +**For `allow_origins`:**
    +- Be specific: `["https://example.com", "https://example.com"]`
    +- Never use `"*"` in production with credentials
    +- Use HTTPS origins in production
    +
    +**For `allowed_hosts`:**
    +- Include your actual domain: `["example.com", "www.example.com"]`
    +- Include localhost only for development
    +- Never use `"*"` unless you understand the risks
    +
     ## 🔧 Usage with Claude Desktop
     
     ### Using DXT
    @@ -148,7 +203,7 @@ Download the latest `.dxt` file from the [releases page](https://github.com/neo4
     
     Can be found on PyPi https://pypi.org/project/mcp-neo4j-cypher/
     
    -Add the server to your `claude_desktop_config.json` with the database connection configuration through environment variables. You may also specify the transport method and namespace with cli arguments or environment variables.
    +Add the server to your `claude_desktop_config.json` with the database connection configuration through environment variables. You may also specify the transport method, namespace and other config variables with cli arguments or environment variables.
     
     ```json
     {
    @@ -169,17 +224,24 @@ Add the server to your `claude_desktop_config.json` with the database connection
     
     ### 🌐 HTTP Transport Configuration
     
    -For custom HTTP configurations beyond the defaults:
    +For custom HTTP configurations with security middleware:
     
     ```bash
    -# Custom HTTP configuration
    -mcp-neo4j-cypher --transport http --server-host 127.0.0.1 --server-port 8080 --server-path /api/mcp/
    -
    -# Or using environment variables
    +# Complete HTTP configuration with security
    +mcp-neo4j-cypher --transport http \
    +  --server-host 127.0.0.1 \
    +  --server-port 8080 \
    +  --server-path /api/mcp/ \
    +  --allowed-hosts "localhost,127.0.0.1,example.com" \
    +  --allow-origins "https://yourapp.com"
    +
    +# Using environment variables
     export NEO4J_TRANSPORT=http
     export NEO4J_MCP_SERVER_HOST=127.0.0.1
     export NEO4J_MCP_SERVER_PORT=8080
     export NEO4J_MCP_SERVER_PATH=/api/mcp/
    +export NEO4J_MCP_SERVER_ALLOWED_HOSTS="localhost,127.0.0.1,example.com"
    +export NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://yourapp.com"
     mcp-neo4j-cypher
     ```
     
    @@ -310,23 +372,39 @@ docker run --rm -p 8000:8000 \
       -e NEO4J_MCP_SERVER_PORT="8000" \
       -e NEO4J_MCP_SERVER_PATH="/mcp/" \
       mcp/neo4j-cypher:latest
    +
    +# Run with security middleware for production
    +docker run --rm -p 8000:8000 \
    +  -e NEO4J_URI="bolt://host.docker.internal:7687" \
    +  -e NEO4J_USERNAME="neo4j" \
    +  -e NEO4J_PASSWORD="password" \
    +  -e NEO4J_DATABASE="neo4j" \
    +  -e NEO4J_TRANSPORT="http" \
    +  -e NEO4J_MCP_SERVER_HOST="0.0.0.0" \
    +  -e NEO4J_MCP_SERVER_PORT="8000" \
    +  -e NEO4J_MCP_SERVER_PATH="/mcp/" \
    +  -e NEO4J_MCP_SERVER_ALLOWED_HOSTS="example.com,www.example.com" \
    +  -e NEO4J_MCP_SERVER_ALLOW_ORIGINS="https://example.com" \
    +  mcp/neo4j-cypher:latest
     ```
     
     ### 🔧 Environment Variables
     
    -| Variable                      | Default                                 | Description                                    |
    -| ----------------------------- | --------------------------------------- | ---------------------------------------------- |
    -| `NEO4J_URI`                   | `bolt://localhost:7687`                 | Neo4j connection URI                           |
    -| `NEO4J_USERNAME`              | `neo4j`                                 | Neo4j username                                 |
    -| `NEO4J_PASSWORD`              | `password`                              | Neo4j password                                 |
    -| `NEO4J_DATABASE`              | `neo4j`                                 | Neo4j database name                            |
    -| `NEO4J_TRANSPORT`             | `stdio` (local), `http` (remote)        | Transport protocol (`stdio`, `http`, or `sse`) |
    -| `NEO4J_NAMESPACE`             | _(empty)_                               | Tool namespace prefix                          |
    -| `NEO4J_MCP_SERVER_HOST`       | `127.0.0.1` (local)                     | Host to bind to                                |
    -| `NEO4J_MCP_SERVER_PORT`       | `8000`                                  | Port for HTTP/SSE transport                    |
    -| `NEO4J_MCP_SERVER_PATH`       | `/api/mcp/`                             | Path for accessing MCP server                  |
    -| `NEO4J_RESPONSE_TOKEN_LIMIT`  | _(none)_                                | Maximum tokens for read query responses        |
    -| `NEO4J_READ_TIMEOUT`          | `30`                                    | Timeout in seconds for read queries            |
    +| Variable                           | Default                                 | Description                                        |
    +| ---------------------------------- | --------------------------------------- | -------------------------------------------------- |
    +| `NEO4J_URI`                        | `bolt://localhost:7687`                 | Neo4j connection URI                               |
    +| `NEO4J_USERNAME`                   | `neo4j`                                 | Neo4j username                                     |
    +| `NEO4J_PASSWORD`                   | `password`                              | Neo4j password                                     |
    +| `NEO4J_DATABASE`                   | `neo4j`                                 | Neo4j database name                                |
    +| `NEO4J_TRANSPORT`                  | `stdio` (local), `http` (remote)        | Transport protocol (`stdio`, `http`, or `sse`)     |
    +| `NEO4J_NAMESPACE`                  | _(empty)_                               | Tool namespace prefix                              |
    +| `NEO4J_MCP_SERVER_HOST`            | `127.0.0.1` (local)                     | Host to bind to                                    |
    +| `NEO4J_MCP_SERVER_PORT`            | `8000`                                  | Port for HTTP/SSE transport                        |
    +| `NEO4J_MCP_SERVER_PATH`            | `/api/mcp/`                             | Path for accessing MCP server                      |
    +| `NEO4J_MCP_SERVER_ALLOW_ORIGINS`   | _(empty - secure by default)_           | Comma-separated list of allowed CORS origins       |
    +| `NEO4J_MCP_SERVER_ALLOWED_HOSTS`   | `localhost,127.0.0.1`                   | Comma-separated list of allowed hosts (DNS rebinding protection) |
    +| `NEO4J_RESPONSE_TOKEN_LIMIT`       | _(none)_                                | Maximum tokens for read query responses            |
    +| `NEO4J_READ_TIMEOUT`               | `30`                                    | Timeout in seconds for read queries                |
     
     ### 🌐 SSE Transport for Legacy Web Access
     
    
  • servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/__init__.py · +10 -0 · modified
    @@ -21,6 +21,16 @@ def main():
         )
         parser.add_argument("--server-host", default=None, help="Server host")
         parser.add_argument("--server-port", default=None, help="Server port")
    +    parser.add_argument(
    +        "--allow-origins",
    +        default=None,
    +        help="Allow origins for remote servers (comma-separated list)",
    +    )
    +    parser.add_argument(
    +        "--allowed-hosts",
    +        default=None,
    +        help="Allowed hosts for DNS rebinding protection on remote servers(comma-separated list)",
    +    )
         parser.add_argument(
             "--read-timeout", 
             type=int, 
    
  • servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/server.py · +22 -3 · modified
    @@ -10,6 +10,11 @@
     from neo4j import AsyncDriver, AsyncGraphDatabase, RoutingControl, Query
     from neo4j.exceptions import ClientError, Neo4jError
     from pydantic import Field
    +from starlette.middleware import Middleware
    +from starlette.middleware.cors import CORSMiddleware
    +from starlette.middleware.trustedhost import TrustedHostMiddleware
    +
    +from .utils import _value_sanitize
     from .utils import _value_sanitize, _truncate_string_to_tokens
     
     logger = logging.getLogger("mcp_neo4j_cypher")
    @@ -263,6 +268,8 @@ async def main(
         host: str = "127.0.0.1",
         port: int = 8000,
         path: str = "/mcp/",
    +    allow_origins: list[str] = [],
    +    allowed_hosts: list[str] = [],
         read_timeout: int = 30,
         token_limit: Optional[int] = None,
     ) -> None:
    @@ -275,7 +282,17 @@ async def main(
                 password,
             ),
         )
    -
    +    custom_middleware = [
    +        Middleware(
    +            CORSMiddleware,
    +            allow_origins=allow_origins,
    +            allow_methods=["GET", "POST"],
    +            allow_headers=["*"],
    +        ),
    +        Middleware(TrustedHostMiddleware, 
    +                   allowed_hosts=allowed_hosts)
    +    ]
    +    
         mcp = create_mcp_server(neo4j_driver, database, namespace, read_timeout, token_limit)
     
         # Run the server with the specified transport
    @@ -284,15 +301,17 @@ async def main(
                 logger.info(
                     f"Running Neo4j Cypher MCP Server with HTTP transport on {host}:{port}..."
                 )
    -            await mcp.run_http_async(host=host, port=port, path=path)
    +            await mcp.run_http_async(
    +                host=host, port=port, path=path, middleware=custom_middleware
    +            )
             case "stdio":
                 logger.info("Running Neo4j Cypher MCP Server with stdio transport...")
                 await mcp.run_stdio_async()
             case "sse":
                 logger.info(
                     f"Running Neo4j Cypher MCP Server with SSE transport on {host}:{port}..."
                 )
    -            await mcp.run_sse_async(host=host, port=port, path=path)
    +            await mcp.run_http_async(host=host, port=port, path=path, middleware=custom_middleware, transport="sse")
             case _:
                 logger.error(
                     f"Invalid transport: {transport} | Must be either 'stdio', 'sse', or 'http'"
    
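The CORS half of the middleware configured in `server.py` can be pictured with a simplified model of the preflight decision. This is a sketch of the behaviour the integration tests in this patch assert (status 400 and no `Access-Control-Allow-Origin` header for an unlisted origin), not Starlette's implementation; the function name `preflight_response` is hypothetical, and the method and header checks are left out.

```python
def preflight_response(
    origin: str, allow_origins: list[str]
) -> tuple[int, dict[str, str]]:
    """Model a CORS preflight decision as (status code, response headers)."""
    if origin in allow_origins:
        # Echo the origin so the browser lets the page send the real request.
        return 200, {"Access-Control-Allow-Origin": origin}
    # Unlisted origin: reject and send no Access-Control-Allow-Origin header.
    return 400, {}


# Default configuration: an empty list rejects every preflight.
print(preflight_response("http://localhost:3000", []))
# Restricted configuration: only listed origins pass.
allowed = ["http://localhost:3000", "https://trusted-site.com"]
print(preflight_response("https://trusted-site.com", allowed))
print(preflight_response("http://127.0.0.1:3000", allowed))
```

Note that CORS is enforced by the browser. The patch's own tests show that a plain POST carrying an unlisted `Origin` header is still processed and returns 200, only without CORS headers. Requests with a foreign `Host` header are rejected by the `TrustedHostMiddleware` entry instead.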
  • servers/mcp-neo4j-cypher/src/mcp_neo4j_cypher/utils.py · +37 -1 · modified
    @@ -170,6 +170,43 @@ def process_config(args: argparse.Namespace) -> dict[str, Union[str, int, None]]
                 )
                 config["path"] = None
     
    +    # parse allow origins
    +    if args.allow_origins is not None:
    +        # Handle comma-separated string from CLI
    +     
    +        config["allow_origins"] = [origin.strip() for origin in args.allow_origins.split(",") if origin.strip()]
    +
    +    else:
    +        if os.getenv("NEO4J_MCP_SERVER_ALLOW_ORIGINS") is not None:
    +            # split comma-separated string into list
    +            config["allow_origins"] = [
    +                origin.strip() for origin in os.getenv("NEO4J_MCP_SERVER_ALLOW_ORIGINS", "").split(",") 
    +                if origin.strip()
    +            ]
    +        else:
    +            logger.info(
    +                "Info: No allow origins provided. Defaulting to no allowed origins."
    +            )
    +            config["allow_origins"] = list()
    +
    +    # parse allowed hosts for DNS rebinding protection
    +    if args.allowed_hosts is not None:
    +        # Handle comma-separated string from CLI
    +        config["allowed_hosts"] = [host.strip() for host in args.allowed_hosts.split(",") if host.strip()]
    +      
    +    else:
    +        if os.getenv("NEO4J_MCP_SERVER_ALLOWED_HOSTS") is not None:
    +            # split comma-separated string into list
    +            config["allowed_hosts"] = [
    +                host.strip() for host in os.getenv("NEO4J_MCP_SERVER_ALLOWED_HOSTS", "").split(",") 
    +                if host.strip()
    +            ]
    +        else:
    +            logger.info(
    +                "Info: No allowed hosts provided. Defaulting to secure mode - only localhost and 127.0.0.1 allowed."
    +            )
    +            config["allowed_hosts"] = ["localhost", "127.0.0.1"]
    +            
         # parse token limit
         if args.token_limit is not None:
             config["token_limit"] = args.token_limit
    @@ -258,7 +295,6 @@ def _value_sanitize(d: Any, list_limit: int = 128) -> Any:
         else:
             return d
     
    -
     def _truncate_string_to_tokens(
         text: str, token_limit: int, model: str = "gpt-4"
     ) -> str:
    
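The option handling added to `process_config` follows one precedence rule for both settings: the CLI value first, then the environment variable, then a built-in default. A compact sketch of that rule is below. The helper names `split_csv` and `resolve_list` and the variable `DEMO_ALLOWED_HOSTS` are hypothetical and do not appear in the patch.

```python
import os
from typing import Optional


def split_csv(raw: str) -> list[str]:
    """Split a comma-separated string, dropping blanks and surrounding spaces."""
    return [item.strip() for item in raw.split(",") if item.strip()]


def resolve_list(
    cli_value: Optional[str], env_name: str, default: list[str]
) -> list[str]:
    """The CLI value wins, then the environment variable, then the default."""
    if cli_value is not None:
        return split_csv(cli_value)
    env_value = os.getenv(env_name)
    if env_value is not None:
        return split_csv(env_value)
    return list(default)  # copy so callers cannot mutate the shared default


# Hypothetical variable name, used only for this demonstration.
os.environ["DEMO_ALLOWED_HOSTS"] = " example.com, ,test.local "
print(resolve_list(None, "DEMO_ALLOWED_HOSTS", ["localhost", "127.0.0.1"]))
print(resolve_list("a.example", "DEMO_ALLOWED_HOSTS", ["localhost", "127.0.0.1"]))
```

As in the patched code, a variable that is set but empty yields an empty list rather than the default, so the default only applies when the variable is absent.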
  • servers/mcp-neo4j-cypher/tests/integration/conftest.py · +98 -0 · modified
    @@ -171,3 +171,101 @@ async def http_server(setup: Neo4jContainer):
         except asyncio.TimeoutError:
             process.kill()
             await process.wait()
    +
    +
    +@pytest_asyncio.fixture
    +async def http_server_restricted_cors(setup: Neo4jContainer):
    +    """Start the MCP server in HTTP mode with restricted CORS origins."""
    +
    +    # Start server process in HTTP mode with restricted CORS
    +    process = await asyncio.create_subprocess_exec(
    +        "uv",
    +        "run",
    +        "mcp-neo4j-cypher",
    +        "--transport",
    +        "http",
    +        "--server-host",
    +        "127.0.0.1",
    +        "--server-port",
    +        "8003",
    +        "--allow-origins",
    +        "http://localhost:3000,https://trusted-site.com",
    +        "--db-url",
    +        setup.get_connection_url(),
    +        "--username",
    +        setup.username,
    +        "--password",
    +        setup.password,
    +        stdout=subprocess.PIPE,
    +        stderr=subprocess.PIPE,
    +        cwd=os.getcwd(),
    +    )
    +
    +    # Wait for server to start
    +    await asyncio.sleep(3)
    +
    +    # Check if process is still running
    +    if process.returncode is not None:
    +        stdout, stderr = await process.communicate()
    +        raise RuntimeError(
    +            f"Restricted CORS server failed to start. stdout: {stdout.decode()}, stderr: {stderr.decode()}"
    +        )
    +
    +    yield process
    +
    +    # Cleanup
    +    try:
    +        process.terminate()
    +        await asyncio.wait_for(process.wait(), timeout=5.0)
    +    except asyncio.TimeoutError:
    +        process.kill()
    +        await process.wait()
    +
    +
    +@pytest_asyncio.fixture
    +async def http_server_custom_hosts(setup: Neo4jContainer):
    +    """Start the MCP server in HTTP mode with custom allowed hosts."""
    +
    +    # Start server process in HTTP mode with custom allowed hosts
    +    process = await asyncio.create_subprocess_exec(
    +        "uv",
    +        "run",
    +        "mcp-neo4j-cypher",
    +        "--transport",
    +        "http",
    +        "--server-host",
    +        "127.0.0.1",
    +        "--server-port",
    +        "8004",
    +        "--allowed-hosts",
    +        "example.com,test.local",
    +        "--db-url",
    +        setup.get_connection_url(),
    +        "--username",
    +        setup.username,
    +        "--password",
    +        setup.password,
    +        stdout=subprocess.PIPE,
    +        stderr=subprocess.PIPE,
    +        cwd=os.getcwd(),
    +    )
    +
    +    # Wait for server to start
    +    await asyncio.sleep(3)
    +
    +    # Check if process is still running
    +    if process.returncode is not None:
    +        stdout, stderr = await process.communicate()
    +        raise RuntimeError(
    +            f"Custom hosts server failed to start. stdout: {stdout.decode()}, stderr: {stderr.decode()}"
    +        )
    +
    +    yield process
    +
    +    # Cleanup
    +    try:
    +        process.terminate()
    +        await asyncio.wait_for(process.wait(), timeout=5.0)
    +    except asyncio.TimeoutError:
    +        process.kill()
    +        await process.wait()
    
  • servers/mcp-neo4j-cypher/tests/integration/test_http_transport_IT.py · +299 -0 · modified
    @@ -241,3 +241,302 @@ async def test_http_full_workflow(http_server):
                 result = await parse_sse_response(response)
                 assert response.status == 200
                 assert "result" in result
    +
    +
    +# CORS Middleware Tests
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_preflight_empty_default_origins(http_server):
    +    """Test CORS preflight request with empty default allowed origins."""
    +    async with aiohttp.ClientSession() as session:
    +        async with session.options(
    +            "http://127.0.0.1:8001/mcp/",
    +            headers={
    +                "Origin": "http://localhost:3000",
    +                "Access-Control-Request-Method": "POST",
    +                "Access-Control-Request-Headers": "content-type",
    +            },
    +        ) as response:
    +            print(f"CORS preflight response status: {response.status}")
    +            print(f"CORS preflight response headers: {dict(response.headers)}")
    +
    +            # Should return 400 when origin is not in allow_origins (empty list blocks all)
    +            assert response.status == 400
    +            # Should NOT allow any origin with empty default
    +            cors_origin = response.headers.get("Access-Control-Allow-Origin")
    +            assert cors_origin is None
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_preflight_any_origin_blocked(http_server):
    +    """Test CORS preflight request - all origins should be blocked with empty default."""
    +    async with aiohttp.ClientSession() as session:
    +        async with session.options(
    +            "http://127.0.0.1:8001/mcp/",
    +            headers={
    +                "Origin": "http://127.0.0.1:3000",
    +                "Access-Control-Request-Method": "POST",
    +                "Access-Control-Request-Headers": "content-type",
    +            },
    +        ) as response:
    +            # Should return 400 when origin is blocked by empty allow_origins
    +            assert response.status == 400
    +            # Should not include CORS allow origin header for any origin
    +            cors_origin = response.headers.get("Access-Control-Allow-Origin")
    +            assert cors_origin is None
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_preflight_malicious_origin_blocked(http_server):
    +    """Test CORS preflight request with malicious origin (should be blocked)."""
    +    async with aiohttp.ClientSession() as session:
    +        async with session.options(
    +            "http://127.0.0.1:8001/mcp/",
    +            headers={
    +                "Origin": "http://malicious-site.com",
    +                "Access-Control-Request-Method": "POST",
    +                "Access-Control-Request-Headers": "content-type",
    +            },
    +        ) as response:
    +            print(f"Malicious origin response status: {response.status}")
    +            print(f"Malicious origin response headers: {dict(response.headers)}")
    +
    +            # Should return 400 when malicious origin is blocked
    +            assert response.status == 400
    +            # Should not include CORS headers for any origins (empty default)
    +            cors_origin = response.headers.get("Access-Control-Allow-Origin")
    +            assert cors_origin is None
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_actual_request_no_cors_headers(http_server):
    +    """Test actual request without Origin header (should work - not CORS)."""
    +    session_id = str(uuid.uuid4())
    +    async with aiohttp.ClientSession() as session:
    +        async with session.post(
    +            "http://127.0.0.1:8001/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            assert response.status == 200
    +            # No Origin header means no CORS, request should work
    +            result = await parse_sse_response(response)
    +            assert "result" in result
    +            assert "tools" in result["result"]
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_actual_request_with_origin_blocked(http_server):
    +    """Test actual CORS request with origin header (should work but no CORS headers)."""
    +    session_id = str(uuid.uuid4())
    +    async with aiohttp.ClientSession() as session:
    +        async with session.post(
    +            "http://127.0.0.1:8001/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json",
    +                "Origin": "http://localhost:3000",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            # Request should still work (server processes it) but no CORS headers
    +            assert response.status == 200
    +            cors_origin = response.headers.get("Access-Control-Allow-Origin")
    +            assert cors_origin is None
    +
    +            result = await parse_sse_response(response)
    +            assert "result" in result
    +            assert "tools" in result["result"]
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_restricted_server_allowed_origin(http_server_restricted_cors):
    +    """Test CORS with restricted server and allowed origin."""
    +    async with aiohttp.ClientSession() as session:
    +        async with session.options(
    +            "http://127.0.0.1:8003/mcp/",
    +            headers={
    +                "Origin": "http://localhost:3000",
    +                "Access-Control-Request-Method": "POST",
    +                "Access-Control-Request-Headers": "content-type",
    +            },
    +        ) as response:
    +            print(
    +                f"Restricted server allowed origin response status: {response.status}"
    +            )
    +            print(
    +                f"Restricted server allowed origin response headers: {dict(response.headers)}"
    +            )
    +
    +            assert response.status == 200
    +            assert "Access-Control-Allow-Origin" in response.headers
    +            assert (
    +                response.headers["Access-Control-Allow-Origin"]
    +                == "http://localhost:3000"
    +            )
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_restricted_server_disallowed_origin(http_server_restricted_cors):
    +    """Test CORS with restricted server and disallowed origin."""
    +    async with aiohttp.ClientSession() as session:
    +        async with (
    +            session.options(
    +                "http://127.0.0.1:8003/mcp/",
    +                headers={
    +                    "Origin": "http://127.0.0.1:3000",  # This should be disallowed on restricted server
    +                    "Access-Control-Request-Method": "POST",
    +                    "Access-Control-Request-Headers": "content-type",
    +                },
    +            ) as response
    +        ):
    +            print(
    +                f"Restricted server disallowed origin response status: {response.status}"
    +            )
    +            print(
    +                f"Restricted server disallowed origin response headers: {dict(response.headers)}"
    +            )
    +
    +            # Should return 400 when origin is not in restricted allow_origins list
    +            assert response.status == 400
    +            # Should not allow 127.0.0.1 on restricted server
    +            cors_origin = response.headers.get("Access-Control-Allow-Origin")
    +            assert cors_origin is None
    +
    +
    +@pytest.mark.asyncio
    +async def test_cors_restricted_server_trusted_site(http_server_restricted_cors):
    +    """Test CORS with restricted server and trusted site origin."""
    +    async with aiohttp.ClientSession() as session:
    +        async with session.options(
    +            "http://127.0.0.1:8003/mcp/",
    +            headers={
    +                "Origin": "https://trusted-site.com",
    +                "Access-Control-Request-Method": "POST",
    +                "Access-Control-Request-Headers": "content-type",
    +            },
    +        ) as response:
    +            assert response.status == 200
    +            assert "Access-Control-Allow-Origin" in response.headers
    +            assert (
    +                response.headers["Access-Control-Allow-Origin"]
    +                == "https://trusted-site.com"
    +            )
    +
    +
    +@pytest.mark.asyncio
    +async def test_dns_rebinding_protection_trusted_hosts(http_server):
    +    """Test DNS rebinding protection with TrustedHostMiddleware - allowed hosts."""
    +    session_id = str(uuid.uuid4())
    +    async with aiohttp.ClientSession() as session:
    +        # Test with localhost - should be allowed (in default allowed_hosts)
    +        async with session.post(
    +            "http://127.0.0.1:8001/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream", 
    +                "Content-Type": "application/json",
    +                "Host": "localhost:8001",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            print(f"Trusted host (localhost) response status: {response.status}")
    +            print(f"Trusted host (localhost) response headers: {dict(response.headers)}")
    +            
    +            # Should work with trusted host
    +            assert response.status == 200
    +            result = await parse_sse_response(response)
    +            assert "result" in result
    +            assert "tools" in result["result"]
    +
    +
    +@pytest.mark.asyncio  
    +async def test_dns_rebinding_protection_untrusted_hosts(http_server):
    +    """Test DNS rebinding protection with TrustedHostMiddleware - untrusted hosts."""
    +    session_id = str(uuid.uuid4())
    +    async with aiohttp.ClientSession() as session:
    +        # Test with malicious host - should be blocked
    +        async with session.post(
    +            "http://127.0.0.1:8001/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json", 
    +                "Host": "malicious-site.evil",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            print(f"Untrusted host response status: {response.status}")
    +            print(f"Untrusted host response headers: {dict(response.headers)}")
    +            
    +            # Should block untrusted host (DNS rebinding protection)
    +            assert response.status == 400
    +
    +
    +@pytest.mark.asyncio
    +async def test_dns_rebinding_custom_allowed_hosts(http_server_custom_hosts):
    +    """Test DNS rebinding protection with custom allowed hosts configuration."""
    +    session_id = str(uuid.uuid4())
    +    async with aiohttp.ClientSession() as session:
    +        # Test with custom allowed host - should work
    +        async with session.post(
    +            "http://127.0.0.1:8004/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json",
    +                "Host": "example.com",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            print(f"Custom allowed host (example.com) response status: {response.status}")
    +            print(f"Custom allowed host response headers: {dict(response.headers)}")
    +            
    +            # Should work with custom allowed host
    +            assert response.status == 200
    +            result = await parse_sse_response(response)
    +            assert "result" in result
    +            assert "tools" in result["result"]
    +        
    +        # Test with another custom allowed host with port - should work 
    +        async with session.post(
    +            "http://127.0.0.1:8004/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json",
    +                "Host": "test.local:8004",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            print(f"Custom allowed host (test.local:8004) response status: {response.status}")
    +            print(f"Custom allowed host response headers: {dict(response.headers)}")
    +            
    +            # Should work with custom allowed host 
    +            assert response.status == 200
    +            result = await parse_sse_response(response)
    +            assert "result" in result
    +            assert "tools" in result["result"]
    +
    +        # Test with localhost (not in custom allowed list) - should be blocked
    +        async with session.post(
    +            "http://127.0.0.1:8004/mcp/",
    +            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
    +            headers={
    +                "Accept": "application/json, text/event-stream",
    +                "Content-Type": "application/json",
    +                "Host": "localhost:8004",
    +                "mcp-session-id": session_id,
    +            },
    +        ) as response:
    +            print(f"Localhost (not in custom allowed) response status: {response.status}")
    +            print(f"Localhost response headers: {dict(response.headers)}")
    +            
    +            # Should block localhost when not in custom allowed hosts
    +            assert response.status == 400
    
  • servers/mcp-neo4j-cypher/tests/unit/test_utils.py+83 0 modified
    @@ -21,6 +21,8 @@ def clean_env():
             "NEO4J_MCP_SERVER_HOST",
             "NEO4J_MCP_SERVER_PORT",
             "NEO4J_MCP_SERVER_PATH",
    +        "NEO4J_MCP_SERVER_ALLOW_ORIGINS",
    +        "NEO4J_MCP_SERVER_ALLOWED_HOSTS",
             "NEO4J_NAMESPACE",
             "NEO4J_READ_TIMEOUT",
             "NEO4J_RESPONSE_TOKEN_LIMIT",
    @@ -54,6 +56,8 @@ def _create_args(**kwargs):
                 "server_host": None,
                 "server_port": None,
                 "server_path": None,
    +            "allow_origins": None,
    +            "allowed_hosts": None,
                 "read_timeout": None,
                 "token_limit": None,
             }
    @@ -348,6 +352,85 @@ def test_info_logging_stdio_transport(clean_env, args_factory, mock_logger):
         assert len(stdio_info) == 3  # host, port, path info messages
     
     
    +# CORS allow_origins tests
    +
    +
    +def test_allow_origins_cli_args(clean_env, args_factory):
    +    """Test allow_origins configuration from CLI arguments."""
    +    origins = "http://localhost:3000,https://trusted-site.com"
    +    expected_origins = ["http://localhost:3000", "https://trusted-site.com"]
    +    args = args_factory(allow_origins=origins)
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == expected_origins
    +
    +
    +def test_allow_origins_env_var(clean_env, args_factory):
    +    """Test allow_origins configuration from environment variable."""
    +    origins_str = "http://localhost:3000,https://trusted-site.com"
    +    expected_origins = ["http://localhost:3000", "https://trusted-site.com"]
    +    os.environ["NEO4J_MCP_SERVER_ALLOW_ORIGINS"] = origins_str
    +
    +    args = args_factory()
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == expected_origins
    +
    +
    +def test_allow_origins_defaults(clean_env, args_factory, mock_logger):
    +    """Test allow_origins uses empty list as default when not provided."""
    +    args = args_factory()
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == []
    +
    +    # Check that info message was logged about using defaults
    +    info_calls = [call.args[0] for call in mock_logger.info.call_args_list]
    +    allow_origins_info = [
    +        msg
    +        for msg in info_calls
    +        if "allow origins" in msg and "Defaulting to no" in msg
    +    ]
    +    assert len(allow_origins_info) == 1
    +
    +
    +def test_allow_origins_cli_overrides_env(clean_env, args_factory):
    +    """Test that CLI allow_origins takes precedence over environment variable."""
    +    os.environ["NEO4J_MCP_SERVER_ALLOW_ORIGINS"] = "http://env-site.com"
    +
    +    cli_origins = "http://cli-site.com,https://cli-secure.com"
    +    expected_origins = ["http://cli-site.com", "https://cli-secure.com"]
    +    args = args_factory(allow_origins=cli_origins)
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == expected_origins
    +
    +
    +def test_allow_origins_empty_list(clean_env, args_factory):
    +    """Test allow_origins with empty list from CLI."""
    +    args = args_factory(allow_origins="")
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == []
    +
    +
    +def test_allow_origins_single_origin(clean_env, args_factory):
    +    """Test allow_origins with single origin."""
    +    single_origin = "https://single-site.com"
    +    args = args_factory(allow_origins=single_origin)
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == [single_origin]
    +
    +
    +def test_allow_origins_wildcard(clean_env, args_factory):
    +    """Test allow_origins with wildcard."""
    +    wildcard_origins = "*"
    +    args = args_factory(allow_origins=wildcard_origins)
    +    config = process_config(args)
    +
    +    assert config["allow_origins"] == [wildcard_origins]
    +
     def test_read_timeout_cli_arg(clean_env, args_factory):
         """Test that read_timeout CLI argument is properly processed."""
         args = args_factory(read_timeout=60)
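
The `allow_origins` unit tests above pin down how a comma-separated origin string should become a list: an empty string or a missing value yields `[]`, a lone `*` is kept as-is, and a CLI value takes precedence over `NEO4J_MCP_SERVER_ALLOW_ORIGINS`. A minimal sketch of that behaviour follows. `parse_origins` and `resolve_allow_origins` are hypothetical helper names, not functions from the patch, and trimming whitespace around entries is an assumption the tests do not cover.

```python
import os
from typing import Mapping, Optional


def parse_origins(raw: Optional[str]) -> list[str]:
    """Split a comma-separated origin string into a list (hypothetical helper)."""
    if not raw:
        # None and "" both mean "no origins allowed".
        return []
    # Assumption: surrounding whitespace is trimmed and empty entries are dropped.
    return [item.strip() for item in raw.split(",") if item.strip()]


def resolve_allow_origins(
    cli_value: Optional[str],
    env: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Prefer the CLI value, then the environment variable, then an empty list."""
    env = os.environ if env is None else env
    if cli_value is not None:
        return parse_origins(cli_value)
    return parse_origins(env.get("NEO4J_MCP_SERVER_ALLOW_ORIGINS"))
```

Defaulting to an empty list means no cross-origin caller is allowed unless the operator opts in, which matches the "Defaulting to no" log message the tests look for.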
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
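
The integration tests in the patch expect a request whose `Host` header is not on the allow-list to be rejected with HTTP 400. This is what blunts DNS rebinding: after the attacker's domain is rebound to 127.0.0.1, the browser still sends the attacker's hostname in `Host`, so a server that checks the header can refuse the request. Below is a minimal sketch of such a check, assuming exact matches plus a leading `*.` wildcard and ignoring the port. `is_host_allowed` and `status_for` are hypothetical names, the allow-lists in the usage lines are illustrative, and this is not the middleware's actual code.

```python
from typing import Iterable


def is_host_allowed(host_header: str, allowed_hosts: Iterable[str]) -> bool:
    """Return True if the Host header matches an allowed pattern."""
    # Drop the port. Assumption: hostnames or IPv4 literals only (no IPv6 brackets).
    host = host_header.split(":")[0].lower()
    for pattern in allowed_hosts:
        pattern = pattern.lower()
        if pattern == "*" or host == pattern:
            return True
        # "*.example.com" matches any subdomain of example.com.
        if pattern.startswith("*.") and host.endswith(pattern[1:]):
            return True
    return False


def status_for(host_header: str, allowed_hosts: Iterable[str]) -> int:
    """Map the check onto the status codes the tests assert on."""
    return 200 if is_host_allowed(host_header, allowed_hosts) else 400


default_hosts = ["localhost", "127.0.0.1"]    # illustrative default allow-list
custom_hosts = ["example.com", "test.local"]  # illustrative custom allow-list

print(status_for("localhost:8001", default_hosts))       # trusted host
print(status_for("malicious-site.evil", default_hosts))  # rebinding attempt
print(status_for("localhost:8004", custom_hosts))        # not in the custom list
```

Note that the check keys on the header value rather than the socket address: all three requests arrive at 127.0.0.1, and only the `Host` header distinguishes them.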
