Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,24 @@ end}.
% Logging section
% ==========================

{mapping, "log.summarize_process_state", "rabbit.summarize_process_state_when_logged", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "log.error_logger_format_depth", "kernel.error_logger_format_depth", [
{datatype, [{atom, unlimited}, integer]}
]}.
{translation, "kernel.error_logger_format_depth",
fun(Conf) ->
case cuttlefish:conf_get("log.error_logger_format_depth", Conf, undefined) of
undefined -> cuttlefish:unset();
unlimited -> unlimited;
Val when is_integer(Val) andalso Val > 0 -> Val;
_ -> cuttlefish:invalid("should be positive integer or 'unlimited'")
end
end
}.

{mapping, "log.dir", "rabbit.log_root", [
{datatype, string},
{validators, ["dir_writable"]}]}.
Expand Down
17 changes: 15 additions & 2 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([start_link/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
prioritise_cast/3, prioritise_info/3, format_state/1, format_message_queue/2]).
-export([format/1]).
-export([is_policy_applicable/2]).

Expand Down Expand Up @@ -298,7 +298,7 @@ init_with_backing_queue_state(Q, BQ, BQS,
notify_decorators(startup, State3),
State3.

terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(
fun (BQS) ->
Expand Down Expand Up @@ -1746,6 +1746,9 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.

format_state(#q{}=S) ->
maybe_format_backing_queue_state(S).

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

%% TODO: this can be removed after 3.13
Expand Down Expand Up @@ -1787,3 +1790,13 @@ queue_created_infos(State) ->
%% On the events API, we use long names for queue types
Keys = ?CREATION_EVENT_KEYS -- [type],
infos(Keys, State) ++ [{type, rabbit_classic_queue}].

maybe_format_backing_queue_state(S = #q{backing_queue = BQ,
backing_queue_state = BQS0}) ->
case erlang:function_exported(BQ, format_state, 1) of
true ->
BQS1 = BQ:format_state(BQS0),
S#q{backing_queue_state = BQS1};
_ ->
S#q{backing_queue_state = backing_queue_state_truncated}
end.
13 changes: 12 additions & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
format_message_queue/2]).
format_state/1, format_message_queue/2]).

