Commit

[python] Fix update_obs semantics for some remote configurations (#3623)

* Fix update_obs semantics for some remote configurations

* neaten
johnkerl authored Jan 24, 2025
1 parent fe8e64d commit acc2340
Showing 2 changed files with 82 additions and 75 deletions.
apis/python/src/tiledbsoma/_exception.py: 0 additions & 38 deletions
@@ -95,42 +95,6 @@ class NotCreateableError(SOMAError):
     pass
 
 
-def is_not_createable_error(e: SOMAError) -> bool:
-    """Given a TileDBError, return true if it indicates the object cannot be created
-
-    Lifecycle: Maturing
-
-    Example:
-        try:
-            tiledb.Array.create(uri, schema, ctx=ctx)
-            ...
-        except tiledb.TileDBError as e:
-            if is_not_createable_error(e):
-                ...
-            raise e
-    """
-    stre = str(e)
-    # Context:
-    # * A recurring paradigm in tiledbsoma.io is open for write (if exists) else create --
-    #   or, equivalently, create (if doesn't already exist), else open for write
-    # * A priori either seems fine
-    # * There are performance implications for trying the create first: when an
-    #   object _does_ already exist we get that quickly.
-    # * Therefore it's more performant to try-create-catch-open-for-write
-    # * However we have the following semantics for cloud URIs:
-    #   o For writes: must be "creation URIs" of the form "tiledb://namespace/s3://bucket/some/path"
-    #   o For read: can be "creation URIs" _or_ non-creation URIs of the form
-    #     "tiledb://namespace/groupname" or "tiledb://namespace/uuid"
-    # * Put together: when we try-create-catch-open-for-write, _and when_ the URI provided
-    #   is a non-creation URI, we need to catch that fact and treat it as a non-error.
-    stre = stre.lower()
-    if "storage backend local not supported" in stre:
-        return True
-    if "storage backend not supported: local" in stre:
-        return True
-    return False
 
 
 def is_duplicate_group_key_error(e: SOMAError) -> bool:
     """Given a TileDBError, return true if it indicates a duplicate member
     add request in a tiledb.Group.
@@ -152,8 +116,6 @@ def is_domain_setting_error(e: SOMAError) -> bool:
 def map_exception_for_create(e: SOMAError, uri: str) -> Exception:
     if is_already_exists_error(e):
         return AlreadyExistsError(f"{uri!r} already exists")
-    if is_not_createable_error(e):
-        return NotCreateableError(f"{uri!r} cannot be created")
     if is_domain_setting_error(e):
         return ValueError(e)
     return e
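
The deleted helper existed to support the try-create-catch paradigm its comment block describes. Below is a minimal sketch of that paradigm, using only the DataFrame.create and DataFrame.open calls that appear elsewhere in this diff; the wrapper name _try_create_else_open is illustrative, not part of the tiledbsoma API:

    from tiledbsoma import DataFrame
    from tiledbsoma._exception import AlreadyExistsError, NotCreateableError

    def _try_create_else_open(df_uri, schema, domain, context=None):
        # Try the create first: one server round trip when the object is new.
        try:
            return DataFrame.create(
                df_uri, schema=schema, domain=domain, context=context
            )
        except (AlreadyExistsError, NotCreateableError):
            # Fall back to open-for-write: the object already exists, or the
            # URI is a non-creation URI such as "tiledb://namespace/uuid".
            return DataFrame.open(df_uri, "w", context=context)

With this commit, callers that know their target must pre-exist (the update/append paths) open directly instead, so the error-string sniffing in the deleted helper is no longer needed on that path.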
apis/python/src/tiledbsoma/io/ingest.py: 82 additions & 37 deletions
@@ -802,6 +802,7 @@ def append_obs(
         context=context,
         ingestion_params=ingestion_params,
         axis_mapping=jidmap,
+        must_exist=True,
     ):
         logging.log_io_same(
             _util.format_elapsed(s, f"Finish writing obs for {exp.obs.uri}")
@@ -867,6 +868,7 @@ def append_var(
         context=context,
         ingestion_params=ingestion_params,
         axis_mapping=jidmap,
+        must_exist=True,
     ):
         logging.log_io_same(
             _util.format_elapsed(s, f"Finish writing var for {sdf.uri}")
@@ -950,6 +952,7 @@ def append_X(
         context=context,
         axis_0_mapping=axis_0_mapping,
         axis_1_mapping=axis_1_mapping,
+        must_exist=True,
     ):
         logging.log_io_same(_util.format_elapsed(s, f"Finish writing X for {X.uri}"))
         return X.uri
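
All three append paths above now pass must_exist=True: appends target an experiment created by a prior ingest, so the destination dataframe or array must already exist. A simplified sketch of the branch this flag selects downstream (the real logic is in the _write_dataframe_impl and _create_from_matrix hunks below; _try_create_else_open refers to the illustrative wrapper sketched earlier):

    if must_exist:
        # append/update: a missing dataframe is an error; open directly.
        soma_df = DataFrame.open(df_uri, "w", context=context)
    else:
        # initial ingest or resume: try the create, fall back to open.
        soma_df = _try_create_else_open(df_uri, schema, domain, context)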
@@ -1266,6 +1269,7 @@ def _write_dataframe(
     platform_config: PlatformConfig | None = None,
     context: SOMATileDBContext | None = None,
     axis_mapping: AxisIDMapping,
+    must_exist: bool = False,
 ) -> DataFrame:
     """
     Convert and save a pd.DataFrame as a SOMA DataFrame.
@@ -1294,6 +1298,7 @@ def _write_dataframe(
         original_index_metadata=original_index_metadata,
         platform_config=platform_config,
         context=context,
+        must_exist=must_exist,
     )
 
 
@@ -1308,6 +1313,7 @@ def _write_dataframe_impl(
     original_index_metadata: OriginalIndexMetadata = None,
     platform_config: PlatformConfig | None = None,
     context: SOMATileDBContext | None = None,
+    must_exist: bool = False,
 ) -> DataFrame:
     """Save a Pandas DataFrame as a SOMA DataFrame.
@@ -1328,23 +1334,12 @@
         raise ValueError("internal coding error: id_column_name unspecified")
     arrow_table = _extract_new_values_for_append(df_uri, arrow_table, context)
 
-    try:
-        # Note: tiledbsoma.io creates dataframes with soma_joinid being the one
-        # and only index column.
-        domain = ((0, shape - 1),)
-        soma_df = DataFrame.create(
-            df_uri,
-            schema=arrow_table.schema,
-            domain=domain,
-            platform_config=platform_config,
-            context=context,
-        )
-    except (AlreadyExistsError, NotCreateableError):
-        if ingestion_params.error_if_already_exists:
-            raise SOMAError(f"{df_uri} already exists")
-
-        soma_df = DataFrame.open(df_uri, "w", context=context)
-
+    def check_for_containment(
+        df: pd.DataFrame,
+        soma_df: DataFrame,
+        ingestion_params: IngestionParams,
+    ) -> bool:
+        """For resume mode, check if the non-empty domain has already been written."""
         if ingestion_params.skip_existing_nonempty_domain:
             storage_ned = _read_nonempty_domain(soma_df)
             dim_range = ((int(df.index.min()), int(df.index.max())),)
@@ -1353,7 +1348,46 @@ def _write_dataframe_impl(
                     f"Skipped {df_uri}",
                     _util.format_elapsed(s, f"SKIPPED {df_uri}"),
                 )
-                return soma_df
+                return True
+        return False
+
+    if must_exist:
+        # For update_obs, update_var, append_obs, and append_var, it's
+        # an error situation if the dataframe doesn't already exist.
+        soma_df = DataFrame.open(df_uri, "w", context=context)
+        check_for_containment(df, soma_df, ingestion_params)
+
+    else:
+        # We could (and used to) do:
+        #     if exists:
+        #         open
+        #     else:
+        #         create
+        # However, for remote object stores, that's two round-trip requests
+        # to the server, whether the dataframe exists or not. Instead we
+        # try create, doing the open if the create threw already-exists.
+        # When the dataframe doesn't exist, this is just one round-trip request,
+        # and when it does, it's two (as before).
+        #
+        # Note that for append/update, the dataframe must exist; but for
+        # resume mode, the dataframe may or may not exist. (The point of
+        # resume mode is to continue from a previous ingest which ended
+        # prematurely.)
+        try:
+            soma_df = DataFrame.create(
+                df_uri,
+                schema=arrow_table.schema,
+                # Note: tiledbsoma.io creates dataframes with soma_joinid being the one
+                # and only index column.
+                domain=[[0, shape - 1]],
+                platform_config=platform_config,
+                context=context,
+            )
+        except (AlreadyExistsError, NotCreateableError):
+            if ingestion_params.error_if_already_exists:
+                raise SOMAError(f"{df_uri} already exists")
+            soma_df = DataFrame.open(df_uri, "w", context=context)
+        check_for_containment(df, soma_df, ingestion_params)
 
     if ingestion_params.write_schema_no_data:
         logging.log_io(
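
To make the round-trip arithmetic in the comment above concrete (illustrative accounting; one "request" means one call to the remote object store):

    # exists-check first:   exists? (1) + open or create (1)        = 2 requests, always
    # try-create first:     create succeeds (1)                     = 1 request, object is new
    #                       create fails (1) + open-for-write (1)   = 2 requests, object exists

The try-create ordering is therefore never worse, and saves one request whenever the object does not yet exist, which is the common case for initial ingestion.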
@@ -1422,6 +1456,7 @@ def _create_from_matrix(
     context: SOMATileDBContext | None = None,
     axis_0_mapping: AxisIDMapping,
     axis_1_mapping: AxisIDMapping,
+    must_exist: bool = False,
 ) -> _NDArr:
     """
     Internal helper for user-facing ``create_from_matrix``.
@@ -1433,30 +1468,37 @@
     s = _util.get_start_stamp()
     logging.log_io(None, f"START WRITING {uri}")
 
-    try:
-        shape: Sequence[Union[int, None]] = ()
-        # A SparseNDArray must be appendable in soma.io.
-
-        # Instead of
-        #     shape = tuple(int(e) for e in matrix.shape)
-        # we consult the registration mapping. This is important
-        # in the case when multiple H5ADs/AnnDatas are being
-        # ingested to an experiment which doesn't pre-exist.
-        shape = (axis_0_mapping.get_shape(), axis_1_mapping.get_shape())
-
-        soma_ndarray = cls.create(
-            uri,
-            type=pa.from_numpy_dtype(matrix.dtype),
-            shape=shape,
-            platform_config=platform_config,
-            context=context,
-        )
-    except (AlreadyExistsError, NotCreateableError):
+    if must_exist:
         if ingestion_params.error_if_already_exists:
             raise SOMAError(f"{uri} already exists")
         soma_ndarray = cls.open(
             uri, "w", platform_config=platform_config, context=context
         )
+    else:
+        try:
+            shape: Sequence[Union[int, None]] = ()
+            # A SparseNDArray must be appendable in soma.io.
+
+            # Instead of
+            #     shape = tuple(int(e) for e in matrix.shape)
+            # we consult the registration mapping. This is important
+            # in the case when multiple H5ADs/AnnDatas are being
+            # ingested to an experiment which doesn't pre-exist.
+            shape = (axis_0_mapping.get_shape(), axis_1_mapping.get_shape())
+
+            soma_ndarray = cls.create(
+                uri,
+                type=pa.from_numpy_dtype(matrix.dtype),
+                shape=shape,
+                platform_config=platform_config,
+                context=context,
+            )
+        except (AlreadyExistsError, NotCreateableError):
+            if ingestion_params.error_if_already_exists:
+                raise SOMAError(f"{uri} already exists")
+            soma_ndarray = cls.open(
+                uri, "w", platform_config=platform_config, context=context
+            )
 
     if ingestion_params.write_schema_no_data:
         logging.log_io(
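
Consulting the registration mapping for the shape, rather than any single matrix, matters when several AnnDatas are ingested into one experiment. A toy illustration of the comment above (the row counts are invented):

    # Two registered AnnDatas contribute 100 and 150 obs rows respectively.
    # Any single X matrix has only 100 or 150 rows, but the registered
    # soma_joinid axis spans all of them, so the array must be created
    # with shape[0] == 250 to be appendable.
    matrix_row_counts = [100, 150]
    registered_axis_0_shape = sum(matrix_row_counts)  # 250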
@@ -1546,6 +1588,7 @@ def update_obs(
     Lifecycle:
         Maturing.
     """
+
     _update_dataframe(
         exp.obs,
         new_data,
@@ -1603,6 +1646,7 @@
         raise ValueError(
             f"cannot find measurement name {measurement_name} within experiment at {exp.uri}"
         )
+
     _update_dataframe(
         exp.ms[measurement_name].var,
         new_data,
@@ -1713,6 +1757,7 @@ def _update_dataframe(
         context=context,
         platform_config=platform_config,
         axis_mapping=AxisIDMapping.identity(new_data.shape[0]),
+        must_exist=True,
     )

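For reference, a hedged usage sketch of the public entry points whose remote-URI semantics this commit fixes; the URI, column values, and measurement name are illustrative, and the exact column-handling rules are documented with tiledbsoma.io.update_obs and update_var:

    import tiledbsoma
    import tiledbsoma.io

    uri = "tiledb://namespace/uuid"  # a non-creation URI; illustrative
    with tiledbsoma.Experiment.open(uri, "w") as exp:
        obs_df = exp.obs.read().concat().to_pandas()
        obs_df["batch"] = "batch-1"  # illustrative new column
        # With must_exist=True threaded through, update_obs opens the
        # existing obs dataframe directly instead of attempting a create
        # first, which previously failed for non-creation remote URIs.
        tiledbsoma.io.update_obs(exp, obs_df)

        var_df = exp.ms["RNA"].var.read().concat().to_pandas()
        var_df["mean_expression"] = 0.0  # illustrative new column
        tiledbsoma.io.update_var(exp, var_df, measurement_name="RNA")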

Expand Down
