Skip to content

Commit 12ee906

Browse files
Merge pull request #7095 from rabbitmq/mqtt-peer-addr
Remove MQTT processor field peer_addr
2 parents 31f2dcb + cbb389b commit 12ee906

File tree

3 files changed

+35
-36
lines changed

3 files changed

+35
-36
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
%% This module contains code that is common to MQTT and Web MQTT connections.
99
-module(rabbit_mqtt_processor).
1010

11-
-export([info/2, initial_state/2, initial_state/4,
11+
-export([info/2, initial_state/2, initial_state/3,
1212
process_packet/2, serialise/2,
1313
terminate/4, handle_pre_hibernate/0,
1414
handle_ra_event/2, handle_down/2, handle_queue_event/2,
@@ -21,7 +21,8 @@
2121
-export_type([state/0]).
2222

2323
-import(rabbit_mqtt_util, [mqtt_to_amqp/1,
24-
amqp_to_mqtt/1]).
24+
amqp_to_mqtt/1,
25+
ip_address_to_binary/1]).
2526

2627
-include_lib("kernel/include/logger.hrl").
2728
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -57,7 +58,6 @@
5758
prefetch :: non_neg_integer(),
5859
client_id :: option(binary()),
5960
conn_name :: option(binary()),
60-
peer_addr :: inet:ip_address(),
6161
host :: inet:ip_address(),
6262
port :: inet:port_number(),
6363
peer_host :: inet:ip_address(),
@@ -87,21 +87,19 @@
8787

8888
-opaque state() :: #state{}.
8989

90-
-spec initial_state(Socket :: any(), ConnectionName :: binary()) ->
90+
-spec initial_state(Socket :: rabbit_net:socket(),
91+
ConnectionName :: binary()) ->
9192
state().
9293
initial_state(Socket, ConnectionName) ->
93-
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
9494
initial_state(Socket,
9595
ConnectionName,
96-
fun serialise_and_send_to_client/2,
97-
PeerAddr).
96+
fun serialise_and_send_to_client/2).
9897

99-
-spec initial_state(Socket :: any(),
98+
-spec initial_state(Socket :: rabbit_net:socket(),
10099
ConnectionName :: binary(),
101-
SendFun :: fun((mqtt_packet(), state()) -> any()),
102-
PeerAddr :: inet:ip_address()) ->
100+
SendFun :: fun((mqtt_packet(), state()) -> any())) ->
103101
state().
104-
initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
102+
initial_state(Socket, ConnectionName, SendFun) ->
105103
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
106104
true -> flow;
107105
false -> noflow
@@ -114,7 +112,6 @@ initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
114112
prefetch = rabbit_mqtt_util:env(prefetch),
115113
delivery_flow = Flow,
116114
connected_at = os:system_time(milli_seconds),
117-
peer_addr = PeerAddr,
118115
peer_host = PeerHost,
119116
peer_port = PeerPort,
120117
host = Host,
@@ -393,8 +390,8 @@ check_client_id(_) ->
393390
check_credentials(Packet = #mqtt_packet_connect{username = Username,
394391
password = Password},
395392
State = #state{cfg = #cfg{ssl_login_name = SslLoginName,
396-
peer_addr = PeerAddr}}) ->
397-
Ip = list_to_binary(inet:ntoa(PeerAddr)),
393+
peer_host = PeerHost}}) ->
394+
Ip = ip_address_to_binary(PeerHost),
398395
case creds(Username, Password, SslLoginName) of
399396
nocreds ->
400397
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
@@ -699,13 +696,15 @@ make_will_msg(#mqtt_packet_connect{will_retain = Retain,
699696
payload = Msg}.
700697

701698
process_login(_UserBin, _PassBin, ClientId,
702-
#state{cfg = #cfg{peer_addr = Addr},
699+
#state{cfg = #cfg{peer_host = PeerHost},
703700
auth_state = #auth_state{username = Username,
704701
user = User,
705702
vhost = VHost
706703
}} = State)
707704
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
708-
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
705+
rabbit_core_metrics:auth_attempt_failed(ip_address_to_binary(PeerHost),
706+
Username,
707+
mqtt),
709708
?LOG_ERROR(
710709
"MQTT detected duplicate connect attempt for client ID '~ts', user '~ts', vhost '~ts'",
711710
[ClientId, Username, VHost]),
@@ -714,13 +713,13 @@ process_login(UserBin, PassBin, ClientId,
714713
#state{auth_state = undefined,
715714
cfg = #cfg{socket = Sock,
716715
ssl_login_name = SslLoginName,
717-
peer_addr = Addr
716+
peer_host = PeerHost
718717
}} = State0) ->
719718
{ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
720719
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
721720
?LOG_DEBUG("MQTT vhost picked using ~s",
722721
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
723-
RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)),
722+
Ip = ip_address_to_binary(PeerHost),
724723
Input = #{vhost => VHost,
725724
username_bin => UsernameBin,
726725
pass_bin => PassBin,
@@ -736,10 +735,10 @@ process_login(UserBin, PassBin, ClientId,
736735
],
737736
Input, State0) of
738737
{ok, _Output, State} ->
739-
rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
738+
rabbit_core_metrics:auth_attempt_succeeded(Ip, UsernameBin, mqtt),
740739
{ok, State};
741740
{error, _ConnectionRefusedReturnCode, _State} = Err ->
742-
rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
741+
rabbit_core_metrics:auth_attempt_failed(Ip, UsernameBin, mqtt),
743742
Err
744743
end.
745744

