diff --git a/.circleci/config.yml b/.circleci/config.yml index 9629e2d..d82fae4 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -38,6 +38,9 @@ jobs: steps: - checkout - run: docker compose -f docker-compose-test.yaml up test --exit-code-from test + - run: docker compose -f docker-compose-test.yaml down + - run: docker compose -f docker-compose-test.yaml up examples --exit-code-from examples + workflows: diff --git a/README.md b/README.md index e21b1e5..dcbae9a 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,29 @@ asyncio.run(test_notifier()) # Received WebRTC metrics: ServerMessageMetricsReport(metrics='{}') ``` +#### Cluster of Jellyfishes + +The cluster of jellyfishes has got embedded load balancer, which means that a new room will be created on jellyfish with the least usage. At the moment to modify this specific room you must communicate with the jellyfish on which this room was created. + +```python +room_api = RoomApi(server_address='localhost:5002') + +# Create a room to trigger a server notification with h264 as a codec, +# that allow to use HLS. +address, room = room_api.create_room(video_codec="h264") + +# Create new room api with returned jellyfish address as a room could be +# created on a different jellyfish instance +# (if you communicate with a cluster of jellyfishes) +new_room_api = RoomApi(server_address=address) + +# Add HLS component with manual subscribe mode, we use here `new_room_api` as we are sure that this API refers to the jellyfish on which this room was created. +_hls_component = new_room_api.add_component( + room.id, + ComponentOptionsHLS(subscribe_mode=ComponentOptionsHLSSubscribeMode.MANUAL), +) +``` + ## Testing You can test the SDK by running diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index ae1619f..69e8344 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -13,7 +13,7 @@ services: timeout: 2s start_period: 30s environment: - JF_HOST: "jellyfish" + JF_HOST: "jellyfish:5002" JF_INTEGRATED_TURN_IP: "${INTEGRATED_TURN_IP:-127.0.0.1}" JF_INTEGRATED_TURN_LISTEN_IP: "0.0.0.0" JF_INTEGRATED_TURN_PORT_RANGE: "50000-50050" @@ -43,3 +43,16 @@ services: depends_on: jellyfish: condition: service_healthy + + examples: + container_name: examples + image: "cimg/python:${PYTHON_VERSION:-3.8}" + command: sh -c "cd /app && \ poetry config virtualenvs.in-project false && \ poetry cache clear pypi --all && \ poetry install --no-ansi && \ poetry run examples" + environment: + DOCKER_TEST: "TRUE" + CI_LIMIT: "10" + volumes: + - .:/app + depends_on: + jellyfish: + condition: service_healthy diff --git a/examples/mini_tutorial.py b/examples/mini_tutorial.py new file mode 100755 index 0000000..e264528 --- /dev/null +++ b/examples/mini_tutorial.py @@ -0,0 +1,81 @@ +import asyncio +import os + +from jellyfish import ( + ComponentOptionsFile, + ComponentOptionsHLS, + ComponentOptionsHLSSubscribeMode, + Notifier, + RoomApi, +) +from jellyfish.events import ( + ServerMessageHlsPlayable, + ServerMessageTrackAdded, + ServerMessageTrackType, +) + +HOST = "jellyfish" if os.getenv("DOCKER_TEST") == "TRUE" else "localhost" +SERVER_ADDRESS = f"{HOST}:5002" + + +notifier = Notifier(server_address=SERVER_ADDRESS, server_api_token="development") + +notifier_task = None + + +@notifier.on_server_notification +def handle_notification(server_notification): + print(f"Received a notification: {server_notification}") + + if isinstance(server_notification, ServerMessageTrackAdded): + if server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_AUDIO: + print("New audio track has been added") + elif server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_VIDEO: + print("New video track has been added") + elif isinstance(server_notification, ServerMessageHlsPlayable): + print("HLS stream is playable") + notifier_task.cancel() + + +@notifier.on_metrics +def handle_metrics(metrics_report): + pass + + +async def test_notifier(): + global notifier_task + notifier_task = asyncio.create_task(notifier.connect()) + + # Wait for notifier to be ready to receive messages + await notifier.wait_ready() + + room_api = RoomApi(server_address=SERVER_ADDRESS) + + # Create a room to trigger a server notification with h264 as a codec, + # that allow to use HLS. + address, room = room_api.create_room(video_codec="h264") + + # Create new room api with returned jellyfish address as a room could be + # created on a different jellyfish instance + # (if you communicate with a cluster of jellyfishes) + room_api = RoomApi(server_address=address) + + # Add HLS component with manual subscribe mode + _hls_component = room_api.add_component( + room.id, + ComponentOptionsHLS(subscribe_mode=ComponentOptionsHLSSubscribeMode.MANUAL), + ) + + # Add File Component + file_component = room_api.add_component(room.id, ComponentOptionsFile("video.h264")) + + # Subscribe on specific component + room_api.hls_subscribe(room.id, [file_component.id]) + + try: + await notifier_task + except asyncio.CancelledError: + print("Notifier task canceled, exiting") + + +asyncio.run(test_notifier()) diff --git a/examples/room_api.py b/examples/room_api.py index d514c2b..7888780 100644 --- a/examples/room_api.py +++ b/examples/room_api.py @@ -1,7 +1,12 @@ +import os + from jellyfish import ComponentOptionsHLS, PeerOptionsWebRTC, RoomApi +HOST = "jellyfish" if os.getenv("DOCKER_TEST") == "TRUE" else "localhost" +SERVER_ADDRESS = f"{HOST}:5002" + # Create a room -room_api = RoomApi(server_address="localhost:5002", server_api_token="development") +room_api = RoomApi(server_address=SERVER_ADDRESS, server_api_token="development") jellyfish_address, room = room_api.create_room( video_codec="h264", webhook_url="http://localhost:5000/webhook" diff --git a/examples/server_notifications.py b/examples/server_notifications.py index 6ea1eef..0c0fbc4 100644 --- a/examples/server_notifications.py +++ b/examples/server_notifications.py @@ -1,9 +1,23 @@ import asyncio +import os from jellyfish import Notifier, RoomApi from jellyfish.events import ServerMessageTrackAdded, ServerMessageTrackType -notifier = Notifier(server_address="localhost:5002", server_api_token="development") +HOST = "jellyfish" if os.getenv("DOCKER_TEST") == "TRUE" else "localhost" +SERVER_ADDRESS = f"{HOST}:5002" + +notifier = Notifier(server_address=SERVER_ADDRESS, server_api_token="development") + +notifier_task = None + +LIMIT = os.getenv("CI_LIMIT", None) + +if LIMIT is not None: + LIMIT = int(LIMIT) + + +counter = 0 @notifier.on_server_notification @@ -20,19 +34,27 @@ def handle_notification(server_notification): @notifier.on_metrics def handle_metrics(metrics_report): print(f"Received WebRTC metrics: {metrics_report}") + global counter + if LIMIT and counter > LIMIT: + notifier_task.cancel() + counter += 1 async def test_notifier(): + global notifier_task notifier_task = asyncio.create_task(notifier.connect()) # Wait for notifier to be ready to receive messages await notifier.wait_ready() # Create a room to trigger a server notification - room_api = RoomApi() + room_api = RoomApi(server_address=SERVER_ADDRESS) room_api.create_room() - await notifier_task + try: + await notifier_task + except asyncio.CancelledError: + print("Notifier task canceled, exiting") asyncio.run(test_notifier()) diff --git a/jellyfish/api/_room_api.py b/jellyfish/api/_room_api.py index 4f4f4fb..a2b7b9e 100644 --- a/jellyfish/api/_room_api.py +++ b/jellyfish/api/_room_api.py @@ -2,7 +2,7 @@ RoomApi used to manage rooms """ -from typing import Literal, Union +from typing import List, Literal, Tuple, Union from jellyfish._openapi_client.api.hls import subscribe_hls_to as hls_subscribe_hls_to from jellyfish._openapi_client.api.room import add_component as room_add_component @@ -62,7 +62,7 @@ def create_room( max_peers: int = None, video_codec: Literal["h264", "vp8"] = None, webhook_url: str = None, - ) -> (str, Room): + ) -> Tuple[str, Room]: """ Creates a new room @@ -104,14 +104,16 @@ def get_room(self, room_id: str) -> Room: return self._request(room_get_room, room_id=room_id).data - def add_peer(self, room_id: str, options: PeerOptionsWebRTC) -> (str, Peer): + def add_peer(self, room_id: str, options: PeerOptionsWebRTC) -> Tuple[str, Peer]: """ Creates peer in the room Currently only `webrtc` peer is supported Returns a tuple (`peer_token`, `Peer`) - the token needed by Peer - to authenticate to Jellyfish and the new `Peer` + to authenticate to Jellyfish and the new `Peer`. + + The possible options to pass for peer are `PeerOptionsWebRTC`. """ peer_type = "webrtc" @@ -135,7 +137,14 @@ def add_component( ComponentOptionsSIP, ], ) -> Union[ComponentFile, ComponentHLS, ComponentRTSP, ComponentSIP]: - """Creates component in the room""" + """ + Creates component in the room. + Currently there are 4 different components: + * File Component for which the options are `ComponentOptionsFile` + * HLS Component which options are `ComponentOptionsHLS` + * RTSP Component which options are `ComponentOptionsRTSP` + * SIP Component which options are `ComponentOptionsSIP` + """ if isinstance(options, ComponentOptionsFile): component_type = "file" @@ -147,7 +156,7 @@ def add_component( component_type = "sip" else: raise ValueError( - "options must be ComponentFile, ComponentOptionsHLS," + "options must be ComponentOptionsFile, ComponentOptionsHLS," "ComponentOptionsRTSP or ComponentOptionsSIP" ) @@ -162,7 +171,7 @@ def delete_component(self, room_id: str, component_id: str) -> None: return self._request(room_delete_component, id=component_id, room_id=room_id) - def hls_subscribe(self, room_id: str, origins: [str]): + def hls_subscribe(self, room_id: str, origins: List[str]): """ In order to subscribe to HLS peers/components, the HLS component should be initialized with the subscribe_mode set to manual. diff --git a/poetry_scripts.py b/poetry_scripts.py index 5cf643a..7e9dcc5 100644 --- a/poetry_scripts.py +++ b/poetry_scripts.py @@ -1,13 +1,24 @@ import os import shutil +import subprocess import sys from pathlib import Path def check_exit_code(command): - command_exit_code = os.system(command) - if command_exit_code != 0: - sys.exit(command_exit_code >> 8) + process = subprocess.Popen( + command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + + while True: + output = process.stdout.readline() + if output == b"" and process.poll() is not None: + break + if output: + print(str(output.strip(), "utf-8")) + exit_code = process.poll() + if exit_code != 0: + sys.exit(exit_code) def run_tests(): @@ -20,6 +31,17 @@ def run_tests(): check_exit_code("docker compose -f docker-compose-test.yaml down") +def run_examples(): + print("Start examples") + + examples = os.listdir("./examples") + + for example in examples: + check_exit_code(f"python ./examples/{example}") + print(f"After example from file: {example}") + print("All examples run without errors") + + def run_local_test(): check_exit_code('poetry run pytest -m "not file_component_sources"') diff --git a/pyproject.toml b/pyproject.toml index 5a85dee..91058f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ lint = "poetry_scripts:run_linter" fix_lint = "poetry_scripts:run_linter_fix" generate_docs = "poetry_scripts:generate_docs" update_client = "poetry_scripts:update_client" +examples = "poetry_scripts:run_examples" [tool.ruff] select = ["F", "I"]