Skip to content

Commit 1e9d63a

Browse files
committed
Get all existing rabbitmq_mqtt tests green
1 parent 582ea40 commit 1e9d63a

21 files changed

+184
-214
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
{stream_queue,
3131
#{desc => "Support queues of type `stream`",
3232
doc_url => "https://www.rabbitmq.com/stream.html",
33-
stability => stable,
33+
%%TODO remove compatibility code
34+
stability => required,
3435
depends_on => [quorum_queue]
3536
}}).
3637

deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
%% reader state
1313
-record(state, {socket,
14+
proxy_socket,
1415
conn_name,
1516
await_recv,
1617
deferred_recv,
@@ -42,7 +43,7 @@
4243
%% same IDs because client and server assign IDs independently of each other.)
4344
packet_id = 1 :: packet_id(),
4445
client_id,
45-
clean_sess,
46+
clean_sess :: boolean(),
4647
will_msg,
4748
exchange :: rabbit_exchange:name(),
4849
ssl_login_name,
@@ -72,37 +73,41 @@
7273
peer_port,
7374
proto_human}).
7475

75-
%% does not include vhost: it is used in
76-
%% the table name
76+
%% does not include vhost because vhost is used in the (D)ETS table name
7777
-record(retained_message, {topic,
7878
mqtt_msg}).
7979

8080
-define(INFO_ITEMS,
81-
[host,
82-
port,
83-
peer_host,
84-
peer_port,
85-
protocol,
86-
frame_max,
87-
client_properties,
88-
ssl,
89-
ssl_protocol,
90-
ssl_key_exchange,
91-
ssl_cipher,
92-
ssl_hash,
93-
conn_name,
94-
connection_state,
95-
connection,
96-
unacked_client_pubs,
97-
unacked_server_pubs,
98-
packet_id,
99-
client_id,
100-
clean_sess,
101-
will_msg,
102-
exchange,
103-
ssl_login_name,
104-
retainer_pid,
105-
user,
106-
vhost]).
81+
[protocol,
82+
host,
83+
port,
84+
peer_host,
85+
peer_port,
86+
connection,
87+
conn_name,
88+
connection_state,
89+
ssl,
90+
ssl_protocol,
91+
ssl_key_exchange,
92+
ssl_cipher,
93+
ssl_hash,
94+
ssl_login_name,
95+
client_id,
96+
vhost,
97+
user,
98+
recv_cnt,
99+
recv_oct,
100+
send_cnt,
101+
send_oct,
102+
send_pend,
103+
clean_sess,
104+
will_msg,
105+
retainer_pid,
106+
exchange,
107+
subscriptions,
108+
prefetch,
109+
messages_unconfirmed,
110+
messages_unacknowledged
111+
]).
107112

108113
-define(MQTT_GUIDE_URL, <<"https://rabbitmq.com/mqtt.html">>).

deps/rabbitmq_mqtt/src/rabbit_mqtt_confirms.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ insert(PktId, _, State)
3737
insert(PktId, QNames, State)
3838
when is_integer(PktId) andalso PktId > 0 ->
3939
QMap = maps:from_keys(QNames, ok),
40-
maps:put(PktId, QMap, State).
40+
{ok, maps:put(PktId, QMap, State)}.
4141

4242
-spec confirm([packet_id()], queue_name(), state()) ->
4343
{[packet_id()], state()}.

deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl

Lines changed: 0 additions & 25 deletions
This file was deleted.

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 64 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -489,45 +489,45 @@ hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
489489

