Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/jellyfish/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ defmodule Jellyfish.Event do
HlsUploadCrashed,
HlsUploaded,
MetricsReport,
PeerAdded,
PeerConnected,
PeerCrashed,
PeerDeleted,
PeerDisconnected,
PeerMetadataUpdated,
RoomCrashed,
Expand Down Expand Up @@ -54,6 +56,12 @@ defmodule Jellyfish.Event do
defp to_proto_server_notification({:room_crashed, room_id}),
do: {:room_crashed, %RoomCrashed{room_id: room_id}}

defp to_proto_server_notification({:peer_added, room_id, peer_id}),
do: {:peer_added, %PeerAdded{room_id: room_id, peer_id: peer_id}}

defp to_proto_server_notification({:peer_deleted, room_id, peer_id}),
do: {:peer_deleted, %PeerDeleted{room_id: room_id, peer_id: peer_id}}

defp to_proto_server_notification({:peer_connected, room_id, peer_id}),
do: {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}}

Expand Down
4 changes: 1 addition & 3 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ defmodule Jellyfish.Room do
with false <- State.reached_peers_limit?(state),
options <- State.generate_peer_options(state, override_options),
{:ok, peer} <- Peer.new(peer_type, options) do
state = State.put_peer(state, peer)

Logger.info("Added peer #{inspect(peer.id)}")
state = State.add_peer(state, peer)

{:reply, {:ok, peer}, state}
else
Expand Down
24 changes: 20 additions & 4 deletions lib/jellyfish/room/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ defmodule Jellyfish.Room.State do
put_in(state, [:peers, peer_id, :metadata], metadata)
end

@spec add_peer(state :: t(), peer :: Peer.t()) :: t()
def add_peer(state, peer) do
state = put_peer(state, peer)

Logger.info("Added peer #{inspect(peer.id)}")
Event.broadcast_server_notification({:peer_added, state.id, peer.id})

state
end

@spec connect_peer(state :: t(), peer :: Peer.t(), socket_pid :: pid()) :: t()
def connect_peer(state, peer, socket_pid) do
peer = %{peer | status: :connected, socket_pid: socket_pid}
Expand Down Expand Up @@ -255,7 +265,9 @@ defmodule Jellyfish.Room.State do

@spec remove_peer(state :: t(), peer_id :: Peer.id(), reason :: any()) :: t()
def remove_peer(state, peer_id, :timeout) do
{_peer, state} = pop_in(state, [:peers, peer_id])
{peer, state} = pop_in(state, [:peers, peer_id])

Event.broadcast_server_notification({:peer_deleted, state.id, peer.id})

maybe_schedule_peerless_purge(state)
end
Expand All @@ -280,9 +292,13 @@ defmodule Jellyfish.Room.State do
:telemetry.execute([:jellyfish, :room], %{peer_disconnects: 1}, %{room_id: state.id})
end

with {:peer_crashed, crash_reason} <- reason do
Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason})
:telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id})
case reason do
{:peer_crashed, crash_reason} ->
Event.broadcast_server_notification({:peer_crashed, state.id, peer_id, crash_reason})
:telemetry.execute([:jellyfish, :room], %{peer_crashes: 1}, %{room_id: state.id})

_other ->
Event.broadcast_server_notification({:peer_deleted, state.id, peer.id})
end

maybe_schedule_peerless_purge(state)
Expand Down
25 changes: 25 additions & 0 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ defmodule Jellyfish.ServerMessage.RoomCrashed do
field :room_id, 1, type: :string, json_name: "roomId"
end

defmodule Jellyfish.ServerMessage.PeerAdded do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"

field :room_id, 1, type: :string, json_name: "roomId"
field :peer_id, 2, type: :string, json_name: "peerId"
end

defmodule Jellyfish.ServerMessage.PeerDeleted do
@moduledoc false

use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0"

field :room_id, 1, type: :string, json_name: "roomId"
field :peer_id, 2, type: :string, json_name: "peerId"
end

defmodule Jellyfish.ServerMessage.PeerConnected do
@moduledoc false

Expand Down Expand Up @@ -305,4 +323,11 @@ defmodule Jellyfish.ServerMessage do
type: Jellyfish.ServerMessage.TrackMetadataUpdated,
json_name: "trackMetadataUpdated",
oneof: 0

field :peer_added, 20, type: Jellyfish.ServerMessage.PeerAdded, json_name: "peerAdded", oneof: 0

