From d27f52938d28ff5dd3aa7de67dfc3391c4d28230 Mon Sep 17 00:00:00 2001 From: Robert Minsk Date: Fri, 4 Nov 2022 13:21:40 -0700 Subject: [PATCH 1/4] refactor: Replace inline string status codes with string constants in TransferCoordinator --- s3transfer/futures.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/s3transfer/futures.py b/s3transfer/futures.py index 39e071fb..6ea0b6ef 100644 --- a/s3transfer/futures.py +++ b/s3transfer/futures.py @@ -160,9 +160,16 @@ def provide_transfer_size(self, size): class TransferCoordinator: """A helper class for managing TransferFuture""" + NOT_STARTED_STATUS = 'not-started' + QUEUED_STATUS = 'queued' + RUNNING_STATUS = 'running' + CANCELLED_STATUS = 'cancelled' + FAILED_STATUS = 'failed' + SUCCESS_STATUS = 'success' + def __init__(self, transfer_id=None): self.transfer_id = transfer_id - self._status = 'not-started' + self._status = TransferCoordinator.NOT_STARTED_STATUS self._result = None self._exception = None self._associated_futures = set() @@ -232,7 +239,7 @@ def set_result(self, result): with self._lock: self._exception = None self._result = result - self._status = 'success' + self._status = TransferCoordinator.SUCCESS_STATUS def set_exception(self, exception, override=False): """Set an exception for the TransferFuture @@ -245,7 +252,7 @@ def set_exception(self, exception, override=False): with self._lock: if not self.done() or override: self._exception = exception - self._status = 'failed' + self._status = TransferCoordinator.FAILED_STATUS def result(self): """Waits until TransferFuture is done and returns the result @@ -277,19 +284,19 @@ def cancel(self, msg='', exc_type=CancelledError): should_announce_done = False logger.debug('%s cancel(%s) called', self, msg) self._exception = exc_type(msg) - if self._status == 'not-started': + if self._status == TransferCoordinator.NOT_STARTED_STATUS: should_announce_done = True - self._status = 'cancelled' + self._status = TransferCoordinator.CANCELLED_STATUS if should_announce_done: self.announce_done() def set_status_to_queued(self): """Sets the TransferFutrue's status to running""" - self._transition_to_non_done_state('queued') + self._transition_to_non_done_state(TransferCoordinator.QUEUED_STATUS) def set_status_to_running(self): """Sets the TransferFuture's status to running""" - self._transition_to_non_done_state('running') + self._transition_to_non_done_state(TransferCoordinator.RUNNING_STATUS) def _transition_to_non_done_state(self, desired_state): with self._lock: @@ -335,7 +342,11 @@ def done(self): :returns: False if status is equal to 'failed', 'cancelled', or 'success'. True, otherwise """ - return self.status in ['failed', 'cancelled', 'success'] + return self.status in [ + TransferCoordinator.FAILED_STATUS, + TransferCoordinator.CANCELLED_STATUS, + TransferCoordinator.SUCCESS_STATUS, + ] def add_associated_future(self, future): """Adds a future to be associated with the TransferFuture""" @@ -369,7 +380,7 @@ def announce_done(self): run any done callbacks associated to the TransferFuture if they have not already been ran. """ - if self.status != 'success': + if self.status != TransferCoordinator.SUCCESS_STATUS: self._run_failure_cleanups() self._done_event.set() self._run_done_callbacks() From e58f1dd93eced2268c02212b972b57b4d843efd8 Mon Sep 17 00:00:00 2001 From: Robert Minsk Date: Fri, 4 Nov 2022 15:31:33 -0700 Subject: [PATCH 2/4] fix: cancel() was only calling announce_done() if the state was not-started. This would cause the _done_event to not be set. --- s3transfer/futures.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/s3transfer/futures.py b/s3transfer/futures.py index 6ea0b6ef..d19e31a3 100644 --- a/s3transfer/futures.py +++ b/s3transfer/futures.py @@ -281,14 +281,13 @@ def cancel(self, msg='', exc_type=CancelledError): """ with self._lock: if not self.done(): - should_announce_done = False logger.debug('%s cancel(%s) called', self, msg) self._exception = exc_type(msg) - if self._status == TransferCoordinator.NOT_STARTED_STATUS: - should_announce_done = True + started = ( + self._status != TransferCoordinator.NOT_STARTED_STATUS + ) self._status = TransferCoordinator.CANCELLED_STATUS - if should_announce_done: - self.announce_done() + self.announce_done(started) def set_status_to_queued(self): """Sets the TransferFutrue's status to running""" @@ -372,17 +371,25 @@ def add_failure_cleanup(self, function, *args, **kwargs): FunctionContainer(function, *args, **kwargs) ) - def announce_done(self): + def announce_done(self, started=True): """Announce that future is done running and run associated callbacks This will run any failure cleanups if the transfer failed if not they have not been run, allows the result() to be unblocked, and will run any done callbacks associated to the TransferFuture if they have not already been ran. + + :param started: Has the transfer started yet. If not started the + transfer can be canceled immediately and failure cleanups will not + be called. """ - if self.status != TransferCoordinator.SUCCESS_STATUS: + # Only run failure cleanups if the transfer has started. If a transfer + # is in the not-started state when cancel() is called the transfer is + # not considered a failure. + if started and self.status != TransferCoordinator.SUCCESS_STATUS: self._run_failure_cleanups() self._done_event.set() + # Done callbacks are run even if the transfer has not been started self._run_done_callbacks() def _run_done_callbacks(self): From 3e10792c871e71b305a957cc1aedce29615b9fcd Mon Sep 17 00:00:00 2001 From: Robert Minsk Date: Fri, 4 Nov 2022 15:37:00 -0700 Subject: [PATCH 3/4] test: Add unit tests for started parameter on announce_done --- tests/unit/test_futures.py | 37 +++++++++++++++++++++++++++++++++++++ tests/unit/test_tasks.py | 18 ++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/tests/unit/test_futures.py b/tests/unit/test_futures.py index ed196a3a..73bdb23b 100644 --- a/tests/unit/test_futures.py +++ b/tests/unit/test_futures.py @@ -441,6 +441,26 @@ def test_done_callbacks_on_done(self): self.transfer_coordinator.announce_done() self.assertEqual(done_callback_invocations, ['done callback called']) + def test_done_callbacks_on_done_not_running(self): + done_callback_invocations = [] + callback = FunctionContainer( + done_callback_invocations.append, 'done callback called' + ) + + # Add the done callback to the transfer. + self.transfer_coordinator.add_done_callback(callback) + + # Announce that the transfer is done. This should invoke the done + # callback. + self.transfer_coordinator.announce_done(False) + self.assertEqual(done_callback_invocations, ['done callback called']) + + # If done is announced again, we should not invoke the callback again + # because done has already been announced and thus the callback has + # been ran as well. + self.transfer_coordinator.announce_done(False) + self.assertEqual(done_callback_invocations, ['done callback called']) + def test_failure_cleanups_on_done(self): cleanup_invocations = [] callback = FunctionContainer( @@ -461,6 +481,23 @@ def test_failure_cleanups_on_done(self): self.transfer_coordinator.announce_done() self.assertEqual(cleanup_invocations, ['cleanup called']) + def test_failure_cleanups_on_done_not_running(self): + cleanup_invocations = [] + callback = FunctionContainer( + cleanup_invocations.append, 'cleanup called' + ) + + # Add the failure cleanup to the transfer. + self.transfer_coordinator.add_failure_cleanup(callback) + + # Announce that the transfer is done but not in a running state. This + # should not invoke the failure cleanup. + self.transfer_coordinator.announce_done(False) + self.assertFalse(cleanup_invocations) + + # Make sure failure cleanups were not removed from the list + self.assertEqual(len(self.transfer_coordinator.failure_cleanups), 1) + class TestBoundedExecutor(unittest.TestCase): def setUp(self): diff --git a/tests/unit/test_tasks.py b/tests/unit/test_tasks.py index 9759e8fb..5bc7576e 100644 --- a/tests/unit/test_tasks.py +++ b/tests/unit/test_tasks.py @@ -399,6 +399,24 @@ def test_submission_task_announces_done_if_cancelled_before_main(self): # for making sure it announces done as nothing else will. self.assertEqual(invocations_of_done, ['done announced']) + def test_failure_cleanup_not_run_if_cancelled_before_main(self): + invocations_of_cleanup = [] + cleanup_callback = FunctionContainer( + invocations_of_cleanup.append, 'cleanup called' + ) + self.transfer_coordinator.add_failure_cleanup(cleanup_callback) + + self.transfer_coordinator.cancel() + submission_task = self.get_task( + NOOPSubmissionTask, main_kwargs=self.main_kwargs + ) + submission_task() + + # Because the submission task was cancelled before being run the + # transfer is not considered a failure and the failure cleanup + # should not be called. + self.assertFalse(invocations_of_cleanup) + class TestTask(unittest.TestCase): def setUp(self): From faf1c08147e2d39300155cd13e9e96851ab200d8 Mon Sep 17 00:00:00 2001 From: Robert Minsk Date: Fri, 4 Nov 2022 13:21:40 -0700 Subject: [PATCH 4/4] refactor: Replace inline string status codes with string constants in TransferCoordinator --- s3transfer/futures.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/s3transfer/futures.py b/s3transfer/futures.py index 39e071fb..6ea0b6ef 100644 --- a/s3transfer/futures.py +++ b/s3transfer/futures.py @@ -160,9 +160,16 @@ def provide_transfer_size(self, size): class TransferCoordinator: """A helper class for managing TransferFuture""" + NOT_STARTED_STATUS = 'not-started' + QUEUED_STATUS = 'queued' + RUNNING_STATUS = 'running' + CANCELLED_STATUS = 'cancelled' + FAILED_STATUS = 'failed' + SUCCESS_STATUS = 'success' + def __init__(self, transfer_id=None): self.transfer_id = transfer_id - self._status = 'not-started' + self._status = TransferCoordinator.NOT_STARTED_STATUS self._result = None self._exception = None self._associated_futures = set() @@ -232,7 +239,7 @@ def set_result(self, result): with self._lock: self._exception = None self._result = result - self._status = 'success' + self._status = TransferCoordinator.SUCCESS_STATUS def set_exception(self, exception, override=False): """Set an exception for the TransferFuture @@ -245,7 +252,7 @@ def set_exception(self, exception, override=False): with self._lock: if not self.done() or override: self._exception = exception - self._status = 'failed' + self._status = TransferCoordinator.FAILED_STATUS def result(self): """Waits until TransferFuture is done and returns the result @@ -277,19 +284,19 @@ def cancel(self, msg='', exc_type=CancelledError): should_announce_done = False logger.debug('%s cancel(%s) called', self, msg) self._exception = exc_type(msg) - if self._status == 'not-started': + if self._status == TransferCoordinator.NOT_STARTED_STATUS: should_announce_done = True - self._status = 'cancelled' + self._status = TransferCoordinator.CANCELLED_STATUS if should_announce_done: self.announce_done() def set_status_to_queued(self): """Sets the TransferFutrue's status to running""" - self._transition_to_non_done_state('queued') + self._transition_to_non_done_state(TransferCoordinator.QUEUED_STATUS) def set_status_to_running(self): """Sets the TransferFuture's status to running""" - self._transition_to_non_done_state('running') + self._transition_to_non_done_state(TransferCoordinator.RUNNING_STATUS) def _transition_to_non_done_state(self, desired_state): with self._lock: @@ -335,7 +342,11 @@ def done(self): :returns: False if status is equal to 'failed', 'cancelled', or 'success'. True, otherwise """ - return self.status in ['failed', 'cancelled', 'success'] + return self.status in [ + TransferCoordinator.FAILED_STATUS, + TransferCoordinator.CANCELLED_STATUS, + TransferCoordinator.SUCCESS_STATUS, + ] def add_associated_future(self, future): """Adds a future to be associated with the TransferFuture""" @@ -369,7 +380,7 @@ def announce_done(self): run any done callbacks associated to the TransferFuture if they have not already been ran. """ - if self.status != 'success': + if self.status != TransferCoordinator.SUCCESS_STATUS: self._run_failure_cleanups() self._done_event.set() self._run_done_callbacks()