diff --git a/.gitignore b/.gitignore index 6356ea4b5..bd62b4a15 100644 --- a/.gitignore +++ b/.gitignore @@ -62,3 +62,4 @@ target/ .idea/* MANIFEST .vscode/* +!.vscode/tasks.json diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 000000000..b7c7fd08c --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,75 @@ +{ + // vscode tasks to easily run things. + // Ctrl + Shift + B to bring tasks up and select one + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "0.1.0", + "command": "cmd", + "isShellCommand": true, + "showOutput": "silent", + "args": ["/C"], + + "tasks": [ + { + "taskName": "go to github", + "suppressTaskName": true, + "isBuildCommand": true, + "args": ["start https://github.com/jupyter-incubator/sparkmagic"] + }, + { + "taskName": "docker build", + "suppressTaskName": true, + "isBuildCommand": true, + "args": ["docker-compose build"] + }, + { + "taskName": "docker-compose up", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["docker-compose up -d && start http://localhost:8888"] + }, + { + "taskName": "docker-compose down", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["docker-compose down"] + }, + { + "taskName": "test all", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["nosetests autovizwidget hdijupyterutils sparkmagic"] + }, + { + "taskName": "test autovizwidget", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["nosetests autovizwidget"] + }, + { + "taskName": "test hdijupyterutils", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["nosetests hdijupyterutils"] + }, + { + "taskName": "test sparkmagic", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["nosetests sparkmagic"] + }, + { + "taskName": "test file", + "suppressTaskName": true, + "isBuildCommand": false, + "isTestCommand": true, + "args": ["nosetests --nocapture ${file}"] + } + ] +} \ No newline at end of file diff --git a/Dockerfile.jupyter b/Dockerfile.jupyter index bef552076..457ab051a 100644 --- a/Dockerfile.jupyter +++ b/Dockerfile.jupyter @@ -9,6 +9,8 @@ RUN conda install requests-kerberos -y USER $NB_USER +COPY examples /home/jovyan/work + # Install sparkmagic - if DEV_MODE is set, use the one in the host directory. # Otherwise, just install from pip. COPY hdijupyterutils hdijupyterutils/ diff --git a/autovizwidget/autovizwidget/__init__.py b/autovizwidget/autovizwidget/__init__.py index f8d90954c..c4e914ae8 100644 --- a/autovizwidget/autovizwidget/__init__.py +++ b/autovizwidget/autovizwidget/__init__.py @@ -1 +1 @@ -__version__ = '0.12.1' +__version__ = '0.12.3' diff --git a/hdijupyterutils/hdijupyterutils/__init__.py b/hdijupyterutils/hdijupyterutils/__init__.py index f8d90954c..c4e914ae8 100644 --- a/hdijupyterutils/hdijupyterutils/__init__.py +++ b/hdijupyterutils/hdijupyterutils/__init__.py @@ -1 +1 @@ -__version__ = '0.12.1' +__version__ = '0.12.3' diff --git a/sparkmagic/example_config.json b/sparkmagic/example_config.json index e7f68d37f..aa3cdc242 100644 --- a/sparkmagic/example_config.json +++ b/sparkmagic/example_config.json @@ -43,8 +43,6 @@ }, "wait_for_idle_timeout_seconds": 15, - "status_sleep_seconds": 2, - "statement_sleep_seconds": 2, "livy_session_startup_timeout_seconds": 60, "fatal_error_suggestion": "The code failed because of a fatal error:\n\t{}.\n\nSome things to try:\na) Make sure Spark has enough available resources for Jupyter to create a Spark context.\nb) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.\nc) Restart the kernel.", @@ -66,5 +64,9 @@ "heartbeat_retry_seconds": 10, "server_extension_default_kernel_name": "pysparkkernel", - "custom_headers": {} + "custom_headers": {}, + + "retry_policy": "configurable", + "retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5], + "configurable_retry_policy_max_retries": 8 } diff --git a/sparkmagic/sparkmagic/__init__.py b/sparkmagic/sparkmagic/__init__.py index 35350636d..a996e0cfc 100644 --- a/sparkmagic/sparkmagic/__init__.py +++ b/sparkmagic/sparkmagic/__init__.py @@ -1,4 +1,4 @@ -__version__ = '0.12.1' +__version__ = '0.12.3' from sparkmagic.serverextension.handlers import load_jupyter_server_extension diff --git a/sparkmagic/sparkmagic/controllerwidget/magicscontrollerwidget.py b/sparkmagic/sparkmagic/controllerwidget/magicscontrollerwidget.py index f464a8cf3..e2732d8c1 100644 --- a/sparkmagic/sparkmagic/controllerwidget/magicscontrollerwidget.py +++ b/sparkmagic/sparkmagic/controllerwidget/magicscontrollerwidget.py @@ -30,10 +30,17 @@ def _get_default_endpoints(): for kernel_type in LANGS_SUPPORTED: endpoint_config = getattr(conf, 'kernel_%s_credentials' % kernel_type)() if all([p in endpoint_config for p in ["url", "password", "username"]]) and endpoint_config["url"] != "": + user = endpoint_config["username"] + passwd = endpoint_config["password"] + + authentication = endpoint_config.get("auth", None) + if authentication is None: + authentication = conf.get_auth_value(user, passwd) + default_endpoints.add(Endpoint( - username=endpoint_config["username"], - password=endpoint_config["password"], - auth=endpoint_config.get("auth", None), + username=user, + password=passwd, + auth=authentication, url=endpoint_config["url"], implicitly_added=True)) diff --git a/sparkmagic/sparkmagic/controllerwidget/manageendpointwidget.py b/sparkmagic/sparkmagic/controllerwidget/manageendpointwidget.py index dbd6c721d..c210d6264 100644 --- a/sparkmagic/sparkmagic/controllerwidget/manageendpointwidget.py +++ b/sparkmagic/sparkmagic/controllerwidget/manageendpointwidget.py @@ -87,6 +87,7 @@ def cleanup_on_click(button): self.ipython_display.send_error("Could not clean up endpoint due to error: {}".format(e)) return self.ipython_display.writeln("Cleaned up endpoint {}".format(url)) + self.refresh_method() cleanup_w = self.ipywidget_factory.get_button(description="Clean Up") cleanup_w.on_click(cleanup_on_click) @@ -114,6 +115,7 @@ def delete_endpoint(button): except ValueError as e: self.ipython_display.send_error(str(e)) return + self.refresh_method() button = self.ipywidget_factory.get_button(description="Delete") button.on_click(delete_endpoint) diff --git a/sparkmagic/sparkmagic/livyclientlib/command.py b/sparkmagic/sparkmagic/livyclientlib/command.py index 082fa913b..c215f4c05 100644 --- a/sparkmagic/sparkmagic/livyclientlib/command.py +++ b/sparkmagic/sparkmagic/livyclientlib/command.py @@ -44,6 +44,8 @@ def execute(self, session): return output def _get_statement_output(self, session, statement_id): + retries = 1 + while True: statement = session.http_client.get_statement(session.id, statement_id) status = statement[u"state"].lower() @@ -51,7 +53,8 @@ def _get_statement_output(self, session, statement_id): self.logger.debug(u"Status of statement {} is {}.".format(statement_id, status)) if status not in FINAL_STATEMENT_STATUS: - session.sleep() + session.sleep(retries) + retries += 1 else: statement_output = statement[u"output"] diff --git a/sparkmagic/sparkmagic/livyclientlib/configurableretrypolicy.py b/sparkmagic/sparkmagic/livyclientlib/configurableretrypolicy.py new file mode 100644 index 000000000..8ed854744 --- /dev/null +++ b/sparkmagic/sparkmagic/livyclientlib/configurableretrypolicy.py @@ -0,0 +1,31 @@ +# Copyright (c) 2015 aggftw@gmail.com +# Distributed under the terms of the Modified BSD License. + +from .linearretrypolicy import LinearRetryPolicy +from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException + + +class ConfigurableRetryPolicy(LinearRetryPolicy): + """Retry policy that returns a configurable number of seconds to sleep between calls, + takes all status codes 500 or above to be retriable, and retries a given maximum number of times. + If the retry count exceeds the number of items in the list, last item in the list is always returned.""" + + def __init__(self, retry_seconds_to_sleep_list, max_retries): + super(ConfigurableRetryPolicy, self).__init__(-1, max_retries) + + # If user configured to an empty list, let's make this behave as + # a Linear Retry Policy by assigning a list of 1 element. + if len(retry_seconds_to_sleep_list) == 0: + retry_seconds_to_sleep_list = [5] + elif not all(n > 0 for n in retry_seconds_to_sleep_list): + raise BadUserConfigurationException(u"All items in the list in your config need to be positive for configurable retry policy") + + self.retry_seconds_to_sleep_list = retry_seconds_to_sleep_list + self._max_index = len(self.retry_seconds_to_sleep_list) - 1 + + def seconds_to_sleep(self, retry_count): + index = max(retry_count - 1, 0) + if index > self._max_index: + index = self._max_index + + return self.retry_seconds_to_sleep_list[index] diff --git a/sparkmagic/sparkmagic/livyclientlib/linearretrypolicy.py b/sparkmagic/sparkmagic/livyclientlib/linearretrypolicy.py index 534511635..f02bc1db4 100644 --- a/sparkmagic/sparkmagic/livyclientlib/linearretrypolicy.py +++ b/sparkmagic/sparkmagic/livyclientlib/linearretrypolicy.py @@ -8,12 +8,12 @@ class LinearRetryPolicy(object): def __init__(self, seconds_to_sleep, max_retries): self._seconds_to_sleep = seconds_to_sleep - self._max_retries = max_retries + self.max_retries = max_retries def should_retry(self, status_code, error, retry_count): if None in (status_code, retry_count): return False - return (status_code >= 500 and retry_count <= self._max_retries) or error + return (status_code >= 500 and retry_count <= self.max_retries) or error def seconds_to_sleep(self, retry_count): return self._seconds_to_sleep diff --git a/sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py b/sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py index a86973189..e31b1973d 100644 --- a/sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py +++ b/sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py @@ -2,22 +2,26 @@ # Distributed under the terms of the Modified BSD License. from .linearretrypolicy import LinearRetryPolicy +from .configurableretrypolicy import ConfigurableRetryPolicy from .reliablehttpclient import ReliableHttpClient +from sparkmagic.utils.constants import LINEAR_RETRY, CONFIGURABLE_RETRY import sparkmagic.utils.configuration as conf +from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException class LivyReliableHttpClient(object): """A Livy-specific Http client which wraps the normal ReliableHttpClient. Propagates HttpClientExceptions up.""" - def __init__(self, http_client): + def __init__(self, http_client, endpoint): + self.endpoint = endpoint self._http_client = http_client @staticmethod def from_endpoint(endpoint): headers = {"Content-Type": "application/json" } headers.update(conf.custom_headers()) - retry_policy = LinearRetryPolicy(seconds_to_sleep=5, max_retries=5) - return LivyReliableHttpClient(ReliableHttpClient(endpoint, headers, retry_policy)) + retry_policy = LivyReliableHttpClient._get_retry_policy() + return LivyReliableHttpClient(ReliableHttpClient(endpoint, headers, retry_policy), endpoint) def post_statement(self, session_id, data): return self._http_client.post(self._statements_url(session_id), [201], data).json() @@ -54,3 +58,14 @@ def _statements_url(session_id): @staticmethod def _statement_url(session_id, statement_id): return "/sessions/{}/statements/{}".format(session_id, statement_id) + + @staticmethod + def _get_retry_policy(): + policy = conf.retry_policy() + + if policy == LINEAR_RETRY: + return LinearRetryPolicy(seconds_to_sleep=5, max_retries=5) + elif policy == CONFIGURABLE_RETRY: + return ConfigurableRetryPolicy(retry_seconds_to_sleep_list=conf.retry_seconds_to_sleep_list(), max_retries=conf.configurable_retry_policy_max_retries()) + else: + raise BadUserConfigurationException(u"Retry policy '{}' not supported".format(policy)) diff --git a/sparkmagic/sparkmagic/livyclientlib/livysession.py b/sparkmagic/sparkmagic/livyclientlib/livysession.py index c0ff49ff3..84b9e4633 100644 --- a/sparkmagic/sparkmagic/livyclientlib/livysession.py +++ b/sparkmagic/sparkmagic/livyclientlib/livysession.py @@ -9,6 +9,7 @@ from sparkmagic.utils.sparklogger import SparkLog from sparkmagic.utils.sparkevents import SparkEvents from sparkmagic.utils.utils import get_sessions_info_html +from .configurableretrypolicy import ConfigurableRetryPolicy from .command import Command from .exceptions import LivyClientTimeoutException, \ LivyUnexpectedStatusException, BadUserDataException, SqlContextNotFoundException @@ -76,12 +77,9 @@ def __init__(self, http_client, properties, ipython_display, spark_events = SparkEvents() self._spark_events = spark_events - status_sleep_seconds = conf.status_sleep_seconds() - statement_sleep_seconds = conf.statement_sleep_seconds() + self._policy = ConfigurableRetryPolicy(retry_seconds_to_sleep_list=[0.2, 0.5, 0.5, 1, 1, 2], max_retries=5000) wait_for_idle_timeout_seconds = conf.wait_for_idle_timeout_seconds() - assert status_sleep_seconds > 0 - assert statement_sleep_seconds > 0 assert wait_for_idle_timeout_seconds > 0 self.logger = SparkLog(u"LivySession") @@ -94,8 +92,6 @@ def __init__(self, http_client, properties, ipython_display, self._app_id = None self._logs = u"" self._http_client = http_client - self._status_sleep_seconds = status_sleep_seconds - self._statement_sleep_seconds = statement_sleep_seconds self._wait_for_idle_timeout_seconds = wait_for_idle_timeout_seconds self._printed_resource_warning = False @@ -191,6 +187,10 @@ def get_spark_ui_url(self): def http_client(self): return self._http_client + @property + def endpoint(self): + return self._http_client.endpoint + @staticmethod def is_final_status(status): return status in constants.FINAL_STATUS @@ -219,8 +219,7 @@ def delete(self): self._spark_events.emit_session_deletion_end_event(self.guid, self.kind, session_id, self.status, True, "", "") def wait_for_idle(self, seconds_to_wait=None): - """Wait for session to go to idle status. Sleep meanwhile. Calls done every status_sleep_seconds as - indicated by the constructor. + """Wait for session to go to idle status. Sleep meanwhile. Parameters: seconds_to_wait : number of seconds to wait before giving up. @@ -228,6 +227,7 @@ def wait_for_idle(self, seconds_to_wait=None): if seconds_to_wait is None: seconds_to_wait = self._wait_for_idle_timeout_seconds + retries = 1 while True: self.refresh_status_and_info() if self.status == constants.IDLE_SESSION_STATUS: @@ -252,13 +252,16 @@ def wait_for_idle(self, seconds_to_wait=None): self._printed_resource_warning = True start_time = time() + sleep_time = self._policy.seconds_to_sleep(retries) + retries += 1 + self.logger.debug(u"Session {} in state {}. Sleeping {} seconds." - .format(self.id, self.status, self._status_sleep_seconds)) - sleep(self._status_sleep_seconds) + .format(self.id, self.status, sleep_time)) + sleep(sleep_time) seconds_to_wait -= time() - start_time - def sleep(self): - sleep(self._statement_sleep_seconds) + def sleep(self, retries): + sleep(self._policy.seconds_to_sleep(retries)) # This function will refresh the status and get the logs in a single call. # Only the status will be returned as the return value. diff --git a/sparkmagic/sparkmagic/livyclientlib/sessionmanager.py b/sparkmagic/sparkmagic/livyclientlib/sessionmanager.py index c4b25d70d..9a8d68dc0 100644 --- a/sparkmagic/sparkmagic/livyclientlib/sessionmanager.py +++ b/sparkmagic/sparkmagic/livyclientlib/sessionmanager.py @@ -51,6 +51,12 @@ def get_session_id_for_client(self, name): return self._sessions[name].id return None + def get_session_name_by_id_endpoint(self, id, endpoint): + for (name, session) in self._sessions.items(): + if session.id == int(id) and session.endpoint == endpoint: + return name + return None + def delete_client(self, name): self._remove_session(name) diff --git a/sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py b/sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py index 791a8ae75..6c3b6e8c0 100644 --- a/sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py +++ b/sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py @@ -64,12 +64,17 @@ def delete_session_by_name(self, name): self.session_manager.delete_client(name) def delete_session_by_id(self, endpoint, session_id): - http_client = self._http_client(endpoint) - response = http_client.get_session(session_id) - http_client = self._http_client(endpoint) - session = self._livy_session(http_client, {constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]}, - self.ipython_display, session_id) - session.delete() + name = self.session_manager.get_session_name_by_id_endpoint(session_id, endpoint) + + if name in self.session_manager.get_sessions_list(): + self.delete_session_by_name(name) + else: + http_client = self._http_client(endpoint) + response = http_client.get_session(session_id) + http_client = self._http_client(endpoint) + session = self._livy_session(http_client, {constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]}, + self.ipython_display, session_id) + session.delete() def add_session(self, name, endpoint, skip_if_exists, properties): if skip_if_exists and (name in self.session_manager.get_sessions_list()): diff --git a/sparkmagic/sparkmagic/tests/test_command.py b/sparkmagic/sparkmagic/tests/test_command.py index 84d3aaf16..60d44370e 100644 --- a/sparkmagic/sparkmagic/tests/test_command.py +++ b/sparkmagic/sparkmagic/tests/test_command.py @@ -33,12 +33,7 @@ def test_execute(): http_client.post_statement.return_value = tls.TestLivySession.post_statement_json http_client.get_session.return_value = tls.TestLivySession.ready_sessions_json http_client.get_statement.return_value = tls.TestLivySession.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = _create_session(kind=kind, http_client=http_client) - conf.override_all({}) session.start() command = Command("command", spark_events=spark_events) @@ -63,12 +58,7 @@ def test_execute_waiting(): http_client.post_statement.return_value = tls.TestLivySession.post_statement_json http_client.get_session.return_value = tls.TestLivySession.ready_sessions_json http_client.get_statement.side_effect = [tls.TestLivySession.waiting_statement_json, tls.TestLivySession.waiting_statement_json, tls.TestLivySession.ready_statement_json, tls.TestLivySession.ready_statement_json] - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = _create_session(kind=kind, http_client=http_client) - conf.override_all({}) session.start() command = Command("command", spark_events=spark_events) @@ -94,12 +84,7 @@ def test_execute_null_ouput(): http_client.post_statement.return_value = tls.TestLivySession.post_statement_json http_client.get_session.return_value = tls.TestLivySession.ready_sessions_json http_client.get_statement.return_value = tls.TestLivySession.ready_statement_null_output_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = _create_session(kind=kind, http_client=http_client) - conf.override_all({}) session.start() command = Command("command", spark_events=spark_events) @@ -125,12 +110,7 @@ def test_execute_failure_wait_for_session_emits_event(): http_client.post_statement.return_value = tls.TestLivySession.post_statement_json http_client.get_session.return_value = tls.TestLivySession.ready_sessions_json http_client.get_statement.return_value = tls.TestLivySession.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = _create_session(kind=kind, http_client=http_client) - conf.override_all({}) session.start() session.wait_for_idle = MagicMock(side_effect=ValueError("yo")) command = Command("command", spark_events=spark_events) @@ -153,13 +133,8 @@ def test_execute_failure_post_statement_emits_event(): kind = SESSION_KIND_SPARK http_client = MagicMock() http_client.get_statement.return_value = tls.TestLivySession.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = _create_session(kind=kind, http_client=http_client) session.wait_for_idle = MagicMock() - conf.override_all({}) session.start() session.wait_for_idle = MagicMock() command = Command("command", spark_events=spark_events) @@ -183,15 +158,10 @@ def test_execute_failure_get_statement_output_emits_event(): spark_events = MagicMock() kind = SESSION_KIND_SPARK http_client = MagicMock() - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) http_client.get_statement.return_value = tls.TestLivySession.ready_statement_json session = _create_session(kind=kind, http_client=http_client) session.wait_for_idle = MagicMock() - conf.override_all({}) session.start() session.wait_for_idle = MagicMock() command = Command("command", spark_events=spark_events) diff --git a/sparkmagic/sparkmagic/tests/test_configurableretrypolicy.py b/sparkmagic/sparkmagic/tests/test_configurableretrypolicy.py new file mode 100644 index 000000000..680b3303c --- /dev/null +++ b/sparkmagic/sparkmagic/tests/test_configurableretrypolicy.py @@ -0,0 +1,97 @@ +from nose.tools import assert_equals + +from sparkmagic.livyclientlib.configurableretrypolicy import ConfigurableRetryPolicy +import sparkmagic.utils.configuration as conf +from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException + + +def test_with_empty_list(): + times = [] + max_retries = 5 + policy = ConfigurableRetryPolicy(times, max_retries) + + assert_equals(5, policy.seconds_to_sleep(0)) + assert_equals(5, policy.seconds_to_sleep(4)) + assert_equals(5, policy.seconds_to_sleep(5)) + assert_equals(5, policy.seconds_to_sleep(6)) + + # Check based on retry count + assert_equals(True, policy.should_retry(500, False, 0)) + assert_equals(True, policy.should_retry(500, False, 4)) + assert_equals(True, policy.should_retry(500, False, 5)) + assert_equals(False, policy.should_retry(500, False, 6)) + + # Check based on status code + assert_equals(False, policy.should_retry(201, False, 0)) + assert_equals(False, policy.should_retry(201, False, 6)) + + # Check based on error + assert_equals(True, policy.should_retry(201, True, 0)) + assert_equals(True, policy.should_retry(201, True, 6)) + + +def test_with_one_element_list(): + times = [2] + max_retries = 5 + policy = ConfigurableRetryPolicy(times, max_retries) + + assert_equals(2, policy.seconds_to_sleep(0)) + assert_equals(2, policy.seconds_to_sleep(4)) + assert_equals(2, policy.seconds_to_sleep(5)) + assert_equals(2, policy.seconds_to_sleep(6)) + + # Check based on retry count + assert_equals(True, policy.should_retry(500, False, 0)) + assert_equals(True, policy.should_retry(500, False, 4)) + assert_equals(True, policy.should_retry(500, False, 5)) + assert_equals(False, policy.should_retry(500, False, 6)) + + # Check based on status code + assert_equals(False, policy.should_retry(201, False, 0)) + assert_equals(False, policy.should_retry(201, False, 6)) + + # Check based on error + assert_equals(True, policy.should_retry(201, True, 0)) + assert_equals(True, policy.should_retry(201, True, 6)) + + +def test_with_default_values(): + times = conf.retry_seconds_to_sleep_list() + max_retries = conf.configurable_retry_policy_max_retries() + policy = ConfigurableRetryPolicy(times, max_retries) + + assert_equals(times[0], policy.seconds_to_sleep(0)) + assert_equals(times[0], policy.seconds_to_sleep(1)) + assert_equals(times[1], policy.seconds_to_sleep(2)) + assert_equals(times[2], policy.seconds_to_sleep(3)) + assert_equals(times[3], policy.seconds_to_sleep(4)) + assert_equals(times[4], policy.seconds_to_sleep(5)) + assert_equals(times[4], policy.seconds_to_sleep(6)) + assert_equals(times[4], policy.seconds_to_sleep(7)) + assert_equals(times[4], policy.seconds_to_sleep(8)) + assert_equals(times[4], policy.seconds_to_sleep(9)) + + # Check based on retry count + assert_equals(True, policy.should_retry(500, False, 0)) + assert_equals(True, policy.should_retry(500, False, 7)) + assert_equals(True, policy.should_retry(500, False, 8)) + assert_equals(False, policy.should_retry(500, False, 9)) + + # Check based on status code + assert_equals(False, policy.should_retry(201, False, 0)) + assert_equals(False, policy.should_retry(201, False, 9)) + + # Check based on error + assert_equals(True, policy.should_retry(201, True, 0)) + assert_equals(True, policy.should_retry(201, True, 9)) + + +def test_with_negative_values(): + times = [0.1, -1] + max_retries = 5 + + try: + policy = ConfigurableRetryPolicy(times, max_retries) + assert False + except BadUserConfigurationException: + assert True diff --git a/sparkmagic/sparkmagic/tests/test_configuration.py b/sparkmagic/sparkmagic/tests/test_configuration.py index 07fb432b2..52ca0e2f5 100644 --- a/sparkmagic/sparkmagic/tests/test_configuration.py +++ b/sparkmagic/sparkmagic/tests/test_configuration.py @@ -16,10 +16,10 @@ def test_configuration_override_base64_password(): kpc = { 'username': 'U', 'password': 'P', 'base64_password': 'cGFzc3dvcmQ=', 'url': 'L', "auth": AUTH_BASIC } overrides = { conf.kernel_python_credentials.__name__: kpc } conf.override_all(overrides) - conf.override(conf.status_sleep_seconds.__name__, 1) + conf.override(conf.livy_session_startup_timeout_seconds.__name__, 1) assert_equals(conf.d, { conf.kernel_python_credentials.__name__: kpc, - conf.status_sleep_seconds.__name__: 1 }) - assert_equals(conf.status_sleep_seconds(), 1) + conf.livy_session_startup_timeout_seconds.__name__: 1 }) + assert_equals(conf.livy_session_startup_timeout_seconds(), 1) assert_equals(conf.base64_kernel_python_credentials(), { 'username': 'U', 'password': 'password', 'url': 'L', 'auth': AUTH_BASIC }) @@ -44,10 +44,10 @@ def test_configuration_override_fallback_to_password(): kpc = { 'username': 'U', 'password': 'P', 'url': 'L', 'auth': NO_AUTH } overrides = { conf.kernel_python_credentials.__name__: kpc } conf.override_all(overrides) - conf.override(conf.status_sleep_seconds.__name__, 1) + conf.override(conf.livy_session_startup_timeout_seconds.__name__, 1) assert_equals(conf.d, { conf.kernel_python_credentials.__name__: kpc, - conf.status_sleep_seconds.__name__: 1 }) - assert_equals(conf.status_sleep_seconds(), 1) + conf.livy_session_startup_timeout_seconds.__name__: 1 }) + assert_equals(conf.livy_session_startup_timeout_seconds(), 1) assert_equals(conf.base64_kernel_python_credentials(), kpc) @@ -56,10 +56,10 @@ def test_configuration_override_work_with_empty_password(): kpc = { 'username': 'U', 'base64_password': '', 'password': '', 'url': '', 'auth': AUTH_BASIC } overrides = { conf.kernel_python_credentials.__name__: kpc } conf.override_all(overrides) - conf.override(conf.status_sleep_seconds.__name__, 1) + conf.override(conf.livy_session_startup_timeout_seconds.__name__, 1) assert_equals(conf.d, { conf.kernel_python_credentials.__name__: kpc, - conf.status_sleep_seconds.__name__: 1 }) - assert_equals(conf.status_sleep_seconds(), 1) + conf.livy_session_startup_timeout_seconds.__name__: 1 }) + assert_equals(conf.livy_session_startup_timeout_seconds(), 1) assert_equals(conf.base64_kernel_python_credentials(), { 'username': 'U', 'password': '', 'url': '', 'auth': AUTH_BASIC }) @@ -69,7 +69,7 @@ def test_configuration_raise_error_for_bad_base64_password(): kpc = { 'username': 'U', 'base64_password': 'P', 'url': 'L' } overrides = { conf.kernel_python_credentials.__name__: kpc } conf.override_all(overrides) - conf.override(conf.status_sleep_seconds.__name__, 1) + conf.override(conf.livy_session_startup_timeout_seconds.__name__, 1) conf.base64_kernel_python_credentials() diff --git a/sparkmagic/sparkmagic/tests/test_livyreliablehttpclient.py b/sparkmagic/sparkmagic/tests/test_livyreliablehttpclient.py index 018a830a1..4a54ecd4b 100644 --- a/sparkmagic/sparkmagic/tests/test_livyreliablehttpclient.py +++ b/sparkmagic/sparkmagic/tests/test_livyreliablehttpclient.py @@ -5,11 +5,14 @@ from sparkmagic.livyclientlib.endpoint import Endpoint import sparkmagic.utils.configuration as conf import sparkmagic.utils.constants as constants +from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException +from sparkmagic.livyclientlib.configurableretrypolicy import ConfigurableRetryPolicy +from sparkmagic.livyclientlib.linearretrypolicy import LinearRetryPolicy def test_post_statement(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) data = {"adlfj":"sadflkjsdf"} out = livy_client.post_statement(100, data) assert_equals(out, http_client.post.return_value.json.return_value) @@ -18,7 +21,7 @@ def test_post_statement(): def test_get_statement(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) out = livy_client.get_statement(100, 4) assert_equals(out, http_client.get.return_value.json.return_value) http_client.get.assert_called_once_with("/sessions/100/statements/4", [200]) @@ -26,7 +29,7 @@ def test_get_statement(): def test_get_sessions(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) out = livy_client.get_sessions() assert_equals(out, http_client.get.return_value.json.return_value) http_client.get.assert_called_once_with("/sessions", [200]) @@ -34,7 +37,7 @@ def test_get_sessions(): def test_post_session(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) properties = {"adlfj":"sadflkjsdf", 1: [2,3,4,5]} out = livy_client.post_session(properties) assert_equals(out, http_client.post.return_value.json.return_value) @@ -43,7 +46,7 @@ def test_post_session(): def test_get_session(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) out = livy_client.get_session(4) assert_equals(out, http_client.get.return_value.json.return_value) http_client.get.assert_called_once_with("/sessions/4", [200]) @@ -51,14 +54,14 @@ def test_get_session(): def test_delete_session(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) livy_client.delete_session(99) http_client.delete.assert_called_once_with("/sessions/99", [200, 404]) def test_get_all_session_logs(): http_client = MagicMock() - livy_client = LivyReliableHttpClient(http_client) + livy_client = LivyReliableHttpClient(http_client, None) out = livy_client.get_all_session_logs(42) assert_equals(out, http_client.get.return_value.json.return_value) http_client.get.assert_called_once_with("/sessions/42/log?from=0", [200]) @@ -74,3 +77,33 @@ def test_custom_headers(): assert_equals(len(headers), 2) assert_equals("Content-Type" in headers, True) assert_equals("header1" in headers, True) + + +def test_retry_policy(): + # Default is configurable retry + times = conf.retry_seconds_to_sleep_list() + max_retries = conf.configurable_retry_policy_max_retries() + policy = LivyReliableHttpClient._get_retry_policy() + assert type(policy) is ConfigurableRetryPolicy + assert_equals(times, policy.retry_seconds_to_sleep_list) + assert_equals(max_retries, policy.max_retries) + + # Configure to linear retry + _override_policy(constants.LINEAR_RETRY) + policy = LivyReliableHttpClient._get_retry_policy() + assert type(policy) is LinearRetryPolicy + assert_equals(5, policy.seconds_to_sleep(1)) + assert_equals(5, policy.max_retries) + + # Configure to something invalid + _override_policy("garbage") + try: + policy = LivyReliableHttpClient._get_retry_policy() + assert False + except BadUserConfigurationException: + assert True + + +def _override_policy(policy): + overrides = { conf.retry_policy.__name__: policy } + conf.override_all(overrides) diff --git a/sparkmagic/sparkmagic/tests/test_livysession.py b/sparkmagic/sparkmagic/tests/test_livysession.py index a189d680f..cb4cdd1bc 100644 --- a/sparkmagic/sparkmagic/tests/test_livysession.py +++ b/sparkmagic/sparkmagic/tests/test_livysession.py @@ -96,24 +96,6 @@ def _create_session_with_fixed_get_response(self, get_session_json): session.start() return session - @raises(AssertionError) - def test_constructor_throws_status_sleep_seconds(self): - conf.override_all({ - "status_sleep_seconds": 0, - "statement_sleep_seconds": 2 - }) - self._create_session() - conf.override_all({}) - - @raises(AssertionError) - def test_constructor_throws_statement_sleep_seconds(self): - conf.override_all({ - "status_sleep_seconds": 3, - "statement_sleep_seconds": 0 - }) - self._create_session() - conf.override_all({}) - def test_doesnt_do_anything_or_create_sql_context_automatically(self): # If the session object does anything (attempts to create a session or run # a statement), the http_client will fail @@ -121,13 +103,8 @@ def test_doesnt_do_anything_or_create_sql_context_automatically(self): self._create_session() def test_constructor_starts_with_existing_session(self): - conf.override_all({ - "status_sleep_seconds": 4, - "statement_sleep_seconds": 2 - }) session_id = 1 session = self._create_session(session_id=session_id, heartbeat_timeout=0) - conf.override_all({}) assert session.id == session_id assert session._heartbeat_thread is None @@ -135,8 +112,6 @@ def test_constructor_starts_with_existing_session(self): def test_constructor_starts_heartbeat_with_existing_session(self): conf.override_all({ - "status_sleep_seconds": 4, - "statement_sleep_seconds": 2, "heartbeat_refresh_seconds": 0.1 }) session_id = 1 @@ -154,14 +129,9 @@ def test_start_with_heartbeat(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01, - }) session = self._create_session() session.start() - conf.override_all({}) - + assert self.heartbeat_thread.daemon self.heartbeat_thread.start.assert_called_once_with() assert not session._heartbeat_thread is None @@ -172,15 +142,10 @@ def test_start_with_heartbeat_calls_only_once(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() session.start() session.start() session.start() - conf.override_all({}) assert self.heartbeat_thread.daemon self.heartbeat_thread.start.assert_called_once_with() @@ -191,13 +156,8 @@ def test_delete_with_heartbeat(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() session.start() - conf.override_all({}) heartbeat_thread = session._heartbeat_thread session.delete() @@ -206,22 +166,12 @@ def test_delete_with_heartbeat(self): assert session._heartbeat_thread is None def test_constructor_starts_with_no_session(self): - conf.override_all({ - "status_sleep_seconds": 4, - "statement_sleep_seconds": 2 - }) session = self._create_session() - conf.override_all({}) assert session.id == -1 def test_is_final_status(self): - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) assert not session.is_final_status("idle") assert not session.is_final_status("starting") @@ -235,14 +185,9 @@ def test_start_scala_starts_session(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARK session = self._create_session(kind=kind) session.start() - conf.override_all({}) assert_equals(kind, session.kind) assert_equals("idle", session.status) @@ -254,14 +199,9 @@ def test_start_r_starts_session(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARKR session = self._create_session(kind=kind) session.start() - conf.override_all({}) assert_equals(kind, session.kind) assert_equals("idle", session.status) @@ -273,14 +213,9 @@ def test_start_python_starts_session(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_PYSPARK session = self._create_session(kind=kind) session.start() - conf.override_all({}) assert_equals(kind, session.kind) assert_equals("idle", session.status) @@ -292,17 +227,12 @@ def test_start_passes_in_all_properties(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARK properties = {"kind": kind, "extra": 1} ipython_display = MagicMock() session = LivySession(self.http_client, properties, ipython_display) session.start() - conf.override_all({}) self.http_client.post_session.assert_called_with(properties) @@ -310,12 +240,7 @@ def test_status_gets_latest_status(self): self.http_client.post_session.return_value = self.session_create_json self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() session.refresh_status_and_info() @@ -329,12 +254,7 @@ def test_logs_gets_latest_logs(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_all_session_logs.return_value = self.log_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() logs = session.get_logs() @@ -351,12 +271,7 @@ def test_wait_for_idle_returns_when_in_state(self): self.http_client.get_session.side_effect = self._next_session_response_get self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.get_row_html = MagicMock() session.get_row_html.return_value = u"""row1""" @@ -377,12 +292,7 @@ def test_wait_for_idle_prints_resource_limit_message(self): self.http_client.get_statement.return_value = self.ready_statement_json self.http_client.get_all_session_logs.return_value = self.log_json - conf.override_all({ - "status_sleep_seconds": 0.011, - "statement_sleep_seconds": 6000 - }) session = self._create_session() - conf.override_all({}) session.get_row_html = MagicMock() session.get_row_html.return_value = u"""row1""" @@ -401,12 +311,7 @@ def test_wait_for_idle_throws_when_in_final_status(self): self.http_client.get_session.side_effect = self._next_session_response_get self.http_client.get_all_session_logs.return_value = self.log_json - conf.override_all({ - "status_sleep_seconds": 0.011, - "statement_sleep_seconds": 6000 - }) session = self._create_session() - conf.override_all({}) session.get_row_html = MagicMock() session.get_row_html.return_value = u"""row1""" @@ -425,12 +330,7 @@ def test_wait_for_idle_times_out(self): self.http_client.get_session.side_effect = self._next_session_response_get self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.011, - "statement_sleep_seconds": 6000 - }) session = self._create_session() - conf.override_all({}) session.get_row_html = MagicMock() session.get_row_html.return_value = u"""row1""" @@ -442,12 +342,7 @@ def test_delete_session_when_active(self): self.http_client.post_session.return_value = self.session_create_json self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() session.delete() @@ -456,12 +351,7 @@ def test_delete_session_when_active(self): def test_delete_session_when_not_started(self): self.http_client.post_session.return_value = self.session_create_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.delete() @@ -469,12 +359,7 @@ def test_delete_session_when_not_started(self): def test_delete_session_when_dead_throws(self): self.http_client.post.return_value = self.session_create_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.status = "dead" session.delete() @@ -486,14 +371,9 @@ def test_start_emits_start_end_session(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARK session = self._create_session(kind=kind) session.start() - conf.override_all({}) self.spark_events.emit_session_creation_start_event.assert_called_once_with(session.guid, kind) self.spark_events.emit_session_creation_end_event.assert_called_once_with( @@ -503,10 +383,6 @@ def test_start_emits_start_end_failed_session_when_bad_status(self): self.http_client.post_session.side_effect = ValueError self.http_client.get_session.return_value = self.ready_sessions_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARK session = self._create_session(kind=kind) @@ -516,8 +392,6 @@ def test_start_emits_start_end_failed_session_when_bad_status(self): except ValueError: pass - conf.override_all({}) - self.spark_events.emit_session_creation_start_event.assert_called_once_with(session.guid, kind) self.spark_events.emit_session_creation_end_event.assert_called_once_with( session.guid, kind, session.id, session.status, False, "ValueError", "") @@ -526,10 +400,6 @@ def test_start_emits_start_end_failed_session_when_wait_for_idle_throws(self): self.http_client.post_session.return_value = self.session_create_json self.http_client.get_session.return_value = self.ready_sessions_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) kind = constants.SESSION_KIND_SPARK session = self._create_session(kind=kind) session.wait_for_idle = MagicMock(side_effect=ValueError) @@ -540,8 +410,6 @@ def test_start_emits_start_end_failed_session_when_wait_for_idle_throws(self): except ValueError: pass - conf.override_all({}) - self.spark_events.emit_session_creation_start_event.assert_called_once_with(session.guid, kind) self.spark_events.emit_session_creation_end_event.assert_called_once_with( session.guid, kind, session.id, session.status, False, "ValueError", "") @@ -551,12 +419,7 @@ def test_delete_session_emits_start_end(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() end_id = session.id end_status = constants.BUSY_SESSION_STATUS @@ -575,12 +438,7 @@ def test_delete_session_emits_start_failed_end_when_delete_throws(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() session.id = 0 end_id = session.id @@ -602,12 +460,7 @@ def test_delete_session_emits_start_failed_end_when_in_bad_state(self): self.http_client.get_session.return_value = self.ready_sessions_json self.http_client.get_statement.return_value = self.ready_statement_json - conf.override_all({ - "status_sleep_seconds": 0.01, - "statement_sleep_seconds": 0.01 - }) session = self._create_session() - conf.override_all({}) session.start() session.id = 0 end_id = session.id diff --git a/sparkmagic/sparkmagic/tests/test_sessionmanager.py b/sparkmagic/sparkmagic/tests/test_sessionmanager.py index 97f887b54..028f67bc6 100644 --- a/sparkmagic/sparkmagic/tests/test_sessionmanager.py +++ b/sparkmagic/sparkmagic/tests/test_sessionmanager.py @@ -1,4 +1,4 @@ -from mock import MagicMock +from mock import MagicMock, PropertyMock from nose.tools import raises, assert_equals from sparkmagic.livyclientlib.exceptions import SessionManagementException @@ -107,6 +107,24 @@ def test_get_session_id_for_client(): assert id is not None +def test_get_session_name_by_id_endpoint(): + manager = SessionManager() + id_to_search = "0" + endpoint_to_search = "endpoint" + name_to_search = "name" + + name = manager.get_session_name_by_id_endpoint(id_to_search, endpoint_to_search) + assert_equals(None, name) + + session = MagicMock() + type(session).id = PropertyMock(return_value=int(id_to_search)) + session.endpoint = endpoint_to_search + + manager.add_session(name_to_search, session) + name = manager.get_session_name_by_id_endpoint(id_to_search, endpoint_to_search) + assert_equals(name_to_search, name) + + def test_get_session_id_for_client_not_there(): manager = SessionManager() manager.get_sessions_list = MagicMock(return_value=[]) diff --git a/sparkmagic/sparkmagic/tests/test_sparkcontroller.py b/sparkmagic/sparkmagic/tests/test_sparkcontroller.py index 96118ac0d..ab2f17eec 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkcontroller.py +++ b/sparkmagic/sparkmagic/tests/test_sparkcontroller.py @@ -156,7 +156,7 @@ def test_cleanup_endpoint(): @with_setup(_setup, _teardown) -def test_delete_session_by_id_existent(): +def test_delete_session_by_id_existent_non_managed(): http_client = MagicMock() http_client.get_session.return_value = json.loads('{"id":0,"state":"starting","kind":"spark","log":[]}') controller._http_client = MagicMock(return_value=http_client) @@ -169,6 +169,18 @@ def test_delete_session_by_id_existent(): session.delete.assert_called_once_with() +@with_setup(_setup, _teardown) +def test_delete_session_by_id_existent_managed(): + name = "name" + controller.session_manager.get_session_name_by_id_endpoint = MagicMock(return_value=name) + controller.session_manager.get_sessions_list = MagicMock(return_value=[name]) + controller.delete_session_by_name = MagicMock() + + controller.delete_session_by_id("conn_str", 0) + + controller.delete_session_by_name.assert_called_once_with(name) + + @with_setup(_setup, _teardown) @raises(HttpClientException) def test_delete_session_by_id_non_existent(): diff --git a/sparkmagic/sparkmagic/utils/configuration.py b/sparkmagic/sparkmagic/utils/configuration.py index e320515d9..85bc80c07 100644 --- a/sparkmagic/sparkmagic/utils/configuration.py +++ b/sparkmagic/sparkmagic/utils/configuration.py @@ -10,7 +10,7 @@ from .constants import HOME_PATH, CONFIG_FILE, MAGICS_LOGGER_NAME, LIVY_KIND_PARAM, \ LANG_SCALA, LANG_PYTHON, LANG_PYTHON3, LANG_R, \ - SESSION_KIND_SPARKR, SESSION_KIND_SPARK, SESSION_KIND_PYSPARK, SESSION_KIND_PYSPARK3 + SESSION_KIND_SPARKR, SESSION_KIND_SPARK, SESSION_KIND_PYSPARK, SESSION_KIND_PYSPARK3, CONFIGURABLE_RETRY from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException import sparkmagic.utils.constants as constants @@ -134,16 +134,6 @@ def events_handler_class(): return EVENTS_HANDLER_CLASS_NAME -@_with_override -def status_sleep_seconds(): - return 2 - - -@_with_override -def statement_sleep_seconds(): - return 2 - - @_with_override def wait_for_idle_timeout_seconds(): return 15 @@ -225,6 +215,28 @@ def server_extension_default_kernel_name(): return "pysparkkernel" +@_with_override +def custom_headers(): + return {} + + +@_with_override +def retry_policy(): + return CONFIGURABLE_RETRY + + +@_with_override +def retry_seconds_to_sleep_list(): + return [0.2, 0.5, 1, 3, 5] + + +@_with_override +def configurable_retry_policy_max_retries(): + # Sum of default values is ~10 seconds. + # Plus 15 seconds more wanted, that's 3 more 5 second retries. + return 8 + + def _credentials_override(f): """Provides special handling for credentials. It still calls _override(). If 'base64_password' in config is set, it will base64 decode it and returned in return value's 'password' field. @@ -243,8 +255,3 @@ def _credentials_override(f): if base64_decoded_credentials['auth'] is None: base64_decoded_credentials['auth'] = get_auth_value(base64_decoded_credentials['username'], base64_decoded_credentials['password']) return base64_decoded_credentials - - -@_with_override -def custom_headers(): - return {} diff --git a/sparkmagic/sparkmagic/utils/constants.py b/sparkmagic/sparkmagic/utils/constants.py index 5034ab88d..00ab9e453 100644 --- a/sparkmagic/sparkmagic/utils/constants.py +++ b/sparkmagic/sparkmagic/utils/constants.py @@ -94,3 +94,6 @@ AUTH_KERBEROS = "Kerberos" AUTH_BASIC = "Basic_Access" AUTHS_SUPPORTED = [NO_AUTH, AUTH_KERBEROS, AUTH_BASIC] + +CONFIGURABLE_RETRY = "configurable" +LINEAR_RETRY = "linear"