@@ -27,35 +27,24 @@ all() ->
2727 [
2828 {group , cluster_size_1 },
2929 {group , cluster_size_2 },
30- {group , cluster_size_3 },
3130 {group , unclustered_cluster_size_2 }
3231 ].
3332
3433groups () ->
3534 [
3635 {cluster_size_1 , [],
37- [{start_feature_flag_enabled , [], [remove_binding_unbind_queue ,
38- remove_binding_delete_queue ,
39- remove_binding_delete_queue_multiple ,
40- remove_binding_delete_exchange ,
41- recover_bindings ,
42- route_exchange_to_exchange ,
43- reset ]},
44- {start_feature_flag_disabled , [], [enable_feature_flag ]}
45- ]},
36+ [remove_binding_unbind_queue ,
37+ remove_binding_delete_queue ,
38+ remove_binding_delete_queue_multiple ,
39+ remove_binding_delete_exchange ,
40+ recover_bindings ,
41+ route_exchange_to_exchange ,
42+ reset ]},
4643 {cluster_size_2 , [],
47- [{start_feature_flag_enabled , [], [
48- remove_binding_node_down_transient_queue ,
49- keep_binding_node_down_durable_queue
50- ]},
51- {start_feature_flag_disabled , [], [enable_feature_flag_during_definition_import ]}
52- ]},
44+ [remove_binding_node_down_transient_queue ,
45+ keep_binding_node_down_durable_queue ]},
5346 {unclustered_cluster_size_2 , [],
54- [{start_feature_flag_enabled , [], [join_cluster ]}
55- ]},
56- {cluster_size_3 , [],
57- [{start_feature_flag_disabled , [], [enable_feature_flag_during_binding_churn ]}
58- ]}
47+ [join_cluster ]}
5948 ].
6049
6150suite () ->
@@ -71,41 +60,17 @@ init_per_suite(Config) ->
7160end_per_suite (Config ) ->
7261 rabbit_ct_helpers :run_teardown_steps (Config ).
7362
74- init_per_group (cluster_size_1 , Config ) ->
75- rabbit_ct_helpers :set_config (Config , {rmq_nodes_count , 1 });
76- init_per_group (cluster_size_2 , Config ) ->
77- rabbit_ct_helpers :set_config (Config , {rmq_nodes_count , 2 });
78- init_per_group (cluster_size_3 , Config ) ->
79- rabbit_ct_helpers :set_config (Config , {rmq_nodes_count , 3 });
80- init_per_group (unclustered_cluster_size_2 , Config0 ) ->
81- case rabbit_ct_helpers :is_mixed_versions () of
82- true ->
83- {skip , " This test group won't work in mixed mode with pre 3.11 releases" };
84- false ->
85- rabbit_ct_helpers :set_config (Config0 , [{rmq_nodes_count , 2 },
86- {rmq_nodes_clustered , false }])
87- end ;
88- init_per_group (start_feature_flag_enabled = Group , Config0 ) ->
89- Config = start_broker (Group , Config0 ),
90- case rabbit_ct_broker_helpers :enable_feature_flag (Config , ? FEATURE_FLAG ) of
91- ok ->
92- Config ;
93- {skip , _ } = Skip ->
94- end_per_group (Group , Config ),
95- Skip
96- end ;
97- init_per_group (start_feature_flag_disabled = Group , Config0 ) ->
98- Config1 = rabbit_ct_helpers :merge_app_env (
99- Config0 , {rabbit , [{forced_feature_flags_on_init , []}]}),
100- Config = start_broker (Group , Config1 ),
101- case rabbit_ct_broker_helpers :is_feature_flag_supported (Config , ? FEATURE_FLAG ) of
102- true ->
103- assert_no_index_table (Config ),
104- Config ;
105- false ->
106- end_per_group (Group , Config ),
107- {skip , io_lib :format (" '~ts ' feature flag is unsupported" , [? FEATURE_FLAG ])}
108- end .
63+ init_per_group (cluster_size_1 = Group , Config ) ->
64+ Config1 = rabbit_ct_helpers :set_config (Config , {rmq_nodes_count , 1 }),
65+ start_broker (Group , Config1 );
66+ init_per_group (cluster_size_2 = Group , Config ) ->
67+ Config1 = rabbit_ct_helpers :set_config (Config , {rmq_nodes_count , 2 }),
68+ start_broker (Group , Config1 );
69+ init_per_group (unclustered_cluster_size_2 = Group , Config0 ) ->
70+ Config1 = rabbit_ct_helpers :set_config (Config0 ,
71+ [{rmq_nodes_count , 2 },
72+ {rmq_nodes_clustered , false }]),
73+ start_broker (Group , Config1 ).
10974
11075start_broker (Group , Config0 ) ->
11176 Size = rabbit_ct_helpers :get_config (Config0 , rmq_nodes_count ),
@@ -117,14 +82,10 @@ start_broker(Group, Config0) ->
11782 rabbit_ct_broker_helpers :setup_steps () ++
11883 rabbit_ct_client_helpers :setup_steps ()).
11984
120- end_per_group (Group , Config )
121- when Group =:= start_feature_flag_enabled ;
122- Group =:= start_feature_flag_disabled ->
85+ end_per_group (_Group , Config ) ->
12386 rabbit_ct_helpers :run_steps (Config ,
12487 rabbit_ct_client_helpers :teardown_steps () ++
125- rabbit_ct_broker_helpers :teardown_steps ());
126- end_per_group (_Group , Config ) ->
127- Config .
88+ rabbit_ct_broker_helpers :teardown_steps ()).
12889
12990init_per_testcase (_TestCase , Config ) ->
13091 Config .
@@ -360,170 +321,6 @@ route_exchange_to_exchange(Config) ->
360321 routing_key = RKey }),
361322 ok .
362323
363- enable_feature_flag (Config ) ->
364- Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
365- {_Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
366-
367- DirectX = <<" amq.direct" >>,
368- Q1 = <<" q1" >>,
369- Q2 = <<" q2" >>,
370- RKey = <<" k" >>,
371-
372- declare_queue (Ch , Q1 , true ),
373- bind_queue (Ch , Q1 , DirectX , RKey ),
374- bind_queue (Ch , Q1 , <<" amq.fanout" >>, RKey ),
375-
376- declare_queue (Ch , Q2 , false ),
377- bind_queue (Ch , Q2 , DirectX , RKey ),
378-
379- amqp_channel :call (Ch , # 'confirm.select' {}),
380- amqp_channel :register_confirm_handler (Ch , self ()),
381-
382- % % Publishing via "direct exchange routing v1" works.
383- publish (Ch , DirectX , RKey ),
384- assert_confirm (),
385-
386- ok = rabbit_ct_broker_helpers :enable_feature_flag (Config , ? FEATURE_FLAG ),
387-
388- % % The feature flag migration should have created an index table with a ram copy on all nodes.
389- ? assertEqual (lists :sort (Nodes ), index_table_ram_copies (Config , 0 )),
390- % % The feature flag migration should have populated the index table with all bindings whose source exchange
391- % % is a direct exchange.
392- ? assertEqual ([{rabbit_misc :r (<<" /" >>, exchange , DirectX ), RKey }],
393- rabbit_ct_broker_helpers :rpc (Config , 0 , mnesia , dirty_all_keys , [? INDEX_TABLE_NAME ])),
394- ? assertEqual (2 , table_size (Config , ? INDEX_TABLE_NAME )),
395-
396- % % Publishing via "direct exchange routing v2" works.
397- publish (Ch , DirectX , RKey ),
398- assert_confirm (),
399-
400- delete_queue (Ch , Q1 ),
401- delete_queue (Ch , Q2 ),
402- ok .
403-
404- % % Test that enabling feature flag works when bindings are imported concurrently.
405- enable_feature_flag_during_definition_import (Config ) ->
406- Nodes = [Server1 | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
407- {_Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
408- Path = filename :join ([? config (data_dir , Config ), " definition.json" ]),
409-
410- {Pid , Ref } = spawn_monitor (
411- fun () ->
412- ct :pal (" importing definitions..." ),
413- rabbit_ct_broker_helpers :rabbitmqctl (
414- Config , Server1 , [" import_definitions" , Path ]
415- ),
416- ct :pal (" imported definitions" )
417- end ),
418-
419- timer :sleep (rand :uniform (400 )),
420- ct :pal (" enabling feature flag..." ),
421- ok = rabbit_ct_broker_helpers :enable_feature_flag (Config , ? FEATURE_FLAG ),
422- ct :pal (" enabled feature flag" ),
423-
424- receive {'DOWN' , Ref , process , Pid , normal } ->
425- ok
426- after 10_000 ->
427- ct :fail (timeout )
428- end ,
429-
430- ? assertEqual (lists :sort (Nodes ), index_table_ram_copies (Config , 0 )),
431- ? assertEqual (? NUM_BINDINGS_TO_DIRECT_ECHANGE ,
432- table_size (Config , ? INDEX_TABLE_NAME )),
433-
434- % % cleanup
435- delete_queue (Ch , <<" durable-q" >>),
436- delete_queue (Ch , <<" transient-q" >>),
437- ok .
438-
439- % % Test that enabling feature flag works when clients concurrently
440- % % create and delete bindings and send messages.
441- enable_feature_flag_during_binding_churn (Config ) ->
442- Nodes = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
443- {_Conn1 , Ch1 } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 0 ),
444- {_Conn2 , Ch2 } = rabbit_ct_client_helpers :open_connection_and_channel (Config , 1 ),
445-
446- DirectX = <<" amq.direct" >>,
447- FanoutX = <<" amq.fanout" >>,
448- Q = <<" q" >>,
449-
450- NumMessages = 500 ,
451- BindingsDirectX = 1000 ,
452- BindingsFanoutX = 10 ,
453-
454- % % setup
455- declare_queue (Ch1 , Q , true ),
456- lists :foreach (fun (N ) ->
457- bind_queue (Ch1 , Q , DirectX , integer_to_binary (N ))
458- end , lists :seq (1 , trunc (0.4 * BindingsDirectX ))),
459- lists :foreach (fun (N ) ->
460- bind_queue (Ch1 , Q , FanoutX , integer_to_binary (N ))
461- end , lists :seq (1 , BindingsFanoutX )),
462- lists :foreach (fun (N ) ->
463- bind_queue (Ch1 , Q , DirectX , integer_to_binary (N ))
464- end , lists :seq (trunc (0.4 * BindingsDirectX ) + 1 , trunc (0.8 * BindingsDirectX ))),
465-
466- {_ , Ref1 } = spawn_monitor (
467- fun () ->
468- ct :pal (" sending ~b messages..." , [NumMessages ]),
469- lists :foreach (
470- fun (_ ) ->
471- publish (Ch1 , DirectX , integer_to_binary (trunc (0.8 * BindingsDirectX ))),
472- timer :sleep (1 )
473- end , lists :seq (1 , NumMessages )),
474- ct :pal (" sent ~b messages" , [NumMessages ])
475- end ),
476- {_ , Ref2 } = spawn_monitor (
477- fun () ->
478- ct :pal (" creating bindings..." ),
479- lists :foreach (
480- fun (N ) ->
481- bind_queue (Ch1 , Q , DirectX , integer_to_binary (N )),
482- timer :sleep (1 )
483- end , lists :seq (trunc (0.8 * BindingsDirectX ) + 1 , BindingsDirectX )),
484- ct :pal (" created bindings" )
485- end ),
486- {_ , Ref3 } = spawn_monitor (
487- fun () ->
488- ct :pal (" deleting bindings..." ),
489- lists :foreach (
490- fun (N ) ->
491- unbind_queue (Ch2 , Q , DirectX , integer_to_binary (N )),
492- timer :sleep (1 )
493- end , lists :seq (1 , trunc (0.2 * BindingsDirectX ))),
494- ct :pal (" deleted bindings" )
495- end ),
496-
497- timer :sleep (rand :uniform (300 )),
498- ct :pal (" enabling feature flag..." ),
499- ok = rabbit_ct_broker_helpers :enable_feature_flag (Config , ? FEATURE_FLAG ),
500- ct :pal (" enabled feature flag" ),
501-
502- lists :foreach (
503- fun (Ref ) ->
504- receive {'DOWN' , Ref , process , _Pid , normal } ->
505- ok
506- after 300_000 ->
507- ct :fail (timeout )
508- end
509- end , [Ref1 , Ref2 , Ref3 ]),
510-
511- NumMessagesBin = integer_to_binary (NumMessages ),
512- quorum_queue_utils :wait_for_messages (Config , [[Q , NumMessagesBin , NumMessagesBin , <<" 0" >>]]),
513-
514- ? assertEqual (lists :sort (Nodes ), index_table_ram_copies (Config , 0 )),
515-
516- ExpectedKeys = lists :map (
517- fun (N ) ->
518- {rabbit_misc :r (<<" /" >>, exchange , DirectX ), integer_to_binary (N )}
519- end , lists :seq (trunc (0.2 * BindingsDirectX ) + 1 , BindingsDirectX )),
520- ActualKeys = rabbit_ct_broker_helpers :rpc (Config , 0 , mnesia , dirty_all_keys , [? INDEX_TABLE_NAME ]),
521- ? assertEqual (lists :sort (ExpectedKeys ), lists :sort (ActualKeys )),
522-
523- % % cleanup
524- delete_queue (Ch1 , Q ),
525- ok .
526-
527324reset (Config ) ->
528325 Server = rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename ),
529326
0 commit comments