16 changes: 8 additions & 8 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
@@ -199,14 +199,14 @@ maybe_sac_execute(Fun) ->
overview(undefined) ->
undefined;
overview(#?MODULE{groups = Groups}) ->
GroupsOverview = maps:map(fun (_, #group{consumers = Consumers,
partition_index = Idx}) ->
#{num_consumers => length(Consumers),
partition_index => Idx}
end, Groups),
#{num_groups => map_size(Groups),
groups => GroupsOverview}.

GroupsOverview =
maps:map(fun(_,
#group{consumers = Consumers, partition_index = Idx}) ->
#{num_consumers => length(Consumers),
partition_index => Idx}
end,
Groups),
#{num_groups => map_size(Groups), groups => GroupsOverview}.

-spec init_state() -> state().
init_state() ->
10 changes: 6 additions & 4 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -288,7 +288,8 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
ok ->
ok;
{error, Error} ->
rabbit_log:warning("Error while deleting super stream exchange ~p, ~p",
rabbit_log:warning("Error while deleting super stream exchange ~tp, "
"~tp",
[SuperStream, Error]),
ok
end,
@@ -444,8 +445,8 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
handle_call({partition_index, VirtualHost, SuperStream, Stream},
_From, State) ->
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
rabbit_log:debug("Looking for partition index of stream ~p in super "
"stream ~p (virtual host ~p)",
rabbit_log:debug("Looking for partition index of stream ~tp in "
"super stream ~tp (virtual host ~tp)",
[Stream, SuperStream, VirtualHost]),
Res = try
rabbit_exchange:lookup_or_die(ExchangeName),
@@ -734,7 +735,8 @@ declare_super_stream_exchange(VirtualHost, Name, Username) ->
catch
exit:ExitError ->
% likely to be a problem of inequivalent args on an existing stream
rabbit_log:error("Error while creating ~p super stream exchange: ~p",
rabbit_log:error("Error while creating ~tp super stream exchange: "
"~tp",
[Name, ExitError]),
{error, validation_failed}
end;
54 changes: 33 additions & 21 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -705,8 +705,8 @@ open(info, {resource_alarm, IsThereAlarm},
{false, EnoughCredits} ->
not EnoughCredits
end,
rabbit_log_connection:debug("Connection ~p had blocked status set to ~p, new "
"blocked status is now ~p",
rabbit_log_connection:debug("Connection ~tp had blocked status set to ~tp, "
"new blocked status is now ~tp",
[ConnectionName, Blocked, NewBlockedState]),
case {Blocked, NewBlockedState} of
{true, false} ->
@@ -786,7 +786,8 @@ open(info,
connection = Connection0,
connection_state = ConnState0} =
State) ->
rabbit_log:debug("Subscription ~p instructed to become active: ~p",
rabbit_log:debug("Subscription ~tp instructed to become active: "
"~tp",
[SubId, Active]),
#stream_connection_state{consumers = Consumers0} = ConnState0,
{Connection1, ConnState1} =
@@ -1172,8 +1173,8 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
[S, self()]),
stop;
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
"[~w]",
rabbit_log_connection:error("Stream protocol connection socket error: ~tp "
"[~w] [~w]",
[Reason, S, self()]),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
@@ -1872,8 +1873,8 @@ handle_frame_post_auth(Transport,
1),
{Connection, State};
false ->
rabbit_log:debug("Creating subscription ~p to ~p, with offset specificat"
"ion ~p, properties ~0p",
rabbit_log:debug("Creating subscription ~tp to ~tp, with offset "
"specification ~tp, properties ~0p",
[SubscriptionId,
Stream,
OffsetSpec,
@@ -1884,8 +1885,8 @@
ConsumerName}
of
{true, false, _} ->
rabbit_log:warning("Cannot create subcription ~p, stream single active "
"consumer feature flag is not enabled",
rabbit_log:warning("Cannot create subcription ~tp, stream single "
"active consumer feature flag is not enabled",
[SubscriptionId]),
response(Transport,
Connection,
@@ -2080,7 +2081,8 @@ handle_frame_post_auth(_Transport,
ok ->
case lookup_leader(Stream, Connection) of
{error, Error} ->
rabbit_log:warning("Could not find leader to store offset on ~p: ~p",
rabbit_log:warning("Could not find leader to store offset on ~tp: "
"~tp",
[Stream, Error]),
%% FIXME store offset is fire-and-forget, so no response even if error, change this?
{Connection, State};
@@ -2362,9 +2364,16 @@ handle_frame_post_auth(Transport,
end,
#{}, Streams),

Nodes =
Nodes0 =
lists:sort(
maps:keys(NodesMap)),
%% filter out nodes in maintenance
Nodes =
lists:filter(fun(N) ->
rabbit_maintenance:is_being_drained_consistent_read(N)
=:= false
end,
Nodes0),
NodeEndpoints =
lists:foldr(fun(Node, Acc) ->
PortFunction =
@@ -2486,7 +2495,8 @@ handle_frame_post_auth(Transport,
end,
case maps:take(CorrelationId, Requests0) of
{{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} ->
rabbit_log:debug("Received consumer update response for subscription ~p",
rabbit_log:debug("Received consumer update response for subscription "
"~tp",
[SubscriptionId]),
Consumers1 =
case Consumers of
@@ -2557,8 +2567,8 @@ handle_frame_post_auth(Transport,
Consumer2,
ConsumerOffset = osiris_log:next_offset(Log2),

rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
"message(s) distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters)]),

@@ -2745,7 +2755,8 @@ maybe_dispatch_on_subscription(Transport,
SubscriptionProperties,
SendFileOct,
false = _Sac) ->
rabbit_log:debug("Distributing existing messages to subscription ~p",
rabbit_log:debug("Distributing existing messages to subscription "
"~tp",
[SubscriptionId]),
case send_chunks(DeliverVersion,
Transport,
@@ -2768,8 +2779,8 @@ maybe_dispatch_on_subscription(Transport,
ConsumerOffset = osiris_log:next_offset(Log1),
ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1),

rabbit_log:debug("Subscription ~p is now at offset ~p with ~p message(s) "
"distributed after subscription",
rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
"message(s) distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters1)]),

@@ -2794,9 +2805,9 @@ maybe_dispatch_on_subscription(_Transport,
SubscriptionProperties,
_SendFileOct,
true = _Sac) ->
rabbit_log:debug("No initial dispatch for subscription ~p for now, "
"waiting for consumer update response from client "
"(single active consumer)",
rabbit_log:debug("No initial dispatch for subscription ~tp for "
"now, waiting for consumer update response from "
"client (single active consumer)",
[SubscriptionId]),
#consumer{credit = Credit,
configuration =
@@ -3624,6 +3635,7 @@ get_chunk_selector(Properties) ->
binary_to_atom(maps:get(<<"chunk_selector">>, Properties,
<<"user_data">>)).

close_log(undefined) -> ok;
close_log(undefined) ->
ok;
close_log(Log) ->
osiris_log:close(Log).
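
Not part of the diff above: a minimal standalone sketch of the node-filtering step that the metadata handler change in rabbit_stream_reader.erl introduces, where nodes reported as being drained (rabbit_maintenance:is_being_drained_consistent_read/1 returning true) are dropped from the candidate list before endpoints are resolved. The module and function names below are hypothetical, and the drain check is passed in as a fun so the snippet does not depend on rabbit_maintenance being available.

-module(metadata_filter_sketch).
-export([serving_nodes/2]).

%% Returns the sorted node list minus any node the IsDrained fun reports
%% as being drained (i.e. under maintenance).
serving_nodes(Nodes, IsDrained) when is_function(IsDrained, 1) ->
    lists:filter(fun(N) -> IsDrained(N) =:= false end, lists:sort(Nodes)).

%% Example: serving_nodes([node_b, node_a], fun(N) -> N =:= node_b end)
%% returns [node_a].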
96 changes: 82 additions & 14 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -49,7 +49,7 @@ groups() ->
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
{single_node_1, [], [test_global_counters]},
{cluster, [], [test_stream, test_stream_tls, java]}].
{cluster, [], [test_stream, test_stream_tls, test_metadata, java]}].

init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
@@ -184,6 +184,76 @@ test_stream_tls(Config) ->
test_server(ssl, Stream, Config),
ok.

test_metadata(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Port = get_stream_port(Config),
FirstNode = get_node_name(Config, 0),
NodeInMaintenance = get_node_name(Config, 1),
{ok, S} =
Transport:connect("localhost", Port,
[{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
GetStreamNodes =
fun() ->
MetadataFrame =
rabbit_stream_core:frame({request, 1, {metadata, [Stream]}}),
ok = Transport:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(Transport, S, C3),
{response, 1,
{metadata, _Nodes, #{Stream := {Leader = {_H, _P}, Replicas}}}} =
CmdMetadata,
[Leader | Replicas]
end,
rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 3
end),
rabbit_ct_broker_helpers:rpc(Config,
NodeInMaintenance,
rabbit_maintenance,
drain,
[]),

IsBeingDrained =
fun() ->
rabbit_ct_broker_helpers:rpc(Config,
FirstNode,
rabbit_maintenance,
is_being_drained_consistent_read,
[NodeInMaintenance])
end,
rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() end),

rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 2
end),

rabbit_ct_broker_helpers:rpc(Config,
NodeInMaintenance,
rabbit_maintenance,
revive,
[]),

rabbit_ct_helpers:await_condition(fun() -> IsBeingDrained() =:= false
end),

rabbit_ct_helpers:await_condition(fun() ->
length(GetStreamNodes()) == 3
end),

DeleteStreamFrame =
rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
ok = Transport:send(S, DeleteStreamFrame),
{CmdDelete, C4} = receive_commands(Transport, S, C3),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
CmdDelete),
_C5 = test_close(Transport, S, C4),
closed = wait_for_socket_close(Transport, S, 10),
ok.

test_gc_consumers(Config) ->
Pid = spawn(fun() -> ok end),
rabbit_ct_broker_helpers:rpc(Config,
@@ -434,12 +504,13 @@ test_server(Transport, Stream, Config) ->
?awaitMatch(#{consumers := 1}, get_global_counters(Config), ?WAIT),
CounterKeys = maps:keys(get_osiris_counters(Config)),
%% find the counter key for the subscriber
{value, SubKey} = lists:search(fun ({rabbit_stream_reader, Q, Id, _}) ->
Q == QName andalso
Id == SubscriptionId;
(_) ->
false
end, CounterKeys),
{value, SubKey} =
lists:search(fun ({rabbit_stream_reader, Q, Id, _}) ->
Q == QName andalso Id == SubscriptionId;
(_) ->
false
end,
CounterKeys),
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),

@@ -580,18 +651,14 @@ test_subscribe(Transport,
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}}, Cmd),
C.

test_unsubscribe(Transport,
Socket,
SubscriptionId, C0) ->
UnsubCmd = {request, 1,
{unsubscribe, SubscriptionId}},
test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->
UnsubCmd = {request, 1, {unsubscribe, SubscriptionId}},
UnsubscribeFrame = rabbit_stream_core:frame(UnsubCmd),
ok = Transport:send(Socket, UnsubscribeFrame ),
ok = Transport:send(Socket, UnsubscribeFrame),
{Cmd, C} = receive_commands(Transport, Socket, C0),
?assertMatch({response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, Cmd),
C.


test_deliver(Transport, S, SubscriptionId, COffset, Body, C0) ->
ct:pal("test_deliver ", []),
{{deliver, SubscriptionId, Chunk}, C} =
@@ -708,6 +775,7 @@ get_osiris_counters(Config) ->
osiris_counters,
overview,
[]).

get_global_counters(Config) ->
maps:get([{protocol, stream}],
rabbit_ct_broker_helpers:rpc(Config,