Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions samples/refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ def main():
resource = server.workbooks.get_by_id(args.resource_id)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
results = server.workbooks.refresh(args.resource_id)
job = server.workbooks.refresh(args.resource_id)
else:
# Get the datasource by its Id to make sure it exists
resource = server.datasources.get_by_id(args.resource_id)

# trigger the refresh, you'll get a job id back which can be used to poll for when the refresh is done
results = server.datasources.refresh(resource)

print(results)
# TODO: Add a flag that will poll and wait for the returned job to be done
job = server.datasources.refresh(resource)

print(f"Update job posted (ID: {job.id})")
print("Waiting for job...")
# `wait_for_job` will throw if the job isn't executed successfully
job = server.jobs.wait_for_job(job)
print("Job finished succesfully")


if __name__ == '__main__':
Expand Down
8 changes: 6 additions & 2 deletions samples/update_datasource_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ def main():

job = server.datasources.update_hyper_data(args.datasource_id, request_id=request_id, actions=actions)

# TODO: Add a flag that will poll and wait for the returned job to be done
print(job)
print(f"Update job posted (ID: {job.id})")
print("Waiting for job...")
# `wait_for_job` will throw if the job isn't executed successfully
job = server.jobs.wait_for_job(job)
print("Job finished succesfully")


if __name__ == '__main__':
main()
30 changes: 30 additions & 0 deletions tableauserverclient/exponential_backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time

# Polling for server-side events (such as job completion) uses exponential backoff for the sleep intervals between polls
ASYNC_POLL_MIN_INTERVAL=0.5
ASYNC_POLL_MAX_INTERVAL=30
ASYNC_POLL_BACKOFF_FACTOR=1.4


class ExponentialBackoffTimer():
def __init__(self, *, timeout=None):
self.start_time = time.time()
self.timeout = timeout
self.current_sleep_interval = ASYNC_POLL_MIN_INTERVAL

def sleep(self):
max_sleep_time = ASYNC_POLL_MAX_INTERVAL
if self.timeout is not None:
elapsed = (time.time() - self.start_time)
if elapsed >= self.timeout:
raise TimeoutError(f"Timeout after {elapsed} seconds waiting for asynchronous event")
remaining_time = self.timeout - elapsed
# Usually, we would sleep for `ASYNC_POLL_MAX_INTERVAL`, but we don't want to sleep over the timeout
max_sleep_time = min(ASYNC_POLL_MAX_INTERVAL, remaining_time)
# We want to sleep at least for `ASYNC_POLL_MIN_INTERVAL`. This is important to ensure that, as we get
# closer to the timeout, we don't accidentally wake up multiple times and hit the server in rapid succession
# due to waking up to early from the `sleep`.
max_sleep_time = max(max_sleep_time, ASYNC_POLL_MIN_INTERVAL)

time.sleep(min(self.current_sleep_interval, max_sleep_time))
self.current_sleep_interval *= ASYNC_POLL_BACKOFF_FACTOR
12 changes: 11 additions & 1 deletion tableauserverclient/models/job_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@


class JobItem(object):
class FinishCode:
"""
Status codes as documented on
https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_jobs_tasks_and_schedules.htm#query_job
"""
Success = 0
Failed = 1
Cancelled = 2


