|
94 | 94 | }). |
95 | 95 |
|
96 | 96 | -record(incoming_link, { |
97 | | - exchange :: rabbit_exchange:name(), |
| 97 | + exchange :: rabbit_types:exchange() | rabbit_exchange:name(), |
98 | 98 | routing_key :: undefined | rabbit_types:routing_key(), |
99 | 99 | %% queue_name_bin is only set if the link target address refers to a queue. |
100 | 100 | queue_name_bin :: undefined | rabbit_misc:resource_name(), |
@@ -857,9 +857,9 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, |
857 | 857 | user = User}}) -> |
858 | 858 | ok = validate_attach(Attach), |
859 | 859 | case ensure_target(Target, Vhost, User) of |
860 | | - {ok, XName, RoutingKey, QNameBin} -> |
| 860 | + {ok, Exchange, RoutingKey, QNameBin} -> |
861 | 861 | IncomingLink = #incoming_link{ |
862 | | - exchange = XName, |
| 862 | + exchange = Exchange, |
863 | 863 | routing_key = RoutingKey, |
864 | 864 | queue_name_bin = QNameBin, |
865 | 865 | delivery_count = DeliveryCountInt, |
@@ -1757,7 +1757,7 @@ incoming_link_transfer( |
1757 | 1757 | rcv_settle_mode = RcvSettleMode, |
1758 | 1758 | handle = Handle = ?UINT(HandleInt)}, |
1759 | 1759 | MsgPart, |
1760 | | - #incoming_link{exchange = XName = #resource{name = XNameBin}, |
| 1760 | + #incoming_link{exchange = Exchange, |
1761 | 1761 | routing_key = LinkRKey, |
1762 | 1762 | delivery_count = DeliveryCount0, |
1763 | 1763 | incoming_unconfirmed_map = U0, |
@@ -1789,20 +1789,20 @@ incoming_link_transfer( |
1789 | 1789 | Sections = amqp10_framing:decode_bin(MsgBin), |
1790 | 1790 | ?DEBUG("~s Inbound content:~n ~tp", |
1791 | 1791 | [?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]), |
1792 | | - Anns0 = #{?ANN_EXCHANGE => XNameBin}, |
1793 | | - Anns = case LinkRKey of |
1794 | | - undefined -> Anns0; |
1795 | | - _ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]} |
1796 | | - end, |
1797 | | - Mc0 = mc:init(mc_amqp, Sections, Anns), |
1798 | | - Mc1 = rabbit_message_interceptor:intercept(Mc0), |
1799 | | - {Mc, RoutingKey} = ensure_routing_key(Mc1), |
1800 | | - check_user_id(Mc, User), |
1801 | | - messages_received(Settled), |
1802 | | - case rabbit_exchange:lookup(XName) of |
1803 | | - {ok, Exchange} -> |
1804 | | - check_write_permitted_on_topic(Exchange, User, RoutingKey), |
1805 | | - QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}), |
| 1792 | + case rabbit_exchange_lookup(Exchange) of |
| 1793 | + {ok, X = #exchange{name = #resource{name = XNameBin}}} -> |
| 1794 | + Anns0 = #{?ANN_EXCHANGE => XNameBin}, |
| 1795 | + Anns = case LinkRKey of |
| 1796 | + undefined -> Anns0; |
| 1797 | + _ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]} |
| 1798 | + end, |
| 1799 | + Mc0 = mc:init(mc_amqp, Sections, Anns), |
| 1800 | + Mc1 = rabbit_message_interceptor:intercept(Mc0), |
| 1801 | + {Mc, RoutingKey} = ensure_routing_key(Mc1), |
| 1802 | + check_user_id(Mc, User), |
| 1803 | + messages_received(Settled), |
| 1804 | + check_write_permitted_on_topic(X, User, RoutingKey), |
| 1805 | + QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}), |
1806 | 1806 | rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace), |
1807 | 1807 | Opts = #{correlation => {HandleInt, DeliveryId}}, |
1808 | 1808 | Qs0 = rabbit_amqqueue:lookup_many(QNames), |
@@ -1838,6 +1838,11 @@ incoming_link_transfer( |
1838 | 1838 | {error, [Disposition, Detach]} |
1839 | 1839 | end. |
1840 | 1840 |
|
| 1841 | +rabbit_exchange_lookup(X = #exchange{}) -> |
| 1842 | + {ok, X}; |
| 1843 | +rabbit_exchange_lookup(XName = #resource{}) -> |
| 1844 | + rabbit_exchange:lookup(XName). |
| 1845 | + |
1841 | 1846 | ensure_routing_key(Mc) -> |
1842 | 1847 | case mc:routing_keys(Mc) of |
1843 | 1848 | [RoutingKey] -> |
@@ -1911,16 +1916,25 @@ ensure_target(#'v1_0.target'{address = Address, |
1911 | 1916 | {ok, Dest} -> |
1912 | 1917 | QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable), |
1913 | 1918 | {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), |
1914 | | - XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)), |
| 1919 | + XNameBin = list_to_binary(XNameList1), |
| 1920 | + XName = rabbit_misc:r(Vhost, exchange, XNameBin), |
1915 | 1921 | {ok, X} = rabbit_exchange:lookup(XName), |
1916 | 1922 | check_internal_exchange(X), |
1917 | 1923 | check_write_permitted(XName, User), |
| 1924 | + %% Pre-declared exchanges are protected against deletion and modification. |
| 1925 | + %% Let's cache the whole #exchange{} record to save a |
| 1926 | + %% rabbit_exchange:lookup(XName) call each time we receive a message. |
| 1927 | + Exchange = case XNameBin of |
| 1928 | + <<>> -> X; |
| 1929 | + <<"amq.", _/binary>> -> X; |
| 1930 | + _ -> XName |
| 1931 | + end, |
1918 | 1932 | RoutingKey = case RK of |
1919 | 1933 | undefined -> undefined; |
1920 | 1934 | [] -> undefined; |
1921 | 1935 | _ -> list_to_binary(RK) |
1922 | 1936 | end, |
1923 | | - {ok, XName, RoutingKey, QNameBin}; |
| 1937 | + {ok, Exchange, RoutingKey, QNameBin}; |
1924 | 1938 | {error, _} = E -> |
1925 | 1939 | E |
1926 | 1940 | end; |
|
0 commit comments