Skip to content

Commit 42b3254

Browse files
committed
[FIX, WIP] Encoding problems in SGE plugin
Fixes #1625
1 parent 5c9a751 commit 42b3254

File tree

1 file changed

+17
-20
lines changed

1 file changed

+17
-20
lines changed

nipype/pipeline/plugins/sge.py

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44
from __future__ import print_function, division, unicode_literals, absolute_import
55

6-
from builtins import object
6+
from builtins import object, str, bytes
77

88
import os
99
import pwd
@@ -18,13 +18,13 @@
1818
from ...interfaces.base import CommandLine
1919
from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
2020

21-
DEBUGGING_PREFIX = str(int(random.uniform(100, 999)))
21+
DEBUGGING_PREFIX = '{0!d}'.format(random.uniform(100, 999))
2222

2323

2424
def sge_debug_print(message):
2525
""" Needed for debugging on big jobs. Once this is fully vetted, it can be removed.
2626
"""
27-
logger.debug(DEBUGGING_PREFIX + " " + "=!" * 3 + " " + message)
27+
logger.debug('%s ' + '=!' * 3 + ' %s', DEBUGGING_PREFIX, message)
2828
# print DEBUGGING_PREFIX + " " + "=!" * 3 + " " + message
2929

3030

@@ -40,8 +40,7 @@ def __init__(self, job_num, job_queue_state, job_time, job_queue_name, job_slots
4040
self._job_num = int(
4141
job_num) # The primary unique identifier for this job, must be an integer!
4242
# self._jobOwn = None # Who owns this job
43-
self._job_queue_state = str(
44-
job_queue_state) # ["running","zombie",...??]
43+
self._job_queue_state = '%s' % job_queue_state # ["running","zombie",...??]
4544
# self._jobActionState = str(jobActionState) # ['r','qw','S',...??]
4645
self._job_time = job_time # The job start time
4746
self._job_info_creation_time = time.time(
@@ -51,12 +50,10 @@ def __init__(self, job_num, job_queue_state, job_time, job_queue_name, job_slots
5150
self._qsub_command_line = qsub_command_line
5251

5352
def __repr__(self):
54-
return str(self._job_num).ljust(8) \
55-
+ str(self._job_queue_state).ljust(12) \
56-
+ str(self._job_slots).ljust(3) \
57-
+ time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._job_time)).ljust(20) \
58-
+ str(self._job_queue_name).ljust(8) \
59-
+ str(self._qsub_command_line)
53+
return '{:<8d}{:12}{:<3d}{:20}{:8}{}'.format(
54+
self._job_num, self._queue_state, self._job_slots,
55+
time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self._job_time)),
56+
self._job_queue_name, self._qsub_command_line)
6057

6158
def is_initializing(self):
6259
return self._job_queue_state == "initializing"
@@ -150,12 +147,13 @@ def _qacct_verified_complete(taskid):
150147
while qacct_retries > 0:
151148
qacct_retries -= 1
152149
try:
150+
strtaskid = '{0!d}'.format(taskid)
153151
proc = subprocess.Popen(
154-
[this_command, '-o', pwd.getpwuid(os.getuid())[0], '-j', str(taskid)],
152+
[this_command, '-o', pwd.getpwuid(os.getuid())[0], '-j', strtaskid],
155153
stdout=subprocess.PIPE,
156154
stderr=subprocess.PIPE)
157155
qacct_result, _ = proc.communicate()
158-
if qacct_result.find(str(taskid)):
156+
if qacct_result.find(strtaskid):
159157
is_complete = True
160158
sge_debug_print(
161159
"NOTE: qacct for jobs\n{0}".format(qacct_result))
@@ -276,7 +274,7 @@ def _run_qstat(self, reason_for_qstat, force_instant=True):
276274
def print_dictionary(self):
277275
"""For debugging"""
278276
for vv in list(self._task_dictionary.values()):
279-
sge_debug_print(str(vv))
277+
sge_debug_print('%s' % vv)
280278

281279
def is_job_pending(self, task_id):
282280
task_id = int(task_id) # Ensure that it is an integer
@@ -398,7 +396,7 @@ def _submit_batchtask(self, scriptfile, node):
398396
oldlevel = iflogger.level
399397
iflogger.setLevel(logging.getLevelName('CRITICAL'))
400398
tries = 0
401-
result = list()
399+
result = None
402400
while True:
403401
try:
404402
result = cmd.run()
@@ -409,9 +407,8 @@ def _submit_batchtask(self, scriptfile, node):
409407
self._retry_timeout) # sleep 2 seconds and try again.
410408
else:
411409
iflogger.setLevel(oldlevel)
412-
raise RuntimeError('\n'.join((('Could not submit sge task'
413-
' for node %s') % node._id,
414-
str(e))))
410+
raise RuntimeError(
411+
'Could not submit sge task for node %s\n%s' % (node._id, e))
415412
else:
416413
break
417414
iflogger.setLevel(oldlevel)
@@ -421,6 +418,6 @@ def _submit_batchtask(self, scriptfile, node):
421418
lines[-1]).groups()[0])
422419
self._pending[taskid] = node.output_dir()
423420
self._refQstatSubstitute.add_startup_job(taskid, cmd.cmdline)
424-
logger.debug('submitted sge task: %d for node %s with %s' %
425-
(taskid, node._id, cmd.cmdline))
421+
logger.debug('submitted sge task: %d for node %s with %s',
422+
taskid, node._id, cmd.cmdline)
426423
return taskid

0 commit comments

Comments
 (0)