Skip to content

Commit 7aea7f8

Browse files
committed
Roll wal on memory alarm
[#161408569]
1 parent dbe1dea commit 7aea7f8

File tree

2 files changed

+103
-2
lines changed

2 files changed

+103
-2
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
-module(rabbit_quorum_memory_manager).
17+
18+
-include("rabbit.hrl").
19+
20+
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
21+
terminate/2, code_change/3]).
22+
-export([register/0, unregister/0]).
23+
24+
-record(state, {last_roll_over,
25+
interval}).
26+
27+
-rabbit_boot_step({rabbit_quorum_memory_manager,
28+
[{description, "quorum memory manager"},
29+
{mfa, {?MODULE, register, []}},
30+
{cleanup, {?MODULE, unregister, []}},
31+
{requires, rabbit_event},
32+
{enables, recovery}]}).
33+
34+
register() ->
35+
gen_event:add_handler(rabbit_alarm, ?MODULE, []).
36+
37+
unregister() ->
38+
gen_event:delete_handler(rabbit_alarm, ?MODULE, []).
39+
40+
init([]) ->
41+
{ok, #state{interval = interval()}}.
42+
43+
handle_call( _, State) ->
44+
{ok, ok, State}.
45+
46+
handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
47+
#state{last_roll_over = undefined} = State) when Node == node() ->
48+
{ok, force_roll_over(State)};
49+
handle_event({set_alarm, {{resource_limit, memory, Node}, []}},
50+
#state{last_roll_over = Last, interval = Interval } = State)
51+
when Node == node() ->
52+
Now = erlang:system_time(millisecond),
53+
case Now > (Last + Interval) of
54+
true ->
55+
{ok, force_roll_over(State)};
56+
false ->
57+
{ok, State}
58+
end;
59+
handle_event(_, State) ->
60+
{ok, State}.
61+
62+
handle_info(_, State) ->
63+
{ok, State}.
64+
65+
terminate(_, _State) ->
66+
ok.
67+
68+
code_change(_OldVsn, State, _Extra) ->
69+
{ok, State}.
70+
71+
force_roll_over(State) ->
72+
ra_log_wal:force_roll_over(ra_log_wal),
73+
State#state{last_roll_over = erlang:system_time(millisecond)}.
74+
75+
interval() ->
76+
application:get_env(rabbit, min_wal_roll_over_interval, 20000).

test/quorum_queue_SUITE.erl

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ all_tests() ->
119119
delete_immediately,
120120
delete_immediately_by_resource,
121121
consume_redelivery_count,
122-
subscribe_redelivery_count
122+
subscribe_redelivery_count,
123+
memory_alarm_rolls_wal
123124
].
124125

125126
%% -------------------------------------------------------------------
@@ -205,7 +206,10 @@ init_per_testcase(Testcase, Config) ->
205206
rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
206207

207208
merge_app_env(Config) ->
208-
rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
209+
rabbit_ct_helpers:merge_app_env(
210+
rabbit_ct_helpers:merge_app_env(Config,
211+
{rabbit, [{core_metrics_gc_interval, 100}]}),
212+
{ra, [{min_wal_roll_over_interval, 30000}]}).
209213

210214
end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
211215
Testcase == reconnect_consumer_and_wait;
@@ -2044,6 +2048,27 @@ consume_redelivery_count(Config) ->
20442048
multiple = false,
20452049
requeue = true}).
20462050

2051+
memory_alarm_rolls_wal(Config) ->
2052+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2053+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2054+
QQ = ?config(queue_name, Config),
2055+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2056+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2057+
WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
2058+
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
2059+
ok = rpc:call(Server, rabbit_alarm, set_alarm,
2060+
[{{resource_limit, memory, Server}, []}]),
2061+
timer:sleep(1000),
2062+
[Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"),
2063+
?assert(Wal0 =/= Wal1),
2064+
%% roll over shouldn't happen if we trigger a new alarm in less than
2065+
%% min_wal_roll_over_interval
2066+
ok = rpc:call(Server, rabbit_alarm, set_alarm,
2067+
[{{resource_limit, memory, Server}, []}]),
2068+
timer:sleep(1000),
2069+
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
2070+
?assert(Wal1 == Wal2).
2071+
20472072
%%----------------------------------------------------------------------------
20482073

20492074
declare(Ch, Q) ->

0 commit comments

Comments
 (0)