From 18c5cdd04c913eb515aa2dc2b39929228df8f263 Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Sun, 18 Jan 2026 22:16:27 +0900 Subject: [PATCH 1/2] sys.is_remote_debug_enabled, supports_isolated_interpreters --- crates/vm/src/stdlib/sys.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/vm/src/stdlib/sys.rs b/crates/vm/src/stdlib/sys.rs index 5446b04c6d5..305e1cfa61d 100644 --- a/crates/vm/src/stdlib/sys.rs +++ b/crates/vm/src/stdlib/sys.rs @@ -76,8 +76,11 @@ mod sys { #[pyattr(name = "_rustpython_debugbuild")] const RUSTPYTHON_DEBUGBUILD: bool = cfg!(debug_assertions); + #[cfg(not(windows))] #[pyattr(name = "abiflags")] - pub(crate) const ABIFLAGS: &str = "t"; // 't' for free-threaded (no GIL) + const ABIFLAGS_ATTR: &str = "t"; // 't' for free-threaded (no GIL) + // Internal constant used for sysconfigdata_name + pub(crate) const ABIFLAGS: &str = "t"; #[pyattr(name = "api_version")] const API_VERSION: u32 = 0x0; // what C api? #[pyattr(name = "copyright")] @@ -527,6 +530,7 @@ mod sys { "_multiarch" => ctx.new_str(multiarch()), "version" => version_info(vm), "hexversion" => ctx.new_int(version::VERSION_HEX), + "supports_isolated_interpreters" => ctx.new_bool(false), }) } @@ -628,6 +632,12 @@ mod sys { false // RustPython has no GIL (like free-threaded Python) } + /// Return True if remote debugging is enabled, False otherwise. + #[pyfunction] + const fn is_remote_debug_enabled() -> bool { + false // RustPython does not support remote debugging + } + #[pyfunction] fn exit(code: OptionalArg, vm: &VirtualMachine) -> PyResult { let code = code.unwrap_or_none(vm); From f5121b84205e85e5d5bfa7d3b5f1d2296eeeb4f0 Mon Sep 17 00:00:00 2001 From: CPython Devleopers <> Date: Sun, 18 Jan 2026 22:22:02 +0900 Subject: [PATCH 2/2] Update test_sys form Python 3.14.2 --- Lib/test/test_sys.py | 453 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 379 insertions(+), 74 deletions(-) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index c3a23ffa666..77300fbe0bb 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -7,19 +7,24 @@ import operator import os import random +import socket import struct import subprocess import sys import sysconfig import test.support +from io import StringIO +from unittest import mock from test import support from test.support import os_helper from test.support.script_helper import assert_python_ok, assert_python_failure +from test.support.socket_helper import find_unused_port from test.support import threading_helper from test.support import import_helper from test.support import force_not_colorized +from test.support import SHORT_TIMEOUT try: - from test.support import interpreters + from concurrent import interpreters except ImportError: interpreters = None import textwrap @@ -52,7 +57,7 @@ def test_original_displayhook(self): dh(None) self.assertEqual(out.getvalue(), "") - self.assertTrue(not hasattr(builtins, "_")) + self.assertNotHasAttr(builtins, "_") # sys.displayhook() requires arguments self.assertRaises(TypeError, dh) @@ -167,7 +172,7 @@ def test_original_excepthook(self): with support.captured_stderr() as err: sys.__excepthook__(*sys.exc_info()) - self.assertTrue(err.getvalue().endswith("ValueError: 42\n")) + self.assertEndsWith(err.getvalue(), "ValueError: 42\n") self.assertRaises(TypeError, sys.__excepthook__) @@ -188,7 +193,7 @@ def test_excepthook_bytes_filename(self): err = err.getvalue() self.assertIn(""" File "b'bytes_filename'", line 123\n""", err) self.assertIn(""" text\n""", err) - self.assertTrue(err.endswith("SyntaxError: msg\n")) + self.assertEndsWith(err, "SyntaxError: msg\n") def test_excepthook(self): with test.support.captured_output("stderr") as stderr: @@ -206,7 +211,7 @@ class SysModuleTest(unittest.TestCase): def tearDown(self): test.support.reap_children() - @unittest.expectedFailure # TODO: RUSTPYTHON + @unittest.expectedFailure # TODO: RUSTPYTHON def test_exit(self): # call with two arguments self.assertRaises(TypeError, sys.exit, 42, 42) @@ -266,8 +271,7 @@ def check_exit_message(code, expected, **env_vars): rc, out, err = assert_python_failure('-c', code, **env_vars) self.assertEqual(rc, 1) self.assertEqual(out, b'') - self.assertTrue(err.startswith(expected), - "%s doesn't start with %s" % (ascii(err), ascii(expected))) + self.assertStartsWith(err, expected) # test that stderr buffer is flushed before the exit message is written # into stderr @@ -399,36 +403,6 @@ def test_setrecursionlimit_to_depth(self): finally: sys.setrecursionlimit(old_limit) - @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful if the GIL is disabled") - @threading_helper.requires_working_threading() - def test_racing_recursion_limit(self): - from threading import Thread - def something_recursive(): - def count(n): - if n > 0: - return count(n - 1) + 1 - return 0 - - count(50) - - def set_recursion_limit(): - for limit in range(100, 200): - sys.setrecursionlimit(limit) - - threads = [] - for _ in range(5): - threads.append(Thread(target=set_recursion_limit)) - - for _ in range(5): - threads.append(Thread(target=something_recursive)) - - with threading_helper.catch_threading_exception() as cm: - with threading_helper.start_threads(threads): - pass - - if cm.exc_value: - raise cm.exc_value - @unittest.expectedFailure # TODO: RUSTPYTHON def test_getwindowsversion(self): # Raise SkipTest if sys doesn't have getwindowsversion attribute @@ -467,7 +441,7 @@ def test_call_tracing(self): @unittest.skipUnless(hasattr(sys, "setdlopenflags"), 'test needs sys.setdlopenflags()') def test_dlopenflags(self): - self.assertTrue(hasattr(sys, "getdlopenflags")) + self.assertHasAttr(sys, "getdlopenflags") self.assertRaises(TypeError, sys.getdlopenflags, 42) oldflags = sys.getdlopenflags() self.assertRaises(TypeError, sys.setdlopenflags) @@ -656,8 +630,7 @@ def g456(): # And the next record must be for g456(). filename, lineno, funcname, sourceline = stack[i+1] self.assertEqual(funcname, "g456") - self.assertTrue((sourceline.startswith("if leave_g.wait(") or - sourceline.startswith("g_raised.set()"))) + self.assertStartsWith(sourceline, ("if leave_g.wait(", "g_raised.set()")) finally: # Reap the spawned thread. leave_g.set() @@ -757,6 +730,8 @@ def test_attributes(self): self.assertIn(sys.float_repr_style, ('short', 'legacy')) if not sys.platform.startswith('win'): self.assertIsInstance(sys.abiflags, str) + else: + self.assertFalse(hasattr(sys, 'abiflags')) def test_thread_info(self): info = sys.thread_info @@ -882,7 +857,7 @@ def test_subinterp_intern_singleton(self): ''')) self.assertTrue(sys._is_interned(s)) - @unittest.expectedFailure # TODO: RUSTPYTHON needs update for context_aware_warnings + @unittest.expectedFailure # TODO: RUSTPYTHON; needs update for context_aware_warnings def test_sys_flags(self): self.assertTrue(sys.flags) attrs = ("debug", @@ -890,11 +865,10 @@ def test_sys_flags(self): "dont_write_bytecode", "no_user_site", "no_site", "ignore_environment", "verbose", "bytes_warning", "quiet", "hash_randomization", "isolated", "dev_mode", "utf8_mode", - "warn_default_encoding", "safe_path", "int_max_str_digits", - "thread_inherit_context") + "warn_default_encoding", "safe_path", "int_max_str_digits") for attr in attrs: - self.assertTrue(hasattr(sys.flags, attr), attr) - attr_type = bool if attr in ("dev_mode", "safe_path", "thread_inherit_context") else int + self.assertHasAttr(sys.flags, attr) + attr_type = bool if attr in ("dev_mode", "safe_path") else int self.assertEqual(type(getattr(sys.flags, attr)), attr_type, attr) self.assertTrue(repr(sys.flags)) self.assertEqual(len(sys.flags), len(attrs)) @@ -920,9 +894,11 @@ def test_sys_getwindowsversion_no_instantiation(self): @test.support.cpython_only def test_clear_type_cache(self): - sys._clear_type_cache() + with self.assertWarnsRegex(DeprecationWarning, + r"sys\._clear_type_cache\(\) is deprecated.*"): + sys._clear_type_cache() - @unittest.skip("TODO: RUSTPYTHON; cp424 encoding not supported, causes panic") + @unittest.skip('TODO: RUSTPYTHON; cp424 encoding not supported, causes panic') @force_not_colorized @support.requires_subprocess() def test_ioencoding(self): @@ -1102,10 +1078,11 @@ def test_implementation(self): levels = {'alpha': 0xA, 'beta': 0xB, 'candidate': 0xC, 'final': 0xF} - self.assertTrue(hasattr(sys.implementation, 'name')) - self.assertTrue(hasattr(sys.implementation, 'version')) - self.assertTrue(hasattr(sys.implementation, 'hexversion')) - self.assertTrue(hasattr(sys.implementation, 'cache_tag')) + self.assertHasAttr(sys.implementation, 'name') + self.assertHasAttr(sys.implementation, 'version') + self.assertHasAttr(sys.implementation, 'hexversion') + self.assertHasAttr(sys.implementation, 'cache_tag') + self.assertHasAttr(sys.implementation, 'supports_isolated_interpreters') version = sys.implementation.version self.assertEqual(version[:2], (version.major, version.minor)) @@ -1119,6 +1096,15 @@ def test_implementation(self): self.assertEqual(sys.implementation.name, sys.implementation.name.lower()) + # https://peps.python.org/pep-0734 + sii = sys.implementation.supports_isolated_interpreters + self.assertIsInstance(sii, bool) + if test.support.check_impl_detail(cpython=True): + if test.support.is_emscripten or test.support.is_wasi: + self.assertFalse(sii) + else: + self.assertTrue(sii) + @test.support.cpython_only def test_debugmallocstats(self): # Test sys._debugmallocstats() @@ -1129,14 +1115,10 @@ def test_debugmallocstats(self): # Output of sys._debugmallocstats() depends on configure flags. # The sysconfig vars are not available on Windows. if sys.platform != "win32": - with_freelists = sysconfig.get_config_var("WITH_FREELISTS") with_pymalloc = sysconfig.get_config_var("WITH_PYMALLOC") - if with_freelists: - self.assertIn(b"free PyDictObjects", err) + self.assertIn(b"free PyDictObjects", err) if with_pymalloc: self.assertIn(b'Small block threshold', err) - if not with_freelists and not with_pymalloc: - self.assertFalse(err) # The function has no parameter self.assertRaises(TypeError, sys._debugmallocstats, True) @@ -1167,18 +1149,20 @@ def test_getallocatedblocks(self): # about the underlying implementation: the function might # return 0 or something greater. self.assertGreaterEqual(a, 0) + gc.collect() + b = sys.getallocatedblocks() + self.assertLessEqual(b, a) try: - # While we could imagine a Python session where the number of - # multiple buffer objects would exceed the sharing of references, - # it is unlikely to happen in a normal test run. - self.assertLess(a, sys.gettotalrefcount()) + # The reported blocks will include immortalized strings, but the + # total ref count will not. This will sanity check that among all + # other objects (those eligible for garbage collection) there + # are more references being tracked than allocated blocks. + interned_immortal = sys.getunicodeinternedsize(_only_immortal=True) + self.assertLess(a - interned_immortal, sys.gettotalrefcount()) except AttributeError: # gettotalrefcount() not available pass gc.collect() - b = sys.getallocatedblocks() - self.assertLessEqual(b, a) - gc.collect() c = sys.getallocatedblocks() self.assertIn(c, range(b - 50, b + 50)) @@ -1445,7 +1429,7 @@ def __del__(self): else: self.assertIn("ValueError", report) self.assertIn("del is broken", report) - self.assertTrue(report.endswith("\n")) + self.assertEndsWith(report, "\n") def test_original_unraisablehook_exception_qualname(self): # See bpo-41031, bpo-45083. @@ -1690,15 +1674,19 @@ class C(object): pass # float check(float(0), size('d')) # sys.floatinfo - check(sys.float_info, vsize('') + self.P * len(sys.float_info)) + check(sys.float_info, self.P + vsize('') + self.P * len(sys.float_info)) # frame def func(): return sys._getframe() x = func() - check(x, size('3Pi2c2P7P2ic??2P')) + if support.Py_GIL_DISABLED: + INTERPRETER_FRAME = '9PihcP' + else: + INTERPRETER_FRAME = '9PhcP' + check(x, size('3PiccPPP' + INTERPRETER_FRAME + 'P')) # function def func(): pass - check(func, size('15Pi')) + check(func, size('16Pi')) class c(): @staticmethod def foo(): @@ -1712,7 +1700,7 @@ def bar(cls): check(bar, size('PP')) # generator def get_gen(): yield 1 - check(get_gen(), size('PP4P4c7P2ic??2P')) + check(get_gen(), size('6P4c' + INTERPRETER_FRAME + 'P')) # iterator check(iter('abc'), size('lP')) # callable-iterator @@ -1794,13 +1782,14 @@ def delx(self): del self.__x # super check(super(int), size('3P')) # tuple - check((), vsize('')) - check((1,2,3), vsize('') + 3*self.P) + check((), vsize('') + self.P) + check((1,2,3), vsize('') + self.P + 3*self.P) # type # static type: PyTypeObject fmt = 'P2nPI13Pl4Pn9Pn12PIPc' s = vsize(fmt) check(int, s) + typeid = 'n' if support.Py_GIL_DISABLED else '' # class s = vsize(fmt + # PyTypeObject '4P' # PyAsyncMethods @@ -1808,8 +1797,9 @@ def delx(self): del self.__x '3P' # PyMappingMethods '10P' # PySequenceMethods '2P' # PyBufferProcs - '6P' - '1PIP' # Specializer cache + '7P' + '1PIP' # Specializer cache + + typeid # heap type id (free-threaded only) ) class newstyleclass(object): pass # Separate block for PyDictKeysObject with 8 keys and 5 entries @@ -1914,8 +1904,10 @@ def test_pythontypes(self): # symtable entry # XXX # sys.flags - # FIXME: The +1 will not be necessary once gh-122575 is fixed - check(sys.flags, vsize('') + self.P * (1 + len(sys.flags))) + # FIXME: The +3 is for the 'gil', 'thread_inherit_context' and + # 'context_aware_warnings' flags and will not be necessary once + # gh-122575 is fixed + check(sys.flags, vsize('') + self.P + self.P * (3 + len(sys.flags))) def test_asyncgen_hooks(self): old = sys.get_asyncgen_hooks() @@ -1973,5 +1965,318 @@ def write(self, s): self.assertEqual(out, b"") self.assertEqual(err, b"") +@test.support.support_remote_exec_only +@test.support.cpython_only +class TestRemoteExec(unittest.TestCase): + def tearDown(self): + test.support.reap_children() + + def _run_remote_exec_test(self, script_code, python_args=None, env=None, + prologue='', + script_path=os_helper.TESTFN + '_remote.py'): + # Create the script that will be remotely executed + self.addCleanup(os_helper.unlink, script_path) + + with open(script_path, 'w') as f: + f.write(script_code) + + # Create and run the target process + target = os_helper.TESTFN + '_target.py' + self.addCleanup(os_helper.unlink, target) + + port = find_unused_port() + + with open(target, 'w') as f: + f.write(f''' +import sys +import time +import socket + +# Connect to the test process +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +sock.connect(('localhost', {port})) + +{prologue} + +# Signal that the process is ready +sock.sendall(b"ready") + +print("Target process running...") + +# Wait for remote script to be executed +# (the execution will happen as the following +# code is processed as soon as the recv call +# unblocks) +sock.recv(1024) + +# Do a bunch of work to give the remote script time to run +x = 0 +for i in range(100): + x += i + +# Write confirmation back +sock.sendall(b"executed") +sock.close() +''') + + # Start the target process and capture its output + cmd = [sys.executable] + if python_args: + cmd.extend(python_args) + cmd.append(target) + + # Create a socket server to communicate with the target process + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.bind(('localhost', port)) + server_socket.settimeout(SHORT_TIMEOUT) + server_socket.listen(1) + + with subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) as proc: + client_socket = None + try: + # Accept connection from target process + client_socket, _ = server_socket.accept() + server_socket.close() + + response = client_socket.recv(1024) + self.assertEqual(response, b"ready") + + # Try remote exec on the target process + sys.remote_exec(proc.pid, script_path) + + # Signal script to continue + client_socket.sendall(b"continue") + + # Wait for execution confirmation + response = client_socket.recv(1024) + self.assertEqual(response, b"executed") + + # Return output for test verification + stdout, stderr = proc.communicate(timeout=10.0) + return proc.returncode, stdout, stderr + except PermissionError: + self.skipTest("Insufficient permissions to execute code in remote process") + finally: + if client_socket is not None: + client_socket.close() + proc.kill() + proc.terminate() + proc.wait(timeout=SHORT_TIMEOUT) + + def test_remote_exec(self): + """Test basic remote exec functionality""" + script = 'print("Remote script executed successfully!")' + returncode, stdout, stderr = self._run_remote_exec_test(script) + # self.assertEqual(returncode, 0) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + + def test_remote_exec_bytes(self): + script = 'print("Remote script executed successfully!")' + script_path = os.fsencode(os_helper.TESTFN) + b'_bytes_remote.py' + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + + @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 'requires undecodable path') + @unittest.skipIf(sys.platform == 'darwin', + 'undecodable paths are not supported on macOS') + def test_remote_exec_undecodable(self): + script = 'print("Remote script executed successfully!")' + script_path = os_helper.TESTFN_UNDECODABLE + b'_undecodable_remote.py' + for script_path in [script_path, os.fsdecode(script_path)]: + returncode, stdout, stderr = self._run_remote_exec_test(script, + script_path=script_path) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertEqual(stderr, b"") + + def test_remote_exec_with_self_process(self): + """Test remote exec with the target process being the same as the test process""" + + code = 'import sys;print("Remote script executed successfully!", file=sys.stderr)' + file = os_helper.TESTFN + '_remote_self.py' + with open(file, 'w') as f: + f.write(code) + self.addCleanup(os_helper.unlink, file) + with mock.patch('sys.stderr', new_callable=StringIO) as mock_stderr: + with mock.patch('sys.stdout', new_callable=StringIO) as mock_stdout: + sys.remote_exec(os.getpid(), os.path.abspath(file)) + print("Done") + self.assertEqual(mock_stderr.getvalue(), "Remote script executed successfully!\n") + self.assertEqual(mock_stdout.getvalue(), "Done\n") + + def test_remote_exec_raises_audit_event(self): + """Test remote exec raises an audit event""" + prologue = '''\ +import sys +def audit_hook(event, arg): + print(f"Audit event: {event}, arg: {arg}".encode("ascii", errors="replace")) +sys.addaudithook(audit_hook) +''' + script = ''' +print("Remote script executed successfully!") +''' + returncode, stdout, stderr = self._run_remote_exec_test(script, prologue=prologue) + self.assertEqual(returncode, 0) + self.assertIn(b"Remote script executed successfully!", stdout) + self.assertIn(b"Audit event: cpython.remote_debugger_script, arg: ", stdout) + self.assertEqual(stderr, b"") + + def test_remote_exec_with_exception(self): + """Test remote exec with an exception raised in the target process + + The exception should be raised in the main thread of the target process + but not crash the target process. + """ + script = ''' +raise Exception("Remote script exception") +''' + returncode, stdout, stderr = self._run_remote_exec_test(script) + self.assertEqual(returncode, 0) + self.assertIn(b"Remote script exception", stderr) + self.assertEqual(stdout.strip(), b"Target process running...") + + def test_new_namespace_for_each_remote_exec(self): + """Test that each remote_exec call gets its own namespace.""" + script = textwrap.dedent( + """ + assert globals() is not __import__("__main__").__dict__ + print("Remote script executed successfully!") + """ + ) + returncode, stdout, stderr = self._run_remote_exec_test(script) + self.assertEqual(returncode, 0) + self.assertEqual(stderr, b"") + self.assertIn(b"Remote script executed successfully", stdout) + + def test_remote_exec_disabled_by_env(self): + """Test remote exec is disabled when PYTHON_DISABLE_REMOTE_DEBUG is set""" + env = os.environ.copy() + env['PYTHON_DISABLE_REMOTE_DEBUG'] = '1' + with self.assertRaisesRegex(RuntimeError, "Remote debugging is not enabled in the remote process"): + self._run_remote_exec_test("print('should not run')", env=env) + + def test_remote_exec_disabled_by_xoption(self): + """Test remote exec is disabled with -Xdisable-remote-debug""" + with self.assertRaisesRegex(RuntimeError, "Remote debugging is not enabled in the remote process"): + self._run_remote_exec_test("print('should not run')", python_args=['-Xdisable-remote-debug']) + + def test_remote_exec_invalid_pid(self): + """Test remote exec with invalid process ID""" + with self.assertRaises(OSError): + sys.remote_exec(99999, "print('should not run')") + + def test_remote_exec_invalid_script(self): + """Test remote exec with invalid script type""" + with self.assertRaises(TypeError): + sys.remote_exec(0, None) + with self.assertRaises(TypeError): + sys.remote_exec(0, 123) + + def test_remote_exec_syntax_error(self): + """Test remote exec with syntax error in script""" + script = ''' +this is invalid python code +''' + returncode, stdout, stderr = self._run_remote_exec_test(script) + self.assertEqual(returncode, 0) + self.assertIn(b"SyntaxError", stderr) + self.assertEqual(stdout.strip(), b"Target process running...") + + def test_remote_exec_invalid_script_path(self): + """Test remote exec with invalid script path""" + with self.assertRaises(OSError): + sys.remote_exec(os.getpid(), "invalid_script_path") + + def test_remote_exec_in_process_without_debug_fails_envvar(self): + """Test remote exec in a process without remote debugging enabled""" + script = os_helper.TESTFN + '_remote.py' + self.addCleanup(os_helper.unlink, script) + with open(script, 'w') as f: + f.write('print("Remote script executed successfully!")') + env = os.environ.copy() + env['PYTHON_DISABLE_REMOTE_DEBUG'] = '1' + + _, out, err = assert_python_failure('-c', f'import os, sys; sys.remote_exec(os.getpid(), "{script}")', **env) + self.assertIn(b"Remote debugging is not enabled", err) + self.assertEqual(out, b"") + + def test_remote_exec_in_process_without_debug_fails_xoption(self): + """Test remote exec in a process without remote debugging enabled""" + script = os_helper.TESTFN + '_remote.py' + self.addCleanup(os_helper.unlink, script) + with open(script, 'w') as f: + f.write('print("Remote script executed successfully!")') + + _, out, err = assert_python_failure('-Xdisable-remote-debug', '-c', f'import os, sys; sys.remote_exec(os.getpid(), "{script}")') + self.assertIn(b"Remote debugging is not enabled", err) + self.assertEqual(out, b"") + +class TestSysJIT(unittest.TestCase): + + def test_jit_is_available(self): + available = sys._jit.is_available() + script = f"import sys; assert sys._jit.is_available() is {available}" + assert_python_ok("-c", script, PYTHON_JIT="0") + assert_python_ok("-c", script, PYTHON_JIT="1") + + def test_jit_is_enabled(self): + available = sys._jit.is_available() + script = "import sys; assert sys._jit.is_enabled() is {enabled}" + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_jit_is_active(self): + available = sys._jit.is_available() + script = textwrap.dedent( + """ + import _testcapi + import _testinternalcapi + import sys + + def frame_0_interpreter() -> None: + assert sys._jit.is_active() is False + + def frame_1_interpreter() -> None: + assert sys._jit.is_active() is False + frame_0_interpreter() + assert sys._jit.is_active() is False + + def frame_2_jit(expected: bool) -> None: + # Inlined into the last loop of frame_3_jit: + assert sys._jit.is_active() is expected + # Insert C frame: + _testcapi.pyobject_vectorcall(frame_1_interpreter, None, None) + assert sys._jit.is_active() is expected + + def frame_3_jit() -> None: + # JITs just before the last loop: + for i in range(_testinternalcapi.TIER2_THRESHOLD + 1): + # Careful, doing this in the reverse order breaks tracing: + expected = {enabled} and i == _testinternalcapi.TIER2_THRESHOLD + assert sys._jit.is_active() is expected + frame_2_jit(expected) + assert sys._jit.is_active() is expected + + def frame_4_interpreter() -> None: + assert sys._jit.is_active() is False + frame_3_jit() + assert sys._jit.is_active() is False + + assert sys._jit.is_active() is False + frame_4_interpreter() + assert sys._jit.is_active() is False + """ + ) + assert_python_ok("-c", script.format(enabled=False), PYTHON_JIT="0") + assert_python_ok("-c", script.format(enabled=available), PYTHON_JIT="1") + + if __name__ == "__main__": unittest.main()