1919-define (INIT_TXFR_COUNT , 0 ).
2020-define (DEFAULT_SEND_SETTLED , false ).
2121
22+ % % [3.4]
23+ -define (OUTCOMES , [? V_1_0_SYMBOL_ACCEPTED ,
24+ ? V_1_0_SYMBOL_REJECTED ,
25+ ? V_1_0_SYMBOL_RELEASED ,
26+ ? V_1_0_SYMBOL_MODIFIED ]).
27+
2228% % Just make these constant for the time being.
2329-define (INCOMING_CREDIT , 65536 ).
2430
4147
4248-import (rabbit_amqp1_0_util , [protocol_error /3 ,
4349 serial_add /2 , serial_diff /2 , serial_compare /2 ]).
44- -import (rabbit_amqp1_0_link_util , [handle_to_ctag /1 ,
45- ctag_to_handle /1 ]).
4650
4751-record (incoming_link , {
4852 name ,
5862-record (outgoing_link , {
5963 queue :: undefined | rabbit_misc :resource_name (),
6064 delivery_count = 0 ,
61- % % TODO below 2 fields are not needed?
62- send_settled ,
63- default_outcome }).
65+ % % TODO below field is not needed?
66+ send_settled }).
6467
6568-record (outgoing_unsettled , {
6669 % % The queue sent us this consumer scoped sequence number.
@@ -316,7 +319,7 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE,
316319 routing_key = RoutingKey ,
317320 delivery_count = InitTransfer ,
318321 recv_settle_mode = RcvSettleMode },
319- { _DefaultOutcome , _Outcomes } = rabbit_amqp1_0_link_util : outcomes (Source ),
322+ _Outcomes = outcomes (Source ),
320323 Confirm = case SndSettleMode of
321324 ? V_1_0_SENDER_SETTLE_MODE_UNSETTLED -> true ;
322325 ? V_1_0_SENDER_SETTLE_MODE_SETTLED -> false ;
@@ -359,13 +362,11 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
359362 queue_states = QStates0 } = State0 ) ->
360363
361364 ok = validate_attach (Attach ),
362- {DefaultOutcome , Outcomes } = rabbit_amqp1_0_link_util :outcomes (Source ),
363365 SndSettled = case SndSettleMode of
364366 ? V_1_0_SENDER_SETTLE_MODE_SETTLED -> true ;
365367 ? V_1_0_SENDER_SETTLE_MODE_UNSETTLED -> false ;
366368 _ -> ? DEFAULT_SEND_SETTLED
367369 end ,
368- DOSym = amqp10_framing :symbol_for (DefaultOutcome ),
369370 case ensure_source (Source , Vhost ) of
370371 {ok , QNameBin } ->
371372 CTag = handle_to_ctag (Handle ),
@@ -400,13 +401,16 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
400401 false -> ? V_1_0_SENDER_SETTLE_MODE_UNSETTLED
401402 end ,
402403 rcv_settle_mode = RcvSettleMode ,
403- source = Source # 'v1_0.source' {default_outcome = DefaultOutcome ,
404- outcomes = Outcomes },
404+ % % The queue process monitors our session process. When our session process terminates
405+ % % (abnormally) any messages checked out to our session process will be requeued.
406+ % % That's why the we only support RELEASED as the default outcome.
407+ source = Source # 'v1_0.source' {
408+ default_outcome = # 'v1_0.released' {},
409+ outcomes = outcomes (Source )},
405410 role = ? SEND_ROLE },
406411 OutLink = # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
407412 queue = QNameBin ,
408- send_settled = SndSettled ,
409- default_outcome = DOSym },
413+ send_settled = SndSettled },
410414 {ok , [AttachReply ], OutLink , State1 };
411415 {error , Reason } ->
412416 protocol_error (
@@ -1394,7 +1398,7 @@ declare_queue(QNameBin, Vhost, TerminusDurability) ->
13941398 rabbit_core_metrics :queue_declared (QName ),
13951399 Q0 = amqqueue :new (QName ,
13961400 _Pid = none ,
1397- rabbit_amqp1_0_link_util : durable (TerminusDurability ),
1401+ queue_is_durable (TerminusDurability ),
13981402 _AutoDelete = false ,
13991403 _QOwner = none ,
14001404 _QArgs = [],
@@ -1411,6 +1415,47 @@ declare_queue(QNameBin, Vhost, TerminusDurability) ->
14111415 protocol_error (? V_1_0_AMQP_ERROR_INTERNAL_ERROR , " Failed to declare ~s : ~p " , [rabbit_misc :rs (QName ), Other ])
14121416 end .
14131417
1418+ outcomes (# 'v1_0.source' {outcomes = undefined }) ->
1419+ {array , symbol , ? OUTCOMES };
1420+ outcomes (# 'v1_0.source' {outcomes = {array , symbol , Syms } = Outcomes }) ->
1421+ case lists :filter (fun (O ) -> not lists :member (O , ? OUTCOMES ) end , Syms ) of
1422+ [] ->
1423+ Outcomes ;
1424+ Unsupported ->
1425+ rabbit_amqp1_0_util :protocol_error (
1426+ ? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED ,
1427+ " Outcomes not supported: ~tp " ,
1428+ [Unsupported ])
1429+ end ;
1430+ outcomes (# 'v1_0.source' {outcomes = Unsupported }) ->
1431+ rabbit_amqp1_0_util :protocol_error (
1432+ ? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED ,
1433+ " Outcomes not supported: ~tp " ,
1434+ [Unsupported ]);
1435+ outcomes (_ ) ->
1436+ {array , symbol , ? OUTCOMES }.
1437+
1438+ -spec handle_to_ctag ({uint , non_neg_integer ()}) ->
1439+ binary ().
1440+ handle_to_ctag ({uint , H }) ->
1441+ <<" ctag-" , H :32 /integer >>.
1442+
1443+ -spec ctag_to_handle (binary ()) ->
1444+ {uint , non_neg_integer ()}.
1445+ ctag_to_handle (<<" ctag-" , H :32 /integer >>) ->
1446+ {uint , H }.
1447+
1448+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_NONE ) ->
1449+ false ;
1450+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_CONFIGURATION ) ->
1451+ true ;
1452+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_UNSETTLED_STATE ) ->
1453+ true ;
1454+ queue_is_durable (undefined ) ->
1455+ % % <field name="durable" type="terminus-durability" default="none"/>
1456+ % % [3.5.3]
1457+ queue_is_durable (? V_1_0_TERMINUS_DURABILITY_NONE ).
1458+
14141459% %% TODO move copied code to some common module
14151460% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
14161461% % BEGIN copy from rabbit_channel %%%
0 commit comments