VYPR
Moderate severity · NVD Advisory · Published Apr 15, 2021 · Updated Aug 3, 2024

SSRF in Sydent due to missing validation of hostnames

CVE-2021-29431

Description

Sydent is a reference Matrix identity server. Sydent can be induced to send HTTP GET requests to internal systems, due to lack of parameter validation or IP address blacklisting. It is not possible to exfiltrate data or control request headers, but it might be possible to use the attack to perform an internal port enumeration. This issue has been addressed in commits 9e57334, 8936925, 3d531ed, and 0f00412. A potential workaround would be to use a firewall to ensure that Sydent cannot reach internal HTTP resources.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
matrix-sydent (PyPI) | < 2.3.0 | 2.3.0

Affected products

1

Patches

4
3d531ed50d2f

Add BlacklistingReactor

https://github.com/matrix-org/sydent · Erik Johnston · Apr 6, 2021 · via ghsa
7 files changed · +613 −19
  • matrix_is_test/launcher.py · +3 −0 · modified
    @@ -36,6 +36,9 @@
     templates.path = {testsubject_path}/res
     brand.default = is-test
     
    +
    +ip.whitelist = 127.0.0.1
    +
     [email]
     email.tlsmode = 0
     email.invite.subject = %(sender_display_name)s has invited you to chat
    
  • sydent/http/blacklisting_reactor.py · +155 −0 · added
    @@ -0,0 +1,155 @@
    +# -*- coding: utf-8 -*-
    +# Copyright 2021 The Matrix.org Foundation C.I.C.
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +import logging
    +from typing import (
    +    Any,
    +    List,
    +    Optional,
    +)
    +
    +from zope.interface import implementer, provider
    +from netaddr import IPAddress, IPSet
    +
    +from twisted.internet.address import IPv4Address, IPv6Address
    +from twisted.internet.interfaces import (
    +    IAddress,
    +    IHostResolution,
    +    IReactorPluggableNameResolver,
    +    IResolutionReceiver,
    +)
    +
    +
    +logger = logging.getLogger(__name__)
    +
    +
    +def check_against_blacklist(
    +    ip_address: IPAddress, ip_whitelist: Optional[IPSet], ip_blacklist: IPSet
    +) -> bool:
    +    """
    +    Compares an IP address to allowed and disallowed IP sets.
    +
    +    Args:
    +        ip_address: The IP address to check
    +        ip_whitelist: Allowed IP addresses.
    +        ip_blacklist: Disallowed IP addresses.
    +
    +    Returns:
    +        True if the IP address is in the blacklist and not in the whitelist.
    +    """
    +    if ip_address in ip_blacklist:
    +        if ip_whitelist is None or ip_address not in ip_whitelist:
    +            return True
    +    return False
    +
    +
    +class _IPBlacklistingResolver:
    +    """
    +    A proxy for reactor.nameResolver which only produces non-blacklisted IP
    +    addresses, preventing DNS rebinding attacks on URL preview.
    +    """
    +
    +    def __init__(
    +        self,
    +        reactor: IReactorPluggableNameResolver,
    +        ip_whitelist: Optional[IPSet],
    +        ip_blacklist: IPSet,
    +    ):
    +        """
    +        Args:
    +            reactor: The twisted reactor.
    +            ip_whitelist: IP addresses to allow.
    +            ip_blacklist: IP addresses to disallow.
    +        """
    +        self._reactor = reactor
    +        self._ip_whitelist = ip_whitelist
    +        self._ip_blacklist = ip_blacklist
    +
    +    def resolveHostName(
    +        self, recv: IResolutionReceiver, hostname: str, portNumber: int = 0
    +    ) -> IResolutionReceiver:
    +        addresses = []  # type: List[IAddress]
    +
    +        def _callback() -> None:
    +            has_bad_ip = False
    +            for address in addresses:
    +                # We only expect IPv4 and IPv6 addresses since only A/AAAA lookups
    +                # should go through this path.
    +                if not isinstance(address, (IPv4Address, IPv6Address)):
    +                    continue
    +
    +                ip_address = IPAddress(address.host)
    +
    +                if check_against_blacklist(
    +                    ip_address, self._ip_whitelist, self._ip_blacklist
    +                ):
    +                    logger.info(
    +                        "Dropped %s from DNS resolution to %s due to blacklist"
    +                        % (ip_address, hostname)
    +                    )
    +                    has_bad_ip = True
    +
    +            # if we have a blacklisted IP, we'd like to raise an error to block the
    +            # request, but all we can really do from here is claim that there were no
    +            # valid results.
    +            if not has_bad_ip:
    +                for address in addresses:
    +                    recv.addressResolved(address)
    +            recv.resolutionComplete()
    +
    +        @provider(IResolutionReceiver)
    +        class EndpointReceiver:
    +            @staticmethod
    +            def resolutionBegan(resolutionInProgress: IHostResolution) -> None:
    +                recv.resolutionBegan(resolutionInProgress)
    +
    +            @staticmethod
    +            def addressResolved(address: IAddress) -> None:
    +                addresses.append(address)
    +
    +            @staticmethod
    +            def resolutionComplete() -> None:
    +                _callback()
    +
    +        self._reactor.nameResolver.resolveHostName(
    +            EndpointReceiver, hostname, portNumber=portNumber
    +        )
    +
    +        return recv
    +
    +
    +@implementer(IReactorPluggableNameResolver)
    +class BlacklistingReactorWrapper:
    +    """
    +    A Reactor wrapper which will prevent DNS resolution to blacklisted IP
    +    addresses, to prevent DNS rebinding.
    +    """
    +
    +    def __init__(
    +        self,
    +        reactor: IReactorPluggableNameResolver,
    +        ip_whitelist: Optional[IPSet],
    +        ip_blacklist: IPSet,
    +    ):
    +        self._reactor = reactor
    +
    +        # We need to use a DNS resolver which filters out blacklisted IP
    +        # addresses, to prevent DNS rebinding.
    +        self.nameResolver = _IPBlacklistingResolver(
    +            self._reactor, ip_whitelist, ip_blacklist
    +        )
    +
    +    def __getattr__(self, attr: str) -> Any:
    +        # Passthrough to the real reactor except for the DNS resolver.
    +        return getattr(self._reactor, attr)
    
  • sydent/http/httpclient.py · +13 −4 · modified
    @@ -22,8 +22,9 @@
     from twisted.internet import defer
     from twisted.web.client import FileBodyProducer, Agent, readBody
     from twisted.web.http_headers import Headers
    -from sydent.http.matrixfederationagent import MatrixFederationAgent
     
    +from sydent.http.blacklisting_reactor import BlacklistingReactorWrapper
    +from sydent.http.matrixfederationagent import MatrixFederationAgent
     from sydent.http.federation_tls_options import ClientTLSOptionsFactory
     from sydent.http.httpcommon import BodyExceededMaxSize, read_body_with_max_size
     
    @@ -116,7 +117,11 @@ def __init__(self, sydent):
             # BrowserLikePolicyForHTTPS context factory which will do regular cert validation
             # 'like a browser'
             self.agent = Agent(
    -            self.sydent.reactor,
    +            BlacklistingReactorWrapper(
    +                reactor=self.sydent.reactor,
    +                ip_whitelist=sydent.ip_whitelist,
    +                ip_blacklist=sydent.ip_blacklist,
    +            ),
                 connectTimeout=15,
             )
     
    @@ -127,6 +132,10 @@ class FederationHttpClient(HTTPClient):
         def __init__(self, sydent):
             self.sydent = sydent
             self.agent = MatrixFederationAgent(
    -            self.sydent.reactor,
    -            ClientTLSOptionsFactory(sydent.cfg),
    +            BlacklistingReactorWrapper(
    +                reactor=self.sydent.reactor,
    +                ip_whitelist=sydent.ip_whitelist,
    +                ip_blacklist=sydent.ip_blacklist,
    +            ),
    +            ClientTLSOptionsFactory(sydent.cfg) if sydent.use_tls_for_federation else None,
             )
    
  • sydent/sydent.py · +39 −1 · modified
    @@ -24,6 +24,7 @@
     import logging
     import logging.handlers
     import os
    +from typing import Set
     
     import twisted.internet.reactor
     from twisted.internet import task
    @@ -46,6 +47,7 @@
     
     from sydent.util.hash import sha256_and_url_safe_base64
     from sydent.util.tokenutils import generateAlphanumericTokenOfLength
    +from sydent.util.ip_range import generate_ip_set, DEFAULT_IP_RANGE_BLACKLIST
     
     from sydent.sign.ed25519 import SydentEd25519
     
    @@ -107,6 +109,26 @@
             # Whether clients and homeservers can register an association using v1 endpoints.
             'enable_v1_associations': 'true',
             'delete_tokens_on_bind': 'true',
    +
    +        # Prevent outgoing requests from being sent to the following blacklisted
    +        # IP address CIDR ranges. If this option is not specified or empty then
    +        # it defaults to private IP address ranges.
    +        #
    +        # The blacklist applies to all outbound requests except replication
    +        # requests.
    +        #
    +        # (0.0.0.0 and :: are always blacklisted, whether or not they are
    +        # explicitly listed here, since they correspond to unroutable
    +        # addresses.)
    +        'ip.blacklist': '',
    +
    +        # List of IP address CIDR ranges that should be allowed for outbound
    +        # requests. This is useful for specifying exceptions to wide-ranging
    +        # blacklisted target IP ranges.
    +        #
    +        # This whitelist overrides `ip.blacklist` and defaults to an empty
    +        # list.
    +        'ip.whitelist': '',
         },
         'db': {
             'db.file': os.environ.get('SYDENT_DB_PATH', 'sydent.db'),
    @@ -183,9 +205,10 @@
     
     
     class Sydent:
    -    def __init__(self, cfg, reactor=twisted.internet.reactor):
    +    def __init__(self, cfg, reactor=twisted.internet.reactor, use_tls_for_federation=True):
             self.reactor = reactor
             self.config_file = get_config_file_path()
    +        self.use_tls_for_federation = use_tls_for_federation
     
             self.cfg = cfg
     
    @@ -241,6 +264,15 @@ def __init__(self, cfg, reactor=twisted.internet.reactor):
                 self.cfg.get("general", "delete_tokens_on_bind")
             )
     
    +        ip_blacklist = set_from_comma_sep_string(self.cfg.get("general", "ip.blacklist"))
    +        if not ip_blacklist:
    +            ip_blacklist = DEFAULT_IP_RANGE_BLACKLIST
    +
    +        ip_whitelist = set_from_comma_sep_string(self.cfg.get("general", "ip.whitelist"))
    +
    +        self.ip_blacklist = generate_ip_set(ip_blacklist)
    +        self.ip_whitelist = generate_ip_set(ip_whitelist)
    +
             self.default_web_client_location = self.cfg.get(
                 "email", "email.default_web_client_location"
             )
    @@ -512,6 +544,12 @@ def parse_cfg_bool(value):
         return value.lower() == "true"
     
     
    +def set_from_comma_sep_string(rawstr: str) -> Set[str]:
    +    if rawstr == '':
    +        return set()
    +    return {x.strip() for x in rawstr.split(',')}
    +
    +
     def run_gc():
         threshold = gc.get_threshold()
         counts = gc.get_count()
    
  • sydent/util/ip_range.py · +118 −0 · added
    @@ -0,0 +1,118 @@
    +#  Copyright 2021 The Matrix.org Foundation C.I.C.
    +#
    +#  Licensed under the Apache License, Version 2.0 (the "License");
    +#  you may not use this file except in compliance with the License.
    +#  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +#  limitations under the License.
    +
    +import itertools
    +from typing import Iterable, Optional
    +
    +from netaddr import AddrFormatError, IPNetwork, IPSet
    +
    +# IP ranges that are considered private / unroutable / don't make sense.
    +DEFAULT_IP_RANGE_BLACKLIST = [
    +    # Localhost
    +    "127.0.0.0/8",
    +    # Private networks.
    +    "10.0.0.0/8",
    +    "172.16.0.0/12",
    +    "192.168.0.0/16",
    +    # Carrier grade NAT.
    +    "100.64.0.0/10",
    +    # Address registry.
    +    "192.0.0.0/24",
    +    # Link-local networks.
    +    "169.254.0.0/16",
    +    # Formerly used for 6to4 relay.
    +    "192.88.99.0/24",
    +    # Testing networks.
    +    "198.18.0.0/15",
    +    "192.0.2.0/24",
    +    "198.51.100.0/24",
    +    "203.0.113.0/24",
    +    # Multicast.
    +    "224.0.0.0/4",
    +    # Localhost
    +    "::1/128",
    +    # Link-local addresses.
    +    "fe80::/10",
    +    # Unique local addresses.
    +    "fc00::/7",
    +    # Testing networks.
    +    "2001:db8::/32",
    +    # Multicast.
    +    "ff00::/8",
    +    # Site-local addresses
    +    "fec0::/10",
    +]
    +
    +
    +def generate_ip_set(
    +    ip_addresses: Optional[Iterable[str]],
    +    extra_addresses: Optional[Iterable[str]] = None,
    +    config_path: Optional[Iterable[str]] = None,
    +) -> IPSet:
    +    """
    +    Generate an IPSet from a list of IP addresses or CIDRs.
    +
    +    Additionally, for each IPv4 network in the list of IP addresses, also
    +    includes the corresponding IPv6 networks.
    +
    +    This includes:
    +
    +    * IPv4-Compatible IPv6 Address (see RFC 4291, section 2.5.5.1)
    +    * IPv4-Mapped IPv6 Address (see RFC 4291, section 2.5.5.2)
    +    * 6to4 Address (see RFC 3056, section 2)
    +
    +    Args:
    +        ip_addresses: An iterable of IP addresses or CIDRs.
    +        extra_addresses: An iterable of IP addresses or CIDRs.
    +        config_path: The path in the configuration for error messages.
    +
    +    Returns:
    +        A new IP set.
    +    """
    +    result = IPSet()
    +    for ip in itertools.chain(ip_addresses or (), extra_addresses or ()):
    +        try:
    +            network = IPNetwork(ip)
    +        except AddrFormatError as e:
    +            raise Exception(
    +                "Invalid IP range provided: %s." % (ip,), config_path
    +            ) from e
    +        result.add(network)
    +
    +        # It is possible that these already exist in the set, but that's OK.
    +        if ":" not in str(network):
    +            result.add(IPNetwork(network).ipv6(ipv4_compatible=True))
    +            result.add(IPNetwork(network).ipv6(ipv4_compatible=False))
    +            result.add(_6to4(network))
    +
    +    return result
    +
    +
    +def _6to4(network: IPNetwork) -> IPNetwork:
    +    """Convert an IPv4 network into a 6to4 IPv6 network per RFC 3056."""
    +
    +    # 6to4 networks consist of:
    +    # * 2002 as the first 16 bits
    +    # * The first IPv4 address in the network hex-encoded as the next 32 bits
    +    # * The new prefix length needs to include the bits from the 2002 prefix.
    +    hex_network = hex(network.first)[2:]
    +    hex_network = ("0" * (8 - len(hex_network))) + hex_network
    +    return IPNetwork(
    +        "2002:%s:%s::/%d"
    +        % (
    +            hex_network[:4],
    +            hex_network[4:],
    +            16 + network.prefixlen,
    +        )
    +    )
    
  • tests/test_blacklisting.py · +243 −0 · added
    @@ -0,0 +1,243 @@
    +#  Copyright 2021 The Matrix.org Foundation C.I.C.
    +#
    +#  Licensed under the Apache License, Version 2.0 (the "License");
    +#  you may not use this file except in compliance with the License.
    +#  You may obtain a copy of the License at
    +#
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +#
    +#  Unless required by applicable law or agreed to in writing, software
    +#  distributed under the License is distributed on an "AS IS" BASIS,
    +#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +#  See the License for the specific language governing permissions and
    +#  limitations under the License.
    +
    +
    +from mock import patch
    +from netaddr import IPSet
    +from twisted.internet import defer
    +from twisted.internet.error import DNSLookupError
    +from twisted.test.proto_helpers import StringTransport
    +from twisted.trial.unittest import TestCase
    +from twisted.web.client import Agent
    +
    +from sydent.http.blacklisting_reactor import BlacklistingReactorWrapper
    +from sydent.http.srvresolver import Server
    +from tests.utils import make_request, make_sydent
    +
    +
    +class BlacklistingAgentTest(TestCase):
    +    def setUp(self):
    +        config = {
    +            "general": {
    +                "ip.blacklist": "5.0.0.0/8",
    +                "ip.whitelist": "5.1.1.1",
    +            },
    +        }
    +
    +        self.sydent = make_sydent(test_config=config)
    +
    +        self.reactor = self.sydent.reactor
    +
    +        self.safe_domain, self.safe_ip = b"safe.test", b"1.2.3.4"
    +        self.unsafe_domain, self.unsafe_ip = b"danger.test", b"5.6.7.8"
    +        self.allowed_domain, self.allowed_ip = b"allowed.test", b"5.1.1.1"
    +
    +        # Configure the reactor's DNS resolver.
    +        for (domain, ip) in (
    +            (self.safe_domain, self.safe_ip),
    +            (self.unsafe_domain, self.unsafe_ip),
    +            (self.allowed_domain, self.allowed_ip),
    +        ):
    +            self.reactor.lookups[domain.decode()] = ip.decode()
    +            self.reactor.lookups[ip.decode()] = ip.decode()
    +
    +        self.ip_whitelist = self.sydent.ip_whitelist
    +        self.ip_blacklist = self.sydent.ip_blacklist
    +
    +    def test_reactor(self):
    +        """Apply the blacklisting reactor and ensure it properly blocks
    +        connections to particular domains and IPs.
    +        """
    +        agent = Agent(
    +            BlacklistingReactorWrapper(
    +                self.reactor,
    +                ip_whitelist=self.ip_whitelist,
    +                ip_blacklist=self.ip_blacklist,
    +            ),
    +        )
    +
    +        # The unsafe domains and IPs should be rejected.
    +        for domain in (self.unsafe_domain, self.unsafe_ip):
    +            self.failureResultOf(
    +                agent.request(b"GET", b"http://" + domain), DNSLookupError
    +            )
    +
    +        self.reactor.tcpClients = []
    +
    +        # The safe domains IPs should be accepted.
    +        for domain in (
    +            self.safe_domain,
    +            self.allowed_domain,
    +            self.safe_ip,
    +            self.allowed_ip,
    +        ):
    +            agent.request(b"GET", b"http://" + domain)
    +
    +            # Grab the latest TCP connection.
    +            (
    +                host,
    +                port,
    +                client_factory,
    +                _timeout,
    +                _bindAddress,
    +            ) = self.reactor.tcpClients.pop()
    +
    +    @patch("sydent.http.srvresolver.SrvResolver.resolve_service")
    +    def test_federation_client_allowed_ip(self, resolver):
    +        self.sydent.run()
    +
    +        request, channel = make_request(
    +            self.sydent.reactor,
    +            "POST",
    +            "/_matrix/identity/v2/account/register",
    +            {
    +                "access_token": "foo",
    +                "expires_in": 300,
    +                "matrix_server_name": "example.com",
    +                "token_type": "Bearer",
    +            },
    +        )
    +
    +        resolver.return_value = defer.succeed(
    +            [
    +                Server(
    +                    host=self.allowed_domain,
    +                    port=443,
    +                    priority=1,
    +                    weight=1,
    +                    expires=100,
    +                )
    +            ]
    +        )
    +
    +        request.render(self.sydent.servlets.registerServlet)
    +
    +        transport, protocol = self._get_http_request(
    +            self.allowed_ip.decode("ascii"), 443
    +        )
    +
    +        self.assertRegex(
    +            transport.value(), b"^GET /_matrix/federation/v1/openid/userinfo"
    +        )
    +        self.assertRegex(transport.value(), b"Host: example.com")
    +
    +        # Send it the HTTP response
    +        res_json = '{ "sub": "@test:example.com" }'.encode("ascii")
    +        protocol.dataReceived(
    +            b"HTTP/1.1 200 OK\r\n"
    +            b"Server: Fake\r\n"
    +            b"Content-Type: application/json\r\n"
    +            b"Content-Length: %i\r\n"
    +            b"\r\n"
    +            b"%s" % (len(res_json), res_json)
    +        )
    +
    +        self.assertEqual(channel.code, 200)
    +
    +    @patch("sydent.http.srvresolver.SrvResolver.resolve_service")
    +    def test_federation_client_safe_ip(self, resolver):
    +        self.sydent.run()
    +
    +        request, channel = make_request(
    +            self.sydent.reactor,
    +            "POST",
    +            "/_matrix/identity/v2/account/register",
    +            {
    +                "access_token": "foo",
    +                "expires_in": 300,
    +                "matrix_server_name": "example.com",
    +                "token_type": "Bearer",
    +            },
    +        )
    +
    +        resolver.return_value = defer.succeed(
    +            [
    +                Server(
    +                    host=self.safe_domain,
    +                    port=443,
    +                    priority=1,
    +                    weight=1,
    +                    expires=100,
    +                )
    +            ]
    +        )
    +
    +        request.render(self.sydent.servlets.registerServlet)
    +
    +        transport, protocol = self._get_http_request(self.safe_ip.decode("ascii"), 443)
    +
    +        self.assertRegex(
    +            transport.value(), b"^GET /_matrix/federation/v1/openid/userinfo"
    +        )
    +        self.assertRegex(transport.value(), b"Host: example.com")
    +
    +        # Send it the HTTP response
    +        res_json = '{ "sub": "@test:example.com" }'.encode("ascii")
    +        protocol.dataReceived(
    +            b"HTTP/1.1 200 OK\r\n"
    +            b"Server: Fake\r\n"
    +            b"Content-Type: application/json\r\n"
    +            b"Content-Length: %i\r\n"
    +            b"\r\n"
    +            b"%s" % (len(res_json), res_json)
    +        )
    +
    +        self.assertEqual(channel.code, 200)
    +
    +    @patch("sydent.http.srvresolver.SrvResolver.resolve_service")
    +    def test_federation_client_unsafe_ip(self, resolver):
    +        self.sydent.run()
    +
    +        request, channel = make_request(
    +            self.sydent.reactor,
    +            "POST",
    +            "/_matrix/identity/v2/account/register",
    +            {
    +                "access_token": "foo",
    +                "expires_in": 300,
    +                "matrix_server_name": "example.com",
    +                "token_type": "Bearer",
    +            },
    +        )
    +
    +        resolver.return_value = defer.succeed(
    +            [
    +                Server(
    +                    host=self.unsafe_domain,
    +                    port=443,
    +                    priority=1,
    +                    weight=1,
    +                    expires=100,
    +                )
    +            ]
    +        )
    +
    +        request.render(self.sydent.servlets.registerServlet)
    +
    +        self.assertNot(self.reactor.tcpClients)
    +
    +        self.assertEqual(channel.code, 500)
    +
    +    def _get_http_request(self, expected_host, expected_port):
    +        clients = self.reactor.tcpClients
    +        (host, port, factory, _timeout, _bindAddress) = clients[-1]
    +        self.assertEqual(host, expected_host)
    +        self.assertEqual(port, expected_port)
    +
    +        # complete the connection and wire it up to a fake transport
    +        protocol = factory.buildProtocol(None)
    +        transport = StringTransport()
    +        protocol.makeConnection(transport)
    +
    +        return transport, protocol
    
  • tests/utils.py · +42 −14 · modified
    @@ -2,9 +2,19 @@
     from io import BytesIO
     import logging
     import os
    -
    +from typing import Dict
     import attr
     from six import text_type
    +from zope.interface import implementer
    +from twisted.internet._resolver import SimpleResolverComplexifier
    +from twisted.internet.defer import fail, succeed
    +from twisted.internet.error import DNSLookupError
    +from twisted.internet.interfaces import (
    +    IHostnameResolver,
    +    IReactorPluggableNameResolver,
    +    IResolverSimple,
    +)
    +
     from twisted.internet import address
     import twisted.logger
     from twisted.web.http_headers import Headers
    @@ -53,13 +63,13 @@ def make_sydent(test_config={}):
         # Use an in-memory SQLite database. Note that the database isn't cleaned up between
         # tests, so by default the same database will be used for each test if changed to be
         # a file on disk.
    -    if 'db' not in test_config:
    -        test_config['db'] = {'db.file': ':memory:'}
    +    if "db" not in test_config:
    +        test_config["db"] = {"db.file": ":memory:"}
         else:
    -        test_config['db'].setdefault('db.file', ':memory:')
    +        test_config["db"].setdefault("db.file", ":memory:")
     
    -    reactor = MemoryReactorClock()
    -    return Sydent(reactor=reactor, cfg=parse_config_dict(test_config))
    +    reactor = ResolvingMemoryReactorClock()
    +    return Sydent(reactor=reactor, cfg=parse_config_dict(test_config), use_tls_for_federation=False)
     
     
     @attr.s
    @@ -149,6 +159,7 @@ def getPeerCertificate(self):
     
     class FakeSite:
         """A fake Twisted Web Site."""
    +
         pass
     
     
    @@ -191,10 +202,7 @@ def make_request(
             path = path.encode("ascii")
     
         # Decorate it to be the full path, if we're using shorthand
    -    if (
    -        shorthand
    -        and not path.startswith(b"/_matrix")
    -    ):
    +    if shorthand and not path.startswith(b"/_matrix"):
             path = b"/_matrix/identity/v2/" + path
             path = path.replace(b"//", b"/")
     
    @@ -253,10 +261,7 @@ def setup_logging():
         """
         root_logger = logging.getLogger()
     
    -    log_format = (
    -        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s"
    -        " - %(message)s"
    -    )
    +    log_format = "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s" " - %(message)s"
     
         handler = ToTwistedHandler()
         formatter = logging.Formatter(log_format)
    @@ -268,3 +273,26 @@ def setup_logging():
     
     
     setup_logging()
    +
    +
    +@implementer(IReactorPluggableNameResolver)
    +class ResolvingMemoryReactorClock(MemoryReactorClock):
    +    """
    +    A MemoryReactorClock that supports name resolution.
    +    """
    +
    +    def __init__(self):
    +        lookups = self.lookups = {}  # type: Dict[str, str]
    +
    +        @implementer(IResolverSimple)
    +        class FakeResolver:
    +            def getHostByName(self, name, timeout=None):
    +                if name not in lookups:
    +                    return fail(DNSLookupError("OH NO: unknown %s" % (name,)))
    +                return succeed(lookups[name])
    +
    +        self.nameResolver = SimpleResolverComplexifier(FakeResolver())
    +        super().__init__()
    +
    +    def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
    +        raise NotImplementedError()
    
0f00412017f2

Allow IPv6 literals as Matrix server names.

https://github.com/matrix-org/sydent · Denis Kasak · Apr 6, 2021 · via ghsa
6 files changed · +108 −47
  • sydent/hs_federation/verifier.py · +3 −3 · modified
    @@ -25,7 +25,7 @@
     from signedjson.sign import SignatureVerifyException
     
     from sydent.http.httpclient import FederationHttpClient
    -from sydent.util.stringutils import is_valid_hostname
    +from sydent.util.stringutils import is_valid_matrix_server_name
     
     
     logger = logging.getLogger(__name__)
    @@ -205,8 +205,8 @@ def strip_quotes(value):
             if not json_request["signatures"]:
                 raise NoAuthenticationError("Missing X-Matrix Authorization header")
     
    -        if not is_valid_hostname(json_request["origin"]):
    -            raise InvalidServerName("X-Matrix header's origin parameter must be a valid hostname")
    +        if not is_valid_matrix_server_name(json_request["origin"]):
    +            raise InvalidServerName("X-Matrix header's origin parameter must be a valid Matrix server name")
     
             yield self.verifyServerSignedJson(json_request, [origin])
     
    
  • sydent/http/servlets/registerservlet.py · +4 −4 · modified
    @@ -25,7 +25,7 @@
     from sydent.http.servlets import get_args, jsonwrap, deferjsonwrap, send_cors
     from sydent.http.httpclient import FederationHttpClient
     from sydent.users.tokens import issueToken
    -from sydent.util.stringutils import is_valid_hostname
    +from sydent.util.stringutils import is_valid_matrix_server_name
     
     logger = logging.getLogger(__name__)
     
    @@ -49,11 +49,11 @@ def render_POST(self, request):
     
             matrix_server = args['matrix_server_name'].lower()
     
    -        if not is_valid_hostname(matrix_server):
    +        if not is_valid_matrix_server_name(matrix_server):
                 request.setResponseCode(400)
                 return {
                     'errcode': 'M_INVALID_PARAM',
    -                'error': 'matrix_server_name must be a valid hostname'
    +                'error': 'matrix_server_name must be a valid Matrix server name (IP address or hostname)'
                 }
     
             result = yield self.client.get_json(
    @@ -89,7 +89,7 @@ def render_POST(self, request):
     
             user_id_server = user_id_components[1]
     
    -        if not is_valid_hostname(user_id_server):
    +        if not is_valid_matrix_server_name(user_id_server):
                 request.setResponseCode(500)
                 return {
                     'errcode': 'M_UNKNOWN',
    
  • sydent/threepid/bind.py · +4 −4 · modified
    @@ -32,7 +32,7 @@
     
     from sydent.threepid import ThreepidAssociation
     
    -from sydent.util.stringutils import is_valid_hostname
    +from sydent.util.stringutils import is_valid_matrix_server_name
     
     from twisted.internet import defer
     
    @@ -143,9 +143,9 @@ def _notify(self, assoc, attempt):
     
             matrix_server = mxid_parts[1]
     
    -        if not is_valid_hostname(matrix_server):
    +        if not is_valid_matrix_server_name(matrix_server):
                 logger.error(
    -                "MXID server part '%s' not a valid hostname. Not retrying.",
    +                "MXID server part '%s' not a valid Matrix server name. Not retrying.",
                     matrix_server,
                 )
                 return
    @@ -184,7 +184,7 @@ def _notify(self, assoc, attempt):
                         "Successfully deleted invite for %s from the store",
                         assoc["address"],
                     )
    -            except Exception as e:
    +            except Exception:
                     logger.exception(
                         "Couldn't remove invite for %s from the store",
                         assoc["address"],
    
  • sydent/util/stringutils.py (+67 -18, modified)
    @@ -13,11 +13,14 @@
     # See the License for the specific language governing permissions and
     # limitations under the License.
     import re
    +from typing import Optional, Tuple
    +
    +from twisted.internet.abstract import isIPAddress, isIPv6Address
     
     # https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
     client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")
     
    -# hostname/domain name + optional port
    +# hostname/domain name
     # https://regex101.com/r/OyN1lg/2
     hostname_regex = re.compile(
         r"^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?)(?:\.[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?)*$",
    @@ -37,12 +40,10 @@ def is_valid_client_secret(client_secret):
     
     
     def is_valid_hostname(string: str) -> bool:
    -    """Validate that a given string is a valid hostname or domain name, with an
    -    optional port number.
    +    """Validate that a given string is a valid hostname or domain name.
     
         For domain names, this only validates that the form is right (for
    -    instance, it doesn't check that the TLD is valid). If a port is
    -    specified, it has to be a valid port number.
    +    instance, it doesn't check that the TLD is valid).
     
         :param string: The string to validate
         :type string: str
    @@ -51,20 +52,68 @@ def is_valid_hostname(string: str) -> bool:
         :rtype: bool
         """
     
    -    host_parts = string.split(":", 1)
    +    return hostname_regex.match(string) is not None
    +
    +
    +def parse_server_name(server_name: str) -> Tuple[str, Optional[int]]:
    +    """Split a server name into host/port parts.
     
    -    if len(host_parts) == 1:
    -        return hostname_regex.match(string) is not None
    -    else:
    -        host, port = host_parts
    -        valid_hostname = hostname_regex.match(host) is not None
    +    No validation is done on the host part. The port part is validated to be
    +    a valid port number.
     
    -        try:
    +    Args:
    +        server_name: server name to parse
    +
    +    Returns:
    +        host/port parts.
    +
    +    Raises:
    +        ValueError if the server name could not be parsed.
    +    """
    +    try:
    +        if server_name[-1] == "]":
    +            # ipv6 literal, hopefully
    +            return server_name, None
    +
    +        host_port = server_name.rsplit(":", 1)
    +        host = host_port[0]
    +        port = host_port[1] if host_port[1:] else None
    +
    +        if port:
                 port_num = int(port)
    -            valid_port = (
    -                port == str(port_num)  # exclude things like '08090' or ' 8090'
    -                and 1 <= port_num < 65536)
    -        except ValueError:
    -            valid_port = False
     
    -        return valid_hostname and valid_port
    +            # exclude things like '08090' or ' 8090'
    +            if port != str(port_num) or not (1 <= port_num < 65536):
    +                raise ValueError("Invalid port")
    +
    +        return host, port
    +    except Exception:
    +        raise ValueError("Invalid server name '%s'" % server_name)
    +
    +
    +def is_valid_matrix_server_name(string: str) -> bool:
    +    """Validate that the given string is a valid Matrix server name.
    +
    +    A string is a valid Matrix server name if it is one of the following, plus
    +    an optional port:
    +
    +    a. IPv4 address
    +    b. IPv6 literal (`[IPV6_ADDRESS]`)
    +    c. A valid hostname
    +
    +    :param string: The string to validate
    +    :type string: str
    +
    +    :return: Whether the input is a valid Matrix server name
    +    :rtype: bool
    +    """
    +
    +    try:
    +        host, port = parse_server_name(string)
    +    except ValueError:
    +        return False
    +
    +    valid_ipv4_addr = isIPAddress(host)
    +    valid_ipv6_literal = host[0] == "[" and host[-1] == "]" and isIPv6Address(host[1:-1])
    +
    +    return valid_ipv4_addr or valid_ipv6_literal or is_valid_hostname(host)
    
  • tests/test_register.py (+1 -0, modified)
    @@ -21,6 +21,7 @@
     
     class RegisterTestCase(unittest.TestCase):
         """Tests Sydent's register servlet"""
    +
         def setUp(self):
             # Create a new sydent
             self.sydent = make_sydent()
    
  • tests/test_util.py (+29 -18, modified)
    @@ -1,26 +1,37 @@
     from twisted.trial import unittest
    -from sydent.util.stringutils import is_valid_hostname
    +from sydent.util.stringutils import is_valid_matrix_server_name
     
     
     class UtilTests(unittest.TestCase):
         """Tests Sydent utility functions."""
    -    def test_is_valid_hostname(self):
    -        """Tests that the is_valid_hostname function accepts only valid
    -        hostnames (or domain names), with optional port number.
    +
    +    def test_is_valid_matrix_server_name(self):
    +        """Tests that the is_valid_matrix_server_name function accepts only
    +        valid hostnames (or domain names), with optional port number.
             """
    +        self.assertTrue(is_valid_matrix_server_name("9.9.9.9"))
    +        self.assertTrue(is_valid_matrix_server_name("9.9.9.9:4242"))
    +        self.assertTrue(is_valid_matrix_server_name("[::]"))
    +        self.assertTrue(is_valid_matrix_server_name("[::]:4242"))
    +        self.assertTrue(is_valid_matrix_server_name("[a:b:c::]:4242"))
    +
    +        self.assertTrue(is_valid_matrix_server_name("example.com"))
    +        self.assertTrue(is_valid_matrix_server_name("EXAMPLE.COM"))
    +        self.assertTrue(is_valid_matrix_server_name("ExAmPlE.CoM"))
    +        self.assertTrue(is_valid_matrix_server_name("example.com:4242"))
    +        self.assertTrue(is_valid_matrix_server_name("localhost"))
    +        self.assertTrue(is_valid_matrix_server_name("localhost:9000"))
    +        self.assertTrue(is_valid_matrix_server_name("a.b.c.d:1234"))
     
    -        self.assertTrue(is_valid_hostname("example.com"))
    -        self.assertTrue(is_valid_hostname("EXAMPLE.COM"))
    -        self.assertTrue(is_valid_hostname("ExAmPlE.CoM"))
    -        self.assertTrue(is_valid_hostname("example.com:4242"))
    -        self.assertTrue(is_valid_hostname("localhost"))
    -        self.assertTrue(is_valid_hostname("localhost:9000"))
    -        self.assertTrue(is_valid_hostname("a.b:1234"))
    +        self.assertFalse(is_valid_matrix_server_name("[:::]"))
    +        self.assertFalse(is_valid_matrix_server_name("a:b:c::"))
     
    -        self.assertFalse(is_valid_hostname("example.com:65536"))
    -        self.assertFalse(is_valid_hostname("example.com:0"))
    -        self.assertFalse(is_valid_hostname("example.com:a"))
    -        self.assertFalse(is_valid_hostname("example.com:04242"))
    -        self.assertFalse(is_valid_hostname("example.com: 4242"))
    -        self.assertFalse(is_valid_hostname("example.com/example.com"))
    -        self.assertFalse(is_valid_hostname("example.com#example.com"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com:65536"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com:0"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com:-1"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com:a"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com: "))
    +        self.assertFalse(is_valid_matrix_server_name("example.com:04242"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com: 4242"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com/example.com"))
    +        self.assertFalse(is_valid_matrix_server_name("example.com#example.com"))
    
8936925f561b

Validate hostname in more places

https://github.com/matrix-org/sydent · Denis Kasak · Mar 31, 2021 · via ghsa
3 files changed · +33 -5
  • sydent/hs_federation/verifier.py (+11 -0, modified)
    @@ -25,6 +25,7 @@
     from signedjson.sign import SignatureVerifyException
     
     from sydent.http.httpclient import FederationHttpClient
    +from sydent.util.stringutils import is_valid_hostname
     
     
     logger = logging.getLogger(__name__)
    @@ -37,6 +38,13 @@ class NoAuthenticationError(Exception):
         pass
     
     
    +class InvalidServerName(Exception):
    +    """
    +    Raised when the provided origin parameter is not a valid hostname (plus optional port).
    +    """
    +    pass
    +
    +
     class Verifier(object):
         """
         Verifies signed json blobs from Matrix Homeservers by finding the
    @@ -197,6 +205,9 @@ def strip_quotes(value):
             if not json_request["signatures"]:
                 raise NoAuthenticationError("Missing X-Matrix Authorization header")
     
    +        if not is_valid_hostname(json_request["origin"]):
    +            raise InvalidServerName("X-Matrix header's origin parameter must be a valid hostname")
    +
             yield self.verifyServerSignedJson(json_request, [origin])
     
             logger.info("Verified request from HS %s", origin)
    
  • sydent/http/servlets/threepidunbindservlet.py (+9 -4, modified)
    @@ -19,7 +19,7 @@
     import json
     import logging
     
    -from sydent.hs_federation.verifier import NoAuthenticationError
    +from sydent.hs_federation.verifier import NoAuthenticationError, InvalidServerName
     from signedjson.sign import SignatureVerifyException
     
     from sydent.http.servlets import dict_to_json_bytes
    @@ -81,7 +81,7 @@ def _async_render_POST(self, request):
                 # and "client_secret" fields, they are trying to prove that they
                 # were the original author of the bind. We then check that what
                 # they supply matches and if it does, allow the unbind.
    -            # 
    +            #
                 # However if these fields are not supplied, we instead check
                 # whether the request originated from a homeserver, and if so the
                 # same homeserver that originally created the bind. We do this by
    @@ -121,7 +121,7 @@ def _async_render_POST(self, request):
                             'error': "This validation session has not yet been completed"
                         }))
                         return
    -                
    +
                     if s.medium != threepid['medium'] or s.address != threepid['address']:
                         request.setResponseCode(403)
                         request.write(dict_to_json_bytes({
    @@ -143,7 +143,12 @@ def _async_render_POST(self, request):
                         request.write(dict_to_json_bytes({'errcode': 'M_FORBIDDEN', 'error': str(ex)}))
                         request.finish()
                         return
    -                except:
    +                except InvalidServerName as ex:
    +                    request.setResponseCode(400)
    +                    request.write(dict_to_json_bytes({'errcode': 'M_INVALID_PARAM', 'error': str(ex)}))
    +                    request.finish()
    +                    return
    +                except Exception:
                         logger.exception("Exception whilst authenticating unbind request")
                         request.setResponseCode(500)
                         request.write(dict_to_json_bytes({'errcode': 'M_UNKNOWN', 'error': 'Internal Server Error'}))
    
  • sydent/threepid/bind.py (+13 -1, modified)
    @@ -32,6 +32,8 @@
     
     from sydent.threepid import ThreepidAssociation
     
    +from sydent.util.stringutils import is_valid_hostname
    +
     from twisted.internet import defer
     
     logger = logging.getLogger(__name__)
    @@ -131,15 +133,25 @@ def _notify(self, assoc, attempt):
             """
             mxid = assoc["mxid"]
             mxid_parts = mxid.split(":", 1)
    +
             if len(mxid_parts) != 2:
                 logger.error(
                     "Can't notify on bind for unparseable mxid %s. Not retrying.",
                     assoc["mxid"],
                 )
                 return
     
    +        matrix_server = mxid_parts[1]
    +
    +        if not is_valid_hostname(matrix_server):
    +            logger.error(
    +                "MXID server part '%s' not a valid hostname. Not retrying.",
    +                matrix_server,
    +            )
    +            return
    +
             post_url = "matrix://%s/_matrix/federation/v1/3pid/onbind" % (
    -            mxid_parts[1],
    +            matrix_server,
             )
     
             logger.info("Making bind callback to: %s", post_url)
    
9e573348d81d

Rework hostname validation to make port checking stricter.

https://github.com/matrix-org/sydent · Denis Kasak · Mar 24, 2021 · via ghsa
5 files changed · +128 -6
  • sydent/http/servlets/registerservlet.py (+14 -3, modified)
    @@ -25,7 +25,7 @@
     from sydent.http.servlets import get_args, jsonwrap, deferjsonwrap, send_cors
     from sydent.http.httpclient import FederationHttpClient
     from sydent.users.tokens import issueToken
    -
    +from sydent.util.stringutils import is_valid_hostname
     
     logger = logging.getLogger(__name__)
     
    @@ -47,9 +47,20 @@ def render_POST(self, request):
     
             args = get_args(request, ('matrix_server_name', 'access_token'))
     
    +        hostname = args['matrix_server_name'].lower()
    +
    +        if not is_valid_hostname(hostname):
    +            request.setResponseCode(400)
    +            return {
    +                'errcode': 'M_INVALID_PARAM',
    +                'error': 'matrix_server_name must be a valid hostname'
    +            }
    +
             result = yield self.client.get_json(
    -            "matrix://%s/_matrix/federation/v1/openid/userinfo?access_token=%s" % (
    -                args['matrix_server_name'], urllib.parse.quote(args['access_token']),
    +            "matrix://%s/_matrix/federation/v1/openid/userinfo?access_token=%s"
    +            % (
    +                hostname,
    +                urllib.parse.quote(args['access_token']),
                 ),
                 1024 * 5,
             )
    
  • sydent/util/stringutils.py (+41 -1, modified)
    @@ -17,14 +17,54 @@
     # https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
     client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")
     
    +# hostname/domain name + optional port
    +# https://regex101.com/r/OyN1lg/2
    +hostname_regex = re.compile(
    +    r"^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?)(?:\.[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?)*$",
    +    flags=re.IGNORECASE)
    +
     
     def is_valid_client_secret(client_secret):
         """Validate that a given string matches the client_secret regex defined by the spec
     
         :param client_secret: The client_secret to validate
    -    :type client_secret: unicode
    +    :type client_secret: str
     
         :return: Whether the client_secret is valid
         :rtype: bool
         """
         return client_secret_regex.match(client_secret) is not None
    +
    +
    +def is_valid_hostname(string: str) -> bool:
    +    """Validate that a given string is a valid hostname or domain name, with an
    +    optional port number.
    +
    +    For domain names, this only validates that the form is right (for
    +    instance, it doesn't check that the TLD is valid). If a port is
    +    specified, it has to be a valid port number.
    +
    +    :param string: The string to validate
    +    :type string: str
    +
    +    :return: Whether the input is a valid hostname
    +    :rtype: bool
    +    """
    +
    +    host_parts = string.split(":", 1)
    +
    +    if len(host_parts) == 1:
    +        return hostname_regex.match(string) is not None
    +    else:
    +        host, port = host_parts
    +        valid_hostname = hostname_regex.match(host) is not None
    +
    +        try:
    +            port_num = int(port)
    +            valid_port = (
    +                port == str(port_num)  # exclude things like '08090' or ' 8090'
    +                and 1 <= port_num < 65536)
    +        except ValueError:
    +            valid_port = False
    +
    +        return valid_hostname and valid_port
    
  • tests/test_auth.py (+2 -2, modified)
    @@ -44,7 +44,7 @@ def setUp(self):
             self.sydent.db.commit()
     
         def test_can_read_token_from_headers(self):
    -        """Tests that Sydent correct extracts an auth token from request headers"""
    +        """Tests that Sydent correctly extracts an auth token from request headers"""
             self.sydent.run()
     
             request, _ = make_request(
    @@ -59,7 +59,7 @@ def test_can_read_token_from_headers(self):
             self.assertEqual(token, self.test_token)
     
         def test_can_read_token_from_query_parameters(self):
    -        """Tests that Sydent correct extracts an auth token from query parameters"""
    +        """Tests that Sydent correctly extracts an auth token from query parameters"""
             self.sydent.run()
     
             request, _ = make_request(
    
  • tests/test_register.py (+45 -0, added)
    @@ -0,0 +1,45 @@
    +# -*- coding: utf-8 -*-
    +
    +# Copyright 2021 The Matrix.org Foundation C.I.C.
    +#
    +# Licensed under the Apache License, Version 2.0 (the "License");
    +# you may not use this file except in compliance with the License.
    +# You may obtain a copy of the License at
    +#
    +#     http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +
    +from twisted.trial import unittest
    +
    +from tests.utils import make_request, make_sydent
    +
    +
    +class RegisterTestCase(unittest.TestCase):
    +    """Tests Sydent's register servlet"""
    +    def setUp(self):
    +        # Create a new sydent
    +        self.sydent = make_sydent()
    +
    +    def test_sydent_rejects_invalid_hostname(self):
    +        """Tests that the /register endpoint rejects an invalid hostname passed as matrix_server_name"""
    +        self.sydent.run()
    +
    +        bad_hostname = "example.com#"
    +
    +        request, channel = make_request(
    +            self.sydent.reactor,
    +            "POST",
    +            "/_matrix/identity/v2/account/register",
    +            content={
    +                "matrix_server_name": bad_hostname,
    +                "access_token": "foo"
    +            })
    +
    +        request.render(self.sydent.servlets.registerServlet)
    +
    +        self.assertEqual(channel.code, 400)
    
  • tests/test_util.py (+26 -0, added)
    @@ -0,0 +1,26 @@
    +from twisted.trial import unittest
    +from sydent.util.stringutils import is_valid_hostname
    +
    +
    +class UtilTests(unittest.TestCase):
    +    """Tests Sydent utility functions."""
    +    def test_is_valid_hostname(self):
    +        """Tests that the is_valid_hostname function accepts only valid
    +        hostnames (or domain names), with optional port number.
    +        """
    +
    +        self.assertTrue(is_valid_hostname("example.com"))
    +        self.assertTrue(is_valid_hostname("EXAMPLE.COM"))
    +        self.assertTrue(is_valid_hostname("ExAmPlE.CoM"))
    +        self.assertTrue(is_valid_hostname("example.com:4242"))
    +        self.assertTrue(is_valid_hostname("localhost"))
    +        self.assertTrue(is_valid_hostname("localhost:9000"))
    +        self.assertTrue(is_valid_hostname("a.b:1234"))
    +
    +        self.assertFalse(is_valid_hostname("example.com:65536"))
    +        self.assertFalse(is_valid_hostname("example.com:0"))
    +        self.assertFalse(is_valid_hostname("example.com:a"))
    +        self.assertFalse(is_valid_hostname("example.com:04242"))
    +        self.assertFalse(is_valid_hostname("example.com: 4242"))
    +        self.assertFalse(is_valid_hostname("example.com/example.com"))
    +        self.assertFalse(is_valid_hostname("example.com#example.com"))
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

11

News mentions

0

No linked articles in our index yet.