Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/docs/rabbitmqctl.8
Original file line number Diff line number Diff line change
Expand Up @@ -2086,7 +2086,7 @@ Enables a feature flag on the target node.
.Pp
Example:
.Sp
.Dl rabbitmqctl enable_feature_flag stream_queue
.Dl rabbitmqctl enable_feature_flag restart_streams
.Pp
You can also enable all feature flags by specifying "all":
.Sp
Expand Down
28 changes: 10 additions & 18 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,24 +217,16 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
Owner, ActingUser, Node) ->
ok = check_declare_arguments(QueueName, Args),
Type = get_queue_type(Args),
case rabbit_queue_type:is_enabled(Type) of
true ->
Q = amqqueue:new(QueueName,
none,
Durable,
AutoDelete,
Owner,
Args,
VHost,
#{user => ActingUser},
Type),
rabbit_queue_type:declare(Q, Node);
false ->
{protocol_error, internal_error,
"Cannot declare a queue '~ts' of type '~ts' on node '~ts': "
"the corresponding feature flag is disabled",
[rabbit_misc:rs(QueueName), Type, Node]}
end.
Q = amqqueue:new(QueueName,
none,
Durable,
AutoDelete,
Owner,
Args,
VHost,
#{user => ActingUser},
Type),
rabbit_queue_type:declare(Q, Node).

get_queue_type(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
Expand Down
3 changes: 0 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
-export_type([state/0]).

-export([
is_enabled/0,
is_compatible/3,
declare/2,
delete/4,
Expand Down Expand Up @@ -59,8 +58,6 @@
send_drained/3,
send_credit_reply/3]).

is_enabled() -> true.

-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
is_compatible(_, _, _) ->
true.
Expand Down
1 change: 0 additions & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
{stream_queue,
#{desc => "Support queues of type `stream`",
doc_url => "https://www.rabbitmq.com/stream.html",
%%TODO remove compatibility code
stability => required,
depends_on => [quorum_queue]
}}).
Expand Down
10 changes: 1 addition & 9 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
discover/1,
feature_flag_name/1,
default/0,
is_enabled/1,
is_compatible/4,
declare/2,
delete/4,
Expand Down Expand Up @@ -116,8 +115,6 @@
actions/0,
settle_op/0]).

-callback is_enabled() -> boolean().

-callback is_compatible(Durable :: boolean(),
Exclusive :: boolean(),
AutoDelete :: boolean()) ->
Expand Down Expand Up @@ -237,17 +234,12 @@ feature_flag_name(_) ->
default() ->
rabbit_classic_queue.

%% is a specific queue type implementation enabled
-spec is_enabled(module()) -> boolean().
is_enabled(Type) ->
Type:is_enabled().

-spec is_compatible(module(), boolean(), boolean(), boolean()) ->
boolean().
is_compatible(Type, Durable, Exclusive, AutoDelete) ->
Type:is_compatible(Durable, Exclusive, AutoDelete).

-spec declare(amqqueue:amqqueue(), node()) ->
-spec declare(amqqueue:amqqueue(), node() | {'ignore_location', node()}) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
Expand Down
7 changes: 1 addition & 6 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@
notify_decorators/3,
spawn_notify_decorators/3]).

