This repository was archived by the owner on Sep 21, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
use asyncio for websocket communication #413
Open
svalaskevicius
wants to merge
3
commits into
ensime:master
Choose a base branch
from
svalaskevicius:async-ws
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,7 +11,6 @@ | |
from subprocess import PIPE, Popen | ||
from threading import Thread | ||
|
||
import websocket | ||
|
||
from .config import feedback, gconfig, LOG_FORMAT | ||
from .debugger import DebuggerClient | ||
|
@@ -21,10 +20,15 @@ | |
from .util import catch, Pretty, Util | ||
|
||
# Queue depends on python version | ||
if sys.version_info > (3, 0): | ||
from queue import Queue | ||
else: | ||
from Queue import Queue | ||
#if sys.version_info >= (3, 5): | ||
import asyncio | ||
import websockets | ||
#elif sys.version_info > (3, 0): | ||
# from queue import Queue | ||
# import websocket | ||
#else: | ||
# from Queue import Queue | ||
# import websocket | ||
|
||
|
||
class EnsimeClient(TypecheckHandler, DebuggerClient, ProtocolHandler): | ||
|
@@ -101,7 +105,11 @@ def setup_logger(): | |
self.refactorings = {} | ||
|
||
# Queue for messages received from the ensime server. | ||
self.queue = Queue() | ||
# if sys.version_info >= (3, 5): | ||
self.ws_loop = None | ||
self.queue = None | ||
# else: | ||
# self.queue = Queue() | ||
self.suggestions = None | ||
self.completion_timeout = 10 # seconds | ||
self.completion_started = False | ||
|
@@ -119,37 +127,49 @@ def setup_logger(): | |
self.debug_thread_id = None | ||
self.running = True | ||
|
||
thread = Thread(name='queue-poller', target=self.queue_poll) | ||
thread = Thread(name='queue-poller', target=self.run_queuer) | ||
thread.daemon = True | ||
thread.start() | ||
|
||
def queue_poll(self, sleep_t=0.5): | ||
async def queue_poll(self, sleep_t=0.5): | ||
"""Put new messages on the queue as they arrive. Blocking in a thread. | ||
|
||
Value of sleep is low to improve responsiveness. | ||
""" | ||
connection_alive = True | ||
|
||
self.log.debug("entered poller") | ||
while self.running: | ||
if self.ws: | ||
self.log.debug('got ws!') | ||
def logger_and_close(msg): | ||
self.log.error('Websocket exception', exc_info=True) | ||
if not self.running: | ||
# Tear down has been invoked | ||
# Prepare to exit the program | ||
connection_alive = False # noqa: F841 | ||
else: | ||
if self.running: | ||
if not self.number_try_connection: | ||
# Stop everything. | ||
self.teardown() | ||
self._display_ws_warning() | ||
self.ws = None | ||
|
||
with catch(websocket.WebSocketException, logger_and_close): | ||
result = self.ws.recv() | ||
self.queue.put(result) | ||
with catch(Exception, logger_and_close): | ||
self.log.debug('poller waiting') | ||
result = await self.ws.recv() | ||
self.log.debug('got result!' + result) | ||
self.queue.put_nowait(result) | ||
|
||
if connection_alive: | ||
time.sleep(sleep_t) | ||
else: | ||
self.log.debug('poller sleeping - no ws!') | ||
await asyncio.sleep(sleep_t) | ||
self.log.debug('poller slept well. no ws!') | ||
|
||
def run_queuer(self): | ||
self.log.debug("get event loop") | ||
self.ws_loop = asyncio.new_event_loop() | ||
self.queue = asyncio.Queue(loop = self.ws_loop) | ||
asyncio.set_event_loop(self.ws_loop) | ||
self.log.debug('starting poller!') | ||
with catch(Exception, self.log.error): | ||
self.ws_loop.run_until_complete(self.queue_poll()) | ||
self.log.debug('done poller!') | ||
self.ws_loop.close() | ||
|
||
def setup(self, quiet=False, bootstrap_server=False): | ||
"""Check the classpath and connect to the server if necessary.""" | ||
|
@@ -190,17 +210,22 @@ def _display_ws_warning(self): | |
|
||
def send(self, msg): | ||
"""Send something to the ensime server.""" | ||
def send_it(msg): | ||
with catch(Exception, self.log.error): | ||
asyncio.run_coroutine_threadsafe(self.ws.send(msg + "\n"), self.ws_loop).result(10) | ||
# TODO: returns future, handle errors | ||
|
||
def reconnect(e): | ||
self.log.error('send error, reconnecting...', exc_info=True) | ||
self.connect_ensime_server() | ||
if self.ws: | ||
self.ws.send(msg + "\n") | ||
send_it(msg) | ||
|
||
self.log.debug('send: in') | ||
if self.running and self.ws: | ||
with catch(websocket.WebSocketException, reconnect): | ||
with catch(Exception, reconnect): | ||
self.log.debug('send: sending JSON on WebSocket') | ||
self.ws.send(msg + "\n") | ||
send_it(msg) | ||
|
||
def connect_ensime_server(self): | ||
"""Start initial connection with the server.""" | ||
|
@@ -219,14 +244,16 @@ def disable_completely(e): | |
port = self.ensime.http_port() | ||
uri = "websocket" if server_v2 else "jerky" | ||
self.ensime_server = gconfig["ensime_server"].format(port, uri) | ||
with catch(websocket.WebSocketException, disable_completely): | ||
with catch(Exception, disable_completely): | ||
# Use the default timeout (no timeout). | ||
options = {"subprotocols": ["jerky"]} if server_v2 else {} | ||
options['enable_multithread'] = True | ||
self.log.debug("About to connect to %s with options %s", | ||
self.ensime_server, options) | ||
self.ws = websocket.create_connection(self.ensime_server, **options) | ||
if self.ws: | ||
async def connect(): | ||
options = {"subprotocols": ["jerky"]} if server_v2 else {} | ||
self.log.debug("About to connect to %s with options %s", | ||
self.ensime_server, options) | ||
self.ws = await websockets.client.connect(self.ensime_server, **options) | ||
return True | ||
gotws = asyncio.run_coroutine_threadsafe(connect(), self.ws_loop).result(10) | ||
if gotws: | ||
self.send_request({"typehint": "ConnectionInfoReq"}) | ||
else: | ||
# If it hits this, number_try_connection is 0 | ||
|
@@ -292,7 +319,7 @@ def open_decl_for_inspector_symbol(self): | |
lineno = self.editor.cursor()[0] | ||
symbol = self.editor.symbol_for_inspector_line(lineno) | ||
self.symbol_by_name([symbol]) | ||
self.unqueue(should_wait=True) | ||
self.unqueue() | ||
|
||
def symbol_by_name(self, args, range=None): | ||
self.log.debug('symbol_by_name: in') | ||
|
@@ -593,38 +620,56 @@ def type_check(self, filename): | |
{"typehint": "TypecheckFilesReq", | ||
"files": [self.editor.path()]}) | ||
|
||
def unqueue(self, timeout=10, should_wait=False): | ||
def unqueue(self, timeout=10): | ||
"""Unqueue all the received ensime responses for a given file.""" | ||
start, now = time.time(), time.time() | ||
wait = self.queue.empty() and should_wait | ||
|
||
while (not self.queue.empty() or wait) and (now - start) < timeout: | ||
if wait and self.queue.empty(): | ||
time.sleep(0.25) | ||
now = time.time() | ||
else: | ||
result = self.queue.get(False) | ||
self.log.debug('unqueue: result received\n%s', result) | ||
if result and result != "nil": | ||
wait = None | ||
# Restart timeout | ||
start, now = time.time(), time.time() | ||
with catch(Exception, self.log.error): | ||
if timeout != 0 or not self.queue.empty(): | ||
if timeout == 0: | ||
timeout = 0.01 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should probably reintroduce should_wait param... although... maybe another function might be better for that? |
||
resultFuture = asyncio.run_coroutine_threadsafe(self.queue.get(), self.ws_loop) | ||
try: | ||
result = resultFuture.result(timeout) | ||
self.log.debug("Unqueued result: " + result) | ||
# TODO: catch asyncio.TimeoutError: and cancel future | ||
_json = json.loads(result) | ||
# Watch out, it may not have callId | ||
call_id = _json.get("callId") | ||
if _json["payload"]: | ||
self.handle_incoming_response(call_id, _json["payload"]) | ||
else: | ||
self.log.debug('unqueue: nil or None received') | ||
|
||
if (now - start) >= timeout: | ||
self.log.warning('unqueue: no reply from server for %ss', timeout) | ||
|
||
except asyncio.TimeoutError: | ||
self.log.debug("Unqueing failed: timeout: "+str(timeout)) | ||
resultFuture.cancel() | ||
|
||
# start, now = time.time(), time.time() | ||
# wait = self.queue.empty() and should_wait | ||
# | ||
# while (not self.queue.empty() or wait) and (now - start) < timeout: | ||
# if wait and self.queue.empty(): | ||
# time.sleep(0.25) | ||
# now = time.time() | ||
# else: | ||
# result = self.queue.get(False) | ||
# self.log.debug('unqueue: result received\n%s', result) | ||
# if result and result != "nil": | ||
# wait = None | ||
# # Restart timeout | ||
# start, now = time.time(), time.time() | ||
# _json = json.loads(result) | ||
# # Watch out, it may not have callId | ||
# call_id = _json.get("callId") | ||
# if _json["payload"]: | ||
# self.handle_incoming_response(call_id, _json["payload"]) | ||
# else: | ||
# self.log.debug('unqueue: nil or None received') | ||
# | ||
# if (now - start) >= timeout: | ||
# self.log.warning('unqueue: no reply from server for %ss', timeout) | ||
# | ||
def unqueue_and_display(self, filename): | ||
"""Unqueue messages and give feedback to user (if necessary).""" | ||
if self.running and self.ws: | ||
self.editor.lazy_display_error(filename) | ||
self.unqueue() | ||
self.unqueue(0) # TODO: no wait | ||
|
||
def tick(self, filename): | ||
"""Try to connect and display messages in queue.""" | ||
|
@@ -671,7 +716,7 @@ def detect_row_column_start(): | |
# Only handle snd invocation if fst has already been done | ||
if self.completion_started: | ||
# Unqueing messages until we get suggestions | ||
self.unqueue(timeout=self.completion_timeout, should_wait=True) | ||
self.unqueue(timeout=self.completion_timeout) | ||
suggestions = self.suggestions or [] | ||
self.log.debug('complete_func: suggestions in') | ||
for m in suggestions: | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably move to a new class, that handles the other thread and exposes thread safe methods to send via ws and get from queue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added benefit - diff classes can be used for diff python versions