Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions crates/common/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,38 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,
/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
///
/// After `fork()`, locks held by dead parent threads would deadlock in the
/// child. This zeroes the raw lock bytes directly, bypassing the normal unlock
/// path which may interact with parking_lot's internal waiter queues.
/// child. This resets the raw lock bytes to the all-zero (`RawMutex::INIT`)
/// state via the `Mutex::raw()` accessor, bypassing the normal unlock path,
/// which may interact with parking_lot's internal waiter queues.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
// lock_api::Mutex<R, T> layout: raw R at offset 0, then UnsafeCell<T>.
// Zeroing R resets to unlocked for both parking_lot::RawMutex (AtomicU8)
// and RawCellMutex (Cell<bool>).
// Use Mutex::raw() to access the underlying lock without layout assumptions.
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
// represent the unlocked state as all-zero bytes.
unsafe {
let ptr = mutex as *const PyMutex<T> as *mut u8;
core::ptr::write_bytes(ptr, 0, core::mem::size_of::<RawMutex>());
let raw = mutex.raw() as *const RawMutex as *mut u8;
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
}
}

/// Restore a `PyRwLock` to its unlocked state in a freshly `fork()`ed child.
///
/// Same rationale as [`reinit_mutex_after_fork`]: readers or writers that
/// lived on the parent's other threads no longer exist in the child, so any
/// lock they held would never be released and would deadlock forever.
///
/// # Safety
///
/// The caller must be the only thread in the child process: invoke this
/// immediately after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
    // SAFETY (caller contract): we are single-threaded, so nothing can race
    // with this write. The unlocked state of the raw lock is all-zero bytes,
    // so a zero-fill is equivalent to re-initializing it.
    let len = core::mem::size_of::<RawRwLock>();
    let bytes = rwlock.raw() as *const RawRwLock as *mut u8;
    unsafe { core::ptr::write_bytes(bytes, 0, len) };
}
25 changes: 5 additions & 20 deletions crates/vm/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,29 +153,14 @@ impl ToPyObject for PyCodec {
}

impl CodecsRegistry {
/// Force-unlock the inner RwLock after fork in the child process.
/// Reset the inner RwLock to unlocked state after fork().
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold this lock.
#[cfg(all(unix, feature = "host_env"))]
pub(crate) unsafe fn force_unlock_after_fork(&self) {
if self.inner.try_write().is_some() {
return;
}
let is_shared = self.inner.try_read().is_some();
if is_shared {
loop {
// SAFETY: Lock is shared-locked by dead thread(s).
unsafe { self.inner.force_unlock_read() };
if self.inner.try_write().is_some() {
return;
}
}
} else {
// SAFETY: Lock is exclusively locked by a dead thread.
unsafe { self.inner.force_unlock_write() };
}
/// threads exist.
#[cfg(all(unix, feature = "threading"))]
pub(crate) unsafe fn reinit_after_fork(&self) {
unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) };
}

