|
20 | 20 | -include_lib("eunit/include/eunit.hrl"). |
21 | 21 | -include_lib("amqp_client/include/amqp_client.hrl"). |
22 | 22 |
|
| 23 | +-import(quorum_queue_utils, [wait_for_messages_ready/3, |
| 24 | + wait_for_messages_pending_ack/3, |
| 25 | + dirty_query/3, |
| 26 | + ra_name/1]). |
| 27 | + |
23 | 28 | -compile(export_all). |
24 | 29 |
|
25 | 30 | all() -> |
@@ -597,9 +602,6 @@ publish_confirm(Ch, QName) -> |
597 | 602 | ct:pal("CONFIRMED! ~s", [QName]), |
598 | 603 | ok. |
599 | 604 |
|
600 | | -ra_name(Q) -> |
601 | | - binary_to_atom(<<"%2F_", Q/binary>>, utf8). |
602 | | - |
603 | 605 | publish_and_restart(Config) -> |
604 | 606 | %% Test the node restart with both types of queues (quorum and classic) to |
605 | 607 | %% ensure there are no regressions |
@@ -2245,49 +2247,6 @@ wait_for_cleanup(Server, Channel, Number, N) -> |
2245 | 2247 | wait_for_cleanup(Server, Channel, Number, N - 1) |
2246 | 2248 | end. |
2247 | 2249 |
|
2248 | | - |
2249 | | -wait_for_messages_ready(Servers, QName, Ready) -> |
2250 | | - wait_for_messages(Servers, QName, Ready, |
2251 | | - fun rabbit_fifo:query_messages_ready/1, 60). |
2252 | | - |
2253 | | -wait_for_messages_pending_ack(Servers, QName, Ready) -> |
2254 | | - wait_for_messages(Servers, QName, Ready, |
2255 | | - fun rabbit_fifo:query_messages_checked_out/1, 60). |
2256 | | - |
2257 | | -wait_for_messages(Servers, QName, Number, Fun, 0) -> |
2258 | | - Msgs = dirty_query(Servers, QName, Fun), |
2259 | | - Totals = lists:map(fun(M) when is_map(M) -> |
2260 | | - maps:size(M); |
2261 | | - (_) -> |
2262 | | - -1 |
2263 | | - end, Msgs), |
2264 | | - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); |
2265 | | -wait_for_messages(Servers, QName, Number, Fun, N) -> |
2266 | | - Msgs = dirty_query(Servers, QName, Fun), |
2267 | | - case lists:all(fun(M) when is_map(M) -> |
2268 | | - maps:size(M) == Number; |
2269 | | - (_) -> |
2270 | | - false |
2271 | | - end, Msgs) of |
2272 | | - true -> |
2273 | | - ok; |
2274 | | - _ -> |
2275 | | - timer:sleep(500), |
2276 | | - wait_for_messages(Servers, QName, Number, Fun, N - 1) |
2277 | | - end. |
2278 | | - |
2279 | | -dirty_query(Servers, QName, Fun) -> |
2280 | | - lists:map( |
2281 | | - fun(N) -> |
2282 | | - case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of |
2283 | | - {ok, {_, Msgs}, _} -> |
2284 | | - ct:pal("Msgs ~w", [Msgs]), |
2285 | | - Msgs; |
2286 | | - _ -> |
2287 | | - undefined |
2288 | | - end |
2289 | | - end, Servers). |
2290 | | - |
2291 | 2250 | wait_until(Condition) -> |
2292 | 2251 | wait_until(Condition, 60). |
2293 | 2252 |
|
|
0 commit comments