Skip to content

Fix TransferCoordinator.cancel() hangs #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions s3transfer/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,16 @@ def provide_transfer_size(self, size):
class TransferCoordinator:
"""A helper class for managing TransferFuture"""

NOT_STARTED_STATUS = 'not-started'
QUEUED_STATUS = 'queued'
RUNNING_STATUS = 'running'
CANCELLED_STATUS = 'cancelled'
FAILED_STATUS = 'failed'
SUCCESS_STATUS = 'success'

def __init__(self, transfer_id=None):
self.transfer_id = transfer_id
self._status = 'not-started'
self._status = TransferCoordinator.NOT_STARTED_STATUS
self._result = None
self._exception = None
self._associated_futures = set()
Expand Down Expand Up @@ -232,7 +239,7 @@ def set_result(self, result):
with self._lock:
self._exception = None
self._result = result
self._status = 'success'
self._status = TransferCoordinator.SUCCESS_STATUS

def set_exception(self, exception, override=False):
"""Set an exception for the TransferFuture
Expand All @@ -245,7 +252,7 @@ def set_exception(self, exception, override=False):
with self._lock:
if not self.done() or override:
self._exception = exception
self._status = 'failed'
self._status = TransferCoordinator.FAILED_STATUS

def result(self):
"""Waits until TransferFuture is done and returns the result
Expand Down Expand Up @@ -274,22 +281,21 @@ def cancel(self, msg='', exc_type=CancelledError):
"""
with self._lock:
if not self.done():
should_announce_done = False
logger.debug('%s cancel(%s) called', self, msg)
self._exception = exc_type(msg)
if self._status == 'not-started':
should_announce_done = True
self._status = 'cancelled'
if should_announce_done:
self.announce_done()
started = (
self._status != TransferCoordinator.NOT_STARTED_STATUS
)
self._status = TransferCoordinator.CANCELLED_STATUS
self.announce_done(started)

def set_status_to_queued(self):
"""Sets the TransferFutrue's status to running"""
self._transition_to_non_done_state('queued')
self._transition_to_non_done_state(TransferCoordinator.QUEUED_STATUS)

def set_status_to_running(self):
"""Sets the TransferFuture's status to running"""
self._transition_to_non_done_state('running')
self._transition_to_non_done_state(TransferCoordinator.RUNNING_STATUS)

def _transition_to_non_done_state(self, desired_state):
with self._lock:
Expand Down Expand Up @@ -335,7 +341,11 @@ def done(self):
:returns: False if status is equal to 'failed', 'cancelled', or
'success'. True, otherwise
"""
return self.status in ['failed', 'cancelled', 'success']
return self.status in [
TransferCoordinator.FAILED_STATUS,
TransferCoordinator.CANCELLED_STATUS,
TransferCoordinator.SUCCESS_STATUS,
]

def add_associated_future(self, future):
"""Adds a future to be associated with the TransferFuture"""
Expand All @@ -361,17 +371,25 @@ def add_failure_cleanup(self, function, *args, **kwargs):
FunctionContainer(function, *args, **kwargs)
)

def announce_done(self):
def announce_done(self, started=True):
"""Announce that future is done running and run associated callbacks

This will run any failure cleanups if the transfer failed if not
they have not been run, allows the result() to be unblocked, and will
run any done callbacks associated to the TransferFuture if they have
not already been ran.

:param started: Has the transfer started yet. If not started the
transfer can be canceled immediately and failure cleanups will not
be called.
"""
if self.status != 'success':
# Only run failure cleanups if the transfer has started. If a transfer
# is in the not-started state when cancel() is called the transfer is
# not considered a failure.
if started and self.status != TransferCoordinator.SUCCESS_STATUS:
self._run_failure_cleanups()
self._done_event.set()
# Done callbacks are run even if the transfer has not been started
self._run_done_callbacks()

def _run_done_callbacks(self):
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,26 @@ def test_done_callbacks_on_done(self):
self.transfer_coordinator.announce_done()
self.assertEqual(done_callback_invocations, ['done callback called'])

def test_done_callbacks_on_done_not_running(self):
done_callback_invocations = []
callback = FunctionContainer(
done_callback_invocations.append, 'done callback called'
)

# Add the done callback to the transfer.
self.transfer_coordinator.add_done_callback(callback)

# Announce that the transfer is done. This should invoke the done
# callback.
self.transfer_coordinator.announce_done(False)
self.assertEqual(done_callback_invocations, ['done callback called'])

# If done is announced again, we should not invoke the callback again
# because done has already been announced and thus the callback has
# been ran as well.
self.transfer_coordinator.announce_done(False)
self.assertEqual(done_callback_invocations, ['done callback called'])

def test_failure_cleanups_on_done(self):
cleanup_invocations = []
callback = FunctionContainer(
Expand All @@ -461,6 +481,23 @@ def test_failure_cleanups_on_done(self):
self.transfer_coordinator.announce_done()
self.assertEqual(cleanup_invocations, ['cleanup called'])

def test_failure_cleanups_on_done_not_running(self):
cleanup_invocations = []
callback = FunctionContainer(
cleanup_invocations.append, 'cleanup called'
)

# Add the failure cleanup to the transfer.
self.transfer_coordinator.add_failure_cleanup(callback)

# Announce that the transfer is done but not in a running state. This
# should not invoke the failure cleanup.
self.transfer_coordinator.announce_done(False)
self.assertFalse(cleanup_invocations)

# Make sure failure cleanups were not removed from the list
self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 1)


class TestBoundedExecutor(unittest.TestCase):
def setUp(self):
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,24 @@ def test_submission_task_announces_done_if_cancelled_before_main(self):
# for making sure it announces done as nothing else will.
self.assertEqual(invocations_of_done, ['done announced'])

def test_failure_cleanup_not_run_if_cancelled_before_main(self):
invocations_of_cleanup = []
cleanup_callback = FunctionContainer(
invocations_of_cleanup.append, 'cleanup called'
)
self.transfer_coordinator.add_failure_cleanup(cleanup_callback)

self.transfer_coordinator.cancel()
submission_task = self.get_task(
NOOPSubmissionTask, main_kwargs=self.main_kwargs
)
submission_task()

# Because the submission task was cancelled before being run the
# transfer is not considered a failure and the failure cleanup
# should not be called.
self.assertFalse(invocations_of_cleanup)


class TestTask(unittest.TestCase):
def setUp(self):
Expand Down