@@ -24,7 +24,6 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
2424 } = decode_req (ReqSections , {undefined , undefined }),
2525
2626 {StatusCode ,
27- RespAppProps0 ,
2827 RespBody } = try {PathSegments , QueryMap } = parse_uri (HttpRequestTarget ),
2928 handle_http_req (HttpMethod ,
3029 PathSegments ,
@@ -34,7 +33,9 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
3433 User ,
3534 ConnectionPid )
3635 catch throw :{? MODULE , StatusCode0 , Explanation } ->
37- {StatusCode0 , [], {utf8 , unicode :characters_to_binary (Explanation )}}
36+ rabbit_log :warning (" request ~ts ~ts failed: ~ts " ,
37+ [HttpMethod , HttpRequestTarget , Explanation ]),
38+ {StatusCode0 , {utf8 , Explanation }}
3839 end ,
3940
4041 RespProps = # 'v1_0.properties' {
@@ -44,7 +45,9 @@ handle_request(Request, Vhost, User, ConnectionPid) ->
4445 % % [HTTP over AMQP WD 06 §5.1]
4546 correlation_id = MessageId },
4647 RespAppProps = # 'v1_0.application_properties' {
47- content = [{{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}} | RespAppProps0 ]},
48+ content = [
49+ {{utf8 , <<" http:response" >>}, {utf8 , <<" 1.1" >>}}
50+ ]},
4851 RespDataSect = # 'v1_0.amqp_value' {content = RespBody },
4952 RespSections = [RespProps , RespAppProps , RespDataSect ],
5053 [amqp10_framing :encode_bin (Sect ) || Sect <- RespSections ].
@@ -75,7 +78,7 @@ handle_http_req(<<"PUT">>,
7578 Q0 = amqqueue :new (QName , none , Durable , AutoDelete , Owner ,
7679 QArgs , Vhost , #{user => Username }, QType ),
7780 {new , _Q } = rabbit_queue_type :declare (Q0 , node ()),
78- {<<" 201" >>, [], null };
81+ {<<" 201" >>, null };
7982
8083handle_http_req (<<" PUT" >>,
8184 [<<" exchanges" >>, XNameBinQuoted ],
@@ -95,8 +98,8 @@ handle_http_req(<<"PUT">>,
9598 catch exit :# amqp_error {explanation = Explanation } ->
9699 throw (<<" 400" >>, Explanation , [])
97100 end ,
98- ok = prohibit_default_exchange (XNameBin ),
99101 XName = rabbit_misc :r (Vhost , exchange , XNameBin ),
102+ ok = prohibit_default_exchange (XName ),
100103 ok = check_resource_access (XName , configure , User ),
101104 X = case rabbit_exchange :lookup (XName ) of
102105 {ok , FoundX } ->
@@ -111,7 +114,7 @@ handle_http_req(<<"PUT">>,
111114 try rabbit_exchange :assert_equivalence (
112115 X , XTypeAtom , Durable , AutoDelete , Internal , XArgs ) of
113116 ok ->
114- {<<" 204" >>, [], null }
117+ {<<" 204" >>, null }
115118 catch exit :# amqp_error {name = precondition_failed ,
116119 explanation = Expl } ->
117120 throw (<<" 409" >>, Expl , [])
@@ -132,7 +135,7 @@ handle_http_req(<<"DELETE">>,
132135 rabbit_queue_type :purge (Q )
133136 end ),
134137 RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
135- {<<" 200" >>, [], RespPayload };
138+ {<<" 200" >>, RespPayload };
136139
137140handle_http_req (<<" DELETE" >>,
138141 [<<" queues" >>, QNameBinQuoted ],
@@ -145,7 +148,7 @@ handle_http_req(<<"DELETE">>,
145148 QName = rabbit_misc :r (Vhost , queue , QNameBin ),
146149 {ok , NumMsgs } = rabbit_amqqueue :delete_with (QName , ConnPid , false , false , Username , true ),
147150 RespPayload = {map , [{{utf8 , <<" message_count" >>}, {ulong , NumMsgs }}]},
148- {<<" 200" >>, [], RespPayload };
151+ {<<" 200" >>, RespPayload };
149152
150153handle_http_req (<<" DELETE" >>,
151154 [<<" exchanges" >>, XNameBinQuoted ],
@@ -157,19 +160,19 @@ handle_http_req(<<"DELETE">>,
157160 XNameBin = uri_string :unquote (XNameBinQuoted ),
158161 XName = rabbit_misc :r (Vhost , exchange , XNameBin ),
159162 ok = prohibit_cr_lf (XNameBin ),
160- ok = prohibit_default_exchange (XNameBin ),
163+ ok = prohibit_default_exchange (XName ),
161164 ok = prohibit_reserved_amq (XName ),
162165 ok = check_resource_access (XName , configure , User ),
163166 _ = rabbit_exchange :delete (XName , false , Username ),
164- {<<" 204" >>, [], null };
167+ {<<" 204" >>, null };
165168
166169handle_http_req (<<" POST" >>,
167170 [<<" bindings" >>],
168171 _Query ,
169172 ReqPayload ,
170173 Vhost ,
171- # user {username = Username },
172- _ConnPid ) ->
174+ User = # user {username = Username },
175+ ConnPid ) ->
173176 #{source := SrcXNameBin ,
174177 binding_key := BindingKey ,
175178 arguments := Args } = BindingMap = decode_binding (ReqPayload ),
@@ -181,34 +184,37 @@ handle_http_req(<<"POST">>,
181184 end ,
182185 SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
183186 DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
187+ ok = binding_checks (SrcXName , DstName , BindingKey , User ),
184188 Binding = # binding {source = SrcXName ,
185189 destination = DstName ,
186190 key = BindingKey ,
187191 args = Args },
188- % %TODO If the binding already exists, return 303 with location.
189- ok = rabbit_binding :add (Binding , Username ),
190- Location = compose_binding_uri (SrcXNameBin , DstKind , DstNameBin , BindingKey , Args ),
191- AppProps = [{{utf8 , <<" location" >>}, {utf8 , Location }}],
192- {<<" 201" >>, AppProps , null };
192+ ok = binding_action (add , Binding , Username , ConnPid ),
193+ {<<" 204" >>, null };
193194
194195handle_http_req (<<" DELETE" >>,
195196 [<<" bindings" >>, BindingSegment ],
196197 _Query ,
197198 null ,
198199 Vhost ,
199- # user {username = Username },
200- _ConnPid ) ->
201- {SrcXNameBin , DstKind , DstNameBin , BindingKey , ArgsHash } = decode_binding_path_segment (BindingSegment ),
200+ User = # user {username = Username },
201+ ConnPid ) ->
202+ {SrcXNameBin ,
203+ DstKind ,
204+ DstNameBin ,
205+ BindingKey ,
206+ ArgsHash } = decode_binding_path_segment (BindingSegment ),
202207 SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
203208 DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
209+ ok = binding_checks (SrcXName , DstName , BindingKey , User ),
204210 Bindings = rabbit_binding :list_for_source_and_destination (SrcXName , DstName ),
205211 case search_binding (BindingKey , ArgsHash , Bindings ) of
206212 {value , Binding } ->
207- ok = rabbit_binding : remove ( Binding , Username );
213+ ok = binding_action ( remove , Binding , Username , ConnPid );
208214 false ->
209215 ok
210216 end ,
211- {<<" 204" >>, [], null };
217+ {<<" 204" >>, null };
212218
213219handle_http_req (<<" GET" >>,
214220 [<<" bindings" >>],
@@ -218,20 +224,23 @@ handle_http_req(<<"GET">>,
218224 Vhost ,
219225 _User ,
220226 _ConnPid ) ->
221- {DstKind , DstNameBin } = case QueryMap of
222- #{<<" dste" >> := DstX } ->
223- {exchange , DstX };
224- #{<<" dstq" >> := DstQ } ->
225- {queue , DstQ };
226- _ ->
227- throw (<<" 400" >>, " missing 'dste' or 'dstq' in query: ~tp " , QueryMap )
228- end ,
227+ {DstKind ,
228+ DstNameBin } = case QueryMap of
229+ #{<<" dste" >> := DstX } ->
230+ {exchange , DstX };
231+ #{<<" dstq" >> := DstQ } ->
232+ {queue , DstQ };
233+ _ ->
234+ throw (<<" 400" >>,
235+ " missing 'dste' or 'dstq' in query: ~tp " ,
236+ QueryMap )
237+ end ,
229238 SrcXName = rabbit_misc :r (Vhost , exchange , SrcXNameBin ),
230239 DstName = rabbit_misc :r (Vhost , DstKind , DstNameBin ),
231240 Bindings0 = rabbit_binding :list_for_source_and_destination (SrcXName , DstName ),
232241 Bindings = [B || B = # binding {key = K } <- Bindings0 , K =:= Key ],
233242 RespPayload = encode_bindings (Bindings ),
234- {<<" 200" >>, [], RespPayload }.
243+ {<<" 200" >>, RespPayload }.
235244
236245decode_queue ({map , KVList }) ->
237246 M = lists :foldl (
@@ -424,19 +433,47 @@ args_hash([]) ->
424433 <<>>;
425434args_hash (Args )
426435 when is_list (Args ) ->
436+ % % Args is already sorted.
427437 Bin = <<(erlang :phash2 (Args , 1 bsl 32 )):32 >>,
428438 base64 :encode (Bin , #{mode => urlsafe ,
429439 padding => false }).
430440
441+ -spec binding_checks (rabbit_types :exchange_name (),
442+ resource_name (),
443+ rabbit_types :binding_key (),
444+ rabbit_types :user ()) -> ok .
445+ binding_checks (SrcXName , DstName , BindingKey , User ) ->
446+ lists :foreach (fun (# resource {name = NameBin } = Name ) ->
447+ ok = prohibit_default_exchange (Name ),
448+ ok = prohibit_cr_lf (NameBin )
449+ end , [SrcXName , DstName ]),
450+ ok = check_resource_access (DstName , write , User ),
451+ ok = check_resource_access (SrcXName , read , User ),
452+ case rabbit_exchange :lookup (SrcXName ) of
453+ {ok , SrcX } ->
454+ rabbit_amqp_session :check_read_permitted_on_topic (SrcX , User , BindingKey );
455+ {error , not_found } ->
456+ ok
457+ end .
458+
459+ binding_action (Action , Binding , Username , ConnPid ) ->
460+ try rabbit_channel :binding_action (Action , Binding , Username , ConnPid )
461+ catch exit :# amqp_error {explanation = Explanation } ->
462+ throw (<<" 400" >>, Explanation , [])
463+ end .
464+
431465prohibit_cr_lf (NameBin ) ->
432466 case binary :match (NameBin , [<<" \n " >>, <<" \r " >>]) of
433467 nomatch ->
434468 ok ;
435469 _Found ->
436- throw (<<" 400" >>, <<" Bad name '~ts ': \n and \r not allowed" >>, [NameBin ])
470+ throw (<<" 400" >>,
471+ <<" Bad name '~ts ': line feed and carriage return characters not allowed" >>,
472+ [NameBin ])
437473 end .
438474
439- prohibit_default_exchange (<<>>) ->
475+ prohibit_default_exchange (# resource {kind = exchange ,
476+ name = <<" " >>}) ->
440477 throw (<<" 403" >>, <<" operation not permitted on the default exchange" >>, []);
441478prohibit_default_exchange (_ ) ->
442479 ok .
@@ -464,6 +501,6 @@ check_resource_access(Resource, Perm, User) ->
464501
465502-spec throw (binary (), io :format (), [term ()]) -> no_return ().
466503throw (StatusCode , Format , Data ) ->
467- Explanation = lists :flatten (io_lib :format (Format , Data )),
468- rabbit_log : warning ( Explanation ),
469- throw ({? MODULE , StatusCode , Explanation }).
504+ Reason0 = lists :flatten (io_lib :format (Format , Data )),
505+ Reason = unicode : characters_to_binary ( Reason0 ),
506+ throw ({? MODULE , StatusCode , Reason }).
0 commit comments