
Commit 1ae5547

Detect insufficient resources and deadlocks more usefully (#3043)
* Refactor MemoryString to be comparable the Python 3 way
* Move the DeadlockException
* Remove 2/3 stuff from SingleMachine batch system
* Make the AbstractBatchSystem actually be responsible for SingleMachineBatchSystem size checking
* Get out-of-resource formatting right
* Don't warn with the job directory
* Give types to Toil service/deadlock args so Cactus can compare them with numbers
* Add --deadlockCheckInterval and superfluous logging
* Settle on when we want to log
* Implement a way to get hints from the batch system to the user for debugging deadlocks
* Revert changes I don't want
* Revise deadlock option documentation
* Replace hints with messages and details
1 parent 8b45f1a commit 1ae5547

File tree: 8 files changed (+198, -77 lines)


docs/running/cliOptions.rst

Lines changed: 12 additions & 3 deletions
@@ -236,9 +236,18 @@ the logging module:
                         concurrently on preemptable nodes.
                         default=9223372036854775807
   --deadlockWait DEADLOCKWAIT
-                        The minimum number of seconds to observe the cluster
-                        stuck running only the same service jobs before
-                        throwing a deadlock exception. default=60
+                        Time, in seconds, to tolerate the workflow running only
+                        the same service jobs, with no jobs to use them, before
+                        declaring the workflow to be deadlocked and stopping.
+                        default=60
+  --deadlockCheckInterval DEADLOCKCHECKINTERVAL
+                        Time, in seconds, to wait between checks to see if the
+                        workflow is stuck running only service jobs, with no
+                        jobs to use them. Should be shorter than
+                        --deadlockWait. May need to be increased if the batch
+                        system cannot enumerate running jobs quickly enough, or
+                        if polling for running jobs is placing an unacceptable
+                        load on a shared cluster. default=30
   --statePollingWait STATEPOLLINGWAIT
                         Time, in seconds, to wait before doing a scheduler
                         query for job state. Return cached results if within
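A minimal sketch of setting the two deadlock options programmatically, assuming Toil from this commit is installed; the hello job function and jobstore path are invented for illustration, and the same values could equally be passed on the command line as --deadlockWait 120 --deadlockCheckInterval 20.

from toil.common import Toil
from toil.job import Job

def hello(job):
    return 'hello'

if __name__ == '__main__':
    options = Job.Runner.getDefaultOptions('file:deadlock-demo-jobstore')
    options.deadlockWait = 120          # fail after 2 minutes of running only service jobs
    options.deadlockCheckInterval = 20  # re-check the stuck condition every 20 seconds
    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(hello))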

src/toil/batchSystems/__init__.py

Lines changed: 26 additions & 13 deletions
@@ -12,19 +12,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from __future__ import absolute_import
-from past.builtins import cmp
-from builtins import str
-from builtins import object
 import sys

-if sys.version_info >= (3, 0):
+from functools import total_ordering

-    # https://docs.python.org/3.0/whatsnew/3.0.html#ordering-comparisons
-    def cmp(a, b):
-        return (a > b) - (a < b)
+class DeadlockException(Exception):
+    """
+    Exception thrown by the Leader or BatchSystem when a deadlock is encountered due to insufficient
+    resources to run the workflow
+    """
+    def __init__(self, msg):
+        self.msg = "Deadlock encountered: " + msg
+        super().__init__()

-class MemoryString(object):
+    def __str__(self):
+        """
+        Stringify the exception, including the message.
+        """
+        return self.msg
+
+@total_ordering
+class MemoryString:
+    """
+    Represents an amount of bytes, as a string, using suffixes for the unit.
+
+    Comparable based on the actual number of bytes instead of string value.
+    """
     def __init__(self, string):
         if string[-1] == 'K' or string[-1] == 'M' or string[-1] == 'G' or string[-1] == 'T': #10K
             self.unit = string[-1]
@@ -55,8 +68,8 @@ def byteVal(self):
         elif self.unit == 'T':
             return self.val * 1099511627776

-    def __cmp__(self, other):
-        return cmp(self.bytes, other.bytes)
+    def __eq__(self, other):
+        return self.bytes == other.bytes

-    def __gt__(self, other):
-        return self.bytes > other.bytes
+    def __lt__(self, other):
+        return self.bytes < other.bytes
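For reference, functools.total_ordering only needs __eq__ plus one ordering method to supply the remaining rich comparisons. A standalone sketch of the same pattern, using a toy class rather than Toil's actual MemoryString:

from functools import total_ordering

@total_ordering
class ByteAmount:
    # Toy stand-in: comparable by a numeric byte count, as MemoryString compares self.bytes
    def __init__(self, n):
        self.bytes = n

    def __eq__(self, other):
        return self.bytes == other.bytes

    def __lt__(self, other):
        return self.bytes < other.bytes

# __le__, __gt__, and __ge__ are generated from the two methods above
assert ByteAmount(512) < ByteAmount(1024)
assert ByteAmount(2048) >= ByteAmount(1024)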

src/toil/batchSystems/abstractBatchSystem.py

Lines changed: 60 additions & 8 deletions
@@ -183,6 +183,26 @@ def getUpdatedBatchJob(self, maxWait):
         batch system does not support tracking wall time.
         """
         raise NotImplementedError()
+
+    def getSchedulingStatusMessage(self):
+        """
+        Get a log message fragment for the user about anything that might be
+        going wrong in the batch system, if available.
+
+        If no useful message is available, return None.
+
+        This can be used to report what resource is the limiting factor when
+        scheduling jobs, for example. If the leader thinks the workflow is
+        stuck, the message can be displayed to the user to help them diagnose
+        why it might be stuck.
+
+        :rtype: str or None
+        :return: User-directed message about scheduling state.
+        """
+
+        # Default implementation returns None.
+        # Override to provide scheduling status information.
+        return None

     @abstractmethod
     def shutdown(self):
@@ -253,7 +273,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
                                      workflowID=self.config.workflowID,
                                      cleanWorkDir=self.config.cleanWorkDir)

-    def checkResourceRequest(self, memory, cores, disk):
+    def checkResourceRequest(self, memory, cores, disk, name=None, detail=None):
         """
         Check resource request is not greater than that available or allowed.
@@ -262,6 +282,10 @@ def checkResourceRequest(self, memory, cores, disk):
         :param float cores: number of cores being requested

         :param int disk: amount of disk space being requested, in bytes
+
+        :param str name: Name of the job being checked, for generating a useful error report.
+
+        :param str detail: Batch-system-specific message to include in the error.

         :raise InsufficientSystemResources: raised when a resource is requested in an amount
                greater than allowed
@@ -270,11 +294,14 @@ def checkResourceRequest(self, memory, cores, disk):
         assert disk is not None
         assert cores is not None
         if cores > self.maxCores:
-            raise InsufficientSystemResources('cores', cores, self.maxCores)
+            raise InsufficientSystemResources('cores', cores, self.maxCores,
+                                              batchSystem=self.__class__.__name__, name=name, detail=detail)
         if memory > self.maxMemory:
-            raise InsufficientSystemResources('memory', memory, self.maxMemory)
+            raise InsufficientSystemResources('memory', memory, self.maxMemory,
+                                              batchSystem=self.__class__.__name__, name=name, detail=detail)
         if disk > self.maxDisk:
-            raise InsufficientSystemResources('disk', disk, self.maxDisk)
+            raise InsufficientSystemResources('disk', disk, self.maxDisk,
+                                              batchSystem=self.__class__.__name__, name=name, detail=detail)

     def setEnv(self, name, value=None):
         """
@@ -519,7 +546,7 @@ class InsufficientSystemResources(Exception):
    To be raised when a job requests more of a particular resource than is either currently allowed
    or avaliable
    """
-    def __init__(self, resource, requested, available):
+    def __init__(self, resource, requested, available, batchSystem=None, name=None, detail=None):
        """
        Creates an instance of this exception that indicates which resource is insufficient for current
        demands, as well as the amount requested and amount actually available.
@@ -530,12 +557,37 @@ def __init__(self, resource, requested, available):
               in this exception

        :param int|float available: amount of the particular resource actually available
+
+       :param str batchSystem: Name of the batch system class complaining, for
+              generating a useful error report. If you are using a single machine
+              batch system for local jobs in another batch system, it is important to
+              know which one has run out of resources.
+
+       :param str name: Name of the job being checked, for generating a useful error report.
+
+       :param str detail: Batch-system-specific message to include in the error.
        """
        self.requested = requested
        self.available = available
        self.resource = resource
+       self.batchSystem = batchSystem if batchSystem is not None else 'this batch system'
+       self.unit = 'bytes of ' if resource == 'disk' or resource == 'memory' else ''
+       self.name = name
+       self.detail = detail

     def __str__(self):
-        return 'Requesting more {} than either physically available, or enforced by --max{}. ' \
-               'Requested: {}, Available: {}'.format(self.resource, self.resource.capitalize(),
-                                                     self.requested, self.available)
+        if self.name is not None:
+            phrases = [('The job {} is requesting {} {}{}, more than '
+                        'the maximum of {} {}{} that {} was configured '
+                        'with.'.format(self.name, self.requested, self.unit, self.resource,
+                                       self.available, self.unit, self.resource, self.batchSystem))]
+        else:
+            phrases = [('Requesting more {} than either physically available to {}, or enforced by --max{}. '
+                        'Requested: {}, Available: {}'.format(self.resource, self.batchSystem,
+                                                              self.resource.capitalize(),
+                                                              self.requested, self.available))]
+
+        if self.detail is not None:
+            phrases.append(self.detail)
+
+        return ' '.join(phrases)
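To illustrate the new error wording, a small sketch that raises the exception directly with the new keyword arguments; it assumes Toil at this commit is importable, and the job name, sizes, and detail string are invented.

from toil.batchSystems.abstractBatchSystem import InsufficientSystemResources

try:
    # Roughly what checkResourceRequest() raises for an oversized memory request
    raise InsufficientSystemResources('memory', 2**40, 4 * 2**30,
                                      batchSystem='SingleMachineBatchSystem',
                                      name='myBigJob',
                                      detail='Scale is set to 1.')
except InsufficientSystemResources as e:
    print(e)
    # Expected to read along the lines of:
    #   The job myBigJob is requesting 1099511627776 bytes of memory, more than the
    #   maximum of 4294967296 bytes of memory that SingleMachineBatchSystem was
    #   configured with. Scale is set to 1.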

src/toil/batchSystems/singleMachine.py

Lines changed: 32 additions & 21 deletions
@@ -12,13 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from __future__ import absolute_import
-from __future__ import division
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import range
-from builtins import object
 from past.utils import old_div
 from contextlib import contextmanager
 import logging
@@ -160,6 +153,9 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
         self.memory = ResourcePool(self.maxMemory, 'memory')
         # A pool representing the available space in bytes
         self.disk = ResourcePool(self.maxDisk, 'disk')
+
+        # If we can't schedule something, we fill this in with a reason why
+        self.schedulingStatusMessage = None

         # We use this event to signal shutdown
         self.shuttingDown = Event()
@@ -176,7 +172,7 @@ def __init__(self, config, maxCores, maxMemory, maxDisk):
         self.daddyThread = Thread(target=self.daddy, daemon=True)
         self.daddyThread.start()
         log.debug('Started in normal mode.')
-
+
     def daddy(self):
         """
         Be the "daddy" thread.
@@ -352,6 +348,22 @@ def _runDebugJob(self, jobCommand, jobID, environment):
         self.runningJobs.pop(jobID)
         if not info.killIntended:
             self.outputQueue.put(UpdatedBatchJobInfo(jobID=jobID, exitStatus=0, wallTime=time.time() - info.time, exitReason=None))
+
+    def getSchedulingStatusMessage(self):
+        # Implement the abstractBatchSystem's scheduling status message API
+        return self.schedulingStatusMessage
+
+    def _setSchedulingStatusMessage(self, message):
+        """
+        If we can't run a job, we record a short message about why not. If the
+        leader wants to know what is up with us (for example, to diagnose a
+        deadlock), it can ask us for the message.
+        """
+
+        self.schedulingStatusMessage = message
+
+        # Report the message in the debug log too.
+        log.debug(message)

     def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, environment):
         """
@@ -423,13 +435,13 @@ def _startChild(self, jobCommand, jobID, coreFractions, jobMemory, jobDisk, envi
                     # We can't get disk, so free cores and memory
                     self.coreFractions.release(coreFractions)
                     self.memory.release(jobMemory)
-                    log.debug('Not enough disk to run job %s', jobID)
+                    self._setSchedulingStatusMessage('Not enough disk to run job %s' % jobID)
             else:
                 # Free cores, since we can't get memory
                 self.coreFractions.release(coreFractions)
-                log.debug('Not enough memory to run job %s', jobID)
+                self._setSchedulingStatusMessage('Not enough memory to run job %s' % jobID)
         else:
-            log.debug('Not enough cores to run job %s', jobID)
+            self._setSchedulingStatusMessage('Not enough cores to run job %s' % jobID)

         # If we get here, we didn't succeed or fail starting the job.
         # We didn't manage to get the resources.
@@ -481,16 +493,15 @@ def issueBatchJob(self, jobNode):

         self._checkOnDaddy()

-        # Round cores to minCores and apply scale
-        cores = math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores
-        assert cores <= self.maxCores, ('The job {} is requesting {} cores, more than the maximum of '
-                                        '{} cores this batch system was configured with. Scale is '
-                                        'set to {}.'.format(jobNode.jobName, cores, self.maxCores, self.scale))
-        assert cores >= self.minCores
-        assert jobNode.memory <= self.maxMemory, ('The job {} is requesting {} bytes of memory, more than '
-                                                  'the maximum of {} this batch system was configured '
-                                                  'with.'.format(jobNode.jobName, jobNode.memory, self.maxMemory))
-
+        # Round cores to minCores and apply scale.
+        # Make sure to give minCores even if asked for 0 cores, or negative or something.
+        cores = max(math.ceil(jobNode.cores * self.scale / self.minCores) * self.minCores, self.minCores)
+
+        # Don't do our own assertions about job size vs. our configured size.
+        # The abstract batch system can handle it.
+        self.checkResourceRequest(jobNode.memory, cores, jobNode.disk, name=jobNode.jobName,
+                                  detail='Scale is set to {}.'.format(self.scale))
+
         self.checkResourceRequest(jobNode.memory, cores, jobNode.disk)
         log.debug("Issuing the command: %s with memory: %i, cores: %i, disk: %i" % (
             jobNode.command, jobNode.memory, cores, jobNode.disk))
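The new cores expression both rounds up to a multiple of minCores and clamps from below, so a zero or negative request still gets minCores. A small standalone check of the arithmetic, with default values chosen purely for illustration:

import math

def round_cores(requested, scale=1.0, min_cores=1):
    # Same arithmetic as issueBatchJob above: scale, round up to a multiple of
    # min_cores, and never return less than min_cores.
    return max(math.ceil(requested * scale / min_cores) * min_cores, min_cores)

print(round_cores(0))    # 1: a zero-core request is bumped up to min_cores
print(round_cores(2.3))  # 3: rounded up to the next multiple of min_cores
print(round_cores(-1))   # 1: negative requests are clamped as well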

src/toil/common.py

Lines changed: 18 additions & 6 deletions
@@ -99,7 +99,8 @@ def __init__(self):
         # Parameters to limit service jobs, so preventing deadlock scheduling scenarios
         self.maxPreemptableServiceJobs = sys.maxsize
         self.maxServiceJobs = sys.maxsize
-        self.deadlockWait = 60 # Number of seconds to wait before declaring a deadlock
+        self.deadlockWait = 60 # Number of seconds we must be stuck with all services before declaring a deadlock
+        self.deadlockCheckInterval = 30 # Minimum polling delay for deadlocks
         self.statePollingWait = 1 # Number of seconds to wait before querying job state

         # Resource requirements
@@ -253,6 +254,7 @@ def parseIntList(s):
         setOption("maxServiceJobs", int)
         setOption("maxPreemptableServiceJobs", int)
         setOption("deadlockWait", int)
+        setOption("deadlockCheckInterval", int)
         setOption("statePollingWait", int)

         # Resource requirements
@@ -471,16 +473,26 @@ def _addOptions(addGroupFn, config):
                 "Allows the specification of the maximum number of service jobs "
                 "in a cluster. By keeping this limited "
                 " we can avoid all the nodes being occupied with services, so causing a deadlock")
-    addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None,
+    addOptionFn("--maxServiceJobs", dest="maxServiceJobs", default=None, type=int,
                 help=(
                     "The maximum number of service jobs that can be run concurrently, excluding service jobs running on preemptable nodes. default=%s" % config.maxServiceJobs))
-    addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None,
+    addOptionFn("--maxPreemptableServiceJobs", dest="maxPreemptableServiceJobs", default=None, type=int,
                 help=(
                     "The maximum number of service jobs that can run concurrently on preemptable nodes. default=%s" % config.maxPreemptableServiceJobs))
-    addOptionFn("--deadlockWait", dest="deadlockWait", default=None,
+    addOptionFn("--deadlockWait", dest="deadlockWait", default=None, type=int,
                 help=(
-                    "The minimum number of seconds to observe the cluster stuck running only the same service jobs before throwing a deadlock exception. default=%s" % config.deadlockWait))
-    addOptionFn("--statePollingWait", dest="statePollingWait", default=1,
+                    "Time, in seconds, to tolerate the workflow running only the same service "
+                    "jobs, with no jobs to use them, before declaring the workflow to be "
+                    "deadlocked and stopping. default=%s" % config.deadlockWait))
+    addOptionFn("--deadlockCheckInterval", dest="deadlockCheckInterval", default=None, type=int,
+                help=(
+                    "Time, in seconds, to wait between checks to see if the workflow is stuck "
+                    "running only service jobs, with no jobs to use them. Should be shorter than "
+                    "--deadlockWait. May need to be increased if the batch system cannot "
+                    "enumerate running jobs quickly enough, or if polling for running jobs is "
+                    "placing an unacceptable load on a shared cluster. default=%s" %
+                    config.deadlockCheckInterval))
+    addOptionFn("--statePollingWait", dest="statePollingWait", default=1, type=int,
                 help=("Time, in seconds, to wait before doing a scheduler query for job state. "
                       "Return cached results if within the waiting period."))
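The leader-side check that consumes these settings is not part of this diff, but the intended interaction is roughly the hypothetical polling loop below: re-check the "only service jobs are running" condition every deadlockCheckInterval seconds, and only raise once it has persisted for deadlockWait seconds, attaching the batch system's scheduling status message when one is available. Everything here other than DeadlockException and getSchedulingStatusMessage is invented for the sketch.

import time
from toil.batchSystems import DeadlockException

def watch_for_deadlock(batch_system, only_services_running, deadlock_wait=60, check_interval=30):
    # only_services_running() stands in for the leader's real stuck-workflow test
    stuck_since = None
    while True:
        if only_services_running():
            if stuck_since is None:
                stuck_since = time.time()
            elif time.time() - stuck_since >= deadlock_wait:
                hint = batch_system.getSchedulingStatusMessage()
                raise DeadlockException(
                    'the workflow ran only service jobs for {} seconds. {}'.format(
                        deadlock_wait, hint or 'No scheduling message was available.'))
        else:
            stuck_since = None
        time.sleep(check_interval)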
src/toil/jobStores/fileJobStore.py

Lines changed: 0 additions & 1 deletion
@@ -243,7 +243,6 @@ def jobs(self):
         # is in progress.
         for tempDir in self._jobDirectories():
             for i in os.listdir(tempDir):
-                logger.warning('Job Dir: %s' % i)
                 if i.startswith(self.JOB_DIR_PREFIX):
                     # This is a job instance directory
                     jobId = self._getJobIdFromDir(os.path.join(tempDir, i))
