Skip to content
Open
19 changes: 0 additions & 19 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,6 @@ declare_args() ->
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_max_priority_arg/2},
{<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2},
{<<"x-queue-version">>, fun check_queue_version/2},
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
Expand Down Expand Up @@ -1167,32 +1166,14 @@ check_stream_offset_arg(Val, _Args) ->
{error, {invalid_stream_offset_arg, Val}}
end.

-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue mode '~ts'", [Val])}
end;
check_queue_mode({Type, _}, _Args) ->
{error, {unacceptable_type, Type}};
check_queue_mode(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue mode '~ts'", [Val])}
end;
check_queue_mode(_Val, _Args) ->
{error, invalid_queue_mode}.

check_queue_version({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 1 -> ok;
ok when Val == 2 -> ok;
ok -> {error, rabbit_misc:format("unsupported queue version '~b'", [Val])};
Error -> Error
end;
check_queue_version(Val, Args) ->
case check_non_neg_int_arg(Val, Args) of
ok when Val == 1 -> ok;
ok when Val == 2 -> ok;
ok -> {error, rabbit_misc:format("unsupported queue version '~b'", [Val])};
Error -> Error
Expand Down
20 changes: 1 addition & 19 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,7 @@ process_args_policy(State = #q{q = Q,
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2},
{<<"queue-version">>, fun res_arg/2, fun init_queue_version/2}],
{<<"overflow">>, fun res_arg/2, fun init_overflow/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(rabbit_queue_type_util:args_policy_lookup(Name, Resolve, Q), StateN)
Expand Down Expand Up @@ -543,22 +541,6 @@ init_overflow(Overflow, State) ->
State#q{overflow = OverflowVal}
end.

init_queue_mode(undefined, State) ->
State;
init_queue_mode(Mode, State = #q {backing_queue = BQ,
backing_queue_state = BQS}) ->
BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS),
State#q{backing_queue_state = BQS1}.

init_queue_version(Version0, State = #q {backing_queue = BQ,
backing_queue_state = BQS}) ->
Version = case Version0 of
undefined -> 2;
_ -> Version0
end,
BQS1 = BQ:set_queue_version(Version, BQS),
State#q{backing_queue_state = BQS1}.

reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
Expand Down
17 changes: 1 addition & 16 deletions deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
message_bytes, message_bytes_ready,
message_bytes_unacknowledged, message_bytes_ram,
message_bytes_persistent, head_message_timestamp,
disk_reads, disk_writes, backing_queue_status,
messages_paged_out, message_bytes_paged_out]).
disk_reads, disk_writes, backing_queue_status]).

%% We can't specify a per-queue ack/state with callback signatures
-type ack() :: any().
Expand All @@ -37,9 +36,6 @@
-type msg_fun(A) :: fun ((mc:state(), ack(), A) -> A).
-type msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean()).

-type queue_mode() :: atom().
-type queue_version() :: pos_integer().

%% Called on startup with a vhost and a list of durable queue names on this vhost.
%% The queues aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
Expand Down Expand Up @@ -173,13 +169,6 @@
%% each message, its ack tag, and an accumulator.
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.

%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((mc:state(),
rabbit_types:message_properties(),
boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.

%% How long is my queue?
-callback len(state()) -> non_neg_integer().

Expand Down Expand Up @@ -223,10 +212,6 @@
%% or discarded previously).
-callback is_duplicate(mc:state(), state()) -> {boolean(), state()}.

-callback set_queue_mode(queue_mode(), state()) -> state().

-callback set_queue_version(queue_version(), state()) -> state().

-callback zip_msgs_and_acks([delivered_publish()],
[ack()], Acc, state())
-> Acc.
Expand Down
Loading
Loading