diff --git a/deps/rabbit/docs/rabbitmq-streams.8 b/deps/rabbit/docs/rabbitmq-streams.8 index 4787069c9f2b..0afa9c51890c 100644 --- a/deps/rabbit/docs/rabbitmq-streams.8 +++ b/deps/rabbit/docs/rabbitmq-streams.8 @@ -453,6 +453,14 @@ List only writer deduplication tracking information. Example: .Sp .Dl rabbitmq-streams list_stream_tracking stream-1 --offset +.\" ------------------------------------------------------------------ +.It Cm reset_offset Fl -stream Ar stream Fl -reference Ar reference Oo Fl -vhost Ar vhost Oc +.Pp +Reset the stored offset for a consumer name on a stream. +.Pp +Example: +.Sp +.Dl rabbitmq-streams reset_offset --stream stream --reference app-1 .El .\" ------------------------------------------------------------------ .Sh SEE ALSO diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand.erl new file mode 100644 index 000000000000..fd77623d51b5 --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand.erl @@ -0,0 +1,116 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand'). + +-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). +-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([formatter/0, + scopes/0, + switches/0, + aliases/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.String'. + +scopes() -> + [streams]. + +switches() -> + [{stream, string}, {reference, string}]. + +aliases() -> + []. + +description() -> + <<"Reset the stored offset for a consumer name on a stream">>. + +help_section() -> + {plugin, stream}. + +validate([], #{stream := _, reference := R}) when ?IS_INVALID_REF(R) -> + {validation_failure, reference_too_long}; +validate([], #{stream := _, reference := _}) -> + ok; +validate(Args, _) when is_list(Args) andalso length(Args) > 0 -> + {validation_failure, too_many_args}; +validate(_, _) -> + {validation_failure, not_enough_args}. + +merge_defaults(Args, Opts) -> + {Args, maps:merge(#{vhost => <<"/">>}, Opts)}. + +usage() -> + <<"reset_offset --stream " + "--reference [--vhost ]">>. + +usage_additional() -> + <<"">>. + +usage_doc_guides() -> + [?STREAMS_GUIDE_URL]. + +run(_, + #{node := NodeName, + vhost := VHost, + stream := Stream, + reference := Reference, + timeout := Timeout}) -> + rabbit_misc:rpc_call(NodeName, + rabbit_stream_manager, + reset_offset, + [VHost, Stream, Reference], + Timeout). + +banner(_, _) -> + <<"Resetting stored offset ...">>. + +output(ok, Opts) -> + Silent = maps:get(quiet, Opts, maps:get(silent, Opts, false)), + case Silent of + true -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(ok); + false -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({ok, <<"Done">>}) + end; +output({validation_failure, reference_too_long}, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({error, + <<"The reference is too long">>}); +output({error, not_found}, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({error, + <<"The stream does not exist">>}); +output({error, not_available}, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({error, + <<"The stream is not available">>}); +output({error, no_reference}, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output({error, + <<"There is no stored offset " + "for this reference, no need to reset">>}); +output(R, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(R). + diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 9711046f147a..7ccb1127c77c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -21,6 +21,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit/include/amqqueue.hrl"). +-include_lib("rabbitmq_stream/src/rabbit_stream_utils.hrl"). %% API -export([create/4, @@ -33,7 +34,8 @@ topology/2, route/3, partitions/2, - partition_index/3]). + partition_index/3, + reset_offset/3]). -spec create(binary(), binary(), #{binary() => binary()}, binary()) -> {ok, map()} | @@ -396,6 +398,27 @@ partition_index(VirtualHost, SuperStream, Stream) -> {error, stream_not_found} end. +-spec reset_offset(binary(), binary(), binary()) -> + ok | + {error, not_available | not_found | no_reference | + {validation_failed, term()}}. +reset_offset(_, _, Ref) when ?IS_INVALID_REF(Ref) -> + {error, {validation_failed, + rabbit_misc:format("Reference is too long to store offset: ~p", + [byte_size(Ref)])}}; +reset_offset(VH, S, Ref) -> + case lookup_leader(VH, S) of + {ok, P} -> + case osiris:read_tracking(P, offset, Ref) of + undefined -> + {error, no_reference}; + {offset, _} -> + osiris:write_tracking(P, Ref, {offset, 0}) + end; + R -> + R + end. + stream_queue_arguments(Arguments) -> stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 492b74a7cc95..2b70915eda6e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -19,6 +19,7 @@ -behaviour(gen_statem). +-include("rabbit_stream_utils.hrl"). -include("rabbit_stream_reader.hrl"). -include("rabbit_stream_metrics.hrl"). @@ -80,7 +81,6 @@ peer_cert_validity]). -define(UNKNOWN_FIELD, unknown_field). -define(SILENT_CLOSE_DELAY, 3_000). --define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255). -define(SAC_MOD, rabbit_stream_sac_coordinator). -import(rabbit_stream_utils, [check_write_permitted/2, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl b/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl new file mode 100644 index 000000000000..a957d06c4159 --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.hrl @@ -0,0 +1,16 @@ +%% The contents of this file are subject to the Mozilla Public License +%% at https://www.mozilla.org/en-US/MPL/2.0/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is Pivotal Software, Inc. +%% Copyright (c) 2025 Broadcom. All Rights Reserved. +%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-define(IS_INVALID_REF(Ref), is_binary(Ref) andalso byte_size(Ref) > 255). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index 0928acd6b5a7..c0ac9a30966f 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -35,7 +35,8 @@ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand'). -define(COMMAND_ACTIVATE_STREAM_CONSUMER, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand'). - +-define(COMMAND_RESET_OFFSET, + 'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand'). all() -> [{group, list_connections}, @@ -45,6 +46,7 @@ all() -> {group, list_group_consumers}, {group, activate_consumer}, {group, list_stream_tracking}, + {group, reset_offset}, {group, super_streams}]. groups() -> @@ -67,6 +69,9 @@ groups() -> {list_stream_tracking, [], [list_stream_tracking_validate, list_stream_tracking_merge_defaults, list_stream_tracking_run]}, + {reset_offset, [], + [reset_offset_validate, reset_offset_merge_defaults, + reset_offset_run]}, {super_streams, [], [add_super_stream_merge_defaults, add_super_stream_validate, @@ -708,6 +713,65 @@ list_stream_tracking_run(Config) -> close(S, C), ok. +reset_offset_validate(_) -> + Cmd = ?COMMAND_RESET_OFFSET, + ValidOpts = #{vhost => <<"/">>, + stream => <<"s1">>, + reference => <<"foo">>}, + ?assertMatch({validation_failure, not_enough_args}, + Cmd:validate([], #{})), + ?assertMatch({validation_failure, not_enough_args}, + Cmd:validate([], #{vhost => <<"test">>})), + ?assertMatch({validation_failure, too_many_args}, + Cmd:validate([<<"foo">>], ValidOpts)), + ?assertMatch({validation_failure, reference_too_long}, + Cmd:validate([], ValidOpts#{reference => gen_bin(256)})), + ?assertMatch(ok, Cmd:validate([], ValidOpts)), + ?assertMatch(ok, Cmd:validate([], ValidOpts#{reference => gen_bin(255)})). + +reset_offset_merge_defaults(_Config) -> + Cmd = ?COMMAND_RESET_OFFSET, + Opts = #{vhost => <<"/">>, + stream => <<"s1">>, + reference => <<"foo">>}, + ?assertEqual({[], Opts}, + Cmd:merge_defaults([], maps:without([vhost], Opts))), + Merged = maps:merge(Opts, #{vhost => "vhost"}), + ?assertEqual({[], Merged}, + Cmd:merge_defaults([], Merged)). + +reset_offset_run(Config) -> + Cmd = ?COMMAND_RESET_OFFSET, + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts =#{node => Node, + timeout => 10000, + vhost => <<"/">>}, + Args = [], + + St = atom_to_binary(?FUNCTION_NAME, utf8), + Ref = <<"foo">>, + OptsGroup = maps:merge(#{stream => St, reference => Ref}, + Opts), + + %% the stream does not exist yet + ?assertMatch({error, not_found}, + Cmd:run(Args, OptsGroup)), + + Port = rabbit_stream_SUITE:get_stream_port(Config), + {S, C} = start_stream_connection(Port), + create_stream(S, St, C), + + ?assertEqual({error, no_reference}, Cmd:run(Args, OptsGroup)), + store_offset(S, St, Ref, 42, C), + + check_stored_offset(S, St, Ref, 42, C), + ?assertMatch(ok, Cmd:run(Args, OptsGroup)), + check_stored_offset(S, St, Ref, 0, C), + + delete_stream(S, St, C), + close(S, C), + ok. + add_super_stream_merge_defaults(_Config) -> ?assertMatch({[<<"super-stream">>], #{partitions := 3, vhost := <<"/">>}}, @@ -1024,6 +1088,10 @@ store_offset(S, Stream, Reference, Value, C) -> {error, offset_not_stored} end. + +check_stored_offset(S, Stream, Reference, Expected, C) -> + check_stored_offset(S, Stream, Reference, Expected, C, 20). + check_stored_offset(_, _, _, _, _, 0) -> error; check_stored_offset(S, Stream, Reference, Expected, C, Attempt) -> @@ -1061,3 +1129,5 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) -> check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1) end. +gen_bin(L) -> + list_to_binary(lists:duplicate(L, "a")). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 83a20584e2ad..af674fb9346d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -20,7 +20,8 @@ groups() -> [manage_super_stream, lookup_leader, lookup_member, - partition_index]}]. + partition_index, + reset_offset]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -196,73 +197,67 @@ partition_index(Config) -> amqp_connection:close(C), ok. +reset_offset(Config) -> + S = atom_to_binary(?FUNCTION_NAME, utf8), + Ref = <<"app">>, + ?assertMatch({ok, _}, create_stream(Config, S)), + + {ok, Pid} = lookup_leader(Config, S), + + ?assertEqual(undefined, query_offset(Config, Pid, Ref)), + ?assertEqual({error, no_reference}, reset_offset(Config, S, Ref)), + ok = store_offset(Config, Pid, Ref, 42), + ?assertEqual({offset, 42}, query_offset(Config, Pid, Ref)), + ?assertEqual(ok, reset_offset(Config, S, Ref)), + ?assertEqual({offset, 0}, query_offset(Config, Pid, Ref)), + + ?assertEqual({error, not_found}, + reset_offset(Config, <<"does-not-exist">>, Ref)), + + ?assertEqual({ok, deleted}, delete_stream(Config, S)). + +query_offset(Config, Pid, Ref) -> + rpc(Config, osiris, read_tracking, [Pid, Ref]). + +store_offset(Config, Pid, Ref, Offset) -> + rpc(Config, osiris, write_tracking, [Pid, Ref, {offset, Offset}]). + +reset_offset(Config, S, Ref) -> + rpc(Config, rabbit_stream_manager, reset_offset, [<<"/">>, S, Ref]). + create_super_stream(Config, Name, Partitions, RKs) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - create_super_stream, - [<<"/">>, - Name, - Partitions, - #{}, - RKs, - <<"guest">>]). + rpc(Config, rabbit_stream_manager, create_super_stream, + [<<"/">>, Name, Partitions, #{}, RKs, <<"guest">>]). delete_super_stream(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - delete_super_stream, - [<<"/">>, Name, <<"guest">>]). + rpc(Config, rabbit_stream_manager, delete_super_stream, + [<<"/">>, Name, <<"guest">>]). create_stream(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - create, - [<<"/">>, Name, [], <<"guest">>]). + rpc(Config, rabbit_stream_manager, create, [<<"/">>, Name, [], <<"guest">>]). delete_stream(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - delete, - [<<"/">>, Name, <<"guest">>]). + rpc(Config, rabbit_stream_manager, delete, [<<"/">>, Name, <<"guest">>]). lookup_leader(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - lookup_leader, - [<<"/">>, Name]). + rpc(Config, rabbit_stream_manager, lookup_leader, [<<"/">>, Name]). lookup_member(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - lookup_member, - [<<"/">>, Name]). + rpc(Config, rabbit_stream_manager, lookup_member, [<<"/">>, Name]). partitions(Config, Name) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - partitions, - [<<"/">>, Name]). + rpc(Config, rabbit_stream_manager, partitions, [<<"/">>, Name]). route(Config, RoutingKey, SuperStream) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - route, - [RoutingKey, <<"/">>, SuperStream]). + rpc(Config, rabbit_stream_manager, route, + [RoutingKey, <<"/">>, SuperStream]). partition_index(Config, SuperStream, Stream) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_stream_manager, - partition_index, - [<<"/">>, SuperStream, Stream]). + rpc(Config, rabbit_stream_manager, partition_index, + [<<"/">>, SuperStream, Stream]). + +rpc(Config, M, F, A) -> + rabbit_ct_broker_helpers:rpc(Config, 0, M, F, A). start_amqp_connection(Config) -> Port =