High severity · NVD Advisory · Published Feb 7, 2022 · Updated Apr 23, 2025
Cookie and header exposure in twisted
CVE-2022-21712
Description
Twisted is an event-driven networking engine written in Python. In affected versions, Twisted exposes cookies and authorization headers when following cross-origin redirects. This issue is present in the twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent classes. Users are advised to upgrade to version 22.1.0 or later. There are no known workarounds.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| Twisted (PyPI) | >= 11.1.0, < 22.1.0 | 22.1.0 |
Affected products: 1
Patches: 1
af8fe78542a6 — Merge pull request from GHSA-92x2-jw7w-xvvx
4 files changed · +206 −28
src/twisted/newsfragments/10294.bugfix+1 −0 added@@ -0,0 +1 @@ +twisted.web.client.RedirectAgent and twisted.web.client.BrowserLikeRedirectAgent now properly remove sensitive headers when redirecting to a different origin. \ No newline at end of file
src/twisted/web/client.py+43 −1 modified@@ -12,6 +12,7 @@ import warnings import zlib from functools import wraps +from typing import Iterable from urllib.parse import urldefrag, urljoin, urlunparse as _urlunparse from zope.interface import implementer @@ -2110,6 +2111,18 @@ def _handleResponse(self, response): return response +_canonicalHeaderName = Headers()._canonicalNameCaps +_defaultSensitiveHeaders = frozenset( + [ + b"Authorization", + b"Cookie", + b"Cookie2", + b"Proxy-Authorization", + b"WWW-Authenticate", + ] +) + + @implementer(IAgent) class RedirectAgent: """ @@ -2124,6 +2137,11 @@ class RedirectAgent: @param redirectLimit: The maximum number of times the agent is allowed to follow redirects before failing with a L{error.InfiniteRedirection}. + @param sensitiveHeaderNames: An iterable of C{bytes} enumerating the names + of headers that must not be transmitted when redirecting to a different + origins. These will be consulted in addition to the protocol-specified + set of headers that contain sensitive information. + @cvar _redirectResponses: A L{list} of HTTP status codes to be redirected for I{GET} and I{HEAD} methods. 
@@ -2141,9 +2159,17 @@ class RedirectAgent: ] _seeOtherResponses = [http.SEE_OTHER] - def __init__(self, agent, redirectLimit=20): + def __init__( + self, + agent: IAgent, + redirectLimit: int = 20, + sensitiveHeaderNames: Iterable[bytes] = (), + ): self._agent = agent self._redirectLimit = redirectLimit + sensitive = {_canonicalHeaderName(each) for each in sensitiveHeaderNames} + sensitive.update(_defaultSensitiveHeaders) + self._sensitiveHeaderNames = sensitive def request(self, method, uri, headers=None, bodyProducer=None): """ @@ -2186,6 +2212,22 @@ def _handleRedirect(self, response, method, uri, headers, redirectCount): ) raise ResponseFailed([Failure(err)], response) location = self._resolveLocation(uri, locationHeaders[0]) + if headers: + parsedURI = URI.fromBytes(uri) + parsedLocation = URI.fromBytes(location) + sameOrigin = ( + (parsedURI.scheme == parsedLocation.scheme) + and (parsedURI.host == parsedLocation.host) + and (parsedURI.port == parsedLocation.port) + ) + if not sameOrigin: + headers = Headers( + { + rawName: rawValue + for rawName, rawValue in headers.getAllRawHeaders() + if rawName not in self._sensitiveHeaderNames + } + ) deferred = self._agent.request(method, location, headers) def _chainResponse(newResponse):
src/twisted/web/iweb.py+5 −5 modified@@ -713,12 +713,12 @@ class IAgent(Interface): obtained by combining a number of (hypothetical) implementations:: baseAgent = Agent(reactor) - redirect = BrowserLikeRedirectAgent(baseAgent, limit=10) + decode = ContentDecoderAgent(baseAgent, [(b"gzip", GzipDecoder())]) + cookie = CookieAgent(decode, diskStore.cookie) authenticate = AuthenticateAgent( - redirect, [diskStore.credentials, GtkAuthInterface()]) - cookie = CookieAgent(authenticate, diskStore.cookie) - decode = ContentDecoderAgent(cookie, [(b"gzip", GzipDecoder())]) - cache = CacheAgent(decode, diskStore.cache) + cookie, [diskStore.credentials, GtkAuthInterface()]) + cache = CacheAgent(authenticate, diskStore.cache) + redirect = BrowserLikeRedirectAgent(cache, limit=10) doSomeRequests(cache) """
src/twisted/web/test/test_agent.py+157 −22 modified@@ -8,7 +8,8 @@ import zlib from http.cookiejar import CookieJar from io import BytesIO -from unittest import skipIf +from typing import TYPE_CHECKING, List, Optional, Tuple +from unittest import SkipTest, skipIf from zope.interface.declarations import implementer from zope.interface.verify import verifyObject @@ -76,6 +77,15 @@ URIInjectionTestsMixin, ) +# Creatively lie to mypy about the nature of inheritance, since dealing with +# expectations of a mixin class is basically impossible (don't use mixins). +if TYPE_CHECKING: + testMixinClass = TestCase + runtimeTestCase = object +else: + testMixinClass = object + runtimeTestCase = TestCase + try: from twisted.internet import ssl as _ssl except ImportError: @@ -108,8 +118,8 @@ class StubHTTPProtocol(Protocol): request method is appended to this list. """ - def __init__(self): - self.requests = [] + def __init__(self) -> None: + self.requests: List[Tuple[Request, Deferred[IResponse]]] = [] self.state = "QUIESCENT" def request(self, request): @@ -2587,12 +2597,25 @@ def getConnection(this, key, ep): self.assertEqual(agent._pool.connected, True) -class _RedirectAgentTestsMixin: +SENSITIVE_HEADERS = [ + b"authorization", + b"cookie", + b"cookie2", + b"proxy-authorization", + b"www-authenticate", +] + + +class _RedirectAgentTestsMixin(testMixinClass): """ Test cases mixin for L{RedirectAgentTests} and L{BrowserLikeRedirectAgentTests}. 
""" + agent: IAgent + reactor: MemoryReactorClock + protocol: StubHTTPProtocol + def test_noRedirect(self): """ L{client.RedirectAgent} behaves like L{client.Agent} if the response @@ -2611,32 +2634,56 @@ def test_noRedirect(self): self.assertIdentical(response, result) self.assertIdentical(result.previousResponse, None) - def _testRedirectDefault(self, code): + def _testRedirectDefault( + self, + code: int, + crossScheme: bool = False, + crossDomain: bool = False, + crossPort: bool = False, + requestHeaders: Optional[Headers] = None, + ) -> Request: """ When getting a redirect, L{client.RedirectAgent} follows the URL specified in the L{Location} header field and make a new request. @param code: HTTP status code. """ - self.agent.request(b"GET", b"http://example.com/foo") + startDomain = b"example.com" + startScheme = b"https" if ssl is not None else b"http" + startPort = 80 if startScheme == b"http" else 443 + self.agent.request( + b"GET", startScheme + b"://" + startDomain + b"/foo", headers=requestHeaders + ) host, port = self.reactor.tcpClients.pop()[:2] self.assertEqual(EXAMPLE_COM_IP, host) - self.assertEqual(80, port) + self.assertEqual(startPort, port) req, res = self.protocol.requests.pop() - # If possible (i.e.: SSL support is present), run the test with a + # If possible (i.e.: TLS support is present), run the test with a # cross-scheme redirect to verify that the scheme is honored; if not, # let's just make sure it works at all. - if ssl is None: - scheme = b"http" - expectedPort = 80 - else: - scheme = b"https" - expectedPort = 443 - - headers = http_headers.Headers({b"location": [scheme + b"://example.com/bar"]}) + + targetScheme = startScheme + targetDomain = startDomain + targetPort = startPort + + if crossScheme: + if ssl is None: + raise SkipTest( + "Cross-scheme redirects can't be tested without TLS support." 
+ ) + targetScheme = b"https" if startScheme == b"http" else b"http" + targetPort = 443 if startPort == 80 else 80 + + portSyntax = b"" + if crossPort: + targetPort = 8443 + portSyntax = b":8443" + targetDomain = b"example.net" if crossDomain else startDomain + locationValue = targetScheme + b"://" + targetDomain + portSyntax + b"/bar" + headers = http_headers.Headers({b"location": [locationValue]}) response = Response((b"HTTP", 1, 1), code, b"OK", headers, None) res.callback(response) @@ -2645,15 +2692,25 @@ def _testRedirectDefault(self, code): self.assertEqual(b"/bar", req2.uri) host, port = self.reactor.tcpClients.pop()[:2] - self.assertEqual(EXAMPLE_COM_IP, host) - self.assertEqual(expectedPort, port) + self.assertEqual(EXAMPLE_NET_IP if crossDomain else EXAMPLE_COM_IP, host) + self.assertEqual(targetPort, port) + return req2 def test_redirect301(self): """ L{client.RedirectAgent} follows redirects on status code 301. """ self._testRedirectDefault(301) + def test_redirect301Scheme(self): + """ + L{client.RedirectAgent} follows cross-scheme redirects. + """ + self._testRedirectDefault( + 301, + crossScheme=True, + ) + def test_redirect302(self): """ L{client.RedirectAgent} follows redirects on status code 302. @@ -2672,6 +2729,74 @@ def test_redirect308(self): """ self._testRedirectDefault(308) + def _sensitiveHeadersTest( + self, expectedHostHeader: bytes = b"example.com", **crossKwargs: bool + ) -> None: + """ + L{client.RedirectAgent} scrubs sensitive headers when redirecting + between differing origins. 
+ """ + sensitiveHeaderValues = { + b"authorization": [b"sensitive-authnz"], + b"cookie": [b"sensitive-cookie-data"], + b"cookie2": [b"sensitive-cookie2-data"], + b"proxy-authorization": [b"sensitive-proxy-auth"], + b"wWw-auThentiCate": [b"sensitive-authn"], + b"x-custom-sensitive": [b"sensitive-custom"], + } + otherHeaderValues = {b"x-random-header": [b"x-random-value"]} + allHeaders = Headers({**sensitiveHeaderValues, **otherHeaderValues}) + redirected = self._testRedirectDefault(301, requestHeaders=allHeaders) + + def normHeaders(headers: Headers) -> dict: + return {k.lower(): v for (k, v) in headers.getAllRawHeaders()} + + sameOriginHeaders = normHeaders(redirected.headers) + self.assertEquals( + sameOriginHeaders, + { + b"host": [b"example.com"], + **normHeaders(allHeaders), + }, + ) + + redirectedElsewhere = self._testRedirectDefault( + 301, + **crossKwargs, + requestHeaders=Headers({**sensitiveHeaderValues, **otherHeaderValues}), + ) + otherOriginHeaders = normHeaders(redirectedElsewhere.headers) + self.assertEquals( + otherOriginHeaders, + { + b"host": [expectedHostHeader], + **normHeaders(Headers(otherHeaderValues)), + }, + ) + + def test_crossDomainHeaders(self) -> None: + """ + L{client.RedirectAgent} scrubs sensitive headers when redirecting + between differing domains. + """ + self._sensitiveHeadersTest(crossDomain=True, expectedHostHeader=b"example.net") + + def test_crossPortHeaders(self) -> None: + """ + L{client.RedirectAgent} scrubs sensitive headers when redirecting + between differing ports. + """ + self._sensitiveHeadersTest( + crossPort=True, expectedHostHeader=b"example.com:8443" + ) + + def test_crossSchemeHeaders(self) -> None: + """ + L{client.RedirectAgent} scrubs sensitive headers when redirecting + between differing schemes. 
+ """ + self._sensitiveHeadersTest(crossScheme=True) + def _testRedirectToGet(self, code, method): """ L{client.RedirectAgent} changes the method to I{GET} when getting @@ -2878,7 +3003,10 @@ def test_responseHistory(self): class RedirectAgentTests( - TestCase, FakeReactorAndConnectMixin, _RedirectAgentTestsMixin, AgentTestsMixin + FakeReactorAndConnectMixin, + _RedirectAgentTestsMixin, + AgentTestsMixin, + runtimeTestCase, ): """ Tests for L{client.RedirectAgent}. @@ -2888,7 +3016,10 @@ def makeAgent(self): """ @return: a new L{twisted.web.client.RedirectAgent} """ - return client.RedirectAgent(self.buildAgentForWrapperTest(self.reactor)) + return client.RedirectAgent( + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"X-Custom-sensitive"], + ) def setUp(self): self.reactor = self.createReactor() @@ -2912,7 +3043,10 @@ def test_302OnPost(self): class BrowserLikeRedirectAgentTests( - TestCase, FakeReactorAndConnectMixin, _RedirectAgentTestsMixin, AgentTestsMixin + FakeReactorAndConnectMixin, + _RedirectAgentTestsMixin, + AgentTestsMixin, + runtimeTestCase, ): """ Tests for L{client.BrowserLikeRedirectAgent}. @@ -2923,7 +3057,8 @@ def makeAgent(self): @return: a new L{twisted.web.client.BrowserLikeRedirectAgent} """ return client.BrowserLikeRedirectAgent( - self.buildAgentForWrapperTest(self.reactor) + self.buildAgentForWrapperTest(self.reactor), + sensitiveHeaderNames=[b"x-Custom-sensitive"], ) def setUp(self):
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References (13)
- github.com/advisories/GHSA-92x2-jw7w-xvvx (ghsa, ADVISORY)
- nvd.nist.gov/vuln/detail/CVE-2022-21712 (ghsa, ADVISORY)
- github.com/pypa/advisory-database/tree/main/vulns/twisted/PYSEC-2022-27.yaml (ghsa, WEB)
- github.com/twisted/twisted/commit/af8fe78542a6f2bf2235ccee8158d9c88d31e8e2 (ghsa, x_refsource_MISC, WEB)
- github.com/twisted/twisted/releases/tag/twisted-22.1.0 (ghsa, x_refsource_MISC, WEB)
- github.com/twisted/twisted/security/advisories/GHSA-92x2-jw7w-xvvx (ghsa, x_refsource_CONFIRM, WEB)
- lists.debian.org/debian-lts-announce/2022/02/msg00021.html (ghsa, WEB)
- lists.fedoraproject.org/archives/list/package-announce%40lists.fedoraproject.org/message/7U6KYDTOLPICAVSR34G2WRYLFBD2YW5K (ghsa, WEB)
- lists.fedoraproject.org/archives/list/package-announce%40lists.fedoraproject.org/message/GLKHA6WREIVAMBQD7KKWYHPHGGNKMAG6 (ghsa, WEB)
- lists.fedoraproject.org/archives/list/package-announce@lists.fedoraproject.org/message/7U6KYDTOLPICAVSR34G2WRYLFBD2YW5K (ghsa, WEB)
- lists.fedoraproject.org/archives/list/package-announce@lists.fedoraproject.org/message/GLKHA6WREIVAMBQD7KKWYHPHGGNKMAG6 (ghsa, WEB)
- pypi.org/project/Twisted (ghsa, WEB)
- security.gentoo.org/glsa/202301-02 (ghsa, WEB)
News mentions: 0
No linked articles in our index yet.