client.restart does not restart connection between scheduler and workers #1946

Closed
alorenzo175 opened this issue Apr 27, 2018 · 3 comments · Fixed by #1977

@alorenzo175
Contributor

Sorry this is such a long issue, but it is hard to replicate, so I wanted to provide enough detail.

Summary

It appears that after client.restart() on a cluster where the
scheduler and worker(s) are on different machines, or at least have
some networking delay, jobs submitted after the restart fail because
the scheduler doesn't think the worker has the promised key. I've
tracked this down to a CommClosedError in the TCP rpc connection from
the scheduler to the worker.

Workaround

The issue is resolved when the worker rpc connection
(scheduler.rpc.available[worker]) from the ConnectionPool is
closed in Scheduler.remove_worker so that future connections to
the worker require a new call to connect. I think somewhere in the
underlying network/kernel, the connection is lost on a worker restart
but the ConnectionPool thinks the connection is still open.
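
A rough sketch of what I mean (heavily simplified; the real
Scheduler.remove_worker does much more, and the pop on rpc.available
plus the abort() call are just one way the change might look, not the
actual fix):

# Hypothetical, simplified sketch of the workaround inside
# Scheduler.remove_worker; not the actual distributed source.
def remove_worker(self, address=None, **kwargs):
    ...  # existing cleanup of task and worker state

    # Close and drop any pooled comms to the removed worker so the next
    # request to this address opens a fresh connection instead of
    # reusing a TCP connection that the restart has silently broken.
    for comm in self.rpc.available.pop(address, ()):
        comm.abort()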

Details

I'm using distributed to process NWP data and compute solar power
forecasts every five minutes. The scheduler and workers are running on
a Kubernetes/OpenShift cluster. I'm making many API requests and using
many pandas objects that seem to retain memory on the workers. To
avoid excess memory usage, I call client.restart() after each run. On
the next run after the restart, at least one future is missing on one
of the workers, so the scheduler terminates the worker, which
eventually triggers pod restart cooldown periods in Kubernetes.
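
For context, the overall pattern looks roughly like this
(compute_forecast, list_forecast_points, store_results, the scheduler
address, and the exact cadence are placeholders, not my production
code):

# Illustrative sketch of the periodic run loop; the function names and
# the scheduler address below are placeholders.
import time
from distributed.client import Client

client = Client('scheduler:18788')

while True:
    futures = client.map(compute_forecast, list_forecast_points())
    results = client.gather(futures)
    store_results(results)   # placeholder
    client.restart()         # drop worker memory before the next run
    time.sleep(300)          # next run in five minutes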

The "Workers don't have promised key" error wasn't very helpful. I
tracked the source of the issue from Scheduler.gather to
distributed.utils_comm.gather_from_workers to
distributed.core.rpc.get_data to distributed.core.send_recv.
There, the scheduler tries to write a get_data message to the rpc comm
between the scheduler and the worker. The worker never gets this
message, and comm.read() raises a CommClosedError.

This rpc comm comes from the ConnectionPool that is established
before the restart. It seems the restart doesn't close or modify the
connection at all, but something in the networking stack breaks it.
Once the scheduler tries to reuse this connection, which it still
believes is open, it finds that it has been closed.
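
The underlying TCP behaviour is easy to see outside of distributed: a
write on a connection whose peer has gone away can appear to succeed
because it only queues bytes in the local kernel buffer, and the dead
connection only shows up when you try to read the reply. A minimal
sketch with plain sockets (nothing dask-specific):

# Plain-socket illustration of a "pooled" connection whose peer has
# died: the send() usually succeeds, and the failure only appears on
# the read, analogous to the CommClosedError the scheduler sees.
import socket

server = socket.socket()
server.bind(('127.0.0.1', 0))
server.listen(1)

client = socket.create_connection(server.getsockname())
conn, _ = server.accept()
conn.close()                 # simulate the worker restart closing its end

client.sendall(b'get_data')  # typically succeeds; bytes sit in the buffer
try:
    reply = client.recv(1024)
    print('peer closed the connection (EOF):', reply == b'')
except ConnectionResetError:
    print('peer reset the connection')

client.close()
server.close()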

The solution I've found is to close the scheduler.rpc.available
comms for the worker that is removed via
scheduler.remove_worker. Since the worker is removed from the
scheduler, it makes sense to me that any connections to it should also
be removed from the connection pool.

This may not be an issue when workers are assigned random ports
instead of reusing the same address (e.g., to accommodate firewalls).

I'm using master (distributed==1.21.6+12.g650118a8) with Python
3.6.4, and I've only been able to replicate the issue when the
scheduler and worker are on different machines. Here's a simple
test script that illustrates the issue:

Test Script
import random
import time

from distributed.client import Client


def func(a):
    time.sleep(a)
    return 'done with func'


# start scheduler and one worker on different machines
# maybe just need a network delay? hard to test on same machine
client = Client('127.0.0.1:18788')

fut = client.submit(func, random.random())
print(fut)
print(fut.result(timeout=20))
del fut

client.restart()
fut2 = client.submit(func, random.random())
print(fut2)
print(fut2.result(timeout=20))
Scheduler Log
[tony@machine1 /home]$ dask-scheduler --port=18788 --host=127.0.0.1 --no-bokeh
2018-04-27T09:28:18 distributed.scheduler INFO -----------------------------------------------
2018-04-27T09:28:18 distributed.scheduler INFO Clear task state
2018-04-27T09:28:18 distributed.scheduler INFO   Scheduler at:     tcp://127.0.0.1:18788
2018-04-27T09:28:18 distributed.scheduler INFO Local Directory:    /tmp/scheduler-ofm7id4k
2018-04-27T09:28:18 distributed.scheduler INFO -----------------------------------------------
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:39818' to 'tcp://127.0.0.1:18788'
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:22 distributed.core DEBUG Connection from 'tcp://127.0.0.1:39818' to Scheduler
2018-04-27T09:28:22 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'ncores': 1, 'address': 'tcp://127.0.0.1:37503', 'keys': [], 'name': 'tcp://127.0.0.1:37503', 'nbytes': {}, 'now': 1524846502.7244189, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'local_directory': '/home2/tony/git_repos/distributed/dask-worker-space/worker-10od07fi', 'resources': {}, 'pid': 8491, 'cpu': 0.0, 'memory': 27631616, 'time': 1524846502.7197278, 'read_bytes': 0.0, 'write_bytes': 0.0, 'num_fds': 24, 'reply': True}
2018-04-27T09:28:22 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:22 distributed.scheduler INFO Register tcp://127.0.0.1:37503
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:22 distributed.scheduler INFO Starting worker compute stream, tcp://127.0.0.1:37503
2018-04-27T09:28:23 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846503.7358608, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 5.8, 'memory': 28598272, 'time': 1524846503.2356596, 'read_bytes': 25262472.424614888, 'write_bytes': 1613011.502450814, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:23 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:24 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846504.7353473, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846504.2367685, 'read_bytes': 22868189.91194618, 'write_bytes': 496927.2609323413, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:24 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:25 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846505.2366629, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846504.736325, 'read_bytes': 38468970.78856026, 'write_bytes': 1109580.1055852836, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:25 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:25 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846505.7364213, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846505.2375972, 'read_bytes': 847256.2386201491, 'write_bytes': 857613.8845938717, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:25 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:26 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846506.235558, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846505.7375107, 'read_bytes': 1335855.2257036832, 'write_bytes': 1331626.4937444825, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:26 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:26 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846506.7352347, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846506.236569, 'read_bytes': 1610421.2393146958, 'write_bytes': 1619955.1965878115, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:26 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:27 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846507.2352679, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846506.7361982, 'read_bytes': 1591546.102438589, 'write_bytes': 17843488.621722594, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:27 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:27 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846507.7355857, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 4.0, 'memory': 28598272, 'time': 1524846507.236412, 'read_bytes': 61267972.283324175, 'write_bytes': 19938949.654014755, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:27 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:28 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846508.235076, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846507.7364628, 'read_bytes': 59213591.89369137, 'write_bytes': 633873.6197924538, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:28 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:28 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846508.7351947, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 0, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846508.2360342, 'read_bytes': 20002599.292720944, 'write_bytes': 196290.29011040617, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:28 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:39822' to 'tcp://127.0.0.1:18788'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:39822' to Scheduler
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:39824' to 'tcp://127.0.0.1:18788'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:39824' to Scheduler
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39822': {'op': 'register-client', 'client': 'Client-0494f106-4a38-11e8-ac18-64006a511e6f', 'reply': False}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler add_client
2018-04-27T09:28:29 distributed.scheduler INFO Receive client connection: Client-0494f106-4a38-11e8-ac18-64006a511e6f
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846509.2362828, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 1, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28598272, 'time': 1524846508.7361846, 'read_bytes': 24410551.24967263, 'write_bytes': 234845.33873713345, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'register', 'address': 'tcp://127.0.0.1:37503', 'name': 'tcp://127.0.0.1:37503', 'ncores': 1, 'now': 1524846509.7356122, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'executing': 1, 'in_memory': 0, 'ready': 0, 'in_flight': 0, 'cpu': 2.0, 'memory': 28868608, 'time': 1524846509.238087, 'read_bytes': 84224554.21268511, 'write_bytes': 719026.3322444067, 'num_fds': 25, 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:29 distributed.scheduler DEBUG Stimulus task finished func-75aca69ee1a61d4df6ea535491fd9f59, tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'gather', 'keys': ['func-75aca69ee1a61d4df6ea535491fd9f59'], 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler gather
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.scheduler DEBUG Client Client-0494f106-4a38-11e8-ac18-64006a511e6f releases keys: ['func-75aca69ee1a61d4df6ea535491fd9f59']
2018-04-27T09:28:29 distributed.scheduler INFO Send lost future signal to clients
2018-04-27T09:28:29 distributed.scheduler DEBUG Client fire-and-forget releases keys: []
2018-04-27T09:28:29 distributed.scheduler DEBUG Client Client-0494f106-4a38-11e8-ac18-64006a511e6f releases keys: []
2018-04-27T09:28:29 distributed.scheduler INFO Remove worker tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.scheduler INFO Lost all workers
2018-04-27T09:28:29 distributed.scheduler DEBUG Removed worker tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.scheduler INFO Clear task state
2018-04-27T09:28:29 distributed.scheduler DEBUG Send kill signal to nannies: {'tcp://127.0.0.1:37503': ('127.0.0.1', 37502)}
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39818': {'op': 'close', 'reply': False}
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:39832' to 'tcp://127.0.0.1:18788'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:39832' to Scheduler
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39832': {'op': 'unregister', 'address': 'tcp://127.0.0.1:37503', 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler remove_worker
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39832': {'op': 'unregister', 'address': 'tcp://127.0.0.1:37503', 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler remove_worker
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:39836' to 'tcp://127.0.0.1:18788'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:39836' to Scheduler
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:39836': {'op': 'register', 'ncores': 1, 'address': 'tcp://127.0.0.1:37503', 'keys': [], 'name': 'tcp://127.0.0.1:37503', 'nbytes': {}, 'now': 1524846509.9483902, 'services': {'nanny': 37502}, 'memory_limit': 3162673920, 'local_directory': '/home2/tony/git_repos/distributed/dask-worker-space/worker-zzhbfpcs', 'resources': {}, 'pid': 8524, 'cpu': 0.0, 'memory': 27631616, 'time': 1524846509.9446766, 'read_bytes': 0.0, 'write_bytes': 0.0, 'num_fds': 24, 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler add_worker
2018-04-27T09:28:29 distributed.scheduler INFO Register tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.scheduler INFO Starting worker compute stream, tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.scheduler INFO Clear task state
2018-04-27T09:28:30 distributed.scheduler DEBUG Stimulus task finished func-8e5f4b6cfb97170c1635aa54476d5f1a, tcp://127.0.0.1:37503
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'gather', 'keys': ['func-8e5f4b6cfb97170c1635aa54476d5f1a'], 'reply': True}
2018-04-27T09:28:30 distributed.core DEBUG Calling into handler gather
2018-04-27T09:28:30 distributed.scheduler DEBUG Couldn't gather keys {'func-8e5f4b6cfb97170c1635aa54476d5f1a': ['tcp://127.0.0.1:37503']} state: ['memory'] workers: ['tcp://127.0.0.1:37503']
2018-04-27T09:28:30 distributed.scheduler INFO Remove worker tcp://127.0.0.1:37503
2018-04-27T09:28:30 distributed.scheduler INFO Lost all workers
2018-04-27T09:28:30 distributed.scheduler DEBUG Removed worker tcp://127.0.0.1:37503
2018-04-27T09:28:30 distributed.scheduler ERROR Workers don't have promised key. This should never occur: ['tcp://127.0.0.1:37503'], func-8e5f4b6cfb97170c1635aa54476d5f1a
NoneType: None
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39836': {'op': 'unregister', 'address': 'tcp://127.0.0.1:37503', 'reply': True}
2018-04-27T09:28:30 distributed.core DEBUG Calling into handler remove_worker
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39836': {'op': 'close', 'reply': False}
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39832': {'op': 'unregister', 'address': 'tcp://127.0.0.1:37503', 'reply': True}
2018-04-27T09:28:30 distributed.core DEBUG Calling into handler remove_worker
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39836': {'op': 'close', 'reply': False}
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39832': {'op': 'unregister', 'address': 'tcp://127.0.0.1:37503', 'reply': True}
2018-04-27T09:28:30 distributed.core DEBUG Calling into handler remove_worker
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:39832': {'op': 'close', 'reply': False}
2018-04-27T09:28:31 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:31 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:33 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:33 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:35 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:35 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:37 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:37 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:39 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:39 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:41 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:41 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:43 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:43 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:45 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:45 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:47 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:47 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:49 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'identity', 'reply': True}
2018-04-27T09:28:49 distributed.core DEBUG Calling into handler identity
2018-04-27T09:28:49 distributed.scheduler INFO Remove client Client-0494f106-4a38-11e8-ac18-64006a511e6f
2018-04-27T09:28:49 distributed.scheduler DEBUG Client Client-0494f106-4a38-11e8-ac18-64006a511e6f releases keys: ['func-8e5f4b6cfb97170c1635aa54476d5f1a']
2018-04-27T09:28:49 distributed.scheduler DEBUG Finished handle_client coroutine
2018-04-27T09:28:49 distributed.core DEBUG Message from 'tcp://127.0.0.1:39824': {'op': 'close', 'reply': False}
2018-04-27T09:28:49 distributed.scheduler INFO Close client connection: Client-0494f106-4a38-11e8-ac18-64006a511e6f
Worker Log
[tony@machine2 /home]$ ssh -fN machine1 -L18788:localhost:18788 -R37502:localhost:37502 -R37503:localhost:37503; dask-worker 127.0.0.1:18788 --nprocs=1 --nthreads=1 --nanny-port=37502  --listen-address=tcp://127.0.0.1:37503
2018-04-27T09:28:21 distributed.nanny INFO         Start Nanny at: 'tcp://127.0.0.1:37502'
2018-04-27T09:28:22 distributed.process DEBUG [<AsyncProcess ForkServerProcess-1>] got message {'op': 'start', 'future': <Future pending cb=[IOLoop.add_future.<locals>.<lambda>() at /home/tony/miniconda3/envs/nabu/lib/python3.6/site-packages/tornado/ioloop.py:720]>}
2018-04-27T09:28:22 distributed.process DEBUG [<AsyncProcess ForkServerProcess-1>] created process with pid 8491
2018-04-27T09:28:22 distributed.diskutils DEBUG Locking '/home2/tony/git_repos/distributed/dask-worker-space/worker-10od07fi.dirlock'...
2018-04-27T09:28:22 distributed.worker INFO       Start worker at:      tcp://127.0.0.1:37503
2018-04-27T09:28:22 distributed.worker INFO          Listening to:      tcp://127.0.0.1:37503
2018-04-27T09:28:22 distributed.worker INFO              nanny at:            127.0.0.1:37502
2018-04-27T09:28:22 distributed.worker INFO Waiting to connect to:      tcp://127.0.0.1:18788
2018-04-27T09:28:22 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:22 distributed.worker INFO               Threads:                          1
2018-04-27T09:28:22 distributed.worker INFO                Memory:                    3.16 GB
2018-04-27T09:28:22 distributed.worker INFO       Local Directory: /home2/tony/git_repos/distributed/dask-worker-space/worker-10od07fi
2018-04-27T09:28:22 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:22 distributed.worker INFO         Registered to:      tcp://127.0.0.1:18788
2018-04-27T09:28:22 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:60330' to 'tcp://127.0.0.1:37503'
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:22 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:22 distributed.core DEBUG Connection from 'tcp://127.0.0.1:60330' to Worker
2018-04-27T09:28:22 distributed.core DEBUG Message from 'tcp://127.0.0.1:60330': {'op': 'compute-stream', 'reply': False}
2018-04-27T09:28:22 distributed.core DEBUG Calling into handler compute_stream
2018-04-27T09:28:23 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:24 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:25 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:25 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:26 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:26 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:27 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:27 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:28 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:28 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker DEBUG Execute key: func-75aca69ee1a61d4df6ea535491fd9f59 worker: tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker DEBUG Heartbeat: tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker DEBUG Send compute response to scheduler: func-75aca69ee1a61d4df6ea535491fd9f59, {'op': 'task-finished', 'status': 'OK', 'nbytes': 63, 'type': <class 'str'>, 'start': 1524846509.0513873, 'stop': 1524846509.8432746, 'thread': 139930749351680, 'key': 'func-75aca69ee1a61d4df6ea535491fd9f59'}
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:60728' to 'tcp://127.0.0.1:37503'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:60728' to Worker
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:60728': {'op': 'get_data', 'keys': ['func-75aca69ee1a61d4df6ea535491fd9f59'], 'close': False, 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler get_data
2018-04-27T09:28:29 distributed.worker DEBUG Deleted 1 keys
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:43094' to 'tcp://127.0.0.1:37502'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:43094' to Nanny
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:43094': {'op': 'restart', 'close': True, 'timeout': 96.0, 'executor_wait': False, 'reply': True}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler restart
2018-04-27T09:28:29 distributed.worker INFO Stopping worker at tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.process DEBUG [<AsyncProcess ForkServerProcess-1>] process 8491 exited with code 0
2018-04-27T09:28:29 distributed.process DEBUG [<AsyncProcess ForkServerProcess-1>] got message {'op': 'stop'}
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:29 distributed.process DEBUG [<AsyncProcess ForkServerProcess-2>] got message {'op': 'start', 'future': <Future pending cb=[IOLoop.add_future.<locals>.<lambda>() at /home/tony/miniconda3/envs/nabu/lib/python3.6/site-packages/tornado/ioloop.py:720]>}
2018-04-27T09:28:29 distributed.process DEBUG [<AsyncProcess ForkServerProcess-2>] created process with pid 8524
2018-04-27T09:28:29 distributed.diskutils DEBUG Locking '/home2/tony/git_repos/distributed/dask-worker-space/worker-zzhbfpcs.dirlock'...
2018-04-27T09:28:29 distributed.worker INFO       Start worker at:      tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker INFO          Listening to:      tcp://127.0.0.1:37503
2018-04-27T09:28:29 distributed.worker INFO              nanny at:            127.0.0.1:37502
2018-04-27T09:28:29 distributed.worker INFO Waiting to connect to:      tcp://127.0.0.1:18788
2018-04-27T09:28:29 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:29 distributed.worker INFO               Threads:                          1
2018-04-27T09:28:29 distributed.worker INFO                Memory:                    3.16 GB
2018-04-27T09:28:29 distributed.worker INFO       Local Directory: /home2/tony/git_repos/distributed/dask-worker-space/worker-zzhbfpcs
2018-04-27T09:28:29 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:29 distributed.worker INFO         Registered to:      tcp://127.0.0.1:18788
2018-04-27T09:28:29 distributed.worker INFO -------------------------------------------------
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:60744' to 'tcp://127.0.0.1:37503'
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:29 distributed.core DEBUG Connection from 'tcp://127.0.0.1:60744' to Worker
2018-04-27T09:28:29 distributed.core DEBUG Message from 'tcp://127.0.0.1:60744': {'op': 'compute-stream', 'reply': False}
2018-04-27T09:28:29 distributed.core DEBUG Calling into handler compute_stream
2018-04-27T09:28:29 distributed.worker DEBUG Execute key: func-8e5f4b6cfb97170c1635aa54476d5f1a worker: tcp://127.0.0.1:37503
2018-04-27T09:28:30 distributed.worker DEBUG Send compute response to scheduler: func-8e5f4b6cfb97170c1635aa54476d5f1a, {'op': 'task-finished', 'status': 'OK', 'nbytes': 63, 'type': <class 'str'>, 'start': 1524846509.9931896, 'stop': 1524846510.1447158, 'thread': 139930749351680, 'key': 'func-8e5f4b6cfb97170c1635aa54476d5f1a'}
2018-04-27T09:28:30 distributed.worker INFO Stopping worker at tcp://127.0.0.1:37503
2018-04-27T09:28:30 distributed.worker INFO Close compute stream
2018-04-27T09:28:30 distributed.comm.tcp DEBUG Incoming connection from 'tcp://127.0.0.1:43120' to 'tcp://127.0.0.1:37502'
2018-04-27T09:28:30 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:30 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:30 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=10, interval=2
2018-04-27T09:28:30 distributed.comm.tcp DEBUG Setting TCP user timeout: 30000 ms
2018-04-27T09:28:30 distributed.core DEBUG Connection from 'tcp://127.0.0.1:43120' to Nanny
2018-04-27T09:28:30 distributed.core DEBUG Message from 'tcp://127.0.0.1:43120': {'op': 'terminate', 'reply': True}
2018-04-27T09:28:30 distributed.core DEBUG Calling into handler _close
2018-04-27T09:28:30 distributed.nanny INFO Closing Nanny at 'tcp://127.0.0.1:37502'
2018-04-27T09:28:30 distributed.process DEBUG [<AsyncProcess ForkServerProcess-2>] process 8524 exited with code 0
2018-04-27T09:28:30 distributed.process DEBUG [<AsyncProcess ForkServerProcess-2>] got message {'op': 'stop'}
2018-04-27T09:28:30 distributed.dask_worker INFO End worker
Client Log
[tony@machine1 /home]$ python testbad.py
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP keepalive: nprobes=10, idle=20, interval=4
2018-04-27T09:28:29 distributed.comm.tcp DEBUG Setting TCP user timeout: 60000 ms
2018-04-27T09:28:29 distributed.client DEBUG Started scheduling coroutines. Synchronized
2018-04-27T09:28:29 distributed.client DEBUG Submit func(...), func-75aca69ee1a61d4df6ea535491fd9f59
<Future: status: pending, key: func-75aca69ee1a61d4df6ea535491fd9f59>
2018-04-27T09:28:29 distributed.client DEBUG Client receives message {'op': 'key-in-memory', 'key': 'func-75aca69ee1a61d4df6ea535491fd9f59', 'type': b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x03str\x94\x93\x94.'}
2018-04-27T09:28:29 distributed.client DEBUG Waiting on futures to clear before gather
done with func
2018-04-27T09:28:29 distributed.client DEBUG Release key func-75aca69ee1a61d4df6ea535491fd9f59
2018-04-27T09:28:29 distributed.client DEBUG Client receives message {'op': 'restart'}
2018-04-27T09:28:29 distributed.client INFO Receive restart signal from scheduler
2018-04-27T09:28:29 distributed.client DEBUG Submit func(...), func-8e5f4b6cfb97170c1635aa54476d5f1a
<Future: status: pending, key: func-8e5f4b6cfb97170c1635aa54476d5f1a>
2018-04-27T09:28:30 distributed.client DEBUG Client receives message {'op': 'key-in-memory', 'key': 'func-8e5f4b6cfb97170c1635aa54476d5f1a', 'type': b'\x80\x04\x95\x14\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x03str\x94\x93\x94.'}
2018-04-27T09:28:30 distributed.client DEBUG Waiting on futures to clear before gather
2018-04-27T09:28:30 distributed.client WARNING Couldn't gather keys {'func-8e5f4b6cfb97170c1635aa54476d5f1a': ['tcp://127.0.0.1:37503']}
2018-04-27T09:28:30 distributed.client DEBUG Waiting on futures to clear before gather
2018-04-27T09:28:30 distributed.client DEBUG Client receives message {'op': 'lost-data', 'key': 'func-8e5f4b6cfb97170c1635aa54476d5f1a'}
Traceback (most recent call last):
File "testbad.py", line 24, in <module>
print(fut2.result(timeout=20))
File "/home/tony/git_repos/distributed/distributed/client.py", line 169, in result
raiseit=False)
File "/home/tony/git_repos/distributed/distributed/client.py", line 615, in sync
return sync(self.loop, func, *args, **kwargs)
File "/home/tony/git_repos/distributed/distributed/utils.py", line 248, in sync
raise gen.TimeoutError("timed out after %s s." % (timeout,))
tornado.util.TimeoutError: timed out after 20 s.
2018-04-27T09:28:49 distributed.client DEBUG Release key func-8e5f4b6cfb97170c1635aa54476d5f1a
@mrocklin
Member

mrocklin commented May 7, 2018

First, let me apologize for the long delay in response here. This is an excellent issue. Thank you for putting the time into the explanation, the minimal example, and the logs.

By way of apology, let me introduce you to the tc Linux utility, which can add arbitrary latencies to network interfaces. It's great for testing:

mrocklin@carbon:~$ ping localhost
PING localhost (127.0.0.1) 56(84) bytes of data.
64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.093 ms
64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=0.050 ms
^C
--- localhost ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1009ms
rtt min/avg/max/mdev = 0.050/0.071/0.093/0.023 ms
mrocklin@carbon:~$ sudo tc qdisc add dev lo root netem delay 50ms
mrocklin@carbon:~$ ping localhost
PING localhost (127.0.0.1) 56(84) bytes of data.
64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=100 ms
64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=100 ms
64 bytes from localhost (127.0.0.1): icmp_seq=3 ttl=64 time=100 ms
^C
--- localhost ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2000ms
rtt min/avg/max/mdev = 100.191/100.275/100.324/0.059 ms
mrocklin@carbon:~$ sudo tc qdisc del dev lo root netem
mrocklin@carbon:~$ ping localhost
PING localhost (127.0.0.1) 56(84) bytes of data.
64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.061 ms
64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=0.081 ms
64 bytes from localhost (127.0.0.1): icmp_seq=3 ttl=64 time=0.042 ms
^C
--- localhost ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2035ms
rtt min/avg/max/mdev = 0.042/0.061/0.081/0.017 ms

http://bencane.com/2012/07/16/tc-adding-simulated-network-latency-to-your-linux-server/

Analysis

Thank you for mentioning that you are setting the worker up on the same port. I tried to reproduce without this and wasn't able to. Once I set the port, I start getting the same issues that you do.

The solution I've found is to close the scheduler.rpc.available
comms for the worker that is removed via
scheduler.remove_worker. Since the worker is removed from the
scheduler, it makes sense to me that any connections to it should also
be removed from the connection pool.

That seems like a very sensible approach to me. I'd be happy to do this, though I'd also welcome help with a pull request; I'd like to support more people becoming familiar with the scheduler internals if possible.

Pandas

I'm making many API requests and using many pandas objects that seem to retain memory on the workers.

Yeah, this is an issue. I recommend pushing this upstream a bit. If you're interested in constructing a minimal example that leaks memory without dask, that would be welcome on the pandas issue tracker. I've tried to do that here: pandas-dev/pandas#19941, but I think that if this comes from several independent users, the pandas community will start taking more interest in the problem.
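
Something along these lines, run in a plain Python process while
watching RSS, is the kind of reproduction I have in mind (just a
sketch; whether this particular loop actually retains memory is
exactly the open question, and psutil is assumed to be installed):

# Rough, dask-free sketch for probing whether pandas retains memory:
# repeatedly create and drop DataFrames while printing the process RSS.
import os

import numpy as np
import pandas as pd
import psutil

proc = psutil.Process(os.getpid())

for i in range(200):
    df = pd.DataFrame(np.random.random((100_000, 10)))
    summary = df.describe()   # do some work, then drop the big frame
    del df, summary
    if i % 20 == 0:
        print(i, proc.memory_info().rss // 1_000_000, 'MB')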

@alorenzo175
Contributor Author

No problem, thanks for the great, easy-to-use package! Ah, I didn't know about tc, thanks. I tried trickle but wasn't able to reproduce the issue.

I can take a stab at a PR for this sometime this week.

@mrocklin
Member

mrocklin commented May 7, 2018

I can take a stab at a PR for this sometime this week.

I'm glad to hear it. May you discover and fix many more bugs :) Let me know if I can help.