field :peer_deleted, 21,
type: Jellyfish.ServerMessage.PeerDeleted,
json_name: "peerDeleted",
oneof: 0
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"klotho": {:hex, :klotho, "0.1.2", "3b1f1a569703e0cdce1ba964f41711351a7b06846c38fcbd601faa407e712bf2", [:mix], [], "hexpm", "a6a387982753582e30a5246fe9561721c6b9a4dd27678296cf2cd44faa6f3733"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"logger_json": {:hex, :logger_json, "5.1.4", "9e30a4f2e31a8b9e402bdc20bd37cf9b67d3a31f19d0b33082a19a06b4c50f6d", [:mix], [{:ecto, "~> 2.1 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "3f20eea58e406a33d3eb7814c7dff5accb503bab2ee8601e84da02976fa3934c"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.7", "4d9af018c22d9291b72d6025941452dd53c7921bcdbc826da8866bb6ecefa8cb", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "79904c3b78882bd0cec15b02928e6b53780602e64a359941acbc9a2408e7b74b"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.8", "88d47923805cbd9a977fc7e5d3eb8d3028a2e358ad9ad7b124684adc78c2e8ee", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "bb9e706d0949954affd4e295f5d3d4660096997756b5422119800d961c46cc63"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_aac_plugin": {:hex, :membrane_aac_plugin, "0.18.1", "30433bffd4d5d773f79448dd9afd55d77338721688f09a89b20d742a68cc2c3d", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "8fd048c47d5d2949eb557e19f43f62d534d3af5096187f1a1a3a1694d14b772c"},
"membrane_audio_mix_plugin": {:hex, :membrane_audio_mix_plugin, "0.16.0", "34997707ee186683c6d7bd87572944e5e37c0249235cc44915d181d653c5c40e", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a4a8c723f0da8d9cf9ac11bf657a732770ea0b8db4eff2efc16caa3a1819f435"},
Expand Down
2 changes: 1 addition & 1 deletion protos
32 changes: 26 additions & 6 deletions test/jellyfish_web/integration/server_notification_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
HlsUploadCrashed,
HlsUploaded,
MetricsReport,
PeerAdded,
PeerConnected,
PeerDeleted,
PeerDisconnected,
PeerMetadataUpdated,
RoomCrashed,
Expand Down Expand Up @@ -172,7 +174,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
test "doesn't send messages if not subscribed", %{conn: conn} do
create_and_authenticate()

trigger_notification(conn)
trigger_notification(conn, false)

refute_receive %PeerConnected{}, 200
end
Expand Down Expand Up @@ -224,6 +226,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}")
assert response(conn, :no_content)

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

Klotho.Mock.warp_by(@purge_timeout_ms + 25)

assert_receive %RoomDeleted{room_id: ^room_id}, 1_000
Expand All @@ -244,6 +249,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do

refute_received {:webhook_notification,
%PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}
end

test "sends a message when peer connects and peer is removed", %{conn: conn} do
Expand All @@ -258,6 +266,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
%PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}},
1_000

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

_conn = delete(conn, ~p"/room/#{room_id}")
assert_receive %RoomDeleted{room_id: ^room_id}

Expand Down Expand Up @@ -419,6 +430,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000

{peer_id, token, _conn} = add_peer(conn, room_id)

{:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer)
WS.send_auth_request(peer_ws, token)

Expand All @@ -438,6 +450,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do

Klotho.Mock.warp_by(@purge_timeout_ms * 3)

assert_receive %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerDeleted{room_id: ^room_id, peer_id: ^peer_id}}

assert_receive %RoomDeleted{room_id: ^room_id}, 1_000
assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000
end
Expand Down Expand Up @@ -592,9 +607,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
ws
end

defp add_room_and_peer(conn) do
defp add_room_and_peer(conn, assert_notifications? \\ true) do
{room_id, conn} = add_room(conn)
{peer_id, token, conn} = add_peer(conn, room_id)
{peer_id, token, conn} = add_peer(conn, room_id, assert_notifications?)

{room_id, peer_id, token, conn}
end
Expand All @@ -612,12 +627,17 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
{room_id, conn}
end

defp add_peer(conn, room_id) do
defp add_peer(conn, room_id, assert_notifications? \\ true) do
conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc")

assert %{"token" => peer_token, "peer" => %{"id" => peer_id}} =
json_response(conn, :created)["data"]

if assert_notifications? do
assert_receive %PeerAdded{room_id: ^room_id, peer_id: ^peer_id}
assert_receive {:webhook_notification, %PeerAdded{room_id: ^room_id, peer_id: ^peer_id}}
end

{peer_id, peer_token, conn}
end

Expand Down Expand Up @@ -670,8 +690,8 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
{conn, component_id}
end

defp trigger_notification(conn) do
{_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn)
defp trigger_notification(conn, assert_notifications?) do
{_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn, assert_notifications?)

{:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer)
WS.send_auth_request(peer_ws, peer_token)
Expand Down