From acc2340da2d5e711626d0e221d7db9eef7690773 Mon Sep 17 00:00:00 2001
From: John Kerl
Date: Fri, 24 Jan 2025 13:17:00 -0500
Subject: [PATCH] [python] Fix `update_obs` semantics for some remote
 configurations (#3623)

* Fix update_obs semantics for some remote configurations

* neaten
---
 apis/python/src/tiledbsoma/_exception.py |  38 --------
 apis/python/src/tiledbsoma/io/ingest.py  | 119 ++++++++++++++++-------
 2 files changed, 82 insertions(+), 75 deletions(-)

diff --git a/apis/python/src/tiledbsoma/_exception.py b/apis/python/src/tiledbsoma/_exception.py
index 87236a91bd..a6028bb96c 100644
--- a/apis/python/src/tiledbsoma/_exception.py
+++ b/apis/python/src/tiledbsoma/_exception.py
@@ -95,42 +95,6 @@ class NotCreateableError(SOMAError):
     pass
 
 
-def is_not_createable_error(e: SOMAError) -> bool:
-    """Given a TileDBError, return true if it indicates the object cannot be created
-
-    Lifecycle: Maturing
-
-    Example:
-        try:
-            tiledb.Array.create(uri, schema, ctx=ctx)
-            ...
-        except tiledb.TileDBError as e:
-            if is_not_createable_error(e):
-                ...
-            raise e
-    """
-    stre = str(e)
-    # Context:
-    # * A recurring paradigm in tiledbsoma.io is open for write (if exists) else create --
-    #   or, equivalently, create (if doesn't already exist), else open for write
-    # * A priori either seems fine
-    # * There are performance implications for trying the create first: when an
-    #   object _does_ already exist we get that quickly.
-    # * Therefore it's more performant to try-create-catch-open-for-write
-    # * However we have the following semantics for cloud URIs:
-    #   o For writes: must be "creation URIs" of the form "tiledb://namespace/s3://bucket/some/path"
-    #   o For read: can be "creation URIs" _or_ non-creation URIs of the form
-    #     "tiledb://namespace/groupname" or "tiledb://namespace/uuid"
-    # * Put together: when we try-create-catch-open-for-write, _and when_ the URI provided
-    #   is a non-creation URI, we need to catch that fact and treat it as a non-error.
-    stre = stre.lower()
-    if "storage backend local not supported" in stre:
-        return True
-    if "storage backend not supported: local" in stre:
-        return True
-    return False
-
-
 def is_duplicate_group_key_error(e: SOMAError) -> bool:
     """Given a TileDBError, return true if it indicates a duplicate member add
     request in a tiledb.Group.
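Note: the try-create-catch-open-for-write paradigm that the removed comment documents reduces to the following shape (a minimal sketch against tiledbsoma's public DataFrame.create/DataFrame.open API; the open_or_create helper name is illustrative, not part of the codebase):

    import pyarrow as pa

    import tiledbsoma
    from tiledbsoma import AlreadyExistsError


    def open_or_create(uri: str, schema: pa.Schema, context=None) -> tiledbsoma.DataFrame:
        # Trying the create first costs one round trip in the common case
        # where the object does not exist yet; if it does exist, the server
        # says so and we fall back to opening it for write.
        try:
            return tiledbsoma.DataFrame.create(uri, schema=schema, context=context)
        except AlreadyExistsError:
            return tiledbsoma.DataFrame.open(uri, "w", context=context)

With the string-matching heuristic above removed, a failed create against a non-creation cloud URI ("tiledb://namespace/groupname" or "tiledb://namespace/uuid") is no longer detected by sniffing error messages and mapped to NotCreateableError; instead, callers that know the object must already exist say so up front via the new must_exist flag in ingest.py below.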
@@ -152,8 +116,6 @@ def is_domain_setting_error(e: SOMAError) -> bool:
 def map_exception_for_create(e: SOMAError, uri: str) -> Exception:
     if is_already_exists_error(e):
         return AlreadyExistsError(f"{uri!r} already exists")
-    if is_not_createable_error(e):
-        return NotCreateableError(f"{uri!r} cannot be created")
     if is_domain_setting_error(e):
         return ValueError(e)
     return e
diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py
index eeb3c08f08..eede5e1fbc 100644
--- a/apis/python/src/tiledbsoma/io/ingest.py
+++ b/apis/python/src/tiledbsoma/io/ingest.py
@@ -802,6 +802,7 @@ def append_obs(
         context=context,
         ingestion_params=ingestion_params,
         axis_mapping=jidmap,
+        must_exist=True,
     ):
         logging.log_io_same(
             _util.format_elapsed(s, f"Finish writing obs for {exp.obs.uri}")
         )
@@ -867,6 +868,7 @@ def append_var(
         context=context,
         ingestion_params=ingestion_params,
         axis_mapping=jidmap,
+        must_exist=True,
     ):
         logging.log_io_same(
             _util.format_elapsed(s, f"Finish writing var for {sdf.uri}")
         )
@@ -950,6 +952,7 @@ def append_X(
         context=context,
         axis_0_mapping=axis_0_mapping,
         axis_1_mapping=axis_1_mapping,
+        must_exist=True,
     ):
         logging.log_io_same(_util.format_elapsed(s, f"Finish writing X for {X.uri}"))
     return X.uri
@@ -1266,6 +1269,7 @@ def _write_dataframe(
     platform_config: PlatformConfig | None = None,
     context: SOMATileDBContext | None = None,
     axis_mapping: AxisIDMapping,
+    must_exist: bool = False,
 ) -> DataFrame:
     """
     Convert and save a pd.DataFrame as a SOMA DataFrame.
@@ -1294,6 +1298,7 @@
         original_index_metadata=original_index_metadata,
         platform_config=platform_config,
         context=context,
+        must_exist=must_exist,
     )
 
 
@@ -1308,6 +1313,7 @@ def _write_dataframe_impl(
     original_index_metadata: OriginalIndexMetadata = None,
     platform_config: PlatformConfig | None = None,
     context: SOMATileDBContext | None = None,
+    must_exist: bool = False,
 ) -> DataFrame:
     """Save a Pandas DataFrame as a SOMA DataFrame.
 
@@ -1328,23 +1334,12 @@
         raise ValueError("internal coding error: id_column_name unspecified")
 
     arrow_table = _extract_new_values_for_append(df_uri, arrow_table, context)
 
-    try:
-        # Note: tiledbsoma.io creates dataframes with soma_joinid being the one
-        # and only index column.
-        domain = ((0, shape - 1),)
-        soma_df = DataFrame.create(
-            df_uri,
-            schema=arrow_table.schema,
-            domain=domain,
-            platform_config=platform_config,
-            context=context,
-        )
-    except (AlreadyExistsError, NotCreateableError):
-        if ingestion_params.error_if_already_exists:
-            raise SOMAError(f"{df_uri} already exists")
-
-        soma_df = DataFrame.open(df_uri, "w", context=context)
-
+    def check_for_containment(
+        df: pd.DataFrame,
+        soma_df: DataFrame,
+        ingestion_params: IngestionParams,
+    ) -> bool:
+        """For resume mode, check if the non-empty domain has already been written."""
         if ingestion_params.skip_existing_nonempty_domain:
             storage_ned = _read_nonempty_domain(soma_df)
             dim_range = ((int(df.index.min()), int(df.index.max())),)
@@ -1353,7 +1348,46 @@
                 f"Skipped {df_uri}",
                 _util.format_elapsed(s, f"SKIPPED {df_uri}"),
             )
-            return soma_df
+            return True
+        return False
+
+    if must_exist:
+        # For update_obs, update_var, append_obs, and append_var, it's
+        # an error situation if the dataframe doesn't already exist.
+        soma_df = DataFrame.open(df_uri, "w", context=context)
+        if check_for_containment(df, soma_df, ingestion_params):
+            return soma_df
+
+    else:
+        # We could (and used to) do:
+        #     if exists:
+        #         open
+        #     else:
+        #         create
+        # However, for remote object stores, that's two round-trip requests
+        # to the server, whether the dataframe exists or not. Instead we
+        # try create, doing the open if the create threw already-exists.
+        # When the dataframe doesn't exist, this is just one round-trip request,
+        # and when it does, it's two (as before).
+        #
+        # Note that for append/update, the dataframe must exist; but for
+        # resume mode, the dataframe may or may not exist. (The point of
+        # resume mode is to continue from a previous ingest which ended
+        # prematurely.)
+        try:
+            soma_df = DataFrame.create(
+                df_uri,
+                schema=arrow_table.schema,
+                # Note: tiledbsoma.io creates dataframes with soma_joinid being the one
+                # and only index column.
+                domain=[[0, shape - 1]],
+                platform_config=platform_config,
+                context=context,
+            )
+        except (AlreadyExistsError, NotCreateableError):
+            if ingestion_params.error_if_already_exists:
+                raise SOMAError(f"{df_uri} already exists")
+            soma_df = DataFrame.open(df_uri, "w", context=context)
+            if check_for_containment(df, soma_df, ingestion_params):
+                return soma_df
 
     if ingestion_params.write_schema_no_data:
         logging.log_io(
@@ -1422,6 +1456,7 @@ def _create_from_matrix(
     context: SOMATileDBContext | None = None,
     axis_0_mapping: AxisIDMapping,
     axis_1_mapping: AxisIDMapping,
+    must_exist: bool = False,
 ) -> _NDArr:
     """
     Internal helper for user-facing ``create_from_matrix``.
@@ -1433,30 +1468,37 @@ def _create_from_matrix(
     s = _util.get_start_stamp()
     logging.log_io(None, f"START WRITING {uri}")
 
-    try:
-        shape: Sequence[Union[int, None]] = ()
-        # A SparseNDArray must be appendable in soma.io.
-
-        # Instead of
-        # shape = tuple(int(e) for e in matrix.shape)
-        # we consult the registration mapping. This is important
-        # in the case when multiple H5ADs/AnnDatas are being
-        # ingested to an experiment which doesn't pre-exist.
-        shape = (axis_0_mapping.get_shape(), axis_1_mapping.get_shape())
-
-        soma_ndarray = cls.create(
-            uri,
-            type=pa.from_numpy_dtype(matrix.dtype),
-            shape=shape,
-            platform_config=platform_config,
-            context=context,
-        )
-    except (AlreadyExistsError, NotCreateableError):
+    if must_exist:
         if ingestion_params.error_if_already_exists:
             raise SOMAError(f"{uri} already exists")
         soma_ndarray = cls.open(
             uri, "w", platform_config=platform_config, context=context
         )
+    else:
+        try:
+            shape: Sequence[Union[int, None]] = ()
+            # A SparseNDArray must be appendable in soma.io.
+
+            # Instead of
+            # shape = tuple(int(e) for e in matrix.shape)
+            # we consult the registration mapping. This is important
+            # in the case when multiple H5ADs/AnnDatas are being
+            # ingested to an experiment which doesn't pre-exist.
+            shape = (axis_0_mapping.get_shape(), axis_1_mapping.get_shape())
+
+            soma_ndarray = cls.create(
+                uri,
+                type=pa.from_numpy_dtype(matrix.dtype),
+                shape=shape,
+                platform_config=platform_config,
+                context=context,
+            )
+        except (AlreadyExistsError, NotCreateableError):
+            if ingestion_params.error_if_already_exists:
+                raise SOMAError(f"{uri} already exists")
+            soma_ndarray = cls.open(
+                uri, "w", platform_config=platform_config, context=context
+            )
 
     if ingestion_params.write_schema_no_data:
         logging.log_io(
@@ -1546,6 +1588,7 @@ def update_obs(
 
     Lifecycle: Maturing.
""" + _update_dataframe( exp.obs, new_data, @@ -1603,6 +1646,7 @@ def update_var( raise ValueError( f"cannot find measurement name {measurement_name} within experiment at {exp.uri}" ) + _update_dataframe( exp.ms[measurement_name].var, new_data, @@ -1713,6 +1757,7 @@ def _update_dataframe( context=context, platform_config=platform_config, axis_mapping=AxisIDMapping.identity(new_data.shape[0]), + must_exist=True, )