Skip to content

Commit 9c153b2

Browse files
committed
Add support for hearbeats
This involves creating the keepalive supervisor when the connection starts, and passing its pid onward to the handler. To achieve this we put ourselves in just before starting cowboy_protocol, create the supervisor and put its pid in the Cowboy environment. Later we have a middleware that takes this pid and pass it to the MQTT handler where it can then be used. A few minor changes and fixes are also included in this commit, including compacting the Req object and using hibernate.
1 parent 4f083e1 commit 9c153b2

File tree

4 files changed

+157
-35
lines changed

4 files changed

+157
-35
lines changed

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,28 @@ init([]) -> {ok, {{one_for_one, 1, 5}, []}}.
4141

4242
mqtt_init() ->
4343
NbAcceptors = get_env(nb_acceptors, 1),
44-
CowboyOpts = get_env(cowboy_opts, []),
44+
CowboyOpts0 = get_env(cowboy_opts, []),
4545

4646
Routes = cowboy_router:compile([{'_', [
4747
{"/example", cowboy_static, {priv_file, rabbitmq_web_mqtt, "example/index.html"}},
4848
{"/example/[...]", cowboy_static, {priv_dir, rabbitmq_web_mqtt, "example/"}},
4949
{"/ws", rabbit_web_mqtt_handler, []}
5050
]}]),
51+
CowboyOpts = [
52+
{env, [{dispatch, Routes}]},
53+
{middlewares, [cowboy_router, rabbit_web_mqtt_middleware, cowboy_handler]}
54+
|CowboyOpts0],
5155

52-
TCPConf0 = get_env(tcp_config, []),
56+
TCPConf0 = [{connection_type, supervisor}|get_env(tcp_config, [])],
5357
TCPConf = case proplists:get_value(port, TCPConf0) of
5458
undefined -> [{port, 15675}|TCPConf0];
5559
_ -> TCPConf0
5660
end,
5761
TCPPort = proplists:get_value(port, TCPConf),
5862

59-
{ok, _} = cowboy:start_http(web_mqtt, NbAcceptors, TCPConf,
60-
[{env, [{dispatch, Routes}]}|CowboyOpts]),
63+
{ok, _} = ranch:start_listener(web_mqtt, NbAcceptors,
64+
ranch_tcp, TCPConf,
65+
rabbit_web_mqtt_connection_sup, CowboyOpts),
6166

6267
rabbit_log:info("rabbit_web_mqtt: listening for HTTP connections on ~s:~w~n",
6368
["0.0.0.0", TCPPort]),
@@ -69,8 +74,9 @@ mqtt_init() ->
6974
rabbit_networking:ensure_ssl(),
7075
SSLPort = proplists:get_value(port, SSLConf),
7176

