Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 131 additions & 63 deletions pyqode/core/managers/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class BackendManager(Manager):
- send_request

"""
LAST_PORT = None
LAST_PROCESS = None
SHARE_COUNT = 0
LAST_PORT = {}
LAST_PROCESS = {}
SHARE_COUNT = {}
share_id_count = 0

def __init__(self, editor):
super(BackendManager, self).__init__(editor)
Expand All @@ -63,7 +64,7 @@ def pick_free_port():
return free_port

def start(self, script, interpreter=sys.executable, args=None,
error_callback=None, reuse=False):
error_callback=None, reuse=False, share_id=None):
"""
Starts the backend process.

Expand All @@ -89,63 +90,122 @@ def start(self, script, interpreter=sys.executable, args=None,
languages you will need to merge all backend scripts into one
single script, otherwise the wrong script might be picked up).
"""
# If no share id is specified, we generate a new unique share id.
if share_id is None:
share_id = 'unique{}'.format(BackendManager.share_id_count)
BackendManager.share_id_count += 1
self._share_id = share_id
self._shared = reuse
if reuse and BackendManager.SHARE_COUNT:
self._port = BackendManager.LAST_PORT
self._process = BackendManager.LAST_PROCESS
BackendManager.SHARE_COUNT += 1
self.server_script = script
self.interpreter = interpreter
self.args = args
# If the current share id doesn't exist yet, register it.
if self._share_id not in BackendManager.SHARE_COUNT:
comm('new share_id: {}'.format(self._share_id))
BackendManager.SHARE_COUNT[self._share_id] = []
# If we want to re-use existing backends, and a backend process is
# already running, re-use it. We register the editor, so that we
# can know, later on, if the backend was already stopped for this
# editor, in case the editor is closed multiple times.
if reuse and BackendManager.SHARE_COUNT[self._share_id]:
self._port = BackendManager.LAST_PORT[self._share_id]
self._process = BackendManager.LAST_PROCESS[self._share_id]
BackendManager.SHARE_COUNT[self._share_id].append(self._editor)
comm('re-using share_id: {} ({})'.format(
self._share_id,
len(BackendManager.SHARE_COUNT[self._share_id])
))
return
if self.running:
comm('stopping share_id: {}'.format(self._share_id))
self.stop()
backend_script = script.replace('.pyc', '.py')
self._port = self.pick_free_port()
if hasattr(sys, "frozen") and not backend_script.endswith('.py'):
# frozen backend script on windows/mac does not need an
# interpreter
program = backend_script
pgm_args = [str(self._port)]
else:
if self.running:
self.stop()
self.server_script = script
self.interpreter = interpreter
self.args = args
backend_script = script.replace('.pyc', '.py')
self._port = self.pick_free_port()
if hasattr(sys, "frozen") and not backend_script.endswith('.py'):
# frozen backend script on windows/mac does not need an
# interpreter
program = backend_script
pgm_args = [str(self._port)]
else:
program = interpreter
pgm_args = [backend_script, str(self._port)]
if args:
pgm_args += args
self._process = BackendProcess(self.editor)
if error_callback:
self._process.error.connect(error_callback)
self._process.start(program, pgm_args)

if reuse:
BackendManager.LAST_PROCESS = self._process
BackendManager.LAST_PORT = self._port
BackendManager.SHARE_COUNT += 1
comm('starting backend process: %s %s', program,
' '.join(pgm_args))
self._heartbeat_timer.start()
program = interpreter
pgm_args = [backend_script, str(self._port)]
if args:
pgm_args += args
self._process = BackendProcess(self.editor)
if error_callback:
self._process.error.connect(error_callback)
self._process.start(program, pgm_args)
if reuse:
BackendManager.LAST_PROCESS[self._share_id] = self._process
BackendManager.LAST_PORT[self._share_id] = self._port
BackendManager.SHARE_COUNT[self._share_id].append(self._editor)
comm('starting share_id: {} (PID={})'.format(
self._share_id, self._process.processId()
))
self._heartbeat_timer.start()

def suspend(self):
"""
Stops the backend process in such a way that it can be restarted with
resume().
"""
comm('suspending share_id: {}'.format(self._share_id))
self.stop()

def resume(self):
"""
Resumes a stopped backend process.
"""
comm('resuming share_id: {}'.format(self._share_id))
self.start(
self.server_script,
interpreter=self.interpreter,
args=self.args,
reuse=self._shared,
share_id=self._share_id
)

def stop(self):
"""
Stops the backend process.
"""
if self._process is None:
comm('no process to stop for share_id: {}'.format(self._share_id))
return
if self._shared:
BackendManager.SHARE_COUNT -= 1
if BackendManager.SHARE_COUNT:
# Remove the current editor from the list of editors that are using
# this shared backend
if self._editor in BackendManager.SHARE_COUNT[self._share_id]:
BackendManager.SHARE_COUNT[self._share_id].remove(self._editor)
# There are still editors using this backend, don't close
if BackendManager.SHARE_COUNT[self._share_id]:
comm('not yet stopping share_id: {} ({})'.format(
self._share_id,
len(BackendManager.SHARE_COUNT[self._share_id])
))
return
comm('stopping backend process')
comm('stopping share_id: {} ({})'.format(
self._share_id,
len(BackendManager.SHARE_COUNT[self._share_id])
))
# close all sockets
for s in self._sockets:
s._callback = None
s.close()

self._sockets[:] = []
# prevent crash logs from being written if we are busy killing
# the process
self._process._prevent_logs = True
while self._process.state() != self._process.NotRunning:
while True:
try:
running = self._process.state() != self._process.NotRunning
except RuntimeError:
# Under some (hard to reproduce) conditions, the underlying
# process object is already deleted at this pont, resulting
# in a RuntimeError.
break
if not running:
break
self._process.waitForFinished(1)
if sys.platform == 'win32':
# Console applications on Windows that do not run an event
Expand All @@ -156,7 +216,7 @@ def stop(self):
self._process.terminate()
self._process._prevent_logs = False
self._heartbeat_timer.stop()
comm('backend process terminated')
comm('stopped share_id: {}'.format(self._share_id))

def send_request(self, worker_class_or_function, args, on_receive=None):
"""
Expand All @@ -168,30 +228,38 @@ def send_request(self, worker_class_or_function, args, on_receive=None):
:param on_receive: an optional callback executed when we receive the
worker's results. The callback will be called with one arguments:
the results of the worker (object)

