VYPR
High severity · NVD Advisory · Published Feb 6, 2026 · Updated Feb 9, 2026

Pydantic AI Affected by Server-Side Request Forgery (SSRF) in URL Download Handling

CVE-2026-25580

Description

Pydantic AI is a Python agent framework for building applications and workflows with Generative AI. In versions 0.0.26 up to, but not including, 1.56.0, a Server-Side Request Forgery (SSRF) vulnerability exists in Pydantic AI's URL download functionality. When applications accept message history from untrusted sources, attackers can include malicious URLs that cause the server to make HTTP requests to internal network resources, potentially accessing internal services or cloud credentials. The vulnerability only affects applications that accept message history from external users. It is fixed in 1.56.0.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Server-Side Request Forgery in Pydantic AI's URL download allows attackers to access internal networks and cloud metadata when processing untrusted message history.

Pydantic AI is a Python agent framework for building Generative AI applications [2]. In versions 0.0.26 to before 1.56.0, a Server-Side Request Forgery (SSRF) vulnerability exists in its URL download functionality [3][4]. The download_item() helper function downloads content from URLs without validating whether the target is a public internet address, allowing requests to private IP ranges and cloud metadata endpoints [1].
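
The kind of check that was missing can be sketched with the standard library alone. The following is a minimal illustration, not the project's code: the name `is_internal_address` and the `BLOCKED_NETWORKS` tuple are hypothetical, and the range list is a subset chosen for the example.

```python
import ipaddress

# Illustrative subset of ranges that an SSRF filter typically rejects.
# (Hypothetical constant; the patched library keeps its own, longer list.)
BLOCKED_NETWORKS = (
    ipaddress.ip_network("127.0.0.0/8"),     # IPv4 loopback
    ipaddress.ip_network("10.0.0.0/8"),      # RFC 1918 private
    ipaddress.ip_network("172.16.0.0/12"),   # RFC 1918 private
    ipaddress.ip_network("192.168.0.0/16"),  # RFC 1918 private
    ipaddress.ip_network("169.254.0.0/16"),  # link-local, incl. metadata IP
    ipaddress.ip_network("::1/128"),         # IPv6 loopback
    ipaddress.ip_network("fe80::/10"),       # IPv6 link-local
    ipaddress.ip_network("fc00::/7"),        # IPv6 unique local
)


def is_internal_address(ip_text: str) -> bool:
    """Return True if ip_text is unparsable or falls in a blocked range."""
    try:
        ip = ipaddress.ip_address(ip_text)
    except ValueError:
        # Fail closed: anything that is not a valid IP is treated as unsafe.
        return True
    # Unwrap IPv4-mapped IPv6 (e.g. ::ffff:192.168.1.1) before matching.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    return any(ip in network for network in BLOCKED_NETWORKS)
```

A check like this only helps if it runs on the address the client actually connects to, which is why the fix resolves the hostname first and then connects to the validated IP.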

Exploitation requires that the application accepts message history from untrusted sources, such as via Agent.to_web, VercelAIAdapter, AGUIAdapter, or custom APIs [3]. An attacker can supply malicious URLs in chat messages or file attachments, causing the server to make HTTP requests to internal network resources [3].
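
Applications that must accept message history from users may want an additional, application-level filter in front of the framework. The sketch below assumes a host allowlist is acceptable for the deployment; `ALLOWED_HOSTS`, `is_url_allowed`, and the example host name are hypothetical and are not part of Pydantic AI.

```python
from urllib.parse import urlparse

# Hypothetical allowlist; replace with the hosts your application trusts.
ALLOWED_HOSTS = frozenset({"cdn.example.com"})


def is_url_allowed(url: str, allowed_hosts: frozenset = ALLOWED_HOSTS) -> bool:
    """Accept only http(s) URLs whose host is on the allowlist."""
    try:
        parsed = urlparse(url)
        host = parsed.hostname  # already lower-cased by urllib
    except ValueError:
        # Malformed URLs (e.g. broken IPv6 literals) are rejected.
        return False
    if parsed.scheme.lower() not in ("http", "https"):
        return False
    return host is not None and host in allowed_hosts
```

An allowlist is stricter than the library's own fix and will not suit every application, but it rejects attacker-chosen hosts before any download is attempted.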

A successful attack can lead to accessing internal services, scanning internal networks, and stealing cloud provider credentials (e.g., AWS IMDSv1, GCP, Azure, Alibaba Cloud) [3].
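
Because metadata endpoints hand out credentials, the patch treats them more strictly than other internal addresses: they stay blocked even when local network access is explicitly allowed. A rough sketch of that two-tier policy follows. The function name `is_download_permitted` is hypothetical, and it uses the standard library's `is_private`, `is_loopback`, and `is_link_local` properties in place of the patch's explicit network list; the three metadata addresses are the ones listed in the patch.

```python
import ipaddress

# Metadata addresses named in the patch: AWS/GCP/Azure, AWS IPv6, Alibaba Cloud.
METADATA_IPS = frozenset({"169.254.169.254", "fd00:ec2::254", "100.100.100.200"})


def is_download_permitted(ip_text: str, allow_local: bool = False) -> bool:
    """Apply the two-tier policy to an already-resolved IP address.

    Raises ValueError if ip_text is not a valid IP address.
    """
    ip = ipaddress.ip_address(ip_text)
    # Tier 1: metadata endpoints are never reachable.
    if str(ip) in METADATA_IPS:
        return False
    # Tier 2: other internal addresses are reachable only on explicit opt-in.
    if ip.is_private or ip.is_loopback or ip.is_link_local:
        return allow_local
    return True
```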

The vulnerability is fixed in version 1.56.0, which implements URL validation that blocks requests to private networks and known cloud metadata endpoints [1][4].
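
Validation has to be repeated on every redirect, since a public URL can redirect to an internal one; the patched download routine follows redirects manually for this reason. The sketch below shows one way to compute the next hop. It uses `urllib.parse.urljoin` rather than the patch's own helper, and `next_hop` is a hypothetical name.

```python
from urllib.parse import urljoin, urlparse

ALLOWED_SCHEMES = ("http", "https")


def next_hop(current_url: str, location: str) -> str:
    """Resolve a Location header against current_url and check its scheme."""
    # urljoin handles absolute, protocol-relative, and relative locations.
    target = urljoin(current_url, location)
    scheme = urlparse(target).scheme.lower()
    if scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"Redirect to unsupported scheme: {scheme!r}")
    return target
```

The returned URL would then go through the same hostname resolution and IP checks as the original request before it is fetched.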

AI Insight generated on May 19, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Registry | Affected versions | Patched versions
pydantic-ai | PyPI | >= 0.0.26, < 1.56.0 | 1.56.0
pydantic-ai-slim | PyPI | >= 0.0.26, < 1.56.0 | 1.56.0
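
To check an installed version against the affected range listed above, a small helper along these lines may be enough. It is a simplified sketch: `parse_version` and `is_affected` are hypothetical names, and the parser looks only at the leading dotted numbers, so pre-release suffixes such as `rc1` are ignored.

```python
import re


def parse_version(text: str) -> tuple[int, ...]:
    """Parse the leading dotted numeric part, e.g. '1.55.0' -> (1, 55, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", text.strip())
    if match is None:
        raise ValueError(f"Unrecognized version string: {text!r}")
    return tuple(int(part) for part in match.group().split("."))


def is_affected(version_text: str) -> bool:
    """Return True if the version lies in the range >= 0.0.26, < 1.56.0."""
    version = parse_version(version_text)
    # Pad short versions such as '1.56' to three components.
    version = version + (0,) * (3 - len(version))
    return (0, 0, 26) <= version < (1, 56, 0)
```

The installed version string can be obtained with `importlib.metadata.version("pydantic-ai-slim")` and passed to `is_affected`.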

Patches

d398bc9d39ae

Disallow downloading `FileUrl`s pointing at the local network by default (#4227)

https://github.com/pydantic/pydantic-ai · Douwe Maan · Feb 6, 2026 · via ghsa
13 files changed · +1222 -70
  • pydantic_ai_slim/pydantic_ai/messages.py +20 -9 modified
    @@ -111,6 +111,15 @@
     ]
     """Reason the model finished generating the response, normalized to OpenTelemetry values."""
     
    +ForceDownloadMode: TypeAlias = bool | Literal['allow-local']
    +"""Type for the force_download parameter on FileUrl subclasses.
    +
    +- `False`: The URL is sent directly to providers that support it. For providers that don't,
    +  the file is downloaded with SSRF protection (blocks private IPs and cloud metadata).
    +- `True`: The file is always downloaded with SSRF protection (blocks private IPs and cloud metadata).
    +- `'allow-local'`: The file is always downloaded, allowing private IPs but still blocking cloud metadata.
    +"""
    +
     ProviderDetailsDelta: TypeAlias = dict[str, Any] | Callable[[dict[str, Any] | None], dict[str, Any]] | None
     """Type for provider_details input: can be a static dict, a callback to update existing details, or None."""
     
    @@ -167,11 +176,13 @@ class FileUrl(ABC):
     
         _: KW_ONLY
     
    -    force_download: bool = False
    -    """For OpenAI, Google APIs and xAI it:
    +    force_download: ForceDownloadMode = False
    +    """Controls whether the file is downloaded and how SSRF protection is applied:
     
    -    * If True, the file is downloaded and the data is sent to the model as bytes.
    -    * If False, the URL is sent directly to the model and no download is performed.
    +    * If `False`, the URL is sent directly to providers that support it. For providers that don't,
    +      the file is downloaded with SSRF protection (blocks private IPs and cloud metadata).
    +    * If `True`, the file is always downloaded with SSRF protection (blocks private IPs and cloud metadata).
    +    * If `'allow-local'`, the file is always downloaded, allowing private IPs but still blocking cloud metadata.
         """
     
         vendor_metadata: dict[str, Any] | None = None
    @@ -199,7 +210,7 @@ def __init__(
             *,
             media_type: str | None = None,
             identifier: str | None = None,
    -        force_download: bool = False,
    +        force_download: ForceDownloadMode = False,
             vendor_metadata: dict[str, Any] | None = None,
             # Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
             _media_type: str | None = None,
    @@ -263,7 +274,7 @@ def __init__(
             *,
             media_type: str | None = None,
             identifier: str | None = None,
    -        force_download: bool = False,
    +        force_download: ForceDownloadMode = False,
             vendor_metadata: dict[str, Any] | None = None,
             kind: Literal['video-url'] = 'video-url',
             # Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
    @@ -322,7 +333,7 @@ def __init__(
             *,
             media_type: str | None = None,
             identifier: str | None = None,
    -        force_download: bool = False,
    +        force_download: ForceDownloadMode = False,
             vendor_metadata: dict[str, Any] | None = None,
             kind: Literal['audio-url'] = 'audio-url',
             # Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
    @@ -369,7 +380,7 @@ def __init__(
             *,
             media_type: str | None = None,
             identifier: str | None = None,
    -        force_download: bool = False,
    +        force_download: ForceDownloadMode = False,
             vendor_metadata: dict[str, Any] | None = None,
             kind: Literal['image-url'] = 'image-url',
             # Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
    @@ -415,7 +426,7 @@ def __init__(
             *,
             media_type: str | None = None,
             identifier: str | None = None,
    -        force_download: bool = False,
    +        force_download: ForceDownloadMode = False,
             vendor_metadata: dict[str, Any] | None = None,
             kind: Literal['document-url'] = 'document-url',
             # Required for inline-snapshot which expects all dataclass `__init__` methods to take all field names as kwargs.
    
  • pydantic_ai_slim/pydantic_ai/models/__init__.py +16 -9 modified
    @@ -1278,6 +1278,14 @@ async def download_item(
     ) -> DownloadedItem[str] | DownloadedItem[bytes]:
         """Download an item by URL and return the content as a bytes object or a (base64-encoded) string.
     
    +    This function includes SSRF (Server-Side Request Forgery) protection:
    +    - Only http:// and https:// protocols are allowed
    +    - Private/internal IP addresses are blocked by default
    +    - Cloud metadata endpoints (169.254.169.254) are always blocked
    +    - Hostnames are resolved before requests to prevent DNS rebinding
    +
    +    Set `item.force_download='allow-local'` to allow private IP addresses.
    +
         Args:
             item: The item to download.
             data_format: The format to return the content in:
    @@ -1290,18 +1298,17 @@ async def download_item(
                 - `extension`: The media type as an extension.
     
         Raises:
    -        UserError: If the URL points to a YouTube video or its protocol is gs://.
    +        UserError: If the URL points to a YouTube video.
    +        ValueError: If the URL uses an unsupported protocol or targets a private/internal
    +            IP address (unless allow-local is set).
         """
    -    if item.url.startswith('gs://'):
    -        raise UserError('Downloading from protocol "gs://" is not supported.')
    -    elif item.url.startswith('s3://'):
    -        raise UserError('Downloading from protocol "s3://" is not supported.')
    -    elif isinstance(item, VideoUrl) and item.is_youtube:
    +    if isinstance(item, VideoUrl) and item.is_youtube:
             raise UserError('Downloading YouTube videos is not supported.')
     
    -    client = cached_async_http_client()
    -    response = await client.get(item.url, follow_redirects=True)
    -    response.raise_for_status()
    +    from .._ssrf import safe_download
    +
    +    allow_local = item.force_download == 'allow-local'
    +    response = await safe_download(item.url, allow_local=allow_local)
     
         if content_type := response.headers.get('content-type'):
             content_type = content_type.split(';')[0]
    
  • pydantic_ai_slim/pydantic_ai/_ssrf.py +368 -0 added
    @@ -0,0 +1,368 @@
    +"""SSRF (Server-Side Request Forgery) protection for URL downloads.
    +
    +This module provides security measures to prevent SSRF attacks when downloading
    +content from URLs. It validates protocols, resolves hostnames to IP addresses,
    +and blocks requests to private/internal networks and cloud metadata endpoints.
    +"""
    +
    +from __future__ import annotations
    +
    +import ipaddress
    +import socket
    +from dataclasses import dataclass
    +from urllib.parse import urlparse, urlunparse
    +
    +import httpx
    +
    +from ._utils import run_in_executor
    +from .models import cached_async_http_client
    +
    +__all__ = ['safe_download']
    +
    +# Private IP ranges that should be blocked by default
    +_PRIVATE_NETWORKS: tuple[ipaddress.IPv4Network | ipaddress.IPv6Network, ...] = (
    +    # IPv4 private ranges
    +    ipaddress.IPv4Network('127.0.0.0/8'),  # Loopback
    +    ipaddress.IPv4Network('10.0.0.0/8'),  # Private
    +    ipaddress.IPv4Network('172.16.0.0/12'),  # Private
    +    ipaddress.IPv4Network('192.168.0.0/16'),  # Private
    +    ipaddress.IPv4Network('169.254.0.0/16'),  # Link-local (includes cloud metadata)
    +    ipaddress.IPv4Network('0.0.0.0/8'),  # "This" network
    +    ipaddress.IPv4Network('100.64.0.0/10'),  # CGNAT (RFC 6598), includes Alibaba Cloud metadata
    +    # IPv6 private ranges
    +    ipaddress.IPv6Network('::1/128'),  # Loopback
    +    ipaddress.IPv6Network('fe80::/10'),  # Link-local
    +    ipaddress.IPv6Network('fc00::/7'),  # Unique local address
    +    ipaddress.IPv6Network('2002::/16'),  # 6to4 (can embed private IPv4 addresses)
    +)
    +
    +# Cloud metadata IPs - always blocked, even with allow_local=True
    +# These need to be checked explicitly because when allow_local=True,
    +# we skip the private IP check but still need to block metadata endpoints.
    +_CLOUD_METADATA_IPS: frozenset[str] = frozenset(
    +    {
    +        '169.254.169.254',  # AWS, GCP, Azure metadata endpoint
    +        'fd00:ec2::254',  # AWS EC2 IPv6 metadata endpoint
    +        '100.100.100.200',  # Alibaba Cloud metadata endpoint
    +    }
    +)
    +
    +_MAX_REDIRECTS = 10
    +_DEFAULT_TIMEOUT = 30  # seconds
    +
    +
    +@dataclass
    +class ResolvedUrl:
    +    """Result of URL validation and DNS resolution."""
    +
    +    resolved_ip: str
    +    """The resolved IP address to connect to."""
    +
    +    hostname: str
    +    """The original hostname (used for Host header)."""
    +
    +    port: int
    +    """The port number."""
    +
    +    is_https: bool
    +    """Whether to use HTTPS."""
    +
    +    path: str
    +    """The path including query string and fragment."""
    +
    +
    +def is_cloud_metadata_ip(ip_str: str) -> bool:
    +    """Check if an IP address is a cloud metadata endpoint.
    +
    +    These are always blocked for security reasons, even with allow_local=True.
    +    """
    +    return ip_str in _CLOUD_METADATA_IPS
    +
    +
    +def is_private_ip(ip_str: str) -> bool:
    +    """Check if an IP address is in a private/internal range.
    +
    +    Handles both IPv4 and IPv6 addresses, including IPv4-mapped IPv6 addresses.
    +    """
    +    try:
    +        ip = ipaddress.ip_address(ip_str)
    +
    +        # Handle IPv4-mapped IPv6 addresses (e.g., ::ffff:192.168.1.1)
    +        if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
    +            ip = ip.ipv4_mapped
    +
    +        return any(ip in network for network in _PRIVATE_NETWORKS)
    +    except ValueError:
    +        # Invalid IP address, treat as potentially dangerous
    +        return True
    +
    +
    +async def resolve_hostname(hostname: str) -> list[str]:
    +    """Resolve a hostname to its IP addresses using DNS.
    +
    +    Uses run_in_executor to run DNS resolution in a thread pool to avoid blocking.
    +
    +    Returns:
    +        List of IP address strings, preserving DNS order with duplicates removed.
    +
    +    Raises:
    +        ValueError: If DNS resolution fails.
    +    """
    +    try:
    +        # getaddrinfo returns list of (family, type, proto, canonname, sockaddr)
    +        # sockaddr is (ip, port) for IPv4 or (ip, port, flowinfo, scope_id) for IPv6
    +        results = await run_in_executor(socket.getaddrinfo, hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    +        # Extract unique IP addresses, preserving order (first IP is typically preferred)
    +        seen: set[str] = set()
    +        ips: list[str] = []
    +        for result in results:
    +            ip = str(result[4][0])
    +            if ip not in seen:
    +                seen.add(ip)
    +                ips.append(ip)
    +        if not ips:
    +            raise ValueError(f'DNS resolution failed for hostname: {hostname}')  # pragma: no cover
    +        return ips
    +    except socket.gaierror as e:
    +        raise ValueError(f'DNS resolution failed for hostname "{hostname}": {e}') from e
    +
    +
    +def validate_url_protocol(url: str) -> tuple[str, bool]:
    +    """Validate that the URL uses an allowed protocol (http or https).
    +
    +    Args:
    +        url: The URL to validate.
    +
    +    Returns:
    +        Tuple of (scheme, is_https).
    +
    +    Raises:
    +        ValueError: If the protocol is not http or https.
    +    """
    +    parsed = urlparse(url)
    +    scheme = parsed.scheme.lower()
    +
    +    if scheme not in ('http', 'https'):
    +        raise ValueError(f'URL protocol "{scheme}" is not allowed. Only http:// and https:// are supported.')
    +
    +    return scheme, scheme == 'https'
    +
    +
    +def extract_host_and_port(url: str) -> tuple[str, str, int, bool]:
    +    """Extract hostname, path, port, and protocol info from a URL.
    +
    +    Returns:
    +        Tuple of (hostname, path_with_query, port, is_https)
    +
    +    Raises:
    +        ValueError: If the URL is malformed or uses an unsupported protocol.
    +    """
    +    # Validate protocol first, before trying to extract hostname
    +    _, is_https = validate_url_protocol(url)
    +
    +    parsed = urlparse(url)
    +    hostname = parsed.hostname
    +
    +    if not hostname:
    +        raise ValueError(f'Invalid URL: no hostname found in "{url}"')
    +
    +    default_port = 443 if is_https else 80
    +    port = parsed.port or default_port
    +
    +    # Reconstruct path with query string
    +    path = parsed.path or '/'
    +    if parsed.query:
    +        path = f'{path}?{parsed.query}'
    +    if parsed.fragment:
    +        path = f'{path}#{parsed.fragment}'
    +
    +    return hostname, path, port, is_https
    +
    +
    +def build_url_with_ip(resolved: ResolvedUrl) -> str:
    +    """Build a URL using a resolved IP address instead of the hostname.
    +
    +    For IPv6 addresses, wraps them in brackets as required by URL syntax.
    +    """
    +    scheme = 'https' if resolved.is_https else 'http'
    +    default_port = 443 if resolved.is_https else 80
    +
    +    # IPv6 addresses need brackets in URLs
    +    try:
    +        ip_obj = ipaddress.ip_address(resolved.resolved_ip)
    +        if isinstance(ip_obj, ipaddress.IPv6Address):
    +            host_part = f'[{resolved.resolved_ip}]'
    +        else:
    +            host_part = resolved.resolved_ip
    +    except ValueError:
    +        host_part = resolved.resolved_ip
    +
    +    # Only include port if non-default
    +    if resolved.port != default_port:
    +        host_part = f'{host_part}:{resolved.port}'
    +
    +    return urlunparse((scheme, host_part, resolved.path, '', '', ''))
    +
    +
    +async def validate_and_resolve_url(url: str, allow_local: bool) -> ResolvedUrl:
    +    """Validate URL and resolve hostname to IP addresses.
    +
    +    Performs protocol validation, DNS resolution, and IP validation.
    +
    +    Args:
    +        url: The URL to validate.
    +        allow_local: Whether to allow private/internal IP addresses.
    +
    +    Returns:
    +        ResolvedUrl with all the information needed to make the request.
    +
    +    Raises:
    +        ValueError: If the URL fails validation.
    +    """
    +    hostname, path, port, is_https = extract_host_and_port(url)
    +
    +    # Check if hostname is already an IP address
    +    try:
    +        # Handle IPv6 addresses in brackets
    +        ip_str = hostname.strip('[]')
    +        ipaddress.ip_address(ip_str)
    +        ips = [ip_str]
    +    except ValueError:
    +        # It's a hostname, resolve it
    +        ips = await resolve_hostname(hostname)
    +
    +    # Validate all resolved IPs
    +    for ip in ips:
    +        # Cloud metadata IPs are always blocked
    +        if is_cloud_metadata_ip(ip):
    +            raise ValueError(f'Access to cloud metadata service ({ip}) is blocked for security reasons.')
    +
    +        # Private IPs are blocked unless allow_local is True
    +        if not allow_local and is_private_ip(ip):
    +            raise ValueError(
    +                f'Access to private/internal IP address ({ip}) is blocked. '
    +                f'Use force_download="allow-local" to allow local network access.'
    +            )
    +
    +    # Use the first resolved IP
    +    return ResolvedUrl(
    +        resolved_ip=ips[0],
    +        hostname=hostname,
    +        port=port,
    +        is_https=is_https,
    +        path=path,
    +    )
    +
    +
    +def resolve_redirect_url(current_url: str, location: str) -> str:
    +    """Resolve a redirect location against the current URL.
    +
    +    Args:
    +        current_url: The URL that returned the redirect.
    +        location: The Location header value (absolute or relative).
    +
    +    Returns:
    +        The absolute URL to follow.
    +    """
    +    parsed_location = urlparse(location)
    +
    +    # Check if it's an absolute URL (has scheme) or protocol-relative URL (has netloc but no scheme)
    +    if parsed_location.scheme:
    +        return location
    +    if parsed_location.netloc:
    +        # Protocol-relative URL (e.g., "//example.com/path") - use current scheme
    +        parsed_current = urlparse(current_url)
    +        return urlunparse(
    +            (
    +                parsed_current.scheme,
    +                parsed_location.netloc,
    +                parsed_location.path,
    +                '',
    +                parsed_location.query,
    +                parsed_location.fragment,
    +            )
    +        )
    +
    +    # Relative URL - resolve against current URL
    +    parsed_current = urlparse(current_url)
    +    if location.startswith('/'):
    +        # Absolute path
    +        return urlunparse((parsed_current.scheme, parsed_current.netloc, location, '', '', ''))
    +    else:
    +        # Relative path
    +        base_path = parsed_current.path.rsplit('/', 1)[0]
    +        return urlunparse((parsed_current.scheme, parsed_current.netloc, f'{base_path}/{location}', '', '', ''))
    +
    +
    +async def safe_download(
    +    url: str,
    +    allow_local: bool = False,
    +    max_redirects: int = _MAX_REDIRECTS,
    +    timeout: int = _DEFAULT_TIMEOUT,
    +) -> httpx.Response:
    +    """Download content from a URL with SSRF protection.
    +
    +    This function:
    +    1. Validates the URL protocol (only http/https allowed)
    +    2. Resolves the hostname to IP addresses
    +    3. Validates that no resolved IP is private (unless allow_local=True)
    +    4. Always blocks cloud metadata endpoints
    +    5. Makes the request to the resolved IP with the Host header set
    +    6. Manually follows redirects, validating each hop
    +
    +    Args:
    +        url: The URL to download from.
    +        allow_local: If True, allows requests to private/internal IP addresses.
    +                    Cloud metadata endpoints are always blocked regardless.
    +        max_redirects: Maximum number of redirects to follow (default: 10).
    +        timeout: Request timeout in seconds (default: 30).
    +
    +    Returns:
    +        The httpx.Response object.
    +
    +    Raises:
    +        ValueError: If the URL fails SSRF validation or too many redirects occur.
    +        httpx.HTTPStatusError: If the response has an error status code.
    +    """
    +    current_url = url
    +    redirects_followed = 0
    +
    +    client = cached_async_http_client(timeout=timeout)
    +    while True:
    +        # Validate and resolve the current URL
    +        resolved = await validate_and_resolve_url(current_url, allow_local)
    +
    +        # Build URL with resolved IP
    +        request_url = build_url_with_ip(resolved)
    +
    +        # For HTTPS, set sni_hostname so TLS uses the original hostname for SNI
    +        # and certificate validation, even though we're connecting to the resolved IP.
    +        extensions: dict[str, str] = {}
    +        if resolved.is_https:
    +            extensions['sni_hostname'] = resolved.hostname
    +
    +        # Make request with Host header set to original hostname
    +        response = await client.get(
    +            request_url,
    +            headers={'Host': resolved.hostname},
    +            extensions=extensions,
    +            follow_redirects=False,
    +        )
    +
    +        # Check if we need to follow a redirect
    +        if response.is_redirect:
    +            redirects_followed += 1
    +            if redirects_followed > max_redirects:
    +                raise ValueError(f'Too many redirects ({redirects_followed}). Maximum allowed: {max_redirects}')
    +
    +            # Get redirect location
    +            location = response.headers.get('location')
    +            if not location:
    +                raise ValueError('Redirect response missing Location header')
    +
    +            current_url = resolve_redirect_url(current_url, location)
    +            continue
    +
    +        # Not a redirect, we're done
    +        response.raise_for_status()
    +        return response
    
  • tests/conftest.py +24 -0 modified
    @@ -668,3 +668,27 @@ def generate_snapshot_id(node_id: str) -> str:
             return f'{node_id}:{i}'
     
         return mocker.patch('pydantic_graph.nodes.generate_snapshot_id', side_effect=generate_snapshot_id)
    +
    +
    +@pytest.fixture
    +def disable_ssrf_protection_for_vcr():
    +    """Disable SSRF protection for VCR compatibility.
    +
    +    VCR cassettes record requests with the original hostname. Since SSRF protection
    +    resolves hostnames to IPs before making requests, we need to disable the validation
    +    for VCR tests to match the pre-recorded cassettes.
    +
    +    This fixture patches validate_and_resolve_url to return the hostname in place
    +    of the resolved IP, allowing the request URL to use the original hostname.
    +    """
    +    from unittest.mock import patch
    +
    +    from pydantic_ai._ssrf import ResolvedUrl, extract_host_and_port
    +
    +    async def mock_validate_and_resolve(url: str, allow_local: bool) -> ResolvedUrl:
    +        hostname, path, port, is_https = extract_host_and_port(url)
    +        # Return hostname in place of resolved IP - this allows VCR matching
    +        return ResolvedUrl(resolved_ip=hostname, hostname=hostname, port=port, is_https=is_https, path=path)
    +
    +    with patch('pydantic_ai._ssrf.validate_and_resolve_url', mock_validate_and_resolve):
    +        yield
    
  • tests/models/test_anthropic.py +6 -2 modified
    @@ -1557,7 +1557,9 @@ async def test_image_url_input(allow_model_requests: None, anthropic_api_key: st
         )
     
     
    -async def test_image_url_input_force_download(allow_model_requests: None, anthropic_api_key: str):
    +async def test_image_url_input_force_download(
    +    allow_model_requests: None, anthropic_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         m = AnthropicModel('claude-haiku-4-5', provider=AnthropicProvider(api_key=anthropic_api_key))
         agent = Agent(m)
     
    @@ -1919,7 +1921,9 @@ async def test_document_url_input(allow_model_requests: None, anthropic_api_key:
         )
     
     
    -async def test_text_document_url_input(allow_model_requests: None, anthropic_api_key: str):
    +async def test_text_document_url_input(
    +    allow_model_requests: None, anthropic_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         m = AnthropicModel('claude-sonnet-4-5', provider=AnthropicProvider(api_key=anthropic_api_key))
         agent = Agent(m)
     
    
  • tests/models/test_bedrock.py +12 -4 modified
    @@ -734,7 +734,9 @@ async def test_video_as_binary_content_input(
     
     
     @pytest.mark.vcr()
    -async def test_image_url_input(allow_model_requests: None, bedrock_provider: BedrockProvider):
    +async def test_image_url_input(
    +    allow_model_requests: None, bedrock_provider: BedrockProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -750,7 +752,9 @@ async def test_image_url_input(allow_model_requests: None, bedrock_provider: Bed
     
     
     @pytest.mark.vcr()
    -async def test_video_url_input(allow_model_requests: None, bedrock_provider: BedrockProvider):
    +async def test_video_url_input(
    +    allow_model_requests: None, bedrock_provider: BedrockProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = BedrockConverseModel('us.amazon.nova-pro-v1:0', provider=bedrock_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -766,7 +770,9 @@ async def test_video_url_input(allow_model_requests: None, bedrock_provider: Bed
     
     
     @pytest.mark.vcr()
    -async def test_document_url_input(allow_model_requests: None, bedrock_provider: BedrockProvider):
    +async def test_document_url_input(
    +    allow_model_requests: None, bedrock_provider: BedrockProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = BedrockConverseModel('anthropic.claude-v2', provider=bedrock_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -779,7 +785,9 @@ async def test_document_url_input(allow_model_requests: None, bedrock_provider:
     
     
     @pytest.mark.vcr()
    -async def test_text_document_url_input(allow_model_requests: None, bedrock_provider: BedrockProvider):
    +async def test_text_document_url_input(
    +    allow_model_requests: None, bedrock_provider: BedrockProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = BedrockConverseModel('anthropic.claude-v2', provider=bedrock_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    
  • tests/models/test_download_item.py +25 -26 modified
    @@ -9,34 +9,33 @@
     
     
     @pytest.mark.parametrize(
    -    'url',
    +    ('url', 'protocol'),
         (
    -        pytest.param(AudioUrl(url='gs://pydantic-ai-dev/openai-alloy.wav')),
    -        pytest.param(DocumentUrl(url='gs://pydantic-ai-dev/Gemini_1_5_Pro_Technical_Report_Arxiv_1805.pdf')),
    -        pytest.param(ImageUrl(url='gs://pydantic-ai-dev/wikipedia_screenshot.png')),
    -        pytest.param(VideoUrl(url='gs://pydantic-ai-dev/grepit-tiny-video.mp4')),
    -    ),
    -)
    -async def test_download_item_raises_user_error_with_gs_uri(
    -    url: AudioUrl | DocumentUrl | ImageUrl | VideoUrl,
    -) -> None:
    -    with pytest.raises(UserError, match='Downloading from protocol "gs://" is not supported.'):
    -        _ = await download_item(url, data_format='bytes')
    -
    -
    -@pytest.mark.parametrize(
    -    'url',
    -    (
    -        pytest.param(AudioUrl(url='s3://my-bucket/audio.wav')),
    -        pytest.param(DocumentUrl(url='s3://my-bucket/document.pdf')),
    -        pytest.param(ImageUrl(url='s3://my-bucket/image.png')),
    -        pytest.param(VideoUrl(url='s3://my-bucket/video.mp4')),
    +        pytest.param(AudioUrl(url='gs://pydantic-ai-dev/openai-alloy.wav', force_download=True), 'gs', id='gs-audio'),
    +        pytest.param(
    +            DocumentUrl(url='gs://pydantic-ai-dev/Gemini_1_5_Pro_Technical_Report_Arxiv_1805.pdf', force_download=True),
    +            'gs',
    +            id='gs-document',
    +        ),
    +        pytest.param(
    +            ImageUrl(url='gs://pydantic-ai-dev/wikipedia_screenshot.png', force_download=True), 'gs', id='gs-image'
    +        ),
    +        pytest.param(
    +            VideoUrl(url='gs://pydantic-ai-dev/grepit-tiny-video.mp4', force_download=True), 'gs', id='gs-video'
    +        ),
    +        pytest.param(AudioUrl(url='s3://my-bucket/audio.wav', force_download=True), 's3', id='s3-audio'),
    +        pytest.param(DocumentUrl(url='s3://my-bucket/document.pdf', force_download=True), 's3', id='s3-document'),
    +        pytest.param(ImageUrl(url='s3://my-bucket/image.png', force_download=True), 's3', id='s3-image'),
    +        pytest.param(VideoUrl(url='s3://my-bucket/video.mp4', force_download=True), 's3', id='s3-video'),
    +        pytest.param(DocumentUrl(url='file:///etc/passwd', force_download=True), 'file', id='file-document'),
    +        pytest.param(ImageUrl(url='ftp://ftp.example.com/image.png', force_download=True), 'ftp', id='ftp-image'),
         ),
     )
    -async def test_download_item_raises_user_error_with_s3_uri(
    +async def test_download_item_raises_user_error_with_unsupported_protocol(
         url: AudioUrl | DocumentUrl | ImageUrl | VideoUrl,
    +    protocol: str,
     ) -> None:
    -    with pytest.raises(UserError, match='Downloading from protocol "s3://" is not supported.'):
    +    with pytest.raises(ValueError, match=f'URL protocol "{protocol}" is not allowed'):
             _ = await download_item(url, data_format='bytes')
     
     
    @@ -46,7 +45,7 @@ async def test_download_item_raises_user_error_with_youtube_url() -> None:
     
     
     @pytest.mark.vcr()
    -async def test_download_item_application_octet_stream() -> None:
    +async def test_download_item_application_octet_stream(disable_ssrf_protection_for_vcr: None) -> None:
         downloaded_item = await download_item(
             VideoUrl(
                 url='https://raw.githubusercontent.com/pydantic/pydantic-ai/refs/heads/main/tests/assets/small_video.mp4'
    @@ -58,7 +57,7 @@ async def test_download_item_application_octet_stream() -> None:
     
     
     @pytest.mark.vcr()
    -async def test_download_item_audio_mpeg() -> None:
    +async def test_download_item_audio_mpeg(disable_ssrf_protection_for_vcr: None) -> None:
         downloaded_item = await download_item(
             AudioUrl(url='https://smokeshow.helpmanual.io/4l1l1s0s6q4741012x1w/common_voice_en_537507.mp3'),
             data_format='bytes',
    @@ -68,7 +67,7 @@ async def test_download_item_audio_mpeg() -> None:
     
     
     @pytest.mark.vcr()
    -async def test_download_item_no_content_type() -> None:
    +async def test_download_item_no_content_type(disable_ssrf_protection_for_vcr: None) -> None:
         downloaded_item = await download_item(
             DocumentUrl(url='https://raw.githubusercontent.com/pydantic/pydantic-ai/refs/heads/main/docs/help.md'),
             data_format='text',
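
The parametrized cases in `test_download_item_raises_user_error_with_unsupported_protocol` expect every non-HTTP(S) scheme (`gs`, `s3`, `file`, `ftp`) to be rejected with a `ValueError` whose message reads `URL protocol "<scheme>" is not allowed`. A minimal standard-library sketch of that kind of allow-list check might look like the following. `check_scheme` and `ALLOWED_SCHEMES` are hypothetical names used for illustration only; this is not the code in `pydantic_ai._ssrf`.

```python
from urllib.parse import urlparse

# Assumption: only plain HTTP and HTTPS are permitted, as the tests suggest.
ALLOWED_SCHEMES = frozenset({'http', 'https'})


def check_scheme(url: str) -> str:
    """Return the lowercased scheme, or raise ValueError if it is not allowed."""
    scheme = urlparse(url).scheme.lower()
    if scheme not in ALLOWED_SCHEMES:
        # Message shape mirrors the one matched in the tests above.
        raise ValueError(f'URL protocol "{scheme}" is not allowed')
    return scheme
```

Checking the scheme before any network activity means cloud-storage and local-file URLs fail fast, without a DNS lookup or connection attempt.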
    
  • tests/models/test_gemini.py +9 -3 modified
    @@ -1355,7 +1355,9 @@ async def test_image_as_binary_content_input(
     
     
     @pytest.mark.vcr()
    -async def test_image_url_input(allow_model_requests: None, gemini_api_key: str) -> None:
    +async def test_image_url_input(
    +    allow_model_requests: None, gemini_api_key: str, disable_ssrf_protection_for_vcr: None
    +) -> None:
         m = GeminiModel('gemini-2.0-flash-exp', provider=GoogleGLAProvider(api_key=gemini_api_key))
         agent = Agent(m)
     
    @@ -1389,7 +1391,9 @@ async def test_video_as_binary_content_input(
     
     
     @pytest.mark.vcr()
    -async def test_video_url_input(allow_model_requests: None, gemini_api_key: str) -> None:
    +async def test_video_url_input(
    +    allow_model_requests: None, gemini_api_key: str, disable_ssrf_protection_for_vcr: None
    +) -> None:
         m = GeminiModel('gemini-2.5-flash', provider=GoogleGLAProvider(api_key=gemini_api_key))
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -1416,7 +1420,9 @@ async def test_video_url_input(allow_model_requests: None, gemini_api_key: str)
     
     
     @pytest.mark.vcr()
    -async def test_document_url_input(allow_model_requests: None, gemini_api_key: str) -> None:
    +async def test_document_url_input(
    +    allow_model_requests: None, gemini_api_key: str, disable_ssrf_protection_for_vcr: None
    +) -> None:
         m = GeminiModel('gemini-2.0-flash-thinking-exp-01-21', provider=GoogleGLAProvider(api_key=gemini_api_key))
         agent = Agent(m)
     
    
  • tests/models/test_gemini_vertex.py +4 -3 modified
    @@ -17,7 +17,6 @@
         UserPromptPart,
         VideoUrl,
     )
    -from pydantic_ai.exceptions import UserError
     from pydantic_ai.models.gemini import GeminiModel, GeminiModelSettings
     from pydantic_ai.usage import RequestUsage
     
    @@ -163,7 +162,9 @@ async def test_url_input(
         not os.getenv('CI', False), reason='Requires properly configured local google vertex config to pass'
     )
     @pytest.mark.vcr()
    -async def test_url_input_force_download(allow_model_requests: None) -> None:  # pragma: lax no cover
    +async def test_url_input_force_download(
    +    allow_model_requests: None, disable_ssrf_protection_for_vcr: None
    +) -> None:  # pragma: lax no cover
         provider = GoogleVertexProvider(project_id='pydantic-ai', region='us-central1')
         m = GeminiModel('gemini-2.0-flash', provider=provider)
         agent = Agent(m)
    @@ -207,5 +208,5 @@ async def test_gs_url_force_download_raises_user_error(allow_model_requests: Non
         agent = Agent(m)
     
         url = ImageUrl(url='gs://pydantic-ai-dev/wikipedia_screenshot.png', force_download=True)
    -    with pytest.raises(UserError, match='Downloading from protocol "gs://" is not supported.'):
    +    with pytest.raises(ValueError, match='URL protocol "gs" is not allowed'):
             _ = await agent.run(['What is the main content of this URL?', url])
    
  • tests/models/test_google.py +16 -7 modified
    @@ -848,7 +848,9 @@ async def test_google_model_video_as_binary_content_input_with_vendor_metadata(
     """)
     
     
    -async def test_google_model_image_url_input(allow_model_requests: None, google_provider: GoogleProvider):
    +async def test_google_model_image_url_input(
    +    allow_model_requests: None, google_provider: GoogleProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = GoogleModel('gemini-2.0-flash', provider=google_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -861,7 +863,9 @@ async def test_google_model_image_url_input(allow_model_requests: None, google_p
         assert result.output == snapshot('That is a potato.')
     
     
    -async def test_google_model_video_url_input(allow_model_requests: None, google_provider: GoogleProvider):
    +async def test_google_model_video_url_input(
    +    allow_model_requests: None, google_provider: GoogleProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = GoogleModel('gemini-2.0-flash', provider=google_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -921,7 +925,9 @@ async def test_google_model_youtube_video_url_input_with_vendor_metadata(
     """)
     
     
    -async def test_google_model_document_url_input(allow_model_requests: None, google_provider: GoogleProvider):
    +async def test_google_model_document_url_input(
    +    allow_model_requests: None, google_provider: GoogleProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = GoogleModel('gemini-2.0-flash', provider=google_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -931,7 +937,9 @@ async def test_google_model_document_url_input(allow_model_requests: None, googl
         assert result.output == snapshot('The document appears to be a dummy PDF file.\n')
     
     
    -async def test_google_model_text_document_url_input(allow_model_requests: None, google_provider: GoogleProvider):
    +async def test_google_model_text_document_url_input(
    +    allow_model_requests: None, google_provider: GoogleProvider, disable_ssrf_protection_for_vcr: None
    +):
         m = GoogleModel('gemini-2.0-flash', provider=google_provider)
         agent = Agent(m, instructions='You are a helpful chatbot.')
     
    @@ -2531,7 +2539,7 @@ async def test_google_url_input(
     )
     @pytest.mark.vcr()
     async def test_google_url_input_force_download(
    -    allow_model_requests: None, vertex_provider: GoogleProvider
    +    allow_model_requests: None, vertex_provider: GoogleProvider, disable_ssrf_protection_for_vcr: None
     ) -> None:  # pragma: lax no cover
         m = GoogleModel('gemini-2.0-flash', provider=vertex_provider)
         agent = Agent(m)
    @@ -2576,7 +2584,7 @@ async def test_google_gs_url_force_download_raises_user_error(allow_model_reques
         agent = Agent(m)
     
         url = ImageUrl(url='gs://pydantic-ai-dev/wikipedia_screenshot.png', force_download=True)
    -    with pytest.raises(UserError, match='Downloading from protocol "gs://" is not supported.'):
    +    with pytest.raises(ValueError, match='URL protocol "gs" is not allowed'):
             _ = await agent.run(['What is the main content of this URL?', url])
     
     
    @@ -4805,14 +4813,15 @@ async def test_gcs_video_url_raises_error_on_google_gla():
     
         google-gla cannot access GCS buckets, so attempting to use gs:// URLs
         should fail with a helpful error message rather than a cryptic API error.
    +    SSRF protection now catches non-http(s) protocols first.
         """
         model = GoogleModel('gemini-1.5-flash', provider=GoogleProvider(api_key='test-key'))
         # google-gla is the default for GoogleProvider with api_key, but be explicit
         assert model.system == 'google-gla'
     
         video = VideoUrl(url='gs://bucket/video.mp4')
     
    -    with pytest.raises(UserError, match='Downloading from protocol "gs://" is not supported'):
    +    with pytest.raises(ValueError, match='URL protocol "gs" is not allowed'):
             await model._map_user_prompt(UserPromptPart(content=[video]))  # pyright: ignore[reportPrivateUsage]
     
     
    
  • tests/models/test_openai.py +18 -6 modified
    @@ -947,7 +947,9 @@ async def test_image_url_input(allow_model_requests: None):
         )
     
     
    -async def test_image_url_input_force_download(allow_model_requests: None, openai_api_key: str):
    +async def test_image_url_input_force_download(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         provider = OpenAIProvider(api_key=openai_api_key)
         m = OpenAIChatModel('gpt-4.1-nano', provider=provider)
         agent = Agent(m)
    @@ -964,7 +966,9 @@ async def test_image_url_input_force_download(allow_model_requests: None, openai
         assert result.output == snapshot('This vegetable is a potato.')
     
     
    -async def test_image_url_input_force_download_response_api(allow_model_requests: None, openai_api_key: str):
    +async def test_image_url_input_force_download_response_api(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         provider = OpenAIProvider(api_key=openai_api_key)
         m = OpenAIResponsesModel('gpt-4.1-nano', provider=provider)
         agent = Agent(m)
    @@ -981,7 +985,9 @@ async def test_image_url_input_force_download_response_api(allow_model_requests:
         assert result.output == snapshot('This is a potato.')
     
     
    -async def test_openai_audio_url_input(allow_model_requests: None, openai_api_key: str):
    +async def test_openai_audio_url_input(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         m = OpenAIChatModel('gpt-4o-audio-preview', provider=OpenAIProvider(api_key=openai_api_key))
         agent = Agent(m)
     
    @@ -1006,7 +1012,9 @@ async def test_openai_audio_url_input(allow_model_requests: None, openai_api_key
         )
     
     
    -async def test_document_url_input(allow_model_requests: None, openai_api_key: str):
    +async def test_document_url_input(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         m = OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=openai_api_key))
         agent = Agent(m)
     
    @@ -1028,7 +1036,9 @@ async def test_document_url_input_response_api(allow_model_requests: None, opena
         assert 'Dummy PDF' in result.output
     
     
    -async def test_document_url_input_force_download_response_api(allow_model_requests: None, openai_api_key: str):
    +async def test_document_url_input_force_download_response_api(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         """Test DocumentUrl with force_download=True downloads and sends as file_data."""
         provider = OpenAIProvider(api_key=openai_api_key)
         m = OpenAIResponsesModel('gpt-4.1-nano', provider=provider)
    @@ -1419,7 +1429,9 @@ async def test_document_as_binary_content_input(
         assert result.output == snapshot('The main content of the document is "Dummy PDF file."')
     
     
    -async def test_text_document_url_input(allow_model_requests: None, openai_api_key: str):
    +async def test_text_document_url_input(
    +    allow_model_requests: None, openai_api_key: str, disable_ssrf_protection_for_vcr: None
    +):
         m = OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=openai_api_key))
         agent = Agent(m)
     
    
  • tests/models/test_outlines.py +1 -1 modified
    @@ -508,7 +508,7 @@ def test_request_image_binary(transformers_multimodal_model: OutlinesModel, bina
     
     
     @skip_if_transformers_imports_unsuccessful
    -def test_request_image_url(transformers_multimodal_model: OutlinesModel) -> None:
    +def test_request_image_url(transformers_multimodal_model: OutlinesModel, disable_ssrf_protection_for_vcr: None) -> None:
         agent = Agent(transformers_multimodal_model)
         result = agent.run_sync(
             [
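
The `tests/test_ssrf.py` file added in this commit checks an `is_private_ip` helper against loopback, RFC 1918, link-local, "this network", CGNAT, 6to4 and IPv4-mapped IPv6 addresses, and expects unparseable input to be treated as private. A rough standard-library sketch of that kind of classification is given here. `looks_private` and `BLOCKED_NETWORKS` are hypothetical names, and the network list is an assumption inferred from the test cases, not the library's actual list.

```python
import ipaddress

# Assumption: ranges inferred from the parametrized test cases.
BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        '127.0.0.0/8',     # IPv4 loopback
        '10.0.0.0/8',      # private class A
        '172.16.0.0/12',   # private class B
        '192.168.0.0/16',  # private class C
        '169.254.0.0/16',  # IPv4 link-local
        '0.0.0.0/8',       # "this" network
        '100.64.0.0/10',   # CGNAT (RFC 6598)
        '::1/128',         # IPv6 loopback
        'fe80::/10',       # IPv6 link-local
        'fc00::/7',        # IPv6 unique local
        '2002::/16',       # 6to4, can embed a private IPv4 address
    )
]


def looks_private(ip: str) -> bool:
    """Return True for blocked ranges and for anything that fails to parse."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return True  # fail closed on invalid input
    # Unwrap ::ffff:a.b.c.d so the embedded IPv4 address is what gets checked.
    if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None:
        addr = addr.ipv4_mapped
    return any(addr.version == net.version and addr in net for net in BLOCKED_NETWORKS)
```

The sketch uses an explicit network list instead of the `is_private` property from `ipaddress`, because that property also flags documentation ranges such as 203.0.113.0/24, which the tests in this commit treat as public.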
    
  • tests/test_ssrf.py +703 -0 added
    @@ -0,0 +1,703 @@
    +"""Tests for SSRF (Server-Side Request Forgery) protection."""
    +
    +from __future__ import annotations
    +
    +from unittest.mock import AsyncMock, patch
    +
    +import pytest
    +
    +from pydantic_ai._ssrf import (
    +    _DEFAULT_TIMEOUT,  # pyright: ignore[reportPrivateUsage]
    +    _MAX_REDIRECTS,  # pyright: ignore[reportPrivateUsage]
    +    ResolvedUrl,
    +    build_url_with_ip,
    +    extract_host_and_port,
    +    is_cloud_metadata_ip,
    +    is_private_ip,
    +    resolve_hostname,
    +    resolve_redirect_url,
    +    safe_download,
    +    validate_and_resolve_url,
    +    validate_url_protocol,
    +)
    +
    +pytestmark = [pytest.mark.anyio]
    +
    +
    +class TestIsPrivateIp:
    +    """Tests for is_private_ip function."""
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            # IPv4 loopback
    +            '127.0.0.1',
    +            '127.0.0.2',
    +            '127.255.255.255',
    +            # IPv4 private class A
    +            '10.0.0.1',
    +            '10.255.255.255',
    +            # IPv4 private class B
    +            '172.16.0.1',
    +            '172.31.255.255',
    +            # IPv4 private class C
    +            '192.168.0.1',
    +            '192.168.255.255',
    +            # IPv4 link-local
    +            '169.254.0.1',
    +            '169.254.255.255',
    +            # IPv4 "this" network
    +            '0.0.0.0',
    +            '0.255.255.255',
    +            # IPv4 CGNAT (RFC 6598)
    +            '100.64.0.1',
    +            '100.127.255.255',
    +            '100.100.100.200',  # Alibaba Cloud metadata
    +            # IPv6 loopback
    +            '::1',
    +            # IPv6 link-local
    +            'fe80::1',
    +            'fe80::ffff:ffff:ffff:ffff',
    +            # IPv6 unique local
    +            'fc00::1',
    +            'fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff',
    +            # IPv6 6to4 (can embed private IPv4)
    +            '2002::1',
    +            '2002:c0a8:0101::1',  # Embeds 192.168.1.1
    +            '2002:0a00:0001::1',  # Embeds 10.0.0.1
    +        ],
    +    )
    +    def test_private_ips_detected(self, ip: str) -> None:
    +        assert is_private_ip(ip) is True
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            # Public IPv4
    +            '8.8.8.8',
    +            '1.1.1.1',
    +            '203.0.113.50',
    +            '198.51.100.1',
    +            # Public IPv6
    +            '2001:4860:4860::8888',
    +            '2606:4700:4700::1111',
    +        ],
    +    )
    +    def test_public_ips_allowed(self, ip: str) -> None:
    +        assert is_private_ip(ip) is False
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            # IPv4-mapped IPv6 private addresses
    +            '::ffff:127.0.0.1',
    +            '::ffff:10.0.0.1',
    +            '::ffff:192.168.1.1',
    +            '::ffff:172.16.0.1',
    +        ],
    +    )
    +    def test_ipv4_mapped_ipv6_private(self, ip: str) -> None:
    +        assert is_private_ip(ip) is True
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            # IPv4-mapped IPv6 public addresses
    +            '::ffff:8.8.8.8',
    +            '::ffff:1.1.1.1',
    +        ],
    +    )
    +    def test_ipv4_mapped_ipv6_public(self, ip: str) -> None:
    +        assert is_private_ip(ip) is False
    +
    +    def test_invalid_ip_treated_as_private(self) -> None:
    +        """Invalid IP addresses should be treated as potentially dangerous."""
    +        assert is_private_ip('not-an-ip') is True
    +        assert is_private_ip('') is True
    +
    +
    +class TestIsCloudMetadataIp:
    +    """Tests for is_cloud_metadata_ip function."""
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            '169.254.169.254',  # AWS, GCP, Azure
    +            'fd00:ec2::254',  # AWS EC2 IPv6
    +            '100.100.100.200',  # Alibaba Cloud
    +        ],
    +    )
    +    def test_cloud_metadata_ips_detected(self, ip: str) -> None:
    +        assert is_cloud_metadata_ip(ip) is True
    +
    +    @pytest.mark.parametrize(
    +        'ip',
    +        [
    +            '8.8.8.8',
    +            '127.0.0.1',
    +            '169.254.169.253',  # Close but not the metadata IP
    +            '169.254.169.255',
    +            '100.100.100.199',  # Close but not Alibaba metadata
    +            '100.100.100.201',
    +        ],
    +    )
    +    def test_non_metadata_ips(self, ip: str) -> None:
    +        assert is_cloud_metadata_ip(ip) is False
    +
    +
    +class TestValidateUrlProtocol:
    +    """Tests for validate_url_protocol function."""
    +
    +    @pytest.mark.parametrize(
    +        'url',
    +        [
    +            'http://example.com',
    +            'https://example.com',
    +            'HTTP://EXAMPLE.COM',
    +            'HTTPS://EXAMPLE.COM',
    +        ],
    +    )
    +    def test_allowed_protocols(self, url: str) -> None:
    +        scheme, is_https = validate_url_protocol(url)
    +        assert scheme in ('http', 'https')
    +        assert is_https == (scheme == 'https')
    +
    +    @pytest.mark.parametrize(
    +        ('url', 'protocol'),
    +        [
    +            ('file:///etc/passwd', 'file'),
    +            ('ftp://ftp.example.com/file.txt', 'ftp'),
    +            ('gopher://gopher.example.com', 'gopher'),
    +            ('gs://bucket/object', 'gs'),
    +            ('s3://bucket/key', 's3'),
    +            ('data:text/plain,hello', 'data'),
    +            ('javascript:alert(1)', 'javascript'),
    +        ],
    +    )
    +    def test_blocked_protocols(self, url: str, protocol: str) -> None:
    +        with pytest.raises(ValueError, match=f'URL protocol "{protocol}" is not allowed'):
    +            validate_url_protocol(url)
    +
    +
    +class TestExtractHostAndPort:
    +    """Tests for extract_host_and_port function."""
    +
    +    def test_basic_http_url(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('http://example.com/path')
    +        assert hostname == 'example.com'
    +        assert path == '/path'
    +        assert port == 80
    +        assert is_https is False
    +
    +    def test_basic_https_url(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('https://example.com/path')
    +        assert hostname == 'example.com'
    +        assert path == '/path'
    +        assert port == 443
    +        assert is_https is True
    +
    +    def test_custom_port(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('http://example.com:8080/path')
    +        assert hostname == 'example.com'
    +        assert path == '/path'
    +        assert port == 8080
    +        assert is_https is False
    +
    +    def test_path_with_query_string(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('https://example.com/path?query=value')
    +        assert hostname == 'example.com'
    +        assert path == '/path?query=value'
    +        assert port == 443
    +        assert is_https is True
    +
    +    def test_path_with_fragment(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('https://example.com/path#fragment')
    +        assert hostname == 'example.com'
    +        assert path == '/path#fragment'
    +        assert port == 443
    +        assert is_https is True
    +
    +    def test_empty_path(self) -> None:
    +        hostname, path, port, is_https = extract_host_and_port('https://example.com')
    +        assert hostname == 'example.com'
    +        assert path == '/'
    +        assert port == 443
    +        assert is_https is True
    +
    +    def test_invalid_url_no_hostname(self) -> None:
    +        with pytest.raises(ValueError, match='Invalid URL: no hostname found'):
    +            extract_host_and_port('http://')
    +
    +
    +class TestBuildUrlWithIp:
    +    """Tests for build_url_with_ip function."""
    +
    +    def test_http_default_port(self) -> None:
    +        resolved = ResolvedUrl(
    +            resolved_ip='203.0.113.50', hostname='example.com', port=80, is_https=False, path='/path'
    +        )
    +        url = build_url_with_ip(resolved)
    +        assert url == 'http://203.0.113.50/path'
    +
    +    def test_https_default_port(self) -> None:
    +        resolved = ResolvedUrl(
    +            resolved_ip='203.0.113.50', hostname='example.com', port=443, is_https=True, path='/path'
    +        )
    +        url = build_url_with_ip(resolved)
    +        assert url == 'https://203.0.113.50/path'
    +
    +    def test_custom_port(self) -> None:
    +        resolved = ResolvedUrl(
    +            resolved_ip='203.0.113.50', hostname='example.com', port=8080, is_https=False, path='/path'
    +        )
    +        url = build_url_with_ip(resolved)
    +        assert url == 'http://203.0.113.50:8080/path'
    +
    +    def test_ipv6_address(self) -> None:
    +        resolved = ResolvedUrl(resolved_ip='2001:db8::1', hostname='example.com', port=443, is_https=True, path='/path')
    +        url = build_url_with_ip(resolved)
    +        assert url == 'https://[2001:db8::1]/path'
    +
    +    def test_ipv6_address_custom_port(self) -> None:
    +        resolved = ResolvedUrl(
    +            resolved_ip='2001:db8::1', hostname='example.com', port=8443, is_https=True, path='/path'
    +        )
    +        url = build_url_with_ip(resolved)
    +        assert url == 'https://[2001:db8::1]:8443/path'
    +
    +
    +class TestResolveRedirectUrl:
    +    """Tests for resolve_redirect_url function."""
    +
    +    def test_absolute_url(self) -> None:
    +        """Test that absolute URLs are returned as-is."""
    +        result = resolve_redirect_url('https://example.com/path', 'https://other.com/new-path')
    +        assert result == 'https://other.com/new-path'
    +
    +    def test_protocol_relative_url(self) -> None:
    +        """Test that protocol-relative URLs use the current scheme."""
    +        result = resolve_redirect_url('https://example.com/path', '//other.com/new-path')
    +        assert result == 'https://other.com/new-path'
    +
    +        result = resolve_redirect_url('http://example.com/path', '//other.com/new-path')
    +        assert result == 'http://other.com/new-path'
    +
    +    def test_absolute_path(self) -> None:
    +        """Test that absolute paths are resolved against the current URL."""
    +        result = resolve_redirect_url('https://example.com/old/path', '/new/path')
    +        assert result == 'https://example.com/new/path'
    +
    +    def test_relative_path(self) -> None:
    +        """Test that relative paths are resolved against the current URL."""
    +        result = resolve_redirect_url('https://example.com/old/path', 'new-file.txt')
    +        assert result == 'https://example.com/old/new-file.txt'
    +
    +    def test_protocol_relative_url_preserves_query_and_fragment(self) -> None:
    +        """Test that protocol-relative URLs preserve query strings and fragments."""
    +        result = resolve_redirect_url('https://example.com/path', '//cdn.example.com/file.txt?token=abc#section')
    +        assert result == 'https://cdn.example.com/file.txt?token=abc#section'
    +
    +
    +class TestResolveHostname:
    +    """Tests for resolve_hostname function."""
    +
    +    async def test_resolve_success(self) -> None:
    +        """Test that hostname resolution returns IP addresses."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [
    +                (2, 1, 6, '', ('93.184.215.14', 0)),
    +                (2, 1, 6, '', ('93.184.215.14', 0)),  # Duplicate should be removed
    +            ]
    +            ips = await resolve_hostname('example.com')
    +            assert ips == ['93.184.215.14']
    +
    +    async def test_resolve_failure(self) -> None:
    +        """Test that DNS resolution failure raises ValueError."""
    +        import socket
    +
    +        with patch('pydantic_ai._ssrf.run_in_executor', side_effect=socket.gaierror('DNS lookup failed')):
    +            with pytest.raises(ValueError, match='DNS resolution failed for hostname'):
    +                await resolve_hostname('nonexistent.invalid')
    +
    +
    +class TestValidateAndResolveUrl:
    +    """Tests for validate_and_resolve_url function."""
    +
    +    async def test_public_ip_allowed(self) -> None:
    +        """Test that public IPs are allowed."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +            resolved = await validate_and_resolve_url('https://example.com/path', allow_local=False)
    +            assert resolved.resolved_ip == '93.184.215.14'
    +            assert resolved.hostname == 'example.com'
    +            assert resolved.port == 443
    +            assert resolved.is_https is True
    +            assert resolved.path == '/path'
    +
    +    async def test_private_ip_blocked_by_default(self) -> None:
    +        """Test that private IPs are blocked by default."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('192.168.1.1', 0))]
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await validate_and_resolve_url('http://internal.local/path', allow_local=False)
    +
    +    async def test_private_ip_allowed_with_allow_local(self) -> None:
    +        """Test that private IPs are allowed with allow_local=True."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('192.168.1.1', 0))]
    +            resolved = await validate_and_resolve_url('http://internal.local/path', allow_local=True)
    +            assert resolved.resolved_ip == '192.168.1.1'
    +
    +    async def test_cloud_metadata_always_blocked(self) -> None:
    +        """Test that cloud metadata IPs are always blocked, even with allow_local=True."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('169.254.169.254', 0))]
    +            with pytest.raises(ValueError, match='Access to cloud metadata service'):
    +                await validate_and_resolve_url('http://metadata.google.internal/path', allow_local=True)
    +
    +    async def test_alibaba_cloud_metadata_always_blocked(self) -> None:
    +        """Test that Alibaba Cloud metadata IP is always blocked, even with allow_local=True."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('100.100.100.200', 0))]
    +            with pytest.raises(ValueError, match='Access to cloud metadata service'):
    +                await validate_and_resolve_url('http://metadata.aliyun.internal/path', allow_local=True)
    +
    +    async def test_literal_ip_address_in_url(self) -> None:
    +        """Test handling of literal IP addresses in URLs."""
    +        # Public IP - should work
    +        resolved = await validate_and_resolve_url('http://8.8.8.8/path', allow_local=False)
    +        assert resolved.resolved_ip == '8.8.8.8'
    +        assert resolved.hostname == '8.8.8.8'
    +
    +    async def test_literal_private_ip_blocked(self) -> None:
    +        """Test that literal private IPs in URLs are blocked."""
    +        with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +            await validate_and_resolve_url('http://192.168.1.1/path', allow_local=False)
    +
    +    async def test_any_private_ip_blocks_request(self) -> None:
    +        """Test that if any resolved IP is private, the request is blocked."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            # Return both public and private IPs
    +            mock_executor.return_value = [
    +                (2, 1, 6, '', ('93.184.215.14', 0)),
    +                (2, 1, 6, '', ('192.168.1.1', 0)),
    +            ]
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await validate_and_resolve_url('http://example.com/path', allow_local=False)
    +
    +    async def test_6to4_address_blocked(self) -> None:
    +        """Test that 6to4 addresses (which can embed private IPv4) are blocked."""
    +        # 2002:c0a8:0101::1 embeds 192.168.1.1
    +        with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +            await validate_and_resolve_url('http://[2002:c0a8:0101::1]/path', allow_local=False)
    +
    +    async def test_cgnat_range_blocked(self) -> None:
    +        """Test that CGNAT range (100.64.0.0/10) is blocked."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            mock_executor.return_value = [(2, 1, 6, '', ('100.64.0.1', 0))]
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await validate_and_resolve_url('http://cgnat-host.internal/path', allow_local=False)
    +
    +
    +class TestSafeDownload:
    +    """Tests for safe_download function."""
    +
    +    async def test_successful_download(self) -> None:
    +        """Test successful download of a public URL."""
    +        mock_response = AsyncMock()
    +        mock_response.is_redirect = False
    +        mock_response.raise_for_status = lambda: None
    +        mock_response.content = b'test content'
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = mock_response
    +            mock_client_fn.return_value = mock_client
    +
    +            response = await safe_download('https://example.com/file.txt')
    +            assert response.content == b'test content'
    +
    +            # Verify the request was made to the resolved IP with Host header and SNI
    +            mock_client.get.assert_called_once()
    +            call_args = mock_client.get.call_args
    +            assert '93.184.215.14' in call_args[0][0]
    +            assert call_args[1]['headers']['Host'] == 'example.com'
    +            assert call_args[1]['extensions'] == {'sni_hostname': 'example.com'}
    +
    +    async def test_redirect_followed(self) -> None:
    +        """Test that redirects are followed with validation."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': 'https://cdn.example.com/file.txt'}
    +
    +        final_response = AsyncMock()
    +        final_response.is_redirect = False
    +        final_response.raise_for_status = lambda: None
    +        final_response.content = b'final content'
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            # First call for example.com, second for cdn.example.com
    +            mock_executor.side_effect = [
    +                [(2, 1, 6, '', ('93.184.215.14', 0))],
    +                [(2, 1, 6, '', ('203.0.113.50', 0))],
    +            ]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.side_effect = [redirect_response, final_response]
    +            mock_client_fn.return_value = mock_client
    +
    +            response = await safe_download('https://example.com/file.txt')
    +            assert response.content == b'final content'
    +            assert mock_client.get.call_count == 2
    +
    +    async def test_redirect_to_private_ip_blocked(self) -> None:
    +        """Test that redirects to private IPs are blocked."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': 'http://internal.local/file.txt'}
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            # First call for example.com (public), second for internal.local (private)
    +            mock_executor.side_effect = [
    +                [(2, 1, 6, '', ('93.184.215.14', 0))],
    +                [(2, 1, 6, '', ('192.168.1.1', 0))],
    +            ]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = redirect_response
    +            mock_client_fn.return_value = mock_client
    +
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await safe_download('https://example.com/file.txt')
    +
    +    async def test_max_redirects_exceeded(self) -> None:
    +        """Test that too many redirects raises an error."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': 'https://example.com/redirect'}
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = redirect_response
    +            mock_client_fn.return_value = mock_client
    +
    +            with pytest.raises(ValueError, match=f'Too many redirects \\({_MAX_REDIRECTS + 1}\\)'):
    +                await safe_download('https://example.com/file.txt')
    +
    +    async def test_relative_redirect_resolved(self) -> None:
    +        """Test that relative redirect URLs are resolved correctly."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': '/new-path/file.txt'}
    +
    +        final_response = AsyncMock()
    +        final_response.is_redirect = False
    +        final_response.raise_for_status = lambda: None
    +        final_response.content = b'final content'
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.side_effect = [redirect_response, final_response]
    +            mock_client_fn.return_value = mock_client
    +
    +            response = await safe_download('https://example.com/old-path/file.txt')
    +            assert response.content == b'final content'
    +
    +            # Check that the second request was to the correct path
    +            second_call = mock_client.get.call_args_list[1]
    +            assert '/new-path/file.txt' in second_call[0][0]
    +
    +    async def test_missing_location_header(self) -> None:
    +        """Test that redirect without Location header raises error."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {}
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = redirect_response
    +            mock_client_fn.return_value = mock_client
    +
    +            with pytest.raises(ValueError, match='Redirect response missing Location header'):
    +                await safe_download('https://example.com/file.txt')
    +
    +    async def test_protocol_relative_redirect(self) -> None:
    +        """Test that protocol-relative redirects are handled correctly."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': '//cdn.example.com/file.txt'}
    +
    +        final_response = AsyncMock()
    +        final_response.is_redirect = False
    +        final_response.raise_for_status = lambda: None
    +        final_response.content = b'final content'
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            # First call for example.com, second for cdn.example.com
    +            mock_executor.side_effect = [
    +                [(2, 1, 6, '', ('93.184.215.14', 0))],
    +                [(2, 1, 6, '', ('203.0.113.50', 0))],
    +            ]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.side_effect = [redirect_response, final_response]
    +            mock_client_fn.return_value = mock_client
    +
    +            response = await safe_download('https://example.com/file.txt')
    +            assert response.content == b'final content'
    +            assert mock_client.get.call_count == 2
    +
    +            # Verify the second request carried the cdn.example.com Host header
    +            second_call = mock_client.get.call_args_list[1]
    +            assert second_call[1]['headers']['Host'] == 'cdn.example.com'
    +
    +    async def test_protocol_relative_redirect_to_private_blocked(self) -> None:
    +        """Test that protocol-relative redirects to private IPs are blocked."""
    +        redirect_response = AsyncMock()
    +        redirect_response.is_redirect = True
    +        redirect_response.headers = {'location': '//internal.local/file.txt'}
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.side_effect = [
    +                [(2, 1, 6, '', ('93.184.215.14', 0))],
    +                [(2, 1, 6, '', ('192.168.1.1', 0))],
    +            ]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = redirect_response
    +            mock_client_fn.return_value = mock_client
    +
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await safe_download('https://example.com/file.txt')
    +
    +    async def test_http_no_sni_extension(self) -> None:
    +        """Test that sni_hostname extension is not set for HTTP requests."""
    +        mock_response = AsyncMock()
    +        mock_response.is_redirect = False
    +        mock_response.raise_for_status = lambda: None
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = mock_response
    +            mock_client_fn.return_value = mock_client
    +
    +            await safe_download('http://example.com/file.txt')
    +
    +            call_args = mock_client.get.call_args
    +            assert call_args[1]['extensions'] == {}
    +
    +    async def test_protocol_validation(self) -> None:
    +        """Test that non-http(s) protocols are rejected."""
    +        with pytest.raises(ValueError, match='URL protocol "file" is not allowed'):
    +            await safe_download('file:///etc/passwd')
    +
    +        with pytest.raises(ValueError, match='URL protocol "ftp" is not allowed'):
    +            await safe_download('ftp://ftp.example.com/file.txt')
    +
    +    async def test_timeout_parameter(self) -> None:
    +        """Test that timeout parameter is passed to client."""
    +        mock_response = AsyncMock()
    +        mock_response.is_redirect = False
    +        mock_response.raise_for_status = lambda: None
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = mock_response
    +            mock_client_fn.return_value = mock_client
    +
    +            await safe_download('https://example.com/file.txt', timeout=60)
    +
    +            mock_client_fn.assert_called_once_with(timeout=60)
    +
    +    async def test_default_timeout(self) -> None:
    +        """Test that default timeout is used."""
    +        mock_response = AsyncMock()
    +        mock_response.is_redirect = False
    +        mock_response.raise_for_status = lambda: None
    +
    +        with (
    +            patch('pydantic_ai._ssrf.run_in_executor') as mock_executor,
    +            patch('pydantic_ai._ssrf.cached_async_http_client') as mock_client_fn,
    +        ):
    +            mock_executor.return_value = [(2, 1, 6, '', ('93.184.215.14', 0))]
    +
    +            mock_client = AsyncMock()
    +            mock_client.get.return_value = mock_response
    +            mock_client_fn.return_value = mock_client
    +
    +            await safe_download('https://example.com/file.txt')
    +
    +            mock_client_fn.assert_called_once_with(timeout=_DEFAULT_TIMEOUT)
    +
    +
    +class TestDnsRebindingPrevention:
    +    """Tests specifically for DNS rebinding attack prevention."""
    +
    +    async def test_hostname_resolving_to_private_ip_blocked(self) -> None:
    +        """Test that a hostname resolving to a private IP is blocked."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            # Attacker's DNS returns private IP
    +            mock_executor.return_value = [(2, 1, 6, '', ('127.0.0.1', 0))]
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await validate_and_resolve_url('http://attacker.com/path', allow_local=False)
    +
    +    async def test_hostname_resolving_to_cloud_metadata_blocked(self) -> None:
    +        """Test that a hostname resolving to cloud metadata IP is blocked."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            # Attacker's DNS returns cloud metadata IP; blocked even with allow_local=True
    +            mock_executor.return_value = [(2, 1, 6, '', ('169.254.169.254', 0))]
    +            with pytest.raises(ValueError, match='Access to cloud metadata service'):
    +                await validate_and_resolve_url('http://attacker.com/path', allow_local=True)
    +
    +    async def test_multiple_ips_with_any_private_blocked(self) -> None:
    +        """Test that if any IP in the resolution is private, request is blocked."""
    +        with patch('pydantic_ai._ssrf.run_in_executor') as mock_executor:
    +            # DNS returns multiple IPs, one of which is private
    +            mock_executor.return_value = [
    +                (2, 1, 6, '', ('8.8.8.8', 0)),  # Public
    +                (10, 1, 6, '', ('::1', 0)),  # Private IPv6 loopback
    +            ]
    +            with pytest.raises(ValueError, match='Access to private/internal IP address'):
    +                await validate_and_resolve_url('http://attacker.com/path', allow_local=False)
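
The redirect tests above depend on every hop being validated before it is requested. What follows is a minimal sketch of that pattern, not the patch's implementation. `follow`, `check_hop`, the injected `fetch` and `resolve` callables, and `MAX_REDIRECTS = 10` are all assumptions made for illustration. `ipaddress`'s `is_global` is used as a stricter stand-in for whatever range list `pydantic_ai._ssrf` applies; for example, it rejects the documentation range 203.0.113.0/24 that the tests treat as public.

```python
from __future__ import annotations

import ipaddress
from typing import Callable, Optional
from urllib.parse import urljoin, urlsplit

MAX_REDIRECTS = 10  # assumed limit for this sketch; the patch's _MAX_REDIRECTS value is not shown


def check_hop(url: str, resolve: Callable[[str], list[str]]) -> None:
    """Reject non-http(s) URLs and hosts with any non-public resolved address."""
    parts = urlsplit(url)
    if parts.scheme not in ('http', 'https'):
        raise ValueError(f'URL protocol "{parts.scheme}" is not allowed')
    if not parts.hostname:
        raise ValueError('URL has no hostname')
    for ip in resolve(parts.hostname):
        # is_global is False for loopback, RFC 1918, link-local and CGNAT ranges.
        if not ipaddress.ip_address(ip).is_global:
            raise ValueError(f'Access to private/internal IP address {ip} is blocked')


def follow(
    url: str,
    fetch: Callable[[str], Optional[str]],
    resolve: Callable[[str], list[str]],
    max_redirects: int = MAX_REDIRECTS,
) -> str:
    """Follow redirects manually, validating each hop before fetching it.

    fetch(url) is a hypothetical callable that returns the Location value
    for a redirect response, or None for a final response.
    """
    for _ in range(max_redirects + 1):
        check_hop(url, resolve)
        location = fetch(url)
        if location is None:
            return url
        # urljoin resolves absolute, path-relative and protocol-relative
        # ('//host/path') targets against the current URL.
        url = urljoin(url, location)
    raise ValueError(f'Too many redirects ({max_redirects + 1})')
```

Because the HTTP client's automatic redirect handling is bypassed, a public host cannot bounce the request to an internal address: the redirect target goes through the same check as the original URL.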
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
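
The tests in the fix commit suggest the shape of the defence: resolve the hostname once, reject the request if any resolved address is disallowed, then send the request to the vetted IP while keeping the original hostname in the `Host` header and TLS SNI. The sketch below illustrates that idea under stated assumptions and is not the library's code. `pick_ip`, `pinned_request`, and the one-entry `METADATA_IPS` set are hypothetical names for this example, and `is_global` approximates the real blocklist.

```python
from __future__ import annotations

import ipaddress
from urllib.parse import urlsplit, urlunsplit

# Assumed for this sketch: only the widely used IPv4 metadata address.
METADATA_IPS = {ipaddress.ip_address('169.254.169.254')}


def pick_ip(addrinfo: list[tuple], allow_local: bool = False) -> str:
    """Return the first resolved IP, or raise if any resolved IP is disallowed.

    addrinfo has the shape returned by socket.getaddrinfo().
    """
    ips = [ipaddress.ip_address(entry[4][0]) for entry in addrinfo]
    if not ips:
        raise ValueError('DNS resolution returned no addresses')
    for ip in ips:
        # Metadata endpoints stay blocked even when local addresses are allowed.
        if ip in METADATA_IPS:
            raise ValueError(f'Access to cloud metadata service {ip} is blocked')
        if not allow_local and not ip.is_global:
            raise ValueError(f'Access to private/internal IP address {ip} is blocked')
    return str(ips[0])


def pinned_request(url: str, ip: str) -> tuple[str, dict, dict]:
    """Build (url, headers, extensions) that target the vetted IP directly."""
    parts = urlsplit(url)
    host = parts.hostname or ''
    literal = f'[{ip}]' if ':' in ip else ip  # bracket IPv6 literals
    netloc = literal if parts.port is None else f'{literal}:{parts.port}'
    host_header = host if parts.port is None else f'{host}:{parts.port}'
    pinned = urlunsplit((parts.scheme, netloc, parts.path, parts.query, ''))
    # SNI only matters for TLS, so plain HTTP gets no extension.
    extensions = {'sni_hostname': host} if parts.scheme == 'https' else {}
    return pinned, {'Host': host_header}, extensions
```

Connecting to the address that was just validated closes the DNS rebinding window: a second lookup by the HTTP client could otherwise return a different, internal address after the check has passed.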

References

4
