VYPR
High severity8.2NVD Advisory· Published Mar 10, 2026· Updated Apr 13, 2026

CVE-2026-27826

CVE-2026-27826

Description

MCP Atlassian is a Model Context Protocol (MCP) server for Atlassian products (Confluence and Jira). Prior to version 0.17.0, an unauthenticated attacker who can reach the mcp-atlassian HTTP endpoint can force the server process to make outbound HTTP requests to an arbitrary attacker-controlled URL by supplying two custom HTTP headers without an Authorization header. No authentication is required. The vulnerability exists in the HTTP middleware and dependency injection layer — not in any MCP tool handler - making it invisible to tool-level code analysis. In cloud deployments, this could enable theft of IAM role credentials via the instance metadata endpoint (169[.]254[.]169[.]254). In any HTTP deployment it enables internal network reconnaissance and injection of attacker-controlled content into LLM tool results. Version 0.17.0 fixes the issue.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
mcp-atlassianPyPI
< 0.17.00.17.0

Affected products

1

Patches

1
5cd697dfce91

fix(server): add SSRF protection for header-based URL validation (#986)

https://github.com/sooperset/mcp-atlassianHyeonsoo LeeFeb 24, 2026via ghsa
7 files changed · +428 2
  • .env.example+7 0 modified
    @@ -196,6 +196,13 @@ MCP_VERY_VERBOSE=true   # Enables DEBUG level logging (equivalent to 'mcp-atlass
     #CONFLUENCE_SOCKS_PROXY=socks5://confluence-proxy.example.com:1080
     #CONFLUENCE_NO_PROXY=localhost,127.0.0.1,.internal.confluence.com
     
    +# --- SSRF Protection (Advanced) ---
    +# Optional: Comma-separated list of allowed URL domains for header-based authentication.
    +# When set, only URLs matching these domains (exact or subdomain) are accepted
    +# from X-Atlassian-Jira-Url and X-Atlassian-Confluence-Url headers.
    +# Example: MCP_ALLOWED_URL_DOMAINS=atlassian.net,jira.example.com
    +#MCP_ALLOWED_URL_DOMAINS=
    +
     # --- Custom HTTP Headers (Advanced) ---
     # Jira-specific custom headers.
     #JIRA_CUSTOM_HEADERS=X-Jira-Service=mcp-integration,X-Custom-Auth=jira-token,X-Forwarded-User=service-account
    
  • src/mcp_atlassian/servers/dependencies.py+40 0 modified
    @@ -7,6 +7,7 @@
     
     import dataclasses
     import logging
    +from collections.abc import Callable
     from typing import TYPE_CHECKING, Any
     
     from fastmcp import Context
    @@ -17,6 +18,7 @@
     from mcp_atlassian.jira import JiraConfig, JiraFetcher
     from mcp_atlassian.servers.context import MainAppContext
     from mcp_atlassian.utils.oauth import OAuthConfig
    +from mcp_atlassian.utils.urls import validate_url_for_ssrf
     
     if TYPE_CHECKING:
         from mcp_atlassian.confluence.config import (
    @@ -27,6 +29,34 @@
     logger = logging.getLogger("mcp-atlassian.servers.dependencies")
     
     
    +def _make_ssrf_safe_hook(
    +    validate_fn: Callable[[str], str | None],
    +) -> Callable[..., Any]:
    +    """Create a requests response hook that validates redirect URLs.
    +
    +    Blocks HTTP redirects that target internal/private IP addresses
    +    to prevent SSRF via open-redirect chains.
    +
    +    Args:
    +        validate_fn: A function that returns None if safe,
    +            error string if blocked.
    +
    +    Returns:
    +        A requests response hook function.
    +    """
    +
    +    def hook(response: Any, **kwargs: Any) -> Any:
    +        if response.is_redirect:
    +            redirect_url = response.headers.get("Location", "")
    +            error = validate_fn(redirect_url)
    +            if error:
    +                response.close()
    +                raise ValueError(f"Redirect blocked (SSRF): {error}")
    +        return response
    +
    +    return hook
    +
    +
     def _resolve_bearer_auth_type(
         base_config: JiraConfig | ConfluenceConfig,
         middleware_auth_type: str,
    @@ -281,6 +311,11 @@ async def get_jira_fetcher(ctx: Context) -> JiraFetcher:
                 )
                 try:
                     header_jira_fetcher = JiraFetcher(config=header_config)
    +                # Attach SSRF redirect hook to block
    +                # 302 redirects to internal targets
    +                header_jira_fetcher.jira._session.hooks["response"].append(
    +                    _make_ssrf_safe_hook(validate_url_for_ssrf)
    +                )
                     current_user_id = header_jira_fetcher.get_current_user_account_id()
                     logger.debug(
                         f"get_jira_fetcher: Validated header-based Jira token for user ID: {current_user_id}"
    @@ -488,6 +523,11 @@ async def get_confluence_fetcher(ctx: Context) -> ConfluenceFetcher:
                 )
                 try:
                     header_confluence_fetcher = ConfluenceFetcher(config=header_config)
    +                # Attach SSRF redirect hook to block
    +                # 302 redirects to internal targets
    +                header_confluence_fetcher.confluence._session.hooks["response"].append(
    +                    _make_ssrf_safe_hook(validate_url_for_ssrf)
    +                )
                     current_user_data = header_confluence_fetcher.get_current_user_info()
                     derived_email = (
                         current_user_data.get("email")
    
  • src/mcp_atlassian/servers/main.py+18 0 modified
    @@ -27,6 +27,7 @@
     from mcp_atlassian.utils.io import is_read_only_mode
     from mcp_atlassian.utils.logging import mask_sensitive
     from mcp_atlassian.utils.tools import get_enabled_tools, should_include_tool
    +from mcp_atlassian.utils.urls import validate_url_for_ssrf
     
     from .confluence import confluence_mcp
     from .context import MainAppContext
    @@ -472,6 +473,23 @@ def _process_authentication_headers(self, scope: Scope) -> None:
                     else None
                 )
     
    +            # Validate URLs to prevent SSRF
    +            if jira_url_str:
    +                ssrf_error = validate_url_for_ssrf(jira_url_str)
    +                if ssrf_error:
    +                    scope["state"]["auth_validation_error"] = (
    +                        f"Forbidden: Invalid Jira URL - {ssrf_error}"
    +                    )
    +                    return
    +
    +            if confluence_url_str:
    +                ssrf_error = validate_url_for_ssrf(confluence_url_str)
    +                if ssrf_error:
    +                    scope["state"]["auth_validation_error"] = (
    +                        f"Forbidden: Invalid Confluence URL - {ssrf_error}"
    +                    )
    +                    return
    +
                 # Build service headers dict
                 service_headers = {}
                 if jira_token_str:
    
  • src/mcp_atlassian/utils/__init__.py+2 1 modified
    @@ -16,7 +16,7 @@
     # Export OAuth utilities
     from .oauth import OAuthConfig, configure_oauth_session
     from .ssl import SSLIgnoreAdapter, configure_ssl_verification
    -from .urls import is_atlassian_cloud_url
    +from .urls import is_atlassian_cloud_url, validate_url_for_ssrf
     
     # Export all utility functions for backward compatibility
     __all__ = [
    @@ -32,4 +32,5 @@
         "configure_oauth_session",
         "setup_signal_handlers",
         "ensure_clean_exit",
    +    "validate_url_for_ssrf",
     ]
    
  • src/mcp_atlassian/utils/urls.py+142 0 modified
    @@ -1,6 +1,9 @@
     """URL-related utility functions for MCP Atlassian."""
     
    +import ipaddress
    +import os
     import re
    +import socket
     from urllib.parse import urlparse
     
     
    @@ -40,3 +43,142 @@ def is_atlassian_cloud_url(url: str) -> bool:
             or ".atlassian-us-gov-mod.net" in hostname  # US Gov Moderate (FedRAMP)
             or ".atlassian-us-gov.net" in hostname  # US Gov (FedRAMP)
         )
    +
    +
    +def validate_url_for_ssrf(url: str) -> str | None:
    +    """Validate a URL to prevent SSRF attacks.
    +
    +    Returns None if the URL is safe, or an error message string
    +    describing why it was blocked.
    +
    +    Args:
    +        url: The URL to validate.
    +
    +    Returns:
    +        None if safe, error message string if blocked.
    +    """
    +    if not url or not url.strip():
    +        return "Empty URL"
    +
    +    try:
    +        parsed = urlparse(url)
    +    except Exception:
    +        return f"Invalid URL: {url}"
    +
    +    # Scheme check
    +    if parsed.scheme not in ("http", "https"):
    +        return f"Blocked scheme: {parsed.scheme} (only http/https allowed)"
    +
    +    hostname = parsed.hostname
    +    if not hostname:
    +        return "No hostname in URL"
    +
    +    # Check blocked hostnames
    +    blocked_hostnames = {"localhost", "metadata.google.internal"}
    +    if hostname.lower() in blocked_hostnames:
    +        return f"Blocked hostname: {hostname}"
    +
    +    # Check if hostname is an IP address
    +    ip_error = _check_ip_address(hostname)
    +    if ip_error:
    +        return ip_error
    +
    +    # Domain allowlist check
    +    allowlist = _get_domain_allowlist()
    +    if allowlist is not None:
    +        if not _hostname_matches_allowlist(hostname, allowlist):
    +            return f"Hostname {hostname} not in allowed domains"
    +
    +    # DNS resolution check - resolve hostname and check all IPs
    +    dns_error = _check_dns_resolution(hostname)
    +    if dns_error:
    +        return dns_error
    +
    +    return None
    +
    +
    +def _check_ip_address(hostname: str) -> str | None:
    +    """Check if hostname is a blocked IP address.
    +
    +    Args:
    +        hostname: The hostname to check.
    +
    +    Returns:
    +        None if safe, error message string if blocked.
    +    """
    +    try:
    +        addr = ipaddress.ip_address(hostname)
    +    except ValueError:
    +        return None  # Not an IP literal - skip
    +
    +    # Handle IPv4-mapped IPv6 (e.g., ::ffff:127.0.0.1)
    +    if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped:
    +        addr = addr.ipv4_mapped
    +
    +    if not addr.is_global:
    +        return f"Blocked IP address: {hostname} (non-global)"
    +
    +    return None
    +
    +
    +def _get_domain_allowlist() -> list[str] | None:
    +    """Get domain allowlist from environment variable.
    +
    +    Returns:
    +        List of allowed domain strings, or None if not set.
    +    """
    +    raw = os.environ.get("MCP_ALLOWED_URL_DOMAINS", "").strip()
    +    if not raw:
    +        return None
    +    return [d.strip().lower() for d in raw.split(",") if d.strip()]
    +
    +
    +def _hostname_matches_allowlist(
    +    hostname: str,
    +    allowlist: list[str],
    +) -> bool:
    +    """Check if hostname matches any entry in the allowlist.
    +
    +    Args:
    +        hostname: The hostname to check.
    +        allowlist: List of allowed domain strings.
    +
    +    Returns:
    +        True if hostname matches, False otherwise.
    +    """
    +    hostname_lower = hostname.lower()
    +    for domain in allowlist:
    +        if hostname_lower == domain or hostname_lower.endswith(f".{domain}"):
    +            return True
    +    return False
    +
    +
    +def _check_dns_resolution(hostname: str) -> str | None:
    +    """Resolve hostname via DNS and check if any IP is non-global.
    +
    +    Args:
    +        hostname: The hostname to resolve and check.
    +
    +    Returns:
    +        None if safe, error message string if blocked.
    +    """
    +    try:
    +        results = socket.getaddrinfo(hostname, None)
    +    except socket.gaierror:
    +        return f"DNS resolution failed for {hostname}"
    +    except (OSError, UnicodeError):
    +        return f"DNS resolution error for {hostname}"
    +
    +    for _family, _type, _proto, _canonname, sockaddr in results:
    +        ip_str = sockaddr[0]
    +        try:
    +            addr = ipaddress.ip_address(ip_str)
    +            # Handle IPv4-mapped IPv6
    +            if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped:
    +                addr = addr.ipv4_mapped
    +            if not addr.is_global:
    +                return f"DNS for {hostname} resolves to non-global IP: {ip_str}"
    +        except ValueError:
    +            continue
    +
    +    return None
    
  • tests/unit/servers/test_dependencies.py+82 0 modified
    @@ -411,6 +411,11 @@ def _create_mock_fetcher(fetcher_class, validation_return=None, validation_error
                 mock_fetcher.get_current_user_account_id.return_value = (
                     validation_return or "test-account-id"
                 )
    +        # Set up jira._session.hooks for SSRF redirect hook attachment
    +        mock_session = MagicMock()
    +        mock_session.hooks = {"response": []}
    +        mock_fetcher.jira = MagicMock()
    +        mock_fetcher.jira._session = mock_session
         elif fetcher_class == ConfluenceFetcher:
             if validation_error:
                 mock_fetcher.get_current_user_info.side_effect = validation_error
    @@ -419,6 +424,11 @@ def _create_mock_fetcher(fetcher_class, validation_return=None, validation_error
                     "email": "user@example.com",
                     "displayName": "Test User",
                 }
    +        # Set up confluence._session.hooks for SSRF redirect hook attachment
    +        mock_session = MagicMock()
    +        mock_session.hooks = {"response": []}
    +        mock_fetcher.confluence = MagicMock()
    +        mock_fetcher.confluence._session = mock_session
     
         return mock_fetcher
     
    @@ -1261,3 +1271,75 @@ def test_minimal_oauth_config_bearer_fallback_to_pat(self):
             )
             result = _resolve_bearer_auth_type(config, "oauth")
             assert result == "pat"
    +
    +
    +class TestSsrfProtection:
    +    """SSRF protection regression tests."""
    +
    +    def test_validate_rejects_private_ip(self) -> None:
    +        """Private IP URLs are rejected by SSRF validation."""
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        result = validate_url_for_ssrf("http://127.0.0.1:8080")
    +        assert result is not None
    +
    +    def test_validate_rejects_metadata(self) -> None:
    +        """Cloud metadata endpoint is rejected."""
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        result = validate_url_for_ssrf("http://169.254.169.254")
    +        assert result is not None
    +
    +    def test_validate_rejects_file_scheme(self) -> None:
    +        """file:// scheme is rejected."""
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        result = validate_url_for_ssrf("file:///etc/passwd")
    +        assert result is not None
    +
    +    def test_redirect_hook_blocks_internal(self) -> None:
    +        """Redirect to internal IP is blocked by SSRF hook."""
    +        from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        hook = _make_ssrf_safe_hook(validate_url_for_ssrf)
    +
    +        # Create a mock response that simulates a redirect
    +        mock_response = MagicMock()
    +        mock_response.is_redirect = True
    +        mock_response.headers = {"Location": "http://169.254.169.254/latest/meta-data"}
    +
    +        with pytest.raises(ValueError, match="Redirect blocked"):
    +            hook(mock_response)
    +
    +    def test_redirect_hook_allows_safe(self) -> None:
    +        """Redirect to safe URL passes through."""
    +        from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        hook = _make_ssrf_safe_hook(validate_url_for_ssrf)
    +
    +        mock_response = MagicMock()
    +        mock_response.is_redirect = True
    +        mock_response.headers = {
    +            "Location": "https://company.atlassian.net/rest/api/2/issue"
    +        }
    +
    +        # Mock DNS for the redirect target
    +        with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +            mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))]
    +            result = hook(mock_response)
    +            assert result == mock_response
    +
    +    def test_redirect_hook_ignores_non_redirect(self) -> None:
    +        """Non-redirect response passes through without checks."""
    +        from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook
    +        from mcp_atlassian.utils.urls import validate_url_for_ssrf
    +
    +        hook = _make_ssrf_safe_hook(validate_url_for_ssrf)
    +
    +        mock_response = MagicMock()
    +        mock_response.is_redirect = False
    +
    +        result = hook(mock_response)
    +        assert result == mock_response
    
  • tests/unit/utils/test_urls.py+137 1 modified
    @@ -1,6 +1,10 @@
     """Tests for the URL utilities module."""
     
    -from mcp_atlassian.utils.urls import is_atlassian_cloud_url
    +import os
    +import socket
    +from unittest.mock import patch
    +
    +from mcp_atlassian.utils.urls import is_atlassian_cloud_url, validate_url_for_ssrf
     
     
     def test_is_atlassian_cloud_url_empty():
    @@ -90,3 +94,135 @@ def test_is_atlassian_cloud_url_with_protocols():
         assert (
             is_atlassian_cloud_url("ftp://example.atlassian.net") is True
         )  # URL parsing still works
    +
    +
    +class TestValidateUrlForSsrf:
    +    """Tests for validate_url_for_ssrf."""
    +
    +    def test_valid_cloud_url(self) -> None:
    +        """Atlassian Cloud URL passes validation."""
    +        with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +            mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))]
    +            assert validate_url_for_ssrf("https://company.atlassian.net") is None
    +
    +    def test_valid_server_url(self) -> None:
    +        """Server/DC URL passes validation."""
    +        with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +            mock_dns.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))]
    +            assert validate_url_for_ssrf("https://jira.example.com") is None
    +
    +    def test_empty_url(self) -> None:
    +        """Empty URL is rejected."""
    +        result = validate_url_for_ssrf("")
    +        assert result is not None
    +        assert "Empty" in result
    +
    +    def test_ftp_scheme(self) -> None:
    +        """FTP scheme is rejected."""
    +        result = validate_url_for_ssrf("ftp://evil.com")
    +        assert result is not None
    +        assert "scheme" in result.lower()
    +
    +    def test_file_scheme(self) -> None:
    +        """file:// scheme is rejected."""
    +        result = validate_url_for_ssrf("file:///etc/passwd")
    +        assert result is not None
    +        assert "scheme" in result.lower()
    +
    +    def test_localhost(self) -> None:
    +        """localhost is rejected."""
    +        result = validate_url_for_ssrf("http://localhost:8080")
    +        assert result is not None
    +        assert "localhost" in result.lower() or "Blocked" in result
    +
    +    def test_loopback_ip(self) -> None:
    +        """127.0.0.1 is rejected."""
    +        result = validate_url_for_ssrf("http://127.0.0.1")
    +        assert result is not None
    +
    +    def test_private_10(self) -> None:
    +        """10.x.x.x is rejected."""
    +        result = validate_url_for_ssrf("http://10.0.0.1")
    +        assert result is not None
    +
    +    def test_private_172(self) -> None:
    +        """172.16.x.x is rejected."""
    +        result = validate_url_for_ssrf("http://172.16.0.1")
    +        assert result is not None
    +
    +    def test_private_192(self) -> None:
    +        """192.168.x.x is rejected."""
    +        result = validate_url_for_ssrf("http://192.168.1.100")
    +        assert result is not None
    +
    +    def test_carrier_grade_nat(self) -> None:
    +        """100.64.x.x (CGNAT) is rejected."""
    +        result = validate_url_for_ssrf("http://100.64.0.1")
    +        assert result is not None
    +
    +    def test_cloud_metadata(self) -> None:
    +        """169.254.169.254 (cloud metadata) is rejected."""
    +        result = validate_url_for_ssrf("http://169.254.169.254")
    +        assert result is not None
    +
    +    def test_ipv6_loopback(self) -> None:
    +        """IPv6 loopback ::1 is rejected."""
    +        result = validate_url_for_ssrf("http://[::1]")
    +        assert result is not None
    +
    +    def test_dns_resolves_private(self) -> None:
    +        """Hostname resolving to private IP is rejected."""
    +        with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +            mock_dns.return_value = [(2, 1, 6, "", ("10.0.0.1", 0))]
    +            result = validate_url_for_ssrf("https://evil.example.com")
    +            assert result is not None
    +            assert "non-global" in result.lower()
    +
    +    def test_dns_unresolvable(self) -> None:
    +        """Unresolvable hostname is rejected."""
    +        with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +            mock_dns.side_effect = socket.gaierror("Name resolution failed")
    +            result = validate_url_for_ssrf("https://nonexistent.invalid")
    +            assert result is not None
    +            assert "DNS" in result
    +
    +    def test_allowlist_exact_match(self) -> None:
    +        """Domain allowlist allows exact match."""
    +        with patch.dict(
    +            os.environ,
    +            {"MCP_ALLOWED_URL_DOMAINS": "corp.com"},
    +        ):
    +            with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +                mock_dns.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))]
    +                assert validate_url_for_ssrf("https://corp.com") is None
    +
    +    def test_allowlist_subdomain_match(self) -> None:
    +        """Domain allowlist allows subdomain match."""
    +        with patch.dict(
    +            os.environ,
    +            {"MCP_ALLOWED_URL_DOMAINS": "atlassian.net"},
    +        ):
    +            with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns:
    +                mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))]
    +                assert validate_url_for_ssrf("https://company.atlassian.net") is None
    +
    +    def test_allowlist_reject(self) -> None:
    +        """Domain allowlist rejects non-matching hostname."""
    +        with patch.dict(
    +            os.environ,
    +            {"MCP_ALLOWED_URL_DOMAINS": "atlassian.net"},
    +        ):
    +            result = validate_url_for_ssrf("https://evil.com")
    +            assert result is not None
    +            assert "not in allowed" in result.lower()
    +
    +    def test_metadata_google_internal(self) -> None:
    +        """GCP metadata endpoint is rejected."""
    +        result = validate_url_for_ssrf("http://metadata.google.internal")
    +        assert result is not None
    +        assert "Blocked hostname" in result
    +
    +    def test_ipv4_mapped_ipv6(self) -> None:
    +        """IPv4-mapped IPv6 loopback is rejected."""
    +        result = validate_url_for_ssrf("http://[::ffff:127.0.0.1]")
    +        assert result is not None
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

3

News mentions

0

No linked articles in our index yet.