72-
{ok, _} = cowboy:start_https(web_mqtt_secure, NbAcceptors, SSLPort,
73-
[{env, [{dispatch, Routes}]}|CowboyOpts]),
77+
{ok, _} = cowboy:start_https(web_mqtt_secure, NbAcceptors,
78+
ranch_ssl, SSLConf,
79+
rabbit_web_mqtt_connection_sup, CowboyOpts),
7480
rabbit_log:info("rabbit_web_mqtt: listening for HTTPS connections on ~s:~w~n",
7581
["0.0.0.0", SSLPort])
7682
end,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2016 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
17+
-module(rabbit_web_mqtt_connection_sup).
18+
19+
-behaviour(supervisor2).
20+
-behaviour(ranch_protocol).
21+
22+
-define(MAX_WAIT, 16#ffffffff).
23+
24+
-export([start_link/4, start_keepalive_link/0]).
25+
26+
-export([init/1]).
27+
28+
%%----------------------------------------------------------------------------
29+
30+
start_link(Ref, Sock, Transport, CowboyOpts0) ->
31+
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
32+
{ok, KeepaliveSup} = supervisor2:start_child(
33+
SupPid,
34+
{rabbit_web_mqtt_keepalive_sup,
35+
{?MODULE, start_keepalive_link, []},
36+
intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}),
37+
38+
%% In order for the Websocket handler to receive the KeepaliveSup
39+
%% variable, we need to pass it first through the environment and
40+
%% then have the middleware rabbit_web_mqtt_middleware place it
41+
%% in the initial handler state.
42+
{env, Env} = lists:keyfind(env, 1, CowboyOpts0),
43+
CowboyOpts = lists:keyreplace(env, 1, CowboyOpts0,
44+
{env, [{keepalive_sup, KeepaliveSup}|Env]}),
45+
46+
{ok, ReaderPid} = supervisor2:start_child(
47+
SupPid,
48+
{cowboy_protocol,
49+
{cowboy_protocol, start_link, [Ref, Sock, Transport, CowboyOpts]},
50+
intrinsic, ?MAX_WAIT, worker, [cowboy_protocol]}),
51+
{ok, SupPid, ReaderPid}.
52+
53+
start_keepalive_link() ->
54+
supervisor2:start_link(?MODULE, []).
55+
56+
%%----------------------------------------------------------------------------
57+
58+
init([]) ->
59+
{ok, {{one_for_all, 0, 1}, []}}.

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,103 +22,131 @@
2222
-export([websocket_info/3]).
2323
-export([websocket_terminate/3]).
2424

25+
-export([conserve_resources/3]).
26+
2527
-include_lib("amqp_client/include/amqp_client.hrl").
2628

2729
-record(state, {
2830
conn_name,
31+
keepalive,
32+
keepalive_sup,
2933
parse_state,
3034
proc_state
3135
}).
3236

3337
init(_, _, _) ->
3438
{upgrade, protocol, cowboy_websocket}.
3539

36-
websocket_init(_, Req, _) ->
37-
% io:format(user, "~p~n", [Req]),
38-
40+
websocket_init(_, Req, Opts) ->
3941
process_flag(trap_exit, true),
42+
{_, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Opts),
4043
Sock = cowboy_req:get(socket, Req),
4144
case rabbit_net:connection_string(Sock, inbound) of
4245
{ok, ConnStr} ->
4346
rabbit_log:log(connection, info, "accepting WEB-MQTT connection ~p (~s)~n", [self(), ConnStr]),
44-
% rabbit_alarm:register(
45-
% self(), {?MODULE, conserve_resources, []}),
47+
rabbit_alarm:register(
48+
self(), {?MODULE, conserve_resources, []}),
4649
ProcessorState = rabbit_mqtt_processor:initial_state(Sock,
4750
rabbit_mqtt_reader:ssl_login_name(Sock),
4851
fun send_reply/2),
4952
{SecWsProtocol, Req1} = cowboy_req:header(<<"sec-websocket-protocol">>, Req),
5053
Req2 = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, SecWsProtocol, Req1),
51-
%% @todo compact?
52-
%% control_throttle
53-
{ok, Req2, #state{
54-
%% keepalive/keepalive_sup
55-
conn_name = ConnStr,
56-
parse_state = rabbit_mqtt_frame:initial_state(),
57-
proc_state = ProcessorState
58-
}};
54+
Req3 = cowboy_req:compact(Req2),
55+
%% @todo control_throttle
56+
{ok, Req3, #state{
57+
conn_name = ConnStr,
58+
keepalive = {none, none},
59+
keepalive_sup = KeepaliveSup,
60+
parse_state = rabbit_mqtt_frame:initial_state(),
61+
proc_state = ProcessorState
62+
}, hibernate};
5963
_ ->
6064
{shutdown, Req}
6165
end.
6266

