Skip to content

Commit

Permalink
Icechunk learns how to delete tags. (#626)
Browse files Browse the repository at this point in the history
  • Loading branch information
paraseba authored Jan 25, 2025
1 parent de18e0c commit 536bf0d
Show file tree
Hide file tree
Showing 53 changed files with 214 additions and 27 deletions.
1 change: 1 addition & 0 deletions icechunk-python/python/icechunk/_icechunk_python.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class PyRepository:
def lookup_branch(self, branch: str) -> str: ...
def reset_branch(self, branch: str, snapshot_id: str) -> None: ...
def delete_branch(self, branch: str) -> None: ...
def delete_tag(self, tag: str) -> None: ...
def create_tag(self, tag: str, snapshot_id: str) -> None: ...
def list_tags(self) -> set[str]: ...
def lookup_tag(self, tag: str) -> str: ...
Expand Down
15 changes: 15 additions & 0 deletions icechunk-python/python/icechunk/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,21 @@ def delete_branch(self, branch: str) -> None:
"""
self._repository.delete_branch(branch)

def delete_tag(self, tag: str) -> None:
    """
    Delete a tag.

    Parameters
    ----------
    tag : str
        The tag to delete.

    Returns
    -------
    None
    """
    # The argument is a tag name; it was previously (mis)named `branch`.
    self._repository.delete_tag(tag)

def create_tag(self, tag: str, snapshot_id: str) -> None:
"""
Create a new tag at the given snapshot.
Expand Down
13 changes: 13 additions & 0 deletions icechunk-python/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,19 @@ impl PyRepository {
})
}

/// Delete the tag named `tag` from the wrapped repository.
///
/// Repository failures are wrapped in `PyIcechunkStoreError::RepositoryError`
/// before being surfaced through the returned `PyResult`.
pub fn delete_tag(&self, py: Python<'_>, tag: &str) -> PyResult<()> {
    // `block_on` blocks the calling thread, so other Python threads must be
    // allowed to make progress while the deletion runs.
    py.allow_threads(move || {
        let runtime = pyo3_async_runtimes::tokio::get_runtime();
        runtime.block_on(async move {
            let outcome = self.0.delete_tag(tag).await;
            outcome.map_err(PyIcechunkStoreError::RepositoryError)?;
            Ok(())
        })
    })
}

pub fn create_tag(
&self,
py: Python<'_>,
Expand Down
16 changes: 8 additions & 8 deletions icechunk-python/tests/data/test-repo/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ compression: null
caching: null
storage: null
virtual_chunk_containers:
file:
name: file
url_prefix: file
store: !LocalFileSystem ''
gcs:
name: gcs
url_prefix: gcs
store: !Gcs {}
az:
name: az
url_prefix: az
store: !Azure {}
tigris:
name: tigris
url_prefix: tigris
Expand All @@ -29,7 +29,7 @@ virtual_chunk_containers:
endpoint_url: http://localhost:9000
anonymous: false
allow_http: true
file:
name: file
url_prefix: file
store: !LocalFileSystem ''
az:
name: az
url_prefix: az
store: !Azure {}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"JCQTS2N5JBHD5M5NQ730"}
{"snapshot":"NN1GS8X7JB1ZB9MGXA9G"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"72RXYJ6VJ7KA5JC33SHG"}
{"snapshot":"SBRR2R9Z7A0PKGZB9RK0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"K3R5DA1MKAE7XKPBH780"}
{"snapshot":"YX2S3PGVVHWZ2005632G"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"YQR29H38Z996MGHWJTT0"}
{"snapshot":"JDFX05DYZ4F71A1EK1SG"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"90X9YTRXJQ0ZE6FYRVVG"}
{"snapshot":"MFWNSEV5AXRM8QDVSBC0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"RC6D4VNNAN31S1BRZ5GG"}
{"snapshot":"P54WKX1WM0T2AQ1ENM9G"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"JCQTS2N5JBHD5M5NQ730"}
{"snapshot":"NN1GS8X7JB1ZB9MGXA9G"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"snapshot":"P54WKX1WM0T2AQ1ENM9G"}
Empty file.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"90X9YTRXJQ0ZE6FYRVVG"}
{"snapshot":"MFWNSEV5AXRM8QDVSBC0"}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"snapshot":"RC6D4VNNAN31S1BRZ5GG"}
{"snapshot":"P54WKX1WM0T2AQ1ENM9G"}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
5 changes: 5 additions & 0 deletions icechunk-python/tests/test_can_read_old.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ async def write_a_test_repo() -> None:
snap4 = session.commit("delete a chunk")

