Deno arbitrary file descriptor close via `op_node_ipc_pipe()` leading to permission prompt bypass
Description
Deno is a JavaScript, TypeScript, and WebAssembly runtime. In version 1.39.0, use of raw file descriptors in op_node_ipc_pipe() leads to premature close of arbitrary file descriptors, allowing standard input to be re-opened as a different resource resulting in permission prompt bypass. Node child_process IPC relies on the JS side to pass the raw IPC file descriptor to op_node_ipc_pipe(), which returns a IpcJsonStreamResource ID associated with the file descriptor. On closing the resource, the raw file descriptor is closed together.
Use of raw file descriptors in op_node_ipc_pipe() leads to premature close of arbitrary file descriptors. This allow standard input (fd 0) to be closed and re-opened for a different resource, which allows a silent permission prompt bypass. This is exploitable by an attacker controlling the code executed inside a Deno runtime to obtain arbitrary code execution on the host machine regardless of permissions.
This bug is known to be exploitable. There is a working exploit that achieves arbitrary code execution by bypassing prompts from zero permissions, additionally abusing the fact that Cache API lacks filesystem permission checks. The attack can be conducted silently as stderr can also be closed, suppressing all prompt outputs.
Version 1.39.1 fixes the bug.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
denocrates.io | >= 1.39.0, < 1.39.1 | 1.39.1 |
Affected products
1Patches
255fac9f5ead6fix(node): child_process IPC on Windows (#21597)
10 files changed · +258 −93
Cargo.lock+2 −0 modified@@ -1543,6 +1543,7 @@ dependencies = [ "typenum", "url", "winapi", + "windows-sys 0.48.0", "x25519-dalek", "x509-parser", ] @@ -1635,6 +1636,7 @@ dependencies = [ "uuid", "which", "winapi", + "windows-sys 0.48.0", "winres", ]
cli/args/mod.rs+2 −2 modified@@ -917,12 +917,12 @@ impl CliOptions { .map(Some) } - pub fn node_ipc_fd(&self) -> Option<i32> { + pub fn node_ipc_fd(&self) -> Option<i64> { let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); if let Some(node_channel_fd) = maybe_node_channel_fd { // Remove so that child processes don't inherit this environment variable. std::env::remove_var("DENO_CHANNEL_FD"); - node_channel_fd.parse::<i32>().ok() + node_channel_fd.parse::<i64>().ok() } else { None }
cli/worker.rs+2 −2 modified@@ -124,7 +124,7 @@ struct SharedWorkerState { maybe_inspector_server: Option<Arc<InspectorServer>>, maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, - node_ipc: Option<i32>, + node_ipc: Option<i64>, } impl SharedWorkerState { @@ -404,7 +404,7 @@ impl CliMainWorkerFactory { maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, options: CliMainWorkerOptions, - node_ipc: Option<i32>, + node_ipc: Option<i64>, ) -> Self { Self { shared: Arc::new(SharedWorkerState {
ext/node/Cargo.toml+4 −0 modified@@ -74,3 +74,7 @@ url.workspace = true winapi.workspace = true x25519-dalek = "2.0.0" x509-parser = "0.15.0" + +[target.'cfg(windows)'.dependencies] +windows-sys.workspace = true +winapi = { workspace = true, features = ["consoleapi"] }
ext/node/lib.rs+1 −1 modified@@ -31,6 +31,7 @@ mod polyfill; mod resolution; pub use ops::ipc::ChildPipeFd; +pub use ops::ipc::IpcJsonStreamResource; pub use ops::v8::VM_CONTEXT_INDEX; pub use package_json::PackageJson; pub use path::PathClean; @@ -307,7 +308,6 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, - ops::ipc::op_node_ipc_pipe, ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_ipc_write, ops::ipc::op_node_ipc_read,
ext/node/ops/ipc.rs+110 −70 modified@@ -1,20 +1,17 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -#[cfg(unix)] -pub use unix::*; +pub use impl_::*; -#[cfg(windows)] -pub use windows::*; +pub struct ChildPipeFd(pub i64); -pub struct ChildPipeFd(pub i32); - -#[cfg(unix)] -mod unix { +mod impl_ { use std::cell::RefCell; use std::future::Future; use std::io; use std::mem; + #[cfg(unix)] use std::os::fd::FromRawFd; + #[cfg(unix)] use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; @@ -35,18 +32,16 @@ mod unix { use tokio::io::AsyncBufRead; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; + + #[cfg(unix)] use tokio::net::unix::OwnedReadHalf; + #[cfg(unix)] use tokio::net::unix::OwnedWriteHalf; + #[cfg(unix)] use tokio::net::UnixStream; - #[op2(fast)] - #[smi] - pub fn op_node_ipc_pipe( - state: &mut OpState, - #[smi] fd: i32, - ) -> Result<ResourceId, AnyError> { - Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) - } + #[cfg(windows)] + type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; // Open IPC pipe from bootstrap options. #[op2] @@ -97,9 +92,12 @@ mod unix { Ok(msgs) } - struct IpcJsonStreamResource { + pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, + #[cfg(unix)] write_half: AsyncRefCell<OwnedWriteHalf>, + #[cfg(windows)] + write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, cancel: Rc<CancelHandle>, } @@ -109,23 +107,45 @@ mod unix { } } + #[cfg(unix)] + fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + Ok(unix_stream.into_split()) + } + + #[cfg(windows)] + fn pipe( + handle: i64, + ) -> Result< + ( + tokio::io::ReadHalf<NamedPipeClient>, + tokio::io::WriteHalf<NamedPipeClient>, + ), + io::Error, + > { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; + Ok(tokio::io::split(pipe)) + } + impl IpcJsonStreamResource { - fn new(stream: RawFd) -> Result<Self, std::io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - let (read_half, write_half) = unix_stream.into_split(); + pub fn new(stream: i64) -> Result<Self, std::io::Error> { + let (read_half, write_half) = pipe(stream as _)?; Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), cancel: Default::default(), }) } + #[cfg(unix)] #[cfg(test)] - fn from_unix_stream(stream: UnixStream) -> Self { + fn from_stream(stream: UnixStream) -> Self { let (read_half, write_half) = stream.into_split(); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), @@ -134,6 +154,17 @@ mod unix { } } + #[cfg(windows)] + #[cfg(test)] + fn from_stream(pipe: NamedPipeClient) -> Self { + let (read_half, write_half) = tokio::io::split(pipe); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + async fn write_msg( self: Rc<Self>, msg: serde_json::Value, @@ -172,18 +203,30 @@ mod unix { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { + #[cfg(unix)] pipe: BufReader<OwnedReadHalf>, + #[cfg(windows)] + pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>, buffer: Vec<u8>, } impl IpcJsonStream { + #[cfg(unix)] fn new(pipe: OwnedReadHalf) -> Self { Self { pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), buffer: Vec::with_capacity(INITIAL_CAPACITY), } } + #[cfg(windows)] + fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { let mut json = None; let nread = @@ -252,7 +295,6 @@ mod unix { std::task::Poll::Ready(t) => t?, std::task::Poll::Pending => return std::task::Poll::Pending, }; - if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. @@ -366,6 +408,35 @@ mod unix { use deno_core::RcRef; use std::rc::Rc; + #[cfg(unix)] + pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) { + let (a, b) = tokio::net::UnixStream::pair().unwrap(); + + /* Similar to how ops would use the resource */ + let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + (a, b) + } + + #[cfg(windows)] + pub async fn pair() -> ( + Rc<IpcJsonStreamResource>, + tokio::net::windows::named_pipe::NamedPipeServer, + ) { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::net::windows::named_pipe::ServerOptions; + + let name = + format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>()); + + let server = ServerOptions::new().create(name.clone()).unwrap(); + let client = ClientOptions::new().open(name).unwrap(); + + server.connect().await.unwrap(); + /* Similar to how ops would use the resource */ + let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + (client, server) + } + #[tokio::test] async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { // A simple round trip benchmark for quick dev feedback. @@ -375,7 +446,7 @@ mod unix { return Ok(()); } - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncWriteExt; @@ -389,8 +460,6 @@ mod unix { Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - let start = std::time::Instant::now(); let mut bytes = 0; @@ -416,21 +485,20 @@ mod unix { #[tokio::test] async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) }); - /* Similar to how ops would use the resource */ - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - ipc.clone().write_msg(json!("hello")).await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; @@ -444,19 +512,19 @@ mod unix { #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); ipc.clone().write_msg(json!("hello")).await?; ipc.clone().write_msg(json!("world")).await?; @@ -471,13 +539,12 @@ mod unix { #[tokio::test] async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let _err = ipc.read_msg().await.unwrap_err(); @@ -499,30 +566,3 @@ mod unix { } } } - -#[cfg(windows)] -mod windows { - use deno_core::error::AnyError; - use deno_core::op2; - - #[op2(fast)] - pub fn op_node_ipc_pipe() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(fast)] - #[smi] - pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> { - Ok(-1) - } - - #[op2(async)] - pub async fn op_node_ipc_write() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(async)] - pub async fn op_node_ipc_read() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } -}
ext/node/polyfills/internal/child_process.ts+1 −4 modified@@ -45,7 +45,6 @@ import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; const core = globalThis.__bootstrap.core; -const ops = core.ops; export function mapValues<T, O>( record: Readonly<Record<string, T>>, @@ -1069,9 +1068,7 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } -export function setupChannel(target, channel) { - const ipc = ops.op_node_ipc_pipe(channel); - +export function setupChannel(target, ipc) { async function readLoop() { try { while (true) {
runtime/Cargo.toml+1 −0 modified@@ -122,6 +122,7 @@ which = "4.2.5" fwdansi.workspace = true winapi = { workspace = true, features = ["commapi", "knownfolders", "mswsock", "objbase", "psapi", "shlobj", "tlhelp32", "winbase", "winerror", "winuser", "winsock2"] } ntapi = "0.4.0" +windows-sys.workspace = true [target.'cfg(unix)'.dependencies] nix.workspace = true
runtime/ops/process.rs+134 −13 modified@@ -141,7 +141,6 @@ pub struct SpawnArgs { uid: Option<u32>, #[cfg(windows)] windows_raw_arguments: bool, - #[cfg(unix)] ipc: Option<i32>, #[serde(flatten)] @@ -207,12 +206,7 @@ pub struct SpawnOutput { stderr: Option<ToJsBuffer>, } -type CreateCommand = ( - std::process::Command, - // TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors - // all the way until setupChannel which makes it easier to share code between parent and child fork. - Option<i32>, -); +type CreateCommand = (std::process::Command, Option<ResourceId>); fn create_command( state: &mut OpState, @@ -337,17 +331,144 @@ fn create_command( }); /* One end returned to parent process (this) */ - let pipe_fd = Some(fd1); + let pipe_rid = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), + ); /* The other end passed to child process via DENO_CHANNEL_FD */ command.env("DENO_CHANNEL_FD", format!("{}", ipc)); - return Ok((command, pipe_fd)); + return Ok((command, pipe_rid)); } Ok((command, None)) } + #[cfg(windows)] + // Safety: We setup a windows named pipe and pass one end to the child process. + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::DuplicateHandle; + use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + use windows_sys::Win32::System::Threading::GetCurrentProcess; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::<Vec<_>>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let mut hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + // Duplicating the handle to allow the child process to use it. + if DuplicateHandle( + GetCurrentProcess(), + hd2, + GetCurrentProcess(), + &mut hd2, + 0, + 1, + DUPLICATE_SAME_ACCESS, + ) == 0 + { + return Err(std::io::Error::last_os_error().into()); + } + + /* One end returned to parent process (this) */ + let pipe_fd = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), + ); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); + + return Ok((command, pipe_fd)); + } + } + #[cfg(not(unix))] return Ok((command, None)); } @@ -360,13 +481,13 @@ struct Child { stdin_rid: Option<ResourceId>, stdout_rid: Option<ResourceId>, stderr_rid: Option<ResourceId>, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, } fn spawn_child( state: &mut OpState, command: std::process::Command, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, ) -> Result<Child, AnyError> { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -459,8 +580,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result<Child, AnyError> { - let (command, pipe_fd) = create_command(state, args, &api_name)?; - spawn_child(state, command, pipe_fd) + let (command, pipe_rid) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_rid) } #[op2(async)]
runtime/worker_bootstrap.rs+1 −1 modified@@ -59,7 +59,7 @@ pub struct BootstrapOptions { pub inspect: bool, pub has_node_modules_dir: bool, pub maybe_binary_npm_command_name: Option<String>, - pub node_ipc_fd: Option<i32>, + pub node_ipc_fd: Option<i64>, } impl Default for BootstrapOptions {
5a91a065b882fix: implement child_process IPC (#21490)
22 files changed · +1158 −32
Cargo.lock+145 −0 modified@@ -1508,6 +1508,7 @@ dependencies = [ "libz-sys", "md-5", "md4", + "nix 0.26.2", "num-bigint", "num-bigint-dig", "num-integer", @@ -1518,6 +1519,7 @@ dependencies = [ "p384", "path-clean", "pbkdf2", + "pin-project-lite", "rand", "regex", "reqwest", @@ -1529,6 +1531,7 @@ dependencies = [ "sha-1", "sha2", "signature", + "simd-json", "tokio", "typenum", "url", @@ -2405,6 +2408,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fly-accept-encoding" version = "0.2.0" @@ -2617,8 +2629,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2767,12 +2781,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "halfbrown" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5681137554ddff44396e5f149892c769d45301dd9aa19c51602a89ee214cb0ec" +dependencies = [ + "hashbrown 0.13.2", + "serde", +] + [[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.14.3" @@ -3292,6 +3325,70 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" +dependencies = [ + "lexical-parse-integer", + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" +dependencies = [ + "lexical-util", + "static_assertions", +] + +[[package]] +name = "lexical-util" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" +dependencies = [ + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" +dependencies = [ + "lexical-util", + "lexical-write-integer", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" +dependencies = [ + "lexical-util", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.150" @@ -4437,6 +4534,26 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "ref-cast" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acde58d073e9c79da00f2b5b84eed919c8326832648a5b109b3fce1bb1175280" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "regex" version = "1.10.2" @@ -5036,6 +5153,22 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simd-json" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5a3720326b20bf5b95b72dbbd133caae7e0dcf71eae8f6e6656e71a7e5c9aaa" +dependencies = [ + "getrandom", + "halfbrown", + "lexical-core", + "ref-cast", + "serde", + "serde_json", + "simdutf8", + "value-trait", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -6388,6 +6521,18 @@ dependencies = [ "which", ] +[[package]] +name = "value-trait" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97" +dependencies = [ + "float-cmp", + "halfbrown", + "itoa", + "ryu", +] + [[package]] name = "vcpkg" version = "0.2.15"
cli/args/mod.rs+11 −0 modified@@ -939,6 +939,17 @@ impl CliOptions { .map(Some) } + pub fn node_ipc_fd(&self) -> Option<i32> { + let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); + if let Some(node_channel_fd) = maybe_node_channel_fd { + // Remove so that child processes don't inherit this environment variable. + std::env::remove_var("DENO_CHANNEL_FD"); + node_channel_fd.parse::<i32>().ok() + } else { + None + } + } + pub fn resolve_main_module(&self) -> Result<ModuleSpecifier, AnyError> { match &self.flags.subcommand { DenoSubcommand::Bundle(bundle_flags) => {
cli/factory.rs+1 −0 modified@@ -672,6 +672,7 @@ impl CliFactory { self.maybe_lockfile().clone(), self.feature_checker().clone(), self.create_cli_main_worker_options()?, + self.options.node_ipc_fd(), )) }
cli/standalone/mod.rs+1 −0 modified@@ -530,6 +530,7 @@ pub async fn run( unstable: metadata.unstable, maybe_root_package_json_deps: package_json_deps_provider.deps().cloned(), }, + None, ); v8_set_flags(construct_v8_flags(&[], &metadata.v8_flags, vec![]));
cli/tests/node_compat/config.jsonc+7 −11 modified@@ -38,6 +38,10 @@ "test-child-process-execfile.js", "test-child-process-execsync-maxbuf.js", "test-child-process-exit-code.js", + // TODO(littledivy): windows ipc streams not yet implemented + "test-child-process-fork-ref.js", + "test-child-process-fork-ref2.js", + "test-child-process-ipc-next-tick.js", "test-child-process-ipc.js", "test-child-process-spawnsync-env.js", "test-child-process-stdio-inherit.js", @@ -109,9 +113,7 @@ "test-zlib-zero-windowBits.js" ], "pummel": [], - "sequential": [ - "test-child-process-exit.js" - ] + "sequential": ["test-child-process-exit.js"] }, "tests": { "common": [ @@ -138,11 +140,7 @@ "print-chars.js", "x.txt" ], - "fixtures/keys": [ - "agent1-cert.pem", - "agent1-key.pem", - "ca1-cert.pem" - ], + "fixtures/keys": ["agent1-cert.pem", "agent1-key.pem", "ca1-cert.pem"], "internet": [ "test-dns-any.js", "test-dns-idna2008.js", @@ -695,9 +693,7 @@ "test-tty-stdout-end.js" ], "pummel": [], - "sequential": [ - "test-child-process-exit.js" - ] + "sequential": ["test-child-process-exit.js"] }, "windowsIgnore": { "parallel": [
cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js+63 −0 added@@ -0,0 +1,63 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +const { + mustCall, + mustNotCall, + platformTimeout, +} = require('../common'); +const fork = require('child_process').fork; +const debug = require('util').debuglog('test'); + +if (process.argv[2] === 'child') { + debug('child -> call disconnect'); + process.disconnect(); + + setTimeout(() => { + debug('child -> will this keep it alive?'); + process.on('message', mustNotCall()); + }, platformTimeout(400)); + +} else { + const child = fork(__filename, ['child']); + + child.on('disconnect', mustCall(() => { + debug('parent -> disconnect'); + })); + + child.once('exit', mustCall(() => { + debug('parent -> exit'); + })); +}
cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js+72 −0 added@@ -0,0 +1,72 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +require('../common'); +const assert = require('assert'); +const fork = require('child_process').fork; + +if (process.argv[2] === 'child') { + process.send('1'); + + // Check that child don't instantly die + setTimeout(function() { + process.send('2'); + }, 200); + + process.on('disconnect', function() { + process.stdout.write('3'); + }); + +} else { + const child = fork(__filename, ['child'], { silent: true }); + + const ipc = []; + let stdout = ''; + + child.on('message', function(msg) { + ipc.push(msg); + + if (msg === '2') child.disconnect(); + }); + + child.stdout.on('data', function(chunk) { + stdout += chunk; + }); + + child.once('exit', function() { + assert.deepStrictEqual(ipc, ['1', '2']); + assert.strictEqual(stdout, '3'); + }); +}
cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js+52 −0 added@@ -0,0 +1,52 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +'use strict'; + +// Ignore on Windows. +if (process.platform === 'win32') { + process.exit(0); +} + +const common = require('../common'); +const assert = require('assert'); +const cp = require('child_process'); +const NUM_MESSAGES = 10; +const values = []; + +for (let i = 0; i < NUM_MESSAGES; ++i) { + values[i] = i; +} + +if (process.argv[2] === 'child') { + const received = values.map(() => { return false; }); + + process.on('uncaughtException', common.mustCall((err) => { + received[err] = true; + const done = received.every((element) => { return element === true; }); + + if (done) + process.disconnect(); + }, NUM_MESSAGES)); + + process.on('message', (msg) => { + // If messages are handled synchronously, throwing should break the IPC + // message processing. + throw msg; + }); + + process.send('ready'); +} else { + const child = cp.fork(__filename, ['child']); + + child.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'ready'); + values.forEach((value) => { + child.send(value); + }); + })); +}
cli/worker.rs+5 −0 modified@@ -124,6 +124,7 @@ struct SharedWorkerState { maybe_inspector_server: Option<Arc<InspectorServer>>, maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, + node_ipc: Option<i32>, } impl SharedWorkerState { @@ -415,6 +416,7 @@ impl CliMainWorkerFactory { maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, options: CliMainWorkerOptions, + node_ipc: Option<i32>, ) -> Self { Self { shared: Arc::new(SharedWorkerState { @@ -435,6 +437,7 @@ impl CliMainWorkerFactory { maybe_inspector_server, maybe_lockfile, feature_checker, + node_ipc, }), } } @@ -596,6 +599,7 @@ impl CliMainWorkerFactory { .options .maybe_binary_npm_command_name .clone(), + node_ipc_fd: shared.node_ipc, }, extensions: custom_extensions, startup_snapshot: crate::js::deno_isolate_init(), @@ -793,6 +797,7 @@ fn create_web_worker_callback( .options .maybe_binary_npm_command_name .clone(), + node_ipc_fd: None, }, extensions: vec![], startup_snapshot: crate::js::deno_isolate_init(),
ext/node/benchmarks/child_process_ipc.mjs+64 −0 added@@ -0,0 +1,64 @@ +import { fork } from "node:child_process"; +import process from "node:process"; +import { setImmediate } from "node:timers"; + +if (process.env.CHILD) { + const len = +process.env.CHILD; + const msg = ".".repeat(len); + const send = () => { + while (process.send(msg)); + // Wait: backlog of unsent messages exceeds threshold + setImmediate(send); + }; + send(); +} else { + function main(dur, len) { + const p = new Promise((resolve) => { + const start = performance.now(); + + const options = { + "stdio": ["inherit", "inherit", "inherit", "ipc"], + "env": { "CHILD": len.toString() }, + }; + const path = new URL("child_process_ipc.mjs", import.meta.url).pathname; + const child = fork( + path, + options, + ); + + let bytes = 0; + let total = 0; + child.on("message", (msg) => { + bytes += msg.length; + total += 1; + }); + + setTimeout(() => { + child.kill(); + const end = performance.now(); + const mb = bytes / 1024 / 1024; + const sec = (end - start) / 1000; + const mbps = mb / sec; + console.log(`${len} bytes: ${mbps.toFixed(2)} MB/s`); + console.log(`${total} messages`); + resolve(); + }, dur * 1000); + }); + return p; + } + + const len = [ + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 65536 << 4, + 65536 << 6 - 1, + ]; + + for (const l of len) { + await main(5, l); + } +}
ext/node/Cargo.toml+3 −0 modified@@ -44,6 +44,7 @@ libc.workspace = true libz-sys.workspace = true md-5 = "0.10.5" md4 = "0.10.2" +nix.workspace = true num-bigint.workspace = true num-bigint-dig = "0.8.2" num-integer = "0.1.45" @@ -54,6 +55,7 @@ p256.workspace = true p384.workspace = true path-clean = "=0.1.0" pbkdf2 = "0.12.1" +pin-project-lite = "0.2.13" rand.workspace = true regex.workspace = true reqwest.workspace = true @@ -65,6 +67,7 @@ serde = "1.0.149" sha-1 = "0.10.0" sha2.workspace = true signature.workspace = true +simd-json = "0.13.4" tokio.workspace = true typenum = "1.15.0" url.workspace = true
ext/node/lib.rs+3 −0 modified@@ -312,6 +312,9 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, + ops::ipc::op_node_ipc_pipe, + ops::ipc::op_node_ipc_write, + ops::ipc::op_node_ipc_read, ], esm_entry_point = "ext:deno_node/02_init.js", esm = [
ext/node/ops/ipc.rs+504 −0 added@@ -0,0 +1,504 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +#[cfg(unix)] +pub use unix::*; + +#[cfg(windows)] +pub use windows::*; + +#[cfg(unix)] +mod unix { + use std::cell::RefCell; + use std::future::Future; + use std::io; + use std::mem; + use std::os::fd::FromRawFd; + use std::os::fd::RawFd; + use std::pin::Pin; + use std::rc::Rc; + use std::task::Context; + use std::task::Poll; + + use deno_core::error::bad_resource_id; + use deno_core::error::AnyError; + use deno_core::op2; + use deno_core::serde_json; + use deno_core::AsyncRefCell; + use deno_core::CancelFuture; + use deno_core::CancelHandle; + use deno_core::OpState; + use deno_core::RcRef; + use deno_core::ResourceId; + use pin_project_lite::pin_project; + use tokio::io::AsyncBufRead; + use tokio::io::AsyncWriteExt; + use tokio::io::BufReader; + use tokio::net::unix::OwnedReadHalf; + use tokio::net::unix::OwnedWriteHalf; + use tokio::net::UnixStream; + + #[op2(fast)] + #[smi] + pub fn op_node_ipc_pipe( + state: &mut OpState, + #[smi] fd: i32, + ) -> Result<ResourceId, AnyError> { + Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) + } + + #[op2(async)] + pub async fn op_node_ipc_write( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + #[serde] value: serde_json::Value, + ) -> Result<(), AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + stream.write_msg(value).await?; + Ok(()) + } + + #[op2(async)] + #[serde] + pub async fn op_node_ipc_read( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + ) -> Result<serde_json::Value, AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + + let cancel = stream.cancel.clone(); + let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; + let msgs = stream.read_msg().or_cancel(cancel).await??; + Ok(msgs) + } + + struct IpcJsonStreamResource { + read_half: AsyncRefCell<IpcJsonStream>, + write_half: AsyncRefCell<OwnedWriteHalf>, + cancel: Rc<CancelHandle>, + } + + impl deno_core::Resource for IpcJsonStreamResource { + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } + } + + impl IpcJsonStreamResource { + fn new(stream: RawFd) -> Result<Self, std::io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read_half, write_half) = unix_stream.into_split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + }) + } + + #[cfg(test)] + fn from_unix_stream(stream: UnixStream) -> Self { + let (read_half, write_half) = stream.into_split(); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + + async fn write_msg( + self: Rc<Self>, + msg: serde_json::Value, + ) -> Result<(), AnyError> { + let mut write_half = + RcRef::map(self, |r| &r.write_half).borrow_mut().await; + // Perf note: We do not benefit from writev here because + // we are always allocating a buffer for serialization anyways. + let mut buf = Vec::new(); + serde_json::to_writer(&mut buf, &msg)?; + buf.push(b'\n'); + write_half.write_all(&buf).await?; + Ok(()) + } + } + + #[inline] + fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + // Safety: haystack of valid length. neon_memchr can handle unaligned + // data. + return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }; + + #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] + return haystack.iter().position(|&b| b == needle); + } + + // Initial capacity of the buffered reader and the JSON backing buffer. + // + // This is a tradeoff between memory usage and performance on large messages. + // + // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. + const INITIAL_CAPACITY: usize = 1024 * 64; + + // JSON serialization stream over IPC pipe. + // + // `\n` is used as a delimiter between messages. + struct IpcJsonStream { + pipe: BufReader<OwnedReadHalf>, + buffer: Vec<u8>, + } + + impl IpcJsonStream { + fn new(pipe: OwnedReadHalf) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { + let mut json = None; + let nread = + read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; + if nread == 0 { + // EOF. + return Ok(serde_json::Value::Null); + } + + let json = match json { + Some(v) => v, + None => { + // Took more than a single read and some buffering. + simd_json::from_slice(&mut self.buffer[..nread])? + } + }; + + // Safety: Same as `Vec::clear` but without the `drop_in_place` for + // each element (nop for u8). Capacity remains the same. + unsafe { + self.buffer.set_len(0); + } + + Ok(json) + } + } + + pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgInner<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + } + } + + fn read_msg_inner<'a, R>( + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + ) -> ReadMsgInner<'a, R> + where + R: AsyncBufRead + ?Sized + Unpin, + { + ReadMsgInner { + reader, + buf, + json, + read: 0, + } + } + + fn read_msg_internal<R: AsyncBufRead + ?Sized>( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + json: &mut Option<serde_json::Value>, + read: &mut usize, + ) -> Poll<io::Result<usize>> { + loop { + let (done, used) = { + let available = match reader.as_mut().poll_fill_buf(cx) { + std::task::Poll::Ready(t) => t?, + std::task::Poll::Pending => return std::task::Poll::Pending, + }; + + if let Some(i) = memchr(b'\n', available) { + if *read == 0 { + // Fast path: parse and put into the json slot directly. + // + // Safety: It is ok to overwrite the contents because + // we don't need to copy it into the buffer and the length will be reset. + let available = unsafe { + std::slice::from_raw_parts_mut( + available.as_ptr() as *mut u8, + available.len(), + ) + }; + json.replace( + simd_json::from_slice(&mut available[..i + 1]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } else { + // This is not the first read, so we have to copy the data + // to make it contiguous. + buf.extend_from_slice(&available[..=i]); + } + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } + } + + impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { + type Output = io::Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) + } + } + + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + mod neon { + use std::arch::aarch64::*; + + pub unsafe fn neon_memchr( + str: &[u8], + c: u8, + length: usize, + ) -> Option<usize> { + let end = str.as_ptr().wrapping_add(length); + + // Alignment handling + let mut ptr = str.as_ptr(); + while ptr < end && (ptr as usize) & 0xF != 0 { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + let search_char = vdupq_n_u8(c); + + while ptr.wrapping_add(16) <= end { + let chunk = vld1q_u8(ptr); + let comparison = vceqq_u8(chunk, search_char); + + // Check first 64 bits + let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); + if result0 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + result0.trailing_zeros() as usize / 8, + ); + } + + // Check second 64 bits + let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); + if result1 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + 8 + + result1.trailing_zeros() as usize / 8, + ); + } + + ptr = ptr.wrapping_add(16); + } + + // Handle remaining unaligned characters + while ptr < end { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + None + } + } + + #[cfg(test)] + mod tests { + use super::IpcJsonStreamResource; + use deno_core::serde_json; + use deno_core::serde_json::json; + use deno_core::RcRef; + use std::rc::Rc; + + #[tokio::test] + async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { + // A simple round trip benchmark for quick dev feedback. + // + // Only ran when the env var is set. + if std::env::var_os("BENCH_IPC_DENO").is_none() { + return Ok(()); + } + + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + let size = 1024 * 1024; + + let stri = "x".repeat(size); + let data = format!("\"{}\"\n", stri); + for _ in 0..100 { + fd2.write_all(data.as_bytes()).await?; + } + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + let start = std::time::Instant::now(); + let mut bytes = 0; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + loop { + let msgs = ipc.read_msg().await?; + if msgs == serde_json::Value::Null { + break; + } + bytes += msgs.as_str().unwrap().len(); + if start.elapsed().as_secs() > 5 { + break; + } + } + let elapsed = start.elapsed(); + let mb = bytes as f64 / 1024.0 / 1024.0; + println!("{} mb/s", mb / elapsed.as_secs_f64()); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n"); + fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + /* Similar to how ops would use the resource */ + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + ipc.clone().write_msg(json!("hello")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("world")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + ipc.clone().write_msg(json!("hello")).await?; + ipc.clone().write_msg(json!("world")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("foo")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let _err = ipc.read_msg().await.unwrap_err(); + + child.await??; + + Ok(()) + } + + #[test] + fn memchr() { + let str = b"hello world"; + assert_eq!(super::memchr(b'h', str), Some(0)); + assert_eq!(super::memchr(b'w', str), Some(6)); + assert_eq!(super::memchr(b'd', str), Some(10)); + assert_eq!(super::memchr(b'x', str), None); + + let empty = b""; + assert_eq!(super::memchr(b'\n', empty), None); + } + } +} + +#[cfg(windows)] +mod windows { + use deno_core::error::AnyError; + use deno_core::op2; + + #[op2(fast)] + pub fn op_node_ipc_pipe() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_write() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_read() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } +}
ext/node/ops/mod.rs+1 −0 modified@@ -5,6 +5,7 @@ pub mod fs; pub mod http; pub mod http2; pub mod idna; +pub mod ipc; pub mod os; pub mod require; pub mod util;
ext/node/polyfills/02_init.js+4 −4 modified@@ -7,15 +7,12 @@ const requireImpl = internals.requireImpl; import { nodeGlobals } from "ext:deno_node/00_globals.js"; import "node:module"; -globalThis.nodeBootstrap = function (usesLocalNodeModulesDir, argv0) { - initialize(usesLocalNodeModulesDir, argv0); -}; - let initialized = false; function initialize( usesLocalNodeModulesDir, argv0, + ipcFd, ) { if (initialized) { throw Error("Node runtime already initialized"); @@ -41,6 +38,7 @@ function initialize( // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); internals.__initWorkerThreads(); + internals.__setupChildProcessIpcChannel(ipcFd); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; } @@ -52,6 +50,8 @@ function loadCjsModule(moduleName, isMain, inspectBrk) { requireImpl.Module._load(moduleName, null, { main: isMain }); } +globalThis.nodeBootstrap = initialize; + internals.node = { initialize, loadCjsModule,
ext/node/polyfills/child_process.ts+9 −0 modified@@ -10,6 +10,7 @@ import { ChildProcess, ChildProcessOptions, normalizeSpawnArguments, + setupChannel, type SpawnOptions, spawnSync as _spawnSync, type SpawnSyncOptions, @@ -821,6 +822,14 @@ export function execFileSync( return ret.stdout as string | Buffer; } +function setupChildProcessIpcChannel(fd: number) { + if (typeof fd != "number" || fd < 0) return; + setupChannel(process, fd); +} + +globalThis.__bootstrap.internals.__setupChildProcessIpcChannel = + setupChildProcessIpcChannel; + export default { fork, spawn,
ext/node/polyfills/internal/child_process.ts+94 −1 modified@@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +const core = globalThis.__bootstrap.core; +const ops = core.ops; + export function mapValues<T, O>( record: Readonly<Record<string, T>>, transformer: (value: T) => O, @@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter { signal, windowsVerbatimArguments = false, } = options || {}; + const normalizedStdio = normalizeStdioOption(stdio); const [ stdin = "pipe", stdout = "pipe", stderr = "pipe", _channel, // TODO(kt3k): handle this correctly - ] = normalizeStdioOption(stdio); + ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, args || [], @@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter { this.spawnfile = cmd; this.spawnargs = [cmd, ...cmdArgs]; + const ipc = normalizedStdio.indexOf("ipc"); + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter { stdout: toDenoStdio(stdout), stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, + ipc, // internal }).spawn(); this.pid = this.#process.pid; @@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter { } } + if (typeof this.#process._pipeFd == "number") { + setupChannel(this, this.#process._pipeFd); + } + (async () => { const status = await this.#process.status; this.exitCode = status.code; @@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } +export function setupChannel(target, channel) { + const ipc = ops.op_node_ipc_pipe(channel); + + async function readLoop() { + try { + while (true) { + if (!target.connected || target.killed) { + return; + } + const msg = await core.opAsync("op_node_ipc_read", ipc); + if (msg == null) { + // Channel closed. + target.disconnect(); + return; + } + + process.nextTick(handleMessage, msg); + } + } catch (err) { + if ( + err instanceof Deno.errors.Interrupted || + err instanceof Deno.errors.BadResource + ) { + return; + } + } + } + + function handleMessage(msg) { + target.emit("message", msg); + } + + target.send = function (message, handle, options, callback) { + if (typeof handle === "function") { + callback = handle; + handle = undefined; + options = undefined; + } else if (typeof options === "function") { + callback = options; + options = undefined; + } else if (options !== undefined) { + validateObject(options, "options"); + } + + options = { swallowErrors: false, ...options }; + + if (message === undefined) { + throw new TypeError("ERR_MISSING_ARGS", "message"); + } + + if (handle !== undefined) { + notImplemented("ChildProcess.send with handle"); + } + + core.opAsync("op_node_ipc_write", ipc, message) + .then(() => { + if (callback) { + process.nextTick(callback, null); + } + }); + }; + + target.connected = true; + + target.disconnect = function () { + if (!this.connected) { + this.emit("error", new Error("IPC channel is already disconnected")); + return; + } + + this.connected = false; + process.nextTick(() => { + core.close(ipc); + target.emit("disconnect"); + }); + }; + + // Start reading messages from the channel. + readLoop(); +} + export default { ChildProcess, normalizeSpawnArguments, stdioStringToArray, spawnSync, + setupChannel, };
ext/node/polyfills/process.ts+0 −1 modified@@ -69,7 +69,6 @@ import { buildAllowedFlags } from "ext:deno_node/internal/process/per_thread.mjs const notImplementedEvents = [ "disconnect", - "message", "multipleResolves", "rejectionHandled", "worker",
runtime/js/40_process.js+10 −0 modified@@ -159,6 +159,7 @@ function spawnChildInner(opFn, command, apiName, { stderr = "piped", signal = undefined, windowsRawArguments = false, + ipc = -1, } = {}) { const child = opFn({ cmd: pathFromURL(command), @@ -172,6 +173,7 @@ function spawnChildInner(opFn, command, apiName, { stdout, stderr, windowsRawArguments, + ipc, }, apiName); return new ChildProcess(illegalConstructorKey, { ...child, @@ -203,6 +205,12 @@ class ChildProcess { #waitPromise; #waitComplete = false; + #pipeFd; + // internal, used by ext/node + get _pipeFd() { + return this.#pipeFd; + } + #pid; get pid() { return this.#pid; @@ -239,13 +247,15 @@ class ChildProcess { stdinRid, stdoutRid, stderrRid, + pipeFd, // internal } = null) { if (key !== illegalConstructorKey) { throw new TypeError("Illegal constructor."); } this.#rid = rid; this.#pid = pid; + this.#pipeFd = pipeFd; if (stdinRid !== null) { this.#stdin = writableStreamForRid(stdinRid);
runtime/js/99_main.js+2 −1 modified@@ -440,6 +440,7 @@ function bootstrapMainRuntime(runtimeOptions) { 3: inspectFlag, 5: hasNodeModulesDir, 6: maybeBinaryNpmCommandName, + 7: nodeIpcFd, } = runtimeOptions; performance.setTimeOrigin(DateNow()); @@ -545,7 +546,7 @@ function bootstrapMainRuntime(runtimeOptions) { ObjectDefineProperty(globalThis, "Deno", util.readOnly(finalDenoNs)); if (nodeBootstrap) { - nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName); + nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName, nodeIpcFd); } }
runtime/ops/process.rs+102 −14 modified@@ -141,6 +141,8 @@ pub struct SpawnArgs { uid: Option<u32>, #[cfg(windows)] windows_raw_arguments: bool, + #[cfg(unix)] + ipc: Option<i32>, #[serde(flatten)] stdio: ChildStdio, @@ -205,11 +207,18 @@ pub struct SpawnOutput { stderr: Option<ToJsBuffer>, } +type CreateCommand = ( + std::process::Command, + // TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors + // all the way until setupChannel which makes it easier to share code between parent and child fork. + Option<i32>, +); + fn create_command( state: &mut OpState, args: SpawnArgs, api_name: &str, -) -> Result<std::process::Command, AnyError> { +) -> Result<CreateCommand, AnyError> { state .borrow_mut::<PermissionsContainer>() .check_run(&args.cmd, api_name)?; @@ -245,15 +254,6 @@ fn create_command( if let Some(uid) = args.uid { command.uid(uid); } - #[cfg(unix)] - // TODO(bartlomieju): - #[allow(clippy::undocumented_unsafe_blocks)] - unsafe { - command.pre_exec(|| { - libc::setgroups(0, std::ptr::null()); - Ok(()) - }); - } command.stdin(args.stdio.stdin.as_stdio()); command.stdout(match args.stdio.stdout { @@ -265,7 +265,91 @@ fn create_command( value => value.as_stdio(), }); - Ok(command) + #[cfg(unix)] + // TODO(bartlomieju): + #[allow(clippy::undocumented_unsafe_blocks)] + unsafe { + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + // SockFlag is broken on macOS + // https://github.com/nix-rust/nix/issues/861 + let mut fds = [-1, -1]; + #[cfg(not(target_os = "macos"))] + let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + + #[cfg(target_os = "macos")] + let flags = 0; + + let ret = libc::socketpair( + libc::AF_UNIX, + libc::SOCK_STREAM | flags, + 0, + fds.as_mut_ptr(), + ); + if ret != 0 { + return Err(std::io::Error::last_os_error().into()); + } + + if cfg!(target_os = "macos") { + let fcntl = + |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { + let flags = libc::fcntl(fd, libc::F_GETFL, 0); + + if flags == -1 { + return Err(fail(fds)); + } + let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag); + if ret == -1 { + return Err(fail(fds)); + } + Ok(()) + }; + + fn fail(fds: [i32; 2]) -> std::io::Error { + unsafe { + libc::close(fds[0]); + libc::close(fds[1]); + } + std::io::Error::last_os_error() + } + + // SOCK_NONBLOCK is not supported on macOS. + (fcntl)(fds[0], libc::O_NONBLOCK)?; + (fcntl)(fds[1], libc::O_NONBLOCK)?; + + // SOCK_CLOEXEC is not supported on macOS. + (fcntl)(fds[0], libc::FD_CLOEXEC)?; + (fcntl)(fds[1], libc::FD_CLOEXEC)?; + } + + let fd1 = fds[0]; + let fd2 = fds[1]; + + command.pre_exec(move || { + if ipc >= 0 { + let _fd = libc::dup2(fd2, ipc); + libc::close(fd2); + } + libc::setgroups(0, std::ptr::null()); + Ok(()) + }); + + /* One end returned to parent process (this) */ + let pipe_fd = Some(fd1); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", ipc)); + + return Ok((command, pipe_fd)); + } + + Ok((command, None)) + } + + #[cfg(not(unix))] + return Ok((command, None)); } #[derive(Serialize)] @@ -276,11 +360,13 @@ struct Child { stdin_rid: Option<ResourceId>, stdout_rid: Option<ResourceId>, stderr_rid: Option<ResourceId>, + pipe_fd: Option<i32>, } fn spawn_child( state: &mut OpState, command: std::process::Command, + pipe_fd: Option<i32>, ) -> Result<Child, AnyError> { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -362,6 +448,7 @@ fn spawn_child( stdin_rid, stdout_rid, stderr_rid, + pipe_fd, }) } @@ -372,8 +459,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result<Child, AnyError> { - let command = create_command(state, args, &api_name)?; - spawn_child(state, command) + let (command, pipe_fd) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_fd) } #[op2(async)] @@ -402,7 +489,8 @@ fn op_spawn_sync( ) -> Result<SpawnOutput, AnyError> { let stdout = matches!(args.stdio.stdout, Stdio::Piped); let stderr = matches!(args.stdio.stderr, Stdio::Piped); - let mut command = create_command(state, args, "Deno.Command().outputSync()")?; + let (mut command, _) = + create_command(state, args, "Deno.Command().outputSync()")?; let output = command.output().with_context(|| { format!( "Failed to spawn '{}'",
runtime/worker_bootstrap.rs+5 −0 modified@@ -59,6 +59,7 @@ pub struct BootstrapOptions { pub inspect: bool, pub has_node_modules_dir: bool, pub maybe_binary_npm_command_name: Option<String>, + pub node_ipc_fd: Option<i32>, } impl Default for BootstrapOptions { @@ -86,6 +87,7 @@ impl Default for BootstrapOptions { args: Default::default(), has_node_modules_dir: Default::default(), maybe_binary_npm_command_name: None, + node_ipc_fd: None, } } } @@ -115,6 +117,8 @@ struct BootstrapV8<'a>( bool, // maybe_binary_npm_command_name Option<&'a str>, + // node_ipc_fd + i32, ); impl BootstrapOptions { @@ -134,6 +138,7 @@ impl BootstrapOptions { self.enable_testing_features, self.has_node_modules_dir, self.maybe_binary_npm_command_name.as_deref(), + self.node_ipc_fd.unwrap_or(-1), ); bootstrap.serialize(ser).unwrap()
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
12- github.com/advisories/GHSA-6q4w-9x56-rmwqghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2024-27933ghsaADVISORY
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/blob/v1.39.0/runtime/permissions/prompter.rsghsax_refsource_MISCWEB
- github.com/denoland/deno/commit/55fac9f5ead6d30996400e8597c969b675c5a22bghsax_refsource_MISCWEB
- github.com/denoland/deno/commit/5a91a065b882215dde209baf626247e54c21a392ghsax_refsource_MISCWEB
- github.com/denoland/deno/security/advisories/GHSA-6q4w-9x56-rmwqghsax_refsource_CONFIRMWEB
News mentions
0No linked articles in our index yet.