Skip to content

Commit 146570d

Browse files
committed
Delete AMQP 0.9.1 header x-mqtt-dup
AMQP 0.9.1 header x-mqtt-dup was determined by the incoming MQTT PUBLISH packet's DUP flag. Its only use was to determine the outgoing MQTT PUBLISH packet's DUP flag. However, that's wrong behaviour because the MQTT 3.1.1 protocol spec mandates: "The value of the DUP flag from an incoming PUBLISH packet is not propagated when the PUBLISH Packet is sent to subscribers by the Server. The DUP flag in the outgoing PUBLISH packet is set independently to the incoming PUBLISH packet, its value MUST be determined solely by whether the outgoing PUBLISH packet is a retransmission." [MQTT-3.3.1-3] Native MQTT fixes this wrong behaviour. Therefore, we can delete this AMQP 0.9.1 header.
1 parent 5f6a1f9 commit 146570d

File tree

4 files changed

+30
-40
lines changed

4 files changed

+30
-40
lines changed

deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@
5959
%% Packet identifier is a non zero two byte integer.
6060
-type packet_id() :: 1..16#ffff.
6161

62-
-record(mqtt_packet_fixed, {type = 0,
63-
dup = 0,
64-
qos = 0,
65-
retain = 0
62+
-record(mqtt_packet_fixed, {type = 0 :: packet_type(),
63+
dup = false :: boolean(),
64+
qos = 0 :: qos(),
65+
retain = false :: boolean()
6666
}).
6767

6868
-record(mqtt_packet, {fixed :: #mqtt_packet_fixed{},
@@ -74,7 +74,7 @@
7474

7575
-record(mqtt_packet_connect, {proto_ver :: 3 | 4,
7676
will_retain :: boolean(),
77-
will_qos :: 0..2,
77+
will_qos :: qos(),
7878
will_flag :: boolean(),
7979
clean_sess :: boolean(),
8080
keep_alive :: non_neg_integer(),
@@ -103,7 +103,7 @@
103103
-record(mqtt_msg, {retain :: boolean(),
104104
qos :: qos(),
105105
topic :: binary(),
106-
dup :: option(boolean()),
106+
dup :: boolean(),
107107
packet_id :: option(packet_id()),
108108
payload :: binary()}).
109109

deps/rabbitmq_mqtt/src/rabbit_mqtt_packet.erl

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ parse(<<>>, authenticated) ->
3737
{more, fun(Bin) -> parse(Bin, authenticated) end};
3838
parse(<<MessageType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, authenticated) ->
3939
parse_remaining_len(Rest, #mqtt_packet_fixed{ type = MessageType,
40-
dup = bool(Dup),
40+
dup = int_to_bool(Dup),
4141
qos = QoS,
42-
retain = bool(Retain) });
42+
retain = int_to_bool(Retain) });
4343
parse(<<?CONNECT:4, 0:4, Rest/binary>>, unauthenticated) ->
4444
parse_remaining_len(Rest, #mqtt_packet_fixed{type = ?CONNECT});
4545
parse(Bin, Cont)
@@ -131,10 +131,10 @@ parse_connect(Bin, Fixed, Length) ->
131131
wrap(Fixed,
132132
#mqtt_packet_connect{
133133
proto_ver = ProtoVersion,
134-
will_retain = bool(WillRetain),
134+
will_retain = int_to_bool(WillRetain),
135135
will_qos = WillQos,
136-
will_flag = bool(WillFlag),
137-
clean_sess = bool(CleanSession),
136+
will_flag = int_to_bool(WillFlag),
137+
clean_sess = int_to_bool(CleanSession),
138138
keep_alive = KeepAlive,
139139
client_id = ClientId,
140140
will_topic = WillTopic,
@@ -184,9 +184,6 @@ parse_msg(Bin, 0) ->
184184
parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
185185
{Msg, Rest}.
186186

187-
bool(0) -> false;
188-
bool(1) -> true.
189-
190187
%% serialisation
191188

192189
-spec serialise(#mqtt_packet{}, ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4) ->
@@ -206,7 +203,7 @@ serialise_variable(#mqtt_packet_fixed { type = ?CONNACK } = Fixed,
206203
#mqtt_packet_connack { session_present = SessionPresent,
207204
return_code = ReturnCode },
208205
<<>> = PayloadBin, _Vsn) ->
209-
VariableBin = <<?RESERVED:7, (opt(SessionPresent)):1, ReturnCode:8>>,
206+
VariableBin = <<?RESERVED:7, (bool_to_int(SessionPresent)):1, ReturnCode:8>>,
210207
serialise_fixed(Fixed, VariableBin, PayloadBin);
211208

212209
serialise_variable(#mqtt_packet_fixed { type = SubAck } = Fixed,
@@ -247,15 +244,16 @@ serialise_variable(#mqtt_packet_fixed {} = Fixed,
247244
<<>> = _PayloadBin, _Vsn) ->
248245
serialise_fixed(Fixed, <<>>, <<>>).
249246

250-
serialise_fixed(#mqtt_packet_fixed{ type = Type,
251-
dup = Dup,
252-
qos = Qos,
253-
retain = Retain }, VariableBin, Payload)
254-
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
247+
serialise_fixed(#mqtt_packet_fixed{type = Type,
248+
dup = Dup,
249+
qos = Qos,
250+
retain = Retain}, VariableBin, Payload)
251+
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT andalso
252+
is_integer(Qos) andalso 0 =< Qos andalso Qos =< 2 ->
255253
Len = size(VariableBin) + iolist_size(Payload),
256254
true = (Len =< ?MAX_LEN),
257255
LenBin = serialise_len(Len),
258-
[<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
256+
[<<Type:4, (bool_to_int(Dup)):1, Qos:2, (bool_to_int(Retain)):1,
259257
LenBin/binary, VariableBin/binary>>, Payload].
260258

261259
serialise_utf(String) ->
@@ -269,10 +267,13 @@ serialise_len(N) when N =< ?LOWBITS ->
269267
serialise_len(N) ->
270268
<<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>.
271269

272-
opt(undefined) -> ?RESERVED;
273-
opt(false) -> 0;
274-
opt(true) -> 1;
275-
opt(X) when is_integer(X) -> X.
276-
277270
protocol_name_approved(Ver, Name) ->
278271
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
272+
273+
-spec int_to_bool(0 | 1) -> boolean().
274+
int_to_bool(0) -> false;
275+
int_to_bool(1) -> true.
276+
277+
-spec bool_to_int(boolean()) -> 0 | 1.
278+
bool_to_int(false) -> 0;
279+
bool_to_int(true) -> 1.

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,6 @@ binding_action(ExchangeName, TopicName, QName, BindingFun, #auth_state{user = #u
10841084
publish_to_queues(
10851085
#mqtt_msg{qos = Qos,
10861086
topic = Topic,
1087-
dup = Dup,
10881087
packet_id = PacketId,
10891088
payload = Payload},
10901089
#state{cfg = #cfg{exchange = ExchangeName,
@@ -1095,10 +1094,8 @@ publish_to_queues(
10951094
} = State) ->
10961095
RoutingKey = mqtt_to_amqp(Topic),
10971096
Confirm = Qos > ?QOS_0,
1098-
Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
1099-
{<<"x-mqtt-dup">>, bool, Dup}],
11001097
Props = #'P_basic'{
1101-
headers = Headers,
1098+
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}],
11021099
delivery_mode = delivery_mode(Qos)},
11031100
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
11041101
Content = #content{
@@ -1491,12 +1488,6 @@ maybe_publish_to_client(
14911488
fixed = #mqtt_packet_fixed{
14921489
type = ?PUBLISH,
14931490
qos = QoS,
1494-
%% "The value of the DUP flag from an incoming PUBLISH packet is not
1495-
%% propagated when the PUBLISH Packet is sent to subscribers by the Server.
1496-
%% The DUP flag in the outgoing PUBLISH packet is set independently to the
1497-
%% incoming PUBLISH packet, its value MUST be determined solely by whether
1498-
%% the outgoing PUBLISH packet is a retransmission [MQTT-3.3.1-3]."
1499-
%% Therefore, we do not consider header value <<"x-mqtt-dup">> here.
15001491
dup = Redelivered},
15011492
variable = #mqtt_packet_publish{
15021493
packet_id = PacketId,

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,8 +1307,7 @@ trace(Config) ->
13071307
<<"channel">> := 0,
13081308
<<"user">> := <<"guest">>,
13091309
<<"properties">> := #{<<"delivery_mode">> := 2,
1310-
<<"headers">> := #{<<"x-mqtt-publish-qos">> := 1,
1311-
<<"x-mqtt-dup">> := false}},
1310+
<<"headers">> := #{<<"x-mqtt-publish-qos">> := 1}},
13121311
<<"routed_queues">> := [<<"mqtt-subscription-trace_subscriberqos0">>]},
13131312
rabbit_misc:amqp_table(PublishHeaders)),
13141313

@@ -1324,8 +1323,7 @@ trace(Config) ->
13241323
<<"channel">> := 0,
13251324
<<"user">> := <<"guest">>,
13261325
<<"properties">> := #{<<"delivery_mode">> := 2,
1327-
<<"headers">> := #{<<"x-mqtt-publish-qos">> := 1,
1328-
<<"x-mqtt-dup">> := false}},
1326+
<<"headers">> := #{<<"x-mqtt-publish-qos">> := 1}},
13291327
<<"redelivered">> := 0},
13301328
rabbit_misc:amqp_table(DeliverHeaders)),
13311329

0 commit comments

Comments
 (0)