repo.create_tag("it works!", snapshot_id=snap4)
repo.create_tag("deleted", snapshot_id=snap4)
repo.delete_tag("deleted")

session = repo.writable_session("my-branch")
store = session.store
Expand Down Expand Up @@ -195,6 +197,9 @@ async def test_icechunk_can_read_old_repo() -> None:
1:
]

with pytest.raises(ValueError, match="ref not found"):
repo.readonly_session(tag="deleted")

session = repo.writable_session("my-branch")
store = session.store
assert sorted([p async for p in store.list_dir("")]) == [
Expand Down
19 changes: 19 additions & 0 deletions icechunk-python/tests/test_timetravel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import cast

import pytest

import icechunk as ic
import zarr
import zarr.core
Expand Down Expand Up @@ -152,3 +154,20 @@ async def test_branch_reset() -> None:
assert (
await store.get("b/zarr.json", zarr.core.buffer.default_buffer_prototype())
) is None


async def test_tag_delete() -> None:
    """A deleted tag cannot be deleted again and its name cannot be reused."""
    repo = ic.Repository.create(
        storage=ic.in_memory_storage(),
    )

    snap = repo.lookup_branch("main")
    repo.create_tag("tag", snap)
    repo.delete_tag("tag")

    # Deleting an already-deleted tag is reported as an error.
    with pytest.raises(ValueError):
        repo.delete_tag("tag")

    # The name of a deleted tag cannot be taken by a new tag.
    with pytest.raises(ValueError):
        repo.create_tag("tag", snap)
136 changes: 128 additions & 8 deletions icechunk/src/refs.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::collections::HashSet;
use std::{collections::HashSet, future::Future, pin::Pin};

use async_recursion::async_recursion;
use bytes::Bytes;
use futures::{Stream, StreamExt, TryStreamExt};
use futures::{stream::FuturesOrdered, FutureExt, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TryFromInto};
Expand Down Expand Up @@ -119,6 +119,7 @@ pub struct RefData {
}

const TAG_KEY_NAME: &str = "ref.json";
const TAG_DELETE_MARKER_KEY_NAME: &str = "ref.json.deleted";

fn tag_key(tag_name: &str) -> RefResult<String> {
if tag_name.contains('/') {
Expand All @@ -128,6 +129,14 @@ fn tag_key(tag_name: &str) -> RefResult<String> {
Ok(format!("tag.{}/{}", tag_name, TAG_KEY_NAME))
}

/// Build the storage key of the deletion marker for the tag `tag_name`.
///
/// The key has the form `tag.<tag_name>/<TAG_DELETE_MARKER_KEY_NAME>`.
///
/// # Errors
///
/// Returns `RefError::InvalidRefName` when `tag_name` contains a `/`.
fn tag_delete_marker_key(tag_name: &str) -> RefResult<String> {
    match tag_name.find('/') {
        Some(_) => Err(RefError::InvalidRefName(tag_name.to_owned())),
        None => Ok(["tag.", tag_name, "/", TAG_DELETE_MARKER_KEY_NAME].concat()),
    }
}

fn branch_root(branch_name: &str) -> RefResult<String> {
if branch_name.contains('/') {
return Err(RefError::InvalidRefName(branch_name.to_string()));
Expand Down Expand Up @@ -308,18 +317,78 @@ pub async fn delete_branch(
Ok(())
}

/// Delete the tag `tag` by writing an empty deletion marker next to its ref.
///
/// The tag's own ref is not touched here; `fetch_tag` treats a tag whose
/// marker exists as not found.
///
/// # Errors
///
/// - `RefError::RefNotFound` if `fetch_tag` cannot find the tag, or if the
///   storage reports that the marker already exists.
/// - `RefError::InvalidRefName` if `tag` contains a `/`.
/// - Any other storage error, converted into a `RefError`.
pub async fn delete_tag(
    storage: &(dyn Storage + Send + Sync),
    storage_settings: &storage::Settings,
    tag: &str,
    overwrite_refs: bool,
) -> RefResult<()> {
    // Fail early unless the tag is currently visible.
    let _ = fetch_tag(storage, storage_settings, tag).await?;

    // Two deleters may both get past the check above; both end up wanting the
    // same marker, so the outcome does not depend on who writes it.
    let marker_key = tag_delete_marker_key(tag)?;
    let written = storage
        .write_ref(
            storage_settings,
            marker_key.as_str(),
            overwrite_refs,
            Bytes::from_static(&[]),
        )
        .await;

    match written {
        Ok(_) => Ok(()),
        // An existing marker means the tag is already deleted.
        Err(StorageError::RefAlreadyExists(_)) => {
            Err(RefError::RefNotFound(tag.to_string()))
        }
        Err(other) => Err(other.into()),
    }
}

pub async fn fetch_tag(
storage: &(dyn Storage + Send + Sync),
storage_settings: &storage::Settings,
name: &str,
) -> RefResult<RefData> {
let path = tag_key(name)?;
match storage.get_ref(storage_settings, path.as_str()).await {
Ok(data) => Ok(serde_json::from_slice(data.as_ref())?),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
let ref_path = tag_key(name)?;
let delete_marker_path = tag_delete_marker_key(name)?;

let fut1: Pin<Box<dyn Future<Output = RefResult<Bytes>>>> = async move {
match storage.get_ref(storage_settings, ref_path.as_str()).await {
Ok(data) => Ok(data),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
}
Err(err) => Err(err.into()),
}
}
.boxed();
let fut2 = async move {
match storage.get_ref(storage_settings, delete_marker_path.as_str()).await {
Ok(_) => Ok(Bytes::new()),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
}
Err(err) => Err(err.into()),
}
Err(err) => Err(err.into()),
}
.boxed();

if let Some((content, is_deleted)) = FuturesOrdered::from_iter([fut1, fut2])
.collect::<Vec<_>>()
.await
.into_iter()
.next_tuple()
{
match is_deleted {
Ok(_) => Err(RefError::RefNotFound(name.to_string())),
Err(RefError::RefNotFound(_)) => {
let data = serde_json::from_slice(content?.as_ref())?;
Ok(data)
}
Err(err) => Err(err),
}
} else {
Err(RefError::RefNotFound(name.to_string()))
}
}

Expand Down Expand Up @@ -591,4 +660,55 @@ mod tests {
res2?;
Ok(())
}

/// Deleting a tag is permanent: it cannot be deleted twice and its name
/// cannot be reused, while other tag names remain available.
#[tokio::test]
async fn test_tag_delete() -> Result<(), Box<dyn std::error::Error>> {
    let ((_, res1), (_, res2, _)) = with_test_storages::<
        Result<(), Box<dyn std::error::Error>>,
        _,
        _,
    >(|storage| async move {
        let storage_settings = storage.default_settings();
        let s1 = SnapshotId::random();
        let s2 = SnapshotId::random();
        create_tag(storage.as_ref(), &storage_settings, "tag1", s1, false).await?;

        // we can delete tags
        delete_tag(storage.as_ref(), &storage_settings, "tag1", false).await?;

        // cannot delete twice
        assert!(delete_tag(storage.as_ref(), &storage_settings, "tag1", false)
            .await
            .is_err());

        // we cannot delete non-existent tag
        assert!(delete_tag(
            storage.as_ref(),
            &storage_settings,
            "doesnt_exist",
            false
        )
        .await
        .is_err());

        // cannot recreate same tag; `matches!` only yields a bool, so it has
        // to be wrapped in `assert!` for the check to actually run
        assert!(matches!(
            create_tag(
                storage.as_ref(),
                &storage_settings,
                "tag1",
                s2.clone(),
                false
            )
            .await,
            Err(RefError::TagAlreadyExists(name)) if name == "tag1"
        ));

        // can create different tag
        create_tag(storage.as_ref(), &storage_settings, "tag2", s2, false).await?;

        Ok(())
    })
    .await;
    res1?;
    res2?;
    Ok(())
}
}
17 changes: 15 additions & 2 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{
IcechunkFormatError, NodeId, SnapshotId,
},
refs::{
create_tag, delete_branch, fetch_branch_tip, fetch_tag, list_branches, list_tags,
update_branch, BranchVersion, Ref, RefError,
create_tag, delete_branch, delete_tag, fetch_branch_tip, fetch_tag,
list_branches, list_tags, update_branch, BranchVersion, Ref, RefError,
},
session::Session,
storage::{self, ETag},
Expand Down Expand Up @@ -425,6 +425,19 @@ impl Repository {
}
}

/// Delete the tag named `tag` from this repository.
///
/// Only the tag reference is affected; the snapshots and chunks the tag
/// pointed to are left in place.
pub async fn delete_tag(&self, tag: &str) -> RepositoryResult<()> {
    let overwrite_refs = self.config().unsafe_overwrite_refs();
    delete_tag(self.storage.as_ref(), &self.storage_settings, tag, overwrite_refs)
        .await?;
    Ok(())
}

/// Create a new tag in the repository at the given snapshot id
pub async fn create_tag(
&self,
Expand Down

0 comments on commit 536bf0d

Please sign in to comment.