diff --git a/docs/server_notifications.md b/docs/server_notifications.md new file mode 100644 index 0000000..0feb547 --- /dev/null +++ b/docs/server_notifications.md @@ -0,0 +1,16 @@ +# Server Notifications + +The Jellyfish can send one of the following notifications: + +`ServerMessageRoomCreated`, +`ServerMessageRoomDeleted`, +`ServerMessageRoomCrashed`, +`ServerMessagePeerConnected`, +`ServerMessagePeerDisconnected`, +`ServerMessagePeerCrashed`, +`ServerMessageComponentCrashed`, +`ServerMessageTrackAdded`, +`ServerMessageTrackMetadataUpdated`, +`ServerMessageTrackRemoved`, +`ServerMessageHlsPlayable`, +`ServerMessageMetricsReport` diff --git a/examples/room_api.py b/examples/room_api.py new file mode 100644 index 0000000..d514c2b --- /dev/null +++ b/examples/room_api.py @@ -0,0 +1,17 @@ +from jellyfish import ComponentOptionsHLS, PeerOptionsWebRTC, RoomApi + +# Create a room +room_api = RoomApi(server_address="localhost:5002", server_api_token="development") + +jellyfish_address, room = room_api.create_room( + video_codec="h264", webhook_url="http://localhost:5000/webhook" +) +print((jellyfish_address, room)) + +# Add peer to the room +peer_token, peer_webrtc = room_api.add_peer(room.id, options=PeerOptionsWebRTC()) +print((peer_token, peer_webrtc)) + +# Add component to the room +component_hls = room_api.add_component(room.id, options=ComponentOptionsHLS()) +print(component_hls) diff --git a/examples/server_notifications.py b/examples/server_notifications.py new file mode 100644 index 0000000..6ea1eef --- /dev/null +++ b/examples/server_notifications.py @@ -0,0 +1,38 @@ +import asyncio + +from jellyfish import Notifier, RoomApi +from jellyfish.events import ServerMessageTrackAdded, ServerMessageTrackType + +notifier = Notifier(server_address="localhost:5002", server_api_token="development") + + +@notifier.on_server_notification +def handle_notification(server_notification): + print(f"Received a notification: {server_notification}") + + if isinstance(server_notification, ServerMessageTrackAdded): + if server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_AUDIO: + print("New audio track has been added") + elif server_notification.track.type == ServerMessageTrackType.TRACK_TYPE_VIDEO: + print("New video track has been added") + + +@notifier.on_metrics +def handle_metrics(metrics_report): + print(f"Received WebRTC metrics: {metrics_report}") + + +async def test_notifier(): + notifier_task = asyncio.create_task(notifier.connect()) + + # Wait for notifier to be ready to receive messages + await notifier.wait_ready() + + # Create a room to trigger a server notification + room_api = RoomApi() + room_api.create_room() + + await notifier_task + + +asyncio.run(test_notifier()) diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index 52d8561..fbba6e4 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -29,7 +29,7 @@ ) # API -from jellyfish._webhook_notifier import receive_json +from jellyfish._webhook_notifier import receive_binary from jellyfish._ws_notifier import Notifier from jellyfish.api._recording_api import RecordingApi from jellyfish.api._room_api import RoomApi @@ -38,7 +38,7 @@ "RoomApi", "RecordingApi", "Notifier", - "receive_json", + "receive_binary", "Room", "RoomConfig", "RoomConfigVideoCodec", diff --git a/jellyfish/_openapi_client/api/default/__init__.py b/jellyfish/_openapi_client/api/default/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jellyfish/_openapi_client/api/default/healthcheck.py b/jellyfish/_openapi_client/api/default/healthcheck.py new file mode 100644 index 0000000..2cef558 --- /dev/null +++ b/jellyfish/_openapi_client/api/default/healthcheck.py @@ -0,0 +1,128 @@ +from http import HTTPStatus +from typing import Any, Dict, Optional, Union + +import httpx + +from ... import errors +from ...client import AuthenticatedClient, Client +from ...models.healthcheck_response import HealthcheckResponse +from ...types import Response + + +def _get_kwargs() -> Dict[str, Any]: + return { + "method": "get", + "url": "/health", + } + + +def _parse_response( + *, client: Union[AuthenticatedClient, Client], response: httpx.Response +) -> Optional[HealthcheckResponse]: + if response.status_code == HTTPStatus.OK: + response_200 = HealthcheckResponse.from_dict(response.json()) + + return response_200 + if response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR: + response_500 = HealthcheckResponse.from_dict(response.json()) + + return response_500 + if client.raise_on_unexpected_status: + raise errors.UnexpectedStatus(response.status_code, response.content) + else: + return None + + +def _build_response( + *, client: Union[AuthenticatedClient, Client], response: httpx.Response +) -> Response[HealthcheckResponse]: + return Response( + status_code=HTTPStatus(response.status_code), + content=response.content, + headers=response.headers, + parsed=_parse_response(client=client, response=response), + ) + + +def sync_detailed( + *, + client: Union[AuthenticatedClient, Client], +) -> Response[HealthcheckResponse]: + """Describes the health of Jellyfish + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[HealthcheckResponse] + """ + + kwargs = _get_kwargs() + + response = client.get_httpx_client().request( + **kwargs, + ) + + return _build_response(client=client, response=response) + + +def sync( + *, + client: Union[AuthenticatedClient, Client], +) -> Optional[HealthcheckResponse]: + """Describes the health of Jellyfish + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + HealthcheckResponse + """ + + return sync_detailed( + client=client, + ).parsed + + +async def asyncio_detailed( + *, + client: Union[AuthenticatedClient, Client], +) -> Response[HealthcheckResponse]: + """Describes the health of Jellyfish + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[HealthcheckResponse] + """ + + kwargs = _get_kwargs() + + response = await client.get_async_httpx_client().request(**kwargs) + + return _build_response(client=client, response=response) + + +async def asyncio( + *, + client: Union[AuthenticatedClient, Client], +) -> Optional[HealthcheckResponse]: + """Describes the health of Jellyfish + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + HealthcheckResponse + """ + + return ( + await asyncio_detailed( + client=client, + ) + ).parsed diff --git a/jellyfish/_openapi_client/models/__init__.py b/jellyfish/_openapi_client/models/__init__.py index 978c63c..a7de09e 100644 --- a/jellyfish/_openapi_client/models/__init__.py +++ b/jellyfish/_openapi_client/models/__init__.py @@ -15,6 +15,10 @@ from .component_properties_rtsp import ComponentPropertiesRTSP from .component_rtsp import ComponentRTSP from .error import Error +from .health_report import HealthReport +from .health_report_distribution import HealthReportDistribution +from .health_report_status import HealthReportStatus +from .healthcheck_response import HealthcheckResponse from .peer import Peer from .peer_details_response import PeerDetailsResponse from .peer_details_response_data import PeerDetailsResponseData @@ -30,6 +34,8 @@ from .rooms_listing_response import RoomsListingResponse from .s3_credentials import S3Credentials from .subscription_config import SubscriptionConfig +from .track import Track +from .track_type import TrackType __all__ = ( "AddComponentJsonBody", @@ -47,6 +53,10 @@ "ComponentPropertiesRTSP", "ComponentRTSP", "Error", + "HealthcheckResponse", + "HealthReport", + "HealthReportDistribution", + "HealthReportStatus", "Peer", "PeerDetailsResponse", "PeerDetailsResponseData", @@ -62,4 +72,6 @@ "RoomsListingResponse", "S3Credentials", "SubscriptionConfig", + "Track", + "TrackType", ) diff --git a/jellyfish/_openapi_client/models/component_file.py b/jellyfish/_openapi_client/models/component_file.py index d0cc1d6..1af26c8 100644 --- a/jellyfish/_openapi_client/models/component_file.py +++ b/jellyfish/_openapi_client/models/component_file.py @@ -7,6 +7,7 @@ if TYPE_CHECKING: from ..models.component_properties_file import ComponentPropertiesFile + from ..models.track import Track T = TypeVar("T", bound="ComponentFile") @@ -18,6 +19,8 @@ class ComponentFile: id: str """Assigned component ID""" + tracks: List["Track"] + """List of all component's tracks""" type: str """Component type""" properties: Union[Unset, "ComponentPropertiesFile"] = UNSET @@ -28,6 +31,12 @@ class ComponentFile: def to_dict(self) -> Dict[str, Any]: """@private""" id = self.id + tracks = [] + for tracks_item_data in self.tracks: + tracks_item = tracks_item_data.to_dict() + + tracks.append(tracks_item) + type = self.type properties: Union[Unset, Dict[str, Any]] = UNSET if not isinstance(self.properties, Unset): @@ -38,6 +47,7 @@ def to_dict(self) -> Dict[str, Any]: field_dict.update( { "id": id, + "tracks": tracks, "type": type, } ) @@ -50,10 +60,18 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: """@private""" from ..models.component_properties_file import ComponentPropertiesFile + from ..models.track import Track d = src_dict.copy() id = d.pop("id") + tracks = [] + _tracks = d.pop("tracks") + for tracks_item_data in _tracks: + tracks_item = Track.from_dict(tracks_item_data) + + tracks.append(tracks_item) + type = d.pop("type") _properties = d.pop("properties", UNSET) @@ -65,6 +83,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: component_file = cls( id=id, + tracks=tracks, type=type, properties=properties, ) diff --git a/jellyfish/_openapi_client/models/component_hls.py b/jellyfish/_openapi_client/models/component_hls.py index a78a4ff..41c2797 100644 --- a/jellyfish/_openapi_client/models/component_hls.py +++ b/jellyfish/_openapi_client/models/component_hls.py @@ -5,6 +5,7 @@ if TYPE_CHECKING: from ..models.component_properties_hls import ComponentPropertiesHLS + from ..models.track import Track T = TypeVar("T", bound="ComponentHLS") @@ -18,6 +19,8 @@ class ComponentHLS: """Assigned component ID""" properties: "ComponentPropertiesHLS" """Properties specific to the HLS component""" + tracks: List["Track"] + """List of all component's tracks""" type: str """Component type""" additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) @@ -28,6 +31,12 @@ def to_dict(self) -> Dict[str, Any]: id = self.id properties = self.properties.to_dict() + tracks = [] + for tracks_item_data in self.tracks: + tracks_item = tracks_item_data.to_dict() + + tracks.append(tracks_item) + type = self.type field_dict: Dict[str, Any] = {} @@ -36,6 +45,7 @@ def to_dict(self) -> Dict[str, Any]: { "id": id, "properties": properties, + "tracks": tracks, "type": type, } ) @@ -46,17 +56,26 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: """@private""" from ..models.component_properties_hls import ComponentPropertiesHLS + from ..models.track import Track d = src_dict.copy() id = d.pop("id") properties = ComponentPropertiesHLS.from_dict(d.pop("properties")) + tracks = [] + _tracks = d.pop("tracks") + for tracks_item_data in _tracks: + tracks_item = Track.from_dict(tracks_item_data) + + tracks.append(tracks_item) + type = d.pop("type") component_hls = cls( id=id, properties=properties, + tracks=tracks, type=type, ) diff --git a/jellyfish/_openapi_client/models/component_properties_file.py b/jellyfish/_openapi_client/models/component_properties_file.py index c78a29c..e5824dc 100644 --- a/jellyfish/_openapi_client/models/component_properties_file.py +++ b/jellyfish/_openapi_client/models/component_properties_file.py @@ -1,10 +1,8 @@ -from typing import Any, Dict, List, Type, TypeVar, Union +from typing import Any, Dict, List, Optional, Type, TypeVar from attrs import define as _attrs_define from attrs import field as _attrs_field -from ..types import UNSET, Unset - T = TypeVar("T", bound="ComponentPropertiesFile") @@ -14,7 +12,7 @@ class ComponentPropertiesFile: file_path: str """Relative path to track file. Must be either OPUS encapsulated in Ogg or raw h264""" - framerate: Union[Unset, None, int] = UNSET + framerate: Optional[int] """Framerate of video in a file. It is only valid for video track""" additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) """@private""" @@ -29,10 +27,9 @@ def to_dict(self) -> Dict[str, Any]: field_dict.update( { "filePath": file_path, + "framerate": framerate, } ) - if framerate is not UNSET: - field_dict["framerate"] = framerate return field_dict @@ -42,7 +39,7 @@ def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: d = src_dict.copy() file_path = d.pop("filePath") - framerate = d.pop("framerate", UNSET) + framerate = d.pop("framerate") component_properties_file = cls( file_path=file_path, diff --git a/jellyfish/_openapi_client/models/component_rtsp.py b/jellyfish/_openapi_client/models/component_rtsp.py index c2b1f0e..eb876da 100644 --- a/jellyfish/_openapi_client/models/component_rtsp.py +++ b/jellyfish/_openapi_client/models/component_rtsp.py @@ -5,6 +5,7 @@ if TYPE_CHECKING: from ..models.component_properties_rtsp import ComponentPropertiesRTSP + from ..models.track import Track T = TypeVar("T", bound="ComponentRTSP") @@ -18,6 +19,8 @@ class ComponentRTSP: """Assigned component ID""" properties: "ComponentPropertiesRTSP" """Properties specific to the RTSP component""" + tracks: List["Track"] + """List of all component's tracks""" type: str """Component type""" additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) @@ -28,6 +31,12 @@ def to_dict(self) -> Dict[str, Any]: id = self.id properties = self.properties.to_dict() + tracks = [] + for tracks_item_data in self.tracks: + tracks_item = tracks_item_data.to_dict() + + tracks.append(tracks_item) + type = self.type field_dict: Dict[str, Any] = {} @@ -36,6 +45,7 @@ def to_dict(self) -> Dict[str, Any]: { "id": id, "properties": properties, + "tracks": tracks, "type": type, } ) @@ -46,17 +56,26 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: """@private""" from ..models.component_properties_rtsp import ComponentPropertiesRTSP + from ..models.track import Track d = src_dict.copy() id = d.pop("id") properties = ComponentPropertiesRTSP.from_dict(d.pop("properties")) + tracks = [] + _tracks = d.pop("tracks") + for tracks_item_data in _tracks: + tracks_item = Track.from_dict(tracks_item_data) + + tracks.append(tracks_item) + type = d.pop("type") component_rtsp = cls( id=id, properties=properties, + tracks=tracks, type=type, ) diff --git a/jellyfish/_openapi_client/models/health_report.py b/jellyfish/_openapi_client/models/health_report.py new file mode 100644 index 0000000..bcdad02 --- /dev/null +++ b/jellyfish/_openapi_client/models/health_report.py @@ -0,0 +1,84 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..models.health_report_status import HealthReportStatus + +if TYPE_CHECKING: + from ..models.health_report_distribution import HealthReportDistribution + + +T = TypeVar("T", bound="HealthReport") + + +@_attrs_define +class HealthReport: + """Describes overall Jellyfish health""" + + distribution: "HealthReportDistribution" + """Informs about the status of Jellyfish distribution""" + status: HealthReportStatus + """Informs about the status of Jellyfish or a specific service""" + uptime: int + """Uptime of Jellyfish (in seconds)""" + additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) + """@private""" + + def to_dict(self) -> Dict[str, Any]: + """@private""" + distribution = self.distribution.to_dict() + + status = self.status.value + + uptime = self.uptime + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "distribution": distribution, + "status": status, + "uptime": uptime, + } + ) + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + """@private""" + from ..models.health_report_distribution import HealthReportDistribution + + d = src_dict.copy() + distribution = HealthReportDistribution.from_dict(d.pop("distribution")) + + status = HealthReportStatus(d.pop("status")) + + uptime = d.pop("uptime") + + health_report = cls( + distribution=distribution, + status=status, + uptime=uptime, + ) + + health_report.additional_properties = d + return health_report + + @property + def additional_keys(self) -> List[str]: + """@private""" + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/jellyfish/_openapi_client/models/health_report_distribution.py b/jellyfish/_openapi_client/models/health_report_distribution.py new file mode 100644 index 0000000..57dc07f --- /dev/null +++ b/jellyfish/_openapi_client/models/health_report_distribution.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, List, Type, TypeVar, Union + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..models.health_report_status import HealthReportStatus +from ..types import UNSET, Unset + +T = TypeVar("T", bound="HealthReportDistribution") + + +@_attrs_define +class HealthReportDistribution: + """Informs about the status of Jellyfish distribution""" + + node_status: HealthReportStatus + """Informs about the status of Jellyfish or a specific service""" + nodes_in_cluster: int + """Amount of nodes (including this Jellyfish's node) in the distribution cluster""" + enabled: Union[Unset, bool] = UNSET + """Whether distribution is enabled on this Jellyfish""" + additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) + """@private""" + + def to_dict(self) -> Dict[str, Any]: + """@private""" + node_status = self.node_status.value + + nodes_in_cluster = self.nodes_in_cluster + enabled = self.enabled + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "nodeStatus": node_status, + "nodesInCluster": nodes_in_cluster, + } + ) + if enabled is not UNSET: + field_dict["enabled"] = enabled + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + """@private""" + d = src_dict.copy() + node_status = HealthReportStatus(d.pop("nodeStatus")) + + nodes_in_cluster = d.pop("nodesInCluster") + + enabled = d.pop("enabled", UNSET) + + health_report_distribution = cls( + node_status=node_status, + nodes_in_cluster=nodes_in_cluster, + enabled=enabled, + ) + + health_report_distribution.additional_properties = d + return health_report_distribution + + @property + def additional_keys(self) -> List[str]: + """@private""" + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/jellyfish/_openapi_client/models/health_report_status.py b/jellyfish/_openapi_client/models/health_report_status.py new file mode 100644 index 0000000..f34deb0 --- /dev/null +++ b/jellyfish/_openapi_client/models/health_report_status.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class HealthReportStatus(str, Enum): + """Informs about the status of Jellyfish or a specific service""" + + DOWN = "DOWN" + UP = "UP" + + def __str__(self) -> str: + return str(self.value) diff --git a/jellyfish/_openapi_client/models/healthcheck_response.py b/jellyfish/_openapi_client/models/healthcheck_response.py new file mode 100644 index 0000000..5e2fc32 --- /dev/null +++ b/jellyfish/_openapi_client/models/healthcheck_response.py @@ -0,0 +1,66 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +if TYPE_CHECKING: + from ..models.health_report import HealthReport + + +T = TypeVar("T", bound="HealthcheckResponse") + + +@_attrs_define +class HealthcheckResponse: + """Response containing health report of Jellyfish""" + + data: "HealthReport" + """Describes overall Jellyfish health""" + additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) + """@private""" + + def to_dict(self) -> Dict[str, Any]: + """@private""" + data = self.data.to_dict() + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update( + { + "data": data, + } + ) + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + """@private""" + from ..models.health_report import HealthReport + + d = src_dict.copy() + data = HealthReport.from_dict(d.pop("data")) + + healthcheck_response = cls( + data=data, + ) + + healthcheck_response.additional_properties = d + return healthcheck_response + + @property + def additional_keys(self) -> List[str]: + """@private""" + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/jellyfish/_openapi_client/models/peer.py b/jellyfish/_openapi_client/models/peer.py index 2c435fd..4b0c6bb 100644 --- a/jellyfish/_openapi_client/models/peer.py +++ b/jellyfish/_openapi_client/models/peer.py @@ -1,10 +1,14 @@ -from typing import Any, Dict, List, Type, TypeVar +from typing import TYPE_CHECKING, Any, Dict, List, Type, TypeVar from attrs import define as _attrs_define from attrs import field as _attrs_field from ..models.peer_status import PeerStatus +if TYPE_CHECKING: + from ..models.track import Track + + T = TypeVar("T", bound="Peer") @@ -14,8 +18,12 @@ class Peer: id: str """Assigned peer id""" + metadata: Any + """Custom metadata set by the peer""" status: PeerStatus """Informs about the peer status""" + tracks: List["Track"] + """List of all peer's tracks""" type: str """Peer type""" additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) @@ -24,8 +32,15 @@ class Peer: def to_dict(self) -> Dict[str, Any]: """@private""" id = self.id + metadata = self.metadata status = self.status.value + tracks = [] + for tracks_item_data in self.tracks: + tracks_item = tracks_item_data.to_dict() + + tracks.append(tracks_item) + type = self.type field_dict: Dict[str, Any] = {} @@ -33,7 +48,9 @@ def to_dict(self) -> Dict[str, Any]: field_dict.update( { "id": id, + "metadata": metadata, "status": status, + "tracks": tracks, "type": type, } ) @@ -43,16 +60,29 @@ def to_dict(self) -> Dict[str, Any]: @classmethod def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: """@private""" + from ..models.track import Track + d = src_dict.copy() id = d.pop("id") + metadata = d.pop("metadata") + status = PeerStatus(d.pop("status")) + tracks = [] + _tracks = d.pop("tracks") + for tracks_item_data in _tracks: + tracks_item = Track.from_dict(tracks_item_data) + + tracks.append(tracks_item) + type = d.pop("type") peer = cls( id=id, + metadata=metadata, status=status, + tracks=tracks, type=type, ) diff --git a/jellyfish/_openapi_client/models/track.py b/jellyfish/_openapi_client/models/track.py new file mode 100644 index 0000000..db3027f --- /dev/null +++ b/jellyfish/_openapi_client/models/track.py @@ -0,0 +1,84 @@ +from typing import Any, Dict, List, Type, TypeVar, Union + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..models.track_type import TrackType +from ..types import UNSET, Unset + +T = TypeVar("T", bound="Track") + + +@_attrs_define +class Track: + """Describes media track of a Peer or Component""" + + id: Union[Unset, str] = UNSET + """None""" + metadata: Union[Unset, Any] = UNSET + """None""" + type: Union[Unset, TrackType] = UNSET + """None""" + additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) + """@private""" + + def to_dict(self) -> Dict[str, Any]: + """@private""" + id = self.id + metadata = self.metadata + type: Union[Unset, str] = UNSET + if not isinstance(self.type, Unset): + type = self.type.value + + field_dict: Dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update({}) + if id is not UNSET: + field_dict["id"] = id + if metadata is not UNSET: + field_dict["metadata"] = metadata + if type is not UNSET: + field_dict["type"] = type + + return field_dict + + @classmethod + def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: + """@private""" + d = src_dict.copy() + id = d.pop("id", UNSET) + + metadata = d.pop("metadata", UNSET) + + _type = d.pop("type", UNSET) + type: Union[Unset, TrackType] + if isinstance(_type, Unset): + type = UNSET + else: + type = TrackType(_type) + + track = cls( + id=id, + metadata=metadata, + type=type, + ) + + track.additional_properties = d + return track + + @property + def additional_keys(self) -> List[str]: + """@private""" + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/jellyfish/_openapi_client/models/track_type.py b/jellyfish/_openapi_client/models/track_type.py new file mode 100644 index 0000000..0c7ca4e --- /dev/null +++ b/jellyfish/_openapi_client/models/track_type.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class TrackType(str, Enum): + """None""" + + AUDIO = "audio" + VIDEO = "video" + + def __str__(self) -> str: + return str(self.value) diff --git a/jellyfish/_webhook_notifier.py b/jellyfish/_webhook_notifier.py index 020f959..904a1d7 100644 --- a/jellyfish/_webhook_notifier.py +++ b/jellyfish/_webhook_notifier.py @@ -3,21 +3,17 @@ notification from jellyfish to notification structs. """ -from typing import Dict - import betterproto from jellyfish.events._protos.jellyfish import ServerMessage -def receive_json(json: Dict) -> betterproto.Message: +def receive_binary(binary: bytes) -> betterproto.Message: """ - Transform received json notification to adequate notification instance. + Transform received protobuf notification to adequate notification instance. The available notifications are listed in `jellyfish.events` module. """ - msg = json["notification"] - msg = bytes(msg, "utf-8") - message = ServerMessage().parse(msg) + message = ServerMessage().parse(binary) _which, message = betterproto.which_one_of(message, "content") return message diff --git a/jellyfish/events/__init__.py b/jellyfish/events/__init__.py index f7b8606..b3a519a 100644 --- a/jellyfish/events/__init__.py +++ b/jellyfish/events/__init__.py @@ -1,5 +1,5 @@ """ -Server events being sent Jellyfish +.. include:: ../../docs/server_notifications.md """ # Exported messages @@ -13,16 +13,26 @@ ServerMessageRoomCrashed, ServerMessageRoomCreated, ServerMessageRoomDeleted, + ServerMessageTrack, + ServerMessageTrackAdded, + ServerMessageTrackMetadataUpdated, + ServerMessageTrackRemoved, + ServerMessageTrackType, ) __all__ = [ + "ServerMessageRoomCreated", + "ServerMessageRoomDeleted", + "ServerMessageRoomCrashed", + "ServerMessagePeerConnected", + "ServerMessagePeerDisconnected", + "ServerMessagePeerCrashed", "ServerMessageComponentCrashed", + "ServerMessageTrack", + "ServerMessageTrackType", + "ServerMessageTrackAdded", + "ServerMessageTrackMetadataUpdated", + "ServerMessageTrackRemoved", "ServerMessageHlsPlayable", "ServerMessageMetricsReport", - "ServerMessagePeerCrashed", - "ServerMessagePeerConnected", - "ServerMessagePeerDisconnected", - "ServerMessageRoomCrashed", - "ServerMessageRoomDeleted", - "ServerMessageRoomCreated", ] diff --git a/jellyfish/events/_protos/jellyfish/__init__.py b/jellyfish/events/_protos/jellyfish/__init__.py index 60b5403..e313909 100644 --- a/jellyfish/events/_protos/jellyfish/__init__.py +++ b/jellyfish/events/_protos/jellyfish/__init__.py @@ -9,13 +9,25 @@ class ServerMessageEventType(betterproto.Enum): + """Defines message groups for which client can subscribe""" + EVENT_TYPE_UNSPECIFIED = 0 EVENT_TYPE_SERVER_NOTIFICATION = 1 EVENT_TYPE_METRICS = 2 +class ServerMessageTrackType(betterproto.Enum): + """Defines types of tracks being published by peers and component""" + + TRACK_TYPE_UNSPECIFIED = 0 + TRACK_TYPE_VIDEO = 1 + TRACK_TYPE_AUDIO = 2 + + @dataclass(eq=False, repr=False) class ServerMessage(betterproto.Message): + """Defines any type of message passed between JF and server client""" + room_crashed: "ServerMessageRoomCrashed" = betterproto.message_field( 1, group="content" ) @@ -61,83 +73,175 @@ class ServerMessage(betterproto.Message): hls_upload_crashed: "ServerMessageHlsUploadCrashed" = betterproto.message_field( 15, group="content" ) + peer_metadata_updated: "ServerMessagePeerMetadataUpdated" = ( + betterproto.message_field(16, group="content") + ) + track_added: "ServerMessageTrackAdded" = betterproto.message_field( + 17, group="content" + ) + track_removed: "ServerMessageTrackRemoved" = betterproto.message_field( + 18, group="content" + ) + track_metadata_updated: "ServerMessageTrackMetadataUpdated" = ( + betterproto.message_field(19, group="content") + ) @dataclass(eq=False, repr=False) class ServerMessageRoomCrashed(betterproto.Message): + """Notification sent when a room crashes""" + room_id: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessagePeerConnected(betterproto.Message): + """Notification sent when a peer connects""" + room_id: str = betterproto.string_field(1) peer_id: str = betterproto.string_field(2) @dataclass(eq=False, repr=False) class ServerMessagePeerDisconnected(betterproto.Message): + """Notification sent when a peer disconnects from JF""" + room_id: str = betterproto.string_field(1) peer_id: str = betterproto.string_field(2) @dataclass(eq=False, repr=False) class ServerMessagePeerCrashed(betterproto.Message): + """Notification sent when a peer crashes""" + room_id: str = betterproto.string_field(1) peer_id: str = betterproto.string_field(2) @dataclass(eq=False, repr=False) class ServerMessageComponentCrashed(betterproto.Message): + """Notification sent when a component crashes""" + room_id: str = betterproto.string_field(1) component_id: str = betterproto.string_field(2) @dataclass(eq=False, repr=False) class ServerMessageAuthenticated(betterproto.Message): + """Response sent by JF, confirming successfull authentication""" + pass @dataclass(eq=False, repr=False) class ServerMessageAuthRequest(betterproto.Message): + """Request sent by client, to authenticate to JF server""" + token: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessageSubscribeRequest(betterproto.Message): + """Request sent by client to subsribe for certain message type""" + event_type: "ServerMessageEventType" = betterproto.enum_field(1) @dataclass(eq=False, repr=False) class ServerMessageSubscribeResponse(betterproto.Message): + """Response sent by JF, confirming subscription for message type""" + event_type: "ServerMessageEventType" = betterproto.enum_field(1) @dataclass(eq=False, repr=False) class ServerMessageRoomCreated(betterproto.Message): + """Notification sent when a room is created""" + room_id: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessageRoomDeleted(betterproto.Message): + """Notification sent when a room is deleted""" + room_id: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessageMetricsReport(betterproto.Message): + """Message containing WebRTC metrics from JF""" + metrics: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessageHlsPlayable(betterproto.Message): + """Notification sent when the HLS stream becomes available in a room""" + room_id: str = betterproto.string_field(1) component_id: str = betterproto.string_field(2) @dataclass(eq=False, repr=False) class ServerMessageHlsUploaded(betterproto.Message): + """ + Notification sent when the HLS recording is successfully uploded to AWS S3 + """ + room_id: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class ServerMessageHlsUploadCrashed(betterproto.Message): + """Notification sent when the upload of HLS recording to AWS S3 fails""" + + room_id: str = betterproto.string_field(1) + + +@dataclass(eq=False, repr=False) +class ServerMessagePeerMetadataUpdated(betterproto.Message): + """Notification sent when peer updates its metadata""" + + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2) + metadata: str = betterproto.string_field(3) + + +@dataclass(eq=False, repr=False) +class ServerMessageTrack(betterproto.Message): + """Describes a media track""" + + id: str = betterproto.string_field(1) + type: "ServerMessageTrackType" = betterproto.enum_field(2) + metadata: str = betterproto.string_field(3) + + +@dataclass(eq=False, repr=False) +class ServerMessageTrackAdded(betterproto.Message): + """Notification sent when peer or component adds new track""" + + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2, group="endpoint_info") + component_id: str = betterproto.string_field(3, group="endpoint_info") + track: "ServerMessageTrack" = betterproto.message_field(4) + + +@dataclass(eq=False, repr=False) +class ServerMessageTrackRemoved(betterproto.Message): + """Notification sent when a track is removed""" + + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2, group="endpoint_info") + component_id: str = betterproto.string_field(3, group="endpoint_info") + track: "ServerMessageTrack" = betterproto.message_field(4) + + +@dataclass(eq=False, repr=False) +class ServerMessageTrackMetadataUpdated(betterproto.Message): + """Notification sent when metadata of a multimedia track is updated""" + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2, group="endpoint_info") + component_id: str = betterproto.string_field(3, group="endpoint_info") + track: "ServerMessageTrack" = betterproto.message_field(4) diff --git a/poetry_scripts.py b/poetry_scripts.py index 3cafd4e..5cf643a 100644 --- a/poetry_scripts.py +++ b/poetry_scripts.py @@ -46,7 +46,7 @@ def generate_docs(): --include-undocumented \ --favicon https://logo.swmansion.com/membrane/\?width\=100\&variant\=signetDark\ --logo https://logo.swmansion.com/membrane/\?width\=70\&variant\=signetDark\ - -t doc_templates \ + -t templates/doc \ -o doc \ jellyfish" ) @@ -70,5 +70,5 @@ def update_client(): --url https://raw.githubusercontent.com/jellyfish-dev/" "jellyfish/main/openapi.yaml \ --config openapi-python-client-config.yaml \ - --custom-template-path=openapi_templates" + --custom-template-path=templates/openapi" ) diff --git a/protos b/protos index 3781848..cb67f49 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit 3781848239f67a88866f861fa798a8c18384e666 +Subproject commit cb67f49c47250daf9a97f1296ef7be8965ef4acf diff --git a/doc_templates/module.html.jinja2 b/templates/doc/module.html.jinja2 similarity index 100% rename from doc_templates/module.html.jinja2 rename to templates/doc/module.html.jinja2 diff --git a/openapi_templates/model.py.jinja b/templates/openapi/model.py.jinja similarity index 100% rename from openapi_templates/model.py.jinja rename to templates/openapi/model.py.jinja diff --git a/openapi_templates/str_enum.py.jinja b/templates/openapi/str_enum.py.jinja similarity index 100% rename from openapi_templates/str_enum.py.jinja rename to templates/openapi/str_enum.py.jinja diff --git a/tests/support/protos/jellyfish/__init__.py b/tests/support/protos/jellyfish/__init__.py index c72002b..76a88f4 100644 --- a/tests/support/protos/jellyfish/__init__.py +++ b/tests/support/protos/jellyfish/__init__.py @@ -10,6 +10,8 @@ @dataclass(eq=False, repr=False) class PeerMessage(betterproto.Message): + """Defines any type of message sent between JF and a peer""" + authenticated: "PeerMessageAuthenticated" = betterproto.message_field( 1, group="content" ) @@ -21,14 +23,20 @@ class PeerMessage(betterproto.Message): @dataclass(eq=False, repr=False) class PeerMessageAuthenticated(betterproto.Message): + """Response sent by JF, confirming successfull authentication""" + pass @dataclass(eq=False, repr=False) class PeerMessageAuthRequest(betterproto.Message): + """Request sent by peer, to authenticate to JF server""" + token: str = betterproto.string_field(1) @dataclass(eq=False, repr=False) class PeerMessageMediaEvent(betterproto.Message): + """Any type of WebRTC messages passed betweend JF and peer""" + data: str = betterproto.string_field(1) diff --git a/tests/support/webhook_notifier.py b/tests/support/webhook_notifier.py index 730f91d..8da0766 100644 --- a/tests/support/webhook_notifier.py +++ b/tests/support/webhook_notifier.py @@ -2,7 +2,7 @@ from flask import Flask, Response, request -from jellyfish import receive_json +from jellyfish import receive_binary app = Flask(__name__) DATA_QUEUE = None @@ -15,9 +15,9 @@ def respond_default(): @app.route("/webhook", methods=["POST"]) def respond_root(): - json = request.get_json() - json = receive_json(json) - DATA_QUEUE.put(json) + data = request.get_data() + msg = receive_binary(data) + DATA_QUEUE.put(msg) return Response(status=200) diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 3b1a8a3..1ac62de 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -9,13 +9,15 @@ import pytest import requests -from jellyfish import Notifier, PeerOptionsWebRTC, RoomApi +from jellyfish import ComponentOptionsFile, Notifier, PeerOptionsWebRTC, RoomApi from jellyfish.events import ( ServerMessageMetricsReport, ServerMessagePeerConnected, ServerMessagePeerDisconnected, ServerMessageRoomCreated, ServerMessageRoomDeleted, + ServerMessageTrackAdded, + ServerMessageTrackRemoved, ) from tests.support.asyncio_utils import assert_events, assert_metrics, cancel from tests.support.peer_socket import PeerSocket @@ -28,6 +30,9 @@ WEBHOOK_URL = f"http://{WEBHOOK_ADDRESS}:5000/webhook" queue = Queue() +CODEC_H264 = "h264" +FILE_OPTIONS = ComponentOptionsFile(file_path="video.h264") + @pytest.fixture(scope="session", autouse=True) def start_server(): @@ -176,6 +181,33 @@ async def test_peer_connected_room_deleted( for event in event_checks: self.assert_event(event) + @pytest.mark.asyncio + @pytest.mark.file_component_sources + async def test_file_component_connected_room_deleted( + self, room_api: RoomApi, notifier: Notifier + ): + event_checks = [ + ServerMessageRoomCreated, + ServerMessageTrackAdded, + ServerMessageTrackRemoved, + ServerMessageRoomDeleted, + ] + assert_task = asyncio.create_task(assert_events(notifier, event_checks.copy())) + + notifier_task = asyncio.create_task(notifier.connect()) + await notifier.wait_ready() + + _, room = room_api.create_room(webhook_url=WEBHOOK_URL) + room_api.add_component(room.id, options=FILE_OPTIONS) + + room_api.delete_room(room.id) + + await assert_task + await cancel(notifier_task) + + for event in event_checks: + self.assert_event(event) + def assert_event(self, event): data = queue.get(timeout=2.5) assert data == event or isinstance(data, event) diff --git a/tests/test_room_api.py b/tests/test_room_api.py index cce3dde..21ff8df 100644 --- a/tests/test_room_api.py +++ b/tests/test_room_api.py @@ -251,6 +251,7 @@ def _test_component(self, room_api: RoomApi, test_data: ComponentTestData): id=component.id, type=test_data.type, properties=test_data.properties, + tracks=[], ) assert response == component @@ -297,7 +298,13 @@ def test_invalid_subscription_in_auto_mode(self, room_api: RoomApi): class TestAddPeer: def _assert_peer_created(self, room_api, webrtc_peer, room_id): - peer = Peer(id=webrtc_peer.id, type="webrtc", status=PeerStatus("disconnected")) + peer = Peer( + id=webrtc_peer.id, + type="webrtc", + status=PeerStatus("disconnected"), + tracks=[], + metadata=None, + ) room = room_api.get_room(room_id) assert peer in room.peers