diff --git a/deps/rabbit/src/rabbit_logger_exchange_h.erl b/deps/rabbit/src/rabbit_logger_exchange_h.erl index 067b1c6d3ff..6f6fbed4044 100644 --- a/deps/rabbit/src/rabbit_logger_exchange_h.erl +++ b/deps/rabbit/src/rabbit_logger_exchange_h.erl @@ -43,8 +43,16 @@ filter_config(Config) -> log(#{meta := #{mfa := {?MODULE, _, _}}}, _) -> ok; log(LogEvent, Config) -> + %% Publishing the log message to an exchange might trigger more logging, + %% triggering an infinite logging loop. To prevent that, we make use the + %% process dictionary to record the fact that this logger was already + %% entered. If that's the case when this function is called, we just drop + %% the log event. + Key = ?MODULE, + ReEntered = erlang:get(Key) =/= undefined, case rabbit_boot_state:get() of - ready -> + ready when not ReEntered -> + erlang:put(Key, ?FUNCTION_NAME), try do_log(LogEvent, Config) catch @@ -53,22 +61,30 @@ log(LogEvent, Config) -> %% removes the logger_exchange handler, which in %% turn deletes the log exchange and its bindings erlang:display({?MODULE, crashed, {C, R, S}}) + after + erlang:erase(Key) end, ok; _ -> ok end. -do_log(LogEvent, #{config := #{exchange := Exchange}} = Config) -> +do_log( + LogEvent, + #{config := #{exchange := Exchange, + setup_proc := Pid}} = Config) -> RoutingKey = make_routing_key(LogEvent, Config), PBasic = log_event_to_amqp_msg(LogEvent, Config), Body = try_format_body(LogEvent, Config), Content = rabbit_basic:build_content(PBasic, Body), case mc_amqpl:message(Exchange, RoutingKey, Content) of {ok, Msg} -> - case rabbit_queue_type:publish_at_most_once(Exchange, Msg) of - ok -> ok; - {error, not_found} -> ok - end; + %% Publishing a message might involve a Erlang process, like a Ra + %% server process, to log something and call itself. We need to + %% publish the message asynchronously from a separate process and + %% ignore the fate of that publish, to not block an Erlang + %% process. + Pid ! {publish, Msg}, + ok; {error, _Reason} -> %% it would be good to log this error but can we? ok @@ -164,12 +180,19 @@ wait_for_initial_pass(N) -> end. setup_proc( - #{config := #{exchange := Exchange}} = Config) -> + #{id := Id, + config := #{exchange := Exchange}} = Config) -> + %% We register this process using the logger handler ID. It makes + %% debugging convenient but it's not critical. That's why we catch any + %% exceptions and ignore the return value. + _ = catch erlang:register(Id, self()), + case declare_exchange(Config) of ok -> ?LOG_INFO( "Logging to ~ts ready", [rabbit_misc:rs(Exchange)], - #{domain => ?RMQLOG_DOMAIN_GLOBAL}); + #{domain => ?RMQLOG_DOMAIN_GLOBAL}), + loop(Config); error -> ?LOG_DEBUG( "Logging to ~ts not ready, trying again in ~b second(s)", @@ -182,6 +205,15 @@ setup_proc( end end. +loop(#{config := #{exchange := Exchange}} = Config) -> + receive + {publish, Msg} -> + _ = rabbit_queue_type:publish_at_most_once(Exchange, Msg), + loop(Config); + stop -> + ok + end. + declare_exchange(#{config := #{exchange := Exchange}}) -> try rabbit_exchange:declare( Exchange, topic, true, false, true, [], ?INTERNAL_USER) of diff --git a/deps/rabbit/test/logging_SUITE.erl b/deps/rabbit/test/logging_SUITE.erl index abd374ec01e..c5aace9a7d4 100644 --- a/deps/rabbit/test/logging_SUITE.erl +++ b/deps/rabbit/test/logging_SUITE.erl @@ -53,6 +53,7 @@ logging_to_exchange_works/1, update_log_exchange_config/1, + use_exchange_logger_when_enabling_khepri_db/1, logging_to_syslog_works/1]). @@ -99,7 +100,8 @@ groups() -> {exchange_output, [], [logging_to_exchange_works, - update_log_exchange_config]}, + update_log_exchange_config, + use_exchange_logger_when_enabling_khepri_db]}, {syslog_output, [], [logging_to_syslog_works]} @@ -150,17 +152,27 @@ init_per_testcase(Testcase, Config) -> %% group will run in the context of that RabbitMQ node. exchange_output -> ExchProps = [{enabled, true}, - {level, debug}] , + {level, debug}], Config1 = rabbit_ct_helpers:set_config( Config, - [{rmq_nodes_count, 1}, - {rmq_nodename_suffix, Testcase}]), - Config2 = rabbit_ct_helpers:merge_app_env( - Config1, + [{rmq_nodename_suffix, Testcase}]), + Config2 = case Testcase of + use_exchange_logger_when_enabling_khepri_db -> + rabbit_ct_helpers:set_config( + Config1, + [{rmq_nodes_count, 3}, + {metadata_store, mnesia}]); + _ -> + rabbit_ct_helpers:set_config( + Config1, + [{rmq_nodes_count, 1}]) + end, + Config3 = rabbit_ct_helpers:merge_app_env( + Config2, {rabbit, [{log, [{exchange, ExchProps}, {file, [{level, debug}]}]}]}), rabbit_ct_helpers:run_steps( - Config2, + Config3, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); @@ -1102,6 +1114,14 @@ update_log_exchange_config(Config) -> ?assertEqual(HandlerConfig1, HandlerConfig2), ok. +use_exchange_logger_when_enabling_khepri_db(Config) -> + ?assertNot(rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, is_enabled, [khepri_db])), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db)). + logging_to_syslog_works(Config) -> Context = default_context(Config), ok = application:set_env(