@@ -837,12 +836,12 @@ check_vhost_access(#{vhost := VHost,
837836
client_id := ClientId,
838837
user := User = #user{username = Username}
839838
} = In,
840-
#state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
839+
#state{cfg = #cfg{peer_host = PeerHost}} = State) ->
841840
AuthzCtx = #{<<"client_id">> => ClientId},
842841
try rabbit_access_control:check_vhost_access(
843842
User,
844843
VHost,
845-
{ip, PeerAddr},
844+
{ip, PeerHost},
846845
AuthzCtx) of
847846
ok ->
848847
{ok, maps:put(authz_ctx, AuthzCtx, In), State}
@@ -859,8 +858,8 @@ check_user_loopback(#{vhost := VHost,
859858
user := User,
860859
authz_ctx := AuthzCtx
861860
},
862-
#state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
863-
case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of
861+
#state{cfg = #cfg{peer_host = PeerHost}} = State) ->
862+
case rabbit_access_control:check_user_loopback(UsernameBin, PeerHost) of
864863
ok ->
865864
AuthState = #auth_state{user = User,
866865
username = UsernameBin,
@@ -1964,7 +1963,6 @@ format_status(
19641963
prefetch = Prefetch,
19651964
client_id = ClientID,
19661965
conn_name = ConnName,
1967-
peer_addr = PeerAddr,
19681966
host = Host,
19691967
port = Port,
19701968
peer_host = PeerHost,
@@ -1986,7 +1984,6 @@ format_status(
19861984
prefetch => Prefetch,
19871985
client_id => ClientID,
19881986
conn_name => ConnName,
1989-
peer_addr => PeerAddr,
19901987
host => Host,
19911988
port => Port,
19921989
peer_host => PeerHost,

deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
init_sparkplug/0,
2424
mqtt_to_amqp/1,
2525
amqp_to_mqtt/1,
26-
truncate_binary/2
26+
truncate_binary/2,
27+
ip_address_to_binary/1
2728
]).
2829

2930
-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12).
@@ -209,3 +210,7 @@ truncate_binary(Bin, Size)
209210
truncate_binary(Bin, Size)
210211
when is_binary(Bin) ->
211212
binary:part(Bin, 0, Size).
213+
214+
-spec ip_address_to_binary(inet:ip_address()) -> binary().
215+
ip_address_to_binary(IpAddress) ->
216+
list_to_binary(inet:ntoa(IpAddress)).

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,22 @@ upgrade(Req, Env, Handler, HandlerState) ->
5757
upgrade(Req, Env, Handler, HandlerState, Opts) ->
5858
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
5959

60-
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, PeerAddr}}) ->
60+
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
6161
Sock = case HandlerState#state.socket of
6262
undefined ->
6363
Socket;
6464
ProxyInfo ->
6565
{rabbit_proxy_socket, Socket, ProxyInfo}
6666
end,
6767
cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
68-
{Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).
68+
{Handler, HandlerState#state{socket = Sock}}).
6969

7070
%% cowboy_websocket
7171
init(Req, Opts) ->
7272
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
7373
undefined ->
7474
no_supported_sub_protocol(undefined, Req);
7575
Protocol ->
76-
{PeerAddr, _PeerPort} = maps:get(peer, Req),
7776
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
7877
WsOpts = maps:merge(#{compress => true}, WsOpts0),
7978
case lists:member(<<"mqtt">>, Protocol) of
@@ -82,16 +81,15 @@ init(Req, Opts) ->
8281
true ->
8382
{?MODULE,
8483
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
85-
{#state{socket = maps:get(proxy_header, Req, undefined)},
86-
PeerAddr},
84+
#state{socket = maps:get(proxy_header, Req, undefined)},
8785
WsOpts}
8886
end
8987
end.
9088

91-
-spec websocket_init({state(), PeerAddr :: binary()}) ->
89+
-spec websocket_init(state()) ->
9290
{cowboy_websocket:commands(), state()} |
9391
{cowboy_websocket:commands(), state(), hibernate}.
94-
websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
92+
websocket_init(State0 = #state{socket = Sock}) ->
9593
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}),
9694
ok = file_handle_cache:obtain(),
9795
case rabbit_net:connection_string(Sock, inbound) of
@@ -102,8 +100,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
102100
PState = rabbit_mqtt_processor:initial_state(
103101
rabbit_net:unwrap_socket(Sock),
104102
ConnName,
105-
fun send_reply/2,
106-
PeerAddr),
103+
fun send_reply/2),
107104
State1 = State0#state{conn_name = ConnName,
108105
proc_state = PState},
109106
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),

0 commit comments

Comments
 (0)