Skip to content

Commit a828859

Browse files
committed
Use pg_local to track AMQP 1.0 connections
Fixes #9371 Moves `pg_local` to `rabbit_common`
1 parent c30ae67 commit a828859

File tree

8 files changed

+42
-33
lines changed

8 files changed

+42
-33
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,11 +1004,6 @@ rabbitmq_suite(
10041004
],
10051005
)
10061006

1007-
rabbitmq_suite(
1008-
name = "unit_pg_local_SUITE",
1009-
size = "small",
1010-
)
1011-
10121007
rabbitmq_suite(
10131008
name = "unit_plugin_directories_SUITE",
10141009
size = "small",

deps/rabbit/app.bzl

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def all_beam_files(name = "all_beam_files"):
4141
"src/mc_util.erl",
4242
"src/mirrored_supervisor.erl",
4343
"src/mirrored_supervisor_sups.erl",
44-
"src/pg_local.erl",
4544
"src/pid_recomposition.erl",
4645
"src/rabbit.erl",
4746
"src/rabbit_access_control.erl",
@@ -292,7 +291,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
292291
"src/mc_util.erl",
293292
"src/mirrored_supervisor.erl",
294293
"src/mirrored_supervisor_sups.erl",
295-
"src/pg_local.erl",
296294
"src/pid_recomposition.erl",
297295
"src/rabbit.erl",
298296
"src/rabbit_access_control.erl",
@@ -555,7 +553,6 @@ def all_srcs(name = "all_srcs"):
555553
"src/mc_util.erl",
556554
"src/mirrored_supervisor.erl",
557555
"src/mirrored_supervisor_sups.erl",
558-
"src/pg_local.erl",
559556
"src/pid_recomposition.erl",
560557
"src/rabbit.erl",
561558
"src/rabbit_access_control.erl",
@@ -1768,14 +1765,6 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17681765
erlc_opts = "//:test_erlc_opts",
17691766
deps = ["//deps/rabbit_common:erlang_app"],
17701767
)
1771-
erlang_bytecode(
1772-
name = "unit_pg_local_SUITE_beam_files",
1773-
testonly = True,
1774-
srcs = ["test/unit_pg_local_SUITE.erl"],
1775-
outs = ["test/unit_pg_local_SUITE.beam"],
1776-
app_name = "rabbit",
1777-
erlc_opts = "//:test_erlc_opts",
1778-
)
17791768
erlang_bytecode(
17801769
name = "unit_plugin_directories_SUITE_beam_files",
17811770
testonly = True,

deps/rabbit_common/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ rabbitmq_suite(
170170
],
171171
)
172172

173+
rabbitmq_suite(
174+
name = "unit_pg_local_SUITE",
175+
size = "small",
176+
)
177+
173178
rabbitmq_suite(
174179
name = "unit_priority_queue_SUITE",
175180
size = "small",

deps/rabbit_common/app.bzl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def all_beam_files(name = "all_beam_files"):
3232
"src/file_handle_cache_stats.erl",
3333
"src/mirrored_supervisor_locks.erl",
3434
"src/mnesia_sync.erl",
35+
"src/pg_local.erl",
3536
"src/pmon.erl",
3637
"src/priority_queue.erl",
3738
"src/rabbit_amqp_connection.erl",
@@ -127,6 +128,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
127128
"src/file_handle_cache_stats.erl",
128129
"src/mirrored_supervisor_locks.erl",
129130
"src/mnesia_sync.erl",
131+
"src/pg_local.erl",
130132
"src/pmon.erl",
131133
"src/priority_queue.erl",
132134
"src/rabbit_amqp_connection.erl",
@@ -215,6 +217,7 @@ def all_srcs(name = "all_srcs"):
215217
"src/gen_server2.erl",
216218
"src/mirrored_supervisor_locks.erl",
217219
"src/mnesia_sync.erl",
220+
"src/pg_local.erl",
218221
"src/pmon.erl",
219222
"src/priority_queue.erl",
220223
"src/rabbit_amqp_connection.erl",
@@ -348,6 +351,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
348351
erlc_opts = "//:test_erlc_opts",
349352
deps = ["@proper//:erlang_app"],
350353
)
354+
erlang_bytecode(
355+
name = "unit_pg_local_SUITE_beam_files",
356+
testonly = True,
357+
srcs = ["test/unit_pg_local_SUITE.erl"],
358+
outs = ["test/unit_pg_local_SUITE.beam"],
359+
app_name = "rabbit",
360+
erlc_opts = "//:test_erlc_opts",
361+
)
351362
erlang_bytecode(
352363
name = "unit_priority_queue_SUITE_beam_files",
353364
testonly = True,
File renamed without changes.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
%%
77
-module(rabbit_amqp1_0).
88

9-
-export([connection_info_local/1,
10-
emit_connection_info_local/3,
9+
-export([emit_connection_info_local/3,
1110
emit_connection_info_all/4,
12-
list/0]).
11+
list/0,
12+
register_connection/1,
13+
unregister_connection/1]).
1314

1415
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
1516
Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local,
@@ -26,17 +27,22 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
2627
end,
2728
list()).
2829

29-
connection_info_local(Items) ->
30-
Connections = list(),
31-
[rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections].
32-
3330
list() ->
34-
[ReaderPid
35-
|| {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
36-
{_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
37-
{_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
38-
{_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
39-
{_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
40-
{rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
41-
{reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
42-
].
31+
pg_local:get_members(rabbit_amqp10_connections).
32+
%% [ReaderPid
33+
%% || {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup),
34+
%% {_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid),
35+
%% {_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid),
36+
%% {_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid),
37+
%% {_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid),
38+
%% {rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid),
39+
%% {reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid)
40+
%% ].
41+
42+
-spec register_connection(pid()) -> ok.
43+
register_connection(Pid) ->
44+
pg_local:join(rabbit_amqp10_connections, Pid).
45+
46+
-spec unregister_connection(pid()) -> ok.
47+
unregister_connection(Pid) ->
48+
pg_local:leave(rabbit_amqp10_connections, Pid).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) ->
238238

239239
close_connection(State = #v1{connection = #v1_connection{
240240
timeout_sec = TimeoutSec}}) ->
241+
Pid = self(),
241242
erlang:send_after((if TimeoutSec > 0 andalso
242243
TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
243244
true -> ?CLOSING_TIMEOUT
244-
end) * 1000, self(), terminate_connection),
245+
end) * 1000, Pid, terminate_connection),
246+
rabbit_amqp1_0:unregister_connection(Pid),
245247
State#v1{connection_state = closed}.
246248

247249
handle_dependent_exit(ChPid, Reason, State) ->
@@ -434,6 +436,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
434436
container_id = {utf8, rabbit_nodes:cluster_name()},
435437
properties = server_properties()}),
436438
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
439+
rabbit_amqp1_0:register_connection(self()),
437440
control_throttle(
438441
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});
439442

0 commit comments

Comments
 (0)