VYPR
Critical severity · 9.1 · NVD Advisory · Published Apr 24, 2025 · Updated Apr 15, 2026

CVE-2025-43859

CVE-2025-43859

Description

h11 is a Python implementation of HTTP/1.1. Prior to version 0.16.0, a leniency in h11's parsing of line terminators in chunked-coding message bodies can lead to request smuggling vulnerabilities under certain conditions. This issue has been patched in version 0.16.0. Since exploitation requires the combination of buggy h11 with a buggy (reverse) proxy, fixing either component is sufficient to mitigate this issue.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
h11 (PyPI) | < 0.16.0 | 0.16.0

Patches

2
114803a29ce5

Merge commit from fork

https://github.com/python-hyper/h11 · Nathaniel J. Smith · Apr 24, 2025 · via GHSA
2 files changed · +51 −26
  • h11/_readers.py · +13 −10 · modified
    @@ -148,10 +148,9 @@ def read_eof(self) -> NoReturn:
     class ChunkedReader:
         def __init__(self) -> None:
             self._bytes_in_chunk = 0
    -        # After reading a chunk, we have to throw away the trailing \r\n; if
    -        # this is >0 then we discard that many bytes before resuming regular
    -        # de-chunkification.
    -        self._bytes_to_discard = 0
    +        # After reading a chunk, we have to throw away the trailing \r\n.
    +        # This tracks the bytes that we need to match and throw away.
    +        self._bytes_to_discard = b""
             self._reading_trailer = False
     
         def __call__(self, buf: ReceiveBuffer) -> Union[Data, EndOfMessage, None]:
    @@ -160,15 +159,19 @@ def __call__(self, buf: ReceiveBuffer) -> Union[Data, EndOfMessage, None]:
                 if lines is None:
                     return None
                 return EndOfMessage(headers=list(_decode_header_lines(lines)))
    -        if self._bytes_to_discard > 0:
    -            data = buf.maybe_extract_at_most(self._bytes_to_discard)
    +        if self._bytes_to_discard:
    +            data = buf.maybe_extract_at_most(len(self._bytes_to_discard))
                 if data is None:
                     return None
    -            self._bytes_to_discard -= len(data)
    -            if self._bytes_to_discard > 0:
    +            if data != self._bytes_to_discard[:len(data)]:
    +                raise LocalProtocolError(
    +                    f"malformed chunk footer: {data!r} (expected {self._bytes_to_discard!r})"
    +                )
    +            self._bytes_to_discard = self._bytes_to_discard[len(data):]
    +            if self._bytes_to_discard:
                     return None
                 # else, fall through and read some more
    -        assert self._bytes_to_discard == 0
    +        assert self._bytes_to_discard == b""
             if self._bytes_in_chunk == 0:
                 # We need to refill our chunk count
                 chunk_header = buf.maybe_extract_next_line()
    @@ -194,7 +197,7 @@ def __call__(self, buf: ReceiveBuffer) -> Union[Data, EndOfMessage, None]:
                 return None
             self._bytes_in_chunk -= len(data)
             if self._bytes_in_chunk == 0:
    -            self._bytes_to_discard = 2
    +            self._bytes_to_discard = b"\r\n"
                 chunk_end = True
             else:
                 chunk_end = False
    
  • h11/tests/test_io.py · +38 −16 · modified
    @@ -348,22 +348,34 @@ def _run_reader(*args: Any) -> List[Event]:
         return normalize_data_events(events)
     
     
    -def t_body_reader(thunk: Any, data: bytes, expected: Any, do_eof: bool = False) -> None:
    +def t_body_reader(thunk: Any, data: bytes, expected: list, do_eof: bool = False) -> None:
         # Simple: consume whole thing
         print("Test 1")
         buf = makebuf(data)
    -    assert _run_reader(thunk(), buf, do_eof) == expected
    +    try:
    +        assert _run_reader(thunk(), buf, do_eof) == expected
    +    except LocalProtocolError:
    +        if LocalProtocolError in expected:
    +            pass
    +        else:
    +            raise
     
         # Incrementally growing buffer
         print("Test 2")
         reader = thunk()
         buf = ReceiveBuffer()
         events = []
    -    for i in range(len(data)):
    -        events += _run_reader(reader, buf, False)
    -        buf += data[i : i + 1]
    -    events += _run_reader(reader, buf, do_eof)
    -    assert normalize_data_events(events) == expected
    +    try:
    +        for i in range(len(data)):
    +            events += _run_reader(reader, buf, False)
    +            buf += data[i : i + 1]
    +        events += _run_reader(reader, buf, do_eof)
    +        assert normalize_data_events(events) == expected
    +    except LocalProtocolError:
    +        if LocalProtocolError in expected:
    +            pass
    +        else:
    +            raise
     
         is_complete = any(type(event) is EndOfMessage for event in expected)
         if is_complete and not do_eof:
    @@ -424,14 +436,12 @@ def test_ChunkedReader() -> None:
         )
     
         # refuses arbitrarily long chunk integers
    -    with pytest.raises(LocalProtocolError):
    -        # Technically this is legal HTTP/1.1, but we refuse to process chunk
    -        # sizes that don't fit into 20 characters of hex
    -        t_body_reader(ChunkedReader, b"9" * 100 + b"\r\nxxx", [Data(data=b"xxx")])
    +    # Technically this is legal HTTP/1.1, but we refuse to process chunk
    +    # sizes that don't fit into 20 characters of hex
    +    t_body_reader(ChunkedReader, b"9" * 100 + b"\r\nxxx", [LocalProtocolError])
     
         # refuses garbage in the chunk count
    -    with pytest.raises(LocalProtocolError):
    -        t_body_reader(ChunkedReader, b"10\x00\r\nxxx", None)
    +    t_body_reader(ChunkedReader, b"10\x00\r\nxxx", [LocalProtocolError])
     
         # handles (and discards) "chunk extensions" omg wtf
         t_body_reader(
    @@ -445,10 +455,22 @@ def test_ChunkedReader() -> None:
     
         t_body_reader(
             ChunkedReader,
    -        b"5   	 \r\n01234\r\n" + b"0\r\n\r\n",
    +        b"5   \t \r\n01234\r\n" + b"0\r\n\r\n",
             [Data(data=b"01234"), EndOfMessage()],
         )
     
    +    # Chunked encoding with bad chunk termination characters are refused. Originally we
    +    # simply dropped the 2 bytes after a chunk, instead of validating that the bytes
    +    # were \r\n -- so we would successfully decode the data below as b"xxxa". And
    +    # apparently there are other HTTP processors that ignore the chunk length and just
    +    # keep reading until they see \r\n, so they would decode it as b"xxx__1a". Any time
    +    # two HTTP processors accept the same input but interpret it differently, there's a
    +    # possibility of request smuggling shenanigans. So we now reject this.
    +    t_body_reader(ChunkedReader, b"3\r\nxxx__1a\r\n", [LocalProtocolError])
    +
    +    # Confirm we check both bytes individually
    +    t_body_reader(ChunkedReader, b"3\r\nxxx\r_1a\r\n", [LocalProtocolError])
    +    t_body_reader(ChunkedReader, b"3\r\nxxx_\n1a\r\n", [LocalProtocolError])
     
     def test_ContentLengthWriter() -> None:
         w = ContentLengthWriter(5)
    @@ -471,8 +493,8 @@ def test_ContentLengthWriter() -> None:
             dowrite(w, EndOfMessage())
     
         w = ContentLengthWriter(5)
    -    dowrite(w, Data(data=b"123")) == b"123"
    -    dowrite(w, Data(data=b"45")) == b"45"
    +    assert dowrite(w, Data(data=b"123")) == b"123"
    +    assert dowrite(w, Data(data=b"45")) == b"45"
         with pytest.raises(LocalProtocolError):
             dowrite(w, EndOfMessage(headers=[("Etag", "asdf")]))
     
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4

News mentions

0

No linked articles in our index yet.