Skip to content

Commit cb21e14

Browse files
committed
Test that root tasks aren't overproduced
We're currently holding twice as many root tasks in memory at once as we should (or can use). If you comment out `self.ensure_computing()` at the end of `Worker.execute`, this test will pass. Also, try changing `nthreads=`—`max(roots_in_memory)` is always twice `nthreads` (until nthreads is more than half the size of `roots`).
1 parent 91d5070 commit cb21e14

File tree

1 file changed

+28
-0
lines changed

1 file changed

+28
-0
lines changed

distributed/tests/test_worker.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,34 @@ async def test_priorities(c, s, w):
843843
assert any(key.startswith("b1") for key in log[: len(log) // 2])
844844

845845

846+
@gen_cluster(client=True, nthreads=[("127.0.0.1", 4)])
847+
async def test_root_tasks_limited(c, s, w):
848+
roots = [delayed(slowinc)(i, dask_key_name=f"root-{i}") for i in range(10)]
849+
deps = [delayed(inc)(r, dask_key_name=f"inc-{i}") for i, r in enumerate(roots)]
850+
851+
await wait(c.compute(deps))
852+
853+
# actual_order = [t[0] for t in w.log if t[1] == "ready" and t[2] == "executing"]
854+
# order = list(dask.order.order(dask.base.collections_to_dsk(deps)))
855+
856+
diff_roots_in_memory = []
857+
for t in w.log:
858+
if len(t) != 3:
859+
continue
860+
key, start, stop = t
861+
if key.startswith("root"):
862+
if stop == "memory":
863+
diff_roots_in_memory.append(1)
864+
elif start == "release-key":
865+
diff_roots_in_memory.append(-1)
866+
867+
# TODO no numpy
868+
import numpy as np
869+
870+
roots_in_memory = np.cumsum(diff_roots_in_memory)
871+
assert max(roots_in_memory) == w.nthreads
872+
873+
846874
@gen_cluster(client=True)
847875
async def test_heartbeats(c, s, a, b):
848876
x = s.workers[a.address].last_seen

0 commit comments

Comments
 (0)