|
56 | 56 | make_discard/2, |
57 | 57 | make_credit/4, |
58 | 58 | make_purge/0, |
59 | | - make_update_state/1 |
| 59 | + make_update_config/1 |
60 | 60 | ]). |
61 | 61 |
|
62 | 62 | -type raw_msg() :: term(). |
|
131 | 131 | delivery_count :: non_neg_integer(), |
132 | 132 | drain :: boolean()}). |
133 | 133 | -record(purge, {}). |
134 | | --record(update_state, {config :: config()}). |
| 134 | +-record(update_config, {config :: config()}). |
135 | 135 |
|
136 | 136 |
|
137 | 137 |
|
|
143 | 143 | #discard{} | |
144 | 144 | #credit{} | |
145 | 145 | #purge{} | |
146 | | - #update_state{}. |
| 146 | + #update_config{}. |
147 | 147 |
|
148 | 148 | -type command() :: protocol() | ra_machine:builtin_command(). |
149 | 149 | %% all the command types suppored by ra fifo |
|
260 | 260 | -spec init(config()) -> state(). |
261 | 261 | init(#{name := Name, |
262 | 262 | queue_resource := Resource} = Conf) -> |
263 | | - update_state(Conf, #state{name = Name, |
| 263 | + update_config(Conf, #state{name = Name, |
264 | 264 | queue_resource = Resource}). |
265 | 265 |
|
266 | | -update_state(Conf, State) -> |
| 266 | +update_config(Conf, State) -> |
267 | 267 | DLH = maps:get(dead_letter_handler, Conf, undefined), |
268 | 268 | BLH = maps:get(become_leader_handler, Conf, undefined), |
269 | 269 | SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), |
@@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection}, |
435 | 435 | Effects0, #state{consumers = Cons0, |
436 | 436 | enqueuers = Enqs0} = State0) -> |
437 | 437 | Node = node(ConsumerPid), |
438 | | - % mark all consumers and enqueuers as suspect |
439 | | - % and monitor the node |
440 | | - {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, |
441 | | - {Co, St0}) when node(P) =:= Node -> |
442 | | - St = return_all(St0, Checked0), |
443 | | - {maps:put(K, C#consumer{suspected_down = true, |
444 | | - checked_out = #{}}, |
445 | | - Co), |
446 | | - St}; |
447 | | - (K, C, {Co, St}) -> |
448 | | - {maps:put(K, C, Co), St} |
449 | | - end, {#{}, State0}, Cons0), |
| 438 | + % mark all consumers and enqueuers as suspected down |
| 439 | + % and monitor the node so that we can find out the final state of the |
| 440 | + % process at some later point |
| 441 | + {Cons, State} = maps:fold( |
| 442 | + fun({_, P} = K, |
| 443 | + #consumer{checked_out = Checked0} = C, |
| 444 | + {Co, St0}) when node(P) =:= Node -> |
| 445 | + St = return_all(St0, Checked0), |
| 446 | + %% TODO: need to increment credit here |
| 447 | + %% with the size of the Checked map |
| 448 | + Credit = increase_credit(C, maps:size(Checked0)), |
| 449 | + {maps:put(K, C#consumer{suspected_down = true, |
| 450 | + credit = Credit, |
| 451 | + checked_out = #{}}, Co), |
| 452 | + St}; |
| 453 | + (K, C, {Co, St}) -> |
| 454 | + {maps:put(K, C, Co), St} |
| 455 | + end, {#{}, State0}, Cons0), |
450 | 456 | Enqs = maps:map(fun(P, E) when node(P) =:= Node -> |
451 | 457 | E#enqueuer{suspected_down = true}; |
452 | 458 | (_, E) -> E |
@@ -515,8 +521,8 @@ apply(_, {nodeup, Node}, Effects0, |
515 | 521 | service_queue = SQ}, Monitors ++ Effects); |
516 | 522 | apply(_, {nodedown, _Node}, Effects, State) -> |
517 | 523 | {State, Effects, ok}; |
518 | | -apply(_, #update_state{config = Conf}, Effects, State) -> |
519 | | - {update_state(Conf, State), Effects, ok}. |
| 524 | +apply(_, #update_config{config = Conf}, Effects, State) -> |
| 525 | + {update_config(Conf, State), Effects, ok}. |
520 | 526 |
|
521 | 527 | -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). |
522 | 528 | state_enter(leader, #state{consumers = Cons, |
@@ -587,7 +593,9 @@ overview(#state{consumers = Cons, |
587 | 593 | get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> |
588 | 594 | case Consumers of |
589 | 595 | #{Cid := #consumer{checked_out = Checked}} -> |
590 | | - [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)]; |
| 596 | + [{K, snd(snd(maps:get(K, Checked)))} |
| 597 | + || K <- lists:seq(From, To), |
| 598 | + maps:is_key(K, Checked)]; |
591 | 599 | _ -> |
592 | 600 | [] |
593 | 601 | end. |
@@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, |
769 | 777 | snd(T) -> |
770 | 778 | element(2, T). |
771 | 779 |
|
772 | | -return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, |
| 780 | +return(ConsumerId, MsgNumMsgs, Con0, Checked, |
773 | 781 | Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> |
774 | | - Con = case Life of |
775 | | - auto -> |
776 | | - Num = length(MsgNumMsgs), |
777 | | - Con0#consumer{checked_out = Checked, |
778 | | - credit = increase_credit(Con0, Num)}; |
779 | | - once -> |
780 | | - Con0#consumer{checked_out = Checked} |
781 | | - end, |
| 782 | + Con = Con0#consumer{checked_out = Checked, |
| 783 | + credit = increase_credit(Con0, length(MsgNumMsgs))}, |
782 | 784 | {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, |
783 | 785 | SQ0, Effects0), |
784 | 786 | State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> |
@@ -900,12 +902,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, |
900 | 902 | State0#state{messages = maps:put(MsgNum, Msg, Messages), |
901 | 903 | returns = lqueue:in(MsgNum, Returns)}). |
902 | 904 |
|
903 | | -return_all(State, Checked) -> |
904 | | - maps:fold(fun (_, '$prefix_msg', S) -> |
905 | | - return_one(0, '$prefix_msg', S); |
906 | | - (_, {MsgNum, Msg}, S) -> |
907 | | - return_one(MsgNum, Msg, S) |
908 | | - end, State, Checked). |
| 905 | +return_all(State, Checked0) -> |
| 906 | + %% need to sort the list so that we return messages in the order |
| 907 | + %% they were checked out |
| 908 | + Checked = lists:sort(maps:to_list(Checked0)), |
| 909 | + lists:foldl(fun ({_, '$prefix_msg'}, S) -> |
| 910 | + return_one(0, '$prefix_msg', S); |
| 911 | + ({_, {MsgNum, Msg}}, S) -> |
| 912 | + return_one(MsgNum, Msg, S) |
| 913 | + end, State, Checked). |
909 | 914 |
|
910 | 915 | checkout(State, Effects) -> |
911 | 916 | checkout0(checkout_one(State), Effects, #{}). |
@@ -1170,9 +1175,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> |
1170 | 1175 | -spec make_purge() -> protocol(). |
1171 | 1176 | make_purge() -> #purge{}. |
1172 | 1177 |
|
1173 | | --spec make_update_state(config()) -> protocol(). |
1174 | | -make_update_state(Config) -> |
1175 | | - #update_state{config = Config}. |
| 1178 | +-spec make_update_config(config()) -> protocol(). |
| 1179 | +make_update_config(Config) -> |
| 1180 | + #update_config{config = Config}. |
1176 | 1181 |
|
1177 | 1182 | add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> |
1178 | 1183 | Bytes = message_size(Msg), |
@@ -1502,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> |
1502 | 1507 | Node = node(Pid), |
1503 | 1508 | {State0, Effects0} = enq(1, 1, second, test_init(test)), |
1504 | 1509 | ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), |
1505 | | - {State1, Effects1} = check(Cid, 2, State0), |
| 1510 | + {State1, Effects1} = check_auto(Cid, 2, State0), |
| 1511 | + #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), |
1506 | 1512 | ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), |
1507 | 1513 | % monitor both enqueuer and consumer |
1508 | 1514 | % because we received a noconnection we now need to monitor the node |
1509 | 1515 | {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), |
| 1516 | + #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), |
| 1517 | + %% validate consumer has credit |
1510 | 1518 | {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), |
1511 | 1519 | ?ASSERT_EFF({monitor, node, _}, Effects2), |
1512 | 1520 | ?assertNoEffect({demonitor, process, _}, Effects2), |
@@ -1865,6 +1873,26 @@ purge_with_checkout_test() -> |
1865 | 1873 | ?assertEqual(0, maps:size(Checked)), |
1866 | 1874 | ok. |
1867 | 1875 |
|
| 1876 | +down_returns_checked_out_in_order_test() -> |
| 1877 | + S0 = test_init(?FUNCTION_NAME), |
| 1878 | + %% enqueue 100 |
| 1879 | + S1 = lists:foldl(fun (Num, FS0) -> |
| 1880 | + {FS, _} = enq(Num, Num, Num, FS0), |
| 1881 | + FS |
| 1882 | + end, S0, lists:seq(1, 100)), |
| 1883 | + ?assertEqual(100, maps:size(S1#state.messages)), |
| 1884 | + Cid = {<<"cid">>, self()}, |
| 1885 | + {S2, _} = check(Cid, 101, 1000, S1), |
| 1886 | + #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), |
| 1887 | + ?assertEqual(100, maps:size(Checked)), |
| 1888 | + %% simulate down |
| 1889 | + {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2), |
| 1890 | + Returns = lqueue:to_list(S#state.returns), |
| 1891 | + ?assertEqual(100, length(Returns)), |
| 1892 | + %% validate returns are in order |
| 1893 | + ?assertEqual(lists:sort(Returns), Returns), |
| 1894 | + ok. |
| 1895 | + |
1868 | 1896 | meta(Idx) -> |
1869 | 1897 | #{index => Idx, term => 1}. |
1870 | 1898 |
|
@@ -1900,7 +1928,7 @@ check_auto(Cid, Idx, State) -> |
1900 | 1928 | check(Cid, Idx, Num, State) -> |
1901 | 1929 | strip_reply( |
1902 | 1930 | apply(meta(Idx), |
1903 | | - make_checkout(Cid, {once, Num, simple_prefetch}, #{}), |
| 1931 | + make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), |
1904 | 1932 | [], State)). |
1905 | 1933 |
|
1906 | 1934 | settle(Cid, Idx, MsgId, State) -> |
|
0 commit comments