Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,6 @@
[{description, "message delivery logic ready"},
{requires, [core_initialized, recovery]}]}).

-rabbit_boot_step({connection_tracking,
[{description, "connection tracking infrastructure"},
{mfa, {rabbit_connection_tracking, boot, []}},
{enables, routing_ready}]}).

-rabbit_boot_step({channel_tracking,
[{description, "channel tracking infrastructure"},
{mfa, {rabbit_channel_tracking, boot, []}},
{enables, routing_ready}]}).

-rabbit_boot_step({background_gc,
[{description, "background garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
Expand Down
260 changes: 21 additions & 239 deletions deps/rabbit/src/rabbit_channel_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
%% * rabbit_event
-behaviour(rabbit_tracking).

-export([boot/0,
update_tracked/1,
-export([update_tracked/1,
handle_cast/1,
register_tracked/1,
unregister_tracked/1,
count_tracked_items_in/1,
clear_tracking_tables/0,
shutdown_tracked_items/2]).

-export([list/0, list_of_user/1, list_on_node/1,
Expand All @@ -34,10 +32,6 @@

-export([count_local_tracked_items_of_user/1]).

-ifdef(TEST).
-export([get_all_tracked_channel_table_names_for_node/1]).
-endif.

-include_lib("rabbit_common/include/rabbit.hrl").

-import(rabbit_misc, [pget/2]).
Expand All @@ -49,15 +43,6 @@
%% API
%%

%% Sets up and resets channel tracking tables for this node.
-spec boot() -> ok.

boot() ->
ensure_tracked_channels_table_for_this_node(),
ensure_per_user_tracked_channels_table_for_node(),
clear_tracking_tables(),
ok.

-spec update_tracked(term()) -> ok.

update_tracked(Event) ->
Expand Down Expand Up @@ -115,30 +100,13 @@ handle_cast({user_deleted, Details}) ->
%% Schedule user entry deletion, allowing time for connections to close
_ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_channel_user_entry, [Username]),
ok;
handle_cast({node_deleted, Details}) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true ->
ok;
false ->
Node = pget(node, Details),
rabbit_log_channel:info(
"Node '~ts' was removed from the cluster, deleting"
" its channel tracking tables...", [Node]),
delete_tracked_channels_table_for_node(Node),
delete_per_user_tracked_channels_table_for_node(Node)
end.
ok.

-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
-dialyzer([{nowarn_function, [register_tracked/1]}]).

register_tracked(TrackedCh = #tracked_channel{node = Node}) when Node == node() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> register_tracked_ets(TrackedCh);
false -> register_tracked_mnesia(TrackedCh)
end.

register_tracked_ets(TrackedCh = #tracked_channel{pid = ChPid, username = Username}) ->
register_tracked(TrackedCh = #tracked_channel{pid = ChPid, username = Username,
node = Node}) when Node == node() ->
case ets:lookup(?TRACKED_CHANNEL_TABLE, ChPid) of
[] ->
ets:insert(?TRACKED_CHANNEL_TABLE, TrackedCh),
Expand All @@ -149,85 +117,31 @@ register_tracked_ets(TrackedCh = #tracked_channel{pid = ChPid, username = Userna
end,
ok.

register_tracked_mnesia(TrackedCh =
#tracked_channel{node = Node, name = Name, username = Username}) ->
ChId = rabbit_tracking:id(Node, Name),
TableName = tracked_channel_table_name_for(Node),
PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
case mnesia:dirty_read(TableName, ChId) of
[] ->
mnesia:dirty_write(TableName, TrackedCh),
mnesia:dirty_update_counter(PerUserChTableName, Username, 1),
ok;
[#tracked_channel{}] ->
ok
end,
ok.

-spec unregister_tracked_by_pid(pid()) -> any().
unregister_tracked_by_pid(ChPid) when node(ChPid) == node() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> unregister_tracked_by_pid_ets(ChPid);
false -> unregister_tracked_by_pid_mnesia(ChPid)
end.

unregister_tracked_by_pid_ets(ChPid) ->
case ets:lookup(?TRACKED_CHANNEL_TABLE, ChPid) of
[] -> ok;
[#tracked_channel{username = Username}] ->
ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, -1),
ets:delete(?TRACKED_CHANNEL_TABLE, ChPid)
end.

unregister_tracked_by_pid_mnesia(ChPid) ->
case get_tracked_channel_by_pid_mnesia(ChPid) of
[] -> ok;
[#tracked_channel{id = ChId, node = Node, username = Username}] ->
TableName = tracked_channel_table_name_for(Node),
PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),

mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
mnesia:dirty_delete(TableName, ChId)
end.

%% @doc This function is exported and implements a rabbit_tracking
%% callback, however it is not used in rabbitmq-server any more. It is
%% only kept for backwards compatibility if 3rd-party code would rely
%% on it.
-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
unregister_tracked(ChId = {Node, _Name}) when Node == node() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> unregister_tracked_ets(ChId);
false -> unregister_tracked_mnesia(ChId)
end.

unregister_tracked_ets(ChId) ->
case get_tracked_channel_by_id_ets(ChId) of
case get_tracked_channel_by_id(ChId) of
[] -> ok;
[#tracked_channel{pid = ChPid, username = Username}] ->
ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, -1),
ets:delete(?TRACKED_CHANNEL_TABLE, ChPid)
end.

unregister_tracked_mnesia(ChId = {Node, _Name}) when Node =:= node() ->
TableName = tracked_channel_table_name_for(Node),
PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
case mnesia:dirty_read(TableName, ChId) of
[] -> ok;
[#tracked_channel{username = Username}] ->
mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
mnesia:dirty_delete(TableName, ChId)
end.

-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().

count_tracked_items_in(Type) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> count_tracked_items_in_ets(Type);
false -> count_tracked_items_in_mnesia(Type)
end.

count_tracked_items_in_ets({user, Username}) ->
count_tracked_items_in({user, Username}) ->
rabbit_tracking:count_on_all_nodes(
?MODULE, count_local_tracked_items_of_user, [Username],
["channels of user ", Username]).
Expand All @@ -236,20 +150,6 @@ count_tracked_items_in_ets({user, Username}) ->
count_local_tracked_items_of_user(Username) ->
rabbit_tracking:read_ets_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username).

count_tracked_items_in_mnesia({user, Username}) ->
rabbit_tracking:count_tracked_items_mnesia(
fun tracked_channel_per_user_table_name_for/1,
#tracked_channel_per_user.channel_count, Username,
"channels of user").

-spec clear_tracking_tables() -> ok.

clear_tracking_tables() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> ok;
false -> clear_tracked_channel_tables_for_this_node()
end.

-spec shutdown_tracked_items(list(), term()) -> ok.

shutdown_tracked_items(TrackedItems, _Args) ->
Expand All @@ -267,50 +167,18 @@ list() ->
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].

list_of_user(Username) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> list_of_user_ets(Username);
false -> list_of_user_mnesia(Username)
end.

list_of_user_ets(Username) ->
rabbit_tracking:match_tracked_items_ets(
rabbit_tracking:match_tracked_items(
?TRACKED_CHANNEL_TABLE,
#tracked_channel{username = Username, _ = '_'}).

list_of_user_mnesia(Username) ->
rabbit_tracking:match_tracked_items_mnesia(
fun tracked_channel_table_name_for/1,
#tracked_channel{username = Username, _ = '_'}).

-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
list_on_node(Node) when Node == node() ->
ets:tab2list(?TRACKED_CHANNEL_TABLE);
list_on_node(Node) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true when Node == node() ->
list_on_node_ets();
true ->
case rabbit_misc:rpc_call(Node, ?MODULE, list_on_node, [Node]) of
List when is_list(List) ->
List;
_ ->
[]
end;
false ->
list_on_node_mnesia(Node)
end.

list_on_node_ets() ->
ets:tab2list(?TRACKED_CHANNEL_TABLE).

list_on_node_mnesia(Node) ->
try mnesia:dirty_match_object(
tracked_channel_table_name_for(Node),
#tracked_channel{_ = '_'})
catch exit:{aborted, {no_exists, _}} ->
%% The table might not exist yet (or is already gone)
%% between the time rabbit_nodes:list_running() runs and
%% returns a specific node, and
%% mnesia:dirty_match_object() is called for that node's
%% table.
case rabbit_misc:rpc_call(Node, ?MODULE, list_on_node, [Node]) of
List when is_list(List) ->
List;
_ ->
[]
end.

Expand All @@ -326,118 +194,32 @@ tracked_channel_per_user_table_name_for(Node) ->
"tracked_channel_table_per_user_on_node_~ts", [Node])).

ensure_tracked_tables_for_this_node() ->
_ = ensure_tracked_channels_table_for_this_node_ets(),
_ = ensure_per_user_tracked_channels_table_for_this_node_ets(),
_ = ensure_tracked_channels_table_for_this_node(),
_ = ensure_per_user_tracked_channels_table_for_this_node(),
ok.

%% internal
ensure_tracked_channels_table_for_this_node() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true ->
ok;
false ->
ensure_tracked_channels_table_for_this_node_mnesia()
end.

ensure_per_user_tracked_channels_table_for_node() ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true ->
ok;
false ->
ensure_per_user_tracked_channels_table_for_this_node_mnesia()
end.

%% Create tables
ensure_tracked_channels_table_for_this_node_ets() ->
ensure_tracked_channels_table_for_this_node() ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[?TRACKED_CHANNEL_TABLE]),
ets:new(?TRACKED_CHANNEL_TABLE, [named_table, public, {write_concurrency, true},
{keypos, #tracked_channel.pid}]).

ensure_tracked_channels_table_for_this_node_mnesia() ->
Node = node(),
TableName = tracked_channel_table_name_for(Node),
case mnesia:create_table(TableName, [{record_name, tracked_channel},
{attributes, record_info(fields, tracked_channel)}]) of
{atomic, ok} ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[TableName]),
ok;
{aborted, {already_exists, _}} ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[TableName]),
ok;
{aborted, Error} ->
rabbit_log:error("Failed to create a tracked channel table for node ~tp: ~tp", [Node, Error]),
ok
end.

ensure_per_user_tracked_channels_table_for_this_node_ets() ->
ensure_per_user_tracked_channels_table_for_this_node() ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[?TRACKED_CHANNEL_TABLE_PER_USER]),
ets:new(?TRACKED_CHANNEL_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]).

ensure_per_user_tracked_channels_table_for_this_node_mnesia() ->
Node = node(),
TableName = tracked_channel_per_user_table_name_for(Node),
case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
{attributes, record_info(fields, tracked_channel_per_user)}]) of
{atomic, ok} ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[TableName]),
ok;
{aborted, {already_exists, _}} ->
rabbit_log:info("Setting up a table for channel tracking on this node: ~tp",
[TableName]),
ok;
{aborted, Error} ->
rabbit_log:error("Failed to create a per-user tracked channel table for node ~tp: ~tp", [Node, Error]),
ok
end.

clear_tracked_channel_tables_for_this_node() ->
[rabbit_tracking:clear_tracking_table(T)
|| T <- get_all_tracked_channel_table_names_for_node(node())].

delete_tracked_channels_table_for_node(Node) ->
TableName = tracked_channel_table_name_for(Node),
rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").

delete_per_user_tracked_channels_table_for_node(Node) ->
TableName = tracked_channel_per_user_table_name_for(Node),
rabbit_tracking:delete_tracking_table(TableName, Node,
"per-user tracked channels").

get_all_tracked_channel_table_names_for_node(Node) ->
[tracked_channel_table_name_for(Node),
tracked_channel_per_user_table_name_for(Node)].

get_tracked_channels_by_connection_pid(ConnPid) ->
case rabbit_feature_flags:is_enabled(tracking_records_in_ets) of
true -> get_tracked_channels_by_connection_pid_ets(ConnPid);
false -> get_tracked_channels_by_connection_pid_mnesia(ConnPid)
end.

get_tracked_channels_by_connection_pid_ets(ConnPid) ->
rabbit_tracking:match_tracked_items_local(
?TRACKED_CHANNEL_TABLE,
#tracked_channel{connection = ConnPid, _ = '_'}).

get_tracked_channels_by_connection_pid_mnesia(ConnPid) ->
rabbit_tracking:match_tracked_items_mnesia(
fun tracked_channel_table_name_for/1,
#tracked_channel{connection = ConnPid, _ = '_'}).
?TRACKED_CHANNEL_TABLE,
#tracked_channel{connection = ConnPid, _ = '_'}).

get_tracked_channel_by_id_ets(ChId) ->
rabbit_tracking:match_tracked_items_ets(
get_tracked_channel_by_id(ChId) ->
rabbit_tracking:match_tracked_items(
?TRACKED_CHANNEL_TABLE,
#tracked_channel{id = ChId, _ = '_'}).

get_tracked_channel_by_pid_mnesia(ChPid) ->
rabbit_tracking:match_tracked_items_mnesia(
fun tracked_channel_table_name_for/1,
#tracked_channel{pid = ChPid, _ = '_'}).

delete_tracked_channel_user_entry(Username) ->
rabbit_tracking:delete_tracked_entry(
{rabbit_auth_backend_internal, exists, [Username]},
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
{requires, [channel_tracking]},
{requires, [tracking_metadata_store]},
{enables, recovery}]}).

%%
Expand Down
Loading