63-
%% @todo hibernate everywhere?
6467
websocket_handle({binary, Data}, Req, State) ->
6568
handle_data(Data, Req, State);
6669
websocket_handle(Frame, Req, State) ->
67-
rabbit_log:info("rabbit_web_mqtt: unexpected Websocket frame ~p~n",
70+
rabbit_log:log(connection, info, "WEB-MQTT: unexpected Websocket frame ~p~n",
6871
[Frame]),
69-
{ok, Req, State}.
72+
{ok, Req, State, hibernate}.
7073

7174
websocket_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
7275
Req, State = #state{ proc_state = ProcState0 }) ->
7376
case rabbit_mqtt_processor:amqp_callback(Delivery, ProcState0) of
7477
{ok, ProcState} ->
75-
{ok, Req, State #state { proc_state = ProcState }};
78+
{ok, Req, State #state { proc_state = ProcState }, hibernate};
7679
{error, _, _} ->
7780
{shutdown, Req, State}
7881
end;
7982
websocket_info(#'basic.ack'{} = Ack, Req, State = #state{ proc_state = ProcState0 }) ->
8083
case rabbit_mqtt_processor:amqp_callback(Ack, ProcState0) of
8184
{ok, ProcState} ->
82-
{ok, Req, State #state { proc_state = ProcState }};
85+
{ok, Req, State #state { proc_state = ProcState }, hibernate};
8386
{error, _, _} ->
8487
{shutdown, Req, State}
8588
end;
8689
websocket_info(#'basic.consume_ok'{}, Req, State) ->
87-
{ok, Req, State};
90+
{ok, Req, State, hibernate};
8891
websocket_info(#'basic.cancel'{}, Req, State) ->
8992
{shutdown, Req, State};
9093
websocket_info({reply, Data}, Req, State) ->
91-
{reply, {binary, Data}, Req, State};
92-
websocket_info({'EXIT', _, _}, State) ->
94+
{reply, {binary, Data}, Req, State, hibernate};
95+
websocket_info({'EXIT', _, _}, Req, State) ->
9396
{shutdown, Req, State};
9497
websocket_info({'$gen_cast', duplicate_id}, Req, State = #state{ proc_state = ProcState,
9598
conn_name = ConnName }) ->
96-
rabbit_log:warning("MQTT disconnecting duplicate client id ~p (~p)~n",
99+
rabbit_log:log(connection, warning, "WEB-MQTT disconnecting duplicate client id ~p (~p)~n",
97100
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
98101
{shutdown, Req, State};
102+
websocket_info({start_keepalives, Keepalive}, Req,
103+
State = #state{ keepalive_sup = KeepaliveSup }) ->
104+
Sock = cowboy_req:get(socket, Req),
105+
%% Only the client has the responsibility for sending keepalives
106+
SendFun = fun() -> ok end,
107+
Parent = self(),
108+
ReceiveFun = fun() -> Parent ! keepalive_timeout end,
109+
Heartbeater = rabbit_heartbeat:start(
110+
KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun),
111+
{ok, Req, State #state { keepalive = Heartbeater }};
112+
113+
websocket_info(keepalive_timeout, Req, State = #state {conn_name = ConnStr,
114+
proc_state = PState}) ->
115+
rabbit_log:log(connection, error, "closing WEB-MQTT connection ~p (keepalive timeout)~n", [ConnStr]),
116+
rabbit_mqtt_processor:send_will(PState),
117+
{shutdown, Req, State};
118+
119+
%% @todo conserve_resources
120+
%% @todo bump_credit
121+
99122
websocket_info(Msg, Req, State) ->
100-
rabbit_log:info("rabbit_web_mqtt: unexpected message ~p~n",
123+
rabbit_log:log(connection, info, "WEB-MQTT: unexpected message ~p~n",
101124
[Msg]),
102-
{ok, Req, State}.
103-
125+
{ok, Req, State, hibernate}.
104126

105-
websocket_terminate(_, _, #state{proc_state=ProcState}) ->
127+
websocket_terminate(_, _, #state{ proc_state = ProcState,
128+
conn_name = ConnName }) ->
129+
rabbit_log:log(connection, info, "closing WEB-MQTT connection ~p (~s)~n", [self(), ConnName]),
106130
rabbit_mqtt_processor:close_connection(ProcState),
107131
ok;
108132
websocket_terminate(_, _, _) ->
109133
ok.
110134

135+
conserve_resources(Pid, _, Conserve) ->
136+
Pid ! {conserve_resources, Conserve},
137+
ok.
138+
111139
%% Internal.
112140

113141
handle_data(<<>>, Req, State) ->
114-
{ok, Req, State};
142+
{ok, Req, State, hibernate};
115143
handle_data(Data, Req, State = #state{ parse_state = ParseState,
116144
proc_state = ProcState,
117145
conn_name = ConnStr }) ->
118146
case rabbit_mqtt_frame:parse(Data, ParseState) of
119147
{more, ParseState1} ->
120148
%% @todo control_throttle
121-
{ok, Req, State #state{ parse_state = ParseState1 }};
149+
{ok, Req, State #state{ parse_state = ParseState1 }, hibernate};
122150
{ok, Frame, Rest} ->
123151
case rabbit_mqtt_processor:process_frame(Frame, ProcState) of
124152
{ok, ProcState1} ->
@@ -145,5 +173,4 @@ handle_data(Data, Req, State = #state{ parse_state = ParseState,
145173
end.
146174

147175
send_reply(Frame, _) ->
148-
rabbit_log:info("MQTT sending frame ~p ~n", [Frame]),
149176
self() ! {reply, rabbit_mqtt_frame:serialise(Frame)}.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2016 GoPivotal, Inc. All rights reserved.
15+
%%
16+
17+
-module(rabbit_web_mqtt_middleware).
18+
-behavior(cowboy_middleware).
19+
20+
-export([execute/2]).
21+
22+
execute(Req, Env) ->
23+
{keepalive_sup, KeepaliveSup} = lists:keyfind(keepalive_sup, 1, Env),
24+
case lists:keyfind(handler_opts, 1, Env) of
25+
{_, Opts} when is_list(Opts) ->
26+
{ok, Req, lists:keyreplace(handler_opts, 1, Env,
27+
{handler_opts, [{keepalive_sup, KeepaliveSup}|Opts]})};
28+
_ ->
29+
{ok, Req, Env}
30+
end.

0 commit comments

Comments
 (0)