Skip to content

Commit 869f861

Browse files
authored
Merge pull request #7219 from rabbitmq/require-feature-flags
Require all feature flags introduced before 3.11.1
2 parents 5c7d7e6 + 5045fce commit 869f861

File tree

10 files changed

+62
-587
lines changed

10 files changed

+62
-587
lines changed

deps/rabbit/src/rabbit_channel_tracking.erl

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,14 @@
2929
-export([list/0, list_of_user/1, list_on_node/1,
3030
tracked_channel_table_name_for/1,
3131
tracked_channel_per_user_table_name_for/1,
32-
get_all_tracked_channel_table_names_for_node/1,
3332
ensure_tracked_tables_for_this_node/0,
3433
delete_tracked_channel_user_entry/1]).
3534

36-
%% All nodes (that support the `tracking_records_in_ets' feature) must
37-
%% export this function with the same spec, as they are called via
38-
%% RPC from other nodes. (Their implementation can differ.)
3935
-export([count_local_tracked_items_of_user/1]).
4036

41-
-export([migrate_tracking_records/0]).
37+
-ifdef(TEST).
38+
-export([get_all_tracked_channel_table_names_for_node/1]).
39+
-endif.
4240

4341
-include_lib("rabbit_common/include/rabbit.hrl").
4442

@@ -464,27 +462,3 @@ close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
464462
|| #tracked_channel{pid = ChPid} <- TrackedChannels],
465463
ok;
466464
close_channels(_TrackedChannels = []) -> ok.
467-
468-
migrate_tracking_records() ->
469-
Node = node(),
470-
rabbit_mnesia:execute_mnesia_transaction(
471-
fun () ->
472-
Table = tracked_channel_table_name_for(Node),
473-
_ = mnesia:lock({table, Table}, read),
474-
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
475-
lists:foreach(
476-
fun(Channel) ->
477-
ets:insert(tracked_channel, Channel)
478-
end, Channels)
479-
end),
480-
rabbit_mnesia:execute_mnesia_transaction(
481-
fun () ->
482-
Table = tracked_channel_per_user_table_name_for(Node),
483-
_ = mnesia:lock({table, Table}, read),
484-
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
485-
lists:foreach(
486-
fun(#tracked_channel_per_user{channel_count = C,
487-
user = Username}) ->
488-
ets:update_counter(tracked_channel_per_user, Username, C, {Username, 0})
489-
end, Channels)
490-
end).

deps/rabbit/src/rabbit_connection_tracking.erl

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
-export([tracked_connection_table_name_for/1,
3030
tracked_connection_per_vhost_table_name_for/1,
3131
tracked_connection_per_user_table_name_for/1,
32-
get_all_tracked_connection_table_names_for_node/1,
3332
clear_tracked_connection_tables_for_this_node/0,
3433

3534
ensure_tracked_tables_for_this_node/0,
@@ -41,13 +40,12 @@
4140
tracked_connection_from_connection_state/1,
4241
lookup/1, count/0]).
4342

44-
%% All nodes (that support the `tracking_records_in_ets' feature) must
45-
%% export these functions with the same spec, as they are called via
46-
%% RPC from other nodes. (Their implementation can differ.)
4743
-export([count_local_tracked_items_in_vhost/1,
4844
count_local_tracked_items_of_user/1]).
4945

50-
-export([migrate_tracking_records/0]).
46+
-ifdef(TEST).
47+
-export([get_all_tracked_connection_table_names_for_node/1]).
48+
-endif.
5149

5250
-include_lib("rabbit_common/include/rabbit.hrl").
5351

@@ -728,38 +726,3 @@ close_connection(#tracked_connection{pid = Pid}, Message) ->
728726
% best effort, this will work for connections to the stream plugin
729727
Node = node(Pid),
730728
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).
731-
732-
migrate_tracking_records() ->
733-
Node = node(),
734-
rabbit_mnesia:execute_mnesia_transaction(
735-
fun () ->
736-
Table = tracked_connection_table_name_for(Node),
737-
_ = mnesia:lock({table, Table}, read),
738-
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
739-
lists:foreach(
740-
fun(Connection) ->
741-
ets:insert(tracked_connection, Connection)
742-
end, Connections)
743-
end),
744-
rabbit_mnesia:execute_mnesia_transaction(
745-
fun () ->
746-
Table = tracked_connection_per_user_table_name_for(Node),
747-
_ = mnesia:lock({table, Table}, read),
748-
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
749-
lists:foreach(
750-
fun(#tracked_connection_per_user{connection_count = C,
751-
user = Username}) ->
752-
ets:update_counter(tracked_connection_per_user, Username, C, {Username, 0})
753-
end, Connections)
754-
end),
755-
rabbit_mnesia:execute_mnesia_transaction(
756-
fun () ->
757-
Table = tracked_connection_per_vhost_table_name_for(Node),
758-
_ = mnesia:lock({table, Table}, read),
759-
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
760-
lists:foreach(
761-
fun(#tracked_connection_per_vhost{connection_count = C,
762-
vhost = VHost}) ->
763-
ets:update_counter(tracked_connection_per_vhost, VHost, C, {VHost, 0})
764-
end, Connections)
765-
end).

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 13 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,11 @@
77

