Commit
Added cloud storage utility to toolkit
Showing 4 changed files with 121 additions and 1 deletion.
@@ -3,3 +3,4 @@ psycopg2-binary
pandas
pyYAML
click
boto3
@@ -0,0 +1,118 @@
"""Storage utility to work with S3, GCS and minio. | ||
HOW TO USE: | ||
* Make sure that credentials are configured the way boto3 expects | ||
* You need to do extra setup to use this module with GCS | ||
- Generate GCS HMAC credentials and set them as aws crentials. | ||
- Please make sure that endpoint url is set to 'https://storage.googleapis.com' | ||
""" | ||
import boto3 | ||
import botocore | ||
|
||
s3 = boto3.resource("s3") | ||
|
||
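
# Example (illustrative, not exercised by this module as committed): the default
# resource above targets AWS S3. To follow the GCS/MinIO setup described in the
# module docstring, the resource can be created with an explicit endpoint, e.g.:
#
#   s3 = boto3.resource("s3", endpoint_url="https://storage.googleapis.com")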

class StoragePath:
    """The StoragePath class provides a pathlib.Path like interface for
    storage.

    USAGE:

        root = StoragePath(bucket_name, "alpha")
        path = root.join("datasets", "customer-master", "template.csv")
        text = path.read_text()

    TODO: Add delete and list directory methods.
    """
    def __init__(self, bucket: str, path: str):
        self.bucket = bucket
        self.path = path
    @property
    def _object(self):
        return s3.Object(bucket_name=self.bucket, key=self.path)

    def exists(self):
        """Tells whether the storage path exists or not.

        Checks if the path exists by fetching the object's metadata.
        """
        obj = self._object
        try:
            obj.metadata
            return True
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                return False
            raise

    def delete(self):
        """Deletes the file at the storage path.
        """
        obj = self._object
        obj.delete()

    def download(self, local_path):
        """Downloads the contents of the storage file to the local_path file.
        """
        obj = self._object
        obj.download_file(local_path)

    def upload(self, local_path):
        """Uploads the file from local_path to the storage path.
        """
        obj = self._object
        obj.upload_file(local_path)
    def read_text(self):
        """Reads the contents of the path as text.
        """
        obj = self._object
        # get() returns the body as a streaming bytes object; decode it to text.
        return obj.get()['Body'].read().decode("utf-8")
    def _get_presigned_url(self, client_method, expires=600, content_type=None):
        """Returns a presigned URL for upload or download.

        The client_method should be one of get_object or put_object.
        """
        params = {
            'Bucket': self.bucket,
            'Key': self.path,
        }
        if content_type:
            params['ContentType'] = content_type

        return s3.meta.client.generate_presigned_url(
            client_method,
            Params=params,
            ExpiresIn=expires)

    def get_presigned_url_for_download(self, expires=3600):
        """Returns a presigned URL for download.

        The default expiry is one hour (3600 seconds).
        """
        return self._get_presigned_url(client_method='get_object', expires=expires)

    def get_presigned_url_for_upload(self, expires=600, content_type="text/csv"):
        """Returns a presigned URL for upload.

        The default expiry is 10 minutes (600 seconds).
        """
        return self._get_presigned_url(client_method='put_object', expires=expires, content_type=content_type)
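
    # Usage sketch (illustrative, assumes the `requests` package is available):
    # a client can upload directly to storage with the presigned URL, sending the
    # same Content-Type that the URL was signed with:
    #
    #   url = path.get_presigned_url_for_upload(content_type="text/csv")
    #   requests.put(url, data=open("template.csv", "rb"),
    #                headers={"Content-Type": "text/csv"})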
    def read_dataframe(self):
        """TODO: Support csv and parquet.
        """
        pass

    def copy_to(self, dest_path):
        """Copies the file to the destination path within the bucket.
        """
        pass

    def join(self, *parts):
        """Combines the storage path with one or more parts and returns a new path.
        """
        path = "/".join([self.path] + list(parts))
        return StoragePath(self.bucket, path)

    def __repr__(self):
        return f'<StoragePath {self.path}>'
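
For reference, a minimal usage sketch of the new class (the bucket and object names
below are illustrative, boto3 credentials are assumed to be configured, and StoragePath
is assumed to be imported from the new module):

    # Build a path inside the bucket, mirroring the USAGE docstring.
    root = StoragePath("alpha-datasets", "alpha")
    path = root.join("datasets", "customer-master", "template.csv")

    # Upload a local file, then confirm it is visible in storage.
    path.upload("template.csv")
    assert path.exists()

    # Read it back as text, or hand out a short-lived download link.
    print(path.read_text())
    print(path.get_presigned_url_for_download(expires=600))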