Skip to content

Commit 4f3f77b

Browse files
dcorbachoLoisSotoLopez
authored andcommitted
MQTT: send acks before disconnecting consumer
1 parent 34e6b11 commit 4f3f77b

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,16 +2031,17 @@ handle_queue_event({queue_event, QName, Evt},
20312031
State = handle_queue_actions(Actions, State1),
20322032
{ok, State};
20332033
{eol, Actions} ->
2034-
try
2035-
State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0),
2036-
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2037-
QStates = rabbit_queue_type:remove(QName, QStates0),
2038-
State = State1#state{queue_states = QStates,
2039-
unacked_client_pubs = U},
2040-
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2041-
{ok, State}
2034+
State1 = handle_queue_actions(Actions, State0),
2035+
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
2036+
QStates = rabbit_queue_type:remove(QName, QStates0),
2037+
State = State1#state{queue_states = QStates,
2038+
unacked_client_pubs = U},
2039+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
2040+
try handle_queue_down(QName, State) of
2041+
State2 ->
2042+
{ok, State2}
20422043
catch throw:consuming_queue_down ->
2043-
{error, consuming_queue_down, State0}
2044+
{error, consuming_queue_down, State}
20442045
end;
20452046
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
20462047
{error, Error, State0}

0 commit comments

Comments
 (0)