%% Internal
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
Expand Down Expand Up @@ -806,6 +806,17 @@ terminate(_Reason,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

format_state(#ch{} = S) ->
format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S).

format_state(false, #ch{} = S) ->
S;
format_state(true, #ch{unacked_message_q = UAMQ} = S) ->
UAMQLen = ?QUEUE:len(UAMQ),
Msg0 = io_lib:format("unacked_message_q (~b elements) (truncated)", [UAMQLen]),
Msg1 = rabbit_data_coercion:to_utf8_binary(Msg0),
S#ch{unacked_message_q = Msg1}.

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

get_consumer_timeout() ->
Expand Down
35 changes: 32 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
%% Shared with rabbit_classic_queue_store_v2.
-export([queue_dir/2]).

%% Used to format the state and summarize large amounts of data in
%% the state.
-export([format_state/1]).

%% Internal. Used by tests.
-export([segment_file/2]).

Expand Down Expand Up @@ -72,7 +76,11 @@
%% If not, write everything. This allows the reader
%% to continue reading from memory if it is fast
%% enough to keep up with the producer.
write_buffer = #{} :: #{rabbit_variable_queue:seq_id() => entry() | ack},
%%
%% Note that the list type is only for when this state is
%% formatted as part of a crash dump.
write_buffer = #{} :: #{rabbit_variable_queue:seq_id() => entry() | ack} |
list(rabbit_variable_queue:seq_id()),

%% The number of entries in the write buffer that
%% refer to an update to the file, rather than a
Expand All @@ -91,7 +99,11 @@
%% replaces the cache entirely. When only acks are flushed,
%% then the cache gets updated: old acks are removed and
%% new acks are added to the cache.
cache = #{} :: #{rabbit_variable_queue:seq_id() => entry() | ack},
%%
%% Note that the list type is only for when this state is
%% formatted as part of a crash dump.
cache = #{} :: #{rabbit_variable_queue:seq_id() => entry() | ack} |
list(rabbit_variable_queue:seq_id()),

%% Messages waiting for publisher confirms. The
%% publisher confirms will be sent when the message
Expand All @@ -102,7 +114,10 @@
%% and there are outstanding unconfirmed messages.
%% In that case the buffer is flushed to disk when
%% the queue requests a sync (after a timeout).
confirms = sets:new([{version,2}]) :: sets:set(),
%%
%% Note that the binary type is only for when this state is
%% formatted as part of a crash dump.
confirms = sets:new([{version,2}]) :: sets:set() | binary(),

%% Segments we currently know of along with the
%% number of unacked messages remaining in the
Expand Down Expand Up @@ -1285,3 +1300,17 @@ write_file_and_ensure_dir(Name, IOData) ->
end;
Err -> Err
end.

-spec format_state(State) -> State when State::state().

format_state(#qi{write_buffer = WriteBuffer,
cache = Cache,
confirms = Confirms} = S) ->
ConfirmsSize = sets:size(Confirms),
S#qi{write_buffer = maps:keys(WriteBuffer),
cache = maps:keys(Cache),
confirms = format_state_element(confirms, ConfirmsSize)}.

format_state_element(Element, Size) when is_atom(Element), is_integer(Size) ->
rabbit_data_coercion:to_utf8_binary(
io_lib:format("~tp (~b elements) (truncated)", [Element, Size])).
17 changes: 13 additions & 4 deletions deps/rabbit/src/rabbit_classic_queue_store_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

-export([init/1, terminate/1, info/1,
write/4, sync/1, read/3, read_many/2, check_msg_on_disk/3,
remove/2, delete_segments/2]).
remove/2, delete_segments/2, format_state/1]).

-define(SEGMENT_EXTENSION, ".qs").

Expand All @@ -67,9 +67,11 @@

-type buffer() :: #{
%% SeqId => {Offset, Size, Msg}
rabbit_variable_queue:seq_id() => {non_neg_integer(), non_neg_integer(), mc:state()}
rabbit_variable_queue:seq_id() => {non_neg_integer(), non_neg_integer(), mc:state()}
}.

-type buffer_fmt() :: list(rabbit_variable_queue:seq_id()).

-record(qs, {
%% Store directory - same as the queue index.
%% Stored as binary() as opposed to file:filename() to save memory.
Expand All @@ -86,14 +88,14 @@

%% We must keep the offset, expected size and message in order
%% to write the message.
write_buffer = #{} :: buffer(),
write_buffer = #{} :: buffer() | buffer_fmt(),
write_buffer_size = 0 :: non_neg_integer(),

%% We keep a cache of messages for faster reading
%% for the cases where consumers can keep up with
%% producers. The write_buffer becomes the cache
%% when it is written to disk.
cache = #{} :: buffer(),
cache = #{} :: buffer() | buffer_fmt(),

%% Similarly, we keep track of a single read fd.
%% We cannot share this fd with the write fd because
Expand Down Expand Up @@ -572,3 +574,10 @@ check_crc32() ->
segment_file(Segment, #qs{dir = Dir}) ->
N = integer_to_binary(Segment),
<<Dir/binary, N/binary, ?SEGMENT_EXTENSION>>.

-spec format_state(State) -> State when State::state().

format_state(#qs{write_buffer = WriteBuffer,
cache = Cache} = S) ->
S#qs{write_buffer = maps:keys(WriteBuffer),
cache = maps:keys(Cache)}.
12 changes: 11 additions & 1 deletion deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
set_queue_version/2,
zip_msgs_and_acks/4]).
zip_msgs_and_acks/4,
format_state/1]).

-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
Expand Down Expand Up @@ -663,3 +664,12 @@ zip_msgs_and_acks(Pubs, AckTags) ->
Id = mc:get_annotation(id, Msg),
{Id, AckTag}
end, Pubs, AckTags).

