From 2b8f4044651f2dbe75f9d02e425baa8b933e0823 Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 19 Aug 2024 11:57:59 -0700
Subject: [PATCH 1/5] Add head s3 bucket function to lib.

---
 src/toil/jobStores/aws/jobStore.py | 150 ++++++++++++++---------------
 src/toil/lib/aws/s3.py             |  31 ++++++
 2 files changed, 104 insertions(+), 77 deletions(-)
 create mode 100644 src/toil/lib/aws/s3.py

diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py
index 74958a2d77..dbdf8517e9 100644
--- a/src/toil/jobStores/aws/jobStore.py
+++ b/src/toil/jobStores/aws/jobStore.py
@@ -62,6 +62,7 @@
     uploadFromPath,
 )
 from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
+from toil.lib.aws.s3 import head_s3_bucket
 from toil.lib.aws import build_tag_dict_from_env
 from toil.lib.aws.session import establish_boto3_session
 from toil.lib.aws.utils import (
@@ -821,85 +822,80 @@ def bucket_retry_predicate(error):
             return False
 
         bucketExisted = True
-        for attempt in retry_s3(predicate=bucket_retry_predicate):
-            with attempt:
-                try:
-                    # the head_bucket() call makes sure that the bucket exists and the user can access it
-                    self.s3_client.head_bucket(Bucket=bucket_name)
-
-                    bucket = self.s3_resource.Bucket(bucket_name)
-                except ClientError as e:
-                    error_http_status = get_error_status(e)
-                    if error_http_status == 404:
-                        bucketExisted = False
-                        logger.debug("Bucket '%s' does not exist.", bucket_name)
-                        if create:
-                            bucket = create_s3_bucket(
-                                self.s3_resource, bucket_name, self.region
-                            )
-                            # Wait until the bucket exists before checking the region and adding tags
-                            bucket.wait_until_exists()
-
-                            # It is possible for create_bucket to return but
-                            # for an immediate request for the bucket region to
-                            # produce an S3ResponseError with code
-                            # NoSuchBucket. We let that kick us back up to the
-                            # main retry loop.
-                            assert (
-                                get_bucket_region(bucket_name) == self.region
-                            ), f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"
-
-                            tags = build_tag_dict_from_env()
-
-                            if tags:
-                                flat_tags = flatten_tags(tags)
-                                bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
-                                bucket_tagging.put(Tagging={'TagSet': flat_tags})
-
-                            # Configure bucket so that we can make objects in
-                            # it public, which was the historical default.
-                            enable_public_objects(bucket_name)
-                        elif block:
-                            raise
-                        else:
-                            return None
-                    elif error_http_status == 301:
-                        # This is raised if the user attempts to get a bucket in a region outside
-                        # the specified one, if the specified one is not `us-east-1`. The us-east-1
-                        # server allows a user to use buckets from any region.
-                        raise BucketLocationConflictException(get_bucket_region(bucket_name))
-                    else:
-                        raise
-                else:
-                    bucketRegion = get_bucket_region(bucket_name)
-                    if bucketRegion != self.region:
-                        raise BucketLocationConflictException(bucketRegion)
-
-        if versioning and not bucketExisted:
-            # only call this method on bucket creation
-            bucket.Versioning().enable()
-            # Now wait until versioning is actually on. Some uploads
-            # would come back with no versions; maybe they were
-            # happening too fast and this setting isn't sufficiently
-            # consistent?
-            time.sleep(1)
-            while not self._getBucketVersioning(bucket_name):
-                logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
-                time.sleep(1)
-        elif check_versioning_consistency:
-            # now test for versioning consistency
-            # we should never see any of these errors since 'versioning' should always be true
-            bucket_versioning = self._getBucketVersioning(bucket_name)
-            if bucket_versioning != versioning:
-                assert False, 'Cannot modify versioning on existing bucket'
-            elif bucket_versioning is None:
-                assert False, 'Cannot use a bucket with versioning suspended'
-        if bucketExisted:
-            logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
+        try:
+            # the head_bucket() call makes sure that the bucket exists and the user can access it
+            head_s3_bucket(bucket=bucket_name)
+
+            bucket = self.s3_resource.Bucket(bucket_name)
+        except ClientError as e:
+            error_http_status = get_error_status(e)
+            if error_http_status == 404:
+                bucketExisted = False
+                logger.debug("Bucket '%s' does not exist.", bucket_name)
+                if create:
+                    bucket = create_s3_bucket(self.s3_resource, bucket_name, self.region)
+                    # Wait until the bucket exists before checking the region and adding tags
+                    bucket.wait_until_exists()
+
+                    # It is possible for create_bucket to return but
+                    # for an immediate request for the bucket region to
+                    # produce an S3ResponseError with code
+                    # NoSuchBucket. We let that kick us back up to the
+                    # main retry loop.
+                    assert (get_bucket_region(bucket_name) == self.region),\
+                        f"bucket_name: {bucket_name}, {get_bucket_region(bucket_name)} != {self.region}"
+
+                    tags = build_tag_dict_from_env()
+
+                    if tags:
+                        flat_tags = flatten_tags(tags)
+                        bucket_tagging = self.s3_resource.BucketTagging(bucket_name)
+                        bucket_tagging.put(Tagging={'TagSet': flat_tags})
+
+                    # Configure bucket so that we can make objects in
+                    # it public, which was the historical default.
+                    enable_public_objects(bucket_name)
+                elif block:
+                    raise
+                else:
+                    return None
+            elif error_http_status == 301:
+                # This is raised if the user attempts to get a bucket in a region outside
+                # the specified one, if the specified one is not `us-east-1`. The us-east-1
+                # server allows a user to use buckets from any region.
+                raise BucketLocationConflictException(get_bucket_region(bucket_name))
+            else:
+                raise
+        else:
+            bucketRegion = get_bucket_region(bucket_name)
+            if bucketRegion != self.region:
+                raise BucketLocationConflictException(bucketRegion)
+
+        if versioning and not bucketExisted:
+            # only call this method on bucket creation
+            bucket.Versioning().enable()
+            # Now wait until versioning is actually on. Some uploads
+            # would come back with no versions; maybe they were
+            # happening too fast and this setting isn't sufficiently
+            # consistent?
+            time.sleep(1)
+            while not self._getBucketVersioning(bucket_name):
+                logger.warning(f"Waiting for versioning activation on bucket '{bucket_name}'...")
+                time.sleep(1)
+        elif check_versioning_consistency:
+            # now test for versioning consistency
+            # we should never see any of these errors since 'versioning' should always be true
+            bucket_versioning = self._getBucketVersioning(bucket_name)
+            if bucket_versioning != versioning:
+                assert False, 'Cannot modify versioning on existing bucket'
+            elif bucket_versioning is None:
+                assert False, 'Cannot use a bucket with versioning suspended'
+        if bucketExisted:
+            logger.debug(f"Using pre-existing job store bucket '{bucket_name}'.")
+        else:
+            logger.debug(f"Created new job store bucket '{bucket_name}' with versioning state {versioning}.")
 
-        return bucket
+        return bucket
 
     def _bindDomain(self, domain_name: str, create: bool = False, block: bool = True) -> None:
         """
diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py
new file mode 100644
index 0000000000..fab8c05945
--- /dev/null
+++ b/src/toil/lib/aws/s3.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2015-2024 Regents of the University of California
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import Dict, Any, Optional
+from toil.lib.aws import session, AWSServerErrors
+from toil.lib.retry import retry
+
+logger = logging.getLogger(__name__)
+
+
+@retry(errors=[AWSServerErrors])
+def head_s3_bucket(bucket: str, region: Optional[str] = None):
+    """
+    Attempt to HEAD an s3 bucket and return its response.
+
+    :param bucket: AWS bucket name
+    :param region: Region that we want to look for the bucket in
+    """
+    s3_client = session.client("s3", region_name=region)
+    return s3_client.head_bucket(Bucket=bucket)

From dbe3fabc875fdcc0adbc2de010fec6c62ed54dec Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 19 Aug 2024 11:59:31 -0700
Subject: [PATCH 2/5] Shorten comment.

---
 src/toil/jobStores/aws/jobStore.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py
index dbdf8517e9..3a7ec297ea 100644
--- a/src/toil/jobStores/aws/jobStore.py
+++ b/src/toil/jobStores/aws/jobStore.py
@@ -823,7 +823,7 @@ def bucket_retry_predicate(error):
 
         bucketExisted = True
         try:
-            # the head_bucket() call makes sure that the bucket exists and the user can access it
+            # make sure bucket exists and user can access it
             head_s3_bucket(bucket=bucket_name)
 
             bucket = self.s3_resource.Bucket(bucket_name)

From 08b8f1b3aacf48678b93e6a6c7c8faba198877c1 Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 19 Aug 2024 12:29:16 -0700
Subject: [PATCH 3/5] Delete s3 object.
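
Replaces the open-coded retry_s3() loops around s3_client.delete_object()
with a single retry-wrapped helper. A minimal usage sketch of the new
helper (the bucket, key, and region values here are illustrative, not
taken from a real job store):

    from toil.lib.aws.s3 import delete_s3_object

    # Deletes one specific version of the object; passing version=None
    # falls back to a plain, unversioned DeleteObject call.
    response = delete_s3_object(
        bucket="example-toil-jobstore",
        key="files/example-file-id",
        version=None,
        region="us-west-2",
    )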
---
 src/toil/jobStores/aws/jobStore.py | 38 ++++++++++++++----------------
 src/toil/lib/aws/s3.py             | 26 +++++++++++++++++---
 2 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/src/toil/jobStores/aws/jobStore.py b/src/toil/jobStores/aws/jobStore.py
index 3a7ec297ea..cd7072515b 100644
--- a/src/toil/jobStores/aws/jobStore.py
+++ b/src/toil/jobStores/aws/jobStore.py
@@ -62,7 +62,7 @@
     uploadFromPath,
 )
 from toil.jobStores.utils import ReadablePipe, ReadableTransformingPipe, WritablePipe
-from toil.lib.aws.s3 import head_s3_bucket
+from toil.lib.aws.s3 import head_s3_bucket, delete_s3_object
 from toil.lib.aws import build_tag_dict_from_env
 from toil.lib.aws.session import establish_boto3_session
 from toil.lib.aws.utils import (
@@ -505,15 +505,12 @@ def delete_job(self, job_id):
         for item in items:
             item: "ItemTypeDef"
             version = get_item_from_attributes(attributes=item["Attributes"], name="version")
-            for attempt in retry_s3():
-                with attempt:
-                    if version:
-                        self.s3_client.delete_object(Bucket=self.files_bucket.name,
-                                                     Key=compat_bytes(item["Name"]),
-                                                     VersionId=version)
-                    else:
-                        self.s3_client.delete_object(Bucket=self.files_bucket.name,
-                                                     Key=compat_bytes(item["Name"]))
+            delete_s3_object(
+                bucket=self.files_bucket.name,
+                key=compat_bytes(item["Name"]),
+                version=version,
+                region=self.region
+            )
 
     def get_empty_file_store_id(self, jobStoreID=None, cleanup=False, basename=None) -> FileID:
         info = self.FileInfo.create(jobStoreID if cleanup else None)
@@ -1182,11 +1179,12 @@ def save(self):
                                            Expected=expected)
             # clean up the old version of the file if necessary and safe
             if self.previousVersion and (self.previousVersion != self.version):
-                for attempt in retry_s3():
-                    with attempt:
-                        self.outer.s3_client.delete_object(Bucket=self.outer.files_bucket.name,
-                                                           Key=compat_bytes(self.fileID),
-                                                           VersionId=self.previousVersion)
+                delete_s3_object(
+                    bucket=self.outer.files_bucket.name,
+                    key=compat_bytes(self.fileID),
+                    version=self.previousVersion,
+                    region=self.outer.region
+                )
             self._previousVersion = self._version
             if numNewContentChunks < self._numContentChunks:
                 residualChunks = range(numNewContentChunks, self._numContentChunks)
@@ -1653,11 +1651,11 @@ def delete(self):
                                        ItemName=compat_bytes(self.fileID),
                                        Expected=expected)
             if self.previousVersion:
-                for attempt in retry_s3():
-                    with attempt:
-                        store.s3_client.delete_object(Bucket=store.files_bucket.name,
-                                                      Key=compat_bytes(self.fileID),
-                                                      VersionId=self.previousVersion)
+                delete_s3_object(
+                    bucket=store.files_bucket.name,
+                    key=compat_bytes(self.fileID),
+                    version=self.previousVersion
+                )
 
     def getSize(self):
         """
diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py
index fab8c05945..f710b36e94 100644
--- a/src/toil/lib/aws/s3.py
+++ b/src/toil/lib/aws/s3.py
@@ -20,12 +20,32 @@
 
 
 @retry(errors=[AWSServerErrors])
-def head_s3_bucket(bucket: str, region: Optional[str] = None):
+def head_s3_object(bucket: str, key: str, header: Dict[str, Any], region: Optional[str] = None):
     """
-    Attempt to HEAD an s3 bucket and return its response.
+    Attempt to HEAD an s3 object and return its response.
 
     :param bucket: AWS bucket name
+    :param key: AWS key name for the s3 object
+    :param header: Headers to include (mostly for encryption).
+        See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3/client/head_object.html
     :param region: Region that we want to look for the bucket in
     """
     s3_client = session.client("s3", region_name=region)
-    return s3_client.head_bucket(Bucket=bucket)
+    return s3_client.head_object(Bucket=bucket, Key=key, **header)
+
+
+@retry(errors=[AWSServerErrors])
+def delete_s3_object(bucket: str, key: str, version: Optional[str], region: Optional[str] = None):
+    """
+    Attempt to DELETE an s3 object and return its response.
+
+    :param bucket: AWS bucket name
+    :param key: AWS key name for the s3 object
+    :param version: The object's version ID, if it exists
+    :param region: Region that we want to look for the bucket in
+    """
+    s3_client = session.client("s3", region_name=region)
+    if version:
+        return s3_client.delete_object(Bucket=bucket, Key=key, VersionId=version)
+    else:
+        return s3_client.delete_object(Bucket=bucket, Key=key)

From ed14bcc9bb119da09ee3a7827d1e3124e1641e40 Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Fri, 11 Oct 2024 15:22:50 -0700
Subject: [PATCH 4/5] Add missing head bucket.

---
 src/toil/lib/aws/s3.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py
index 5402a6cf57..41881619b4 100644
--- a/src/toil/lib/aws/s3.py
+++ b/src/toil/lib/aws/s3.py
@@ -54,6 +54,18 @@ def delete_s3_object(bucket: str, key: str, version: Optional[str], region: Opti
         return s3_client.delete_object(Bucket=bucket, Key=key)
 
 
+@retry(errors=[AWSServerErrors])
+def head_s3_bucket(bucket: str, region: Optional[str] = None) -> HeadBucketOutputTypeDef:
+    """
+    Attempt to HEAD an s3 bucket and return its response.
+
+    :param bucket: AWS bucket name
+    :param region: Region that we want to look for the bucket in
+    """
+    s3_client = session.client("s3", region_name=region)
+    return s3_client.head_bucket(Bucket=bucket)
+
+
 def list_multipart_uploads(bucket: str, region: str, prefix: str, max_uploads: int = 1) -> ListMultipartUploadsOutputTypeDef:
     s3_client = session.client("s3", region_name=region)
     return s3_client.list_multipart_uploads(Bucket=bucket, MaxUploads=max_uploads, Prefix=prefix)

From dc338846db676ac504d09837b1bef55d2037e582 Mon Sep 17 00:00:00 2001
From: DailyDreaming
Date: Mon, 14 Oct 2024 12:08:42 -0700
Subject: [PATCH 5/5] Add missing type import.

---
 src/toil/lib/aws/s3.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py
index 41881619b4..69d8e109f5 100644
--- a/src/toil/lib/aws/s3.py
+++ b/src/toil/lib/aws/s3.py
@@ -14,7 +14,11 @@
 import logging
 from typing import Dict, Any, Optional, List
 
-from mypy_boto3_s3.type_defs import ListMultipartUploadsOutputTypeDef, HeadObjectOutputTypeDef, DeleteObjectOutputTypeDef
+from mypy_boto3_s3.type_defs import \
+    ListMultipartUploadsOutputTypeDef,\
+    HeadObjectOutputTypeDef,\
+    DeleteObjectOutputTypeDef,\
+    HeadBucketOutputTypeDef
 from toil.lib.aws import session, AWSServerErrors
 from toil.lib.retry import retry
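
After the series, src/toil/lib/aws/s3.py exposes retry-wrapped helpers for
the common one-shot S3 calls. A minimal end-to-end sketch of the final API
(the bucket name, key, and region below are illustrative; assumes AWS
credentials are configured for the session):

    from botocore.exceptions import ClientError
    from toil.lib.aws.s3 import head_s3_bucket, head_s3_object, delete_s3_object

    bucket = "example-toil-jobstore"   # illustrative bucket name
    key = "files/example-file-id"      # illustrative object key

    # HEAD the bucket; a ClientError with HTTP status 404 means it does
    # not exist (or is not visible to the caller).
    try:
        head_s3_bucket(bucket, region="us-west-2")
    except ClientError as e:
        print(e.response["ResponseMetadata"]["HTTPStatusCode"])

    # HEAD an object; the header dict is forwarded verbatim to
    # head_object() (e.g. SSE-C fields), so pass {} when none are needed.
    obj = head_s3_object(bucket, key, header={}, region="us-west-2")

    # Delete the exact version we just saw, if the bucket is versioned;
    # with version=None this degrades to a plain, unversioned delete.
    delete_s3_object(bucket, key, version=obj.get("VersionId"), region="us-west-2")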