From 47d571a0363691840baeea7b24fd872d8050264e Mon Sep 17 00:00:00 2001
From: leela
Date: Thu, 16 Jul 2020 04:15:41 +0000
Subject: [PATCH] Added cloud storage utility to toolkit

---
 requirements.txt    |   1 +
 setup.py            |   2 +-
 toolkit/__init__.py |   1 +
 toolkit/storage.py  | 118 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 121 insertions(+), 1 deletion(-)
 create mode 100644 toolkit/storage.py

diff --git a/requirements.txt b/requirements.txt
index 9d8ae82..16bad09 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ psycopg2-binary
 pandas
 pyYAML
 click
+boto3
diff --git a/setup.py b/setup.py
index 2dec979..6139c5d 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@ def get_version():
 
 __version__ = get_version()
 
-install_requires = ['web.py', 'psycopg2-binary', 'pandas', 'pyYAML', 'click']
+install_requires = ['web.py', 'psycopg2-binary', 'pandas', 'pyYAML', 'click', 'boto3']
 
 extras_require = {
     'all': ['requests']
 }
diff --git a/toolkit/__init__.py b/toolkit/__init__.py
index 957b053..c899c21 100644
--- a/toolkit/__init__.py
+++ b/toolkit/__init__.py
@@ -7,3 +7,4 @@ from .dateutil import relative_date, truncate_date  # noqa
 from .signals import Signal  # noqa
 from .fileformat import FileFormat  # noqa
+from .storage import StoragePath  # noqa
diff --git a/toolkit/storage.py b/toolkit/storage.py
new file mode 100644
index 0000000..d1fcc8b
--- /dev/null
+++ b/toolkit/storage.py
@@ -0,0 +1,118 @@
+"""Storage utility to work with S3, GCS and minio.
+
+HOW TO USE:
+* Make sure that credentials are configured the way boto3 expects.
+* You need to do extra setup to use this module with GCS:
+  - Generate GCS HMAC credentials and set them as the AWS credentials.
+  - Make sure that the endpoint URL is set to 'https://storage.googleapis.com'.
+"""
+import boto3
+import botocore
+
+s3 = boto3.resource("s3")
+
+class StoragePath:
+    """The StoragePath class provides a pathlib.Path-like interface for
+    storage.
+
+    USAGE:
+        root = StoragePath(bucket_name, "alpha")
+        path = root.join("datasets", "customer-master", "template.csv")
+        text = path.read_text()
+
+    TODO: Add a method to list a directory.
+    """
+    def __init__(self, bucket: str, path: str):
+        self.bucket = bucket
+        self.path = path
+
+    @property
+    def _object(self):
+        return s3.Object(bucket_name=self.bucket, key=self.path)
+
+    def exists(self):
+        """Tells whether the storage path exists or not.
+
+        Checks for existence by fetching the object's metadata.
+        """
+        obj = self._object
+        try:
+            obj.metadata
+            return True
+        except botocore.exceptions.ClientError as e:
+            if e.response['Error']['Code'] == "404":
+                return False
+            raise
+
+    def delete(self):
+        """Deletes the file at the storage path.
+        """
+        obj = self._object
+        obj.delete()
+
+    def download(self, local_path):
+        """Downloads the contents of the storage file to the local_path file.
+        """
+        obj = self._object
+        obj.download_file(local_path)
+
+    def upload(self, local_path):
+        """Uploads the file at local_path to the storage path.
+        """
+        obj = self._object
+        obj.upload_file(local_path)
+
+    def read_text(self):
+        """Reads the contents of the path as text.
+        """
+        obj = self._object
+        # The body is a byte stream; decode it so read_text returns str.
+        return obj.get()['Body'].read().decode('utf-8')
+
+    def _get_presigned_url(self, client_method, expires=600, content_type=None):
+        """Returns a presigned URL for upload or download.
+
+        The client_method should be one of get_object or put_object.
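+
+        For example, the download helper below reduces to (illustrative):
+
+            self._get_presigned_url(client_method='get_object', expires=3600)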
+        """
+        params = {
+            'Bucket': self.bucket,
+            'Key': self.path,
+        }
+        if content_type:
+            params['ContentType'] = content_type
+
+        return s3.meta.client.generate_presigned_url(client_method,
+                                                     Params=params,
+                                                     ExpiresIn=expires)
+
+    def get_presigned_url_for_download(self, expires=3600):
+        """Returns a presigned URL for download.
+
+        The default expiry is one hour (3600 seconds).
+        """
+        return self._get_presigned_url(client_method='get_object', expires=expires)
+
+    def get_presigned_url_for_upload(self, expires=600, content_type="text/csv"):
+        """Returns a presigned URL for upload.
+
+        The default expiry is 10 minutes (600 seconds).
+        """
+        return self._get_presigned_url(client_method='put_object', expires=expires, content_type=content_type)
+
+    def read_dataframe(self):
+        """TODO: Support CSV and Parquet.
+        """
+        pass
+
+    def copy_to(self, dest_path):
+        """Copies the file to the destination path within the bucket.
+        """
+        pass
+
+    def join(self, *parts):
+        """Combines the storage path with one or more parts and returns a new path.
+        """
+        path = "/".join([self.path] + list(parts))
+        return StoragePath(self.bucket, path)
+
+    def __repr__(self):
+        return f'<StoragePath {self.bucket}/{self.path}>'
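
A minimal sketch of how the new module fits together, assuming boto3
credentials are already configured as described in the module docstring;
the bucket name "my-bucket" and the GCS endpoint override are
illustrative assumptions, not part of the patch:

    import boto3
    import toolkit.storage
    from toolkit import StoragePath

    # Assumption: for GCS or minio, swap the module-level resource for
    # one created with an explicit endpoint. The patch itself builds the
    # resource with boto3's defaults at import time.
    toolkit.storage.s3 = boto3.resource(
        "s3", endpoint_url="https://storage.googleapis.com")

    root = StoragePath("my-bucket", "alpha")
    path = root.join("datasets", "customer-master", "template.csv")
    if path.exists():
        text = path.read_text()

    # Hand transfers off to a browser with presigned URLs.
    download_url = path.get_presigned_url_for_download(expires=3600)
    upload_url = path.get_presigned_url_for_upload(content_type="text/csv")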