Skip to content

Commit 2fffb55

Browse files
committed
Use 1 instead of 4 processes per WebMQTT connection
Reducing number of Erlang processes allows better scaling with millions of clients connecting via MQTT over WebSockets. In this commit, we partially revert 9c153b2 where support for heartbeats were introduced. Prior to this commit there were 4 processes per WebMQTT connection supervised by ranch_conns_sup in the ranch application: 1. rabbit_web_mqtt_connection_sup 2. rabbit_web_mqtt_connection_sup (id=rabbit_web_mqtt_keepalive_sup) 3. rabbit_hearbeat (receiver) 4. rabbit_web_mqtt_handler connection process After this commit, there is only the 4th process supervised directly by ranch_conns_sup.
1 parent 648691c commit 2fffb55

File tree

4 files changed

+71
-174
lines changed

4 files changed

+71
-174
lines changed

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 50 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ cowboy_ws_connection_pid(RanchConnPid) ->
8080
mqtt_init() ->
8181
CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])),
8282
CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])),
83-
8483
Routes = cowboy_router:compile([{'_', [
8584
{get_env(ws_path, "/ws"), rabbit_web_mqtt_handler, [{ws_opts, CowboyWsOpts}]}
8685
]}]),
87-
CowboyOpts = CowboyOpts0#{env => #{dispatch => Routes},
88-
middlewares => [cowboy_router, rabbit_web_mqtt_middleware, cowboy_handler],
89-
proxy_header => get_env(proxy_protocol, false),
90-
stream_handlers => [rabbit_web_mqtt_stream_handler, cowboy_stream_h]},
86+
CowboyOpts = CowboyOpts0#{
87+
env => #{dispatch => Routes},
88+
proxy_header => get_env(proxy_protocol, false),
89+
stream_handlers => [rabbit_web_mqtt_stream_handler, cowboy_stream_h]
90+
},
9191
case get_env(tcp_config, []) of
9292
[] -> ok;
9393
TCPConf0 -> start_tcp_listener(TCPConf0, CowboyOpts)
@@ -99,59 +99,55 @@ mqtt_init() ->
9999
ok.
100100

101101
start_tcp_listener(TCPConf0, CowboyOpts) ->
102-
{TCPConf, IpStr, Port} = get_tcp_conf(TCPConf0),
103-
RanchTransportOpts = #{
104-
socket_opts => TCPConf,
105-
connection_type => supervisor,
106-
max_connections => get_max_connections(),
107-
num_acceptors => get_env(num_tcp_acceptors, 10),
108-
num_conns_sups => get_env(num_conns_sup, 1)
109-
},
110-
case ranch:start_listener(rabbit_networking:ranch_ref(TCPConf),
111-
ranch_tcp,
112-
RanchTransportOpts,
113-
rabbit_web_mqtt_connection_sup,
114-
CowboyOpts) of
115-
{ok, _} -> ok;
116-
{error, {already_started, _}} -> ok;
117-
{error, ErrTCP} ->
118-
rabbit_log_connection:error(
119-
"Failed to start a WebSocket (HTTP) listener. Error: ~p,"
120-
" listener settings: ~p",
102+
{TCPConf, IpStr, Port} = get_tcp_conf(TCPConf0),
103+
RanchRef = rabbit_networking:ranch_ref(TCPConf),
104+
RanchTransportOpts =
105+
#{
106+
socket_opts => TCPConf,
107+
max_connections => get_max_connections(),
108+
num_acceptors => get_env(num_tcp_acceptors, 10),
109+
num_conns_sups => get_env(num_conns_sup, 1)
110+
},
111+
case cowboy:start_clear(RanchRef, RanchTransportOpts, CowboyOpts) of
112+
{ok, _} ->
113+
ok;
114+
{error, {already_started, _}} ->
115+
ok;
116+
{error, ErrTCP} ->
117+
rabbit_log_connection:error(
118+
"Failed to start a WebSocket (HTTP) listener. Error: ~p, listener settings: ~p",
121119
[ErrTCP, TCPConf]),
122-
throw(ErrTCP)
123-
end,
124-
listener_started(?TCP_PROTOCOL, TCPConf),
125-
rabbit_log:info("rabbit_web_mqtt: listening for HTTP connections on ~s:~w",
126-
[IpStr, Port]).
120+
throw(ErrTCP)
121+
end,
122+
listener_started(?TCP_PROTOCOL, TCPConf),
123+
rabbit_log:info("rabbit_web_mqtt: listening for HTTP connections on ~s:~w",
124+
[IpStr, Port]).
127125

