CVE-2026-27826
Description
MCP Atlassian is a Model Context Protocol (MCP) server for Atlassian products (Confluence and Jira). Prior to version 0.17.0, an unauthenticated attacker who can reach the mcp-atlassian HTTP endpoint can force the server process to make outbound HTTP requests to an arbitrary attacker-controlled URL by supplying two custom HTTP headers without an Authorization header. No authentication is required. The vulnerability exists in the HTTP middleware and dependency injection layer — not in any MCP tool handler - making it invisible to tool-level code analysis. In cloud deployments, this could enable theft of IAM role credentials via the instance metadata endpoint (169[.]254[.]169[.]254). In any HTTP deployment it enables internal network reconnaissance and injection of attacker-controlled content into LLM tool results. Version 0.17.0 fixes the issue.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
mcp-atlassianPyPI | < 0.17.0 | 0.17.0 |
Affected products
1Patches
15cd697dfce91fix(server): add SSRF protection for header-based URL validation (#986)
7 files changed · +428 −2
.env.example+7 −0 modified@@ -196,6 +196,13 @@ MCP_VERY_VERBOSE=true # Enables DEBUG level logging (equivalent to 'mcp-atlass #CONFLUENCE_SOCKS_PROXY=socks5://confluence-proxy.example.com:1080 #CONFLUENCE_NO_PROXY=localhost,127.0.0.1,.internal.confluence.com +# --- SSRF Protection (Advanced) --- +# Optional: Comma-separated list of allowed URL domains for header-based authentication. +# When set, only URLs matching these domains (exact or subdomain) are accepted +# from X-Atlassian-Jira-Url and X-Atlassian-Confluence-Url headers. +# Example: MCP_ALLOWED_URL_DOMAINS=atlassian.net,jira.example.com +#MCP_ALLOWED_URL_DOMAINS= + # --- Custom HTTP Headers (Advanced) --- # Jira-specific custom headers. #JIRA_CUSTOM_HEADERS=X-Jira-Service=mcp-integration,X-Custom-Auth=jira-token,X-Forwarded-User=service-account
src/mcp_atlassian/servers/dependencies.py+40 −0 modified@@ -7,6 +7,7 @@ import dataclasses import logging +from collections.abc import Callable from typing import TYPE_CHECKING, Any from fastmcp import Context @@ -17,6 +18,7 @@ from mcp_atlassian.jira import JiraConfig, JiraFetcher from mcp_atlassian.servers.context import MainAppContext from mcp_atlassian.utils.oauth import OAuthConfig +from mcp_atlassian.utils.urls import validate_url_for_ssrf if TYPE_CHECKING: from mcp_atlassian.confluence.config import ( @@ -27,6 +29,34 @@ logger = logging.getLogger("mcp-atlassian.servers.dependencies") +def _make_ssrf_safe_hook( + validate_fn: Callable[[str], str | None], +) -> Callable[..., Any]: + """Create a requests response hook that validates redirect URLs. + + Blocks HTTP redirects that target internal/private IP addresses + to prevent SSRF via open-redirect chains. + + Args: + validate_fn: A function that returns None if safe, + error string if blocked. + + Returns: + A requests response hook function. + """ + + def hook(response: Any, **kwargs: Any) -> Any: + if response.is_redirect: + redirect_url = response.headers.get("Location", "") + error = validate_fn(redirect_url) + if error: + response.close() + raise ValueError(f"Redirect blocked (SSRF): {error}") + return response + + return hook + + def _resolve_bearer_auth_type( base_config: JiraConfig | ConfluenceConfig, middleware_auth_type: str, @@ -281,6 +311,11 @@ async def get_jira_fetcher(ctx: Context) -> JiraFetcher: ) try: header_jira_fetcher = JiraFetcher(config=header_config) + # Attach SSRF redirect hook to block + # 302 redirects to internal targets + header_jira_fetcher.jira._session.hooks["response"].append( + _make_ssrf_safe_hook(validate_url_for_ssrf) + ) current_user_id = header_jira_fetcher.get_current_user_account_id() logger.debug( f"get_jira_fetcher: Validated header-based Jira token for user ID: {current_user_id}" @@ -488,6 +523,11 @@ async def get_confluence_fetcher(ctx: Context) -> ConfluenceFetcher: ) try: header_confluence_fetcher = ConfluenceFetcher(config=header_config) + # Attach SSRF redirect hook to block + # 302 redirects to internal targets + header_confluence_fetcher.confluence._session.hooks["response"].append( + _make_ssrf_safe_hook(validate_url_for_ssrf) + ) current_user_data = header_confluence_fetcher.get_current_user_info() derived_email = ( current_user_data.get("email")
src/mcp_atlassian/servers/main.py+18 −0 modified@@ -27,6 +27,7 @@ from mcp_atlassian.utils.io import is_read_only_mode from mcp_atlassian.utils.logging import mask_sensitive from mcp_atlassian.utils.tools import get_enabled_tools, should_include_tool +from mcp_atlassian.utils.urls import validate_url_for_ssrf from .confluence import confluence_mcp from .context import MainAppContext @@ -472,6 +473,23 @@ def _process_authentication_headers(self, scope: Scope) -> None: else None ) + # Validate URLs to prevent SSRF + if jira_url_str: + ssrf_error = validate_url_for_ssrf(jira_url_str) + if ssrf_error: + scope["state"]["auth_validation_error"] = ( + f"Forbidden: Invalid Jira URL - {ssrf_error}" + ) + return + + if confluence_url_str: + ssrf_error = validate_url_for_ssrf(confluence_url_str) + if ssrf_error: + scope["state"]["auth_validation_error"] = ( + f"Forbidden: Invalid Confluence URL - {ssrf_error}" + ) + return + # Build service headers dict service_headers = {} if jira_token_str:
src/mcp_atlassian/utils/__init__.py+2 −1 modified@@ -16,7 +16,7 @@ # Export OAuth utilities from .oauth import OAuthConfig, configure_oauth_session from .ssl import SSLIgnoreAdapter, configure_ssl_verification -from .urls import is_atlassian_cloud_url +from .urls import is_atlassian_cloud_url, validate_url_for_ssrf # Export all utility functions for backward compatibility __all__ = [ @@ -32,4 +32,5 @@ "configure_oauth_session", "setup_signal_handlers", "ensure_clean_exit", + "validate_url_for_ssrf", ]
src/mcp_atlassian/utils/urls.py+142 −0 modified@@ -1,6 +1,9 @@ """URL-related utility functions for MCP Atlassian.""" +import ipaddress +import os import re +import socket from urllib.parse import urlparse @@ -40,3 +43,142 @@ def is_atlassian_cloud_url(url: str) -> bool: or ".atlassian-us-gov-mod.net" in hostname # US Gov Moderate (FedRAMP) or ".atlassian-us-gov.net" in hostname # US Gov (FedRAMP) ) + + +def validate_url_for_ssrf(url: str) -> str | None: + """Validate a URL to prevent SSRF attacks. + + Returns None if the URL is safe, or an error message string + describing why it was blocked. + + Args: + url: The URL to validate. + + Returns: + None if safe, error message string if blocked. + """ + if not url or not url.strip(): + return "Empty URL" + + try: + parsed = urlparse(url) + except Exception: + return f"Invalid URL: {url}" + + # Scheme check + if parsed.scheme not in ("http", "https"): + return f"Blocked scheme: {parsed.scheme} (only http/https allowed)" + + hostname = parsed.hostname + if not hostname: + return "No hostname in URL" + + # Check blocked hostnames + blocked_hostnames = {"localhost", "metadata.google.internal"} + if hostname.lower() in blocked_hostnames: + return f"Blocked hostname: {hostname}" + + # Check if hostname is an IP address + ip_error = _check_ip_address(hostname) + if ip_error: + return ip_error + + # Domain allowlist check + allowlist = _get_domain_allowlist() + if allowlist is not None: + if not _hostname_matches_allowlist(hostname, allowlist): + return f"Hostname {hostname} not in allowed domains" + + # DNS resolution check - resolve hostname and check all IPs + dns_error = _check_dns_resolution(hostname) + if dns_error: + return dns_error + + return None + + +def _check_ip_address(hostname: str) -> str | None: + """Check if hostname is a blocked IP address. + + Args: + hostname: The hostname to check. + + Returns: + None if safe, error message string if blocked. + """ + try: + addr = ipaddress.ip_address(hostname) + except ValueError: + return None # Not an IP literal - skip + + # Handle IPv4-mapped IPv6 (e.g., ::ffff:127.0.0.1) + if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped: + addr = addr.ipv4_mapped + + if not addr.is_global: + return f"Blocked IP address: {hostname} (non-global)" + + return None + + +def _get_domain_allowlist() -> list[str] | None: + """Get domain allowlist from environment variable. + + Returns: + List of allowed domain strings, or None if not set. + """ + raw = os.environ.get("MCP_ALLOWED_URL_DOMAINS", "").strip() + if not raw: + return None + return [d.strip().lower() for d in raw.split(",") if d.strip()] + + +def _hostname_matches_allowlist( + hostname: str, + allowlist: list[str], +) -> bool: + """Check if hostname matches any entry in the allowlist. + + Args: + hostname: The hostname to check. + allowlist: List of allowed domain strings. + + Returns: + True if hostname matches, False otherwise. + """ + hostname_lower = hostname.lower() + for domain in allowlist: + if hostname_lower == domain or hostname_lower.endswith(f".{domain}"): + return True + return False + + +def _check_dns_resolution(hostname: str) -> str | None: + """Resolve hostname via DNS and check if any IP is non-global. + + Args: + hostname: The hostname to resolve and check. + + Returns: + None if safe, error message string if blocked. + """ + try: + results = socket.getaddrinfo(hostname, None) + except socket.gaierror: + return f"DNS resolution failed for {hostname}" + except (OSError, UnicodeError): + return f"DNS resolution error for {hostname}" + + for _family, _type, _proto, _canonname, sockaddr in results: + ip_str = sockaddr[0] + try: + addr = ipaddress.ip_address(ip_str) + # Handle IPv4-mapped IPv6 + if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped: + addr = addr.ipv4_mapped + if not addr.is_global: + return f"DNS for {hostname} resolves to non-global IP: {ip_str}" + except ValueError: + continue + + return None
tests/unit/servers/test_dependencies.py+82 −0 modified@@ -411,6 +411,11 @@ def _create_mock_fetcher(fetcher_class, validation_return=None, validation_error mock_fetcher.get_current_user_account_id.return_value = ( validation_return or "test-account-id" ) + # Set up jira._session.hooks for SSRF redirect hook attachment + mock_session = MagicMock() + mock_session.hooks = {"response": []} + mock_fetcher.jira = MagicMock() + mock_fetcher.jira._session = mock_session elif fetcher_class == ConfluenceFetcher: if validation_error: mock_fetcher.get_current_user_info.side_effect = validation_error @@ -419,6 +424,11 @@ def _create_mock_fetcher(fetcher_class, validation_return=None, validation_error "email": "user@example.com", "displayName": "Test User", } + # Set up confluence._session.hooks for SSRF redirect hook attachment + mock_session = MagicMock() + mock_session.hooks = {"response": []} + mock_fetcher.confluence = MagicMock() + mock_fetcher.confluence._session = mock_session return mock_fetcher @@ -1261,3 +1271,75 @@ def test_minimal_oauth_config_bearer_fallback_to_pat(self): ) result = _resolve_bearer_auth_type(config, "oauth") assert result == "pat" + + +class TestSsrfProtection: + """SSRF protection regression tests.""" + + def test_validate_rejects_private_ip(self) -> None: + """Private IP URLs are rejected by SSRF validation.""" + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + result = validate_url_for_ssrf("http://127.0.0.1:8080") + assert result is not None + + def test_validate_rejects_metadata(self) -> None: + """Cloud metadata endpoint is rejected.""" + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + result = validate_url_for_ssrf("http://169.254.169.254") + assert result is not None + + def test_validate_rejects_file_scheme(self) -> None: + """file:// scheme is rejected.""" + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + result = validate_url_for_ssrf("file:///etc/passwd") + assert result is not None + + def test_redirect_hook_blocks_internal(self) -> None: + """Redirect to internal IP is blocked by SSRF hook.""" + from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + hook = _make_ssrf_safe_hook(validate_url_for_ssrf) + + # Create a mock response that simulates a redirect + mock_response = MagicMock() + mock_response.is_redirect = True + mock_response.headers = {"Location": "http://169.254.169.254/latest/meta-data"} + + with pytest.raises(ValueError, match="Redirect blocked"): + hook(mock_response) + + def test_redirect_hook_allows_safe(self) -> None: + """Redirect to safe URL passes through.""" + from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + hook = _make_ssrf_safe_hook(validate_url_for_ssrf) + + mock_response = MagicMock() + mock_response.is_redirect = True + mock_response.headers = { + "Location": "https://company.atlassian.net/rest/api/2/issue" + } + + # Mock DNS for the redirect target + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))] + result = hook(mock_response) + assert result == mock_response + + def test_redirect_hook_ignores_non_redirect(self) -> None: + """Non-redirect response passes through without checks.""" + from mcp_atlassian.servers.dependencies import _make_ssrf_safe_hook + from mcp_atlassian.utils.urls import validate_url_for_ssrf + + hook = _make_ssrf_safe_hook(validate_url_for_ssrf) + + mock_response = MagicMock() + mock_response.is_redirect = False + + result = hook(mock_response) + assert result == mock_response
tests/unit/utils/test_urls.py+137 −1 modified@@ -1,6 +1,10 @@ """Tests for the URL utilities module.""" -from mcp_atlassian.utils.urls import is_atlassian_cloud_url +import os +import socket +from unittest.mock import patch + +from mcp_atlassian.utils.urls import is_atlassian_cloud_url, validate_url_for_ssrf def test_is_atlassian_cloud_url_empty(): @@ -90,3 +94,135 @@ def test_is_atlassian_cloud_url_with_protocols(): assert ( is_atlassian_cloud_url("ftp://example.atlassian.net") is True ) # URL parsing still works + + +class TestValidateUrlForSsrf: + """Tests for validate_url_for_ssrf.""" + + def test_valid_cloud_url(self) -> None: + """Atlassian Cloud URL passes validation.""" + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))] + assert validate_url_for_ssrf("https://company.atlassian.net") is None + + def test_valid_server_url(self) -> None: + """Server/DC URL passes validation.""" + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + assert validate_url_for_ssrf("https://jira.example.com") is None + + def test_empty_url(self) -> None: + """Empty URL is rejected.""" + result = validate_url_for_ssrf("") + assert result is not None + assert "Empty" in result + + def test_ftp_scheme(self) -> None: + """FTP scheme is rejected.""" + result = validate_url_for_ssrf("ftp://evil.com") + assert result is not None + assert "scheme" in result.lower() + + def test_file_scheme(self) -> None: + """file:// scheme is rejected.""" + result = validate_url_for_ssrf("file:///etc/passwd") + assert result is not None + assert "scheme" in result.lower() + + def test_localhost(self) -> None: + """localhost is rejected.""" + result = validate_url_for_ssrf("http://localhost:8080") + assert result is not None + assert "localhost" in result.lower() or "Blocked" in result + + def test_loopback_ip(self) -> None: + """127.0.0.1 is rejected.""" + result = validate_url_for_ssrf("http://127.0.0.1") + assert result is not None + + def test_private_10(self) -> None: + """10.x.x.x is rejected.""" + result = validate_url_for_ssrf("http://10.0.0.1") + assert result is not None + + def test_private_172(self) -> None: + """172.16.x.x is rejected.""" + result = validate_url_for_ssrf("http://172.16.0.1") + assert result is not None + + def test_private_192(self) -> None: + """192.168.x.x is rejected.""" + result = validate_url_for_ssrf("http://192.168.1.100") + assert result is not None + + def test_carrier_grade_nat(self) -> None: + """100.64.x.x (CGNAT) is rejected.""" + result = validate_url_for_ssrf("http://100.64.0.1") + assert result is not None + + def test_cloud_metadata(self) -> None: + """169.254.169.254 (cloud metadata) is rejected.""" + result = validate_url_for_ssrf("http://169.254.169.254") + assert result is not None + + def test_ipv6_loopback(self) -> None: + """IPv6 loopback ::1 is rejected.""" + result = validate_url_for_ssrf("http://[::1]") + assert result is not None + + def test_dns_resolves_private(self) -> None: + """Hostname resolving to private IP is rejected.""" + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("10.0.0.1", 0))] + result = validate_url_for_ssrf("https://evil.example.com") + assert result is not None + assert "non-global" in result.lower() + + def test_dns_unresolvable(self) -> None: + """Unresolvable hostname is rejected.""" + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.side_effect = socket.gaierror("Name resolution failed") + result = validate_url_for_ssrf("https://nonexistent.invalid") + assert result is not None + assert "DNS" in result + + def test_allowlist_exact_match(self) -> None: + """Domain allowlist allows exact match.""" + with patch.dict( + os.environ, + {"MCP_ALLOWED_URL_DOMAINS": "corp.com"}, + ): + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + assert validate_url_for_ssrf("https://corp.com") is None + + def test_allowlist_subdomain_match(self) -> None: + """Domain allowlist allows subdomain match.""" + with patch.dict( + os.environ, + {"MCP_ALLOWED_URL_DOMAINS": "atlassian.net"}, + ): + with patch("mcp_atlassian.utils.urls.socket.getaddrinfo") as mock_dns: + mock_dns.return_value = [(2, 1, 6, "", ("104.192.141.1", 0))] + assert validate_url_for_ssrf("https://company.atlassian.net") is None + + def test_allowlist_reject(self) -> None: + """Domain allowlist rejects non-matching hostname.""" + with patch.dict( + os.environ, + {"MCP_ALLOWED_URL_DOMAINS": "atlassian.net"}, + ): + result = validate_url_for_ssrf("https://evil.com") + assert result is not None + assert "not in allowed" in result.lower() + + def test_metadata_google_internal(self) -> None: + """GCP metadata endpoint is rejected.""" + result = validate_url_for_ssrf("http://metadata.google.internal") + assert result is not None + assert "Blocked hostname" in result + + def test_ipv4_mapped_ipv6(self) -> None: + """IPv4-mapped IPv6 loopback is rejected.""" + result = validate_url_for_ssrf("http://[::ffff:127.0.0.1]") + assert result is not None
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
3- github.com/sooperset/mcp-atlassian/commit/5cd697dfce9116ef330b8dc7a91291640e0528d9nvdPatchWEB
- github.com/advisories/GHSA-7r34-79r5-rcc9ghsaADVISORY
- github.com/sooperset/mcp-atlassian/security/advisories/GHSA-7r34-79r5-rcc9nvdVendor AdvisoryExploitWEB
News mentions
0No linked articles in our index yet.