format_state(S = #passthrough{bq = BQ, bqs = BQS0}) ->
case erlang:function_exported(BQ, format_state, 1) of
true ->
BQS1 = BQ:format_state(BQS0),
S#passthrough{bqs = BQS1};
_ ->
S#passthrough{bqs = passthrough_bqs_truncated}
end.
22 changes: 21 additions & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
update_rates/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
set_queue_version/2, zip_msgs_and_acks/4]).
set_queue_version/2, zip_msgs_and_acks/4,
format_state/1]).

-export([start/2, stop/1]).

Expand Down Expand Up @@ -2435,3 +2436,22 @@ maybe_client_terminate(MSCStateP) ->
_:_ ->
ok
end.

format_state(#vqstate{} = S) ->
format_state(application:get_env(rabbit, summarize_process_state_when_logged, false), S).

format_state(false, #vqstate{} = S) ->
S;
format_state(true, #vqstate{q3 = Q3,
ram_pending_ack = RamPendingAck,
disk_pending_ack = DiskPendingAck,
index_state = IndexState,
store_state = StoreState} = S) ->
S#vqstate{q3 = format_q3(Q3),
ram_pending_ack = maps:keys(RamPendingAck),
disk_pending_ack = maps:keys(DiskPendingAck),
index_state = rabbit_classic_queue_index_v2:format_state(IndexState),
store_state = rabbit_classic_queue_store_v2:format_state(StoreState)}.

format_q3(Q3) ->
[SeqId || #msg_status{seq_id = SeqId} <- ?QUEUE:to_list(Q3)].
24 changes: 17 additions & 7 deletions deps/rabbit_common/src/gen_server2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1150,14 +1150,15 @@ print_event(Dev, Event, Name) ->

terminate(Reason, Msg, #gs2_state { name = Name,
mod = Mod,
state = State,
state = ModState0,
debug = Debug,
stop_stats_fun = StopStatsFun
} = GS2State) ->
StopStatsFun(stop_stats_timer(GS2State)),
case catch Mod:terminate(Reason, State) of
case catch Mod:terminate(Reason, ModState0) of
{'EXIT', R} ->
error_info(R, Reason, Name, Msg, State, Debug),
ModState1 = maybe_format_state(Mod, ModState0),
error_info(R, Reason, Name, Msg, ModState1, Debug),
exit(R);
_ ->
case Reason of
Expand All @@ -1168,28 +1169,37 @@ terminate(Reason, Msg, #gs2_state { name = Name,
{shutdown,_}=Shutdown ->
exit(Shutdown);
_ ->
error_info(Reason, undefined, Name, Msg, State, Debug),
ModState1 = maybe_format_state(Mod, ModState0),
error_info(Reason, undefined, Name, Msg, ModState1, Debug),
exit(Reason)
end
end.

maybe_format_state(M, ModState) ->
case erlang:function_exported(M, format_state, 1) of
true ->
M:format_state(ModState);
false ->
ModState
end.

error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
%% OTP-5811 Don't send an error report if it's the system process
%% application_controller which is terminating - let init take care
%% of it instead
ok;
error_info(Reason, RootCause, Name, Msg, State, Debug) ->
error_info(Reason, RootCause, Name, Msg, ModState, Debug) ->
Reason1 = error_reason(Reason),
Fmt =
"** Generic server ~tp terminating~n"
"** Last message in was ~tp~n"
"** When Server state == ~tp~n"
"** Reason for termination == ~n** ~tp~n",
case RootCause of
undefined -> format(Fmt, [Name, Msg, State, Reason1]);
undefined -> format(Fmt, [Name, Msg, ModState, Reason1]);
_ -> format(Fmt ++ "** In 'terminate' callback "
"with reason ==~n** ~tp~n",
[Name, Msg, State, Reason1,
[Name, Msg, ModState, Reason1,
error_reason(RootCause)])
end,
sys:print_log(Debug),
Expand Down
Loading