77
88-module (rabbit_amqp1_0_session ).
99
10- % %TODO change to gen_server
11- -behaviour (gen_server2 ).
10+ -behaviour (gen_server ).
1211
1312-include_lib (" amqp_client/include/amqp_client.hrl" ).
1413-include (" rabbit_amqp1_0.hrl" ).
117116 }).
118117
119118start_link (Args ) ->
120- gen_server2 :start_link (? MODULE , Args , []).
119+ gen_server :start_link (? MODULE , Args , []).
121120
122121get_info (Pid ) ->
123- gen_server2 :call (Pid , info , ? CALL_TIMEOUT ).
122+ gen_server :call (Pid , info , ? CALL_TIMEOUT ).
124123
125124process_frame (Pid , Frame ) ->
126125 credit_flow :send (Pid ),
127- gen_server2 :cast (Pid , {frame , Frame , self ()}).
126+ gen_server :cast (Pid , {frame , Frame , self ()}).
128127
129128init ({Channel , ReaderPid , WriterPid , User , Vhost , FrameMax }) ->
130129 process_flag (trap_exit , true ),
@@ -367,12 +366,8 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
367366 _ -> ? DEFAULT_SEND_SETTLED
368367 end ,
369368 DOSym = amqp10_framing :symbol_for (DefaultOutcome ),
370- case ensure_source (Source ,
371- # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
372- send_settled = SndSettled ,
373- default_outcome = DOSym },
374- Vhost ) of
375- {ok , OutgoingLink = # outgoing_link {queue = QNameBin }} ->
369+ case ensure_source (Source , Vhost ) of
370+ {ok , QNameBin } ->
376371 CTag = handle_to_ctag (Handle ),
377372 Args = source_filters_to_consumer_args (Source ) ++
378373 [{<<" x-credit" >>, table , [{<<" credit" >>, long , 0 },
@@ -408,7 +403,11 @@ handle_control(#'v1_0.attach'{role = ?RECV_ROLE,
408403 source = Source # 'v1_0.source' {default_outcome = DefaultOutcome ,
409404 outcomes = Outcomes },
410405 role = ? SEND_ROLE },
411- {ok , [AttachReply ], OutgoingLink , State1 };
406+ OutLink = # outgoing_link {delivery_count = ? INIT_TXFR_COUNT ,
407+ queue = QNameBin ,
408+ send_settled = SndSettled ,
409+ default_outcome = DOSym },
410+ {ok , [AttachReply ], OutLink , State1 };
412411 {error , Reason } ->
413412 protocol_error (
414413 ? V_1_0_AMQP_ERROR_INTERNAL_ERROR ,
@@ -1250,10 +1249,10 @@ outgoing_link_flow(#outgoing_link{delivery_count = DeliveryCountSnd,
12501249default (undefined , Default ) -> Default ;
12511250default (Thing , _Default ) -> Thing .
12521251
1253- ensure_source (# 'v1_0.source' {dynamic = true }, _Link , _Vhost ) ->
1252+ ensure_source (# 'v1_0.source' {dynamic = true }, _Vhost ) ->
12541253 protocol_error (? V_1_0_AMQP_ERROR_NOT_IMPLEMENTED , " Dynamic sources not supported" , []);
12551254ensure_source (# 'v1_0.source' {address = Address ,
1256- durable = Durable }, Link , Vhost ) ->
1255+ durable = Durable }, Vhost ) ->
12571256 case Address of
12581257 {utf8 , SourceAddr } ->
12591258 case rabbit_routing_util :parse_endpoint (SourceAddr , false ) of
@@ -1264,7 +1263,7 @@ ensure_source(#'v1_0.source'{address = Address,
12641263 case rabbit_routing_util :parse_routing (Src ) of
12651264 {" " , QNameList } ->
12661265 true = string :equal (QNameList , QNameBin ),
1267- {ok , Link # outgoing_link { queue = QNameBin } };
1266+ {ok , QNameBin };
12681267 {XNameList , RoutingKeyList } ->
12691268 RoutingKey = list_to_binary (RoutingKeyList ),
12701269 XNameBin = list_to_binary (XNameList ),
@@ -1276,7 +1275,7 @@ ensure_source(#'v1_0.source'{address = Address,
12761275 % % TODO authorisation checks
12771276 case rabbit_binding :add (Binding , <<" todo" >>) of
12781277 ok ->
1279- {ok , Link # outgoing_link { queue = QNameBin } };
1278+ {ok , QNameBin };
12801279 {error , _ } = Err ->
12811280 Err
12821281 end
0 commit comments