3535 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand' ).
3636-define (COMMAND_ACTIVATE_STREAM_CONSUMER ,
3737 'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand' ).
38-
38+ -define (COMMAND_RESET_OFFSET ,
39+ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand' ).
3940
4041all () ->
4142 [{group , list_connections },
@@ -45,6 +46,7 @@ all() ->
4546 {group , list_group_consumers },
4647 {group , activate_consumer },
4748 {group , list_stream_tracking },
49+ {group , reset_offset },
4850 {group , super_streams }].
4951
5052groups () ->
@@ -67,6 +69,9 @@ groups() ->
6769 {list_stream_tracking , [],
6870 [list_stream_tracking_validate , list_stream_tracking_merge_defaults ,
6971 list_stream_tracking_run ]},
72+ {reset_offset , [],
73+ [reset_offset_validate , reset_offset_merge_defaults ,
74+ reset_offset_run ]},
7075 {super_streams , [],
7176 [add_super_stream_merge_defaults ,
7277 add_super_stream_validate ,
@@ -708,6 +713,65 @@ list_stream_tracking_run(Config) ->
708713 close (S , C ),
709714 ok .
710715
716+ reset_offset_validate (_ ) ->
717+ Cmd = ? COMMAND_RESET_OFFSET ,
718+ ValidOpts = #{vhost => <<" /" >>,
719+ stream => <<" s1" >>,
720+ reference => <<" foo" >>},
721+ ? assertMatch ({validation_failure , not_enough_args },
722+ Cmd :validate ([], #{})),
723+ ? assertMatch ({validation_failure , not_enough_args },
724+ Cmd :validate ([], #{vhost => <<" test" >>})),
725+ ? assertMatch ({validation_failure , too_many_args },
726+ Cmd :validate ([<<" foo" >>], ValidOpts )),
727+ ? assertMatch ({validation_failure , reference_too_long },
728+ Cmd :validate ([], ValidOpts #{reference => gen_bin (256 )})),
729+ ? assertMatch (ok , Cmd :validate ([], ValidOpts )),
730+ ? assertMatch (ok , Cmd :validate ([], ValidOpts #{reference => gen_bin (255 )})).
731+
732+ reset_offset_merge_defaults (_Config ) ->
733+ Cmd = ? COMMAND_RESET_OFFSET ,
734+ Opts = #{vhost => <<" /" >>,
735+ stream => <<" s1" >>,
736+ reference => <<" foo" >>},
737+ ? assertEqual ({[], Opts },
738+ Cmd :merge_defaults ([], maps :without ([vhost ], Opts ))),
739+ Merged = maps :merge (Opts , #{vhost => " vhost" }),
740+ ? assertEqual ({[], Merged },
741+ Cmd :merge_defaults ([], Merged )).
742+
743+ reset_offset_run (Config ) ->
744+ Cmd = ? COMMAND_RESET_OFFSET ,
745+ Node = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
746+ Opts = #{node => Node ,
747+ timeout => 10000 ,
748+ vhost => <<" /" >>},
749+ Args = [],
750+
751+ St = atom_to_binary (? FUNCTION_NAME , utf8 ),
752+ Ref = <<" foo" >>,
753+ OptsGroup = maps :merge (#{stream => St , reference => Ref },
754+ Opts ),
755+
756+ % % the stream does not exist yet
757+ ? assertMatch ({error , not_found },
758+ Cmd :run (Args , OptsGroup )),
759+
760+ Port = rabbit_stream_SUITE :get_stream_port (Config ),
761+ {S , C } = start_stream_connection (Port ),
762+ create_stream (S , St , C ),
763+
764+ ? assertEqual ({error , no_reference }, Cmd :run (Args , OptsGroup )),
765+ store_offset (S , St , Ref , 42 , C ),
766+
767+ check_stored_offset (S , St , Ref , 42 , C ),
768+ ? assertMatch (ok , Cmd :run (Args , OptsGroup )),
769+ check_stored_offset (S , St , Ref , 0 , C ),
770+
771+ delete_stream (S , St , C ),
772+ close (S , C ),
773+ ok .
774+
711775add_super_stream_merge_defaults (_Config ) ->
712776 ? assertMatch ({[<<" super-stream" >>],
713777 #{partitions := 3 , vhost := <<" /" >>}},
@@ -1024,6 +1088,10 @@ store_offset(S, Stream, Reference, Value, C) ->
10241088 {error , offset_not_stored }
10251089 end .
10261090
1091+
1092+ check_stored_offset (S , Stream , Reference , Expected , C ) ->
1093+ check_stored_offset (S , Stream , Reference , Expected , C , 20 ).
1094+
10271095check_stored_offset (_ , _ , _ , _ , _ , 0 ) ->
10281096 error ;
10291097check_stored_offset (S , Stream , Reference , Expected , C , Attempt ) ->
@@ -1061,3 +1129,5 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
10611129 check_publisher_sequence (S , Stream , Reference , Expected , C , Attempt - 1 )
10621130 end .
10631131
1132+ gen_bin (L ) ->
1133+ list_to_binary (lists :duplicate (L , " a" )).
0 commit comments