490490
maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos},
491491
#proc_state{amqp2mqtt_fun = Amqp2MqttFun,
492-
packet_id = PacketId} = PState0) ->
492+
packet_id = PacketId0} = PState0) ->
493493
Topic1 = Amqp2MqttFun(Topic0),
494494
case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
495-
undefined -> PState0;
496-
Msg ->
497-
%% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
498-
%% and retained message QoS. The spec isn't super clear on this, we
499-
%% do what Mosquitto does, per user feedback.
500-
Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos),
501-
{Id, PState} = case Qos of
502-
?QOS_0 -> {undefined, PState0};
503-
?QOS_1 -> {PacketId, PState0#proc_state{packet_id = increment_packet_id(PacketId)}}
504-
end,
495+
undefined ->
496+
PState0;
497+
Msg ->
498+
Qos = effective_qos(Msg#mqtt_msg.qos, SubscribeQos),
499+
{PacketId, PState} = case Qos of
500+
?QOS_0 ->
501+
{undefined, PState0};
502+
?QOS_1 ->
503+
{PacketId0, PState0#proc_state{packet_id = increment_packet_id(PacketId0)}}
504+
end,
505505
serialise_and_send_to_client(
506506
#mqtt_frame{fixed = #mqtt_frame_fixed{
507507
type = ?PUBLISH,
508508
qos = Qos,
509509
dup = false,
510510
retain = Msg#mqtt_msg.retain
511511
}, variable = #mqtt_frame_publish{
512-
message_id = Id,
512+
message_id = PacketId,
513513
topic_name = Topic1
514514
},
515515
payload = Msg#mqtt_msg.payload},
516516
PState),
517517
PState
518518
end.
519519

520-
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
520+
make_will_msg(#mqtt_frame_connect{will_flag = false}) ->
521521
undefined;
522-
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
523-
will_qos = Qos,
524-
will_topic = Topic,
525-
will_msg = Msg }) ->
526-
#mqtt_msg{ retain = Retain,
527-
qos = Qos,
528-
topic = Topic,
529-
dup = false,
530-
payload = Msg }.
522+
make_will_msg(#mqtt_frame_connect{will_retain = Retain,
523+
will_qos = Qos,
524+
will_topic = Topic,
525+
will_msg = Msg}) ->
526+
#mqtt_msg{retain = Retain,
527+
qos = Qos,
528+
topic = Topic,
529+
dup = false,
530+
payload = Msg}.
531531

