Skip to content

Commit 8b728a5

Browse files
authored
Merge pull request #1424 from jorwoods/jorwoods/linked_tasks
feat: linked tasks
2 parents ced5fe3 + 816ba80 commit 8b728a5

File tree

9 files changed

+334
-0
lines changed

9 files changed

+334
-0
lines changed

tableauserverclient/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
IntervalItem,
2323
JobItem,
2424
JWTAuth,
25+
LinkedTaskItem,
26+
LinkedTaskStepItem,
27+
LinkedTaskFlowRunItem,
2528
MetricItem,
2629
MonthlyInterval,
2730
PaginationItem,
@@ -118,4 +121,7 @@
118121
"Pager",
119122
"Server",
120123
"Sort",
124+
"LinkedTaskItem",
125+
"LinkedTaskStepItem",
126+
"LinkedTaskFlowRunItem",
121127
]

tableauserverclient/models/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@
2323
HourlyInterval,
2424
)
2525
from tableauserverclient.models.job_item import JobItem, BackgroundJobItem
26+
from tableauserverclient.models.linked_tasks_item import (
27+
LinkedTaskItem,
28+
LinkedTaskStepItem,
29+
LinkedTaskFlowRunItem,
30+
)
2631
from tableauserverclient.models.metric_item import MetricItem
2732
from tableauserverclient.models.pagination_item import PaginationItem
2833
from tableauserverclient.models.permissions_item import PermissionsRule, Permission
@@ -93,4 +98,7 @@
9398
"ViewItem",
9499
"WebhookItem",
95100
"WorkbookItem",
101+
"LinkedTaskItem",
102+
"LinkedTaskStepItem",
103+
"LinkedTaskFlowRunItem",
96104
]
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import datetime as dt
2+
from typing import List, Optional
3+
4+
from defusedxml.ElementTree import fromstring
5+
6+
from tableauserverclient.datetime_helpers import parse_datetime
7+
from tableauserverclient.models.schedule_item import ScheduleItem
8+
9+
10+
class LinkedTaskItem:
11+
def __init__(self) -> None:
12+
self.id: Optional[str] = None
13+
self.num_steps: Optional[int] = None
14+
self.schedule: Optional[ScheduleItem] = None
15+
16+
@classmethod
17+
def from_response(cls, resp: bytes, namespace) -> List["LinkedTaskItem"]:
18+
parsed_response = fromstring(resp)
19+
return [
20+
cls._parse_element(x, namespace)
21+
for x in parsed_response.findall(".//t:linkedTasks[@id]", namespaces=namespace)
22+
]
23+
24+
@classmethod
25+
def _parse_element(cls, xml, namespace) -> "LinkedTaskItem":
26+
task = cls()
27+
task.id = xml.get("id")
28+
task.num_steps = int(xml.get("numSteps"))
29+
task.schedule = ScheduleItem.from_element(xml, namespace)[0]
30+
return task
31+
32+
33+
class LinkedTaskStepItem:
34+
def __init__(self) -> None:
35+
self.id: Optional[str] = None
36+
self.step_number: Optional[int] = None
37+
self.stop_downstream_on_failure: Optional[bool] = None
38+
self.task_details: List[LinkedTaskFlowRunItem] = []
39+
40+
@classmethod
41+
def from_task_xml(cls, xml, namespace) -> List["LinkedTaskStepItem"]:
42+
return [cls._parse_element(x, namespace) for x in xml.findall(".//t:linkedTaskSteps[@id]", namespace)]
43+
44+
@classmethod
45+
def _parse_element(cls, xml, namespace) -> "LinkedTaskStepItem":
46+
step = cls()
47+
step.id = xml.get("id")
48+
step.step_number = int(xml.get("stepNumber"))
49+
step.stop_downstream_on_failure = string_to_bool(xml.get("stopDownstreamTasksOnFailure"))
50+
step.task_details = LinkedTaskFlowRunItem._parse_element(xml, namespace)
51+
return step
52+
53+
54+
class LinkedTaskFlowRunItem:
55+
def __init__(self) -> None:
56+
self.flow_run_id: Optional[str] = None
57+
self.flow_run_priority: Optional[int] = None
58+
self.flow_run_consecutive_failed_count: Optional[int] = None
59+
self.flow_run_task_type: Optional[str] = None
60+
self.flow_id: Optional[str] = None
61+
self.flow_name: Optional[str] = None
62+
63+
@classmethod
64+
def _parse_element(cls, xml, namespace) -> List["LinkedTaskFlowRunItem"]:
65+
all_tasks = []
66+
for flow_run in xml.findall(".//t:flowRun[@id]", namespace):
67+
task = cls()
68+
task.flow_run_id = flow_run.get("id")
69+
task.flow_run_priority = int(flow_run.get("priority"))
70+
task.flow_run_consecutive_failed_count = int(flow_run.get("consecutiveFailedCount"))
71+
task.flow_run_task_type = flow_run.get("type")
72+
flow = flow_run.find(".//t:flow[@id]", namespace)
73+
task.flow_id = flow.get("id")
74+
task.flow_name = flow.get("name")
75+
all_tasks.append(task)
76+
77+
return all_tasks
78+
79+
80+
class LinkedTaskJobItem:
81+
def __init__(self) -> None:
82+
self.id: Optional[str] = None
83+
self.linked_task_id: Optional[str] = None
84+
self.status: Optional[str] = None
85+
self.created_at: Optional[dt.datetime] = None
86+
87+
@classmethod
88+
def from_response(cls, resp: bytes, namespace) -> "LinkedTaskJobItem":
89+
parsed_response = fromstring(resp)
90+
job = cls()
91+
job_xml = parsed_response.find(".//t:linkedTaskJob[@id]", namespaces=namespace)
92+
if job_xml is None:
93+
raise ValueError("No linked task job found in response")
94+
job.id = job_xml.get("id")
95+
job.linked_task_id = job_xml.get("linkedTaskId")
96+
job.status = job_xml.get("status")
97+
job.created_at = parse_datetime(job_xml.get("createdAt"))
98+
return job
99+
100+
101+
def string_to_bool(s: str) -> bool:
102+
return s.lower() == "true"

