
gh-133485: Use _interpreters.call() in InterpreterPoolExecutor #133957


Draft: wants to merge 10 commits into base: main
97 changes: 41 additions & 56 deletions Include/internal/pycore_crossinterp.h
@@ -131,7 +131,23 @@ PyAPI_FUNC(void) _PyXIData_Clear(PyInterpreterState *, _PyXIData_t *);

/* getting cross-interpreter data */

typedef int (*xidatafunc)(PyThreadState *tstate, PyObject *, _PyXIData_t *);
typedef int xidata_fallback_t;
#define _PyXIDATA_XIDATA_ONLY (0)
#define _PyXIDATA_FULL_FALLBACK (1)

// Technically, we don't need two different function types;
// we could go with just the fallback one. However, only container
// types like tuple need it, so always having the extra arg would be
// a bit unfortunate. It's also nice to be able to clearly distinguish
// between types that might call _PyObject_GetXIData() and those that won't.
//
typedef int (*xidatafunc)(PyThreadState *, PyObject *, _PyXIData_t *);
typedef int (*xidatafbfunc)(
PyThreadState *, PyObject *, xidata_fallback_t, _PyXIData_t *);
typedef struct {
xidatafunc basic;
xidatafbfunc fallback;
} _PyXIData_getdata_t;

PyAPI_FUNC(PyObject *) _PyXIData_GetNotShareableErrorType(PyThreadState *);
PyAPI_FUNC(void) _PyXIData_SetNotShareableError(PyThreadState *, const char *);
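
The comment in the hunk above explains why two getdata function types are kept. As a rough, hypothetical illustration (none of the names below come from this patch): a simple type would fill in only the basic slot, while a container type that may recurse through _PyObject_GetXIDataWithFallback() would provide the fallback slot instead.

/* Hypothetical sketch only; not part of this patch. */
static int
simple_getdata(PyThreadState *tstate, PyObject *obj, _PyXIData_t *xidata)
{
    /* Convert obj directly into cross-interpreter data. */
    return 0;
}

static int
container_getdata(PyThreadState *tstate, PyObject *obj,
                  xidata_fallback_t fallback, _PyXIData_t *xidata)
{
    /* A container type would typically call
       _PyObject_GetXIDataWithFallback() on each item, passing the
       fallback mode through. */
    return 0;
}

/* Each type fills in exactly one of the two slots, e.g. when calling
   _PyXIData_RegisterClass(). */
static _PyXIData_getdata_t simple_spec = {.basic = simple_getdata};
static _PyXIData_getdata_t container_spec = {.fallback = container_getdata};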
@@ -140,7 +156,7 @@ PyAPI_FUNC(void) _PyXIData_FormatNotShareableError(
const char *,
...);

PyAPI_FUNC(xidatafunc) _PyXIData_Lookup(
PyAPI_FUNC(_PyXIData_getdata_t) _PyXIData_Lookup(
PyThreadState *,
PyObject *);
PyAPI_FUNC(int) _PyObject_CheckXIData(
@@ -151,6 +167,11 @@ PyAPI_FUNC(int) _PyObject_GetXIData(
PyThreadState *,
PyObject *,
_PyXIData_t *);
PyAPI_FUNC(int) _PyObject_GetXIDataWithFallback(
PyThreadState *,
PyObject *,
xidata_fallback_t,
_PyXIData_t *);

// _PyObject_GetXIData() for bytes
typedef struct {
@@ -314,73 +335,37 @@ typedef struct _sharedexception {
PyAPI_FUNC(PyObject *) _PyXI_ApplyError(_PyXI_error *err);


typedef struct xi_session _PyXI_session;
typedef struct _sharedns _PyXI_namespace;

PyAPI_FUNC(void) _PyXI_FreeNamespace(_PyXI_namespace *ns);
PyAPI_FUNC(_PyXI_namespace *) _PyXI_NamespaceFromNames(PyObject *names);
PyAPI_FUNC(int) _PyXI_FillNamespaceFromDict(
_PyXI_namespace *ns,
PyObject *nsobj,
_PyXI_session *session);
PyAPI_FUNC(int) _PyXI_ApplyNamespace(
_PyXI_namespace *ns,
PyObject *nsobj,
PyObject *dflt);


// A cross-interpreter session involves entering an interpreter
// (_PyXI_Enter()), doing some work with it, and finally exiting
// that interpreter (_PyXI_Exit()).
// with _PyXI_Enter(), doing some work with it, and finally exiting
// that interpreter with _PyXI_Exit().
//
// At the boundaries of the session, both entering and exiting,
// data may be exchanged between the previous interpreter and the
// target one in a thread-safe way that does not violate the
// isolation between interpreters. This includes setting objects
// in the target's __main__ module on the way in, and capturing
// uncaught exceptions on the way out.
struct xi_session {
// Once a session has been entered, this is the tstate that was
// current before the session. If it is different from cur_tstate
// then we must have switched interpreters. Either way, this will
// be the current tstate once we exit the session.
PyThreadState *prev_tstate;
// Once a session has been entered, this is the current tstate.
// It must be current when the session exits.
PyThreadState *init_tstate;
// This is true if init_tstate needs cleanup during exit.
int own_init_tstate;

// This is true if, while entering the session, init_thread took
// "ownership" of the interpreter's __main__ module. This means
// it is the only thread that is allowed to run code there.
// (Caveat: for now, users may still run exec() against the
// __main__ module's dict, though that isn't advisable.)
int running;
// This is a cached reference to the __dict__ of the entered
// interpreter's __main__ module. It is looked up when at the
// beginning of the session as a convenience.
PyObject *main_ns;

// This is set if the interpreter is entered and raised an exception
// that needs to be handled in some special way during exit.
_PyXI_errcode *error_override;
// This is set if exit captured an exception to propagate.
_PyXI_error *error;

// -- pre-allocated memory --
_PyXI_error _error;
_PyXI_errcode _error_override;
};
typedef struct xi_session _PyXI_session;

PyAPI_FUNC(_PyXI_session *) _PyXI_NewSession(void);
PyAPI_FUNC(void) _PyXI_FreeSession(_PyXI_session *);

typedef struct {
PyObject *preserved;
PyObject *excinfo;
} _PyXI_session_result;

PyAPI_FUNC(int) _PyXI_Enter(
_PyXI_session *session,
PyInterpreterState *interp,
PyObject *nsupdates);
PyAPI_FUNC(void) _PyXI_Exit(_PyXI_session *session);
PyObject *nsupdates,
_PyXI_session_result *);
PyAPI_FUNC(int) _PyXI_Exit(_PyXI_session *, _PyXI_session_result *);

PyAPI_FUNC(PyObject *) _PyXI_GetMainNamespace(_PyXI_session *);

PyAPI_FUNC(PyObject *) _PyXI_ApplyCapturedException(_PyXI_session *session);
PyAPI_FUNC(int) _PyXI_HasCapturedException(_PyXI_session *session);
PyAPI_FUNC(int) _PyXI_Preserve(_PyXI_session *, const char *, PyObject *);
PyAPI_FUNC(PyObject *) _PyXI_GetPreserved(_PyXI_session_result *, const char *);
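
As a rough sketch of the Enter/Exit workflow described in the session comment above, using only the signatures declared in this hunk (error handling and the actual work are elided; this is illustrative, not code from the patch):

/* Illustrative sketch only; not part of this patch. */
static int
run_in_target(PyInterpreterState *interp, PyObject *nsupdates)
{
    _PyXI_session *session = _PyXI_NewSession();
    if (session == NULL) {
        return -1;
    }
    _PyXI_session_result result = {0};
    if (_PyXI_Enter(session, interp, nsupdates, &result) < 0) {
        _PyXI_FreeSession(session);
        return -1;
    }
    /* Do some work in the target interpreter, e.g. through
       _PyXI_GetMainNamespace(session), and optionally stash objects
       for the caller with _PyXI_Preserve(session, "spam", obj). */
    int res = _PyXI_Exit(session, &result);
    _PyXI_FreeSession(session);
    /* On failure, result.excinfo describes the uncaught exception;
       anything preserved during the session is available via
       _PyXI_GetPreserved(&result, "spam"). */
    return res;
}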


/*************/
4 changes: 2 additions & 2 deletions Include/internal/pycore_crossinterp_data_registry.h
@@ -17,7 +17,7 @@ typedef struct _xid_regitem {
/* This is NULL for builtin types. */
PyObject *weakref;
size_t refcount;
xidatafunc getdata;
_PyXIData_getdata_t getdata;
} _PyXIData_regitem_t;

typedef struct {
@@ -30,7 +30,7 @@ typedef struct {
PyAPI_FUNC(int) _PyXIData_RegisterClass(
PyThreadState *,
PyTypeObject *,
xidatafunc);
_PyXIData_getdata_t);
PyAPI_FUNC(int) _PyXIData_UnregisterClass(
PyThreadState *,
PyTypeObject *);
4 changes: 2 additions & 2 deletions Include/internal/pycore_pyerrors.h
@@ -94,13 +94,13 @@ extern void _PyErr_Fetch(
PyObject **value,
PyObject **traceback);

extern PyObject* _PyErr_GetRaisedException(PyThreadState *tstate);
PyAPI_FUNC(PyObject*) _PyErr_GetRaisedException(PyThreadState *tstate);

PyAPI_FUNC(int) _PyErr_ExceptionMatches(
PyThreadState *tstate,
PyObject *exc);

extern void _PyErr_SetRaisedException(PyThreadState *tstate, PyObject *exc);
PyAPI_FUNC(void) _PyErr_SetRaisedException(PyThreadState *tstate, PyObject *exc);

extern void _PyErr_Restore(
PyThreadState *tstate,
140 changes: 42 additions & 98 deletions Lib/concurrent/futures/interpreter.py
@@ -36,9 +36,6 @@ def __str__(self):
""".strip())


UNBOUND = 2 # error; this should not happen.


class WorkerContext(_thread.WorkerContext):

@classmethod
@@ -47,23 +44,9 @@ def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
kind = 'script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data = pickle.dumps((fn, args, kwargs))
kind = 'function'
return (data, kind)
task = (fn, args, kwargs)
return task

if initializer is not None:
try:
@@ -78,39 +61,6 @@ def create_context():
return cls(initdata, shared)
return create_context, resolve_task

@classmethod
@contextlib.contextmanager
def _capture_exc(cls, resultsid):
try:
yield
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err = pickle.dumps(exc)
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
raise # re-raise

@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)

@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
except _interpreters.NotShareableError:
res = pickle.dumps(res)
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)

@classmethod
def _call_pickled(cls, pickled, resultsid):
with cls._capture_exc(resultsid):
fn, args, kwargs = pickle.loads(pickled)
cls._call(fn, args, kwargs, resultsid)

def __init__(self, initdata, shared=None):
self.initdata = initdata
self.shared = dict(shared) if shared else None
@@ -121,11 +71,42 @@ def __del__(self):
if self.interpid is not None:
self.finalize()

def _exec(self, script):
assert self.interpid is not None
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
def _call(self, fn, args, kwargs):
def do_call(resultsid, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except BaseException as exc:
# Avoid relying on globals.
import _interpqueues
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
_interpqueues.put(resultsid, exc)
raise # re-raise

args = (self.resultsid, fn, *args)
res, excinfo = _interpreters.call(self.interpid, do_call, args, kwargs)
if excinfo is not None:
raise ExecutionFailed(excinfo)
return res

def _get_exception(self):
# Wait for the exception data to show up.
while True:
try:
excdata = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
raise # re-raise
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
# interpreters.queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
exc, unboundop = excdata
assert unboundop is None, unboundop
return exc

def initialize(self):
assert self.interpid is None, self.interpid
@@ -134,8 +115,7 @@ def initialize(self):
_interpreters.incref(self.interpid)

maxsize = 0
fmt = 0
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
self.resultsid = _interpqueues.create(maxsize)

self._exec(f'from {__name__} import WorkerContext')

@@ -166,48 +146,12 @@ def run(self, task):
pass

def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
elif kind == 'function':
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
else:
raise NotImplementedError(kind)

fn, args, kwargs = task
try:
self._exec(script)
except ExecutionFailed as exc:
exc_wrapper = exc
else:
exc_wrapper = None

# Return the result, or raise the exception.
while True:
try:
obj = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
raise # re-raise
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
# interpreters.queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
(res, excdata), pickled, unboundop = obj
assert unboundop is None, unboundop
if excdata is not None:
assert res is None, res
assert pickled
assert exc_wrapper is not None
exc = pickle.loads(excdata)
raise exc from exc_wrapper
return pickle.loads(res) if pickled else res
return self._call(fn, args, kwargs)
except ExecutionFailed as wrapper:
exc = self._get_exception()
raise exc from wrapper


class BrokenInterpreterPool(_thread.BrokenThreadPool):