30
30
-export ([list /0 , list /1 , info_keys /0 , info /1 , info /2 , info_all /1 , info_all /2 ,
31
31
emit_info_all /5 , list_local /1 , info_local /1 ,
32
32
emit_info_local /4 , emit_info_down /4 ]).
33
- -export ([list_down /1 , count /1 , list_names /0 , list_names /1 , list_local_names /0 ]).
33
+ -export ([list_down /1 , count /1 , list_names /0 , list_names /1 , list_local_names /0 ,
34
+ list_with_possible_retry /1 ]).
34
35
-export ([list_by_type /1 ]).
35
36
-export ([notify_policy_changed /1 ]).
36
37
-export ([consumers /1 , consumers_all /1 , emit_consumers_all /4 , consumer_info_keys /0 ]).
@@ -297,6 +298,11 @@ start(Qs) ->
297
298
ok .
298
299
299
300
mark_local_durable_queues_stopped (VHost ) ->
301
+ ? try_mnesia_tx_or_upgrade_amqqueue_and_retry (
302
+ do_mark_local_durable_queues_stopped (VHost ),
303
+ do_mark_local_durable_queues_stopped (VHost )).
304
+
305
+ do_mark_local_durable_queues_stopped (VHost ) ->
300
306
Qs = find_local_durable_classic_queues (VHost ),
301
307
rabbit_misc :execute_mnesia_transaction (
302
308
fun () ->
@@ -426,13 +432,21 @@ get_queue_type(Args) ->
426
432
erlang :binary_to_existing_atom (V , utf8 )
427
433
end .
428
434
429
- internal_declare (Q , true ) ->
435
+ internal_declare (Q , Recover ) ->
436
+ ? try_mnesia_tx_or_upgrade_amqqueue_and_retry (
437
+ do_internal_declare (Q , Recover ),
438
+ begin
439
+ Q1 = amqqueue :upgrade (Q ),
440
+ do_internal_declare (Q1 , Recover )
441
+ end ).
442
+
443
+ do_internal_declare (Q , true ) ->
430
444
rabbit_misc :execute_mnesia_tx_with_tail (
431
445
fun () ->
432
446
ok = store_queue (amqqueue :set_state (Q , live )),
433
447
rabbit_misc :const ({created , Q })
434
448
end );
435
- internal_declare (Q , false ) ->
449
+ do_internal_declare (Q , false ) ->
436
450
QueueName = amqqueue :get_name (Q ),
437
451
rabbit_misc :execute_mnesia_tx_with_tail (
438
452
fun () ->
@@ -468,6 +482,14 @@ update(Name, Fun) ->
468
482
% % only really used for quorum queues to ensure the rabbit_queue record
469
483
% % is initialised
470
484
ensure_rabbit_queue_record_is_initialized (Q ) ->
485
+ ? try_mnesia_tx_or_upgrade_amqqueue_and_retry (
486
+ do_ensure_rabbit_queue_record_is_initialized (Q ),
487
+ begin
488
+ Q1 = amqqueue :upgrade (Q ),
489
+ do_ensure_rabbit_queue_record_is_initialized (Q1 )
490
+ end ).
491
+
492
+ do_ensure_rabbit_queue_record_is_initialized (Q ) ->
471
493
rabbit_misc :execute_mnesia_tx_with_tail (
472
494
fun () ->
473
495
ok = store_queue (Q ),
@@ -794,7 +816,11 @@ check_queue_type({Type, _}, _Args) ->
794
816
{error , {unacceptable_type , Type }}.
795
817
796
818
797
- list () -> mnesia :dirty_match_object (rabbit_queue , amqqueue :pattern_match_all ()).
819
+ list () ->
820
+ list_with_possible_retry (fun do_list /0 ).
821
+
822
+ do_list () ->
823
+ mnesia :dirty_match_object (rabbit_queue , amqqueue :pattern_match_all ()).
798
824
799
825
list_names () -> mnesia :dirty_all_keys (rabbit_queue ).
800
826
@@ -828,9 +854,12 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
828
854
list (VHostPath ) ->
829
855
list (VHostPath , rabbit_queue ).
830
856
857
+ list (VHostPath , TableName ) ->
858
+ list_with_possible_retry (fun () -> do_list (VHostPath , TableName ) end ).
859
+
831
860
% % Not dirty_match_object since that would not be transactional when used in a
832
861
% % tx context
833
- list (VHostPath , TableName ) ->
862
+ do_list (VHostPath , TableName ) ->
834
863
mnesia :async_dirty (
835
864
fun () ->
836
865
mnesia :match_object (
@@ -839,6 +868,38 @@ list(VHostPath, TableName) ->
839
868
read )
840
869
end ).
841
870
871
+ list_with_possible_retry (Fun ) ->
872
+ % % amqqueue migration:
873
+ % % The `rabbit_queue` or `rabbit_durable_queue` tables
874
+ % % might be migrated between the time we query the pattern
875
+ % % (with the `amqqueue` module) and the time we call
876
+ % % `mnesia:dirty_match_object()`. This would lead to an empty list
877
+ % % (no object matching the now incorrect pattern), not a Mnesia
878
+ % % error.
879
+ % %
880
+ % % So if the result is an empty list and the version of the
881
+ % % `amqqueue` record changed in between, we retry the operation.
882
+ % %
883
+ % % However, we don't do this if inside a Mnesia transaction: we
884
+ % % could end up with a live lock between this started transaction
885
+ % % and the Mnesia table migration which is blocked (but the
886
+ % % rabbit_feature_flags lock is held).
887
+ AmqqueueRecordVersion = amqqueue :record_version_to_use (),
888
+ case Fun () of
889
+ [] ->
890
+ case mnesia :is_transaction () of
891
+ true ->
892
+ [];
893
+ false ->
894
+ case amqqueue :record_version_to_use () of
895
+ AmqqueueRecordVersion -> [];
896
+ _ -> Fun ()
897
+ end
898
+ end ;
899
+ Ret ->
900
+ Ret
901
+ end .
902
+
842
903
list_down (VHostPath ) ->
843
904
case rabbit_vhost :exists (VHostPath ) of
844
905
false -> [];
@@ -859,13 +920,21 @@ count(VHost) ->
859
920
% % won't work here because with master migration of mirrored queues
860
921
% % the "ownership" of queues by nodes becomes a non-trivial problem
861
922
% % that requires a proper consensus algorithm.
862
- length (mnesia : dirty_index_read ( rabbit_queue , VHost , amqqueue : field_vhost () ))
923
+ length (list_for_count ( VHost ))
863
924
catch _ :Err ->
864
925
rabbit_log :error (" Failed to fetch number of queues in vhost ~p :~n~p~n " ,
865
926
[VHost , Err ]),
866
927
0
867
928
end .
868
929
930
+ list_for_count (VHost ) ->
931
+ list_with_possible_retry (
932
+ fun () ->
933
+ mnesia :dirty_index_read (rabbit_queue ,
934
+ VHost ,
935
+ amqqueue :field_vhost ())
936
+ end ).
937
+
869
938
info_keys () -> rabbit_amqqueue_process :info_keys ().
870
939
871
940
map (Qs , F ) -> rabbit_misc :filter_exit_map (F , Qs ).
0 commit comments