High severity · 7.5 · NVD Advisory · Published May 1, 2024 · Updated Apr 15, 2026

CVE-2024-32984

Description

Yamux is a stream multiplexer over reliable, ordered connections such as TCP/IP. The Rust implementation of the Yamux stream multiplexer keeps pending frames in a vector whose length is not bounded: every time the Yamux protocol requires sending a new frame, the frame is appended to this vector. An attacker can trigger frame creation remotely in a number of ways, for example by:

1. Opening a new libp2p Identify stream. This causes the node to send its Identify message. Any other protocol that causes data to be sent works as well; the larger the response, the more data is enqueued.
2. Sending a Yamux Ping frame. This causes a Pong frame to be enqueued.

Under normal circumstances, this queue of pending frames is drained as frames are sent out over the network. However, an attacker can use TCP's receive-window mechanism to prevent the victim from sending any data: by never reading from the TCP connection, the attacker ensures the receive window is never increased, so the victim cannot send out any new data (this is how TCP implements backpressure). Once this happens, Yamux's queue of pending frames grows indefinitely. The queue is only drained once the underlying TCP connection is closed. An attacker can therefore cause a remote node to run out of memory, at which point the corresponding process is terminated by the operating system.
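The TCP mechanics the attack relies on can be reproduced in a few lines (a standalone sketch, not yamux code; `write_until_blocked` is a hypothetical name for illustration). Once the peer stops reading, the receive window closes, the sender's kernel buffer fills, and writes start failing with `WouldBlock` — from that point on, anything the sender still wants to transmit must be buffered in user space, which is exactly the queue this CVE leaves unbounded:

```rust
use std::io::{ErrorKind, Write};
use std::net::TcpStream;

/// Write zeros to `addr` until the local send buffer is full because
/// the peer is not reading. Returns how many bytes the kernel accepted
/// before backpressure kicked in.
fn write_until_blocked(addr: &str) -> std::io::Result<usize> {
    let mut stream = TcpStream::connect(addr)?;
    stream.set_nonblocking(true)?;
    let chunk = [0u8; 64 * 1024];
    let mut written = 0;
    loop {
        match stream.write(&chunk) {
            // The kernel still has room: keep counting accepted bytes.
            Ok(n) => written += n,
            // Send buffer full and receive window closed: backpressure.
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(written),
            Err(e) => return Err(e),
        }
    }
}
```

A vulnerable multiplexer keeps appending frames to its user-space queue after this point; the patches below either bound that queue or eliminate it.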

Affected packages

Versions sourced from the GitHub Security Advisory.

Package             Affected versions     Patched versions
yamux (crates.io)   >= 0.13.0, < 0.13.2   0.13.2

Patches

6689e227a482

chore: introduce MAX_FRAME_BUFFER

https://github.com/sigp/rust-yamux · João Oliveira · Feb 22, 2024 · via GHSA
1 file changed · +8 −0
  • yamux/src/connection.rs · +8 −0 · modified
    @@ -37,6 +37,9 @@ use std::{fmt, sync::Arc, task::Poll};
     
     pub use stream::{Packet, State, Stream};
     
    +/// Max queued frames in `Connection`.
    +const MAX_FRAME_BUFFER: usize = 2000;
    +
     /// How the connection is used.
     #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
     pub enum Mode {
    @@ -398,6 +401,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                     }
                 }
     
    +            // Receiver is not reading.
    +            if self.pending_frames.len() == MAX_FRAME_BUFFER {
    +                return Poll::Ready(Err(ConnectionError::Closed));
    +            }
    +
                 match self.socket.poll_flush_unpin(cx)? {
                     Poll::Ready(()) => {}
                     Poll::Pending => {}
    
460baf2ccb7d

Merge pull request from GHSA-3999-5ffv-wp2r

https://github.com/libp2p/rust-yamux · João Oliveira · Apr 30, 2024 · via GHSA
3 files changed · +86 −81
  • test-harness/tests/poll_api.rs · +1 −1 · modified
    @@ -57,7 +57,7 @@ fn concurrent_streams() {
             const PAYLOAD_SIZE: usize = 128 * 1024;
     
             let data = Msg(vec![0x42; PAYLOAD_SIZE]);
    -        let n_streams = 1000;
    +        let n_streams = 512;
     
             let mut cfg = Config::default();
             cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test.
    
  • yamux/src/connection/closing.rs · +12 −12 · modified
    @@ -30,7 +30,7 @@ where
             socket: Fuse<frame::Io<T>>,
         ) -> Self {
             Self {
    -            state: State::ClosingStreamReceiver,
    +            state: State::FlushingPendingFrames,
                 stream_receivers,
                 pending_frames,
                 socket,
    @@ -49,6 +49,14 @@ where
     
             loop {
                 match this.state {
    +                State::FlushingPendingFrames => {
    +                    ready!(this.socket.poll_ready_unpin(cx))?;
    +
    +                    match this.pending_frames.pop_front() {
    +                        Some(frame) => this.socket.start_send_unpin(frame)?,
    +                        None => this.state = State::ClosingStreamReceiver,
    +                    }
    +                }
                     State::ClosingStreamReceiver => {
                         for stream in this.stream_receivers.iter_mut() {
                             stream.inner_mut().close();
    @@ -59,7 +67,7 @@ where
                     State::DrainingStreamReceiver => {
                         match this.stream_receivers.poll_next_unpin(cx) {
                             Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
    -                            this.pending_frames.push_back(frame.into())
    +                            this.pending_frames.push_back(frame.into());
                             }
                             Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
                                 this.pending_frames
    @@ -69,19 +77,11 @@ where
                             Poll::Pending | Poll::Ready(None) => {
                                 // No more frames from streams, append `Term` frame and flush them all.
                                 this.pending_frames.push_back(Frame::term().into());
    -                            this.state = State::FlushingPendingFrames;
    +                            this.state = State::ClosingSocket;
                                 continue;
                             }
                         }
                     }
    -                State::FlushingPendingFrames => {
    -                    ready!(this.socket.poll_ready_unpin(cx))?;
    -
    -                    match this.pending_frames.pop_front() {
    -                        Some(frame) => this.socket.start_send_unpin(frame)?,
    -                        None => this.state = State::ClosingSocket,
    -                    }
    -                }
                     State::ClosingSocket => {
                         ready!(this.socket.poll_close_unpin(cx))?;
     
    @@ -93,8 +93,8 @@ where
     }
     
     enum State {
    +    FlushingPendingFrames,
         ClosingStreamReceiver,
         DrainingStreamReceiver,
    -    FlushingPendingFrames,
         ClosingSocket,
     }
    
  • yamux/src/connection.rs · +73 −68 · modified
    @@ -286,7 +286,8 @@ struct Active<T> {
         stream_receivers: SelectAll<TaggedStream<StreamId, mpsc::Receiver<StreamCommand>>>,
         no_streams_waker: Option<Waker>,
     
    -    pending_frames: VecDeque<Frame<()>>,
    +    pending_read_frame: Option<Frame<()>>,
    +    pending_write_frame: Option<Frame<()>>,
         new_outbound_stream_waker: Option<Waker>,
     
         rtt: rtt::Rtt,
    @@ -360,7 +361,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                     Mode::Client => 1,
                     Mode::Server => 2,
                 },
    -            pending_frames: VecDeque::default(),
    +            pending_read_frame: None,
    +            pending_write_frame: None,
                 new_outbound_stream_waker: None,
                 rtt: rtt::Rtt::new(),
                 accumulated_max_stream_windows: Default::default(),
    @@ -369,7 +371,12 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
     
         /// Gracefully close the connection to the remote.
         fn close(self) -> Closing<T> {
    -        Closing::new(self.stream_receivers, self.pending_frames, self.socket)
    +        let pending_frames = self
    +            .pending_read_frame
    +            .into_iter()
    +            .chain(self.pending_write_frame)
    +            .collect::<VecDeque<Frame<()>>>();
    +        Closing::new(self.stream_receivers, pending_frames, self.socket)
         }
     
         /// Cleanup all our resources.
    @@ -392,7 +399,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                         continue;
                     }
     
    -                if let Some(frame) = self.pending_frames.pop_front() {
    +                // Privilege pending `Pong` and `GoAway` `Frame`s
    +                // over `Frame`s from the receivers.
    +                if let Some(frame) = self
    +                    .pending_read_frame
    +                    .take()
    +                    .or_else(|| self.pending_write_frame.take())
    +                {
                         self.socket.start_send_unpin(frame)?;
                         continue;
                     }
    @@ -403,36 +416,63 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                     Poll::Pending => {}
                 }
     
    -            match self.stream_receivers.poll_next_unpin(cx) {
    -                Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
    -                    self.on_send_frame(frame);
    -                    continue;
    -                }
    -                Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
    -                    self.on_close_stream(id, ack);
    -                    continue;
    -                }
    -                Poll::Ready(Some((id, None))) => {
    -                    self.on_drop_stream(id);
    -                    continue;
    -                }
    -                Poll::Ready(None) => {
    -                    self.no_streams_waker = Some(cx.waker().clone());
    +            if self.pending_write_frame.is_none() {
    +                match self.stream_receivers.poll_next_unpin(cx) {
    +                    Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => {
    +                        log::trace!(
    +                            "{}/{}: sending: {}",
    +                            self.id,
    +                            frame.header().stream_id(),
    +                            frame.header()
    +                        );
    +                        self.pending_write_frame.replace(frame.into());
    +                        continue;
    +                    }
    +                    Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => {
    +                        log::trace!("{}/{}: sending close", self.id, id);
    +                        self.pending_write_frame
    +                            .replace(Frame::close_stream(id, ack).into());
    +                        continue;
    +                    }
    +                    Poll::Ready(Some((id, None))) => {
    +                        if let Some(frame) = self.on_drop_stream(id) {
    +                            log::trace!("{}/{}: sending: {}", self.id, id, frame.header());
    +                            self.pending_write_frame.replace(frame);
    +                        };
    +                        continue;
    +                    }
    +                    Poll::Ready(None) => {
    +                        self.no_streams_waker = Some(cx.waker().clone());
    +                    }
    +                    Poll::Pending => {}
                     }
    -                Poll::Pending => {}
                 }
     
    -            match self.socket.poll_next_unpin(cx) {
    -                Poll::Ready(Some(frame)) => {
    -                    if let Some(stream) = self.on_frame(frame?)? {
    -                        return Poll::Ready(Ok(stream));
    +            if self.pending_read_frame.is_none() {
    +                match self.socket.poll_next_unpin(cx) {
    +                    Poll::Ready(Some(frame)) => {
    +                        match self.on_frame(frame?)? {
    +                            Action::None => {}
    +                            Action::New(stream) => {
    +                                log::trace!("{}: new inbound {} of {}", self.id, stream, self);
    +                                return Poll::Ready(Ok(stream));
    +                            }
    +                            Action::Ping(f) => {
    +                                log::trace!("{}/{}: pong", self.id, f.header().stream_id());
    +                                self.pending_read_frame.replace(f.into());
    +                            }
    +                            Action::Terminate(f) => {
    +                                log::trace!("{}: sending term", self.id);
    +                                self.pending_read_frame.replace(f.into());
    +                            }
    +                        }
    +                        continue;
                         }
    -                    continue;
    -                }
    -                Poll::Ready(None) => {
    -                    return Poll::Ready(Err(ConnectionError::Closed));
    +                    Poll::Ready(None) => {
    +                        return Poll::Ready(Err(ConnectionError::Closed));
    +                    }
    +                    Poll::Pending => {}
                     }
    -                Poll::Pending => {}
                 }
     
                 // If we make it this far, at least one of the above must have registered a waker.
    @@ -463,23 +503,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
             Poll::Ready(Ok(stream))
         }
     
    -    fn on_send_frame(&mut self, frame: Frame<Either<Data, WindowUpdate>>) {
    -        log::trace!(
    -            "{}/{}: sending: {}",
    -            self.id,
    -            frame.header().stream_id(),
    -            frame.header()
    -        );
    -        self.pending_frames.push_back(frame.into());
    -    }
    -
    -    fn on_close_stream(&mut self, id: StreamId, ack: bool) {
    -        log::trace!("{}/{}: sending close", self.id, id);
    -        self.pending_frames
    -            .push_back(Frame::close_stream(id, ack).into());
    -    }
    -
    -    fn on_drop_stream(&mut self, stream_id: StreamId) {
    +    fn on_drop_stream(&mut self, stream_id: StreamId) -> Option<Frame<()>> {
             let s = self.streams.remove(&stream_id).expect("stream not found");
     
             log::trace!("{}: removing dropped stream {}", self.id, stream_id);
    @@ -525,10 +549,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                 }
                 frame
             };
    -        if let Some(f) = frame {
    -            log::trace!("{}/{}: sending: {}", self.id, stream_id, f.header());
    -            self.pending_frames.push_back(f.into());
    -        }
    +        frame.map(Into::into)
         }
     
         /// Process the result of reading from the socket.
    @@ -537,7 +558,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
         /// and return a corresponding error, which terminates the connection.
         /// Otherwise we process the frame and potentially return a new `Stream`
         /// if one was opened by the remote.
    -    fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
    +    fn on_frame(&mut self, frame: Frame<()>) -> Result<Action> {
             log::trace!("{}: received: {}", self.id, frame.header());
     
             if frame.header().flags().contains(header::ACK)
    @@ -560,23 +581,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
                 Tag::Ping => self.on_ping(&frame.into_ping()),
                 Tag::GoAway => return Err(ConnectionError::Closed),
             };
    -        match action {
    -            Action::None => {}
    -            Action::New(stream) => {
    -                log::trace!("{}: new inbound {} of {}", self.id, stream, self);
    -                return Ok(Some(stream));
    -            }
    -            Action::Ping(f) => {
    -                log::trace!("{}/{}: pong", self.id, f.header().stream_id());
    -                self.pending_frames.push_back(f.into());
    -            }
    -            Action::Terminate(f) => {
    -                log::trace!("{}: sending term", self.id);
    -                self.pending_frames.push_back(f.into());
    -            }
    -        }
    -
    -        Ok(None)
    +        Ok(action)
         }
     
         fn on_data(&mut self, frame: Frame<Data>) -> Action {
    
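The core of this commit — replacing the `VecDeque` with two `Option` slots — can be sketched in isolation (`Slots` is a hypothetical name; in the patch the two fields live directly on `Active<T>`): with at most one buffered frame per producer, backpressure propagates to the producers instead of accumulating in memory, and read-driven frames (Pong, GoAway) are privileged over frames from the stream receivers.

```rust
/// At most one frame per producer is buffered: one produced by
/// processing inbound frames (Pong / GoAway replies) and one produced
/// by the stream receivers. Memory use is O(1) regardless of whether
/// the peer reads.
struct Slots<F> {
    pending_read_frame: Option<F>,
    pending_write_frame: Option<F>,
}

impl<F> Slots<F> {
    fn new() -> Self {
        Slots {
            pending_read_frame: None,
            pending_write_frame: None,
        }
    }

    /// Next frame to hand to the socket, privileging read-driven
    /// frames, as the patched poll loop does with `take().or_else(..)`.
    fn next_to_send(&mut self) -> Option<F> {
        self.pending_read_frame
            .take()
            .or_else(|| self.pending_write_frame.take())
    }

    /// The poll loop only polls a producer while its slot is free,
    /// which is what pushes backpressure upstream.
    fn may_poll_receivers(&self) -> bool {
        self.pending_write_frame.is_none()
    }

    fn may_poll_socket(&self) -> bool {
        self.pending_read_frame.is_none()
    }
}
```

When a slot is occupied, the corresponding `poll_next_unpin` call is skipped, so no new frame is produced until the socket has accepted the buffered one — the stall ends at the producer instead of in an unbounded queue.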

