Skip to content

PR: Migrate package to use asyncio, remove Python 2.7 support #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
25 changes: 2 additions & 23 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version: 2

jobs:
python2-test:
python3-test:
docker:
- image: "python:2.7-stretch"
- image: "python:3.5-stretch"
steps:
- checkout
- run: pip install -e .[test]
Expand All @@ -12,24 +12,6 @@ jobs:
- run: pycodestyle pyls_jsonrpc test
- run: pyflakes pyls_jsonrpc test

python3-test:
docker:
- image: "python:3.5-stretch"
steps:
- checkout
- run: pip install -e .[test]
- run: py.test test/

lint:
docker:
- image: "python:2.7-stretch"
steps:
- checkout
- run: pip install -e .[all] .[test]
- run: pylint pyls_jsonrpc test
- run: pycodestyle pyls_jsonrpc test
- run: pyflakes pyls_jsonrpc test

publish:
docker:
- image: "python:3.5-stretch"
Expand All @@ -42,8 +24,6 @@ workflows:
version: 2
build:
jobs:
- python2-test:
filters: { tags: { only: /.*/ } }
- python3-test:
filters: { tags: { only: /.*/ } }
- publish:
Expand All @@ -53,5 +33,4 @@ workflows:
branches:
ignore: /.*/
requires:
- python2-test
- python3-test
4 changes: 0 additions & 4 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
environment:
matrix:
- PYTHON: "C:\\Python27"
PYTHON_VERSION: "2.7.15"
PYTHON_ARCH: "64"

- PYTHON: "C:\\Python35"
PYTHON_VERSION: "3.5.7"
PYTHON_ARCH: "64"
Expand Down
12 changes: 6 additions & 6 deletions pyls_jsonrpc/dispatchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
_RE_ALL_CAP = re.compile('([a-z0-9])([A-Z])')


class MethodDispatcher(object):
class MethodDispatcher:
"""JSON RPC dispatcher that calls methods on itself.

Method names are computed by converting camel case to snake case, slashes with double underscores, and removing
dollar signs.
Method names are computed by converting camel case to snake case, slashes
with double underscores, and removing dollar signs.
"""

def __getitem__(self, item):
Expand All @@ -19,17 +19,17 @@ def __getitem__(self, item):
method = getattr(self, method_name)

@functools.wraps(method)
def handler(params):
async def handler(params):
return method(**(params or {}))

return handler
raise KeyError()


def _method_to_string(method):
def _method_to_string(method: str) -> str:
return _camel_to_underscore(method.replace("/", "__").replace("$", ""))


def _camel_to_underscore(string):
def _camel_to_underscore(string: str) -> str:
s1 = _RE_FIRST_CAP.sub(r'\1_\2', string)
return _RE_ALL_CAP.sub(r'\1_\2', s1).lower()
6 changes: 6 additions & 0 deletions pyls_jsonrpc/dispatchers.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

from typing import Coroutine, Any


class MethodDispatcher(object):
def __getitem__(self, item: Any) -> Coroutine: ...
136 changes: 81 additions & 55 deletions pyls_jsonrpc/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,73 @@
# Copyright 2018 Palantir Technologies, Inc.
import asyncio
import logging
import uuid
import sys

from concurrent import futures
from .exceptions import JsonRpcException, JsonRpcRequestCancelled, JsonRpcInternalError, JsonRpcMethodNotFound
from typing import Dict, Awaitable

from .exceptions import (
JsonRpcException, JsonRpcRequestCancelled,
JsonRpcInternalError, JsonRpcMethodNotFound)

log = logging.getLogger(__name__)
JSONRPC_VERSION = '2.0'
CANCEL_METHOD = '$/cancelRequest'


class Endpoint(object):
class Endpoint:

