diff --git a/samples/refresh.py b/samples/refresh.py index ec0cdbab4..7b2618b6e 100644 --- a/samples/refresh.py +++ b/samples/refresh.py @@ -39,16 +39,19 @@ def main(): resource = server.workbooks.get_by_id(args.resource_id) # trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done - results = server.workbooks.refresh(args.resource_id) + job = server.workbooks.refresh(args.resource_id) else: # Get the datasource by its Id to make sure it exists resource = server.datasources.get_by_id(args.resource_id) # trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done - results = server.datasources.refresh(resource) - - print(results) - # TODO: Add a flag that will poll and wait for the returned job to be done + job = server.datasources.refresh(resource) + + print(f"Update job posted (ID: {job.id})") + print("Waiting for job...") + # `wait_for_job` will throw if the job isn't executed successfully + job = server.jobs.wait_for_job(job) + print("Job finished succesfully") if __name__ == '__main__': diff --git a/samples/update_datasource_data.py b/samples/update_datasource_data.py index 3633ebaf6..74c8ea6fb 100644 --- a/samples/update_datasource_data.py +++ b/samples/update_datasource_data.py @@ -67,8 +67,12 @@ def main(): job = server.datasources.update_hyper_data(args.datasource_id, request_id=request_id, actions=actions) - # TODO: Add a flag that will poll and wait for the returned job to be done - print(job) + print(f"Update job posted (ID: {job.id})") + print("Waiting for job...") + # `wait_for_job` will throw if the job isn't executed successfully + job = server.jobs.wait_for_job(job) + print("Job finished succesfully") + if __name__ == '__main__': main() diff --git a/tableauserverclient/exponential_backoff.py b/tableauserverclient/exponential_backoff.py new file mode 100644 index 000000000..2b3ded109 --- /dev/null +++ b/tableauserverclient/exponential_backoff.py @@ -0,0 +1,30 @@ +import time + +# Polling for server-side events (such as job completion) uses exponential backoff for the sleep intervals between polls +ASYNC_POLL_MIN_INTERVAL=0.5 +ASYNC_POLL_MAX_INTERVAL=30 +ASYNC_POLL_BACKOFF_FACTOR=1.4 + + +class ExponentialBackoffTimer(): + def __init__(self, *, timeout=None): + self.start_time = time.time() + self.timeout = timeout + self.current_sleep_interval = ASYNC_POLL_MIN_INTERVAL + + def sleep(self): + max_sleep_time = ASYNC_POLL_MAX_INTERVAL + if self.timeout is not None: + elapsed = (time.time() - self.start_time) + if elapsed >= self.timeout: + raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous event") + remaining_time = self.timeout - elapsed + # Usually, we would sleep for `ASYNC_POLL_MAX_INTERVAL`, but we don't want to sleep over the timeout + max_sleep_time = min(ASYNC_POLL_MAX_INTERVAL, remaining_time) + # We want to sleep at least for `ASYNC_POLL_MIN_INTERVAL`. This is important to ensure that, as we get + # closer to the timeout, we don't accidentally wake up multiple times and hit the server in rapid succession + # due to waking up to early from the `sleep`. + max_sleep_time = max(max_sleep_time, ASYNC_POLL_MIN_INTERVAL) + + time.sleep(min(self.current_sleep_interval, max_sleep_time)) + self.current_sleep_interval *= ASYNC_POLL_BACKOFF_FACTOR \ No newline at end of file diff --git a/tableauserverclient/models/job_item.py b/tableauserverclient/models/job_item.py index 7a3a50861..2a8b6b509 100644 --- a/tableauserverclient/models/job_item.py +++ b/tableauserverclient/models/job_item.py @@ -3,6 +3,16 @@ class JobItem(object): + class FinishCode: + """ + Status codes as documented on + https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_jobs_tasks_and_schedules.htm#query_job + """ + Success = 0 + Failed = 1 + Cancelled = 2 + + def __init__( self, id_, @@ -89,7 +99,7 @@ def _parse_element(cls, element, ns): created_at = parse_datetime(element.get("createdAt", None)) started_at = parse_datetime(element.get("startedAt", None)) completed_at = parse_datetime(element.get("completedAt", None)) - finish_code = element.get("finishCode", -1) + finish_code = int(element.get("finishCode", -1)) notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None mode = element.get("mode", None) return cls( diff --git a/tableauserverclient/server/endpoint/exceptions.py b/tableauserverclient/server/endpoint/exceptions.py index 9a9a81d77..693817ddc 100644 --- a/tableauserverclient/server/endpoint/exceptions.py +++ b/tableauserverclient/server/endpoint/exceptions.py @@ -64,3 +64,16 @@ def __str__(self): from pprint import pformat return pformat(self.error) + + +class JobFailedException(Exception): + def __init__(self, job): + self.notes = job.notes + self.job = job + + def __str__(self): + return f"Job {self.job.id} failed with notes {self.notes}" + + +class JobCanceledException(JobFailedException): + pass diff --git a/tableauserverclient/server/endpoint/jobs_endpoint.py b/tableauserverclient/server/endpoint/jobs_endpoint.py index 6079ca788..906d4a19e 100644 --- a/tableauserverclient/server/endpoint/jobs_endpoint.py +++ b/tableauserverclient/server/endpoint/jobs_endpoint.py @@ -1,6 +1,8 @@ from .endpoint import Endpoint, api +from .exceptions import JobCanceledException, JobFailedException from .. import JobItem, BackgroundJobItem, PaginationItem from ..request_options import RequestOptionsBase +from ...exponential_backoff import ExponentialBackoffTimer import logging @@ -12,7 +14,6 @@ logger = logging.getLogger("tableau.endpoint.jobs") - class Jobs(Endpoint): @property def baseurl(self): @@ -48,3 +49,27 @@ def get_by_id(self, job_id): server_response = self.get_request(url) new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0] return new_job + + def wait_for_job(self, job_id, *, timeout=None): + if isinstance(job_id, JobItem): + job_id = job_id.id + assert isinstance(job_id, str) + logger.debug(f"Waiting for job {job_id}") + + backoffTimer = ExponentialBackoffTimer(timeout=timeout) + job = self.get_by_id(job_id) + while job.completed_at is None: + backoffTimer.sleep() + job = self.get_by_id(job_id) + logger.debug(f"\tJob {job_id} progress={job.progress}") + + logger.info("Job {} Completed: Finish Code: {} - Notes:{}".format(job_id, job.finish_code, job.notes)) + + if job.finish_code == JobItem.FinishCode.Success: + return job + elif job.finish_code == JobItem.FinishCode.Failed: + raise JobFailedException(job) + elif job.finish_code == JobItem.FinishCode.Cancelled: + raise JobCanceledException(job) + else: + raise AssertionError("Unexpected finish_code in job", job) diff --git a/test/_utils.py b/test/_utils.py index ecabf53a4..93d7a9334 100644 --- a/test/_utils.py +++ b/test/_utils.py @@ -1,3 +1,5 @@ +from contextlib import contextmanager +import unittest import os.path TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets') @@ -14,3 +16,19 @@ def read_xml_asset(filename): def read_xml_assets(*args): return map(read_xml_asset, args) + + +@contextmanager +def mocked_time(): + mock_time = 0 + + def sleep_mock(interval): + nonlocal mock_time + mock_time += interval + + def get_time(): + return mock_time + + patch = unittest.mock.patch + with patch("time.sleep", sleep_mock), patch("time.time", get_time): + yield get_time diff --git a/test/test_datasource.py b/test/test_datasource.py index 68d6d1384..4c65e8dc9 100644 --- a/test/test_datasource.py +++ b/test/test_datasource.py @@ -317,7 +317,7 @@ def test_publish_async(self): self.assertEqual('PublishDatasource', new_job.type) self.assertEqual('0', new_job.progress) self.assertEqual('2018-06-30T00:54:54Z', format_datetime(new_job.created_at)) - self.assertEqual('1', new_job.finish_code) + self.assertEqual(1, new_job.finish_code) def test_publish_unnamed_file_object(self): new_datasource = TSC.DatasourceItem('test') diff --git a/test/test_exponential_backoff.py b/test/test_exponential_backoff.py new file mode 100644 index 000000000..57229d4ce --- /dev/null +++ b/test/test_exponential_backoff.py @@ -0,0 +1,62 @@ +import unittest +from ._utils import mocked_time +from tableauserverclient.exponential_backoff import ExponentialBackoffTimer + + +class ExponentialBackoffTests(unittest.TestCase): + def test_exponential(self): + with mocked_time() as mock_time: + exponentialBackoff = ExponentialBackoffTimer() + # The creation of our mock shouldn't sleep + self.assertAlmostEqual(mock_time(), 0) + # The first sleep sleeps for a rather short time, the following sleeps become longer + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 0.5) + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 1.2) + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 2.18) + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 3.552) + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 5.4728) + + + def test_exponential_saturation(self): + with mocked_time() as mock_time: + exponentialBackoff = ExponentialBackoffTimer() + for _ in range(99): + exponentialBackoff.sleep() + # We don't increase the sleep time above 30 seconds. + # Otherwise, the exponential sleep time could easily + # reach minutes or even hours between polls + for _ in range(5): + s = mock_time() + exponentialBackoff.sleep() + slept = mock_time() - s + self.assertAlmostEqual(slept, 30) + + + def test_timeout(self): + with mocked_time() as mock_time: + exponentialBackoff = ExponentialBackoffTimer(timeout=4.5) + for _ in range(4): + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 3.552) + # Usually, the following sleep would sleep until 5.5, but due to + # the timeout we wait less; thereby we make sure to take the timeout + # into account as good as possible + exponentialBackoff.sleep() + self.assertAlmostEqual(mock_time(), 4.5) + # The next call to `sleep` will raise a TimeoutError + with self.assertRaises(TimeoutError): + exponentialBackoff.sleep() + + + def test_timeout_zero(self): + with mocked_time() as mock_time: + # The construction of the timer doesn't throw, yet + exponentialBackoff = ExponentialBackoffTimer(timeout = 0) + # But the first `sleep` immediately throws + with self.assertRaises(TimeoutError): + exponentialBackoff.sleep() diff --git a/test/test_job.py b/test/test_job.py index 08b98b815..70bca996c 100644 --- a/test/test_job.py +++ b/test/test_job.py @@ -4,12 +4,16 @@ import requests_mock import tableauserverclient as TSC from tableauserverclient.datetime_helpers import utc -from ._utils import read_xml_asset +from tableauserverclient.server.endpoint.exceptions import JobFailedException +from ._utils import read_xml_asset, mocked_time TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets') GET_XML = 'job_get.xml' GET_BY_ID_XML = 'job_get_by_id.xml' +GET_BY_ID_FAILED_XML = 'job_get_by_id_failed.xml' +GET_BY_ID_CANCELLED_XML = 'job_get_by_id_cancelled.xml' +GET_BY_ID_INPROGRESS_XML = 'job_get_by_id_inprogress.xml' class JobTests(unittest.TestCase): @@ -49,9 +53,6 @@ def test_get_by_id(self): m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml) job = self.server.jobs.get_by_id(job_id) - created_at = datetime(2020, 5, 13, 20, 23, 45, tzinfo=utc) - updated_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc) - ended_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc) self.assertEqual(job_id, job.id) self.assertListEqual(job.notes, ['Job detail notes']) @@ -72,3 +73,35 @@ def test_cancel_item(self): with requests_mock.mock() as m: m.put(self.baseurl + '/ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', status_code=204) self.server.jobs.cancel(job) + + + def test_wait_for_job_finished(self): + # Waiting for an already finished job, directly returns that job's info + response_xml = read_xml_asset(GET_BY_ID_XML) + job_id = '2eef4225-aa0c-41c4-8662-a76d89ed7336' + with mocked_time(), requests_mock.mock() as m: + m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml) + job = self.server.jobs.wait_for_job(job_id) + + self.assertEqual(job_id, job.id) + self.assertListEqual(job.notes, ['Job detail notes']) + + + def test_wait_for_job_failed(self): + # Waiting for a failed job raises an exception + response_xml = read_xml_asset(GET_BY_ID_FAILED_XML) + job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d' + with mocked_time(), requests_mock.mock() as m: + m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml) + with self.assertRaises(JobFailedException): + self.server.jobs.wait_for_job(job_id) + + + def test_wait_for_job_timeout(self): + # Waiting for a job which doesn't terminate will throw an exception + response_xml = read_xml_asset(GET_BY_ID_INPROGRESS_XML) + job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d' + with mocked_time(), requests_mock.mock() as m: + m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml) + with self.assertRaises(TimeoutError): + self.server.jobs.wait_for_job(job_id, timeout=30) diff --git a/test/test_workbook.py b/test/test_workbook.py index d3a3b59b4..459b1f905 100644 --- a/test/test_workbook.py +++ b/test/test_workbook.py @@ -616,7 +616,7 @@ def test_publish_async(self): self.assertEqual('PublishWorkbook', new_job.type) self.assertEqual('0', new_job.progress) self.assertEqual('2018-06-29T23:22:32Z', format_datetime(new_job.created_at)) - self.assertEqual('1', new_job.finish_code) + self.assertEqual(1, new_job.finish_code) def test_publish_invalid_file(self): new_workbook = TSC.WorkbookItem('test', 'ee8c6e70-43b6-11e6-af4f-f7b0d8e20760')