Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .cspell.dict/cpython.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ NEWLOCALS
newsemlockobject
nfrees
nkwargs
nlocalsplus
nkwelts
nlocalsplus
Nondescriptor
noninteger
nops
Expand Down Expand Up @@ -160,6 +160,7 @@ pylifecycle
pymain
pyrepl
PYTHONTRACEMALLOC
PYTHONUTF8
pythonw
PYTHREAD_NAME
releasebuffer
Expand All @@ -171,9 +172,11 @@ saveall
scls
setdict
setfunc
setprofileallthreads
SETREF
setresult
setslice
settraceallthreads
SLOTDEFINED
SMALLBUF
SOABI
Expand All @@ -190,8 +193,10 @@ subparams
subscr
sval
swappedbytes
sysdict
templatelib
testconsole
threadstate
ticketer
tmptype
tok_oldval
Expand Down
5 changes: 0 additions & 5 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@
"IFEXEC",
// "stat"
"FIRMLINK",
// CPython internal names
"PYTHONUTF",
"sysdict",
"settraceallthreads",
"setprofileallthreads"
],
// flagWords - list of words to be always considered incorrect
"flagWords": [
Expand Down
1 change: 0 additions & 1 deletion Lib/test/test_fork1.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@


class ForkTest(ForkWait):
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: process 44587 exited with code 1, but exit code 42 is expected
def test_threaded_import_lock_fork(self):
"""Check fork() in main thread works while a subthread is doing an import"""
import_started = threading.Event()
Expand Down
1 change: 0 additions & 1 deletion Lib/test/test_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -5574,7 +5574,6 @@ def test_fork_warns_when_non_python_thread_exists(self):
self.assertEqual(err.decode("utf-8"), "")
self.assertEqual(out.decode("utf-8"), "")

@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: b"can't fork at interpreter shutdown" not found in b"Exception ignored in: <function AtFinalization.__del__ at 0xc508b30c0>\nAttributeError: 'NoneType' object has no attribute 'fork'\n"
def test_fork_at_finalization(self):
code = """if 1:
import atexit
Expand Down
40 changes: 21 additions & 19 deletions crates/common/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,43 +68,45 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,

// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to

/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
/// Reset a lock to its initial (unlocked) state by zeroing its bytes.
///
/// After `fork()`, locks held by dead parent threads would deadlock in the
/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor,
/// bypassing the normal unlock path which may interact with parking_lot's
/// internal waiter queues.
/// After `fork()`, any lock held by a now-dead thread would remain
/// permanently locked. We zero the raw bytes (the unlocked state for all
/// `parking_lot` raw lock types) instead of using the normal unlock path,
/// which would interact with stale waiter queues.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
// Use Mutex::raw() to access the underlying lock without layout assumptions.
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
// represent the unlocked state as all-zero bytes.
/// The type `T` must represent the unlocked state as all-zero bytes
/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.).
pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) {
unsafe {
let raw = mutex.raw() as *const RawMutex as *mut u8;
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>());
}
}

/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`.
/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`].
///
/// # Safety
///
/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or
/// write locks would cause permanent deadlock in the child.
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
// SAFETY: the caller upholds this function's contract (single-threaded
// child right after `fork()`); the raw lock behind `mutex` is passed to
// `zero_reinit_after_fork`, which resets it to the unlocked state.
unsafe { zero_reinit_after_fork(mutex.raw()) }
}

/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`].
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
unsafe {
let raw = rwlock.raw() as *const RawRwLock as *mut u8;
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
}
unsafe { zero_reinit_after_fork(rwlock.raw()) }
}

/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.
Expand Down
4 changes: 3 additions & 1 deletion crates/stdlib/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,9 @@ mod decl {

loop {
let mut tv = timeout.map(sec_to_timeval);
let res = super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut());
let res = vm.allow_threads(|| {
super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut())
});

match res {
Ok(_) => break,
Expand Down
11 changes: 7 additions & 4 deletions crates/stdlib/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,8 @@ mod _socket {
loop {
if deadline.is_some() || matches!(select, SelectKind::Connect) {
let interval = deadline.as_ref().map(|d| d.time_until()).transpose()?;
let res = sock_select(&*self.sock()?, select, interval);
let sock = self.sock()?;
let res = vm.allow_threads(|| sock_select(&sock, select, interval));
match res {
Ok(true) => return Err(IoOrPyException::Timeout),
Err(e) if e.kind() == io::ErrorKind::Interrupted => {
Expand All @@ -1118,8 +1119,9 @@ mod _socket {
}

let err = loop {
// loop on interrupt
match f() {
// Detach thread state around the blocking syscall so
// stop-the-world can park this thread (e.g. before fork).
match vm.allow_threads(&mut f) {
Ok(x) => return Ok(x),
Err(e) if e.kind() == io::ErrorKind::Interrupted => vm.check_signals()?,
Err(e) => break e,
Expand Down Expand Up @@ -1342,7 +1344,8 @@ mod _socket {
) -> Result<(), IoOrPyException> {
let sock_addr = self.extract_address(address, caller, vm)?;

let err = match self.sock()?.connect(&sock_addr) {
let sock = self.sock()?;
let err = match vm.allow_threads(|| sock.connect(&sock_addr)) {
Ok(()) => return Ok(()),
Err(e) => e,
};
Expand Down
17 changes: 15 additions & 2 deletions crates/vm/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7016,6 +7016,14 @@ impl ExecutingFrame<'_> {
Ok(None)
}

/// Read a cached descriptor pointer and validate it against the expected
/// type version, using a lock-free double-check pattern:
/// 1. read pointer → incref (try_to_owned)
/// 2. re-read version + pointer and confirm they still match
///
/// This matches the read-side pattern used in LOAD_ATTR_METHOD_WITH_VALUES
/// and friends: no read-side lock, relying on the write side to invalidate
/// the version tag before swapping the pointer.
#[inline]
fn try_read_cached_descriptor(
&self,
Expand All @@ -7026,7 +7034,12 @@ impl ExecutingFrame<'_> {
if descr_ptr == 0 {
return None;
}
// SAFETY: `descr_ptr` was a valid `*mut PyObject` when the writer
// stored it, and the writer keeps a strong reference alive in
// `InlineCacheEntry`. `try_to_owned_from_ptr` performs a
// conditional incref that fails if the object is already freed.
let cloned = unsafe { PyObject::try_to_owned_from_ptr(descr_ptr as *mut PyObject) }?;
// Double-check: version tag still matches AND pointer unchanged.
if self.code.instructions.read_cache_u32(cache_base + 1) == expected_type_version
&& self.code.instructions.read_cache_ptr(cache_base + 5) == descr_ptr
{
Expand All @@ -7044,8 +7057,9 @@ impl ExecutingFrame<'_> {
type_version: u32,
descr_ptr: usize,
) {
// Publish descriptor cache atomically as a tuple:
// Publish descriptor cache with version-invalidation protocol:
// invalidate version first, then write payload, then publish version.
// Reader double-checks version+ptr after incref, so no writer lock needed.
unsafe {
self.code.instructions.write_cache_u32(cache_base + 1, 0);
self.code
Expand All @@ -7065,7 +7079,6 @@ impl ExecutingFrame<'_> {
metaclass_version: u32,
descr_ptr: usize,
) {
// Same publish protocol as write_cached_descriptor(), plus metaclass guard.
unsafe {
self.code.instructions.write_cache_u32(cache_base + 1, 0);
self.code
Expand Down
26 changes: 20 additions & 6 deletions crates/vm/src/object/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::{
};
use crate::object::traverse_object::PyObjVTable;
use crate::{
builtins::{PyDict, PyDictRef, PyType, PyTypeRef},
builtins::{PyDictRef, PyType, PyTypeRef},
common::{
atomic::{Ordering, PyAtomic, Radium},
linked_list::{Link, Pointers},
Expand Down Expand Up @@ -916,6 +916,12 @@ impl InstanceDict {
pub fn replace(&self, d: PyDictRef) -> PyDictRef {
// Store `d` through the write guard on `self.d` and hand the previously
// stored dict back to the caller.
core::mem::replace(&mut self.d.write(), d)
}

/// Consume the `InstanceDict` and return the inner `PyDictRef`.
///
/// Takes `self` by value, so the caller must own the `InstanceDict`
/// (for example after moving it out of the slot that held it).
#[inline]
pub fn into_inner(self) -> PyDictRef {
// Unwrap the container stored in `d` and return the dict reference it held.
self.d.into_inner()
}
}

impl<T: PyPayload> PyInner<T> {
Expand Down Expand Up @@ -1668,11 +1674,19 @@ impl PyObject {
}

// 2. Clear dict and member slots (subtype_clear)
if let Some(ext) = obj.0.ext_ref() {
if let Some(dict) = ext.dict.as_ref() {
let dict_ref = dict.get();
// Clear dict entries to break cycles, then collect the dict itself
PyDict::clear(&dict_ref);
// Use mutable access to actually detach the dict, matching CPython's
// Py_CLEAR(*_PyObject_GetDictPtr(self)) which NULLs the dict pointer
// without clearing dict contents. This is critical because the dict
// may still be referenced by other live objects (e.g. function.__globals__).
if obj.0.has_ext() {
let self_addr = (ptr as *const u8).addr();
let ext_ptr = core::ptr::with_exposed_provenance_mut::<ObjExt>(
self_addr.wrapping_sub(EXT_OFFSET),
);
let ext = unsafe { &mut *ext_ptr };
if let Some(old_dict) = ext.dict.take() {
// Get the dict ref before dropping InstanceDict
let dict_ref = old_dict.into_inner();
result.push(dict_ref.into());
}
for slot in ext.slots.iter() {
Expand Down
46 changes: 39 additions & 7 deletions crates/vm/src/stdlib/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod lock {

#[pyfunction]
fn acquire_lock(_vm: &VirtualMachine) {
IMP_LOCK.lock()
acquire_lock_for_fork()
}
Comment on lines 17 to 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Detach while waiting on IMP_LOCK.

acquire_lock() can now block in IMP_LOCK.lock() while the thread stays ATTACHED. That gives stop_the_world() the same failure mode this PR is fixing elsewhere: the requester can wait forever on a thread parked inside the import lock instead of reaching check_signals(). The Python-facing path should use vm.allow_threads(...) around the blocking acquire.

Suggested fix
 #[pyfunction]
-fn acquire_lock(_vm: &VirtualMachine) {
-    acquire_lock_for_fork()
+fn acquire_lock(vm: &VirtualMachine) {
+    vm.allow_threads(acquire_lock_for_fork)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[pyfunction]
fn acquire_lock(_vm: &VirtualMachine) {
IMP_LOCK.lock()
acquire_lock_for_fork()
}
#[pyfunction]
fn acquire_lock(vm: &VirtualMachine) {
vm.allow_threads(acquire_lock_for_fork)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm/src/stdlib/imp.rs` around lines 17 - 20, acquire_lock currently
calls acquire_lock_for_fork and may block on IMP_LOCK while the Python thread
remains ATTACHED; a stop_the_world requester can then wait forever on that
thread, because it is parked inside the import lock and never reaches
check_signals(). Wrap the blocking lock acquisition in the Python-facing path
with vm.allow_threads(...) so the thread detaches while waiting — modify the
acquire_lock function to call vm.allow_threads(|| acquire_lock_for_fork()) (or
the equivalent closure/callback pattern used by VirtualMachine) so that
IMP_LOCK.lock() happens while the thread is detached and stop_the_world() is
not blocked by it.


#[pyfunction]
Expand All @@ -34,6 +34,16 @@ mod lock {
IMP_LOCK.is_locked()
}

/// Acquire `IMP_LOCK` and return with it still held.
///
/// Nothing here releases the lock; the caller is responsible for a matching
/// release later.
pub(super) fn acquire_lock_for_fork() {
// NOTE(review): this can block while waiting for the lock and does not
// detach the thread state itself; Python-facing callers may need to wrap
// the call in `vm.allow_threads(...)` — confirm against stop-the-world.
IMP_LOCK.lock();
}

/// Release `IMP_LOCK` if, and only if, the current thread is holding it.
///
/// Does nothing when the lock is free or owned by a different thread.
pub(super) fn release_lock_after_fork_parent() {
    if !(IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread()) {
        // Not ours to release.
        return;
    }
    // SAFETY: the guard above confirmed the lock is held and that the
    // current thread is its owner before unlocking.
    unsafe { IMP_LOCK.unlock() };
}

/// Reset import lock after fork() — only if held by a dead thread.
///
/// `IMP_LOCK` is a reentrant mutex. If the *current* (surviving) thread
Expand All @@ -47,22 +57,44 @@ mod lock {
pub(crate) unsafe fn reinit_after_fork() {
if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() {
// Held by a dead thread — reset to unlocked.
// Same pattern as RLock::_at_fork_reinit in thread.rs.
unsafe {
let old: &crossbeam_utils::atomic::AtomicCell<RawRMutex> =
core::mem::transmute(&IMP_LOCK);
old.swap(RawRMutex::INIT);
}
unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) };
}
}

/// Post-fork child handling of the import lock, mirroring CPython's
/// `_PyImport_ReInitLock()` followed by `_PyImport_ReleaseLock()`:
///
/// 1. reset the lock when its ownership metadata is stale (dead owner or
///    changed thread id);
/// 2. release the lock when the current thread is the owner.
///
/// # Safety
///
/// Calls `reinit_after_fork`, so the caller must uphold that function's
/// contract (intended for the child process right after `fork()`).
#[cfg(unix)]
pub(super) unsafe fn after_fork_child_reinit_and_release() {
    // SAFETY: the caller's obligations are forwarded unchanged.
    unsafe { reinit_after_fork() };

    let owned_here = IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread();
    if owned_here {
        // SAFETY: `owned_here` confirms the lock is held by this thread.
        unsafe { IMP_LOCK.unlock() };
    }
}
}

/// Crate-visible wrapper around `lock::acquire_lock_for_fork`, exposed for
/// the fork safety code in posix.rs.
#[cfg(feature = "threading")]
pub(crate) fn acquire_imp_lock_for_fork() {
lock::acquire_lock_for_fork();
}

/// Crate-visible wrapper around `lock::release_lock_after_fork_parent`.
#[cfg(feature = "threading")]
pub(crate) fn release_imp_lock_after_fork_parent() {
lock::release_lock_after_fork_parent();
}

/// Crate-visible wrapper around `lock::reinit_after_fork`.
///
/// # Safety
///
/// Same contract as `lock::reinit_after_fork`, to which the call is forwarded.
#[cfg(all(unix, feature = "threading"))]
pub(crate) unsafe fn reinit_imp_lock_after_fork() {
// SAFETY: the caller's obligations are passed straight through.
unsafe { lock::reinit_after_fork() }
}

/// Crate-visible wrapper around `lock::after_fork_child_reinit_and_release`.
///
/// # Safety
///
/// Same contract as `lock::after_fork_child_reinit_and_release`, to which the
/// call is forwarded.
#[cfg(all(unix, feature = "threading"))]
pub(crate) unsafe fn after_fork_child_imp_lock_release() {
// SAFETY: the caller's obligations are passed straight through.
unsafe { lock::after_fork_child_reinit_and_release() }
}

#[cfg(not(feature = "threading"))]
#[pymodule(sub)]
mod lock {
Expand Down
13 changes: 6 additions & 7 deletions crates/vm/src/stdlib/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5015,13 +5015,13 @@ mod _io {

if let Some(tio) = obj.downcast_ref::<TextIOWrapper>() {
unsafe { reinit_thread_mutex_after_fork(&tio.data) };
if let Some(guard) = tio.data.lock() {
if let Some(ref data) = *guard {
if let Some(ref decoder) = data.decoder {
reinit_io_locks(decoder);
}
reinit_io_locks(&data.buffer);
if let Some(guard) = tio.data.lock()
&& let Some(ref data) = *guard
{
if let Some(ref decoder) = data.decoder {
reinit_io_locks(decoder);
}
reinit_io_locks(&data.buffer);
}
return;
}
Expand All @@ -5044,7 +5044,6 @@ mod _io {
if let Some(brw) = obj.downcast_ref::<BufferedRWPair>() {
unsafe { reinit_thread_mutex_after_fork(&brw.read.data) };
unsafe { reinit_thread_mutex_after_fork(&brw.write.data) };
return;
}
}

Expand Down
Loading
Loading