def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()), max_workers=5):
def __init__(self,
dispatcher,
consumer,
id_generator=lambda: str(uuid.uuid4()),
loop=None):
"""A JSON RPC endpoint for managing messages sent to/from the client.

Args:
dispatcher (dict): A dictionary of method name to handler function.
The handler functions should return either the result or a callable that will be used to asynchronously
compute the result.
consumer (fn): A function that consumes JSON RPC message dicts and sends them to the client.
id_generator (fn, optional): A function used to generate request IDs.
Defaults to the string value of :func:`uuid.uuid4`.
max_workers (int, optional): The number of workers in the asynchronous executor pool.
The handler functions should return either the result or a
callable that will be used to asynchronously compute
the result.
consumer (fn): A function that consumes JSON RPC message dicts and
sends them to the client.
id_generator (fn, optional): A function used to generate
request IDs. Defaults to the string value
of :func:`uuid.uuid4`.
max_workers (int, optional): The number of workers in the
asynchronous executor pool.
"""
self._dispatcher = dispatcher
self._consumer = consumer
self._id_generator = id_generator

self._client_request_futures = {}
self._server_request_futures = {}
self._executor_service = futures.ThreadPoolExecutor(max_workers=max_workers)
self.loop = (
asyncio.get_event_loop() if loop is None else loop) # type: asyncio.BaseEventLoop
self._client_request_futures = {} # type: Dict[str, Awaitable]
self._server_request_futures = {} # type: Dict[str, Awaitable]

def shutdown(self):
self._executor_service.shutdown()
# self._executor_service.shutdown()
self.loop.close()

def notify(self, method, params=None):
async def notify(self, method, params=None):
"""Send a JSON RPC notification to the client.

Args:
method (str): The method name of the notification to send
params (any): The payload of the notification
"""
log.debug('Sending notification: %s %s', method, params)

message = {
'jsonrpc': JSONRPC_VERSION,
'method': method,
}
if params is not None:
message['params'] = params

self._consumer(message)
await self._consumer(message)

def request(self, method, params=None):
async def request(self, method, params=None):
"""Send a JSON RPC request to the client.

Args:
Expand All @@ -75,23 +88,24 @@ def request(self, method, params=None):
if params is not None:
message['params'] = params

request_future = futures.Future()
request_future = asyncio.Future()
request_future.add_done_callback(self._cancel_callback(msg_id))

self._server_request_futures[msg_id] = request_future
self._consumer(message)
await self._consumer(message)

return request_future

def _cancel_callback(self, request_id):
"""Construct a cancellation callback for the given request ID."""
def callback(future):
if future.cancelled():
self.notify(CANCEL_METHOD, {'id': request_id})
future.set_exception(JsonRpcRequestCancelled())
asyncio.ensure_future(
self.notify(CANCEL_METHOD, {'id': request_id}),
loop=self.loop)
return callback

def consume(self, message):
async def consume(self, message):
"""Consume a JSON RPC message from the client.

Args:
Expand All @@ -103,33 +117,36 @@ def consume(self, message):

