VYPR
High severity · NVD Advisory · Published Feb 7, 2022 · Updated Apr 23, 2025

Cookie and header exposure in twisted

CVE-2022-21712

Description

twisted is an event-driven networking engine written in Python. In affected versions twisted exposes cookies and authorization headers when following cross-origin redirects. This issue is present in the twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent classes. Users are advised to upgrade. There are no known workarounds.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
Twisted (PyPI) | >= 11.1.0, < 22.1.0 | 22.1.0

Affected products

1

Patches

1
af8fe78542a6

Merge pull request from GHSA-92x2-jw7w-xvvx

https://github.com/twisted/twisted · Alex Gaynor · Jan 23, 2022 · via ghsa
4 files changed · +206 −28
  • src/twisted/newsfragments/10294.bugfix · +1 −0 · added
    @@ -0,0 +1 @@
    +twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin.
    \ No newline at end of file
    
  • src/twisted/web/client.py · +43 −1 · modified
    @@ -12,6 +12,7 @@
     import warnings
     import zlib
     from functools import wraps
    +from typing import Iterable
     from urllib.parse import urldefrag, urljoin, urlunparse as _urlunparse
     
     from zope.interface import implementer
    @@ -2110,6 +2111,18 @@ def _handleResponse(self, response):
             return response
     
     
    +_canonicalHeaderName = Headers()._canonicalNameCaps
    +_defaultSensitiveHeaders = frozenset(
    +    [
    +        b"Authorization",
    +        b"Cookie",
    +        b"Cookie2",
    +        b"Proxy-Authorization",
    +        b"WWW-Authenticate",
    +    ]
    +)
    +
    +
     @implementer(IAgent)
     class RedirectAgent:
         """
    @@ -2124,6 +2137,11 @@ class RedirectAgent:
         @param redirectLimit: The maximum number of times the agent is allowed to
             follow redirects before failing with a L{error.InfiniteRedirection}.
     
    +    @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names
    +        of headers that must not be transmitted when redirecting to a different
    +        origins.  These will be consulted in addition to the protocol-specified
    +        set of headers that contain sensitive information.
    +
         @cvar _redirectResponses: A L{list} of HTTP status codes to be redirected
             for I{GET} and I{HEAD} methods.
     
    @@ -2141,9 +2159,17 @@ class RedirectAgent:
         ]
         _seeOtherResponses = [http.SEE_OTHER]
     
    -    def __init__(self, agent, redirectLimit=20):
    +    def __init__(
    +        self,
    +        agent: IAgent,
    +        redirectLimit: int = 20,
    +        sensitiveHeaderNames: Iterable[bytes] = (),
    +    ):
             self._agent = agent
             self._redirectLimit = redirectLimit
    +        sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames}
    +        sensitive.update(_defaultSensitiveHeaders)
    +        self._sensitiveHeaderNames = sensitive
     
         def request(self, method, uri, headers=None, bodyProducer=None):
             """
    @@ -2186,6 +2212,22 @@ def _handleRedirect(self, response, method, uri, headers, redirectCount):
                 )
                 raise ResponseFailed([Failure(err)], response)
             location = self._resolveLocation(uri, locationHeaders[0])
    +        if headers:
    +            parsedURI = URI.fromBytes(uri)
    +            parsedLocation = URI.fromBytes(location)
    +            sameOrigin = (
    +                (parsedURI.scheme == parsedLocation.scheme)
    +                and (parsedURI.host == parsedLocation.host)
    +                and (parsedURI.port == parsedLocation.port)
    +            )
    +            if not sameOrigin:
    +                headers = Headers(
    +                    {
    +                        rawName: rawValue
    +                        for rawName, rawValue in headers.getAllRawHeaders()
    +                        if rawName not in self._sensitiveHeaderNames
    +                    }
    +                )
             deferred = self._agent.request(method, location, headers)
     
             def _chainResponse(newResponse):
    
  • src/twisted/web/iweb.py · +5 −5 · modified
    @@ -713,12 +713,12 @@ class IAgent(Interface):
         obtained by combining a number of (hypothetical) implementations::
     
             baseAgent = Agent(reactor)
    -        redirect = BrowserLikeRedirectAgent(baseAgent, limit=10)
    +        decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())])
    +        cookie = CookieAgent(decode, diskStore.cookie)
             authenticate = AuthenticateAgent(
    -            redirect, [diskStore.credentials, GtkAuthInterface()])
    -        cookie = CookieAgent(authenticate, diskStore.cookie)
    -        decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())])
    -        cache = CacheAgent(decode, diskStore.cache)
    +            cookie, [diskStore.credentials, GtkAuthInterface()])
    +        cache = CacheAgent(authenticate, diskStore.cache)
    +        redirect = BrowserLikeRedirectAgent(cache, limit=10)
     
             doSomeRequests(cache)
         """
    
  • src/twisted/web/test/test_agent.py · +157 −22 · modified
    @@ -8,7 +8,8 @@
     import zlib
     from http.cookiejar import CookieJar
     from io import BytesIO
    -from unittest import skipIf
    +from typing import TYPE_CHECKING, List, Optional, Tuple
    +from unittest import SkipTest, skipIf
     
     from zope.interface.declarations import implementer
     from zope.interface.verify import verifyObject
    @@ -76,6 +77,15 @@
         URIInjectionTestsMixin,
     )
     
    +# Creatively lie to mypy about the nature of inheritance, since dealing with
    +# expectations of a mixin class is basically impossible (don't use mixins).
    +if TYPE_CHECKING:
    +    testMixinClass = TestCase
    +    runtimeTestCase = object
    +else:
    +    testMixinClass = object
    +    runtimeTestCase = TestCase
    +
     try:
         from twisted.internet import ssl as _ssl
     except ImportError:
    @@ -108,8 +118,8 @@ class StubHTTPProtocol(Protocol):
             request method is appended to this list.
         """
     
    -    def __init__(self):
    -        self.requests = []
    +    def __init__(self) -> None:
    +        self.requests: List[Tuple[Request, Deferred[IResponse]]] = []
             self.state = "QUIESCENT"
     
         def request(self, request):
    @@ -2587,12 +2597,25 @@ def getConnection(this, key, ep):
             self.assertEqual(agent._pool.connected, True)
     
     
    -class _RedirectAgentTestsMixin:
    +SENSITIVE_HEADERS = [
    +    b"authorization",
    +    b"cookie",
    +    b"cookie2",
    +    b"proxy-authorization",
    +    b"www-authenticate",
    +]
    +
    +
    +class _RedirectAgentTestsMixin(testMixinClass):
         """
         Test cases mixin for L{RedirectAgentTests} and
         L{BrowserLikeRedirectAgentTests}.
         """
     
    +    agent: IAgent
    +    reactor: MemoryReactorClock
    +    protocol: StubHTTPProtocol
    +
         def test_noRedirect(self):
             """
             L{client.RedirectAgent} behaves like L{client.Agent} if the response
    @@ -2611,32 +2634,56 @@ def test_noRedirect(self):
             self.assertIdentical(response, result)
             self.assertIdentical(result.previousResponse, None)
     
    -    def _testRedirectDefault(self, code):
    +    def _testRedirectDefault(
    +        self,
    +        code: int,
    +        crossScheme: bool = False,
    +        crossDomain: bool = False,
    +        crossPort: bool = False,
    +        requestHeaders: Optional[Headers] = None,
    +    ) -> Request:
             """
             When getting a redirect, L{client.RedirectAgent} follows the URL
             specified in the L{Location} header field and make a new request.
     
             @param code: HTTP status code.
             """
    -        self.agent.request(b"GET", b"http://example.com/foo")
    +        startDomain = b"example.com"
    +        startScheme = b"https" if ssl is not None else b"http"
    +        startPort = 80 if startScheme == b"http" else 443
    +        self.agent.request(
    +            b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders
    +        )
     
             host, port = self.reactor.tcpClients.pop()[:2]
             self.assertEqual(EXAMPLE_COM_IP, host)
    -        self.assertEqual(80, port)
    +        self.assertEqual(startPort, port)
     
             req, res = self.protocol.requests.pop()
     
    -        # If possible (i.e.: SSL support is present), run the test with a
    +        # If possible (i.e.: TLS support is present), run the test with a
             # cross-scheme redirect to verify that the scheme is honored; if not,
             # let's just make sure it works at all.
    -        if ssl is None:
    -            scheme = b"http"
    -            expectedPort = 80
    -        else:
    -            scheme = b"https"
    -            expectedPort = 443
    -
    -        headers = http_headers.Headers({b"location": [scheme + b"://example.com/bar"]})
    +
    +        targetScheme = startScheme
    +        targetDomain = startDomain
    +        targetPort = startPort
    +
    +        if crossScheme:
    +            if ssl is None:
    +                raise SkipTest(
    +                    "Cross-scheme redirects can't be tested without TLS support."
    +                )
    +            targetScheme = b"https" if startScheme == b"http" else b"http"
    +            targetPort = 443 if startPort == 80 else 80
    +
    +        portSyntax = b""
    +        if crossPort:
    +            targetPort = 8443
    +            portSyntax = b":8443"
    +        targetDomain = b"example.net" if crossDomain else startDomain
    +        locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar"
    +        headers = http_headers.Headers({b"location": [locationValue]})
             response = Response((b"HTTP", 1, 1), code, b"OK", headers, None)
             res.callback(response)
     
    @@ -2645,15 +2692,25 @@ def _testRedirectDefault(self, code):
             self.assertEqual(b"/bar", req2.uri)
     
             host, port = self.reactor.tcpClients.pop()[:2]
    -        self.assertEqual(EXAMPLE_COM_IP, host)
    -        self.assertEqual(expectedPort, port)
    +        self.assertEqual(EXAMPLE_NET_IP if crossDomain else EXAMPLE_COM_IP, host)
    +        self.assertEqual(targetPort, port)
    +        return req2
     
         def test_redirect301(self):
             """
             L{client.RedirectAgent} follows redirects on status code 301.
             """
             self._testRedirectDefault(301)
     
    +    def test_redirect301Scheme(self):
    +        """
    +        L{client.RedirectAgent} follows cross-scheme redirects.
    +        """
    +        self._testRedirectDefault(
    +            301,
    +            crossScheme=True,
    +        )
    +
         def test_redirect302(self):
             """
             L{client.RedirectAgent} follows redirects on status code 302.
    @@ -2672,6 +2729,74 @@ def test_redirect308(self):
             """
             self._testRedirectDefault(308)
     
    +    def _sensitiveHeadersTest(
    +        self, expectedHostHeader: bytes = b"example.com", **crossKwargs: bool
    +    ) -> None:
    +        """
    +        L{client.RedirectAgent} scrubs sensitive headers when redirecting
    +        between differing origins.
    +        """
    +        sensitiveHeaderValues = {
    +            b"authorization": [b"sensitive-authnz"],
    +            b"cookie": [b"sensitive-cookie-data"],
    +            b"cookie2": [b"sensitive-cookie2-data"],
    +            b"proxy-authorization": [b"sensitive-proxy-auth"],
    +            b"wWw-auThentiCate": [b"sensitive-authn"],
    +            b"x-custom-sensitive": [b"sensitive-custom"],
    +        }
    +        otherHeaderValues = {b"x-random-header": [b"x-random-value"]}
    +        allHeaders = Headers({**sensitiveHeaderValues, **otherHeaderValues})
    +        redirected = self._testRedirectDefault(301, requestHeaders=allHeaders)
    +
    +        def normHeaders(headers: Headers) -> dict:
    +            return {k.lower(): v for (k, v) in headers.getAllRawHeaders()}
    +
    +        sameOriginHeaders = normHeaders(redirected.headers)
    +        self.assertEquals(
    +            sameOriginHeaders,
    +            {
    +                b"host": [b"example.com"],
    +                **normHeaders(allHeaders),
    +            },
    +        )
    +
    +        redirectedElsewhere = self._testRedirectDefault(
    +            301,
    +            **crossKwargs,
    +            requestHeaders=Headers({**sensitiveHeaderValues, **otherHeaderValues}),
    +        )
    +        otherOriginHeaders = normHeaders(redirectedElsewhere.headers)
    +        self.assertEquals(
    +            otherOriginHeaders,
    +            {
    +                b"host": [expectedHostHeader],
    +                **normHeaders(Headers(otherHeaderValues)),
    +            },
    +        )
    +
    +    def test_crossDomainHeaders(self) -> None:
    +        """
    +        L{client.RedirectAgent} scrubs sensitive headers when redirecting
    +        between differing domains.
    +        """
    +        self._sensitiveHeadersTest(crossDomain=True, expectedHostHeader=b"example.net")
    +
    +    def test_crossPortHeaders(self) -> None:
    +        """
    +        L{client.RedirectAgent} scrubs sensitive headers when redirecting
    +        between differing ports.
    +        """
    +        self._sensitiveHeadersTest(
    +            crossPort=True, expectedHostHeader=b"example.com:8443"
    +        )
    +
    +    def test_crossSchemeHeaders(self) -> None:
    +        """
    +        L{client.RedirectAgent} scrubs sensitive headers when redirecting
    +        between differing schemes.
    +        """
    +        self._sensitiveHeadersTest(crossScheme=True)
    +
         def _testRedirectToGet(self, code, method):
             """
             L{client.RedirectAgent} changes the method to I{GET} when getting
    @@ -2878,7 +3003,10 @@ def test_responseHistory(self):
     
     
     class RedirectAgentTests(
    -    TestCase, FakeReactorAndConnectMixin, _RedirectAgentTestsMixin, AgentTestsMixin
    +    FakeReactorAndConnectMixin,
    +    _RedirectAgentTestsMixin,
    +    AgentTestsMixin,
    +    runtimeTestCase,
     ):
         """
         Tests for L{client.RedirectAgent}.
    @@ -2888,7 +3016,10 @@ def makeAgent(self):
             """
             @return: a new L{twisted.web.client.RedirectAgent}
             """
    -        return client.RedirectAgent(self.buildAgentForWrapperTest(self.reactor))
    +        return client.RedirectAgent(
    +            self.buildAgentForWrapperTest(self.reactor),
    +            sensitiveHeaderNames=[b"X-Custom-sensitive"],
    +        )
     
         def setUp(self):
             self.reactor = self.createReactor()
    @@ -2912,7 +3043,10 @@ def test_302OnPost(self):
     
     
     class BrowserLikeRedirectAgentTests(
    -    TestCase, FakeReactorAndConnectMixin, _RedirectAgentTestsMixin, AgentTestsMixin
    +    FakeReactorAndConnectMixin,
    +    _RedirectAgentTestsMixin,
    +    AgentTestsMixin,
    +    runtimeTestCase,
     ):
         """
         Tests for L{client.BrowserLikeRedirectAgent}.
    @@ -2923,7 +3057,8 @@ def makeAgent(self):
             @return: a new L{twisted.web.client.BrowserLikeRedirectAgent}
             """
             return client.BrowserLikeRedirectAgent(
    -            self.buildAgentForWrapperTest(self.reactor)
    +            self.buildAgentForWrapperTest(self.reactor),
    +            sensitiveHeaderNames=[b"x-Custom-sensitive"],
             )
     
         def setUp(self):
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

13

News mentions

0

No linked articles in our index yet.