35
35
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand' ).
36
36
-define (COMMAND_ACTIVATE_STREAM_CONSUMER ,
37
37
'Elixir.RabbitMQ.CLI.Ctl.Commands.ActivateStreamConsumerCommand' ).
38
-
38
+ -define (COMMAND_RESET_OFFSET ,
39
+ 'Elixir.RabbitMQ.CLI.Ctl.Commands.ResetOffsetCommand' ).
39
40
40
41
all () ->
41
42
[{group , list_connections },
@@ -45,6 +46,7 @@ all() ->
45
46
{group , list_group_consumers },
46
47
{group , activate_consumer },
47
48
{group , list_stream_tracking },
49
+ {group , reset_offset },
48
50
{group , super_streams }].
49
51
50
52
groups () ->
@@ -67,6 +69,9 @@ groups() ->
67
69
{list_stream_tracking , [],
68
70
[list_stream_tracking_validate , list_stream_tracking_merge_defaults ,
69
71
list_stream_tracking_run ]},
72
+ {reset_offset , [],
73
+ [reset_offset_validate , reset_offset_merge_defaults ,
74
+ reset_offset_run ]},
70
75
{super_streams , [],
71
76
[add_super_stream_merge_defaults ,
72
77
add_super_stream_validate ,
@@ -708,6 +713,65 @@ list_stream_tracking_run(Config) ->
708
713
close (S , C ),
709
714
ok .
710
715
716
+ reset_offset_validate (_ ) ->
717
+ Cmd = ? COMMAND_RESET_OFFSET ,
718
+ ValidOpts = #{vhost => <<" /" >>,
719
+ stream => <<" s1" >>,
720
+ reference => <<" foo" >>},
721
+ ? assertMatch ({validation_failure , not_enough_args },
722
+ Cmd :validate ([], #{})),
723
+ ? assertMatch ({validation_failure , not_enough_args },
724
+ Cmd :validate ([], #{vhost => <<" test" >>})),
725
+ ? assertMatch ({validation_failure , too_many_args },
726
+ Cmd :validate ([<<" foo" >>], ValidOpts )),
727
+ ? assertMatch ({validation_failure , reference_too_long },
728
+ Cmd :validate ([], ValidOpts #{reference => gen_bin (256 )})),
729
+ ? assertMatch (ok , Cmd :validate ([], ValidOpts )),
730
+ ? assertMatch (ok , Cmd :validate ([], ValidOpts #{reference => gen_bin (255 )})).
731
+
732
+ reset_offset_merge_defaults (_Config ) ->
733
+ Cmd = ? COMMAND_RESET_OFFSET ,
734
+ Opts = #{vhost => <<" /" >>,
735
+ stream => <<" s1" >>,
736
+ reference => <<" foo" >>},
737
+ ? assertEqual ({[], Opts },
738
+ Cmd :merge_defaults ([], maps :without ([vhost ], Opts ))),
739
+ Merged = maps :merge (Opts , #{vhost => " vhost" }),
740
+ ? assertEqual ({[], Merged },
741
+ Cmd :merge_defaults ([], Merged )).
742
+
743
+ reset_offset_run (Config ) ->
744
+ Cmd = ? COMMAND_RESET_OFFSET ,
745
+ Node = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
746
+ Opts = #{node => Node ,
747
+ timeout => 10000 ,
748
+ vhost => <<" /" >>},
749
+ Args = [],
750
+
751
+ St = atom_to_binary (? FUNCTION_NAME , utf8 ),
752
+ Ref = <<" foo" >>,
753
+ OptsGroup = maps :merge (#{stream => St , reference => Ref },
754
+ Opts ),
755
+
756
+ % % the stream does not exist yet
757
+ ? assertMatch ({error , not_found },
758
+ Cmd :run (Args , OptsGroup )),
759
+
760
+ Port = rabbit_stream_SUITE :get_stream_port (Config ),
761
+ {S , C } = start_stream_connection (Port ),
762
+ create_stream (S , St , C ),
763
+
764
+ ? assertEqual ({error , no_reference }, Cmd :run (Args , OptsGroup )),
765
+ store_offset (S , St , Ref , 42 , C ),
766
+
767
+ check_stored_offset (S , St , Ref , 42 , C ),
768
+ ? assertMatch (ok , Cmd :run (Args , OptsGroup )),
769
+ check_stored_offset (S , St , Ref , 0 , C ),
770
+
771
+ delete_stream_no_metadata_update (S , St , C ),
772
+ close (S , C ),
773
+ ok .
774
+
711
775
add_super_stream_merge_defaults (_Config ) ->
712
776
? assertMatch ({[<<" super-stream" >>],
713
777
#{partitions := 3 , vhost := <<" /" >>}},
@@ -1024,6 +1088,10 @@ store_offset(S, Stream, Reference, Value, C) ->
1024
1088
{error , offset_not_stored }
1025
1089
end .
1026
1090
1091
+
1092
+ check_stored_offset (S , Stream , Reference , Expected , C ) ->
1093
+ check_stored_offset (S , Stream , Reference , Expected , C , 20 ).
1094
+
1027
1095
check_stored_offset (_ , _ , _ , _ , _ , 0 ) ->
1028
1096
error ;
1029
1097
check_stored_offset (S , Stream , Reference , Expected , C , Attempt ) ->
@@ -1061,3 +1129,5 @@ check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) ->
1061
1129
check_publisher_sequence (S , Stream , Reference , Expected , C , Attempt - 1 )
1062
1130
end .
1063
1131
1132
+ gen_bin (L ) ->
1133
+ list_to_binary (lists :duplicate (L , " a" )).
0 commit comments