Skip to content

Commit 557fc27

Browse files
Merge pull request #14808 from rabbitmq/mergify/bp/v4.1.x/pr-14804
For 4.1.6: rabbit_logger_exchange_h: Do not re-enter itself (backport #14796) (backport #14804)
2 parents d227957 + 8a1a2fe commit 557fc27

File tree

2 files changed

+67
-15
lines changed

2 files changed

+67
-15
lines changed

deps/rabbit/src/rabbit_logger_exchange_h.erl

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,16 @@ filter_config(Config) ->
4343
log(#{meta := #{mfa := {?MODULE, _, _}}}, _) ->
4444
ok;
4545
log(LogEvent, Config) ->
46+
%% Publishing the log message to an exchange might trigger more logging,
47+
%% triggering an infinite logging loop. To prevent that, we make use the
48+
%% process dictionary to record the fact that this logger was already
49+
%% entered. If that's the case when this function is called, we just drop
50+
%% the log event.
51+
Key = ?MODULE,
52+
ReEntered = erlang:get(Key) =/= undefined,
4653
case rabbit_boot_state:get() of
47-
ready ->
54+
ready when not ReEntered ->
55+
erlang:put(Key, ?FUNCTION_NAME),
4856
try
4957
do_log(LogEvent, Config)
5058
catch
@@ -53,22 +61,30 @@ log(LogEvent, Config) ->
5361
%% removes the logger_exchange handler, which in
5462
%% turn deletes the log exchange and its bindings
5563
erlang:display({?MODULE, crashed, {C, R, S}})
64+
after
65+
erlang:erase(Key)
5666
end,
5767
ok;
5868
_ -> ok
5969
end.
6070

61-
do_log(LogEvent, #{config := #{exchange := Exchange}} = Config) ->
71+
do_log(
72+
LogEvent,
73+
#{config := #{exchange := Exchange,
74+
setup_proc := Pid}} = Config) ->
6275
RoutingKey = make_routing_key(LogEvent, Config),
6376
PBasic = log_event_to_amqp_msg(LogEvent, Config),
6477
Body = try_format_body(LogEvent, Config),
6578
Content = rabbit_basic:build_content(PBasic, Body),
6679
case mc_amqpl:message(Exchange, RoutingKey, Content) of
6780
{ok, Msg} ->
68-
case rabbit_queue_type:publish_at_most_once(Exchange, Msg) of
69-
ok -> ok;
70-
{error, not_found} -> ok
71-
end;
81+
%% Publishing a message might involve a Erlang process, like a Ra
82+
%% server process, to log something and call itself. We need to
83+
%% publish the message asynchronously from a separate process and
84+
%% ignore the fate of that publish, to not block an Erlang
85+
%% process.
86+
Pid ! {publish, Msg},
87+
ok;
7288
{error, _Reason} ->
7389
%% it would be good to log this error but can we?
7490
ok
@@ -164,12 +180,19 @@ wait_for_initial_pass(N) ->
164180
end.
165181

166182
setup_proc(
167-
#{config := #{exchange := Exchange}} = Config) ->
183+
#{id := Id,
184+
config := #{exchange := Exchange}} = Config) ->
185+
%% We register this process using the logger handler ID. It makes
186+
%% debugging convenient but it's not critical. That's why we catch any
187+
%% exceptions and ignore the return value.
188+
_ = catch erlang:register(Id, self()),
189+
168190
case declare_exchange(Config) of
169191
ok ->
170192
?LOG_INFO(
171193
"Logging to ~ts ready", [rabbit_misc:rs(Exchange)],
172-
#{domain => ?RMQLOG_DOMAIN_GLOBAL});
194+
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
195+
loop(Config);
173196
error ->
174197
?LOG_DEBUG(
175198
"Logging to ~ts not ready, trying again in ~b second(s)",
@@ -182,6 +205,15 @@ setup_proc(
182205
end
183206
end.
184207

208+
loop(#{config := #{exchange := Exchange}} = Config) ->
209+
receive
210+
{publish, Msg} ->
211+
_ = rabbit_queue_type:publish_at_most_once(Exchange, Msg),
212+
loop(Config);
213+
stop ->
214+
ok
215+
end.
216+
185217
declare_exchange(#{config := #{exchange := Exchange}}) ->
186218
try rabbit_exchange:declare(
187219
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of

deps/rabbit/test/logging_SUITE.erl

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
logging_to_exchange_works/1,
5555
update_log_exchange_config/1,
56+
use_exchange_logger_when_enabling_khepri_db/1,
5657

5758
logging_to_syslog_works/1]).
5859

@@ -99,7 +100,8 @@ groups() ->
99100

100101
{exchange_output, [],
101102
[logging_to_exchange_works,
102-
update_log_exchange_config]},
103+
update_log_exchange_config,
104+
use_exchange_logger_when_enabling_khepri_db]},
103105

104106
{syslog_output, [],
105107
[logging_to_syslog_works]}
@@ -150,17 +152,27 @@ init_per_testcase(Testcase, Config) ->
150152
%% group will run in the context of that RabbitMQ node.
151153
exchange_output ->
152154
ExchProps = [{enabled, true},
153-
{level, debug}] ,
155+
{level, debug}],
154156
Config1 = rabbit_ct_helpers:set_config(
155157
Config,
156-
[{rmq_nodes_count, 1},
157-
{rmq_nodename_suffix, Testcase}]),
158-
Config2 = rabbit_ct_helpers:merge_app_env(
159-
Config1,
158+
[{rmq_nodename_suffix, Testcase}]),
159+
Config2 = case Testcase of
160+
use_exchange_logger_when_enabling_khepri_db ->
161+
rabbit_ct_helpers:set_config(
162+
Config1,
163+
[{rmq_nodes_count, 3},
164+
{metadata_store, mnesia}]);
165+
_ ->
166+
rabbit_ct_helpers:set_config(
167+
Config1,
168+
[{rmq_nodes_count, 1}])
169+
end,
170+
Config3 = rabbit_ct_helpers:merge_app_env(
171+
Config2,
160172
{rabbit, [{log, [{exchange, ExchProps},
161173
{file, [{level, debug}]}]}]}),
162174
rabbit_ct_helpers:run_steps(
163-
Config2,
175+
Config3,
164176
rabbit_ct_broker_helpers:setup_steps() ++
165177
rabbit_ct_client_helpers:setup_steps());
166178

@@ -1102,6 +1114,14 @@ update_log_exchange_config(Config) ->
11021114
?assertEqual(HandlerConfig1, HandlerConfig2),
11031115
ok.
11041116

1117+
use_exchange_logger_when_enabling_khepri_db(Config) ->
1118+
?assertNot(rabbit_ct_broker_helpers:rpc(
1119+
Config, 0,
1120+
rabbit_feature_flags, is_enabled, [khepri_db])),
1121+
?assertEqual(
1122+
ok,
1123+
rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db)).
1124+
11051125
logging_to_syslog_works(Config) ->
11061126
Context = default_context(Config),
11071127
ok = application:set_env(

0 commit comments

Comments
 (0)