Skip to content

Commit 09c2947

Browse files
gh-110693: Pending Calls Machinery Cleanups (gh-118296)
This does some cleanup in preparation for later changes.
1 parent d5df252 commit 09c2947

File tree

7 files changed

+314
-112
lines changed

7 files changed

+314
-112
lines changed

Include/internal/pycore_ceval.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,12 @@ extern void _PyEval_SignalReceived(void);
4848
#define _Py_PENDING_MAINTHREADONLY 1
4949
#define _Py_PENDING_RAWFREE 2
5050

51+
typedef int _Py_add_pending_call_result;
52+
#define _Py_ADD_PENDING_SUCCESS 0
53+
#define _Py_ADD_PENDING_FULL -1
54+
5155
// Export for '_testinternalcapi' shared extension
52-
PyAPI_FUNC(int) _PyEval_AddPendingCall(
56+
PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
5357
PyInterpreterState *interp,
5458
_Py_pending_call_func func,
5559
void *arg,

Include/internal/pycore_ceval_state.h

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,56 @@ extern "C" {
1414

1515
typedef int (*_Py_pending_call_func)(void *);
1616

17+
struct _pending_call {
18+
_Py_pending_call_func func;
19+
void *arg;
20+
int flags;
21+
};
22+
23+
#define PENDINGCALLSARRAYSIZE 32
24+
25+
#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
26+
/* For interpreter-level pending calls, we want to avoid spending too
27+
much time on pending calls in any one thread, so we apply a limit. */
28+
#if MAXPENDINGCALLS > 100
29+
# define MAXPENDINGCALLSLOOP 100
30+
#else
31+
# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
32+
#endif
33+
34+
#define MAXPENDINGCALLS_MAIN PENDINGCALLSARRAYSIZE
35+
/* For the main thread, we want to make sure all pending calls are
36+
run at once, for the sake of prompt signal handling. This is
37+
unlikely to cause any problems since there should be very few
38+
pending calls for the main thread. */
39+
#define MAXPENDINGCALLSLOOP_MAIN 0
40+
1741
struct _pending_calls {
1842
int busy;
1943
PyMutex mutex;
2044
/* Request for running pending calls. */
21-
int32_t calls_to_do;
22-
#define NPENDINGCALLS 32
23-
struct _pending_call {
24-
_Py_pending_call_func func;
25-
void *arg;
26-
int flags;
27-
} calls[NPENDINGCALLS];
45+
int32_t npending;
46+
/* The maximum allowed number of pending calls.
47+
If the queue fills up to this point then _PyEval_AddPendingCall()
48+
will return _Py_ADD_PENDING_FULL. */
49+
int32_t max;
50+
/* We don't want a flood of pending calls to interrupt any one thread
51+
for too long, so we keep a limit on the number handled per pass.
52+
A value of 0 means there is no limit (other than the maximum
53+
size of the list of pending calls). */
54+
int32_t maxloop;
55+
struct _pending_call calls[PENDINGCALLSARRAYSIZE];
2856
int first;
29-
int last;
57+
int next;
3058
};
3159

60+
3261
typedef enum {
3362
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
3463
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
3564
PERF_STATUS_OK = 1, // Perf trampoline is ready to be executed
3665
} perf_status_t;
3766

38-
3967
#ifdef PY_HAVE_PERF_TRAMPOLINE
4068
struct code_arena_st;
4169

@@ -48,6 +76,7 @@ struct trampoline_api_st {
4876
};
4977
#endif
5078

79+
5180
struct _ceval_runtime_state {
5281
struct {
5382
#ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,10 +91,15 @@ struct _ceval_runtime_state {
6291
#endif
6392
} perf;
6493
/* Pending calls to be made only on the main thread. */
94+
// The signal machinery falls back on this
95+
// so it must be especially stable and efficient.
96+
// For example, we use a preallocated array
97+
// for the list of pending calls.
6598
struct _pending_calls pending_mainthread;
6699
PyMutex sys_trace_profile_mutex;
67100
};
68101

102+
69103
#ifdef PY_HAVE_PERF_TRAMPOLINE
70104
# define _PyEval_RUNTIME_PERF_INIT \
71105
{ \

Include/internal/pycore_runtime_init.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ extern PyTypeObject _PyExc_MemoryError;
114114
.autoTSSkey = Py_tss_NEEDS_INIT, \
115115
.parser = _parser_runtime_state_INIT, \
116116
.ceval = { \
117+
.pending_mainthread = { \
118+
.max = MAXPENDINGCALLS_MAIN, \
119+
.maxloop = MAXPENDINGCALLSLOOP_MAIN, \
120+
}, \
117121
.perf = _PyEval_RUNTIME_PERF_INIT, \
118122
}, \
119123
.gilstate = { \
@@ -166,6 +170,10 @@ extern PyTypeObject _PyExc_MemoryError;
166170
.imports = IMPORTS_INIT, \
167171
.ceval = { \
168172
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
173+
.pending = { \
174+
.max = MAXPENDINGCALLS, \
175+
.maxloop = MAXPENDINGCALLSLOOP, \
176+
}, \
169177
}, \
170178
.gc = { \
171179
.enabled = 1, \

Lib/test/test_capi/test_misc.py

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,12 @@ class MyType:
11721172
self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')
11731173

11741174

1175+
def test_gen_get_code(self):
1176+
def genf(): yield
1177+
gen = genf()
1178+
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
1179+
1180+
11751181
@requires_limited_api
11761182
class TestHeapTypeRelative(unittest.TestCase):
11771183
"""Test API for extending opaque types (PEP 697)"""
@@ -1452,7 +1458,7 @@ class TestPendingCalls(unittest.TestCase):
14521458
# about when pending calls get run. This is especially relevant
14531459
# here for creating deterministic tests.
14541460

1455-
def pendingcalls_submit(self, l, n):
1461+
def main_pendingcalls_submit(self, l, n):
14561462
def callback():
14571463
#this function can be interrupted by thread switching so let's
14581464
#use an atomic operation
@@ -1467,12 +1473,27 @@ def callback():
14671473
if _testcapi._pending_threadfunc(callback):
14681474
break
14691475

1470-
def pendingcalls_wait(self, l, n, context = None):
1476+
def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
1477+
def callback():
1478+
#this function can be interrupted by thread switching so let's
1479+
#use an atomic operation
1480+
l.append(None)
1481+
1482+
if main:
1483+
return _testcapi._pending_threadfunc(callback, n,
1484+
blocking=False,
1485+
ensure_added=ensure)
1486+
else:
1487+
return _testinternalcapi.pending_threadfunc(callback, n,
1488+
blocking=False,
1489+
ensure_added=ensure)
1490+
1491+
def pendingcalls_wait(self, l, numadded, context = None):
14711492
#now, stick around until l[0] has grown to 10
14721493
count = 0
1473-
while len(l) != n:
1494+
while len(l) != numadded:
14741495
#this busy loop is where we expect to be interrupted to
1475-
#run our callbacks. Note that callbacks are only run on the
1496+
#run our callbacks. Note that some callbacks are only run on the
14761497
#main thread
14771498
if False and support.verbose:
14781499
print("(%i)"%(len(l),),)
@@ -1482,12 +1503,12 @@ def pendingcalls_wait(self, l, n, context = None):
14821503
continue
14831504
count += 1
14841505
self.assertTrue(count < 10000,
1485-
"timeout waiting for %i callbacks, got %i"%(n, len(l)))
1506+
"timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
14861507
if False and support.verbose:
14871508
print("(%i)"%(len(l),))
14881509

14891510
@threading_helper.requires_working_threading()
1490-
def test_pendingcalls_threaded(self):
1511+
def test_main_pendingcalls_threaded(self):
14911512

14921513
#do every callback on a separate thread
14931514
n = 32 #total callbacks
@@ -1501,15 +1522,15 @@ class foo(object):pass
15011522
context.lock = threading.Lock()
15021523
context.event = threading.Event()
15031524

1504-
threads = [threading.Thread(target=self.pendingcalls_thread,
1525+
threads = [threading.Thread(target=self.main_pendingcalls_thread,
15051526
args=(context,))
15061527
for i in range(context.nThreads)]
15071528
with threading_helper.start_threads(threads):
15081529
self.pendingcalls_wait(context.l, n, context)
15091530

1510-
def pendingcalls_thread(self, context):
1531+
def main_pendingcalls_thread(self, context):
15111532
try:
1512-
self.pendingcalls_submit(context.l, context.n)
1533+
self.main_pendingcalls_submit(context.l, context.n)
15131534
finally:
15141535
with context.lock:
15151536
context.nFinished += 1
@@ -1519,20 +1540,54 @@ def pendingcalls_thread(self, context):
15191540
if nFinished == context.nThreads:
15201541
context.event.set()
15211542

1522-
def test_pendingcalls_non_threaded(self):
1543+
def test_main_pendingcalls_non_threaded(self):
15231544
#again, just using the main thread, likely they will all be dispatched at
15241545
#once. It is ok to ask for too many, because we loop until we find a slot.
15251546
#the loop can be interrupted to dispatch.
15261547
#there are only 32 dispatch slots, so we go for twice that!
15271548
l = []
15281549
n = 64
1529-
self.pendingcalls_submit(l, n)
1550+
self.main_pendingcalls_submit(l, n)
15301551
self.pendingcalls_wait(l, n)
15311552

1532-
def test_gen_get_code(self):
1533-
def genf(): yield
1534-
gen = genf()
1535-
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
1553+
def test_max_pending(self):
1554+
with self.subTest('main-only'):
1555+
maxpending = 32
1556+
1557+
l = []
1558+
added = self.pendingcalls_submit(l, 1, main=True)
1559+
self.pendingcalls_wait(l, added)
1560+
self.assertEqual(added, 1)
1561+
1562+
l = []
1563+
added = self.pendingcalls_submit(l, maxpending, main=True)
1564+
self.pendingcalls_wait(l, added)
1565+
self.assertEqual(added, maxpending)
1566+
1567+
l = []
1568+
added = self.pendingcalls_submit(l, maxpending+1, main=True)
1569+
self.pendingcalls_wait(l, added)
1570+
self.assertEqual(added, maxpending)
1571+
1572+
with self.subTest('not main-only'):
1573+
# Per-interpreter pending calls has the same low limit
1574+
# on how many may be pending at a time.
1575+
maxpending = 32
1576+
1577+
l = []
1578+
added = self.pendingcalls_submit(l, 1, main=False)
1579+
self.pendingcalls_wait(l, added)
1580+
self.assertEqual(added, 1)
1581+
1582+
l = []
1583+
added = self.pendingcalls_submit(l, maxpending, main=False)
1584+
self.pendingcalls_wait(l, added)
1585+
self.assertEqual(added, maxpending)
1586+
1587+
l = []
1588+
added = self.pendingcalls_submit(l, maxpending+1, main=False)
1589+
self.pendingcalls_wait(l, added)
1590+
self.assertEqual(added, maxpending)
15361591

15371592
class PendingTask(types.SimpleNamespace):
15381593

Modules/_testcapimodule.c

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -819,25 +819,55 @@ static int _pending_callback(void *arg)
819819
* run from any python thread.
820820
*/
821821
static PyObject *
822-
pending_threadfunc(PyObject *self, PyObject *arg)
822+
pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
823823
{
824+
static char *kwlist[] = {"callback", "num",
825+
"blocking", "ensure_added", NULL};
824826
PyObject *callable;
825-
int r;
826-
if (PyArg_ParseTuple(arg, "O", &callable) == 0)
827+
unsigned int num = 1;
828+
int blocking = 0;
829+
int ensure_added = 0;
830+
if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
831+
"O|I$pp:_pending_threadfunc", kwlist,
832+
&callable, &num, &blocking, &ensure_added))
833+
{
827834
return NULL;
835+
}
828836

829837
/* create the reference for the callbackwhile we hold the lock */
830-
Py_INCREF(callable);
838+
for (unsigned int i = 0; i < num; i++) {
839+
Py_INCREF(callable);
840+
}
831841

832-
Py_BEGIN_ALLOW_THREADS
833-
r = Py_AddPendingCall(&_pending_callback, callable);
834-
Py_END_ALLOW_THREADS
842+
PyThreadState *save_tstate = NULL;
843+
if (!blocking) {
844+
save_tstate = PyEval_SaveThread();
845+
}
846+
847+
unsigned int num_added = 0;
848+
for (; num_added < num; num_added++) {
849+
if (ensure_added) {
850+
int r;
851+
do {
852+
r = Py_AddPendingCall(&_pending_callback, callable);
853+
} while (r < 0);
854+
}
855+
else {
856+
if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
857+
break;
858+
}
859+
}
860+
}
861+
862+
if (!blocking) {
863+
PyEval_RestoreThread(save_tstate);
864+
}
835865

836-
if (r<0) {
866+
for (unsigned int i = num_added; i < num; i++) {
837867
Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
838-
Py_RETURN_FALSE;
839868
}
840-
Py_RETURN_TRUE;
869+
/* The callable is decref'ed above in each added _pending_callback(). */
870+
return PyLong_FromUnsignedLong((unsigned long)num_added);
841871
}
842872

843873
/* Test PyOS_string_to_double. */
@@ -3232,7 +3262,8 @@ static PyMethodDef TestMethods[] = {
32323262
{"_spawn_pthread_waiter", spawn_pthread_waiter, METH_NOARGS},
32333263
{"_end_spawned_pthread", end_spawned_pthread, METH_NOARGS},
32343264
#endif
3235-
{"_pending_threadfunc", pending_threadfunc, METH_VARARGS},
3265+
{"_pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
3266+
METH_VARARGS|METH_KEYWORDS},
32363267
#ifdef HAVE_GETTIMEOFDAY
32373268
{"profile_int", profile_int, METH_NOARGS},
32383269
#endif

0 commit comments

Comments
 (0)