:raise: backend.NotRunning if the backend process is not running.
"""
if not self.running:
if not BackendManager.SHARE_COUNT[self._share_id]:
comm('not restarting unused share_id: {}'.format(
self._share_id)
)
self._heartbeat_timer.stop()
return
comm('restarting share_id: {}'.format(self._share_id))
try:
# try to restart the backend if it crashed.
self.start(self.server_script, interpreter=self.interpreter,
args=self.args)
except AttributeError:
pass # not started yet
finally:
# caller should try again, later
raise NotRunning()
else:
comm('sending request, worker=%r' % worker_class_or_function)
# create a socket, the request will be send as soon as the socket
# has connected
socket = JsonTcpClient(
self.editor, self._port, worker_class_or_function, args,
on_receive=on_receive)
socket.finished.connect(self._rm_socket)
self._sockets.append(socket)
# restart heartbeat timer
self._heartbeat_timer.start()
self.start(
self.server_script,
interpreter=self.interpreter,
args=self.args,
reuse=self._shared,
share_id=self._share_id
)
except AttributeError as e:
comm('failed to restart share_id: {}, Exception: {}'.format(
self._share_id, e
))
return
# create a socket, the request will be send as soon as the socket
# has connected
socket = JsonTcpClient(
self.editor, self._port, worker_class_or_function, args,
on_receive=on_receive)
socket.finished.connect(self._rm_socket)
self._sockets.append(socket)
# restart heartbeat timer
self._heartbeat_timer.start()

def _send_heartbeat(self):
try:
Expand Down