5252 diff /2 ]).
5353
5454-define (MAX_SESSION_WINDOW_SIZE , 65535 ).
55- -define (DEFAULT_TIMEOUT , 5000 ).
5655-define (UINT_OUTGOING_WINDOW , {uint , ? UINT_MAX }).
5756-define (INITIAL_OUTGOING_DELIVERY_ID , ? UINT_MAX ).
5857% % "The next-outgoing-id MAY be initialized to an arbitrary value" [2.5.6]
149148 reader :: pid (),
150149 socket :: amqp10_client_connection :amqp10_socket () | undefined ,
151150 links = #{} :: #{output_handle () => # link {}},
152- link_index = #{} :: #{link_name () => output_handle ()},
151+ link_index = #{} :: #{{ link_role (), link_name ()} => output_handle ()},
153152 link_handle_index = #{} :: #{input_handle () => output_handle ()},
154153 next_link_handle = 0 :: output_handle (),
155154 early_attach_requests :: [term ()],
172171
173172-spec begin_sync (pid ()) -> supervisor :startchild_ret ().
174173begin_sync (Connection ) ->
175- begin_sync (Connection , ? DEFAULT_TIMEOUT ).
174+ begin_sync (Connection , ? TIMEOUT ).
176175
177176-spec begin_sync (pid (), non_neg_integer ()) ->
178177 supervisor :startchild_ret () | session_timeout .
@@ -302,33 +301,37 @@ mapped(cast, #'v1_0.end'{error = Err}, State) ->
302301mapped (cast , # 'v1_0.attach' {name = {utf8 , Name },
303302 initial_delivery_count = IDC ,
304303 handle = {uint , InHandle },
304+ role = PeerRoleBool ,
305305 max_message_size = MaybeMaxMessageSize },
306306 # state {links = Links , link_index = LinkIndex ,
307307 link_handle_index = LHI } = State0 ) ->
308308
309- #{Name := OutHandle } = LinkIndex ,
309+ OurRoleBool = not PeerRoleBool ,
310+ OurRole = boolean_to_role (OurRoleBool ),
311+ LinkIndexKey = {OurRole , Name },
312+ #{LinkIndexKey := OutHandle } = LinkIndex ,
310313 #{OutHandle := Link0 } = Links ,
311314 ok = notify_link_attached (Link0 ),
312315
313316 {DeliveryCount , MaxMessageSize } =
314317 case Link0 of
315- # link {role = sender ,
318+ # link {role = sender = OurRole ,
316319 delivery_count = DC } ->
317320 MSS = case MaybeMaxMessageSize of
318321 {ulong , S } when S > 0 -> S ;
319322 _ -> undefined
320323 end ,
321324 {DC , MSS };
322- # link {role = receiver ,
325+ # link {role = receiver = OurRole ,
323326 max_message_size = MSS } ->
324327 {unpack (IDC ), MSS }
325328 end ,
326329 Link = Link0 # link {state = attached ,
327330 input_handle = InHandle ,
328331 delivery_count = DeliveryCount ,
329332 max_message_size = MaxMessageSize },
330- State = State0 # state {links = Links #{OutHandle => Link },
331- link_index = maps :remove (Name , LinkIndex ),
333+ State = State0 # state {links = Links #{OutHandle : = Link },
334+ link_index = maps :remove (LinkIndexKey , LinkIndex ),
332335 link_handle_index = LHI #{InHandle => OutHandle }},
333336 {keep_state , State };
334337mapped (cast , # 'v1_0.detach' {handle = {uint , InHandle },
@@ -648,8 +651,8 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
648651
649652make_source (#{role := {sender , _ }}) ->
650653 # 'v1_0.source' {};
651- make_source (#{role := {receiver , #{address := Address } = Target , _Pid }, filter := Filter }) ->
652- Durable = translate_terminus_durability (maps :get (durable , Target , none )),
654+ make_source (#{role := {receiver , #{address := Address } = Source , _Pid }, filter := Filter }) ->
655+ Durable = translate_terminus_durability (maps :get (durable , Source , none )),
653656 TranslatedFilter = translate_filters (Filter ),
654657 # 'v1_0.source' {address = {utf8 , Address },
655658 durable = {uint , Durable },
@@ -743,35 +746,34 @@ detach_with_error_cond(Link = #link{output_handle = OutHandle}, State, Cond) ->
743746 ok = send (Detach , State ),
744747 Link # link {state = detach_sent }.
745748
746- send_attach (Send , #{name := Name , role := Role } = Args , {FromPid , _ },
747- # state {next_link_handle = OutHandle0 , links = Links ,
749+ send_attach (Send , #{name := Name , role := RoleTuple } = Args , {FromPid , _ },
750+ # state {next_link_handle = OutHandle0 , links = Links ,
748751 link_index = LinkIndex } = State ) ->
749752
750753 Source = make_source (Args ),
751754 Target = make_target (Args ),
752755 Properties = amqp10_client_types :make_properties (Args ),
753756
754- {LinkTarget , RoleAsBool , InitialDeliveryCount , MaxMessageSize } =
755- case Role of
757+ {LinkTarget , InitialDeliveryCount , MaxMessageSize } =
758+ case RoleTuple of
756759 {receiver , _ , Pid } ->
757- {{pid , Pid }, true , undefined , max_message_size (Args )};
760+ {{pid , Pid }, undefined , max_message_size (Args )};
758761 {sender , #{address := TargetAddr }} ->
759- {TargetAddr , false , uint (? INITIAL_DELIVERY_COUNT ), undefined }
760- end ,
761-
762- {OutHandle , NextLinkHandle } =
763- case Args of
764- #{handle := Handle } ->
765- % % Client app provided link handle.
766- % % Really only meant for integration tests.
767- {Handle , OutHandle0 };
768- _ ->
769- {OutHandle0 , OutHandle0 + 1 }
762+ {TargetAddr , uint (? INITIAL_DELIVERY_COUNT ), undefined }
770763 end ,
771764
765+ {OutHandle , NextLinkHandle } = case Args of
766+ #{handle := Handle } ->
767+ % % Client app provided link handle.
768+ % % Really only meant for integration tests.
769+ {Handle , OutHandle0 };
770+ _ ->
771+ {OutHandle0 , OutHandle0 + 1 }
772+ end ,
773+ Role = element (1 , RoleTuple ),
772774 % create attach performative
773775 Attach = # 'v1_0.attach' {name = {utf8 , Name },
774- role = RoleAsBool ,
776+ role = role_to_boolean ( Role ) ,
775777 handle = {uint , OutHandle },
776778 source = Source ,
777779 properties = Properties ,
@@ -782,12 +784,12 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
782784 max_message_size = MaxMessageSize },
783785 ok = Send (Attach , State ),
784786
785- LinkRef = make_link_ref (element ( 1 , Role ) , self (), OutHandle ),
787+ Ref = make_link_ref (Role , self (), OutHandle ),
786788 Link = # link {name = Name ,
787- ref = LinkRef ,
789+ ref = Ref ,
788790 output_handle = OutHandle ,
789791 state = attach_sent ,
790- role = element ( 1 , Role ) ,
792+ role = Role ,
791793 notify = FromPid ,
792794 auto_flow = never ,
793795 target = LinkTarget ,
@@ -796,7 +798,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
796798
797799 {State # state {links = Links #{OutHandle => Link },
798800 next_link_handle = NextLinkHandle ,
799- link_index = LinkIndex #{Name => OutHandle }}, LinkRef }.
801+ link_index = LinkIndex #{{ Role , Name } => OutHandle }}, Ref }.
800802
801803- spec handle_session_flow (# 'v1_0.flow' {}, # state {}) -> # state {}.
802804handle_session_flow (# 'v1_0.flow' {next_incoming_id = MaybeNII ,
@@ -1090,6 +1092,16 @@ sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.
10901092reason (undefined ) -> normal ;
10911093reason (Other ) -> Other .
10921094
1095+ role_to_boolean (sender ) ->
1096+ ? AMQP_ROLE_SENDER ;
1097+ role_to_boolean (receiver ) ->
1098+ ? AMQP_ROLE_RECEIVER .
1099+
1100+ boolean_to_role (? AMQP_ROLE_SENDER ) ->
1101+ sender ;
1102+ boolean_to_role (? AMQP_ROLE_RECEIVER ) ->
1103+ receiver .
1104+
10931105format_status (Status = #{data := Data0 }) ->
10941106 # state {channel = Channel ,
10951107 remote_channel = RemoteChannel ,
0 commit comments