Skip to content

Commit 11d09fb

Browse files
committed
Support MQTT Keepalive in WebMQTT
Share the same MQTT keepalive code between rabbit_mqtt_reader and rabbit_web_mqtt_handler. Add MQTT keepalive test in both plugins rabbitmq_mqtt and rabbitmq_web_mqtt.
1 parent 2fffb55 commit 11d09fb

File tree

7 files changed

+255
-161
lines changed

7 files changed

+255
-161
lines changed

deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,19 @@
1010
-include("rabbit_mqtt_types.hrl").
1111

1212
%% reader state
13-
-record(state, {socket,
14-
proxy_socket,
15-
conn_name,
16-
await_recv,
17-
deferred_recv,
18-
received_connect_frame,
19-
connection_state,
20-
conserve,
21-
parse_state,
22-
proc_state,
23-
stats_timer,
24-
keepalive}).
25-
26-
-record(keepalive, {timer :: reference(),
27-
interval_ms :: pos_integer(),
28-
recv_oct :: non_neg_integer(),
29-
received :: boolean()}).
13+
-record(state,
14+
{socket,
15+
proxy_socket,
16+
conn_name,
17+
await_recv,
18+
deferred_recv,
19+
received_connect_frame,
20+
connection_state,
21+
conserve,
22+
parse_state,
23+
proc_state,
24+
stats_timer,
25+
keepalive :: rabbit_mqtt_keepalive:state()}).
3026