128126
start_tls_listener(TLSConf0, CowboyOpts) ->
129-
rabbit_networking:ensure_ssl(),
130-
{TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0),
131-
RanchTransportOpts = #{
132-
socket_opts => TLSConf,
133-
connection_type => supervisor,
134-
max_connections => get_max_connections(),
135-
num_acceptors => get_env(num_ssl_acceptors, 10),
136-
num_conns_sups => get_env(num_conns_sup, 1)
137-
},
138-
case ranch:start_listener(rabbit_networking:ranch_ref(TLSConf),
139-
ranch_ssl,
140-
RanchTransportOpts,
141-
rabbit_web_mqtt_connection_sup,
142-
CowboyOpts) of
143-
{ok, _} -> ok;
144-
{error, {already_started, _}} -> ok;
145-
{error, ErrTLS} ->
146-
rabbit_log_connection:error(
147-
"Failed to start a TLS WebSocket (HTTPS) listener. Error: ~p,"
148-
" listener settings: ~p",
127+
rabbit_networking:ensure_ssl(),
128+
{TLSConf, TLSIpStr, TLSPort} = get_tls_conf(TLSConf0),
129+
RanchRef = rabbit_networking:ranch_ref(TLSConf),
130+
RanchTransportOpts =
131+
#{
132+
socket_opts => TLSConf,
133+
max_connections => get_max_connections(),
134+
num_acceptors => get_env(num_ssl_acceptors, 10),
135+
num_conns_sups => get_env(num_conns_sup, 1)
136+
},
137+
case cowboy:start_tls(RanchRef, RanchTransportOpts, CowboyOpts) of
138+
{ok, _} ->
139+
ok;
140+
{error, {already_started, _}} ->
141+
ok;
142+
{error, ErrTLS} ->
143+
rabbit_log_connection:error(
144+
"Failed to start a TLS WebSocket (HTTPS) listener. Error: ~p, listener settings: ~p",
149145
[ErrTLS, TLSConf]),
150-
throw(ErrTLS)
151-
end,
152-
listener_started(?TLS_PROTOCOL, TLSConf),
153-
rabbit_log:info("rabbit_web_mqtt: listening for HTTPS connections on ~s:~w",
154-
[TLSIpStr, TLSPort]).
146+
throw(ErrTLS)
147+
end,
148+
listener_started(?TLS_PROTOCOL, TLSConf),
149+
rabbit_log:info("rabbit_web_mqtt: listening for HTTPS connections on ~s:~w",
150+
[TLSIpStr, TLSPort]).
155151

156152
listener_started(Protocol, Listener) ->
157153
Port = rabbit_misc:pget(port, Listener),

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_connection_sup.erl

Lines changed: 0 additions & 75 deletions
This file was deleted.

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525

