diff --git a/Lib/test/test_multiprocessing_fork/test_processes.py b/Lib/test/test_multiprocessing_fork/test_processes.py index 02b7256e41b..987d5d7e3c6 100644 --- a/Lib/test/test_multiprocessing_fork/test_processes.py +++ b/Lib/test/test_multiprocessing_fork/test_processes.py @@ -3,40 +3,6 @@ install_tests_in_module_dict(globals(), 'fork', only_type="processes") -import os, sys # TODO: RUSTPYTHON -class WithProcessesTestCondition(WithProcessesTestCondition): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_notify_all(self): super().test_notify_all() # TODO: RUSTPYTHON - -class WithProcessesTestLock(WithProcessesTestLock): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError') - def test_repr_lock(self): super().test_repr_lock() # TODO: RUSTPYTHON - -class WithProcessesTestManagerRestart(WithProcessesTestManagerRestart): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError') - def test_rapid_restart(self): super().test_rapid_restart() # TODO: RUSTPYTHON - -class WithProcessesTestProcess(WithProcessesTestProcess): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_args_argument(self): super().test_args_argument() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_process(self): super().test_process() # TODO: RUSTPYTHON - -class WithProcessesTestPoolWorkerLifetime(WithProcessesTestPoolWorkerLifetime): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_pool_worker_lifetime(self): super().test_pool_worker_lifetime() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky 
timeout') - def test_pool_worker_lifetime_early_close(self): super().test_pool_worker_lifetime_early_close() # TODO: RUSTPYTHON - -class WithProcessesTestQueue(WithProcessesTestQueue): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_fork(self): super().test_fork() # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky timeout') - def test_get(self): super().test_get() # TODO: RUSTPYTHON - -class WithProcessesTestSharedMemory(WithProcessesTestSharedMemory): # TODO: RUSTPYTHON - @unittest.skipIf(sys.platform == 'linux', 'TODO: RUSTPYTHON flaky BrokenPipeError, flaky ConnectionRefusedError, flaky ConnectionResetError, flaky EOFError') - def test_shared_memory_SharedMemoryManager_basics(self): super().test_shared_memory_SharedMemoryManager_basics() # TODO: RUSTPYTHON if __name__ == '__main__': unittest.main() diff --git a/crates/vm/src/codecs.rs b/crates/vm/src/codecs.rs index 9cd75eee55c..a7eea0c9372 100644 --- a/crates/vm/src/codecs.rs +++ b/crates/vm/src/codecs.rs @@ -150,6 +150,31 @@ impl ToPyObject for PyCodec { } impl CodecsRegistry { + /// Force-unlock the inner RwLock after fork in the child process. + /// + /// # Safety + /// Must only be called after fork() in the child process when no other + /// threads exist. The calling thread must NOT hold this lock. + #[cfg(all(unix, feature = "host_env"))] + pub(crate) unsafe fn force_unlock_after_fork(&self) { + if self.inner.try_write().is_some() { + return; + } + let is_shared = self.inner.try_read().is_some(); + if is_shared { + loop { + // SAFETY: Lock is shared-locked by dead thread(s). + unsafe { self.inner.force_unlock_read() }; + if self.inner.try_write().is_some() { + return; + } + } + } else { + // SAFETY: Lock is exclusively locked by a dead thread. + unsafe { self.inner.force_unlock_write() }; + } + } + pub(crate) fn new(ctx: &Context) -> Self { ::rustpython_vm::common::static_cell! 
{ static METHODS: Box<[PyMethodDef]>; diff --git a/crates/vm/src/intern.rs b/crates/vm/src/intern.rs index a50b8871cb9..8ad39c206fd 100644 --- a/crates/vm/src/intern.rs +++ b/crates/vm/src/intern.rs @@ -31,6 +31,32 @@ impl Clone for StringPool { } impl StringPool { + /// Force-unlock the inner RwLock after fork in the child process. + /// + /// # Safety + /// Must only be called after fork() in the child process when no other + /// threads exist. The calling thread must NOT hold this lock. + #[cfg(all(unix, feature = "host_env"))] + pub(crate) unsafe fn force_unlock_after_fork(&self) { + if self.inner.try_write().is_some() { + return; + } + // Lock is stuck from a thread that no longer exists. + let is_shared = self.inner.try_read().is_some(); + if is_shared { + loop { + // SAFETY: Lock is shared-locked by dead thread(s). + unsafe { self.inner.force_unlock_read() }; + if self.inner.try_write().is_some() { + return; + } + } + } else { + // SAFETY: Lock is exclusively locked by a dead thread. + unsafe { self.inner.force_unlock_write() }; + } + } + #[inline] pub unsafe fn intern( &self, diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index ce70412df76..c13bfb51f12 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -707,6 +707,22 @@ pub mod module { #[cfg(feature = "threading")] crate::object::reset_weakref_locks_after_fork(); + // Force-unlock all global VM locks that may have been held by + // threads that no longer exist in the child process after fork. + // SAFETY: After fork, only the forking thread survives. Any lock + // held by another thread is permanently stuck. The forking thread + // does not hold these locks during fork() (a high-level Python op). 
+ unsafe { + vm.ctx.string_pool.force_unlock_after_fork(); + vm.state.codec_registry.force_unlock_after_fork(); + force_unlock_mutex_after_fork(&vm.state.atexit_funcs); + force_unlock_mutex_after_fork(&vm.state.before_forkers); + force_unlock_mutex_after_fork(&vm.state.after_forkers_child); + force_unlock_mutex_after_fork(&vm.state.after_forkers_parent); + force_unlock_mutex_after_fork(&vm.state.global_trace_func); + force_unlock_mutex_after_fork(&vm.state.global_profile_func); + } + // Mark all other threads as done before running Python callbacks #[cfg(feature = "threading")] crate::stdlib::thread::after_fork_child(vm); @@ -716,18 +732,21 @@ pub mod module { vm.signal_handlers .get_or_init(crate::signal::new_signal_handlers); - let after_forkers_child = match vm.state.after_forkers_child.try_lock() { - Some(guard) => guard.clone(), - None => { - // SAFETY: After fork in child process, only the current thread - // exists. The lock holder no longer exists. - unsafe { vm.state.after_forkers_child.force_unlock() }; - vm.state.after_forkers_child.lock().clone() - } - }; + let after_forkers_child: Vec<PyObjectRef> = vm.state.after_forkers_child.lock().clone(); run_at_forkers(after_forkers_child, false, vm); } + /// Force-unlock a PyMutex if held by a dead thread after fork. + /// + /// # Safety + /// Must only be called after fork() in the child process. + unsafe fn force_unlock_mutex_after_fork<T>(mutex: &crate::common::lock::PyMutex<T>) { + if mutex.try_lock().is_none() { + // SAFETY: Lock is held by a dead thread after fork.
+ unsafe { mutex.force_unlock() }; + } + } + fn py_os_after_fork_parent(vm: &VirtualMachine) { let after_forkers_parent: Vec<PyObjectRef> = vm.state.after_forkers_parent.lock().clone(); run_at_forkers(after_forkers_parent, false, vm); diff --git a/crates/vm/src/stdlib/sys.rs b/crates/vm/src/stdlib/sys.rs index 22b720a1cd2..3ee6caf2eae 100644 --- a/crates/vm/src/stdlib/sys.rs +++ b/crates/vm/src/stdlib/sys.rs @@ -827,8 +827,7 @@ mod sys { Ok(exc) => { // PyErr_Display: try traceback._print_exception_bltin first if let Ok(tb_mod) = vm.import("traceback", 0) - && let Ok(print_exc_builtin) = - tb_mod.get_attr("_print_exception_bltin", vm) + && let Ok(print_exc_builtin) = tb_mod.get_attr("_print_exception_bltin", vm) && print_exc_builtin .call((exc.as_object().to_owned(),), vm) .is_ok() diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index bf495ecc382..ff6f0fd8b8f 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -914,11 +914,13 @@ pub(crate) mod _thread { // Reinitialize frame slot for current thread crate::vm::thread::reinit_frame_slot_after_fork(vm); - // Clean up thread handles if we can acquire the lock. - // Use try_lock because the mutex might have been held during fork. - // If we can't acquire it, just skip - the child process will work - // correctly with new handles it creates. - if let Some(mut handles) = vm.state.thread_handles.try_lock() { + // Clean up thread handles. Force-unlock if held by a dead thread. + // SAFETY: After fork, only the current thread exists. + if vm.state.thread_handles.try_lock().is_none() { + unsafe { vm.state.thread_handles.force_unlock() }; + } + { + let mut handles = vm.state.thread_handles.lock(); // Clean up dead weak refs and mark non-current threads as done handles.retain(|(inner_weak, done_event_weak): &HandleEntry| { let Some(inner) = inner_weak.upgrade() else { @@ -957,10 +959,13 @@ pub(crate) mod _thread { }); } - // Clean up shutdown_handles as well.
- // This is critical to prevent _shutdown() from waiting on threads - // that don't exist in the child process after fork. - if let Some(mut handles) = vm.state.shutdown_handles.try_lock() { + // Clean up shutdown_handles. Force-unlock if held by a dead thread. + // SAFETY: After fork, only the current thread exists. + if vm.state.shutdown_handles.try_lock().is_none() { + unsafe { vm.state.shutdown_handles.force_unlock() }; + } + { + let mut handles = vm.state.shutdown_handles.lock(); // Mark all non-current threads as done in shutdown_handles handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| { let Some(inner) = inner_weak.upgrade() else {