Description
When dask.compute() or client.compute() is executed, memory does not get released. I have dask running in a Celery worker that keeps executing tasks, and memory consumption keeps growing with them. Dask seems unable to release some of the memory it has used.
I tried pympler, tracemalloc and memory_profiler, but none of them points to any lingering data objects. The growing memory consumption only shows up in process-level monitoring tools such as htop.
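For reference, this is roughly the kind of cross-check that can be made between the Python-level profilers and the process-level numbers; psutil is assumed here purely for reading the RSS programmatically (the same figure htop shows), it is not part of the reproduction below:

```python
import tracemalloc

import psutil

tracemalloc.start()
proc = psutil.Process()


def report(label):
    # Compare Python-level allocations (tracemalloc) with the process RSS
    # reported by htop; the gap is the memory no profiler attributes.
    traced, _peak = tracemalloc.get_traced_memory()
    rss = proc.memory_info().rss
    print(f'{label}: traced={traced / 1e6:.1f} MB, rss={rss / 1e6:.1f} MB')
```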
To reproduce the memory leak, here is an example I compiled:
```python
import gc
import os.path

import dask
import dask.distributed
import dask.dataframe

gc.enable()

INPUT = 'test.csv'


# Create the input CSV once, if it does not exist yet
def make_input(count=1000000):
    if not os.path.exists(INPUT):
        import pandas as pd
        import numpy as np
        pdf = pd.DataFrame(np.random.rand(count, 2), columns=['a', 'b'])
        pdf.to_csv(INPUT)
        del pd, np


# Setup dask instance
cluster = dask.distributed.LocalCluster(processes=False, silence_logs=False)
client = dask.distributed.Client(cluster)


# One unit of work: read the CSV and compute two simple aggregates
def dask_task():
    df = dask.dataframe.read_csv(INPUT)
    x = df['a'].sum()
    y = df['b'].mean()
    res = client.compute([x, y])
    return [r.result() for r in res]


# Tear down workers, client and cluster, then force a GC pass
def clean_dask():
    client.retire_workers()
    client.close()
    cluster.close()
    gc.collect()


make_input()
times = input('>>> Run a dask task (n=1) [ENTER]: ')
for _ in range(int(times) if times else 1):
    print(dask_task())
input('>>> Clean up dask client/cluster [ENTER]: ')
clean_dask()
input('>>> Exit [ENTER]')  # Do not close the execution, review memory usage

# Python 3.6.7
# requirements:
# numpy==1.14.3 pandas==0.22.0 dask[dataframe,distributed]==1.0.0
# Also, same behaviour observed with these:
# numpy==1.16.0 pandas==0.23.4 dask[dataframe,distributed]==1.0.0
```
To break this down:
- make_input() creates the input file for you.
- I suggest running dask_task() several times (5-10); with each iteration, memory use increments (see the driver sketch after this list).
- clean_dask() closes the workers, the client and the local scheduler, and tries to clean up the memory.
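The manual steps can also be driven in a loop; a rough sketch, assuming psutil for the RSS readings and reusing dask_task()/clean_dask() from the script above:

```python
import psutil

proc = psutil.Process()


def rss_mb():
    # Resident set size of this process, in MB (the figure htop shows).
    return proc.memory_info().rss / 1e6


print(f'baseline: {rss_mb():.0f} MB')
for _ in range(10):
    dask_task()
print(f'after 10 dask_task() runs: {rss_mb():.0f} MB')

clean_dask()
print(f'after clean_dask(): {rss_mb():.0f} MB')
```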
Here are my observations of memory usage. With the dask client and cluster set up, but before running any tasks: 91 MB.

| Scenario | After dask_task() | After clean_dask() |
| --- | --- | --- |
| Executed 1 time | 122 MB | 122 MB |
| Executed 5 times | 198 MB | 198 MB |
| Re-set, executed 20 times | 237 MB | 237 MB |
| Re-set, executed 50 times | 332 MB | 233 MB |
| Re-set, executed 100 times | 492 MB | 492 MB |
| Re-set, executed 200 times | 497 MB | 497 MB |
| Re-set, executed 500 times | 610 MB | 574 MB |
| Re-set, executed 1,000 times | 624 MB | 570 MB |
Closing the client/cluster only managed to clear the memory on some occasions, not in all cases. The trend is that once a certain level is reached, memory stops growing as steeply.
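One additional check that may help separate a genuine leak from allocator behaviour (this is speculative, and applies to Linux/glibc only) is to ask the C allocator to return freed pages to the OS and see whether the RSS drops:

```python
import ctypes

# Sketch, Linux/glibc only: malloc_trim(0) asks glibc to release freed
# arena memory back to the OS. If RSS drops after this call, the memory
# was already freed at the Python level but retained by the allocator.
libc = ctypes.CDLL('libc.so.6')
libc.malloc_trim(0)
```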
Desired behaviour
Once a dask computation is completed, I would like the memory it used to be released so that other processes can consume it. Dask runs tasks inside a Celery worker, inside a Docker container, and restarting workers/containers is obviously an undesirable way of managing memory consumption.
I do not strictly require the LocalCluster setup, nor the Client. I use them because they give the option to explicitly call .close() on them, which frees up somewhat more memory than running dask alone.
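For comparison, the same computation can be expressed without a Client or LocalCluster at all, using dask's default local scheduler; a minimal sketch, reusing INPUT from the script above:

```python
import dask
import dask.dataframe


def dask_task_local():
    # Same work as dask_task(), but on the default local scheduler,
    # with no Client or LocalCluster involved.
    df = dask.dataframe.read_csv(INPUT)
    return dask.compute(df['a'].sum(), df['b'].mean())
```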
I would much appreciate your assistance or suggestions. Thank you.