2626
-record(state, {
2727
conn_name,
28-
keepalive,
29-
keepalive_sup,
3028
parse_state,
3129
proc_state,
3230
state,
@@ -44,6 +42,13 @@
4442
%%TODO move from deprecated callback results to new callback results
4543
%% see cowboy_websocket.erl
4644

45+
%%TODO call rabbit_networking:register_non_amqp_connection/1 so that we are notified
46+
%% when need to force load the 'connection_created' event for the management plugin, see
47+
%% https://github.com/rabbitmq/rabbitmq-management-agent/issues/58
48+
%% https://github.com/rabbitmq/rabbitmq-server/blob/90cc0e2abf944141feedaf3190d7b6d8b4741b11/deps/rabbitmq_stream/src/rabbit_stream_reader.erl#L536
49+
%% https://github.com/rabbitmq/rabbitmq-server/blob/90cc0e2abf944141feedaf3190d7b6d8b4741b11/deps/rabbitmq_stream/src/rabbit_stream_reader.erl#L189
50+
%% https://github.com/rabbitmq/rabbitmq-server/blob/7eb4084cf879fe6b1d26dd3c837bd180cb3a8546/deps/rabbitmq_management_agent/src/rabbit_mgmt_db_handler.erl#L72
51+
4752
%% cowboy_sub_protcol
4853
upgrade(Req, Env, Handler, HandlerState) ->
4954
upgrade(Req, Env, Handler, HandlerState, #{}).
@@ -64,7 +69,6 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState})
6469
%% cowboy_websocket
6570
init(Req, Opts) ->
6671
{PeerAddr, _PeerPort} = maps:get(peer, Req),
67-
{_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts),
6872
SockInfo = maps:get(proxy_header, Req, undefined),
6973
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
7074
%%TODO return idle_timeout?
@@ -83,8 +87,6 @@ init(Req, Opts) ->
8387
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, SecWsProtocol, Req)
8488
end,
8589
{?MODULE, Req2, #state{
86-
keepalive = {none, none},
87-
keepalive_sup = KeepaliveSup,
8890
parse_state = rabbit_mqtt_frame:initial_state(),
8991
state = running,
9092
conserve_resources = false,
@@ -101,7 +103,7 @@ websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
101103
conn_name = ConnStr,
102104
socket = Sock
103105
},
104-
rabbit_log_connection:info("accepting Web MQTT connection ~p (~s)", [self(), ConnStr]),
106+
rabbit_log_connection:info("Accepting Web MQTT connection ~p (~s)", [self(), ConnStr]),
105107
RealSocket = rabbit_net:unwrap_socket(Sock),
106108
ProcessorState = rabbit_mqtt_processor:initial_state(RealSocket,
107109
ConnStr,
@@ -173,18 +175,12 @@ websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_s
173175
rabbit_log_connection:warning("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
174176
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
175177
stop(State);
176-
websocket_info({start_keepalive, Keepalive},
177-
State = #state{ socket = Sock, keepalive_sup = KeepaliveSup }) ->
178-
%% Only the client has the responsibility for sending keepalives
179-
SendFun = fun() -> ok end,
180-
Parent = self(),
181-
ReceiveFun = fun() -> Parent ! keepalive_timeout end,
182-
Heartbeater = rabbit_heartbeat:start(
183-
KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun),
184-
{ok, State #state { keepalive = Heartbeater }, hibernate};
185-
websocket_info(keepalive_timeout, State = #state{conn_name = ConnStr}) ->
186-
rabbit_log_connection:error("closing Web MQTT connection ~p (keepalive timeout)", [ConnStr]),
187-
stop(State);
178+
websocket_info({start_keepalive, _Keepalive}, State) ->
179+
%%TODO use timer as done in rabbit_mqtt_reader
180+
{ok, State, hibernate};
181+
% websocket_info(keepalive_timeout, State = #state{conn_name = ConnStr}) ->
182+
% rabbit_log_connection:error("closing Web MQTT connection ~p (keepalive timeout)", [ConnStr]),
183+
% stop(State);
188184
websocket_info(emit_stats, State) ->
189185
{ok, emit_stats(State), hibernate};
190186
websocket_info({ra_event, _From, Evt},
@@ -284,13 +280,13 @@ handle_credits(State0) ->
284280
control_throttle(State = #state{ state = CS,
285281
conserve_resources = Mem }) ->
286282
case {CS, Mem orelse credit_flow:blocked()} of
287-
{running, true} -> ok = rabbit_heartbeat:pause_monitor(
288-
State#state.keepalive),
289-
State #state{ state = blocked };
290-
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
291-
State#state.keepalive),
292-
State #state{ state = running };
293-
{_, _} -> State
283+
%%TODO cancel / resume keepalive timer as done in rabbit_mqtt_reader
284+
{running, true} ->
285+
State #state{state = blocked};
286+
{blocked,false} ->
287+
State #state{state = running};
288+
{_, _} ->
289+
State
294290
end.
295291

296292
send_reply(Frame, PState) ->

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_middleware.erl

Lines changed: 0 additions & 20 deletions
This file was deleted.

0 commit comments

Comments
 (0)