-export([is_enabled/0,
is_compatible/3,
-export([is_compatible/3,
declare/2,
is_stateful/0]).

Expand Down Expand Up @@ -113,10 +112,6 @@

%%----------- rabbit_queue_type ---------------------------------------------

-spec is_enabled() -> boolean().
is_enabled() ->
true.

-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
is_compatible(_Durable = true,
_Exclusive = false,
Expand Down
7 changes: 1 addition & 6 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

-behaviour(rabbit_queue_type).

-export([is_enabled/0,
is_compatible/3,
-export([is_compatible/3,
declare/2,
delete/4,
purge/1,
Expand Down Expand Up @@ -90,10 +89,6 @@

-type client() :: #stream_client{}.

-spec is_enabled() -> boolean().
is_enabled() ->
rabbit_feature_flags:is_enabled(stream_queue).

-spec is_compatible(boolean(), boolean(), boolean()) -> boolean().
is_compatible(_Durable = true,
_Exclusive = false,
Expand Down
8 changes: 1 addition & 7 deletions deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,7 @@ init_per_group(at_least_once, Config) ->
1,
QueueArgs1,
{<<"x-overflow">>, longstr, <<"reject-publish">>}),
Config1 = rabbit_ct_helpers:set_config(Config, {queue_args, QueueArgs}),
case rabbit_ct_broker_helpers:enable_feature_flag(Config1, stream_queue) of
ok ->
Config1;
Skip ->
Skip
end;
rabbit_ct_helpers:set_config(Config, {queue_args, QueueArgs});
_ ->
Config
end;
Expand Down
15 changes: 5 additions & 10 deletions deps/rabbit/test/queue_parallel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,11 @@ init_per_group(mirrored_queue, Config) ->
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(stream_queue, Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of
ok ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]},
{consumer_args, [{<<"x-stream-offset">>, long, 0}]},
{queue_durable, true}]);
Skip ->
Skip
end;
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"stream">>}]},
{consumer_args, [{<<"x-stream-offset">>, long, 0}]},
{queue_durable, true}]);
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
Expand Down
5 changes: 1 addition & 4 deletions deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ init_per_group(Group, Config, NodesCount) ->
rabbit_ct_broker_helpers:setup_steps()),
ok = rpc(Config2, 0, application, set_env,
[rabbit, channel_tick_interval, 100]),
case rabbit_ct_broker_helpers:enable_feature_flag(Config2, stream_queue) of
ok -> Config2;
Skip -> Skip
end.
Config2.

end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
Expand Down
1 change: 0 additions & 1 deletion deps/rabbit/test/unicode_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ queue(Config, QName0, Args) ->
ok.

stream(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
ConsumerTag = QName0 = atom_to_binary(?FUNCTION_NAME),
QName = <<QName0/binary, ?UNICODE_STRING/utf8>>,
Expand Down
16 changes: 2 additions & 14 deletions deps/rabbitmq_amqp1_0/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,6 @@ redelivery(Config) ->
]).

routing(Config) ->

StreamQT =
case rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue) of
ok ->
<<"stream">>;
_ ->
%% if the feature flag could not be enabled we run the stream
%% routing test using a classc quue instead
ct:pal("stream feature flag could not be enabled"
"running stream tests against classic"),
<<"classic">>
end,
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"transient_q">>,
durable = false}),
Expand All @@ -211,10 +199,10 @@ routing(Config) ->
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}),
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}),
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"autodel_q">>,
auto_delete = true}),
run(Config, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,12 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddVhostCommand do
default_queue_type: default_qt
}) do
meta = %{description: desc, tags: parse_tags(tags), default_queue_type: default_qt}
# check if the respective feature flag is enabled
case default_qt do
"quorum" ->
FeatureFlags.assert_feature_flag_enabled(node_name, :quorum_queue, fn ->
:rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [
vhost,
meta,
Helpers.cli_acting_user()
])
end)

"stream" ->
FeatureFlags.assert_feature_flag_enabled(node_name, :stream_queue, fn ->
:rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [
vhost,
meta,
Helpers.cli_acting_user()
])
end)

_ ->
:rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [
vhost,
meta,
Helpers.cli_acting_user()
])
end

:rabbit_misc.rpc_call(node_name, :rabbit_vhost, :add, [
vhost,
meta,
Helpers.cli_acting_user()
])
end

def run([vhost], %{node: node_name, description: desc, tags: tags}) do
Expand Down
4 changes: 0 additions & 4 deletions deps/rabbitmq_management/test/clustering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,11 @@ queue_on_other_node(Config) ->
ok.

