Skip to content

Commit a9bf852

Browse files
committed
Use pg_local to track AMQP 1.0 connections
Fixes #9371 Since each AMQP 1.0 connection opens several direct AMQP connections, we must assign each direct connection a unique name to prevent multiple entries in the `connection_created_stats` table. Also, use `pg_local` to track AMQP 1.0 connections instead of walking the supervisor trees.
1 parent c30ae67 commit a9bf852

File tree

6 files changed

+50
-31
lines changed

6 files changed

+50
-31
lines changed

deps/amqp_client/src/amqp_connection.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@
6565
-export([error_atom/1]).
6666
-export([info/2, info_keys/1, info_keys/0]).
6767
-export([connection_name/1, update_secret/3]).
68-
-export([socket_adapter_info/2]).
68+
-export([socket_adapter_info/2,
69+
socket_adapter_info/3]).
6970

7071
-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
7172

@@ -379,7 +380,12 @@ info_keys() ->
379380
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
380381
%% based on the socket for the protocol given.
381382
socket_adapter_info(Sock, Protocol) ->
382-
amqp_direct_connection:socket_adapter_info(Sock, Protocol).
383+
socket_adapter_info(Sock, Protocol, undefined).
384+
385+
%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
386+
%% based on the socket for the protocol given.
387+
socket_adapter_info(Sock, Protocol, UniqueId) ->
388+
amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId).
383389

384390
%% @spec (ConnectionPid) -> ConnectionName
385391
%% where

deps/amqp_client/src/amqp_direct_connection.erl

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
1818
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
1919

20-
-export([socket_adapter_info/2]).
20+
-export([socket_adapter_info/2,
21+
socket_adapter_info/3]).
2122

2223
-record(state, {node,
2324
user,
@@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
176177
ensure_adapter_info(Info) -> Info.
177178

178179
socket_adapter_info(Sock, Protocol) ->
180+
socket_adapter_info(Sock, Protocol, undefined).
181+
182+
socket_adapter_info(Sock, Protocol, UniqueId) ->
179183
{PeerHost, PeerPort, Host, Port} =
180-
case rabbit_net:socket_ends(Sock, inbound) of
181-
{ok, Res} -> Res;
182-
_ -> {unknown, unknown, unknown, unknown}
183-
end,
184-
Name = case rabbit_net:connection_string(Sock, inbound) of
185-
{ok, Res1} -> Res1;
186-
_Error -> "(unknown)"
184+
case rabbit_net:socket_ends(Sock, inbound) of
185+
{ok, Res} -> Res;
186+
_ -> {unknown, unknown, unknown, unknown}
187+
end,
188+
ConnectionString = case rabbit_net:connection_string(Sock, inbound) of
189+
{ok, Res1} -> Res1;
190+
_Error -> "(unknown)"
191+
end,
192+
Name = case UniqueId of
193+
undefined ->
194+
rabbit_data_coercion:to_binary(ConnectionString);
195+
_ ->
196+
rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId]))
187197
end,
188198
#amqp_adapter_info{protocol = Protocol,
189-
name = list_to_binary(Name),
199+
name = Name,
190200
host = Host,
191201
port = Port,
192202
peer_host = PeerHost,

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
%%
77
-module(rabbit_amqp1_0).
88

9-
-export([connection_info_local/1,
10-
emit_connection_info_local/3,
9+
-export([emit_connection_info_local/3,
1110
emit_connection_info_all/4,
12-
list/0]).
11+
list/0,
12+
register_connection/1,
13+
unregister_connection/1]).
1314

1415
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
1516
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
@@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
2627
end,
2728
list()).
2829

29-
connection_info_local(Items) ->
30-
Connections = list(),
31-
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].
32-
30+
-spec list() -> [pid()].
3331
list() ->
34-
[ReaderPid
35-
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
36-
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
37-
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
38-
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
39-
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
40-
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
41-
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
42-
].
32+
pg_local:get_members(rabbit_amqp10_connections).
33+
34+
-spec register_connection(pid()) -> ok.
35+
register_connection(Pid) ->
36+
pg_local:join(rabbit_amqp10_connections, Pid).
37+
38+
-spec unregister_connection(pid()) -> ok.
39+
unregister_connection(Pid) ->
40+
pg_local:leave(rabbit_amqp10_connections, Pid).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->
238238

239239
close_connection(State = #v1{connection = #v1_connection{
240240
timeout_sec = TimeoutSec}}) ->
241+
Pid = self(),
241242
erlang:send_after((if TimeoutSec > 0 andalso
242243
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
243244
true -> ?CLOSING_TIMEOUT
244-
end) * 1000, self(), terminate_connection),
245+
end) * 1000, Pid, terminate_connection),
246+
rabbit_amqp1_0:unregister_connection(Pid),
245247
State#v1{connection_state = closed}.
246248

247249
handle_dependent_exit(ChPid, Reason, State) ->
@@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
434436
container_id = {utf8, rabbit_nodes:cluster_name()},
435437
properties = server_properties()}),
436438
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
439+
rabbit_amqp1_0:register_connection(self()),
437440
control_throttle(
438441
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});
439442

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid,
6262
start =>
6363
{rabbit_amqp1_0_session_process, start_link, [
6464
{Channel, ReaderPid, WriterPid, User, VHost, FrameMax,
65-
adapter_info(User, SocketForAdapterInfo), Collector}
65+
adapter_info(User, SocketForAdapterInfo, Channel), Collector}
6666
]},
6767
restart => transient,
6868
significant => true,
@@ -98,7 +98,7 @@ init([]) ->
9898
%% See rabbit_direct.erl to see how `authz_bakends` is propagated from
9999
% amqp_adapter_info.additional_info to the rabbit_access_control module
100100

101-
adapter_info(User, Sock) ->
102-
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}),
101+
adapter_info(User, Sock, UniqueId) ->
102+
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId),
103103
AdapterInfo#amqp_adapter_info{additional_info =
104104
AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}.

deps/rabbitmq_management_agent/src/rabbit_mgmt_format.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ format_args({arguments, Value}) ->
9191
format_args(Stat) ->
9292
Stat.
9393

94+
format_connection_created({authz_backends, Value}) ->
95+
{authz_backends, print("~tp", Value)};
9496
format_connection_created({host, Value}) ->
9597
{host, addr(Value)};
9698
format_connection_created({peer_host, Value}) ->

0 commit comments

Comments
 (0)