From 70fc75694bb15f790ae95139701e7d08cb8eaa2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 08:57:58 +0200 Subject: [PATCH 01/16] Add protobuf --- .gitmodules | 3 ++ compile_proto.sh | 24 +++++++++ .../jellyfish/peer_notifications_pb2.py | 30 +++++++++++ .../jellyfish/server_notifications_pb2.py | 52 +++++++++++++++++++ protos | 1 + 5 files changed, 110 insertions(+) create mode 100644 .gitmodules create mode 100755 compile_proto.sh create mode 100644 jellyfish/protos/jellyfish/peer_notifications_pb2.py create mode 100644 jellyfish/protos/jellyfish/server_notifications_pb2.py create mode 160000 protos diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..b2e4ed5 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "protos"] + path = protos + url = https://github.com/jellyfish-dev/protos.git diff --git a/compile_proto.sh b/compile_proto.sh new file mode 100755 index 0000000..b8a27b3 --- /dev/null +++ b/compile_proto.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# Terminate on errors +set -e + + +printf "Synchronising submodules... " +git submodule sync --recursive >> /dev/null +git submodule update --recursive --remote --init >> /dev/null +printf "DONE\n\n" + +files=$(find protos/jellyfish -name "*.proto") + +printf "Compiling:\n" +count=1 +total=${#files[@]} +for file in $files; do + printf "Compile file %s %s ... " $count $file + protoc --python_out=./jellyfish/ $file + printf "DONE\n" + count=$(($count + 1)) +done + +autopep8 -ir jellyfish/protos \ No newline at end of file diff --git a/jellyfish/protos/jellyfish/peer_notifications_pb2.py b/jellyfish/protos/jellyfish/peer_notifications_pb2.py new file mode 100644 index 0000000..8e994ff --- /dev/null +++ b/jellyfish/protos/jellyfish/peer_notifications_pb2.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: protos/jellyfish/peer_notifications.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n)protos/jellyfish/peer_notifications.proto\x12\tjellyfish\"\x98\x02\n\x0bPeerMessage\x12=\n\rauthenticated\x18\x01 \x01(\x0b\x32$.jellyfish.PeerMessage.AuthenticatedH\x00\x12:\n\x0c\x61uth_request\x18\x02 \x01(\x0b\x32\".jellyfish.PeerMessage.AuthRequestH\x00\x12\x38\n\x0bmedia_event\x18\x03 \x01(\x0b\x32!.jellyfish.PeerMessage.MediaEventH\x00\x1a\x0f\n\rAuthenticated\x1a\x1c\n\x0b\x41uthRequest\x12\r\n\x05token\x18\x01 \x01(\t\x1a\x1a\n\nMediaEvent\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\tB\t\n\x07\x63ontentb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, 'protos.jellyfish.peer_notifications_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_PEERMESSAGE']._serialized_start = 57 + _globals['_PEERMESSAGE']._serialized_end = 337 + _globals['_PEERMESSAGE_AUTHENTICATED']._serialized_start = 253 + _globals['_PEERMESSAGE_AUTHENTICATED']._serialized_end = 268 + _globals['_PEERMESSAGE_AUTHREQUEST']._serialized_start = 270 + _globals['_PEERMESSAGE_AUTHREQUEST']._serialized_end = 298 + _globals['_PEERMESSAGE_MEDIAEVENT']._serialized_start = 300 + _globals['_PEERMESSAGE_MEDIAEVENT']._serialized_end = 326 +# @@protoc_insertion_point(module_scope) diff --git a/jellyfish/protos/jellyfish/server_notifications_pb2.py b/jellyfish/protos/jellyfish/server_notifications_pb2.py new file mode 100644 index 0000000..e653696 --- /dev/null +++ b/jellyfish/protos/jellyfish/server_notifications_pb2.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: protos/jellyfish/server_notifications.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n+protos/jellyfish/server_notifications.proto\x12\tjellyfish\"\xaf\x0c\n\rServerMessage\x12<\n\x0croom_crashed\x18\x01 \x01(\x0b\x32$.jellyfish.ServerMessage.RoomCrashedH\x00\x12@\n\x0epeer_connected\x18\x02 \x01(\x0b\x32&.jellyfish.ServerMessage.PeerConnectedH\x00\x12\x46\n\x11peer_disconnected\x18\x03 \x01(\x0b\x32).jellyfish.ServerMessage.PeerDisconnectedH\x00\x12<\n\x0cpeer_crashed\x18\x04 \x01(\x0b\x32$.jellyfish.ServerMessage.PeerCrashedH\x00\x12\x46\n\x11\x63omponent_crashed\x18\x05 \x01(\x0b\x32).jellyfish.ServerMessage.ComponentCrashedH\x00\x12?\n\rauthenticated\x18\x06 \x01(\x0b\x32&.jellyfish.ServerMessage.AuthenticatedH\x00\x12<\n\x0c\x61uth_request\x18\x07 \x01(\x0b\x32$.jellyfish.ServerMessage.AuthRequestH\x00\x12\x46\n\x11subscribe_request\x18\x08 \x01(\x0b\x32).jellyfish.ServerMessage.SubscribeRequestH\x00\x12H\n\x12subscribe_response\x18\t \x01(\x0b\x32*.jellyfish.ServerMessage.SubscribeResponseH\x00\x12<\n\x0croom_created\x18\n \x01(\x0b\x32$.jellyfish.ServerMessage.RoomCreatedH\x00\x12<\n\x0croom_deleted\x18\x0b \x01(\x0b\x32$.jellyfish.ServerMessage.RoomDeletedH\x00\x12@\n\x0emetrics_report\x18\x0c \x01(\x0b\x32&.jellyfish.ServerMessage.MetricsReportH\x00\x12<\n\x0chls_playable\x18\r \x01(\x0b\x32$.jellyfish.ServerMessage.HlsPlayableH\x00\x1a\x1e\n\x0bRoomCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a\x31\n\rPeerConnected\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a\x34\n\x10PeerDisconnected\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a/\n\x0bPeerCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a\x39\n\x10\x43omponentCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x14\n\x0c\x63omponent_id\x18\x02 \x01(\t\x1a\x0f\n\rAuthenticated\x1a\x1c\n\x0b\x41uthRequest\x12\r\n\x05token\x18\x01 \x01(\t\x1aJ\n\x10SubscribeRequest\x12\x36\n\nevent_type\x18\x01 \x01(\x0e\x32\".jellyfish.ServerMessage.EventType\x1aK\n\x11SubscribeResponse\x12\x36\n\nevent_type\x18\x01 \x01(\x0e\x32\".jellyfish.ServerMessage.EventType\x1a\x1e\n\x0bRoomCreated\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a\x1e\n\x0bRoomDeleted\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a \n\rMetricsReport\x12\x0f\n\x07metrics\x18\x01 \x01(\t\x1a\x34\n\x0bHlsPlayable\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x14\n\x0c\x63omponent_id\x18\x02 \x01(\t\"c\n\tEventType\x12\x1a\n\x16\x45VENT_TYPE_UNSPECIFIED\x10\x00\x12\"\n\x1e\x45VENT_TYPE_SERVER_NOTIFICATION\x10\x01\x12\x16\n\x12\x45VENT_TYPE_METRICS\x10\x02\x42\t\n\x07\x63ontentb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, 'protos.jellyfish.server_notifications_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_SERVERMESSAGE']._serialized_start = 59 + _globals['_SERVERMESSAGE']._serialized_end = 1642 + _globals['_SERVERMESSAGE_ROOMCRASHED']._serialized_start = 935 + _globals['_SERVERMESSAGE_ROOMCRASHED']._serialized_end = 965 + _globals['_SERVERMESSAGE_PEERCONNECTED']._serialized_start = 967 + _globals['_SERVERMESSAGE_PEERCONNECTED']._serialized_end = 1016 + _globals['_SERVERMESSAGE_PEERDISCONNECTED']._serialized_start = 1018 + _globals['_SERVERMESSAGE_PEERDISCONNECTED']._serialized_end = 1070 + _globals['_SERVERMESSAGE_PEERCRASHED']._serialized_start = 1072 + _globals['_SERVERMESSAGE_PEERCRASHED']._serialized_end = 1119 + _globals['_SERVERMESSAGE_COMPONENTCRASHED']._serialized_start = 1121 + _globals['_SERVERMESSAGE_COMPONENTCRASHED']._serialized_end = 1178 + _globals['_SERVERMESSAGE_AUTHENTICATED']._serialized_start = 1180 + _globals['_SERVERMESSAGE_AUTHENTICATED']._serialized_end = 1195 + _globals['_SERVERMESSAGE_AUTHREQUEST']._serialized_start = 1197 + _globals['_SERVERMESSAGE_AUTHREQUEST']._serialized_end = 1225 + _globals['_SERVERMESSAGE_SUBSCRIBEREQUEST']._serialized_start = 1227 + _globals['_SERVERMESSAGE_SUBSCRIBEREQUEST']._serialized_end = 1301 + _globals['_SERVERMESSAGE_SUBSCRIBERESPONSE']._serialized_start = 1303 + _globals['_SERVERMESSAGE_SUBSCRIBERESPONSE']._serialized_end = 1378 + _globals['_SERVERMESSAGE_ROOMCREATED']._serialized_start = 1380 + _globals['_SERVERMESSAGE_ROOMCREATED']._serialized_end = 1410 + _globals['_SERVERMESSAGE_ROOMDELETED']._serialized_start = 1412 + _globals['_SERVERMESSAGE_ROOMDELETED']._serialized_end = 1442 + _globals['_SERVERMESSAGE_METRICSREPORT']._serialized_start = 1444 + _globals['_SERVERMESSAGE_METRICSREPORT']._serialized_end = 1476 + _globals['_SERVERMESSAGE_HLSPLAYABLE']._serialized_start = 1478 + _globals['_SERVERMESSAGE_HLSPLAYABLE']._serialized_end = 1530 + _globals['_SERVERMESSAGE_EVENTTYPE']._serialized_start = 1532 + _globals['_SERVERMESSAGE_EVENTTYPE']._serialized_end = 1631 +# @@protoc_insertion_point(module_scope) diff --git a/protos b/protos new file mode 160000 index 0000000..a6f6971 --- /dev/null +++ b/protos @@ -0,0 +1 @@ +Subproject commit a6f69712da283d13ae8b6e828694b48fce92a9e5 From 77e715655c03ecbd383dc2e3cb745f96838abc40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 08:57:58 +0200 Subject: [PATCH 02/16] wip --- jellyfish/notifier.py | 0 jellyfish/protos/__init__.py | 0 jellyfish/protos/jellyfish/__init__.py | 0 test_dupa.py | 6 ++++++ 4 files changed, 6 insertions(+) create mode 100644 jellyfish/notifier.py create mode 100644 jellyfish/protos/__init__.py create mode 100644 jellyfish/protos/jellyfish/__init__.py create mode 100644 test_dupa.py diff --git a/jellyfish/notifier.py b/jellyfish/notifier.py new file mode 100644 index 0000000..e69de29 diff --git a/jellyfish/protos/__init__.py b/jellyfish/protos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jellyfish/protos/jellyfish/__init__.py b/jellyfish/protos/jellyfish/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test_dupa.py b/test_dupa.py new file mode 100644 index 0000000..275dc66 --- /dev/null +++ b/test_dupa.py @@ -0,0 +1,6 @@ +from jellyfish.protos.jellyfish import server_notifications_pb2 + +msg = server_notifications_pb2.ServerMessage.HlsPlayable() +msg.room_id = "dupsko" + +print('msg:', msg) \ No newline at end of file From c75c978d69e145acd0c4bf131fae07d9b4d8e05b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 08:58:26 +0200 Subject: [PATCH 03/16] Better proto compiler --- compile_proto.sh | 12 +- dev-requirements.txt | 1 + jellyfish/protos/jellyfish.py | 151 ++++++++++++++++++ jellyfish/protos/jellyfish/__init__.py | 0 .../jellyfish/peer_notifications_pb2.py | 30 ---- .../jellyfish/server_notifications_pb2.py | 52 ------ test_dupa.py | 2 +- 7 files changed, 157 insertions(+), 91 deletions(-) create mode 100644 jellyfish/protos/jellyfish.py delete mode 100644 jellyfish/protos/jellyfish/__init__.py delete mode 100644 jellyfish/protos/jellyfish/peer_notifications_pb2.py delete mode 100644 jellyfish/protos/jellyfish/server_notifications_pb2.py diff --git a/compile_proto.sh b/compile_proto.sh index b8a27b3..68ed027 100755 --- a/compile_proto.sh +++ b/compile_proto.sh @@ -11,14 +11,10 @@ printf "DONE\n\n" files=$(find protos/jellyfish -name "*.proto") -printf "Compiling:\n" -count=1 -total=${#files[@]} +printf "Compiling files:\n" + for file in $files; do - printf "Compile file %s %s ... " $count $file - protoc --python_out=./jellyfish/ $file - printf "DONE\n" - count=$(($count + 1)) + printf "* $file\n" done -autopep8 -ir jellyfish/protos \ No newline at end of file +protoc -I . --python_betterproto_out=jellyfish/protos $files diff --git a/dev-requirements.txt b/dev-requirements.txt index 8139065..5e15e71 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,4 +1,5 @@ aenum==3.1.15 +betterproto==1.2.5 pydantic==1.10.12 pytest==7.1.3 python_dateutil==2.8.2 diff --git a/jellyfish/protos/jellyfish.py b/jellyfish/protos/jellyfish.py new file mode 100644 index 0000000..bab67e5 --- /dev/null +++ b/jellyfish/protos/jellyfish.py @@ -0,0 +1,151 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# sources: protos/jellyfish/server_notifications.proto, protos/jellyfish/peer_notifications.proto +# plugin: python-betterproto +from dataclasses import dataclass + +import betterproto + + +class ServerMessageEventType(betterproto.Enum): + EVENT_TYPE_UNSPECIFIED = 0 + EVENT_TYPE_SERVER_NOTIFICATION = 1 + EVENT_TYPE_METRICS = 2 + + +@dataclass +class ServerMessage(betterproto.Message): + room_crashed: "ServerMessageRoomCrashed" = betterproto.message_field( + 1, group="content" + ) + peer_connected: "ServerMessagePeerConnected" = betterproto.message_field( + 2, group="content" + ) + peer_disconnected: "ServerMessagePeerDisconnected" = betterproto.message_field( + 3, group="content" + ) + peer_crashed: "ServerMessagePeerCrashed" = betterproto.message_field( + 4, group="content" + ) + component_crashed: "ServerMessageComponentCrashed" = betterproto.message_field( + 5, group="content" + ) + authenticated: "ServerMessageAuthenticated" = betterproto.message_field( + 6, group="content" + ) + auth_request: "ServerMessageAuthRequest" = betterproto.message_field( + 7, group="content" + ) + subscribe_request: "ServerMessageSubscribeRequest" = betterproto.message_field( + 8, group="content" + ) + subscribe_response: "ServerMessageSubscribeResponse" = betterproto.message_field( + 9, group="content" + ) + room_created: "ServerMessageRoomCreated" = betterproto.message_field( + 10, group="content" + ) + room_deleted: "ServerMessageRoomDeleted" = betterproto.message_field( + 11, group="content" + ) + metrics_report: "ServerMessageMetricsReport" = betterproto.message_field( + 12, group="content" + ) + hls_playable: "ServerMessageHlsPlayable" = betterproto.message_field( + 13, group="content" + ) + + +@dataclass +class ServerMessageRoomCrashed(betterproto.Message): + room_id: str = betterproto.string_field(1) + + +@dataclass +class ServerMessagePeerConnected(betterproto.Message): + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2) + + +@dataclass +class ServerMessagePeerDisconnected(betterproto.Message): + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2) + + +@dataclass +class ServerMessagePeerCrashed(betterproto.Message): + room_id: str = betterproto.string_field(1) + peer_id: str = betterproto.string_field(2) + + +@dataclass +class ServerMessageComponentCrashed(betterproto.Message): + room_id: str = betterproto.string_field(1) + component_id: str = betterproto.string_field(2) + + +@dataclass +class ServerMessageAuthenticated(betterproto.Message): + pass + + +@dataclass +class ServerMessageAuthRequest(betterproto.Message): + token: str = betterproto.string_field(1) + + +@dataclass +class ServerMessageSubscribeRequest(betterproto.Message): + event_type: "ServerMessageEventType" = betterproto.enum_field(1) + + +@dataclass +class ServerMessageSubscribeResponse(betterproto.Message): + event_type: "ServerMessageEventType" = betterproto.enum_field(1) + + +@dataclass +class ServerMessageRoomCreated(betterproto.Message): + room_id: str = betterproto.string_field(1) + + +@dataclass +class ServerMessageRoomDeleted(betterproto.Message): + room_id: str = betterproto.string_field(1) + + +@dataclass +class ServerMessageMetricsReport(betterproto.Message): + metrics: str = betterproto.string_field(1) + + +@dataclass +class ServerMessageHlsPlayable(betterproto.Message): + room_id: str = betterproto.string_field(1) + component_id: str = betterproto.string_field(2) + + +@dataclass +class PeerMessage(betterproto.Message): + authenticated: "PeerMessageAuthenticated" = betterproto.message_field( + 1, group="content" + ) + auth_request: "PeerMessageAuthRequest" = betterproto.message_field( + 2, group="content" + ) + media_event: "PeerMessageMediaEvent" = betterproto.message_field(3, group="content") + + +@dataclass +class PeerMessageAuthenticated(betterproto.Message): + pass + + +@dataclass +class PeerMessageAuthRequest(betterproto.Message): + token: str = betterproto.string_field(1) + + +@dataclass +class PeerMessageMediaEvent(betterproto.Message): + data: str = betterproto.string_field(1) diff --git a/jellyfish/protos/jellyfish/__init__.py b/jellyfish/protos/jellyfish/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/jellyfish/protos/jellyfish/peer_notifications_pb2.py b/jellyfish/protos/jellyfish/peer_notifications_pb2.py deleted file mode 100644 index 8e994ff..0000000 --- a/jellyfish/protos/jellyfish/peer_notifications_pb2.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: protos/jellyfish/peer_notifications.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n)protos/jellyfish/peer_notifications.proto\x12\tjellyfish\"\x98\x02\n\x0bPeerMessage\x12=\n\rauthenticated\x18\x01 \x01(\x0b\x32$.jellyfish.PeerMessage.AuthenticatedH\x00\x12:\n\x0c\x61uth_request\x18\x02 \x01(\x0b\x32\".jellyfish.PeerMessage.AuthRequestH\x00\x12\x38\n\x0bmedia_event\x18\x03 \x01(\x0b\x32!.jellyfish.PeerMessage.MediaEventH\x00\x1a\x0f\n\rAuthenticated\x1a\x1c\n\x0b\x41uthRequest\x12\r\n\x05token\x18\x01 \x01(\t\x1a\x1a\n\nMediaEvent\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\tB\t\n\x07\x63ontentb\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages( - DESCRIPTOR, 'protos.jellyfish.peer_notifications_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_PEERMESSAGE']._serialized_start = 57 - _globals['_PEERMESSAGE']._serialized_end = 337 - _globals['_PEERMESSAGE_AUTHENTICATED']._serialized_start = 253 - _globals['_PEERMESSAGE_AUTHENTICATED']._serialized_end = 268 - _globals['_PEERMESSAGE_AUTHREQUEST']._serialized_start = 270 - _globals['_PEERMESSAGE_AUTHREQUEST']._serialized_end = 298 - _globals['_PEERMESSAGE_MEDIAEVENT']._serialized_start = 300 - _globals['_PEERMESSAGE_MEDIAEVENT']._serialized_end = 326 -# @@protoc_insertion_point(module_scope) diff --git a/jellyfish/protos/jellyfish/server_notifications_pb2.py b/jellyfish/protos/jellyfish/server_notifications_pb2.py deleted file mode 100644 index e653696..0000000 --- a/jellyfish/protos/jellyfish/server_notifications_pb2.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: protos/jellyfish/server_notifications.proto -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n+protos/jellyfish/server_notifications.proto\x12\tjellyfish\"\xaf\x0c\n\rServerMessage\x12<\n\x0croom_crashed\x18\x01 \x01(\x0b\x32$.jellyfish.ServerMessage.RoomCrashedH\x00\x12@\n\x0epeer_connected\x18\x02 \x01(\x0b\x32&.jellyfish.ServerMessage.PeerConnectedH\x00\x12\x46\n\x11peer_disconnected\x18\x03 \x01(\x0b\x32).jellyfish.ServerMessage.PeerDisconnectedH\x00\x12<\n\x0cpeer_crashed\x18\x04 \x01(\x0b\x32$.jellyfish.ServerMessage.PeerCrashedH\x00\x12\x46\n\x11\x63omponent_crashed\x18\x05 \x01(\x0b\x32).jellyfish.ServerMessage.ComponentCrashedH\x00\x12?\n\rauthenticated\x18\x06 \x01(\x0b\x32&.jellyfish.ServerMessage.AuthenticatedH\x00\x12<\n\x0c\x61uth_request\x18\x07 \x01(\x0b\x32$.jellyfish.ServerMessage.AuthRequestH\x00\x12\x46\n\x11subscribe_request\x18\x08 \x01(\x0b\x32).jellyfish.ServerMessage.SubscribeRequestH\x00\x12H\n\x12subscribe_response\x18\t \x01(\x0b\x32*.jellyfish.ServerMessage.SubscribeResponseH\x00\x12<\n\x0croom_created\x18\n \x01(\x0b\x32$.jellyfish.ServerMessage.RoomCreatedH\x00\x12<\n\x0croom_deleted\x18\x0b \x01(\x0b\x32$.jellyfish.ServerMessage.RoomDeletedH\x00\x12@\n\x0emetrics_report\x18\x0c \x01(\x0b\x32&.jellyfish.ServerMessage.MetricsReportH\x00\x12<\n\x0chls_playable\x18\r \x01(\x0b\x32$.jellyfish.ServerMessage.HlsPlayableH\x00\x1a\x1e\n\x0bRoomCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a\x31\n\rPeerConnected\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a\x34\n\x10PeerDisconnected\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a/\n\x0bPeerCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x0f\n\x07peer_id\x18\x02 \x01(\t\x1a\x39\n\x10\x43omponentCrashed\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x14\n\x0c\x63omponent_id\x18\x02 \x01(\t\x1a\x0f\n\rAuthenticated\x1a\x1c\n\x0b\x41uthRequest\x12\r\n\x05token\x18\x01 \x01(\t\x1aJ\n\x10SubscribeRequest\x12\x36\n\nevent_type\x18\x01 \x01(\x0e\x32\".jellyfish.ServerMessage.EventType\x1aK\n\x11SubscribeResponse\x12\x36\n\nevent_type\x18\x01 \x01(\x0e\x32\".jellyfish.ServerMessage.EventType\x1a\x1e\n\x0bRoomCreated\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a\x1e\n\x0bRoomDeleted\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x1a \n\rMetricsReport\x12\x0f\n\x07metrics\x18\x01 \x01(\t\x1a\x34\n\x0bHlsPlayable\x12\x0f\n\x07room_id\x18\x01 \x01(\t\x12\x14\n\x0c\x63omponent_id\x18\x02 \x01(\t\"c\n\tEventType\x12\x1a\n\x16\x45VENT_TYPE_UNSPECIFIED\x10\x00\x12\"\n\x1e\x45VENT_TYPE_SERVER_NOTIFICATION\x10\x01\x12\x16\n\x12\x45VENT_TYPE_METRICS\x10\x02\x42\t\n\x07\x63ontentb\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages( - DESCRIPTOR, 'protos.jellyfish.server_notifications_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_SERVERMESSAGE']._serialized_start = 59 - _globals['_SERVERMESSAGE']._serialized_end = 1642 - _globals['_SERVERMESSAGE_ROOMCRASHED']._serialized_start = 935 - _globals['_SERVERMESSAGE_ROOMCRASHED']._serialized_end = 965 - _globals['_SERVERMESSAGE_PEERCONNECTED']._serialized_start = 967 - _globals['_SERVERMESSAGE_PEERCONNECTED']._serialized_end = 1016 - _globals['_SERVERMESSAGE_PEERDISCONNECTED']._serialized_start = 1018 - _globals['_SERVERMESSAGE_PEERDISCONNECTED']._serialized_end = 1070 - _globals['_SERVERMESSAGE_PEERCRASHED']._serialized_start = 1072 - _globals['_SERVERMESSAGE_PEERCRASHED']._serialized_end = 1119 - _globals['_SERVERMESSAGE_COMPONENTCRASHED']._serialized_start = 1121 - _globals['_SERVERMESSAGE_COMPONENTCRASHED']._serialized_end = 1178 - _globals['_SERVERMESSAGE_AUTHENTICATED']._serialized_start = 1180 - _globals['_SERVERMESSAGE_AUTHENTICATED']._serialized_end = 1195 - _globals['_SERVERMESSAGE_AUTHREQUEST']._serialized_start = 1197 - _globals['_SERVERMESSAGE_AUTHREQUEST']._serialized_end = 1225 - _globals['_SERVERMESSAGE_SUBSCRIBEREQUEST']._serialized_start = 1227 - _globals['_SERVERMESSAGE_SUBSCRIBEREQUEST']._serialized_end = 1301 - _globals['_SERVERMESSAGE_SUBSCRIBERESPONSE']._serialized_start = 1303 - _globals['_SERVERMESSAGE_SUBSCRIBERESPONSE']._serialized_end = 1378 - _globals['_SERVERMESSAGE_ROOMCREATED']._serialized_start = 1380 - _globals['_SERVERMESSAGE_ROOMCREATED']._serialized_end = 1410 - _globals['_SERVERMESSAGE_ROOMDELETED']._serialized_start = 1412 - _globals['_SERVERMESSAGE_ROOMDELETED']._serialized_end = 1442 - _globals['_SERVERMESSAGE_METRICSREPORT']._serialized_start = 1444 - _globals['_SERVERMESSAGE_METRICSREPORT']._serialized_end = 1476 - _globals['_SERVERMESSAGE_HLSPLAYABLE']._serialized_start = 1478 - _globals['_SERVERMESSAGE_HLSPLAYABLE']._serialized_end = 1530 - _globals['_SERVERMESSAGE_EVENTTYPE']._serialized_start = 1532 - _globals['_SERVERMESSAGE_EVENTTYPE']._serialized_end = 1631 -# @@protoc_insertion_point(module_scope) diff --git a/test_dupa.py b/test_dupa.py index 275dc66..a813fa7 100644 --- a/test_dupa.py +++ b/test_dupa.py @@ -1,4 +1,4 @@ -from jellyfish.protos.jellyfish import server_notifications_pb2 +from jellyfish.protos.jellyfish.jellyfish import ServerMessage msg = server_notifications_pb2.ServerMessage.HlsPlayable() msg.room_id = "dupsko" From 393c63b79bd43d5cbb46ac030056e8ff414c3263 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 08:59:59 +0200 Subject: [PATCH 04/16] Private module --- compile_proto.sh | 2 +- jellyfish/{protos => _protos}/__init__.py | 0 jellyfish/{protos => _protos}/jellyfish.py | 0 pylintrc | 2 +- 4 files changed, 2 insertions(+), 2 deletions(-) rename jellyfish/{protos => _protos}/__init__.py (100%) rename jellyfish/{protos => _protos}/jellyfish.py (100%) diff --git a/compile_proto.sh b/compile_proto.sh index 68ed027..d9b2824 100755 --- a/compile_proto.sh +++ b/compile_proto.sh @@ -17,4 +17,4 @@ for file in $files; do printf "* $file\n" done -protoc -I . --python_betterproto_out=jellyfish/protos $files +protoc -I . --python_betterproto_out=jellyfish/_protos $files diff --git a/jellyfish/protos/__init__.py b/jellyfish/_protos/__init__.py similarity index 100% rename from jellyfish/protos/__init__.py rename to jellyfish/_protos/__init__.py diff --git a/jellyfish/protos/jellyfish.py b/jellyfish/_protos/jellyfish.py similarity index 100% rename from jellyfish/protos/jellyfish.py rename to jellyfish/_protos/jellyfish.py diff --git a/pylintrc b/pylintrc index 594df76..b2395a3 100644 --- a/pylintrc +++ b/pylintrc @@ -52,7 +52,7 @@ ignore=CVS # ignore-list. The regex matches against paths and can be in Posix or Windows # format. Because '\\' represents the directory delimiter on Windows systems, # it can't be used as an escape character. -ignore-paths=jellyfish/_openapi_client/ +ignore-paths=jellyfish/_openapi_client/,jellyfish/_protos/ # Files or directories matching the regular expression patterns are skipped. # The regex matches against base names, not paths. The default value ignores From e3e36816a8b1fe673ff4f8f44d3750a32acd0598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 09:02:11 +0200 Subject: [PATCH 05/16] Notifier WIP --- compile_proto.sh | 2 +- jellyfish/__init__.py | 6 ++- jellyfish/_protos/jellyfish.py | 28 +------------- jellyfish/notifier.py | 69 ++++++++++++++++++++++++++++++++++ notifier_test.py | 29 ++++++++++++++ test_dupa.py | 6 --- 6 files changed, 105 insertions(+), 35 deletions(-) create mode 100644 notifier_test.py delete mode 100644 test_dupa.py diff --git a/compile_proto.sh b/compile_proto.sh index d9b2824..5cc61b5 100755 --- a/compile_proto.sh +++ b/compile_proto.sh @@ -9,7 +9,7 @@ git submodule sync --recursive >> /dev/null git submodule update --recursive --remote --init >> /dev/null printf "DONE\n\n" -files=$(find protos/jellyfish -name "*.proto") +files=$(find protos/jellyfish -name "server*.proto") printf "Compiling files:\n" diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index f472b0a..501bfdc 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -6,6 +6,8 @@ from pydantic.error_wrappers import ValidationError +from jellyfish._room_api import RoomApi + from jellyfish._openapi_client import ( Room, RoomConfig, Peer, Component, ComponentHLS, ComponentRTSP, ComponentOptions, ComponentOptionsRTSP, ComponentOptionsHLS, PeerOptionsWebRTC) @@ -13,7 +15,9 @@ from jellyfish._openapi_client.exceptions import ( UnauthorizedException, NotFoundException, BadRequestException) -from jellyfish._room_api import RoomApi +from jellyfish._protos.jellyfish import ( + ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, + ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse) __all__ = ['RoomApi', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', diff --git a/jellyfish/_protos/jellyfish.py b/jellyfish/_protos/jellyfish.py index bab67e5..63307e2 100644 --- a/jellyfish/_protos/jellyfish.py +++ b/jellyfish/_protos/jellyfish.py @@ -1,5 +1,5 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! -# sources: protos/jellyfish/server_notifications.proto, protos/jellyfish/peer_notifications.proto +# sources: protos/jellyfish/server_notifications.proto # plugin: python-betterproto from dataclasses import dataclass @@ -123,29 +123,3 @@ class ServerMessageMetricsReport(betterproto.Message): class ServerMessageHlsPlayable(betterproto.Message): room_id: str = betterproto.string_field(1) component_id: str = betterproto.string_field(2) - - -@dataclass -class PeerMessage(betterproto.Message): - authenticated: "PeerMessageAuthenticated" = betterproto.message_field( - 1, group="content" - ) - auth_request: "PeerMessageAuthRequest" = betterproto.message_field( - 2, group="content" - ) - media_event: "PeerMessageMediaEvent" = betterproto.message_field(3, group="content") - - -@dataclass -class PeerMessageAuthenticated(betterproto.Message): - pass - - -@dataclass -class PeerMessageAuthRequest(betterproto.Message): - token: str = betterproto.string_field(1) - - -@dataclass -class PeerMessageMediaEvent(betterproto.Message): - data: str = betterproto.string_field(1) diff --git a/jellyfish/notifier.py b/jellyfish/notifier.py index e69de29..01d7c00 100644 --- a/jellyfish/notifier.py +++ b/jellyfish/notifier.py @@ -0,0 +1,69 @@ +import logging + +import asyncio +import betterproto + +from websockets import client + +from jellyfish import ( + ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, + ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse) + +URI = 'http://localhost:5002/socket/server/websocket' +API_TOKEN = 'development' + + +class Notifier: + def __init__(self, on_message, server_address, server_api_token): + self._on_message = on_message + self._server_address = server_address + self._server_api_token = server_api_token + self._websocket = None + self._subscribtion_response_event = None + + async def connect(self): + async with client.connect(f'ws://{self._server_address}/socket/server/websocket') as websocket: + self._websocket = websocket + + msg = ServerMessage(auth_request=ServerMessageAuthRequest(token=self._server_api_token)) + await websocket.send(bytes(msg)) + + message = await websocket.recv() + message = ServerMessage().parse(message) + + _type, message = betterproto.which_one_of(message, 'content') + assert isinstance(message, ServerMessageAuthenticated) + + receive_task = asyncio.create_task(self._receive_loop()) + + await receive_task + + self._websocket = None + + async def subscribe_server_notification(self): + await self._subscribe_event(ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) + + async def _receive_loop(self): + while True: + message = await self._websocket.recv() + message = ServerMessage().parse(message) + _which, message = betterproto.which_one_of(message, "content") + + logging.info('Received message from server: %s', message) + + if isinstance(message, ServerMessageSubscribeResponse): + self._subscribtion_response_event.set() + + self._on_message(message) + + async def _subscribe_event(self, event: ServerMessageEventType): + request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) + + self._subscribtion_response_event = asyncio.Event() + response_task = asyncio.create_task(self._subscribtion_response_event.wait()) + + await self._websocket.send(bytes(request)) + + await response_task + self._subscribtion_response_event = None + logging.info('Successfully subscribed to %s', event) diff --git a/notifier_test.py b/notifier_test.py new file mode 100644 index 0000000..937df26 --- /dev/null +++ b/notifier_test.py @@ -0,0 +1,29 @@ +import asyncio + +from jellyfish.notifier import Notifier + +URI = 'ws://localhost:5002/socket/server/websocket' +API_TOKEN = 'development' + +SERVER_ADDRESS = 'localhost:5002' +SERVER_API_TOKEN = 'development' + + +def on_message(message): + print('new message from handler', message) + + +notifier = Notifier(on_message=on_message, server_address=SERVER_ADDRESS, + server_api_token=SERVER_API_TOKEN) + + +async def main(): + task = asyncio.create_task(notifier.connect()) + + await asyncio.sleep(0.2) + await notifier.subscribe_server_notification() + + await task + + +asyncio.run(main()) diff --git a/test_dupa.py b/test_dupa.py deleted file mode 100644 index a813fa7..0000000 --- a/test_dupa.py +++ /dev/null @@ -1,6 +0,0 @@ -from jellyfish.protos.jellyfish.jellyfish import ServerMessage - -msg = server_notifications_pb2.ServerMessage.HlsPlayable() -msg.room_id = "dupsko" - -print('msg:', msg) \ No newline at end of file From 7928b8ffa606d465e1fc6da092e676efa1196970 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 20 Sep 2023 11:21:45 +0200 Subject: [PATCH 06/16] Notifier WIP --- dev-requirements.txt | 1 + jellyfish/__init__.py | 6 ++- jellyfish/{notifier.py => _notifier.py} | 60 ++++++++++++++++--------- jellyfish/_room_api.py | 2 +- notifier_test.py | 20 +++------ tests/notifier_test.py | 50 +++++++++++++++++++++ 6 files changed, 102 insertions(+), 37 deletions(-) rename jellyfish/{notifier.py => _notifier.py} (50%) create mode 100644 tests/notifier_test.py diff --git a/dev-requirements.txt b/dev-requirements.txt index 5e15e71..2f24430 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -2,6 +2,7 @@ aenum==3.1.15 betterproto==1.2.5 pydantic==1.10.12 pytest==7.1.3 +pytest-asyncio==0.21.1 python_dateutil==2.8.2 setuptools==67.6.1 typing_extensions==4.7.1 diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index 501bfdc..cdddf5d 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -7,6 +7,7 @@ from pydantic.error_wrappers import ValidationError from jellyfish._room_api import RoomApi +from jellyfish._notifier import Notifier from jellyfish._openapi_client import ( Room, RoomConfig, Peer, Component, ComponentHLS, ComponentRTSP, ComponentOptions, @@ -17,9 +18,10 @@ from jellyfish._protos.jellyfish import ( ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, - ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse) + ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse, + ServerMessageMetricsReport) -__all__ = ['RoomApi', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', +__all__ = ['RoomApi', 'Notifier', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', 'PeerOptionsWebRTC', 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] diff --git a/jellyfish/notifier.py b/jellyfish/_notifier.py similarity index 50% rename from jellyfish/notifier.py rename to jellyfish/_notifier.py index 01d7c00..0c5ca62 100644 --- a/jellyfish/notifier.py +++ b/jellyfish/_notifier.py @@ -3,23 +3,33 @@ import asyncio import betterproto +from typing import Callable + from websockets import client +from websockets.exceptions import ConnectionClosedOK -from jellyfish import ( +from jellyfish._protos.jellyfish import ( ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, - ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse) - -URI = 'http://localhost:5002/socket/server/websocket' -API_TOKEN = 'development' + ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse, + ServerMessageMetricsReport) class Notifier: - def __init__(self, on_message, server_address, server_api_token): - self._on_message = on_message + def __init__(self, server_address: str, server_api_token: str): self._server_address = server_address self._server_api_token = server_api_token self._websocket = None - self._subscribtion_response_event = None + + self._notification_handler = None + self._metrics_handler = None + + def on_server_notification(self, handler: Callable[[ServerMessage], None]): + self._notification_handler = handler + return handler + + def on_metrics(self, handler: Callable[[ServerMessageMetricsReport], None]): + self._metrics_handler = handler + return handler async def connect(self): async with client.connect(f'ws://{self._server_address}/socket/server/websocket') as websocket: @@ -28,21 +38,29 @@ async def connect(self): msg = ServerMessage(auth_request=ServerMessageAuthRequest(token=self._server_api_token)) await websocket.send(bytes(msg)) - message = await websocket.recv() + try: + message = await websocket.recv() + except ConnectionClosedOK as exception: + print(exception) + raise RuntimeError from exception + message = ServerMessage().parse(message) _type, message = betterproto.which_one_of(message, 'content') assert isinstance(message, ServerMessageAuthenticated) + if self._notification_handler: + await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) + + if self._metrics_handler: + await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_METRICS) + receive_task = asyncio.create_task(self._receive_loop()) await receive_task self._websocket = None - async def subscribe_server_notification(self): - await self._subscribe_event(ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) - async def _receive_loop(self): while True: message = await self._websocket.recv() @@ -51,19 +69,19 @@ async def _receive_loop(self): logging.info('Received message from server: %s', message) - if isinstance(message, ServerMessageSubscribeResponse): - self._subscribtion_response_event.set() - - self._on_message(message) + if isinstance(message, ServerMessageMetricsReport): + self._metrics_handler(message) + else: + self._notification_handler(message) async def _subscribe_event(self, event: ServerMessageEventType): request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) - self._subscribtion_response_event = asyncio.Event() - response_task = asyncio.create_task(self._subscribtion_response_event.wait()) - await self._websocket.send(bytes(request)) + message = await self._websocket.recv() + message = ServerMessage().parse(message) + _which, message = betterproto.which_one_of(message, "content") + print('response', message) + assert isinstance(message, ServerMessageSubscribeResponse) - await response_task - self._subscribtion_response_event = None logging.info('Successfully subscribed to %s', event) diff --git a/jellyfish/_room_api.py b/jellyfish/_room_api.py index f59c909..3ec3078 100644 --- a/jellyfish/_room_api.py +++ b/jellyfish/_room_api.py @@ -20,7 +20,7 @@ def __init__(self, Create RoomApi instance, providing the jellyfish address and api token. ''' self._configuration = jellyfish_api.Configuration( - host=server_address, + host=f'http://{server_address}', access_token=server_api_token ) diff --git a/notifier_test.py b/notifier_test.py index 937df26..5da6397 100644 --- a/notifier_test.py +++ b/notifier_test.py @@ -1,6 +1,6 @@ import asyncio -from jellyfish.notifier import Notifier +from jellyfish._notifier import Notifier URI = 'ws://localhost:5002/socket/server/websocket' API_TOKEN = 'development' @@ -9,21 +9,15 @@ SERVER_API_TOKEN = 'development' -def on_message(message): - print('new message from handler', message) - - -notifier = Notifier(on_message=on_message, server_address=SERVER_ADDRESS, +notifier = Notifier(server_address=SERVER_ADDRESS, server_api_token=SERVER_API_TOKEN) -async def main(): - task = asyncio.create_task(notifier.connect()) - - await asyncio.sleep(0.2) - await notifier.subscribe_server_notification() +@notifier.on_server_notification +def handler(server_notification): + print('new message from handler', server_notification) - await task +asyncio.run(notifier.connect()) -asyncio.run(main()) +print('after asyncio.run') diff --git a/tests/notifier_test.py b/tests/notifier_test.py new file mode 100644 index 0000000..b1af500 --- /dev/null +++ b/tests/notifier_test.py @@ -0,0 +1,50 @@ +# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods + +import os +import pytest + +import asyncio + +from jellyfish._notifier import Notifier + +HOST = 'jellyfish' if os.getenv('DOCKER_TEST') == 'TRUE' else 'localhost' +SERVER_ADDRESS = f'{HOST}:5002' +SERVER_API_TOKEN = 'development' + + +class TestConnectingToServer: + @pytest.mark.asyncio + async def test_valid_credentials(self): + notifier = Notifier(server_address=SERVER_ADDRESS, + server_api_token=SERVER_API_TOKEN) + + task = asyncio.create_task(notifier.connect()) + + await asyncio.sleep(0.1) + assert notifier._websocket.open + + task.cancel() + await asyncio.sleep(0.1) + + @pytest.mark.asyncio + async def test_invalid_credentials(self): + notifier = Notifier(server_address=SERVER_ADDRESS, + server_api_token='server_crappy_token') + + task = asyncio.create_task(notifier.connect()) + + with pytest.raises(RuntimeError): + await task + + +@pytest.fixture +def notifier(): + return Notifier(server_address=SERVER_ADDRESS, server_api_token=SERVER_API_TOKEN) + + +class TestReceivingNotifications: + @pytest.mark.asyncio + + +class TestReceivingMetrics: + @pytest.mark.asyncio From 450beb78521b48f1166dbe333d2eadce08eddd9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Fri, 22 Sep 2023 12:23:22 +0200 Subject: [PATCH 07/16] Add notifier --- compile_proto.sh | 16 +++--- jellyfish/_notifier.py | 58 ++++++++++++++++++---- jellyfish/_room_api.py | 6 +-- pylintrc | 2 +- tests/notifier_test.py | 81 ++++++++++++++++++++++++++++--- tests/support/__init__.py | 0 tests/support/asyncio_utils.py | 40 +++++++++++++++ tests/support/peer_socket.py | 51 +++++++++++++++++++ tests/support/protos/__init__.py | 0 tests/support/protos/jellyfish.py | 32 ++++++++++++ tests/test_room_api.py | 7 +-- 11 files changed, 259 insertions(+), 34 deletions(-) create mode 100644 tests/support/__init__.py create mode 100644 tests/support/asyncio_utils.py create mode 100644 tests/support/peer_socket.py create mode 100644 tests/support/protos/__init__.py create mode 100644 tests/support/protos/jellyfish.py diff --git a/compile_proto.sh b/compile_proto.sh index 5cc61b5..b0011d3 100755 --- a/compile_proto.sh +++ b/compile_proto.sh @@ -9,12 +9,12 @@ git submodule sync --recursive >> /dev/null git submodule update --recursive --remote --init >> /dev/null printf "DONE\n\n" -files=$(find protos/jellyfish -name "server*.proto") +server_file="./protos/jellyfish/server_notifications.proto" +printf "Compiling: file $server_file" +protoc -I . --python_betterproto_out=./jellyfish/_protos $server_file +printf "\tDONE\n" -printf "Compiling files:\n" - -for file in $files; do - printf "* $file\n" -done - -protoc -I . --python_betterproto_out=jellyfish/_protos $files +peer_file="./protos/jellyfish/peer_notifications.proto" +printf "Compiling: file $peer_file" +protoc -I . --python_betterproto_out=./tests/support/protos $peer_file +printf "\tDONE\n" diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index 0c5ca62..898df44 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -1,9 +1,14 @@ -import logging +''' +Notifier +''' +import logging import asyncio -import betterproto -from typing import Callable +from typing import Callable, Any + + +import betterproto from websockets import client from websockets.exceptions import ConnectionClosedOK @@ -15,24 +20,41 @@ class Notifier: + ''' + Allows for receiving websocket notifications from Jellyfish. + ''' + def __init__(self, server_address: str, server_api_token: str): self._server_address = server_address self._server_api_token = server_api_token self._websocket = None + self._ready = False - self._notification_handler = None - self._metrics_handler = None + self._ready_event: asyncio.Event = None - def on_server_notification(self, handler: Callable[[ServerMessage], None]): + self._notification_handler: Callable = None + self._metrics_handler: Callable = None + + def on_server_notification(self, handler: Callable[[Any], None]): + ''' + Decorator used for defining handler for ServerNotifications. + ''' self._notification_handler = handler return handler def on_metrics(self, handler: Callable[[ServerMessageMetricsReport], None]): + ''' + Decorator used for defining handler for MetricsReport. + ''' self._metrics_handler = handler return handler async def connect(self): - async with client.connect(f'ws://{self._server_address}/socket/server/websocket') as websocket: + ''' + Connects Notifier to Jellyfish + ''' + async with client.connect(f'ws://{self._server_address}/socket/server/websocket') \ + as websocket: self._websocket = websocket msg = ServerMessage(auth_request=ServerMessageAuthRequest(token=self._server_api_token)) @@ -41,7 +63,6 @@ async def connect(self): try: message = await websocket.recv() except ConnectionClosedOK as exception: - print(exception) raise RuntimeError from exception message = ServerMessage().parse(message) @@ -50,17 +71,35 @@ async def connect(self): assert isinstance(message, ServerMessageAuthenticated) if self._notification_handler: - await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) + await self._subscribe_event( + event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) if self._metrics_handler: await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_METRICS) + self._ready = True + + if self._ready_event: + self._ready_event.set() + receive_task = asyncio.create_task(self._receive_loop()) await receive_task self._websocket = None + async def wait_ready(self): + ''' + Waits ready + ''' + if self._ready: + return + + if self._ready_event is None: + self._ready_event = asyncio.Event() + + await self._ready_event.wait() + async def _receive_loop(self): while True: message = await self._websocket.recv() @@ -81,7 +120,6 @@ async def _subscribe_event(self, event: ServerMessageEventType): message = await self._websocket.recv() message = ServerMessage().parse(message) _which, message = betterproto.which_one_of(message, "content") - print('response', message) assert isinstance(message, ServerMessageSubscribeResponse) logging.info('Successfully subscribed to %s', event) diff --git a/jellyfish/_room_api.py b/jellyfish/_room_api.py index 3ec3078..e501981 100644 --- a/jellyfish/_room_api.py +++ b/jellyfish/_room_api.py @@ -1,6 +1,6 @@ -""" - RoomApi used to manage rooms -""" +''' +RoomApi used to manage rooms +''' from typing import Union, Literal diff --git a/pylintrc b/pylintrc index b2395a3..fdddb75 100644 --- a/pylintrc +++ b/pylintrc @@ -52,7 +52,7 @@ ignore=CVS # ignore-list. The regex matches against paths and can be in Posix or Windows # format. Because '\\' represents the directory delimiter on Windows systems, # it can't be used as an escape character. -ignore-paths=jellyfish/_openapi_client/,jellyfish/_protos/ +ignore-paths=jellyfish/_openapi_client/,jellyfish/_protos/,tests/support/protos # Files or directories matching the regular expression patterns are skipped. # The regex matches against base names, not paths. The default value ignores diff --git a/tests/notifier_test.py b/tests/notifier_test.py index b1af500..b2c2eca 100644 --- a/tests/notifier_test.py +++ b/tests/notifier_test.py @@ -1,11 +1,18 @@ -# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods +# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods, missing-module-docstring import os +import asyncio + import pytest -import asyncio +from jellyfish import Notifier, RoomApi, PeerOptionsWebRTC +from jellyfish._protos.jellyfish import ( + ServerMessageRoomCreated, ServerMessageRoomDeleted, ServerMessagePeerConnected, + ServerMessagePeerDisconnected, ServerMessageMetricsReport) + +from tests.support.peer_socket import PeerSocket +from tests.support.asyncio_utils import assert_events, assert_metrics, cancel -from jellyfish._notifier import Notifier HOST = 'jellyfish' if os.getenv('DOCKER_TEST') == 'TRUE' else 'localhost' SERVER_ADDRESS = f'{HOST}:5002' @@ -18,12 +25,12 @@ async def test_valid_credentials(self): notifier = Notifier(server_address=SERVER_ADDRESS, server_api_token=SERVER_API_TOKEN) - task = asyncio.create_task(notifier.connect()) - - await asyncio.sleep(0.1) + notifier_task = asyncio.create_task(notifier.connect()) + await notifier.wait_ready() + # pylint: disable=protected-access assert notifier._websocket.open - task.cancel() + notifier_task.cancel() await asyncio.sleep(0.1) @pytest.mark.asyncio @@ -37,6 +44,11 @@ async def test_invalid_credentials(self): await task +@pytest.fixture +def room_api(): + return RoomApi(server_address=SERVER_ADDRESS, server_api_token=SERVER_API_TOKEN) + + @pytest.fixture def notifier(): return Notifier(server_address=SERVER_ADDRESS, server_api_token=SERVER_API_TOKEN) @@ -44,7 +56,62 @@ def notifier(): class TestReceivingNotifications: @pytest.mark.asyncio + async def test_room_created_deleted(self, room_api: RoomApi, notifier: Notifier): + event_checks = [ + ServerMessageRoomCreated, + ServerMessageRoomDeleted + ] + assert_task = asyncio.create_task(assert_events(notifier, event_checks)) + + notifier_task = asyncio.create_task(notifier.connect()) + await notifier.wait_ready() + + _, room = room_api.create_room() + room_api.delete_room(room.id) + + await assert_task + await cancel(notifier_task) + + @pytest.mark.asyncio + async def test_peer_connected_disconnected(self, room_api: RoomApi, notifier: Notifier): + event_checks = [ + ServerMessagePeerConnected, + ServerMessagePeerDisconnected + ] + assert_task = asyncio.create_task(assert_events(notifier, event_checks)) + + notifier_task = asyncio.create_task(notifier.connect()) + await notifier.wait_ready() + + _, room = room_api.create_room() + peer_token, _peer = room_api.add_peer(room.id, options=PeerOptionsWebRTC()) + + peer_socket = PeerSocket(server_address=SERVER_ADDRESS) + peer_task = asyncio.create_task(peer_socket.connect(peer_token)) + + await peer_socket.wait_ready() + + room_api.delete_room(room.id) + + await assert_task + await cancel(peer_task) + await cancel(notifier_task) class TestReceivingMetrics: @pytest.mark.asyncio + async def test_metrics_with_one_peer(self, room_api: RoomApi, notifier: Notifier): + _, room = room_api.create_room() + peer_token, _peer = room_api.add_peer(room.id, PeerOptionsWebRTC()) + + peer_socket = PeerSocket(server_address=SERVER_ADDRESS) + peer_task = asyncio.create_task(peer_socket.connect(peer_token)) + + await peer_socket.wait_ready() + + assert_task = asyncio.create_task(assert_metrics(notifier, [ServerMessageMetricsReport])) + notifier_task = asyncio.create_task(notifier.connect()) + + await assert_task + await cancel(peer_task) + await cancel(notifier_task) diff --git a/tests/support/__init__.py b/tests/support/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/support/asyncio_utils.py b/tests/support/asyncio_utils.py new file mode 100644 index 0000000..a191b07 --- /dev/null +++ b/tests/support/asyncio_utils.py @@ -0,0 +1,40 @@ +# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods, missing-module-docstring + +import asyncio + +from jellyfish import Notifier + + +async def assert_events(notifier: Notifier, event_checks: list): + await _assert_messages(notifier.on_server_notification, event_checks) + + +async def assert_metrics(notifier: Notifier, metrics_checks: list): + await _assert_messages(notifier.on_metrics, metrics_checks) + + +async def _assert_messages(notifier_callback, message_checks): + success_event = asyncio.Event() + + @notifier_callback + def handle_message(message): + expected_msg = message_checks[0] + if message == expected_msg or isinstance(message, expected_msg): + message_checks.pop(0) + + if message_checks == []: + success_event.set() + + try: + await asyncio.wait_for(success_event.wait(), 1) + except asyncio.exceptions.TimeoutError as exc: + raise asyncio.exceptions.TimeoutError( + f"{message_checks[0]} hasn't been received within timeout") from exc + + +async def cancel(task): + task.cancel() + try: + await task + except asyncio.exceptions.CancelledError: + pass diff --git a/tests/support/peer_socket.py b/tests/support/peer_socket.py new file mode 100644 index 0000000..0eda097 --- /dev/null +++ b/tests/support/peer_socket.py @@ -0,0 +1,51 @@ +# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods, missing-module-docstring + +import asyncio +import betterproto + +from websockets import client +from websockets.exceptions import ConnectionClosedOK + +from tests.support.protos.jellyfish import ( + PeerMessage, PeerMessageAuthRequest, PeerMessageAuthenticated) + + +class PeerSocket: + def __init__(self, server_address): + self._server_address = server_address + + self._ready = False + self._ready_event = None + + async def connect(self, token): + async with client.connect(f'ws://{self._server_address}/socket/peer/websocket') \ + as websocket: + msg = PeerMessage(auth_request=PeerMessageAuthRequest(token=token)) + await websocket.send(bytes(msg)) + + try: + message = await websocket.recv() + except ConnectionClosedOK as exception: + print(exception) + raise RuntimeError from exception + + message = PeerMessage().parse(message) + + _type, message = betterproto.which_one_of(message, 'content') + assert isinstance(message, PeerMessageAuthenticated) + + self._ready = True + if self._ready_event: + self._ready_event.set() + + await websocket.wait_closed() + + async def wait_ready(self): + # pylint: disable=duplicate-code + if self._ready: + return + + if self._ready_event is None: + self._ready_event = asyncio.Event() + + await self._ready_event.wait() diff --git a/tests/support/protos/__init__.py b/tests/support/protos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/support/protos/jellyfish.py b/tests/support/protos/jellyfish.py new file mode 100644 index 0000000..67176f5 --- /dev/null +++ b/tests/support/protos/jellyfish.py @@ -0,0 +1,32 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# sources: protos/jellyfish/peer_notifications.proto +# plugin: python-betterproto +from dataclasses import dataclass + +import betterproto + + +@dataclass +class PeerMessage(betterproto.Message): + authenticated: "PeerMessageAuthenticated" = betterproto.message_field( + 1, group="content" + ) + auth_request: "PeerMessageAuthRequest" = betterproto.message_field( + 2, group="content" + ) + media_event: "PeerMessageMediaEvent" = betterproto.message_field(3, group="content") + + +@dataclass +class PeerMessageAuthenticated(betterproto.Message): + pass + + +@dataclass +class PeerMessageAuthRequest(betterproto.Message): + token: str = betterproto.string_field(1) + + +@dataclass +class PeerMessageMediaEvent(betterproto.Message): + data: str = betterproto.string_field(1) diff --git a/tests/test_room_api.py b/tests/test_room_api.py index f6c65d4..d230c6d 100644 --- a/tests/test_room_api.py +++ b/tests/test_room_api.py @@ -1,8 +1,5 @@ -# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods +# pylint: disable=locally-disabled, missing-class-docstring, missing-function-docstring, redefined-outer-name, too-few-public-methods, missing-module-docstring -""" - Tests room api -""" import os @@ -18,7 +15,7 @@ HOST = 'jellyfish' if os.getenv('DOCKER_TEST') == 'TRUE' else 'localhost' -SERVER_ADDRESS = f'http://{HOST}:5002' +SERVER_ADDRESS = f'{HOST}:5002' SERVER_API_TOKEN = 'development' MAX_PEERS = 10 From 87081b1ebcc62cf71ca39296a0cc6da99be5b1b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Fri, 22 Sep 2023 14:43:29 +0200 Subject: [PATCH 08/16] add missing deps --- dev-requirements.txt | 1 + docker-compose-test.yaml | 4 ++-- pyproject.toml | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 2f24430..544782c 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -9,3 +9,4 @@ typing_extensions==4.7.1 urllib3 >= 1.25.3, < 2 pylint==2.17.5 pdoc==14.1.0 +websockets==11.0.3 diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index a1a69fe..c41fba3 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -13,7 +13,7 @@ services: timeout: 2s start_period: 30s environment: - VIRTUAL_HOST: "jellyfish" + HOST: "jellyfish" USE_INTEGRATED_TURN: "true" INTEGRATED_TURN_IP: "${INTEGRATED_TURN_IP:-127.0.0.1}" INTEGRATED_TURN_LISTEN_IP: "0.0.0.0" @@ -33,7 +33,7 @@ services: image: python:3.8-alpine3.18 command: sh -c "cd app/ && pip install -e . && pytest" environment: - - DOCKER_TEST=TRUE + - DOCKER_TEST: "TRUE" volumes: - .:/app networks: diff --git a/pyproject.toml b/pyproject.toml index 5f049d4..2d69851 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,13 +15,14 @@ classifiers = [ ] dependencies = [ "aenum==3.1.15", + "betterproto==1.2.5", "pydantic==1.10.12", "pytest==7.1.3", "python_dateutil==2.8.2", "setuptools==67.6.1", "typing_extensions==4.7.1", "urllib3 >= 1.25.3, < 2", - "pylint==2.17.5" + "websockets==11.0.3", ] [project.urls] From 9a02d27106589ae92d4d45623144357a799c95da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Fri, 22 Sep 2023 15:01:14 +0200 Subject: [PATCH 09/16] fix docker tests --- docker-compose-test.yaml | 4 ++-- notifier_test.py | 23 ----------------------- 2 files changed, 2 insertions(+), 25 deletions(-) delete mode 100644 notifier_test.py diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index c41fba3..0940d6d 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -31,9 +31,9 @@ services: test: image: python:3.8-alpine3.18 - command: sh -c "cd app/ && pip install -e . && pytest" + command: sh -c "cd app/ && pip install -r dev-requirements.txt && pytest -s" environment: - - DOCKER_TEST: "TRUE" + DOCKER_TEST: "TRUE" volumes: - .:/app networks: diff --git a/notifier_test.py b/notifier_test.py deleted file mode 100644 index 5da6397..0000000 --- a/notifier_test.py +++ /dev/null @@ -1,23 +0,0 @@ -import asyncio - -from jellyfish._notifier import Notifier - -URI = 'ws://localhost:5002/socket/server/websocket' -API_TOKEN = 'development' - -SERVER_ADDRESS = 'localhost:5002' -SERVER_API_TOKEN = 'development' - - -notifier = Notifier(server_address=SERVER_ADDRESS, - server_api_token=SERVER_API_TOKEN) - - -@notifier.on_server_notification -def handler(server_notification): - print('new message from handler', server_notification) - - -asyncio.run(notifier.connect()) - -print('after asyncio.run') From a201a3a412caad03deaa62aee45742a7d8ecb9e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Mon, 25 Sep 2023 11:02:51 +0200 Subject: [PATCH 10/16] Update readme --- .gitignore | 3 ++ README.md | 51 +++++++++++++++++++- jellyfish/__init__.py | 22 ++++++--- jellyfish/_notifier.py | 2 +- tests/support/asyncio_utils.py | 8 +-- tests/{notifier_test.py => test_notifier.py} | 5 +- 6 files changed, 74 insertions(+), 17 deletions(-) rename tests/{notifier_test.py => test_notifier.py} (97%) diff --git a/.gitignore b/.gitignore index 6f19aa6..67b9b90 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# Temporary files +/tmp diff --git a/README.md b/README.md index b904a51..9b8c8ab 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,19 @@ pip install git+https://github.com/jellyfish-dev/python-server-sdk ## Usage -First create a `RoomApi` instance, providing the jellyfish server address and api token +The SDK exports two main classes for interacting with Jellyfish server: +`RoomApi` and `Notifier`. + +`RoomApi` wraps http REST api calls, while `Notifier` is responsible for receiving real-time updates from the server. + +#### RoomApi + +Create a `RoomApi` instance, providing the jellyfish server address and api token ```python from jellyfish import RoomApi -room_api = RoomApi(server_address='http://localhost:5002', server_api_token='development') +room_api = RoomApi(server_address='localhost:5002', server_api_token='development') ``` You can use it to interact with Jellyfish managing rooms, peers and components @@ -43,6 +50,46 @@ component_hls = room_api.add_component(room.id, options=ComponentOptionsHLS()) # Component(actual_instance=ComponentHLS(id='c0dfab50-cafd-438d-985e-7b8f97ae55e3', metadata=ComponentMetadataHLS(low_latency=False, playable=False), type='hls')) ``` +#### Notifier + +Create `Notifier` instance +```python +from jellyfish import Notifier + +notifier = Notifier(server_address='localhost:5002', server_api_token='development') +``` + +Then define handlers for incoming messages +```python +@notifier.on_server_notification +def handle_notification(server_notification): + print(f'Received a notification: {notification}') + +@notifier.on_metrics +def handle_metrics(metrics_report): + print(f'Received WebRTC metrics: {metrics_report.metrics}') +``` + +After that you can start the notifier +```python +async def test_notifier(): + notifier_task = asyncio.create_task(notifier.connect()) + + # Wait for notifier to be ready to receive messages + await notifier.wait_ready() + + # Create a room to trigger a server notification + room_api = RoomApi() + room_api.create_room() + + await notifier_task + +asyncio.run(test_notifier()) + +# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29') +# Received metrics: {} +``` + ## Copyright and License Copyright 2023, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=jellyfish) diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index cdddf5d..92f49d6 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -6,23 +6,31 @@ from pydantic.error_wrappers import ValidationError +# API from jellyfish._room_api import RoomApi from jellyfish._notifier import Notifier +# Models from jellyfish._openapi_client import ( Room, RoomConfig, Peer, Component, ComponentHLS, ComponentRTSP, ComponentOptions, ComponentOptionsRTSP, ComponentOptionsHLS, PeerOptionsWebRTC) +# Server Messages +from jellyfish._protos.jellyfish import * + +# Exceptions from jellyfish._openapi_client.exceptions import ( UnauthorizedException, NotFoundException, BadRequestException) -from jellyfish._protos.jellyfish import ( - ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, - ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse, - ServerMessageMetricsReport) -__all__ = ['RoomApi', 'Notifier', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', - 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', - 'PeerOptionsWebRTC', 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] +__all__ = [ + 'RoomApi', 'Notifier', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', + 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', + 'PeerOptionsWebRTC', 'ServerMessageAuthRequest', 'ServerMessageAuthenticated', + 'ServerMessageSubscribeRequest', 'ServerMessageEventType', 'ServerMessageSubscribeResponse', + 'ServerMessageMetricsReport', 'ServerMessageRoomCreated', 'ServerMessageRoomDeleted', + 'ServerMessageComponentCrashed', 'ServerMessageHlsPlayable', 'ServerMessagePeerConnected', + 'ServerMessagePeerCrashed', 'ServerMessagePeerDisconnected', 'ServerMessageRoomCrashed', + 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] __docformat__ = "restructuredtext" diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index 898df44..8b2b6e8 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -24,7 +24,7 @@ class Notifier: Allows for receiving websocket notifications from Jellyfish. ''' - def __init__(self, server_address: str, server_api_token: str): + def __init__(self, server_address: str = 'localhost:5002', server_api_token: str = 'development'): self._server_address = server_address self._server_api_token = server_api_token self._websocket = None diff --git a/tests/support/asyncio_utils.py b/tests/support/asyncio_utils.py index a191b07..96c96b0 100644 --- a/tests/support/asyncio_utils.py +++ b/tests/support/asyncio_utils.py @@ -6,14 +6,14 @@ async def assert_events(notifier: Notifier, event_checks: list): - await _assert_messages(notifier.on_server_notification, event_checks) + await _assert_messages(notifier.on_server_notification, event_checks, 1.) async def assert_metrics(notifier: Notifier, metrics_checks: list): - await _assert_messages(notifier.on_metrics, metrics_checks) + await _assert_messages(notifier.on_metrics, metrics_checks, 2.) -async def _assert_messages(notifier_callback, message_checks): +async def _assert_messages(notifier_callback, message_checks, timeout): success_event = asyncio.Event() @notifier_callback @@ -26,7 +26,7 @@ def handle_message(message): success_event.set() try: - await asyncio.wait_for(success_event.wait(), 1) + await asyncio.wait_for(success_event.wait(), timeout) except asyncio.exceptions.TimeoutError as exc: raise asyncio.exceptions.TimeoutError( f"{message_checks[0]} hasn't been received within timeout") from exc diff --git a/tests/notifier_test.py b/tests/test_notifier.py similarity index 97% rename from tests/notifier_test.py rename to tests/test_notifier.py index b2c2eca..a6b990d 100644 --- a/tests/notifier_test.py +++ b/tests/test_notifier.py @@ -6,7 +6,7 @@ import pytest from jellyfish import Notifier, RoomApi, PeerOptionsWebRTC -from jellyfish._protos.jellyfish import ( +from jellyfish import ( ServerMessageRoomCreated, ServerMessageRoomDeleted, ServerMessagePeerConnected, ServerMessagePeerDisconnected, ServerMessageMetricsReport) @@ -30,8 +30,7 @@ async def test_valid_credentials(self): # pylint: disable=protected-access assert notifier._websocket.open - notifier_task.cancel() - await asyncio.sleep(0.1) + cancel(notifier_task) @pytest.mark.asyncio async def test_invalid_credentials(self): From fb4bf3a58c853c5a7adf0db00f69852f401e9c6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Mon, 25 Sep 2023 11:20:00 +0200 Subject: [PATCH 11/16] Fix CI --- docker-compose-test.yaml | 17 ++++++++--------- jellyfish/_notifier.py | 3 ++- tests/test_notifier.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index 0940d6d..5a49005 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -13,15 +13,14 @@ services: timeout: 2s start_period: 30s environment: - HOST: "jellyfish" - USE_INTEGRATED_TURN: "true" - INTEGRATED_TURN_IP: "${INTEGRATED_TURN_IP:-127.0.0.1}" - INTEGRATED_TURN_LISTEN_IP: "0.0.0.0" - INTEGRATED_TURN_PORT_RANGE: "50000-50050" - INTEGRATED_TCP_TURN_PORT: "49999" - SERVER_API_TOKEN: "development" - PORT: 5002 - SECRET_KEY_BASE: "super-secret-key" + JF_HOST: "jellyfish" + JF_INTEGRATED_TURN_IP: "${INTEGRATED_TURN_IP:-127.0.0.1}" + JF_INTEGRATED_TURN_LISTEN_IP: "0.0.0.0" + JF_INTEGRATED_TURN_PORT_RANGE: "50000-50050" + JF_INTEGRATED_TCP_TURN_PORT: "49999" + JF_SERVER_API_TOKEN: "development" + JF_PORT: 5002 + JF_SECRET_KEY_BASE: "super-secret-key" ports: - "5002:5002" - "49999:49999" diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index 8b2b6e8..ef9e2ac 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -24,7 +24,8 @@ class Notifier: Allows for receiving websocket notifications from Jellyfish. ''' - def __init__(self, server_address: str = 'localhost:5002', server_api_token: str = 'development'): + def __init__(self, + server_address: str = 'localhost:5002', server_api_token: str = 'development'): self._server_address = server_address self._server_api_token = server_api_token self._websocket = None diff --git a/tests/test_notifier.py b/tests/test_notifier.py index a6b990d..5074e85 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -30,7 +30,7 @@ async def test_valid_credentials(self): # pylint: disable=protected-access assert notifier._websocket.open - cancel(notifier_task) + await cancel(notifier_task) @pytest.mark.asyncio async def test_invalid_credentials(self): From 98834dbd4b8f4a04ead63925d401edbc08bde885 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Mon, 25 Sep 2023 14:15:42 +0200 Subject: [PATCH 12/16] Update docstrings --- README.md | 4 ++-- jellyfish/__init__.py | 15 ++++++++------- jellyfish/_notifier.py | 29 ++++++++++++++++++----------- tests/test_notifier.py | 6 +++--- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 9b8c8ab..224e5ec 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ from jellyfish import RoomApi room_api = RoomApi(server_address='localhost:5002', server_api_token='development') ``` -You can use it to interact with Jellyfish managing rooms, peers and components +You can use it to interact with Jellyfish, managing rooms, peers and components ```python # Create a room @@ -87,7 +87,7 @@ async def test_notifier(): asyncio.run(test_notifier()) # Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29') -# Received metrics: {} +# Received WebRTC metrics: ServerMessageMetricsReport(metrics='{}') ``` ## Copyright and License diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index 92f49d6..890c685 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -16,7 +16,10 @@ ComponentOptionsRTSP, ComponentOptionsHLS, PeerOptionsWebRTC) # Server Messages -from jellyfish._protos.jellyfish import * +from jellyfish._protos.jellyfish import ( + ServerMessageRoomCrashed, ServerMessageComponentCrashed, ServerMessageHlsPlayable, + ServerMessageMetricsReport, ServerMessagePeerCrashed, ServerMessagePeerConnected, + ServerMessagePeerDisconnected, ServerMessageRoomDeleted, ServerMessageRoomCreated) # Exceptions from jellyfish._openapi_client.exceptions import ( @@ -26,11 +29,9 @@ __all__ = [ 'RoomApi', 'Notifier', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', - 'PeerOptionsWebRTC', 'ServerMessageAuthRequest', 'ServerMessageAuthenticated', - 'ServerMessageSubscribeRequest', 'ServerMessageEventType', 'ServerMessageSubscribeResponse', - 'ServerMessageMetricsReport', 'ServerMessageRoomCreated', 'ServerMessageRoomDeleted', - 'ServerMessageComponentCrashed', 'ServerMessageHlsPlayable', 'ServerMessagePeerConnected', - 'ServerMessagePeerCrashed', 'ServerMessagePeerDisconnected', 'ServerMessageRoomCrashed', - 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] + 'PeerOptionsWebRTC', 'ServerMessageMetricsReport', 'ServerMessageRoomCreated', + 'ServerMessageRoomDeleted', 'ServerMessageComponentCrashed', 'ServerMessageHlsPlayable', + 'ServerMessagePeerConnected', 'ServerMessagePeerCrashed', 'ServerMessagePeerDisconnected', + 'ServerMessageRoomCrashed', 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] __docformat__ = "restructuredtext" diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index ef9e2ac..2f5f3bb 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -1,5 +1,5 @@ ''' -Notifier +Notifier listening to WebSocket events ''' import logging @@ -21,7 +21,7 @@ class Notifier: ''' - Allows for receiving websocket notifications from Jellyfish. + Allows for receiving WebSocket messages from Jellyfish. ''' def __init__(self, @@ -38,21 +38,28 @@ def __init__(self, def on_server_notification(self, handler: Callable[[Any], None]): ''' - Decorator used for defining handler for ServerNotifications. + Decorator used for defining handler for ServerNotifications + i.e. all messages other than `ServerMessageMetricsReport`. ''' self._notification_handler = handler return handler def on_metrics(self, handler: Callable[[ServerMessageMetricsReport], None]): ''' - Decorator used for defining handler for MetricsReport. + Decorator used for defining handler for `ServerMessageMetricsReport`. ''' self._metrics_handler = handler return handler async def connect(self): ''' - Connects Notifier to Jellyfish + A coroutine which connects Notifier to Jellyfish and listens for all incoming + messages from the Jellyfish. + + It runs until the connection isn't closed. + + The incoming messages are handled by the functions defined using the + `on_server_notification` and `on_metrics` decorators. ''' async with client.connect(f'ws://{self._server_address}/socket/server/websocket') \ as websocket: @@ -89,12 +96,14 @@ async def connect(self): self._websocket = None - async def wait_ready(self): + async def wait_ready(self) -> True: ''' - Waits ready + Waits until the notifier is connected and authenticated to Jellyfish. + + If already connected, returns `True` immediately. ''' if self._ready: - return + return True if self._ready_event is None: self._ready_event = asyncio.Event() @@ -105,9 +114,7 @@ async def _receive_loop(self): while True: message = await self._websocket.recv() message = ServerMessage().parse(message) - _which, message = betterproto.which_one_of(message, "content") - - logging.info('Received message from server: %s', message) + _which, message = betterproto.which_one_of(message, 'content') if isinstance(message, ServerMessageMetricsReport): self._metrics_handler(message) diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 5074e85..f1ece8f 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -6,9 +6,9 @@ import pytest from jellyfish import Notifier, RoomApi, PeerOptionsWebRTC -from jellyfish import ( - ServerMessageRoomCreated, ServerMessageRoomDeleted, ServerMessagePeerConnected, - ServerMessagePeerDisconnected, ServerMessageMetricsReport) +from jellyfish import (ServerMessageRoomCreated, ServerMessageRoomDeleted, + ServerMessagePeerConnected, ServerMessagePeerDisconnected, + ServerMessageMetricsReport) from tests.support.peer_socket import PeerSocket from tests.support.asyncio_utils import assert_events, assert_metrics, cancel From 8ceaf6f2246fd3dd60e85e0ccfc2e25d313c2649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Mon, 25 Sep 2023 14:51:01 +0200 Subject: [PATCH 13/16] Move events to separate package --- compile_proto.sh | 2 +- jellyfish/__init__.py | 11 +++-------- jellyfish/_notifier.py | 2 +- jellyfish/events/__init__.py | 21 +++++++++++++++++++++ jellyfish/{ => events}/_protos/__init__.py | 0 jellyfish/{ => events}/_protos/jellyfish.py | 0 pylintrc | 2 +- tests/test_notifier.py | 6 +++--- 8 files changed, 30 insertions(+), 14 deletions(-) create mode 100644 jellyfish/events/__init__.py rename jellyfish/{ => events}/_protos/__init__.py (100%) rename jellyfish/{ => events}/_protos/jellyfish.py (100%) diff --git a/compile_proto.sh b/compile_proto.sh index b0011d3..6d3719e 100755 --- a/compile_proto.sh +++ b/compile_proto.sh @@ -11,7 +11,7 @@ printf "DONE\n\n" server_file="./protos/jellyfish/server_notifications.proto" printf "Compiling: file $server_file" -protoc -I . --python_betterproto_out=./jellyfish/_protos $server_file +protoc -I . --python_betterproto_out=./jellyfish/events/_protos $server_file printf "\tDONE\n" peer_file="./protos/jellyfish/peer_notifications.proto" diff --git a/jellyfish/__init__.py b/jellyfish/__init__.py index 890c685..cf0366f 100644 --- a/jellyfish/__init__.py +++ b/jellyfish/__init__.py @@ -16,10 +16,7 @@ ComponentOptionsRTSP, ComponentOptionsHLS, PeerOptionsWebRTC) # Server Messages -from jellyfish._protos.jellyfish import ( - ServerMessageRoomCrashed, ServerMessageComponentCrashed, ServerMessageHlsPlayable, - ServerMessageMetricsReport, ServerMessagePeerCrashed, ServerMessagePeerConnected, - ServerMessagePeerDisconnected, ServerMessageRoomDeleted, ServerMessageRoomCreated) +from jellyfish import events # Exceptions from jellyfish._openapi_client.exceptions import ( @@ -29,9 +26,7 @@ __all__ = [ 'RoomApi', 'Notifier', 'Room', 'Peer', 'Component', 'ComponentHLS', 'ComponentRTSP', 'ComponentOptionsHLS', 'RoomConfig', 'ComponentOptions', 'ComponentOptionsRTSP', - 'PeerOptionsWebRTC', 'ServerMessageMetricsReport', 'ServerMessageRoomCreated', - 'ServerMessageRoomDeleted', 'ServerMessageComponentCrashed', 'ServerMessageHlsPlayable', - 'ServerMessagePeerConnected', 'ServerMessagePeerCrashed', 'ServerMessagePeerDisconnected', - 'ServerMessageRoomCrashed', 'UnauthorizedException', 'NotFoundException', 'BadRequestException'] + 'PeerOptionsWebRTC', 'events', 'UnauthorizedException', 'NotFoundException', + 'BadRequestException'] __docformat__ = "restructuredtext" diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index 2f5f3bb..e2d8314 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -13,7 +13,7 @@ from websockets import client from websockets.exceptions import ConnectionClosedOK -from jellyfish._protos.jellyfish import ( +from jellyfish.events import ( ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, ServerMessageSubscribeRequest, ServerMessageEventType, ServerMessageSubscribeResponse, ServerMessageMetricsReport) diff --git a/jellyfish/events/__init__.py b/jellyfish/events/__init__.py new file mode 100644 index 0000000..39b8239 --- /dev/null +++ b/jellyfish/events/__init__.py @@ -0,0 +1,21 @@ +''' +Server events being sent Jellyfish +''' + +# Private messages +from jellyfish.events._protos.jellyfish import ( + ServerMessage, ServerMessageAuthenticated, ServerMessageAuthRequest, ServerMessageEventType, + ServerMessageSubscribeResponse, ServerMessageSubscribeRequest) + +# Exported messages +from jellyfish.events._protos.jellyfish import ( + ServerMessageComponentCrashed, ServerMessageHlsPlayable, + ServerMessageMetricsReport, ServerMessagePeerCrashed, ServerMessagePeerConnected, + ServerMessagePeerDisconnected, ServerMessageRoomCrashed, ServerMessageRoomDeleted, + ServerMessageRoomCreated) + +__all__ = [ + 'ServerMessageComponentCrashed', 'ServerMessageHlsPlayable', + 'ServerMessageMetricsReport', 'ServerMessagePeerCrashed', 'ServerMessagePeerConnected', + 'ServerMessagePeerDisconnected', 'ServerMessageRoomCrashed', 'ServerMessageRoomDeleted', + 'ServerMessageRoomCreated'] diff --git a/jellyfish/_protos/__init__.py b/jellyfish/events/_protos/__init__.py similarity index 100% rename from jellyfish/_protos/__init__.py rename to jellyfish/events/_protos/__init__.py diff --git a/jellyfish/_protos/jellyfish.py b/jellyfish/events/_protos/jellyfish.py similarity index 100% rename from jellyfish/_protos/jellyfish.py rename to jellyfish/events/_protos/jellyfish.py diff --git a/pylintrc b/pylintrc index fdddb75..33374d2 100644 --- a/pylintrc +++ b/pylintrc @@ -52,7 +52,7 @@ ignore=CVS # ignore-list. The regex matches against paths and can be in Posix or Windows # format. Because '\\' represents the directory delimiter on Windows systems, # it can't be used as an escape character. -ignore-paths=jellyfish/_openapi_client/,jellyfish/_protos/,tests/support/protos +ignore-paths=jellyfish/_openapi_client/,jellyfish/events/_protos/,tests/support/protos # Files or directories matching the regular expression patterns are skipped. # The regex matches against base names, not paths. The default value ignores diff --git a/tests/test_notifier.py b/tests/test_notifier.py index f1ece8f..236280a 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -6,9 +6,9 @@ import pytest from jellyfish import Notifier, RoomApi, PeerOptionsWebRTC -from jellyfish import (ServerMessageRoomCreated, ServerMessageRoomDeleted, - ServerMessagePeerConnected, ServerMessagePeerDisconnected, - ServerMessageMetricsReport) +from jellyfish.events import (ServerMessageRoomCreated, ServerMessageRoomDeleted, + ServerMessagePeerConnected, ServerMessagePeerDisconnected, + ServerMessageMetricsReport) from tests.support.peer_socket import PeerSocket from tests.support.asyncio_utils import assert_events, assert_metrics, cancel From aa72e1eba0e6b4870ac10cebf85e941fa03a1f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Mon, 25 Sep 2023 15:30:35 +0200 Subject: [PATCH 14/16] Add testing section --- README.md | 16 ++++++++- jellyfish/_notifier.py | 63 +++++++++++++++++------------------- tests/support/peer_socket.py | 1 - 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 224e5ec..1f79042 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![CircleCI](https://dl.circleci.com/status-badge/img/gh/jellyfish-dev/python-server-sdk/tree/main.svg?style=svg)](https://dl.circleci.com/status-badge/redirect/gh/jellyfish-dev/python-server-sdk/tree/main) -Python server SDK for the [Jellyfish](https://github.com/jellyfish-dev/jellyfish) media server. +Python server SDK for the [Jellyfish Media Server](https://github.com/jellyfish-dev/jellyfish). Read the docs [here](https://jellyfish-dev.github.io/python-server-sdk/jellyfish.html) @@ -90,6 +90,20 @@ asyncio.run(test_notifier()) # Received WebRTC metrics: ServerMessageMetricsReport(metrics='{}') ``` +## Testing + +You can test the SDK against a local instance of Jellyfish by running +```console +pytest +``` + +Make sure to use the default configuration for Jellyfish + +Alternatively you can test using Docker +```console +docker-compose -f docker-compose-test.yaml run test +``` + ## Copyright and License Copyright 2023, [Software Mansion](https://swmansion.com/?utm_source=git&utm_medium=readme&utm_campaign=jellyfish) diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index e2d8314..91d057d 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -2,16 +2,13 @@ Notifier listening to WebSocket events ''' -import logging import asyncio - from typing import Callable, Any - import betterproto from websockets import client -from websockets.exceptions import ConnectionClosedOK +from websockets.exceptions import ConnectionClosed from jellyfish.events import ( ServerMessage, ServerMessageAuthRequest, ServerMessageAuthenticated, @@ -63,38 +60,24 @@ async def connect(self): ''' async with client.connect(f'ws://{self._server_address}/socket/server/websocket') \ as websocket: - self._websocket = websocket - - msg = ServerMessage(auth_request=ServerMessageAuthRequest(token=self._server_api_token)) - await websocket.send(bytes(msg)) - try: - message = await websocket.recv() - except ConnectionClosedOK as exception: - raise RuntimeError from exception - - message = ServerMessage().parse(message) - - _type, message = betterproto.which_one_of(message, 'content') - assert isinstance(message, ServerMessageAuthenticated) - - if self._notification_handler: - await self._subscribe_event( - event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) - - if self._metrics_handler: - await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_METRICS) + self._websocket = websocket + await self._authenticate() - self._ready = True + if self._notification_handler: + await self._subscribe_event( + event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION) - if self._ready_event: - self._ready_event.set() + if self._metrics_handler: + await self._subscribe_event(event=ServerMessageEventType.EVENT_TYPE_METRICS) - receive_task = asyncio.create_task(self._receive_loop()) + self._ready = True + if self._ready_event: + self._ready_event.set() - await receive_task - - self._websocket = None + await self._receive_loop() + finally: + self._websocket = None async def wait_ready(self) -> True: ''' @@ -110,6 +93,22 @@ async def wait_ready(self) -> True: await self._ready_event.wait() + async def _authenticate(self): + msg = ServerMessage(auth_request=ServerMessageAuthRequest(token=self._server_api_token)) + await self._websocket.send(bytes(msg)) + + try: + message = await self._websocket.recv() + except ConnectionClosed as exception: + if 'invalid token' in str(exception): + raise RuntimeError('Invalid server_api_token') from exception + raise + + message = ServerMessage().parse(message) + + _type, message = betterproto.which_one_of(message, 'content') + assert isinstance(message, ServerMessageAuthenticated) + async def _receive_loop(self): while True: message = await self._websocket.recv() @@ -129,5 +128,3 @@ async def _subscribe_event(self, event: ServerMessageEventType): message = ServerMessage().parse(message) _which, message = betterproto.which_one_of(message, "content") assert isinstance(message, ServerMessageSubscribeResponse) - - logging.info('Successfully subscribed to %s', event) diff --git a/tests/support/peer_socket.py b/tests/support/peer_socket.py index 0eda097..b7abf5c 100644 --- a/tests/support/peer_socket.py +++ b/tests/support/peer_socket.py @@ -26,7 +26,6 @@ async def connect(self, token): try: message = await websocket.recv() except ConnectionClosedOK as exception: - print(exception) raise RuntimeError from exception message = PeerMessage().parse(message) From 04e11a5ab5c80de81b6cb64728e69348e6c5433b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Tue, 26 Sep 2023 14:39:21 +0200 Subject: [PATCH 15/16] Remove obnoxious cursing from the code --- tests/support/asyncio_utils.py | 10 ++++++---- tests/test_notifier.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/support/asyncio_utils.py b/tests/support/asyncio_utils.py index 96c96b0..acf10d4 100644 --- a/tests/support/asyncio_utils.py +++ b/tests/support/asyncio_utils.py @@ -4,16 +4,18 @@ from jellyfish import Notifier +ASSERTION_TIMEOUT = 2. + async def assert_events(notifier: Notifier, event_checks: list): - await _assert_messages(notifier.on_server_notification, event_checks, 1.) + await _assert_messages(notifier.on_server_notification, event_checks) async def assert_metrics(notifier: Notifier, metrics_checks: list): - await _assert_messages(notifier.on_metrics, metrics_checks, 2.) + await _assert_messages(notifier.on_metrics, metrics_checks) -async def _assert_messages(notifier_callback, message_checks, timeout): +async def _assert_messages(notifier_callback, message_checks): success_event = asyncio.Event() @notifier_callback @@ -26,7 +28,7 @@ def handle_message(message): success_event.set() try: - await asyncio.wait_for(success_event.wait(), timeout) + await asyncio.wait_for(success_event.wait(), ASSERTION_TIMEOUT) except asyncio.exceptions.TimeoutError as exc: raise asyncio.exceptions.TimeoutError( f"{message_checks[0]} hasn't been received within timeout") from exc diff --git a/tests/test_notifier.py b/tests/test_notifier.py index 236280a..ebe9170 100644 --- a/tests/test_notifier.py +++ b/tests/test_notifier.py @@ -35,7 +35,7 @@ async def test_valid_credentials(self): @pytest.mark.asyncio async def test_invalid_credentials(self): notifier = Notifier(server_address=SERVER_ADDRESS, - server_api_token='server_crappy_token') + server_api_token='wrong_token') task = asyncio.create_task(notifier.connect()) From 7cc9c1c04dbbdd78d1d8e8de58a2817b0c1ea4ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Ro=C5=BCnawski?= Date: Wed, 27 Sep 2023 15:44:43 +0200 Subject: [PATCH 16/16] Update docs --- README.md | 2 +- jellyfish/_notifier.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1f79042..e62dcb7 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ from jellyfish import RoomApi room_api = RoomApi(server_address='localhost:5002', server_api_token='development') ``` -You can use it to interact with Jellyfish, managing rooms, peers and components +You can use it to interact with Jellyfish, manage rooms, peers and components ```python # Create a room diff --git a/jellyfish/_notifier.py b/jellyfish/_notifier.py index 91d057d..c9e01de 100644 --- a/jellyfish/_notifier.py +++ b/jellyfish/_notifier.py @@ -57,9 +57,12 @@ async def connect(self): The incoming messages are handled by the functions defined using the `on_server_notification` and `on_metrics` decorators. + + The handlers have to be defined before calling `connect`, + otherwise the messages won't be received. ''' - async with client.connect(f'ws://{self._server_address}/socket/server/websocket') \ - as websocket: + address = f'ws://{self._server_address}/socket/server/websocket' + async with client.connect(address) as websocket: try: self._websocket = websocket await self._authenticate()