Conversation

@jayckaiser
Collaborator

Creating this as a draft PR to make review and commenting easier.

@jayckaiser jayckaiser changed the base branch from main to feature/object_storage_refactor October 16, 2025 20:57
Collaborator Author

@jayckaiser jayckaiser left a comment

I've added some comments on these changes, particularly on how the object registry could allow us to clean up code. This is all still incredibly untested.

We will need to do something similar with the callables throughout the project that interface with Snowflake. They are the last main piece that needs to be abstracted, and I do not know where the abstractions should live. Not only do we need to interact differently with the different database backends, but the SQL queries will also need to be altered to fit the syntax of each environment. Perhaps we can abstract out the syntax (similar to how Sam abstracted it in the Databricks DBT update), but that may lead to very confusing template queries.
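To make the syntax-abstraction idea concrete, here is one hedged sketch of what a dialect layer could look like. The `SqlDialect` class, the two dialect instances, and the JSON-extraction templates are all hypothetical illustrations, not code from this repo or from the Databricks DBT update:

```python
# Hypothetical sketch of per-backend SQL syntax abstraction.
# All names and query fragments here are illustrative assumptions.
from dataclasses import dataclass


@dataclass(frozen=True)
class SqlDialect:
    """Holds the backend-specific fragments a shared template would need."""
    name: str
    variant_type: str   # e.g. a semi-structured column type per backend
    json_extract: str   # format string: (col, path) -> extraction expression


SNOWFLAKE = SqlDialect(
    name="snowflake",
    variant_type="VARIANT",
    json_extract="{col}:{path}",
)

DATABRICKS = SqlDialect(
    name="databricks",
    variant_type="STRING",
    json_extract="get_json_object({col}, '$.{path}')",
)


def render_extract(dialect: SqlDialect, col: str, path: str) -> str:
    """Render one JSON-extraction expression in the given dialect."""
    return dialect.json_extract.format(col=col, path=path)
```

The worry in the comment above is real: once every backend-specific fragment becomes a format string, the shared templates can get hard to read, so this only pays off if the fragment list stays short.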

object_storage_conn_id: str,
object_storage_conn_id: Optional[str] = None,
s3_conn_id: Optional[str] = None, # Backwards compatibility parameter
adls_conn_id: Optional[str] = None, # Backwards compatibility parameter
Collaborator Author

Is this for backwards-compatibility with TN's current fork of our code?

Contributor

That wasn't my intention, no. The backwards-compatibility comment should probably only apply to the s3_conn_id param.

super(EdFiToObjectStorageOperator, self).__init__(**kwargs)

# Handle backwards compatibility for connection ID parameters
if object_storage_conn_id is None:
Collaborator Author

I'd argue that this ValueError should occur within the registry. We shouldn't get to this point, because the arguments must be present for this class to have been returned by the registry.
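For illustration, here is a minimal sketch of what registry-side validation could look like, so the operator never sees a missing connection ID. The class shape mirrors the registry discussed later in this PR, but the `resolve` method and its error message are assumed, not taken from the branch:

```python
# Sketch: validate connection-ID arguments at registry-lookup time,
# so the returned operator class can assume its conn ID is present.
# Names mirror the PR's registry but resolve() is a hypothetical addition.
class ObjectStorageRegistry:
    def __init__(self):
        self._backends = {}

    def register(self, conn_param, single_operator, bulk_operator):
        self._backends[conn_param] = (single_operator, bulk_operator)

    def resolve(self, **conn_kwargs):
        """Return (single, bulk) operators for the one conn param provided."""
        provided = {k: v for k, v in conn_kwargs.items() if v is not None}
        if len(provided) != 1:
            raise ValueError(
                f"Expected exactly one connection ID, got: {sorted(provided) or 'none'}"
            )
        (conn_param, _), = provided.items()
        return self._backends[conn_param]
```

With this shape, the operator's `__init__` no longer needs its own `if object_storage_conn_id is None` guard; the failure happens earlier, with a message naming what the caller actually passed.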

Contributor

Done here: 8ce76f0

# Connect to object storage and copy file
tmp_file.seek(0) # Go back to the start of the file before copying to object storage.
object_storage.upload_from(tmp_file, force_overwrite_to_cloud=True)
with object_storage.open("wb") as storage_file:
Collaborator Author

Good replacement. I never tested the original approach but found the method as an internal in the Airflow source code. If this is the more kosher approach for writing files into object storage, then I love it.


# Clean up the temporary file
try:
os.unlink(tmp_file.name)
Collaborator Author

Does this happen automatically? If not, why swap to a named temporary file if cleanup is required afterward and could potentially fail?

(It's possible that my implementation of the temporary file was incomplete and this cleanup step is required regardless. If so, ignore my above comment.)
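For reference, the cleanup question comes down to the `delete` flag on `tempfile.NamedTemporaryFile`: with `delete=False` the file survives close and `os.unlink` is required; with the default `delete=True` it is removed automatically on close. A small self-contained demonstration:

```python
# Demonstrates when manual cleanup of a NamedTemporaryFile is required.
import os
import tempfile

# delete=False: the file survives the context manager, so the explicit
# os.unlink in the diff above really is needed.
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
    tmp_file.write(b"payload")
kept_path = tmp_file.name
assert os.path.exists(kept_path)
os.unlink(kept_path)  # manual cleanup step

# Default delete=True: the file is gone as soon as it is closed.
with tempfile.NamedTemporaryFile() as tmp_file:
    auto_path = tmp_file.name
assert not os.path.exists(auto_path)
```

So if the operator needs the file path to outlive the `with` block (e.g. to reopen it for upload), `delete=False` plus a `try`/`finally` unlink is the usual pattern; otherwise the default handles cleanup.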

Contributor

Updated here: 2db2f23

object_storage_conn_id: str,
object_storage_conn_id: Optional[str] = None,
s3_conn_id: Optional[str] = None, # Backwards compatibility parameter
adls_conn_id: Optional[str] = None, # Backwards compatibility parameter
Collaborator Author

Is the ADLS connection ID here for the forked version of the repo created by TN?

def __init__(self):
self._backends = {}

def register(self, conn_param: str, single_operator, bulk_operator):
Collaborator Author

@jayckaiser jayckaiser Oct 16, 2025

I think there's a world where these classes are defined dynamically from tuples of subclasses, instead of needing to define named flavors of each combination.

def register(self, conn_param: str, cloud_mixin):
    """Register an object storage backend with its connection parameter and operators."""
    # type() takes (name, bases, namespace); partial pins the class name.
    # Requires: from functools import partial
    DynamicStorage = partial(type, 'DynamicStorage')
    single_operator = DynamicStorage((cloud_mixin, EdFiToObjectStorageOperator), {})
    bulk_operator   = DynamicStorage((cloud_mixin, BulkEdFiToObjectStorageOperator), {})

    self._backends[conn_param] = (single_operator, bulk_operator)


# And then in the code at the very end, we can remove the empty child classes and add the following:
OBJECT_STORAGE_REGISTRY.register('s3_conn_id', S3Mixin)
OBJECT_STORAGE_REGISTRY.register('adls_conn_id', ADLSMixin)

Note that this is not tested, and maybe we want to have a bunch of empty named classes anyway, if that's easier to understand.
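As a sanity check of the idea, the three-argument `type()` composition does work; here is a runnable sketch with stand-in classes (the mixin and operator below are placeholders, not the real ones in the repo):

```python
# Minimal check that dynamic subclassing via type() composes as intended.
# S3Mixin and EdFiToObjectStorageOperator are stand-ins for illustration.
from functools import partial


class S3Mixin:
    conn_param = "s3_conn_id"


class EdFiToObjectStorageOperator:
    def describe(self):
        return f"{type(self).__name__} via {self.conn_param}"


# type() takes (name, bases, namespace); partial pins the class name.
DynamicStorage = partial(type, "DynamicStorage")
single_operator = DynamicStorage((S3Mixin, EdFiToObjectStorageOperator), {})

op = single_operator()
print(op.describe())  # -> DynamicStorage via s3_conn_id
```

One trade-off worth noting: dynamically built classes all share the name `DynamicStorage` in tracebacks and logs, which is part of why keeping a few empty named subclasses may still be the more debuggable choice.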

Contributor

I see what you're saying. I honestly have no preference between the two.

@rlittle08
Contributor

@jayckaiser I will look through your comments. I forgot to mention a couple of things up front:

  1. I also have a draft of the database abstraction, but have not committed it. Probably helpful to keep it out of this PR while we sort through your feedback.
  2. I have been testing this using a local Airflow stack, and successfully loaded files from ODS -> ADLS. So I would downgrade your statement from 'incredibly untested' to 'quite untested' :)
