diff --git a/tableauserverclient/models/task_item.py b/tableauserverclient/models/task_item.py index 0ffc3bfab..01cfcfb11 100644 --- a/tableauserverclient/models/task_item.py +++ b/tableauserverclient/models/task_item.py @@ -18,6 +18,7 @@ class Type: _TASK_TYPE_MAPPING = { "RefreshExtractTask": Type.ExtractRefresh, "MaterializeViewsTask": Type.DataAcceleration, + "RunFlowTask": Type.RunFlow, } def __init__( diff --git a/tableauserverclient/server/endpoint/__init__.py b/tableauserverclient/server/endpoint/__init__.py index c018d8334..b2f291369 100644 --- a/tableauserverclient/server/endpoint/__init__.py +++ b/tableauserverclient/server/endpoint/__init__.py @@ -10,6 +10,7 @@ from .fileuploads_endpoint import Fileuploads from .flow_runs_endpoint import FlowRuns from .flows_endpoint import Flows +from .flow_task_endpoint import FlowTasks from .groups_endpoint import Groups from .jobs_endpoint import Jobs from .metadata_endpoint import Metadata diff --git a/tableauserverclient/server/endpoint/flow_task_endpoint.py b/tableauserverclient/server/endpoint/flow_task_endpoint.py new file mode 100644 index 000000000..eea3f9710 --- /dev/null +++ b/tableauserverclient/server/endpoint/flow_task_endpoint.py @@ -0,0 +1,29 @@ +import logging +from typing import List, Optional, Tuple, TYPE_CHECKING + +from tableauserverclient.server.endpoint.endpoint import Endpoint, api +from tableauserverclient.server.endpoint.exceptions import MissingRequiredFieldError +from tableauserverclient.models import TaskItem, PaginationItem +from tableauserverclient.server import RequestFactory + +from tableauserverclient.helpers.logging import logger + +if TYPE_CHECKING: + from tableauserverclient.server.request_options import RequestOptions + + +class FlowTasks(Endpoint): + @property + def baseurl(self) -> str: + return "{0}/sites/{1}/tasks/flows".format(self.parent_srv.baseurl, self.parent_srv.site_id) + + @api(version="3.22") + def create(self, flow_item: TaskItem) -> TaskItem: + if not flow_item: + error = "No flow provided" + raise ValueError(error) + logger.info("Creating an flow task %s", flow_item) + url = self.baseurl + create_req = RequestFactory.FlowTask.create_flow_task_req(flow_item) + server_response = self.post_request(url, create_req) + return server_response.content diff --git a/tableauserverclient/server/request_factory.py b/tableauserverclient/server/request_factory.py index 1f6dfbfc6..c204e7217 100644 --- a/tableauserverclient/server/request_factory.py +++ b/tableauserverclient/server/request_factory.py @@ -1114,6 +1114,40 @@ def create_extract_req(self, xml_request: ET.Element, extract_item: "TaskItem") return ET.tostring(xml_request) +class FlowTaskRequest(object): + @_tsrequest_wrapped + def create_flow_task_req(self, xml_request: ET.Element, flow_item: "TaskItem") -> bytes: + flow_element = ET.SubElement(xml_request, "runFlow") + + # Main attributes + flow_element.attrib["type"] = flow_item.task_type + + if flow_item.target is not None: + target_element = ET.SubElement(flow_element, flow_item.target.type) + target_element.attrib["id"] = flow_item.target.id + + if flow_item.schedule_item is None: + return ET.tostring(xml_request) + + # Schedule attributes + schedule_element = ET.SubElement(xml_request, "schedule") + + interval_item = flow_item.schedule_item.interval_item + schedule_element.attrib["frequency"] = interval_item._frequency + frequency_element = ET.SubElement(schedule_element, "frequencyDetails") + frequency_element.attrib["start"] = str(interval_item.start_time) + if hasattr(interval_item, "end_time") and interval_item.end_time is not None: + frequency_element.attrib["end"] = str(interval_item.end_time) + if hasattr(interval_item, "interval") and interval_item.interval: + intervals_element = ET.SubElement(frequency_element, "intervals") + for interval in interval_item._interval_type_pairs(): # type: ignore + expression, value = interval + single_interval_element = ET.SubElement(intervals_element, "interval") + single_interval_element.attrib[expression] = value + + return ET.tostring(xml_request) + + class SubscriptionRequest(object): @_tsrequest_wrapped def create_req(self, xml_request: ET.Element, subscription_item: "SubscriptionItem") -> bytes: @@ -1253,6 +1287,7 @@ class RequestFactory(object): Favorite = FavoriteRequest() Fileupload = FileuploadRequest() Flow = FlowRequest() + FlowTask = FlowTaskRequest() Group = GroupRequest() Metric = MetricRequest() Permission = PermissionRequest() diff --git a/tableauserverclient/server/server.py b/tableauserverclient/server/server.py index ee23789b1..3a6831458 100644 --- a/tableauserverclient/server/server.py +++ b/tableauserverclient/server/server.py @@ -25,6 +25,7 @@ Databases, Tables, Flows, + FlowTasks, Webhooks, DataAccelerationReport, Favorites, @@ -82,6 +83,7 @@ def __init__(self, server_address, use_server_version=False, http_options=None, self.datasources = Datasources(self) self.favorites = Favorites(self) self.flows = Flows(self) + self.flow_tasks = FlowTasks(self) self.projects = Projects(self) self.schedules = Schedules(self) self.server_info = ServerInfo(self) diff --git a/test/assets/tasks_create_flow_task.xml b/test/assets/tasks_create_flow_task.xml new file mode 100644 index 000000000..11c9a4ff0 --- /dev/null +++ b/test/assets/tasks_create_flow_task.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/test/test_flowtask.py b/test/test_flowtask.py new file mode 100644 index 000000000..034066e64 --- /dev/null +++ b/test/test_flowtask.py @@ -0,0 +1,47 @@ +import os +import unittest +from datetime import time +from pathlib import Path + +import requests_mock + +import tableauserverclient as TSC +from tableauserverclient.datetime_helpers import parse_datetime +from tableauserverclient.models.task_item import TaskItem + +TEST_ASSET_DIR = Path(__file__).parent / "assets" +GET_XML_CREATE_FLOW_TASK_RESPONSE = os.path.join(TEST_ASSET_DIR, "tasks_create_flow_task.xml") + + +class TaskTests(unittest.TestCase): + def setUp(self): + self.server = TSC.Server("http://test", False) + self.server.version = "3.22" + + # Fake Signin + self.server._site_id = "dad65087-b08b-4603-af4e-2887b8aafc67" + self.server._auth_token = "j80k54ll2lfMZ0tv97mlPvvSCRyD0DOM" + + self.baseurl = self.server.flow_tasks.baseurl + + def test_create_flow_task(self): + monthly_interval = TSC.MonthlyInterval(start_time=time(23, 30), interval_value=15) + monthly_schedule = TSC.ScheduleItem( + "Monthly Schedule", + 50, + TSC.ScheduleItem.Type.Flow, + TSC.ScheduleItem.ExecutionOrder.Parallel, + monthly_interval, + ) + target_item = TSC.Target("flow_id", "flow") + + task = TaskItem(None, "RunFlow", None, schedule_item=monthly_schedule, target=target_item) + + with open(GET_XML_CREATE_FLOW_TASK_RESPONSE, "rb") as f: + response_xml = f.read().decode("utf-8") + with requests_mock.mock() as m: + m.post("{}".format(self.baseurl), text=response_xml) + create_response_content = self.server.flow_tasks.create(task).decode("utf-8") + + self.assertTrue("schedule_id" in create_response_content) + self.assertTrue("flow_id" in create_response_content)