Skip to content

Commit 628be90

Browse files
committed
Consumer timeouts - wip
1 parent d016d0a commit 628be90

File tree

3 files changed

+230
-60
lines changed

3 files changed

+230
-60
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 138 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
-define(CONSUMER_TAG_PID(Tag, Pid),
2424
#consumer{cfg = #consumer_cfg{tag = Tag,
2525
pid = Pid}}).
26+
-define(NON_EMPTY_MAP(M), M when map_size(M) > 0).
27+
-define(EMPTY_MAP, M when map_size(M) == 0).
2628

2729
-export([
2830
%% ra_machine callbacks
@@ -76,6 +78,7 @@
7678
make_purge/0,
7779
make_purge_nodes/1,
7880
make_update_config/1,
81+
make_eval_consumer_timeouts/1,
7982
make_garbage_collection/0
8083
]).
8184

@@ -119,7 +122,7 @@
119122
-record(purge_nodes, {nodes :: [node()]}).
120123
-record(update_config, {config :: config()}).
121124
-record(garbage_collection, {}).
122-
% -record(eval_consumer_timeouts, {consumer_keys :: [consumer_key()]}).
125+
-record(eval_consumer_timeouts, {consumer_keys :: [consumer_key()]}).
123126

124127
-opaque protocol() ::
125128
#enqueue{} |
@@ -238,9 +241,31 @@ apply(Meta, #settle{msg_ids = MsgIds,
238241
%% find_consumer/2 returns the actual consumer key even if
239242
%% if id was passed instead for example
240243
complete_and_checkout(Meta, MsgIds, ConsumerKey,
241-
Con0, [], State);
244+
reactivate_timed_out(Con0),
245+
[], State);
242246
_ ->
243-
{State, ok}
247+
{State, {error, invalid_consumer_key}}
248+
end;
249+
apply(#{index := Idx,
250+
system_time := Ts} = Meta, #defer{msg_ids = MsgIds,
251+
consumer_key = Key},
252+
#?STATE{consumers = Consumers} = State0) ->
253+
case find_consumer(Key, Consumers) of
254+
{ConsumerKey, #consumer{checked_out = Checked0} = Con0} ->
255+
Checked = maps:map(fun (MsgId, ?C_MSG(_At, Msg) = Orig) ->
256+
case lists:member(MsgId, MsgIds) of
257+
true ->
258+
?C_MSG(Ts, Msg);
259+
false ->
260+
Orig
261+
end
262+
end, Checked0),
263+
Con = reactivate_timed_out(Con0#consumer{checked_out = Checked}),
264+
State1 = State0#?STATE{consumers = Consumers#{ConsumerKey => Con}},
265+
{State, Ret, Effs} = checkout(Meta, State0, State1, []),
266+
update_smallest_raft_index(Idx, Ret, State, Effs);
267+
_ ->
268+
{State0, {error, invalid_consumer_key}}
244269
end;
245270
apply(Meta, #discard{consumer_key = ConsumerKey,
246271
msg_ids = MsgIds},
@@ -263,19 +288,25 @@ apply(Meta, #discard{consumer_key = ConsumerKey,
263288
{DlxState, Effects} = rabbit_fifo_dlx:discard(DiscardMsgs, rejected,
264289
DLH, DlxState0),
265290
State = State0#?STATE{dlx = DlxState},
266-
complete_and_checkout(Meta, MsgIds, ConsumerKey, Con, Effects, State);
291+
complete_and_checkout(Meta, MsgIds, ConsumerKey,
292+
reactivate_timed_out(Con),
293+
Effects, State);
267294
_ ->
268-
{State0, ok}
295+
{State0, {error, invalid_consumer_key}}
269296
end;
270297
apply(Meta, #return{consumer_key = ConsumerKey,
271298
msg_ids = MsgIds},
272-
#?STATE{consumers = Cons0} = State) ->
299+
#?STATE{consumers = Cons0} = State0) ->
273300
case find_consumer(ConsumerKey, Cons0) of
274-
{ActualConsumerKey, #consumer{checked_out = Checked0}} ->
301+
{ActualConsumerKey, #consumer{checked_out = Checked0} = Con0} ->
302+
303+
State = State0#?MODULE{consumers =
304+
Cons0#{ActualConsumerKey =>
305+
reactivate_timed_out(Con0)}},
275306
Returned = maps:with(MsgIds, Checked0),
276307
return(Meta, ActualConsumerKey, Returned, [], State);
277308
_ ->
278-
{State, ok}
309+
{State0, {error, invalid_consumer_key}}
279310
end;
280311
apply(#{index := Idx} = Meta,
281312
#requeue{consumer_key = ConsumerKey,
@@ -731,12 +762,51 @@ apply(#{index := IncomingRaftIdx} = Meta, {dlx, _} = Cmd,
731762
State1 = State0#?STATE{dlx = DlxState},
732763
{State, ok, Effects} = checkout(Meta, State0, State1, Effects0),
733764
update_smallest_raft_index(IncomingRaftIdx, State, Effects);
765+
apply(Meta, #eval_consumer_timeouts{consumer_keys = CKeys}, State) ->
766+
eval_consumer_timeouts(Meta, CKeys, State);
734767
apply(_Meta, Cmd, State) ->
735768
%% handle unhandled commands gracefully
736769
rabbit_log:debug("rabbit_fifo: unhandled command ~W", [Cmd, 10]),
737770
{State, ok, []}.
738771

739-
convert_v3_to_v4(#{system_time := Ts}, #rabbit_fifo{consumers = Consumers0} = StateV3) ->
772+
%% @doc Evaluates the consumers identified by `CKeys' for checked out
%% messages whose lock has expired, i.e. whose checkout timestamp plus
%% `?CONSUMER_LOCK_MS' lies before the `system_time' of the command.
%%
%% For each of the given consumers that is currently `up' and holds at least
%% one expired message:
%%   * the consumer status is set to `timed_out',
%%   * every expired message is handed to `return_one/6',
%%   * a `{consumer_timeout, ConsumerKey, MsgIds}' effect is emitted.
%%
%% Queues that do not use the `competing' consumer strategy are left
%% untouched: the command is applied unconditionally by `apply/3', so the
%% state machine must not fail with `function_clause' for them.
%%
%% Returns `{State, ok, Effects}'.
%%
%% NOTE(review): unlike `return/5' this does not run `checkout/4' after
%% returning messages - confirm whether that is intended.
%% NOTE(review): `consumer_timeout' does not look like a built-in ra effect -
%% confirm how it is meant to be handled.
eval_consumer_timeouts(#{system_time := Ts} = Meta, CKeys,
                       #?STATE{cfg = #cfg{consumer_strategy = competing},
                               consumers = Consumers0} = State0) ->
    %% keys that do not refer to a known consumer are dropped here
    ToCheck = maps:with(CKeys, Consumers0),
    {State, Effects} =
        maps:fold(
          fun (Ckey, #consumer{cfg = #consumer_cfg{},
                               status = up,
                               checked_out = Ch} = C0,
               {#?STATE{consumers = Cons} = S0, E0} = Acc) ->
                  case maps:filter(fun (_MsgId, ?C_MSG(At, _)) ->
                                           (At + ?CONSUMER_LOCK_MS) < Ts
                                   end, Ch) of
                      ?EMPTY_MAP ->
                          %% nothing has timed out for this consumer
                          Acc;
                      ?NON_EMPTY_MAP(ToReturn) ->
                          %% there are timed out messages: update the consumer
                          %% status to `timed_out'. The clause head already
                          %% restricts this transition to `up' consumers.
                          C1 = C0#consumer{status = timed_out},
                          S1 = S0#?STATE{consumers = Cons#{Ckey => C1}},
                          {S, E1} = maps:fold(
                                      fun(MsgId, ?C_MSG(_At, Msg), {S2, E2}) ->
                                              return_one(Meta, MsgId, Msg,
                                                         S2, E2, Ckey)
                                      end, {S1, E0}, ToReturn),
                          E = [{consumer_timeout, Ckey, maps:keys(ToReturn)} | E1],
                          %% re-read the consumer as return_one/6 was handed
                          %% the state and may have updated it
                          C = maps:get(Ckey, S#?STATE.consumers),
                          {update_or_remove_con(Meta, Ckey, C, S), E}
                  end;
              (_Ckey, _Con, Acc) ->
                  %% consumers that are not `up' are skipped
                  Acc
          end, {State0, []}, ToCheck),

    {State, ok, Effects};
eval_consumer_timeouts(_Meta, _CKeys, State) ->
    %% timeouts are only evaluated for the `competing' strategy; treat the
    %% command as a no-op for any other strategy instead of crashing apply/3
    {State, ok, []}.
807+
808+
convert_v3_to_v4(#{system_time := Ts},
809+
#rabbit_fifo{consumers = Consumers0} = StateV3) ->
740810
Consumers = maps:map(
741811
fun (_CKey, #consumer{checked_out = Ch0} = C) ->
742812
Ch = maps:map(
@@ -965,18 +1035,27 @@ which_module(2) -> rabbit_fifo_v3;
9651035
which_module(3) -> rabbit_fifo_v3;
9661036
which_module(4) -> ?STATE.
9671037

968-
-define(AUX, aux_v2).
1038+
-define(AUX, aux_v3).
9691039

9701040
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
9711041
-record(aux, {name :: atom(),
9721042
capacity :: term(),
9731043
gc = #aux_gc{} :: #aux_gc{}}).
1044+
-record(aux_v2, {name :: atom(),
1045+
last_decorators_state :: term(),
1046+
capacity :: term(),
1047+
gc = #aux_gc{} :: #aux_gc{},
1048+
tick_pid :: undefined | pid(),
1049+
cache = #{} :: map()}).
9741050
-record(?AUX, {name :: atom(),
9751051
last_decorators_state :: term(),
9761052
capacity :: term(),
9771053
gc = #aux_gc{} :: #aux_gc{},
9781054
tick_pid :: undefined | pid(),
979-
cache = #{} :: map()}).
1055+
cache = #{} :: map(),
1056+
last_consumer_timeout_check :: milliseconds(),
1057+
reserved_1,
1058+
reserved_2}).
9801059

9811060
init_aux(Name) when is_atom(Name) ->
9821061
%% TODO: catch specific exception throw if table already exists
@@ -985,15 +1064,36 @@ init_aux(Name) when is_atom(Name) ->
9851064
{write_concurrency, true}]),
9861065
Now = erlang:monotonic_time(micro_seconds),
9871066
#?AUX{name = Name,
1067+
last_consumer_timeout_check = erlang:system_time(millisecond),
9881068
capacity = {inactive, Now, 1, 1.0}}.
9891069

9901070
handle_aux(RaftState, Tag, Cmd, #aux{name = Name,
9911071
capacity = Cap,
992-
gc = Gc}, RaAux) ->
1072+
gc = Gc
1073+
}, RaAux) ->
9931074
%% convert aux state to new version
9941075
Aux = #?AUX{name = Name,
9951076
capacity = Cap,
996-
gc = Gc},
1077+
gc = Gc,
1078+
last_consumer_timeout_check = erlang:system_time(millisecond)
1079+
},
1080+
handle_aux(RaftState, Tag, Cmd, Aux, RaAux);
1081+
handle_aux(RaftState, Tag, Cmd, #aux_v2{name = Name,
1082+
last_decorators_state = LDS,
1083+
capacity = Cap,
1084+
gc = Gc,
1085+
tick_pid = TickPid,
1086+
cache = Cache
1087+
}, RaAux) ->
1088+
%% convert aux state to new version
1089+
Aux = #?AUX{name = Name,
1090+
last_decorators_state = LDS,
1091+
capacity = Cap,
1092+
gc = Gc,
1093+
tick_pid = TickPid,
1094+
cache = Cache,
1095+
last_consumer_timeout_check = erlang:system_time(millisecond)
1096+
},
9971097
handle_aux(RaftState, Tag, Cmd, Aux, RaAux);
9981098
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
9991099
consumer_key = Key} = Ret, Corr, Pid},
@@ -1028,7 +1128,8 @@ handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
10281128
{no_reply, Aux0, RaAux0, [{append, Ret, {notify, Corr, Pid}}]}
10291129
end;
10301130
handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]},
1031-
#?AUX{tick_pid = Pid} = Aux, RaAux) ->
1131+
#?AUX{tick_pid = Pid,
1132+
last_consumer_timeout_check = LastCheck} = Aux, RaAux) ->
10321133
Overview = Overview0#{members_info => ra_aux:members_info(RaAux)},
10331134
NewPid =
10341135
case process_is_alive(Pid) of
@@ -1040,8 +1141,17 @@ handle_aux(leader, _, {handle_tick, [QName, Overview0, Nodes]},
10401141
%% Active TICK pid, do nothing
10411142
Pid
10421143
end,
1043-
%% TODO: check consumer timeouts
1044-
{no_reply, Aux#?AUX{tick_pid = NewPid}, RaAux};
1144+
%% check consumer timeouts
1145+
Now = erlang:system_time(millisecond),
1146+
case Now - LastCheck > 1000 of
1147+
true ->
1148+
%% check whether any consumer has checked out messages that have
1149+
%% timed out.
1150+
{no_reply, Aux#?AUX{tick_pid = NewPid,
1151+
last_consumer_timeout_check = Now}, RaAux};
1152+
false ->
1153+
{no_reply, Aux#?AUX{tick_pid = NewPid}, RaAux}
1154+
end;
10451155
handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
10461156
#?STATE{cfg = #cfg{},
10471157
consumers = Consumers} = ra_aux:machine_state(RaAux0),
@@ -1724,38 +1834,21 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg, Effects0,
17241834
return(#{index := IncomingRaftIdx} = Meta,
17251835
ConsumerKey, Returned, Effects0, State0) ->
17261836
{State1, Effects1} = maps:fold(
1727-
fun(MsgId, {_At, Msg}, {S0, E0}) ->
1837+
fun(MsgId, ?C_MSG(_At, Msg), {S0, E0}) ->
17281838
return_one(Meta, MsgId, Msg,
17291839
S0, E0, ConsumerKey)
17301840
end, {State0, Effects0}, Returned),
17311841
State2 = case State1#?STATE.consumers of
17321842
#{ConsumerKey := Con} ->
1733-
update_or_remove_con(Meta, ConsumerKey, Con, State1);
1843+
update_or_remove_con(Meta, ConsumerKey,
1844+
Con, State1);
17341845
_ ->
17351846
State1
17361847
end,
17371848
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
17381849
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
17391850

17401851
% used to process messages that are finished
1741-
complete(Meta, ConsumerKey, [MsgId],
1742-
#consumer{checked_out = Checked0} = Con0,
1743-
#?STATE{ra_indexes = Indexes0,
1744-
msg_bytes_checkout = BytesCheckout,
1745-
messages_total = Tot} = State0) ->
1746-
case maps:take(MsgId, Checked0) of
1747-
{?C_MSG(_, Idx, Hdr), Checked} ->
1748-
SettledSize = get_header(size, Hdr),
1749-
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
1750-
Con = Con0#consumer{checked_out = Checked,
1751-
credit = increase_credit(Con0, 1)},
1752-
State1 = update_or_remove_con(Meta, ConsumerKey, Con, State0),
1753-
State1#?STATE{ra_indexes = Indexes,
1754-
msg_bytes_checkout = BytesCheckout - SettledSize,
1755-
messages_total = Tot - 1};
1756-
error ->
1757-
State0
1758-
end;
17591852
complete(Meta, ConsumerKey, MsgIds,
17601853
#consumer{checked_out = Checked0} = Con0,
17611854
#?STATE{ra_indexes = Indexes0,
@@ -2507,6 +2600,10 @@ make_purge_nodes(Nodes) ->
25072600
make_update_config(Config) ->
25082601
#update_config{config = Config}.
25092602

2603+
%% @doc Builds the command that asks the state machine to evaluate the
%% consumers identified by `ConsumerKeys' for timed out messages.
%% Fails with `function_clause' unless a list is passed.
-spec make_eval_consumer_timeouts([consumer_key()]) -> protocol().
make_eval_consumer_timeouts(ConsumerKeys) when is_list(ConsumerKeys) ->
    #eval_consumer_timeouts{consumer_keys = ConsumerKeys}.
2606+
25102607
add_bytes_drop(Header,
25112608
#?STATE{msg_bytes_enqueue = Enqueue} = State) ->
25122609
Size = get_header(size, Header),
@@ -2791,3 +2888,8 @@ maps_search(Pred, {K, V, I}) ->
27912888
end;
27922889
maps_search(Pred, Map) when is_map(Map) ->
27932890
maps_search(Pred, maps:next(maps:iterator(Map))).
2891+
2892+
%% Puts a consumer whose status is `timed_out' back into the `up' status.
%% Any other term, including consumers in a different status, is returned
%% unchanged.
reactivate_timed_out(Consumer) ->
    case Consumer of
        #consumer{status = timed_out} ->
            Consumer#consumer{status = up};
        _ ->
            Consumer
    end.

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110
-define(LOW_LIMIT, 0.8).
111111
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).
112112

113+
-define(CONSUMER_LOCK_MS, 5 * 60 * 1000). % 5 min lock
114+
113115
-type milliseconds() :: non_neg_integer().
114116
-record(consumer_cfg,
115117
{meta = #{} :: consumer_meta(),
@@ -126,7 +128,18 @@
126128

127129
-record(consumer,
128130
{cfg = #consumer_cfg{},
129-
status = up :: up | suspected_down | cancelled | fading,
131+
status = up :: up |
132+
% on a disconnected node
133+
suspected_down |
134+
cancelled |
135+
% deprioritised with pending messages
136+
fading |
137+
%% one or more checked out messages have timed out
138+
%% and been returned but the consumer process is still
139+
%% on a connected node,
140+
%% the `timed_out' state can only be reached directly from
141+
%% the `up' state and can transition into any other state
142+
timed_out,
130143
next_msg_id = 0 :: msg_id(),
131144
checked_out = #{} :: #{msg_id() => {At :: milliseconds(), msg()}},
132145
%% max number of messages that can be sent
@@ -140,7 +153,8 @@
140153

141154
-type consumer_strategy() :: competing | single_active.
142155

143-
-type dead_letter_handler() :: option({at_most_once, applied_mfa()} | at_least_once).
156+
-type dead_letter_handler() :: option({at_most_once, applied_mfa()} |
157+
at_least_once).
144158

145159
-record(enqueuer,
146160
{next_seqno = 1 :: msg_seqno(),

0 commit comments

Comments
 (0)