Feature/object storage refactor rl #119
base: feature/object_storage_refactor
Conversation
jayckaiser left a comment
I've added some comments on these changes, particularly on how the object registry could allow us to clean up code. This is all still incredibly untested.
We will need to do something similar with the callables throughout the project that interface with Snowflake. They are the last major piece that will need to be abstracted, and I do not know where the abstractions should live. Not only do we need to interact differently with the different database backends, but the SQL queries will need to be altered to fit the syntax of each environment. Perhaps we can abstract out the syntax (similar to how Sam abstracted the syntax in the Databricks DBT update), but it may lead to very confusing template queries.
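As a rough sketch of what that syntax abstraction could look like (all names here are hypothetical and not part of this PR), the backend-specific SQL fragments could live in small dialect objects that render templates, so the calling code stays identical across environments:

from dataclasses import dataclass

@dataclass
class SQLDialect:
    """Hypothetical container for backend-specific SQL fragments."""
    copy_into_template: str

SNOWFLAKE = SQLDialect(
    copy_into_template="COPY INTO {table} FROM @{stage}/{path} FILE_FORMAT = (TYPE = JSON)",
)
DATABRICKS = SQLDialect(
    copy_into_template="COPY INTO {table} FROM '{path}' FILEFORMAT = JSON",
)

def render_copy_query(dialect: SQLDialect, **params) -> str:
    # Each backend keeps its own syntax; callers only pick a dialect.
    return dialect.copy_into_template.format(**params)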
object_storage_conn_id: str,
object_storage_conn_id: Optional[str] = None,
s3_conn_id: Optional[str] = None,  # Backwards compatibility parameter
adls_conn_id: Optional[str] = None,  # Backwards compatibility parameter
Is this for backwards-compatibility with TN's current fork of our code?
That wasn't my intention, no. The backwards-compatibility comment should probably only apply to the s3_conn_id param.
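A minimal sketch of what that aliasing could look like (parameter names are taken from the diff above; the class name and deprecation warning are only illustrative, not the code in this PR):

import warnings

class _OperatorSketch:  # stand-in for EdFiToObjectStorageOperator
    def __init__(self, object_storage_conn_id=None, s3_conn_id=None, adls_conn_id=None, **kwargs):
        # Fall back to the legacy parameters when the new one is not supplied.
        if object_storage_conn_id is None:
            object_storage_conn_id = s3_conn_id or adls_conn_id
            if object_storage_conn_id is not None:
                warnings.warn(
                    "s3_conn_id / adls_conn_id are deprecated; use object_storage_conn_id.",
                    DeprecationWarning,
                )
        self.object_storage_conn_id = object_storage_conn_id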
super(EdFiToObjectStorageOperator, self).__init__(**kwargs)

# Handle backwards compatibility for connection ID parameters
if object_storage_conn_id is None:
I'd argue that this value error should occur within the registry. We shouldn't get to this point, because the arguments must be present for this class to have been returned by the registry.
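For reference, a sketch of what registry-side validation could look like (the resolve method and error wording are illustrative only, not the actual change in this PR):

class ObjectStorageRegistry:  # simplified stand-in for the real registry
    def __init__(self):
        self._backends = {}

    def register(self, conn_param, single_operator, bulk_operator):
        self._backends[conn_param] = (single_operator, bulk_operator)

    def resolve(self, **conn_kwargs):
        """Return the operator pair for whichever connection param was supplied, or raise."""
        provided = [p for p, value in conn_kwargs.items() if p in self._backends and value]
        if len(provided) != 1:
            raise ValueError(
                f"Exactly one object storage connection ID must be provided; got: {provided or 'none'}"
            )
        return self._backends[provided[0]]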
done here 8ce76f0
# Connect to object storage and copy file
tmp_file.seek(0)  # Go back to the start of the file before copying to object storage.
object_storage.upload_from(tmp_file, force_overwrite_to_cloud=True)
with object_storage.open("wb") as storage_file:
Good replacement. I never tested the original approach but found the method as an internal in the Airflow source code. If this is the more kosher approach for writing files into object storage, then I love it.
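For future readers, the pattern under discussion looks roughly like this, assuming object_storage is a path-like object exposing open() as in the diff above (copyfileobj is only used here to show a streaming copy; it is not necessarily what the PR does):

import shutil

def copy_to_object_storage(tmp_file, object_storage):
    """tmp_file: an open local temp file; object_storage: any path-like object with .open()."""
    tmp_file.seek(0)  # rewind before copying
    with object_storage.open("wb") as storage_file:
        shutil.copyfileobj(tmp_file, storage_file)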
# Clean up the temporary file
try:
    os.unlink(tmp_file.name)
Does this happen automatically? If not, why swap to a named temporary file if cleanup is required afterward and could potentially fail?
(It's possible that my implementation of the temporary file was incomplete and this cleanup step is required regardless. If so, ignore my above comment.)
updated here 2db2f23
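For reference, a quick sketch of the two stdlib NamedTemporaryFile behaviours; the explicit unlink is only needed when delete=False is used:

import os
import tempfile

# Default (delete=True): the file is removed automatically when it is closed
# or when the with-block exits, so no manual cleanup is needed.
with tempfile.NamedTemporaryFile() as tmp:
    tmp.write(b"payload")
    tmp.seek(0)

# delete=False: the file survives close(), so it must be unlinked explicitly,
# ideally in a finally block so failures do not leak temp files.
tmp = tempfile.NamedTemporaryFile(delete=False)
try:
    tmp.write(b"payload")
    tmp.close()
finally:
    os.unlink(tmp.name)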
object_storage_conn_id: str,
object_storage_conn_id: Optional[str] = None,
s3_conn_id: Optional[str] = None,  # Backwards compatibility parameter
adls_conn_id: Optional[str] = None,  # Backwards compatibility parameter
Is the ADLS connection ID here for the forked version of the repo created by TN?
def __init__(self):
    self._backends = {}

def register(self, conn_param: str, single_operator, bulk_operator):
I think there's a world where these classes are defined dynamically based on tuples of subclasses, instead of needing to define named flavors of each combination.
from functools import partial

def register(self, conn_param: str, cloud_mixin):
    """Register an object storage backend with its connection parameter and operators"""
    # type() takes a name, a bases tuple, and a namespace dict.
    DynamicStorage = partial(type, 'DynamicStorage')
    single_operator = DynamicStorage((cloud_mixin, EdFiToObjectStorageOperator), {})
    bulk_operator = DynamicStorage((cloud_mixin, BulkEdFiToObjectStorageOperator), {})
    self._backends[conn_param] = (single_operator, bulk_operator)

# And then in the code at the very end, we can remove the empty child classes and add the following:
OBJECT_STORAGE_REGISTRY.register('s3_conn_id', S3Mixin)
OBJECT_STORAGE_REGISTRY.register('adls_conn_id', ADLSMixin)
Note that this is not tested, and maybe we want to have a bunch of empty named classes anyway, if that's easier to understand.
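For comparison, the named-class version being discussed would just be empty combinations of mixin and base operator, registered with the existing signature (class names here are only illustrative):

class S3EdFiToObjectStorageOperator(S3Mixin, EdFiToObjectStorageOperator):
    """Empty S3 flavor of the single-resource operator."""

class S3BulkEdFiToObjectStorageOperator(S3Mixin, BulkEdFiToObjectStorageOperator):
    """Empty S3 flavor of the bulk operator."""

OBJECT_STORAGE_REGISTRY.register(
    's3_conn_id', S3EdFiToObjectStorageOperator, S3BulkEdFiToObjectStorageOperator
)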
I see what you're saying. I honestly have no preference between the two.
@jayckaiser I will look through your comments. I forgot to say a couple of things up front.
Creating this as a draft PR to make review and commenting easier.