Skip to content

Commit 90510fb

Browse files
committed
Convert rabbit_mqtt_reader from gen_server2 to gen_server
There is no need to use gen_server2. gen_server2 requires lots of memory with millions of MQTT connections because it creates 1 entry per connection into ETS table 'gen_server2_metrics'. Instead of using handle_pre_hibernate, erasing the permission cache is now done by using a timeout. We do not need a hibernate backoff feature, simply hibernate after 1 second. It's okay for MQTT connection processes to hibernate because they usually send data rather rarely (compared to AMQP connections).
1 parent 722a44f commit 90510fb

File tree

1 file changed

+19
-18
lines changed

1 file changed

+19
-18
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
-module(rabbit_mqtt_reader).
99

10-
-behaviour(gen_server2).
10+
-behaviour(gen_server).
1111
-behaviour(ranch_protocol).
1212

1313
-export([start_link/3]).
1414
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
15-
code_change/3, terminate/2, handle_pre_hibernate/1]).
15+
code_change/3, terminate/2]).
1616

1717
%%TODO check where to best 'hibernate' when returning from callback
1818
%%TODO use rabbit_global_counters for MQTT protocol
@@ -26,6 +26,7 @@
2626

2727
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
2828
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state]).
29+
-define(HIBERNATE_AFTER, 1000).
2930

3031
%%----------------------------------------------------------------------------
3132

@@ -133,13 +134,17 @@ handle_cast(Delivery = {deliver, _, _, _}, State = #state{proc_state = PState})
133134
handle_cast(Msg, State) ->
134135
{stop, {mqtt_unexpected_cast, Msg}, State}.
135136

137+
handle_info(timeout, State) ->
138+
rabbit_mqtt_processor:handle_pre_hibernate(),
139+
{noreply, State, hibernate};
140+
136141
handle_info({'EXIT', _Conn, Reason}, State) ->
137142
{stop, {connection_died, Reason}, State};
138143

139144
handle_info({Tag, Sock, Data},
140145
State = #state{ socket = Sock, connection_state = blocked })
141-
when Tag =:= tcp; Tag =:= ssl ->
142-
{noreply, State#state{ deferred_recv = Data }, hibernate};
146+
when Tag =:= tcp; Tag =:= ssl ->
147+
{noreply, State#state{ deferred_recv = Data }, ?HIBERNATE_AFTER};
143148

144149
handle_info({Tag, Sock, Data},
145150
State = #state{ socket = Sock, connection_state = running })
@@ -156,7 +161,7 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
156161
network_error(Reason, State);
157162

158163
handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) ->
159-
{noreply, State, hibernate};
164+
{noreply, State, ?HIBERNATE_AFTER};
160165

161166
handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) ->
162167
network_error(Reason, State);
@@ -173,7 +178,7 @@ handle_info({keepalive, Req}, State = #state{keepalive = KState0,
173178
conn_name = ConnName}) ->
174179
case rabbit_mqtt_keepalive:handle(Req, KState0) of
175180
{ok, KState} ->
176-
{noreply, State#state{keepalive = KState}, hibernate};
181+
{noreply, State#state{keepalive = KState}, ?HIBERNATE_AFTER};
177182
{error, timeout} ->
178183
rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)", [ConnName]),
179184
send_will_and_terminate({shutdown, keepalive_timeout}, State);
@@ -193,19 +198,19 @@ handle_info(login_timeout, State = #state{conn_name = ConnStr}) ->
193198
{stop, {shutdown, login_timeout}, State};
194199

195200
handle_info(emit_stats, State) ->
196-
{noreply, emit_stats(State), hibernate};
201+
{noreply, emit_stats(State), ?HIBERNATE_AFTER};
197202

198203
handle_info({ra_event, _From, Evt},
199204
#state{proc_state = PState0} = State) ->
200205
%% handle applied event to ensure registration command actually got applied
201206
%% handle not_leader notification in case we send the command to a non-leader
202207
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
203-
{noreply, pstate(State, PState), hibernate};
208+
{noreply, pstate(State, PState), ?HIBERNATE_AFTER};
204209

205210
handle_info({'DOWN', _MRef, process, _Pid, _Reason} = Evt,
206211
#state{proc_state = PState0} = State) ->
207212
PState = rabbit_mqtt_processor:handle_down(Evt, PState0),
208-
{noreply, pstate(State, PState), hibernate};
213+
{noreply, pstate(State, PState), ?HIBERNATE_AFTER};
209214

210215
handle_info(Msg, State) ->
211216
{stop, {mqtt_unexpected_msg, Msg}, State}.
@@ -217,10 +222,6 @@ terminate(Reason, State = #state{keepalive = KState0,
217222
rabbit_mqtt_processor:terminate(PState),
218223
log_terminate(Reason, State).
219224

220-
handle_pre_hibernate(State) ->
221-
rabbit_mqtt_processor:handle_pre_hibernate(),
222-
{hibernate, State}.
223-
224225
log_terminate({network_error, {ssl_upgrade_error, closed}, ConnStr}, _State) ->
225226
rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: connection closed",
226227
[ConnStr]);
@@ -279,9 +280,9 @@ process_received_bytes(<<>>, State = #state{received_connect_frame = false,
279280
conn_name = ConnStr}) ->
280281
rabbit_log_connection:info("Accepted MQTT connection ~p (~s, client id: ~s)",
281282
[self(), ConnStr, rabbit_mqtt_processor:info(client_id, PState)]),
282-
{noreply, ensure_stats_timer(State#state{received_connect_frame = true}), hibernate};
283+
{noreply, ensure_stats_timer(State#state{received_connect_frame = true}), ?HIBERNATE_AFTER};
283284
process_received_bytes(<<>>, State) ->
284-
{noreply, ensure_stats_timer(State), hibernate};
285+
{noreply, ensure_stats_timer(State), ?HIBERNATE_AFTER};
285286
process_received_bytes(Bytes,
286287
State = #state{ parse_state = ParseState,
287288
proc_state = ProcState,
@@ -290,7 +291,7 @@ process_received_bytes(Bytes,
290291
{more, ParseState1} ->
291292
{noreply,
292293
ensure_stats_timer( State #state{ parse_state = ParseState1 }),
293-
hibernate};
294+
?HIBERNATE_AFTER};
294295
{ok, Frame, Rest} ->
295296
case rabbit_mqtt_processor:process_frame(Frame, ProcState) of
296297
{ok, ProcState1} ->
@@ -341,7 +342,7 @@ process_received_bytes(Bytes,
341342
end.
342343

343344
callback_reply(State, {ok, ProcState}) ->
344-
{noreply, pstate(State, ProcState), hibernate};
345+
{noreply, pstate(State, ProcState), ?HIBERNATE_AFTER};
345346
callback_reply(State, {error, Reason, ProcState}) ->
346347
{stop, Reason, pstate(State, ProcState)}.
347348

@@ -413,7 +414,7 @@ control_throttle(State = #state{connection_state = Flow,
413414
end.
414415

415416
maybe_process_deferred_recv(State = #state{ deferred_recv = undefined }) ->
416-
{noreply, State, hibernate};
417+
{noreply, State, ?HIBERNATE_AFTER};
417418
maybe_process_deferred_recv(State = #state{ deferred_recv = Data, socket = Sock }) ->
418419
handle_info({tcp, Sock, Data},
419420
State#state{ deferred_recv = undefined }).

0 commit comments

Comments
 (0)