@@ -76,18 +76,26 @@ websocket_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
7676 {error , _ , _ } ->
7777 {shutdown , Req , State }
7878 end ;
79-
80- % handle_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState }) ->
81- % callback_reply(State, rabbit_mqtt_processor:amqp_callback(Ack, ProcState));
82- %
83- % handle_info(#'basic.consume_ok'{}, State) ->
84- % {noreply, State, hibernate};
85- %
86- % handle_info(#'basic.cancel'{}, State) ->
87- % {stop, {shutdown, subscription_cancelled}, State};
88-
79+ websocket_info (# 'basic.ack' {} = Ack , Req , State = # state { proc_state = ProcState0 }) ->
80+ case rabbit_mqtt_processor :amqp_callback (Ack , ProcState0 ) of
81+ {ok , ProcState } ->
82+ {ok , Req , State # state { proc_state = ProcState }};
83+ {error , _ , _ } ->
84+ {shutdown , Req , State }
85+ end ;
86+ websocket_info (# 'basic.consume_ok' {}, Req , State ) ->
87+ {ok , Req , State };
88+ websocket_info (# 'basic.cancel' {}, Req , State ) ->
89+ {shutdown , Req , State };
8990websocket_info ({reply , Data }, Req , State ) ->
9091 {reply , {binary , Data }, Req , State };
92+ websocket_info ({'EXIT' , _ , _ }, State ) ->
93+ {shutdown , Req , State };
94+ websocket_info ({'$gen_cast' , duplicate_id }, Req , State = # state { proc_state = ProcState ,
95+ conn_name = ConnName }) ->
96+ rabbit_log :warning (" MQTT disconnecting duplicate client id ~p (~p )~n " ,
97+ [rabbit_mqtt_processor :info (client_id , ProcState ), ConnName ]),
98+ {shutdown , Req , State };
9199websocket_info (Msg , Req , State ) ->
92100 rabbit_log :info (" rabbit_web_mqtt: unexpected message ~p~n " ,
93101 [Msg ]),
0 commit comments