88
-module(rabbit_core_ff).
99

10-
-include_lib("kernel/include/logger.hrl").
11-
12-
-include_lib("rabbit_common/include/logging.hrl").
13-
14-
-export([direct_exchange_routing_v2_enable/1,
15-
listener_records_in_ets_enable/1,
16-
listener_records_in_ets_post_enable/1,
17-
tracking_records_in_ets_enable/1,
18-
tracking_records_in_ets_post_enable/1]).
19-
2010
-rabbit_feature_flag(
2111
{classic_mirrored_queue_version,
2212
#{desc => "Support setting version for classic mirrored queues",
23-
stability => stable
13+
%%TODO remove compatibility code
14+
stability => required
2415
}}).
2516

2617
-rabbit_feature_flag(
@@ -68,7 +59,8 @@
6859
{stream_single_active_consumer,
6960
#{desc => "Single active consumer for streams",
7061
doc_url => "https://www.rabbitmq.com/stream.html",
71-
stability => stable,
62+
%%TODO remove compatibility code
63+
stability => required,
7264
depends_on => [stream_queue]
7365
}}).
7466

@@ -81,31 +73,25 @@
8173
-rabbit_feature_flag(
8274
{direct_exchange_routing_v2,
8375
#{desc => "v2 direct exchange routing implementation",
84-
stability => stable,
85-
depends_on => [feature_flags_v2, implicit_default_bindings],
86-
callbacks => #{enable => {?MODULE, direct_exchange_routing_v2_enable}}
76+
%%TODO remove compatibility code
77+
stability => required,
78+
depends_on => [feature_flags_v2, implicit_default_bindings]
8779
}}).
8880

8981
-rabbit_feature_flag(
9082
{listener_records_in_ets,
9183
#{desc => "Store listener records in ETS instead of Mnesia",
92-
stability => stable,
93-
depends_on => [feature_flags_v2],
94-
callbacks => #{enable =>
95-
{?MODULE, listener_records_in_ets_enable},
96-
post_enable =>
97-
{?MODULE, listener_records_in_ets_post_enable}}
84+
%%TODO remove compatibility code
85+
stability => required,
86+
depends_on => [feature_flags_v2]
9887
}}).
9988

10089
-rabbit_feature_flag(
10190
{tracking_records_in_ets,
10291
#{desc => "Store tracking records in ETS instead of Mnesia",
103-
stability => stable,
104-
depends_on => [feature_flags_v2],
105-
callbacks => #{enable =>
106-
{?MODULE, tracking_records_in_ets_enable},
107-
post_enable =>
108-
{?MODULE, tracking_records_in_ets_post_enable}}
92+
%%TODO remove compatibility code
93+
stability => required,
94+
depends_on => [feature_flags_v2]
10995
}}).
11096

