From 6256bcb29d94f2b72a839864b70848537e75d405 Mon Sep 17 00:00:00 2001 From: Dou Mok Date: Fri, 27 Sep 2024 11:23:19 -0700 Subject: [PATCH] Refactor and fix bug in 'store_object' where sync method was not within try block, which could lead to a deadlock --- src/hashstore/filehashstore.py | 96 +++++++++++++++++----------------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py index 380e521a..3ae0f3f1 100644 --- a/src/hashstore/filehashstore.py +++ b/src/hashstore/filehashstore.py @@ -536,48 +536,48 @@ def store_object( additional_algorithm, checksum, checksum_algorithm ) - sync_begin_debug_msg = f"Adding pid ({pid}) to locked list." - err_msg = f"Duplicate object request encountered for pid: {pid}. Already in progress." - if self.use_multiprocessing: - with self.object_pid_condition_mp: - # Wait for the pid to release if it's in use - if pid in self.object_locked_pids_mp: - self.fhs_logger.error(err_msg) - raise StoreObjectForPidAlreadyInProgress(err_msg) - # Modify object_locked_pids consecutively - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_pids_mp.append(pid) - else: - with self.object_pid_condition_th: - if pid in self.object_locked_pids_th: - logging.error(err_msg) - raise StoreObjectForPidAlreadyInProgress(err_msg) - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_pids_th.append(pid) try: - self.fhs_logger.debug("Attempting to store object for pid: %s", pid) - object_metadata = self._store_and_validate_data( - pid, - data, - additional_algorithm=additional_algorithm_checked, - checksum=checksum, - checksum_algorithm=checksum_algorithm_checked, - file_size_to_validate=expected_object_size, + err_msg = ( + f"Duplicate object request for pid: {pid}. Already in progress."
) - self.fhs_logger.debug("Attempting to tag object for pid: %s", pid) - cid = object_metadata.cid - self.tag_object(pid, cid) - self.fhs_logger.info("Successfully stored object for pid: %s", pid) + if self.use_multiprocessing: + with self.object_pid_condition_mp: + # Raise exception immediately if pid is in use + if pid in self.object_locked_pids_mp: + self.fhs_logger.error(err_msg) + raise StoreObjectForPidAlreadyInProgress(err_msg) + else: + with self.object_pid_condition_th: + if pid in self.object_locked_pids_th: + logging.error(err_msg) + raise StoreObjectForPidAlreadyInProgress(err_msg) + + try: + self._synchronize_object_locked_pids(pid) + + self.fhs_logger.debug("Attempting to store object for pid: %s", pid) + object_metadata = self._store_and_validate_data( + pid, + data, + additional_algorithm=additional_algorithm_checked, + checksum=checksum, + checksum_algorithm=checksum_algorithm_checked, + file_size_to_validate=expected_object_size, + ) + self.fhs_logger.debug("Attempting to tag object for pid: %s", pid) + cid = object_metadata.cid + self.tag_object(pid, cid) + self.fhs_logger.info("Successfully stored object for pid: %s", pid) + finally: + # Release pid + self._release_object_locked_pids(pid) except Exception as err: err_msg = ( - f"failed to store object for pid: {pid}. Reference files will not be created " - f"or tagged. Unexpected error: {err})" + f"Failed to store object for pid: {pid}. Reference files will not be " + f"created or tagged. Unexpected error: {err})" ) self.fhs_logger.error(err_msg) raise err - finally: - # Release pid - self._release_object_locked_pids(pid) return object_metadata @@ -2521,15 +2521,13 @@ def _release_object_locked_pids(self, pid: str) -> None: with self.object_pid_condition_mp: self.object_locked_pids_mp.remove(pid) self.object_pid_condition_mp.notify() - end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_mp." 
- self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_mp.") else: # Release pid with self.object_pid_condition_th: self.object_locked_pids_th.remove(pid) self.object_pid_condition_th.notify() - end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_th." - self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_th.") def _synchronize_object_locked_cids(self, cid: str) -> None: """Multiple threads may access a data object via its 'cid' or the respective 'cid @@ -2582,14 +2580,16 @@ def _release_object_locked_cids(self, cid: str) -> None: with self.object_cid_condition_mp: self.object_locked_cids_mp.remove(cid) self.object_cid_condition_mp.notify() - end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_mp." - self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug( + f"Releasing cid ({cid}) from object_cid_condition_mp." + ) else: with self.object_cid_condition_th: self.object_locked_cids_th.remove(cid) self.object_cid_condition_th.notify() - end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_th." - self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug( + f"Releasing cid ({cid}) from object_cid_condition_th." + ) def _synchronize_referenced_locked_pids(self, pid: str) -> None: """Multiple threads may interact with a pid (to tag, untag, delete) and these actions @@ -2645,15 +2645,17 @@ def _release_reference_locked_pids(self, pid: str) -> None: with self.reference_pid_condition_mp: self.reference_locked_pids_mp.remove(pid) self.reference_pid_condition_mp.notify() - end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_mp." - self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug( + f"Releasing pid ({pid}) from reference_locked_pids_mp." 
+ ) else: # Release pid with self.reference_pid_condition_th: self.reference_locked_pids_th.remove(pid) self.reference_pid_condition_th.notify() - end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_th." - self.fhs_logger.debug(end_sync_debug_msg) + self.fhs_logger.debug( + f"Releasing pid ({pid}) from reference_locked_pids_th." + ) # Other Static Methods @staticmethod