def __init__(
self,
id_,
Expand Down Expand Up @@ -89,7 +99,7 @@ def _parse_element(cls, element, ns):
created_at = parse_datetime(element.get("createdAt", None))
started_at = parse_datetime(element.get("startedAt", None))
completed_at = parse_datetime(element.get("completedAt", None))
finish_code = element.get("finishCode", -1)
finish_code = int(element.get("finishCode", -1))
notes = [note.text for note in element.findall(".//t:notes", namespaces=ns)] or None
mode = element.get("mode", None)
return cls(
Expand Down
13 changes: 13 additions & 0 deletions tableauserverclient/server/endpoint/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,16 @@ def __str__(self):
from pprint import pformat

return pformat(self.error)


class JobFailedException(Exception):
def __init__(self, job):
self.notes = job.notes
self.job = job

def __str__(self):
return f"Job {self.job.id} failed with notes {self.notes}"


class JobCanceledException(JobFailedException):
pass
27 changes: 26 additions & 1 deletion tableauserverclient/server/endpoint/jobs_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from .endpoint import Endpoint, api
from .exceptions import JobCanceledException, JobFailedException
from .. import JobItem, BackgroundJobItem, PaginationItem
from ..request_options import RequestOptionsBase
from ...exponential_backoff import ExponentialBackoffTimer

import logging

Expand All @@ -12,7 +14,6 @@

logger = logging.getLogger("tableau.endpoint.jobs")


class Jobs(Endpoint):
@property
def baseurl(self):
Expand Down Expand Up @@ -48,3 +49,27 @@ def get_by_id(self, job_id):
server_response = self.get_request(url)
new_job = JobItem.from_response(server_response.content, self.parent_srv.namespace)[0]
return new_job

def wait_for_job(self, job_id, *, timeout=None):
if isinstance(job_id, JobItem):
job_id = job_id.id
assert isinstance(job_id, str)
logger.debug(f"Waiting for job {job_id}")

backoffTimer = ExponentialBackoffTimer(timeout=timeout)
job = self.get_by_id(job_id)
while job.completed_at is None:
backoffTimer.sleep()
job = self.get_by_id(job_id)
logger.debug(f"\tJob {job_id} progress={job.progress}")

logger.info("Job {} Completed: Finish Code: {} - Notes:{}".format(job_id, job.finish_code, job.notes))

if job.finish_code == JobItem.FinishCode.Success:
return job
elif job.finish_code == JobItem.FinishCode.Failed:
raise JobFailedException(job)
elif job.finish_code == JobItem.FinishCode.Cancelled:
raise JobCanceledException(job)
else:
raise AssertionError("Unexpected finish_code in job", job)
18 changes: 18 additions & 0 deletions test/_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from contextlib import contextmanager
import unittest
import os.path

TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets')
Expand All @@ -14,3 +16,19 @@ def read_xml_asset(filename):

def read_xml_assets(*args):
return map(read_xml_asset, args)


@contextmanager
def mocked_time():
mock_time = 0

def sleep_mock(interval):
nonlocal mock_time
mock_time += interval

def get_time():
return mock_time

patch = unittest.mock.patch
with patch("time.sleep", sleep_mock), patch("time.time", get_time):
yield get_time
2 changes: 1 addition & 1 deletion test/test_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def test_publish_async(self):
self.assertEqual('PublishDatasource', new_job.type)
self.assertEqual('0', new_job.progress)
self.assertEqual('2018-06-30T00:54:54Z', format_datetime(new_job.created_at))
self.assertEqual('1', new_job.finish_code)
self.assertEqual(1, new_job.finish_code)

def test_publish_unnamed_file_object(self):
new_datasource = TSC.DatasourceItem('test')
Expand Down
62 changes: 62 additions & 0 deletions test/test_exponential_backoff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import unittest
from ._utils import mocked_time
from tableauserverclient.exponential_backoff import ExponentialBackoffTimer


class ExponentialBackoffTests(unittest.TestCase):
def test_exponential(self):
with mocked_time() as mock_time:
exponentialBackoff = ExponentialBackoffTimer()
# The creation of our mock shouldn't sleep
self.assertAlmostEqual(mock_time(), 0)
# The first sleep sleeps for a rather short time, the following sleeps become longer
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 0.5)
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 1.2)
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 2.18)
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 3.552)
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 5.4728)


def test_exponential_saturation(self):
with mocked_time() as mock_time:
exponentialBackoff = ExponentialBackoffTimer()
for _ in range(99):
exponentialBackoff.sleep()
# We don't increase the sleep time above 30 seconds.
# Otherwise, the exponential sleep time could easily
# reach minutes or even hours between polls
for _ in range(5):
s = mock_time()
exponentialBackoff.sleep()
slept = mock_time() - s
self.assertAlmostEqual(slept, 30)


