From 521e15563359171b0bac4a2660e56c88e9dcc8a8 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Sat, 7 Mar 2026 13:30:07 +0900 Subject: [PATCH 1/2] Suspend Python threads before fork() Add stop-the-world thread suspension around fork() to prevent deadlocks from locks held by dead parent threads in the child. - Thread states: DETACHED / ATTACHED / SUSPENDED with atomic CAS transitions matching _PyThreadState_{Attach,Detach,Suspend} - stop_the_world / start_the_world: park all non-requester threads before fork, resume after (parent) or reset (child) - allow_threads (Py_BEGIN/END_ALLOW_THREADS): detach around blocking syscalls (os.read/write, waitpid, Lock.acquire, time.sleep) so stop_the_world can force-park via CAS - Acquire/release import lock around fork lifecycle - zero_reinit_after_fork: generic lock reset for parking_lot types - gc_clear_raw: detach dict instead of clearing entries - Lock-free double-check for descriptor cache reads (no read-side seqlock); write-side seqlock retained for writer serialization - fork() returns PyResult, checks PythonFinalizationError, calls sys.audit --- .cspell.dict/cpython.txt | 7 +- .cspell.json | 5 - Lib/test/test_fork1.py | 1 - Lib/test/test_os.py | 1 - crates/common/src/lock.rs | 40 +-- crates/compiler-core/src/bytecode.rs | 48 +++- crates/stdlib/src/select.rs | 4 +- crates/stdlib/src/socket.rs | 11 +- crates/vm/src/frame.rs | 17 ++ crates/vm/src/object/core.rs | 26 +- crates/vm/src/stdlib/imp.rs | 46 +++- crates/vm/src/stdlib/io.rs | 13 +- crates/vm/src/stdlib/os.rs | 12 +- crates/vm/src/stdlib/posix.rs | 68 ++++- crates/vm/src/stdlib/thread.rs | 103 +++++--- crates/vm/src/stdlib/time.rs | 11 +- crates/vm/src/vm/interpreter.rs | 8 +- crates/vm/src/vm/mod.rs | 370 +++++++++++++++++++++++++++ crates/vm/src/vm/thread.rs | 286 ++++++++++++++++++++- 19 files changed, 971 insertions(+), 106 deletions(-) diff --git a/.cspell.dict/cpython.txt b/.cspell.dict/cpython.txt index b8081e25a9b..684c7a5b614 100644 --- 
a/.cspell.dict/cpython.txt +++ b/.cspell.dict/cpython.txt @@ -127,8 +127,8 @@ NEWLOCALS newsemlockobject nfrees nkwargs -nlocalsplus nkwelts +nlocalsplus Nondescriptor noninteger nops @@ -160,6 +160,7 @@ pylifecycle pymain pyrepl PYTHONTRACEMALLOC +PYTHONUTF8 pythonw PYTHREAD_NAME releasebuffer @@ -171,9 +172,11 @@ saveall scls setdict setfunc +setprofileallthreads SETREF setresult setslice +settraceallthreads SLOTDEFINED SMALLBUF SOABI @@ -190,8 +193,10 @@ subparams subscr sval swappedbytes +sysdict templatelib testconsole +threadstate ticketer tmptype tok_oldval diff --git a/.cspell.json b/.cspell.json index e2b1d86aaeb..07fe948c5bf 100644 --- a/.cspell.json +++ b/.cspell.json @@ -152,11 +152,6 @@ "IFEXEC", // "stat" "FIRMLINK", - // CPython internal names - "PYTHONUTF", - "sysdict", - "settraceallthreads", - "setprofileallthreads" ], // flagWords - list of words to be always considered incorrect "flagWords": [ diff --git a/Lib/test/test_fork1.py b/Lib/test/test_fork1.py index 4f4a5ee0507..a6523bbc518 100644 --- a/Lib/test/test_fork1.py +++ b/Lib/test/test_fork1.py @@ -19,7 +19,6 @@ class ForkTest(ForkWait): - @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: process 44587 exited with code 1, but exit code 42 is expected def test_threaded_import_lock_fork(self): """Check fork() in main thread works while a subthread is doing an import""" import_started = threading.Event() diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index d63dc60be31..00bd75bab51 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -5574,7 +5574,6 @@ def test_fork_warns_when_non_python_thread_exists(self): self.assertEqual(err.decode("utf-8"), "") self.assertEqual(out.decode("utf-8"), "") - @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: b"can't fork at interpreter shutdown" not found in b"Exception ignored in: \nAttributeError: 'NoneType' object has no attribute 'fork'\n" def test_fork_at_finalization(self): code = """if 1: import atexit diff --git 
a/crates/common/src/lock.rs b/crates/common/src/lock.rs index af680010821..cd7df512d83 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -68,32 +68,37 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, // can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to -/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`. +/// Reset a lock to its initial (unlocked) state by zeroing its bytes. /// -/// After `fork()`, locks held by dead parent threads would deadlock in the -/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor, -/// bypassing the normal unlock path which may interact with parking_lot's -/// internal waiter queues. +/// After `fork()`, any lock held by a now-dead thread would remain +/// permanently locked. We zero the raw bytes (the unlocked state for all +/// `parking_lot` raw lock types) instead of using the normal unlock path, +/// which would interact with stale waiter queues. /// /// # Safety /// /// Must only be called from the single-threaded child process immediately /// after `fork()`, before any other thread is created. -#[cfg(unix)] -pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { - // Use Mutex::raw() to access the underlying lock without layout assumptions. - // parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell) both - // represent the unlocked state as all-zero bytes. +/// The type `T` must represent the unlocked state as all-zero bytes +/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.). +pub unsafe fn zero_reinit_after_fork(lock: *const T) { unsafe { - let raw = mutex.raw() as *const RawMutex as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::()); + core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::()); } } -/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`. +/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`]. 
+/// +/// # Safety /// -/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or -/// write locks would cause permanent deadlock in the child. +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { + unsafe { zero_reinit_after_fork(mutex.raw()) } +} + +/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`]. /// /// # Safety /// @@ -101,10 +106,7 @@ pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { /// after `fork()`, before any other thread is created. #[cfg(unix)] pub unsafe fn reinit_rwlock_after_fork(rwlock: &PyRwLock) { - unsafe { - let raw = rwlock.raw() as *const RawRwLock as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::()); - } + unsafe { zero_reinit_after_fork(rwlock.raw()) } } /// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`. diff --git a/crates/compiler-core/src/bytecode.rs b/crates/compiler-core/src/bytecode.rs index 46182962654..df219ce9075 100644 --- a/crates/compiler-core/src/bytecode.rs +++ b/crates/compiler-core/src/bytecode.rs @@ -12,7 +12,7 @@ use core::{ cell::UnsafeCell, hash, mem, ops::Deref, - sync::atomic::{AtomicU8, AtomicU16, AtomicUsize, Ordering}, + sync::atomic::{AtomicU8, AtomicU16, AtomicU32, AtomicUsize, Ordering}, }; use itertools::Itertools; use malachite_bigint::BigInt; @@ -415,6 +415,9 @@ pub struct CodeUnits { /// Single atomic load/store prevents torn reads when multiple threads /// specialize the same instruction concurrently. pointer_cache: Box<[AtomicUsize]>, + /// SeqLock counter per instruction cache base for descriptor payload writes. + /// odd = write in progress, even = quiescent. + descriptor_sequences: Box<[AtomicU32]>, } // SAFETY: All cache operations use atomic read/write instructions. 
@@ -441,10 +444,16 @@ impl Clone for CodeUnits { .iter() .map(|c| AtomicUsize::new(c.load(Ordering::Relaxed))) .collect(); + let descriptor_sequences = self + .descriptor_sequences + .iter() + .map(|c| AtomicU32::new(c.load(Ordering::Relaxed))) + .collect(); Self { units: UnsafeCell::new(units), adaptive_counters, pointer_cache, + descriptor_sequences, } } } @@ -491,10 +500,15 @@ impl From> for CodeUnits { .map(|_| AtomicUsize::new(0)) .collect::>() .into_boxed_slice(); + let descriptor_sequences = (0..len) + .map(|_| AtomicU32::new(0)) + .collect::>() + .into_boxed_slice(); Self { units: UnsafeCell::new(units), adaptive_counters, pointer_cache, + descriptor_sequences, } } } @@ -641,6 +655,38 @@ impl CodeUnits { self.pointer_cache[index].load(Ordering::Relaxed) } + #[inline] + pub fn begin_descriptor_write(&self, index: usize) { + let sequence = &self.descriptor_sequences[index]; + let mut seq = sequence.load(Ordering::Acquire); + loop { + while (seq & 1) != 0 { + core::hint::spin_loop(); + seq = sequence.load(Ordering::Acquire); + } + match sequence.compare_exchange_weak( + seq, + seq.wrapping_add(1), + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + core::sync::atomic::fence(Ordering::Release); + break; + } + Err(observed) => { + core::hint::spin_loop(); + seq = observed; + } + } + } + } + + #[inline] + pub fn end_descriptor_write(&self, index: usize) { + self.descriptor_sequences[index].fetch_add(1, Ordering::Release); + } + /// Read adaptive counter bits for instruction at `index`. /// Uses Relaxed atomic load. 
pub fn read_adaptive_counter(&self, index: usize) -> u16 { diff --git a/crates/stdlib/src/select.rs b/crates/stdlib/src/select.rs index 181c4573996..05d9eaa550d 100644 --- a/crates/stdlib/src/select.rs +++ b/crates/stdlib/src/select.rs @@ -280,7 +280,9 @@ mod decl { loop { let mut tv = timeout.map(sec_to_timeval); - let res = super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut()); + let res = vm.allow_threads(|| { + super::select(nfds, &mut r, &mut w, &mut x, tv.as_mut()) + }); match res { Ok(_) => break, diff --git a/crates/stdlib/src/socket.rs b/crates/stdlib/src/socket.rs index 0dce9b59d2c..617e50c002f 100644 --- a/crates/stdlib/src/socket.rs +++ b/crates/stdlib/src/socket.rs @@ -1105,7 +1105,8 @@ mod _socket { loop { if deadline.is_some() || matches!(select, SelectKind::Connect) { let interval = deadline.as_ref().map(|d| d.time_until()).transpose()?; - let res = sock_select(&*self.sock()?, select, interval); + let sock = self.sock()?; + let res = vm.allow_threads(|| sock_select(&sock, select, interval)); match res { Ok(true) => return Err(IoOrPyException::Timeout), Err(e) if e.kind() == io::ErrorKind::Interrupted => { @@ -1118,8 +1119,9 @@ mod _socket { } let err = loop { - // loop on interrupt - match f() { + // Detach thread state around the blocking syscall so + // stop-the-world can park this thread (e.g. before fork). 
+ match vm.allow_threads(&mut f) { Ok(x) => return Ok(x), Err(e) if e.kind() == io::ErrorKind::Interrupted => vm.check_signals()?, Err(e) => break e, @@ -1342,7 +1344,8 @@ mod _socket { ) -> Result<(), IoOrPyException> { let sock_addr = self.extract_address(address, caller, vm)?; - let err = match self.sock()?.connect(&sock_addr) { + let sock = self.sock()?; + let err = match vm.allow_threads(|| sock.connect(&sock_addr)) { Ok(()) => return Ok(()), Err(e) => e, }; diff --git a/crates/vm/src/frame.rs b/crates/vm/src/frame.rs index 2f4f4483b8f..4e2a9d7f345 100644 --- a/crates/vm/src/frame.rs +++ b/crates/vm/src/frame.rs @@ -7016,6 +7016,14 @@ impl ExecutingFrame<'_> { Ok(None) } + /// Read a cached descriptor pointer and validate it against the expected + /// type version, using a lock-free double-check pattern: + /// 1. read pointer → incref (try_to_owned) + /// 2. re-read version + pointer and confirm they still match + /// + /// This matches the read-side pattern used in LOAD_ATTR_METHOD_WITH_VALUES + /// and friends: no read-side lock, relying on the write side to invalidate + /// the version tag before swapping the pointer. #[inline] fn try_read_cached_descriptor( &self, @@ -7026,7 +7034,12 @@ impl ExecutingFrame<'_> { if descr_ptr == 0 { return None; } + // SAFETY: `descr_ptr` was a valid `*mut PyObject` when the writer + // stored it, and the writer keeps a strong reference alive in + // `InlineCacheEntry`. `try_to_owned_from_ptr` performs a + // conditional incref that fails if the object is already freed. let cloned = unsafe { PyObject::try_to_owned_from_ptr(descr_ptr as *mut PyObject) }?; + // Double-check: version tag still matches AND pointer unchanged. 
if self.code.instructions.read_cache_u32(cache_base + 1) == expected_type_version && self.code.instructions.read_cache_ptr(cache_base + 5) == descr_ptr { @@ -7046,6 +7059,7 @@ impl ExecutingFrame<'_> { ) { // Publish descriptor cache atomically as a tuple: // invalidate version first, then write payload, then publish version. + self.code.instructions.begin_descriptor_write(cache_base); unsafe { self.code.instructions.write_cache_u32(cache_base + 1, 0); self.code @@ -7055,6 +7069,7 @@ impl ExecutingFrame<'_> { .instructions .write_cache_u32(cache_base + 1, type_version); } + self.code.instructions.end_descriptor_write(cache_base); } #[inline] @@ -7066,6 +7081,7 @@ impl ExecutingFrame<'_> { descr_ptr: usize, ) { // Same publish protocol as write_cached_descriptor(), plus metaclass guard. + self.code.instructions.begin_descriptor_write(cache_base); unsafe { self.code.instructions.write_cache_u32(cache_base + 1, 0); self.code @@ -7078,6 +7094,7 @@ impl ExecutingFrame<'_> { .instructions .write_cache_u32(cache_base + 1, type_version); } + self.code.instructions.end_descriptor_write(cache_base); } fn load_attr(&mut self, vm: &VirtualMachine, oparg: LoadAttr) -> FrameResult { diff --git a/crates/vm/src/object/core.rs b/crates/vm/src/object/core.rs index 4198c00be74..c40d204cd60 100644 --- a/crates/vm/src/object/core.rs +++ b/crates/vm/src/object/core.rs @@ -17,7 +17,7 @@ use super::{ }; use crate::object::traverse_object::PyObjVTable; use crate::{ - builtins::{PyDict, PyDictRef, PyType, PyTypeRef}, + builtins::{PyDictRef, PyType, PyTypeRef}, common::{ atomic::{Ordering, PyAtomic, Radium}, linked_list::{Link, Pointers}, @@ -916,6 +916,12 @@ impl InstanceDict { pub fn replace(&self, d: PyDictRef) -> PyDictRef { core::mem::replace(&mut self.d.write(), d) } + + /// Consume the InstanceDict and return the inner PyDictRef. + #[inline] + pub fn into_inner(self) -> PyDictRef { + self.d.into_inner() + } } impl PyInner { @@ -1668,11 +1674,19 @@ impl PyObject { } // 2. 
Clear dict and member slots (subtype_clear) - if let Some(ext) = obj.0.ext_ref() { - if let Some(dict) = ext.dict.as_ref() { - let dict_ref = dict.get(); - // Clear dict entries to break cycles, then collect the dict itself - PyDict::clear(&dict_ref); + // Use mutable access to actually detach the dict, matching CPython's + // Py_CLEAR(*_PyObject_GetDictPtr(self)) which NULLs the dict pointer + // without clearing dict contents. This is critical because the dict + // may still be referenced by other live objects (e.g. function.__globals__). + if obj.0.has_ext() { + let self_addr = (ptr as *const u8).addr(); + let ext_ptr = core::ptr::with_exposed_provenance_mut::( + self_addr.wrapping_sub(EXT_OFFSET), + ); + let ext = unsafe { &mut *ext_ptr }; + if let Some(old_dict) = ext.dict.take() { + // Get the dict ref before dropping InstanceDict + let dict_ref = old_dict.into_inner(); result.push(dict_ref.into()); } for slot in ext.slots.iter() { diff --git a/crates/vm/src/stdlib/imp.rs b/crates/vm/src/stdlib/imp.rs index 087556c8cf2..66ce5239cd2 100644 --- a/crates/vm/src/stdlib/imp.rs +++ b/crates/vm/src/stdlib/imp.rs @@ -16,7 +16,7 @@ mod lock { #[pyfunction] fn acquire_lock(_vm: &VirtualMachine) { - IMP_LOCK.lock() + acquire_lock_for_fork() } #[pyfunction] @@ -34,6 +34,16 @@ mod lock { IMP_LOCK.is_locked() } + pub(super) fn acquire_lock_for_fork() { + IMP_LOCK.lock(); + } + + pub(super) fn release_lock_after_fork_parent() { + if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() { + unsafe { IMP_LOCK.unlock() }; + } + } + /// Reset import lock after fork() — only if held by a dead thread. /// /// `IMP_LOCK` is a reentrant mutex. If the *current* (surviving) thread @@ -47,22 +57,44 @@ mod lock { pub(crate) unsafe fn reinit_after_fork() { if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() { // Held by a dead thread — reset to unlocked. - // Same pattern as RLock::_at_fork_reinit in thread.rs. 
- unsafe { - let old: &crossbeam_utils::atomic::AtomicCell = - core::mem::transmute(&IMP_LOCK); - old.swap(RawRMutex::INIT); - } + unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) }; + } + } + + /// Match CPython's `_PyImport_ReInitLock()` + `_PyImport_ReleaseLock()` + /// behavior in the post-fork child: + /// 1) if ownership metadata is stale (dead owner / changed tid), reset; + /// 2) if current thread owns the lock, release it. + #[cfg(unix)] + pub(super) unsafe fn after_fork_child_reinit_and_release() { + unsafe { reinit_after_fork() }; + if IMP_LOCK.is_locked() && IMP_LOCK.is_owned_by_current_thread() { + unsafe { IMP_LOCK.unlock() }; } } } /// Re-export for fork safety code in posix.rs +#[cfg(feature = "threading")] +pub(crate) fn acquire_imp_lock_for_fork() { + lock::acquire_lock_for_fork(); +} + +#[cfg(feature = "threading")] +pub(crate) fn release_imp_lock_after_fork_parent() { + lock::release_lock_after_fork_parent(); +} + #[cfg(all(unix, feature = "threading"))] pub(crate) unsafe fn reinit_imp_lock_after_fork() { unsafe { lock::reinit_after_fork() } } +#[cfg(all(unix, feature = "threading"))] +pub(crate) unsafe fn after_fork_child_imp_lock_release() { + unsafe { lock::after_fork_child_reinit_and_release() } +} + #[cfg(not(feature = "threading"))] #[pymodule(sub)] mod lock { diff --git a/crates/vm/src/stdlib/io.rs b/crates/vm/src/stdlib/io.rs index 0e636d986f5..a313b3d98df 100644 --- a/crates/vm/src/stdlib/io.rs +++ b/crates/vm/src/stdlib/io.rs @@ -5015,13 +5015,13 @@ mod _io { if let Some(tio) = obj.downcast_ref::() { unsafe { reinit_thread_mutex_after_fork(&tio.data) }; - if let Some(guard) = tio.data.lock() { - if let Some(ref data) = *guard { - if let Some(ref decoder) = data.decoder { - reinit_io_locks(decoder); - } - reinit_io_locks(&data.buffer); + if let Some(guard) = tio.data.lock() + && let Some(ref data) = *guard + { + if let Some(ref decoder) = data.decoder { + reinit_io_locks(decoder); } + reinit_io_locks(&data.buffer); } 
return; } @@ -5044,7 +5044,6 @@ mod _io { if let Some(brw) = obj.downcast_ref::() { unsafe { reinit_thread_mutex_after_fork(&brw.read.data) }; unsafe { reinit_thread_mutex_after_fork(&brw.write.data) }; - return; } } diff --git a/crates/vm/src/stdlib/os.rs b/crates/vm/src/stdlib/os.rs index 5456b9420a0..d297f7e0fbc 100644 --- a/crates/vm/src/stdlib/os.rs +++ b/crates/vm/src/stdlib/os.rs @@ -287,7 +287,7 @@ pub(super) mod _os { fn read(fd: crt_fd::Borrowed<'_>, n: usize, vm: &VirtualMachine) -> PyResult { let mut buffer = vec![0u8; n]; loop { - match crt_fd::read(fd, &mut buffer) { + match vm.allow_threads(|| crt_fd::read(fd, &mut buffer)) { Ok(n) => { buffer.truncate(n); return Ok(vm.ctx.new_bytes(buffer)); @@ -309,7 +309,7 @@ pub(super) mod _os { ) -> PyResult { buffer.with_ref(|buf| { loop { - match crt_fd::read(fd, buf) { + match vm.allow_threads(|| crt_fd::read(fd, buf)) { Ok(n) => return Ok(n), Err(e) if e.raw_os_error() == Some(libc::EINTR) => { vm.check_signals()?; @@ -322,8 +322,12 @@ pub(super) mod _os { } #[pyfunction] - fn write(fd: crt_fd::Borrowed<'_>, data: ArgBytesLike) -> io::Result { - data.with_ref(|b| crt_fd::write(fd, b)) + fn write( + fd: crt_fd::Borrowed<'_>, + data: ArgBytesLike, + vm: &VirtualMachine, + ) -> io::Result { + data.with_ref(|b| vm.allow_threads(|| crt_fd::write(fd, b))) } #[cfg(not(windows))] diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index d34693a0317..6f2342ec3e3 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -767,9 +767,18 @@ pub mod module { // only for before_forkers, refer: test_register_at_fork in test_posix run_at_forkers(before_forkers, true, vm); + + #[cfg(feature = "threading")] + crate::stdlib::imp::acquire_imp_lock_for_fork(); + + #[cfg(feature = "threading")] + vm.state.stop_the_world.stop_the_world(vm); } fn py_os_after_fork_child(vm: &VirtualMachine) { + #[cfg(feature = "threading")] + vm.state.stop_the_world.reset_after_fork(); + // Phase 1: 
Reset all internal locks FIRST. // After fork(), locks held by dead parent threads would deadlock // if we try to acquire them. This must happen before anything else. @@ -797,6 +806,13 @@ pub mod module { #[cfg(feature = "threading")] crate::stdlib::thread::after_fork_child(vm); + // CPython parity: reinit import lock ownership metadata in child + // and release the lock acquired by PyOS_BeforeFork(). + #[cfg(feature = "threading")] + unsafe { + crate::stdlib::imp::after_fork_child_imp_lock_release() + }; + // Initialize signal handlers for the child's main thread. // When forked from a worker thread, the OnceCell is empty. vm.signal_handlers @@ -846,6 +862,12 @@ pub mod module { } fn py_os_after_fork_parent(vm: &VirtualMachine) { + #[cfg(feature = "threading")] + vm.state.stop_the_world.start_the_world(vm); + + #[cfg(feature = "threading")] + crate::stdlib::imp::release_imp_lock_after_fork_parent(); + let after_forkers_parent: Vec = vm.state.after_forkers_parent.lock().clone(); run_at_forkers(after_forkers_parent, false, vm); } @@ -905,20 +927,41 @@ pub mod module { } #[pyfunction] - fn fork(vm: &VirtualMachine) -> i32 { - warn_if_multi_threaded("fork", vm); + fn fork(vm: &VirtualMachine) -> PyResult { + if vm + .state + .finalizing + .load(core::sync::atomic::Ordering::Acquire) + { + return Err(vm.new_exception_msg( + vm.ctx.exceptions.python_finalization_error.to_owned(), + "can't fork at interpreter shutdown".into(), + )); + } + + // RustPython does not yet have C-level audit hooks; call sys.audit() + // to preserve Python-visible behavior and failure semantics. + vm.sys_module + .get_attr("audit", vm)? + .call(("os.fork",), vm)?; - let pid: i32; py_os_before_fork(vm); - unsafe { - pid = libc::fork(); - } + + let pid = unsafe { libc::fork() }; + // Save errno immediately — AfterFork callbacks may clobber it. 
+ let saved_errno = nix::Error::last_raw(); if pid == 0 { py_os_after_fork_child(vm); } else { py_os_after_fork_parent(vm); + // Match CPython timing: warn only after parent callback path resumes world. + warn_if_multi_threaded("fork", vm); + } + if pid == -1 { + Err(nix::Error::from_raw(saved_errno).into_pyexception(vm)) + } else { + Ok(pid) } - pid } #[cfg(not(target_os = "redox"))] @@ -1835,13 +1878,18 @@ pub mod module { fn waitpid(pid: libc::pid_t, opt: i32, vm: &VirtualMachine) -> PyResult<(libc::pid_t, i32)> { let mut status = 0; loop { - let res = unsafe { libc::waitpid(pid, &mut status, opt) }; + // Capture errno inside the closure: attach_thread (called by + // allow_threads on return) can clobber errno via syscalls. + let (res, err) = vm.allow_threads(|| { + let r = unsafe { libc::waitpid(pid, &mut status, opt) }; + (r, nix::Error::last_raw()) + }); if res == -1 { - if nix::Error::last_raw() == libc::EINTR { + if err == libc::EINTR { vm.check_signals()?; continue; } - return Err(nix::Error::last().into_pyexception(vm)); + return Err(nix::Error::from_raw(err).into_pyexception(vm)); } return Ok((res, status)); } diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index e0458f7ae10..38abc47138d 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -23,7 +23,6 @@ pub(crate) mod _thread { sync::{Arc, Weak}, }; use core::{cell::RefCell, time::Duration}; - use crossbeam_utils::atomic::AtomicCell; use parking_lot::{ RawMutex, RawThreadId, lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex}, @@ -78,7 +77,7 @@ pub(crate) mod _thread { }; match args.blocking { true if timeout == -1.0 => { - mu.lock(); + vm.allow_threads(|| mu.lock()); Ok(true) } true if timeout < 0.0 => { @@ -94,7 +93,7 @@ pub(crate) mod _thread { )); } - Ok(mu.try_lock_for(Duration::from_secs_f64(timeout))) + Ok(vm.allow_threads(|| mu.try_lock_for(Duration::from_secs_f64(timeout)))) } false if timeout != -1.0 => Err(vm 
.new_value_error("can't specify a timeout for a non-blocking call".to_owned())), @@ -150,17 +149,12 @@ pub(crate) mod _thread { Ok(()) } + #[cfg(unix)] #[pymethod] fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - // Reset the mutex to unlocked by directly writing the INIT value. - // Do NOT call unlock() here — after fork(), unlock_slow() would - // try to unpark stale waiters from dead parent threads. - let new_mut = RawMutex::INIT; - unsafe { - let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); - old_mutex.swap(new_mut); - } - + // Overwrite lock state to unlocked. Do NOT call unlock() here — + // after fork(), unlock_slow() would try to unpark stale waiters. + unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) }; Ok(()) } @@ -250,18 +244,13 @@ pub(crate) mod _thread { Ok(()) } + #[cfg(unix)] #[pymethod] fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - // Reset the reentrant mutex to unlocked by directly writing INIT. - // Do NOT call unlock() — after fork(), the slow path would try - // to unpark stale waiters from dead parent threads. + // Overwrite lock state to unlocked. Do NOT call unlock() here — + // after fork(), unlock_slow() would try to unpark stale waiters. 
self.count.store(0, core::sync::atomic::Ordering::Relaxed); - let new_mut = RawRMutex::INIT; - unsafe { - let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); - old_mutex.swap(new_mut); - } - + unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) }; Ok(()) } @@ -344,6 +333,63 @@ pub(crate) mod _thread { current_thread_id() } + #[cfg(all(unix, feature = "threading"))] + #[pyfunction] + fn _stop_the_world_stats(vm: &VirtualMachine) -> PyResult { + let stats = vm.state.stop_the_world.stats_snapshot(); + let d = vm.ctx.new_dict(); + d.set_item("stop_calls", vm.ctx.new_int(stats.stop_calls).into(), vm)?; + d.set_item( + "last_wait_ns", + vm.ctx.new_int(stats.last_wait_ns).into(), + vm, + )?; + d.set_item( + "total_wait_ns", + vm.ctx.new_int(stats.total_wait_ns).into(), + vm, + )?; + d.set_item("max_wait_ns", vm.ctx.new_int(stats.max_wait_ns).into(), vm)?; + d.set_item("poll_loops", vm.ctx.new_int(stats.poll_loops).into(), vm)?; + d.set_item( + "attached_seen", + vm.ctx.new_int(stats.attached_seen).into(), + vm, + )?; + d.set_item( + "forced_parks", + vm.ctx.new_int(stats.forced_parks).into(), + vm, + )?; + d.set_item( + "suspend_notifications", + vm.ctx.new_int(stats.suspend_notifications).into(), + vm, + )?; + d.set_item( + "attach_wait_yields", + vm.ctx.new_int(stats.attach_wait_yields).into(), + vm, + )?; + d.set_item( + "suspend_wait_yields", + vm.ctx.new_int(stats.suspend_wait_yields).into(), + vm, + )?; + d.set_item( + "detach_wait_yields", + vm.ctx.new_int(stats.detach_wait_yields).into(), + vm, + )?; + Ok(d) + } + + #[cfg(all(unix, feature = "threading"))] + #[pyfunction] + fn _stop_the_world_reset_stats(vm: &VirtualMachine) { + vm.state.stop_the_world.reset_stats(); + } + /// Set the name of the current thread #[pyfunction] fn set_name(name: PyUtf8StrRef) { @@ -591,7 +637,7 @@ pub(crate) mod _thread { let (lock, cvar) = &*done_event; let mut done = lock.lock(); while !*done { - cvar.wait(&mut done); + vm.allow_threads(|| cvar.wait(&mut 
done)); } } None => break, // No more threads to wait on @@ -1019,10 +1065,7 @@ pub(crate) mod _thread { /// Reset a parking_lot::Mutex to unlocked state after fork. #[cfg(unix)] fn reinit_parking_lot_mutex(mutex: &parking_lot::Mutex) { - unsafe { - let raw = mutex.raw() as *const parking_lot::RawMutex as *mut u8; - core::ptr::write_bytes(raw, 0, core::mem::size_of::()); - } + unsafe { rustpython_common::lock::zero_reinit_after_fork(mutex.raw()) }; } // Thread handle state enum @@ -1135,14 +1178,14 @@ pub(crate) mod _thread { while !*done { if let Some(timeout) = timeout_duration { - let result = cvar.wait_for(&mut done, timeout); + let result = vm.allow_threads(|| cvar.wait_for(&mut done, timeout)); if result.timed_out() && !*done { // Timeout occurred and done is still false return Ok(()); } } else { // Infinite wait - cvar.wait(&mut done); + vm.allow_threads(|| cvar.wait(&mut done)); } } drop(done); @@ -1163,7 +1206,7 @@ pub(crate) mod _thread { let (lock, cvar) = &*self.done_event; let mut done = lock.lock(); while !*done { - cvar.wait(&mut done); + vm.allow_threads(|| cvar.wait(&mut done)); } return Ok(()); } @@ -1178,7 +1221,7 @@ pub(crate) mod _thread { // Perform the actual join outside the lock if let Some(handle) = join_handle { // Ignore the result - panics in spawned threads are already handled - let _ = handle.join(); + let _ = vm.allow_threads(|| handle.join()); } // Mark as joined and clear joining flag diff --git a/crates/vm/src/stdlib/time.rs b/crates/vm/src/stdlib/time.rs index 80749c066b6..d38152db84a 100644 --- a/crates/vm/src/stdlib/time.rs +++ b/crates/vm/src/stdlib/time.rs @@ -117,8 +117,13 @@ mod decl { { // this is basically std::thread::sleep, but that catches interrupts and we don't want to; let ts = nix::sys::time::TimeSpec::from(dur); - let res = unsafe { libc::nanosleep(ts.as_ref(), core::ptr::null_mut()) }; - let interrupted = res == -1 && nix::Error::last_raw() == libc::EINTR; + // Capture errno inside the closure: attach_thread 
(called by + // allow_threads on return) can clobber errno via syscalls. + let (res, err) = vm.allow_threads(|| { + let r = unsafe { libc::nanosleep(ts.as_ref(), core::ptr::null_mut()) }; + (r, nix::Error::last_raw()) + }); + let interrupted = res == -1 && err == libc::EINTR; if interrupted { vm.check_signals()?; @@ -127,7 +132,7 @@ mod decl { #[cfg(not(unix))] { - std::thread::sleep(dur); + vm.allow_threads(|| std::thread::sleep(dur)); } Ok(()) diff --git a/crates/vm/src/vm/interpreter.rs b/crates/vm/src/vm/interpreter.rs index 8e275d1ce9e..5bf7436e958 100644 --- a/crates/vm/src/vm/interpreter.rs +++ b/crates/vm/src/vm/interpreter.rs @@ -1,3 +1,5 @@ +#[cfg(all(unix, feature = "threading"))] +use super::StopTheWorldState; use super::{Context, PyConfig, PyGlobalState, VirtualMachine, setting::Settings, thread}; use crate::{ PyResult, builtins, common::rc::PyRc, frozen::FrozenModule, getpath, py_freeze, stdlib::atexit, @@ -124,6 +126,8 @@ where monitoring: PyMutex::default(), monitoring_events: AtomicCell::new(0), instrumentation_version: AtomicU64::new(0), + #[cfg(all(unix, feature = "threading"))] + stop_the_world: StopTheWorldState::new(), }); // Create VM with the global state @@ -470,8 +474,10 @@ fn core_frozen_inits() -> impl Iterator { crate_name = "rustpython_compiler_core" ); - // Collect and add frozen module aliases for test modules + // Collect frozen module entries let mut entries: Vec<_> = iter.collect(); + + // Add test module aliases if let Some(hello_code) = entries .iter() .find(|(n, _)| *n == "__hello__") diff --git a/crates/vm/src/vm/mod.rs b/crates/vm/src/vm/mod.rs index 6461502a582..502d892d895 100644 --- a/crates/vm/src/vm/mod.rs +++ b/crates/vm/src/vm/mod.rs @@ -125,6 +125,359 @@ struct ExceptionStack { stack: Vec>, } +/// Stop-the-world state for fork safety. Before `fork()`, the requester +/// stops all other Python threads so they are not holding internal locks. 
+#[cfg(all(unix, feature = "threading"))] +pub struct StopTheWorldState { + /// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`) + pub(crate) requested: AtomicBool, + /// Ident of the thread that requested the stop (like `stw->requester`) + requester: AtomicU64, + /// Signaled by suspending threads when their state transitions to SUSPENDED + notify_mutex: std::sync::Mutex<()>, + notify_cv: std::sync::Condvar, + /// Number of stop-the-world attempts. + stats_stop_calls: AtomicU64, + /// Most recent stop-the-world wait duration in ns. + stats_last_wait_ns: AtomicU64, + /// Total accumulated stop-the-world wait duration in ns. + stats_total_wait_ns: AtomicU64, + /// Max observed stop-the-world wait duration in ns. + stats_max_wait_ns: AtomicU64, + /// Number of poll-loop iterations spent waiting. + stats_poll_loops: AtomicU64, + /// Number of ATTACHED threads observed while polling. + stats_attached_seen: AtomicU64, + /// Number of DETACHED->SUSPENDED parks requested by requester. + stats_forced_parks: AtomicU64, + /// Number of suspend notifications from worker threads. + stats_suspend_notifications: AtomicU64, + /// Number of yield loops while attach waited on SUSPENDED->DETACHED. + stats_attach_wait_yields: AtomicU64, + /// Number of yield loops while suspend waited on SUSPENDED->DETACHED. + stats_suspend_wait_yields: AtomicU64, + /// Number of yield loops while detach waited on SUSPENDED->DETACHED. 
+ stats_detach_wait_yields: AtomicU64, +} + +#[cfg(all(unix, feature = "threading"))] +#[derive(Debug, Clone, Copy)] +pub struct StopTheWorldStats { + pub stop_calls: u64, + pub last_wait_ns: u64, + pub total_wait_ns: u64, + pub max_wait_ns: u64, + pub poll_loops: u64, + pub attached_seen: u64, + pub forced_parks: u64, + pub suspend_notifications: u64, + pub attach_wait_yields: u64, + pub suspend_wait_yields: u64, + pub detach_wait_yields: u64, +} + +#[cfg(all(unix, feature = "threading"))] +impl Default for StopTheWorldState { + fn default() -> Self { + Self::new() + } +} + +#[cfg(all(unix, feature = "threading"))] +impl StopTheWorldState { + pub const fn new() -> Self { + Self { + requested: AtomicBool::new(false), + requester: AtomicU64::new(0), + notify_mutex: std::sync::Mutex::new(()), + notify_cv: std::sync::Condvar::new(), + stats_stop_calls: AtomicU64::new(0), + stats_last_wait_ns: AtomicU64::new(0), + stats_total_wait_ns: AtomicU64::new(0), + stats_max_wait_ns: AtomicU64::new(0), + stats_poll_loops: AtomicU64::new(0), + stats_attached_seen: AtomicU64::new(0), + stats_forced_parks: AtomicU64::new(0), + stats_suspend_notifications: AtomicU64::new(0), + stats_attach_wait_yields: AtomicU64::new(0), + stats_suspend_wait_yields: AtomicU64::new(0), + stats_detach_wait_yields: AtomicU64::new(0), + } + } + + /// Wake the stop-the-world requester (called by each thread that suspends). + pub(crate) fn notify_suspended(&self) { + self.stats_suspend_notifications + .fetch_add(1, Ordering::Relaxed); + // Just signal the condvar; the requester holds the mutex. + self.notify_cv.notify_one(); + } + + /// Try to CAS detached threads directly to SUSPENDED and check whether + /// all non-requester threads are now SUSPENDED. + /// Like CPython's `park_detached_threads`. 
+ fn park_detached_threads(&self, vm: &VirtualMachine) -> bool { + use thread::{THREAD_ATTACHED, THREAD_DETACHED, THREAD_SUSPENDED}; + let requester = self.requester.load(Ordering::Relaxed); + let registry = vm.state.thread_frames.lock(); + let mut all_suspended = true; + let mut attached_seen = 0u64; + let mut forced_parks = 0u64; + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + let state = slot.state.load(Ordering::Relaxed); + if state == THREAD_DETACHED { + // CAS DETACHED → SUSPENDED (park without thread cooperation) + let _ = slot.state.compare_exchange( + THREAD_DETACHED, + THREAD_SUSPENDED, + Ordering::AcqRel, + Ordering::Relaxed, + ); + all_suspended = false; // re-check on next poll + forced_parks = forced_parks.saturating_add(1); + } else if state == THREAD_ATTACHED { + // Thread is in bytecode — it will see `requested` and self-suspend + all_suspended = false; + attached_seen = attached_seen.saturating_add(1); + } + // THREAD_SUSPENDED → already parked + } + if attached_seen != 0 { + self.stats_attached_seen + .fetch_add(attached_seen, Ordering::Relaxed); + } + if forced_parks != 0 { + self.stats_forced_parks + .fetch_add(forced_parks, Ordering::Relaxed); + } + all_suspended + } + + /// Stop all non-requester threads. Like CPython's `stop_the_world`. + /// + /// 1. Sets `requested`, marking the requester thread. + /// 2. CAS detached threads to SUSPENDED. + /// 3. Waits (polling with 1 ms condvar timeout) for attached threads + /// to self-suspend in `check_signals`. 
+ pub fn stop_the_world(&self, vm: &VirtualMachine) { + let start = std::time::Instant::now(); + let requester_ident = crate::stdlib::thread::get_ident(); + self.requester.store(requester_ident, Ordering::Relaxed); + self.requested.store(true, Ordering::Release); + self.stats_stop_calls.fetch_add(1, Ordering::Relaxed); + stw_trace(format_args!("stop begin requester={requester_ident}")); + + let mut polls = 0u64; + loop { + if self.park_detached_threads(vm) { + break; + } + polls = polls.saturating_add(1); + // Wait up to 1 ms for a thread to notify us it suspended + let guard = self.notify_mutex.lock().unwrap(); + let _ = self + .notify_cv + .wait_timeout(guard, core::time::Duration::from_millis(1)); + } + if polls != 0 { + self.stats_poll_loops.fetch_add(polls, Ordering::Relaxed); + } + let wait_ns = start.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64; + self.stats_last_wait_ns.store(wait_ns, Ordering::Relaxed); + self.stats_total_wait_ns + .fetch_add(wait_ns, Ordering::Relaxed); + let mut prev_max = self.stats_max_wait_ns.load(Ordering::Relaxed); + while wait_ns > prev_max { + match self.stats_max_wait_ns.compare_exchange_weak( + prev_max, + wait_ns, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(observed) => prev_max = observed, + } + } + #[cfg(debug_assertions)] + self.debug_assert_all_non_requester_suspended(vm); + stw_trace(format_args!( + "stop end requester={requester_ident} wait_ns={wait_ns} polls={polls}" + )); + } + + /// Resume all suspended threads. Like CPython's `start_the_world`. + pub fn start_the_world(&self, vm: &VirtualMachine) { + use thread::{THREAD_DETACHED, THREAD_SUSPENDED}; + let requester = self.requester.load(Ordering::Relaxed); + stw_trace(format_args!("start begin requester={requester}")); + // Clear the request flag BEFORE waking threads. Otherwise a thread + // returning from allow_threads → attach_thread could observe + // `requested == true`, re-suspend itself, and stay parked forever. 
+ self.requested.store(false, Ordering::Release); + let registry = vm.state.thread_frames.lock(); + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + if slot.state.load(Ordering::Relaxed) == THREAD_SUSPENDED { + slot.state.store(THREAD_DETACHED, Ordering::Release); + slot.thread.unpark(); + } + } + drop(registry); + self.requester.store(0, Ordering::Relaxed); + #[cfg(debug_assertions)] + self.debug_assert_all_non_requester_detached(vm); + stw_trace(format_args!("start end requester={requester}")); + } + + /// Reset after fork in the child (only one thread alive). + pub fn reset_after_fork(&self) { + self.requested.store(false, Ordering::Relaxed); + self.requester.store(0, Ordering::Relaxed); + stw_trace(format_args!("reset-after-fork")); + } + + pub fn stats_snapshot(&self) -> StopTheWorldStats { + StopTheWorldStats { + stop_calls: self.stats_stop_calls.load(Ordering::Relaxed), + last_wait_ns: self.stats_last_wait_ns.load(Ordering::Relaxed), + total_wait_ns: self.stats_total_wait_ns.load(Ordering::Relaxed), + max_wait_ns: self.stats_max_wait_ns.load(Ordering::Relaxed), + poll_loops: self.stats_poll_loops.load(Ordering::Relaxed), + attached_seen: self.stats_attached_seen.load(Ordering::Relaxed), + forced_parks: self.stats_forced_parks.load(Ordering::Relaxed), + suspend_notifications: self.stats_suspend_notifications.load(Ordering::Relaxed), + attach_wait_yields: self.stats_attach_wait_yields.load(Ordering::Relaxed), + suspend_wait_yields: self.stats_suspend_wait_yields.load(Ordering::Relaxed), + detach_wait_yields: self.stats_detach_wait_yields.load(Ordering::Relaxed), + } + } + + pub fn reset_stats(&self) { + self.stats_stop_calls.store(0, Ordering::Relaxed); + self.stats_last_wait_ns.store(0, Ordering::Relaxed); + self.stats_total_wait_ns.store(0, Ordering::Relaxed); + self.stats_max_wait_ns.store(0, Ordering::Relaxed); + self.stats_poll_loops.store(0, Ordering::Relaxed); + self.stats_attached_seen.store(0, Ordering::Relaxed); + 
self.stats_forced_parks.store(0, Ordering::Relaxed); + self.stats_suspend_notifications.store(0, Ordering::Relaxed); + self.stats_attach_wait_yields.store(0, Ordering::Relaxed); + self.stats_suspend_wait_yields.store(0, Ordering::Relaxed); + self.stats_detach_wait_yields.store(0, Ordering::Relaxed); + } + + #[inline] + pub(crate) fn add_attach_wait_yields(&self, n: u64) { + if n != 0 { + self.stats_attach_wait_yields + .fetch_add(n, Ordering::Relaxed); + } + } + + #[inline] + pub(crate) fn add_suspend_wait_yields(&self, n: u64) { + if n != 0 { + self.stats_suspend_wait_yields + .fetch_add(n, Ordering::Relaxed); + } + } + + #[inline] + pub(crate) fn add_detach_wait_yields(&self, n: u64) { + if n != 0 { + self.stats_detach_wait_yields + .fetch_add(n, Ordering::Relaxed); + } + } + + #[cfg(debug_assertions)] + fn debug_assert_all_non_requester_suspended(&self, vm: &VirtualMachine) { + use thread::THREAD_ATTACHED; + let requester = self.requester.load(Ordering::Relaxed); + let registry = vm.state.thread_frames.lock(); + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + let state = slot.state.load(Ordering::Relaxed); + debug_assert!( + state != THREAD_ATTACHED, + "non-requester thread still attached during stop-the-world: id={id} state={state}" + ); + } + } + + #[cfg(debug_assertions)] + fn debug_assert_all_non_requester_detached(&self, vm: &VirtualMachine) { + use thread::THREAD_SUSPENDED; + let requester = self.requester.load(Ordering::Relaxed); + let registry = vm.state.thread_frames.lock(); + for (&id, slot) in registry.iter() { + if id == requester { + continue; + } + let state = slot.state.load(Ordering::Relaxed); + debug_assert!( + state != THREAD_SUSPENDED, + "non-requester thread still suspended after start-the-world: id={id} state={state}" + ); + } + } +} + +#[cfg(all(unix, feature = "threading"))] +pub(super) fn stw_trace_enabled() -> bool { + static ENABLED: std::sync::OnceLock = std::sync::OnceLock::new(); + 
*ENABLED.get_or_init(|| std::env::var_os("RUSTPYTHON_STW_TRACE").is_some()) +} + +#[cfg(all(unix, feature = "threading"))] +pub(super) fn stw_trace(msg: core::fmt::Arguments<'_>) { + if stw_trace_enabled() { + use core::fmt::Write as _; + + // Avoid stdio locking here: this path runs around fork where a child + // may inherit a borrowed stderr lock and panic on eprintln!/stderr. + struct FixedBuf { + buf: [u8; 512], + len: usize, + } + + impl core::fmt::Write for FixedBuf { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + if self.len >= self.buf.len() { + return Ok(()); + } + let remain = self.buf.len() - self.len; + let src = s.as_bytes(); + let n = src.len().min(remain); + self.buf[self.len..self.len + n].copy_from_slice(&src[..n]); + self.len += n; + Ok(()) + } + } + + let mut out = FixedBuf { + buf: [0u8; 512], + len: 0, + }; + let _ = writeln!( + &mut out, + "[rp-stw tid={}] {}", + crate::stdlib::thread::get_ident(), + msg + ); + unsafe { + let _ = libc::write(libc::STDERR_FILENO, out.buf.as_ptr().cast(), out.len); + } + } +} + pub struct PyGlobalState { pub config: PyConfig, pub module_defs: BTreeMap<&'static str, &'static builtins::PyModuleDef>, @@ -165,6 +518,9 @@ pub struct PyGlobalState { /// Incremented on every monitoring state change. Code objects compare their /// local version against this to decide whether re-instrumentation is needed. pub instrumentation_version: AtomicU64, + /// Stop-the-world state for pre-fork thread suspension + #[cfg(all(unix, feature = "threading"))] + pub stop_the_world: StopTheWorldState, } pub fn process_hash_secret_seed() -> u32 { @@ -194,6 +550,16 @@ impl VirtualMachine { unsafe { (*self.datastack.get()).pop(base) } } + /// Temporarily detach the current thread (ATTACHED → DETACHED) while + /// running `f`, then re-attach afterwards. Allows `stop_the_world` to + /// park this thread during blocking syscalls. + /// + /// Equivalent to CPython's `Py_BEGIN_ALLOW_THREADS` / `Py_END_ALLOW_THREADS`. 
+ #[inline] + pub fn allow_threads(&self, f: impl FnOnce() -> R) -> R { + thread::allow_threads(self, f) + } + /// Check whether the current thread is the main thread. /// Mirrors `_Py_ThreadCanHandleSignals`. #[allow(dead_code)] @@ -1482,6 +1848,10 @@ impl VirtualMachine { return Err(self.new_exception(self.ctx.exceptions.system_exit.to_owned(), vec![])); } + // Suspend this thread if stop-the-world is in progress + #[cfg(all(unix, feature = "threading"))] + thread::suspend_if_needed(&self.state.stop_the_world); + #[cfg(not(target_arch = "wasm32"))] { crate::signal::check_signals(self) diff --git a/crates/vm/src/vm/thread.rs b/crates/vm/src/vm/thread.rs index 8dd8e0312ee..10297a964fa 100644 --- a/crates/vm/src/vm/thread.rs +++ b/crates/vm/src/vm/thread.rs @@ -14,6 +14,17 @@ use core::{ use itertools::Itertools; use std::thread_local; +// Thread states for stop-the-world support. +// DETACHED: not executing Python bytecode (in native code, or idle) +// ATTACHED: actively executing Python bytecode +// SUSPENDED: parked by a stop-the-world request +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_DETACHED: i32 = 0; +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_ATTACHED: i32 = 1; +#[cfg(all(unix, feature = "threading"))] +pub const THREAD_SUSPENDED: i32 = 2; + /// Per-thread shared state for sys._current_frames() and sys._current_exceptions(). /// The exception field uses atomic operations for lock-free cross-thread reads. #[cfg(feature = "threading")] @@ -22,6 +33,12 @@ pub struct ThreadSlot { /// Readers must hold the Mutex and convert to FrameRef inside the lock. pub frames: parking_lot::Mutex>, pub exception: crate::PyAtomicRef>, + /// Thread state for stop-the-world: DETACHED / ATTACHED / SUSPENDED + #[cfg(unix)] + pub state: core::sync::atomic::AtomicI32, + /// Handle for waking this thread from park in stop-the-world paths. 
+ #[cfg(unix)] + pub thread: std::thread::Thread, } #[cfg(feature = "threading")] @@ -57,13 +74,29 @@ pub fn with_current_vm(f: impl FnOnce(&VirtualMachine) -> R) -> R { pub fn enter_vm(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R { VM_STACK.with(|vms| { + // Outermost enter_vm: transition DETACHED → ATTACHED + #[cfg(all(unix, feature = "threading"))] + let was_outermost = vms.borrow().is_empty(); + vms.borrow_mut().push(vm.into()); // Initialize thread slot for this thread if not already done #[cfg(feature = "threading")] init_thread_slot_if_needed(vm); - scopeguard::defer! { vms.borrow_mut().pop(); } + #[cfg(all(unix, feature = "threading"))] + if was_outermost { + attach_thread(vm); + } + + scopeguard::defer! { + // Outermost exit: transition ATTACHED → DETACHED + #[cfg(all(unix, feature = "threading"))] + if vms.borrow().len() == 1 { + detach_thread(vm); + } + vms.borrow_mut().pop(); + } VM_CURRENT.set(vm, f) }) } @@ -75,19 +108,258 @@ fn init_thread_slot_if_needed(vm: &VirtualMachine) { CURRENT_THREAD_SLOT.with(|slot| { if slot.borrow().is_none() { let thread_id = crate::stdlib::thread::get_ident(); + let mut registry = vm.state.thread_frames.lock(); let new_slot = Arc::new(ThreadSlot { frames: parking_lot::Mutex::new(Vec::new()), exception: crate::PyAtomicRef::from(None::), + #[cfg(unix)] + state: core::sync::atomic::AtomicI32::new( + if vm.state.stop_the_world.requested.load(Ordering::Acquire) { + // Match init_threadstate(): new thread-state starts + // suspended while stop-the-world is active. + THREAD_SUSPENDED + } else { + THREAD_DETACHED + }, + ), + #[cfg(unix)] + thread: std::thread::current(), }); - vm.state - .thread_frames - .lock() - .insert(thread_id, new_slot.clone()); + registry.insert(thread_id, new_slot.clone()); + drop(registry); *slot.borrow_mut() = Some(new_slot); } }); } +/// Transition DETACHED → ATTACHED. Blocks if the thread was SUSPENDED by +/// a stop-the-world request (like `_PyThreadState_Attach` + `tstate_wait_attach`). 
+#[cfg(all(unix, feature = "threading"))] +fn wait_while_suspended(slot: &ThreadSlot) -> u64 { + let mut wait_yields = 0u64; + while slot.state.load(Ordering::Acquire) == THREAD_SUSPENDED { + wait_yields = wait_yields.saturating_add(1); + std::thread::park_timeout(core::time::Duration::from_micros(50)); + } + wait_yields +} + +#[cfg(all(unix, feature = "threading"))] +fn attach_thread(vm: &VirtualMachine) { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + super::stw_trace(format_args!("attach begin")); + loop { + if vm.state.stop_the_world.requested.load(Ordering::Acquire) { + match s.state.compare_exchange( + THREAD_DETACHED, + THREAD_SUSPENDED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + super::stw_trace(format_args!("attach requested DETACHED->SUSPENDED")); + vm.state.stop_the_world.notify_suspended(); + let wait_yields = wait_while_suspended(s); + vm.state.stop_the_world.add_attach_wait_yields(wait_yields); + super::stw_trace(format_args!("attach requested resumed-detached")); + continue; + } + Err(THREAD_SUSPENDED) => { + let wait_yields = wait_while_suspended(s); + vm.state.stop_the_world.add_attach_wait_yields(wait_yields); + continue; + } + Err(_) => {} + } + } + match s.state.compare_exchange( + THREAD_DETACHED, + THREAD_ATTACHED, + Ordering::AcqRel, + Ordering::Relaxed, + ) { + Ok(_) => { + super::stw_trace(format_args!("attach DETACHED->ATTACHED")); + break; + } + Err(THREAD_SUSPENDED) => { + // Parked by stop-the-world — wait until released to DETACHED + super::stw_trace(format_args!("attach wait-suspended")); + let wait_yields = wait_while_suspended(s); + vm.state.stop_the_world.add_attach_wait_yields(wait_yields); + // Retry CAS + } + Err(state) => { + debug_assert!(false, "unexpected thread state in attach: {state}"); + break; + } + } + } + } + }); +} + +/// Transition ATTACHED → DETACHED (like `_PyThreadState_Detach`). 
+#[cfg(all(unix, feature = "threading"))] +fn detach_thread(vm: &VirtualMachine) { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + match s.state.compare_exchange( + THREAD_ATTACHED, + THREAD_DETACHED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => {} + Err(THREAD_DETACHED) => { + debug_assert!(false, "detach called while already DETACHED"); + return; + } + Err(state) => { + debug_assert!(false, "unexpected thread state in detach: {state}"); + return; + } + } + super::stw_trace(format_args!("detach ATTACHED->DETACHED")); + + if vm.state.stop_the_world.requested.load(Ordering::Acquire) { + match s.state.compare_exchange( + THREAD_DETACHED, + THREAD_SUSPENDED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + super::stw_trace(format_args!("detach requested DETACHED->SUSPENDED")); + vm.state.stop_the_world.notify_suspended(); + } + Err(THREAD_SUSPENDED) => {} + Err(_) => return, + } + let wait_yields = wait_while_suspended(s); + vm.state.stop_the_world.add_detach_wait_yields(wait_yields); + super::stw_trace(format_args!("detach requested resumed-detached")); + } + } + }); +} + +/// Temporarily transition the current thread ATTACHED → DETACHED while +/// running `f`, then re-attach afterwards. This allows `stop_the_world` +/// to park this thread during blocking operations. +/// +/// Equivalent to CPython's `Py_BEGIN_ALLOW_THREADS` / `Py_END_ALLOW_THREADS`. +#[cfg(all(unix, feature = "threading"))] +pub fn allow_threads(vm: &VirtualMachine, f: impl FnOnce() -> R) -> R { + // Preserve CPython-like save/restore semantics: + // only detach if this call observed ATTACHED at entry, and always restore + // on unwind. 
+ let should_transition = CURRENT_THREAD_SLOT.with(|slot| { + slot.borrow() + .as_ref() + .is_some_and(|s| s.state.load(Ordering::Acquire) == THREAD_ATTACHED) + }); + if !should_transition { + return f(); + } + + detach_thread(vm); + let reattach_guard = scopeguard::guard(vm, attach_thread); + let result = f(); + drop(reattach_guard); + result +} + +/// No-op on non-unix or non-threading builds. +#[cfg(not(all(unix, feature = "threading")))] +pub fn allow_threads(_vm: &VirtualMachine, f: impl FnOnce() -> R) -> R { + f() +} + +/// Called from check_signals when stop-the-world is requested. +/// Transitions ATTACHED → SUSPENDED and waits until released +/// (like `_PyThreadState_Suspend` + `_PyThreadState_Attach`). +#[cfg(all(unix, feature = "threading"))] +pub fn suspend_if_needed(stw: &super::StopTheWorldState) { + if !stw.requested.load(Ordering::Relaxed) { + return; + } + do_suspend(stw); +} + +#[cfg(all(unix, feature = "threading"))] +#[cold] +fn do_suspend(stw: &super::StopTheWorldState) { + CURRENT_THREAD_SLOT.with(|slot| { + if let Some(s) = slot.borrow().as_ref() { + // ATTACHED → SUSPENDED + match s.state.compare_exchange( + THREAD_ATTACHED, + THREAD_SUSPENDED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => {} + Err(THREAD_DETACHED) => { + // Leaving VM; caller will re-check on next entry. + super::stw_trace(format_args!("suspend skip DETACHED")); + return; + } + Err(THREAD_SUSPENDED) => { + // Already parked by another path. + super::stw_trace(format_args!("suspend skip already-suspended")); + return; + } + Err(state) => { + debug_assert!(false, "unexpected thread state in suspend: {state}"); + return; + } + } + super::stw_trace(format_args!("suspend ATTACHED->SUSPENDED")); + + // Re-check: if start_the_world already ran (cleared `requested`), + // no one will set us back to DETACHED — we must self-recover. 
+ if !stw.requested.load(Ordering::Acquire) { + s.state.store(THREAD_ATTACHED, Ordering::Release); + super::stw_trace(format_args!("suspend abort requested-cleared")); + return; + } + + // Notify the stop-the-world requester that we've parked + stw.notify_suspended(); + super::stw_trace(format_args!("suspend notified-requester")); + + // Wait until start_the_world sets us back to DETACHED + let wait_yields = wait_while_suspended(s); + stw.add_suspend_wait_yields(wait_yields); + + // Re-attach (DETACHED → ATTACHED), mirroring CPython's + // tstate_wait_attach CAS loop. + loop { + match s.state.compare_exchange( + THREAD_DETACHED, + THREAD_ATTACHED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(THREAD_SUSPENDED) => { + let extra_wait = wait_while_suspended(s); + stw.add_suspend_wait_yields(extra_wait); + } + Err(THREAD_ATTACHED) => break, + Err(state) => { + debug_assert!(false, "unexpected post-suspend state: {state}"); + break; + } + } + } + super::stw_trace(format_args!("suspend resume -> ATTACHED")); + } + }); +} + /// Push a frame pointer onto the current thread's shared frame stack. /// The pointed-to frame must remain alive until the matching pop. #[cfg(feature = "threading")] @@ -179,6 +451,10 @@ pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { let new_slot = Arc::new(ThreadSlot { frames: parking_lot::Mutex::new(current_frames), exception: crate::PyAtomicRef::from(vm.topmost_exception()), + #[cfg(unix)] + state: core::sync::atomic::AtomicI32::new(THREAD_ATTACHED), + #[cfg(unix)] + thread: std::thread::current(), }); // Lock is safe: reinit_locks_after_fork() already reset it to unlocked. 
From c7eca0470dce58c3488fbe4af2d2910f60faac69 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Sat, 7 Mar 2026 18:13:31 +0900 Subject: [PATCH 2/2] Drop the write-side descriptor-cache seqlock The reader already double-checks type_version and the descriptor pointer after incref, so writers no longer need seqlock serialization; remove descriptor_sequences and the begin/end_descriptor_write API from CodeUnits. --- crates/compiler-core/src/bytecode.rs | 48 +--------------------------- crates/vm/src/frame.rs | 8 ++--- 2 files changed, 3 insertions(+), 53 deletions(-) diff --git a/crates/compiler-core/src/bytecode.rs b/crates/compiler-core/src/bytecode.rs index df219ce9075..46182962654 100644 --- a/crates/compiler-core/src/bytecode.rs +++ b/crates/compiler-core/src/bytecode.rs @@ -12,7 +12,7 @@ use core::{ cell::UnsafeCell, hash, mem, ops::Deref, - sync::atomic::{AtomicU8, AtomicU16, AtomicU32, AtomicUsize, Ordering}, + sync::atomic::{AtomicU8, AtomicU16, AtomicUsize, Ordering}, }; use itertools::Itertools; use malachite_bigint::BigInt; @@ -415,9 +415,6 @@ pub struct CodeUnits { /// Single atomic load/store prevents torn reads when multiple threads /// specialize the same instruction concurrently. pointer_cache: Box<[AtomicUsize]>, - /// SeqLock counter per instruction cache base for descriptor payload writes. - /// odd = write in progress, even = quiescent. - descriptor_sequences: Box<[AtomicU32]>, } // SAFETY: All cache operations use atomic read/write instructions.
@@ -444,16 +441,10 @@ impl Clone for CodeUnits { .iter() .map(|c| AtomicUsize::new(c.load(Ordering::Relaxed))) .collect(); - let descriptor_sequences = self - .descriptor_sequences - .iter() - .map(|c| AtomicU32::new(c.load(Ordering::Relaxed))) - .collect(); Self { units: UnsafeCell::new(units), adaptive_counters, pointer_cache, - descriptor_sequences, } } } @@ -500,15 +491,10 @@ impl From> for CodeUnits { .map(|_| AtomicUsize::new(0)) .collect::>() .into_boxed_slice(); - let descriptor_sequences = (0..len) - .map(|_| AtomicU32::new(0)) - .collect::>() - .into_boxed_slice(); Self { units: UnsafeCell::new(units), adaptive_counters, pointer_cache, - descriptor_sequences, } } } @@ -655,38 +641,6 @@ impl CodeUnits { self.pointer_cache[index].load(Ordering::Relaxed) } - #[inline] - pub fn begin_descriptor_write(&self, index: usize) { - let sequence = &self.descriptor_sequences[index]; - let mut seq = sequence.load(Ordering::Acquire); - loop { - while (seq & 1) != 0 { - core::hint::spin_loop(); - seq = sequence.load(Ordering::Acquire); - } - match sequence.compare_exchange_weak( - seq, - seq.wrapping_add(1), - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - core::sync::atomic::fence(Ordering::Release); - break; - } - Err(observed) => { - core::hint::spin_loop(); - seq = observed; - } - } - } - } - - #[inline] - pub fn end_descriptor_write(&self, index: usize) { - self.descriptor_sequences[index].fetch_add(1, Ordering::Release); - } - /// Read adaptive counter bits for instruction at `index`. /// Uses Relaxed atomic load. 
pub fn read_adaptive_counter(&self, index: usize) -> u16 { diff --git a/crates/vm/src/frame.rs b/crates/vm/src/frame.rs index 4e2a9d7f345..69e4c062994 100644 --- a/crates/vm/src/frame.rs +++ b/crates/vm/src/frame.rs @@ -7057,9 +7057,9 @@ impl ExecutingFrame<'_> { type_version: u32, descr_ptr: usize, ) { - // Publish descriptor cache atomically as a tuple: + // Publish descriptor cache with version-invalidation protocol: // invalidate version first, then write payload, then publish version. - self.code.instructions.begin_descriptor_write(cache_base); + // Reader double-checks version+ptr after incref, so no writer lock needed. unsafe { self.code.instructions.write_cache_u32(cache_base + 1, 0); self.code @@ -7069,7 +7069,6 @@ impl ExecutingFrame<'_> { .instructions .write_cache_u32(cache_base + 1, type_version); } - self.code.instructions.end_descriptor_write(cache_base); } #[inline] @@ -7080,8 +7079,6 @@ impl ExecutingFrame<'_> { metaclass_version: u32, descr_ptr: usize, ) { - // Same publish protocol as write_cached_descriptor(), plus metaclass guard. - self.code.instructions.begin_descriptor_write(cache_base); unsafe { self.code.instructions.write_cache_u32(cache_base + 1, 0); self.code @@ -7094,7 +7091,6 @@ impl ExecutingFrame<'_> { .instructions .write_cache_u32(cache_base + 1, type_version); } - self.code.instructions.end_descriptor_write(cache_base); } fn load_attr(&mut self, vm: &VirtualMachine, oparg: LoadAttr) -> FrameResult {