dask_jobqueue/core.py (30 changes: 14 additions & 16 deletions)

@@ -4,9 +4,8 @@
 import os
 import re
 import shlex
-import subprocess
+import asyncio
 import sys
-import weakref
 import abc
 import tempfile
 import copy
@@ -390,8 +389,7 @@ def job_file(self):
             yield fn
 
     async def _submit_job(self, script_filename):
-        # Should we make this async friendly?
-        return self._call(shlex.split(self.submit_command) + [script_filename])
+        return await self._call(shlex.split(self.submit_command) + [script_filename])
 
     @property
     def worker_process_threads(self):
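Note: the question in the deleted comment is resolved by this PR itself; `_call` becomes a coroutine in the hunks below, so `_submit_job` can simply await it.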
@@ -411,8 +409,6 @@ async def start(self):
         out = await self._submit_job(fn)
         self.job_id = self._job_id_from_submit_output(out)
 
-        weakref.finalize(self, self._close_job, self.job_id, self.cancel_command)
-
         logger.debug("Starting job: %s", self.job_id)
         await super().start()
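Note: the `weakref.finalize` cleanup hook is dropped here. A finalizer runs as a plain synchronous callback, so it cannot await the now-asynchronous `_close_job`; invoking it there would only create an un-awaited coroutine. Cancelling the job is therefore left to the explicit `close()` path below.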

@@ -439,18 +435,18 @@ def _job_id_from_submit_output(self, out):
 
     async def close(self):
         logger.debug("Stopping worker: %s job: %s", self.name, self.job_id)
-        self._close_job(self.job_id, self.cancel_command)
+        await self._close_job(self.job_id, self.cancel_command)
 
     @classmethod
-    def _close_job(cls, job_id, cancel_command):
+    async def _close_job(cls, job_id, cancel_command):
         if job_id:
             with suppress(RuntimeError):  # deleting job when job already gone
-                cls._call(shlex.split(cancel_command) + [job_id])
+                await cls._call(shlex.split(cancel_command) + [job_id])
             logger.debug("Closed job %s", job_id)
 
     @staticmethod
-    def _call(cmd, **kwargs):
-        """Call a command using subprocess.Popen.
+    async def _call(cmd, **kwargs):
+        """Call a command using asyncio.create_subprocess_exec.
 
         This centralizes calls out to the command line, providing consistent
         outputs, logging, and an opportunity to go asynchronous in the future.
@@ -459,7 +455,7 @@ def _call(cmd, **kwargs):
         ----------
         cmd: List(str))
             A command, each of which is a list of strings to hand to
-            subprocess.Popen
+            asyncio.create_subprocess_exec
 
         Examples
         --------
@@ -478,11 +474,14 @@ def _call(cmd, **kwargs):
             "Executing the following command to command line\n{}".format(cmd_str)
         )
 
-        proc = subprocess.Popen(
-            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
+        proc = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            **kwargs
         )
 
-        out, err = proc.communicate()
+        out, err = await proc.communicate()
         out, err = out.decode(), err.decode()
 
         if proc.returncode != 0:
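For reference, a minimal self-contained sketch of the asyncio subprocess pattern adopted above; the `call` helper, the `echo` command, and the error message are illustrative stand-ins rather than dask-jobqueue code:

    import asyncio

    async def call(cmd):
        # Start the subprocess without blocking the event loop.
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # communicate() is awaited here, so other coroutines (for example
        # concurrent job submissions) keep running while the command executes.
        out, err = await proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError("Command exited with non-zero exit code: " + err.decode())
        return out.decode()

    print(asyncio.run(call(["echo", "hello"])))  # prints: hello

The key behavioral difference from `subprocess.Popen` is that both process startup and `communicate()` yield control to the event loop instead of blocking the calling thread.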
@@ -745,7 +744,6 @@ def _get_worker_security(self, security):
         for key, value in worker_security_dict.items():
             # dump worker in-memory keys for use in job_script
             if value is not None and "\n" in value:
-
                 try:
                     f = tempfile.NamedTemporaryFile(
                         mode="wt",
dask_jobqueue/local.py (2 changes: 1 addition & 1 deletion)

@@ -71,7 +71,7 @@ async def _submit_job(self, script_filename):
         return str(self.process.pid)
 
     @classmethod
-    def _close_job(self, job_id, cancel_command):
+    async def _close_job(self, job_id, cancel_command):
         os.kill(int(job_id), 9)
         # from distributed.utils_test import terminate_process
         # terminate_process(self.process)
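Nothing in this override is awaited, but it still has to become `async`: the base class's `close()` now does `await self._close_job(...)`, and awaiting the `None` returned by a plain method would raise a `TypeError`. The hard-coded `9` is SIGKILL; an equivalent, more explicit spelling (a sketch for POSIX systems, not part of this PR):

    import os
    import signal

    def close_local_job(job_id: str) -> None:
        # SIGKILL cannot be caught or ignored by the worker process.
        os.kill(int(job_id), signal.SIGKILL)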
dask_jobqueue/oar.py (1 change: 0 additions & 1 deletion)

@@ -11,7 +11,6 @@
 
 
 class OARJob(Job):
-
     # Override class variables
     submit_command = "oarsub"
     cancel_command = "oardel"
dask_jobqueue/tests/test_htcondor.py (1 change: 0 additions & 1 deletion)

@@ -106,7 +106,6 @@ def test_extra_args_broken_cancel(loop):
         cancel_command_extra=["-name", "wrong.docker"],
     ) as cluster:
         with Client(cluster) as client:
-
             cluster.scale(2)
 
             client.wait_for_workers(2, timeout=QUEUE_WAIT)
dask_jobqueue/tests/test_job.py (42 changes: 21 additions & 21 deletions)

@@ -432,25 +432,25 @@ def test_deprecation_job_extra(Cluster):
     assert "old_param" in job_script
 
 
-def test_jobqueue_job_call(tmpdir, Cluster):
-    cluster = Cluster(cores=1, memory="1GB")
-
-    path = tmpdir.join("test.py")
-    path.write('print("this is the stdout")')
-
-    out = cluster.job_cls._call([sys.executable, path.strpath])
-    assert out == "this is the stdout\n"
-
-    path_with_error = tmpdir.join("non-zero-exit-code.py")
-    path_with_error.write('print("this is the stdout")\n1/0')
-
-    match = (
-        "Command exited with non-zero exit code.+"
-        "Exit code: 1.+"
-        "stdout:\nthis is the stdout.+"
-        "stderr:.+ZeroDivisionError"
-    )
+@pytest.mark.asyncio
+async def test_jobqueue_job_call(tmpdir, Cluster):
+    async with Cluster(cores=1, memory="1GB", asynchronous=True) as cluster:
+        path = tmpdir.join("test.py")
+        path.write('print("this is the stdout")')
+
+        out = await cluster.job_cls._call([sys.executable, path.strpath])
+        assert out == "this is the stdout\n"
+
+        path_with_error = tmpdir.join("non-zero-exit-code.py")
+        path_with_error.write('print("this is the stdout")\n1/0')
+
+        match = (
+            "Command exited with non-zero exit code.+"
+            "Exit code: 1.+"
+            "stdout:\nthis is the stdout.+"
+            "stderr:.+ZeroDivisionError"
+        )
 
-    match = re.compile(match, re.DOTALL)
-    with pytest.raises(RuntimeError, match=match):
-        cluster.job_cls._call([sys.executable, path_with_error.strpath])
+        match = re.compile(match, re.DOTALL)
+        with pytest.raises(RuntimeError, match=match):
+            await cluster.job_cls._call([sys.executable, path_with_error.strpath])
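Since `_call` is now a coroutine, the test runs on an event loop. A minimal sketch of the pattern, assuming the pytest-asyncio plugin (which provides `@pytest.mark.asyncio`); `MyCluster` is a hypothetical stand-in for the parametrized `Cluster` fixture:

    import pytest

    @pytest.mark.asyncio
    async def test_call_pattern():
        # asynchronous=True keeps the cluster from driving its own blocking
        # event loop, so the test coroutine can await cluster internals directly.
        async with MyCluster(cores=1, memory="1GB", asynchronous=True) as cluster:
            out = await cluster.job_cls._call(["echo", "hello"])
            assert out == "hello\n"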
dask_jobqueue/tests/test_jobqueue_core.py (2 changes: 0 additions & 2 deletions)

@@ -244,7 +244,6 @@ def get_interface_and_port(index=0):
 
 
 def test_scheduler_options(Cluster):
-
     interface, port = get_interface_and_port()
 
     with Cluster(
@@ -321,7 +320,6 @@ def test_import_scheduler_options_from_config(Cluster):
     with dask.config.set(
         {"jobqueue.%s.scheduler-options" % default_config_name: scheduler_options}
     ):
-
         with Cluster(cores=2, memory="2GB") as cluster:
             scheduler_options = cluster.scheduler_spec["options"]
             assert scheduler_options.get("interface") == config_scheduler_interface
dask_jobqueue/tests/test_lsf.py (8 changes: 0 additions & 8 deletions)

@@ -17,7 +17,6 @@
 
 def test_header():
     with LSFCluster(walltime="00:02", processes=4, cores=8, memory="8GB") as cluster:
-
         assert "#BSUB" in cluster.job_header
         assert "#BSUB -J dask-worker" in cluster.job_header
         assert "#BSUB -n 8" in cluster.job_header
@@ -35,7 +34,6 @@ def test_header():
         ncpus=24,
         mem=100000000000,
     ) as cluster:
-
         assert "#BSUB -q general" in cluster.job_header
         assert "#BSUB -J dask-worker" in cluster.job_header
         assert "#BSUB -n 24" in cluster.job_header
@@ -54,7 +52,6 @@ def test_header():
         ncpus=24,
         mem=100000000000,
     ) as cluster:
-
         assert "#BSUB -q general" in cluster.job_header
         assert "#BSUB -J dask-worker" in cluster.job_header
         assert "#BSUB -n 24" in cluster.job_header
@@ -65,7 +62,6 @@ def test_header():
         assert '#BSUB -P "Dask On LSF"' in cluster.job_header
 
     with LSFCluster(cores=4, memory="8GB") as cluster:
-
         assert "#BSUB -n" in cluster.job_header
         assert "#BSUB -W" in cluster.job_header
         assert "#BSUB -M" in cluster.job_header
@@ -75,7 +71,6 @@ def test_header():
     with LSFCluster(
         cores=4, memory="8GB", job_extra_directives=["-u [email protected]"]
     ) as cluster:
-
         assert "#BSUB -u [email protected]" in cluster.job_header
         assert "#BSUB -n" in cluster.job_header
         assert "#BSUB -W" in cluster.job_header
@@ -86,7 +81,6 @@
 
 def test_job_script():
     with LSFCluster(walltime="00:02", processes=4, cores=8, memory="28GB") as cluster:
-
         job_script = cluster.job_script()
         assert "#BSUB" in job_script
         assert "#BSUB -J dask-worker" in job_script
@@ -114,7 +108,6 @@ def test_job_script():
         ncpus=24,
         mem=100000000000,
     ) as cluster:
-
         job_script = cluster.job_script()
         assert "#BSUB -q general" in cluster.job_header
         assert "#BSUB -J dask-worker" in cluster.job_header
@@ -141,7 +134,6 @@ def test_job_script():
         project="Dask On LSF",
         job_extra_directives=["-R rusage[mem=16GB]"],
     ) as cluster:
-
         job_script = cluster.job_script()
 
         assert "#BSUB -J dask-worker" in cluster.job_header
dask_jobqueue/tests/test_pbs.py (10 changes: 0 additions & 10 deletions)

@@ -18,7 +18,6 @@ def test_header(Cluster):
     with Cluster(
         walltime="00:02:00", processes=4, cores=8, memory="28GB", name="dask-worker"
     ) as cluster:
-
         assert "#PBS" in cluster.job_header
         assert "#PBS -N dask-worker" in cluster.job_header
         assert "#PBS -l select=1:ncpus=8:mem=27GB" in cluster.job_header
@@ -34,7 +33,6 @@ def test_header(Cluster):
         resource_spec="select=1:ncpus=24:mem=100GB",
         memory="28GB",
     ) as cluster:
-
         assert "#PBS -q regular" in cluster.job_header
         assert "#PBS -N dask-worker" in cluster.job_header
         assert "#PBS -l select=1:ncpus=24:mem=100GB" in cluster.job_header
@@ -43,15 +41,13 @@ def test_header(Cluster):
         assert "#PBS -A DaskOnPBS" in cluster.job_header
 
     with Cluster(cores=4, memory="8GB") as cluster:
-
         assert "#PBS -j oe" not in cluster.job_header
         assert "#PBS -N" in cluster.job_header
         assert "#PBS -l walltime=" in cluster.job_header
         assert "#PBS -A" not in cluster.job_header
         assert "#PBS -q" not in cluster.job_header
 
     with Cluster(cores=4, memory="8GB", job_extra_directives=["-j oe"]) as cluster:
-
         assert "#PBS -j oe" in cluster.job_header
         assert "#PBS -N" in cluster.job_header
         assert "#PBS -l walltime=" in cluster.job_header
@@ -62,7 +58,6 @@
 @pytest.mark.parametrize("Cluster", [PBSCluster, MoabCluster])
 def test_job_script(Cluster):
     with Cluster(walltime="00:02:00", processes=4, cores=8, memory="28GB") as cluster:
-
         job_script = cluster.job_script()
         assert "#PBS" in job_script
         assert "#PBS -N dask-worker" in job_script
@@ -88,7 +83,6 @@ def test_job_script(Cluster):
         resource_spec="select=1:ncpus=24:mem=100GB",
         memory="28GB",
     ) as cluster:
-
         job_script = cluster.job_script()
         assert "#PBS -q regular" in job_script
         assert "#PBS -N dask-worker" in job_script
@@ -119,7 +113,6 @@ def test_basic(loop):
         loop=loop,
     ) as cluster:
         with Client(cluster) as client:
-
             cluster.scale(2)
             client.wait_for_workers(2, timeout=QUEUE_WAIT)
 
@@ -154,7 +147,6 @@ def test_scale_cores_memory(loop):
         loop=loop,
     ) as cluster:
         with Client(cluster) as client:
-
             cluster.scale(cores=2)
             client.wait_for_workers(1, timeout=QUEUE_WAIT)
 
@@ -188,7 +180,6 @@ def test_basic_scale_edge_cases(loop):
         job_extra_directives=["-V"],
         loop=loop,
     ) as cluster:
-
         cluster.scale(2)
         cluster.scale(0)
 
@@ -299,7 +290,6 @@ def test_scale_grouped(loop):
         loop=loop,
     ) as cluster:
         with Client(cluster) as client:
-
             cluster.scale(4)  # Start 2 jobs
 
             start = time()
dask_jobqueue/tests/test_sge.py (1 change: 0 additions & 1 deletion)

@@ -16,7 +16,6 @@ def test_basic(loop):
         walltime="00:02:00", cores=8, processes=4, memory="2GiB", loop=loop
     ) as cluster:
         with Client(cluster, loop=loop) as client:
-
             cluster.scale(2)
 
             start = time()
dask_jobqueue/tests/test_slurm.py (5 changes: 0 additions & 5 deletions)

@@ -16,7 +16,6 @@ def test_header():
     with SLURMCluster(
         walltime="00:02:00", processes=4, cores=8, memory="28GB"
     ) as cluster:
-
         assert "#SBATCH" in cluster.job_header
         assert "#SBATCH -J dask-worker" in cluster.job_header
         assert "#SBATCH -n 1" in cluster.job_header
@@ -35,7 +34,6 @@ def test_header():
         job_cpu=16,
         job_mem="100G",
     ) as cluster:
-
         assert "#SBATCH --cpus-per-task=16" in cluster.job_header
         assert "#SBATCH --cpus-per-task=8" not in cluster.job_header
         assert "#SBATCH --mem=100G" in cluster.job_header
@@ -44,7 +42,6 @@ def test_header():
         assert "#SBATCH -p regular" in cluster.job_header
 
     with SLURMCluster(cores=4, memory="8GB") as cluster:
-
         assert "#SBATCH" in cluster.job_header
         assert "#SBATCH -J " in cluster.job_header
         assert "#SBATCH -n 1" in cluster.job_header
@@ -57,7 +54,6 @@ def test_job_script():
     with SLURMCluster(
         walltime="00:02:00", processes=4, cores=8, memory="28GB"
     ) as cluster:
-
         job_script = cluster.job_script()
         assert "#SBATCH" in job_script
         assert "#SBATCH -J dask-worker" in job_script
@@ -127,7 +123,6 @@ def test_basic(loop):
         loop=loop,
     ) as cluster:
         with Client(cluster) as client:
-
             cluster.scale(2)
 
             start = time()