Skip to content

Commit 2dc428f

Browse files
committed
Add lager backend that logs to amq.rabbitmq.log
Fixes #1456 I used the `lager_console_backend` as a model for creating `lager_rabbit_backend` Only try to declare amq.rabbitmq.log exchange once every five seconds as vhost will not be available immediately
1 parent e56392f commit 2dc428f

File tree

8 files changed

+311
-154
lines changed

8 files changed

+311
-154
lines changed

docs/rabbitmq.conf.example

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,14 @@
674674
##
675675
# log.dir = /var/log/rabbitmq
676676

677+
## Logging to rabbit amq.rabbitmq.log exchange (can be true or false)
678+
##
679+
# log.rabbit = false
680+
681+
## Loglevel to log to amq.rabbitmq.log exchange
682+
##
683+
# log.rabbit.level = info
684+
677685
## Logging to console (can be true or false)
678686
##
679687
# log.console = false

priv/schema/rabbit.schema

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,13 @@ end}.
10471047
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
10481048
]}.
10491049

1050+
{mapping, "log.rabbit", "rabbit.log.rabbit.enabled", [
1051+
{datatype, {enum, [true, false]}}
1052+
]}.
1053+
{mapping, "log.rabbit.level", "rabbit.log.rabbit.level", [
1054+
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
1055+
]}.
1056+
10501057
{mapping, "log.syslog", "rabbit.log.syslog.enabled", [
10511058
{datatype, {enum, [true, false]}}
10521059
]}.