if 'id' not in message:
log.debug("Handling notification from client %s", message)
self._handle_notification(message['method'], message.get('params'))
await self._handle_notification(
message['method'], message.get('params'))
elif 'method' not in message:
log.debug("Handling response from client %s", message)
self._handle_response(message['id'], message.get('result'), message.get('error'))
await self._handle_response(
message['id'], message.get('result'), message.get('error'))
else:
try:
log.debug("Handling request from client %s", message)
self._handle_request(message['id'], message['method'], message.get('params'))
await self._handle_request(
message['id'], message['method'], message.get('params'))
except JsonRpcException as e:
log.exception("Failed to handle request %s", message['id'])
self._consumer({
await self._consumer({
'jsonrpc': JSONRPC_VERSION,
'id': message['id'],
'error': e.to_dict()
})
except Exception: # pylint: disable=broad-except
log.exception("Failed to handle request %s", message['id'])
self._consumer({
await self._consumer({
'jsonrpc': JSONRPC_VERSION,
'id': message['id'],
'error': JsonRpcInternalError.of(sys.exc_info()).to_dict()
})

def _handle_notification(self, method, params):
async def _handle_notification(self, method, params):
"""Handle a notification from the client."""
if method == CANCEL_METHOD:
self._handle_cancel_notification(params['id'])
await self._handle_cancel_notification(params['id'])
return

try:
Expand All @@ -141,38 +158,44 @@ def _handle_notification(self, method, params):
try:
handler_result = handler(params)
except Exception: # pylint: disable=broad-except
log.exception("Failed to handle notification %s: %s", method, params)
log.exception(
"Failed to handle notification %s: %s", method, params)
return

if callable(handler_result):
log.debug("Executing async notification handler %s", handler_result)
notification_future = self._executor_service.submit(handler_result)
notification_future.add_done_callback(self._notification_callback(method, params))
log.debug(
"Executing async notification handler %s", handler_result)
notification_task = asyncio.ensure_future(handler_result)
notification_task.add_done_callback(
self._notification_callback(method, params))

@staticmethod
def _notification_callback(method, params):
"""Construct a notification callback for the given request ID."""
def callback(future):
try:
future.result()
log.debug("Successfully handled async notification %s %s", method, params)
log.debug("Successfully handled async notification %s %s",
method, params)
except Exception: # pylint: disable=broad-except
log.exception("Failed to handle async notification %s %s", method, params)
log.exception("Failed to handle async notification %s %s",
method, params)
return callback

def _handle_cancel_notification(self, msg_id):
async def _handle_cancel_notification(self, msg_id):
"""Handle a cancel notification from the client."""
request_future = self._client_request_futures.pop(msg_id, None)

if not request_future:
log.warning("Received cancel notification for unknown message id %s", msg_id)
log.warning("Received cancel notification for unknown message id %s",
msg_id)
return

# Will only work if the request hasn't started executing
if request_future.cancel():
log.debug("Cancelled request with id %s", msg_id)

def _handle_request(self, msg_id, method, params):
async def _handle_request(self, msg_id, method, params):
"""Handle a request from the client."""
try:
handler = self._dispatcher[method]
Expand All @@ -181,18 +204,20 @@ def _handle_request(self, msg_id, method, params):

handler_result = handler(params)

if callable(handler_result):
if callable(handler_result) or asyncio.iscoroutine(handler_result):
log.debug("Executing async request handler %s", handler_result)
request_future = self._executor_service.submit(handler_result)
self._client_request_futures[msg_id] = request_future
request_future.add_done_callback(self._request_callback(msg_id))
elif isinstance(handler_result, futures.Future):
# request_future = self._executor_service.submit(handler_result)
request_task = asyncio.ensure_future(handler_result)
self._client_request_futures[msg_id] = request_task
request_task.add_done_callback(self._request_callback(msg_id))
elif isinstance(handler_result, asyncio.Future):
log.debug("Request handler is already a future %s", handler_result)
self._client_request_futures[msg_id] = handler_result
handler_result.add_done_callback(self._request_callback(msg_id))
else:
log.debug("Got result from synchronous request handler: %s", handler_result)
self._consumer({
log.debug("Got result from synchronous request handler: %s",
handler_result)
await self._consumer({
'jsonrpc': JSONRPC_VERSION,
'id': msg_id,
'result': handler_result
Expand All @@ -219,24 +244,25 @@ def callback(future):
message['error'] = e.to_dict()
except Exception: # pylint: disable=broad-except
log.exception("Failed to handle request %s", request_id)
message['error'] = JsonRpcInternalError.of(sys.exc_info()).to_dict()
message['error'] = JsonRpcInternalError.of(
sys.exc_info()).to_dict()

self._consumer(message)
asyncio.ensure_future(self._consumer(message), loop=self.loop)

return callback

def _handle_response(self, msg_id, result=None, error=None):
async def _handle_response(self, msg_id, result=None, error=None):
"""Handle a response from the client."""
request_future = self._server_request_futures.pop(msg_id, None)
request_future = self._server_request_futures.pop(msg_id, None) # type: asyncio.Future

if not request_future:
log.warning("Received response to unknown message id %s", msg_id)
return

if error is not None:
log.debug("Received error response to message %s: %s", msg_id, error)
log.debug(
"Received error response to message %s: %s", msg_id, error)
request_future.set_exception(JsonRpcException.from_dict(error))
return

log.debug("Received result for message %s: %s", msg_id, result)
request_future.set_result(result)
else:
log.debug("Received result for message %s: %s", msg_id, result)
request_future.set_result(result)
Loading