def test_timeout(self):
with mocked_time() as mock_time:
exponentialBackoff = ExponentialBackoffTimer(timeout=4.5)
for _ in range(4):
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 3.552)
# Usually, the following sleep would sleep until 5.5, but due to
# the timeout we wait less; thereby we make sure to take the timeout
# into account as good as possible
exponentialBackoff.sleep()
self.assertAlmostEqual(mock_time(), 4.5)
# The next call to `sleep` will raise a TimeoutError
with self.assertRaises(TimeoutError):
exponentialBackoff.sleep()


def test_timeout_zero(self):
with mocked_time() as mock_time:
# The construction of the timer doesn't throw, yet
exponentialBackoff = ExponentialBackoffTimer(timeout = 0)
# But the first `sleep` immediately throws
with self.assertRaises(TimeoutError):
exponentialBackoff.sleep()
41 changes: 37 additions & 4 deletions test/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import requests_mock
import tableauserverclient as TSC
from tableauserverclient.datetime_helpers import utc
from ._utils import read_xml_asset
from tableauserverclient.server.endpoint.exceptions import JobFailedException
from ._utils import read_xml_asset, mocked_time

TEST_ASSET_DIR = os.path.join(os.path.dirname(__file__), 'assets')

GET_XML = 'job_get.xml'
GET_BY_ID_XML = 'job_get_by_id.xml'
GET_BY_ID_FAILED_XML = 'job_get_by_id_failed.xml'
GET_BY_ID_CANCELLED_XML = 'job_get_by_id_cancelled.xml'
GET_BY_ID_INPROGRESS_XML = 'job_get_by_id_inprogress.xml'


class JobTests(unittest.TestCase):
Expand Down Expand Up @@ -49,9 +53,6 @@ def test_get_by_id(self):
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
job = self.server.jobs.get_by_id(job_id)

created_at = datetime(2020, 5, 13, 20, 23, 45, tzinfo=utc)
updated_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc)
ended_at = datetime(2020, 5, 13, 20, 25, 18, tzinfo=utc)
self.assertEqual(job_id, job.id)
self.assertListEqual(job.notes, ['Job detail notes'])

Expand All @@ -72,3 +73,35 @@ def test_cancel_item(self):
with requests_mock.mock() as m:
m.put(self.baseurl + '/ee8c6e70-43b6-11e6-af4f-f7b0d8e20760', status_code=204)
self.server.jobs.cancel(job)


def test_wait_for_job_finished(self):
# Waiting for an already finished job, directly returns that job's info
response_xml = read_xml_asset(GET_BY_ID_XML)
job_id = '2eef4225-aa0c-41c4-8662-a76d89ed7336'
with mocked_time(), requests_mock.mock() as m:
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
job = self.server.jobs.wait_for_job(job_id)

self.assertEqual(job_id, job.id)
self.assertListEqual(job.notes, ['Job detail notes'])


def test_wait_for_job_failed(self):
# Waiting for a failed job raises an exception
response_xml = read_xml_asset(GET_BY_ID_FAILED_XML)
job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d'
with mocked_time(), requests_mock.mock() as m:
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
with self.assertRaises(JobFailedException):
self.server.jobs.wait_for_job(job_id)


def test_wait_for_job_timeout(self):
# Waiting for a job which doesn't terminate will throw an exception
response_xml = read_xml_asset(GET_BY_ID_INPROGRESS_XML)
job_id = '77d5e57a-2517-479f-9a3c-a32025f2b64d'
with mocked_time(), requests_mock.mock() as m:
m.get('{0}/{1}'.format(self.baseurl, job_id), text=response_xml)
with self.assertRaises(TimeoutError):
self.server.jobs.wait_for_job(job_id, timeout=30)
2 changes: 1 addition & 1 deletion test/test_workbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def test_publish_async(self):
self.assertEqual('PublishWorkbook', new_job.type)
self.assertEqual('0', new_job.progress)
self.assertEqual('2018-06-29T23:22:32Z', format_datetime(new_job.created_at))
self.assertEqual('1', new_job.finish_code)
self.assertEqual(1, new_job.finish_code)

def test_publish_invalid_file(self):
new_workbook = TSC.WorkbookItem('test', 'ee8c6e70-43b6-11e6-af4f-f7b0d8e20760')
Expand Down