532532
process_login(_UserBin, _PassBin, _ClientId,
533533
#proc_state{peer_addr = Addr,
@@ -999,9 +999,8 @@ binding_action(
999999
key = RoutingKey},
10001000
BindingFun(Binding, Username).
10011001

1002-
send_will(PState = #proc_state{will_msg = undefined}) ->
1003-
PState;
1004-
1002+
send_will(#proc_state{will_msg = undefined}) ->
1003+
ok;
10051004
send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
10061005
topic = Topic},
10071006
retainer_pid = RPid,
@@ -1010,25 +1009,14 @@ send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
10101009
ok ->
10111010
publish_to_queues(WillMsg, PState),
10121011
case Retain of
1013-
false -> ok;
1014-
true ->
1012+
false ->
1013+
ok;
1014+
true ->
10151015
hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg)
10161016
end;
1017-
Error ->
1018-
rabbit_log:warning(
1019-
"Could not send last will: ~p",
1020-
[Error])
1021-
end,
1022-
%%TODO cancel queue client?
1023-
% case ChQos1 of
1024-
% undefined -> ok;
1025-
% _ -> amqp_channel:close(ChQos1)
1026-
% end,
1027-
% case ChQos0 of
1028-
% undefined -> ok;
1029-
% _ -> amqp_channel:close(ChQos0)
1030-
% end,
1031-
PState.
1017+
{error, access_refused = Reason} ->
1018+
rabbit_log:error("failed to send will message: ~p", [Reason])
1019+
end.
10321020

10331021
publish_to_queues(undefined, PState) ->
10341022
{ok, PState};
@@ -1104,6 +1092,11 @@ deliver_to_queues(Delivery,
11041092
process_routing_confirm(#delivery{confirm = false}, [], PState) ->
11051093
rabbit_global_counters:messages_unroutable_dropped(mqtt, 1),
11061094
PState;
1095+
process_routing_confirm(#delivery{confirm = true,
1096+
msg_seq_no = undefined}, [], PState) ->
1097+
%% unroutable will message with QoS > 0
1098+
rabbit_global_counters:messages_unroutable_dropped(mqtt, 1),
1099+
PState;
11071100
process_routing_confirm(#delivery{confirm = true,
11081101
msg_seq_no = MsgId}, [], PState) ->
11091102
rabbit_global_counters:messages_unroutable_returned(mqtt, 1),
@@ -1114,11 +1107,15 @@ process_routing_confirm(#delivery{confirm = true,
11141107
PState;
11151108
process_routing_confirm(#delivery{confirm = false}, _, PState) ->
11161109
PState;
1110+
process_routing_confirm(#delivery{confirm = true,
1111+
msg_seq_no = undefined}, [_|_], PState) ->
1112+
%% routable will message with QoS > 0
1113+
PState;
11171114
process_routing_confirm(#delivery{confirm = true,
11181115
msg_seq_no = MsgId},
11191116
Qs, PState = #proc_state{unacked_client_pubs = U0}) ->
11201117
QNames = queue_names(Qs),
1121-
U = rabbit_mqtt_confirms:insert(MsgId, QNames, U0),
1118+
{ok, U} = rabbit_mqtt_confirms:insert(MsgId, QNames, U0),
11221119
PState#proc_state{unacked_client_pubs = U}.
11231120

11241121
send_puback(MsgIds, PState)
@@ -1250,10 +1247,7 @@ deliver_one_to_client(Msg = {QName, QPid, QMsgId, _Redelivered,
12501247
false ->
12511248
?QOS_0
12521249
end,
1253-
%% "The QoS of Application Messages sent in response to a Subscription MUST be the minimum
1254-
%% of the QoS of the originally published message and the Maximum QoS granted by the Server
1255-
%% [MQTT-3.8.4-8]."
1256-
QoS = min(PublisherQoS, SubscriberQoS),
1250+
QoS = effective_qos(PublisherQoS, SubscriberQoS),
12571251
PState1 = maybe_publish_to_client(Msg, QoS, PState0),
12581252
PState = maybe_ack(AckRequired, QoS, QName, QMsgId, PState1),
12591253
%%TODO GC
@@ -1264,6 +1258,13 @@ deliver_one_to_client(Msg = {QName, QPid, QMsgId, _Redelivered,
12641258
ok = maybe_notify_sent(QName, QPid, PState),
12651259
PState.
12661260

1261+
-spec effective_qos(qos(), qos()) -> qos().
1262+
effective_qos(PublisherQoS, SubscriberQoS) ->
1263+
%% "The QoS of Application Messages sent in response to a Subscription MUST be the minimum
1264+
%% of the QoS of the originally published message and the Maximum QoS granted by the Server
1265+
%% [MQTT-3.8.4-8]."
1266+
erlang:min(PublisherQoS, SubscriberQoS).
1267+
12671268
maybe_publish_to_client({_, _, _, _Redelivered = true, _}, ?QOS_0, PState) ->
12681269
%% Do no redeliver to MQTT subscriber who gets message at most once.
12691270
PState;
@@ -1405,28 +1406,27 @@ check_topic_access(TopicName, Access,
14051406
end
14061407
end.
14071408

1408-
info(unacked_client_pubs, #proc_state{unacked_client_pubs = Val}) -> Val;
1409+
info(protocol, #proc_state{info = #info{proto_human = Val}}) -> Val;
1410+
info(host, #proc_state{info = #info{host = Val}}) -> Val;
1411+
info(port, #proc_state{info = #info{port = Val}}) -> Val;
1412+
info(peer_host, #proc_state{info = #info{peer_host = Val}}) -> Val;
1413+
info(peer_port, #proc_state{info = #info{peer_port = Val}}) -> Val;
1414+
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
14091415
info(client_id, #proc_state{client_id = Val}) ->
14101416
rabbit_data_coercion:to_binary(Val);
1417+
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
1418+
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
14111419
info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
14121420
info(will_msg, #proc_state{will_msg = Val}) -> Val;
1413-
info(exchange, #proc_state{exchange = #resource{name = Val}}) -> Val;
1414-
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
14151421
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
1416-
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
1417-
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
1418-
info(host, #proc_state{info = #info{host = Val}}) -> Val;
1419-
info(port, #proc_state{info = #info{port = Val}}) -> Val;
1420-
info(peer_host, #proc_state{info = #info{peer_host = Val}}) -> Val;
1421-
info(peer_port, #proc_state{info = #info{peer_port = Val}}) -> Val;
1422-
info(protocol, #proc_state{info = #info{proto_human = Val}}) -> Val;
1423-
% info(frame_max, PState) -> additional_info(frame_max, PState);
1424-
% info(client_properties, PState) -> additional_info(client_properties, PState);
1425-
% info(ssl, PState) -> additional_info(ssl, PState);
1426-
% info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
1427-
% info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
1428-
% info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
1429-
% info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
1422+
info(exchange, #proc_state{exchange = #resource{name = Val}}) -> Val;
1423+
info(subscriptions, #proc_state{subscriptions = Val}) ->
1424+
maps:keys(Val);
1425+
info(prefetch, #proc_state{info = #info{prefetch = Val}}) -> Val;
1426+
info(messages_unconfirmed, #proc_state{unacked_client_pubs = Val}) ->
1427+
rabbit_mqtt_confirms:size(Val);
1428+
info(messages_unacknowledged, #proc_state{unacked_server_pubs = Val}) ->
1429+
maps:size(Val);
14301430
info(Other, _) -> throw({bad_argument, Other}).
14311431

14321432
-spec ssl_login_name(rabbit_net:socket()) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ init(Ref) ->
7171
rabbit_event:init_stats_timer(
7272
control_throttle(
7373
#state{socket = RealSocket,
74+
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
7475
conn_name = ConnStr,
7576
await_recv = false,
7677
connection_state = running,
@@ -409,12 +410,17 @@ parse(Bytes, ParseState) ->
409410
{error, {cannot_parse, Reason, Stacktrace}}
410411
end.
411412

413+
%% TODO Send will message in all abnormal shutdowns?
414+
%% => in terminate/2 depending on Reason
415+
%% "The Will Message MUST be published when the Network Connection is subsequently
416+
%% closed unless the Will Message has been deleted by the Server on receipt of a
417+
%% DISCONNECT Packet [MQTT-3.1.2-8]."
412418
send_will_and_terminate(PState, State) ->
413419
send_will_and_terminate(PState, {shutdown, conn_closed}, State).
414420

415421
send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) ->
416-
rabbit_mqtt_processor:send_will(PState),
417422
rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]),
423+
rabbit_mqtt_processor:send_will(PState),
418424
{stop, Reason, State}.
419425

420426
network_error(closed,
@@ -527,5 +533,23 @@ info_internal(connection_state, #state{connection_state = Val}) ->
527533
Val;
528534
info_internal(connection, _State) ->
529535
self();
536+
info_internal(ssl, #state{socket = Sock, proxy_socket = ProxySock}) ->
537+
rabbit_net:proxy_ssl_info(Sock, ProxySock) /= nossl;
538+
info_internal(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
539+
info_internal(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
540+
info_internal(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
541+
info_internal(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
530542
info_internal(Key, #state{proc_state = ProcState}) ->
531543
rabbit_mqtt_processor:info(Key, ProcState).
544+
545+
ssl_info(F, #state{socket = Sock, proxy_socket = ProxySock}) ->
546+
case rabbit_net:proxy_ssl_info(Sock, ProxySock) of
547+
nossl -> '';
548+
{error, _} -> '';
549+
{ok, Items} ->
550+
P = proplists:get_value(protocol, Items),
551+
#{cipher := C,
552+
key_exchange := K,
553+
mac := H} = proplists:get_value(selected_cipher_suite, Items),
554+
F({P, {K, C, H}})
555+
end.

0 commit comments

Comments
 (0)