queue_with_multiple_consumers(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, stream_queue),
%% this may not be supported in mixed mode
_ = rabbit_ct_broker_helpers:enable_feature_flag(Config, classic_queue_type_delivery_support),
{ok, Chan} = amqp_connection:open_channel(?config(conn, Config)),
Q = <<"multi-consumer-queue1">>,
_ = queue_declare(Chan, Q),
_ = wait_for_queue(Config, "/queues/%2F/multi-consumer-queue1"),


Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 1),
{ok, Chan2} = amqp_connection:open_channel(Conn),
consume(Chan, Q),
Expand Down
6 changes: 0 additions & 6 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
declare/2,
delete/4,
deliver/2,
is_enabled/0,
is_compatible/3,
is_recoverable/1,
recover/2,
Expand Down Expand Up @@ -134,11 +133,6 @@ deliver(Qs, #delivery{message = BasicMessage,
delegate:invoke_no_result(Pids, {gen_server, cast, [Msg]}),
{[], Actions}.

-spec is_enabled() ->
boolean().
is_enabled() ->
true.

-spec is_compatible(boolean(), boolean(), boolean()) ->
boolean().
is_compatible(_Durable = true, _Exclusive = true, _AutoDelete = false) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,10 @@ def test_quorum_queue(self):
routing_key=queueName,
body='Hello World!')

# could we declare a quorum queue?
quorum_queue_supported = True
if len(self.listener.errors) > 0:
pattern = re.compile(r"feature flag is disabled", re.MULTILINE)
for error in self.listener.errors:
if pattern.search(error['message']) != None:
quorum_queue_supported = False
break

if quorum_queue_supported:
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait_for_complete_countdown(), "initial message not received")
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait_for_complete_countdown(), "initial message not received")
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()

connection.close()

Expand All @@ -66,4 +56,4 @@ def test_quorum_queue(self):
modules = [
__name__
]
test_runner.run_unittests(modules)
test_runner.run_unittests(modules)
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,10 @@ def test_stream_queue(self):
routing_key=queueName,
body='Hello World!')

# could we declare a stream queue?
stream_queue_supported = True
if len(self.listener.errors) > 0:
pattern = re.compile(r"feature flag is disabled", re.MULTILINE)
for error in self.listener.errors:
if pattern.search(error['message']) != None:
stream_queue_supported = False
break

if stream_queue_supported:
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(5), "initial message not received")
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()
# check if we receive the message from the STOMP subscription
self.assertTrue(self.listener.wait(5), "initial message not received")
self.assertEqual(1, len(self.listener.messages))
self.conn.disconnect()

connection.close()

Expand Down
23 changes: 6 additions & 17 deletions deps/rabbitmq_stream/src/rabbit_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,12 @@
-include("rabbit_stream_metrics.hrl").

start(_Type, _Args) ->
case rabbit_feature_flags:is_enabled(stream_queue) of
true ->
rabbit_stream_metrics:init(),
rabbit_global_counters:init([{protocol, stream}],
?PROTOCOL_COUNTERS),
rabbit_global_counters:init([{protocol, stream},
{queue_type, ?STREAM_QUEUE_TYPE}]),
rabbit_stream_sup:start_link();
false ->
rabbit_log:warning("Unable to start the stream plugin. The stream_queue "
"feature flag is disabled. "
++ "Enable stream_queue feature flag then disable "
"and re-enable the rabbitmq_stream plugin. ",
"See https://www.rabbitmq.com/feature-flags.html "
"to learn more"),
{ok, self()}
end.
rabbit_stream_metrics:init(),
rabbit_global_counters:init([{protocol, stream}],
?PROTOCOL_COUNTERS),
rabbit_global_counters:init([{protocol, stream},
{queue_type, ?STREAM_QUEUE_TYPE}]),
rabbit_stream_sup:start_link().

tls_host() ->
case application:get_env(rabbitmq_stream, advertised_tls_host,
Expand Down
Loading