Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ target/
.idea/*
MANIFEST
.vscode/*
!.vscode/tasks.json
75 changes: 75 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
// vscode tasks to easily run things.
// Ctrl + Shift + B to bring tasks up and select one
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "0.1.0",
"command": "cmd",
"isShellCommand": true,
"showOutput": "silent",
"args": ["/C"],

"tasks": [
{
"taskName": "go to github",
"suppressTaskName": true,
"isBuildCommand": true,
"args": ["start https://github.com/jupyter-incubator/sparkmagic"]
},
{
"taskName": "docker build",
"suppressTaskName": true,
"isBuildCommand": true,
"args": ["docker-compose build"]
},
{
"taskName": "docker-compose up",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["docker-compose up -d && start http://localhost:8888"]
},
{
"taskName": "docker-compose down",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["docker-compose down"]
},
{
"taskName": "test all",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["nosetests autovizwidget hdijupyterutils sparkmagic"]
},
{
"taskName": "test autovizwidget",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["nosetests autovizwidget"]
},
{
"taskName": "test hdijupyterutils",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["nosetests hdijupyterutils"]
},
{
"taskName": "test sparkmagic",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["nosetests sparkmagic"]
},
{
"taskName": "test file",
"suppressTaskName": true,
"isBuildCommand": false,
"isTestCommand": true,
"args": ["nosetests --nocapture ${file}"]
}
]
}
2 changes: 2 additions & 0 deletions Dockerfile.jupyter
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ RUN conda install requests-kerberos -y

USER $NB_USER

COPY examples /home/jovyan/work

# Install sparkmagic - if DEV_MODE is set, use the one in the host directory.
# Otherwise, just install from pip.
COPY hdijupyterutils hdijupyterutils/
Expand Down
2 changes: 1 addition & 1 deletion autovizwidget/autovizwidget/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.12.1'
__version__ = '0.12.3'
2 changes: 1 addition & 1 deletion hdijupyterutils/hdijupyterutils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.12.1'
__version__ = '0.12.3'
8 changes: 5 additions & 3 deletions sparkmagic/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
},

"wait_for_idle_timeout_seconds": 15,
"status_sleep_seconds": 2,
"statement_sleep_seconds": 2,
"livy_session_startup_timeout_seconds": 60,

"fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.",
Expand All @@ -66,5 +64,9 @@
"heartbeat_retry_seconds": 10,

"server_extension_default_kernel_name": "pysparkkernel",
"custom_headers": {}
"custom_headers": {},

"retry_policy": "configurable",
"retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
"configurable_retry_policy_max_retries": 8
}
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.12.1'
__version__ = '0.12.3'

from sparkmagic.serverextension.handlers import load_jupyter_server_extension

Expand Down
13 changes: 10 additions & 3 deletions sparkmagic/sparkmagic/controllerwidget/magicscontrollerwidget.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ def _get_default_endpoints():
for kernel_type in LANGS_SUPPORTED:
endpoint_config = getattr(conf, 'kernel_%s_credentials' % kernel_type)()
if all([p in endpoint_config for p in ["url", "password", "username"]]) and endpoint_config["url"] != "":
user = endpoint_config["username"]
passwd = endpoint_config["password"]

authentication = endpoint_config.get("auth", None)
if authentication is None:
authentication = conf.get_auth_value(user, passwd)

default_endpoints.add(Endpoint(
username=endpoint_config["username"],
password=endpoint_config["password"],
auth=endpoint_config.get("auth", None),
username=user,
password=passwd,
auth=authentication,
url=endpoint_config["url"],
implicitly_added=True))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def cleanup_on_click(button):
self.ipython_display.send_error("Could not clean up endpoint due to error: {}".format(e))
return
self.ipython_display.writeln("Cleaned up endpoint {}".format(url))
self.refresh_method()

cleanup_w = self.ipywidget_factory.get_button(description="Clean Up")
cleanup_w.on_click(cleanup_on_click)
Expand Down Expand Up @@ -114,6 +115,7 @@ def delete_endpoint(button):
except ValueError as e:
self.ipython_display.send_error(str(e))
return
self.refresh_method()

button = self.ipywidget_factory.get_button(description="Delete")
button.on_click(delete_endpoint)
Expand Down
5 changes: 4 additions & 1 deletion sparkmagic/sparkmagic/livyclientlib/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ def execute(self, session):
return output

def _get_statement_output(self, session, statement_id):
retries = 1

while True:
statement = session.http_client.get_statement(session.id, statement_id)
status = statement[u"state"].lower()

self.logger.debug(u"Status of statement {} is {}.".format(statement_id, status))

if status not in FINAL_STATEMENT_STATUS:
session.sleep()
session.sleep(retries)
retries += 1
else:
statement_output = statement[u"output"]

Expand Down
31 changes: 31 additions & 0 deletions sparkmagic/sparkmagic/livyclientlib/configurableretrypolicy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) 2015 [email protected]
# Distributed under the terms of the Modified BSD License.

from .linearretrypolicy import LinearRetryPolicy
from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException


class ConfigurableRetryPolicy(LinearRetryPolicy):
"""Retry policy that returns a configurable number of seconds to sleep between calls,
takes all status codes 500 or above to be retriable, and retries a given maximum number of times.
If the retry count exceeds the number of items in the list, last item in the list is always returned."""

def __init__(self, retry_seconds_to_sleep_list, max_retries):
super(ConfigurableRetryPolicy, self).__init__(-1, max_retries)

# If user configured to an empty list, let's make this behave as
# a Linear Retry Policy by assigning a list of 1 element.
if len(retry_seconds_to_sleep_list) == 0:
retry_seconds_to_sleep_list = [5]
elif not all(n > 0 for n in retry_seconds_to_sleep_list):
raise BadUserConfigurationException(u"All items in the list in your config need to be positive for configurable retry policy")

self.retry_seconds_to_sleep_list = retry_seconds_to_sleep_list
self._max_index = len(self.retry_seconds_to_sleep_list) - 1

def seconds_to_sleep(self, retry_count):
index = max(retry_count - 1, 0)
if index > self._max_index:
index = self._max_index

return self.retry_seconds_to_sleep_list[index]
4 changes: 2 additions & 2 deletions sparkmagic/sparkmagic/livyclientlib/linearretrypolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ class LinearRetryPolicy(object):

def __init__(self, seconds_to_sleep, max_retries):
self._seconds_to_sleep = seconds_to_sleep
self._max_retries = max_retries
self.max_retries = max_retries

def should_retry(self, status_code, error, retry_count):
if None in (status_code, retry_count):
return False
return (status_code >= 500 and retry_count <= self._max_retries) or error
return (status_code >= 500 and retry_count <= self.max_retries) or error

def seconds_to_sleep(self, retry_count):
return self._seconds_to_sleep
21 changes: 18 additions & 3 deletions sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@
# Distributed under the terms of the Modified BSD License.

from .linearretrypolicy import LinearRetryPolicy
from .configurableretrypolicy import ConfigurableRetryPolicy
from .reliablehttpclient import ReliableHttpClient
from sparkmagic.utils.constants import LINEAR_RETRY, CONFIGURABLE_RETRY
import sparkmagic.utils.configuration as conf
from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException


class LivyReliableHttpClient(object):
"""A Livy-specific Http client which wraps the normal ReliableHttpClient. Propagates
HttpClientExceptions up."""
def __init__(self, http_client):
def __init__(self, http_client, endpoint):
self.endpoint = endpoint
self._http_client = http_client

@staticmethod
def from_endpoint(endpoint):
headers = {"Content-Type": "application/json" }
headers.update(conf.custom_headers())
retry_policy = LinearRetryPolicy(seconds_to_sleep=5, max_retries=5)
return LivyReliableHttpClient(ReliableHttpClient(endpoint, headers, retry_policy))
retry_policy = LivyReliableHttpClient._get_retry_policy()
return LivyReliableHttpClient(ReliableHttpClient(endpoint, headers, retry_policy), endpoint)

def post_statement(self, session_id, data):
return self._http_client.post(self._statements_url(session_id), [201], data).json()
Expand Down Expand Up @@ -54,3 +58,14 @@ def _statements_url(session_id):
@staticmethod
def _statement_url(session_id, statement_id):
return "/sessions/{}/statements/{}".format(session_id, statement_id)

@staticmethod
def _get_retry_policy():
policy = conf.retry_policy()

if policy == LINEAR_RETRY:
return LinearRetryPolicy(seconds_to_sleep=5, max_retries=5)
elif policy == CONFIGURABLE_RETRY:
return ConfigurableRetryPolicy(retry_seconds_to_sleep_list=conf.retry_seconds_to_sleep_list(), max_retries=conf.configurable_retry_policy_max_retries())
else:
raise BadUserConfigurationException(u"Retry policy '{}' not supported".format(policy))
27 changes: 15 additions & 12 deletions sparkmagic/sparkmagic/livyclientlib/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sparkmagic.utils.sparklogger import SparkLog
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.utils import get_sessions_info_html
from .configurableretrypolicy import ConfigurableRetryPolicy
from .command import Command
from .exceptions import LivyClientTimeoutException, \
LivyUnexpectedStatusException, BadUserDataException, SqlContextNotFoundException
Expand Down Expand Up @@ -76,12 +77,9 @@ def __init__(self, http_client, properties, ipython_display,
spark_events = SparkEvents()
self._spark_events = spark_events

status_sleep_seconds = conf.status_sleep_seconds()
statement_sleep_seconds = conf.statement_sleep_seconds()
self._policy = ConfigurableRetryPolicy(retry_seconds_to_sleep_list=[0.2, 0.5, 0.5, 1, 1, 2], max_retries=5000)
wait_for_idle_timeout_seconds = conf.wait_for_idle_timeout_seconds()

assert status_sleep_seconds > 0
assert statement_sleep_seconds > 0
assert wait_for_idle_timeout_seconds > 0

self.logger = SparkLog(u"LivySession")
Expand All @@ -94,8 +92,6 @@ def __init__(self, http_client, properties, ipython_display,
self._app_id = None
self._logs = u""
self._http_client = http_client
self._status_sleep_seconds = status_sleep_seconds
self._statement_sleep_seconds = statement_sleep_seconds
self._wait_for_idle_timeout_seconds = wait_for_idle_timeout_seconds
self._printed_resource_warning = False

Expand Down Expand Up @@ -191,6 +187,10 @@ def get_spark_ui_url(self):
def http_client(self):
return self._http_client

@property
def endpoint(self):
return self._http_client.endpoint

@staticmethod
def is_final_status(status):
return status in constants.FINAL_STATUS
Expand Down Expand Up @@ -219,15 +219,15 @@ def delete(self):
self._spark_events.emit_session_deletion_end_event(self.guid, self.kind, session_id, self.status, True, "", "")

def wait_for_idle(self, seconds_to_wait=None):
"""Wait for session to go to idle status. Sleep meanwhile. Calls done every status_sleep_seconds as
indicated by the constructor.
"""Wait for session to go to idle status. Sleep meanwhile.

Parameters:
seconds_to_wait : number of seconds to wait before giving up.
"""
if seconds_to_wait is None:
seconds_to_wait = self._wait_for_idle_timeout_seconds

retries = 1
while True:
self.refresh_status_and_info()
if self.status == constants.IDLE_SESSION_STATUS:
Expand All @@ -252,13 +252,16 @@ def wait_for_idle(self, seconds_to_wait=None):
self._printed_resource_warning = True

start_time = time()
sleep_time = self._policy.seconds_to_sleep(retries)
retries += 1

self.logger.debug(u"Session {} in state {}. Sleeping {} seconds."
.format(self.id, self.status, self._status_sleep_seconds))
sleep(self._status_sleep_seconds)
.format(self.id, self.status, sleep_time))
sleep(sleep_time)
seconds_to_wait -= time() - start_time

def sleep(self):
sleep(self._statement_sleep_seconds)
def sleep(self, retries):
sleep(self._policy.seconds_to_sleep(retries))

# This function will refresh the status and get the logs in a single call.
# Only the status will be returned as the return value.
Expand Down
6 changes: 6 additions & 0 deletions sparkmagic/sparkmagic/livyclientlib/sessionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ def get_session_id_for_client(self, name):
return self._sessions[name].id
return None

def get_session_name_by_id_endpoint(self, id, endpoint):
for (name, session) in self._sessions.items():
if session.id == int(id) and session.endpoint == endpoint:
return name
return None

def delete_client(self, name):
self._remove_session(name)

Expand Down
17 changes: 11 additions & 6 deletions sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ def delete_session_by_name(self, name):
self.session_manager.delete_client(name)

def delete_session_by_id(self, endpoint, session_id):
http_client = self._http_client(endpoint)
response = http_client.get_session(session_id)
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, {constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]},
self.ipython_display, session_id)
session.delete()
name = self.session_manager.get_session_name_by_id_endpoint(session_id, endpoint)

if name in self.session_manager.get_sessions_list():
self.delete_session_by_name(name)
else:
http_client = self._http_client(endpoint)
response = http_client.get_session(session_id)
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, {constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]},
self.ipython_display, session_id)
session.delete()

def add_session(self, name, endpoint, skip_if_exists, properties):
if skip_if_exists and (name in self.session_manager.get_sessions_list()):
Expand Down
Loading