9696 soft_limit :: non_neg_integer (),
9797 slow = false :: boolean (),
9898 readers = #{} :: #{rabbit_types :ctag () => # stream {}},
99- writer_id :: binary (),
100- filtering_supported :: boolean ()
99+ writer_id :: binary ()
101100 }).
102101
103102-import (rabbit_queue_type_util , [args_policy_lookup /3 ]).
@@ -286,8 +285,7 @@ consume(Q, #{no_ack := true,
286285consume (Q , #{limiter_active := true }, _State )
287286 when ? amqqueue_is_stream (Q ) ->
288287 {error , global_qos_not_supported_for_queue_type };
289- consume (Q , Spec ,
290- # stream_client {filtering_supported = FilteringSupported } = QState0 )
288+ consume (Q , Spec , # stream_client {} = QState0 )
291289 when ? amqqueue_is_stream (Q ) ->
292290 % % Messages should include the offset as a custom header.
293291 case get_local_pid (QState0 ) of
@@ -307,26 +305,19 @@ consume(Q, Spec,
307305 {error , _ } = Err ->
308306 Err ;
309307 {ok , OffsetSpec } ->
310- FilterSpec = filter_spec (Args ),
311- case {FilterSpec , FilteringSupported } of
312- {#{filter_spec := _ }, false } ->
313- {protocol_error , precondition_failed ,
314- " Filtering is not supported" , []};
315- _ ->
316- ConsumerPrefetchCount = case Mode of
317- {simple_prefetch , C } -> C ;
318- _ -> 0
319- end ,
320- AckRequired = not NoAck ,
321- rabbit_core_metrics :consumer_created (
322- ChPid , ConsumerTag , ExclusiveConsume , AckRequired ,
323- QName , ConsumerPrefetchCount , false , up , Args ),
324- % % reply needs to be sent before the stream
325- % % begins sending
326- maybe_send_reply (ChPid , OkMsg ),
327- _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
328- begin_stream (QState , ConsumerTag , OffsetSpec , Mode , AckRequired , FilterSpec )
329- end
308+ ConsumerPrefetchCount = case Mode of
309+ {simple_prefetch , C } -> C ;
310+ _ -> 0
311+ end ,
312+ AckRequired = not NoAck ,
313+ rabbit_core_metrics :consumer_created (
314+ ChPid , ConsumerTag , ExclusiveConsume , AckRequired ,
315+ QName , ConsumerPrefetchCount , false , up , Args ),
316+ % % reply needs to be sent before the stream
317+ % % begins sending
318+ maybe_send_reply (ChPid , OkMsg ),
319+ _ = rabbit_stream_coordinator :register_local_member_listener (Q ),
320+ begin_stream (QState , ConsumerTag , OffsetSpec , Mode , AckRequired , filter_spec (Args ))
330321 end ;
331322 {undefined , _ } ->
332323 {protocol_error , precondition_failed ,
@@ -510,8 +501,7 @@ deliver(QSs, Msg, Options) ->
510501 lists :foldl (
511502 fun ({Q , stateless }, {Qs , Actions }) ->
512503 LeaderPid = amqqueue :get_pid (Q ),
513- ok = osiris :write (LeaderPid ,
514- stream_message (Msg , filtering_supported ())),
504+ ok = osiris :write (LeaderPid , stream_message (Msg )),
515505 {Qs , Actions };
516506 ({Q , S0 }, {Qs , Actions0 }) ->
517507 {S , Actions } = deliver0 (maps :get (correlation , Options , undefined ),
@@ -526,11 +516,9 @@ deliver0(MsgId, Msg,
526516 next_seq = Seq ,
527517 correlation = Correlation0 ,
528518 soft_limit = SftLmt ,
529- slow = Slow0 ,
530- filtering_supported = FilteringSupported } = State ,
519+ slow = Slow0 } = State ,
531520 Actions0 ) ->
532- ok = osiris :write (LeaderPid , WriterId , Seq ,
533- stream_message (Msg , FilteringSupported )),
521+ ok = osiris :write (LeaderPid , WriterId , Seq , stream_message (Msg )),
534522 Correlation = case MsgId of
535523 undefined ->
536524 Correlation0 ;
@@ -547,19 +535,14 @@ deliver0(MsgId, Msg,
547535 correlation = Correlation ,
548536 slow = Slow }, Actions }.
549537
550- stream_message (Msg , FilteringSupported ) ->
538+ stream_message (Msg ) ->
551539 McAmqp = mc :convert (mc_amqp , Msg ),
552540 MsgData = mc :protocol_state (McAmqp ),
553- case FilteringSupported of
554- true ->
555- case mc :x_header (<<" x-stream-filter-value" >>, McAmqp ) of
556- undefined ->
557- MsgData ;
558- {utf8 , Value } ->
559- {Value , MsgData }
560- end ;
561- false ->
562- MsgData
541+ case mc :x_header (<<" x-stream-filter-value" >>, McAmqp ) of
542+ undefined ->
543+ MsgData ;
544+ {utf8 , Value } ->
545+ {Value , MsgData }
563546 end .
564547
565548-spec dequeue (_ , _ , _ , _ , client ()) -> no_return ().
@@ -936,8 +919,7 @@ init(Q) when ?is_amqqueue(Q) ->
936919 name = amqqueue :get_name (Q ),
937920 leader = Leader ,
938921 writer_id = WriterId ,
939- soft_limit = SoftLimit ,
940- filtering_supported = filtering_supported ()}};
922+ soft_limit = SoftLimit }};
941923 {ok , stream_not_found , _ } ->
942924 {error , stream_not_found };
943925 {error , coordinator_unavailable } = E ->
@@ -1294,8 +1276,7 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
12941276
12951277resend_all (# stream_client {leader = LeaderPid ,
12961278 writer_id = WriterId ,
1297- correlation = Corrs ,
1298- filtering_supported = FilteringSupported } = State ) ->
1279+ correlation = Corrs } = State ) ->
12991280 Msgs = lists :sort (maps :values (Corrs )),
13001281 case Msgs of
13011282 [] -> ok ;
@@ -1304,8 +1285,7 @@ resend_all(#stream_client{leader = LeaderPid,
13041285 [Seq , maps :size (Corrs )])
13051286 end ,
13061287 [begin
1307- ok = osiris :write (LeaderPid , WriterId , Seq ,
1308- stream_message (Msg , FilteringSupported ))
1288+ ok = osiris :write (LeaderPid , WriterId , Seq , stream_message (Msg ))
13091289 end || {Seq , Msg } <- Msgs ],
13101290 State .
13111291
@@ -1340,9 +1320,6 @@ list_with_minimum_quorum() ->
13401320
13411321is_stateful () -> true .
13421322
1343- filtering_supported () ->
1344- rabbit_feature_flags :is_enabled (stream_filtering ).
1345-
13461323get_nodes (Q ) when ? is_amqqueue (Q ) ->
13471324 #{nodes := Nodes } = amqqueue :get_type_state (Q ),
13481325 Nodes .
0 commit comments