Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/common/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,18 @@ pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
}
}

/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.
///
/// `PyThreadMutex` is used by buffered IO objects (`BufferedReader`,
/// `BufferedWriter`, `TextIOWrapper`). If a dead parent thread held one of
/// these locks during `fork()`, the child would deadlock on any IO operation.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_thread_mutex_after_fork<T: ?Sized>(mutex: &PyThreadMutex<T>) {
unsafe { mutex.raw().reinit_after_fork() }
}
22 changes: 22 additions & 0 deletions crates/common/src/lock/thread_mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
}
}

impl<R: RawMutex, G: GetThreadId> RawThreadMutex<R, G> {
/// Reset this mutex to its initial (unlocked, unowned) state after `fork()`.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_after_fork(&self) {
self.owner.store(0, Ordering::Relaxed);
unsafe {
let mutex_ptr = &self.mutex as *const R as *mut u8;
core::ptr::write_bytes(mutex_ptr, 0, core::mem::size_of::<R>());
}
}
}

unsafe impl<R: RawMutex + Send, G: GetThreadId + Send> Send for RawThreadMutex<R, G> {}
unsafe impl<R: RawMutex + Sync, G: GetThreadId + Sync> Sync for RawThreadMutex<R, G> {}

Expand Down Expand Up @@ -103,6 +120,11 @@ impl<R: RawMutex, G: GetThreadId, T> From<T> for ThreadMutex<R, G, T> {
}
}
impl<R: RawMutex, G: GetThreadId, T: ?Sized> ThreadMutex<R, G, T> {
/// Access the underlying raw thread mutex.
pub fn raw(&self) -> &RawThreadMutex<R, G> {
&self.raw
}

pub fn lock(&self) -> Option<ThreadMutexGuard<'_, R, G, T>> {
if self.raw.lock() {
Some(ThreadMutexGuard {
Expand Down
57 changes: 57 additions & 0 deletions crates/vm/src/stdlib/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* I/O core tools.
*/
pub(crate) use _io::module_def;
#[cfg(all(unix, feature = "threading"))]
pub(crate) use _io::reinit_std_streams_after_fork;

cfg_if::cfg_if! {
if #[cfg(any(not(target_arch = "wasm32"), target_os = "wasi"))] {
Expand Down Expand Up @@ -4985,6 +4987,61 @@ mod _io {
}
}

/// Reinit per-object IO buffer locks on std streams after `fork()`.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(all(unix, feature = "threading"))]
pub unsafe fn reinit_std_streams_after_fork(vm: &VirtualMachine) {
for name in ["stdin", "stdout", "stderr"] {
let Ok(stream) = vm.sys_module.get_attr(name, vm) else {
continue;
};
reinit_io_locks(&stream);
}
}

#[cfg(all(unix, feature = "threading"))]
fn reinit_io_locks(obj: &PyObject) {
use crate::common::lock::reinit_thread_mutex_after_fork;

if let Some(tio) = obj.downcast_ref::<TextIOWrapper>() {
unsafe { reinit_thread_mutex_after_fork(&tio.data) };

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid reinitializing a mutex that may still have a live guard

This unconditionally resets TextIOWrapper's PyThreadMutex in the child, but the forking thread can still have a live guard on that same lock when fork() is called from code executed under the lock (for example, TextIOWrapper::write calls encoder.encode while holding self.lock(vm), and a custom encoder can call os.fork()). After this reset, that pre-fork guard will later drop and call unlock() on a mutex that was forcibly reinitialized, which can corrupt lock state or panic in parking_lot. Please guard this path so locks currently owned by the calling thread are not blindly reinitialized.

Useful? React with 👍 / 👎.

if let Some(guard) = tio.data.lock() {
if let Some(ref data) = *guard {
if let Some(ref decoder) = data.decoder {
reinit_io_locks(decoder);
}
reinit_io_locks(&data.buffer);
}
}
return;
}
if let Some(nl) = obj.downcast_ref::<IncrementalNewlineDecoder>() {
unsafe { reinit_thread_mutex_after_fork(&nl.data) };
return;
}
if let Some(br) = obj.downcast_ref::<BufferedReader>() {
unsafe { reinit_thread_mutex_after_fork(&br.data) };
return;
}
if let Some(bw) = obj.downcast_ref::<BufferedWriter>() {
unsafe { reinit_thread_mutex_after_fork(&bw.data) };
return;
}
if let Some(brw) = obj.downcast_ref::<BufferedRandom>() {
unsafe { reinit_thread_mutex_after_fork(&brw.data) };
return;
}
if let Some(brw) = obj.downcast_ref::<BufferedRWPair>() {
unsafe { reinit_thread_mutex_after_fork(&brw.read.data) };
unsafe { reinit_thread_mutex_after_fork(&brw.write.data) };
return;
}
}

pub fn io_open(
file: PyObjectRef,
mode: Option<&str>,
Expand Down
8 changes: 8 additions & 0 deletions crates/vm/src/stdlib/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,14 @@ pub mod module {
#[cfg(feature = "threading")]
reinit_locks_after_fork(vm);

// Reinit per-object IO buffer locks on std streams.
// BufferedReader/Writer/TextIOWrapper use PyThreadMutex which can be
// held by dead parent threads, causing deadlocks on any IO in the child.
#[cfg(feature = "threading")]
unsafe {
crate::stdlib::io::reinit_std_streams_after_fork(vm)
};

// Phase 2: Reset low-level atomic state (no locks needed).
crate::signal::clear_after_fork();
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();
Expand Down
Loading