Skip to content

Commit 4719a14

Browse files
Merge pull request #6790 from rabbitmq/rabbitmq-3370-stream-metadata-maintenance-mode
Filter out nodes in maintenance in stream metadata
2 parents 5a8530c + 3fea81a commit 4719a14

File tree

4 files changed

+129
-47
lines changed

4 files changed

+129
-47
lines changed

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,14 @@ maybe_sac_execute(Fun) ->
199199
overview(undefined) ->
200200
undefined;
201201
overview(#?MODULE{groups = Groups}) ->
202-
GroupsOverview = maps:map(fun (_, #group{consumers = Consumers,
203-
partition_index = Idx}) ->
204-
#{num_consumers => length(Consumers),
205-
partition_index => Idx}
206-
end, Groups),
207-
#{num_groups => map_size(Groups),
208-
groups => GroupsOverview}.
209-
202+
GroupsOverview =
203+
maps:map(fun(_,
204+
#group{consumers = Consumers, partition_index = Idx}) ->
205+
#{num_consumers => length(Consumers),
206+
partition_index => Idx}
207+
end,
208+
Groups),
209+
#{num_groups => map_size(Groups), groups => GroupsOverview}.
210210

211211
-spec init_state() -> state().
212212
init_state() ->

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,8 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
288288
ok ->
289289
ok;
290290
{error, Error} ->
291-
rabbit_log:warning("Error while deleting super stream exchange ~tp, ~tp",
291+
rabbit_log:warning("Error while deleting super stream exchange ~tp, "
292+
"~tp",
292293
[SuperStream, Error]),
293294
ok
294295
end,
@@ -444,8 +445,8 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
444445
handle_call({partition_index, VirtualHost, SuperStream, Stream},
445446
_From, State) ->
446447
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
447-
rabbit_log:debug("Looking for partition index of stream ~tp in super "
448-
"stream ~tp (virtual host ~tp)",
448+
rabbit_log:debug("Looking for partition index of stream ~tp in "
449+
"super stream ~tp (virtual host ~tp)",
449450
[Stream, SuperStream, VirtualHost]),
450451
Res = try
451452
rabbit_exchange:lookup_or_die(ExchangeName),
@@ -734,7 +735,8 @@ declare_super_stream_exchange(VirtualHost, Name, Username) ->
734735
catch
735736
exit:ExitError ->
736737
% likely to be a problem of inequivalent args on an existing stream
737-
rabbit_log:error("Error while creating ~tp super stream exchange: ~tp",
738+
rabbit_log:error("Error while creating ~tp super stream exchange: "
739+
"~tp",
738740
[Name, ExitError]),
739741
{error, validation_failed}
740742
end;

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -705,8 +705,8 @@ open(info, {resource_alarm, IsThereAlarm},
705705
{false, EnoughCredits} ->
706706
not EnoughCredits
707707
end,
708-
rabbit_log_connection:debug("Connection ~tp had blocked status set to ~tp, new "
709-
"blocked status is now ~tp",
708+
rabbit_log_connection:debug("Connection ~tp had blocked status set to ~tp, "
709+
"new blocked status is now ~tp",
710710
[ConnectionName, Blocked, NewBlockedState]),
711711
case {Blocked, NewBlockedState} of
712712
{true, false} ->
@@ -786,7 +786,8 @@ open(info,
786786
connection = Connection0,
787787
connection_state = ConnState0} =
788788
State) ->
789-
rabbit_log:debug("Subscription ~tp instructed to become active: ~tp",
789+
rabbit_log:debug("Subscription ~tp instructed to become active: "
790+
"~tp",
790791
[SubId, Active]),
791792
#stream_connection_state{consumers = Consumers0} = ConnState0,
792793
{Connection1, ConnState1} =
@@ -1172,8 +1173,8 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
11721173
[S, self()]),
11731174
stop;
11741175
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
1175-
rabbit_log_connection:error("Stream protocol connection socket error: ~tp [~w] "
1176-
"[~w]",
1176+
rabbit_log_connection:error("Stream protocol connection socket error: ~tp "
1177+
"[~w] [~w]",
11771178
[Reason, S, self()]),
11781179
stop;
11791180
close_sent(info, {resource_alarm, IsThereAlarm},
@@ -1872,8 +1873,8 @@ handle_frame_post_auth(Transport,
18721873
1),
18731874
{Connection, State};
18741875
false ->
1875-
rabbit_log:debug("Creating subscription ~tp to ~tp, with offset specificat"
1876-
"ion ~tp, properties ~0p",
1876+
rabbit_log:debug("Creating subscription ~tp to ~tp, with offset "
1877+
"specification ~tp, properties ~0p",
18771878
[SubscriptionId,
18781879
Stream,
18791880
OffsetSpec,
@@ -1884,8 +1885,8 @@ handle_frame_post_auth(Transport,
18841885
ConsumerName}
18851886
of
18861887
{true, false, _} ->
1887-
rabbit_log:warning("Cannot create subcription ~tp, stream single active "
1888-
"consumer feature flag is not enabled",
1888+
rabbit_log:warning("Cannot create subcription ~tp, stream single "
1889+
"active consumer feature flag is not enabled",
18891890
[SubscriptionId]),
18901891
response(Transport,
18911892
Connection,
@@ -2080,7 +2081,8 @@ handle_frame_post_auth(_Transport,
20802081
ok ->
20812082
case lookup_leader(Stream, Connection) of
20822083
{error, Error} ->
2083-
rabbit_log:warning("Could not find leader to store offset on ~tp: ~tp",
2084+
rabbit_log:warning("Could not find leader to store offset on ~tp: "
2085+
"~tp",
20842086
[Stream, Error]),
20852087
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
20862088
{Connection, State};
@@ -2362,9 +2364,16 @@ handle_frame_post_auth(Transport,
23622364
end,
23632365
#{}, Streams),
23642366

2365-
Nodes =
2367+
Nodes0 =
23662368
lists:sort(
23672369
maps:keys(NodesMap)),
2370+
%% filter out nodes in maintenance
2371+
Nodes =
2372+
lists:filter(fun(N) ->
2373+
rabbit_maintenance:is_being_drained_consistent_read(N)
2374+
=:= false
2375+
end,
2376+
Nodes0),
23682377
NodeEndpoints =
23692378
lists:foldr(fun(Node, Acc) ->
23702379
PortFunction =
@@ -2486,7 +2495,8 @@ handle_frame_post_auth(Transport,
24862495
end,
24872496
case maps:take(CorrelationId, Requests0) of
24882497
{{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} ->
2489-
rabbit_log:debug("Received consumer update response for subscription ~tp",
2498+
rabbit_log:debug("Received consumer update response for subscription "
2499+
"~tp",
24902500
[SubscriptionId]),
24912501
Consumers1 =
24922502
case Consumers of
@@ -2557,8 +2567,8 @@ handle_frame_post_auth(Transport,
25572567
Consumer2,
25582568
ConsumerOffset = osiris_log:next_offset(Log2),
25592569

2560-
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp message(s) "
2561-
"distributed after subscription",
2570+
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
2571+
"message(s) distributed after subscription",
25622572
[SubscriptionId, ConsumerOffset,
25632573
messages_consumed(ConsumerCounters)]),
25642574

@@ -2745,7 +2755,8 @@ maybe_dispatch_on_subscription(Transport,
27452755
SubscriptionProperties,
27462756
SendFileOct,
27472757
false = _Sac) ->
2748-
rabbit_log:debug("Distributing existing messages to subscription ~tp",
2758+
rabbit_log:debug("Distributing existing messages to subscription "
2759+
"~tp",
27492760
[SubscriptionId]),
27502761
case send_chunks(DeliverVersion,
27512762
Transport,
@@ -2768,8 +2779,8 @@ maybe_dispatch_on_subscription(Transport,
27682779
ConsumerOffset = osiris_log:next_offset(Log1),
27692780
ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1),
27702781

2771-
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp message(s) "
2772-
"distributed after subscription",
2782+
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
2783+
"message(s) distributed after subscription",
27732784
[SubscriptionId, ConsumerOffset,
27742785
messages_consumed(ConsumerCounters1)]),
27752786

@@ -2794,9 +2805,9 @@ maybe_dispatch_on_subscription(_Transport,
27942805
SubscriptionProperties,
27952806
_SendFileOct,
27962807
true = _Sac) ->
2797-
rabbit_log:debug("No initial dispatch for subscription ~tp for now, "
2798-
"waiting for consumer update response from client "
2799-
"(single active consumer)",
2808+
rabbit_log:debug("No initial dispatch for subscription ~tp for "
2809+
"now, waiting for consumer update response from "
2810+
"client (single active consumer)",
28002811
[SubscriptionId]),
28012812
#consumer{credit = Credit,
28022813
configuration =
@@ -3624,6 +3635,7 @@ get_chunk_selector(Properties) ->
36243635
binary_to_atom(maps:get(<<"chunk_selector">>, Properties,
36253636
<<"user_data">>)).
36263637

3627-
close_log(undefined) -> ok;
3638+
close_log(undefined) ->
3639+
ok;
36283640
close_log(Log) ->
36293641
osiris_log:close(Log).

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ groups() ->
4949
%% Run `test_global_counters` on its own so the global metrics are
5050
%% initialised to 0 for each testcase
5151
{single_node_1, [], [test_global_counters]},
52-
{cluster, [], [test_stream, test_stream_tls, java]}].
52+
{cluster, [], [test_stream, test_stream_tls, test_metadata, java]}].
5353

5454
init_per_suite(Config) ->
5555
case rabbit_ct_helpers:is_mixed_versions() of
@@ -184,6 +184,76 @@ test_stream_tls(Config) ->
184184
test_server(ssl, Stream, Config),
185185
ok.
186186

187+
test_metadata(Config) ->
188+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
189+
Transport = gen_tcp,
190+
Port = get_stream_port(Config),
191+
FirstNode = get_node_name(Config, 0),
192+
NodeInMaintenance = get_node_name(Config, 1),
193+
{ok, S} =
194+
Transport:connect("localhost", Port,
195+
[{active, false}, {mode, binary}]),
196+
C0 = rabbit_stream_core:init(0),
197+
C1 = test_peer_properties(Transport, S, C0),
198+
C2 = test_authenticate(Transport, S, C1),
199+
C3 = test_create_stream(Transport, S, Stream, C2),
200+
GetStreamNodes =
201+
fun() ->
202+
MetadataFrame =
203+
rabbit_stream_core:frame({request, 1, {metadata, [Stream]}}),
204+
ok = Transport:send(S, MetadataFrame),
205+
{CmdMetadata, _} = receive_commands(Transport, S, C3),
206+
{response, 1,
207+
{metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} =
208+
CmdMetadata,
209+
[Leader | Replicas]
210+
end,
211+
rabbit_ct_helpers:await_condition(fun() ->
212+
length(GetStreamNodes()) == 3
213+
end),
214+
rabbit_ct_broker_helpers:rpc(Config,
215+
NodeInMaintenance,
216+
rabbit_maintenance,
217+
drain,
218+
[]),
219+
220+
IsBeingDrained =
221+
fun() ->
222+
rabbit_ct_broker_helpers:rpc(Config,
223+
FirstNode,
224+
rabbit_maintenance,
225+
is_being_drained_consistent_read,
226+
[NodeInMaintenance])
227+
end,
228+
rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() end),
229+
230+
rabbit_ct_helpers:await_condition(fun() ->
231+
length(GetStreamNodes()) == 2
232+
end),
233+
234+
rabbit_ct_broker_helpers:rpc(Config,
235+
NodeInMaintenance,
236+
rabbit_maintenance,
237+
revive,
238+
[]),
239+
240+
rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() =:= false
241+
end),
242+
243+
rabbit_ct_helpers:await_condition(fun() ->
244+
length(GetStreamNodes()) == 3
245+
end),
246+
247+
DeleteStreamFrame =
248+
rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
249+
ok = Transport:send(S, DeleteStreamFrame),
250+
{CmdDelete, C4} = receive_commands(Transport, S, C3),
251+
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
252+
CmdDelete),
253+
_C5 = test_close(Transport, S, C4),
254+
closed = wait_for_socket_close(Transport, S, 10),
255+
ok.
256+
187257
test_gc_consumers(Config) ->
188258
Pid = spawn(fun() -> ok end),
189259
rabbit_ct_broker_helpers:rpc(Config,
@@ -434,12 +504,13 @@ test_server(Transport, Stream, Config) ->
434504
?awaitMatch(#{consumers := 1}, get_global_counters(Config), ?WAIT),
435505
CounterKeys = maps:keys(get_osiris_counters(Config)),
436506
%% find the counter key for the subscriber
437-
{value, SubKey} = lists:search(fun ({rabbit_stream_reader, Q, Id, _}) ->
438-
Q == QName andalso
439-
Id == SubscriptionId;
440-
(_) ->
441-
false
442-
end, CounterKeys),
507+
{value, SubKey} =
508+
lists:search(fun ({rabbit_stream_reader, Q, Id, _}) ->
509+
Q == QName andalso Id == SubscriptionId;
510+
(_) ->
511+
false
512+
end,
513+
CounterKeys),
443514
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
444515
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
445516

@@ -580,18 +651,14 @@ test_subscribe(Transport,
580651
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}}, Cmd),
581652
C.
582653

583-
test_unsubscribe(Transport,
584-
Socket,
585-
SubscriptionId, C0) ->
586-
UnsubCmd = {request, 1,
587-
{unsubscribe, SubscriptionId}},
654+
test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->
655+
UnsubCmd = {request, 1, {unsubscribe, SubscriptionId}},
588656
UnsubscribeFrame = rabbit_stream_core:frame(UnsubCmd),
589-
ok = Transport:send(Socket, UnsubscribeFrame ),
657+
ok = Transport:send(Socket, UnsubscribeFrame),
590658
{Cmd, C} = receive_commands(Transport, Socket, C0),
591659
?assertMatch({response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, Cmd),
592660
C.
593661

594-
595662
test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
596663
ct:pal("test_deliver ", []),
597664
{{deliver, SubscriptionId, Chunk}, C} =
@@ -708,6 +775,7 @@ get_osiris_counters(Config) ->
708775
osiris_counters,
709776
overview,
710777
[]).
778+
711779
get_global_counters(Config) ->
712780
maps:get([{protocol, stream}],
713781
rabbit_ct_broker_helpers:rpc(Config,

0 commit comments

Comments
 (0)