Skip to content

Commit

Permalink
Refactor and fix bug in 'store_object' where sync method was not with…
Browse files Browse the repository at this point in the history
…in try block which could lead to dead lock
  • Loading branch information
doulikecookiedough committed Sep 27, 2024
1 parent dbd57de commit 6256bcb
Showing 1 changed file with 49 additions and 47 deletions.
96 changes: 49 additions & 47 deletions src/hashstore/filehashstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,48 +536,48 @@ def store_object(
additional_algorithm, checksum, checksum_algorithm
)

sync_begin_debug_msg = f"Adding pid ({pid}) to locked list."
err_msg = f"Duplicate object request encountered for pid: {pid}. Already in progress."
if self.use_multiprocessing:
with self.object_pid_condition_mp:
# Wait for the pid to release if it's in use
if pid in self.object_locked_pids_mp:
self.fhs_logger.error(err_msg)
raise StoreObjectForPidAlreadyInProgress(err_msg)
# Modify object_locked_pids consecutively
self.fhs_logger.debug(sync_begin_debug_msg)
self.object_locked_pids_mp.append(pid)
else:
with self.object_pid_condition_th:
if pid in self.object_locked_pids_th:
logging.error(err_msg)
raise StoreObjectForPidAlreadyInProgress(err_msg)
self.fhs_logger.debug(sync_begin_debug_msg)
self.object_locked_pids_th.append(pid)
try:
self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
object_metadata = self._store_and_validate_data(
pid,
data,
additional_algorithm=additional_algorithm_checked,
checksum=checksum,
checksum_algorithm=checksum_algorithm_checked,
file_size_to_validate=expected_object_size,
err_msg = (
f"Duplicate object request for pid: {pid}. Already in progress."
)
self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
cid = object_metadata.cid
self.tag_object(pid, cid)
self.fhs_logger.info("Successfully stored object for pid: %s", pid)
if self.use_multiprocessing:
with self.object_pid_condition_mp:
# Raise exception immediately if pid is in use
if pid in self.object_locked_pids_mp:
self.fhs_logger.error(err_msg)
raise StoreObjectForPidAlreadyInProgress(err_msg)
else:
with self.object_pid_condition_th:
if pid in self.object_locked_pids_th:
logging.error(err_msg)
raise StoreObjectForPidAlreadyInProgress(err_msg)

try:
self._synchronize_object_locked_pids(pid)

self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
object_metadata = self._store_and_validate_data(
pid,
data,
additional_algorithm=additional_algorithm_checked,
checksum=checksum,
checksum_algorithm=checksum_algorithm_checked,
file_size_to_validate=expected_object_size,
)
self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
cid = object_metadata.cid
self.tag_object(pid, cid)
self.fhs_logger.info("Successfully stored object for pid: %s", pid)
finally:
# Release pid
self._release_object_locked_pids(pid)
except Exception as err:
err_msg = (
f"failed to store object for pid: {pid}. Reference files will not be created "
f"or tagged. Unexpected error: {err})"
f"Failed to store object for pid: {pid}. Reference files will not be "
f"created or tagged. Unexpected error: {err})"
)
self.fhs_logger.error(err_msg)
raise err
finally:
# Release pid
self._release_object_locked_pids(pid)

return object_metadata

Expand Down Expand Up @@ -2521,15 +2521,13 @@ def _release_object_locked_pids(self, pid: str) -> None:
with self.object_pid_condition_mp:
self.object_locked_pids_mp.remove(pid)
self.object_pid_condition_mp.notify()
end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_mp."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_mp.")
else:
# Release pid
with self.object_pid_condition_th:
self.object_locked_pids_th.remove(pid)
self.object_pid_condition_th.notify()
end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_th."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_th.")

def _synchronize_object_locked_cids(self, cid: str) -> None:
"""Multiple threads may access a data object via its 'cid' or the respective 'cid
Expand Down Expand Up @@ -2582,14 +2580,16 @@ def _release_object_locked_cids(self, cid: str) -> None:
with self.object_cid_condition_mp:
self.object_locked_cids_mp.remove(cid)
self.object_cid_condition_mp.notify()
end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_mp."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(
f"Releasing cid ({cid}) from object_cid_condition_mp."
)
else:
with self.object_cid_condition_th:
self.object_locked_cids_th.remove(cid)
self.object_cid_condition_th.notify()
end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_th."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(
f"Releasing cid ({cid}) from object_cid_condition_th."
)

def _synchronize_referenced_locked_pids(self, pid: str) -> None:
"""Multiple threads may interact with a pid (to tag, untag, delete) and these actions
Expand Down Expand Up @@ -2645,15 +2645,17 @@ def _release_reference_locked_pids(self, pid: str) -> None:
with self.reference_pid_condition_mp:
self.reference_locked_pids_mp.remove(pid)
self.reference_pid_condition_mp.notify()
end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_mp."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(
f"Releasing pid ({pid}) from reference_locked_pids_mp."
)
else:
# Release pid
with self.reference_pid_condition_th:
self.reference_locked_pids_th.remove(pid)
self.reference_pid_condition_th.notify()
end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_th."
self.fhs_logger.debug(end_sync_debug_msg)
self.fhs_logger.debug(
f"Releasing pid ({pid}) from reference_locked_pids_th."
)

# Other Static Methods
@staticmethod
Expand Down

0 comments on commit 6256bcb

Please sign in to comment.