3127
%% processor state
3228
-record(proc_state,
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
-module(rabbit_mqtt_keepalive).
2+
3+
-export([start/2,
4+
handle/2,
5+
start_timer/1,
6+
cancel_timer/1]).
7+
8+
-export_type([state/0]).
9+
10+
-record(state, {timer :: reference(),
11+
interval_ms :: pos_integer(),
12+
socket :: inet:socket(),
13+
recv_oct :: non_neg_integer(),
14+
received :: boolean()}).
15+
16+
-opaque(state() :: undefined | #state{}).
17+
18+
-spec start(IntervalSeconds :: non_neg_integer(), inet:socket()) -> ok.
19+
start(0, _Sock) ->
20+
ok;
21+
start(Seconds, Sock)
22+
when is_integer(Seconds) andalso Seconds > 0 ->
23+
self() ! {keepalive, {init, Seconds, Sock}},
24+
ok.
25+
26+
-spec handle(Request :: term(), state()) ->
27+
{ok, state()} | {error, Reason :: term()}.
28+
handle({init, IntervalSecs, Sock}, _State) ->
29+
case rabbit_net:getstat(Sock, [recv_oct]) of
30+
{ok, [{recv_oct, RecvOct}]} ->
31+
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
32+
%% Packet from the Client within one and a half times the Keep Alive time period,
33+
%% it MUST disconnect the Network Connection to the Client as if the network had
34+
%% failed" [MQTT-3.1.2-24]
35+
%%
36+
%% We check every (1.5 / 2 = 0.75) * KeepaliveInterval whether we received
37+
%% any data from the client. If there was no activity for two consecutive times,
38+
%% we close the connection.
39+
%% We choose 0.75 (instead of a larger or smaller factor) to have the right balance
40+
%% between not checking too often (since it could become expensive when there are
41+
%% millions of clients) and not checking too rarely (to detect dead clients promptly).
42+
%%
43+
%% See https://github.com/emqx/emqx/issues/460
44+
%% PING
45+
%% | DOWN
46+
%% | |<-------Delay Time--------->
47+
%% t0---->|----------|----------|----------|---->tn
48+
%% | | |
49+
%% Ok Retry Timeout
50+
IntervalMs = round(0.75 * timer:seconds(IntervalSecs)),
51+
State = #state{socket = Sock,
52+
interval_ms = IntervalMs,
53+
recv_oct = RecvOct,
54+
received = true},
55+
{ok, start_timer(State)};
56+
{error, _} = Err ->
57+
Err
58+
end;
59+
handle(check, State = #state{socket = Sock,
60+
recv_oct = SameRecvOct,
61+
received = ReceivedPreviously}) ->
62+
case rabbit_net:getstat(Sock, [recv_oct]) of
63+
{ok, [{recv_oct, SameRecvOct}]}
64+
when ReceivedPreviously ->
65+
%% Did not receive from socket for the 1st time.
66+
{ok, start_timer(State#state{received = false})};
67+
{ok, [{recv_oct, SameRecvOct}]} ->
68+
%% Did not receive from socket for 2nd time.
69+
{error, timeout};
70+
{ok, [{recv_oct, NewRecvOct}]} ->
71+
%% Received from socket.
72+
{ok, start_timer(State#state{recv_oct = NewRecvOct,
73+
received = true})};
74+
{error, _} = Err ->
75+
Err
76+
end.
77+
78+
-spec start_timer(state()) -> state().
79+
start_timer(undefined) ->
80+
undefined;
81+
start_timer(#state{interval_ms = IntervalMs} = State) ->
82+
Ref = erlang:send_after(IntervalMs, self(), {keepalive, check}),
83+
State#state{timer = Ref}.
84+
85+
-spec cancel_timer(state()) -> state().
86+
cancel_timer(undefined) ->
87+
undefined;
88+
cancel_timer(#state{timer = Ref} = State)
89+
when is_reference(Ref) ->
90+
ok = erlang:cancel_timer(Ref, [{async, true},
91+
{info, false}]),
92+
State.

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ process_connect(#mqtt_frame{
260260
fun login/2,
261261
fun register_client/2,
262262
fun notify_connection_created/2,
263+
fun start_keepalive/2,
263264
fun handle_clean_session/2],
264265
FrameConnect, PState0) of
265266
{ok, SessionPresent0, PState1} ->
@@ -344,7 +345,6 @@ register_client(Frame = #mqtt_frame_connect{
344345
{ok, Corr} ->
345346
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
346347
Prefetch = rabbit_mqtt_util:env(prefetch),
347-
rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
348348
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
349349
ExchangeBin = rabbit_mqtt_util:env(exchange),
350350
ExchangeName = rabbit_misc:r(VHost, exchange, ExchangeBin),
@@ -421,6 +421,10 @@ return_connack(?CONNACK_ID_REJECTED, S) ->
421421
return_connack(?CONNACK_UNACCEPTABLE_PROTO_VER, S) ->
422422
{error, unsupported_protocol_version, S}.
423423

424+
start_keepalive(#mqtt_frame_connect{keep_alive = Seconds},
425+
#proc_state{socket = Socket}) ->
426+
ok = rabbit_mqtt_keepalive:start(Seconds, Socket).
427+
424428
handle_clean_session(_, PState0 = #proc_state{clean_sess = false}) ->
425429
case get_queue(?QOS_1, PState0) of
426430
{error, not_found} ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 25 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
%%TODO check where to best 'hibernate' when returning from callback
1818
%%TODO use rabbit_global_counters for MQTT protocol
1919

20-
-export([conserve_resources/3, start_keepalive/2,
20+
-export([conserve_resources/3,
2121
close_connection/2]).
2222

2323
-export([info/2]).
@@ -166,61 +166,14 @@ handle_info({bump_credit, Msg}, State) ->
166166
credit_flow:handle_bump_msg(Msg),
167167
maybe_process_deferred_recv(control_throttle(State));
168168

169-
handle_info({start_keepalive, KeepaliveSec},
170-
State = #state{socket = Sock,
171-
keepalive = undefined})
172-
when is_number(KeepaliveSec), KeepaliveSec > 0 ->
173-
case rabbit_net:getstat(Sock, [recv_oct]) of
174-
{ok, [{recv_oct, RecvOct}]} ->
175-
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
176-
%% Packet from the Client within one and a half times the Keep Alive time period,
177-
%% it MUST disconnect the Network Connection to the Client as if the network had
178-
%% failed" [MQTT-3.1.2-24]
179-
%% 0.75 * 2 = 1.5
180-
IntervalMs = timer:seconds(round(0.75 * KeepaliveSec)),
181-
Ref = start_keepalive_timer(#keepalive{interval_ms = IntervalMs}),
182-
{noreply, State#state{keepalive = #keepalive{timer = Ref,
183-
interval_ms = IntervalMs,
184-
recv_oct = RecvOct,
185-
received = true}}};
186-
{error, einval} ->
187-
%% the socket is dead, most likely because the connection is being shut down
188-
{stop, {shutdown, cannot_get_socket_stats}, State};
189-
{error, Reason} ->
190-
{stop, Reason, State}
191-
end;
192-
193-
handle_info({timeout, Ref, keepalive},
194-
State = #state {socket = Sock,
195-
conn_name = ConnStr,
196-
proc_state = PState,
197-
keepalive = #keepalive{timer = Ref,
198-
recv_oct = SameRecvOct,
199-
received = ReceivedPreviously} = KeepAlive}) ->
200-
case rabbit_net:getstat(Sock, [recv_oct]) of
201-
{ok, [{recv_oct, SameRecvOct}]}
202-
when ReceivedPreviously ->
203-
%% Did not receive from socket for the 1st time.
204-
Ref1 = start_keepalive_timer(KeepAlive),
205-
{noreply,
206-
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
207-
received = false}},
208-
hibernate};
209-
{ok, [{recv_oct, SameRecvOct}]} ->
210-
%% Did not receive from socket for 2nd time successively.
211-
rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)", [ConnStr]),
212-
send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
213-
{ok, [{recv_oct, RecvOct}]} ->
214-
%% Received from socket.
215-
Ref1 = start_keepalive_timer(KeepAlive),
216-
{noreply,
217-
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
218-
recv_oct = RecvOct,
219-
received = true}},
220-
hibernate};
221-
{error, einval} ->
222-
%% the socket is dead, most likely because the connection is being shut down
223-
{stop, {shutdown, cannot_get_socket_stats}, State};
169+
handle_info({keepalive, Req}, State = #state{keepalive = KState0,
170+
conn_name = ConnName}) ->
171+
case rabbit_mqtt_keepalive:handle(Req, KState0) of
172+
{ok, KState} ->
173+
{noreply, State#state{keepalive = KState}, hibernate};
174+
{error, timeout} ->
175+
rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)", [ConnName]),
176+
send_will_and_terminate({shutdown, keepalive_timeout}, State);
224177
{error, Reason} ->
225178
{stop, Reason, State}
226179
end;
@@ -254,14 +207,9 @@ handle_info({'DOWN', _MRef, process, _Pid, _Reason} = Evt,
254207
handle_info(Msg, State) ->
255208
{stop, {mqtt_unexpected_msg, Msg}, State}.
256209

257-
start_keepalive_timer(#keepalive{interval_ms = Time}) ->
258-
erlang:start_timer(Time, self(), keepalive).
259-
260-
cancel_keepalive_timer(#keepalive{timer = Ref}) ->
261-
erlang:cancel_timer(Ref, [{async, true}, {info, false}]).
262-
263-
terminate(Reason, State) ->
264-
maybe_emit_stats(State),
210+
terminate(Reason, State = #state{keepalive = KState0}) ->
211+
KState = rabbit_mqtt_keepalive:cancel_timer(KState0),
212+
maybe_emit_stats(State#state{keepalive = KState}),
265213
do_terminate(Reason, State).
266214

267215
handle_pre_hibernate(State) ->
@@ -300,7 +248,7 @@ do_terminate({network_error, Reason}, _State) ->
300248
rabbit_log_connection:error("MQTT detected network error: ~p", [Reason]);
301249

302250
do_terminate(normal, #state{proc_state = ProcState,
303-
conn_name = ConnName}) ->
251+
conn_name = ConnName}) ->
304252
rabbit_mqtt_processor:terminate(ProcState),
305253
rabbit_log_connection:info("closing MQTT connection ~p (~s)", [self(), ConnName]),
306254
ok;
@@ -395,9 +343,6 @@ callback_reply(State, {ok, ProcState}) ->
395343
callback_reply(State, {error, Reason, ProcState}) ->
396344
{stop, Reason, pstate(State, ProcState)}.
397345

398-
start_keepalive(_, 0 ) -> ok;
399-
start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalive, Keepalive}.
400-
401346
pstate(State = #state {}, PState = #proc_state{}) ->
402347
State #state{ proc_state = PState }.
403348

@@ -415,32 +360,31 @@ parse(Bytes, ParseState) ->
415360
%% "The Will Message MUST be published when the Network Connection is subsequently
416361
%% closed unless the Will Message has been deleted by the Server on receipt of a
417362
%% DISCONNECT Packet [MQTT-3.1.2-8]."
418-
send_will_and_terminate(PState, State) ->
419-
send_will_and_terminate(PState, {shutdown, conn_closed}, State).
363+
send_will_and_terminate(State) ->
364+
send_will_and_terminate({shutdown, conn_closed}, State).
420365

421-
send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) ->
366+
send_will_and_terminate(Reason, State = #state{conn_name = ConnStr,
367+
proc_state = PState}) ->
422368
rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]),
423369
rabbit_mqtt_processor:send_will(PState),
424370
{stop, Reason, State}.
425371

