VYPR
Medium severity (5.0) · GHSA Advisory · Published May 28, 2026

CVE-2026-46526

Description

Local Deep Research is an AI-powered research assistant for deep, iterative research. Prior to version 1.6.10, the URL-checking logic in local-deep-research contains a logic flaw that lets attackers bypass its SSRF protection. The project validates input URLs with validate_url, which performs security checks on the host portion of the URL as extracted by urlparse. However, urlparse and the library that actually sends the request parse some URLs differently. For example, safe_get first calls validate_url to perform the SSRF check and then calls requests.get to send the actual request, so a URL whose host urlparse reads as safe can still make requests.get contact an internal address. This vulnerability is fixed in 1.6.10.
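The following minimal, stdlib-only sketch demonstrates the differential using the advisory's example payload `http://127.0.0.1\@1.1.1.1`. `urllib.parse.urlparse` reports the host after the last `@`. An authority split that ends at the first backslash yields the loopback address instead; that split is a simplified model of what the advisory says urllib3, and therefore `requests`, does. The regex and both helper names are hypothetical, and this is not the project's or urllib3's code.

```python
import re
from typing import Optional
from urllib.parse import urlparse

# Hypothetical, simplified authority split: the authority ends at the first
# "/", "?", "#" OR backslash. This models the behaviour the advisory attributes
# to urllib3 (used by requests); it is not urllib3's implementation.
_AUTHORITY_RE = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*://([^\\/?#]*)")


def host_seen_by_urlparse(url: str) -> Optional[str]:
    """Host as extracted by urllib.parse (the pre-1.6.10 validator's view)."""
    return urlparse(url).hostname


def host_seen_by_backslash_split(url: str) -> Optional[str]:
    """Host under the simplified backslash-terminated model above."""
    match = _AUTHORITY_RE.match(url)
    if match is None:
        return None
    hostport = match.group(1).rpartition("@")[2]  # drop any userinfo
    host, _, _port = hostport.partition(":")      # IPv4/DNS names only, no IPv6 literals
    return host.lower() or None


payload = "http://127.0.0.1\\@1.1.1.1"  # the literal URL http://127.0.0.1\@1.1.1.1
print(host_seen_by_urlparse(payload))         # 1.1.1.1   (looks public to the validator)
print(host_seen_by_backslash_split(payload))  # 127.0.0.1 (what the client contacts)
```

Because urlparse treats the backslash as an ordinary netloc character, everything before the last `@` is discarded as "userinfo."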

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

A parser-differential SSRF vulnerability in local-deep-research before 1.6.10 allows attackers to reach internal services by exploiting URL parsing inconsistencies.

Vulnerability

A server-side request forgery (SSRF) vulnerability exists in local-deep-research versions prior to 1.6.10. The SSRF validator parsed URLs with urllib.parse.urlparse, while the HTTP client (requests) parses them with urllib3. Because of this parser differential, crafted URLs such as http://127.0.0.1\@1.1.1.1 were interpreted differently: urlparse extracted 1.1.1.1 as the host (passing the SSRF check), while requests connected to 127.0.0.1. In addition, the IPv6 unspecified address :: was not blocked. On Linux, http://[::]:port/ reaches services listening on the local host. The attack surface also included IPv6 transition prefixes (6to4, NAT64, Teredo, etc.), which can route to internal addresses on hosts that have the corresponding tunnel routes configured [1][2][3][4].
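The following sketch uses Python's `ipaddress` module to recover the IPv4 destination embedded in several IPv6 literals, which shows why the transition prefixes matter. `sixtofour` and `teredo` are real `ipaddress.IPv6Address` properties. The NAT64 and IPv4-compatible branches take the low 32 bits, assuming a /96 prefix as the 1.6.10 patch does. `embedded_ipv4` is a hypothetical helper, not part of local-deep-research.

```python
import ipaddress
from typing import Optional

NAT64_WKP = ipaddress.ip_network("64:ff9b::/96")  # RFC 6052 well-known prefix
IPV4_COMPATIBLE = ipaddress.ip_network("::/96")    # RFC 4291, deprecated


def embedded_ipv4(addr: str) -> Optional[ipaddress.IPv4Address]:
    """Hypothetical helper: the IPv4 destination carried inside an IPv6 literal."""
    ip = ipaddress.IPv6Address(addr)
    if ip.sixtofour is not None:   # 2002::/16: IPv4 sits right after the 2002 hextet
        return ip.sixtofour
    if ip.teredo is not None:      # 2001::/32: (server, client); client is stored bit-inverted
        return ip.teredo[1]
    if ip in NAT64_WKP or ip in IPV4_COMPATIBLE:
        return ipaddress.IPv4Address(int(ip) & 0xFFFFFFFF)  # low 32 bits
    return None


for literal in ("2002:a9fe:a9fe::", "64:ff9b::a9fe:a9fe", "::169.254.169.254"):
    print(literal, "->", embedded_ipv4(literal))  # each maps to 169.254.169.254
print(ipaddress.ip_address("::").is_unspecified)  # True: the bare "::" case
```

Whether such an address actually reaches the embedded IPv4 depends on the host having sit0, NAT64, or Teredo routes. For that reason, the patch describes the prefix block as defense in depth on typical deployments.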

Exploitation

An attacker can supply a malicious URL to the local-deep-research tool, for example via the research input. The attacker can bypass the validator by inserting a backslash (\) that makes the two URL parsers disagree about the host, or by using the IPv6 unspecified address or a transition-prefix address. The attacker does not need prior authentication; network access to the application is sufficient. The attacker submits a crafted URL that the validator considers safe but that requests.get connects to an internal address (e.g., 127.0.0.1, [::1], or the cloud metadata endpoint 169.254.169.254). The application then sends an HTTP request to that internal target [1][2][4].

Impact

Successful exploitation allows the attacker to make the local-deep-research server send HTTP requests to arbitrary internal hosts, including loopback services and cloud metadata endpoints. This can result in information disclosure (e.g., cloud instance metadata, credentials), access to internal APIs, and potentially further compromise of the host or cloud environment. No code execution is achieved directly, but the SSRF can be used to pivot to other internal resources [1][2][3].

Mitigation

The vulnerability is fixed in version 1.6.10, released on or about May 28, 2026 [1]. The fix adds two layers. First, it rejects URLs containing a backslash, ASCII control bytes, or whitespace. Second, it performs SSRF validation with urllib3.util.parse_url, the same parser requests uses, so the validator and the HTTP client agree by construction [2]. The fix also adds the IPv6 unspecified address (::) and the IPv6 transition prefixes to the blocked private IP ranges [3][4]: 6to4 2002::/16, NAT64 64:ff9b::/96 and 64:ff9b:1::/48, Teredo 2001::/32, discard 100::/64, and IPv4-compatible ::/96. Users should upgrade to 1.6.10 or later. No workaround is available; the fix is in the core URL validation logic [1][2].
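The sketch below shows roughly what the first layer of the fix might look like, together with the prefix check. The character class follows the fix description (backslash, ASCII control bytes, whitespace). The prefix list contains only the IPv6 ranges named in this advisory and patch, so it is not a complete SSRF blocklist. In the real fix the host comes from `urllib3.util.parse_url`; here it is passed in as a string to keep the sketch dependency-free. Both helper names are hypothetical.

```python
import ipaddress
import re

# Layer 1 (sketch): backslash, ASCII control bytes (\x00-\x1f, \x7f) and any
# whitespace (\s) are rejected outright, before any URL parsing happens.
_FORBIDDEN_URL_CHARS = re.compile(r"[\\\x00-\x1f\x7f\s]")

# IPv6 ranges named in this advisory and patch. Illustrative only: a real
# blocklist also needs RFC 1918, loopback, link-local, ULA, metadata IPs, etc.
BLOCKED_IPV6_RANGES = [
    ipaddress.ip_network(prefix)
    for prefix in (
        "::/128",          # unspecified; Linux connects it to the local host
        "2002::/16",       # 6to4
        "64:ff9b::/96",    # NAT64 well-known
        "64:ff9b:1::/48",  # NAT64 local-use
        "2001::/32",       # Teredo
        "100::/64",        # discard
        "::/96",           # IPv4-compatible (deprecated)
    )
]


def has_forbidden_chars(url: str) -> bool:
    """Layer 1: True if the raw URL contains a character RFC 3986 never allows."""
    return _FORBIDDEN_URL_CHARS.search(url) is not None


def in_blocked_ipv6_range(host: str) -> bool:
    """Check an already-extracted IP-literal host (brackets optional).

    In the real fix the host would come from urllib3.util.parse_url, so the
    validator sees exactly the host that requests will connect to.
    """
    ip = ipaddress.ip_address(host.strip("[]"))
    return any(ip in net for net in BLOCKED_IPV6_RANGES)
```

Rejecting the characters first means the second layer never has to decide how a backslash should be parsed. The shared parser then removes any remaining disagreement about the host.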

AI Insight generated on May 28, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected products: 2

Patches: 4

00745f7a47ee

security: block IPv6 transition prefixes in SSRF defense (#3932)

https://github.com/LearningCircuit/local-deep-research · LearningCircuit · May 10, 2026 · via body-scan-shorthand
9 files changed · +1080 -31
  • changelog.d/+ipv6-transition-prefixes.security.md · +25 -0 · added
    @@ -0,0 +1,25 @@
    +SSRF defense-in-depth: block IPv6 transition prefixes that can wrap
    +private IPv4 destinations on hosts with kernel sit0/NAT64 routes.
    +
    +- `2002::/16` (6to4, RFC 3056 — deprecated by RFC 7526)
    +- `64:ff9b::/96` (NAT64 well-known, RFC 6052)
    +- `64:ff9b:1::/48` (NAT64 local-use, RFC 8215 — same SSRF threat class
    +  as the WKP; missing it earned a HackerOne bounty against
    +  ssrf_filter)
    +- `2001::/32` (Teredo, RFC 4380)
    +- `100::/64` (IPv6 discard, RFC 6666)
    +- `::/96` (IPv4-Compatible IPv6, RFC 4291 §2.5.5.1 — DEPRECATED 2006;
    +  same SSRF threat class as the transition prefixes)
    +
    +The metadata-IP block is hardened against IPv6-wrapped IMDS access:
    +when an IPv6 destination falls in a NAT64 prefix, the embedded IPv4 is
    +extracted and matched against `ALWAYS_BLOCKED_METADATA_IPS`, so
    +`[64:ff9b::a9fe:a9fe]` cannot reach 169.254.169.254 even on a NAT64
    +host.
    +
    +Operators on IPv6-only deployments using DNS64+NAT64 (where outbound
    +IPv4 traffic is synthesized through `64:ff9b::/96`) can opt back in via
    +the env-only setting `security.allow_nat64`
    +(`LDR_SECURITY_ALLOW_NAT64=true`). The opt-in is scoped strictly to
    +the two NAT64 prefixes — 6to4, Teredo, and discard remain blocked
    +unconditionally, and the IMDS embedded-IPv4 carve-out still applies.
    
  • SECURITY.md · +21 -0 · modified
    @@ -150,8 +150,29 @@ Both `ssrf_validator.validate_url` and `NotificationURLValidator.validate_servic
     | `169.254.0.23` | Tencent Cloud |
     | `100.100.100.200` | AlibabaCloud |
     
    +The block also catches IPv6-wrapped forms of these metadata IPs. When an IPv6 destination falls in a NAT64 prefix (`64:ff9b::/96` RFC 6052 well-known or `64:ff9b:1::/48` RFC 8215 local-use), the validator extracts the embedded IPv4 from the low 32 bits and matches it against this set — so `[64:ff9b::a9fe:a9fe]` cannot reach `169.254.169.254` even on a host with NAT64 routes configured. The check fires before any opt-in carve-out, so the operator switch described below cannot license IMDS exposure.
    +
    +Both `ssrf_validator.is_ip_blocked` and `NotificationURLValidator.validate_service_url` enforce this absolutely, including under `allow_private_ips=True`. The latter flag is an operator opt-in for self-hosted webhooks on internal networks (RFC1918, CGNAT, loopback, link-local, IPv6 ULA); it does NOT extend to metadata IPs or NAT64-wrapped metadata. Both validators delegate to the same `is_ip_blocked` helper to keep the absolute-block invariant in lockstep.
    +
     Future contributors must not remove entries from this set. Adding a new cloud provider's metadata IP is encouraged when a new public-cloud target appears.
     
    +### IPv6 Transition Prefix Block List
    +
    +`PRIVATE_IP_RANGES` blocks four IPv6 prefixes that can wrap private-IPv4 destinations on hosts with kernel transition routes configured:
    +
    +| Prefix | Purpose | RFC |
    +| --- | --- | --- |
    +| `2002::/16` | 6to4 | RFC 3056 (deprecated by RFC 7526) |
    +| `64:ff9b::/96` | NAT64 well-known prefix | RFC 6052 |
    +| `64:ff9b:1::/48` | NAT64 local-use prefix | RFC 8215 |
    +| `2001::/32` | Teredo | RFC 4380 |
    +| `100::/64` | IPv6 discard prefix | RFC 6666 |
    +| `::/96` | IPv4-Compatible IPv6 (deprecated) | RFC 4291 §2.5.5.1 |
    +
    +Default Linux has no `sit0` / NAT64 routes so this is defensive-only on the typical deployment, but blocking these prefixes closes the IPv6-wrapped SSRF bypass class on hosts where transition tunnels are enabled.
    +
    +Operators on IPv6-only deployments using DNS64+NAT64 (AWS / GKE / Azure IPv6-only nodes) reach IPv4 services through `64:ff9b::/96`. They can opt back into NAT64 reachability via the env-only setting `security.allow_nat64` (`LDR_SECURITY_ALLOW_NAT64=true`). The opt-in is scoped strictly to the two NAT64 prefixes — 6to4, Teredo, and discard remain unconditionally blocked because they have no live legitimate use, and the IMDS embedded-IPv4 check above still applies so cloud metadata stays unreachable through any NAT64 wrap.
    +
     URL rejection log lines route through `ssrf_validator.redact_url_for_log` to drop userinfo (RFC 3986 §3.2.1 allows credentials in the URL), path, and query — operators see `scheme://host:port` only. Operators with grep/regex tooling on the rejection log lines will see authority-only strings instead of full URLs.
     
     ## Supported Versions
    
  • src/local_deep_research/security/ip_ranges.py · +32 -0 · modified
    @@ -25,4 +25,36 @@
         ipaddress.ip_network(
             "::/128"
         ),  # IPv6 unspecified — Linux routes connections to local host
    +    # IPv6 transition prefixes that can wrap private IPv4 destinations.
    +    # On Linux hosts with kernel sit0 / NAT64 routes configured, these
    +    # prefixes are forwarded to the embedded IPv4 (e.g. 2002:7f00:1::
    +    # → 127.0.0.1 via 6to4). Default Linux has no such routes so they
    +    # are not exploitable in the typical deployment, but blocking them
    +    # closes the gap for operators who do enable transition tunnels.
    +    ipaddress.ip_network("2002::/16"),  # 6to4 (RFC 3056, deprecated RFC 7526)
    +    ipaddress.ip_network("64:ff9b::/96"),  # NAT64 well-known prefix (RFC 6052)
    +    ipaddress.ip_network(
    +        "64:ff9b:1::/48"
    +    ),  # NAT64 local-use prefix (RFC 8215) — same SSRF threat class as the WKP
    +    ipaddress.ip_network("2001::/32"),  # Teredo (RFC 4380)
    +    ipaddress.ip_network("100::/64"),  # IPv6 discard prefix (RFC 6666)
    +    # IPv4-Compatible IPv6 — DEPRECATED by RFC 4291 §2.5.5.1 in 2006 but
    +    # still parseable by ipaddress and routable on hosts with ::/96 routes
    +    # configured (rare but real). Embeds the IPv4 in the low 32 bits so
    +    # [::169.254.169.254] would otherwise reach IMDS, identically to the
    +    # 6to4/NAT64 wraps. Has zero legitimate live use; blocking it is the
    +    # same defense-in-depth move as the transition prefixes above.
    +    ipaddress.ip_network("::/96"),
    +]
    +
    +# NAT64 prefixes — operators on IPv6-only hosts using DNS64+NAT64 reach
    +# IPv4 services through these. Blocking by default protects the typical
    +# deployment shape (laptops / dual-stack) from the IPv6-wrapped IMDS /
    +# RFC1918 SSRF bypass class. Operators who actually need NAT64 reachable
    +# can opt in via the env-only setting ``security.allow_nat64``
    +# (LDR_SECURITY_ALLOW_NAT64=true). 6to4, Teredo, and discard remain
    +# unconditionally blocked — they have no legitimate live use.
    +NAT64_PREFIXES = [
    +    ipaddress.ip_network("64:ff9b::/96"),
    +    ipaddress.ip_network("64:ff9b:1::/48"),
     ]
    
  • src/local_deep_research/security/notification_validator.py · +48 -12 · modified
    @@ -64,18 +64,47 @@ class NotificationURLValidator:
         PRIVATE_IP_RANGES = _PRIVATE_IP_RANGES
     
         @staticmethod
    -    def _is_private_ip(hostname: str) -> bool:
    +    def _ip_matches_blocked_range(ip, allow_private_ips: bool = False) -> bool:
    +        """Block-decision for a parsed IP, delegating to
    +        ``ssrf_validator.is_ip_blocked`` so the two validators share a
    +        single source of truth.
    +
    +        Honors:
    +        - ALWAYS_BLOCKED_METADATA_IPS (cloud metadata, absolute)
    +        - is_nat64_wrapped_metadata_ip (NAT64-wrapped IMDS, absolute)
    +        - security.allow_nat64 env carve-out for the two NAT64 prefixes
    +        - allow_private_ips: when True, RFC1918 / CGNAT / loopback /
    +          link-local / IPv6 ULA are allowed BUT the two absolute checks
    +          above still fire. This closes the historical bypass where
    +          ``allow_private_ips=True`` skipped the host check entirely
    +          and let metadata IPs through the notification path.
    +        """
    +        from .ssrf_validator import is_ip_blocked
    +
    +        return is_ip_blocked(str(ip), allow_private_ips=allow_private_ips)
    +
    +    @staticmethod
    +    def _is_private_ip(hostname: str, allow_private_ips: bool = False) -> bool:
             """
             Check if hostname resolves to a private IP address.
     
             Args:
                 hostname: Hostname to check
    +            allow_private_ips: When True, RFC1918 / CGNAT / loopback /
    +                link-local / IPv6 ULA are NOT considered private. Cloud
    +                metadata IPs and NAT64-wrapped metadata IPs are blocked
    +                regardless — the operator opt-in cannot license IMDS
    +                exposure.
     
             Returns:
    -            True if hostname is a private IP or localhost
    +            True if hostname is a private IP or localhost (subject to
    +            allow_private_ips), or wraps a metadata IP unconditionally
             """
    -        # Check for localhost variations
    -        if hostname.lower() in (
    +        # Localhost-string shortcuts only apply when the operator hasn't
    +        # opted into private-IP reachability. With allow_private_ips=True
    +        # we let the IP path (DNS-resolved or literal) make the decision
    +        # so metadata-IP literals like "169.254.169.254" still block.
    +        if not allow_private_ips and hostname.lower() in (
                 "localhost",
                 "127.0.0.1",
                 "::1",
    @@ -87,9 +116,8 @@ def _is_private_ip(hostname: str) -> bool:
             # Try to parse as IP address
             try:
                 ip = ipaddress.ip_address(hostname)
    -            return any(
    -                ip in network
    -                for network in NotificationURLValidator.PRIVATE_IP_RANGES
    +            return NotificationURLValidator._ip_matches_blocked_range(
    +                ip, allow_private_ips=allow_private_ips
                 )
             except ValueError:
                 # Hostname - resolve to IP and check.
    @@ -131,9 +159,8 @@ def _is_private_ip(hostname: str) -> bool:
                         executor.shutdown(wait=False, cancel_futures=True)
                     for _family, _, _, _, sockaddr in resolved_ips:
                         ip = ipaddress.ip_address(sockaddr[0])
    -                    if any(
    -                        ip in network
    -                        for network in NotificationURLValidator.PRIVATE_IP_RANGES
    +                    if NotificationURLValidator._ip_matches_blocked_range(
    +                        ip, allow_private_ips=allow_private_ips
                         ):
                             return True
                 except (socket.gaierror, OSError, TimeoutError):
    @@ -233,7 +260,14 @@ def validate_service_url(
             # (GHSA-g23j-2vwm-5c25). For non-HTTP schemes (Apprise transports
             # like discord://, slack://, mailto://) Apprise handles the URL
             # itself and the parser-differential doesn't apply.
    -        if scheme in ("http", "https") and not allow_private_ips:
    +        #
    +        # The host check runs even when ``allow_private_ips=True`` —
    +        # ``_is_private_ip`` propagates that flag, so RFC1918 / loopback
    +        # are allowed through, but cloud-metadata IPs and NAT64-wrapped
    +        # metadata still block. ``allow_private_ips=True`` is an
    +        # operator opt-in for self-hosted webhooks on internal networks,
    +        # not for IMDS exfiltration.
    +        if scheme in ("http", "https"):
                 try:
                     u3 = parse_url(url)
                 except LocationParseError:
    @@ -255,7 +289,9 @@ def validate_service_url(
                     hostname = hostname[1:-1]
                 if hostname:
                     hostname = hostname.rstrip(".")
    -            if hostname and NotificationURLValidator._is_private_ip(hostname):
    +            if hostname and NotificationURLValidator._is_private_ip(
    +                hostname, allow_private_ips=allow_private_ips
    +            ):
                     logger.warning(
                         f"Blocked private/internal IP in notification URL: "
                         f"{hostname}"
    
  • src/local_deep_research/security/ssrf_validator.py · +43 -0 · modified
    @@ -15,6 +15,7 @@
     from urllib3.util import parse_url
     
     from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
    +from .ip_ranges import NAT64_PREFIXES
     
     # Cloud-provider metadata endpoints — always blocked, even with
     # allow_localhost=True or allow_private_ips=True. These IPs expose IAM /
    @@ -33,6 +34,26 @@
     # Allowed URL schemes
     ALLOWED_SCHEMES = {"http", "https"}
     
    +
    +def is_nat64_wrapped_metadata_ip(ip: ipaddress._BaseAddress) -> bool:
    +    """True iff ``ip`` is an IPv6 address inside a NAT64 prefix whose
    +    embedded IPv4 (low 32 bits) is in ``ALWAYS_BLOCKED_METADATA_IPS``.
    +
    +    Both ``is_ip_blocked`` and ``NotificationURLValidator._ip_matches_blocked_range``
    +    consult this before honoring the ``security.allow_nat64`` operator
    +    opt-in, so cloud-metadata access cannot be re-opened through an
    +    IPv6-wrapped destination on a NAT64-equipped host. Keeping the
    +    extraction in one place prevents the two validators from drifting.
    +    """
    +    if not isinstance(ip, ipaddress.IPv6Address):
    +        return False
    +    for nat64_prefix in NAT64_PREFIXES:
    +        if ip in nat64_prefix:
    +            embedded_v4 = ipaddress.IPv4Address(int(ip) & 0xFFFFFFFF)
    +            return str(embedded_v4) in ALWAYS_BLOCKED_METADATA_IPS
    +    return False
    +
    +
     # RFC 3986 forbids these characters in URLs; their presence in a URL signals
     # a parser-differential attempt (GHSA-g23j-2vwm-5c25). \s covers space, \t,
     # \n, \r, \v, \f. Backslash is the load-bearing payload — Python's urlparse
    @@ -104,9 +125,31 @@ def is_ip_blocked(
             if str(ip) in ALWAYS_BLOCKED_METADATA_IPS:
                 return True
     
    +        # Also block metadata IPs reached via NAT64 wrap. NAT64 prefixes
    +        # embed the IPv4 destination in the low 32 bits; even when the
    +        # operator has set LDR_SECURITY_ALLOW_NAT64=true the metadata
    +        # block is "always" — an opt-in for IPv4 reachability does NOT
    +        # license IMDS exposure.
    +        if is_nat64_wrapped_metadata_ip(ip):
    +            return True
    +
    +        # Operator escape hatch for IPv6-only deployments using DNS64+NAT64.
    +        # Read lazily (not at import) so test monkeypatching works and so the
    +        # value is not cached across env mutations. Cloud-metadata IPs are
    +        # ALWAYS blocked above, so this carve-out cannot reopen IMDS via
    +        # the IPv6-wrapped form.
    +        from ..settings.env_registry import get_env_setting
    +
    +        nat64_allowed = bool(get_env_setting("security.allow_nat64", False))
    +
             # Check if IP is in any blocked range
             for blocked_range in BLOCKED_IP_RANGES:
                 if ip in blocked_range:
    +                # NAT64 carve-out: when the operator has opted in, the two
    +                # NAT64 prefixes don't block. 6to4 / Teredo / discard remain
    +                # blocked unconditionally.
    +                if nat64_allowed and blocked_range in NAT64_PREFIXES:
    +                    continue
                     # If allow_private_ips is True, skip blocking for private + loopback
                     if allow_private_ips:
                         is_loopback = any(ip in lr for lr in LOOPBACK_RANGES)
    
  • src/local_deep_research/settings/env_definitions/security.py · +19 -0 · modified
    @@ -43,6 +43,25 @@
             ),
             default=False,
         ),
    +    BooleanSetting(
    +        key="security.allow_nat64",
    +        description=(
    +            "Allow outbound traffic to NAT64 prefixes (64:ff9b::/96 RFC 6052 "
    +            "well-known and 64:ff9b:1::/48 RFC 8215 local-use). Disabled by "
    +            "default to close the IPv6-wrapped SSRF bypass class — on hosts "
    +            "configured with NAT64 routes, attacker-supplied URLs can wrap "
    +            "cloud-metadata or RFC1918 destinations through these prefixes. "
    +            "Enable only on IPv6-only deployments (DNS64+NAT64) where "
    +            "outbound IPv4 traffic is synthesized through this prefix and "
    +            "the operator has accepted the residual SSRF risk. 6to4 "
    +            "(2002::/16), Teredo (2001::/32), and the discard prefix "
    +            "(100::/64) remain unconditionally blocked because they have no "
    +            "live legitimate use in 2026. The cloud-metadata block "
    +            "(ALWAYS_BLOCKED_METADATA_IPS) still applies via embedded-IPv4 "
    +            "extraction — see SECURITY.md."
    +        ),
    +        default=False,
    +    ),
         BooleanSetting(
             key="notifications.allow_outbound",
             description=(
    
  • tests/security/test_ip_ranges.py · +187 -0 · modified
    @@ -10,6 +10,17 @@
     import ipaddress
     
     
    +def _ip_is_private(ip_str: str) -> bool:
    +    """Module-level helper used across the IPv6-transition-prefix test
    +    classes. (TestPrivateIPDetection has its own copy that returns
    +    False for invalid IPs; this one is for callers that pass only
    +    well-formed addresses.)"""
    +    from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +    ip = ipaddress.ip_address(ip_str)
    +    return any(ip in network for network in PRIVATE_IP_RANGES)
    +
    +
     class TestPrivateIPRanges:
         """Tests for PRIVATE_IP_RANGES constant."""
     
    @@ -114,6 +125,65 @@ def test_contains_ipv6_unspecified(self):
             unspecified_v6 = ipaddress.ip_network("::/128")
             assert unspecified_v6 in PRIVATE_IP_RANGES
     
    +    def test_contains_6to4_prefix(self):
    +        """Should contain 2002::/16 (6to4 transition prefix). Wraps
    +        private IPv4 destinations on hosts with sit0 routes."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        sixto4 = ipaddress.ip_network("2002::/16")
    +        assert sixto4 in PRIVATE_IP_RANGES
    +
    +    def test_contains_nat64_prefix(self):
    +        """Should contain 64:ff9b::/96 (NAT64 well-known prefix)."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        nat64 = ipaddress.ip_network("64:ff9b::/96")
    +        assert nat64 in PRIVATE_IP_RANGES
    +
    +    def test_contains_nat64_local_use_prefix(self):
    +        """Should contain 64:ff9b:1::/48 (RFC 8215 NAT64 local-use prefix)
    +        — same SSRF threat class as the well-known prefix; missing it is
    +        the exact bypass paid out as a HackerOne bounty against
    +        ssrf_filter."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        nat64_local = ipaddress.ip_network("64:ff9b:1::/48")
    +        assert nat64_local in PRIVATE_IP_RANGES
    +
    +    def test_nat64_prefixes_constant_exposes_both(self):
    +        """NAT64_PREFIXES must contain exactly the two NAT64 prefixes —
    +        used by validators to identify which deny entries the
    +        security.allow_nat64 env carve-out should skip."""
    +        from local_deep_research.security.ip_ranges import NAT64_PREFIXES
    +
    +        assert ipaddress.ip_network("64:ff9b::/96") in NAT64_PREFIXES
    +        assert ipaddress.ip_network("64:ff9b:1::/48") in NAT64_PREFIXES
    +        assert len(NAT64_PREFIXES) == 2
    +
    +    def test_contains_teredo_prefix(self):
    +        """Should contain 2001::/32 (Teredo)."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        teredo = ipaddress.ip_network("2001::/32")
    +        assert teredo in PRIVATE_IP_RANGES
    +
    +    def test_contains_ipv6_discard_prefix(self):
    +        """Should contain 100::/64 (RFC 6666 discard)."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        discard = ipaddress.ip_network("100::/64")
    +        assert discard in PRIVATE_IP_RANGES
    +
    +    def test_contains_ipv4_compatible_ipv6_prefix(self):
    +        """Should contain ::/96 (RFC 4291 IPv4-Compatible IPv6 — DEPRECATED).
    +        Same SSRF threat class as the transition prefixes: embeds an IPv4
    +        address in the low 32 bits and is routable on hosts with ::/96
    +        routes configured. [::169.254.169.254] would otherwise reach IMDS."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        ipv4_compat = ipaddress.ip_network("::/96")
    +        assert ipv4_compat in PRIVATE_IP_RANGES
    +
     
     class TestPrivateIPDetection:
         """Tests for using PRIVATE_IP_RANGES to detect private IPs."""
    @@ -175,6 +245,123 @@ def test_ipv6_public_is_not_private(self):
             assert self._is_private("2001:4860:4860::8888") is False  # Google DNS
     
     
    +class TestIPv6TransitionPrefixesAntiCollision:
    +    """Anti-regression: confirm the new IPv6 transition-prefix entries
    +    do NOT swallow legitimate global IPv6 allocations at their boundaries.
    +
    +    Each prefix is precisely scoped at the bit level:
    +      - 2001::/32 fixes the second hextet to 0x0000 (Teredo only).
    +      - 2002::/16 fixes the first hextet to 0x2002 (6to4 only).
    +      - 64:ff9b::/96 fixes the first 96 bits (the well-known NAT64 prefix
    +        only; RFC 8215 local-use 64:ff9b:1::/48 must NOT match).
    +      - 100::/64 fixes the first 64 bits (RFC 6666 discard only; the rest
    +        of 100::/8 is unallocated, not discard).
    +    """
    +
    +    _is_private = staticmethod(_ip_is_private)
    +
    +    def test_google_dns_v6_not_blocked(self):
    +        """2001:4860:4860::8888 — second hextet 0x4860, outside 2001::/32."""
    +        assert self._is_private("2001:4860:4860::8888") is False
    +
    +    def test_cloudflare_dns_v6_not_blocked(self):
    +        """2606:4700:4700::1111 — first hextet 0x2606, far from 2001/2002."""
    +        assert self._is_private("2606:4700:4700::1111") is False
    +
    +    def test_documentation_prefix_v6_not_blocked(self):
    +        """2001:db8::/32 (RFC 3849) — second hextet 0x0db8, outside Teredo."""
    +        assert self._is_private("2001:db8::1") is False
    +
    +    def test_root_server_v6_not_blocked(self):
    +        """2001:500::/30 (root-server allocation) — second hextet 0x0500."""
    +        assert self._is_private("2001:500:88::1") is False
    +
    +    def test_he_tunnelbroker_v6_not_blocked(self):
    +        """2001:470::/32 (Hurricane Electric) — second hextet 0x0470."""
    +        assert self._is_private("2001:470:1f04::1") is False
    +
    +    def test_neighbor_above_6to4_not_blocked(self):
    +        """2003::/16 (Deutsche Telekom) sits adjacent to 2002::/16."""
    +        assert self._is_private("2003::1") is False
    +
    +    def test_neighbor_below_6to4_not_blocked(self):
    +        """2001:ffff::1 — last address of 2001:: space, not in 2002::/16."""
    +        assert self._is_private("2001:ffff::1") is False
    +
    +    # NOTE: RFC 8215's 64:ff9b:1::/48 (NAT64 local-use) IS now blocked —
    +    # see TestPrivateIPRanges::test_contains_nat64_local_use_prefix. It
    +    # is the same SSRF threat class as the well-known /96 and missing
    +    # it has been paid out as a HackerOne bounty against ssrf_filter.
    +
    +    def test_ipv6_discard_prefix_neighbor_not_blocked(self):
    +        """100:1::/64 — second hextet 0x0001, outside the /64 discard
    +        block. The surrounding 100::/8 is reserved-unallocated, not
    +        discard, so we must not over-block it."""
    +        assert self._is_private("100:1::1") is False
    +
    +
    +class TestIPv6TransitionPrefixesPositiveDetection:
    +    """Confirm the new transition prefixes detect their full address space,
    +    including embedded private-IPv4 wraps relevant to SSRF."""
    +
    +    _is_private = staticmethod(_ip_is_private)
    +
    +    def test_6to4_wraps_loopback(self):
    +        """[2002:7f00:1::] — 6to4 wrap of 127.0.0.1."""
    +        assert self._is_private("2002:7f00:1::") is True
    +
    +    def test_6to4_wraps_rfc1918_class_a(self):
    +        """[2002:0a00:1::] — 6to4 wrap of 10.0.0.1."""
    +        assert self._is_private("2002:0a00:1::") is True
    +
    +    def test_6to4_wraps_rfc1918_class_b(self):
    +        """[2002:ac10:1::] — 6to4 wrap of 172.16.0.1."""
    +        assert self._is_private("2002:ac10:1::") is True
    +
    +    def test_6to4_wraps_rfc1918_class_c(self):
    +        """[2002:c0a8:101::] — 6to4 wrap of 192.168.1.1."""
    +        assert self._is_private("2002:c0a8:101::") is True
    +
    +    def test_6to4_wraps_aws_metadata(self):
    +        """[2002:a9fe:a9fe::] — 6to4 wrap of 169.254.169.254 (AWS IMDS).
    +        High-value SSRF target; must be caught by the prefix block."""
    +        assert self._is_private("2002:a9fe:a9fe::") is True
    +
    +    def test_6to4_upper_boundary(self):
    +        """Last address in 2002::/16."""
    +        assert (
    +            self._is_private("2002:ffff:ffff:ffff:ffff:ffff:ffff:ffff") is True
    +        )
    +
    +    def test_nat64_wraps_loopback(self):
    +        """[64:ff9b::7f00:1] — NAT64 wrap of 127.0.0.1."""
    +        assert self._is_private("64:ff9b::7f00:1") is True
    +
    +    def test_nat64_wraps_rfc1918_class_a(self):
    +        """[64:ff9b::a00:1] — NAT64 wrap of 10.0.0.1."""
    +        assert self._is_private("64:ff9b::a00:1") is True
    +
    +    def test_nat64_wraps_rfc1918_class_b(self):
    +        """[64:ff9b::ac10:1] — NAT64 wrap of 172.16.0.1."""
    +        assert self._is_private("64:ff9b::ac10:1") is True
    +
    +    def test_nat64_wraps_aws_metadata(self):
    +        """[64:ff9b::a9fe:a9fe] — NAT64 wrap of 169.254.169.254 (AWS IMDS)."""
    +        assert self._is_private("64:ff9b::a9fe:a9fe") is True
    +
    +    def test_teredo_lower_boundary(self):
    +        """2001:0:0:0:0:0:0:0 — first address in 2001::/32."""
    +        assert self._is_private("2001::") is True
    +
    +    def test_teredo_upper_boundary(self):
    +        """2001:0:ffff:ffff:ffff:ffff:ffff:ffff — last address in 2001::/32."""
    +        assert self._is_private("2001:0:ffff:ffff:ffff:ffff:ffff:ffff") is True
    +
    +    def test_discard_prefix_upper_boundary(self):
    +        """Last address in 100::/64."""
    +        assert self._is_private("100::ffff:ffff:ffff:ffff") is True
    +
    +
     class TestIPRangesUsedByValidators:
         """Tests to verify PRIVATE_IP_RANGES is correctly imported by validators."""
     
    
  • tests/security/test_notification_validator.py · +205 -0 · modified
    @@ -524,3 +524,208 @@ def test_private_ip_ranges_exist(self):
             assert "127.0.0.0/8" in range_strings
             assert "10.0.0.0/8" in range_strings
             assert "192.168.0.0/16" in range_strings
    +
    +
    +class TestNat64EnvOptOutInNotificationValidator:
    +    """Mirror of ssrf_validator's TestNat64EnvOptOut for the notification
    +    path. The notification validator must honor the same operator
    +    opt-in semantics AND keep the cloud-metadata block absolute."""
    +
    +    def test_nat64_wkp_blocked_when_env_unset(self, monkeypatch):
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        # 64:ff9b::a00:1 is the NAT64 wrap of 10.0.0.1.
    +        assert NotificationURLValidator._is_private_ip("64:ff9b::a00:1") is True
    +
    +    def test_nat64_wkp_allowed_when_env_true(self, monkeypatch):
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        # NAT64 wrap of 8.8.8.8 — canonical IPv6-only-deployment use case.
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b::808:808") is False
    +        )
    +
    +    def test_nat64_local_use_allowed_when_env_true(self, monkeypatch):
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b:1::808:808")
    +            is False
    +        )
    +
    +    def test_imds_via_nat64_wkp_wrap_blocked_under_env_true(self, monkeypatch):
    +        """[64:ff9b::a9fe:a9fe] — NAT64 WKP wrap of 169.254.169.254.
    +        Must remain blocked even with the operator opt-in. Mirrors the
    +        ssrf_validator embedded-IPv4 IMDS check."""
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b::a9fe:a9fe")
    +            is True
    +        )
    +
    +    def test_imds_via_nat64_local_use_wrap_blocked_under_env_true(
    +        self, monkeypatch
    +    ):
    +        """Same lock-in for the RFC 8215 local-use prefix wrap."""
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b:1::a9fe:a9fe")
    +            is True
    +        )
    +
    +    def test_ecs_metadata_via_nat64_wrap_blocked_under_env_true(
    +        self, monkeypatch
    +    ):
    +        """169.254.170.2 = 0xa9feaa02 — AWS ECS task metadata v3."""
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b::a9fe:aa02")
    +            is True
    +        )
    +
    +    def test_alibaba_metadata_via_nat64_wrap_blocked_under_env_true(
    +        self, monkeypatch
    +    ):
    +        """100.100.100.200 = 0x646464c8 — AlibabaCloud metadata."""
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b::6464:64c8")
    +            is True
    +        )
    +
    +    def test_env_does_not_unblock_6to4_in_notification_path(self, monkeypatch):
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert (
    +            NotificationURLValidator._is_private_ip("2002:c0a8:101::") is True
    +        )
    +
    +    def test_env_does_not_unblock_teredo_in_notification_path(
    +        self, monkeypatch
    +    ):
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert NotificationURLValidator._is_private_ip("2001::1") is True
    +
    +    def test_imds_via_nat64_wrap_blocked_when_env_unset(self, monkeypatch):
    +        """Sanity: the IMDS embedded-IPv4 check fires regardless of env
    +        state — when env is unset, the NAT64 prefix entry already blocks
    +        directly, but the embedded-IPv4 path is still well-formed."""
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        assert (
    +            NotificationURLValidator._is_private_ip("64:ff9b::a9fe:a9fe")
    +            is True
    +        )
    +
    +    def test_ipv4_mapped_imds_blocked(self, monkeypatch):
    +        """Cross-validator parity: ssrf_validator unwraps IPv4-mapped
    +        IPv6 (``::ffff:169.254.169.254``) before the IMDS literal check.
    +        notification_validator must do the same — otherwise an attacker
    +        who can configure a webhook URL can reach IMDS via the IPv4-
    +        mapped form. Pre-PR this was a real gap; locked in here so it
    +        cannot regress."""
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        assert (
    +            NotificationURLValidator._is_private_ip("::ffff:169.254.169.254")
    +            is True
    +        )
    +
    +    def test_ipv4_mapped_loopback_blocked(self, monkeypatch):
    +        """Same parity check for the loopback IPv4-mapped form."""
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        assert (
    +            NotificationURLValidator._is_private_ip("::ffff:127.0.0.1") is True
    +        )
    +
    +    def test_ipv4_mapped_public_ip_passes(self, monkeypatch):
    +        """Anti-collision: the unwrap must not over-block public IPv4."""
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        assert (
    +            NotificationURLValidator._is_private_ip("::ffff:8.8.8.8") is False
    +        )
    +
    +    def test_validate_service_url_imds_blocked_under_allow_private_ips(self):
    +        """Round-3 audit regression: validate_service_url with
    +        allow_private_ips=True previously short-circuited the entire
    +        host check, allowing http://169.254.169.254/ through. The opt-in
    +        is for self-hosted webhooks on internal networks, not for IMDS
    +        exfiltration. ALWAYS_BLOCKED_METADATA_IPS must remain absolute."""
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "http://169.254.169.254/latest/meta-data/",
    +            allow_private_ips=True,
    +        )
    +        assert is_valid is False
    +        assert error is not None
    +
    +    def test_validate_service_url_imds_v6_mapped_blocked_under_allow_private_ips(
    +        self,
    +    ):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[::ffff:169.254.169.254]/", allow_private_ips=True
    +        )
    +        assert is_valid is False
    +
    +    def test_validate_service_url_imds_via_nat64_wkp_blocked_under_allow_private_ips(
    +        self,
    +    ):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[64:ff9b::a9fe:a9fe]/", allow_private_ips=True
    +        )
    +        assert is_valid is False
    +
    +    def test_validate_service_url_imds_via_nat64_local_use_blocked_under_allow_private_ips(
    +        self,
    +    ):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[64:ff9b:1::a9fe:a9fe]/", allow_private_ips=True
    +        )
    +        assert is_valid is False
    +
    +    def test_validate_service_url_alibaba_metadata_blocked_under_allow_private_ips(
    +        self,
    +    ):
    +        """100.100.100.200 is in ALWAYS_BLOCKED_METADATA_IPS and ALSO in
    +        the CGNAT range (100.64.0.0/10) — pre-fix the carve-out for
    +        CGNAT under allow_private_ips=True would have leaked it."""
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://100.100.100.200/", allow_private_ips=True
    +        )
    +        assert is_valid is False
    +
    +    def test_validate_service_url_rfc1918_allowed_under_allow_private_ips(self):
    +        """Anti-collision: the fix must not over-block legitimate
    +        self-hosted webhook destinations. allow_private_ips=True is
    +        designed for exactly this case."""
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://192.168.1.100/webhook", allow_private_ips=True
    +        )
    +        assert is_valid is True
    +
    +    def test_validate_service_url_localhost_allowed_under_allow_private_ips(
    +        self,
    +    ):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://localhost:5000/webhook", allow_private_ips=True
    +        )
    +        assert is_valid is True
    +
    +    def test_dns_resolved_imds_via_nat64_blocked_under_env_true(
    +        self, monkeypatch
    +    ):
    +        """Hostname-resolution branch: a hostname that resolves to a
    +        NAT64-wrapped IMDS IPv4 must still be blocked under env opt-in.
    +        This exercises the second call site of _ip_matches_blocked_range."""
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        # AF_INET6 result tuple: (family, type, proto, canonname, sockaddr)
    +        # sockaddr for IPv6 is (host, port, flowinfo, scopeid)
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[
    +                (
    +                    socket.AF_INET6,
    +                    socket.SOCK_STREAM,
    +                    6,
    +                    "",
    +                    ("64:ff9b::a9fe:a9fe", 0, 0, 0),
    +                )
    +            ],
    +        ):
    +            assert (
    +                NotificationURLValidator._is_private_ip("imds.attacker.example")
    +                is True
    +            )
    
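Several tests above write embedded IPv4 addresses as hex hextets, for example `169.254.170.2 = 0xa9feaa02`. Under the RFC 6052 `/96` layout, the IPv4 address occupies the low 32 bits of the IPv6 address. An IPv4-mapped address (`::ffff:a.b.c.d`) carries it the same way, behind a fixed prefix. The sketch below converts in both directions using only the standard library. The helper names `nat64_wrap` and `embedded_ipv4` are hypothetical. It also assumes the low-32-bit layout for the RFC 8215 local-use prefix, which is what the `64:ff9b:1::a9fe:a9fe` fixtures imply.

```python
import ipaddress
from typing import Optional

# Prefixes taken from the tests; the helper names below are hypothetical.
NAT64_WKP = ipaddress.IPv6Network("64:ff9b::/96")          # RFC 6052 well-known prefix
NAT64_LOCAL_USE = ipaddress.IPv6Network("64:ff9b:1::/48")  # RFC 8215 local-use prefix


def nat64_wrap(ipv4: str, prefix: ipaddress.IPv6Network = NAT64_WKP) -> ipaddress.IPv6Address:
    """Place an IPv4 address in the low 32 bits of a NAT64 prefix."""
    v4 = int(ipaddress.IPv4Address(ipv4))
    return ipaddress.IPv6Address(int(prefix.network_address) | v4)


def embedded_ipv4(addr: ipaddress.IPv6Address) -> Optional[ipaddress.IPv4Address]:
    """Return the IPv4 address an IPv6 literal carries, or None if it carries none."""
    if addr.ipv4_mapped is not None:  # ::ffff:a.b.c.d
        return addr.ipv4_mapped
    if addr in NAT64_WKP or addr in NAT64_LOCAL_USE:
        # Assumes the low-32-bit layout for both prefixes, as the fixtures do.
        return ipaddress.IPv4Address(int(addr) & 0xFFFFFFFF)
    return None


print(nat64_wrap("169.254.170.2"))  # 64:ff9b::a9fe:aa02
print(embedded_ipv4(ipaddress.IPv6Address("::ffff:169.254.169.254")))  # 169.254.169.254
```

The IPv4-mapped branch is the "unwrap" that the cross-validator parity tests require. Without it, `::ffff:169.254.169.254` would never be compared against the metadata literals.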
  • tests/security/test_ssrf_validator.py · +500 -19 · modified
    @@ -1176,37 +1176,518 @@ def test_pathological_input_returns_bool(self, weird_input):
             assert isinstance(result, bool)
     
     
    -class TestOutOfScopeBehaviorLockedIn:
    +class TestIPv6TransitionPrefixesBlocked:
    +    """IPv6 transition prefixes (6to4, NAT64, Teredo, discard) are now
    +    blocked. On Linux hosts with kernel sit0/NAT64 routes configured,
    +    these prefixes wrap private IPv4 destinations. Default Linux has no
    +    such routes (so this isn't exploitable in the typical deployment),
    +    but blocking them closes the gap for operators who do enable
    +    transition tunnels."""
    +
    +    def test_6to4_wrapped_loopback_blocked(self):
    +        """``[2002:7f00:1::]`` — 6to4 wrap of 127.0.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2002:7f00:1::]/") is False
    +
    +    def test_6to4_wrapped_rfc1918_blocked(self):
    +        """``[2002:c0a8:101::]`` — 6to4 wrap of 192.168.1.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2002:c0a8:101::]/") is False
    +
    +    def test_nat64_wrapped_loopback_blocked(self):
    +        """``[64:ff9b::7f00:1]`` — NAT64 wrap of 127.0.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b::7f00:1]/") is False
    +
    +    def test_teredo_prefix_blocked(self):
    +        """Teredo (2001::/32) tunnels IPv6-over-UDP/IPv4."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2001::1]/") is False
    +
    +    def test_ipv6_discard_prefix_blocked(self):
    +        """RFC 6666 discard prefix (100::/64) is reserved for sinkholes."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[100::1]/") is False
    +
    +    def test_6to4_wraps_aws_metadata_blocked(self):
    +        """[2002:a9fe:a9fe::] — 6to4 wrap of 169.254.169.254 (AWS IMDS).
    +        Cloud metadata is the highest-value SSRF target; the 2002::/16
    +        block is what catches this case (the IMDS hardcoded literal
    +        check is on the IPv4 form only)."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2002:a9fe:a9fe::]/") is False
    +
    +    def test_nat64_wraps_aws_metadata_blocked(self):
    +        """[64:ff9b::a9fe:a9fe] — NAT64 wrap of 169.254.169.254 (AWS IMDS)."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b::a9fe:a9fe]/") is False
    +
    +    def test_6to4_wraps_rfc1918_class_a_blocked(self):
    +        """[2002:0a00:1::] — 6to4 wrap of 10.0.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2002:0a00:1::]/") is False
    +
    +    def test_6to4_wraps_rfc1918_class_b_blocked(self):
    +        """[2002:ac10:1::] — 6to4 wrap of 172.16.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2002:ac10:1::]/") is False
    +
    +    def test_nat64_wraps_rfc1918_class_a_blocked(self):
    +        """[64:ff9b::a00:1] — NAT64 wrap of 10.0.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b::a00:1]/") is False
    +
    +    def test_nat64_local_use_prefix_blocked(self):
    +        """RFC 8215's 64:ff9b:1::/48 (NAT64 local-use) is the same SSRF
    +        threat class as the well-known /96. On hosts configured to route
    +        the local-use prefix, [64:ff9b:1::a9fe:a9fe] reaches AWS IMDS
    +        identically to the WKP form. Missing this prefix earned a
    +        HackerOne bounty against the Ruby ssrf_filter library."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b:1::1]/") is False
    +
    +    def test_nat64_local_use_wraps_aws_metadata_blocked(self):
    +        """[64:ff9b:1::a9fe:a9fe] — local-use NAT64 wrap of 169.254.169.254."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b:1::a9fe:a9fe]/") is False
    +
    +    def test_ipv4_compatible_imds_blocked(self):
    +        """[::169.254.169.254] — RFC 4291 IPv4-Compatible IPv6 form
    +        (DEPRECATED 2006). On hosts with ::/96 routes this reaches IMDS
    +        identically to the IPv4-mapped and NAT64-wrapped forms. Same
    +        defense-in-depth class as the transition prefixes."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::169.254.169.254]/") is False
    +
    +    def test_ipv4_compatible_imds_hex_form_blocked(self):
    +        """Same address, hex form: [::a9fe:a9fe]."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::a9fe:a9fe]/") is False
    +
    +    def test_ipv4_compatible_rfc1918_blocked(self):
    +        """[::192.168.1.1] — IPv4-Compatible wrap of RFC1918."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::192.168.1.1]/") is False
    +
    +
    +class TestIPv6TransitionPrefixesAllowFlagMatrix:
    +    """Lock in the design decision: ``allow_private_ips=True`` does NOT
    +    bypass the IPv6 transition prefixes (2002::/16, 64:ff9b::/96,
    +    2001::/32, 100::/64). The override carve-out only covers the local
    +    LOOPBACK_RANGES + PRIVATE_RANGES lists in ssrf_validator.py; the
    +    transition prefixes are intentionally excluded so that an attacker
    +    cannot reach a private IPv4 destination by tunneling through 6to4
    +    or NAT64 even when the operator has set ``allow_private_ips=True``
    +    for a self-hosted service like Ollama.
    +
    +    If you ever need a self-hosted service reachable via 6to4 or
    +    NAT64, that's a deliberate config decision and the design here
    +    forces it to be made explicitly.
         """
    -    Behaviours documented as out-of-scope (filed as separate hardening
    -    issues, not a bypass of GHSA-g23j-2vwm-5c25). Tests here lock in the
    -    *current* behaviour so the gap is visible — if we later harden these,
    -    these tests should flip and be moved into the bypass class.
    +
    +    def test_6to4_blocked_under_allow_localhost(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("2002:7f00:1::", allow_localhost=True) is True
    +
    +    def test_6to4_blocked_under_allow_private_ips(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("2002:c0a8:101::", allow_private_ips=True) is True
    +
    +    def test_nat64_blocked_under_allow_localhost(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("64:ff9b::7f00:1", allow_localhost=True) is True
    +
    +    def test_nat64_blocked_under_allow_private_ips(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("64:ff9b::a00:1", allow_private_ips=True) is True
    +
    +    def test_teredo_blocked_under_allow_private_ips(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("2001::1", allow_private_ips=True) is True
    +
    +    def test_discard_blocked_under_allow_private_ips(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        assert is_ip_blocked("100::1", allow_private_ips=True) is True
    +
    +    def test_6to4_aws_metadata_blocked_under_allow_private_ips(self):
    +        """High-value: even with the most permissive flag, the 6to4 wrap
    +        of AWS IMDS must remain blocked."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert (
    +            validate_url("http://[2002:a9fe:a9fe::]/", allow_private_ips=True)
    +            is False
    +        )
    +
    +    def test_nat64_aws_metadata_blocked_under_allow_private_ips(self):
    +        """Same locking-in for NAT64 wrap of IMDS."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert (
    +            validate_url("http://[64:ff9b::a9fe:a9fe]/", allow_private_ips=True)
    +            is False
    +        )
    +
    +
    +class TestIPv6TransitionPrefixesAntiCollision:
    +    """Anti-regression: legitimate IPv6 destinations adjacent to the new
    +    transition prefixes must still pass validation. These tests guard
    +    against accidental over-blocking if anyone widens a prefix later."""
    +
    +    def test_google_dns_v6_passes(self):
    +        """2001:4860:4860::8888 — Google Public DNS. Second hextet 0x4860
    +        is outside the 2001::/32 Teredo block."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2001:4860:4860::8888]/") is True
    +
    +    def test_cloudflare_dns_v6_passes(self):
    +        """2606:4700:4700::1111 — Cloudflare Public DNS, far from any
    +        transition prefix."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2606:4700:4700::1111]/") is True
    +
    +    def test_root_server_v6_passes(self):
    +        """2001:500::/30 root-server allocation — second hextet 0x0500
    +        is outside Teredo."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2001:500:88::1]/") is True
    +
    +    def test_he_tunnelbroker_v6_passes(self):
    +        """2001:470::/32 Hurricane Electric — second hextet 0x0470."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2001:470:1f04::1]/") is True
    +
    +    def test_neighbor_above_6to4_passes(self):
    +        """2003::/16 sits adjacent to 2002::/16 but is not in it."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[2003::1]/") is True
    +
    +    def test_discard_prefix_neighbor_passes(self):
    +        """100:1::1 sits outside the 100::/64 discard prefix
    +        (second hextet 0x0001)."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[100:1::1]/") is True
    +
    +
    +class TestNat64EnvOptOut:
    +    """Operator escape hatch: ``LDR_SECURITY_ALLOW_NAT64=true`` opens the
    +    two NAT64 prefixes for IPv6-only deployments using DNS64+NAT64.
    +
    +    Critical invariants:
    +    - The carve-out is ONLY for the two NAT64 prefixes (well-known and
    +      RFC 8215 local-use). 6to4, Teredo, discard remain blocked.
    +    - The carve-out does NOT reopen the IPv4-form cloud-metadata block
    +      (169.254.169.254 stays blocked).
    +    - Reading the env var lazily (per-call, not at import) means
    +      monkeypatching works in tests.
         """
     
    -    def test_6to4_wrapped_loopback_currently_passes(self):
    -        """
    -        ``[2002:7f00:1::]`` is the 6to4 wrap of ``127.0.0.1``. On hosts
    -        with kernel sit0/6to4 routes configured, this routes to ``127.0.0.1``.
    -        Default Linux has no such route, so this is not exploitable in
    -        the default configuration — but the validator does not catch it.
    -        Filed separately.
    -        """
    +    def test_nat64_wkp_blocked_when_env_unset(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.delenv("LDR_SECURITY_ALLOW_NAT64", raising=False)
    +        assert validate_url("http://[64:ff9b::a00:1]/") is False
    +
    +    def test_nat64_wkp_allowed_when_env_true(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        # 64:ff9b::8.8.8.8 — NAT64 wrap of Google DNS, the canonical
    +        # IPv6-only-deployment use case.
    +        assert validate_url("http://[64:ff9b::808:808]/") is True
    +
    +    def test_nat64_local_use_allowed_when_env_true(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[64:ff9b:1::808:808]/") is True
    +
    +    def test_env_does_not_unblock_6to4(self, monkeypatch):
    +        """6to4 has no live legitimate use; the operator switch must
    +        not extend to it."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[2002:c0a8:101::]/") is False
    +
    +    def test_env_does_not_unblock_teredo(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[2001::1]/") is False
    +
    +    def test_env_does_not_unblock_discard(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[100::1]/") is False
    +
    +    def test_env_does_not_unblock_imds_v4_literal(self, monkeypatch):
    +        """The IPv4-form metadata literal is in ALWAYS_BLOCKED_METADATA_IPS
    +        and is checked BEFORE the prefix loop. The NAT64 carve-out
    +        cannot reach it."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://169.254.169.254/") is False
    +
    +    def test_env_does_not_unblock_imds_via_nat64_wkp_wrap(self, monkeypatch):
    +        """The IMDS embedded-IPv4 check fires before the NAT64 carve-out:
    +        even with operator opt-in, [64:ff9b::a9fe:a9fe] (NAT64 WKP wrap
    +        of 169.254.169.254) stays blocked. ALWAYS_BLOCKED_METADATA_IPS
    +        is absolute by design."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[64:ff9b::a9fe:a9fe]/") is False
    +
    +    def test_env_does_not_unblock_imds_via_nat64_local_use_wrap(
    +        self, monkeypatch
    +    ):
    +        """Same lock-in for the RFC 8215 local-use prefix wrap."""
             from src.local_deep_research.security.ssrf_validator import (
                 validate_url,
             )
     
    -        # Document current behaviour: passes.  If we ever add the IPv6
    -        # transition prefixes to BLOCKED_IP_RANGES, flip this assertion.
    -        assert validate_url("http://[2002:7f00:1::]/") is True
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert validate_url("http://[64:ff9b:1::a9fe:a9fe]/") is False
     
    -    def test_nat64_wrapped_loopback_currently_passes(self):
    -        """``[64:ff9b::7f00:1]`` is the NAT64 wrap of ``127.0.0.1``."""
    +    def test_env_does_not_unblock_ecs_metadata_via_nat64_wrap(
    +        self, monkeypatch
    +    ):
    +        """169.254.170.2 (AWS ECS task metadata v3) is also in the
    +        always-blocked set; NAT64 wrap stays blocked under opt-in."""
             from src.local_deep_research.security.ssrf_validator import (
                 validate_url,
             )
     
    -        assert validate_url("http://[64:ff9b::7f00:1]/") is True
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        # 169.254.170.2 = 0xa9feaa02
    +        assert validate_url("http://[64:ff9b::a9fe:aa02]/") is False
    +
    +    def test_env_falsy_values_keep_blocked(self, monkeypatch):
    +        """'false', '0', 'no', and the empty string must all keep the block in place."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        for value in ("false", "0", "no", ""):
    +            monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", value)
    +            assert validate_url("http://[64:ff9b::a00:1]/") is False, (
    +                f"NAT64 must remain blocked for env value {value!r}"
    +            )
    +
    +    def test_env_true_does_not_bypass_loopback_in_block_list(self, monkeypatch):
    +        """Sanity: opting into NAT64 must not accidentally unblock
    +        non-NAT64 entries that share a prefix family."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert is_ip_blocked("127.0.0.1") is True
    +        assert is_ip_blocked("::1") is True
    +
    +    def test_env_true_does_not_unblock_ipv6_ula(self, monkeypatch):
    +        """The carve-out's ``continue`` lives in the same loop that walks
    +        ULA (fc00::/7) and link-local (fe80::/10). Pin that opting into
    +        NAT64 does not accidentally unblock these adjacent IPv6 ranges."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert is_ip_blocked("fc00::1") is True
    +        assert is_ip_blocked("fd12:3456:789a::1") is True
    +
    +    def test_env_true_does_not_unblock_ipv6_link_local(self, monkeypatch):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_ip_blocked,
    +        )
    +
    +        monkeypatch.setenv("LDR_SECURITY_ALLOW_NAT64", "true")
    +        assert is_ip_blocked("fe80::1") is True
    +
    +
    +class TestIsNat64WrappedMetadataIp:
    +    """Direct unit tests for the shared helper. Both validators rely on
    +    its IPv4 short-circuit and on the metadata-set membership check;
    +    surface those contracts explicitly so a refactor of either branch
    +    can't silently flip them."""
    +
    +    def test_returns_false_for_ipv4(self):
    +        """The helper must short-circuit for IPv4 inputs because it's
    +        called after ``is_ip_blocked`` unwraps IPv4-mapped IPv6 — at
    +        that point the address is no longer IPv6 and the NAT64 check
    +        does not apply."""
    +        import ipaddress
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_nat64_wrapped_metadata_ip,
    +        )
    +
    +        assert (
    +            is_nat64_wrapped_metadata_ip(
    +                ipaddress.IPv4Address("169.254.169.254")
    +            )
    +            is False
    +        )
    +
    +    def test_returns_false_for_non_nat64_ipv6(self):
    +        """Public IPv6 (Google DNS) is not in any NAT64 prefix."""
    +        import ipaddress
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_nat64_wrapped_metadata_ip,
    +        )
    +
    +        assert (
    +            is_nat64_wrapped_metadata_ip(
    +                ipaddress.IPv6Address("2001:4860:4860::8888")
    +            )
    +            is False
    +        )
    +
    +    def test_returns_false_for_nat64_wrap_of_non_metadata(self):
    +        """[64:ff9b::a00:1] (NAT64 wrap of 10.0.0.1) is in a NAT64
    +        prefix but the embedded IPv4 is not metadata — helper returns
    +        False so the broader carve-out logic can apply."""
    +        import ipaddress
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_nat64_wrapped_metadata_ip,
    +        )
    +
    +        assert (
    +            is_nat64_wrapped_metadata_ip(
    +                ipaddress.IPv6Address("64:ff9b::a00:1")
    +            )
    +            is False
    +        )
    +
    +    def test_returns_true_for_imds_via_wkp(self):
    +        import ipaddress
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_nat64_wrapped_metadata_ip,
    +        )
    +
    +        assert (
    +            is_nat64_wrapped_metadata_ip(
    +                ipaddress.IPv6Address("64:ff9b::a9fe:a9fe")
    +            )
    +            is True
    +        )
    +
    +    def test_returns_true_for_imds_via_local_use(self):
    +        import ipaddress
    +        from src.local_deep_research.security.ssrf_validator import (
    +            is_nat64_wrapped_metadata_ip,
    +        )
    +
    +        assert (
    +            is_nat64_wrapped_metadata_ip(
    +                ipaddress.IPv6Address("64:ff9b:1::a9fe:a9fe")
    +            )
    +            is True
    +        )
     
     
     class TestValidateUrlEdgeCases:
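The five tests above pin down the contract of `is_nat64_wrapped_metadata_ip`, but the diff does not show its body. Below is a minimal sketch that satisfies them. It assumes the helper checks the RFC 6052 well-known prefix `64:ff9b::/96` and the RFC 8215 local-use prefix `64:ff9b:1::/48`, and reads the embedded IPv4 address from the low 32 bits. The function name is hypothetical, and the real implementation may differ.

```python
import ipaddress

# Copied from ALWAYS_BLOCKED_METADATA_IPS in this PR.
METADATA_IPS = frozenset(
    {"169.254.169.254", "169.254.170.2", "169.254.170.23",
     "169.254.0.23", "100.100.100.200"}
)

# Assumed prefix list: RFC 6052 well-known prefix and RFC 8215 local-use
# range. The real helper's prefix list is not shown in this diff.
NAT64_PREFIXES = (
    ipaddress.IPv6Network("64:ff9b::/96"),
    ipaddress.IPv6Network("64:ff9b:1::/48"),
)


def nat64_wrapped_metadata_sketch(ip):
    """Hypothetical stand-in for is_nat64_wrapped_metadata_ip."""
    # IPv4 inputs short-circuit: NAT64 wrapping only exists in IPv6.
    if not isinstance(ip, ipaddress.IPv6Address):
        return False
    if not any(ip in net for net in NAT64_PREFIXES):
        return False
    # Read the embedded IPv4 from the low 32 bits (the /96 layout).
    embedded = ipaddress.IPv4Address(int(ip) & 0xFFFFFFFF)
    return str(embedded) in METADATA_IPS
```

Reading the low 32 bits is the RFC 6052 layout for a /96 prefix. For shorter prefixes such as /48, RFC 6052 places the IPv4 bits elsewhere. This sketch therefore treats the local-use range as if an operator had carved a /96 out of it, which is what the `64:ff9b:1::a9fe:a9fe` test case implies.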
    
c27395301fc2

test(security): lock in real-world URL fixtures + behavior changes from #3873/#3882 (#3889)

https://github.com/LearningCircuit/local-deep-research · LearningCircuit · May 9, 2026
1 file changed · +290 -0
  • tests/security/test_real_world_urls_regression_prevention.py · +290 -0 · added
    @@ -0,0 +1,290 @@
    +"""
    +Regression-prevention fixtures for the SSRF hardening (PR #3873, #3882).
    +
    +This is a defensive regression net: if a future change to ``validate_url``
    +accidentally rejects URLs LDR is documented to fetch, these tests fail
    +loudly. Patterns extracted from a real codebase audit of:
    +
    +- ``src/local_deep_research/research_library/downloaders/`` (academic)
    +- ``src/local_deep_research/web_search_engines/engines/`` (search)
    +- ``src/local_deep_research/llm/providers/implementations/`` (LLM)
    +- ``src/local_deep_research/notifications/`` (Apprise)
    +
    +Plus a complementary list of attack URLs that MUST stay blocked, and a
    +behaviour-change lock-in class for the deliberate semantic changes in
    +PR #3873 (None handling, whitespace stripping) and PR #3882 (log
    +redaction).
    +"""
    +
    +import time
    +
    +import pytest
    +from unittest.mock import patch
    +
    +
    +# DNS resolution mock — return a public IP so the validation pipeline
    +# reaches the IP-block check (which is the only thing that needs network).
    +_PUBLIC_DNS_RESPONSE = [(2, 1, 6, "", ("93.184.216.34", 0))]
    +
    +
    +# -----------------------------------------------------------------------
    +# REAL-WORLD URLS THAT MUST PASS
    +# -----------------------------------------------------------------------
    +# If any of these stops passing validate_url, an LDR user feature breaks.
    +# Categories: academic, search, llm, idn, edge, ipv6.
    +
    +REAL_WORLD_URLS_THAT_MUST_PASS = [
    +    # ---- Academic paper sources ----
    +    ("https://arxiv.org/abs/2401.12345", "academic"),
    +    ("https://arxiv.org/pdf/2401.12345v1.pdf", "academic"),
    +    ("https://export.arxiv.org/api/query?id_list=2401.12345", "academic"),
    +    ("https://pubmed.ncbi.nlm.nih.gov/35123456/", "academic"),
    +    ("https://www.ncbi.nlm.nih.gov/pmc/articles/PMC1234567/", "academic"),
    +    (
    +        "https://www.ebi.ac.uk/europepmc/webservices/rest/search",
    +        "academic",
    +    ),
    +    (
    +        "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/elink.fcgi",
    +        "academic",
    +    ),
    +    (
    +        "https://www.biorxiv.org/content/10.1101/2024.01.01.123456v1.full.pdf",
    +        "academic",
    +    ),
    +    (
    +        "https://api.openalex.org/works?filter=doi:10.1038/nature.2024.12345",
    +        "academic",
    +    ),
    +    (
    +        "https://api.semanticscholar.org/graph/v1/paper/CORPUSID:12345",
    +        "academic",
    +    ),
    +    ("https://doi.org/10.1038/nature12345", "academic"),
    +    ("https://ui.adsabs.harvard.edu/abs/2024ApJ...123..456A", "academic"),
    +    # ---- Search / reference ----
    +    ("https://en.wikipedia.org/wiki/Machine_learning", "search"),
    +    # encoded umlaut — common Wikipedia article URL form
    +    ("https://en.wikipedia.org/wiki/M%C3%BCnchen", "search"),
    +    (
    +        "https://web.archive.org/cdx/search/cdx?url=example.com",
    +        "search",
    +    ),
    +    ("https://api.tavily.com/search", "search"),
    +    ("https://api.exa.ai/search", "search"),
    +    (
    +        "https://openlibrary.org/api/books?bibkeys=ISBN:0451524934",
    +        "search",
    +    ),
    +    ("https://www.gutenberg.org/ebooks/12345", "search"),
    +    ("https://content.guardianapis.com/search", "search"),
    +    (
    +        "https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/2244/JSON",
    +        "search",
    +    ),
    +    # ---- LLM provider default endpoints (from openai_base/google/etc.) ----
    +    # If these stop passing, LDR cannot talk to its own configured providers.
    +    ("https://api.openai.com/v1/models", "llm"),
    +    ("https://api.anthropic.com/v1/messages", "llm"),
    +    ("https://openrouter.ai/api/v1/chat/completions", "llm"),
    +    ("https://api.x.ai/v1/chat/completions", "llm"),
    +    ("https://generativelanguage.googleapis.com/v1beta/openai", "llm"),
    +    ("https://openai.inference.de-txl.ionos.com/v1", "llm"),
    +    # ---- IDN / non-Latin domains (urllib3 auto-Punycodes) ----
    +    # These exercise the urllib3 host-extraction path and confirm that
    +    # users in Asia / Cyrillic / Han regions are not blocked.
    +    ("https://例え.jp/", "idn"),
    +    ("https://привет.рф/", "idn"),
    +    ("https://中国.cn/", "idn"),
    +    ("https://xn--mnchen-3ya.de/", "idn"),  # pre-Punycoded München
    +    # ---- Edge cases — RFC-legal patterns ----
    +    ("https://api.example.com/v1?keys[]=foo&keys[]=bar", "edge"),
    +    (
    +        "https://api.example.com/v1/items?since=2024-01-01T00:00:00Z",
    +        "edge",
    +    ),
    +    ("https://example.com/path/with+plus", "edge"),
    +    ("https://example.com/?q=hello+world", "edge"),
    +    ("https://user:pass@example.com/", "edge"),
    +    ("https://example.com./", "edge"),  # FQDN trailing dot
    +    ("https://example.com/file.pdf;jsessionid=abc123", "edge"),
    +    # encoded backslash in PATH is RFC-legal — distinct from %5C in netloc
    +    ("https://example.com/path%5Cfile", "edge"),
    +    (
    +        "https://example.com/path/with-hyphens_and_underscores.html",
    +        "edge",
    +    ),
    +    # ---- IPv6 non-blocked addresses (2001:db8::/32, RFC 3849 documentation prefix) ----
    +    ("https://[2001:db8::1]/", "ipv6"),
    +    ("https://[2001:db8::1]:8080/", "ipv6"),
    +]
    +
    +
    +# -----------------------------------------------------------------------
    +# REAL-WORLD URLS THAT MUST FAIL (security sentinels)
    +# -----------------------------------------------------------------------
    +# If any of these starts passing, the SSRF hardening has regressed.
    +
    +REAL_WORLD_URLS_THAT_MUST_FAIL = [
    +    # ---- GHSA-g23j-2vwm-5c25 canonical ----
    +    ("http://127.0.0.1:6666\\@1.1.1.1", "advisory_canonical"),
    +    ("http://127.0.0.1:6666/%5C@1.1.1.1", "advisory_post_prepare"),
    +    # ---- IPv6 unspecified bypass (caught in PR #3873 review) ----
    +    ("http://[::]/", "ipv6_unspecified"),
    +    ("http://[0::]/", "ipv6_unspecified_alt"),
    +    ("http://[0:0:0:0:0:0:0:0]/", "ipv6_unspecified_full"),
    +    # ---- Cloud metadata — always blocked under every flag ----
    +    ("http://169.254.169.254/latest/meta-data/", "aws_imds"),
    +    ("http://169.254.170.2/v2/credentials/", "aws_ecs_v3"),
    +    ("http://169.254.170.23/v4/credentials/", "aws_ecs_v4"),
    +    ("http://169.254.0.23/", "tencent"),
    +    ("http://100.100.100.200/latest/meta-data/", "alibaba"),
    +    # ---- Loopback / private (default flags) ----
    +    ("http://127.0.0.1/", "ipv4_loopback"),
    +    ("http://[::1]/", "ipv6_loopback"),
    +    # ---- Forbidden chars (Layer 1) ----
    +    ("http://example.com/path with space", "whitespace"),
    +    ("http://example.com\t/", "tab"),
    +    ("http://example.com\n/", "newline"),
    +]
    +
    +
    +class TestRealWorldUrlsRegressionPrevention:
    +    """Lock in that legitimate URL patterns LDR fetches keep working."""
    +
    +    @pytest.mark.parametrize("url,category", REAL_WORLD_URLS_THAT_MUST_PASS)
    +    def test_legitimate_url_passes(self, url, category):
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch("socket.getaddrinfo", return_value=_PUBLIC_DNS_RESPONSE):
    +            assert validate_url(url) is True, (
    +                f"Legitimate {category} URL {url!r} unexpectedly "
    +                f"rejected. This breaks an LDR user flow."
    +            )
    +
    +
    +class TestSecuritySentinelsStayBlocked:
    +    """Lock in that the SSRF fix continues to block known attack
    +    payloads. If any of these starts passing, the hardening has
    +    silently regressed."""
    +
    +    @pytest.mark.parametrize("url,category", REAL_WORLD_URLS_THAT_MUST_FAIL)
    +    def test_attack_url_blocked(self, url, category):
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url(url) is False, (
    +            f"{category} attack URL {url!r} unexpectedly passed. "
    +            f"SSRF hardening has regressed."
    +        )
    +
    +
    +class TestBehaviorChangeLockIn:
    +    """Lock in deliberate behaviour changes from PR #3873 / #3882 so a
    +    future revert doesn't silently undo them."""
    +
    +    def test_validate_url_with_none_returns_false_not_raises(self):
    +        """PR #3873 changed ``validate_url(None)`` from raising
    +        ``TypeError`` to returning ``False``. Callers that depended on
    +        the exception would already be broken; lock in the new contract.
    +        """
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url(None) is False
    +        assert validate_url(123) is False
    +        assert validate_url([]) is False
    +
    +    def test_validate_url_strips_surrounding_whitespace(self):
    +        """PR #3873 added ``url.strip()`` at the top so URLs pasted from
    +        clipboard with surrounding whitespace are accepted."""
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch("socket.getaddrinfo", return_value=_PUBLIC_DNS_RESPONSE):
    +            assert validate_url("  https://example.com/  ") is True
    +            assert validate_url("\thttps://example.com/\n") is True
    +
    +    def test_validate_url_internal_whitespace_still_rejected(self):
    +        """Strip handles SURROUNDING whitespace; INTERIOR whitespace is
    +        still an RFC 3986 violation and Layer 1 rejects it."""
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("https://example.com/ /path") is False
    +        assert validate_url("https://example.com/\tpath") is False
    +
    +    def test_redact_url_for_log_normalizes_to_origin(self):
    +        """Helper strips userinfo, path, query, AND fragment — leaving
    +        only ``scheme://host[:port]`` (the URL origin per RFC 6454)."""
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert (
    +            redact_url_for_log("http://user:pass@example.com/p?q=1#f")
    +            == "http://example.com"
    +        )
    +
    +    def test_redact_url_for_log_preserves_port(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert (
    +            redact_url_for_log("http://example.com:8080/path")
    +            == "http://example.com:8080"
    +        )
    +
    +    def test_redact_url_for_log_handles_ipv6(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert redact_url_for_log("http://[::1]:8080/") == "http://[::1]:8080"
    +
    +    def test_idn_domain_auto_punycoded_via_urllib3(self):
    +        """urllib3 auto-Punycodes raw IDN before the ASCII guard. Lock
    +        in this behaviour — if a future urllib3 stops doing this, IDN
    +        URLs would silently break for users in Asia/Cyrillic regions."""
    +        from urllib3.util import parse_url
    +
    +        u = parse_url("http://例え.jp/")
    +        assert u.host == "xn--r8jz45g.jp", (
    +            "urllib3 changed its Punycode behaviour. IDN URLs may now "
    +            "break in LDR's SSRF validation — file an issue."
    +        )
    +
    +
    +class TestPerformance:
    +    """Sanity check: validate_url must be cheap. ~10k calls in a
    +    research session shouldn't add meaningful latency."""
    +
    +    def test_validate_url_under_5ms_per_call(self):
    +        """Generous 5ms-per-call budget absorbs noisy CI runners while
    +        still catching genuine regressions. Local measurement is ~63µs,
    +        so 5ms is ~80× headroom. A 100-URL research session at the
    +        threshold would add 500ms; a real regression that breached it
    +        would be worth investigating."""
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        url = "https://api.openalex.org/works?filter=doi:10.1038/nature"
    +        with patch("socket.getaddrinfo", return_value=_PUBLIC_DNS_RESPONSE):
    +            t0 = time.perf_counter()
    +            for _ in range(1000):
    +                validate_url(url)
    +            elapsed = time.perf_counter() - t0
    +        per_call_us = elapsed * 1000  # (elapsed s / 1000 calls) * 1e6 µs/s
    +        assert per_call_us < 5000, (
    +            f"validate_url is too slow: {per_call_us:.1f}µs per call "
    +            f"(target: <5000µs). 100-URL research session would add "
    +            f"{per_call_us / 10:.1f}ms latency."
    +        )
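The `advisory_canonical` sentinel is the advisory's parser differential in miniature. The stdlib sketch below assumes an HTTP client that, like WHATWG-style parsers, ends the authority at a backslash. It shows `urlsplit` and such a client disagreeing on the host, and how a raw-character gate applied before parsing removes the disagreement while still letting an encoded `%5C` in a path through. `whatwg_style_host`, `layer1_ok`, and the `FORBIDDEN` pattern are hypothetical and are not LDR's code. The real `RFC_FORBIDDEN_URL_CHARS_RE` is not shown in this diff.

```python
import re
from urllib.parse import urlsplit

# The advisory_canonical fixture: one literal backslash before "@".
PAYLOAD = "http://127.0.0.1:6666\\@1.1.1.1"

# Hypothetical Layer-1 pattern: space, C0 controls, DEL, raw backslash.
FORBIDDEN = re.compile(r"[\x00-\x20\x7f\\]")


def whatwg_style_host(url):
    """Hypothetical: end the authority at the first '/', '?', '#' or
    backslash, as WHATWG-style parsers do, then drop userinfo and port.
    Bracketed IPv6 literals are deliberately not handled."""
    rest = url.split("//", 1)[1]
    authority = re.split(r"[/?#\\]", rest, maxsplit=1)[0]
    host_port = authority.rpartition("@")[2]
    return host_port.rsplit(":", 1)[0] if ":" in host_port else host_port


def layer1_ok(url):
    """Hypothetical pre-parse gate: non-strings fail, surrounding
    whitespace is stripped, any remaining forbidden character fails."""
    if not isinstance(url, str):
        return False
    return FORBIDDEN.search(url.strip()) is None


# The two parsers disagree on which host the payload names.
print(urlsplit(PAYLOAD).hostname)  # 1.1.1.1
print(whatwg_style_host(PAYLOAD))  # 127.0.0.1
print(layer1_ok(PAYLOAD))          # False
```

Rejecting the raw backslash up front makes both parsers' interpretations irrelevant for this payload. The `advisory_post_prepare` form carries only an encoded `%5C` in the path, so in this sketch it passes the gate and falls to the ordinary loopback check on `127.0.0.1`.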
    
4ae0041f630e

fix(security): harden SSRF metadata blocks and redact log userinfo (#3882)

https://github.com/LearningCircuit/local-deep-research · LearningCircuit · May 8, 2026
12 files changed · +304 -54
  • changelog.d/+ssrf-metadata-block-log-hygiene.security.md · +8 -0 · added
    @@ -0,0 +1,8 @@
    +Hardened SSRF defenses against AWS ECS task metadata
    +(`169.254.170.2`, `169.254.170.23`), Tencent Cloud (`169.254.0.23`),
    +and AlibabaCloud (`100.100.100.200`) metadata endpoints — these are
    +now always blocked alongside the existing AWS IMDS / Azure / OCI /
    +DigitalOcean entry (`169.254.169.254`). Credentials, path, query, and
    +fragment are now redacted from URL-rejection logs (operators with
    +grep/regex tooling on the rejection log lines will see authority-only
    +`scheme://host[:port]` instead of full URLs going forward).
    
  • docs/SearXNG-Setup.md · +1 -1 · modified
    @@ -111,7 +111,7 @@ SearXNG is designed for self-hosting, so Local Deep Research allows SearXNG to a
     This is intentional and secure because:
     1. The SearXNG URL is **admin-configured**, not user input
     2. Private IPs are only accessible from your local network
    -3. The **AWS metadata endpoint** (169.254.169.254) is always blocked to prevent credential theft in cloud environments
    +3. **Cloud metadata endpoints** (AWS IMDS / ECS, Azure, OCI, DigitalOcean, AlibabaCloud, Tencent Cloud — see `ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS`) are always blocked to prevent credential theft in cloud environments
     
     ## Troubleshooting
     
    
  • SECURITY.md · +16 -0 · modified
    @@ -138,6 +138,22 @@ A reporter ([@Fushuling](https://github.com/Fushuling), [@RacerZ-fighting](https
     
     Both `ssrf_validator.validate_url` and `NotificationURLValidator.validate_service_url` (HTTP/HTTPS branch) carry the fix. Future edits to the SSRF path should preserve `RFC_FORBIDDEN_URL_CHARS_RE` and the `urllib3.util.parse_url` host extraction — reverting either reintroduces the bypass.
     
    +### Cloud Metadata Endpoint Block List
    +
    +`ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS` is a frozenset of cloud-provider metadata IPs that are blocked under every flag combination, including `allow_localhost=True` and `allow_private_ips=True`. These IPs expose IAM / instance-role credentials and are never legitimate destinations for outbound HTTP. The current set is:
    +
    +| IP | Provider |
    +| --- | --- |
    +| `169.254.169.254` | AWS IMDSv1/v2, Azure, OCI, DigitalOcean (shared) |
    +| `169.254.170.2` | AWS ECS task metadata v3 |
    +| `169.254.170.23` | AWS ECS task metadata v4 |
    +| `169.254.0.23` | Tencent Cloud |
    +| `100.100.100.200` | AlibabaCloud |
    +
    +Future contributors must not remove entries from this set. Adding a new cloud provider's metadata IP is encouraged when a new public-cloud target appears.
    +
    +URL rejection log lines route through `ssrf_validator.redact_url_for_log` to drop userinfo (RFC 3986 §3.2.1 allows credentials in the URL), path, query, and fragment, so operators see only `scheme://host[:port]`. Operators with grep/regex tooling on the rejection log lines will therefore see authority-only strings instead of full URLs.
    +
     ## Supported Versions
     
     Security fixes are only provided for the latest release. Please upgrade to receive patches.
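The key property of the block list is ordering: metadata membership is checked after IPv4-mapped unwrapping and before any flag-dependent range check, so no flag combination can re-open those IPs. Below is a minimal sketch of that ordering. The function name and the abbreviated range lists are hypothetical; the real ranges live in `ip_ranges.PRIVATE_IP_RANGES` and `is_ip_blocked`. The abbreviated list includes `0.0.0.0/8` and `::/128`, the unspecified ranges asserted by the `test_ip_ranges.py` additions, so alternate spellings such as `0::` are caught once parsed.

```python
import ipaddress

# Copied from ALWAYS_BLOCKED_METADATA_IPS in this PR.
ALWAYS_BLOCKED = frozenset(
    {"169.254.169.254", "169.254.170.2", "169.254.170.23",
     "169.254.0.23", "100.100.100.200"}
)

# Hypothetical, abbreviated range lists (the real ones are longer).
LOOPBACK = [ipaddress.ip_network(n) for n in ("127.0.0.0/8", "::1/128")]
PRIVATE = [
    ipaddress.ip_network(n)
    for n in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
              "100.64.0.0/10", "169.254.0.0/16", "fc00::/7",
              "fe80::/10", "0.0.0.0/8", "::/128")
]


def ip_blocked_sketch(ip_str, allow_localhost=False, allow_private_ips=False):
    ip = ipaddress.ip_address(ip_str)
    # Unwrap ::ffff:a.b.c.d so a mapped address meets the IPv4 checks.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
        ip = ip.ipv4_mapped
    # Metadata membership is checked first, so no flag can re-open it.
    if str(ip) in ALWAYS_BLOCKED:
        return True
    if any(ip in net for net in LOOPBACK):
        return not allow_localhost
    if any(ip in net for net in PRIVATE):
        return not allow_private_ips
    return False
```

`ipv4_mapped` only unwraps `::ffff:0:0/96`. A NAT64 form such as `64:ff9b::a9fe:a9fe` passes this sketch when `allow_private_ips=True`, which is the gap a separate check like `is_nat64_wrapped_metadata_ip` exists to close.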
    
  • src/local_deep_research/security/notification_validator.py · +7 -4 · modified
    @@ -15,7 +15,7 @@
     from urllib3.util import parse_url
     
     from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES
    -from .ssrf_validator import RFC_FORBIDDEN_URL_CHARS_RE
    +from .ssrf_validator import RFC_FORBIDDEN_URL_CHARS_RE, redact_url_for_log
     
     
     class NotificationURLValidationError(ValueError):
    @@ -211,14 +211,14 @@ def validate_service_url(
             # Check for blocked schemes
             if scheme in NotificationURLValidator.BLOCKED_SCHEMES:
                 logger.warning(
    -                f"Blocked unsafe notification protocol: {scheme} in URL: {url[:50]}..."
    +                f"Blocked unsafe notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
                 )
                 return False, f"Blocked unsafe protocol: {scheme}"
     
             # Check for allowed schemes
             if scheme not in NotificationURLValidator.ALLOWED_SCHEMES:
                 logger.warning(
    -                f"Unknown notification protocol: {scheme} in URL: {url[:50]}..."
    +                f"Unknown notification protocol: {scheme} in URL: {redact_url_for_log(url)}"
                 )
                 return (
                     False,
    @@ -332,7 +332,10 @@ def validate_multiple_urls(
     
                 if not is_valid:
                     # Return first error found
    -                return False, f"Invalid URL '{url[:50]}...': {error_message}"
    +                return (
    +                    False,
    +                    f"Invalid URL '{redact_url_for_log(url)}': {error_message}",
    +                )
     
             # All URLs passed validation
             return True, None
    
  • src/local_deep_research/security/safe_requests.py · +9 -3 · modified
    @@ -161,7 +161,9 @@ def safe_get(
                 used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
                 private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
                 like SearXNG or Ollama in containerized environments.
    -            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
    +            Note: cloud metadata endpoints (AWS / Azure / OCI / DigitalOcean /
    +            AlibabaCloud / Tencent / ECS) are ALWAYS blocked — see
    +            ``ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS``.
             **kwargs: Additional arguments to pass to requests.get()
     
         Returns:
    @@ -296,7 +298,9 @@ def safe_post(
                 used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
                 private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
                 like SearXNG or Ollama in containerized environments.
    -            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
    +            Note: cloud metadata endpoints (AWS / Azure / OCI / DigitalOcean /
    +            AlibabaCloud / Tencent / ECS) are ALWAYS blocked — see
    +            ``ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS``.
             **kwargs: Additional arguments to pass to requests.post()
     
         Returns:
    @@ -461,7 +465,9 @@ def __init__(
                     This includes RFC1918, CGNAT (100.64.x.x used by Podman), link-local, and
                     IPv6 private ranges. Use for trusted self-hosted services like SearXNG or
                     Ollama in containerized environments.
    -                Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
    +                Note: cloud metadata endpoints (AWS / Azure / OCI / DigitalOcean /
    +                AlibabaCloud / Tencent / ECS) are ALWAYS blocked — see
    +                ``ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS``.
             """
             super().__init__()
             self.max_redirects = _MAX_REDIRECTS
    
  • src/local_deep_research/security/ssrf_validator.py · +56 -14 · modified
    @@ -16,9 +16,19 @@
     
     from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
     
    -# AWS metadata endpoint (commonly targeted in SSRF attacks)
    -# nosec B104 - Hardcoded IP is intentional for SSRF prevention (blocking AWS metadata endpoint)
    -AWS_METADATA_IP = "169.254.169.254"
    +# Cloud-provider metadata endpoints — always blocked, even with
    +# allow_localhost=True or allow_private_ips=True. These IPs expose IAM /
    +# instance-role credentials and are never legitimate destinations.
    +# nosec B104 - Hardcoded IPs are intentional for SSRF prevention
    +ALWAYS_BLOCKED_METADATA_IPS = frozenset(
    +    {
    +        "169.254.169.254",  # AWS IMDSv1/v2, Azure, OCI, DigitalOcean
    +        "169.254.170.2",  # AWS ECS task metadata v3
    +        "169.254.170.23",  # AWS ECS task metadata v4
    +        "169.254.0.23",  # Tencent Cloud
    +        "100.100.100.200",  # AlibabaCloud
    +    }
    +)
     
     # Allowed URL schemes
     ALLOWED_SCHEMES = {"http", "https"}
    @@ -46,7 +56,9 @@ def is_ip_blocked(
                 used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
                 private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
                 like SearXNG or Ollama in containerized environments.
    -            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
    +            Note: cloud metadata endpoints in ``ALWAYS_BLOCKED_METADATA_IPS``
    +            (AWS / Azure / OCI / DigitalOcean / AlibabaCloud / Tencent / ECS)
    +            are ALWAYS blocked regardless of these flags.
     
         Returns:
             True if IP is blocked, False otherwise
    @@ -71,7 +83,7 @@ def is_ip_blocked(
             ),  # CGNAT - used by Podman/rootless containers
             ipaddress.ip_network(
                 "169.254.0.0/16"
    -        ),  # Link-local (AWS metadata blocked separately)
    +        ),  # Link-local (cloud metadata IPs blocked separately via ALWAYS_BLOCKED_METADATA_IPS)
             # IPv6 Private Ranges
             ipaddress.ip_network("fc00::/7"),  # IPv6 Unique Local Addresses
             ipaddress.ip_network("fe80::/10"),  # IPv6 Link-Local
    @@ -85,8 +97,11 @@ def is_ip_blocked(
             if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped:
                 ip = ip.ipv4_mapped
     
    -        # ALWAYS block AWS metadata endpoint - critical SSRF target for credential theft
    -        if str(ip) == AWS_METADATA_IP:
    +        # ALWAYS block cloud-metadata endpoints - critical SSRF target
    +        # for credential theft (AWS IMDS/ECS, Azure, OCI, DigitalOcean,
    +        # AlibabaCloud, Tencent Cloud). These are never legitimate
    +        # destinations regardless of allow_localhost / allow_private_ips.
    +        if str(ip) in ALWAYS_BLOCKED_METADATA_IPS:
                 return True
     
             # Check if IP is in any blocked range
    @@ -135,7 +150,9 @@ def validate_url(
                 used by Podman/rootless containers), link-local (169.254.x.x), and IPv6
                 private ranges (fc00::/7, fe80::/10). Use for trusted self-hosted services
                 like SearXNG or Ollama in containerized environments.
    -            Note: AWS metadata endpoint (169.254.169.254) is ALWAYS blocked.
    +            Note: cloud metadata endpoints in ``ALWAYS_BLOCKED_METADATA_IPS``
    +            (AWS / Azure / OCI / DigitalOcean / AlibabaCloud / Tencent / ECS)
    +            are ALWAYS blocked regardless of these flags.
     
         Returns:
             True if URL is safe, False otherwise
    @@ -157,7 +174,7 @@ def validate_url(
             # Check scheme
             if parsed.scheme.lower() not in ALLOWED_SCHEMES:
                 logger.warning(
    -                f"Blocked URL with invalid scheme: {parsed.scheme} - {url}"
    +                f"Blocked URL with invalid scheme: {parsed.scheme} - {redact_url_for_log(url)}"
                 )
                 return False
     
    @@ -189,7 +206,9 @@ def validate_url(
             if hostname:
                 hostname = hostname.rstrip(".")
             if not hostname:
    -            logger.warning(f"Blocked URL with no hostname: {url}")
    +            logger.warning(
    +                f"Blocked URL with no hostname: {redact_url_for_log(url)}"
    +            )
                 return False
     
             # Check if hostname is an IP address
    @@ -201,7 +220,7 @@ def validate_url(
                     allow_private_ips=allow_private_ips,
                 ):
                     logger.warning(
    -                    f"Blocked URL with internal/private IP: {hostname} - {url}"
    +                    f"Blocked URL with internal/private IP: {hostname} - {redact_url_for_log(url)}"
                     )
                     return False
             except ValueError:
    @@ -238,7 +257,7 @@ def validate_url(
                     ):
                         logger.warning(
                             f"Blocked URL - hostname {hostname} resolves to "
    -                        f"internal/private IP: {ip_str} - {url}"
    +                        f"internal/private IP: {ip_str} - {redact_url_for_log(url)}"
                         )
                         return False
     
    @@ -253,7 +272,7 @@ def validate_url(
             return True
     
         except Exception:
    -        logger.exception(f"Error validating URL {url}")
    +        logger.exception(f"Error validating URL {redact_url_for_log(url)}")
             return False
     
     
    @@ -276,5 +295,28 @@ def get_safe_url(
         if validate_url(url):
             return url
     
    -    logger.warning(f"Unsafe URL rejected: {url}")
    +    logger.warning(f"Unsafe URL rejected: {redact_url_for_log(url)}")
         return default
    +
    +
    +def redact_url_for_log(url: str) -> str:
    +    """Return ``scheme://host[:port]`` (no userinfo, path, query, fragment).
    +
    +    For log output only. Drops everything except scheme + authority host
    +    + port to minimise the chance of leaking credentials, tokens, or
    +    sensitive paths into logs while still giving operators enough to
    +    distinguish ``http://10.0.0.1:80`` from ``https://10.0.0.1:443``.
    +
    +    RFC 3986 §3.2.1 allows credentials in URL userinfo
    +    (``http://user:pass@host/``). A rejected URL is by definition
    +    adversarial-shaped, but it may still carry the operator's real
    +    credentials if a misconfiguration produced it.
    +    """
    +    try:
    +        u = parse_url(url)
    +        scheme = u.scheme or "?"
    +        host = u.host or "<no-host>"
    +        host_port = f"{host}:{u.port}" if u.port else host
    +        return f"{scheme}://{host_port}"
    +    except (LocationParseError, ValueError):
    +        return "<unparseable>"
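For comparison, here is a stdlib-only analogue of `redact_url_for_log`. It is a sketch, not the shipped helper (which uses urllib3's `parse_url`). It mirrors the behaviour locked in by the tests: origin only, port kept only when explicit, IPv6 literals re-bracketed, and a sentinel string for anything unparseable. The name `redact_sketch` is hypothetical.

```python
from urllib.parse import urlsplit


def redact_sketch(url):
    """Hypothetical stdlib analogue of redact_url_for_log."""
    try:
        parts = urlsplit(url)
        host = parts.hostname  # userinfo is already stripped here
        port = parts.port      # raises ValueError for an invalid port
    except (TypeError, ValueError, AttributeError):
        return "<unparseable>"
    scheme = parts.scheme or "?"
    if not host:
        host = "<no-host>"
    elif ":" in host:
        host = f"[{host}]"  # urlsplit drops the brackets on IPv6 literals
    return f"{scheme}://{host}:{port}" if port else f"{scheme}://{host}"
```

With `urlsplit`, the advisory payload `http://127.0.0.1:6666\@1.1.1.1` would be logged as `http://1.1.1.1`. That is the host the old validator saw, not the one the client would contact, which is one argument for keeping the log helper on the same parser as the request path.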
    
  • src/local_deep_research/web/services/pdf_service.py · +3 -3 · modified
    @@ -42,9 +42,9 @@ class UnsafePDFResourceURLError(ValueError):
     
     # Module-level URLFetcher preserves the allow_redirects=False posture that
     # default_url_fetcher hard-coded. Redirects disabled keeps the SSRF guard
    -# airtight — validate_url only inspects the initial URL, so a 30x to the
    -# AWS metadata endpoint (see ssrf_validator.AWS_METADATA_IP) would
    -# otherwise slip past.
    +# airtight — validate_url only inspects the initial URL, so a 30x to a
    +# cloud metadata endpoint (see ssrf_validator.ALWAYS_BLOCKED_METADATA_IPS)
    +# would otherwise slip past.
     _URL_FETCHER = (
         URLFetcher(allow_redirects=False) if WEASYPRINT_AVAILABLE else None
     )
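The comment above explains why redirects stay off for these fetches: `validate_url` only sees the first URL. Where a caller does need redirects, one way to keep the guard is to follow them by hand and re-validate every hop before fetching it. The sketch below takes `fetch_once` and `is_safe` as hypothetical injected callables, not LDR or requests APIs, so the control flow can run without a network.

```python
from urllib.parse import urljoin

REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})


class RedirectBlocked(Exception):
    """Hypothetical error for a hop that fails validation."""


def fetch_with_checked_redirects(url, fetch_once, is_safe, max_redirects=5):
    """Follow redirects by hand, validating every hop before fetching it.

    fetch_once(url) -> (status, location_or_None) and is_safe(url) -> bool
    are assumed interfaces for illustration only.
    """
    for _ in range(max_redirects + 1):
        if not is_safe(url):
            raise RedirectBlocked(f"unsafe hop: {url}")
        status, location = fetch_once(url)
        if status in REDIRECT_STATUSES and location:
            # Location may be relative; resolve it against the current hop.
            url = urljoin(url, location)
            continue
        return url, status
    raise RedirectBlocked(f"too many redirects (last target: {url})")
```

A 302 to `http://169.254.169.254/...` is rejected before any request is sent to it, which is the case the `allow_redirects=False` posture guards against.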
    
  • tests/security/test_ip_ranges.py · +17 -0 · modified
    @@ -97,6 +97,23 @@ def test_contains_ipv6_unique_local(self):
             unique_local = ipaddress.ip_network("fc00::/7")
             assert unique_local in PRIVATE_IP_RANGES
     
    +    def test_contains_ipv4_unspecified(self):
    +        """Should contain 0.0.0.0/8 ('this' network — IPv4 unspecified).
    +        Linux routes connect() to 0.0.0.0 to the local host."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        unspecified_v4 = ipaddress.ip_network("0.0.0.0/8")
    +        assert unspecified_v4 in PRIVATE_IP_RANGES
    +
    +    def test_contains_ipv6_unspecified(self):
    +        """Should contain ::/128 (IPv6 unspecified). Linux routes
    +        connect() to [::] to the local host (same semantics as 0.0.0.0).
    +        Added in PR #3873 after empirical bypass discovery."""
    +        from local_deep_research.security.ip_ranges import PRIVATE_IP_RANGES
    +
    +        unspecified_v6 = ipaddress.ip_network("::/128")
    +        assert unspecified_v6 in PRIVATE_IP_RANGES
    +
     
     class TestPrivateIPDetection:
         """Tests for using PRIVATE_IP_RANGES to detect private IPs."""
    
  • tests/security/test_ssrf_validator_behavior.py · +8 -6 · modified
    @@ -255,14 +255,16 @@ def test_https_in_allowed(self):
             assert "https" in ALLOWED_SCHEMES
     
     
    -class TestAWSMetadataIP:
    -    """Tests for AWS_METADATA_IP constant."""
    +class TestAlwaysBlockedMetadataIPs:
    +    """Tests for ALWAYS_BLOCKED_METADATA_IPS constant."""
     
    -    def test_aws_metadata_ip_value(self):
    -        """AWS_METADATA_IP has correct value."""
    -        from local_deep_research.security.ssrf_validator import AWS_METADATA_IP
    +    def test_aws_imds_in_always_blocked(self):
    +        """AWS / Azure / OCI / DigitalOcean shared IMDS IP is in the set."""
    +        from local_deep_research.security.ssrf_validator import (
    +            ALWAYS_BLOCKED_METADATA_IPS,
    +        )
     
    -        assert AWS_METADATA_IP == "169.254.169.254"
    +        assert "169.254.169.254" in ALWAYS_BLOCKED_METADATA_IPS
     
     
     class TestGetSafeUrl:
    
  • tests/security/test_ssrf_validator_extended.py · +18 -10 · modified
    @@ -2,15 +2,15 @@
     Extended tests for the SSRF validator module.
     
     Provides comprehensive coverage of is_ip_blocked, get_safe_url, validate_url,
    -and module constants (AWS_METADATA_IP, ALLOWED_SCHEMES).
    +and module constants (ALWAYS_BLOCKED_METADATA_IPS, ALLOWED_SCHEMES).
     """
     
     from unittest.mock import patch
     
     import pytest
     
     from local_deep_research.security.ssrf_validator import (
    -    AWS_METADATA_IP,
    +    ALWAYS_BLOCKED_METADATA_IPS,
         ALLOWED_SCHEMES,
         get_safe_url,
         is_ip_blocked,
    @@ -23,16 +23,24 @@
     # ---------------------------------------------------------------------------
     
     
    -class TestAWSMetadataIPConstant:
    -    """Verify the AWS_METADATA_IP constant value."""
    +class TestAlwaysBlockedMetadataIPsConstant:
    +    """Verify the ALWAYS_BLOCKED_METADATA_IPS frozenset."""
     
    -    def test_aws_metadata_ip_is_correct_string(self):
    -        """AWS_METADATA_IP must be the well-known metadata endpoint."""
    -        assert AWS_METADATA_IP == "169.254.169.254"
    +    def test_contains_aws_imds(self):
    +        """AWS IMDS / Azure / OCI / DigitalOcean shared endpoint."""
    +        assert "169.254.169.254" in ALWAYS_BLOCKED_METADATA_IPS
     
    -    def test_aws_metadata_ip_type(self):
    -        """AWS_METADATA_IP must be a plain string."""
    -        assert isinstance(AWS_METADATA_IP, str)
    +    def test_contains_aws_ecs_v3_and_v4(self):
    +        assert "169.254.170.2" in ALWAYS_BLOCKED_METADATA_IPS
    +        assert "169.254.170.23" in ALWAYS_BLOCKED_METADATA_IPS
    +
    +    def test_contains_alibaba_and_tencent(self):
    +        assert "100.100.100.200" in ALWAYS_BLOCKED_METADATA_IPS
    +        assert "169.254.0.23" in ALWAYS_BLOCKED_METADATA_IPS
    +
    +    def test_is_frozenset(self):
    +        """Must be immutable so tests can't accidentally mutate it."""
    +        assert isinstance(ALWAYS_BLOCKED_METADATA_IPS, frozenset)
     
     
     class TestAllowedSchemesConstant:
    
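The always-blocked set only helps if it is consulted before any allow-flag short-circuit. Below is a minimal, stdlib-only sketch of that precedence, plus the DNS step that `test_dns_resolution_to_metadata_ip_blocked` exercises further down. `is_ip_blocked_sketch`, `hostname_is_blocked`, the injectable `resolver` parameter, the fail-closed DNS behaviour and the abbreviated range lists are all assumptions for illustration, not the project's implementation.

```python
import ipaddress
import socket

# Copy of the five IPs locked in by test_always_blocked_metadata_ips_membership.
ALWAYS_BLOCKED_METADATA_IPS = frozenset(
    {
        "169.254.169.254",  # AWS IMDS / Azure / OCI / DigitalOcean
        "169.254.170.2",  # AWS ECS metadata (per the test names)
        "169.254.170.23",  # AWS ECS metadata (per the test names)
        "169.254.0.23",  # Tencent Cloud
        "100.100.100.200",  # Alibaba Cloud
    }
)

# ASSUMPTION: abbreviated range lists for illustration, not the real ip_ranges.py.
LOOPBACK = [ipaddress.ip_network(n) for n in ("127.0.0.0/8", "::1/128")]
PRIVATE = [
    ipaddress.ip_network(n)
    for n in (
        "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "100.64.0.0/10",
        "169.254.0.0/16", "fc00::/7", "fe80::/10", "0.0.0.0/8", "::/128",
    )
]


def is_ip_blocked_sketch(ip_str, allow_localhost=False, allow_private_ips=False):
    """Hypothetical re-creation of the flag semantics described in the tests."""
    ip = ipaddress.ip_address(ip_str)
    # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so it cannot dodge the IPv4 checks.
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    # Checked first: no allow-flag can re-enable a metadata endpoint.
    if str(ip) in ALWAYS_BLOCKED_METADATA_IPS:
        return True
    if any(ip in net for net in LOOPBACK):
        return not (allow_localhost or allow_private_ips)
    if any(ip in net for net in PRIVATE):
        return not allow_private_ips
    return False


def hostname_is_blocked(hostname, resolver=socket.getaddrinfo, **flags):
    """Block if ANY resolved address is blocked; resolver is injectable for tests."""
    try:
        infos = resolver(hostname, None)
    except socket.gaierror:
        return True  # ASSUMPTION: fail closed when resolution fails
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return any(is_ip_blocked_sketch(info[4][0], **flags) for info in infos)
```

The ordering matters concretely: `100.100.100.200` sits inside CGNAT `100.64.0.0/10`, which `allow_private_ips=True` deliberately opens for Podman/rootless containers, so only the earlier metadata check keeps Alibaba's endpoint closed.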
  • tests/security/test_ssrf_validator_high_value.py +26 -11 modified
    @@ -3,14 +3,16 @@
     import socket
     from unittest.mock import patch
     
    +import pytest
    +
     from local_deep_research.security import ssrf_validator
     
     from local_deep_research.security.ssrf_validator import (
         is_ip_blocked,
         validate_url,
         get_safe_url,
         ALLOWED_SCHEMES,
    -    AWS_METADATA_IP,
    +    ALWAYS_BLOCKED_METADATA_IPS,
     )
     
     
    @@ -85,17 +87,20 @@ def test_loopback_also_allowed_with_private_flag(self):
             assert is_ip_blocked("127.0.0.1", allow_private_ips=True) is False
     
     
    -class TestIsIpBlockedAWSMetadata:
    -    """AWS metadata endpoint always blocked."""
    +class TestIsIpBlockedCloudMetadata:
    +    """Cloud-provider metadata endpoints always blocked under all flags."""
     
    -    def test_aws_metadata_blocked_default(self):
    -        assert is_ip_blocked(AWS_METADATA_IP) is True
    +    @pytest.mark.parametrize("ip", sorted(ALWAYS_BLOCKED_METADATA_IPS))
    +    def test_metadata_ip_blocked_default(self, ip):
    +        assert is_ip_blocked(ip) is True
     
    -    def test_aws_metadata_blocked_even_with_allow_localhost(self):
    -        assert is_ip_blocked(AWS_METADATA_IP, allow_localhost=True) is True
    +    @pytest.mark.parametrize("ip", sorted(ALWAYS_BLOCKED_METADATA_IPS))
    +    def test_metadata_ip_blocked_with_allow_localhost(self, ip):
    +        assert is_ip_blocked(ip, allow_localhost=True) is True
     
    -    def test_aws_metadata_blocked_even_with_allow_private(self):
    -        assert is_ip_blocked(AWS_METADATA_IP, allow_private_ips=True) is True
    +    @pytest.mark.parametrize("ip", sorted(ALWAYS_BLOCKED_METADATA_IPS))
    +    def test_metadata_ip_blocked_with_allow_private_ips(self, ip):
    +        assert is_ip_blocked(ip, allow_private_ips=True) is True
     
     
     class TestIsIpBlockedIPv4Mapped:
    @@ -275,5 +280,15 @@ class TestConstants:
         def test_allowed_schemes_http_https(self):
             assert ALLOWED_SCHEMES == {"http", "https"}
     
    -    def test_aws_metadata_ip_value(self):
    -        assert AWS_METADATA_IP == "169.254.169.254"
    +    def test_always_blocked_metadata_ips_membership(self):
    +        """Lock in the exact membership of the always-blocked set so a
    +        future contributor accidentally removing an IP fails loudly."""
    +        assert ALWAYS_BLOCKED_METADATA_IPS == frozenset(
    +            {
    +                "169.254.169.254",
    +                "169.254.170.2",
    +                "169.254.170.23",
    +                "169.254.0.23",
    +                "100.100.100.200",
    +            }
    +        )
    
  • tests/security/test_ssrf_validator.py +135 -2 modified
    @@ -10,10 +10,11 @@
     - allow_private_ips=True: Allow all private/internal IPs + localhost:
       - RFC1918: 10.x.x.x, 172.16-31.x.x, 192.168.x.x
       - CGNAT: 100.64.x.x (used by Podman/rootless containers)
    -  - Link-local: 169.254.x.x (except AWS metadata)
    +  - Link-local: 169.254.x.x (except cloud metadata endpoints)
       - IPv6 ULA: fc00::/7
       - IPv6 Link-local: fe80::/10
    -- AWS metadata endpoint (169.254.169.254) is ALWAYS blocked
    +- Cloud metadata endpoints (AWS IMDS / ECS, Azure, OCI, DigitalOcean,
    +  AlibabaCloud, Tencent — see ALWAYS_BLOCKED_METADATA_IPS) are ALWAYS blocked
     
     The allow_private_ips parameter is designed for trusted self-hosted services like
     SearXNG or Ollama that may be running in containerized environments (Docker, Podman)
    @@ -933,6 +934,138 @@ def test_aws_metadata_blocked_under_allow_private_ips(self):
             )
     
     
    +class TestAlwaysBlockedMetadataIPs:
    +    """Cloud-metadata IPs blocked under every flag combination."""
    +
    +    def test_metadata_ip_blocked_under_all_flags(self):
    +        """Every IP in the always-blocked set must be blocked under all
    +        allow-flag combinations."""
    +        from local_deep_research.security.ssrf_validator import (
    +            ALWAYS_BLOCKED_METADATA_IPS,
    +            is_ip_blocked,
    +        )
    +
    +        for ip in sorted(ALWAYS_BLOCKED_METADATA_IPS):
    +            assert is_ip_blocked(ip) is True
    +            assert is_ip_blocked(ip, allow_localhost=True) is True
    +            assert is_ip_blocked(ip, allow_private_ips=True) is True
    +
    +    def test_validate_url_blocks_all_metadata_ips_under_allow_private_ips(self):
    +        """Same coverage end-to-end through validate_url."""
    +        from local_deep_research.security.ssrf_validator import (
    +            ALWAYS_BLOCKED_METADATA_IPS,
    +            validate_url,
    +        )
    +
    +        for ip in sorted(ALWAYS_BLOCKED_METADATA_IPS):
    +            assert (
    +                validate_url(f"http://{ip}/", allow_private_ips=True) is False
    +            )
    +
    +    def test_dns_resolution_to_metadata_ip_blocked(self):
    +        """A hostname that resolves to a metadata IP must also be blocked
    +        even when allow_private_ips=True."""
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("169.254.170.2", 0))],
    +        ):
    +            assert (
    +                validate_url("http://attacker.example/", allow_private_ips=True)
    +                is False
    +            )
    +
    +
    +class TestRedactUrlForLog:
    +    """The redact_url_for_log helper used at all log sites."""
    +
    +    def test_strips_userinfo(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert (
    +            redact_url_for_log("http://user:secret@example.com/path?token=x")
    +            == "http://example.com"
    +        )
    +
    +    def test_strips_percent_encoded_password(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert (
    +            redact_url_for_log("http://u:p%40ss@example.com/")
    +            == "http://example.com"
    +        )
    +
    +    def test_keeps_port(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert (
    +            redact_url_for_log("http://example.com:8080/path")
    +            == "http://example.com:8080"
    +        )
    +
    +    def test_ipv6_host_keeps_brackets(self):
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert redact_url_for_log("http://[::1]:8080/") == "http://[::1]:8080"
    +
    +    def test_no_scheme_uses_question_mark(self):
    +        """Scheme-relative URLs use '?' as the scheme sentinel."""
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        # urllib3 may parse '//example.com/path' with scheme=None.
    +        result = redact_url_for_log("//example.com/path")
    +        assert result.startswith("?://") or result == "<unparseable>"
    +
    +    def test_unparseable_returns_sentinel(self):
    +        """urllib3 rejects malformed IPv6 brackets and out-of-range
    +        ports; helper falls back to <unparseable>."""
    +        from local_deep_research.security.ssrf_validator import (
    +            redact_url_for_log,
    +        )
    +
    +        assert redact_url_for_log("http://[::") == "<unparseable>"
    +        assert redact_url_for_log("http://1.2.3.4:99999") == "<unparseable>"
    +
    +    def test_validate_url_log_does_not_leak_userinfo(self, loguru_caplog):
    +        """End-to-end: validate_url's rejection log must not contain the
    +        password from the URL's userinfo. Also assert at least one log
    +        record was emitted, otherwise the not-in assertion is vacuously
    +        true and we'd have false confidence."""
    +        from local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        # Mock DNS to a public IP so the URL passes Layer 1+2 and reaches
    +        # the IP-block log site (which does log the URL).
    +        with (
    +            loguru_caplog.at_level("WARNING"),
    +            patch(
    +                "socket.getaddrinfo",
    +                return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +            ),
    +        ):
    +            validate_url("http://user:supersecret123@evilhost.example/")
    +        assert "supersecret123" not in loguru_caplog.text, (
    +            "Password leaked into log output"
    +        )
    +        # Anti-silent-pass: verify we actually did log (otherwise the
    +        # not-in assertion above is trivially true on empty text).
    +        assert len(loguru_caplog.records) > 0, "No log records emitted"
    +
    +
     class TestSchemeRejection:
         """Non-http(s) schemes must be rejected outright (not just the host check)."""
     
    
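The `TestRedactUrlForLog` cases pin down a clear contract: keep scheme, host and port; drop userinfo, path and query; use `?` for a missing scheme; return `<unparseable>` when parsing fails. The test comments suggest the upstream helper is urllib3-based. The sketch below swaps in the standard library's `urllib.parse.urlsplit` so it runs without dependencies, and `redact_url_for_log_sketch` is a hypothetical name. For backslash URLs the stdlib host can differ from urllib3's, which affects only log accuracy, and Layer 1 rejects those URLs before any log site that prints a host.

```python
from urllib.parse import urlsplit


def redact_url_for_log_sketch(url: str) -> str:
    """Return scheme://host[:port] with userinfo, path, query and fragment removed."""
    try:
        parts = urlsplit(url)  # raises ValueError on unbalanced IPv6 brackets
        host = parts.hostname  # userinfo stripped, IPv6 brackets removed, lowercased
        port = parts.port  # raises ValueError for ports outside 0-65535
    except ValueError:
        return "<unparseable>"
    if not host:
        return "<unparseable>"
    if ":" in host:
        host = f"[{host}]"  # re-bracket IPv6 literals so host:port stays unambiguous
    scheme = parts.scheme or "?"  # '?' sentinel for scheme-relative URLs
    netloc = host if port is None else f"{host}:{port}"
    return f"{scheme}://{netloc}"
```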
88e13ff718ed

fix(security): SSRF parser-differential bypass (GHSA-g23j-2vwm-5c25) (#3873)

https://github.com/LearningCircuit/local-deep-research · LearningCircuit · May 8, 2026 · via nvd-ref
9 files changed · +1139 -14
  • changelog.d/+ssrf-parser-differential.security.md +6 -0 added
    @@ -0,0 +1,6 @@
    +Fix SSRF parser-differential bypass (GHSA-g23j-2vwm-5c25). URLs containing
    +backslash, whitespace, or ASCII control bytes are now rejected upfront by the
    +SSRF validator and notification-URL validator; hostname extraction switched
    +from `urllib.parse.urlparse` to `urllib3.util.parse_url` so the validator and
    +the HTTP client agree on destination by construction. Credit: @Fushuling,
    +@RacerZ-fighting.
    
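The `urlparse` half of the differential can be reproduced with the standard library alone: `urlparse` does not treat `\` as an authority delimiter, so everything after the last `@` becomes the hostname. The snippet shows that, plus the Layer 1 pattern copied from `ssrf_validator.py`. The urllib3 half (connecting to `127.0.0.1`) is as reported in the advisory and is not reproduced here, to keep the snippet dependency-free; `layer1_rejects` is a hypothetical wrapper.

```python
import re
from urllib.parse import urlparse

# Pattern copied from the fix (RFC_FORBIDDEN_URL_CHARS_RE in ssrf_validator.py).
RFC_FORBIDDEN_URL_CHARS_RE = re.compile(r"[\\\s\x00-\x1f\x7f]")

# Python literal for the advisory payload http://127.0.0.1\@1.1.1.1
payload = "http://127.0.0.1\\@1.1.1.1"

old_view = urlparse(payload)
# "\" is not an authority delimiter for urlparse, so the netloc runs to the
# end of the string and the hostname is whatever follows the last "@".
print(old_view.netloc)
print(old_view.hostname)  # the value the pre-fix check validated


def layer1_rejects(url: str) -> bool:
    """Hypothetical wrapper: True if the URL contains a Layer 1 character."""
    return RFC_FORBIDDEN_URL_CHARS_RE.search(url) is not None
```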
  • SECURITY.md +9 -0 modified
    @@ -129,6 +129,15 @@ By setting this, the operator acknowledges the residual risk above. To minimise
     
     The same DNS-rebinding caveat applies to `safe_requests` / `ssrf_validator.validate_url`, used for general HTTP fetches (RAG sources, web scraping). Egress restriction is the primary defense for that path as well.
     
    +### Parser-Differential URL Bypass (GHSA-g23j-2vwm-5c25)
    +
    +A reporter ([@Fushuling](https://github.com/Fushuling), [@RacerZ-fighting](https://github.com/RacerZ-fighting)) demonstrated that Python's `urllib.parse.urlparse` and the `requests`/`urllib3` parser disagreed on URLs like `http://127.0.0.1\@1.1.1.1` — `urlparse` extracted `1.1.1.1` (passing the SSRF check) while `requests` connected to `127.0.0.1` (the actual destination). The fix has two layers:
    +
    +- **Layer 1 — input hygiene:** `RFC_FORBIDDEN_URL_CHARS_RE` in `ssrf_validator.py` rejects URLs containing backslash, ASCII control bytes, or whitespace. RFC 3986 forbids these characters in URLs, so legitimate fetches are unaffected.
    +- **Layer 2 — authoritative parser:** Hostname extraction now uses `urllib3.util.parse_url`, the same parser `requests` uses internally. Validator and HTTP client cannot disagree on destination by construction. This is the load-bearing defence on the `SafeSession.send` path, where `requests` has already canonicalised `\` to `%5C` during `.prepare()`.
    +
    +Both `ssrf_validator.validate_url` and `NotificationURLValidator.validate_service_url` (HTTP/HTTPS branch) carry the fix. Future edits to the SSRF path should preserve `RFC_FORBIDDEN_URL_CHARS_RE` and the `urllib3.util.parse_url` host extraction — reverting either reintroduces the bypass.
    +
     ## Supported Versions
     
     Security fixes are only provided for the latest release. Please upgrade to receive patches.
    
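SECURITY.md says Layer 2 carries the load once `requests` has turned `\` into `%5C`. A minimal stdlib check of why, using the canonicalised URL from the `SafeSession.send` test as a given input (the sketch does not call `requests`, and `urlsplit` stands in for urllib3 only to show where the authority ends):

```python
import re
from urllib.parse import urlsplit

RFC_FORBIDDEN_URL_CHARS_RE = re.compile(r"[\\\s\x00-\x1f\x7f]")

# Post-.prepare() form used by the SafeSession.send test (taken as given).
canonicalised = "http://127.0.0.1:6666/%5C@1.1.1.1"

# Layer 1 finds nothing: "%5C" is three ordinary characters, not a backslash.
layer1_hit = RFC_FORBIDDEN_URL_CHARS_RE.search(canonicalised)
print(layer1_hit)

# The "/" now ends the authority, so "@1.1.1.1" is path text and the host is
# the real destination, which the IP check must then block.
parts = urlsplit(canonicalised)
print(parts.hostname, parts.port, parts.path)
```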
  • src/local_deep_research/security/ip_ranges.py +4 -1 modified
    @@ -21,5 +21,8 @@
         ipaddress.ip_network("169.254.0.0/16"),  # Link-local
         ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
         ipaddress.ip_network("fc00::/7"),  # IPv6 unique local
    -    ipaddress.ip_network("0.0.0.0/8"),  # "This" network
    +    ipaddress.ip_network("0.0.0.0/8"),  # "This" network (IPv4 unspecified)
    +    ipaddress.ip_network(
    +        "::/128"
    +    ),  # IPv6 unspecified — Linux routes connections to local host
     ]
    
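Adding `::/128` as a network, rather than matching a string, means every spelling of the IPv6 unspecified address is caught once `ipaddress` has parsed it. The contrast below uses a hypothetical literal-string check of the kind the `test_ipv6_unspecified_zero_form_blocked` docstring warns about; `UNSPECIFIED` is a two-entry excerpt of the ranges above, for illustration only.

```python
import ipaddress

# Two-entry excerpt of PRIVATE_IP_RANGES: the IPv4 and IPv6 "unspecified" ranges.
UNSPECIFIED = [ipaddress.ip_network("0.0.0.0/8"), ipaddress.ip_network("::/128")]


def naive_is_unspecified(host: str) -> bool:
    # Hypothetical literal-string comparison: misses equivalent spellings.
    return host in ("0.0.0.0", "::")


def normalised_is_unspecified(host: str) -> bool:
    # ip_address() canonicalises every spelling before the range comparison.
    ip = ipaddress.ip_address(host)
    return any(ip in net for net in UNSPECIFIED)


for spelling in ("::", "0::", "::0", "0:0:0:0:0:0:0:0"):
    print(spelling, naive_is_unspecified(spelling), normalised_is_unspecified(spelling))
```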
  • src/local_deep_research/security/notification_validator.py +56 -12 modified
    @@ -11,8 +11,11 @@
     from typing import Optional, Tuple
     from urllib.parse import urlparse
     from loguru import logger
    +from urllib3.exceptions import LocationParseError
    +from urllib3.util import parse_url
     
     from .ip_ranges import PRIVATE_IP_RANGES as _PRIVATE_IP_RANGES
    +from .ssrf_validator import RFC_FORBIDDEN_URL_CHARS_RE
     
     
     class NotificationURLValidationError(ValueError):
    @@ -174,9 +177,24 @@ def validate_service_url(
             if not url or not isinstance(url, str):
                 return False, "Service URL must be a non-empty string"
     
    -        # Strip whitespace
    +        # Strip whitespace (must run before the RFC-illegal char check
    +        # so legitimate URLs with surrounding whitespace are not rejected).
             url = url.strip()
     
    +        # Reject URLs containing characters that drive parser-differential
    +        # SSRF bypasses (backslash, whitespace, control bytes) — see
    +        # GHSA-g23j-2vwm-5c25. The URL is omitted from the log line because
    +        # userinfo (RFC 3986 §3.2.1) may contain credentials and rejected
    +        # URLs are by definition adversarial-shaped.
    +        if RFC_FORBIDDEN_URL_CHARS_RE.search(url):
    +            logger.warning(
    +                "Blocked notification URL containing RFC-illegal characters"
    +            )
    +            return (
    +                False,
    +                "URL contains characters that are not allowed (whitespace, backslash, or control bytes)",
    +            )
    +
             # Parse URL
             try:
                 parsed = urlparse(url)
    @@ -208,18 +226,44 @@ def validate_service_url(
                     f"Allowed: {', '.join(NotificationURLValidator.ALLOWED_SCHEMES[:5])}...",
                 )
     
    -        # For HTTP/HTTPS, check for private IPs (SSRF prevention)
    +        # For HTTP/HTTPS, check for private IPs (SSRF prevention).
    +        # Use urllib3 (the parser ``requests`` uses internally) instead of
    +        # urlparse for hostname extraction — urlparse is vulnerable to
    +        # parser-differential bypasses like ``http://127.0.0.1\@1.1.1.1``
    +        # (GHSA-g23j-2vwm-5c25). For non-HTTP schemes (Apprise transports
    +        # like discord://, slack://, mailto://) Apprise handles the URL
    +        # itself and the parser-differential doesn't apply.
             if scheme in ("http", "https") and not allow_private_ips:
    -            if parsed.hostname:
    -                if NotificationURLValidator._is_private_ip(parsed.hostname):
    -                    logger.warning(
    -                        f"Blocked private/internal IP in notification URL: "
    -                        f"{parsed.hostname}"
    -                    )
    -                    return (
    -                        False,
    -                        f"Blocked private/internal IP address: {parsed.hostname}",
    -                    )
    +            try:
    +                u3 = parse_url(url)
    +            except LocationParseError:
    +                logger.warning(
    +                    "Blocked notification URL: urllib3 parser rejected it"
    +                )
    +                return False, "Invalid URL format (parser rejected)"
    +            hostname = u3.host
    +            # Authority must be ASCII printable (forward-defence vs urllib3
    +            # ever loosening its IDN handling).
    +            if hostname and any(
    +                ord(c) < 0x20 or ord(c) > 0x7E for c in hostname
    +            ):
    +                logger.warning(
    +                    "Blocked notification URL with non-ASCII / control bytes in host"
    +                )
    +                return False, "URL host contains disallowed characters"
    +            if hostname and hostname.startswith("[") and hostname.endswith("]"):
    +                hostname = hostname[1:-1]
    +            if hostname:
    +                hostname = hostname.rstrip(".")
    +            if hostname and NotificationURLValidator._is_private_ip(hostname):
    +                logger.warning(
    +                    f"Blocked private/internal IP in notification URL: "
    +                    f"{hostname}"
    +                )
    +                return (
    +                    False,
    +                    f"Blocked private/internal IP address: {hostname}",
    +                )
     
             # Passed all security checks
             return True, None
    
  • src/local_deep_research/security/ssrf_validator.py +49 -1 modified
    @@ -6,10 +6,13 @@
     """
     
     import ipaddress
    +import re
     import socket
     from urllib.parse import urlparse
     from typing import Optional
     from loguru import logger
    +from urllib3.exceptions import LocationParseError
    +from urllib3.util import parse_url
     
     from .ip_ranges import PRIVATE_IP_RANGES as BLOCKED_IP_RANGES
     
    @@ -20,6 +23,14 @@
     # Allowed URL schemes
     ALLOWED_SCHEMES = {"http", "https"}
     
    +# RFC 3986 forbids these characters in URLs; their presence in a URL signals
    +# a parser-differential attempt (GHSA-g23j-2vwm-5c25). \s covers space, \t,
    +# \n, \r, \v, \f. Backslash is the load-bearing payload — Python's urlparse
    +# treats it as a literal char while requests/urllib3 treat it as a path
    +# delimiter, so a crafted URL like ``http://127.0.0.1\@1.1.1.1`` would
    +# pass the urlparse-based hostname check but actually connect to 127.0.0.1.
    +RFC_FORBIDDEN_URL_CHARS_RE = re.compile(r"[\\\s\x00-\x1f\x7f]")
    +
     
     def is_ip_blocked(
         ip_str: str, allow_localhost: bool = False, allow_private_ips: bool = False
    @@ -129,7 +140,18 @@ def validate_url(
         Returns:
             True if URL is safe, False otherwise
         """
    +    if not isinstance(url, str):
    +        return False
         try:
    +        url = url.strip()
    +        # Layer 1: reject RFC-illegal characters that drive parser-differential
    +        # attacks (backslash, whitespace, control bytes). The URL is omitted
    +        # from this log line because userinfo (RFC 3986 §3.2.1) may contain
    +        # credentials and rejected URLs are by definition adversarial-shaped.
    +        if RFC_FORBIDDEN_URL_CHARS_RE.search(url):
    +            logger.warning("Blocked URL containing RFC-illegal characters")
    +            return False
    +
             parsed = urlparse(url)
     
             # Check scheme
    @@ -139,7 +161,33 @@ def validate_url(
                 )
                 return False
     
    -        hostname = parsed.hostname
    +        # Layer 2: extract host using urllib3, the same parser ``requests``
    +        # uses internally. ``urlparse`` and urllib3 disagree on URLs like
    +        # ``http://127.0.0.1\@1.1.1.1`` — urlparse says ``1.1.1.1``,
    +        # urllib3 says ``127.0.0.1``. Validating against urllib3 means the
    +        # validator and the HTTP client cannot disagree on destination.
    +        try:
    +            u3 = parse_url(url)
    +        except LocationParseError:
    +            logger.warning("Blocked URL: urllib3 parser rejected it")
    +            return False
    +        hostname = u3.host
    +        # Authority must be ASCII printable. urllib3 currently rejects
    +        # non-ASCII via LocationParseError, but this guard keeps us
    +        # independent of that staying constant — CVE-2019-9636 showed
    +        # Python's stdlib loosened a similar restriction previously.
    +        # Brackets/colon used in IPv6 hosts are within 0x20-0x7e, so this
    +        # runs cleanly before bracket-strip.
    +        if hostname and any(ord(c) < 0x20 or ord(c) > 0x7E for c in hostname):
    +            logger.warning("Blocked URL with non-ASCII / control bytes in host")
    +            return False
    +        # Strip IPv6 brackets so ipaddress.ip_address can parse the host.
    +        if hostname and hostname.startswith("[") and hostname.endswith("]"):
    +            hostname = hostname[1:-1]
    +        # rstrip(".") matches getaddrinfo behaviour — trailing dots are
    +        # ignored at resolution time.
    +        if hostname:
    +            hostname = hostname.rstrip(".")
             if not hostname:
                 logger.warning(f"Blocked URL with no hostname: {url}")
                 return False
    
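After `parse_url` returns, `validate_url` applies three normalisations before the IP check: an ASCII-printable guard on the host, IPv6 bracket stripping, and trailing-dot removal. They are pure string operations, so they can be sketched without urllib3. `normalise_host` and `as_ip` are hypothetical helpers; the input is assumed to be the host string `parse_url(url).host` would return, brackets included for IPv6 literals, as the bracket-stripping code above implies.

```python
import ipaddress
from typing import Optional


def normalise_host(host: Optional[str]) -> Optional[str]:
    """Mirror the post-parse steps in validate_url; None means 'reject'."""
    if not host:
        return None
    # Forward-defence: only printable ASCII (0x20-0x7E) is allowed in the host.
    if any(ord(c) < 0x20 or ord(c) > 0x7E for c in host):
        return None
    # ipaddress.ip_address() does not accept bracketed IPv6 literals.
    if host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    # Per the upstream comment, trailing dots are ignored at resolution time.
    host = host.rstrip(".")
    return host or None


def as_ip(host: str):
    """Return an ip_address if the normalised host is an IP literal, else None."""
    try:
        return ipaddress.ip_address(host)
    except ValueError:
        return None
```

Without the bracket strip, `ipaddress.ip_address("[::1]")` raises `ValueError`, so an IPv6 loopback literal would not be recognised as an IP at that step.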
  • tests/security/test_notification_validator.py +87 -0 modified
    @@ -296,6 +296,93 @@ def test_whitespace_stripped(self):
             assert error is None
     
     
    +class TestParserDifferentialBypass:
    +    """
    +    Tests for the parser-differential SSRF bypass (GHSA-g23j-2vwm-5c25)
    +    in the notification flow.  The same bypass that affected
    +    ``ssrf_validator.validate_url`` also affected
    +    ``NotificationURLValidator.validate_service_url`` because both used
    +    ``urlparse(url).hostname`` for the SSRF check.
    +    """
    +
    +    def test_advisory_canonical_payload_blocked(self):
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "http://127.0.0.1:6666\\@1.1.1.1"
    +        )
    +        assert is_valid is False
    +        assert error is not None
    +
    +    def test_post_prepare_canonicalised_form_blocked(self):
    +        """Layer-2 verification on the notification flow."""
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "http://127.0.0.1:6666/%5C@1.1.1.1"
    +        )
    +        assert is_valid is False
    +        assert error is not None
    +        assert "127.0.0.1" in error  # Layer 2 reports the actual host
    +
    +    def test_backslash_no_port(self):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://127.0.0.1\\@1.1.1.1"
    +        )
    +        assert is_valid is False
    +
    +    def test_tab_in_url_blocked(self):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "https://example.com/path\there"
    +        )
    +        assert is_valid is False
    +
    +    def test_null_byte_blocked(self):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://127.0.0.1\x00@1.1.1.1"
    +        )
    +        assert is_valid is False
    +
    +    def test_apprise_discord_still_works(self):
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "discord://webhook_id/token"
    +        )
    +        assert is_valid is True
    +        assert error is None
    +
    +    def test_apprise_slack_still_works(self):
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "slack://TestApp@TokenA/TokenB/TokenC"
    +        )
    +        assert is_valid is True
    +        assert error is None
    +
    +    def test_apprise_mailto_with_credentials(self):
    +        is_valid, error = NotificationURLValidator.validate_service_url(
    +            "mailto://user:pass@smtp.gmail.com"
    +        )
    +        assert is_valid is True
    +        assert error is None
    +
    +    def test_ipv6_unspecified_blocked(self):
    +        """``::`` (and equivalent forms) routes to local host on Linux."""
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[::]/"
    +        )
    +        assert is_valid is False
    +
    +    def test_ipv6_unspecified_zero_form_blocked(self):
    +        """``0::`` bypasses the literal-string ``::`` allow-list at
    +        ``_is_private_ip`` — must be caught via the ip_address normalisation
    +        path against ``::/128`` in BLOCKED_IP_RANGES."""
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[0::]/"
    +        )
    +        assert is_valid is False
    +
    +    def test_ipv6_unspecified_full_form_blocked(self):
    +        is_valid, _ = NotificationURLValidator.validate_service_url(
    +            "http://[0:0:0:0:0:0:0:0]/"
    +        )
    +        assert is_valid is False
    +
    +
     class TestValidateServiceUrlStrict:
         """Tests for validate_service_url_strict static method."""
     
    
  • tests/security/test_safe_requests.py +80 -0 modified
    @@ -704,3 +704,83 @@ def test_safe_post_preserves_explicit_user_agent(self):
     
                 _, kwargs = mock_post.call_args
                 assert kwargs["headers"]["User-Agent"] == custom_ua
    +
    +
    +class TestParserDifferentialEndToEnd:
    +    """
    +    End-to-end integration tests for the parser-differential SSRF bypass
    +    fix (GHSA-g23j-2vwm-5c25).
    +
    +    Approach: bind a TCP socket on 127.0.0.1:<random> WITHOUT calling
    +    listen() — the kernel responds RST to any incoming connect.  If the
    +    fix regresses and ``safe_get`` actually attempts to connect to the
    +    bound port, ``requests`` raises ``ConnectionError`` (kernel RST), so
    +    a strict ``pytest.raises(ValueError, match=...)`` distinguishes
    +    "validator caught it" from "validator failed and the kernel saved us".
    +    """
    +
    +    @staticmethod
    +    def _bind_unused_port():
    +        import socket as _socket
    +
    +        sock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
    +        sock.bind(("127.0.0.1", 0))
    +        return sock
    +
    +    def test_safe_get_blocks_parser_differential_no_socket_connect(self):
    +        from local_deep_research.security.safe_requests import safe_get
    +
    +        sock = self._bind_unused_port()
    +        try:
    +            port = sock.getsockname()[1]
    +            bypass_url = f"http://127.0.0.1:{port}\\@1.1.1.1"
    +            with pytest.raises(ValueError, match="SSRF|security validation"):
    +                safe_get(bypass_url, timeout=2)
    +        finally:
    +            sock.close()
    +
    +    def test_safe_post_blocks_parser_differential(self):
    +        from local_deep_research.security.safe_requests import safe_post
    +
    +        sock = self._bind_unused_port()
    +        try:
    +            port = sock.getsockname()[1]
    +            bypass_url = f"http://127.0.0.1:{port}\\@1.1.1.1"
    +            with pytest.raises(ValueError, match="SSRF|security validation"):
    +                safe_post(bypass_url, data={"k": "v"}, timeout=2)
    +        finally:
    +            sock.close()
    +
    +    def test_safesession_blocks_parser_differential(self):
    +        """SafeSession validates at both request() and send() — exercises
    +        the double-validation path. This URL contains ``\\`` so Layer 1
    +        catches it at request() before .prepare() canonicalises it."""
    +        from local_deep_research.security.safe_requests import SafeSession
    +
    +        sock = self._bind_unused_port()
    +        try:
    +            port = sock.getsockname()[1]
    +            bypass_url = f"http://127.0.0.1:{port}\\@1.1.1.1"
    +            with SafeSession() as sess:
    +                with pytest.raises(
    +                    ValueError, match="SSRF|security validation"
    +                ):
    +                    sess.get(bypass_url, timeout=2)
    +        finally:
    +            sock.close()
    +
    +    def test_safesession_send_blocks_canonicalised_form(self):
    +        """
    +        Layer-2 verification: ``SafeSession.send()`` is called with a
    +        ``PreparedRequest`` whose URL contains ``%5C`` (the canonicalised
    +        form of ``\\``).  Layer 1 doesn't match ``%5C``, so Layer 2's
    +        urllib3-based hostname extraction is what blocks this — proving
    +        Layer 2 carries the load on this path.
    +        """
    +        from local_deep_research.security.safe_requests import SafeSession
    +
    +        with SafeSession() as sess:
    +            req = requests.Request("GET", "http://127.0.0.1:6666/%5C@1.1.1.1")
    +            prepared = sess.prepare_request(req)
    +            with pytest.raises(ValueError, match="SSRF|security validation"):
    +                sess.send(prepared, timeout=2)
    
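The bind-without-listen trick in `TestParserDifferentialEndToEnd` relies on the OS refusing connections to a port that is bound but has no listener, so a regression surfaces as a fast connection error rather than a hang. A standalone stdlib sketch; the helper name is hypothetical, and the refusal behaviour is what Linux and macOS do on loopback but may differ on other platforms.

```python
import socket


def probe_bound_unlistened_port(timeout: float = 2.0) -> str:
    """Bind 127.0.0.1:<random> without listen(), then try to connect to it."""
    holder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        holder.bind(("127.0.0.1", 0))  # port is reserved but nobody accepts on it
        port = holder.getsockname()[1]
        try:
            client = socket.create_connection(("127.0.0.1", port), timeout=timeout)
        except ConnectionRefusedError:
            return "refused"  # the kernel answered the SYN with a RST
        except socket.timeout:
            return "timeout"  # a platform that drops instead of resetting
        client.close()
        return "connected"
    finally:
        holder.close()
```

Keeping the port bound for the duration stops another process from starting to listen on it mid-test, so a regression that reaches the network shows up as a `ConnectionError` from `requests` rather than a hit on some unrelated local service.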
  • tests/security/test_ssrf_redirect_bypass.py +57 -0 modified
    @@ -908,3 +908,60 @@ def test_300_not_followed(self, mock_validate_url):
                 result = safe_get("https://example.com", allow_redirects=True)
                 assert result.status_code == 300
                 assert mock_get.call_count == 1
    +
    +
    +class TestRedirectParserDifferentialBypass:
    +    """
    +    Redirect-path coverage of the parser-differential SSRF fix
    +    (GHSA-g23j-2vwm-5c25). The redirect handler in ``safe_get`` calls
    +    ``ssrf_validator.validate_url`` on each ``Location`` header, so the
    +    fix propagates to redirects automatically. These tests lock that in.
    +    """
    +
    +    def test_redirect_to_backslash_bypass_blocked(self):
    +        """Initial URL is fine; Location: header has the parser-differential
    +        payload — must be blocked by validate_url on hop 2."""
    +        # Don't mock validate_url here — exercise the real validator.
    +        redirect_resp = _make_response(
    +            302,
    +            {"Location": "http://127.0.0.1:6666\\@1.1.1.1"},
    +            "https://example.com",
    +        )
    +        final_resp = _make_response(200)
    +
    +        # Mock DNS for the initial URL validation only.
    +        with (
    +            patch(
    +                "socket.getaddrinfo",
    +                return_value=[(2, 1, 6, "", ("93.184.216.34", 0))],
    +            ),
    +            patch(
    +                "local_deep_research.security.safe_requests.requests.get",
    +                side_effect=[redirect_resp, final_resp],
    +            ),
    +        ):
    +            with pytest.raises(ValueError, match="Redirect target failed SSRF"):
    +                safe_get("https://example.com", allow_redirects=True)
    +
    +    def test_redirect_to_canonicalised_percent5c_blocked(self):
    +        """Location: with the post-prepare ``%5C`` form — Layer-2 verifies
    +        the urllib3-based hostname extraction blocks the redirect target."""
    +        redirect_resp = _make_response(
    +            302,
    +            {"Location": "http://127.0.0.1:6666/%5C@1.1.1.1"},
    +            "https://example.com",
    +        )
    +        final_resp = _make_response(200)
    +
    +        with (
    +            patch(
    +                "socket.getaddrinfo",
    +                return_value=[(2, 1, 6, "", ("93.184.216.34", 0))],
    +            ),
    +            patch(
    +                "local_deep_research.security.safe_requests.requests.get",
    +                side_effect=[redirect_resp, final_resp],
    +            ),
    +        ):
    +            with pytest.raises(ValueError, match="Redirect target failed SSRF"):
    +                safe_get("https://example.com", allow_redirects=True)
    
  • tests/security/test_ssrf_validator.py (+791 / -0, modified)
    @@ -20,6 +20,8 @@
     or on a different machine on the local network.
     """
     
    +import socket
    +
     import pytest
     from unittest.mock import patch
     
    @@ -450,6 +452,795 @@ def test_docker_bridge_network(self):
             )
     
     
    +class TestParserDifferentialBypass:
    +    """
    +    Tests for the parser-differential SSRF bypass (GHSA-g23j-2vwm-5c25).
    +
    +    Python's ``urllib.parse.urlparse`` and the ``requests``/``urllib3``
    +    parser disagree on URLs that contain a backslash before the userinfo
    +    ``@``.  ``urlparse`` treats ``\\`` as a literal char and ``@`` as the
    +    userinfo separator (so it extracts the post-``@`` host); ``requests``
    +    treats ``\\`` as a path delimiter and connects to the pre-``\\`` host.
    +
    +    A pre-fix ``validate_url`` based on ``urlparse(url).hostname`` would
    +    pass URLs like ``http://127.0.0.1\\@1.1.1.1`` (it sees ``1.1.1.1``)
    +    while ``requests.get(url)`` would actually connect to ``127.0.0.1``.
    +    The fix combines a Layer-1 reject of RFC-illegal characters with a
    +    Layer-2 swap to ``urllib3.util.parse_url`` for hostname extraction.
    +    """
    +
    +    def test_advisory_canonical_payload(self):
    +        """The exact PoC from the advisory must be rejected."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1:6666\\@1.1.1.1") is False
    +
    +    def test_backslash_no_port(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\\@1.1.1.1") is False
    +
    +    def test_double_backslash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\\\\@1.1.1.1") is False
    +
    +    def test_slash_then_backslash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1/\\@1.1.1.1") is False
    +
    +    def test_tab_at_seam(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\t@1.1.1.1") is False
    +
    +    def test_carriage_return_at_seam(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\r@1.1.1.1") is False
    +
    +    def test_newline_at_seam(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\n@1.1.1.1") is False
    +
    +    def test_space_at_seam(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1 @1.1.1.1") is False
    +
    +    def test_ipv6_loopback_with_backslash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::1]\\@1.1.1.1") is False
    +
    +    def test_ipv4_mapped_ipv6_with_backslash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::ffff:127.0.0.1]\\@1.1.1.1") is False
    +
    +    def test_backslash_with_trailing_port(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\\@1.1.1.1:80") is False
    +
    +    def test_trailing_dot_loopback_with_backslash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1.\\@1.1.1.1") is False
    +
    +    def test_null_byte_in_userinfo(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\x00@1.1.1.1") is False
    +
    +    def test_idn_unicode_host_rejected(self):
    +        """IDN/Unicode hosts are rejected by urllib3 / ASCII guard."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        # Circled-digit homoglyphs of '127' resolve via NFKC to '127' on
    +        # some libcs.  urllib3 currently rejects these via
    +        # LocationParseError; the ASCII-printable guard backs that up.
    +        assert validate_url("http://①②⑦.0.0.1/") is False
    +
    +    def test_octal_ip_resolves_to_loopback(self):
    +        """Octal IP form '0177.0.0.1' resolves to 127.0.0.1 via getaddrinfo."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://0177.0.0.1/") is False
    +
    +    def test_decimal_int_ip_resolves_to_loopback(self):
    +        """Decimal-int IP form '2130706433' resolves to 127.0.0.1."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://2130706433/") is False
    +
    +    def test_post_prepare_canonicalised_form(self):
    +        """
    +        Layer-2 verification: when ``requests.PreparedRequest.url``
    +        canonicalises ``\\`` to ``%5C``, the urllib3-based hostname
    +        extraction still returns ``127.0.0.1`` so the IP check fires.
    +        Layer 1 doesn't match ``%5C`` (it's three printable ASCII chars);
    +        Layer 2 is the load-bearing defence on the SafeSession.send path.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1:6666/%5C@1.1.1.1") is False
    +
    +    def test_backslash_deep_in_path(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://example.com/path\\@1.1.1.1") is False
    +
    +    def test_backslash_in_userinfo_password(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://user:pass\\@127.0.0.1/") is False
    +
    +    def test_backslash_with_port_on_trailing_host(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1\\@evil.com:8080") is False
    +
    +    def test_interior_whitespace_at_seam(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1 \\@1.1.1.1") is False
    +
    +    def test_ipv6_unspecified_blocked(self):
    +        """``::`` is the IPv6 unspecified address — Linux routes
    +        connections to ``[::]:port`` to a service bound on ``[::1]:port``,
    +        so it must be blocked alongside ``0.0.0.0`` (the IPv4 equivalent,
    +        already covered via 0.0.0.0/8 in BLOCKED_IP_RANGES)."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::]/") is False
    +
    +    def test_ipv6_unspecified_zero_form_blocked(self):
    +        """Equivalent representation ``0::`` — must normalise to ``::``
    +        before the IP-range check or this bypasses the literal-string
    +        allow-list in notification_validator."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[0::]/") is False
    +
    +    def test_ipv6_unspecified_full_form_blocked(self):
    +        """Equivalent representation ``0:0:0:0:0:0:0:0``."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[0:0:0:0:0:0:0:0]/") is False
    +
    +
    +class TestDnsResolvedBypass:
    +    """
    +    The validator's load-bearing path for hostnames (not IP literals) is:
    +    1. ``ipaddress.ip_address(hostname)`` raises ``ValueError`` (not an IP)
    +    2. ``socket.getaddrinfo(hostname, ...)`` resolves to one or more IPs
    +    3. Each resolved IP is checked against ``BLOCKED_IP_RANGES``
    +
    +    These tests exercise step 3 directly by mocking ``getaddrinfo``.
    +    """
    +
    +    def test_hostname_resolving_to_loopback_blocked(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +    def test_hostname_resolving_to_rfc1918_blocked(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("10.0.0.5", 0))],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +    def test_hostname_resolving_to_link_local_blocked(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("169.254.1.1", 0))],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +    def test_hostname_resolving_to_aws_metadata_blocked(self):
    +        """Hardcoded AWS metadata block fires even with allow_private_ips."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("169.254.169.254", 0))],
    +        ):
    +            # Even with the most permissive flag, AWS metadata stays blocked.
    +            assert (
    +                validate_url(
    +                    "http://attacker.example.com/", allow_private_ips=True
    +                )
    +                is False
    +            )
    +
    +    def test_multiple_resolved_ips_one_private_blocks(self):
    +        """
    +        DNS returning a public IP first then a private IP must still block.
    +        Round-robin / multi-A-record DNS could otherwise be a bypass.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[
    +                (2, 1, 6, "", ("93.184.216.34", 0)),  # public
    +                (2, 1, 6, "", ("127.0.0.1", 0)),  # private — must block
    +            ],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +    def test_dns_resolution_failure_fails_closed(self):
    +        """``getaddrinfo`` raising ``gaierror`` must return False (not allow)."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch("socket.getaddrinfo", side_effect=socket.gaierror()):
    +            assert validate_url("http://nonexistent.invalid/") is False
    +
    +    def test_ipv6_dns_resolution_to_loopback_blocked(self):
    +        """Hostname resolving to IPv6 ``::1`` must be blocked."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(10, 1, 6, "", ("::1", 0, 0, 0))],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +    def test_dns_resolves_to_ipv4_mapped_ipv6_loopback_blocked(self):
    +        """
    +        Hostname resolving to ``::ffff:127.0.0.1`` (IPv4-mapped IPv6) must
    +        be blocked — exercises the IPv4-mapped unwrap in is_ip_blocked.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(10, 1, 6, "", ("::ffff:127.0.0.1", 0, 0, 0))],
    +        ):
    +            assert validate_url("http://attacker.example.com/") is False
    +
    +
    +class TestAlternateIpFormsBlocked:
    +    """
    +    Alternate textual representations of private IPv4/IPv6 addresses.
    +    On Linux ``getaddrinfo`` accepts most of these and resolves them to
    +    the canonical form, which the IP check then catches.
    +    """
    +
    +    def test_octal_loopback_blocked(self):
    +        """``0177.0.0.1`` → ``127.0.0.1`` via getaddrinfo on Linux."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://0177.0.0.1/") is False
    +
    +    def test_decimal_int_loopback_blocked(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://2130706433/") is False
    +
    +    def test_short_ipv4_form_loopback_blocked(self):
    +        """``127.1`` (short form) → ``127.0.0.1`` via getaddrinfo on Linux."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("127.0.0.1", 0))],
    +        ):
    +            assert validate_url("http://127.1/") is False
    +
    +    def test_ipv4_mapped_ipv6_loopback_literal_blocked(self):
    +        """``[::ffff:127.0.0.1]`` is an IPv4-mapped IPv6 of loopback."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::ffff:127.0.0.1]/") is False
    +
    +    def test_ipv4_mapped_ipv6_rfc1918_literal_blocked(self):
    +        """``[::ffff:10.0.0.1]`` — IPv4-mapped IPv6 of RFC1918."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::ffff:10.0.0.1]/") is False
    +
    +    def test_ipv4_mapped_ipv6_aws_metadata_literal_blocked(self):
    +        """``[::ffff:169.254.169.254]`` — AWS metadata via mapped IPv6."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::ffff:169.254.169.254]/") is False
    +
    +
    +class TestAllowFlagMatrix:
    +    """
    +    Verify ``allow_localhost`` / ``allow_private_ips`` flag combinations
    +    against the new ``::/128`` blocklist entry, and confirm the AWS
    +    metadata hardcoded block holds under all flag combinations.
    +    """
    +
    +    def test_loopback_default_blocked(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1/") is False
    +
    +    def test_loopback_with_allow_localhost(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1/", allow_localhost=True) is True
    +
    +    def test_loopback_with_allow_private_ips(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://127.0.0.1/", allow_private_ips=True) is True
    +
    +    def test_ipv6_loopback_with_allow_localhost(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::1]/", allow_localhost=True) is True
    +
    +    def test_ipv6_unspecified_blocked_even_with_allow_localhost(self):
    +        """
    +        ``::`` is the unspecified address, NOT the loopback address.
    +        Linux happens to route it to local services, but conceptually
    +        ``::`` is "any address" — distinct from ``::1``.
    +        ``allow_localhost`` is therefore conservatively scoped to
    +        ``::1`` and ``127.0.0.0/8`` and does NOT permit ``::``.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::]/", allow_localhost=True) is False
    +
    +    def test_ipv6_unspecified_blocked_even_with_allow_private_ips(self):
    +        """Same reasoning: ``::`` is not in any allowed-range carve-out."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::]/", allow_private_ips=True) is False
    +
    +    def test_aws_metadata_blocked_under_allow_localhost(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert (
    +            validate_url("http://169.254.169.254/", allow_localhost=True)
    +            is False
    +        )
    +
    +    def test_aws_metadata_blocked_under_allow_private_ips(self):
    +        """
    +        Codebase comments call this out as ALWAYS blocked. Locks in that
    +        the most permissive flag still doesn't reach AWS metadata.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert (
    +            validate_url("http://169.254.169.254/", allow_private_ips=True)
    +            is False
    +        )
    +
    +
    +class TestSchemeRejection:
    +    """Non-http(s) schemes must be rejected outright (not just the host check)."""
    +
    +    def test_file_scheme_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("file:///etc/passwd") is False
    +
    +    def test_ftp_scheme_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("ftp://example.com/") is False
    +
    +    def test_gopher_scheme_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("gopher://example.com/") is False
    +
    +    def test_dict_scheme_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("dict://example.com:11211/stat") is False
    +
    +    def test_no_scheme_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("127.0.0.1") is False
    +
    +    def test_scheme_relative_url_rejected(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("//127.0.0.1/") is False
    +
    +    def test_uppercase_https_scheme_accepted(self):
    +        """Schemes are case-insensitive per RFC 3986."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("93.184.216.34", 0))],
    +        ):
    +            assert validate_url("HTTPS://example.com/") is True
    +
    +
    +class TestNeverRaises:
    +    """
    +    Property-based-style robustness: ``validate_url`` is a security
    +    boundary that takes untrusted input. It must NEVER raise — only
    +    return ``True``/``False``. A crash here is a DoS vector.
    +    """
    +
    +    @pytest.mark.parametrize(
    +        "weird_input",
    +        [
    +            "",
    +            " ",
    +            "\x00",
    +            "\x00" * 100,
    +            ":",
    +            "::",
    +            "://",
    +            "http",
    +            "http:",
    +            "http:/",
    +            "http://",
    +            "http:// ",
    +            "http://[",
    +            "http://[::",
    +            "http://]",
    +            "http://@",
    +            "http://@@@",
    +            "http://:@",
    +            "http://:80",
    +            "http://:0",
    +            "http://example.com:99999999",  # overflow port
    +            "http://example.com:-1",  # negative port
    +            "http://%00",
    +            "http://%2F%2F",
    +            "h" * 10_000,
    +            "http://" + "a" * 100_000,  # huge URL
    +            "http://" + "[" * 100,
    +            "http://." + ("a." * 1000) + "com",
    +            "http://example.com/" + "?" * 1000,
    +            "\udcff",  # lone surrogate (Python str-only, raises on encode)
    +            "http://\udcff/",
    +        ],
    +    )
    +    def test_pathological_input_returns_bool(self, weird_input):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        result = validate_url(weird_input)
    +        assert isinstance(result, bool)
    +
    +
    +class TestOutOfScopeBehaviorLockedIn:
    +    """
    +    Behaviours documented as out-of-scope (filed as separate hardening
    +    issues, not a bypass of GHSA-g23j-2vwm-5c25). Tests here lock in the
    +    *current* behaviour so the gap is visible — if we later harden these,
    +    these tests should flip and be moved into the bypass class.
    +    """
    +
    +    def test_6to4_wrapped_loopback_currently_passes(self):
    +        """
    +        ``[2002:7f00:1::]`` is the 6to4 wrap of ``127.0.0.1``. On hosts
    +        with kernel sit0/6to4 routes configured, this routes to ``127.0.0.1``.
    +        Default Linux has no such route, so this is not exploitable in
    +        the default configuration — but the validator does not catch it.
    +        Filed separately.
    +        """
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        # Document current behaviour: passes.  If we ever add the IPv6
    +        # transition prefixes to BLOCKED_IP_RANGES, flip this assertion.
    +        assert validate_url("http://[2002:7f00:1::]/") is True
    +
    +    def test_nat64_wrapped_loopback_currently_passes(self):
    +        """``[64:ff9b::7f00:1]`` is the NAT64 wrap of ``127.0.0.1``."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[64:ff9b::7f00:1]/") is True
    +
    +
    +class TestValidateUrlEdgeCases:
    +    """Robustness: validate_url must never raise, only return bool."""
    +
    +    def test_empty_string_returns_false(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("") is False
    +
    +    def test_whitespace_only_returns_false(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("   ") is False
    +
    +    def test_tab_newline_only_returns_false(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("\t\n") is False
    +
    +    def test_none_returns_false(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url(None) is False
    +
    +    def test_int_returns_false(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url(123) is False
    +
    +    def test_malformed_ipv6_no_crash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        assert validate_url("http://[::") is False
    +
    +    def test_extremely_long_url_no_crash(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        # Should reject (no DNS) but most importantly must not crash or
    +        # consume excessive memory.
    +        with patch(
    +            "socket.getaddrinfo", side_effect=__import__("socket").gaierror()
    +        ):
    +            assert validate_url("http://" + "a" * 100_000) is False
    +
    +
    +class TestLegitimateUrlsStillPass:
    +    """Anti-regression: ensure the fix doesn't reject RFC-legal URLs."""
    +
    +    @staticmethod
    +    def _public_dns_mock():
    +        # 93.184.216.34 is the documented IP for example.com (RFC-2606).
    +        return patch(
    +            "socket.getaddrinfo",
    +            return_value=[(2, 1, 6, "", ("93.184.216.34", 0))],
    +        )
    +
    +    def test_simple_http_url(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com/") is True
    +
    +    def test_explicit_port(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com:8080/") is True
    +
    +    def test_userinfo_is_rfc_legal(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://user:pass@example.com/") is True
    +
    +    def test_userinfo_with_port(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://user:pass@example.com:8080/") is True
    +
    +    def test_trailing_dot_hostname(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com./") is True
    +
    +    def test_path_query_fragment(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com/path?q=1#frag") is True
    +
    +    def test_plus_in_query_string(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com/?q=foo+bar") is True
    +
    +    def test_encoded_backslash_in_path_is_rfc_legal(self):
    +        """%5C in a PATH (not a host bypass) is RFC-legal and must pass."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://example.com/path%5Cfile") is True
    +
    +    def test_encoded_space_in_path(self):
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert (
    +                validate_url("http://example.com/path%20with%20encoded%20space")
    +                is True
    +            )
    +
    +    def test_uppercase_hostname_case_folded(self):
    +        """Locks in case-folding parity between urlparse and urllib3."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        with self._public_dns_mock():
    +            assert validate_url("http://EXAMPLE.COM/") is True
    +
    +    def test_ipv6_public(self):
    +        """IPv6 hosts unwrap from brackets and check correctly."""
    +        from src.local_deep_research.security.ssrf_validator import (
    +            validate_url,
    +        )
    +
    +        # 2001:db8::1 is the documentation prefix; not in any blocked
    +        # range, so this should pass.
    +        assert validate_url("http://[2001:db8::1]/") is True
    +
    +
     class TestDocumentation:
         """Documentation tests explaining the security model."""
     
    

Vulnerability mechanics

Root cause

"Parser differential between urllib.parse.urlparse and urllib3/requests allows SSRF bypass via backslash in the URL authority section."

Attack vector

An attacker crafts a URL such as `http://127.0.0.1\@1.1.1.1`. Python's `urllib.parse.urlparse` treats everything up to the last `@` as userinfo and extracts the host as `1.1.1.1`, which passes the SSRF check. `requests`/`urllib3`, by contrast, treats the backslash as the start of the path and connects to `127.0.0.1` [ref_id=1]. The attacker needs network access to the Local Deep Research instance and the ability to supply URLs for the application to fetch (e.g., via research queries or notification URLs). Only the low privileges of a normal application user are required (CVSS PR:L). Payload variants can use ASCII control bytes or whitespace in place of the backslash, or the percent-encoded form `%5C` that a backslash becomes after `requests.PreparedRequest` processing [patch_id=2974399].
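The differential can be reproduced with the standard library alone. In the minimal sketch below, `validator_host` calls the real `urllib.parse.urlsplit` (which performs the same netloc split as `urlparse`). `client_host` is a hypothetical helper, not urllib3 itself: it approximates urllib3's rule that the authority ends at the first `\`, `/`, `?`, or `#`. With urllib3 installed, `urllib3.util.parse_url(url).host` should agree with `client_host` on this input.

```python
import ipaddress
import re
from typing import Optional
from urllib.parse import urlsplit

# Hypothetical approximation of urllib3's URL regex: after "scheme://",
# the authority runs until the first "\", "/", "?" or "#".
_CLIENT_AUTHORITY = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*://([^\\/?#]*)")


def validator_host(url: str) -> Optional[str]:
    """Host as urllib.parse sees it (what the pre-1.6.10 check inspected)."""
    return urlsplit(url).hostname


def client_host(url: str) -> Optional[str]:
    """Host as a urllib3-style parser sees it (where the request goes)."""
    match = _CLIENT_AUTHORITY.match(url)
    if match is None:
        return None
    # Drop userinfo (everything up to the last "@"), then the port.
    host_port = match.group(1).rpartition("@")[2]
    if host_port.startswith("["):  # bracketed IPv6 literal
        return host_port[1:].partition("]")[0]
    return host_port.partition(":")[0].lower()


url = "http://127.0.0.1\\@1.1.1.1"  # "\\" in the Python literal is one backslash
print(validator_host(url))  # 1.1.1.1   (public, so the SSRF check passes)
print(client_host(url))     # 127.0.0.1 (loopback, the real destination)
print(ipaddress.ip_address(client_host(url)).is_loopback)  # True
```

The two functions agree on ordinary URLs such as `http://user:pass@example.com:8080/`. They disagree only when a character that one parser treats as a delimiter and the other does not appears inside the authority.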

Affected code

The vulnerable code is in `src/local_deep_research/security/ssrf_validator.py` in the `validate_url` function, and in `src/local_deep_research/security/notification_validator.py` in `NotificationURLValidator.validate_service_url` [patch_id=2974399][patch_id=2974400]. Both functions previously used `urllib.parse.urlparse` for host extraction while the actual HTTP request used `urllib3`/`requests`, creating the parser differential [ref_id=1].

What the fix does

The fix applies two layers [patch_id=2974399]. Layer 1 rejects URLs containing backslash, ASCII control bytes, or whitespace upfront via an RFC 3986 forbidden-characters regex. Layer 2 replaces `urllib.parse.urlparse` with `urllib3.util.parse_url` for hostname extraction, so the validator and the HTTP client use the same parser and cannot disagree on destination by construction [ref_id=1]. The same two-layer fix is applied in `NotificationURLValidator.validate_service_url` [patch_id=2974400]. Additionally, the IPv6 unspecified address `::` was added to `BLOCKED_IP_RANGES` because Linux routes `[::]:port` to loopback services just as `0.0.0.0` does [patch_id=2974399].
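The two layers can be sketched as follows. This is not the project's code. `FORBIDDEN_CHARS`, `BLOCKED_NETWORKS`, and `is_url_allowed` are hypothetical names, and the network list is an illustrative subset that includes the newly blocked `::`. To avoid a third-party dependency, the sketch extracts the host with `urllib.parse.urlsplit`. That substitution is safe here only because layer 1 has already rejected the characters that cause the differential. The actual fix uses `urllib3.util.parse_url`, so the validator and the client share one parser. Hostname resolution is also omitted.

```python
import ipaddress
import re
from urllib.parse import urlsplit

# Layer 1 (hypothetical pattern): backslash, ASCII control bytes, space, DEL,
# and any other whitespace. None of these may appear raw in an RFC 3986 URL.
FORBIDDEN_CHARS = re.compile(r"[\\\x00-\x20\x7f\s]")

# Illustrative subset of blocked ranges, not the project's BLOCKED_IP_RANGES.
BLOCKED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "0.0.0.0/8",
        "10.0.0.0/8",
        "127.0.0.0/8",
        "169.254.0.0/16",  # link-local, includes 169.254.169.254 metadata
        "172.16.0.0/12",
        "192.168.0.0/16",
        "::/128",  # unspecified; Linux routes [::]:port to local services
        "::1/128",
        "fc00::/7",
        "fe80::/10",
    )
]


def is_url_allowed(url: str) -> bool:
    # Layer 1: reject differential-prone characters before any parsing.
    if FORBIDDEN_CHARS.search(url):
        return False
    # Layer 2 stand-in: the real fix parses with urllib3.util.parse_url;
    # urlsplit is used here only to keep the sketch dependency-free.
    try:
        parts = urlsplit(url)
        host = parts.hostname
    except ValueError:
        return False
    if parts.scheme not in ("http", "https") or not host:
        return False
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # A hostname, not an IP literal. A real validator would resolve it
        # and check every returned address; that step is omitted here.
        return True
    # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so it hits the IPv4 ranges.
    if isinstance(addr, ipaddress.IPv6Address) and addr.ipv4_mapped is not None:
        addr = addr.ipv4_mapped
    return not any(addr in net for net in BLOCKED_NETWORKS)
```

Under these rules, `http://127.0.0.1\@1.1.1.1` fails at layer 1 and `http://[::]:8080/` fails the range check. `http://example.com/path%5Cfile` still passes, which matches the intent of the `test_encoded_backslash_in_path_is_rfc_legal` test in the diff.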

Preconditions

  • Network: the attacker must have network access to a Local Deep Research instance (CVSS AV:N)
  • Auth: the attacker must be able to supply URLs for the application to fetch (e.g., via research queries or notification URLs); normal authenticated access suffices (CVSS PR:L)
  • Input: the supplied URL must contain a backslash, ASCII control byte, or whitespace in the authority section to trigger the parser differential

Generated on May 28, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

7