tableauserverclient/server/endpoint/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from tableauserverclient.server.endpoint.groups_endpoint import Groups
1515
from tableauserverclient.server.endpoint.groupsets_endpoint import GroupSets
1616
from tableauserverclient.server.endpoint.jobs_endpoint import Jobs
17+
from tableauserverclient.server.endpoint.linked_tasks_endpoint import LinkedTasks
1718
from tableauserverclient.server.endpoint.metadata_endpoint import Metadata
1819
from tableauserverclient.server.endpoint.metrics_endpoint import Metrics
1920
from tableauserverclient.server.endpoint.projects_endpoint import Projects
@@ -47,6 +48,7 @@
4748
"Groups",
4849
"GroupSets",
4950
"Jobs",
51+
"LinkedTasks",
5052
"Metadata",
5153
"Metrics",
5254
"Projects",
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from typing import List, Optional, Tuple, Union
2+
3+
from tableauserverclient.helpers.logging import logger
4+
from tableauserverclient.models.linked_tasks_item import LinkedTaskItem, LinkedTaskJobItem
5+
from tableauserverclient.models.pagination_item import PaginationItem
6+
from tableauserverclient.server.endpoint.endpoint import QuerysetEndpoint, api
7+
from tableauserverclient.server.request_factory import RequestFactory
8+
from tableauserverclient.server.request_options import RequestOptions
9+
10+
11+
class LinkedTasks(QuerysetEndpoint[LinkedTaskItem]):
12+
def __init__(self, parent_srv):
13+
super().__init__(parent_srv)
14+
self._parent_srv = parent_srv
15+
16+
@property
17+
def baseurl(self) -> str:
18+
return f"{self.parent_srv.baseurl}/sites/{self.parent_srv.site_id}/tasks/linked"
19+
20+
@api(version="3.15")
21+
def get(self, req_options: Optional["RequestOptions"] = None) -> Tuple[List[LinkedTaskItem], PaginationItem]:
22+
logger.info("Querying all linked tasks on site")
23+
url = self.baseurl
24+
server_response = self.get_request(url, req_options)
25+
pagination_item = PaginationItem.from_response(server_response.content, self.parent_srv.namespace)
26+
all_group_items = LinkedTaskItem.from_response(server_response.content, self.parent_srv.namespace)
27+
return all_group_items, pagination_item
28+
29+
@api(version="3.15")
30+
def get_by_id(self, linked_task: Union[LinkedTaskItem, str]) -> LinkedTaskItem:
31+
task_id = getattr(linked_task, "id", linked_task)
32+
logger.info("Querying all linked tasks on site")
33+
url = f"{self.baseurl}/{task_id}"
34+
server_response = self.get_request(url)
35+
all_group_items = LinkedTaskItem.from_response(server_response.content, self.parent_srv.namespace)
36+
return all_group_items[0]
37+
38+
@api(version="3.15")
39+
def run_now(self, linked_task: Union[LinkedTaskItem, str]) -> LinkedTaskJobItem:
40+
task_id = getattr(linked_task, "id", linked_task)
41+
logger.info(f"Running linked task {task_id} now")
42+
url = f"{self.baseurl}/{task_id}/runNow"
43+
empty_req = RequestFactory.Empty.empty_req()
44+
server_response = self.post_request(url, empty_req)
45+
return LinkedTaskJobItem.from_response(server_response.content, self.parent_srv.namespace)