426372
network_error(closed,
427373
State = #state{conn_name = ConnStr,
428-
proc_state = PState,
429374
received_connect_frame = Connected}) ->
430375
Fmt = "MQTT connection ~p will terminate because peer closed TCP connection",
431376
Args = [ConnStr],
432377
case Connected of
433378
true -> rabbit_log_connection:info(Fmt, Args);
434379
false -> rabbit_log_connection:debug(Fmt, Args)
435380
end,
436-
send_will_and_terminate(PState, State);
381+
send_will_and_terminate(State);
437382

438383
network_error(Reason,
439-
State = #state{conn_name = ConnStr,
440-
proc_state = PState}) ->
384+
State = #state{conn_name = ConnStr}) ->
441385
rabbit_log_connection:info("MQTT detected network error for ~p: ~p",
442386
[ConnStr, Reason]),
443-
send_will_and_terminate(PState, State).
387+
send_will_and_terminate(State).
444388

445389
run_socket(State = #state{ connection_state = blocked }) ->
446390
State;
@@ -454,24 +398,14 @@ run_socket(State = #state{ socket = Sock }) ->
454398

455399
control_throttle(State = #state{connection_state = Flow,
456400
conserve = Conserve,
457-
keepalive = KeepAlive}) ->
401+
keepalive = KState}) ->
458402
case {Flow, Conserve orelse credit_flow:blocked()} of
459-
{running, true}
460-
when KeepAlive =:= undefined ->
461-
State#state{connection_state = blocked};
462403
{running, true} ->
463-
%%TODO Instead of cancelling / setting the timer every time the connection
464-
%% gets blocked / unblocked, restart the timer when it expires and
465-
%% the connection_state is blocked.
466-
ok = cancel_keepalive_timer(KeepAlive),
467-
State#state{connection_state = blocked};
468-
{blocked, false}
469-
when KeepAlive =:= undefined ->
470-
run_socket(State #state{connection_state = running});
404+
State#state{connection_state = blocked,
405+
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};
471406
{blocked, false} ->
472-
Ref = start_keepalive_timer(KeepAlive),
473-
run_socket(State #state{connection_state = running,
474-
keepalive = KeepAlive#keepalive{timer = Ref}});
407+
run_socket(State#state{connection_state = running,
408+
keepalive = rabbit_mqtt_keepalive:start_timer(KState)});
475409
{_, _} ->
476410
run_socket(State)
477411
end.

0 commit comments

Comments
 (0)