pub(crate) fn new(ctx: &Context) -> Self {
Expand Down
78 changes: 23 additions & 55 deletions crates/vm/src/gc_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,14 @@ impl GcGeneration {
guard.uncollectable += uncollectable;
}

/// Force-unlock the stats mutex after fork() in the child process.
/// Reset the stats mutex to unlocked state after fork().
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist.
#[cfg(unix)]
unsafe fn force_unlock_stats_after_fork(&self) {
if self.stats.try_lock().is_none() {
unsafe { self.stats.force_unlock() };
}
#[cfg(all(unix, feature = "threading"))]
unsafe fn reinit_stats_after_fork(&self) {
unsafe { crate::common::lock::reinit_mutex_after_fork(&self.stats) };
}
}

Expand Down Expand Up @@ -793,65 +791,35 @@ impl GcState {

/// Force-unlock all locks after fork() in the child process.
///
/// Reset all locks to unlocked state after fork().
///
/// After fork(), only the forking thread survives. Any lock held by another
/// thread is permanently stuck. This method releases all such stuck locks.
/// thread is permanently stuck. This resets them by zeroing the raw bytes.
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold any of these locks.
#[cfg(unix)]
pub unsafe fn force_unlock_after_fork(&self) {
// Force-unlock the collecting mutex
if self.collecting.try_lock().is_none() {
unsafe { self.collecting.force_unlock() };
}
#[cfg(all(unix, feature = "threading"))]
pub unsafe fn reinit_after_fork(&self) {
use crate::common::lock::{reinit_mutex_after_fork, reinit_rwlock_after_fork};

// Force-unlock garbage and callbacks mutexes
if self.garbage.try_lock().is_none() {
unsafe { self.garbage.force_unlock() };
}
if self.callbacks.try_lock().is_none() {
unsafe { self.callbacks.force_unlock() };
}
unsafe {
reinit_mutex_after_fork(&self.collecting);
reinit_mutex_after_fork(&self.garbage);
reinit_mutex_after_fork(&self.callbacks);

// Force-unlock generation stats mutexes
for generation in &self.generations {
unsafe { generation.force_unlock_stats_after_fork() };
}
unsafe { self.permanent.force_unlock_stats_after_fork() };

// Force-unlock RwLocks
for rw in &self.generation_objects {
unsafe { force_unlock_rwlock_after_fork(rw) };
}
unsafe { force_unlock_rwlock_after_fork(&self.permanent_objects) };
unsafe { force_unlock_rwlock_after_fork(&self.tracked_objects) };
unsafe { force_unlock_rwlock_after_fork(&self.finalized_objects) };
}
}
for generation in &self.generations {
generation.reinit_stats_after_fork();
}
self.permanent.reinit_stats_after_fork();

/// Force-unlock a PyRwLock after fork() in the child process.
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold this lock.
#[cfg(unix)]
unsafe fn force_unlock_rwlock_after_fork<T>(lock: &PyRwLock<T>) {
if lock.try_write().is_some() {
return;
}
let is_shared = lock.try_read().is_some();
if is_shared {
loop {
// SAFETY: Lock is shared-locked by dead thread(s).
unsafe { lock.force_unlock_read() };
if lock.try_write().is_some() {
return;
for rw in &self.generation_objects {
reinit_rwlock_after_fork(rw);
}
reinit_rwlock_after_fork(&self.permanent_objects);
reinit_rwlock_after_fork(&self.tracked_objects);
reinit_rwlock_after_fork(&self.finalized_objects);
}
} else {
// SAFETY: Lock is exclusively locked by a dead thread.
unsafe { lock.force_unlock_write() };
}
}

Expand Down
26 changes: 5 additions & 21 deletions crates/vm/src/intern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,14 @@ impl Clone for StringPool {
}

impl StringPool {
/// Force-unlock the inner RwLock after fork in the child process.
/// Reset the inner RwLock to unlocked state after fork().
///
/// # Safety
/// Must only be called after fork() in the child process when no other
/// threads exist. The calling thread must NOT hold this lock.
#[cfg(all(unix, feature = "host_env"))]
pub(crate) unsafe fn force_unlock_after_fork(&self) {
if self.inner.try_write().is_some() {
return;
}
// Lock is stuck from a thread that no longer exists.
let is_shared = self.inner.try_read().is_some();
if is_shared {
loop {
// SAFETY: Lock is shared-locked by dead thread(s).
unsafe { self.inner.force_unlock_read() };
if self.inner.try_write().is_some() {
return;
}
}
} else {
// SAFETY: Lock is exclusively locked by a dead thread.
unsafe { self.inner.force_unlock_write() };
}
/// threads exist.
#[cfg(all(unix, feature = "threading"))]
pub(crate) unsafe fn reinit_after_fork(&self) {
unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) };
}

#[inline]
Expand Down
74 changes: 43 additions & 31 deletions crates/vm/src/stdlib/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,37 +713,22 @@ pub mod module {
}

fn py_os_after_fork_child(vm: &VirtualMachine) {
// Reset low-level state before any Python code runs in the child.
// Signal triggers from the parent must not fire in the child.
// Phase 1: Reset all internal locks FIRST.
// After fork(), locks held by dead parent threads would deadlock
// if we try to acquire them. This must happen before anything else.
#[cfg(feature = "threading")]
reinit_locks_after_fork(vm);

// Phase 2: Reset low-level atomic state (no locks needed).
crate::signal::clear_after_fork();
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();

// Reset weakref stripe locks that may have been held during fork.
#[cfg(feature = "threading")]
crate::object::reset_weakref_locks_after_fork();

// Force-unlock all global VM locks that may have been held by
// threads that no longer exist in the child process after fork.
// SAFETY: After fork, only the forking thread survives. Any lock
// held by another thread is permanently stuck. The forking thread
// does not hold these locks during fork() (a high-level Python op).
unsafe {
vm.ctx.string_pool.force_unlock_after_fork();
vm.state.codec_registry.force_unlock_after_fork();
force_unlock_mutex_after_fork(&vm.state.atexit_funcs);
force_unlock_mutex_after_fork(&vm.state.before_forkers);
force_unlock_mutex_after_fork(&vm.state.after_forkers_child);
force_unlock_mutex_after_fork(&vm.state.after_forkers_parent);
force_unlock_mutex_after_fork(&vm.state.global_trace_func);
force_unlock_mutex_after_fork(&vm.state.global_profile_func);
crate::gc_state::gc_state().force_unlock_after_fork();

// Import lock (ReentrantMutex) — was previously not reinit'd
#[cfg(feature = "threading")]
crate::stdlib::imp::reinit_imp_lock_after_fork();
}

// Mark all other threads as done before running Python callbacks
// Phase 3: Clean up thread state. Locks are now reinit'd so we can
// acquire them normally instead of using try_lock().
#[cfg(feature = "threading")]
crate::stdlib::thread::after_fork_child(vm);

Expand All @@ -752,18 +737,45 @@ pub mod module {
vm.signal_handlers
.get_or_init(crate::signal::new_signal_handlers);

// Phase 4: Run Python-level at-fork callbacks.
let after_forkers_child: Vec<PyObjectRef> = vm.state.after_forkers_child.lock().clone();
run_at_forkers(after_forkers_child, false, vm);
}

/// Force-unlock a PyMutex if held by a dead thread after fork.
/// Reset all parking_lot-based locks in the interpreter state after fork().
///
/// # Safety
/// Must only be called after fork() in the child process.
unsafe fn force_unlock_mutex_after_fork<T>(mutex: &crate::common::lock::PyMutex<T>) {
if mutex.try_lock().is_none() {
// SAFETY: Lock is held by a dead thread after fork.
unsafe { mutex.force_unlock() };
/// After fork(), only the calling thread survives. Any locks held by other
/// (now-dead) threads would cause deadlocks. We unconditionally reset them
/// to unlocked by zeroing the raw lock bytes.
#[cfg(all(unix, feature = "threading"))]
fn reinit_locks_after_fork(vm: &VirtualMachine) {
use rustpython_common::lock::reinit_mutex_after_fork;

// SAFETY: this runs in the child immediately after fork(), before any
// thread is spawned, so no other thread can observe the locks while we
// reset them — the safety contract of every reinit_* helper below.
// NOTE(review): the reinit_* helpers reset locks regardless of whether a
// (now-dead) parent thread held them; the forking thread must not hold
// any of these locks itself — confirm fork() is never called with one held.
unsafe {
// PyGlobalState PyMutex locks
reinit_mutex_after_fork(&vm.state.before_forkers);
reinit_mutex_after_fork(&vm.state.after_forkers_child);
reinit_mutex_after_fork(&vm.state.after_forkers_parent);
reinit_mutex_after_fork(&vm.state.atexit_funcs);
reinit_mutex_after_fork(&vm.state.global_trace_func);
reinit_mutex_after_fork(&vm.state.global_profile_func);

// PyGlobalState parking_lot::Mutex locks
reinit_mutex_after_fork(&vm.state.thread_frames);
reinit_mutex_after_fork(&vm.state.thread_handles);
reinit_mutex_after_fork(&vm.state.shutdown_handles);

// Context-level RwLock
vm.ctx.string_pool.reinit_after_fork();

// Codec registry RwLock
vm.state.codec_registry.reinit_after_fork();

// GC state (multiple Mutex + RwLock)
crate::gc_state::gc_state().reinit_after_fork();

// Import lock (RawReentrantMutex<RawMutex, RawThreadId>)
crate::stdlib::imp::reinit_imp_lock_after_fork();
}
}

Expand Down
Loading
Loading