tableauserverclient/server/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
Metrics,
3434
Endpoint,
3535
CustomViews,
36+
LinkedTasks,
3637
GroupSets,
3738
Tags,
3839
)
@@ -101,6 +102,7 @@ def __init__(self, server_address, use_server_version=False, http_options=None,
101102
self.flow_runs = FlowRuns(self)
102103
self.metrics = Metrics(self)
103104
self.custom_views = CustomViews(self)
105+
self.linked_tasks = LinkedTasks(self)
104106
self.group_sets = GroupSets(self)
105107
self.tags = Tags(self)
106108

test/assets/linked_tasks_get.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version='1.0' encoding='UTF-8'?>
2+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.5.xsd">
3+
<pagination pageNumber="1" pageSize="100" totalAvailable="5"/>
4+
<linkedTasks>
5+
<linkedTasks id="1b8211dc-51a8-45ce-a831-b5921708e03e"
6+
numSteps="1">
7+
<schedule id="be077332-d01d-481b-b2f3-917e463d4dca"
8+
name="schedule-name"
9+
state="Active"
10+
priority="50"
11+
createdAt="2024-07-24T00:27:55Z"
12+
updatedAt="2024-07-24T01:42:15Z"
13+
type="Flow"
14+
frequency="daily"
15+
nextRunAt="2024-07-24T03:30:00Z"/>
16+
<linkedTaskSteps>
17+
<linkedTaskSteps id="f554a4df-bb6f-4294-94ee-9a709ef9bda0"
18+
stepNumber="1"
19+
stopDownstreamTasksOnFailure="true">
20+
<task>
21+
<flowRun id="e3d1fc25-5644-4e32-af35-58dcbd1dbd73"
22+
priority="1"
23+
consecutiveFailedCount="3"
24+
type="runFlow">
25+
<flow id="ab1231eb-b8ca-461e-a131-83f3c2b6a673"
26+
name="flow-name" />
27+
</flowRun>
28+
</task>
29+
</linkedTaskSteps>
30+
</linkedTaskSteps>
31+
</linkedTasks>
32+
</linkedTasks>
33+
</tsResponse>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?xml version='1.0' encoding='UTF-8'?>
2+
<tsResponse xmlns="http://tableau.com/api" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tableau.com/api http://tableau.com/api/ts-api-3.5.xsd">
3+
<linkedTaskJob id="269a1e5a-1220-4a13-ac01-704982693dd8"
4+
linkedTaskId="1b8211dc-51a8-45ce-a831-b5921708e03e"
5+
status="InProgress"
6+
createdAt="2022-02-15T00:22:22Z"/>
7+
</tsResponse>

test/test_linked_tasks.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from pathlib import Path
2+
import unittest
3+
4+
from defusedxml.ElementTree import fromstring
5+
import pytest
6+
import requests_mock
7+
8+
import tableauserverclient as TSC
9+
from tableauserverclient.datetime_helpers import parse_datetime
10+
from tableauserverclient.models.linked_tasks_item import LinkedTaskItem, LinkedTaskStepItem, LinkedTaskFlowRunItem
11+
12+
asset_dir = (Path(__file__).parent / "assets").resolve()
13+
14+
GET_LINKED_TASKS = asset_dir / "linked_tasks_get.xml"
15+
RUN_LINKED_TASK_NOW = asset_dir / "linked_tasks_run_now.xml"
16+
17+
18+
class TestLinkedTasks(unittest.TestCase):
19+
def setUp(self) -> None:
20+
self.server = TSC.Server("http://test", False)
21+
self.server.version = "3.15"
22+
23+
# Fake signin
24+
self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67"
25+
self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM"
26+
27+
self.baseurl = self.server.linked_tasks.baseurl
28+
29+
def test_parse_linked_task_flow_run(self):
30+
xml = fromstring(GET_LINKED_TASKS.read_bytes())
31+
task_runs = LinkedTaskFlowRunItem._parse_element(xml, self.server.namespace)
32+
assert 1 == len(task_runs)
33+
task = task_runs[0]
34+
assert task.flow_run_id == "e3d1fc25-5644-4e32-af35-58dcbd1dbd73"
35+
assert task.flow_run_priority == 1
36+
assert task.flow_run_consecutive_failed_count == 3
37+
assert task.flow_run_task_type == "runFlow"
38+
assert task.flow_id == "ab1231eb-b8ca-461e-a131-83f3c2b6a673"
39+
assert task.flow_name == "flow-name"
40+
41+
def test_parse_linked_task_step(self):
42+
xml = fromstring(GET_LINKED_TASKS.read_bytes())
43+
steps = LinkedTaskStepItem.from_task_xml(xml, self.server.namespace)
44+
assert 1 == len(steps)
45+
step = steps[0]
46+
assert step.id == "f554a4df-bb6f-4294-94ee-9a709ef9bda0"
47+
assert step.stop_downstream_on_failure
48+
assert step.step_number == 1
49+
assert 1 == len(step.task_details)
50+
task = step.task_details[0]
51+
assert task.flow_run_id == "e3d1fc25-5644-4e32-af35-58dcbd1dbd73"
52+
assert task.flow_run_priority == 1
53+
assert task.flow_run_consecutive_failed_count == 3
54+
assert task.flow_run_task_type == "runFlow"
55+
assert task.flow_id == "ab1231eb-b8ca-461e-a131-83f3c2b6a673"
56+
assert task.flow_name == "flow-name"
57+
58+
def test_parse_linked_task(self):
59+
tasks = LinkedTaskItem.from_response(GET_LINKED_TASKS.read_bytes(), self.server.namespace)
60+
assert 1 == len(tasks)
61+
task = tasks[0]
62+
assert task.id == "1b8211dc-51a8-45ce-a831-b5921708e03e"
63+
assert task.num_steps == 1
64+
assert task.schedule is not None
65+
assert task.schedule.id == "be077332-d01d-481b-b2f3-917e463d4dca"
66+
67+
def test_get_linked_tasks(self):
68+
with requests_mock.mock() as m:
69+
m.get(self.baseurl, text=GET_LINKED_TASKS.read_text())
70+
tasks, pagination_item = self.server.linked_tasks.get()
71+
72+
assert 1 == len(tasks)
73+
task = tasks[0]
74+
assert task.id == "1b8211dc-51a8-45ce-a831-b5921708e03e"
75+
assert task.num_steps == 1
76+
assert task.schedule is not None
77+
assert task.schedule.id == "be077332-d01d-481b-b2f3-917e463d4dca"
78+
79+
def test_get_by_id_str_linked_task(self):
80+
id_ = "1b8211dc-51a8-45ce-a831-b5921708e03e"
81+
82+
with requests_mock.mock() as m:
83+
m.get(f"{self.baseurl}/{id_}", text=GET_LINKED_TASKS.read_text())
84+
task = self.server.linked_tasks.get_by_id(id_)
85+
86+
assert task.id == "1b8211dc-51a8-45ce-a831-b5921708e03e"
87+
assert task.num_steps == 1
88+
assert task.schedule is not None
89+
assert task.schedule.id == "be077332-d01d-481b-b2f3-917e463d4dca"
90+
91+
def test_get_by_id_obj_linked_task(self):
92+
id_ = "1b8211dc-51a8-45ce-a831-b5921708e03e"
93+
in_task = LinkedTaskItem()
94+
in_task.id = id_
95+
96+
with requests_mock.mock() as m:
97+
m.get(f"{self.baseurl}/{id_}", text=GET_LINKED_TASKS.read_text())
98+
task = self.server.linked_tasks.get_by_id(in_task)
99+
100+
assert task.id == "1b8211dc-51a8-45ce-a831-b5921708e03e"
101+
assert task.num_steps == 1
102+
assert task.schedule is not None
103+
assert task.schedule.id == "be077332-d01d-481b-b2f3-917e463d4dca"
104+
105+
def test_run_now_str_linked_task(self):
106+
id_ = "1b8211dc-51a8-45ce-a831-b5921708e03e"
107+
108+
with requests_mock.mock() as m:
109+
m.post(f"{self.baseurl}/{id_}/runNow", text=RUN_LINKED_TASK_NOW.read_text())
110+
job = self.server.linked_tasks.run_now(id_)
111+
112+
assert job.id == "269a1e5a-1220-4a13-ac01-704982693dd8"
113+
assert job.status == "InProgress"
114+
assert job.created_at == parse_datetime("2022-02-15T00:22:22Z")
115+
assert job.linked_task_id == id_
116+
117+
def test_run_now_obj_linked_task(self):
118+
id_ = "1b8211dc-51a8-45ce-a831-b5921708e03e"
119+
in_task = LinkedTaskItem()
120+
in_task.id = id_
121+
122+
with requests_mock.mock() as m:
123+
m.post(f"{self.baseurl}/{id_}/runNow", text=RUN_LINKED_TASK_NOW.read_text())
124+
job = self.server.linked_tasks.run_now(in_task)
125+
126+
assert job.id == "269a1e5a-1220-4a13-ac01-704982693dd8"
127+
assert job.status == "InProgress"
128+
assert job.created_at == parse_datetime("2022-02-15T00:22:22Z")
129+
assert job.linked_task_id == id_

0 commit comments

Comments
 (0)