src/lager_rabbit_backend.erl

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 1.1 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at http://www.mozilla.org/MPL/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is GoPivotal, Inc.
14+
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
15+
%%
16+
17+
%% @doc RabbitMQ backend for lager.
18+
%% Configuration is a proplist with the following keys:
19+
%% <ul>
20+
%% <li>`level' - log level to use</li>
21+
%% <li>`formatter' - the module to use when formatting log messages. Defaults to
22+
%% `lager_default_formatter'</li>
23+
%% <li>`formatter_config' - the format configuration string. Defaults to
24+
%% `time [ severity ] message'</li>
25+
%% </ul>
26+
27+
-module(lager_rabbit_backend).
28+
29+
-behaviour(gen_event).
30+
31+
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
32+
handle_info/2]).
33+
34+
-include("rabbit.hrl").
35+
-include("rabbit_framing.hrl").
36+
37+
-include_lib("lager/include/lager.hrl").
38+
39+
-record(state, {level :: {'mask', integer()},
40+
formatter :: atom(),
41+
format_config :: any(),
42+
init_exchange_ts = undefined :: rabbit_types:timestamp(),
43+
exchange = undefined :: #resource{}}).
44+
45+
-ifdef(TEST).
46+
-include_lib("eunit/include/eunit.hrl").
47+
-compile([{parse_transform, lager_transform}]).
48+
-endif.
49+
50+
-define(INIT_EXCHANGE_INTERVAL_SECS, 5).
51+
-define(TERSE_FORMAT, [time, " [", severity, "] ", message]).
52+
-define(DEFAULT_FORMAT_CONFIG, ?TERSE_FORMAT).
53+
-define(FORMAT_CONFIG_OFF, []).
54+
55+
-ifdef(TEST).
56+
-define(DEPRECATED(_Msg), ok).
57+
-else.
58+
-define(DEPRECATED(Msg),
59+
io:format(user, "WARNING: This is a deprecated lager_rabbit_backend configuration. Please use \"~w\" instead.~n", [Msg])).
60+
-endif.
61+
62+
-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>).
63+
64+
%% @private
65+
init([Level]) when is_atom(Level) ->
66+
?DEPRECATED([{level, Level}]),
67+
init([{level, Level}]);
68+
init([Level, true]) when is_atom(Level) -> % for backwards compatibility
69+
?DEPRECATED([{level, Level}, {formatter_config, [{eol, "\\r\\n\\"}]}]),
70+
init([{level, Level}, {formatter_config, ?FORMAT_CONFIG_OFF}]);
71+
init([Level, false]) when is_atom(Level) -> % for backwards compatibility
72+
?DEPRECATED([{level, Level}]),
73+
init([{level, Level}]);
74+
75+
init(Options) when is_list(Options) ->
76+
true = validate_options(Options),
77+
Level = get_option(level, Options, undefined),
78+
try lager_util:config_to_mask(Level) of
79+
L ->
80+
DefaultOptions = [{formatter, lager_default_formatter},
81+
{formatter_config, ?DEFAULT_FORMAT_CONFIG}],
82+
[Formatter, Config] = [get_option(K, Options, Default) || {K, Default} <- DefaultOptions],
83+
State0 = #state{level=L,
84+
formatter=Formatter,
85+
format_config=Config},
86+
State1 = maybe_init_exchange(State0),
87+
{ok, State1}
88+
catch
89+
_:_ ->
90+
{error, {fatal, bad_log_level}}
91+
end;
92+
init(Level) when is_atom(Level) ->
93+
?DEPRECATED([{level, Level}]),
94+
init([{level, Level}]);
95+
init(Other) ->
96+
{error, {fatal, {bad_lager_rabbit_backend_config, Other}}}.
97+
98+
validate_options([]) -> true;
99+
validate_options([{level, L}|T]) when is_atom(L) ->
100+
case lists:member(L, ?LEVELS) of
101+
false ->
102+
throw({error, {fatal, {bad_level, L}}});
103+
true ->
104+
validate_options(T)
105+
end;
106+
validate_options([{formatter, M}|T]) when is_atom(M) ->
107+
validate_options(T);
108+
validate_options([{formatter_config, C}|T]) when is_list(C) ->
109+
validate_options(T);
110+
validate_options([H|_]) ->
111+
throw({error, {fatal, {bad_lager_rabbit_backend_config, H}}}).
112+
113+
get_option(K, Options, Default) ->
114+
case lists:keyfind(K, 1, Options) of
115+
{K, V} -> V;
116+
false -> Default
117+
end.
118+
119+
%% @private
120+
handle_call(get_loglevel, #state{level=Level} = State) ->
121+
{ok, Level, State};
122+
handle_call({set_loglevel, Level}, State) ->
123+
try lager_util:config_to_mask(Level) of
124+
Levels ->
125+
{ok, ok, State#state{level=Levels}}
126+
catch
127+
_:_ ->
128+
{ok, {error, bad_log_level}, State}
129+
end;
130+
handle_call(_Request, State) ->
131+
{ok, ok, State}.
132+
133+
%% @private
134+
handle_event({log, _Message} = Event, State0) ->
135+
State1 = maybe_init_exchange(State0),
136+
handle_log_event(Event, State1);
137+
handle_event(_Event, State) ->
138+
{ok, State}.
139+
140+
%% @private
141+
handle_log_event({log, _Message}, #state{exchange=undefined} = State) ->
142+
% NB: tried to define the exchange but still undefined,
143+
% so not logging this message. Note: we can't log this dropped
144+
% message because it will start an infinite loop
145+
{ok, State};
146+
handle_log_event({log, Message},
147+
#state{level=L, exchange=LogExch,
148+
formatter=Formatter, format_config=FormatConfig} = State) ->
149+
case lager_util:is_loggable(Message, L, ?MODULE) of
150+
true ->
151+
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
152+
%% second resolution, not millisecond.
153+
RoutingKey = rabbit_data_coercion:to_binary(lager_msg:severity(Message)),
154+
Timestamp = os:system_time(seconds),
155+
Node = rabbit_data_coercion:to_binary(node()),
156+
Headers = [{<<"node">>, longstr, Node}],
157+
AmqpMsg = #'P_basic'{content_type = <<"text/plain">>,
158+
timestamp = Timestamp,
159+
headers = Headers},
160+
Body = rabbit_data_coercion:to_binary(Formatter:format(Message, FormatConfig)),
161+
case rabbit_basic:publish(LogExch, RoutingKey, AmqpMsg, Body) of
162+
{ok, _DeliveredQPids} -> ok;
163+
{error, not_found} -> ok
164+
end,
165+
{ok, State};
166+
false ->
167+
{ok, State}
168+
end.
169+
170+
%% @private
171+
handle_info(_Info, State) ->
172+
{ok, State}.
173+
174+
%% @private
175+
terminate(_Reason, _State) ->
176+
ok.
177+
178+
%% @private
179+
code_change(_OldVsn, State, _Extra) ->
180+
{ok, State}.
181+
182+
%% @private
183+
maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=undefined} = State) ->
184+
Now = erlang:monotonic_time(second),
185+
handle_init_exchange(init_exchange(true), Now, State);
186+
maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=Timestamp} = State) ->
187+
Now = erlang:monotonic_time(second),
188+
Result = init_exchange(Now - Timestamp > ?INIT_EXCHANGE_INTERVAL_SECS),
189+
handle_init_exchange(Result, Now, State);
190+
maybe_init_exchange(State) ->
191+
State.
192+
193+
%% @private
194+
init_exchange(true) ->
195+
{ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
196+
VHost = rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
197+
try
198+
#exchange{} = rabbit_exchange:declare(VHost, topic, true, false, true, [], ?INTERNAL_USER),
199+
{ok, #resource{virtual_host=DefaultVHost, kind=exchange, name=?LOG_EXCH_NAME}}
200+
catch
201+
ErrType:Err ->
202+
rabbit_log:debug("Could not initialize exchange '~s' in vhost '~s', reason: ~p:~p",
203+
[?LOG_EXCH_NAME, DefaultVHost, ErrType, Err]),
204+
{ok, undefined}
205+
end;
206+
init_exchange(_) ->
207+
{ok, undefined}.
208+
209+
%% @private
210+
handle_init_exchange({ok, undefined}, Now, State) ->
211+
State#state{init_exchange_ts=Now};
212+
handle_init_exchange({ok, Exchange}, Now, State) ->
213+
State#state{exchange=Exchange, init_exchange_ts=Now}.
214+
215+
-ifdef(TEST).
216+
console_config_validation_test_() ->
217+
Good = [{level, info}],
218+
Bad1 = [{level, foo}],
219+
Bad2 = [{larval, info}],
220+
AllGood = [{level, info}, {formatter, my_formatter},
221+
{formatter_config, ["blort", "garbage"]}],
222+
[
223+
?_assertEqual(true, validate_options(Good)),
224+
?_assertThrow({error, {fatal, {bad_level, foo}}}, validate_options(Bad1)),
225+
?_assertThrow({error, {fatal, {bad_lager_rabbit_backend_config, {larval, info}}}}, validate_options(Bad2)),
226+
?_assertEqual(true, validate_options(AllGood))
227+
].
228+
-endif.

src/rabbit.erl

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,30 +173,19 @@
173173
[{description, "message delivery logic ready"},
174174
{requires, core_initialized}]}).
175175

176-
-rabbit_boot_step({log_relay,
177-
[{description, "error log relay"},
178-
{mfa, {rabbit_sup, start_child,
179-
[rabbit_error_logger_lifecycle,
180-
supervised_lifecycle,
181-
[rabbit_error_logger_lifecycle,
182-
{rabbit_error_logger, start, []},
183-
{rabbit_error_logger, stop, []}]]}},
184-
{requires, routing_ready},
185-
{enables, networking}]}).
186-
187176
-rabbit_boot_step({direct_client,
188177
[{description, "direct client"},
189178
{mfa, {rabbit_direct, boot, []}},
190-
{requires, log_relay}]}).
179+
{requires, routing_ready}]}).
191180

192181
-rabbit_boot_step({connection_tracking,
193182
[{description, "sets up internal storage for node-local connections"},
194183
{mfa, {rabbit_connection_tracking, boot, []}},
195-
{requires, log_relay}]}).
184+
{requires, routing_ready}]}).
196185

197186
-rabbit_boot_step({networking,
198187
[{mfa, {rabbit_networking, boot, []}},
199-
{requires, log_relay}]}).
188+
{requires, routing_ready}]}).
200189

201190
-rabbit_boot_step({notify_cluster,
202191
[{description, "notify cluster nodes"},

0 commit comments

Comments
 (0)