11197
-rabbit_feature_flag(
@@ -124,129 +110,3 @@
124110
stability => stable,
125111
depends_on => [stream_queue]
126112
}}).
127-
128-
%% -------------------------------------------------------------------
129-
%% Direct exchange routing v2.
130-
%% -------------------------------------------------------------------
131-
132-
-spec direct_exchange_routing_v2_enable(Args) -> Ret when
133-
Args :: rabbit_feature_flags:enable_callback_args(),
134-
Ret :: rabbit_feature_flags:enable_callback_ret().
135-
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
136-
TableName = rabbit_index_route,
137-
ok = rabbit_table:wait([rabbit_route, rabbit_exchange], _Retry = true),
138-
try
139-
case rabbit_db_binding:create_index_route_table() of
140-
ok ->
141-
ok;
142-
{error, Err} = Error ->
143-
?LOG_ERROR(
144-
"Feature flags: `~ts`: failed to add copy of table ~ts to "
145-
"node ~tp: ~tp",
146-
[FeatureName, TableName, node(), Err],
147-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
148-
Error
149-
end
150-
catch throw:{error, Reason} ->
151-
?LOG_ERROR(
152-
"Feature flags: `~ts`: enable callback failure: ~tp",
153-
[FeatureName, Reason],
154-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
155-
{error, Reason}
156-
end.
157-
158-
%% -------------------------------------------------------------------
159-
%% Listener records moved from Mnesia to ETS.
160-
%% -------------------------------------------------------------------
161-
162-
listener_records_in_ets_enable(#{feature_name := FeatureName}) ->
163-
try
164-
rabbit_mnesia:execute_mnesia_transaction(
165-
fun () ->
166-
_ = mnesia:lock({table, rabbit_listener}, read),
167-
Listeners = mnesia:select(
168-
rabbit_listener, [{'$1',[],['$1']}]),
169-
lists:foreach(
170-
fun(Listener) ->
171-
ets:insert(rabbit_listener_ets, Listener)
172-
end, Listeners)
173-
end)
174-
catch
175-
throw:{error, {no_exists, rabbit_listener}} ->
176-
ok;
177-
throw:{error, Reason} ->
178-
?LOG_ERROR(
179-
"Feature flags: `~ts`: failed to migrate Mnesia table: ~tp",
180-
[FeatureName, Reason],
181-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
182-
{error, Reason}
183-
end.
184-
185-
listener_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
186-
try
187-
case mnesia:delete_table(rabbit_listener) of
188-
{atomic, ok} ->
189-
ok;
190-
{aborted, {no_exists, _}} ->
191-
ok;
192-
{aborted, Err} ->
193-
?LOG_ERROR(
194-
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
195-
[FeatureName, Err],
196-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
197-
ok
198-
end
199-
catch
200-
throw:{error, Reason} ->
201-
?LOG_ERROR(
202-
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
203-
[FeatureName, Reason],
204-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
205-
ok
206-
end.
207-
208-
tracking_records_in_ets_enable(#{feature_name := FeatureName}) ->
209-
try
210-
rabbit_connection_tracking:migrate_tracking_records(),
211-
rabbit_channel_tracking:migrate_tracking_records()
212-
catch
213-
throw:{error, {no_exists, _}} ->
214-
ok;
215-
throw:{error, Reason} ->
216-
?LOG_ERROR(
217-
"Enabling feature flag ~ts failed: ~tp",
218-
[FeatureName, Reason],
219-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
220-
{error, Reason}
221-
end.
222-
223-
tracking_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
224-
try
225-
[delete_table(FeatureName, Tab) ||
226-
Tab <- rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(node())],
227-
[delete_table(FeatureName, Tab) ||
228-
Tab <- rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(node())]
229-
catch
230-
throw:{error, Reason} ->
231-
?LOG_ERROR(
232-
"Enabling feature flag ~ts failed: ~tp",
233-
[FeatureName, Reason],
234-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
235-
%% adheres to the callback interface
236-
ok
237-
end.
238-
239-
delete_table(FeatureName, Tab) ->
240-
case mnesia:delete_table(Tab) of
241-
{atomic, ok} ->
242-
ok;
243-
{aborted, {no_exists, _}} ->
244-
ok;
245-
{aborted, Err} ->
246-
?LOG_ERROR(
247-
"Enabling feature flag ~ts failed to delete mnesia table ~tp: ~tp",
248-
[FeatureName, Tab, Err],
249-
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
250-
%% adheres to the callback interface
251-
ok
252-
end.

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232

3333
-export([recover/0, recover/1]).
3434

35-
-export([create_index_route_table/0]).
36-
3735
%% For testing
3836
-export([clear/0]).
3937

@@ -472,52 +470,6 @@ recover_in_mnesia(RecoverFun) ->
472470
source = Src}} = Route <-
473471
rabbit_mnesia:dirty_read_all(?MNESIA_SEMI_DURABLE_TABLE)].
474472

475-
%% -------------------------------------------------------------------
476-
%% create_index_route_table().
477-
%% -------------------------------------------------------------------
478-
479-
-spec create_index_route_table() -> ok | {error, any()}.
480-
create_index_route_table() ->
481-
rabbit_db:run(
482-
#{mnesia => fun() -> create_index_route_table_in_mnesia() end
483-
}).
484-
485-
create_index_route_table_in_mnesia() ->
486-
DependantTables = [?MNESIA_TABLE, rabbit_exchange],
487-
ok = rabbit_table:wait(DependantTables, _Retry = true),
488-
[ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables],
489-
ok = rabbit_table:create(
490-
?MNESIA_INDEX_TABLE, rabbit_table:rabbit_index_route_definition()),
491-
case rabbit_table:ensure_table_copy(?MNESIA_INDEX_TABLE, node(), ram_copies) of
492-
ok ->
493-
ok = populate_index_route_table_in_mnesia();
494-
Error ->
495-
Error
496-
end.
497-
498-
populate_index_route_table_in_mnesia() ->
499-
rabbit_mnesia:execute_mnesia_transaction(
500-
fun () ->
501-
_ = mnesia:lock({table, ?MNESIA_TABLE}, read),
502-
_ = mnesia:lock({table, rabbit_exchange}, read),
503-
_ = mnesia:lock({table, ?MNESIA_INDEX_TABLE}, write),
504-
Routes = rabbit_mnesia:dirty_read_all(?MNESIA_TABLE),
505-
lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) ->
506-
case rabbit_db_exchange:get(Exchange) of
507-
{ok, X} ->
508-
case should_index_table(X) of
509-
true ->
510-
mnesia:dirty_write(?MNESIA_INDEX_TABLE,
511-
rabbit_binding:index_route(Route));
512-
false ->
513-
ok
514-
end;
515-
_ ->
516-
ok
517-
end
518-
end, Routes)
519-
end).
520-
521473
%% -------------------------------------------------------------------
522474
%% delete_all_for_exchange_in_mnesia().
523475
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_table.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-export([
1111
create/0, create/2, ensure_local_copies/1, ensure_table_copy/3,
12-
create_local_copy/2, wait_for_replicated/1, wait/1, wait/2,
12+
wait_for_replicated/1, wait/1, wait/2,
1313
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
1414
check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
1515
wait_for_replicated/0, exists/1]).

0 commit comments

Comments
 (0)